
Spark Streaming + Kafka: exception when manually committing offsets

Scenario:

Business requirements called for Spark Streaming to consume Kafka data in one-minute batches. After two batches had been consumed, committing the offsets started failing with the exception below.

The offset-commit code was as follows:

dataRDD.foreachRDD(rdd => {
      // obtain the offset ranges of the RDD for this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // business aggregation logic
      // ....

      // commitAsync only queues the offsets; they are actually committed to Kafka
      // on the next poll, i.e. when the next batch is fetched.
      dataRDD.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
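
For completeness, a minimal sketch of how a stream such as dataRDD is typically created with the spark-streaming-kafka-0-10 integration (the one that exposes HasOffsetRanges and CanCommitOffsets); sparkConf, ssc and topics here are placeholders, and kafkaParams is the parameter map shown further down in this post:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// one-minute batches, matching the scenario above
val ssc = new StreamingContext(sparkConf, Minutes(1))

// subscribe to the given topics with the consumer settings from kafkaParams
val dataRDD: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )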
           

20/06/23 10:50:00 ERROR ConsumerCoordinator: Offset commit failed.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Error analysis:

The interval between two consecutive polls was too long, so Kafka assumed the consumer had died and rebalanced the group, reassigning its partitions to another member.

Solution:

Increase max.poll.interval.ms.

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

// Kafka consumer parameters
    val kafkaParams: Map[String, Object] = HashMap[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "max.poll.interval.ms" -> "70000" // 70 s, longer than the 60 s batch interval
    )
           

Running the program then produced:

20/06/23 11:16:45 WARN ConsumerConfig: The configuration max.poll.interval.ms = 70000 was supplied but isn't a known config.

The Kafka consumer on the classpath did not know the max.poll.interval.ms parameter.

Checking the pom file showed that the kafka-clients dependency was missing:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
           

After adding it, the problem was solved.
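If it is unclear which kafka-clients version actually ends up on the classpath (the Spark streaming-Kafka integration may also pull one in transitively), the resolved version can be inspected with mvn dependency:tree, assuming a Maven build as the pom above suggests.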

Looking at the kafka-clients source shows how this config is defined:

.define(MAX_POLL_INTERVAL_MS_CONFIG,
        Type.INT,
        300000,
        atLeast(1),
        Importance.MEDIUM,
        MAX_POLL_INTERVAL_MS_DOC)
           

The default value of max.poll.interval.ms is 300000 ms (300 s), which is greater than the 60 s batch interval,

so once the dependency is added, the default value already satisfies the requirement and max.poll.interval.ms no longer needs to be set explicitly.
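
If a batch could ever take longer than 300 s to process (a heavier aggregation, or a longer batch interval), the parameter would still have to be raised explicitly. A minimal sketch of the extra entry in the kafkaParams map shown earlier, with 600000 ms as an arbitrary example value:

      // hypothetical override, only needed if batch processing can exceed the 300 s default
      "max.poll.interval.ms" -> "600000" // 10 minutes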

Default values of common consumer parameters (in ms unless noted otherwise):

HEARTBEAT_INTERVAL_MS_CONFIG (heartbeat.interval.ms): 3000

SESSION_TIMEOUT_MS_CONFIG (session.timeout.ms): 10000

REQUEST_TIMEOUT_MS_CONFIG (request.timeout.ms): 305000 // chosen to be higher than the default of max.poll.interval.ms

MAX_POLL_INTERVAL_MS_CONFIG (max.poll.interval.ms): 300000

MAX_POLL_RECORDS_CONFIG (max.poll.records): 500 // a record count, not ms
