天天看點

Kafka 實戰指南—— Kafka 工作原理分析

文章目錄

    • 1. Kafka 生産過程分析
      • 1.1 Kafka 的消息寫入方式(順序寫磁盤)
      • 1.2 分區(Partition)
        • 1.2.1 為什麼要分區
        • 1.2.2 分區的原則
      • 1.3 副本(Replication)
      • 1.4 寫入流程
    • 2. Broker 儲存消息
      • 2.1 存儲方式
      • 2.2 存儲政策
      • 2.3 Zookeeper 存儲結構
    • 3. Kafka 消費過程分析
      • 3.3 消費者組
      • 3.4 消費方式

Kafka 核心組成:

Kafka 實戰指南—— Kafka 工作原理分析

圖 Kafka 核心組成

1. Kafka 生産過程分析

1.1 Kafka 的消息寫入方式(順序寫磁盤)

producer 采用推(push)模式将消息發送到 broker,每條消息都被追加(append)到分區(patition)中,屬于順序寫磁盤(順序寫磁盤效率比随機寫記憶體要高,保障 kafka 吞吐率)。

1.2 分區(Partition)

消息發送時都被發送到一個 topic,其本質就是一個目錄,而 topic 是由一些

Partition Logs(分區日志)

組成,其組織結構如下圖所示:

Kafka 實戰指南—— Kafka 工作原理分析

圖 生産者寫入資料

Kafka 實戰指南—— Kafka 工作原理分析

圖 消費者消費資料

我們可以看到,每個 Partition 中的消息都是

有序

的,生産的消息被不斷追加到

Partition log

上,其中的每一個消息都被賦予了一個唯一的

offset值

1.2.1 為什麼要分區

  1. 友善在叢集中擴充,每個 Partition 可以通過調整以适應它所在的機器,而一個 topic 又可以有多個 Partition 組成,是以整個叢集就可以适應任意大小的資料了;
  2. 可以提高并發,因為可以以 Partition 為機關讀寫了。

1.2.2 分區的原則

  1. 指定了 patition,則直接使用;
  2. 未指定 patition 但指定 key,通過對 key 的值進行 hash 取模得出一個 patition;
  3. patition 和 key 都未指定,使用輪詢選出一個 patition。

DefaultPartitioner

分區源碼:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // 有 key,通過key hash 後取模
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

// 輪詢
private int nextValue(String topic) {
    AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    
    return counter.getAndIncrement();
}
           

1.3 副本(Replication)

同一個 partition 可能會有多個 replication(對應

server.properties

配置中的

default.replication.factor=N

)。沒有 replication 的情況下,一旦 broker 當機,其上所有 patition 的資料都不可被消費,同時 producer 也不能再将資料存于其上的 patition。引入 replication 之後,同一個 partition 可能會有多個 replication,而這時需要在這些 replication 之間選出一個 leader,producer 和 consumer 隻與這個leader互動,其它 replication 作為 follower 從 leader 中複制資料。

1.4 寫入流程

producer 寫入消息流程如下:

Kafka 實戰指南—— Kafka 工作原理分析
  1. producer 先從 zookeeper 的

    /brokers/.../state

    節點找到該 partition 的 leader。(注意:這裡不是直接連接配接Zookeeper,而是通過 Kafka 叢集的到 broker state)
  2. producer将消息發送給該 leader。
  3. leader 将消息寫入本地 log。
  4. followers 從 leader pull 消息,寫入本地 log 後向 leader 發送 ACK。
  5. leader 收到所有 ISR 中的 replication 的 ACK 後,增加 HW(high watermark,最後 commit 的offset,消費者最大能消費的 offset)并向producer發送 ACK。

2. Broker 儲存消息

2.1 存儲方式

實體上把 topic 分成一個或多個 patition(對應 server.properties 中的

num.partitions=3

配置),每個patition 實體上對應一個檔案夾(該檔案夾存儲 patition 的所有消息和索引檔案),如下:

[[email protected] logs]$ ll
drwxrwxr-x. 2 dwjf321 dwjf321  4096 8月   6 14:37 first-0
drwxrwxr-x. 2 dwjf321 dwjf321  4096 8月   6 14:35 first-1
drwxrwxr-x. 2 dwjf321 dwjf321  4096 8月   6 14:37 first-2
[[email protected] logs]$ cd first-0
[[email protected] first-0]$ ll
總用量 356
-rw-rw-r--. 1 dwjf321 dwjf321 10485760 12月 25 22:32 00000000000000018519.index
-rw-rw-r--. 1 dwjf321 dwjf321   348075 12月 24 23:41 00000000000000018519.log
-rw-rw-r--. 1 dwjf321 dwjf321 10485756 12月 25 22:32 00000000000000018519.timeindex
-rw-rw-r--. 1 dwjf321 dwjf321       10 12月 24 23:58 00000000000000019534.snapshot
-rw-rw-r--. 1 dwjf321 dwjf321       22 12月 25 22:32 leader-epoch-checkpoint
           

2.2 存儲政策

無論消息是否被消費,kafka 都會保留所有消息。有兩種政策可以删除舊資料:

  1. 基于時間:

    log.retention.hours=168

    ,預設 7 天
  2. 基于大小:

    log.retention.bytes=1073741824

    ,預設 1 G。

需要注意的是,因為 Kafka 讀取特定消息的時間複雜度為 O(1),即與檔案大小無關,是以這裡删除過期檔案與提高 Kafka 性能無關。

2.3 Zookeeper 存儲結構

Kafka 實戰指南—— Kafka 工作原理分析

注意:producer不在zk中注冊,消費者在zk中注冊。

3. Kafka 消費過程分析

3.3 消費者組

Kafka 實戰指南—— Kafka 工作原理分析

圖 消費組

消費者是以 consumer group 消費組 的方式工作,由一個或者多個消費者組成一個組,共同消費一個 topic。每個分區在同一時間隻能由 group 中的一個消費者讀取,但是多個 group 可以同時消費這個 partition。在圖中,有一個由三個消費者組成的 group,有一個消費者讀取主題中的兩個分區,另外兩個分别讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。

在這種情況下,消費者可以通過水準擴充的方式同時讀取大量的消息。另外,如果一個消費者當機了,那麼其他的group 成員會自動負載均衡讀取之前失敗的消費者讀取的分區。

3.4 消費方式

consumer 采用 pull(拉)模式從 broker 中讀取資料。

因為 push(推)模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以适當的速率消費消息。

對于Kafka而言,pull模式更合适,它可簡化broker的設計,consumer可自主要制消費消息的速率,同時consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇 offset 的不同的送出方式進而控制消息不丢失。

pull 模式不足之處是,如果 kafka 沒有資料,消費者可能會陷入循環中,一直等待資料到達。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待資料到達的“長輪詢”中進行阻塞(并且可選地等待到給定的位元組數,以確定大的傳輸大小)。

繼續閱讀