天天看點

Kafka生産者原理剖析

作者:程式員洋仔

生存還是毀滅,這是一個問題。“ 是的對Kafka來說這個曾經受萬人追捧的分布式消息引擎,現在倒還真有點跌入神壇的趨勢。因為Pulsar(消息系統的新貴)仿佛正在全面替代Kafka。

Kafka真的不行了嗎?

答案個人覺得是否定的 固然Pulsar有着Kafka沒有的存儲和計算分離的設計,Pulsar在大資料大叢集的租戶管理上确實也要比Kafka更好。

但是Kafka2.8版本推出了社群呼籲已久的操作移除了Zookeeper,使用Kraft來進行代替,雖然隻是測試版本,但是官方實測的資料對比上:

支援的分區數由20萬個分區,變成了可以支援到200萬個分區左右,是之前的數十倍之多。

性能相同分區的情況下也是得到了數倍的提升

最重要的是Kafka現在僅僅是一個程序,而不再需要一個Zookeeper叢集了,更加輕量化。

現在看來對于性能Kafka還是有所期待的。

俗話說”萬變不離其宗“,Pulsar肯定也有很多好的優秀的設計值得我們學習。但是現在的技術更新換代真的是太快了,也許,你今天正在學習的一個技術,明天就湮滅在曆史的塵埃之中。我們要做的就是抓住事情的本質。弄明白它的原理。

無論是 Kafka、Pulsar、rabbitmq 它們不變的都是作為一個消息系統的構成 生産者、消費者、服務端。隻有弄明白其中的原理,才能在技術快速更新還貸的時代裡不被淘汰。

接下來詳細的剖析一下KafkaProducer 的原理。

1.Kafka如何發送消息

1.1Producer發送消息代碼示例

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public void run() {
        int messageNo = 1;
        // todo: 一直會往kafka發送資料
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();    
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}
           

上邊代碼所示為KafkaProducer發送消息的一個簡單的執行個體。主要有兩個步驟:

1.初始化KafkaProducer對象(代碼中省略了)

2.發送消息

但是Kafka發送一條消息的過程就這麼簡單呢,實則不然,一條消息要發送并存儲到Server端的路還很漫長。

1.2 Kafka 發送消息具體流程

如下圖所示為KafkaProducer發送消息的具體流程:

Kafka生産者原理剖析

總體來說分為四個步驟:

主線程

攔截器對消息做一些封裝

序列化消息以便進行網絡傳輸

消息分區 (預設輪詢的分區政策)

将消息 添加到 RecordAccumulator中。

Sender線程

更新中繼資料

從RecordAccumulator拉取消息

Ready

Drain

封裝ClientRequest

調用NetworkClient進行發送(使用的是NIO)

其實消息的發送的步驟 不止這些,比如中繼資料的更新、消息失敗的重試、響應資訊的各種處理方式等等 這裡就不再做詳細的叙述了,主需要了解消息發送的一個整體流程就可以了。

1.3Kafka為什麼選擇雙線程來進行消息發送?

優點:

用戶端使用者僅僅需要調用KafkaProducer 的send 方法,具體的消息發送、重試、與Server端的網絡連接配接等都交給Sender線程來進行處理。分工更明确,邏輯更清晰。

Sender來與Server端互動,主線程不比去做網絡連接配接處理請求等操作。

主線程僅僅将一條一條的消息放入消息累加器中,Sender線程根據觸發發送消息的條件将消息一批一批的發送,效率更高。

缺點

這個缺點其實不是雙線程發送的缺點,而是Kafka建立Sender線程的方式,Kafka建立Sender的方式是在調用KafkaProducer的構造方法的時候建立的,并且啟動了Sender線程。Kafka并發程式設計的作者曾經指出在對象的構造方法中建立并且啟動一個線程會造成this指針的逃逸。

afkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
  	...
      this.sender = newSender(logContext, kafkaClient, this.metadata);
      String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
      this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
      this.ioThread.start();
    ...
}
           

2.Sender線程

Sender線程發送流程如下所示:

Kafka生産者原理剖析

1.擷取中繼資料的最新資訊

2.擷取RecordAccumulator有哪些消息準備好了

3.如果有topic的中繼資料不存在降該topic的更新中繼資料的标記設定為true意味着可以進行中繼資料更新了

4.檢查與要發送資料的主機網絡是否建立好,去掉那些不能發送資訊的節點

5.drain這個方法很重要一會會做詳細的分析。

6.放棄逾時的Batchs。

7.建立ProducerRequest

8.調用NetWorkClient的send方法,降請求添加到請求隊列中

9.觸發發送操作。

drain操作

将ProducerBatch與Broker節點做映射

核心邏輯是将RecordAccumulator記錄的Map<TopicPartition,Deque> 轉換Map<String,Deque> 類型。

在網絡層面更關心的是資料和對應節點的映射而不是TopicPartition的映射。而上層邏輯與之相反是以需要做這一次的轉換。

drain的操作其實和MapReduce和Spark的 shuffle有着異曲同工的作用,而且都是處于非常重要的位置。這樣看來大資料領域的好多理念都是相通的,最重要的就是去弄通它們的原理,就可以達到一知百解的效果。

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            batches.put(node.id(), ready);
        }
        return batches;
    }
           

drainIndex 防止饑餓提高系統的可用性

如果strat在每次發送消息的時候,都是從0開始周遊,就會出現每次隻發送相對Topic的前幾個分區的資料,後邊分區的資料一直得不到發送。利用drainIndex記錄了上次發送分區的位置,可以防止饑餓提高系統的可用性。

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<ProducerBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0
        *  防止饑餓
        * */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            this.drainIndex = (this.drainIndex + 1) % parts.size();
          ...
            }
          ...  
      }
           

3.RecordAccumulator

如下圖所示,為RecordAccumulator,它會将Producer發送的消息按照TopicPartition進行分類。然後将消息存入BufferPoll中。每個TopicPartition的消息放入一個隊列中。TopicPartition 的唯一性由兩個字段确定 topicName和partition.

Kafka生産者原理剖析

BufferPoll

如上圖所示BufferPoll主要由兩部分構成:

  • free 緩存資料,有效的資料的頻繁的建立和銷毀。
  • nonPooledAvailableMemory 防止傳入的消息size太大。free 的batchSize不夠配置設定的情況。
/**
 * 緩存了指定大小的 byteBuffer 對象 batchSize 緩沖了大量的 ByteBuffer防止頻繁的建立和銷毀。每個批次的
 * 檔案中配置制定的
 */
private final Deque<ByteBuffer> free;

 /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
    // 非緩沖池的可用記憶體大小,非緩沖池配置設定記憶體,其實就是調用ByteBuffer.allocate配置設定真實的JVM記憶體。
//但是這部分的資料是不走記憶體池的用完就銷毀,用了再重新申請
private long nonPooledAvailableMemory;

           

4.消息傳遞可靠性保障

4.1 可靠性保障種類

At most once:最多發送一次 消息可能會丢失,但是不會重複

At Least once:最少發送一次 消息可能會重複 不會丢失

Exactly once:恰好一次 每條消息隻被傳遞一次

日常的開發場景 At most once 很少用到,我們最需要的就是 Exactly once 恰好一次。

但是如果 某個topic的所有消息都是幂等的,存儲多條,重複消費也不會影響結果,那麼At least once 是一個好的選擇。因為所有的事物都是平衡的,在保證Exactly only的同時,一定會損失點其他的東西,就是性能。其實并不是說那種語義最好,脫離了場景一切都是白談,假如我對消息的丢失無所謂,你卻非要去保證消息的Exactly once 那不就是做了很多無用功還損失了性能,是以一切脫離了具體的場景去談問題,都是耍流氓。

Kafka預設選擇的是 At Least once的方式消息發送失敗會選擇重試,這樣就可能會造成消息重複。

如果關掉了重試的機制就是 At most once

4.2Kafka如何實作Exactly once?

分區次元

幂等性 Producer 是Kakfa 0.11版本引入的新功能,添加如下配置即可:

trueprops.put(ProducerfConfig.ENABLE_IDEMPOTENCE_CONFIG, true分區)

1

服務端會根據一個唯一辨別給我們做去重,但是僅僅是對單分區保證恰好一次的,不同的分區并不能保證恰好一次的語義,還是會有消息重複。而且 單個分區也是單次會話起作用的,假如生産者端重新開機了,不好意思,他就不能消息不會和上次會話的消息不重複了。

全局次元

事務型 Producer 能夠保證将消息原子性地寫入到多個分區中,不會有重複消息。

事務型 Producer即使在多次回話中 ,Kafka 依然保證它們發送消息的精确一次處理。

如何開啟事務:

#開啟事務
enable.idempotence = true。
#設定事務id 最好是和業務相關是一個有意義的id
transactional.id=MYTRAN
           

代碼也會做一些修改:

//開啟事務
producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
  					producer.send(record3);
  					//送出事務
            producer.commitTransaction();
} catch (KafkaException e) {
  					//復原
            producer.abortTransaction();
}
           

需要注意的是Producer開啟了事務後,Consumer對這些API也要有着相同的事務試圖:

read_uncommitted:預設值, Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務型 Producer 送出事務還是終止事務,其寫入的消息都可以讀取。

read_committed:表明 Consumer 隻會讀取事務型 Producer 成功送出事務寫入的消息。

這裡需要将Conumer的isolation.level參數設定為read_committed才可以。

4.3 自己如何實作Exactly once

禁止重試,消息隻發送到一個分區,當消息發送失敗後,具體的重試邏輯由生産者主線程做處理

生産者不做處理,并開啟重試機制,對每條消息建立一個唯一表示,具體的去重操作,由消費者來做。

KakfaProducer端還有很多優秀的設計,提供的API也比較豐富,比如分段鎖的使用,在多線程下使線程更少的去競争鎖的資源,ConcurrentMap的使用針對都多寫少的場景,網絡請求使用TCP方式,使用了NIO實作了自己的網絡架構等。都是值得我們去學習的地方。

原文連結:https://blog.csdn.net/weixin_39034379/article/details/118293244(自己的csdn賬号)