下面代碼來自kafka源碼包裡的<code>examples</code>包
建立生産者的源碼:
異步發送模式還傳遞了一個Callback回調類,當用戶端發送完一條消息,并且這條消息成功地存儲至Kafka叢集後, 服務端會調用匿名回調類的回調方法( onCompletion)。 異步發送指的是生産者發送完一條消息後,不需要關心服務端處理完了沒有 ,可以接着發送下一條消息 。 服務端在處理完每一條消息後,會自動觸發回調函數,傳回響應結果給用戶端。 如果是以同步模式發送消息,生産者用戶端應用程式不需要提供回調函數。 同步模式下,生産者發送完一條消息後,必須等待服務端傳回響應結果,然後才能發送下一條消息 。
KafkaProducer 隻用了一個 send 方法,就可以完成同步和l異步兩種模式的消息發疊,這是因為 send方法傳回的是一個 Future。 基于Future ,我們可以實作同步或異步的消息發送話義。
同步。
調用 send傳回 Future時, 需要立即調用get ,因為Future.get在沒有傳回結果時會一直 阻塞。
異步。
提供一個回調,調用 send後可以繼續發送消息而不用等待 。 當有結果運回時,會自動執行回調函數。