天天看點

【Kafka】消息的同步發送和異步發送概述1. sync vs async2. 可靠性機制(ack屬性配置)3. 一般配置4. 同步異步和ack的聯系和差別參考

文章目錄

  • 概述
  • 1. sync vs async
    • 1.1 java代碼同步和異步
  • 2. 可靠性機制(ack屬性配置)
    • 2.1 oneway
  • 3. 一般配置
  • 4. 同步異步和ack的聯系和差別
  • 參考

概述

kafka有同步(sync)、異步(async)以及oneway這三種發送方式,某些概念上區分也可以分為同步和異步兩種,同步和異步的發送方式通過“producer.type”參數指定,而oneway由“request.require.acks”參數指定。

1. sync vs async

在官方文檔Producer Configs中有如下:

【Kafka】消息的同步發送和異步發送概述1. sync vs async2. 可靠性機制(ack屬性配置)3. 一般配置4. 同步異步和ack的聯系和差別參考

翻譯過來就是:

producer.type的預設值是sync,即同步的方式。這個參數指定了在背景線程中消息的發送方式是同步的還是異步的。如果設定成異步的模式,可以運作生産者以batch的形式push資料,這樣會極大的提高broker的性能,但是這樣會增加丢失資料的風險。

為什麼是背景線程進行發送? 其實client調用發送接口,所有的資料被臨時加入請求隊列InFlightRequest,背景線程是通過 讀取該隊列的資料,進行發送操作的。

對于異步模式,還有4個配套的參數,如下:

Property Default Description
queue.buffering.max.ms 5000 啟用異步模式時,producer緩存消息的時間。比如我們設定成1000時,它會緩存1s的資料再一次發送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
queue.buffering.max.messages 10000 啟用異步模式時,producer緩存隊列裡最大緩存的消息數量,如果超過這個值,producer就會阻塞或者丢掉消息。
queue.enqueue.timeout.ms -1 當達到上面參數時producer會阻塞等待的時間。如果設定為0,buffer隊列滿時producer不會阻塞,消息直接被丢掉;若設定為-1,producer會被阻塞,不會丢消息。
batch.num.messages 200 啟用異步模式時,一個batch緩存的消息數量。達到這個數值時,producer才會發送消息。(每次批量發送的數量)
以batch的方式推送資料可以極大的提高處理效率,kafka producer可以将消息在記憶體中累計到一定數量後作為一個batch發送請求。batch的數量大小可以通過producer的參數(batch.num.messages)控制。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設定需要在效率和時效性方面做一個權衡。在比較新的版本中還有batch.size這個參數。

注:這裡的參數是指安裝包中的shell腳本指令,而java用戶端代碼,需要用相應的文法

總結:

  • 同步方式,一定是逐條發送的,第一條響應到達後,才會請求第二條
  • 異步方式,可以發送一條,也可以批量發送多條,特性是不需等第一次(注意這裡機關是次,因為單次可以是單條,也可以是批量資料)響應,就立即發送第二次

1.1 java代碼同步和異步

同步發送

如果需要使用同步發送,可以在每次發送之後使用get方法,因為producer.send方法傳回一個Future類型的結果,Future的get方法會一直阻塞直到該線程的任務得到傳回值,也就是broker傳回發送成功。

異步發送回調

可以從傳回的future對象中稍後擷取發送的結果,ProducerRecord、RecordMetadata包含了傳回的結果資訊

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
    @Override
    public void onFailure(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onSuccess(SendResult<String, Message> result) {
        System.out.println(result.getProducerRecord());
        System.out.println(result.getRecordMetadata());
    }
});
           

2. 可靠性機制(ack屬性配置)

producers可以一步的并行向kafka發送消息,但是通常producer在發送完消息之後會得到一個響應,傳回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“request.required.acks",這個參數決定了producer要求leader partition收到确認的副本個數:

  • 如果acks設定為0,表示producer不會等待broker的相應,是以,producer無法知道消息是否發生成功,這樣有可能導緻資料丢失,但同時,acks值為0會得到最大的系統吞吐量。
  • 若acks設定為1,表示producer會在leader partition收到消息時得到broker的一個确認,這樣會有更好的可靠性,因為用戶端會等待知道broker确認收到消息。
  • 若設定為-1,producer會在所有備份的partition收到消息時得到broker的确認,這個設定可以得到最高的可靠性保證。

2.1 oneway

前面隻提到了sync和async,那麼oneway是什麼呢?

oneway是隻顧消息發出去而不管死活,消息可靠性最低,但是低延遲、高吞吐,這種對于某些完全對可靠性沒有要求的場景還是适用的,即

request.required.acks

設定為0。

oneway的效果也是異步的。是以,設定同步和異步非時候,要綜合考慮。

3. 一般配置

對于sync的發送方式:

producer.type=sync
request.required.acks=1
           

對于async的發送方式:

producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200

           

對于oneway的發送發送:

producer.type=async           '既然都已經設定ack=0忽略高可靠性了,也就沒必要設定為同步了'
request.required.acks=0
           

4. 同步異步和ack的聯系和差別

【Kafka】消息的同步發送和異步發送概述1. sync vs async2. 可靠性機制(ack屬性配置)3. 一般配置4. 同步異步和ack的聯系和差別參考

上圖分析:

  • 當使用者調用send時,就完成資料發送了(對于使用者來說),背景線程負責實際發送資料,是以,新版本下,我們說資料發送總是異步的。
  • send()方法每次隻能發送一條資料至InFlightRequest隊列
  • 使用者可以通過send().get() ,把使用者主線程改為同步方式

    是以,同步和異步概念 分為使用者線程和發送線程,使用者線程有同步和異步之分;發送線程隻有異步

    使用者線程選擇同步,效果是逐條發送,因為請求隊列InFlightRequest中永遠最多有一條資料。異步+設定 背景線程的異步發送參數:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐條發送。

    max.in.flight.requests.per.connection控制隻能發送一次請求,發送次數有個視窗,控制該視窗的值,但是每次可發送一批資料;batch.size是控制一批資料的上限,當batch.size=1時,每次最多發送一條。組合在一起就是 隻能連續發送一次請求,每次最多發送一條。

同步和異步指client(producer)是否收到leader給的ack後才發下一條,acks = 0, -1和1是指leader節點和follower節點資料同步的方式,可靠性機制,是保證資料能成功備份到其他節點的機制,二者是獨立關系,是以可以是下面的組合

同步+ack任意值
           
異步+ack任意值
           

但是由于ack的選項有3個,會有最佳搭配的概念,例如:

producer.type=async           '既然都已經設定ack=0忽略高可用性了,也就沒必要設定為同步了'
request.required.acks=0
           

既然都已經設定ack=0忽略高可靠性了,ack=0犧牲可靠性換取速度,也就沒必要設定為同步了,設定為異步又可以提高資料

參考

Kafka之sync、async以及oneway

kafka 同步、異步發送java代碼同步和異步