文章目錄
-
- 1. Kafka 生産過程分析
-
- 1.1 Kafka 的消息寫入方式(順序寫磁盤)
- 1.2 分區(Partition)
-
- 1.2.1 為什麼要分區
- 1.2.2 分區的原則
- 1.3 副本(Replication)
- 1.4 寫入流程
- 2. Broker 儲存消息
-
- 2.1 存儲方式
- 2.2 存儲政策
- 2.3 Zookeeper 存儲結構
- 3. Kafka 消費過程分析
-
- 3.3 消費者組
- 3.4 消費方式
Kafka 核心組成:
圖 Kafka 核心組成
1. Kafka 生産過程分析
1.1 Kafka 的消息寫入方式(順序寫磁盤)
producer 采用推(push)模式将消息發送到 broker,每條消息都被追加(append)到分區(patition)中,屬于順序寫磁盤(順序寫磁盤效率比随機寫記憶體要高,保障 kafka 吞吐率)。
1.2 分區(Partition)
消息發送時都被發送到一個 topic,其本質就是一個目錄,而 topic 是由一些
Partition Logs(分區日志)
組成,其組織結構如下圖所示:
圖 生産者寫入資料
圖 消費者消費資料
我們可以看到,每個 Partition 中的消息都是
有序的,生産的消息被不斷追加到
Partition log
上,其中的每一個消息都被賦予了一個唯一的
offset值。
1.2.1 為什麼要分區
- 友善在叢集中擴充,每個 Partition 可以通過調整以适應它所在的機器,而一個 topic 又可以有多個 Partition 組成,是以整個叢集就可以适應任意大小的資料了;
- 可以提高并發,因為可以以 Partition 為機關讀寫了。
1.2.2 分區的原則
- 指定了 patition,則直接使用;
- 未指定 patition 但指定 key,通過對 key 的值進行 hash 取模得出一個 patition;
- patition 和 key 都未指定,使用輪詢選出一個 patition。
DefaultPartitioner
分區源碼:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 有 key,通過key hash 後取模
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// 輪詢
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
1.3 副本(Replication)
同一個 partition 可能會有多個 replication(對應
server.properties
配置中的
default.replication.factor=N
)。沒有 replication 的情況下,一旦 broker 當機,其上所有 patition 的資料都不可被消費,同時 producer 也不能再将資料存于其上的 patition。引入 replication 之後,同一個 partition 可能會有多個 replication,而這時需要在這些 replication 之間選出一個 leader,producer 和 consumer 隻與這個leader互動,其它 replication 作為 follower 從 leader 中複制資料。
1.4 寫入流程
producer 寫入消息流程如下:
- producer 先從 zookeeper 的
節點找到該 partition 的 leader。(注意:這裡不是直接連接配接Zookeeper,而是通過 Kafka 叢集的到 broker state)/brokers/.../state
- producer将消息發送給該 leader。
- leader 将消息寫入本地 log。
- followers 從 leader pull 消息,寫入本地 log 後向 leader 發送 ACK。
- leader 收到所有 ISR 中的 replication 的 ACK 後,增加 HW(high watermark,最後 commit 的offset,消費者最大能消費的 offset)并向producer發送 ACK。
2. Broker 儲存消息
2.1 存儲方式
實體上把 topic 分成一個或多個 patition(對應 server.properties 中的
num.partitions=3
配置),每個patition 實體上對應一個檔案夾(該檔案夾存儲 patition 的所有消息和索引檔案),如下:
[[email protected] logs]$ ll
drwxrwxr-x. 2 dwjf321 dwjf321 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 dwjf321 dwjf321 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 dwjf321 dwjf321 4096 8月 6 14:37 first-2
[[email protected] logs]$ cd first-0
[[email protected] first-0]$ ll
總用量 356
-rw-rw-r--. 1 dwjf321 dwjf321 10485760 12月 25 22:32 00000000000000018519.index
-rw-rw-r--. 1 dwjf321 dwjf321 348075 12月 24 23:41 00000000000000018519.log
-rw-rw-r--. 1 dwjf321 dwjf321 10485756 12月 25 22:32 00000000000000018519.timeindex
-rw-rw-r--. 1 dwjf321 dwjf321 10 12月 24 23:58 00000000000000019534.snapshot
-rw-rw-r--. 1 dwjf321 dwjf321 22 12月 25 22:32 leader-epoch-checkpoint
2.2 存儲政策
無論消息是否被消費,kafka 都會保留所有消息。有兩種政策可以删除舊資料:
- 基于時間:
,預設 7 天log.retention.hours=168
- 基于大小:
,預設 1 G。log.retention.bytes=1073741824
需要注意的是,因為 Kafka 讀取特定消息的時間複雜度為 O(1),即與檔案大小無關,是以這裡删除過期檔案與提高 Kafka 性能無關。
2.3 Zookeeper 存儲結構
注意:producer不在zk中注冊,消費者在zk中注冊。
3. Kafka 消費過程分析
3.3 消費者組
圖 消費組
消費者是以 consumer group 消費組 的方式工作,由一個或者多個消費者組成一個組,共同消費一個 topic。每個分區在同一時間隻能由 group 中的一個消費者讀取,但是多個 group 可以同時消費這個 partition。在圖中,有一個由三個消費者組成的 group,有一個消費者讀取主題中的兩個分區,另外兩個分别讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。
在這種情況下,消費者可以通過水準擴充的方式同時讀取大量的消息。另外,如果一個消費者當機了,那麼其他的group 成員會自動負載均衡讀取之前失敗的消費者讀取的分區。
3.4 消費方式
consumer 采用 pull(拉)模式從 broker 中讀取資料。
因為 push(推)模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以适當的速率消費消息。
對于Kafka而言,pull模式更合适,它可簡化broker的設計,consumer可自主要制消費消息的速率,同時consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇 offset 的不同的送出方式進而控制消息不丢失。
pull 模式不足之處是,如果 kafka 沒有資料,消費者可能會陷入循環中,一直等待資料到達。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待資料到達的“長輪詢”中進行阻塞(并且可選地等待到給定的位元組數,以確定大的傳輸大小)。