文章目錄
-
-
-
- 一、前言:
- 二、如果兩種模式都用的話會報錯
- 三、從指定的offset進行消費
- 四、怎麼獲得最後一條消息的offset
- 五、以時間戳查詢消息
- 六、消費者手動送出
- 七、同步和異步混合送出:
- 八、送出特定的偏移量:
- 九、監聽再均衡:
- 十、用java快速測試kafka連通性Demo:
-
-
一、前言:
使用Apache Kafka消費者組時,有一個為消費者配置設定對應分區partition的過程,我們可以使用“自動”subscribe和“手動”assign的方式。
- KafkaConsumer.subscribe():為consumer自動配置設定partition,有内部算法保證topic-partition以最優的方式均勻配置設定給同group下的不同consumer。
- KafkaConsumer.assign():為consumer手動、顯示的指定需要消費的topic-partitions,不受group.id限制,相當與指定的group無效(this method does not use the consumer’s group management)。
注意:consumer.assign()是不會被消費者的組管理功能管理的,他相對于是一個臨時的,不會改變目前group.id的offset,比如:在使用
consumer.subscribe(Arrays.asList(topicName));
時offset為20,如果再通過assign方式已經擷取了消息後,在下次通過
consumer.subscribe(Arrays.asList(topicName));
來擷取消息時offset還是20,還是會擷取20以後的消息。
二、如果兩種模式都用的話會報錯
報錯資訊:
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
at org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:111) ~[kafka-clients-0.11.0.2.jar!/:na]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:118) ~[kafka-clients-0.11.0.2.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:873) ~[kafka-clients-0.11.0.2.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:901) ~[kafka-clients-0.11.0.2.jar!/:na]
at com.guoxin.sydjtxry.SydjtxryConsumer.doWork(SydjtxryConsumer.java:77) ~[classes!/:1.0-SNAPSHOT]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) ~[kafka_2.11-0.11.0.2.jar!/:na]
2020-04-12 09:46:38.705 INFO 43884 --- [ConsumerExample] com.guoxin.sydjtxry.SydjtxryConsumer : [KafkaConsumerExample]: Stopped
錯誤代碼:
consumer.subscribe(Collections.singletonList(this.topic));
TopicPartition partition = new TopicPartition(this.topic, 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, seekOffset);
三、從指定的offset進行消費
場景:kafka_2.11-0.11.0.2版本中建立的topic隻有一個分區。如果是多分區的話可以參考下這篇文章https://www.cnblogs.com/dongxishaonian/p/12038500.html
代碼:
package com.guoxin.sydjtxry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* Created by HuiQ on 2019-10-30.
*/
@Component
public class KafkaConsumerTask implements CommandLineRunner {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTask.class);
@Override
public void run(String... args) {
// 全量消費
SydjtxryConsumer.consumer();
}
}
package com.guoxin.sydjtxry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public final class KafkaProperties
{
private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
private static Properties serverProps = new Properties();
private static Properties producerProps = new Properties();
private static Properties consumerProps = new Properties();
private static Properties clientProps = new Properties();
private static KafkaProperties instance = null;
public synchronized static KafkaProperties getInstance()
{
if (null == instance)
{
instance = new KafkaProperties();
}
return instance;
}
/**
* 擷取參數值
* @param key properites的key值
* @param defValue 預設值
* @return
*/
public String getValues(String key, String defValue)
{
String rtValue = null;
if (null == key)
{
LOG.error("key is null");
}
else
{
rtValue = getPropertiesValue(key);
}
if (null == rtValue)
{
LOG.warn("KafkaProperties.getValues return null, key is " + key);
rtValue = defValue;
}
LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
return rtValue;
}
/**
* 根據key值擷取server.properties的值
* @param key
* @return
*/
private String getPropertiesValue(String key)
{
String rtValue = serverProps.getProperty(key);
// server.properties中沒有,則再向producer.properties中擷取
if (null == rtValue)
{
rtValue = producerProps.getProperty(key);
}
// producer中沒有,則再向consumer.properties中擷取
if (null == rtValue)
{
rtValue = consumerProps.getProperty(key);
}
// consumer沒有,則再向client.properties中擷取
if (null == rtValue)
{
rtValue = clientProps.getProperty(key);
}
return rtValue;
}
}
package com.guoxin.sydjtxry;
import kafka.utils.ShutdownableThread;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
public class SydjtxryConsumer extends ShutdownableThread
{
private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);
private final KafkaConsumer<String, String> consumer;
private final String topic;
private Long rowcount = 0L;
// 一次請求的最大等待時間
private final int waitTime = 10000;
// consumer從指定的offset處理
private Long seekOffset = 1936170L;
// Broker連接配接位址
private final String bootstrapServers = "bootstrap.servers";
/**
* NewConsumer構造函數
* @param topic 訂閱的Topic名稱
*/
public SydjtxryConsumer(String topic)
{
super("KafkaConsumerExample", false);
Properties props = new Properties();
KafkaProperties kafkaProc = KafkaProperties.getInstance();
// Broker連接配接位址
props.put(bootstrapServers,
kafkaProc.getValues(bootstrapServers, "192.110.110.33:9092"));
props.put("enable.auto.commit", "true"); // 自動送出
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
this.topic = topic;
}
/**
* 訂閱Topic的消息處理函數
*/
public void doWork()
{
// 訂閱
TopicPartition partition = new TopicPartition(this.topic, 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, seekOffset);
compareOffset = seekOffset;
// 消息消費請求
ConsumerRecords<String, String> records = consumer.poll(waitTime);
if (records.isEmpty()) {
System.out.println("消費者沒有消費到資料------->");
} else {
// 消息處理
for (ConsumerRecord<String, String> record : records) {
try {
JSONObject jsondata = JSONObject.fromObject(record.value().toString());
String table = jsondata.getString("table"); // 庫名.表名
if (table.equals("BJSX_OGG.GR_XX")) {
rowcount++;
// 業務邏輯
}
LOG.info("資料偏移量為-->" + record.offset());
} catch (Exception e) {
e.printStackTrace();
LOG.warn("偏移量為" + record.offset() + "的資料處理有問題,請排查-->" + record.value().toString());
}
seekOffset = record.offset();
if (seekOffset % 10000 == 0) {
LOG.info("offset-->" + seekOffset);
}
}
}
}
public static void consumer()
{
SydjtxryConsumer consumerThread = new SydjtxryConsumer("heheda");
consumerThread.start();
}
}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
遇到的問題:當消費到該topic最後一條資料後,以後的消費會循環消費該資料。改進:當消費完最後一條資料,以後的訂閱模式都由assign改為subscribe。
package com.guoxin.sydjtxry;
import kafka.utils.ShutdownableThread;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class SydjtxryConsumer extends ShutdownableThread
{
private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);
private final KafkaConsumer<String, String> consumer;
private final String topic;
private Long rowcount = 0L;
// 一次請求的最大等待時間
private final int waitTime = 10000;
// consumer從指定的offset處理
private Long seekOffset = 1936170L;
private Long compareOffset = 0L;
private boolean flag = false;
// Broker連接配接位址
private final String bootstrapServers = "bootstrap.servers";
// Group id
private final String groupId = "group.id";
/**
* NewConsumer構造函數
* @param topic 訂閱的Topic名稱
*/
public SydjtxryConsumer(String topic)
{
super("KafkaConsumerExample", false);
Properties props = new Properties();
KafkaProperties kafkaProc = KafkaProperties.getInstance();
// Broker連接配接位址
props.put(bootstrapServers,
kafkaProc.getValues(bootstrapServers, "192.110.110.33:9092"));
props.put("enable.auto.commit", "true"); // 自動送出
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
// Group id
props.put(groupId, UUID.randomUUID().toString());
consumer = new KafkaConsumer<String, String>(props);
this.topic = topic;
}
/**
* 訂閱Topic的消息處理函數
*/
public void doWork()
{
// 訂閱
if (compareOffset.equals(seekOffset) && flag == false) {
// 暫停kafka的消費 暫停分區的配置設定
consumer.unsubscribe(); // 此處不取消訂閱暫停太久會出現訂閱逾時的錯誤
consumer.pause(consumer.assignment());
consumer.subscribe(Collections.singletonList(this.topic));
flag = true;
} else if (flag == true) {
consumer.subscribe(Collections.singletonList(this.topic));
} else {
TopicPartition partition = new TopicPartition(this.topic, 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, seekOffset);
compareOffset = seekOffset;
}
// 消息消費請求
ConsumerRecords<String, String> records = consumer.poll(waitTime);
if (compareOffset.equals(seekOffset + records.count() - 1) && flag == false) {
System.out.println("指定offset消費已結束,此條為末尾的重複消費資料,跳過業務處理,此後由assign改為subscribe訂閱模式-->");
} else {
if (records.isEmpty()) {
System.out.println("消費者沒有消費到資料------->");
} else {
// 消息處理
for (ConsumerRecord<String, String> record : records) {
try {
JSONObject jsondata = JSONObject.fromObject(record.value().toString());
String table = jsondata.getString("table"); // 庫名.表名
if (table.equals("BJSX_OGG.GR_XX")) {
// 業務邏輯
}
LOG.info("資料偏移量為-->" + record.offset());
} catch (Exception e) {
e.printStackTrace();
LOG.warn("偏移量為" + record.offset() + "的資料處理有問題,請排查-->" + record.value().toString());
}
seekOffset = record.offset();
if (seekOffset % 10000 == 0) {
LOG.info("offset-->" + seekOffset);
}
}
}
}
}
public static void consumer()
{
SydjtxryConsumer consumerThread = new SydjtxryConsumer("heheda");
consumerThread.start();
}
}
消費速度控制:
提供
pause(Collection<TopicPartition> partitions)
和
resume(Collection<TopicPartition> partitions)
方法,分别用來暫停某些分區在拉取操作時傳回資料給用戶端和恢複某些分區向用戶端傳回資料操作。通過這兩個方法可以對消費速度加以控制,結合業務使用。
四、怎麼獲得最後一條消息的offset
我覺得最後這種方法擷取到的值減1才是最後一條消息的offset:
package com.guoxin.sydjtxry;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ShutdownableThread;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SydjtxryConsumer extends ShutdownableThread
{
private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);
private final KafkaConsumer<String, String> consumer;
private final SimpleConsumer simpleConsumer;
private final String topic;
private Long rowcount = 0L;
// 一次請求的最大等待時間
private final int waitTime = 10000;
final int TIMEOUT = 100000;
final int BUFFERSIZE = 64 * 1024;
// Broker連接配接位址
private final String bootstrapServers = "bootstrap.servers";
private final String groupId = UUID.randomUUID().toString();
/**
* NewConsumer構造函數
* @param topic 訂閱的Topic名稱
*/
public SydjtxryConsumer(String topic)
{
super("KafkaConsumerExample", false);
Properties props = new Properties();
KafkaProperties kafkaProc = KafkaProperties.getInstance();
// Broker連接配接位址
props.put(bootstrapServers,
kafkaProc.getValues(bootstrapServers, "192.110.110.33:9092"));
props.put("enable.auto.commit", "true"); // 自動送出
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("group.id", groupId);
consumer = new KafkaConsumer<String, String>(props);
this.topic = topic;
// 擷取最後一條消息的offset
simpleConsumer = new SimpleConsumer("192.110.110.33", 9092, TIMEOUT, BUFFERSIZE, groupid);
long lastOffset = getLastOffset(simpleConsumer, this.topic, 0, groupid);
System.out.println("最後一條消息的offset是--->" + lastOffset);
}
/**
* 訂閱Topic的消息處理函數
*/
public void doWork()
{
// 訂閱
consumer.subscribe(Collections.singletonList(this.topic));
// 消息消費請求
ConsumerRecords<String, String> records = consumer.poll(waitTime);
if (records.isEmpty()) {
System.out.println("消費者沒有消費到資料------->");
} else {
// 消息處理
for (ConsumerRecord<String, String> record : records) {
try {
JSONObject jsondata = JSONObject.fromObject(record.value().toString());
String table = jsondata.getString("table"); // 庫名.表名
if (table.equals("BJSX_OGG.GR_XX")) {
rowcount++;
// 業務邏輯
}
LOG.info("資料偏移量為-->" + record.offset());
} catch (Exception e) {
e.printStackTrace();
LOG.warn("偏移量為" + record.offset() + "的資料處理有問題,請排查-->" + record.value().toString());
}
}
}
}
public static void consumer()
{
SydjtxryConsumer consumerThread = new SydjtxryConsumer("heheda");
consumerThread.start();
}
// 擷取最後一條消息的offset方法
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String groupId) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),groupId);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
}
五、以時間戳查詢消息
Kafka 在0.10.1.1 版本增加了時間戳索引檔案,是以我們除了直接根據偏移量索引檔案查詢消息之外,還可以根據時間戳來通路消息。consumer-API 提供了一個offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,該方法入參為一個Map 對象,Key 為待查詢的分區,Value 為待查詢的時間戳,該方法會傳回時間戳大于等于待查詢時間的第一條消息對應的偏移量和時間戳。需要注意的是,若待查詢的分區不存在,則該方法會被一直阻塞。
假設我們希望從某個時間段開始消費,那們就可以用offsetsForTimes()方法定位到離這個時間最近的第一條消息的偏移量,在查到偏移量之後調用seek(TopicPartition partition, long offset)方法将消費偏移量重置到所查詢的偏移量位置,然後調用poll()方法長輪詢拉取消息。例如,我們希望從主題“stock-quotation”第0 分區距離目前時間相差12 小時之前的位置開始拉取消息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 顯示設定偏移量自動送出
props.put("auto.commit.interval.ms", 1000);// 設定偏移量送出時間間隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
try {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();
// 構造待查詢的分區
TopicPartition partition = new TopicPartition("stock-quotation", 0);
// 設定查詢12 小時之前消息的偏移量
timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));
// 會傳回時間大于等于查找時間的第一個偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
// 這裡依然用for 輪詢,當然由于本例是查詢的一個分區,是以也可以用if 處理
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
// 若查詢時間大于時間戳索引檔案中最大記錄索引時間,
// 此時value 為空,即待查詢時間點之後沒有新消息生成
offsetTimestamp = entry.getValue();
if (null != offsetTimestamp) {
// 重置消費起始偏移量
consumer.seek(partition, entry.getValue().offset());
}
}
while (true) {
// 等待拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records){
// 簡單列印出消息内容
System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
六、消費者手動送出
場景:offset下标自動送出其實在很多場景都不适用,因為自動送出是在kafka拉取到資料之後就直接送出,這樣很容易丢失資料,尤其是在需要事物控制的時候。
很多情況下我們需要從kafka成功拉取資料之後,對資料進行相應的處理之後再進行送出。如拉取資料之後進行寫入mysql這種 , 是以這時我們就需要進行手動送出kafka的offset下标。
實施測試:
将 enable.auto.commit 改成 false 進行手動送出,并且設定每次拉取最大10條
props.put("enable.auto.commit", "false");
props.put("max.poll.records", 10);
将送出方式改成false之後,需要手動送出隻需加上這段代碼
- 同步送出:
consumer.commitSync();
- 異步送出:
consumer.commitAsync()
注:在成功送出或碰到無法恢複的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。
它之是以不進行重試,是因為在它收到伺服器響應的時候,可能有一個更大的偏移量已經送出成功。假設我們發出一個請求用于送出偏移量 2000,這個時候發生了短暫的通信問題,伺服器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,并成功送出了偏移量 3000。如果 commitAsync() 重新嘗試送出偏移量 2000,它有可能在偏移量 3000 之後送出成功。這個時候如果發生再均衡,就會出現重複消息。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", map, e);
}
});
}
說明:
手動送出的offset不能再次消費,未送出的可以再次進行消費。
這種做法一般也可以滿足大部分需求。例如從kafka擷取資料入庫,如果一批資料入庫成功,就送出offset,否則不送出,然後再次拉取。但是這種做法并不能最大的保證資料的完整性。
比如在運作的時候,程式挂了之類的。是以還有一種方法是手動的指定offset下标進行擷取資料,直到kafka的資料處理成功之後,将offset記錄下來,比如寫在資料庫中。
七、同步和異步混合送出:
一般情況下,針對偶爾出現的送出失敗,不進行重試不會有太大問題,因為如果送出失敗是因為臨時問題導緻的,那麼後續的送出總會有成功的。
但如果這是發生在關閉消費者或再均衡前的最後一次送出,就要確定能夠送出成功。是以在這種情況下,我們應該考慮使用混合送出的方法:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
- 在程式正常運作過程中,我們使用 commitAsync 方法來進行送出,這樣的運作速度更快,而且就算目前送出失敗,下次送出成功也可以。
- 如果直接關閉消費者,就沒有所謂的“下一次送出”了,因為不會再調用poll()方法。使用 commitSync() 方法會一直重試,直到送出成功或發生無法恢複的錯誤。
八、送出特定的偏移量:
如果 poll() 方法傳回一大批資料,為了避免因再均衡引起的重複處理整批消息,想要在批次中間送出偏移量該怎麼辦?這種情況無法通過調用 commitSync() 或 commitAsync() 來實作,因為它們隻會送出最後一個偏移量,而此時該批次裡的消息還沒有處理完。
這時候需要使用一下的兩個方法:
/**
* Commit the specified offsets for the specified list of topics and partitions.
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
/**
* Commit the specified offsets for the specified list of topics and partitions to Kafka.
*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
消費者 API 允許在調用 commitSync() 和 commitAsync() 方法時傳進去希望送出的分區和偏移量的 map。
假設處理了半個批次的消息,最後一個來自主題“customers”分區 3 的消息的偏移量是 5000,你可以調用 commitSync() 方法來送出它。不過,因為消費者可能不隻讀取一個分區,你需要跟蹤所有分區的偏移量,是以在這個層面上控制偏移量的送出會讓代碼變複雜。
代碼如下:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets =new HashMap<>();
int count = 0;
。。。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets,null);
count++;
}
}
這裡調用的是 commitAsync(),不過調用commitSync()也是完全可以的。在送出特定偏移量時,仍然要處理可能發生的錯誤。
九、監聽再均衡:
如果 Kafka 觸發了再均衡,我們需要在消費者失去對一個分區的所有權之前送出最後一個已處理記錄的偏移量。如果消費者準備了一個緩沖區用于處理偶發的事件,那麼在失去分區所有權之前,需要處理在緩沖區累積下來的記錄。可能還需要關閉檔案句柄、資料庫連接配接等。
在為消費者配置設定新分區或移除舊分區時,可以通過消費者 API 執行一些應用程式代碼,在調用 subscribe() 方法時傳進去一個 ConsumerRebalanceListener 執行個體就可以了。 ConsumerRebalanceListener 有兩個需要實作的方法。
- public void onPartitionsRevoked(Collection partitions) 方法會在再均衡開始之前和消費者停止讀取消息之後被調用。如果在這裡送出偏移量,下一個接管分區的消費者就知道該從哪裡開始讀取了。
- public void onPartitionsAssigned(Collection partitions) 方法會在重新配置設定分區之後和消費者開始讀取消息之前被調用。
下面的例子将示範如何在失去分區所有權之前通過 onPartitionsRevoked() 方法來送出偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// 忽略異常,正在關閉消費者
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
如果發生再均衡,我們要在即将失去分區所有權時送出偏移量。要注意,送出的是最近處理過的偏移量,而不是批次中還在處理的最後一個偏移量。因為分區有可能在我們還在處理消息的時候被撤回。我們要送出所有分區的偏移量,而不隻是那些即将失去所有權的分區的偏移量——因為送出的偏移量是已經處理過的,是以不會有什麼問題。調用 commitSync() 方法,確定在再均衡發生之前送出偏移量。
十、用java快速測試kafka連通性Demo:
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
/**
* @author: huiq
* @createTime: 2021/8/26 19:37
* @description:
*/
public class SydjtxryConsumer extends ShutdownableThread
{
private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);
private final KafkaConsumer<String, String> consumer;
private final String topic;
// 一次請求的最大等待時間
private final int waitTime = 10000;
// Broker連接配接位址
private final String bootstrapServers = "bootstrap.servers";
/**
* NewConsumer構造函數
* @param topic 訂閱的Topic名稱
*/
public SydjtxryConsumer(String topic)
{
super("KafkaConsumerExample", false);
Properties props = new Properties();
KafkaProperties kafkaProc = KafkaProperties.getInstance();
// Broker連接配接位址
props.put(bootstrapServers,
kafkaProc.getValues(bootstrapServers, "node01:6667,node02:6667,node03:6667"));
props.put("enable.auto.commit", "true"); // 自動送出
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
this.topic = topic;
}
/**
* 訂閱Topic的消息處理函數
*/
@Override
public void doWork()
{
// 訂閱
TopicPartition partition = new TopicPartition(this.topic, 0);
consumer.assign(Arrays.asList(partition));
// 消息消費請求
ConsumerRecords<String, String> records = consumer.poll(waitTime);
if (records.isEmpty()) {
System.out.println("消費者沒有消費到資料------->");
} else {
// 消息處理
for (ConsumerRecord<String, String> record : records) {
try {
System.out.println(record.value().toString());
LOG.info("資料偏移量為-->" + record.offset());
} catch (Exception e) {
e.printStackTrace();
LOG.warn("偏移量為" + record.offset() + "的資料處理有問題,請排查-->" + record.value().toString());
}
}
}
}
public static void consumer()
{
SydjtxryConsumer consumerThread = new SydjtxryConsumer("huiq_test.dbus.result");
consumerThread.start();
}
public static void main(String[] args) {
SydjtxryConsumer.consumer();
}
}
或者:
props.put("group.id", "heheda");
consumer = new KafkaConsumer<String, String>(props);
this.topic = topic;
}
/**
* 訂閱Topic的消息處理函數
*/
@Override
public void doWork()
{
// 訂閱
consumer.subscribe(Collections.singletonList(this.topic));
<!-- 這個kafka-clients依賴不确定沒有可不可以,但下面那個kafka_2.1依賴必須有 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>