天天看點

從KafkaConsumer看看Kafka(一)

  Kafka的消息模型為釋出訂閱模型,消息生産者将消息釋出到主題(topic)中,一個或多個消費者訂閱(消費)該主題消息并消費,此模型中釋出到topic中的消息會被所有消費者所訂閱到,先介紹Kafka消費模型,然後再通過KafkaConsumer原來了解它的業務流程,源碼基于kafka 2.4;

Kafka消費模型關鍵點:

  1、Kafka一個消費組(ConsumerGroup)中存在一個或多個消費者(Consumer),每個消費者也必須屬于一個消費者組;

  2、消費者組(ConsumerGroup)中的消費者(Consumer)獨占一個或多個分區(Partition);

  3、消費時每個分區(Partition)最多隻有一個Consumer再消費;

  4、消費者組(ConsumerGroup)在Broker存在一個協調者(Coordinator)配置設定管理Consumer與Partition之間的對應關系。當兩種中的Consumer或Partition發生變更時将會觸發reblance(重新平衡),重新配置設定Consumer與Partition的對應關系;

下面是Kafka消費者程式的示例:

//配置Consumer
Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//建立Consumer
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱主題
 consumer.subscribe(Arrays.asList("foo", "bar"));
//消費消息
     while (true) {
     ConsumerRecords<String, String> records = 
consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 }
           

  在上面我們可以看到Kafka消費消息的整個流程:配置Consumer屬性、訂閱主題、拉取消費消息,基本流程知道了也就是這幾個點,配置ConsumerId、自動送出offset、序列化、Kafka服務端位址,這就是Kafka最最最基礎的配置,當然還有很多配置項可以到官網檢視;

消費者關鍵點

  Consumer程式主要分為三個部分:配置、訂閱主題、拉取消息;從中也可以看到在消費前需要訂閱某個主題、在前面我們提到Consumer執行個體需要與某個Partition綁定關聯然後才能進行消費資料,下面我們透過官方提供的Consumer程式簡單看看如何訂閱主題、如何關聯Consumer與Partition、如何拉取消息消費;

訂閱主題

  訂閱主題可以說是Kafka消費的基礎,下面先看看簡化後的訂閱方法:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen();
    try {
       //忽略部分代碼
        if (topics.isEmpty()) {
            this.unsubscribe();
        } else {
            if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                metadata.requestUpdateForNewTopics();
        }
    } finally {
        release();
    }
}
           

  安全檢查: Consumer注釋中也說了KafkaConsumer為非線程安全的,從上也可看到acquireAndEnsureOpen的作用就是檢查目前是否為多線程運作,確定Consumer隻在一個線程中執行;

  設定訂閱狀态: SubscriptionState 對象的subscribe方法主要是設定ConsumerRelance監聽器、設定所監聽的主題;

  更新中繼資料: metadata對象維護了Kafka叢集中繼資料子集,存儲了Broker節點、Topic、Partition節點資訊等;

  跟進metadata.requestUpdateForNewTopics方法發現最終調用了metadata對象的requestUpdate方法;

public synchronized int requestUpdate() {
    this.needUpdate = true;
    return this.updateVersion;
}
           

  此方法并沒有什麼實質性的動作,隻是更新needUpdate屬性為true;由于Kafka拉取資料時必須得到中繼資料資訊否則無法知道broker、topic、Partition資訊也就無法知道去哪個節點拉取資料;但此處并沒有實質性的更新中繼資料請求,接下來我們看看拉取方法。

拉取資料

  上一步訂閱了主題,這時我們就可以從中拉取資料,跟蹤代碼最終進入了KafkaConsumer的poll方法;

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    //多線程檢查
    acquireAndEnsureOpen();
    try {//省略代碼
//逾時檢查
            if (includeMetadataInTimeout) {
                //請求更新中繼資料
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {//省略代碼
			}
            //拉取資料
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }
                //調用消費者攔截器後傳回
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}
           

此方法幾個流程

1、 多線程檢查

2、 逾時檢查

3、 請求更新中繼資料

4、 拉取資料

  此處我們比較關心的還是更新中繼資料與拉取資料,這裡我們主要看看這兩個流程的執行;

請求更新中繼資料

  在updateAssignmentMetadataIfNeeded方法中調用coordinator對象的poll方法去更新中繼資料,并且調用updateFetchPositions方法用于重新整理Consumer對應Partition對應的offset值;

  資料的拉取在pollForFetches方法中;

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    //省略代碼
    //從緩存區資料
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }
	//構造拉取請求發送
    fetcher.sendFetches();

    //省略代碼
	//發起拉取資料請求
    Timer pollTimer = time.timer(pollTimeout);
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());
	//省略代碼

    return fetcher.fetchedRecords();
           

}

pollForFetches方法執行邏輯:

  1、 從緩存取資料如有可用資料,直接傳回;

  2、 構造請求對象fetches,一個節點node對應一個clientRequest對象,将其放入ConsumerNetworkClient對象的unsent屬性中;

  3、 調用client對象poll方法,将上一步放入unsent屬性的請求對象ClientRequest發送出去;

  4、 傳回所拉取到的消息;

Offset送出

  offset送出放在ConsumerCoordinator對象中,offset送出又分為自動送出與手動送出;當設定了enable.auto.commit==true且  autoCommitIntervalMs等于指定間隔時有這麼幾個時機會觸發自動:

  1、 consumer對象close時,調用commitOffsetsSync觸發同步的offset送出;

  2、 consumer對象poll時,調用commitOffsetsAsync觸發異步的offset送出;

  3、 觸發Partition與Topic 配置設定 assign時觸發commitOffsetsAsync異步送出;

  4、 當發生relance或有Consumer加入Group時觸發commitOffsetsSync方法同步送出;

參考資料: http://kafka.apache.org