1、kafka消费实例的工作过程,从启动开始。(待补充)
2、auto.offset.reset属性:默认值为 latest
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
3、获得消费组的offset
kafka0.9以后的版本,group.id的消费信息不再存储在zookeeper上。kafka会新建立一个topic(__consumer_offsets),默认分区为50,用于存放消费组的消费信息。位置如
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICdzFWRoRXdvN1LclHdpZXYyd2LcBzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX90TUk1GZtJmdshlW1ZkMkZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TO2MDNzgDM3ETNwEDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
如何获得某个消费组的消费情况。分如下几步:
第一、得到这个消费者的 group.id.
第二、计算得到分区数值,例如:Math.abs("console-consumer-46965".hashCode()) % 50 = 11。
第三、在 topic为 __consumer_offsets 的分区 中得到消费者offset的数据。命令如:
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
4、kafka消费组位移重设
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets
#确定作用域
--all-topics (为consumer group下所有topic的所有分区调整位移)
#位移重设策略
--to-earliest 把位移调整到分区当前最小位移
#确定执行方案
--execute 执行真正的位移调整