Overview
Kafka version: kafka_2.10-0.8.2.1 (Kafka 0.9.x introduces a new consumer API)
IDE environment: IntelliJ IDEA 14 + Maven 3.3
Language: Java
Developing a consumer with the low-level API
When to use the low-level API
The biggest difference between the low-level API and the high-level API is that you control consumption and offsets for each partition of a topic yourself. It is suitable when: 1) you want to read a message more than once; 2) you only want to consume some of a topic's partitions; 3) you need tighter control over partition offsets.
Of course, more control also means more work: 1) you have to track offsets in your own program; 2) you have to find, and handle changes of, the leader broker for every partition yourself.
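For reference, the core of the low-level API is a fetch request that names the topic, partition and starting offset explicitly (using kafka.javaapi.consumer.SimpleConsumer and kafka.api.FetchRequestBuilder). A minimal sketch, in which the broker host, port, topic, offset and fetch size are placeholders:
// You decide which offset to fetch from; nothing is tracked for you.
SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "demoClient");
FetchRequest req = new FetchRequestBuilder()
        .clientId("demoClient")
        .addFetch("myTopic", 0, 12345L, 100000) // topic, partition, offset, fetch size
        .build();
FetchResponse resp = consumer.fetch(req);
consumer.close();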
Example program
This program implements a consumer on the low-level API that persists each partition's offset to a local file; on the next start it reads the offset back from that file and resumes from there.
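With the checkpoint directory configured below, the on-disk layout this produces is one directory per partition, each holding an offset file that contains a single number:
./checkpoint/partition0/offset
./checkpoint/partition1/offset
...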
Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
Package layout
Everything lives in the kafka.simple package: KafkaConfig (configuration holder), KafkaUtil (leader lookup and offset helpers), PartitionMsgTask (one consuming thread per partition), KafkaSimpleConsumer (loads the config and starts the per-partition threads) and Main, plus the consumer.properties configuration file.
Configuration file consumer.properties
brokerList=xxxx,xxxx,xxxx,xxxx
port=9092
topic=myTopic
partitionNum=4
# offset file path
checkpoint=./checkpoint
# max messages fetched per call
patchSize=100
# latest or earliest
subscribeStartPoint=earliest
KafkaConfig.java
package kafka.simple;
import java.util.List;
public class KafkaConfig {
public String topic = null; // topic to consume
public int partitionNum = 0; // number of partitions
public int port = 0; // kafka broker port
public List<String> replicaBrokers = null; // kafka broker host list
public String checkpoint; // checkpoint directory, where partition offsets are saved
public int patchSize = 0; // max number of messages read from a partition in one fetch
public String subscribeStartPoint = null; // default start point for subscribing: latest or earliest
public KafkaConfig() { }
@Override
public String toString() {
return "[brokers:" + replicaBrokers.toString()
+ "] [port:" + port
+ "] [topic:" + topic
+ "] [partition num:" + partitionNum
+ "] [patch size:" + patchSize
+ "] [start point:" + subscribeStartPoint
+ "]";
}
}
KafkaUtil.java
package kafka.simple;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class KafkaUtil {
private static final Logger logger = LogManager.getLogger(KafkaUtil.class);
/**
* Find the leader broker for a partition.
* @param seedBrokers configured broker list
* @param port broker port
* @param topic
* @param partition
* @return
*/
public static PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
PartitionMetadata returnMeataData = null;
logger.info("find leader begin. brokers:[" + seedBrokers.toString() + "]");
loop:
for (String seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse res = consumer.send(req);
List<TopicMetadata> metadatas = res.topicsMetadata();
for (TopicMetadata item : metadatas) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == partition) {
returnMeataData = part;
break loop;
}
}
}
} catch (Exception e) {
logger.error("error communicating with broker [" + seed + "] to find Leader for [" + topic
+ ", " + partition + "] Reason: " + e);
} finally {
if (consumer != null) {
consumer.close();
}
}
}
if (returnMeataData != null) {
// TODO
// seedBrokers.clear();
// for (kafka.cluster.Broker seed : returnMeataData.replicas()) {
// seedBrokers.add(seed.host());
// }
}
return returnMeataData;
}
/**
* Find a new leader broker after the old leader fails.
* @param oldLeader
* @param seedBrokers
* @param port
* @param topic
* @param partition
* @return
* @throws Exception
*/
public static String findNewLeader(String oldLeader,
List<String> seedBrokers, int port,
String topic, int partition) throws Exception {
for (int i = 0; i < 3; ++i) {
boolean sleep = false;
PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
if (metadata == null) {
sleep = true;
} else if (metadata.leader() == null) {
sleep = true;
} else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
sleep = true;
} else {
return metadata.leader().host();
}
if (sleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// do nothing
}
}
}
logger.warn("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
/**
* Get the offset of the given topic and partition at the given time (earliest or latest).
* @param consumer
* @param topic
* @param partition
* @param whichTime
* @param clientId
* @return
*/
public static long getSpecificOffset(SimpleConsumer consumer,
String topic, int partition,
long whichTime, String clientId) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request =
new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
logger.warn("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return -;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
}
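A minimal sketch of how these helpers are used together (broker names, port and topic are placeholders; the same pattern appears in PartitionMsgTask below):
List<String> brokers = Arrays.asList("broker1", "broker2");
PartitionMetadata meta = KafkaUtil.findLeader(brokers, 9092, "myTopic", 0);
if (meta != null && meta.leader() != null) {
    // talk to the leader of partition 0 and ask for its earliest available offset
    SimpleConsumer consumer =
            new SimpleConsumer(meta.leader().host(), 9092, 100000, 64 * 1024, "offsetLookup");
    long earliest = KafkaUtil.getSpecificOffset(consumer, "myTopic", 0,
            kafka.api.OffsetRequest.EarliestTime(), "offsetLookup");
    consumer.close();
}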
PartitionMsgTask.java
package kafka.simple;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class PartitionMsgTask implements Runnable {
private static final Logger logger = LogManager.getLogger(PartitionMsgTask.class);
private KafkaConfig kafkaConfig = null;
private String filePath = null;
private int partitionIndex = 0;
private SimpleConsumer consumer = null;
private String leadBroker = null;
private long readOffset = 0;
private String clientName = null;
public PartitionMsgTask(KafkaConfig config, int index) {
partitionIndex = index;
this.kafkaConfig = config;
filePath = kafkaConfig.checkpoint + "/partition" + partitionIndex;
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
}
public void run() {
logger.info("partition:" + partitionIndex + " config:" + kafkaConfig.toString());
PartitionMetadata metadata = KafkaUtil
.findLeader(kafkaConfig.replicaBrokers, kafkaConfig.port, kafkaConfig.topic, partitionIndex);
if (metadata == null) {
logger.error("Can't find metadata for Topic:" + kafkaConfig.topic
+ " and Partition:" + partitionIndex + ". Exiting");
return;
}
if (metadata.leader() == null) {
logger.error("Can't find Leader for Topic:" + kafkaConfig.topic
+ " and Partition:" + partitionIndex + ". Exiting");
return;
}
leadBroker = metadata.leader().host();
clientName = "Client_" + kafkaConfig.topic + "_" + partitionIndex;
logger.info("leadBroker:" + leadBroker + " client:" + clientName);
consumer = new SimpleConsumer(leadBroker, kafkaConfig.port, 100000, 64 * 1024, clientName);
// first time to get offset
readOffset = getOffset();
logger.info("first time get offset :" + readOffset);
if (readOffset == -1) {
logger.error("get offset failed");
return;
}
logger.info("partition" + partitionIndex + " thread run success.");
while (true) {
try {
int ret;
List<String> messageList = new ArrayList<String>();
// subscribe message
long offset = subscribe(kafkaConfig.patchSize, messageList);
if (offset < 0) {
logger.error("subscribe message failed. will continue");
continue;
}
// todo: process messageList here (see the sketch after this listing)
// todo: if processing fails you can retry or continue; decide for yourself whether to save the offset
// save offset
ret = saveOffset(offset);
if (ret != 0) {
if (saveOffset(offset) != 0) {
continue;
}
}
readOffset = offset;
} catch (Exception e) {
logger.error("exception :" + e.getMessage());
}
}
}
/**
* Get the starting offset: from the checkpoint file if present, otherwise from Kafka.
* @return
*/
public long getOffset() {
long offset = -1;
// get from file
String offsetFile = filePath + "/" + "offset";
BufferedReader reader = null;
try {
File file = new File(offsetFile);
reader = new BufferedReader(new FileReader(file));
String tempStr = reader.readLine();
offset = Long.parseLong(tempStr);
reader.close();
return offset;
} catch (FileNotFoundException e) {
logger.info("offset file:" + offsetFile + " not found. will get the "
+ kafkaConfig.subscribeStartPoint + " offset.");
} catch (IOException e) {
logger.error("get offset from file exception");
return -1;
}
if (kafkaConfig.subscribeStartPoint.equals("earliest")) {
// get earliest offset
offset = KafkaUtil.getSpecificOffset(consumer,
kafkaConfig.topic, partitionIndex,
kafka.api.OffsetRequest.EarliestTime(), clientName);
} else if (kafkaConfig.subscribeStartPoint.equals("latest")) {
// get latest offset
offset = KafkaUtil.getSpecificOffset(consumer,
kafkaConfig.topic, partitionIndex,
kafka.api.OffsetRequest.LatestTime(), clientName);
} else {
logger.error("kafka config start point error");
}
return offset;
}
/**
* Save the offset to the checkpoint file.
* @param offset
* @return
*/
public int saveOffset(long offset) {
String offsetFile = filePath + "/" + "offset";
try {
File file = new File(offsetFile);
if (!file.exists()) {
file.createNewFile();
}
FileWriter fileWriter = new FileWriter(file);
fileWriter.write(String.valueOf(offset));
fileWriter.close();
} catch (IOException e) {
logger.error("save offset failed");
return -1;
}
return 0;
}
/**
* Fetch up to maxReads messages starting from the current read offset.
* @param maxReads
* @param messageList
* @return
* @throws Exception
*/
public long subscribe(long maxReads, List<String> messageList) throws Exception {
if (messageList == null) {
logger.warn("messageList is null");
return -1;
}
int numErrors = 0;
long offset = readOffset;
while (maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, kafkaConfig.port, 100000, 64 * 1024, clientName);
}
FetchRequest request = new FetchRequestBuilder().clientId(clientName)
.addFetch(kafkaConfig.topic, partitionIndex, offset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(request);
if (fetchResponse.hasError()) {
logger.warn("fetch response has error");
numErrors++;
short code = fetchResponse.errorCode(kafkaConfig.topic, partitionIndex);
logger.warn("Error fetching data from the Broker:" + leadBroker + " error code: " + code);
if (numErrors > 5) {
return -1;
}
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for the last element to reset
// offset = KafkaUtil.getLastOffset(consumer, kafkaConfig.topic, partitionIndex,
// kafka.api.OffsetRequest.LatestTime(), clientName);
logger.warn("offset out of range. will get a new offset");
offset = getOffset();
continue;
}
consumer.close();
consumer = null;
leadBroker = KafkaUtil.findNewLeader(leadBroker,
kafkaConfig.replicaBrokers,
kafkaConfig.port,
kafkaConfig.topic,
partitionIndex);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(kafkaConfig.topic, partitionIndex)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < offset) {
logger.warn("Found an old offset: " + currentOffset + " Expecting: " + offset);
continue;
}
offset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
String message = new String(bytes, "UTF-8");
messageList.add(message);
numRead++;
maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// do nothing
}
}
}
return offset;
}
}
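The two todo comments in run() are where application logic plugs in. One way to wire it (processMessages below is hypothetical, not part of the code above) is to process the batch first and only persist the offset when processing succeeded:
// Hypothetical handler: return true only if every message was handled,
// so the caller knows it is safe to persist the new offset.
private boolean processMessages(List<String> messageList) {
    for (String message : messageList) {
        logger.info("received: " + message); // replace with real processing
    }
    return true;
}
// in run(), in place of the todo comments:
// if (!processMessages(messageList)) {
//     continue; // offset is not saved, so the batch will be fetched again
// }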
KafkaSimpleConsumer.java
package kafka.simple;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaSimpleConsumer {
private static final Logger logger = LogManager.getLogger(KafkaSimpleConsumer.class);
private KafkaConfig kafkaConfig = null;
private ExecutorService executor = null;
private boolean inited = false;
public void run() {
if (!inited) {
logger.error("uninit, init first!");
return;
}
File file = new File(kafkaConfig.checkpoint);
if (!file.exists()) {
file.mkdir();
}
executor = Executors.newFixedThreadPool(kafkaConfig.partitionNum);
int threadNum = 0;
for (; threadNum < kafkaConfig.partitionNum; ++threadNum) {
file = new File(kafkaConfig.checkpoint + "/partition" + threadNum);
if (!file.exists() && !file.isDirectory()) {
file.mkdir();
}
logger.info("begin submit partition msg task thread");
executor.submit(new PartitionMsgTask(kafkaConfig, threadNum));
}
}
public int init(String confFile) {
Properties props = new Properties();
kafkaConfig = new KafkaConfig();
try {
FileInputStream in = new FileInputStream(confFile);
props.load(in);
} catch (FileNotFoundException e) {
logger.error("kafka config file not found. file name:" + confFile);
return -1;
} catch (IOException e) {
logger.error("properties load file failed");
return -1;
}
kafkaConfig.topic = props.getProperty("topic");
kafkaConfig.port = Integer.parseInt(props.getProperty("port"));
kafkaConfig.partitionNum = Integer.parseInt(props.getProperty("partitionNum"));
kafkaConfig.checkpoint = props.getProperty("checkpoint");
kafkaConfig.patchSize = Integer.parseInt(props.getProperty("patchSize"));
String startPoint = props.getProperty("subscribeStartPoint");
if (!startPoint.equals("latest") && !startPoint.equals("earliest")) {
logger.error("config file startPoint error. startPoint must be latest or earliest");
return -1;
}
kafkaConfig.subscribeStartPoint = startPoint;
String brokerList = props.getProperty("brokerList");
String[] brokers = brokerList.split(",");
kafkaConfig.replicaBrokers = new ArrayList<String>();
for (String str : brokers) {
kafkaConfig.replicaBrokers.add(str);
}
inited = true;
logger.info("init success. kafkaConfig:" + kafkaConfig.toString());
return 0;
}
}
Main.java
package kafka.simple;
public class Main {
public static void main(String[] args) {
KafkaSimpleConsumer consumer = new KafkaSimpleConsumer();
consumer.init("./consumer.properties");
consumer.run();
}
}
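The classes log through log4j, so a log4j.properties file needs to be on the classpath for any output to appear. A minimal console configuration, for example:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c - %m%n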