天天看點

架構設計:系統間通信(28)——Kafka及場景應用(中1)4、Kafka及特性

(接上文《架構設計:系統間通信(27)——其他消息中間件及場景應用(上)》)

在本月初的寫作計劃中,我本來隻打算粗略介紹一下Kafka(同樣是因為進度原因)。但是,最近有很多朋友要求我詳細講講Kafka的設計和使用,另外兩年前我在研究Kafka準備将其應用到生産環境時,由于沒有仔細了解Kafka的設計結構所導緻的問題最後也還沒有進行交代。是以我決定即使耽誤一些時間,也要将Kafka的原理和使用場景給讀者詳細讨論讨論。這樣,也算是對兩年來自己學習和使用Kafka的一個總結。

4、Kafka及特性

Apache Kafka最初由LinkedIn貢獻,目前它是Apache下的一個頂級開源項目。Apache Kafka設計的首要目标是解決LinkedIn網站中海量的使用者操作行為記錄、頁面浏覽記錄,後繼的Apache Kafka版本也都是将“滿足高資料吞吐量”作為版本優化的首要目标。為了達到這個目标,Apache Kafka甚至在其他功能方面上做了一定的犧牲,例如:消息的事務性。如果您的系統需要進行機關時間内大量的資料采集工作,那麼可以考慮在您的系統設計方案中加入Apache Kafka。

4-1、Kafka叢集安裝

4-1-1、安裝環境介紹

Apache Kafka的安裝過程非常簡單。為了節約篇幅我不準備像介紹Apache ActiveMQ那樣,專門花費筆墨來介紹它的單機(單服務節點)安裝過程和最簡單的生産者、消費者的編碼過程。而是換一種思路:

直接介紹Apache Kafka多節點叢集的安裝過程,并且在這個Apache Kafka叢集中為新的Topic劃分多個分區,示範Apache Kafka的消息負載均衡原理。可能在這個過程中,我會使用一些您還不太了解的詞語(或者某些操作您暫時不會了解其中的原因),但是沒有關系,您隻需要按照我給出的步驟一步一步的做——這些詞語和操作會在後文被逐一解釋。

首先我們列出将要安裝的Kafka叢集中需要的服務節點,以及每個服務節點在其中的作用:

節點位置 節點作用
192.168.61.139 Apache Kafka Brocker 1
192.168.61.138 Apache Kafka Brocker 2
192.168.61.140 zookeeper server

在這個Apache Kafka叢集安裝的示範執行個體中,我們準備了兩個Apache Kafka的Brocker服務節點,并且使用其中一個節點充當zookeeper的運作節點。

Apache Kafka叢集需要使用Zookeeper服務進行協調工作,是以安裝Apache Kafka前需要首先安裝和運作Zookeeper服務。由于這邊文章主要介紹的是Apache Kafka的工作原理,是以怎樣安裝和使用Zookeeper的内容就不再進行贅述了,不清楚的讀者可以參考我另一篇文章:《hadoop系列:zookeeper(1)——zookeeper單點和叢集安裝》。這裡我們運作zookeeper隻是使用了zookeeper服務的單節點工作模式,如果您需要在實際生産環境運作Apache Kafka叢集,那麼zookeeper clusters的服務節點數量至少應該是3個(且使用不同的實體機)。

4-1-2、Kafka叢集安裝過程

  • 首先我們在192.168.61.140的伺服器上安裝Zookeeper以後,直接啟動zookeeper即可:
  • 您可以在Apache Kafka的官網下載下傳V0.8.X版本的安裝包(http://kafka.apache.org/downloads.html),請不要下載下傳V0.9.X版本的安裝包,因為V0.9.X版本中消費者端的配置屬性發生了相當的變化。我們本章節的講解将基于V0.8.1.1版本,并且全部針對V0.8.X版本相容的配置屬性(https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz)。

您可以直接使用wget指令,也可以通過浏覽器(或者第三方軟體)下載下傳:

  • 下載下傳後,運作指令進行壓縮檔案的解壓操作:
tar -xvf ./kafka_2.-..tgz
           

筆者習慣将可運作軟體放置在/usr目錄下,您可以按照您自己的操作習慣或者您所在團隊的規範要求放置解壓後的目錄(正式環境下不建議使用root賬号運作Kafka):

mv /root/kafka_2.-. /usr/kafka_2.-./
           
  • Apache Kafka所有的管理指令都存放在安裝路徑下的./bin目錄中。是以,如果您希望後續管理友善就可以設定一下環境變量:
export PATH=/usr/kafka_2.-./bin:$PATH
#記得在/etc/profile檔案的末尾加入相同的設定
           
  • Apache Kafka的配置檔案存放在安裝路徑下的./config目錄下。如下所示:
-rw-rw-r--.  root root  月    consumer.properties
-rw-rw-r--.  root root  月    log4j.properties
-rw-rw-r--.  root root  月    producer.properties
-rw-rw-r--.  root root  月   : server.properties
-rw-rw-r--.  root root  月    test-log4j.properties
-rw-rw-r--.  root root   月    tools-log4j.properties
-rw-rw-r--.  root root  月    zookeeper.properties
           

如果您進行的是Apache Kafka叢集安裝,那麼您隻需要關心“server.properties”這個配置檔案(其他配置檔案的作用,我們後續會讨論到)。

其中目錄下有一個zookeeper.properties不建議使用。之是以有這個配置檔案,是因為Kafka中帶有一個zookeeper運作環境,如果您使用Kafka中的“zookeeper-server-start.sh”指令啟動這個自帶zookeeper環境,才會用到這個配置檔案。

  • 開始編輯server.properties配置檔案。這個配置檔案中預設的配置項就有很多,但是您不必全部進行更改。下面我們列舉了更改後的配置檔案情況,其中您需要主要關心的屬性使用中文進行了說明(當然原有的注釋也會進行保留):
# The id of the broker. This must be set to a unique integer for each broker.
# 非常重要的一個屬性,在Kafka叢集中每一個brocker的id一定要不一樣,否則啟動時會報錯
broker.id=

# The port the socket server listens on
port=

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# The number of threads handling network requests
num.network.threads=

# The number of threads doing disk I/O
# 故名思議,就是有多少個線程同時進行磁盤IO操作。
# 這個值實際上并不是設定得越大性能越好。
# 在我後續的“存儲”專題會講到,如果您提供給Kafka使用的檔案系統實體層隻有一個磁頭在工作
# 那麼這個值就變得沒有任何意義了
num.io.threads=

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=

# A comma seperated list of directories under which to store log files
# 很多開發人員在使用Kafka時,不重視這個屬性。
# 實際上Kafka的工作性能絕大部分就取決于您提供什麼樣的檔案系統
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=

# The number of messages to accept before forcing a flush of data to disk
# 從Page Cache中将消息正式寫入磁盤上的閥值:以待轉儲消息數量為依據
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# 從Page Cache中将消息正式寫入磁盤上的閥值:以轉儲間隔時間為依據
#log.flush.interval.ms=1000

# The minimum age of a log file to be eligible for deletion
# log消息資訊儲存時長,預設為168個小時
log.retention.hours=

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# 預設為1GB,在此之前log檔案不會執行删除政策
# 實際環境中,由于磁盤空間根本不是問題,并且記憶體空間足夠大。是以筆者會将這個值設定的較大,例如100GB。
#log.retention.bytes=1073741824

# The maximum size of a log segment file. 
# When this size is reached a new log segment will be created.
# 預設為512MB,當達到這個大小,Kafka将為這個Partition建立一個新的分段檔案
log.segment.bytes=

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
# 檔案删除的保留政策,多久被檢查一次(機關毫秒)
# 實際生産環境中,6-12小時檢查一次就夠了
log.retention.check.interval.ms=

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# root directory for all kafka znodes.
# 到zookeeper的連接配接資訊,如果有多個zookeeper服務節點,則使用“,”進行分割
# 例如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
zookeeper.connect=:

# Timeout in ms for connecting to zookeeper
# zookeeper連接配接逾時時間
zookeeper.connection.timeout.ms=
           

當然以上系統自帶的Brocker服務節點的配置項還不是最完整的,在官網(http://kafka.apache.org/documentation.html#brokerconfigs)上完整的“server.properties”檔案的配置屬性和說明資訊。

再次強調一下,以上配置屬性中必須按照您自己的環境更改的屬性有:“broker.id”、“log.dirs”以及“zookeeper.connect”。其中每一個Kafka服務節點的“broker.id”屬性都必須不一樣。

  • 這樣我們就完成了其中一個Broker節點的安裝和配置。接下來您需要按照以上描述的步驟進行Kafka叢集中另一個Broker節點的安裝和配置。一定注意每一個Kafka服務節點的“broker.id”屬性都必須不一樣,在本示範執行個體中,我設定的broker.id分别為1和2。
  • 接下來我們啟動Apache Kafka叢集中已經完成安裝和配置的兩個Broker節點。如果以上所有步驟您都正确完成了,那麼您将會看到類似如下的啟動日志輸出:
#分别在兩個節點上執行這條指令,以便完成節點啟動:
kafka-server-start.sh /usr/kafka_2.10-0.8.1.1/config/server.properties

#如果啟動成功,您将看到類似如下的日志提示:
......
[2016-04-30 02:53:17,787] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-04-30 02:53:17,799] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer)
......
           
  • 啟動成功後,我們可以在某一個Kafka Broker 節點上運作以下指令來建立一個topic。為了後續進行講解,我們建立的topic有4個分區和兩個複制因子:

4-1-3、Kafka中的常用指令

在安裝Kafka叢集的時候,我們使用到了Kafka提供的腳本指令進行叢集啟動、topic建立等相關操作。實際上Kafka提供了相當豐富的腳本指令,以便于開發者進行叢集管理、叢集狀态監控、消費者/生産者測試等工作,這裡為大家列舉一些常用的指令:

4-1-3-1 叢集啟動:

kafka-server-start.sh config/server.properties
           

這個指令帶有一個參數——指定啟動服務所需要的配置檔案。預設的配置檔案上文已經提到過,存在于Kafka安裝路徑的./config檔案夾下,檔案名為server.properties。

4-1-3-2 建立Topic:

帶有 –create參數的kafka-topics指令腳本用于在Kafka叢集上建立一個新的topic。後續的四個參數為:

  • zookeeper 該參數用來指定Kafka叢集所使用的zookeeper的位址,這是因為當topic被建立時,zookeeper下的/config/topics目錄中會記錄新的topic的配置資訊。
  • replication-factor 複制因子數量。副本是Kafka V0.8.X版本中加入的保證消息可靠性的功能,複制因為是指某一條消息進行複制的副本數量,該功能以叢集中Broker服務節點的數量為機關。也就是說當Broker服務節點的數量為X時,複制因子的數量最多為X。否則在執行topic建立時會報告類似如下的錯誤:

Kafka的複制過程将在本文的後續章節進行介紹。當然,這個參數可以不進行設定,如果不進行設定該參數的預設值則為1。

  • partitions 分區數量(預設分區為1)。一個topic可以有若幹分區,這些分區分布在Kafka叢集的一個或者多個Broker上。後文我們将讨論到,partition分區是Kafka叢集實作消息負載均衡功能的重要基礎,且topic中partition分區一旦建立就不允許進行動态更改。是以一旦您準備在正式生産環境建立topic,就一定要慎重考慮它的分區數量。
  • topic 新建立的topic的名稱。該參數在建立topic時指定,且在Kafka叢集中topic的名稱必須是唯一的。

4-1-3-3 以生産者身份登入測試

kafka-console-producer.sh --broker-list localhost: --topic test

# 或者
kafka-console-producer.sh --producer.config client-ssl.properties
           

使用指令腳本(而不是Kafka提供的各種語言的API),模拟一個消息生産者登入叢集,主要是為了測試指定的topic的工作情況是否正常。可以有兩種方式作為消息生産者登入Kafka叢集:

第一種方式指定broker-list參數和topic參數,broker-list攜帶需要連接配接的一個或者多個broker服務節點;topic為指定的該消息生産者所使用的topic的名稱。

第二種方式是指定producer生産者配置檔案和用戶端ssl加密資訊配置檔案(後一個檔案也可不進行指定,如果您沒有在Kafka叢集中配置ssl加密規則的話)。預設的producer生産者配置檔案存放在kafka安裝路徑的./config目錄下,檔案名為producer.properties。

4-1-3-4 以消費者身份登入測試

同樣您可以使用指令腳本的方式,以消息消費者的身份登入Kafka叢集,目的相同:為了測試Kafka叢集下您建立的topic是否能夠正常工作。該指令有兩個參數:

  • zookeeper 指定的Kafka叢集所使用的zookeeper位址,如果有多個zookeeper節點就是用“,”進行分割。該參數必須進行指定。
  • topic 該參數用于指定使用的topic名稱資訊。如果您的topic在kafka叢集下工作正常的話,那麼在成功使用消費者身份登入後,就可以收到topic中有生産者發送的消息資訊了。

4-1-3-5 檢視Topic狀态

以上指令可以用來查詢指定的topic(my_topic2)的關鍵屬性,包括topic的名稱、分區情況、每個分期的主要節點、複制因子、複制序列已經指派序列的同步狀态等資訊。指令可能的結果如下所示:

Topic:my_topic2 PartitionCount:        ReplicationFactor:     Configs:
        Topic: my_topic2        Partition:     Leader:        Replicas: ,   Isr: ,
        Topic: my_topic2        Partition:     Leader:        Replicas: ,   Isr: ,
        Topic: my_topic2        Partition:     Leader:        Replicas: ,   Isr: ,
        Topic: my_topic2        Partition:     Leader:        Replicas: ,   Isr: ,
           

請注意這個查詢指令,因為這個查詢指令所反映的結果透露出了Apache Kafka V0.8.X版本的主要設計原理,我們本節下半部分的内容将從這裡展開。

4-2、Kafka原理:設計結構

一個完整的Apache Kafka解決方案的組成包括四個要素:Producer(消息生産者)、Server Broker(服務代理器)、Zookeeper(協調者)、Consumer(消息消費者)。 Apache Kafka在設計之初就被認為是叢集化工作的,是以要說清楚Apache Kafa的設計結構除了要講述每一個Kafka Broker是如何工作的以外,還要講述清楚整個Apache Kafka叢集是如何工作的。

4-2-1、Kafka Broker工作結構

架構設計:系統間通信(28)——Kafka及場景應用(中1)4、Kafka及特性

在Apache Kafka的Server Broker設計中,一個獨立進行消息擷取、消息記錄和消息分送操作的隊列稱之為Topic(和ActiveMQ中Queue或者Topic的概念同屬一個級别)。以下我們讨論的内容都是針對一個Topic而言,後續内容就不再進行說明了。

  • 上圖描述了一個獨立的Topic構造結構:Apache Kafka将Topic拆分成多個分區(Partition),這些分區(Partition)可能存在于同一個Broker上也可能存在于不同的Broker上。如果您觀察Kafka的檔案存儲結構就會發現Kafka會為Topic中每一個分區建立一個獨立的檔案加,類似如下所示(以下的Topic——my_topic2一共建立了4個分區):
[[email protected] my_topic2-0]# ls
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-0
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-1
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-2
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-3
           
  • 由Producer發送的消息會被配置設定到各個分區(Partition)中進行存儲,至于它們是按照什麼樣的規則被配置設定的在後文會進行講述。一條消息記錄隻會被配置設定到一個分區進行存儲,并且這些消息以分區為機關保持順序排列。這些分區是Apache Kafka性能的第一種保證方式:機關數量相同的消息将分發到存在于多個Broker服務節點上的多個Partition中,并利用每個Broker服務節點的計算資源進行獨立處理。
  • 每一個分區都中會有一個或者多個段(segment)結構。如上圖所示,一個段(segment)結構包含兩種類型的檔案:.index字尾的索引檔案和.log字尾的資料檔案。前一個index檔案記錄了消息在整個topic中的序号以及消息在log檔案中的偏移位置(offset),通過這兩個資訊,Kafka可以在後一個log檔案中找到這條消息的真實内容。
  • 我們在之前的文章中已經介紹過(在我後續的專題中還會繼續讨論這個問題),在磁盤上進行的檔案操作隻有采用順序讀和順序寫才能做到高效的磁盤I/O性能。這是Kafka保證性能的又一種方式——對索引index檔案始終保證順序讀寫:當在磁盤上記錄一條消息時,始終在檔案的末尾進行操作;當在磁盤上讀取一條消息時,通過index順序查找到消息的offset位置,再進行消息讀取。後一種消息讀取操作下,如果index檔案過大,Kafka的磁盤操作就會耗費掉相當的時間。是以Kafak需要對index檔案和log檔案進行分段。
  • 實際上Kafka之是以“快”,并不隻是因為它的I/O操作是順序讀寫和多個分區的概念;畢竟類似于AcitveMQ也有多節點叢集的概念,并且後者通過使用LevelDB或者KahaDB這樣的存儲方案也可以實作磁盤的順序I/O操作。要知道如果消息消費者真正需要到磁盤上尋找資料了,那麼整個Kafka叢集的性能也不會好到哪兒去:目前SATA3序列槽通訊的理論速度也隻有6Gpbs,使用SATA3序列槽通訊的固态硬碟,真實的順序讀取最快速度也不過550M/S。
  • Kafka對Linux作業系統下Page Cache技術的應用,才是其高性能的最大保證。檔案内容的組織結構隻是其保證消息可靠性的一種方式,真實的業務環境下Kafka一般不需要在磁盤上為消費者尋找消息記錄(隻要您的記憶體空間夠大)。關于Linux作業系統下的Page Cache技術又是另外一個技術話題,我會在随後推出的“存儲”專題中為各位讀者進行詳細介紹(LevelDB也應用到了Linux Page Cache技術)。

4-2-2、Kafka Cluster結構

說清楚了單個Kafka Broker結構,我們再來看看整個Kafka叢集是怎樣工作的。以下視圖描述了某個Topic下的一條消息是如何在Kafka 叢集結構中流動的(實線有向箭頭):

架構設計:系統間通信(28)——Kafka及場景應用(中1)4、Kafka及特性
  • 整個Kafka叢集中,可以有多個消息生産者。這些消息生産者可能在同一個實體節點上,也可能在不同的實體節點。它們都必須知道哪些Kafka Broker List是将要發送的目标:消息生産者會決定發送的消息将會送入Topic的哪一個分區(Partition)。
  • 消費者都是按照“組”的機關進行消息隔離:在同一個Topic下,Apache Kafka會為不同的消費者組建立獨立的index索引定位。也就是說當消息生産者發送一條消息後,同一個Topic下不同組的消費者都會收到這條資訊。
  • 同一組下的消息消費者可以消費Topic下一個分區或者多個分區中的消息,但是一個分區中的消息隻能被同一組下的某一個消息消費者所處理。也就是說,如果某個Topic下隻有一個分區,就不能實作消息的負載均衡。另外Topic下的分區數量也隻能是固定的,不可以在使用Topic時動态改變,這些分區在Topic被建立時使用指令行指定或者參考Broker Server中配置的預設值。
  • 由于存在以上的操作規則,是以Kafka叢集中Consumer(消費者)需要和Kafka叢集中的Server Broker進行協調工作:這個協調工作者交給了Zookeeper叢集。zookeeper叢集需要記錄/協調的工作包括:目前整個Kafka叢集中有哪些Broker節點以及每一個節點處于什麼狀态(活動/離線/狀态)、目前叢集中所有已建立的Topic以及分區情況、目前叢集中所有活動的消費者組/消費者、每一個消費者組針對每個topic的索引位置等。
  • 當一個消費者上線,并且在消費消息之前。首先會通過zookeeper協調叢集擷取目前消費組中其他消費者的連接配接狀态,并得到目前Topic下可用于消費的分區和該消費者組中其他消費者的對應關系。如果目前消費者發現Topic下所有的分區都已經有一一對應的消費者了,就将自己置于挂起狀态(和broker、zookeeper的連接配接還是會建立,但是不會到分區Pull消息),以便在其他消費者失效後進行接替。
  • 如果目前消費者連接配接時,發現整個Kafka叢集中存在一個消費者(記為消費者A)關聯Topic下多個分區的情況,且消費者A處于繁忙無法處理這些分區下新的消息(即消費者A的上一批Pull的消息還沒有處理完成)。這時新的消費者将接替原消費者A所關聯的一個(或者多個)分區,并且一直保持和這個分區的關聯。
  • 由于Kafka叢集中隻保證同一個分區(Partition)下消息隊列中消息的順序。是以當一個或者多個消費者分别Pull一個Topic下的多個消息分區時,您在消費者端觀察的現象可能就是消息順序是混亂的。這裡我們一直在說消費者端的Pull行為,是指的Topic下分區中的消息并不是由Broker主動推送到(Push)到消費者端,而是由消費者端主動拉取(Pull)。

===========================

(接下文)

繼續閱讀