天天看點

kafka——生産者原了解析

一、為什麼分區

kafka有主題(Topic)的概念,它是承載真實資料的邏輯容器,而在主題之下還分為若幹個分區,也就是說kafka的消息組織方式實際上是三級結構: 主題---分區---消息。主題下的每條消息隻會儲存在某一個分區中,而不會在多個分區中儲存多份。官網上的這張圖非常清晰地展示了kafka的三級結構,如下:

kafka——生産者原了解析

其實分區的作用就是提供負載均衡的能力,或者說對資料進行分區的主要原因,就是為了實作系統的高伸縮性。不同的分區能夠放置到不同節點機器上,而資料庫的讀寫操作也都是針對分區這個粒度而進行的,這樣每個節點的機器都能獨立的執行各自分區的讀寫請求處理。并且,我們還可以通過添加新的節點記起來增加整體系統的吞吐量。

二、分區政策

分區的原因

  • 1、友善在叢集中擴充,每個 Partition 可以通過調整以适應它所在的機器,而一個 topic

    又可以有多個 Partition 組成,是以整個叢集就可以适應任意大小的資料了。

  • 2、可以提高并發,因為可以以 Partition 為機關讀寫了。

分區的原則

我們需要将 producer 發送的資料封裝成一個 ProducerRecord 對象。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
	this(topic, partition, timestamp, key, value, (Iterable)null);
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
	this(topic, partition, (Long)null, key, value, headers);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
	this(topic, partition, (Long)null, key, value, (Iterable)null);
}

public ProducerRecord(String topic, K key, V value) {
	this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

public ProducerRecord(String topic, V value) {
	this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}
           
  • 1、指明 partition 的情況下,直接将指明的值直接作為 partiton 值。
  • 2、沒有指明 partition 值但有 key 的情況下,将 key 的 hash 值與 topic 的 partition

    數進行取餘得到 partition 值。

  • 3、既沒有 partition 值又沒有 key 值的情況下,第一次調用時随機生成一個整數(後

    面每次調用在這個整數上自增),将這個值與 topic 可用的 partition 總數取餘得到 partition值,也就是常說的 round-robin 算法。

RoundRobinPartitioner(輪詢政策)源碼:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}
           

三、資料可靠性保證

為保證 producer 發送的資料,能可靠的發送到指定的 topic,topic 的每個 partition 收到

producer 發送的資料後,都需要向 producer 發送 ack(acknowledgement 确認收到),如果producer 收到 ack,就會進行下一輪的發送,否則重新發送資料。

kafka——生産者原了解析

3.1、副本資料同步政策

方案 優點 缺點
半數以上完成同步,就發送 ack 延遲低 選舉新的 leader 時,容忍 n 台節點的故障,需要 2n+1 個副本
全部完成同步,才發送ack 選舉新的 leader 時,容忍 n 台節點的故障,需要 n+1 個副本 延遲高

Kafka 選擇了第二種方案,原因如下:

  • 1、同樣為了容忍 n 台節點的故障,第一種方案需要 2n+1 個副本,而第二種方案隻需要 n+1個副本,而 Kafka 的每個分區都有大量的資料,第一種方案會造成大量資料的備援。
  • 2、雖然第二種方案的網絡延遲會比較高,但網絡延遲對 Kafka 的影響較小。

3.2、ISR

采用第二種方案之後,設想以下情景:leader 收到資料,所有 follower 都開始同步資料,但有一個 follower,因為某種故障,遲遲不能與 leader 進行同步,那 leader 就要一直等下去,直到它完成同步,才能發送 ack。這個問題怎麼解決呢?

Leader 維護了一個動态的

in-sync replica set (ISR)

,意為和 leader 保持同步的 follower 集合。當 ISR 中的 follower 完成資料的同步之後,leader 就會給 producer 發送 ack。如果 follower長時間 未 向 leader 同 步 數 據 , 則 該 follower 将 被 踢 出 ISR , 該 時 間 阈 值 由

replica.lag.time.max.ms

參數設定。Leader 發生故障之後,就會從 ISR 中選舉新的 leader。

3.3、ack 應答機制

對于某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丢失,

是以沒必要等 ISR 中的 follower 全部接收成功。

是以 Kafka 為使用者提供了三種可靠性級别,使用者根據對可靠性和延遲的要求進行權衡,

選擇以下的配置。

  • acks = 0:生産者隻負責發消息,不管Leader 和Follower 是否完成落盤就會發送ack 。這樣能夠最大降低延遲,但當Leader還未落盤時發生故障就會造成資料丢失。
  • acks = 1:Leader将資料落盤後,不管Follower 是否落盤就會發送ack 。這樣可以保證Leader節點内有一份資料,但當Follower還未同步時Leader發生故障就會造成資料丢失。
  • acks = -1(all):生産者等待Leader 和ISR 集合内的所有Follower 都完成同步才會發送ack 。但當Follower 同步完之後,broker發送ack之前,Leader發生故障時,此時會重新從ISR内選舉一個新的Leader,此時由于生産者沒收到ack,于是生産者會重新發消息給新的Leader,此時就會造成資料重複。
kafka——生産者原了解析

3.4、故障處理細節

kafka——生産者原了解析
kafka——生産者原了解析

LEO——Log End Offset:Producer 寫入到 Kafka 中的最新一條資料的 offset。

HW——High Watermark::指的是消費者能見到的最大的 offset,ISR 隊列中最小的 LEO。

  • 1、follower 故障

    follower 發生故障後會被臨時踢出 ISR,待該 follower 恢複後,follower 會讀取本地磁盤

    記錄的上次的 HW,并将 log 檔案高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步。等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之後,就可以重新加入 ISR 了。

  • 2、leader 故障

    leader 發生故障之後,會從 ISR 中選出一個新的 leader,之後,為保證多個副本之間的資料一緻性,其餘的 follower 會先将各自的 log 檔案高于 HW 的部分截掉,然後從新的 leader同步資料。

注意:這隻能保證副本之間的資料一緻性,并不能保證資料不丢失或者不重複。

四、Exactly Once 語義

在分布式消息傳遞一緻性語義上有下面三種:

  • At Least Once:消息不會丢,但可能會重複。
  • At Most Once:消息會丢,但不會重複。
  • Exactly Once:消息不會丢,也不會重複。

在kafka0.11版本之前是無法在kafka内實作exactly once 精确一次性保證的語義的,在0.11之後的版本,我們可以結合新特性 幂等性以及acks=-1 來實作kafka生産者的exactly once。

  • acks=-1:在上面ack機制裡已經介紹過,他能實作at least once語義,保證資料不會丢,但可能會有重複資料。
  • 幂等性:0.11版本之後新增的特性,針對生産者,指生産者無論向broker 發送多少次重複資料,broker 隻會持久化一條;
在0.11版本之前要實作exactly once語義隻能通過外部系統如hbase的rowkey實作基于主鍵的去重。

五、幂等性解讀:

在生産者配置檔案

producer.properties

設定參數

enable.idompotence = true

即可啟用幂等性。

Kafka的幂等性其實就是将原來需要在下遊進行的去重操作放在了資料上遊。開啟幂等性的生産者在初始化時會被配置設定一個PID(producer ID),該生産者發往同一個分區(Partition)的消息會附帶一個序列号(Sequence Number),Broker 端會對<PID, Partition, SeqNumber>作為該消息的主鍵進行緩存,當有相同主鍵的消息送出時,Broker 隻會持久化一條。但是生産者重新開機時PID 就會發生變化,同時不同的 分區(Partition)也具有不同的編号,是以生産者幂等性無法保證跨分區和跨會話的 Exactly Once。

事務:kafka在0.11 版本引入了事務支援。事務可以保證 Kafka 在 Exactly Once 語義的基礎上,生産和消費可以跨分區和會話,要麼全部成功,要麼全部失敗。

六、生産者事務:

Kafka引入了一個新的元件Transaction Coordinator,它管理了一個全局唯一的事務ID(Transaction ID),并将生産者的PID和事務ID進行綁定,當生産者重新開機時雖然PID會變,但仍然可以和Transaction Coordinator互動,通過事務ID可以找回原來的PID,這樣就保證了重新開機後的生産者也能保證Exactly Once 了。

同時,Transaction Coordinator 将事務資訊寫入 Kafka 的一個内部 Topic,即使整個kafka服務重新開機,由于事務狀态已持久化到topic,進行中的事務狀态也可以得到恢複,然後繼續進行。

七、生産者用戶端的基本架構圖

kafka——生産者原了解析

由上圖可以看出:KafkaProducer有兩個基本線程:

  • 主線程:負責消息建立,攔截器,序列化器,分區器等操作,并将消息追加到消息收集器RecoderAccumulator中(這裡可以看出攔截器确實在序列化和分區之前執行)。
    • 消息收集器RecoderAccumulator為每個分區都維護了一個 Deque<ProducerBatch> 類型的雙端隊列。
    • ProducerBatch 可以暫時了解為是 ProducerRecord 的集合,批量發送有利于提升吞吐量,降低網絡影響。
    • 由于生産者用戶端使用 java.io.ByteBuffer 在發送消息之前進行消息儲存,并維護了一個 BufferPool 實作 ByteBuffer 的複用;該緩存池隻針對特定大小( batch.size 指定)的 ByteBuffer進行管理,對于消息過大的緩存,不能做到重複利用。
    • 每次追加一條ProducerRecord消息,會尋找/建立對應的雙端隊列,從其尾部擷取一個ProducerBatch,判斷目前消息的大小是否可以寫入該批次中。若可以寫入則寫入;若不可以寫入,則建立一個ProducerBatch,判斷該消息大小是否超過用戶端參數配置 batch.size 的值,不超過,則以 batch.size建立新的ProducerBatch,這樣友善進行緩存重複利用;若超過,則以計算的消息大小建立對應的 ProducerBatch ,缺點就是該記憶體不能被複用了。
  • Sender線程:
    • 該線程從消息收集器擷取緩存的消息,将其處理為 <Node, List<ProducerBatch> 的形式, Node 表示叢集的broker節點。
    • 進一步将<Node, List<ProducerBatch>轉化為<Node, Request>形式,此時才可以向服務端發送資料。
    • 在發送之前,Sender線程将消息以 Map<NodeId, Deque<Request>> 的形式儲存到 InFlightRequests 中進行緩存,可以通過其擷取 leastLoadedNode ,即目前Node中負載壓力最小的一個,以實作消息的盡快發出。

寫入流程

producer 寫入消息序列圖如下所示:

kafka——生産者原了解析

流程說明:

  • 1、producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader。
  • 2、producer 将消息發送給該 leader。
  • 3、leader 将消息寫入本地 log。
  • 4、followers 從 leader pull 消息,寫入本地 log 後 leader 發送 ACK。
  • 5、leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 并向 producer 發送 ACK。

參考:

http://kafka.apache.org/0110/documentation.html

https://www.cnblogs.com/tugeboke/p/11760387.html

https://blog.csdn.net/wsdc0521/article/details/108604420