
Kafka multi-threaded consumption with manual offset commits (new Java API)

Kafka consumption model (diagram from Hu Xi's blog: http://www.cnblogs.com/huxi2b/p/6124937.html):

[figure: Kafka consumer threading model]

The complete code follows:

package cn.com.duiba.test;

public class Cache {
	private static volatile Cache instance = null;
	/**
	 * State of the Kafka polling threads; false = stop fetching data.
	 * Declared volatile so a write from the main thread is immediately
	 * visible to the consumer threads that check the flag.
	 */
	private volatile boolean kafkaThreadStatus = true;
	
	public static Cache getInstance(){
		if(instance == null){
			init();
		}
		return instance;
	}
	
	public static synchronized void init(){
		if(instance == null){
			instance = new Cache();
		}
	}

	public boolean isKafkaThreadStatus() {
		return kafkaThreadStatus;
	}
	public void setKafkaThreadStatus(boolean kafkaThreadStatus) {
		this.kafkaThreadStatus = kafkaThreadStatus;
	}

}
           
package cn.com.duiba.test;

import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkThread implements Callable<String>{
	private static Logger logger = LoggerFactory.getLogger(WorkThread.class);
	/**
	 * Log messages that failed to parse or failed to be sent to the Kafka cluster
	 */
	private String topic;
	private String message;
	private Semaphore semaphore;
	
	public WorkThread(String topic, String message, Semaphore semaphore){
		this.topic = topic;
		this.message = message;
		this.semaphore = semaphore;
	}

	@Override
	public String call() throws Exception {
		try {
			//deal with your business logic here
			//......
			
			//Here we only log the message. If processing fails in real business
			//code (a bug, an unstable system, etc.), record the message to a log
			//first so it can be handled later without affecting the main flow.
			logger.info("topic is {}, message is {}", topic, message);
		} catch (Exception e) {
			logger.error("ParseKafkaLogJob run error. ", e);
		} finally{
			semaphore.release();
		}
		return "done";
	}
}
           
package cn.com.duiba.test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicPartitionThread extends Thread{
	private static Logger logger = LoggerFactory.getLogger(TopicPartitionThread.class);
	
	private ExecutorService workerExecutorService;
	
	private Semaphore semaphore;
	
	private Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
	
	private List<Future<String>> taskList = new ArrayList<>();
	
	public TopicPartitionThread(ExecutorService workerExecutorService, Semaphore semaphore){
		this.workerExecutorService = workerExecutorService;
		this.semaphore = semaphore;
	}

	@Override
	public void run() {
		//configure and start the Kafka consumer
		Properties props = new Properties();
		props.put("bootstrap.servers", "xx.xx.xx.xx:9092");
		props.put("group.id", "test");
		props.put("enable.auto.commit", "false");
		props.put("session.timeout.ms", "30000");
		props.put("max.poll.records", 100); //fetch at most 100 records per poll
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("test001", "test002"), 
				new ConsumerRebalanceListener(){

			@Override
			public void onPartitionsRevoked(
					Collection<TopicPartition> partitions) {
				logger.info("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
				consumer.commitSync(offsetsMap);
			}

			@Override
			public void onPartitionsAssigned(
					Collection<TopicPartition> partitions) {
				logger.info("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
				offsetsMap.clear();
				//clear the pending task list
				taskList.clear();
			}});
		
		//poll for Kafka messages
		while (Cache.getInstance().isKafkaThreadStatus()) {
			try {
				//use a 100 ms poll timeout
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (final ConsumerRecord<String, String> record : records) {
					semaphore.acquire();
					//record the current TopicPartition and OffsetAndMetadata;
					//the committed offset is the offset of the NEXT record to
					//consume, hence record.offset() + 1
					TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
					OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
					offsetsMap.put(topicPartition, offset);
					
					//submit the task to the worker thread pool
					taskList.add(workerExecutorService.submit(new WorkThread(record.topic(), record.value(), semaphore)));
				}
				}
				
				//wait until all dispatched messages have been processed
				for(Future<String> task : taskList){
					//blocks until the task completes
					task.get();
				}
				
				//synchronously commit offsets to the Kafka cluster
				consumer.commitSync();
			} catch (Exception e) {
				logger.error("TopicPartitionThread run error.", e);
			} finally{
				//clear the task list
				taskList.clear();
			}
		}
		
		//close the consumer connection
		consumer.close();
	}
}
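
Note on the commit values above: the offset committed for a partition is, by Kafka convention, the offset of the next message the consumer should read, which is why OffsetAndMetadata is built from record.offset() + 1. Committing the raw record.offset() would re-deliver the last processed message of each partition after a restart or rebalance.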
           
package cn.com.duiba.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Main {
	private static Logger logger = LoggerFactory.getLogger(Main.class);
	/**
	 * Thread pool that runs the Kafka consumer threads
	 */
	private ExecutorService kafkaConsumerExecutorService;
	/**
	 * Thread pool that actually processes the Kafka messages
	 */
	private ExecutorService workerExecutorService;
	/**
	 * Blocks submission of new tasks to the worker pool until a thread is available
	 */
	private Semaphore semaphore;
	private int kafkaConsumerExecutorNumber = 10;
	private int workerExecutorNumber = 50;
	
	public Main(){
		//pool sizes can be tuned as needed; fixed-size pools are used here for simplicity
		//thread pool that runs the Kafka consumer threads
		kafkaConsumerExecutorService = Executors.newFixedThreadPool(kafkaConsumerExecutorNumber);
		//thread pool that actually processes the Kafka messages
		workerExecutorService = Executors.newFixedThreadPool(workerExecutorNumber);
		//blocks submission of new tasks until a worker thread is available
		semaphore = new Semaphore(workerExecutorNumber);
	}
	
	public static void main(String[] args) throws Exception{
		Main main = new Main();
		//start consuming from Kafka
		main.start();
		//stop receiving Kafka messages after a while
		Thread.sleep(3600 * 1000); //run for one hour
		//shut down the thread pools; in production never kill -9 the process,
		//use a plain kill (SIGTERM) so the application can clean up before exiting
		main.destroy();
	}
	
	public void start(){
		for(int i = 0; i < kafkaConsumerExecutorNumber; i++){
			//TopicPartitionThread extends Thread but is used here as a plain Runnable on the pool
			kafkaConsumerExecutorService.submit(new TopicPartitionThread(workerExecutorService, semaphore));
		}
	}
	
	public void destroy() throws Exception {
		//stop the Kafka consumer threads
		Cache.getInstance().setKafkaThreadStatus(false);
		//shut down the thread pools
		kafkaConsumerExecutorService.shutdown();
		while(!kafkaConsumerExecutorService.awaitTermination(1, TimeUnit.SECONDS)){
			logger.info("await kafkaConsumerExecutorService stop...");
		}
		logger.info("kafkaConsumerExecutorService stoped.");
		workerExecutorService.shutdown();
		while(!workerExecutorService.awaitTermination(1, TimeUnit.SECONDS)){
			logger.info("await workerExecutorService stop...");
		}
		logger.info("workerExecutorService stoped.");
	}
}
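
To make the "use plain kill" advice concrete, destroy() can be wired into a JVM shutdown hook so that SIGTERM triggers the cleanup automatically. A minimal sketch under that assumption (GracefulMain is a hypothetical illustration class, not part of the original code):

package cn.com.duiba.test;

public class GracefulMain {
	public static void main(String[] args) {
		final Main main = new Main();
		//run destroy() when the JVM receives a plain kill (SIGTERM)
		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				try {
					main.destroy();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		main.start();
	}
}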
           

To get load balancing and high availability, deploy the consumer application on several servers. Questions and suggestions are welcome in the comments below.

On duplicate consumption in Kafka: you can upgrade to Kafka 0.11, which introduced transactions and already supports exactly-once semantics (EOS).
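
For reference, a minimal sketch of the transactional consume-transform-produce pattern that 0.11 enables, shown on the producer side; EosSketch, the broker address, transactional.id, topic, partition, and offset values are all placeholders, and the consumer must additionally set isolation.level=read_committed so that aborted transactions are skipped:

package cn.com.duiba.test;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class EosSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "xx.xx.xx.xx:9092"); //placeholder
		props.put("transactional.id", "demo-txn-id"); //placeholder, unique per producer instance
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		producer.initTransactions();
		producer.beginTransaction();
		try {
			//the processing result and the consumed offsets are committed in ONE
			//transaction, so the output and the offset advance together or not at all
			producer.send(new ProducerRecord<>("output-topic", "result")); //placeholder topic/value
			producer.sendOffsetsToTransaction(
					Collections.singletonMap(new TopicPartition("test001", 0), new OffsetAndMetadata(42L)),
					"test"); //the consumer group id used in the example above
			producer.commitTransaction();
		} catch (Exception e) {
			producer.abortTransaction();
		}
		producer.close();
	}
}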