----本節内容-------
1.Kafka基礎概念
1.1 出世背景
1.2 基本原理
1.2.1.前置知識
1.2.2.架構和原理
1.2.3.基本概念
1.2.4.kafka特點
2.Kafka初體驗
2.1 環境準備
2.2 Kafka小試牛刀
2.2.1單個broker初體驗
2.2.2 多個broker初體驗
2.3 Kafka分布式叢集建構
2.3.1 Kafka分布式叢集建構
2.3.2 Kafka主題建立
2.3.3 生産者生産資料
2.3.4消費者消費資料
2.3.5消息的壓縮
2.4 Kafka在ZK目錄節點
2.4.1 kafka鏡像原理
2.4.2 Kafka副本模型
2.4.3 在ZK目錄節點内容
2.5 實體間互動流程
2.5.1主題與zk
2.5.2 消費者與zk
2.5.3 broker與生産者
2.5.4 消費者與消費者組
3.參考資料
---------------------

1.1 出世背景
Kafka是一個消息系統,是LinkedIn公司開發并開源出來的元件。Kafka原本用作LinkedIn的活動流(Activity Stream)和營運資料處理管道(Pipeline)的基礎。現在它已被多家公司作為多種類型的資料管道和消息系統使用。歪果仁就喜歡整一些看不懂的詞彙,比如活動流,比如營運資料,就不能好好說話麼。
活動流是啥?簡單了解,就是使用者使用網站或者系統時産生的資料流,比如點選一個頁面,檢視一個圖檔,翻看一網頁,搜尋一個關鍵字,網站營運者需要對使用者的這些行為進行統計,形成報表。
營運資料是啥?就是計算機産生的監控日志資訊,如CPU資料,IO資料等, 這些資料都是動态生成的,Linkein這樣的大公司,營運資料量非常大,通常的方式是生成這些資料,寫入到log檔案中,然後進行統計。活動流資料和營運資料對網站和軟體産品非常重要,舉幾個栗子
1)動态彙總,将使用者的資訊動态彙總,或者自己監控,或者發給使用者的朋友圈
2) 安全,實時監控使用者通路資訊,防止網絡爬蟲或者使用者擴散垃圾資訊,對API使用速率進行實時監控和控制,切斷網站某些不正常活動。
3)機器硬體實時監控:對機器運作效率實時監控,對異常情況自動觸發告警。
4)報表和批處理:将資料導入Hadoop平台,進行離線報表分析。
LinkedIn處理的時候就碰到幾個問題:
1) 日志量大,每天要處理10億多條資料。
2)高吞吐量。
3)實時性能差。
現有的消息隊列系統(messaging and queuing system)卻很适合于在實時或近實時(near-real-time)的情況下使用,但它們對很長的未被處理的消息隊列的處理很不給力,往往并不将數 據持久化作為首要的事情考慮。這樣就會造成一種情況,就是當把大量資料傳送給Hadoop這樣的離線系統後, 這些離線系統每個小時或每天僅能處理掉部分源資料。Kafka的目的就是要成為一個隊列平台,僅僅使用它就能夠既支援離線又支援線上使用這兩種情況。
1.2 基本架構和原理
1.2.1.前置知識
消息隊列
為什麼要引入消息隊列?舉個例子,假如A發送消息給B,如果B線上,那麼可以很順利的通訊發消息,那如果B不線上,那就比較麻煩了,消息隊列技術可以很好的解決這個問題。
消息隊列技術是分布式應用間交換資訊的一種技術,是兩個系統通訊的橋梁和媒介,将兩個系統解耦,不需要知道對方的位置和資訊。 通過消息隊列技術2個異構的系統可以進行通訊,尤其是大型系統。消息隊列可以儲存在磁盤或者記憶體中。
消息隊列技術底層都是socket通訊,socket在很多地方有用到,比如資料庫,程序間通訊,jdbc等等底層都是socket通訊。
JMS是消息服務的規範,很多消息中間件技術都遵循JMS規範。
消息隊列通訊模式
1)點對點通訊:點對點方式是最為傳統和常見的通訊方式,它支援一對一、一對多、多對多、多對一等多種配置方式,支援樹狀、網狀等多種拓撲結構。用人話描述一遍:就是一個人放東西,一個人去東西,這就是點對點。
2)釋出/訂閱 (Publish/Subscribe) 模式:釋出/訂閱功能使消息的分發可以突破目的隊列地理指向的限制,使消息按照特定的主題甚至内容進行分發,使用者或應用程式可以根據主題或内容接收到所需要的消息。釋出/訂閱功能使得發送者和接收者之間的耦合關系變得更為松散,發送者不必關心接收者的目的位址,而接收者也不必關心消息的發送位址,而隻是根據消息的主題進行消息的收發。釋出/訂閱 模式:就跟貼尋人啟事,公告一樣的道理,消息往公告上一貼,關心的人就去看看發生什麼事,不關心的就當作一堆廢紙,不用搭理。
Kafka很厲害,kafka将這兩個概念整合到一起,他隻有主題模式,但是能實作隊列模式的效果,實作方式就是消費者組的引入:
消費者組:将一個或者多個消費者劃到一起,取一個名标記,這就是消費者組,對于一個消費者組,
,處在同一個消費者組的消費者,隻能有一個消費者消費,
釋出訂閱模式:每個消費者組,隻有一個消費者,那就是釋出訂閱,每個消費者都有自己的組。
每個組都能消費,那就是釋出訂閱模式。
隊列模式: 所有的消費者都在一個組裡面。
消息隊列特點
·資料緩沖作用
· 降低耦合
·異構系統高效互動
Kafka是一個消息隊列元件,它遵循JMS規範,基本工作流程,生産者生産資料-> kafka叢集中轉資料->消費者消費資料
1.2.2.架構和原理
生産者生産消息,将消息發送給Kafka叢集,Kafka内在是分布式的,一個Kafka叢集通常包括多個代理。為了均衡負載,将話題分成多個分區,每個代理存儲一或多個分區。消費者從kafka主題中擷取消息。多個生産者和消費者能夠同時生産和擷取消息。
1.Producer根據指定partition方法(round-robin、hash等),将消息釋出到指定topic的partition裡面
2.kafka叢集接收到Producer發過來的消息後,将其持久化到硬碟,并保留消息指定時長(可配置),而不關注消息是否被消費。
3.Consumer從kafka叢集pull資料,并控制擷取消息的offset
1.2.3.基本概念
Broker:Kafka 叢集包含一個或多個伺服器,這種伺服器被稱為 broker。
Topic:每條釋出到 Kafka 叢集的消息都有一個類别,這個類别被稱為 Topic。(實體上不同 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然儲存于一個或多個 broker 上,但使用者隻需指定消息的 Topic 即可生産或消費資料而不必關心資料存于何處)。
Partition:Partition 是實體上的概念,每個 Topic 包含一個或多個 Partition。
Producer:負責釋出消息到 Kafka broker。
Consumer:消息消費者,向 Kafka broker 讀取消息的用戶端。
Consumer Group:每個 Consumer 屬于一個特定的 Consumer Group(可為每個 Consumer 指定 group name,若不指定 group name 則屬于預設的 group)。
1.2.4.Kafka特點
1)分布式流平台,支援消息的分區(mr的分區類似),支援多個伺服器之間消息分區
2)支援釋出和訂閱資料流,類似于消息系統
3)支援分布式和副本叢集方式,來存儲資料流
4)實時處理資料流
5) 支援多種源資料,資料庫互動、app雙向互動
6)水準可伸縮
7) 容錯好
8)速度快
9)多種方式存儲,持久化存儲記憶體,磁盤秒級
10)海量資料,TB級高吞吐量:支援每秒百萬消息,·廉價硬體
11)多用戶端支援,很容內建不同平台,java,python,和多源進行協同,它是一個
12)中間件的基因,跨平台和跨語言,開源
2.1環境準備
1)kafka下載下傳
kafka2.1.2官網下載下傳:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
2) zookeeper下載下傳
zookeeper3.3.6官網位址:http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz
3) jdk下載下傳
kafka和zookeeper前提是安裝好了jdk,注意你電腦是32位還是64為
jdk官網下載下傳位址:http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz
4)關閉防火牆
否則zk啟動會報錯no route tohost
· 檢視防火牆狀态,使用root賬号執行
service iptables status
·關閉防火牆
service iptables stop
·檢視防火牆開機啟動狀态
chkconfig iptables --list
·關閉防火牆開機啟動
chkconfig iptables off
5)配置好host
2.2 Kafka小試牛刀
2.2.1 單個broker初體驗
1.安裝單節點的kafka
下載下傳下來了之後直接解壓就可以運作單節點的Kafka,因為Kafka需要用zookeeper做高可用,如果沒有安裝zk,也沒有關系,使用它自帶的配置啟動就可以。
1)啟動自帶配置的zk
啟動zk指令: bin/zookeeper-server-start.sh config/zookeeper.properties
zookeeper啟動成功
2)啟動kafka
kafka啟動:bin/kafka-server.start.sh config/server.properites
kafka啟動成功
3)建立一個topic
建立topic:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test
檢視topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181
4)啟動生産者
指令:bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test
啟動之後,輸入2行
5)啟動消費者
指令:bin/kafka-console-consumer.sh --zookeeper kafka01:9202 --topic test --from-beginning
消費者接收到2行消息
總結
1.啟動zk
指令: bin/zookeeper-server-start.sh config/zookeeper.properties
2.啟動kafka
指令:bin/kafka-server.start.sh config/server.properites
參數:指定kafka的配置檔案
3.建立主題
指令:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test
參數:1)create:指定建立動作,2)zookeeper:指定zookeeper用戶端;3)replication-factor:指定主題副本個數;4)partitions:指定分區個數;5)topic:指定主題名稱
4.啟動生産者
指令:bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test
參數:1) broker-list:指定broker;2)topic:指定主題名,從參數可以看出來,生産者是不直接和zk互動的
5.啟動消費者
指令:bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topic test --from-beginning
參數:1)zookeeper:指定zookeeper用戶端;2)topic:指定主題名,3)from-beginning:從topic頭開始讀取消息
2.2.2 多個broker初體驗
前面已經配置了一個單節點kafka服務,再次擴充示範kafka叢集多節點可用性、容錯性,也為kafka分布式叢集做好鋪墊。
1)配置kafka多節點服務
配置多節點服務,拷貝出2配置設定置server1.properties,server2.properties,修改2處參數
------------------------------server1.properties
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka1-logs
------------------------------server2.properties
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka2-logs
-----------------------------
2)啟動zookeeper服務
bin/zookeeper-server-start.sh config/zookeeper.properties
3)啟動2個kafka服務
bin/kafka-server-start.sh config/server1.properties
bin/kafka-server-start.sh config/server2.properties
4)建立topic主題
bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 1 --partitions 1 --test2
5)啟動生産者
bin/kafka-console-producer.sh --broker-list kafka02:9093 --topic test2
輸入内容
6)啟動消費者
bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topitc test2 --from-beginning
2) 測試多節點可用性
在生産者輸入内容,消費者端可以擷取到消息
生産者端:
消費者端:
3)測試多節點容錯性
殺掉一個kafka服務,然後發送消息測試,消費者是否能政策收到消息
2.3 Kafka分布式叢集建構
2.3.1 Kafka分布式叢集建構
1.配置zookeeper叢集
1).解壓後,配置zoo.cfg,如果沒有從模闆配置檔案中拷貝出來
官網建議使用zookeeper3.4.x,3.4.9
http://kafka.apache.org/documentation.html#zk
這裡有各種版本的下載下傳位址
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
我們需要的版本3.4.9
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
配置之前建立目錄
指令:mkdir -p /usr/local/hadoop/zookeeper/data /usr/local/hadoop/zookeeper/log
修改zoo.cfg配置
指令:vi /usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/conf/zoo.cfg
内容:
----------------------------
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/hadoop/zookeeper/data
dataLogDir=/usr/local/hadoop/zookeeper/log
clientPort=2181
server.1=kafka01:2287:3387
server.2=kafka02:2288:3387
server.3=kafka03:2289:3387
b.配置myid(在每個節點上都這樣配置)
還有一個關鍵的設定,在每個zk server配置檔案的dataDir所對應的目錄下,必須建立一個名為myid的檔案,其中的内容必須與zoo.cfg中server.x 中的x相同,即:
-----------------------
/usr/local/hadoop/zookeeper/data/myid 中的内容為1,對應server.1中的1
/usr/local/hadoop/zookeeper/data/myid 中的内容為2,對應server.2中的2
/usr/local/hadoop/zookeeper/data/myid 中的内容為3,對應server.3中的3
-----------------------
c.關閉防火牆,并且各節點要配置好jdk
-------------
d.啟動zookeeper
/usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/bin/zkServer.sh start
e.驗證服務
指令1:bin/zkServer.sh status
指令2:bin/zkServer.sh start
f.配置環境變量到/etc/profile
ZOOKEEPER_HOME=/usr/local/hadoop/zookeeper/zookeeper-3.3.6
PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
KAFKA_HOME=/usr/local/hadoop/kafka/kafka_2.12-0.10.2.1
2.配置kafka叢集
1)準備目錄
指令:mkdir -p /usr/local/hadoop/kafka/log
2)修改server.properties 3個參數,
每個節點的broker.id不一樣,本次實驗kafka01,kafka02,kafka03對應1,2,3
------------
broker.id=03
log.dirs=/usr/local/hadoop/kafka/log
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
3)啟動kafka叢集
每一個節點執行,
指令:bin/kafka-server-start.sh config/server.properties
4)驗證kafka叢集是否正常啟動
a.jps檢視程序是否啟動,正常啟動,會有Kafka程序服務
b.檢視/usr/local/hadoop/kafka/log下面是否有資料
c.檢視zk中是否有kafka目錄
2.3.2 Kafka主題建立
1)相關指令
建立指令
bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 3 --partitions 2 --topic test3
檢視指令
bin/kafka-topics.sh --list --zookeeper kafka01:2181
檢視分區個數、副本個數
bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic test
2)相關說明
從這裡可以看到很多關于主題的資訊,總要包含:
· 主題的leader:讀寫都從這裡進行随機選擇
· 主題的副本數:副本數,節點清單
· isr:同步複制
·主題的分區數:2個分區
zk保留了副本之間的leader和随從資訊,每個副本周期性同步到磁盤
1.每個分區有N個副本,可以承受N-1個節點故障,ZK承受N-1/2個故障,如果3個節點,挂了2個,那就不行了。每個副本都有自己的leader,其餘的都是follower,zk存放分區的leader和all replica的資訊
2.每個副本存儲消息的部分資料在本地的log和offset中,周期性同步到磁盤中去確定消息寫入全部
副本或者其中一個
3.leader故障時,消息或者寫入本地log,或則在producer在收到ack消息前,從新發送消息給新的leader
這些資訊都是保留在zookeeper中的。進一步去zookeeper觀察
看到有2個topic
進入到0的目錄下檢視
再看看kafka的log目錄,實際的資料是儲存在kafka的log目錄下,雖然還沒有寫資料,但是相關目錄已經準備好了相關存放檔案和目錄了。
2.3.3 生産者生産資料
1)相關指令
bin/kafka-console-producer.sh --broker-list kafka02:9092 --topic test2
這裡要注意端口要和配置檔案的保持一緻,筆者因為前面示範了單機版的多broker(9093端口),端口沒有改正,導緻消費者沒法消費資料,白白浪費了很多時間排查問題。
從指令可以看出,生産者是和broker直接互動,broker使用zk協同工具來管理多個broker
broker:broker不知道誰消費了消息,并不維護哪個消費者消費了消息
消費者組:每個組中隻有一個消費者可以消費消息(所有的消費者都在一個組》隊列模式,都有自己的組》訂閱模式),通過消費者組同意了
消費者:維護了消費消息的狀态,broker不知道誰消費了消息,并不維護哪個消費者消費了消息,消費者自己知道的。
2.2.4 消費者消費資料
bin/kafka-console-consumer.sh --zookeeper kafka02:2181 --topic test2 --from-beginning
1.消息緩存與filesystem的存儲,資料是立即被即刻寫入OS的核心頁并且緩存以及清理磁盤(可以配置)
2.消息被消費後,kafka能長時間駐留消息在伺服器,允許重複消費
3.對分組消息使用了消息set,防止網絡過載
4.在伺服器存放消費的資訊,kafka是在消費者端存放,消費者保持消息的狀态
5.消費者狀态預設是在zk中,也允許存到到其他OLTP,比如資料庫
6.Kafka中生産和消費是點心的pull-push
生産者pull(write,輸入流),消費者push(read,輸出流,拉)
7.沒有主從模式,所有的broker的地位相同,broker資料均在zk中維護
并在producer之間共享
8.負載均衡政策,loadbalance,允許producer動态發現broker
9.producer生産者維護了一個broker連接配接池,并能通過zk的callback進行實時更新
10.producer可以選擇同步或者異步的方式發送消息給broker
打電話:同步,阻塞的都是同步的,NIO的特點就是非阻塞,IO就是阻塞的
發短息:異步,你收不收,知不知道,我不管,我先去幹其他的事情
2.2.5 消息的壓縮
Kafka設計的初衷是迅速處理短小的消息,一般10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,我們需要處理更大的消息,比如XML文檔或JSON内容,一個消息差不多有10-100M,這種情況下,Kakfa應該如何處理?
kafka producer設定compression.type=snappy
2.4.1 kafka鏡像原理
将元叢集的資料副本化給target kafka叢集,目标kafka叢集就當作一個消費者消費資料實作資料的備份。
2.4.2 Kafka副本模型
同步模型(同步複制):生産者從zk找leader,并發送message,消息立即寫入本地log,follow開始拉取消息,每個follow将消息寫入各自本地的log,向leader發送确認回執。leader在收到所有的follow的确認回執和本地副本寫入工作均完成後,再向producer發送确認回執。生産者用戶端是阻塞的,消費者的資料pull從leader中完成。
異步模型:leader的本地log寫入完成馬上向生産者發送回執,leader不等待follow的回執,follow行不行,成不成功,不管。
2.4.3 在ZK目錄節點内容
/brokers/topics/topic:存儲某個topic的partitions所有配置設定資訊
/brokers/topics/[topic]/partitions/[0...N]:partitions狀态資訊
/brokers/ids/[0...N]:每個broker的配置檔案中都需要指定一個數字類型的id(全局不可重複),此節點為臨時znode(EPHEMERAL).
/controller_epoch -> int (epoch) :此值為一個數字,kafka叢集中第一個broker第一次啟動時為1,以後隻要叢集中center controller中央控制器所在broker變更或挂掉,就會重新選舉新的center controller,每次center controller變更controller_epoch值就會 + 1;
/controller -> int (broker id of the controller) :存儲center controller中央控制器所在kafka broker的資訊
2.5 實體間互動流程
zookeeper在kafka扮演重要角色,Kafka使用zookeeper作為其分布式協調架構,很好的将消息生産、消息存儲、消息消費的過程結合在一起。同時借助zookeeper,kafka能夠生産者、消費者和broker在内的是以元件在無狀态的情況下,建立起生産者和消費者的訂閱關系,并實作生産者與消費者的負載均衡。
2.5.1主題與zk
在kafka中,使用者可以自定義多個topic,每個topic又可以劃分為多個分區,一半情況下,每個分區存儲在一個獨立的broker上。所有這些topic與broker的對應關系都有zookeeper進行維護。
在zookeeper中,建立專門的節點來記錄這些資訊,其節點路徑為/brokers/topics/{topic_name},如:
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics
[toptic_t, test, my-replicated-topic, mykafka, mykafka6, mykafka5, mykafka4, test6, mykafka3, test7, mykafka2]
[zk: localhost:2181(CONNECTED) 17] get /brokers/topics/mykafka4
{"version":1,"partitions":{"1":[102,103,104],"2":[103,104,102],"0":[104,102,103]}}
針對topic 的每一個分區與broker的對應關系,zookeeper通過節點 /brokers/topics/topic.name來記錄,如:
當broker啟動時,會到對應topic節點下注冊自己的broker.id到對應分區的isr清單中,如:
[zk: localhost:2181(CONNECTED) 23] get /brokers/topics/mykafka4/partitions/1/state
{"controller_epoch":15,"leader":102,"version":1,"leader_epoch":2,"isr":[102,103,104]}
同樣的,當broker退出數,也會觸發zookeeper更新其對應topic分區的isr清單,并決定是否需要做消費者的負載均衡。
2.5.2 消費者與zk
l 注冊新的消費者分組
當新的消費者組注冊到zookeeper中時,zookeeper會建立專用的節點來儲存相關資訊,其節點路徑為ls /consumers/{group_id},其節點下有三個子節點,分别為[ids, owners, offsets]。
Ø ids節點:記錄該消費組中目前正在消費的消費者;
Ø owners節點:記錄該消費組消費的topic資訊;
Ø offsets節點:記錄每個topic的每個分區的offset,如:
[zk: localhost:2181(CONNECTED) 54] get /consumers/test-consumer2-group/offsets/mykafka4/0
142
l 注冊新的消費者
當新的消費者注冊到kafka中時,會在/consumers/{group_id}/ids節點下建立臨時子節點,并記錄相關資訊,如:
[zk: localhost:2181(CONNECTED) 57] ls /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97
[]
[zk: localhost:2181(CONNECTED) 58] get /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97
{"version":1,"subscription":{"mykafka5":1},"pattern":"white_list","timestamp":"1433562901290"}
l 監聽消費者分組中消費者的變化
每個消費者都要關注其所屬消費者組中消費者數目的變化,即監聽/consumers/{group_id}/ids下子節點的變化。一單發現消費者新增或減少,就會觸發消費者的負載均衡。
2.5.3 broker與與zk
為了記錄broker的注冊資訊,在zookeeper上,專門建立了屬于kafka的一個節點,其路徑為/brokers,如:
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics]
Kafka的每個broker啟動時,都會到zookeeper中進行注冊,告訴zookeeper其broker.id, 在整個叢集中,broker.id應該全局唯一,并在zookeeper上建立其屬于自己的節點,其節點路徑為/brokers/ids/{broker.id}. 如:
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[102, 103]
建立完節點後,kafka會将該broker的broker.name及端口号記錄到改節點,如
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/102
{"jmx_port":-1,"timestamp":"1433209686575","host":"host102","version":1,"port":9092}
另外,改broker節點屬性為臨時節點,當broker會話失效時,zookeeper會删除該節點,這樣,我們就可以很友善的監控到broker節點的變化,及時調整負載均衡等。
2.5.4 消費者與消費者組
a.每個consumer用戶端被建立時,會向zookeeper注冊自己的資訊;
b.此作用主要是為了"負載均衡".
c.同一個Consumer Group中的Consumers,Kafka将相應Topic中的每個消息隻發送給其中一個Consumer。
d.Consumer Group中的每個Consumer讀取Topic的一個或多個Partitions,并且是唯一的Consumer;
e.一個Consumer group的多個consumer的所有線程依次有序地消費一個topic的所有partitions,如果Consumer group中所有consumer總線程大于partitions數量,則會出現空閑情況;
舉例說明:
kafka叢集中建立一個topic為report-log 4 partitions 索引編号為0,1,2,3
假如有目前有三個消費者node:注意-->一個consumer中一個消費線程可以消費一個或多個partition.
如果每個consumer建立一個consumer thread線程,各個node消費情況如下,node1消費索引編号為0,1分區,node2費索引編号為2,node3費索引編号為3
如果每個consumer建立2個consumer thread線程,各個node消費情況如下(是從consumer node先後啟動狀态來确定的),node1消費索引編号為0,1分區;node2費索引編号為2,3;node3為空閑狀态
總結:
從以上可知,Consumer Group中各個consumer是根據先後啟動的順序有序消費一個topic的所有partitions的。
如果Consumer Group中所有consumer的總線程數大于partitions數量,則可能consumer thread或consumer會出現空閑狀态。
Consumer均衡算法
當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提升topic的并發消費能力.
1) 假如topic1,具有如下partitions: P0,P1,P2,P3
2) 加入group中,有如下consumer: C0,C1
3) 首先根據partition索引号對partitions排序: P0,P1,P2,P3
4) 根據(consumer.id + '-'+ thread序号)排序: C0,C1
5) 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 然後依次配置設定partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]