天天看點

【硬剛Kafka】KAFKA基礎(十):Kafka API (2)Kafka producer 攔截器(interceptor)

本文是對《【硬剛大資料之學習路線篇】從零到大資料專家的學習指南(全面更新版)》的Kafka部分補充。

  Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于實作 clients 端的定制化控制邏輯。

  對于 producer 而言,interceptor 使得使用者在消息發送前以及 producer 回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer 允許使用者指定多個 interceptor

按序作用于同一條消息進而形成一個攔截鍊(interceptor chain)。Intercetpor 的實作接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  (1)configure(configs)

擷取配置資訊和初始化資料時調用。

  (2)onSend(ProducerRecord):

  該方法封裝進 KafkaProducer.send 方法中,即它運作在使用者主線程中。Producer 確定在消息被序列化以及計算分區前調用該方法。使用者可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區,否則會影響目标分區的計算

  (3)onAcknowledgement(RecordMetadata, Exception):

  該方法會在消息從RecordAccumulator成功發送到Kafka Broker之後,或者在發送過程中失敗時調用。并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運作在producer的IO線程中,是以不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率。

  (4)close:

關閉 interceptor,主要用于執行一些資源清理工作

如前所述,interceptor 可能被運作在多個線程中,是以在具體實作時使用者需要自行確定線程安全。另外倘若指定了多個 interceptor,則 producer 将按照指定順序調用它們,并僅僅是捕獲每個 interceptor 可能抛出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特别留意。

1)需求:

實作一個簡單的雙 interceptor 組成的攔截鍊。第一個 interceptor 會在消息發送前将時間戳資訊加到消息 value 的最前部;第二個 interceptor 會在消息發送後更新成功發送消息數或

失敗發送消息數。

Kafka攔截器

【硬剛Kafka】KAFKA基礎(十):Kafka API (2)Kafka producer 攔截器(interceptor)

2)案例實操

(1)增加時間戳攔截器

(2)統計發送消息成功和發送失敗消息數,并在 producer 關閉時列印這兩個計數器

(3)producer 主程式

3)測試

(1)在 kafka 上啟動消費者,然後運作用戶端 java 程式。

(2)觀察 java 平台控制台輸出資料如下:

繼續閱讀