KafkaProducer是Kafka中Producer的一種實作,其主要功能就是發送消息給Kafka中broker。其send()方法如下:
再看兩個參數的send()方法,代碼如下:
其大體邏輯如下:
1、首先調用waitOnMetadata()方法確定該主題topic對應的中繼資料metadata是可用的;
2、計算剩餘等待時間remainingWaitMs;
3、根據record中topic、key,利用valueSerializer得到序列化key:serializedKey;
4、根據record中topic、value,利用valueSerializer得到序列化value:serializedValue;
5、調用partition()方法獲得分區号partition;
6、計算序列化後的key、value及其offset、size所占大小serializedSize;
7、調用ensureValidRecordSize()方法確定記錄大小serializedSize是有效的;
8、根據record中的topic和partition構造TopicPartition執行個體tp;
9、調用accumulator的append()方法添加記錄,獲得記錄添加結果RecordAppendResult類型的result;
10、根據結果result的batchIsFull或newBatchCreated确定是否執行sender的wakeup();
11、傳回result中的future。