天天看點

Kafka 多線程消費問題

Step By Step

Kafka執行個體建立,這裡使用阿裡雲Kafka消息隊列。為了友善本地測試,建立公網 + VPC執行個體,參考 連結 公網接入

消費端程式

1、參數配置

參考: Kafka消息發送的三種模式 連接配接參數配置部分。

2、消費端:Code Sample

import com.base.JavaKafkaConfigurer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {

    //加載kafka.properties
    public static Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

    public static Properties initConfig()
    {
        Properties props = new Properties();
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //設定接入點,即控制台的執行個體詳情頁顯示的“預設接入點”
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //兩次poll之間的最大允許間隔
        //請不要改得太大,伺服器會掐掉空閑連接配接,不要超過30000
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);

        //每次poll的最大數量
        //注意該值不要改得太大,如果poll太多資料,而不能在下次poll之前消費完,則會觸發一次負載均衡,産生卡頓
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //目前消費執行個體所屬的消費組,請在控制台申請之後填寫
        //屬于同一個組的消費執行個體,會負載消費消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        return props;
    }


    public static void main(String[] args) {

        Properties properties = initConfig();
        int consumerThreadNum = 6; // 可以設定和Topic的分區資料一緻,這樣一個分區就可以配置設定一個線程來消費消息。

        String topic  = kafkaProperties.getProperty("topic");
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(properties, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic)
        {
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        public void run()
        {
            try {
                while (true)
                {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record: records) {
                        System.out.println("Thread num: " + this.getName());
                        System.out.println(String.format("Consume partition:%d offset:%d the message body:%s", record.partition(), record.offset(),record.value()));
                    }
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}           

3、消費端情況

Kafka 多線程消費問題

4、服務端控制台檢視

Kafka 多線程消費問題

參考連結

訂閱者最佳實踐