天天看點

Kafka Consumer底層原理分析【Kafka系列】

雖然目前Kafka0.10版本已經重寫了其API,但底層原理是類似的,是以我們可以先了解kafka0.8.x裡面的提供的Consumer的實作原理與互動流程

Kafka提供了兩套API給Consumer

  • The SimpleConsumer API
  • The high-level Consumer API

1. 低階API

本質上是提供了一種與broker互動資訊的API

剩下的處理全靠使用者自己的程式,功能比較簡單,但使用者擴充性比較強

1) API結構

低階API的consumer,指定查找topic某個partition的指定offset去消費

首先與broker通信,尋找到leader(不與zookeeper通信,不存在groupid),然後直接和leader通信,指定offset去消費。消費多少,從哪裡開始消費,都可控(我們的例子是從0開始消費)

Kafka Consumer底層原理分析【Kafka系列】

findLeader方法中會去調用findPartitionMetadata方法

Kafka Consumer底層原理分析【Kafka系列】

程式運作結果:

運作過程中一直卡住沒有成功消費,加入如下錯誤資訊判斷,發現error code為1

說明我們從offset 0消費offsetoutofrange了

(我們發送請求topic1 partition0的offset 0  broker回複我們offset out of range,因為kafka中已經沒有offset 0 的資料了,已經過期清理掉了)

Kafka Consumer底層原理分析【Kafka系列】
Kafka Consumer底層原理分析【Kafka系列】

是以我們添加getLastOffset,getEarliestOffset的方法,擷取該topic該partition在kafka叢集中有的的最小和最大的offset

Kafka Consumer底層原理分析【Kafka系列】
Kafka Consumer底層原理分析【Kafka系列】
Kafka Consumer底層原理分析【Kafka系列】

調整offset之後,可能最新的資料也過期了,于是擷取到的message的size為0

Kafka Consumer底層原理分析【Kafka系列】

檢視SimpleConsumer的源碼:

Kafka Consumer底層原理分析【Kafka系列】
Kafka Consumer底層原理分析【Kafka系列】

1) 互動過程

使用SimpleConsumer的步驟

1) 從所有活躍的broker中找出哪個是指定Topic Partition中的leader broker

2) 擷取kafka中已存在的offset通路(或人工指定)

3) 構造請求

4) 發送請求查詢資料

5) 擷取查詢結果,處理(判斷擷取的結果,進行相應的處理)

處理就包括:

l 處理offset不存在的情況

l 處理offset的增長

l 處理leader broker變更

(當連接配接的這個brokerdown掉,我們要寫程式捕獲異常并且寫程式去切換broker,重新連接配接)

注意:該API是不阻塞的,SimpleConsumer傳一個請求過去,不論是資料過期、新的資料還沒來等,都會有一個response回來的

Kafka Consumer底層原理分析【Kafka系列】

使用SimpleConsumer有哪些弊端呢?

l 必須在程式中跟蹤offset值

l 必須找出指定Topic Partition中的lead broker

l 必須處理broker的變動

(當連接配接的這個brokerdown掉,我們要寫程式捕獲異常并且寫程式去切換broker,重新連接配接)

l 如果多個SimpleConsumer共享消費某個topic,想要實作彼此的負載均衡,需要添加很多額外代碼

(多個用戶端共享某個topic,就要保證他們的消費是互斥的,不能消費到同一條資料,比如A,B,C共享topicX共4個partition,那麼A就消費partition0,B消費partition1,partition2,C就消費partition3,保證其消費互相獨立,并且A,B,C的消費總和是整個topicX的所有消息)

2. 高階API

本質上是提供了一個完整的程式,内置各種功能(比如和其他consumer的負載均衡,比如處理broker變動)

使用者隻需要調用API即可,功能非常強大,但使用者擴充性比較差

1) API

Kafka Consumer底層原理分析【Kafka系列】

高階消費者API必須要指定group.id,否則會報錯

Kafka Consumer底層原理分析【Kafka系列】

2) 負載均衡原理與算法

該程式,運作多個程序,他們之間是可以實作負載均衡的。前提是他們同屬于一個group,擁有相同的groupid。

Kafka Consumer底層原理分析【Kafka系列】

每個consumer都會監聽zk上topic partition資訊和consumer的資訊

添加一個節點,他們就監聽到變化,監聽到變化就會調用rebanlance去重新算自己需要消費哪些partition。然後開始消費。

每個consumer都自己獨立去調整自己的消費

Kafka Consumer底層原理分析【Kafka系列】

會觸發consumer rebalance的場景有如下場景:

l 條件1:有新的consumer加入

l 條件2:舊的consumer挂了

l 條件3:coordinator挂了,叢集選舉出新的coordinator(0.10 特有的)

l 條件4:topic的partition新加

l 條件5:consumer調用unsubscrible(),取消topic的訂閱

這種負載均衡方案存在的問題

  • Herd effect(羊群效應)

任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance,造成叢集内大量的調整

  • Split Brain

每個Consumer分别單獨通過Zookeeper判斷哪些Broker和Consumer 當機了,那麼不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正确的Reblance嘗試。

  • 調整結果不可控

所有的Consumer都并不知道其它Consumer的Rebalance是否成功,這可能會導緻Kafka工作在一個不正确的狀态。

為了解決這些問題,Kafka作者在0.9.x版本中開始使用中心協調器(Coordinator)。由它統一來監聽zookeeper,生成rebalance指令,并且判斷是否成功,不成功進行重試(後面講解)

3) blockingQueue

Kafka Consumer底層原理分析【Kafka系列】

consumerMap

Topic1 4
Topic2 2
Topic3 1

核心建立方案就是:

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

Kafka Consumer底層原理分析【Kafka系列】

這個topicCountMap主要是指定消費線程數,該API底層的實作如下圖:

1.會為每個topic生成對應的消費線程,我們可以叫它消費線程。它會從一個blockingQueue裡面取資料。這個取的過程是阻塞的,如果queue中沒有資料,就會阻塞。

2.會為每個kafka的broker生成一個fetch線程,我們可以叫它取資料線程。每個fetch thread會于kafka broker建立一個連接配接。fetch thread線程去拉取消息資料,最終放到對應的blockingQueue中,等待消費線程來消費。

Kafka Consumer底層原理分析【Kafka系列】

用戶端使用時:

Kafka Consumer底層原理分析【Kafka系列】

根據topic,指定取某一個消費線程,拿出流資料,然後可以周遊該資料了,如上所述,該方法會是阻塞的,如果沒有資料了,它就會阻塞在這裡。

4) 關于offset

該api除了使用zk做負載均衡  還會用它記錄offset。

/consumers/groupid/offsets/topic/partition/xxxx

記錄消費到的offset值

Kafka Consumer底層原理分析【Kafka系列】

上圖左邊為在zookeeper中沒有此groupid節點的流程       右邊為有的流程

groupid節點如果沒有,會建立,然後offset的建立初始值會在kafka中擷取,預設是擷取最新的offset,也可以指定擷取kafka中目前存在的最小offset,如下參數可以人工指定

//偏移量,初始化從哪個位置讀

//props.put("auto.offset.reset", "smallest");

注意:此設定隻有在運作初始化的時候有效,如果zookeeper中已經有值,那麼這個參數是無效的,會直接去讀zookeeper中的offset值。如果還想擷取之前的資料,方法1手動修改zookeeper中該offset的值,方法2換一個groupid去消費,指定smallest。

在擷取到offset值之後,就是去kafka中消費資料,然後在zookeeper中更新此offset的值。這些都是API底層幫我們實作了,我們上層API無感覺。

5) 負載均衡小實驗

建立一個partition為2的topic

建立兩個groupid相同的consumer程序來消費這個topic的消息

一個producer不斷的打入消息

結果:

Kafka Consumer底層原理分析【Kafka系列】
Kafka Consumer底層原理分析【Kafka系列】

附API使用代碼

高階API

package com.wangke.consumer;

import com.wangke.kafkaProducerConsumer.KafkaProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by dell on 2017/8/11.
 */
public class HighConsumerTest {


    private static ConsumerConfig createConsumerConfig()
    {
        String zkConnect = "ip:2181";
        String groupId = "group2";
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        //偏移量,從哪個位置讀
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }


    public static void main(String[] args) throws InterruptedException {
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        String topic = "test7";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        int i=0;
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            i++;
            if(i==10)
                break;
        }
        consumer.shutdown();
    }
}
           

低階API

package com.wangke.consumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 *
 * @author wangke
 * @date 2017/8/9
 */
public class SimpleComsumerTest {
    public static void main(String[] args) throws InterruptedException {

        String BROKER_CONNECT = "ip:9092";
        String TOPIC = "topic";
        int partitionNum = 0;
        // 找到leader
        Broker leaderBroker = findLeader(BROKER_CONNECT, TOPIC, partitionNum);
        if(leaderBroker==null){
            System.out.println("未找到leader資訊");
            return;
        }
        // 從leader消費    soTimeout  bufferSize  clientId
        SimpleConsumer simpleConsumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");
        long startOffet = 0;
        int fetchSize = 500;
        long offset = startOffet;
        while (true ) {
            System.out.println("offset:"+offset);
            // 添加fetch指定目标topic,分區,起始offset及fetchSize(位元組),可以添加多個fetch
            FetchRequest req = new FetchRequestBuilder().addFetch(TOPIC, partitionNum, offset, fetchSize).build();
            // 拉取消息
            FetchResponse fetchResponse = simpleConsumer.fetch(req);

            if (fetchResponse.hasError()) {
                // Something went wrong!
                short code = fetchResponse.errorCode(TOPIC, partitionNum);
                System.out.println("Error fetching data from the Broker:" + leaderBroker + " Reason: " + code);
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    //offset = getEarliestOffset(simpleConsumer, TOPIC, partitionNum,  "mySimpleConsumer");
                    offset = getLastOffset(simpleConsumer, TOPIC, partitionNum,  "mySimpleConsumer");
                    continue;
                }
                System.out.println("Error fetching data Offset Data the Broker. Reason: " + fetchResponse.errorCode(TOPIC, partitionNum));
                continue;
            }
            ByteBufferMessageSet messageSet = fetchResponse.messageSet(TOPIC, partitionNum);
            if(messageSet.sizeInBytes() ==0){
                Thread.sleep(5000);
                System.out.println("資料為空");
                continue;
            }
            for (MessageAndOffset messageAndOffset : messageSet) {
                Message mess = messageAndOffset.message();
                ByteBuffer payload = mess.payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                String msg = new String(bytes);
                offset = messageAndOffset.offset();
                System.out.println("partition : " + partitionNum + ", offset : " + offset + "  mess : " + msg);
            }
            // 繼續消費下一批
            offset = offset + 1;
            Thread.sleep(5000);
        }
    }

    /**
     * 找到制定分區的leader broker
     *
     * @param brokerHosts broker位址,格式為:“host1:port1,host2:port2,host3:port3”
     * @param topic topic
     * @param partition 分區
     * @return
     */
    private static Broker findLeader(String brokerHosts, String topic, int partition) {
        PartitionMetadata partitionMetadata = findPartitionMetadata(brokerHosts, topic, partition);
        if(partitionMetadata==null){
            System.out.println("未找到leader資訊");
            return null;
        }
        Broker leader = partitionMetadata.leader();
        System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(), leader.port()));
        return leader;
    }
    /**
     * 找到指定分區的中繼資料
     *
     * @param brokerHosts broker位址,格式為:“host1:port1,host2:port2,host3:port3”
     * @param topic topic
     * @param partition 分區
     * @return 中繼資料
     */
    private static PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        for (String brokerHost : brokerHosts.split(",")) {
            SimpleConsumer consumer = null;
            String[] splits = brokerHost.split(":");
            consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest request = new TopicMetadataRequest(topics);
            TopicMetadataResponse response = consumer.send(request);
            List<TopicMetadata> topicMetadatas = response.topicsMetadata();
            for (TopicMetadata topicMetadata : topicMetadatas) {
                for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {
                    if (PartitionMetadata.partitionId() == partition) {
                        returnMetaData = PartitionMetadata;
                        break;//找到中繼資料,程式可以退出了
                    }
                }
            }
            if (consumer != null)
                consumer.close();
        }
        return returnMetaData;
    }

    /*消費者消費一個topic的指定partition時,從哪裡開始讀資料
     *kafka.api.OffsetRequest.EarliestTime()找到日志中資料的最開始頭位置,從那裡開始消費(hadoop-consumer中使用的應該就是這種方式)
     *kafka.api.OffsetRequest.LatestTime()隻消費最新的資料
     *注意,不要假設0是offset的初始值
     *參數:long whichTime的取值即兩種:
     *                          kafka.api.OffsetRequest.LatestTime()
     *                          kafka.api.OffsetRequest.LatestTime()
     *傳回值:一個long類型的offset*/
    private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private static long getEarliestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}