Spark Streaming + Kafka: Exception When Manually Committing Offsets

Scenario:

Business requirements called for Spark Streaming to consume Kafka data in one-minute batches. After two batches had been processed, the offset commit started failing.

The commit code is as follows:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// dataRDD is the DStream returned by KafkaUtils.createDirectStream
dataRDD.foreachRDD(rdd => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // business aggregation
  // ....

  // commitAsync only queues the offsets; the actual commit happens
  // asynchronously on the next poll, i.e. when the next batch is fetched.
  dataRDD.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
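For context, the stream referenced above would be created roughly along these lines. This is a sketch rather than the original job: the app name and topic are placeholders, and kafkaParams is the parameter map shown further down.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("kafka-offset-commit") // placeholder app name
val ssc = new StreamingContext(conf, Seconds(60))            // one-minute batches, per the scenario

val topics = Array("events") // placeholder topic
val dataRDD = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams) // kafkaParams as defined below
)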
           

20/06/23 10:50:00 ERROR ConsumerCoordinator: Offset commit failed.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Error analysis:

The gap between two successive poll() calls was too long, so Kafka concluded the consumer had died and rebalanced the group, handing its partitions to another member. With one-minute batches, the consumer only returns to poll() about once every 60 s, which exceeded the limit enforced by the client.

Solution:

Increase max.poll.interval.ms so that it comfortably exceeds the 60 s batch interval.

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

import scala.collection.immutable.HashMap
import org.apache.kafka.common.serialization.StringDeserializer

// Kafka consumer parameters
val kafkaParams: Map[String, Object] = HashMap[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.interval.ms" -> "70000" // 70 s, longer than the 60 s batch interval
)

Running the program then showed:

20/06/23 11:16:45 WARN ConsumerConfig: The configuration max.poll.interval.ms = 70000 was supplied but isn't a known config.

The Kafka consumer in use simply did not have a max.poll.interval.ms parameter: the setting was only introduced in kafka-clients 0.10.1 (KIP-62), so an older client on the classpath will not recognize it.
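One way to check which settings the client on the classpath actually recognizes is ConsumerConfig.configNames(); a minimal sketch (on a pre-0.10.1 client this should print false, matching the WARN above):

import org.apache.kafka.clients.consumer.ConsumerConfig
import scala.collection.JavaConverters._

// Set of all config keys known to the kafka-clients version on the classpath
val known = ConsumerConfig.configNames().asScala
println(known.contains("max.poll.interval.ms"))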

Checking the pom file revealed that the kafka-clients dependency was missing:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
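For an sbt build, the equivalent dependency line is:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"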
           

Adding it solved the problem!

Inspecting the kafka-clients source confirms the default:

.define(MAX_POLL_INTERVAL_MS_CONFIG,
        Type.INT,
        300000,
        atLeast(1),
        Importance.MEDIUM,
        MAX_POLL_INTERVAL_MS_DOC)
           

The default for max.poll.interval.ms is 300000 ms, i.e. 300 s, well above the 60 s batch interval.

So once the dependency is added to the pom, the default already satisfies the requirement, and max.poll.interval.ms no longer needs to be set explicitly.

Common parameter defaults:

HEARTBEAT_INTERVAL_MS_CONFIG   3000
SESSION_TIMEOUT_MS_CONFIG      10000
REQUEST_TIMEOUT_MS_CONFIG      305000  // chosen to be higher than the default of max.poll.interval.ms
MAX_POLL_INTERVAL_MS_CONFIG    300000
MAX_POLL_RECORDS_CONFIG        500
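To confirm which values are in effect on a given classpath, note that kafka-clients logs every resolved setting under "ConsumerConfig values:" when a consumer is constructed. A minimal probe, assuming a logging backend is configured (the broker address and group id are placeholders; the constructor does not contact the broker):

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder address
props.put("group.id", "config-probe")            // throwaway group id
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

// Constructing the consumer logs the effective value of every parameter,
// defaults included, e.g. max.poll.interval.ms = 300000.
val consumer = new KafkaConsumer[String, String](props)
consumer.close()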
           
