天天看點

Kafka

Kafka

1)producer :消息生産者,就是向kafka broker發消息的用戶端;

2)consumer :消息消費者,向kafka broker取消息的用戶端;

3)topic :可以了解為一個隊列;

4) consumer group (cg):這是kafka用來實作一個topic消息的廣播(發給所有的consumer)和單點傳播(發給任意一個consumer)的手段。一個topic可以有多個cg。topic的消息會複制(不是真的複制,是概念上的)到所有的cg,但每個partion隻會把消息發給該cg中的一個consumer。如果需要實作廣播,隻要每個consumer有一個獨立的cg就可以了。要實作單點傳播隻要所有的consumer在同一個cg。用cg還可以将consumer進行自由的分組而不需要多次發送消息到不同的topic;總結起來就是如下三點。

consumer group下可以有一個或多個consumer instance,consumer instance可以是一個程序,也可以是一個線程

group.id是一個字元串,唯一辨別一個consumer group

consumer group下訂閱的topic下的每個分區隻能配置設定給某個group下的一個consumer(當然該分區還可以被配置設定給其他group)。同一個topic下的不同的分區可以配置設定給同一個group下的consumer。

5)broker :一台kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic;

6)partition:為了實作擴充性,一個非常大的topic可以分布到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被配置設定一個有序的id(offset)。kafka隻保證按一個partition中的順序将消息發給consumer,不保證一個topic的整體(多個partition間)的順序;分區中leader負責讀寫,follower 負責同步,隻負責備份

kafka中的topic為什麼要進行分區?

若沒有分區,一個topic對應的消息集在分布式叢集服務組中,就會分布不均勻,即可能導緻某台伺服器a記錄目前topic的消息集很多,若此topic的消息壓力很大的情況下,伺服器a就可能導緻壓力很大,吞吐也容易導緻瓶頸。

有了分區後,假設一個topic可能分為10個分區,kafka内部會根據一定的算法把10分區盡可能均勻分布到不同的伺服器上,比如:a伺服器負責topic的分區1,b伺服器負責topic的分區2,在此情況下,producer發消息時若沒指定發送到哪個分區的時候,kafka就會根據一定算法上個消息可能分區1,下個消息可能在分區2。當然進階api也能自己實作其分發算法

7)offset:kafka的存儲檔案都是按照offset.kafka來命名,用offset做名字的好處是友善查找。例如你想找位于2049的位置,隻要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka。

producer采用推(push)模式将消息釋出到broker,每條消息都被追加(append)到分區(patition)中,屬于順序寫磁盤(順序寫磁盤效率比随機寫記憶體要高,保障kafka吞吐率)。

消息發送時都被發送到一個topic,其本質就是一個目錄,而topic是由一些partition logs(分區日志)組成,其組織結構如下圖所示:

Kafka
Kafka

我們可以看到,每個partition中的消息都是有序的,生産的消息被不斷追加到partition log上,其中的每一個消息都被賦予了一個唯一的offset值。

1)分區的原因

(1) 友善在叢集中擴充,每個partition可以通過調整以适應它所在的機器,而一個topic又可以有多個partition組成,是以整個叢集就可以适應任意大小的資料了;

(2)可以提高并發,因為可以以partition為機關讀寫了。

2)分區的原則

(1)指定了patition,則直接使用;

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=n)。沒有replication的情況下,一旦broker 當機,其上所有 patition 的資料都不可被消費,同時producer也不能再将資料存于其上的patition。引入replication之後,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer隻與這個leader互動,其它replication作為follower從leader 中複制資料。

 以ack機制為all的為例,producer寫入消息流程如下:

Kafka

1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader

2)producer将消息發送給該leader

3)leader将消息寫入本地log

4)followers從leader pull消息,寫入本地log後向leader發送ack

5)leader收到所有isr中的replication的ack後,增加hw(high watermark,最後commit 的offset)并向producer發送ack

kafka的ack的三種機制

request.required.acks有三個值 0 1 -1(all)

0:生産者不會等待broker的ack,這個延遲最低但是存儲的保證最弱當server挂掉的時候就會丢資料。

1:服務端會等待ack值 leader副本确認接收到消息後發送ack但是如果leader挂掉後他不確定是否複制完成新leader也會導緻資料丢失。

-1(all):服務端會等所有的follower的副本受到資料後才會受到leader發出的ack,這樣資料不會丢失

實體上把topic分成一個或多個patition(對應 server.properties 中的num.partitions=3配置),每個patition實體上對應一個檔案夾(該檔案夾存儲該patition的所有消息和索引檔案),如下:

kafka 是通過分段的方式将 log 分為多個 logsegment,logsegment 是一個邏輯上的概念,一個 logsegment 對應磁盤上的一個日志檔案和一個索引檔案,其中日志檔案是用來記錄消息的。索引檔案是用來儲存消息的索引。

具體可參考此文

無論消息是否被消費,kafka都會保留所有消息。有兩種政策可以删除舊資料:

1)基于時間:log.retention.hours=168

2)基于大小:log.retention.bytes=1073741824

需要注意的是,因為kafka讀取特定消息的時間複雜度為o(1),即與檔案大小無關,是以這裡删除過期檔案與提高 kafka 性能無關。

具體參考此文

https://www.jianshu.com/p/89a152f48674

zk不和生産者打交道,隻和kafka叢集還有消費者打交道

broker是分布式部署并且互相之間互相獨立,但是需要有一個注冊系統能夠将整個叢集中的broker管理起來,此時就使用到了zookeeper。在zookeeper上會有一個專門用來進行broker伺服器清單記錄的節點:

/brokers/ids

每個broker在啟動時,都會到zookeeper上進行注冊,即到/brokers/ids下建立屬于自己的節點,如/brokers/ids/[0...n]。

kafka使用了全局唯一的數字來指代每個broker伺服器,不同的broker必須使用不同的broker id進行注冊,建立完節點後,每個broker就會将自己的ip位址和端口資訊記錄到該節點中去。其中,broker建立的節點類型是臨時節點,一旦broker當機,則對應的臨時節點也會被自動删除。

在kafka中,同一個topic的消息會被分成多個分區并将其分布在多個broker上,這些分區資訊及與broker的對應關系也都是由zookeeper在維護,由專門的節點來記錄,如:

/borkers/topics

kafka中每個topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。broker伺服器啟動後,會到對應topic節點(/brokers/topics)上注冊自己的broker id并寫入針對該topic的分區總數,如/brokers/topics/login/3->2,這個節點表示broker id為3的一個broker伺服器,對于"login"這個topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。

由于同一個topic消息會被分區并将其分布在多個broker上,是以,生産者需要将消息合理地發送到這些分布式的broker上,那麼如何實作生産者的負載均衡,kafka支援傳統的四層負載均衡,也支援zookeeper方式實作負載均衡。

(1) 四層負載均衡,根據生産者的ip位址和端口來為其确定一個相關聯的broker。通常,一個生産者隻會對應單個broker,然後該生産者産生的消息都發往該broker。這種方式邏輯簡單,每個生産者不需要同其他系統建立額外的tcp連接配接,隻需要和broker維護單個tcp連接配接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生産者産生的消息量及每個broker的消息存儲量都是不一樣的,如果有些生産者産生的消息遠多于其他生産者的話,那麼會導緻不同的broker接收到的消息總數差異巨大,同時,生産者也無法實時感覺到broker的新增和删除。

(2) 使用zookeeper進行負載均衡,由于每個broker啟動時,都會完成broker注冊過程,生産者會通過該節點的變化來動态地感覺到broker伺服器清單的變更,這樣就可以實作動态的負載均衡機制。

與生産者類似,kafka中的消費者同樣需要進行負載均衡來實作多個消費者合理地從對應的broker伺服器上接收消息,每個消費者分組包含若幹消費者,每條消息都隻會發送給分組中的一個消費者,不同的消費者分組消費自己特定的topic下面的消息,互不幹擾。

消費組 (consumer group):

consumer group 下有多個 consumer(消費者)。

對于每個消費者組 (consumer group),kafka都會為其配置設定一個全局唯一的group id,group 内部的所有消費者共享該 id。訂閱的topic下的每個分區隻能配置設定給某個 group 下的一個consumer(當然該分區還可以被配置設定給其他group)。

同時,kafka為每個消費者配置設定一個consumer id,通常采用"hostname:uuid"形式表示。

在kafka中,規定了每個消息分區 隻能被同組的一個消費者進行消費,是以,需要在 zookeeper 上記錄 消息分區 與 consumer 之間的關系,每個消費者一旦确定了對一個消息分區的消費權力,需要将其consumer id 寫入到 zookeeper 對應消息分區的臨時節點上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一個 消息分區 的辨別,節點内容就是該 消息分區 上 消費者的consumer id。

在消費者對指定消息分區進行消息消費的過程中,需要定時地将分區消息的消費進度offset記錄到zookeeper上,以便在該消費者進行重新開機或者其他消費者重新接管該消息分區的消息消費後,能夠從之前的進度開始繼續進行消息消費。offset在zookeeper中由一個專門節點進行記錄,其節點路徑為:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

節點内容就是offset的值。

消費者伺服器在初始化啟動時加入消費者分組的步驟如下

注冊到消費者分組。每個消費者伺服器啟動時,都會到zookeeper的指定節點下建立一個屬于自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點建立後,消費者就會将自己訂閱的topic資訊寫入該臨時節點。

對 消費者分組 中的 消費者 的變化注冊監聽。每個 消費者 都需要關注所屬 消費者分組 中其他消費者伺服器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。

對broker伺服器變化注冊監聽。消費者需要對/broker/ids/[0-n]中的節點進行監聽,如果發現broker伺服器清單發生變化,那麼就根據具體情況來決定是否需要進行消費者負載均衡。

進行消費者負載均衡。為了讓同一個topic下不同分區的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區配置設定的過程,通常,對于一個消費者分組,如果組内的消費者伺服器發生變更或broker伺服器發生變更,會發出消費者負載均衡。

kafka使用檔案存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴檔案系統的本身特性.且無論任何os下,對檔案系統本身的優化是非常艱難的.檔案緩存/直接記憶體映射等是常用的手段.因為kafka是對日志檔案進行append操作,是以磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會将消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤io調用的次數.對于kafka而言,較高性能的磁盤,将會帶來更加直接的性能提升.

除磁盤io之外,我們還需要考慮網絡io,這直接關系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以将消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置檔案來指定.對于kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡io的性能:将檔案的資料映射到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換(這裡涉及到"磁盤io資料"/"核心記憶體"/"程序記憶體"/"網絡緩沖區",多者之間的資料copy).

其實對于producer/consumer/broker三者而言,cpu的開支應該都不大,是以啟用消息壓縮機制是一個良好的政策;壓縮需要消耗少量的cpu資源,不過對于kafka而言,網絡io更應該需要考慮.可以将任何在網絡上傳輸的消息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.

kafka叢集中的任何一個broker,都可以向producer提供metadata資訊,這些metadata中包含"叢集中存活的servers清單"/"partitions leader清單"等資訊(請參看zookeeper中的節點資訊). 當producer擷取到metadata資訊之後, producer将會和topic下所有partition leader保持socket連接配接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".

異步發送,将多條消息暫且在用戶端buffer起來,并将他們批量發送到broker;小資料io太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率;不過這也有一定的隐患,比如當producer失效時,那些尚未發送的消息将會丢失。

其他jms實作,消息消費的位置是有producer保留,以便避免重複發送消息或者将沒有消費成功的消息重發等,同時還要控制消息的狀态.這就要求jms broker需要太多額外的工作.在kafka中,partition中的消息隻有一個consumer在消費,且不存在消息狀态的控制,也沒有複雜的消息确認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之後,consumer可以在本地儲存最後消息的offset,并間歇性的向zookeeper注冊offset.由此可見,consumer用戶端也很輕量級。

kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也适度的減輕了broker端設計的複雜度;這是和衆多jms producer的差別.此外,kafka中消息ack的設計也和jms有很大不同,kafka中的消息是批量(通常以消息的條數或者chunk的尺寸為機關)發送給consumer,當消息消費成功後,向zookeeper送出消息的offset,而不會向broker傳遞ack.或許你已經意識到,這種"寬松"的設計,将會有"丢失"消息/"消息重發"的危險.

kafka提供3種消息傳輸一緻性語義:最多1次,最少1次,恰好1次。

最少1次:可能會重傳資料,有可能出現資料被重複處理的情況;

最多1次:可能會出現資料丢失情況;

恰好1次:并不是指真正隻傳輸1次,隻不過有一個機制。確定不會出現“資料被重複處理”和“資料丢失”的情況。

at most once: 消費者fetch消息,然後儲存offset,然後處理消息;當client儲存offset之後,但是在消息處理過程中consumer程序失效(crash),導緻部分消息未能繼續處理.那麼此後可能其他consumer會接管,但是因為offset已經提前儲存,那麼新的consumer将不能fetch到offset之前的消息(盡管它們尚沒有被處理),這就是"at most once".

at least once: 消費者fetch消息,然後處理消息,然後儲存offset.如果消息處理成功之後,但是在儲存offset階段zookeeper異常或者consumer失效,導緻儲存offset操作未能執行成功,這就導緻接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once".

"kafka cluster"到消費者的場景中可以采取以下方案來得到“恰好1次”的一緻性語義:

最少1次+消費者的輸出中額外增加已處理消息最大編号:由于已處理消息最大編号的存在,不會出現重複處理消息的情況。

----------------------------------------------

啟動zookeeper: ./ zkserver.sh,然後 zkserver.sh status檢視zookeeper是否被啟動

啟動kafka

bin/kafka-server-start.sh config/server.properties

建立一個kafka topic-

bin/kafka-topics.sh --zookeeper localhost:2181 \

--create --replication-factor 1 --partitions 1 --topic first

檢視topic清單

bin/kafka-topics.sh --zookeeper localhost:2181 --list

往topic裡面寫資料

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

消費資料

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic

高版本offset維護在本地,是為了提高效率

-----