天天看點

Kafka Producer攔截器(Interceptor)

Kafka攔截器一共有兩種:

Producer端

Consumer端

本篇主要講述的是Kafka Producer端的攔截器,它主要用來對消息進行攔截或者修改,也可以用于Producer的Callback回調之前進行相應的預處理。

使用Kafka Producer端的攔截器非常簡單,主要是實作ProducerInterceptor接口,此接口包含4個方法:

ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

Producer在将消息序列化和配置設定分區之前會調用攔截器的這個方法來對消息進行相應的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等資訊,如果要修改,也需確定對其有準确的判斷,否則會與預想的效果出現偏差。比如修改key不僅會影響分區的計算,同樣也會影響Broker端日志壓縮(Log Compaction)的功能。

void onAcknowledgement(RecordMetadata metadata, Exception exception)

在消息被應答(Acknowledgement)之前或者消息發送失敗時調用,優先于使用者設定的Callback之前執行。這個方法運作在Producer的IO線程中,是以這個方法裡實作的代碼邏輯越簡單越好,否則會影響消息的發送速率。

void close()

關閉目前的攔截器,此方法主要用于執行一些資源的清理工作。

configure(Map<String, ?> configs)

用來初始化此類的方法,這個是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下隻需要關注并實作onSend或onAcknowledgement方法即可。

示例

onSend統計發送的消息個數

Kafka Producer攔截器(Interceptor)

通過onAcknowledgement統計發送消息的成功次數

Kafka Producer攔截器(Interceptor)

繼續閱讀