天天看點

架構設計:系統間通信(30)——Kafka及場景應用(中3)

接上文:《架構設計:系統間通信(29)——Kafka及場景應用(中2)》

4-5、Kafka原理:消費者

作為Apache Kafka消息隊列,它的性能名額相當一部分取決于消費者們的性能——隻要消息能被快速消費掉不在Broker端形成擁堵,整個Apache Kafka就不會出現性能瓶頸問題。

4-5-1、基本使用

我們首先使用Kafka Client For JAVA API為各位讀者示範一下最簡單的Kafka消費者端的使用。以下示例代碼可以和上文中所給出的生産者代碼相對應,形成一個完整的消息建立——接收——發送過程:

package kafkaTQ;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * 這是Kafka的topic消費者
 * @author yinwenjie
 */
public class KafkaConsumer_GroupOne {
    public static void main(String[] args) throws RuntimeException {
        // ==============首先各種連接配接屬性
        // Kafka消費者的完整連接配接屬性在Apache Kafka官網http://kafka.apache.org/documentation.html#consumerconfigs
        // 有詳細介紹(請參看Old Consumer Configs。New Consumer Configs是給Kafka V0.9.0.0+使用的)
        // 這裡我們設定幾個關鍵屬性
        Properties props = new Properties();
        // zookeeper相關的,如果有多個zk節點,這裡以“,”進行分割
        props.put("zookeeper.connect", "192.168.61.140:2181");
        props.put("zookeeper.connection.timeout.ms", "10000");
        // 還記得上文的說明嗎:對于一個topic而言,同一使用者組内的所有使用者隻被允許通路一個分區。
        // 是以要讓多個Consumer實作對一個topic的負載均衡,每個groupid的名稱都要一樣
        String groupname = "group2";
        props.put("group.id", groupname);

        //==============
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // 我們隻建立一個消費者
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        map.put("my_topic2", );
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);

        // 擷取并啟動消費線程,注意看關鍵就在這裡,一個消費線程可以負責消費一個topic中的多個partition
        // 但是一個partition隻能配置設定到一個消費線程去
        KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get();
        new Thread(new ConsumerThread(stream)).start();

        // 接着鎖住主線程,讓其不退出
        synchronized (KafkaConsumer_GroupTwo.class) {
            try {
                KafkaConsumer_GroupTwo.class.wait();
            } catch (InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }
    }

    /**
     * @author yinwenjie
     */
    private static class ConsumerThread implements Runnable {

        private KafkaStream<byte[], byte[]> stream;

        /**
         * @param stream
         */
        public ConsumerThread(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> iterator =  this.stream.iterator();
            //============這個消費者擷取的資料在這裡
            while(iterator.hasNext()){  
                MessageAndMetadata<byte[], byte[]> message = iterator.next();
                int partition = message.partition();
                String topic = message.topic();
                String messageT = new String(message.message());
                System.out.println("接收到: " + messageT + "來自于topic:[" + topic + "] + 第partition[" + partition + "]"); 
            }
        }
    }
}
           

以上代碼片段有幾個關鍵點需要進行一下說明:

  • “map.put(“my_topic2”, 1);” 這句代碼表示将會為名叫“my_topic2”的隊列建立數量為1的消費者。在一個程序的連接配接中,您可以指定建立多個topic的消費者數量。例如:
......
# 為my_topic2的隊列建立數量為1的消費者
# 并且為my_topic3的隊列建立數量為4的消費者
map.put("my_topic2", 1);
map.put("my_topic3", 4);
......
           
  • 每一個消費者都需要一個獨立的線程進行工作。您可以将工作任務放入已經建立好的線程池(推薦這樣做),也可以像以上代碼示例中那樣建立一個線程并運作任務。
......
# 使用線程池
# 這裡的參數就是消費者的總數量
ExecutorService threadPool = Executors.newFixedThreadPool(1);
threadPool.execute(new ConsumerThread(stream));
......
           
  • 在開發過程中,消費者端無需知道任何一個Broker的位置。但是必須至少知道一個zookeeper服務節點的位置。通過這個位置,消費者端首先會去zookeeper服務上查找指定的topic的分區情況和已有的消費者情況。

4-5-2、分區與消費者負載

Apache Kafka叢集中的消費者以線程為機關,如在上一小節代碼段所示:我們在一個程序中,為Topic為“my_topic2”的隊列建立了一個線程,這個線程就是一個消費者——屬于名為“group2”的使用者組。這時,Topic中所有分區的消息都會交給這個消費線程進行消費。如下圖所示:

架構設計:系統間通信(30)——Kafka及場景應用(中3)

雖然一個消費者可以同時消費Topic中多個分區(Partition)的消息,但在生産環境下為了獲得更優的消費性能并不建議這樣做。由于單個消費者線程的處理能力是有限的,一旦出現資料洪峰,消息就會堆積在Broker端無法被處理(如果消費者端使用了線程池,則可能堆積在消費者端,這要看您怎麼編寫代碼)。是以上一個小節那樣的消費者編碼方式,最多就是用來做做“Hello World”那樣的示例,沒有更多的使用價值了。

4-5-3、優化 一:

第一種改進方法,就是讓一個消費者隻消費一個分區(Partition)中的消息,且整個系統中的消費者大于等于Topic中的分區數量。設計方案如下:

架構設計:系統間通信(30)——Kafka及場景應用(中3)

如上圖所示,這個Topic下一共有四個分區(Partition),對應的消費者數量也有四個,但是這四個消費者同屬于一個程序,存在于同一個實體節點上。我們根據這個設計方案,更改之前消費者端的代碼,如下(為了節約篇幅,隻給出主要的更改位置):

......
// 後續建立的所有消費者線程,都是屬于group2的消費組
String groupname = "group2";
props.put("group.id", groupname);

......
// 在這個程序中,為topic名為my_topic2的隊列建立了四個消費者
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("my_topic2", );
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);

......
// 為這四個消費者配置設定四個不同的線程
// 消費者線程1
KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get();
new Thread(new ConsumerThread(stream)).start();

// 消費者線程2
stream = topicMessageStreams.get("my_topic2").get();
new Thread(new ConsumerThread(stream)).start();

// 消費者線程3
stream = topicMessageStreams.get("my_topic2").get();
new Thread(new ConsumerThread(stream)).start();

// 消費者線程4
stream = topicMessageStreams.get("my_topic2").get();
new Thread(new ConsumerThread(stream)).start();

......
// 接着鎖住主線程,讓其不退出
synchronized (KafkaConsumer_GroupTwo.class) {
    try {
        KafkaConsumer_GroupTwo.class.wait();
    } catch (InterruptedException e) {
        e.printStackTrace(System.out);
    }
}
......
           

4-5-4、優化 二:

顯然“優化方案一”中的做法雖然實作了4消費者分别對應4個分區的負載均衡方案,但是受限于單個實體節點的處理性能,是以這種方案的處理性能還有進一步優化的可能。我們可以在多個節點實體節點上均勻散步這些消費者,對Topic分區中的消息進行一一對應的消費,如下圖所示:

架構設計:系統間通信(30)——Kafka及場景應用(中3)

上圖所示的設計思路中,我們使用了2個實體節點完成消息的消費任務,每個服務節點上開啟的消費程序上有兩個消費者線程。這樣Topic中4個分區的消息就會被均勻分布到2個實體節點中,且每一個實體節點處理兩個分區中的消息。

注意:可能您在分别啟動這些消費程序的時候,由于時間上存在差異,某一台服務節點上的消費程序将暫時被配置設定多個分區進行消息接收。但沒有關系,當這個消費者性能到達瓶頸,分區中的消息出現擁堵的時候,這個分區就會被新的消費者所代替,直到10個消費者線程分别和10個分區建立一一對應關系為止

繼續閱讀