kafka consumer 線程設計
Kafka Java Consumer采用的是單線程的設計。其入口類KafkaConsumer是一個雙線程的設計,即使用者主線程和心跳線程。
使用者主線程,指的是啟動Consumer應用程式main方法的線程,心跳線程(Heartbeat Thread)隻負責定期給對應的Broker機器發送心跳請求,以表示消費者應用的存活性。
Kafka consumer不是線程安全的。所有網絡I/O都發生在進行調用應用程式的線程中。使用者的責任是確定多線程通路正确同步的。非同步通路将導緻ConcurrentModificationException。
ConcurrentmodificationException異常的出處見以下代碼:
/**
* Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
* when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
* supported).
* @throws IllegalStateException if the consumer has been closed
* @throws ConcurrentModificationException if another thread already has the lock
*/
private void acquire() {
ensureNotClosed();
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
該方法acquire 會在KafkaConsumer的大部分公有方法調用第一句就判斷是否正在同一個KafkaConsumer被多個線程調用。
"正在"怎麼了解呢?我們順便看下KafkaConsumer的commitAsync 這個方法就知道了。
@Override
public void commitAsync(OffsetCommitCallback callback) {
acquire(); // 引用開始
try {
commitAsync(subscriptions.allConsumed(), callback);
} finally {
release(); //引用釋放
}
}
我們看KafkaConsumer的release方法就是釋放正在操作KafkaConsumer執行個體的引用。
/**
* Release the light lock protecting the consumer from multi-threaded access.
*/
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
通過以上的代碼了解,我們可以總結出來kafka多線程的要點: kafka的KafkaConsumer必須保證隻能被一個線程操作。
kafka consumer多線程消費消息
為了提高應用對消息的處理效率,我們通常會使用多線程來并行消費消息,進而加快消息的處理速度。
而多線程處理消息的方式主要有兩種。
方式一:每個partition建立一個線程
按Partition數量建立線程,然後每個線程裡建立一個Consumer,多個Consumer對多個Partition進行消費。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5iN3czNxEWMxQGNxITY0ImNzYzXzETNzQTM5AzLcFTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
每個線程有自己的消費者執行個體。優點和缺點如下:
優點:
- 這是最容易實作的
- 因為它不需要線上程之間協調,是以通常它是最快的。
- 它按順序處理每個分區(每個線程隻處理它接受的消息)。
缺點:
- 更多的消費者意味着更多的TCP連接配接到叢集(每個線程一個)。一般kafka處理連接配接非常的快,是以這是一個小成本。
- 更多的消費者意味着更多的請求被發送到伺服器,但稍微較少的資料批次可能導緻I/O吞吐量的一些下降
- 所有程序中的線程總數受到分區總數的限制。
這種屬于是經典模式,實作起來也比較簡單,适用于對消息的順序和offset控制有要求的場景。代碼示例:
public class ConsumerThreadSample {
private final static String TOPIC_NAME="xt";
/*
這種類型是經典模式,每一個線程單獨建立一個KafkaConsumer,用于保證線程安全
*/
public static void main(String[] args) throws InterruptedException {
KafkaConsumerRunner r1 = new KafkaConsumerRunner(0);
KafkaConsumerRunner r2 = new KafkaConsumerRunner(1);
KafkaConsumerRunner r3 = new KafkaConsumerRunner(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
Thread t3 = new Thread(r3);
t1.start();
t2.start();
t3.start();
Thread.sleep(15000);
r1.shutdown();
r2.shutdown();
r3.shutdown();
}
public static class KafkaConsumerRunner implements Runnable{
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner(int partitionNumber) {
Properties props = new Properties();
props.put("bootstrap.servers", "81.68.82.48:9092");
props.put("group.id", "groupxt");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
TopicPartition p = new TopicPartition(TOPIC_NAME, partitionNumber);
consumer.assign(Arrays.asList(p));
}
@Override
public void run() {
try {
while(!closed.get()) {
//處理消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
System.out.println("------------------"+Thread.currentThread().getName()+"-----消費消息----------------------------");
// 處理每個分區的消息
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("thread = %s ,patition = %d , offset = %d, key = %s, value = %s%n",
Thread.currentThread().getName(),record.partition(),record.offset(), record.key(), record.value());
}
System.out.println("-------------------"+Thread.currentThread().getName()+"-----消費消息----------------------------");
// 傳回去告訴kafka新的offset
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// 注意加1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}catch(WakeupException e) {
if(!closed.get()) {
throw e;
}
}finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
方式二:池化,一個consumer 去拉取消息,多個Worker線程處理消息
另一種多線程的消費方式則是在一個線程池中隻建立一個Consumer執行個體,然後通過這個Consumer去拉取資料後交由線程池中的線程去處理。如下圖所示:(類似于netty的形式,一個負責建立網絡通信,拉取到的資料交給其他處理器去處理)
但需要注意的是在這種模式下我們無法手動控制資料的offset,也無法保證資料的順序性,是以通常應用在流處理場景,對資料的順序和準确性要求不高。
優點:
- 可擴充消費者和處理程序的數量。這樣單個消費者的資料可分給多個處理器線程來執行,避免受分區partition的任何限制。
- 并發度高,單個consumer能力隻受CPU限制
缺點:
- 跨多個處理器的順序保證需要特别注意,因為線程是獨立的執行,後來的消息可能比遭到的消息先處理,這僅僅是因為線程執行的運氣。如果對排序沒有問題,這就不是個問題。
- 手動送出變得更困難,因為它需要協調所有的線程以確定處理對該分區的處理完成。
兩種實作方式的共同點:
- 每個consumer消費的partition個數都是由協調器協調
經過之前的例子,我們知道每拉取一次資料傳回的就是一個ConsumerRecords,這裡面存放了多條資料。然後我們對ConsumerRecords進行疊代,就可以将多條資料交由線程池中的多個線程去并行處理了。代碼示例:
public class ConsumerRecordThreadSample {
private final static String TOPIC_NAME = "xt";
public static void main(String[] args) throws InterruptedException {
String brokerList = "kafka IP:9092";
String groupId = "groupxt";
int workerNum = 3;
CunsumerExecutor consumers = new CunsumerExecutor(brokerList, groupId, TOPIC_NAME);
consumers.execute(workerNum);
Thread.sleep(1000000);
consumers.shutdown();
}
// Consumer處理
public static class CunsumerExecutor{
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
public CunsumerExecutor(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum) {
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (final ConsumerRecord record : records) {
executors.submit(new ConsumerRecordWorker(record));
}
}
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}
// 記錄處理
public static class ConsumerRecordWorker implements Runnable {
private ConsumerRecord<String, String> record;
public ConsumerRecordWorker(ConsumerRecord record) {
this.record = record;
}
@Override
public void run() {
// 假如說資料入庫操作
System.err.printf("thread = %s ,patition = %d , offset = %d, key = %s, value = %s%n",
Thread.currentThread().getName(),record.partition(), record.offset(), record.key(), record.value());
}
}
}
References:
- https://www.jianshu.com/p/abbc09ed6703
- https://www.orchome.com/451#item-6
- https://blog.51cto.com/zero01/2498017
- https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded