一、Kafka的基本概述
1、Kafka是什麼?
Kafka官網上對Kafka的定義叫:Adistributed publish-subscribe messaging system。publish-subscribe是釋出和訂閱的意思,是以準确的說Kafka是一個消息訂閱和釋出的系統。最初,Kafka實際上是LinkedIn用于日志處理的分布式消息隊列,LinkedIn的日志資料容量大,但對可靠性要求不高,其日志資料主要包括使用者行為(登入、浏覽、點選、分享、喜歡)以及系統運作日志(CPU、記憶體、磁盤、網絡、系統及程序狀态)。
2、Kafka能做什麼?
現今,Kafka主要用于處理活躍的流式資料,如分析使用者的行為,包括使用者的pageview(頁面浏覽),以便能夠設計出更好的廣告位,對使用者搜尋關鍵詞進行統計以便分析出目前的流行趨勢,比如經濟學上著名的長裙理論:如果長裙的銷量高了,說明經濟不景氣了,因為姑娘們沒錢買各種絲襪了。當然還有些業務資料,如果存資料庫浪費,而直接用傳統的存硬碟方式效率又低下,這個時候,也可以使用Kafka的分布式進行存儲。
3、Kafka中的相關概念
· Broker
Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker。一台Kafka伺服器就是一個broker,一個叢集由多個broker組成,一個broker可以容納多個topic。
· Topic
每條釋出到Kafka叢集的消息都有一個類别,這個類别被稱為Topic,Kafka中Topic可以了解為一個存儲消息的隊列。實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存在一個或多個broker上但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處。
· Partition
Partition是實體上的概念,Kafka實體上把Topic分成一個或多個Partition,每個Partition在實體上對應一個檔案夾,該檔案夾下存儲這個Partition的所有消息和索引檔案。如建立topic1和topic2兩個topic,且分别有13個和19個Partition分區,則整個叢集上相應會生成32個檔案夾。為了實作擴充性,一個非常大的topic可以分布到多個broker上,但Kafka隻保證按一個partition中的順序将消息發給consumer,不保證一個topic的整體(多個partition間)的順序。
· Producer
負責釋出消息到Kafkabroker。
· Consumer
消息消費者,向Kafkabroker讀取消息的用戶端。
· Consumer Group(CG)
這是Kafka用來實作一個topic消息的廣播(發給所有的consumer)和單點傳播(發給任意一個consumer)的手段。一個 topic可以屬于多個CG。topic的消息會複制(不是真的複制,是概念上的)到所有的CG,但每個CG隻會把消息發給該CG中的一個consumer。如果需要實作廣播,隻要每個consumer有一個獨立的CG就可以了。要實作單點傳播隻要所有的consumer在同一個CG。用CG還可以将consumer進行自由的分組而不需要多次發送消息到不同的topic。每個consumer屬于一個特定的Consumer Group, Kafka允許為每個consumer指定group name,若不指定group name則屬于預設的group。
4、Kafka的特性:
1)資料在磁盤上存取代價為O(1),而一般資料在磁盤上是使用BTree存儲的,存取代價為O(lgn)。
2)高吞吐率:即使在普通的節點(非常普通的硬體)上每秒鐘也能處理成百上千的message。
3)顯式分布式:即所有的producer、broker和consumer都會有多個,均勻分布并支援通過Kafka伺服器和消費機叢集來分區消息。
4)支援資料并行加載到Hadoop中。
5) 支援Broker間的消息分區及分布式消費,同時保證每個partition内的消息順序傳輸。
6)同時支援離線資料處理和實時資料處理:目前很多的消息隊列服務提供可靠傳遞保證,并預設是即時消費(不适合離線),而Kafka通過建構分布式的叢集,允許消息在系統中累積,使得Kafka同時支援離線和線上日志處理。
7)Scale out:支援線上水準擴充。
二、Kafka的架構設計
1、 最簡單的Kafka部署圖
如果将消息的釋出(publish)稱作producer,将消息的訂閱(subscribe)表述為consumer,将中間的存儲陣列稱作broker,這樣可以得到一個最簡單的消息釋出與訂閱模型:

2、Kafka的拓撲圖:
Kafka是顯示的分布式消息釋出和訂閱系統,除了有多個producer, broker,consumer外,還有一個zookeeper叢集用于管理producer,broker和consumer之間的協同調用。
從圖中可以看出,一個典型的Kafka叢集中包含若幹Producer(可以是web前端産生的PageView,或者是伺服器日志,系統CPU、Memory等),若幹broker(Kafka支援水準擴充,一般broker數量越多,叢集吞吐率越高),若幹Consumer Group,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式将消息釋出到broker,Consumer使用pull模式從broker訂閱并消費消息。
圖上有個細節需要注意,Producer到Broker的過程是push,也就是有資料就推送到Broker,而Consumer到Broker的過程是pull,是通過Consumer主動去拉資料的,而不是Broker把資料主動發送到Consumer端的。
3、Zookeeper和Producer、Broker、Consumer的協同工作
為了便于了解,假定此時Kafka叢集中有兩台Producer,但隻有一台Kafka的Broker、Zookeeper和Consumer。如下圖所示的部署叢集。
1、 Kafka Broker其實就是Kafka的server,Broker主要做存儲用,每個Broker啟動後會在Zookeeper上注冊一個臨時的broker registry,包含Broker的IP位址和端口号,所存儲的topics和partitions資訊。
2、Zookeeper,可以把Zookeeper想象成它維持了一張表,記錄了各個節點的IP、端口等配置資訊。
3、Producer1,Producer2,Consumer的共同之處就是都配置了zkClient,更明确的說,就是運作前必須配置Zookeeper的位址,道理也很簡單,因為他們之間的連接配接都是需要Zookeeper來進行分發的。
4、Kafka Broker和Zookeeper可以放在一台機器上,也可以分開放,此外Zookeeper也可以配叢集,這樣就不會出現單點故障。
5、 每個Consumer啟動後會在Zookeeper上注冊一個臨時的consumer registry:包含Consumer所屬的Consumer Group以及訂閱的topics。每個Consumer Group關聯一個臨時的owner registry和一個持久的offset registry。對于被訂閱的每個partition包含一個owner registry,内容為訂閱這個partition的consumer id,同時包含一個offset registry,内容為上一次訂閱的offset。
整個系統運作的順序可簡單歸納為:
1、啟動Zookeeper的server。
2、啟動Kafka的server。
3、Producer如果生産了資料, 會先通過Zookeeper找到Broker,然後将資料存放進Broker。
4、Consumer如果要消費資料,會先通過Zookeeper找對應的Broker,然後消費。
Producer代碼執行個體:
producer = new Producer(...); message = new Message("Hello Ebay".getBytes()); set = new MessageSet(message); producer.send("topic1", set); |
釋出消息時,Producer先構造一條消息,将消息加入到消息集set中(Kafka支援批量釋出,可以往消息集合中添加多條消息,然後一次性釋出), send消息時,client需指定消息所屬的topic。
Consumer代碼執行個體:
streams[] = Consumer.createMessageStreams("topic1", 1); for (message : streams) { bytes = message.payload(); // do something with the bytes } |
訂閱消息時,Consumer需指定topic以及partition num(每個partition對應一個邏輯日志流,如topic代表某個産品線,partition代表産品線的日志按天切分的結果),client訂閱後,就可疊代讀取消息,如果沒有消息,client會阻塞直到有新的消息釋出。Consumer可以累積确認接收到的消息,當其确認了某個offset的消息,意味着之前的消息也都已成功接收到,此時Broker會更新Zookeeper上的offset registry。
那麼怎樣記錄每個Consumer處理的資訊的狀态呢?其實在Kafka中僅儲存了每個Consumer已經處理資料的offset。這樣有兩個好處:一是儲存的資料量少,二是當Consumer出錯時,重新啟動Consumer處理資料時,隻需從最近的offset開始處理資料即可。
4、Kafka的存儲政策
Kafka broker主要是用于做存儲使用,每個broker上可存儲很多的topic資訊。
4、Kafka的存儲政策
Kafka broker主要是用于做存儲使用,每個broker上可存儲很多的topic資訊。
2、每個segment中存儲多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
3、每個partition在記憶體中對應一個index,記錄每個segment中的第一條消息偏移。
4、Producer發到某個topic的消息會被均勻的分布到多個part上(随機或根據使用者指定的回調函數進行分布),broker收到釋出消息往對應part的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息釋出時間超過門檻值時,segment上的消息會被flush到磁盤,隻有flush到磁盤上的消息訂閱者才能訂閱到,segment達到一定的大小後将不會再往該segment寫資料,broker會建立新的segment。
5、Kafka的deliveryguarantee
在Kafka上,有兩個原因可能導緻低效:太多的網絡請求和過多的位元組拷貝。為了提高網絡使用率,Kafka把message分成一組一組的,每次請求會把一組message發給相應的consumer。 此外,為了減少位元組拷貝,采用sendfile系統調用, sendfile比傳統的利用socket發送檔案進行拷貝高效得多。
Kafka支援有這幾種delivery guarantee:
· Atmost once 消息可能會丢,但絕不會重複傳輸
· Atleast one 消息絕不會丢,但可能會重複傳輸
· Exactlyonce 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是使用者所想要的。
6、Kafka多資料中心的資料流拓撲結構
實際設計中,有時由于安全性問題并不想讓一個單個的Kafka叢集系統跨越多個資料中心(資料中心之間可以進行通信),而是想讓Kafka支援多資料中心的資料流拓撲結構。這可通過在叢集之間進行鏡像或“同步”實作。這個功能非常簡單,鏡像叢集隻是作為源叢集的資料使用者(Consumer)的角色運作。這意味着,一個單個的叢集就能夠将來自多個資料中心的資料集中到一個位置。下面所示是可用于支援批量裝載(batch loads)的多資料中心拓撲結構的一個例子:
請注意,在圖中上方的兩個叢集之間不存在通信連接配接,兩者可能大小不同,具有不同數量的節點,而下面部分中的這個單個的叢集可以鏡像任意數量的源叢集。