文章目錄
- 概述
- 1. sync vs async
-
- 1.1 java代碼同步和異步
- 2. 可靠性機制(ack屬性配置)
-
- 2.1 oneway
- 3. 一般配置
- 4. 同步異步和ack的聯系和差別
- 參考
概述
kafka有同步(sync)、異步(async)以及oneway這三種發送方式,某些概念上區分也可以分為同步和異步兩種,同步和異步的發送方式通過“producer.type”參數指定,而oneway由“request.require.acks”參數指定。
1. sync vs async
在官方文檔Producer Configs中有如下:

翻譯過來就是:
producer.type的預設值是sync,即同步的方式。這個參數指定了在背景線程中消息的發送方式是同步的還是異步的。如果設定成異步的模式,可以運作生産者以batch的形式push資料,這樣會極大的提高broker的性能,但是這樣會增加丢失資料的風險。
為什麼是背景線程進行發送? 其實client調用發送接口,所有的資料被臨時加入請求隊列InFlightRequest,背景線程是通過 讀取該隊列的資料,進行發送操作的。
對于異步模式,還有4個配套的參數,如下:
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | 啟用異步模式時,producer緩存消息的時間。比如我們設定成1000時,它會緩存1s的資料再一次發送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。 |
queue.buffering.max.messages | 10000 | 啟用異步模式時,producer緩存隊列裡最大緩存的消息數量,如果超過這個值,producer就會阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms | -1 | 當達到上面參數時producer會阻塞等待的時間。如果設定為0,buffer隊列滿時producer不會阻塞,消息直接被丢掉;若設定為-1,producer會被阻塞,不會丢消息。 |
batch.num.messages | 200 | 啟用異步模式時,一個batch緩存的消息數量。達到這個數值時,producer才會發送消息。(每次批量發送的數量) |
以batch的方式推送資料可以極大的提高處理效率,kafka producer可以将消息在記憶體中累計到一定數量後作為一個batch發送請求。batch的數量大小可以通過producer的參數(batch.num.messages)控制。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設定需要在效率和時效性方面做一個權衡。在比較新的版本中還有batch.size這個參數。
注:這裡的參數是指安裝包中的shell腳本指令,而java用戶端代碼,需要用相應的文法
總結:
- 同步方式,一定是逐條發送的,第一條響應到達後,才會請求第二條
- 異步方式,可以發送一條,也可以批量發送多條,特性是不需等第一次(注意這裡機關是次,因為單次可以是單條,也可以是批量資料)響應,就立即發送第二次
1.1 java代碼同步和異步
同步發送
如果需要使用同步發送,可以在每次發送之後使用get方法,因為producer.send方法傳回一個Future類型的結果,Future的get方法會一直阻塞直到該線程的任務得到傳回值,也就是broker傳回發送成功。
異步發送回調
可以從傳回的future對象中稍後擷取發送的結果,ProducerRecord、RecordMetadata包含了傳回的結果資訊
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(SendResult<String, Message> result) {
System.out.println(result.getProducerRecord());
System.out.println(result.getRecordMetadata());
}
});
2. 可靠性機制(ack屬性配置)
producers可以一步的并行向kafka發送消息,但是通常producer在發送完消息之後會得到一個響應,傳回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“request.required.acks",這個參數決定了producer要求leader partition收到确認的副本個數:
- 如果acks設定為0,表示producer不會等待broker的相應,是以,producer無法知道消息是否發生成功,這樣有可能導緻資料丢失,但同時,acks值為0會得到最大的系統吞吐量。
- 若acks設定為1,表示producer會在leader partition收到消息時得到broker的一個确認,這樣會有更好的可靠性,因為用戶端會等待知道broker确認收到消息。
- 若設定為-1,producer會在所有備份的partition收到消息時得到broker的确認,這個設定可以得到最高的可靠性保證。
2.1 oneway
前面隻提到了sync和async,那麼oneway是什麼呢?
oneway是隻顧消息發出去而不管死活,消息可靠性最低,但是低延遲、高吞吐,這種對于某些完全對可靠性沒有要求的場景還是适用的,即
request.required.acks
設定為0。
oneway的效果也是異步的。是以,設定同步和異步非時候,要綜合考慮。
3. 一般配置
對于sync的發送方式:
producer.type=sync
request.required.acks=1
對于async的發送方式:
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
對于oneway的發送發送:
producer.type=async '既然都已經設定ack=0忽略高可靠性了,也就沒必要設定為同步了'
request.required.acks=0
4. 同步異步和ack的聯系和差別
上圖分析:
- 當使用者調用send時,就完成資料發送了(對于使用者來說),背景線程負責實際發送資料,是以,新版本下,我們說資料發送總是異步的。
- send()方法每次隻能發送一條資料至InFlightRequest隊列
-
使用者可以通過send().get() ,把使用者主線程改為同步方式
是以,同步和異步概念 分為使用者線程和發送線程,使用者線程有同步和異步之分;發送線程隻有異步
使用者線程選擇同步,效果是逐條發送,因為請求隊列InFlightRequest中永遠最多有一條資料。異步+設定 背景線程的異步發送參數:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐條發送。
max.in.flight.requests.per.connection控制隻能發送一次請求,發送次數有個視窗,控制該視窗的值,但是每次可發送一批資料;batch.size是控制一批資料的上限,當batch.size=1時,每次最多發送一條。組合在一起就是 隻能連續發送一次請求,每次最多發送一條。
同步和異步指client(producer)是否收到leader給的ack後才發下一條,acks = 0, -1和1是指leader節點和follower節點資料同步的方式,可靠性機制,是保證資料能成功備份到其他節點的機制,二者是獨立關系,是以可以是下面的組合
同步+ack任意值
異步+ack任意值
但是由于ack的選項有3個,會有最佳搭配的概念,例如:
producer.type=async '既然都已經設定ack=0忽略高可用性了,也就沒必要設定為同步了'
request.required.acks=0
既然都已經設定ack=0忽略高可靠性了,ack=0犧牲可靠性換取速度,也就沒必要設定為同步了,設定為異步又可以提高資料
參考
Kafka之sync、async以及oneway
kafka 同步、異步發送java代碼同步和異步