kafka消費模型(圖檔引用自胡夕大神的部落格:http://www.cnblogs.com/huxi2b/p/6124937.html):
下面直接上代碼:
package cn.com.duiba.test;
public class Cache {

    /**
     * Singleton instance. Must be {@code volatile}: getInstance() reads it
     * without holding a lock, so without volatile another thread could observe
     * a partially constructed object (classic broken double-checked locking).
     */
    private static volatile Cache instance = null;

    /**
     * Run state of the kafka polling threads; {@code false} means "stop fetching data".
     * Volatile because it is written by the shutdown thread and read inside the
     * poll loop of every consumer thread — without it the loop may never see the stop.
     */
    private volatile boolean kafkaThreadStatus = true;

    /**
     * Returns the singleton, creating it on first use.
     *
     * @return the single shared {@code Cache} instance, never {@code null}
     */
    public static Cache getInstance() {
        if (instance == null) { // fast path: no lock once initialized
            init();
        }
        return instance;
    }

    /**
     * Lazily creates the singleton; synchronized so at most one instance is ever built.
     */
    public synchronized static void init() {
        if (instance == null) {
            instance = new Cache();
        }
    }

    public boolean isKafkaThreadStatus() {
        return kafkaThreadStatus;
    }

    public void setKafkaThreadStatus(boolean kafkaThreadStatus) {
        this.kafkaThreadStatus = kafkaThreadStatus;
    }
}
package cn.com.duiba.test;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkThread implements Callable<String> {

    private static final Logger logger = LoggerFactory.getLogger(WorkThread.class);

    /** Topic the record came from (used for logging failed/unparsable records). */
    private final String topic;
    /** Raw message payload to process. */
    private final String message;
    /** Released in {@code finally} so the consumer thread may submit the next task. */
    private final Semaphore semaphore;

    public WorkThread(String topic, String message, Semaphore semaphore) {
        this.topic = topic;
        this.message = message;
        this.semaphore = semaphore;
    }

    /**
     * Processes one kafka record. On failure the record is logged (so it can be
     * replayed later) instead of aborting the main flow; the semaphore permit is
     * always returned, even on exception, to avoid deadlocking the poll loop.
     *
     * @return the literal "done"; callers only use the Future to await completion
     */
    @Override
    public String call() throws Exception {
        try {
            // deal your business
            // ......
            // Demo only logs the record. In real code a processing failure
            // (code bug, unstable downstream, ...) should first record the
            // message to a log/store for later handling, without stopping
            // the main flow.
            logger.info("topic is {}, message is {}", topic, message);
        } catch (Exception e) {
            // Fixed: message previously said "ParseKafkaLogJob", a different
            // class; keep the log entry attributable to this worker.
            logger.error("WorkThread run error. ", e);
        } finally {
            // Always free the permit, otherwise the consumer thread blocks forever.
            semaphore.release();
        }
        return "done";
    }
}
package cn.com.duiba.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicPartitionThread extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(TopicPartitionThread.class);

    /** Pool that actually processes the records. */
    private final ExecutorService workerExecutorService;
    /** Bounds the number of in-flight records; permits are released by the workers. */
    private final Semaphore semaphore;
    /** Positions of records already handed to workers; committed on rebalance. */
    private final Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
    /** Futures of the current poll batch; awaited before the sync commit. */
    private final List<Future<String>> taskList = new ArrayList<>();

    public TopicPartitionThread(ExecutorService workerExecutorService, Semaphore semaphore) {
        this.workerExecutorService = workerExecutorService;
        this.semaphore = semaphore;
    }

    /**
     * Poll loop: fetch records, fan them out to the worker pool, wait for the
     * whole batch to finish, then commit offsets synchronously. Runs until the
     * global flag in {@link Cache} turns false or the thread is interrupted.
     */
    @Override
    public void run() {
        // Start kafka consumption.
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 100); // fetch at most 100 records per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Fixed: was StringSerializer, which does not implement Deserializer and
        // makes KafkaConsumer construction fail at runtime.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test001", "test002"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        logger.info("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
                        // Commit what was already handed to workers, so the new
                        // owner of the partition does not re-consume it.
                        consumer.commitSync(offsetsMap);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        logger.info("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
                        offsetsMap.clear();
                        // Drop futures belonging to the previous assignment.
                        taskList.clear();
                    }
                });
        // Receive kafka messages until the global flag asks us to stop.
        while (Cache.getInstance().isKafkaThreadStatus()) {
            try {
                // Poll with a 100 ms timeout.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (final ConsumerRecord<String, String> record : records) {
                    // Blocks until a worker permit is free, throttling submission.
                    semaphore.acquire();
                    // Record the position for this TopicPartition. A committed
                    // offset means "next offset to read", hence offset() + 1 —
                    // committing offset() would re-deliver this record after a
                    // rebalance.
                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
                    offsetsMap.put(topicPartition, offset);
                    // Hand the record to the worker pool.
                    taskList.add(workerExecutorService.submit(new WorkThread(record.topic(), record.value(), semaphore)));
                }
                // Wait until every record of this batch has been processed.
                for (Future<String> task : taskList) {
                    task.get(); // blocks until the worker finishes
                }
                // Synchronously commit the offsets of the completed batch.
                consumer.commitSync();
            } catch (InterruptedException e) {
                // Fixed: previously swallowed by the broad catch; restore the
                // interrupt flag and leave the poll loop so shutdown can proceed.
                logger.error("TopicPartitionThread interrupted.", e);
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                logger.error("TopicPartitionThread run error.", e);
            } finally {
                // Reset the batch list for the next poll.
                taskList.clear();
            }
        }
        // Close the consumer connection (triggers a final rebalance/commit path).
        consumer.close();
    }
}
package cn.com.duiba.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.com.duiba.service.TopicPartitionThread;
public class Main {

    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    /**
     * Pool whose threads each run one kafka poll loop.
     */
    private final ExecutorService kafkaConsumerExecutorService;
    /**
     * Pool that actually processes the kafka messages.
     */
    private final ExecutorService workerExecutorService;
    /**
     * Blocks submission of new worker tasks until a worker permit is free,
     * so the unbounded executor queue cannot grow without limit.
     */
    private final Semaphore semaphore;

    private final int kafkaConsumerExecutorNumber = 10;
    private final int workerExecutorNumber = 50;

    public Main() {
        // Pool sizes can be tuned as needed; fixed-size pools keep the demo simple.
        // Pool for receiving kafka messages:
        kafkaConsumerExecutorService = Executors.newFixedThreadPool(kafkaConsumerExecutorNumber);
        // Pool for processing kafka messages:
        workerExecutorService = Executors.newFixedThreadPool(workerExecutorNumber);
        // One permit per worker thread, so submission blocks when all are busy.
        semaphore = new Semaphore(workerExecutorNumber);
    }

    public static void main(String[] args) throws Exception {
        Main main = new Main();
        // Start consuming from kafka.
        main.start();
        // Let it run for a while, then stop receiving.
        Thread.sleep(3600 * 1000L); // run for one hour (long literal avoids int-overflow habits)
        // In production do not kill -9 the app; use plain kill so it can clean up
        // (commit offsets, drain pools) before exiting.
        main.destroy();
    }

    /** Launches one kafka consumer thread per pool slot. */
    public void start() {
        for (int i = 0; i < kafkaConsumerExecutorNumber; i++) {
            kafkaConsumerExecutorService.submit(new TopicPartitionThread(workerExecutorService, semaphore));
        }
    }

    /**
     * Orderly shutdown: flag the poll loops to stop, then drain the consumer
     * pool before the worker pool (the consumers feed the workers).
     *
     * @throws Exception if interrupted while awaiting pool termination
     */
    public void destroy() throws Exception {
        // Stop the kafka consumer threads via the shared volatile flag.
        Cache.getInstance().setKafkaThreadStatus(false);
        // Shut down the consumer pool first so no new work is submitted.
        kafkaConsumerExecutorService.shutdown();
        while (!kafkaConsumerExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
            logger.info("await kafkaConsumerExecutorService stop...");
        }
        // Fixed typo: "stoped" -> "stopped" (here and below).
        logger.info("kafkaConsumerExecutorService stopped.");
        workerExecutorService.shutdown();
        while (!workerExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
            logger.info("await workerExecutorService stop...");
        }
        logger.info("workerExecutorService stopped.");
    }
}
建議多部署幾台伺服器,來達到負載均衡以及高可用。大家有什麼疑問或者建議,都可以在下面進行評論。
關于kafka重複消費:可以升級到kafka 0.11及以上版本,該版本引入了冪等生產者與事務機制,已經支援EOS(Exactly-Once Semantics,精確一次)語義了