天天看點

Kafka Consumer API樣例

Kafka Consumer API樣例

1. 自動确認Offset

說明參照:http://blog.csdn.net/xianzhen376/article/details/51167333

Properties props = new Properties();
/* 定義kakfa 服務的位址,不需要将所有broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自動确認offset */
props.put("enable.auto.commit", "true");
/* 自動确認offset的時間間隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化類 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化類 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 /* 定義consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 消費者訂閱的topic, 可同時訂閱多個 */
consumer.subscribe(Arrays.asList("foo", "bar"));

 /* 讀取資料,讀取逾時時間為100ms */
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
           

說明:

1. bootstrap.servers 隻是代表kafka的連接配接入口,隻需要指定叢集中的某一broker;

2. 一旦consumer和kakfa叢集建立連接配接,consumer會以心跳的方式來高速叢集自己還活着,如果session.timeout.ms 内心跳未到達伺服器,伺服器認為心跳丢失,會做rebalence。

2. 手工控制Offset

如果consumer在獲得資料後需要加入處理,資料完畢後才确認offset,需要程式來控制offset的确認。舉個栗子:

consumer獲得資料後,需要将資料持久化到DB中。自動确認offset的情況下,如果資料從kafka叢集讀出,就确認,但是持久化過程失敗,就會導緻資料丢失。我們就需要控制offset的确認。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
/* 關閉自動确認選項 */
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    /* 資料達到批量要求,就寫入DB,同步确認offset */
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}
           

還可以精細的控制對具體分區具體offset資料的确認:

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            /* 同步确認某個分區的特定offset */
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
  consumer.close();
}
           

說明:确認的offset為已接受資料最大offset+1。

3. 分區訂閱

可以向特定的分區訂閱消息。但是會失去partion的負載分擔。有幾種場景可能會這麼玩:

1. 隻需要擷取本機磁盤的分區資料;

2. 程式自己或者外部程式能夠自己實作負載和錯誤處理。例如YARN/Mesos的介入,當consumer挂掉後,再啟動一個consumer。

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
           

說明:

1. 此種情況用了consumer Group,也不會做負載均衡。

2. topic的訂閱和分區訂閱不可以在同一consumer中混用。

4. 外部存儲offset

消費者可以自定義kafka的offset存儲位置。該設計的主要目的是讓消費者将資料和offset進行原子性的存儲。這樣可以避免上面提到的重複消費問題。舉栗說明:

訂閱特定分區。存儲所獲得的記錄時,将每條記錄的offset一起存儲。保證資料和offset的存儲是原子性的。當異步存儲被異常打斷時,凡已經存儲的資料,都有有相應的offset記錄。這種方式可以保證不會有資料丢失,也不會重複的從服務端讀取。

如何配置實作:

1. 去使能offset自動确認:enable.auto.commit=false;

2. 從ConsumerRecord中擷取offset,儲存下來;

3. Consumer重新開機時,調用seek(TopicPartition, long)重置在服務端的消費記錄。

如果消費分區也是自定義的,這種方式用起來會很爽。如果分區是自動配置設定的,當分區發生reblance的時候,就要考慮清楚了。如果因為更新等原因,分區漂移到一個不會更新offset的consumer上,那就日了狗了。

該情況下:

1. 原consumer需要監聽分區撤銷事件,并在撤銷時确認好offset。接口:ConsumerRebalanceListener.onPartitionsRevoked(Collection);

2. 新consumer監聽分區配置設定事件,擷取目前分區消費的offset。接口:ConsumerRebalanceListener.onPartitionsAssigned(Collection);

3. consumer監聽到 ConsumerRebalance事件,還沒有處理或者持久化的緩存資料flush掉。

5. 控制消費位置

大多數情況下,服務端的Consumer的消費位置都是由用戶端間歇性的确認。Kafka允許Consumer自己設定消費起點,達到的效果:

1. 可以消費已經消費過的資料;

2. 可以跳躍性的消費資料;

看下這樣做的一些場景:

1. 對Consumer來說,資料具備時效性,隻需要擷取最近一段時間内的資料,就可以進行跳躍性的擷取資料;

2. 上面自己存offset的場景,重新開機後就需要從指定的位置開始消費。

接口上面已經提到過了,用seek(TopicPartition, long)。、

麻蛋,說指針不就好了,這一小節就是多餘的叨叨。

6. 控制消費流Consumption Flow Control

如果一個consumer同時消費多個分區,預設情況下,這多個分區的優先級是一樣的,同時消費。Kafka提供機制,可以讓暫停某些分區的消費,先擷取其他分區的内容。場景舉栗:

1. 流式計算,consumer同時消費兩個Topic,然後對兩個Topic的資料做Join操作。但是這兩個Topic裡面的資料産生速率差距較大。Consumer就需要控制下擷取邏輯,先擷取慢的Topic,慢的讀到資料後再去讀快的。

2. 同樣多個Topic同時消費,但是Consumer啟動是,本地已經存有了大量某些Topic資料。此時就可以優先去消費下其他的Topic。

調控的手段:讓某個分區消費先暫停,時機到了再恢複,然後接着poll。接口:pause(TopicPartition…),resume(TopicPartition…)

7. 多線程處理模型 Multi-threaded Processing

Kafka的Consumer的接口為非線程安全的。多線程共用IO,Consumer線程需要自己做好線程同步。

如果想立即終止consumer,唯一辦法是用調用接口:wakeup(),使處理線程産生WakeupException。看磚:

public class KafkaConsumerRunner implements Runnable {
    /* 注意,這倆貨是類成員變量 */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
           

說明:

1. KafkaConsumerRunner是runnable的,請自覺補腦多線程運作;

2. 外部線程控制KafkaConsumerRunner線程的停止;

3. 主要說的是多線程消費同一topic,而不是消費同一分區;

比較一下兩種模型:

Consumer單線程模型

優點:實作容易;

優點:沒有線程之間的協作。通常比下面的那種更快;

優點:單分區資料的順序處理;

缺點:多個TCP連接配接,但是關系不大,kafka對自己的server自信滿滿;

缺點:太多的Request可能導緻server的吞吐降低一丢丢;

缺點:consumer數量受到分區數量限制,一個consumer一個分區;

Consumer多線程模型

優點:一個consumer任意多的線程,線程數不用受到分區數限制;

缺點:如果有保序需求,自己要加控制邏輯;

缺點:該模型中如果手動offset,自己要加控制邏輯;

一種可行的解決辦法:為每個分區配置設定獨立的存儲,擷取的資料根據資料所在分區進行hash存儲。這樣可以解決順序消費,和offset的确認問題。

後記

其實對于官網上說的,我是迷惑的:

對比兩種線程模型時,應該是有隐藏地圖的。

1. 單線程模型中,多分區情況下,應該說的是每個Consumer獨立去消費一個分區;

2. 多線程模型中,單Consumer消費一個Topic。如果多個線程同時消費同一分區,也就是要公用連接配接了,各個線程之間要做好同步;

3. 對于多線程模型下提出的用戶端分區資料分開存儲,各個分區之間是如何保序的?