kafka是什麼?
Kafka是一個分布式流式存儲并處理的消息隊列。由scale+java語言編寫,它提供了類似于JMS的特性,但是在設計實作上又完全不同,因為kafka并不是按照JMS規範實作的。kafka叢集由多個broke(Kafka執行個體稱之為broke)組成,在叢集裡,kafka通過消息訂閱和釋出将消息以topic的形式釋出出來,同時,消息也是存儲在topic中的,消息的發送者成為producer,消息接受者成為Consummer。
同時,topic 是根據分區partitions,和副本replications來實作的資料的分布式存儲,和加強資料的可靠性。

何為topic?
一個topic可以認為是一類消息,每個topic将被分成多個partitions,每個partition在存儲append log的形式存在檔案裡的。任何釋出到partition的消息都會直接被追加到log檔案的末尾,每條消息在檔案中的位置稱之為offset偏移量,offset為一個long型數字,它唯一辨別一條消息,kafka并沒有提供其他索引來存儲offset,是以kafka不支援消息的随機讀寫。
kafka和JMS(Java Message Service)實作(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即删除.日志檔案将會根據broker中的配置要求,保留一定的時間之後(預設是7天)删除;比如log檔案保留2天,那麼兩天後,檔案會被清除,無論其中的消息是否被消費.kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費之後對檔案内容改動的磁盤IO開支.
kafka消息如何消費的?
對于consumer而言,它需要儲存消費消息的offset,對于offset的儲存和使用,有consumer來控制;當consumer正常消費消息時,offset将會"線性"的向前驅動,即消息将依次順序被消費.事實上consumer可以使用任意順序消費消息,它隻需要将offset重置為任意值..(kafka 老版本中offset将會儲存在zookeeper中,1.x之後也會存儲在broke叢集裡,參見下文)
kafka 叢集裡consumer和producer的狀态資訊是如何儲存的?
kafka叢集幾乎不需要維護任何consumer和producer狀态資訊,這些資訊由zookeeper儲存;是以producer和consumer的用戶端實作非常輕量級,它們可以随意離開,而不會對叢集造成額外的影響.
kafka為何要引入分區的概念,有何好處?
partitions的設計目的有多個.最根本原因是kafka基于檔案存儲.通過分區,可以将日志内容分散到多個kafka執行個體上,來避免檔案尺寸達到單機磁盤的上限,每個partiton都會被目前server(kafka執行個體)儲存;可以将一個topic切分多任意多個partitions,來消息儲存/消費的效率.此外越多的partitions意味着可以容納更多的consumer,有效提升并發消費的能力.有負載均衡的功效(具體原理參見下文).
kakfa資料是如何寫入到磁盤的?
一個Topic的多個partitions,被分布在kafka叢集中的多個server上;每個server(kafka執行個體)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition将會被備份到多台機器上,以提高可用性.
基于replicated方案,那麼就意味着需要對多個備份進行排程;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼将會有其他follower來接管(成為新的leader);follower隻是單調的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,是以從叢集的整體考慮,有多少個partitions就意味着有多少個"leader",kafka會将"leader"均衡的分散在每個執行個體上,來確定整體的性能穩定.這和zookeeper的follower是有差別的:zookeeper的follower是可以讀到資料的,而kafka的follower是讀不到資料的。
kafka使用檔案存儲消息,這就直接決定kafka在性能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化幾乎沒有可能.檔案緩存/直接記憶體映射等是常用的手段.因為kafka是對日志檔案進行append操作,是以磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會将消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.
kafka中消費者組如何了解?
Producer将消息釋出到指定的Topic中,同時Producer也能決定将此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等.
本質上kafka隻支援Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,隻會被訂閱此Topic的每個group中的一個consumer消費.
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息将會在consumers之間負載均衡.
如果所有的consumer都具有不同的group,那這就是"釋出-訂閱";消息将會廣播給所有的消費者.
在kafka中,一個partition中的消息隻會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,隻會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka隻能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來說,消息仍不是有序的. 因為消費者消費消息的時候是按照分區依次讀取的,是以無法保證消息的全局順序性,隻能保證在同一個分區内的消息是順序的。如果想要所有的消息都是順序的,可以把分區數設定為1.
kafka中如何保證資料一段時間内不丢失?
kafka 的producer有ACK機制。可以由使用者自行設定是否開啟确認機制,如果開啟确認機制,kafka會等發送消息到kafka叢集時,當leader伺服器,會傳回中繼資料給producer用戶端,ACK機制也在中繼資料裡,這裡的ACK有兩種,一種就是leader隻要接收成功,就傳回确認,另外一種就是:要等所有follower都收到了之後才傳回确認。producer在接收到确認之後,才會發下一條消息。而所有的消息最終都是存儲在磁盤一段時間的,是以一段時間内消息是不會丢失的。
kafka 的應用場景主要有哪些?
官方介紹是講可以用作message queue,資料采集,簡單流式計算等。
用作消息隊列message queue有哪些優缺點?
對于一些正常的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息确認機制)""消息分組"等企業級特性;kafka隻能使用作為"正常"的消息系統,在一定程度上,尚未確定消息的發送與接收絕對可靠(比如,消息重發,消息發送丢失等)
kafka是如何保持高性能的?
需要考慮的影響性能點很多,除磁盤IO之外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以将消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置檔案來指定.對于kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡IO的性能:将檔案的資料映射到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換. 其實對于producer/consumer/broker三者而言,CPU的開支應該都不大,是以啟用消息壓縮機制是一個良好的政策;壓縮需要消耗少量的CPU資源,不過對于kafka而言,網絡IO更應該需要考慮.可以将任何在網絡上傳輸的消息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.
kafka在消費者端有哪些異常處理政策?
對于JMS實作,消息傳輸擔保非常直接:有且隻有一次(exactly once).在kafka中稍有不同:
- at most once: 最多一次,這個和JMS中"非持久化"消息類似.發送一次,無論成敗,将不會重發.
- at least once: 消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功.
-
exactly once: 消息隻會發送一次.
at most once: 消費者fetch消息,然後儲存offset,然後處理消息;當client儲存offset之後,但是在消息處理過程中出現了異常,導緻部分消息未能繼續處理.那麼此後"未處理"的消息将不能被fetch到,這就是"at most once".
at least once: 消費者fetch消息,然後處理消息,然後儲存offset.如果消息處理成功之後,但是在儲存offset階段zookeeper異常導緻儲存操作未能執行成功,這就導緻接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的送出給zookeeper,zookeeper恢複正常還是之前offset狀态.
exactly once: kafka中并沒有嚴格的去實作基于2階段送出,事務),我們認為這種政策在kafka中是沒有必要的.
通常情況下"at-least-once"是我們搜選.(相比at most once而言,重複接收資料總比丢失資料要好).
kafka 工作流程是怎樣的?
- 主要結構圖:大體可以從三個方面分析:生産者産生消息、消費者消費消息、Broker cluster儲存消息。
kafka基本介紹 - 生産者産生消息過程分析
-
寫入方式:
producer 采用push的方式将消息發送到broker cluster,每條消息都被追加到分區中,屬于順序寫磁盤(順序寫磁盤效率比随機寫記憶體效率要高,能提高Kafka吞吐率)
而且broker叢集并不是每一條消息都及時寫磁盤,而是先寫buffer,達到一定大小或者每隔一段時間再flush到磁盤上。
多個producer可以給同一個topic 釋出消息,而且可以指定分區釋出。
-
分區Partition
每個Topic可以有多個分區,而消息最終是存儲在磁盤的檔案裡的,Partition在磁盤上是檔案夾的形式存在的。如
cd /var/applog/kafka/ ## 賺到kafka資料目錄 即log.dir=配置的目錄
ls
cleaner-offset-checkpoint __consumer_offsets-22 __consumer_offsets-4 log-start-offset-checkpoint recovery-point-offset-checkpoint
__consumer_offsets-1 __consumer_offsets-25 __consumer_offsets-40 meta.properties replication-offset-checkpoint
__consumer_offsets-10 __consumer_offsets-28 __consumer_offsets-43 mytest-0 test-0
__consumer_offsets-13 __consumer_offsets-31 __consumer_offsets-46 mytest-1
__consumer_offsets-16 __consumer_offsets-34 __consumer_offsets-49 mytest-2
__consumer_offsets-19 __consumer_offsets-37 __consumer_offsets-7 mytest-3
其中mytest-0 mytest-1 mytest-2 mytest-3 即為分區Partition,裡面的檔案就是分區裡面存放的資料。
-
broker cluster 儲存消息
broker 收到消息後,首先會去找topic對應分區的leader,找到leader後,先将資料寫入buffer,再flush到磁盤。然後zookeeper會協調follower自動同步leader分區的資料,以達到replication備份的目的,同時leader會按照備份完成的先後順序給follower作一次排序,作為leader發生意外時選舉時選舉為leader的順序。
kafka基本介紹 - 消費者消費消息
- 消費者消費消息,同一個分區裡的資料不能夠被一個消費組裡面的多個消費者同時消費,同一個消費組裡的消費者隻能消費不同分區的資料。
- 不同消費者組可以消費同一個分區裡的資料。
- 消費者消費資料時是按照分區的一個一個分區資料進行消費的。
zookeeper在kafka中的具體作用是什麼?
kafka是依賴于zookeeper注冊中心的,主要來協調各個broker的分區備份,broker的選舉,以及消費者相關狀資訊的存儲。
kafka使用zookeeper來存儲一些meta資訊,并使用了zookeeper watch機制來發現meta資訊的變更并作出相應的動作(比如consumer失效,觸發負載均衡等)
在0.8.0.2之後引入了選舉contorller的方式來控制replication,以及consumer狀态相關的資訊,而不是直接由zookeeper去協調控制,減少了zookeeper的壓力,但是卻增加了broke的複雜度。
-
Broker node registry: 當一個kafkabroker啟動後,首先會向zookeeper注冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連接配接時,此znode也會被删除.
格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每個broker的配置檔案中都需要指定一個數字類型的id(全局不可重複),znode的值為此broker的host:port資訊.
$ zkCli -server k8s-n1:2181
$ ls /brokers
[ids, topics, seqid]
$ ls /brokers/ids
[0, 1, 2]
$ get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://k8s-n1:9092"],"jmx_port":-1,"host":"k8s-n1","timestamp":"1556568752340","port":9092,"version":4}
cZxid = 0xd0000003c
ctime = Wed Apr 24 16:10:19 CST 2019
mZxid = 0xd0000003c
mtime = Wed Apr 24 16:10:19 CST 2019
pZxid = 0xd0000003c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x26a4e173fc40002
dataLength = 182
numChildren = 0
-
Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions資訊,仍然是一個臨時znode.
格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引号.
$ ls /brokers/topics
[test, __consumer_offsets]
__consumer_offsets 是消費端的offset
$ ls /brokers/topics/test
[partitions] ##test的分區資訊
$ ls /brokers/topics/test/partitions
[0]
$ ls /brokers/topics/test/partitions/0
[state]
$ get /brokers/topics/test/partitions/0/state
{"controller_epoch":19,"leader":0,"version":1,"leader_epoch":3,"isr":[0]}
cZxid = 0x2000000b6
ctime = Wed Apr 24 07:53:42 CST 2019
mZxid = 0xd00000044
mtime = Wed Apr 24 16:10:19 CST 2019
pZxid = 0x2000000b6
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 73
numChildren = 0
-
Consumer and Consumer group: 每個consumer用戶端被建立時,會向zookeeper注冊自己的資訊;此作用主要是為了"負載均衡".
一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.
-
Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來标記消費者資訊.
格式:/consumers/[group_id]/ids/[consumer_id]
仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions清單.
啟動消費者:
$ kafka-console-consumer.sh --bootstrap-server k8s-n2:9092 --topic test
啟動生成者:
kafka-console-producer.sh --broker-list k8s-n1:9092 --topic test
檢視zookeeper資訊:
$ ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
$ ls /consumers
[]
發現consummer下啥也沒有?這是因為新版本的kafka,consumer中offset不是放在這個位置的,而是放在__consumer_offset 這個topic下的。那麼該如何驗證呢?
啟動消費者:
$ kafka-console-consumer.sh --bootstrap-server k8s-n2:9092 --topic test
啟動生成者:
kafka-console-producer.sh --broker-list k8s-n1:9092 --topic test
驗證消息生産成功
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list k8s-n1:9092 --topic mytest --time -1
mytest:0:15
mytest:1:16
mytest:2:16
mytest:3:15
mytest topic 上 0号分區有15條消息。很好了解。
再建立一個消費者組
kafka-console-consumer.sh --bootstrap-server k8s-n1:9092 --topic mytest --from-beginning
查詢一下消費者組資訊
kafka-consumer-groups.sh --bootstrap-server k8s-n1:9092 --list
console-consumer-24766
console-consumer-52794
查詢一下topic裡的内容:
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server k8s-n1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
結果:
[console-consumer-52794,__consumer_offsets,12]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,45]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,1]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,5]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,26]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,29]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,34]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,10]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,32]::OffsetAndMetadata(offset=5, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
[console-consumer-52794,__consumer_offsets,40]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1556122524504, expireTimestamp=None)
^CProcessed a total of 1674 messages
參考了 http://www.cnblogs.com/huxi2b/p/6061110.html這篇blog的作法,但是我的版本是kafka_2.2.0裡面并沒有找offset的指令。
-
Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.
格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.
-
Partition Owner registry: 用來标記partition被哪個consumer消費.臨時znode
格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽目前group中其他consumer的"leave"和"join";隻要此znode path下節點清單變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions).
C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker清單變更,将會觸發所有的groups下的consumer重新balance.