天天看點

kafka Java用戶端之consumer 流量控制 以及 Rebalance解析

Consumer 流量控制

為了避免Kafka中的流量劇增導緻過大的流量打到Consumer端将Consumer給壓垮的情況,我們就需要針對Consumer進行限流。例如,當處理的資料量達到某個門檻值時暫停消費,低于門檻值時則恢複消費,這就可以讓Consumer保持一定的速率去消費資料,進而避免流量劇增時将Consumer給壓垮。

還有的情況就是一個消費者配置設定了多個分區,并同時消費所有的分區,這些分區具有相同的優先級。在一些情況下,消費者需要首先消費一些指定的分區,當指定的分區有少量或者已經沒有可消費的資料時,則開始消費其他分區。

例如流處理,當處理器從2個topic擷取消息并把這兩個topic的消息合并,當其中一個topic長時間落後另一個,則暫停消費,以便落後的趕上來。

kafka支援動态控制消費流量,分别在future的poll(long)中使用pause(Collection) 和 resume(Collection) 來暫停消費指定配置設定的分區,重新開始消費指定暫停的分區。

結合令牌桶來對kafka consumer實作限流:

  1. 在poll到資料之後,先去令牌桶中拿取令牌
  2. 如果擷取到令牌,則繼續業務處理
  3. 如果擷取不到令牌,則調用pause方法暫停Consumer,等待令牌
  4. 當令牌桶中的令牌足夠,則調用resume方法恢複Consumer的消費狀态

接下來編寫具體的代碼案例簡單示範一下這個限流思路,令牌桶算法使用Guava裡内置的,是以需要在項目中添加對Guava的依賴。單機限流可以直接使用 Google Guava 自帶的限流工具類 RateLimiter 。 RateLimiter 基于令牌桶算法,可以應對突發流量。

除了最基本的令牌桶算法(平滑突發限流)實作之外,Guava 的RateLimiter還提供了 平滑預熱限流 的算法實作。 平滑突發限流就是按照指定的速率放令牌到桶裡,而平滑預熱限流會有一段預熱時間,預熱時間之内,速率會逐漸提升到配置的速率。

​​Google Guava 項目位址​​

添加的依賴項如下:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
    <!-- or, for Android: -->
    <!-- <version>31.0.1-android</version>-->
</dependency>      

然後我們就可以使用Guava的限流器對Consumer進行限流了,測試代碼如下

/*
    流量控制 - 限流
 */
private static void controlPause() {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "81.68.82.48:9092");
    properties.setProperty("group.id", "groupxt");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);


    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    TopicPartition p2 = new TopicPartition(TOPIC_NAME,2);


    /*** 令牌生成速率,機關為秒 */
    //分别控制每個partition的消費速度
    final int permitsPerSecond1 = 5;
    final int permitsPerSecond2 = 3;
    final int permitsPerSecond3 = 6;

    /*** 限流器 */
    final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond1);
    final RateLimiter LIMITER2 = RateLimiter.create(permitsPerSecond2);
    final RateLimiter LIMITER3 = RateLimiter.create(permitsPerSecond3);


    // 消費訂閱某個Topic的某個分區或多個
    consumer.assign(Arrays.asList(p0,p1,p2));


    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        //如果沒有拉取到消息,重新拉取
        if (records.isEmpty()) {
            continue;
        }

        // 每個partition單獨處理
        for(TopicPartition partition : records.partitions()){

            List<ConsumerRecord<String, String>> pRecord = records.records(partition);

            for (ConsumerRecord<String, String> record : pRecord) {
                //執行業務操作,consumer 拉取消息會堵塞在此處,要等其他業務處理完這些消息,consumer才會拉取下一批消息
                System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                /*
                    1、接收到record資訊以後,去令牌桶中拿取令牌
                    2、如果擷取到令牌,則繼續業務處理
                    3、如果擷取不到令牌, 則pause等待令牌
                    4、當令牌桶中的令牌足夠, 則将consumer置為resume狀态
                 */

                // 限流partition 0
                if (!LIMITER.tryAcquire()) {
                    System.out.println("無法擷取到p0令牌,暫停消費p0");
                    consumer.pause(Arrays.asList( p0));
                } else {
                    System.out.println("擷取到p0令牌,恢複消費p0");
                    consumer.resume(Arrays.asList(p0));
                }

                // 限流partition 1
                if (!LIMITER2.tryAcquire()) {
                    System.out.println("無法擷取到p1令牌,暫停消費p1");
                    consumer.pause(Arrays.asList(p1));
                } else {
                    System.out.println("擷取到p1令牌,恢複消費p1");
                    consumer.resume(Arrays.asList(p1));
                }

                // 限流partition 2
                if (!LIMITER3.tryAcquire()) {
                    System.out.println("無法擷取到p2令牌,暫停消費p2");
                    consumer.pause(Arrays.asList(p2));
                } else {
                    System.out.println("擷取到p2令牌,恢複消費p2");
                    consumer.resume(Arrays.asList(p2));
                }

            }

            long lastOffset = pRecord.get(pRecord.size() -1).offset();
            // 單個partition中的offset,并且進行送出
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition,new OffsetAndMetadata(lastOffset+1));
            // 送出offset
            consumer.commitSync(offset);
            System.out.println("=============partition - "+ partition +" end================");
        }
    }
}      

也可以使用其他一些限流的庫,比如 ​​Bucket4j​​ 是一個非常不錯的基于令牌/漏桶算法的限流庫。 相對于,Guava 的限流工具類來說,Bucket4j 提供的限流功能更加全面。不僅支援單機限流和分布式限流,還可以內建監控,搭配 Prometheus 和 Grafana 使用。 不過,畢竟 Guava 也隻是一個功能全面的工具類庫,其提供的開箱即用的限流功能在很多單機場景下還是比較實用的。

Spring Cloud Gateway 中自帶的單機限流的早期版本就是基于 Bucket4j 實作的。後來,替換成了 ​​Resilience4j​​。 Resilience4j 是一個輕量級的容錯元件,其靈感來自于 Hystrix。自Netflix 宣布不再積極開發 Hystrix (opens new window) 之後,Spring 官方和 Netflix 都更推薦使用 Resilience4j 來做限流熔斷。

一般情況下,為了保證系統的高可用,項目的限流和熔斷都是要一起做的。 Resilience4j 不僅提供限流,還提供了熔斷、負載保護、自動重試等保障系統高可用開箱即用的功能。并且,Resilience4j 的生态也更好,很多網關都使用 Resilience4j 來做限流熔斷的。 是以,在絕大部分場景下 Resilience4j 或許會是更好的選擇。如果是一些比較簡單的限流場景的話,Guava 或者 Bucket4j 也是不錯的選擇。

分布式限流

分布式限流常見的方案:

  • 借助中間件架限流 :可以借助 Sentinel 或者使用 Redis 來自己實作對應的限流邏輯。
  • 網關層限流 :比較常用的一種方案,直接在網關層把限流給安排上了。不過,通常網關層限流通常也需要借助到中間件/架構。就比如 Spring Cloud Gateway 的分布式限流實作RedisRateLimiter就是基于 Redis+Lua 來實作的,再比如 Spring Cloud Gateway 還可以整合 Sentinel 來做限流。

如果你要基于 Redis 來手動實作限流邏輯的話,建議配合 Lua 腳本來做。 網上也有很多現成的腳本供你參考,就比如 Apache 網關項目 ​​ShenYu​​ 的 RateLimiter 限流插件就基于 Redis + Lua 實作了令牌桶算法/并發令牌桶算法、漏桶算法、滑動視窗算法。

Consumer 消費控制

上面講到了kafka的流量控制,避免拉取過多的消息而導緻服務被壓崩。但是有時候我們需要及時迅速的消費掉生産者生産的消息,避免造成消費積壓問題。那應該怎麼做呢?

消費太慢

增加Topic的分區數,并且同時提升消費組的消費者數量,然後多線程消費消息進而提升消費速度,消費者最多的時候可以一個消費者消費一個分區

消費太快

可以采用上面的令牌桶等限流的方法,也可以調整kafka自己的參數

調整參數:

  • fetch.max.bytes:單次擷取資料的最大消息數。
  • max.poll.records <= 吞吐量 :單次poll調用傳回的最大消息數,如果處理邏輯很輕量,可以适當提高該值。預設值為500

一次從kafka中poll出來的資料條數,max.poll.records條資料需要在在session.timeout.ms這個時間内處理完

consumer.poll(1000)

新版本的Consumer的Poll方法使用了類似于Select I/O機制,是以所有相關事件(包括reblance,消息擷取等)都發生在一個事件循環之中。

1000是一個逾時時間,一旦拿到足夠多的資料(參數設定),consumer.poll(1000)會立即傳回 ConsumerRecords<String, String> records。

如果沒有拿到足夠多的資料,會阻塞1000ms,但不會超過1000ms就會傳回。

Consumer Rebalance解析

Consumer有個Rebalance的特性,即重新負載均衡,該特性依賴于一個協調器來實作。每當Consumer Group中有Consumer退出或有新的Consumer加入都會觸發Rebalance。

之是以要重新負載均衡,是為了将退出的Consumer所負責處理的資料再重新配置設定到組内的其他Consumer上進行處理。或當有新加入的Consumer時,将組内其他Consumer的負載壓力,重新進均勻配置設定,而不會說新加入一個Consumer就閑在那。

下面就用幾張圖簡單描述一下,各種情況觸發Rebalance時,組内成員是如何與協調器進行互動的。

1、新成員加入組(member join):

kafka Java用戶端之consumer 流量控制 以及 Rebalance解析

圖中的Coordinator是協調器,有新的成員加入的時候,會要求所有成員斷開,然後全部進行重連。而generation則類似于樂觀鎖中的版本号,每當成員入組成功就會更新,也是起到一個并發控制的作用,避免送出offset的髒資料,每次送出offset的時候要帶着generation這個版本号,隻有版本号對應上了,才認為送出的offset是有效的,才會接收這個送出2、組成員崩潰/非正常退出(member failure):

如有有一個consumer當機了,會重新rebalance一下,重新配置設定一下partition

kafka Java用戶端之consumer 流量控制 以及 Rebalance解析

3、組成員主動離組/正常退出(member leave group):

kafka Java用戶端之consumer 流量控制 以及 Rebalance解析

4、當Consumer送出位移(member commit offset)時,也會有類似的互動過程:

如果offset沒有送出成功,但是業務又做了,可能就會導緻重複消費問題

References:

  • ​​https://javaguide.cn/high-availability/limit-request/​​
  • ​​https://www.orchome.com/451#item-9​​
  • ​​https://blog.51cto.com/zero01/2498017​​