
The March of Kafka (Part 4): Consumer Simple API Development

Overview

Kafka version: kafka_2.10-0.8.2.1 (Kafka 0.9.x introduces a new consumer API)

IDE environment: IntelliJ IDEA 14 + Maven 3.3

Language: Java

Low-Level Consumer API Development

When to use the low-level API

The biggest difference from the high-level API is that you control, for each partition of a topic, what gets consumed and from which offset. It fits when: 1) you want to read a message more than once; 2) you only want to consume some of a topic's partitions; 3) you need stricter control over partition offsets.

Of course, more control means more work: 1) you must track and persist offsets in your own program; 2) you must find the leader broker of each partition yourself and handle leader changes.
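To make that division of labor concrete, here is a minimal, self-contained sketch of a single fetch against one partition. The broker host, port, topic, starting offset and buffer sizes are placeholder assumptions; the full program below adds leader discovery, offset persistence and error handling.

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SingleFetchSketch {
    public static void main(String[] args) {
        // you pick the broker, the partition and the starting offset yourself
        SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "sketchClient");
        long offset = 0L; // in a real program this comes from your own offset store
        FetchRequest request = new FetchRequestBuilder()
                .clientId("sketchClient")
                .addFetch("myTopic", 0, offset, 100000) // topic, partition, offset, fetchSize
                .build();
        FetchResponse response = consumer.fetch(request);
        for (MessageAndOffset messageAndOffset : response.messageSet("myTopic", 0)) {
            offset = messageAndOffset.nextOffset(); // you advance (and later persist) the offset yourself
        }
        consumer.close();
    }
}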

Example program

The program below implements a low-level-API Kafka consumer that persists each partition's offset to a local file; on the next start it reads the starting offset back from that file.
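Concretely, with checkpoint=./checkpoint the code below keeps one subdirectory per partition, each holding a one-line offset file containing the next offset to read (the value 12345 is only an illustration):

./checkpoint/
    partition0/offset    (single line, e.g. 12345)
    partition1/offset
    partition2/offset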

Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
           

Package layout

(Figure omitted: all classes live in a single package, kafka.simple: KafkaConfig.java, KafkaUtil.java, PartitionMsgTask.java, KafkaSimpleConsumer.java and Main.java, with consumer.properties alongside.)

Configuration file consumer.properties (the values below are examples; adjust them to your cluster)

brokerList=xxxx,xxxx,xxxx,xxxx
port=9092
topic=myTopic
partitionNum=4
#offset file path
checkpoint=./checkpoint
#max messages fetched per call
patchSize=100
#latest or earliest
subscribeStartPoint=earliest
           

KafkaConfig.java

package kafka.simple;

import java.util.List;

public class KafkaConfig {

    public String topic = null;                 // topic name
    public int partitionNum = 0;                // number of partitions
    public int port = 0;                        // kafka broker port
    public List<String> replicaBrokers = null;  // kafka broker ip list

    public String checkpoint;                   // checkpoint directory, where partition offsets are saved
    public int patchSize = 0;                   // max number of messages read from a partition per fetch

    public String subscribeStartPoint = null;   // default start point when no saved offset exists: latest or earliest

    public KafkaConfig() { }

    @Override
    public String toString() {
        return "[brokers:" + replicaBrokers.toString()
                + "] [port:" + port
                + "] [topic:" + topic
                + "] [partition num:" + partitionNum
                + "] [patch size:" + patchSize
                + "] [start point:" + subscribeStartPoint
                + "]";
    }
}
           

KafkaUtil.java

package kafka.simple;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaUtil {
    private static final Logger logger = LogManager.getLogger(KafkaUtil.class);

    /**
     * Find the leader broker of a partition.
     * @param seedBrokers configured broker list
     * @param port broker port
     * @param topic topic name
     * @param partition partition id
     * @return metadata of the partition, or null if no leader was found
     */
    public static PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata returnMeataData = null;
        logger.info("find leader begin. brokers:[" + seedBrokers.toString() + "]");
        loop:
        for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup"); // socket timeout (ms), buffer size
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse res = consumer.send(req);

                List<TopicMetadata> metadatas = res.topicsMetadata();
                for (TopicMetadata item : metadatas) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMeataData = part;
                            break loop;
                        }
                    }

                }
            } catch (Exception e) {
                logger.error("error communicating with broker [" + seed + "] to find Leader for [" + topic
                        + ", " + partition + "] Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        if (returnMeataData != null) {
            // TODO
            // seedBrokers.clear();
            // for (kafka.cluster.Broker seed : returnMeataData.replicas()) {
            //    seedBrokers.add(seed.host());
            // }
        }
        return returnMeataData;
    }

    /**
     * Find a new leader broker after the old one has failed.
     * @param oldLeader host of the previous leader
     * @param seedBrokers configured broker list
     * @param port broker port
     * @param topic topic name
     * @param partition partition id
     * @return host of the new leader
     * @throws Exception if no new leader can be found
     */
    public static String findNewLeader(String oldLeader,
                                       List<String> seedBrokers, int port,
                                       String topic, int partition) throws Exception {
        for (int i = 0; i < 3; ++i) {
            boolean sleep = false;
            PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
            if (metadata == null) {
                sleep = true;
            } else if (metadata.leader() == null) {
                sleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // on the first pass the old leader may still be reported; wait and retry
                sleep = true;
            } else {
                return metadata.leader().host();
            }
            if (sleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        logger.warn("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Get the offset of the given topic and partition at the given time.
     * @param consumer consumer connected to the partition's leader
     * @param topic topic name
     * @param partition partition id
     * @param whichTime kafka.api.OffsetRequest.EarliestTime() or LatestTime()
     * @param clientId client id
     * @return the requested offset, or -1 on error
     */
    public static long getSpecificOffset(SimpleConsumer consumer,
                                         String topic, int partition,
                                         long whichTime, String clientId) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request =
                new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            logger.warn("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return -1;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}
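A small usage sketch of KafkaUtil follows: locate the leader of one partition, then ask it for the earliest available offset. The broker hosts, port and topic are placeholders, not values from this article.

package kafka.simple;

import java.util.Arrays;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetLookupDemo {
    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker1", "broker2");
        // find the leader broker of partition 0 of "myTopic"
        PartitionMetadata meta = KafkaUtil.findLeader(brokers, 9092, "myTopic", 0);
        if (meta == null || meta.leader() == null) {
            System.err.println("no leader found");
            return;
        }
        // connect to the leader and request the earliest offset of that partition
        SimpleConsumer consumer = new SimpleConsumer(meta.leader().host(), 9092, 100000, 64 * 1024, "offsetLookup");
        long earliest = KafkaUtil.getSpecificOffset(consumer, "myTopic", 0,
                kafka.api.OffsetRequest.EarliestTime(), "offsetLookup");
        System.out.println("earliest offset of myTopic-0: " + earliest);
        consumer.close();
    }
}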
           

PartitionMsgTask.java

package kafka.simple;

import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class PartitionMsgTask implements Runnable {

    private static final Logger logger = LogManager.getLogger(PartitionMsgTask.class);

    private KafkaConfig kafkaConfig = null;

    private String filePath = null;
    private int partitionIndex = 0;
    private SimpleConsumer consumer = null;
    private String leadBroker = null;
    private long readOffset = 0;
    private String clientName = null;

    public PartitionMsgTask(KafkaConfig config, int index) {
        partitionIndex = index;
        this.kafkaConfig = config;
        filePath = kafkaConfig.checkpoint + "/partition" + partitionIndex;
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
    }

    public void run() {
        logger.info("partition:" + partitionIndex + " config:" + kafkaConfig.toString());

        PartitionMetadata metadata = KafkaUtil
                .findLeader(kafkaConfig.replicaBrokers, kafkaConfig.port, kafkaConfig.topic, partitionIndex);
        if (metadata == null) {
            logger.error("Can't find metadata for Topic:" + kafkaConfig.topic
                    + " and Partition:" + partitionIndex + ". Exiting");
            return;
        }
        if (metadata.leader() == null) {
            logger.error("Can't find Leader for Topic:" + kafkaConfig.topic
                    + " and Partition:" + partitionIndex + ". Exiting");
            return;
        }

        leadBroker = metadata.leader().host();
        clientName = "Client_" + kafkaConfig.topic + "_" + partitionIndex;
        logger.info("leadBroker:" + leadBroker + " client:" + clientName);
        consumer = new SimpleConsumer(leadBroker, kafkaConfig.port, 100000, 64 * 1024, clientName);
        // first time to get offset
        readOffset = getOffset();
        logger.info("first time get offset :" + readOffset);
        if (readOffset == -1) {
            logger.error("get offset failed");
            return;
        }

        logger.info("partition" + partitionIndex + " thread run success.");
        while (true) {
            try {
                int ret;
                List<String> messageList = new ArrayList<String>();
                // subscribe message
                long offset = subscribe(kafkaConfig.patchSize, messageList);
                if (offset < 0) {
                    logger.error("subscribe message failed. will continue");
                    continue;
                }
                // TODO process messageList here
                // TODO if processing fails you may retry or continue; decide yourself whether to save the offset
                // save offset
                ret = saveOffset(offset);
                if (ret != 0) {
                    if (saveOffset(offset) != 0) {
                        continue;
                    }
                }
                readOffset = offset;
            } catch (Exception e) {
                logger.error("exception :" + e.getMessage());
            }
        }
    }

    /**
     * Get the starting offset: from the local offset file if present, otherwise from the broker.
     * @return the offset to start reading from, or -1 on error
     */
    public long getOffset() {
        long offset = -1;
        // get from file
        String offsetFile = filePath + "/" + "offset";
        BufferedReader reader = null;
        try {
            File file = new File(offsetFile);
            reader = new BufferedReader(new FileReader(file));
            String tempStr = reader.readLine();
            offset = Long.parseLong(tempStr);
            reader.close();
            return offset;
        } catch (FileNotFoundException e) {
            logger.info("offset file:" + offsetFile + " not found. will get the "
                    + kafkaConfig.subscribeStartPoint + " offset.");
        } catch (IOException e) {
            logger.error("get offset from file exception");
            return -1;
        }

        if (kafkaConfig.subscribeStartPoint.equals("earliest")) {
            // get earliest offset
            offset = KafkaUtil.getSpecificOffset(consumer,
                    kafkaConfig.topic, partitionIndex,
                    kafka.api.OffsetRequest.EarliestTime(), clientName);
        } else if (kafkaConfig.subscribeStartPoint.equals("latest")) {
            // get latest offset
            offset = KafkaUtil.getSpecificOffset(consumer,
                    kafkaConfig.topic, partitionIndex,
                    kafka.api.OffsetRequest.LatestTime(), clientName);
        } else {
            logger.error("kafka config start point error");
        }
        return offset;
    }

    /**
     * Persist the offset to the local offset file.
     * @param offset offset to save
     * @return 0 on success, -1 on failure
     */
    public int saveOffset(long offset) {
        String offsetFile = filePath + "/" + "offset";
        try {
            File file = new File(offsetFile);
            if (!file.exists()) {
                file.createNewFile();
            }
            FileWriter fileWriter = new FileWriter(file);
            fileWriter.write(String.valueOf(offset));
            fileWriter.close();
        } catch (IOException e) {
            logger.error("save offset failed");
            return -1;
        }
        return 0;
    }

    /**
     * Fetch messages from the partition, starting at the last read offset.
     * @param maxReads maximum number of messages to read
     * @param messageList output list the fetched messages are appended to
     * @return the next offset to read, or -1 on error
     * @throws Exception if no new leader can be found after a broker failure
     */
    public long subscribe(long maxReads, List<String> messageList) throws Exception {
        if (messageList == null) {
            logger.warn("messageList is null");
            return -1;
        }
        int numErrors = 0;
        long offset = readOffset;
        while (maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, kafkaConfig.port, 100000, 64 * 1024, clientName);
            }
            FetchRequest request = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(kafkaConfig.topic, partitionIndex, offset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(request);

            if (fetchResponse.hasError()) {
                logger.warn("fetch response has error");
                numErrors++;
                short code = fetchResponse.errorCode(kafkaConfig.topic, partitionIndex);
                logger.warn("Error fetching data from the Broker:" + leadBroker + " error code: " + code);
                if (numErrors > 5) {
                    return -1;
                }
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    // offset = KafkaUtil.getLastOffset(consumer, kafkaConfig.topic, partitionIndex,
                    //        kafka.api.OffsetRequest.LatestTime(), clientName);
                    logger.warn("offset out of range. will get a new offset");
                    offset = getOffset();
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = KafkaUtil.findNewLeader(leadBroker,
                        kafkaConfig.replicaBrokers,
                        kafkaConfig.port,
                        kafkaConfig.topic,
                        partitionIndex);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(kafkaConfig.topic, partitionIndex)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < offset) {
                    logger.warn("Found an old offset: " + currentOffset + " Expecting: " + offset);
                    continue;
                }
                offset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                String message = new String(bytes, "UTF-8");
                messageList.add(message);
                numRead++;
                maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        return offset;
    }
}
           

KafkaSimpleConsumer.java

package kafka.simple;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaSimpleConsumer {
    private static final Logger logger = LogManager.getLogger(KafkaSimpleConsumer.class);

    private KafkaConfig kafkaConfig = null;
    private ExecutorService executor = null;
    private boolean inited = false;

    public void run() {
        if (!inited) {
            logger.error("uninit, init first!");
            return;
        }
        File file = new File(kafkaConfig.checkpoint);
        if (!file.exists()) {
            file.mkdir();
        }
        executor = Executors.newFixedThreadPool(kafkaConfig.partitionNum);
        int threadNum = 0;
        for (; threadNum < kafkaConfig.partitionNum; ++threadNum) {
            file = new File(kafkaConfig.checkpoint +"/partition" + threadNum);
            if (!file.exists() && !file.isDirectory()) {
                file.mkdir();
            }
            logger.info("begin submit partition msg task thread");
            executor.submit(new PartitionMsgTask(kafkaConfig, threadNum));
        }
    }

    public int init(String confFile) {
        Properties props = new Properties();
        kafkaConfig = new KafkaConfig();
        try {
            FileInputStream in = new FileInputStream(confFile);
            props.load(in);
        } catch (FileNotFoundException e) {
            logger.error("kafka config file not found. file name:" + confFile);
            return -1;
        } catch (IOException e) {
            logger.error("properties load file failed");
            return -1;
        }
        kafkaConfig.topic = props.getProperty("topic");
        kafkaConfig.port = Integer.parseInt(props.getProperty("port"));
        kafkaConfig.partitionNum = Integer.parseInt(props.getProperty("partitionNum"));
        kafkaConfig.checkpoint = props.getProperty("checkpoint");
        kafkaConfig.patchSize = Integer.parseInt(props.getProperty("patchSize"));
        String startPoint = props.getProperty("subscribeStartPoint");
        if (!startPoint.equals("latest") && !startPoint.equals("earliest")) {
            logger.error("config file startPoint error. startPoint must be latest or earliest");
            return -1;
        }
        kafkaConfig.subscribeStartPoint = startPoint;
        String brokerList = props.getProperty("brokerList");
        String[] brokers = brokerList.split(",");
        kafkaConfig.replicaBrokers = new ArrayList<String>();
        for (String str : brokers) {
            kafkaConfig.replicaBrokers.add(str);
        }
        inited = true;
        logger.info("init success. kafkaConfig:" + kafkaConfig.toString());
        return 0;
    }

}
           

Main.java

package kafka.simple;

public class Main {
    public static void main(String[] args) {
        KafkaSimpleConsumer consumer = new KafkaSimpleConsumer();
        consumer.init("./consumer.properties");

        consumer.run();
    }
}
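The classes above log through log4j, but no logging configuration is shown. A minimal log4j.properties (an assumption, placed on the classpath next to consumer.properties) could look like this:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c - %m%n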