天天看點

Kafka 實戰指南——Kafka 生産者配置

文章目錄

    • 1. Key和Value
    • 2. 失敗重試
    • 3. 異步發送
    • 4. 線程安全
    • 5. Acks
    • 6. Batch
    • 7. 單個請求的最大值
    • 8. OOM
    • 9. 分區順序
    • 10. 順序保證
    • 11. Producer 幂等性
      • 11.1 Producer 幂等性設定
      • 11.2 幂等性原理
      • 11.3 原因分析
    • 12. Producer 開啟事務
    • 12.1 Producer 事務示例
      • 12.1.2 查找TransactionCoordinator事務實作原理
      • 12.1.3 擷取PID
      • 12.1.4 開啟事務
      • 12.1.5 Consume-Porcess-Produce Loop
      • 12.1.6 送出或者中斷事務

1. Key和Value

0.10.2.2版本的Kafka的消息字段隻有兩個:Key和Value。

  • Key:消息的辨別。
  • Value:消息内容。

為了便于追蹤,請為消息設定一個唯一的Key。您可以通過Key追蹤某消息,列印發送日志和消費日志,了解該消息的發送和消費情況。

2. 失敗重試

分布式環境下,由于網絡等原因偶爾發送失敗是常見的。導緻這種失敗的原因可能是消息已經發送成功,但是Ack失敗,也有可能是确實沒發送成功。

消息隊列Kafka版是VIP網絡架構,會主動斷開空閑連接配接(30秒沒活動),是以,不是一直活躍的用戶端會經常收到 “connection rest by peer” 錯誤,建議重試消息發送。

重試參數

您可以根據業務需求,設定以下重試參數:

  • retries

    ,重試次數,建議設定為

    3

    。失敗重試将會導緻資料亂序,如果要保證消息有序,設定

    max.in.flight.requests.per.connection =1

    ,這樣就能保證消息有序性,但是會影響生産者吞吐量。是以隻有在對消息的順序有嚴格要求的情況下才能這麼做。
  • retry.backoff.ms

    ,重試間隔,建議設定為

    1000

3. 異步發送

發送接口是異步的,如果您想得到發送的結果,可以調用使用回調函數:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (recordMetadata == null) {
            e.printStackTrace();
        }else {
            long offset = recordMetadata.offset();
            System.out.println("sender success:"+offset);
        }
        
    }
});
           

4. 線程安全

Producer 是線程安全的,且可以往任何 Topic 發送消息。通常情況下,一個應用對應一個 Producer 就足夠了。

5. Acks

acks

配置表示 producer 發送消息到 broker 上以後的确認值。有三個可選項

  1. :表示 producer 不需要等待 broker 的消息确認。這個選項時延最小但同

    時風險最大(因為當 server 當機時,資料将會丢失)。

  2. 1

    :表示 producer 隻需要獲得 kafka 叢集中的 leader 節點确認即可,這個

    選擇時延較小同時確定了 leader 節點确認接收成功。

  3. all(-1)

    :需要 ISR 中所有的 Replica(副本)給予接收确認,速度最慢,安全性最高,

    但是由于 ISR 可能會縮小到僅包含一個 Replica,是以設定參數為 all 時并不能一定避免資料丢失。

一般建議選擇

acks=1

,重要的服務可以設定

acks=all

6. Batch

  1. batch.size

    生産者發送多個消息到 broker上的同一個分區時,為了減少網絡請求帶來的性能開銷,通過批量的方式來送出消息,可以通過這個參數來控制批量送出的位元組數大小,預設大小是16384byte,也就是16kb,意味着當一批消息大小達到指定的

    batch.size

    的時候會統一發送。消息超過 16kb,将會動态申請記憶體,發送消息後回收動态記憶體。
  2. linger.ms

    Producer 預設會把兩次發送時間間隔内收集到的所有 Requests 進行一次聚合然後再發送,以此提高吞吐量,而

    linger.ms

    就是為每次發送到 broker 的請求增加一些延遲,以此來聚合更多的消息請求。

batch.size

linger.ms

這兩個參數是 kafka 性能優化的關鍵參數,很多同學會發現

batch.size

linger.ms

這兩者的作用是一樣的,如果兩個都配置了,那麼怎麼工作的呢?實際上,當二者都配置的時候,隻要滿足其中一個要求,就會發送請求到 broker上。

batch.size

有助于提高吞吐,

linger.ms

有助于控制延遲。您可以根據具體業務需求進行調整。

7. 單個請求的最大值

max.request.size

:設定請求的資料的最大位元組數,為了防止發生較大的資料包影響到吞吐量,預設值為1MB。超過 1MB 将會報錯。

8. OOM

結合 Kafka 的 Batch 設計思路,Kafka 會緩存消息并打包發送,如果緩存太多,則有可能造成OOM(Out of Memory)。

  • buffer.memory

    : 所有緩存消息的總體大小超過這個數值後,就會觸發把消息發往伺服器。此時會忽略

    batch.size

    linger.ms

    的限制。
  • buffer.memory

    的預設數值是32 MB,對于單個 Producer 來說,可以保證足夠的性能。 需要注意的是,如果您在同一個JVM中啟動多個 Producer,那麼每個 Producer 都有可能占用 32 MB緩存空間,此時便有可能觸發 OOM。
  • 在生産時,一般沒有必要啟動多個 Producer;如果特殊情況需要,則需要考慮

    buffer.memory

    的大小,避免觸發 OOM。

9. 分區順序

單個分區(Partition)内,消息是按照發送順序儲存的,是基本有序的。

預設情況下,消息隊列 Kafka 為了提升可用性,并不保證單個分區内絕對有序,發生消息重試或者當機時,會産生消息亂序(某個分區挂掉後把消息 Failover 到其它分區)。

10. 順序保證

Kafka 可以保證同一個分區裡的消息是有序的。也就是說,如果生産者按照一定的順序發送消息,broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。

在某些情況下,順序是非常重要的。例如,往一個賬戶存入 100 元再取出來,這個與先取錢再存錢是截然不同的。不過,有些場景對順序不是很敏感。

如果把

retries

設為非零整數,同時把

max.in.flight.requests.per.connection

設為比 1 大的數,那麼,如果第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。

一般來說,如果某些場景要求消息是有序的,那麼消息是否寫入成功也是很關鍵的,是以不建議把

retries

設為 0。可以把

max.in.flight.requests.per.connection

設為 1,這樣在生産者嘗試發送第一批消息時,就不會有其他的消息發送給 broker。不過這樣會嚴重影響生産者的吞吐量,是以隻有在對消息的順序有嚴格要求的情況下才能這麼做。

11. Producer 幂等性

11.1 Producer 幂等性設定

Producer 的幂等性指的是當發送同一條消息時,資料在 Server 端隻會被持久化一次,資料不丟不重,但是這裡的幂等性是有條件的:

  • 隻能保證 Producer 在單個會話内不丟不重,如果 Producer 出現意外挂掉再重新開機是無法保證的(幂等性情況下,是無法擷取之前的狀态資訊,是以是無法做到跨會話級别的不丢不重);
  • 幂等性不能跨多個 Topic-Partition,隻能保證單個 partition 内的幂等性,當涉及多個 Topic-Partition 時,這中間的狀态并沒有同步。

如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實作。

使用方式:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

當幂等性開啟的時候acks即為all。如果顯性的将acks設定為0,-1,那麼将會報錯

Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

示例如下:

// 保證 producer 的幂等性使用 這2個參數,該幂等性隻能保證分區内幂等。producer重新開機失效。
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG,"all");
           

11.2 幂等性原理

幂等性是通過兩個關鍵資訊保證的,PID(Producer ID) 和 sequence numbers。

  • PID 用來辨別每個producer client
  • sequence numbers 用戶端發送的每條消息都會帶相應的 sequence number,Server 端就是根據這個值來判斷資料是否重複

producer初始化會由server端生成一個PID,然後發送每條資訊都包含該PID和sequence number,在server端,是按照partition同樣存放一個sequence numbers 資訊,通過判斷用戶端發送過來的sequence number與server端number+1內插補點來決定資料是否重複或者漏掉。

通常情況下為了保證資料順序性,我們可以通過

max.in.flight.requests.per.connection=1

來保證,這個也隻是針對單執行個體。在kafka2.0+版本上,隻要開啟幂等性,不用設定這個參數也能保證發送資料的順序性。

11.3 原因分析

為什麼要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5

其實這裡,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 執行個體會緩存每個 PID 在每個 Topic-Partition 上發送的最近 5 個batch 資料(這個 5 是寫死的,至于為什麼是 5,可能跟經驗有關,當不設定幂等性時,當這個設定為 5 時,性能相對來說較高,社群是有一個相關測試文檔),如果超過 5,ProducerStateManager 就會将最舊的 batch 資料清除。

假設應用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設定為 6,假設發送的請求順序是 1、2、3、4、5、6,這時候 server 端隻能緩存 2、3、4、5、6 請求對應的 batch 資料,這時候假設請求 1 發送失敗,需要重試,當重試的請求發送過來後,首先先檢查是否為重複的 batch,這時候檢查的結果是否,之後會開始 check 其 sequence number 值,這時候隻會傳回一個 OutOfOrderSequenceException 異常,client 在收到這個異常後,會再次進行重試,直到超過最大重試次數或者逾時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當于client 狂發錯誤請求)。

12. Producer 開啟事務

12.1 Producer 事務示例

package com.mock.data.stream.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * https://www.cnblogs.com/fnlingnzb-learner/p/13646390.html
 * producer 啟用事務
 */
public class ProducerSenderTransactional {


    public ProducerSenderTransactional() {
        producer = new KafkaProducer<String, String>(producerConfig());
        // 初始化事務
        producer.initTransactions();

    }

    public Map<String, Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop102:9094,hadoop103:9092");

        props.put(ProducerConfig.CLIENT_ID_CONFIG,"flink_id");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);// 1M
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 16kb
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//預設為0
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);// 32M
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 保證 producer 的幂等性使用 這2個參數,該幂等性隻能保證分區内幂等。producer重新開機失效。
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        // 事務ID
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId_0");
        // 保證消息順序性
        props.put("max.in.flight.requests.per.connection",1);
        
        return props;
    }
    KafkaProducer<String, String> producer;

    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key,value);
        producer.beginTransaction();
        boolean success = true;
        try {
            RecordMetadata recordMetadata = producer.send(record).get(3, TimeUnit.SECONDS);
            long offset = recordMetadata.offset();
            System.out.println("發送成功:"+offset);
            if (key.contains("1")) {
                throw new RuntimeException("發送失敗");
            }
        }catch (Exception e) {
            e.printStackTrace();
            success = false;
        } finally {
            if (success) {
                // 送出事務
                producer.commitTransaction();
            } else {
                // 中斷事務
                producer.abortTransaction();
            }
        }
    }


    public static void main(String[] args) throws Exception{
        ProducerSenderTransactional sender2 = new ProducerSenderTransactional();
        String topic1 = "topic1";
        for (int i = 0; i < 10; i++) {
            String key = "key_"+i;
            String value = "value"+i;

            sender2.send(topic1,key,value);
        }
        Thread.sleep(600000);
    }
}

           

12.1.2 查找TransactionCoordinator事務實作原理

通過transaction_id 找到TransactionCoordinator,具體算法是

Utils.abs(transaction_id.hashCode %transactionTopicPartitionCount )

,擷取到partition,再找到該partition的leader,即為TransactionCoordinator。

12.1.3 擷取PID

凡是開啟幂等性都是需要生成PID(Producer ID),隻不過未開啟事務的PID可以在任意broker生成,而開啟事務隻能在TransactionCoordinator節點生成。這裡隻講開啟事務的情況,Producer Client的

initTransactions()

方法會向TransactionCoordinator發起InitPidRequest ,這樣就能擷取PID。這裡面還有一些細節問題,這裡不探讨,例如transaction_id 之前的事務狀态什麼的。但需要說明的一點是這裡會将 transaction_id 與相應的 TransactionMetadata 持久化到事務日志(_transaction_state)中。

12.1.4 開啟事務

Producer調用

beginTransaction

開始一個事務狀态,這裡隻是在用戶端将本地事務狀态轉移成 IN_TRANSACTION,隻有在發送第一條資訊後,TransactionCoordinator才會認為該事務已經開啟。

12.1.5 Consume-Porcess-Produce Loop

這裡說的是一個典型的

consume-process-produce

場景:

while (true) {
    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
    producer.beginTransaction();
    //start
    for (ConsumerRecord record : records){
        producer.send(producerRecord(“outputTopic1”, record));
        producer.send(producerRecord(“outputTopic2”, record));
    }
    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
    //end
    producer.commitTransaction();
}
           

AddPartitionsToTxnRequest該階段主要經曆以下幾個步驟:

  1. ProduceRequest
  2. AddOffsetsToTxnRequest
  3. TxnOffsetsCommitRequest

關于這裡的詳細介紹可以檢視官網文檔!

12.1.6 送出或者中斷事務

Producer 調用

commitTransaction()

或者

abortTransaction()

方法來 commit 或者 abort 這個事務操作。

基本上經曆以下三個步驟,才真正結束事務。

  1. EndTxnRequest
  2. WriteTxnMarkerRquest
  3. Writing the Final Commit or Abort Message

其中EndTxnRequest是在Producer發起的請求,其他階段都是在TransactionCoordinator端發起完成的。WriteTxnMarkerRquest是發送請求到partition的leader上寫入事務結果資訊(ControlBatch),第三步主要是在

_transaction_state

中标記事務的結束。

繼續閱讀