1.kafka消費者程式設計模型
分區消費模型
組(group)消費模型
1.1.分區消費模型
1.1.1.分區消費架構圖,每個分區對應一個消費者。
1.1.2.分區消費模型僞代碼描述
指定偏移量,用于從上次消費的地方開始消費.
送出offset ,java用戶端會自動送出的叢集,是以這一步可選。
1.2.組(group)消費模型
1.2.1.組消費模型架構圖
每個組都消費該topic的全量資料,一條消息會發給groupA和groupB.
1.2.2.組消費模型僞代碼:
流數N:表示一個consumer組裡面有幾個consumer 執行個體,上例中組A建立2個流,組B建立4個流。
1.2.3.consumer配置設定算法
當kafka的分區個數大于組A裡consumer執行個體個數時,怎麼去配置設定,以下為配置設定步驟:
1.3.兩種消費模型對比
Partition消費模型更加靈活但是:
(1)需要自己處理各種異常情況;
(2)需要自己管理offset(以實作消息傳遞的其他語義);
Group消費模型更加簡單,但是不靈活:
(1)不需要自己處理異常情況,不需要自己管理offset;
(2)隻能實作kafka預設的最少一次消息傳遞語義;
知識補充:消息傳遞的3中語義:
至少一次,(消息不會丢,消息者至少得到一次,但有可能會重複,生産者向消費者發送之後,會等待消費者确認,沒收到确認會再發) (kafka 預設實作的語義)。
至多一次,(消息會丢)
有且隻有一次。
1.4.java 用戶端參數調優
fetchSize: 從伺服器擷取單包大小;
bufferSize: kafka用戶端緩沖區大小;
group.id: 分組消費時分組名 (指定的每個組将獲得全量的資料)
2.生産者消費模型
同步生産模型
異步生産模型
2.1. 同步生産模型
至少成功一次 , 發送給kafka消費者
2.2.異步生産模型
打包發送給kafka broker。
2.3.兩種生産模型僞代碼描述
main()
建立到kafka broker的連接配接:KafkaClient(host,port)
選擇或者自定義生産者負載均衡算法 partitioner (算法有:hash,輪詢,随機)
設定生産者參數 (緩存隊列長度,發送時間,同步/異步參數設定)
根據負載均衡算法和設定的生産者參數構造Producer對象
while True
getMessage:從上遊獲得一條消息
按照kafka要求的消息格式構造kafka消息
根據分區算法得到分區
發送消息
處理異常
2.4.兩種生産模型對比
同步生産模型:
(1)低消息丢失率;
(2)高消息重複率(由于網絡原因,回複确認未收到);
(3)高延遲 (每發一條消息需要确認)
(使用在不丢消息場景)
異步生産模型:
(1)低延遲;
(2)高發送性能;(每秒一個分區發50萬條)
(3)高消息丢失率(無确認機制,發送端隊列滿了,消息會丢掉;整個隊列發送給)
(使用在允許丢消息場景,偶爾丢一條)
2.5.java用戶端代碼實作 (自定義分區)
//同步配置參數:
預設的序列化方式:位元組序列化。
設定分區算法:預設是對key進行hash分區算法,可以自定義分區算法。
确認機制 request.require.acks: 合理設定為1; 0: 絕不等确認 1: leader的一個副本收到這條消息,并發回确認 -1: leader的所有副本都收到這條消息,并發回确認
消息是以key-value的形式發送的,key必須要設定。
2.6.java用戶端參數調優
message.send.max.retries: 發送失敗重試次數;
retry.backoff.ms :未接到确認,認為發送失敗的時間;
producer.type: 同步發送或者異步發送;
batch.num.messages: 異步發送時,累計最大消息數;
queue.buffering.max.ms:異步發送時,累計最大時間;
本文版本主要是針對0.8.2,配套學習教程,浪尖已經分享到了知識星球。