Kafka的消息模型為釋出訂閱模型,消息生産者将消息釋出到主題(topic)中,一個或多個消費者訂閱(消費)該主題消息并消費,此模型中釋出到topic中的消息會被所有消費者所訂閱到,先介紹Kafka消費模型,然後再通過KafkaConsumer原來了解它的業務流程,源碼基于kafka 2.4;
Kafka消費模型關鍵點:
1、Kafka一個消費組(ConsumerGroup)中存在一個或多個消費者(Consumer),每個消費者也必須屬于一個消費者組;
2、消費者組(ConsumerGroup)中的消費者(Consumer)獨占一個或多個分區(Partition);
3、消費時每個分區(Partition)最多隻有一個Consumer再消費;
4、消費者組(ConsumerGroup)在Broker存在一個協調者(Coordinator)配置設定管理Consumer與Partition之間的對應關系。當兩種中的Consumer或Partition發生變更時将會觸發reblance(重新平衡),重新配置設定Consumer與Partition的對應關系;
下面是Kafka消費者程式的示例:
//配置Consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//建立Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱主題
consumer.subscribe(Arrays.asList("foo", "bar"));
//消費消息
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
在上面我們可以看到Kafka消費消息的整個流程:配置Consumer屬性、訂閱主題、拉取消費消息,基本流程知道了也就是這幾個點,配置ConsumerId、自動送出offset、序列化、Kafka服務端位址,這就是Kafka最最最基礎的配置,當然還有很多配置項可以到官網檢視;
消費者關鍵點
Consumer程式主要分為三個部分:配置、訂閱主題、拉取消息;從中也可以看到在消費前需要訂閱某個主題、在前面我們提到Consumer執行個體需要與某個Partition綁定關聯然後才能進行消費資料,下面我們透過官方提供的Consumer程式簡單看看如何訂閱主題、如何關聯Consumer與Partition、如何拉取消息消費;
訂閱主題
訂閱主題可以說是Kafka消費的基礎,下面先看看簡化後的訂閱方法:
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
//忽略部分代碼
if (topics.isEmpty()) {
this.unsubscribe();
} else {
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
metadata.requestUpdateForNewTopics();
}
} finally {
release();
}
}
安全檢查: Consumer注釋中也說了KafkaConsumer為非線程安全的,從上也可看到acquireAndEnsureOpen的作用就是檢查目前是否為多線程運作,確定Consumer隻在一個線程中執行;
設定訂閱狀态: SubscriptionState 對象的subscribe方法主要是設定ConsumerRelance監聽器、設定所監聽的主題;
更新中繼資料: metadata對象維護了Kafka叢集中繼資料子集,存儲了Broker節點、Topic、Partition節點資訊等;
跟進metadata.requestUpdateForNewTopics方法發現最終調用了metadata對象的requestUpdate方法;
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.updateVersion;
}
此方法并沒有什麼實質性的動作,隻是更新needUpdate屬性為true;由于Kafka拉取資料時必須得到中繼資料資訊否則無法知道broker、topic、Partition資訊也就無法知道去哪個節點拉取資料;但此處并沒有實質性的更新中繼資料請求,接下來我們看看拉取方法。
拉取資料
上一步訂閱了主題,這時我們就可以從中拉取資料,跟蹤代碼最終進入了KafkaConsumer的poll方法;
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
//多線程檢查
acquireAndEnsureOpen();
try {//省略代碼
//逾時檢查
if (includeMetadataInTimeout) {
//請求更新中繼資料
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {//省略代碼
}
//拉取資料
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}
//調用消費者攔截器後傳回
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
此方法幾個流程
1、 多線程檢查
2、 逾時檢查
3、 請求更新中繼資料
4、 拉取資料
此處我們比較關心的還是更新中繼資料與拉取資料,這裡我們主要看看這兩個流程的執行;
請求更新中繼資料
在updateAssignmentMetadataIfNeeded方法中調用coordinator對象的poll方法去更新中繼資料,并且調用updateFetchPositions方法用于重新整理Consumer對應Partition對應的offset值;
資料的拉取在pollForFetches方法中;
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
//省略代碼
//從緩存區資料
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
//構造拉取請求發送
fetcher.sendFetches();
//省略代碼
//發起拉取資料請求
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());
//省略代碼
return fetcher.fetchedRecords();
}
pollForFetches方法執行邏輯:
1、 從緩存取資料如有可用資料,直接傳回;
2、 構造請求對象fetches,一個節點node對應一個clientRequest對象,将其放入ConsumerNetworkClient對象的unsent屬性中;
3、 調用client對象poll方法,将上一步放入unsent屬性的請求對象ClientRequest發送出去;
4、 傳回所拉取到的消息;
Offset送出
offset送出放在ConsumerCoordinator對象中,offset送出又分為自動送出與手動送出;當設定了enable.auto.commit==true且 autoCommitIntervalMs等于指定間隔時有這麼幾個時機會觸發自動:
1、 consumer對象close時,調用commitOffsetsSync觸發同步的offset送出;
2、 consumer對象poll時,調用commitOffsetsAsync觸發異步的offset送出;
3、 觸發Partition與Topic 配置設定 assign時觸發commitOffsetsAsync異步送出;
4、 當發生relance或有Consumer加入Group時觸發commitOffsetsSync方法同步送出;
參考資料: http://kafka.apache.org