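Introduction
In Kafka, the producer is responsible for sending messages to the Kafka cluster and is one of the key components for moving data efficiently. This article analyzes the producer's implementation at the source-code level to help readers better understand how it works and how it performs.
Notes:
This analysis is based on Kafka 3.x.
In Kafka 0.10.2, the client is implemented in Java and the server is implemented in Scala.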
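What you will learn
- How does the Kafka producer send and distribute messages?
- Which details and techniques in the producer's implementation deserve our attention?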
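Glossary
- Producer Metadata: manages the metadata the producer needs, such as the topics and partitions in the cluster and the broker nodes acting as partition leaders.
- Partitioner: computes the partition for a given record.
- Serializers: the record key and value serializers. A serializer converts an object into a byte array.
- Producer interceptors: interceptors that may modify records.
- Record Accumulator: accumulates records and groups them into batches by topic partition.
- Transaction Manager: manages transactions and maintains the state needed to ensure idempotent production.
- Sender: the background thread that sends data to the Kafka cluster.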
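To make the serializer and partitioner roles concrete, here is a minimal sketch. It is not Kafka's implementation: the class HashPartitionerSketch and its methods are hypothetical names, and Arrays.hashCode is a stand-in hash chosen only for this illustration. It shows the idea that a key is first serialized to bytes and that the same bytes always map to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of "serialize, then pick a partition"; not Kafka code.
class HashPartitionerSketch {

    // Serializer role: turn an object (here a String key) into a byte array.
    static byte[] serialize(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Partitioner role: map the serialized key to a partition in [0, numPartitions).
    // Arrays.hashCode is an assumption made for this sketch, not the hash Kafka uses.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

Because the mapping depends only on the key bytes and the partition count, records with the same key land in the same partition as long as the partition count does not change.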
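Architecture diagram
From the diagram above, we can see that:
- The Kafka producer follows the producer-consumer pattern, and sending a message has two stages. In the first stage, the message to be sent is buffered in the RecordAccumulator. In the second stage, messages are taken out of the RecordAccumulator and sent over the network.
- The Kafka producer consists of three parts: the KafkaProducer main thread, the RecordAccumulator, and the Sender thread.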
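The two-stage design described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: TwoStagePipeline is a hypothetical class, a BlockingQueue plays the role of the RecordAccumulator, and an in-memory list stands in for the network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer's two-stage pipeline; not Kafka code.
class TwoStagePipeline {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(); // "accumulator"
    private final List<String> sent = new ArrayList<>();                      // "network"
    private volatile boolean running = true;
    private final Thread sender;

    TwoStagePipeline() {
        // Stage 2: a daemon thread drains the buffer, like the Sender thread.
        sender = new Thread(() -> {
            try {
                // Keep going until close() was called AND the buffer is empty.
                while (running || !buffer.isEmpty()) {
                    String msg = buffer.poll(10, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        synchronized (sent) {
                            sent.add(msg);
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-sender");
        sender.setDaemon(true);
        sender.start();
    }

    // Stage 1: the caller's thread only enqueues and returns immediately.
    void send(String msg) {
        buffer.add(msg);
    }

    // Stops the sender after it has drained everything already buffered,
    // then returns a copy of what was "sent".
    List<String> close() {
        running = false;
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }
}
```

The caller never waits for the network; it only pays the cost of an enqueue, which is the property that lets the real producer batch records behind the scenes.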
Starting the analysis
A producer example
public class Producer {
private final KafkaProducer<Integer, String> producer;
// Topic that sendAsync() publishes to
private final String topic;
public Producer(final String topic,
final String transactionalId,
final boolean enableIdempotency,
final int transactionTimeoutMs
) {
this.topic = topic;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
if (transactionTimeoutMs > 0) {
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
}
if (transactionalId != null) {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
// Key point: analysis 1
producer = new KafkaProducer<>(props);
}
private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
// Key point: analysis 2
this.producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr),
new DemoCallBack(currentTimeMs, messageKey, messageStr));
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
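Notes:
- In the example above, the two key points (especially sending the message) are the KafkaProducer constructor and the producer.send call.
Introduction to KafkaProducer
Components
Notes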
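- producerConfig: stores the Kafka producer's configuration, including the address of the Kafka cluster to connect to, the serializers, and the acknowledgment settings.
- metrics: stores the producer's metric data, such as the number of messages sent, the number sent successfully, and the number that failed.
- sender: the component responsible for sending messages to the Kafka cluster. It converts messages into the format Kafka understands and sends them to the target partition.
- recordAccumulator: buffers messages waiting to be sent. The producer appends messages to the recordAccumulator, and the sender takes messages from it and sends them to the Kafka cluster.
- metadata: stores metadata about the topics and partitions in the Kafka cluster, including each partition's leader and its ISR (in-sync replicas) list.
- interceptors: the list of message interceptors. A producer can configure several interceptors that process messages before and after they are sent, for example to add a timestamp or write a log entry.
- bufferMemory: the total amount of memory available for buffering messages waiting to be sent. If the messages pending in the recordAccumulator exceed bufferMemory, the producer waits for the sender to send messages and free up space.
- maxBlockMs: when the recordAccumulator is full, a send call waits for the sender to free up space. If no space becomes available within the allowed time, the producer throws an exception. maxBlockMs sets that maximum wait time.
- requestTimeoutMs: the maximum time the producer waits for a response from the Kafka broker. If no response arrives within this time, the producer retries the send or fails it with an exception.
- transactionManager: required for a producer that supports transactions. It manages the transaction state, the messages sent within the transaction, and related information.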
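The interaction between bufferMemory and maxBlockMs can be illustrated with a small sketch. This is a simplified model under stated assumptions, not Kafka's BufferPool: the class BoundedMemorySketch is hypothetical, and a timeout is reported with a plain IllegalStateException for brevity.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified memory budget; Kafka's real BufferPool is more involved.
class BoundedMemorySketch {
    private final long totalBytes;   // plays the role of bufferMemory
    private long availableBytes;

    BoundedMemorySketch(long totalBytes) {
        this.totalBytes = totalBytes;
        this.availableBytes = totalBytes;
    }

    // Reserves `size` bytes, waiting at most maxBlockMs for memory to be released.
    synchronized void allocate(int size, long maxBlockMs) {
        if (size > totalBytes) {
            throw new IllegalArgumentException("request exceeds total buffer memory");
        }
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxBlockMs);
        try {
            while (availableBytes < size) {
                long remainingNs = deadlineNs - System.nanoTime();
                if (remainingNs <= 0) {
                    // Assumption of this sketch: signal the timeout with an unchecked exception.
                    throw new IllegalStateException("no memory available within " + maxBlockMs + " ms");
                }
                // Releases the monitor while waiting; deallocate() wakes us up.
                TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for memory", e);
        }
        availableBytes -= size;
    }

    // Returns memory to the pool (what the sender side does after a batch completes).
    synchronized void deallocate(int size) {
        availableBytes += size;
        notifyAll();
    }

    synchronized long available() {
        return availableBytes;
    }
}
```

The point of the design is back-pressure: a fast caller is slowed down, for a bounded time, rather than being allowed to grow the buffer without limit.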
Constructor
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
try {
// Producer configuration
this.producerConfig = config;
this.time = time;
// Transactional id
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// Client id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
// Configure the partitioner
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
warnIfPartitionerDeprecated();
this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
// Backoff time between retries after a failure
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// Serializers
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// Configure the producer interceptors
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
// Listeners for cluster resource changes
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keySerializer,
this.valueSerializer, interceptorList, reporters);
// Maximum request size; the default is 1 MB, and it can be raised (for example to 10 MB) in production
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// Total size of the buffer for messages waiting to be sent
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// Compression type
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// Maximum blocking time
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// Delivery timeout
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
// Transaction manager
this.transactionManager = configureTransactionState(config, logContext);
// If a custom partitioner is used, the work needed for adaptive partitioning can be skipped.
boolean enableAdaptivePartitioning = partitioner == null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
// Partitioner configuration
RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
);
// Batch size from the producer configuration. It can be set to 0 to explicitly disable batching, which in practice means a batch size of 1.
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
// Record accumulator
this.accumulator = new RecordAccumulator(logContext,
batchSize,
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
partitionerConfig,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
// Parse the broker addresses
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
// Producer-side metadata
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
// Sensor that records failed sends
this.errors = this.metrics.sensor("errors");
// Create the sender
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
// Register the application info as a JMX MBean
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
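Notes:
- Configures producer metrics
- Sets the partitioner
- Configures the retry backoff time for failed sends
- Sets up key and value serialization
- Configures the producer interceptors
- Builds the partitioner configuration
- Initializes the accumulator
- Parses the broker addresses
- Sets up the producer-side metadata
- Creates the sender and starts its I/O thread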
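How KafkaProducer sends messages
Notes
- This section covers the send path in three parts: buffering the message in the RecordAccumulator, how the RecordAccumulator stores messages, and how the Sender thread takes messages out and sends them over the network.
Buffering messages in the record accumulator
Code
The send method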
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
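As a rough illustration of what happens in interceptors.onSend before doSend runs, the sketch below chains interceptors so that each one receives the output of the previous one. It is a simplified model and not Kafka's ProducerInterceptors class: InterceptorChainSketch is a hypothetical name, records are plain strings, and the choice to skip an interceptor that throws is an assumption of this sketch.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an interceptor chain; not Kafka's ProducerInterceptors.
class InterceptorChainSketch {

    // Passes the record through every interceptor in order and returns the result.
    static String onSend(String record, List<UnaryOperator<String>> interceptors) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Assumption of this sketch: a failing interceptor is skipped,
                // and the record continues unchanged to the next one.
            }
        }
        return current;
    }
}
```

Whatever comes out of the chain is the record that gets serialized, partitioned, and appended, so an interceptor can change what is actually sent.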
The doSend method
private Future < RecordMetadata > doSend(ProducerRecord < K, V > record, Callback callback) {
AppendCallbacks < K, V > appendCallbacks = new AppendCallbacks < K, V > (callback, this.interceptors, record);
try {
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
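// Serialize the key and value
....
// Compute the partition. Note that after this call it may be RecordMetadata.UNKNOWN_PARTITION,
// which means the RecordAccumulator will choose a partition with its built-in logic (which may take broker load, the amount of data produced to each partition, and so on into account).
int partition = partition(record, serializedKey, serializedValue, cluster);
// Append the record to the accumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
// Add the partition to the transaction (if one is in progress) only after the accumulator has appended successfully. We cannot do this earlier because the partition may be unknown,
// or the initially selected partition may change when the batch is closed (as indicated by "abortForNewBatch"). Note that the Sender will refuse to dequeue batches from the accumulator until they have been added to the transaction.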
if (transactionManager != null) {
transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
}
// If the batch is full or a new batch was just created
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
// Wake up the sender thread
this.sender.wakeup();
}
return result.future;
} catch (ApiException e) {
// Handle the exception and record the error. API exceptions are returned in the Future; all other exceptions are thrown directly
if (callback != null) {
TopicPartition tp = appendCallbacks.topicPartition();
callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1), e);
}
this.errors.record();
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
if (transactionManager != null) {
transactionManager.maybeTransitionToErrorState(e);
}
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
throw e;
} catch (Exception e) {
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
throw e;
}
}
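Notes
RecordAccumulator
RecordAccumulator is one of the core components of Kafka's message delivery path. Its main job is to pack multiple records into batches (ProducerBatch objects) and to queue those batches per partition until the Sender picks them up.
Member variables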
/**
* Number of flush operations currently in progress
*/
private final AtomicInteger flushesInProgress;
/**
* Number of threads currently appending records
*/
private final AtomicInteger appendsInProgress;
/**
* Batch size
*/
private final int batchSize;
/**
* Compression type (for example LZ4 or gzip) applied to batches to reduce bandwidth usage and network latency.
*/
private final CompressionType compression;
/**
* How long a batch may linger before it is sent
*/
private final int lingerMs;
/**
* Backoff time between retries
*/
private final long retryBackoffMs;
private final int deliveryTimeoutMs;
/**
* Buffer pool
*/
private final BufferPool free;
private final Time time;
private final ApiVersions apiVersions;
// Per-topic information cache
private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
// Per-node latency statistics
private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>();
// Batches that have not yet completed
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private final Map<String, Integer> nodesDrainIndex;
private final TransactionManager transactionManager;
How a record is appended
public RecordAppendResult append(String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException {
// Get or create the TopicInfo for this topic. TopicInfo tracks topic-related state such as partition information and the batches in each partition
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));
// Track the number of appending threads so that abortIncompleteBatches() does not miss any batch.
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// Loop so that we can retry if we hit a race condition with the partitioner.
while (true) {
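// (Omitted in this excerpt: effectivePartition and partitionInfo are derived from the partition argument at this point.)
// Get or create the Deque of batches for this partition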
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
// After taking the lock, check whether the partition has changed; if it has, retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
// Try to append the record to an existing batch
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
// The append succeeded
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
// There is no in-progress batch with room; try to allocate a new batch
if (abortOnNewBatch) {
return new RecordAppendResult(null, false, false, true, 0);
}
// Allocate a buffer
if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// Use the larger of batchSize (16 KB by default) and the estimated size of this record
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
// This call may block if the buffer pool space is exhausted.
buffer = free.allocate(size, maxTimeToBlock);
nowMs = time.milliseconds();
}
synchronized (dq) {
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster)) continue;
// Create a new batch backed by the allocated buffer and append the record to it
RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
if (appendResult.newBatchCreated)
buffer = null;
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
} finally {
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
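The append flow above follows a "try under the lock, allocate outside the lock, re-check under the lock" pattern. The sketch below reproduces only that pattern in a simplified form. AccumulatorSketch and its nested Batch class are hypothetical names, batches are counted in bytes only, and there is no buffer pool, partitioner, or callback handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the append flow; names are illustrative only.
class AccumulatorSketch {

    static final class Batch {
        final int capacity;
        int used;
        final List<byte[]> records = new ArrayList<>();

        Batch(int capacity) {
            this.capacity = capacity;
        }

        // Returns false when the record does not fit into this batch.
        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) {
                return false;
            }
            records.add(value);
            used += value.length;
            return true;
        }
    }

    private final int batchSize;
    private final ConcurrentMap<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Appends a record; returns true when a new batch had to be created for it.
    boolean append(int partition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value)) {
                return false;
            }
        }
        // Allocate outside the lock, since allocation may be slow
        // (in the real code this step can block on the buffer pool).
        Batch fresh = new Batch(Math.max(batchSize, value.length));
        synchronized (dq) {
            // Re-check: another thread may have added a batch with room meanwhile.
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value)) {
                return false;
            }
            fresh.tryAppend(value);
            dq.addLast(fresh);
            return true;
        }
    }

    int batchCount(int partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null) {
            return 0;
        }
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

Keeping the slow allocation outside the synchronized block is what lets other threads keep appending to the same partition while one thread waits for memory.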
tryAppend
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
// This is the key step
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(), recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
- recordsBuilder.append actually calls MemoryRecordsBuilder#appendWithOffset, shown below
MemoryRecordsBuilder#appendWithOffset
private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
try {
// Control records may only be appended to control batches
if (isControlRecord != isControlBatch) {
throw new IllegalArgumentException("Control records can only be appended to control batches");
}
// Check that the new record's offset is valid
if (lastOffset != null && offset <= lastOffset) {
throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
"(Offsets must increase monotonically).", offset, lastOffset));
}
// Check that the timestamp is valid
if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) {
throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
}
// Check that this magic version supports record headers
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) {
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
}
// Set the base timestamp
if (baseTimestamp == null) {
baseTimestamp = timestamp;
}
// Dispatch to the append method that matches the magic value (record format version)
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
} else {
appendLegacyRecord(offset, timestamp, key, value, magic);
}
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
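Note that RecordBatch defines byte CURRENT_MAGIC_VALUE = MAGIC_VALUE_V2, so with the current format the branch above calls appendDefaultRecord. The appendLegacyRecord implementation shown next is the path taken only for the older magic values V0 and V1.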
appendLegacyRecord
private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte magic) throws IOException {
// Check that the builder is still open for appends; throw an exception if it is not
ensureOpenForRecordAppend();
// If the compression type is NONE and the timestamp type is LOG_APPEND_TIME, use the log append time as the timestamp
if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME) {
timestamp = logAppendTime;
}
// Compute the size of the record
int size = LegacyRecord.recordSize(magic, key, value);
// Write the log entry header (offset and size) to the append stream
AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
// If the timestamp type is LOG_APPEND_TIME, use the log append time as the timestamp
if (timestampType == TimestampType.LOG_APPEND_TIME) {
timestamp = logAppendTime;
}
// Write the record in the legacy format and return its CRC checksum
long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
// Update the count of records and bytes written
recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
return crc;
}
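Notes
- RecordAccumulator buffers at the partition level: each partition has its own queue of batches. This avoids contention between partitions and makes sending more efficient.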
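- RecordAccumulator compresses messages batch by batch. Because a batch is held until it fills up or lingerMs elapses, more messages accumulate in each batch, which improves compression efficiency.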
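- Batches headed for the same broker node are drained together and sent in a single produce request. This reduces the number of network I/O operations and makes sending more efficient.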
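- The batch size itself is a fixed upper bound (batchSize), but how full each batch gets follows the load: when messages arrive quickly, batches fill up before they are sent; when messages arrive slowly, batches are sent after lingerMs with fewer messages. This keeps sending both efficient and stable.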
The Sender thread takes messages and sends them over the network
Notes
Recall that the KafkaProducer constructor initializes the Sender thread:
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// Create the sender
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
- The newSender method simply builds a Sender object
Structure of the Sender object
The Sender class implements the Runnable interface, so we go straight to its run method
run
public void run() {
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in Kafka producer I/O thread: ", e);
}
}
... // remaining code omitted
}
runOnce
void runOnce() {
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
// If the transaction manager is in a failed state, stop sending messages
if (transactionManager.hasFatalError()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}
// Check whether a new producerId is needed; if so, send an InitProducerId request
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
Notes:
- Here we only look at the sendProducerData and poll methods
sendProducerData
private long sendProducerData(long now) {
// Get the current cluster metadata
Cluster cluster = metadata.fetch();
// Determine which nodes have partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// If any topics have an unknown leader, add them to the metadata and request a metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics) {
this.metadata.add(topic, now);
}
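log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);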
this.metadata.requestUpdate();
}
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// Remove any nodes we are not ready to send to, and update the node latency statistics.
if (!this.client.ready(node, now)) {
this.accumulator.updateNodeLatencyStats(node.id(), now, false);
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
} else {
this.accumulator.updateNodeLatencyStats(node.id(), now, true);
}
}
// Drain the sendable batches for each ready node and add them to the in-flight batch list.
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
// To guarantee message ordering, mute every partition that has a batch being sent
if (guaranteeMessageOrder) {
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList) {
this.accumulator.mutePartition(batch.topicPartition);
}
}
}
accumulator.resetNextBatchExpiryTime();
// Remove all expired batches and mark them as failed
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
if (!expiredBatches.isEmpty()) {
log.trace("Expired {} batches in accumulator", expiredBatches.size());
}
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
transactionManager.markSequenceUnresolved(expiredBatch);
}
}
// Compute the poll timeout
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
pollTimeout = 0;
}
// Send the requests to the Kafka cluster
sendProduceRequests(batches, now);
return pollTimeout;
}
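The poll timeout calculation at the end of sendProducerData is easy to check in isolation. The sketch below restates those four statements as a pure function; PollTimeoutSketch and its parameter names are hypothetical and simply mirror the values used in the method above.

```java
// Hypothetical helper that restates the poll timeout arithmetic as a pure function.
class PollTimeoutSketch {

    static long pollTimeout(long nextReadyCheckDelayMs,
                            long notReadyTimeoutMs,
                            long nextExpiryTimeMs,
                            long nowMs,
                            boolean anyNodeReady) {
        // Wake up for whichever comes first: the next readiness check,
        // a not-ready node becoming usable, or the next batch expiry.
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        timeout = Math.min(timeout, nextExpiryTimeMs - nowMs);
        // An expiry time in the past must not produce a negative timeout.
        timeout = Math.max(timeout, 0);
        // If some node is ready to receive data, do not block in poll at all.
        return anyNodeReady ? 0 : timeout;
    }
}
```

For example, with a 50 ms readiness delay, no not-ready nodes (Long.MAX_VALUE), and a batch expiring 30 ms from now, the sender sleeps in poll for at most 30 ms.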
sendProduceRequest (sendProduceRequests calls it once per destination node)
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// Find the minimum magic version used when the record sets were created
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(tpData);
}
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
// Build a ProduceRequest from the collected batches
ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
.setTransactionalId(transactionalId)
.setTopicData(tpd));
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
// Wrap the ProduceRequest in a ClientRequest
ClientRequest clientRequest = client.newClientRequest(Integer.toString(destination), requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// Hand the request to NetworkClient to be written to the network
client.send(clientRequest, now);
}
- client.send calls NetworkClient#doSend to send the data
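Two steps in sendProduceRequest are worth isolating: finding the lowest magic value among the batches, and grouping the per-partition data under its topic. The sketch below shows both on plain data. ProduceRequestSketch and BatchRef are hypothetical types invented for this illustration; the real method works with ProducerBatch and ProduceRequestData.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, data-only model of two steps from sendProduceRequest.
class ProduceRequestSketch {

    // Minimal stand-in for a batch: where it goes and which format version it uses.
    static final class BatchRef {
        final String topic;
        final int partition;
        final byte magic;

        BatchRef(String topic, int partition, byte magic) {
            this.topic = topic;
            this.partition = partition;
            this.magic = magic;
        }
    }

    // Starts from the highest usable magic and lowers it to the oldest batch format seen.
    static byte minUsedMagic(byte maxUsableMagic, List<BatchRef> batches) {
        byte min = maxUsableMagic;
        for (BatchRef batch : batches) {
            if (batch.magic < min) {
                min = batch.magic;
            }
        }
        return min;
    }

    // Groups partitions under their topic, preserving first-seen topic order.
    static Map<String, List<Integer>> groupByTopic(List<BatchRef> batches) {
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (BatchRef batch : batches) {
            byTopic.computeIfAbsent(batch.topic, t -> new ArrayList<>()).add(batch.partition);
        }
        return byTopic;
    }
}
```

Grouping by topic keeps the request compact: the topic name appears once, followed by the data for each of its partitions.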
NetworkClient#doSend
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
// Ensure the client is still active
ensureActive();
// Get the destination node
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
if (!canSendRequest(nodeId, now))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = apiVersions.get(nodeId);
short version;
if (versionInfo == null) {
version = builder.latestAllowedVersion();
} else {
version = versionInfo.latestUsableVersion(clientRequest.apiKey(),
builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
// Do the actual send
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException unsupportedVersionException) {
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(),
clientRequest.destination(), now, now, false,
unsupportedVersionException, null, null);
if (!isInternalRequest)
abortedSends.add(clientResponse);
else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
}
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
Send send = request.toSend(header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now
);
// An InFlightRequest is a request that has been sent but has not yet received a response
this.inFlightRequests.add(inFlightRequest);
selector.send(new NetworkSend(destination, send));
}
selector.send
/**
* Queues a network request for the Kafka client: the request is added to the send queue and is actually sent by a subsequent poll call
*
* @param send The request to send
*/
public void send(NetworkSend send) {
// Get the connection id of the destination
String connectionId = send.destinationId();
// Get the KafkaChannel for this connection
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
// If the connection is closing, add the connection id to failedSends
if (closingChannels.containsKey(connectionId)) {
this.failedSends.add(connectionId);
} else {
try {
// Hand the request to the KafkaChannel; this only stages the data for sending and does not actually send it
channel.setSend(send);
} catch (Exception e) {
// If the KafkaChannel throws while handling the request, set the connection state to FAILED_SEND, add the connection id to failedSends, close the connection, and rethrow the exception for the caller to handle
channel.state(ChannelState.FAILED_SEND);
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
throw e;
}
}
}
}
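The key idea in selector.send is that a request is only staged on the channel, and the bytes go out later from the poll loop. The sketch below models that with a single pending slot per channel. ChannelSketch is a hypothetical class, and the rule that a second send is rejected while one is still pending is an assumption made for this illustration.

```java
// Hypothetical model of "stage now, write later" for one connection.
class ChannelSketch {
    private String pendingSend; // at most one staged request at a time

    // Stages a request; nothing is written to the network here.
    void setSend(String send) {
        if (pendingSend != null) {
            // Assumption of this sketch: one staged send per channel at a time.
            throw new IllegalStateException("previous send still in progress");
        }
        pendingSend = send;
    }

    // Called from the poll loop when the socket is writable.
    // Returns the request that was written, or null if nothing was staged.
    String write() {
        String completed = pendingSend;
        pendingSend = null;
        return completed;
    }

    boolean hasPendingSend() {
        return pendingSend != null;
    }
}
```

Separating staging from writing keeps all socket I/O on the thread that runs poll, so the code that builds requests never blocks on the network.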
poll
public void poll(long timeout) throws IOException {
// Reject negative timeouts
if (timeout < 0) {
throw new IllegalArgumentException("timeout should be >= 0");
}
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
clear();
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers)) {
timeout = 0;
}
// If we were out of memory and the memory pool has recovered, unmute the channels
if (!memoryPool.isOutOfMemory() && outOfMemory) {
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.maybeUnmute();
}
}
outOfMemory = false;
}
/* Check for keys that are ready */
long startSelect = time.nanoseconds();
// The select method provided by Java NIO
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds(), false);
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are excluded from the ready count of the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true;
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds(), false);
// Close channels whose close was delayed and that can now be closed
completeDelayedChannelClose(endIo);
// Use the time at the end of select so that we do not close any connection that was just processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
}
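Summary
The design of the Kafka producer has several notable strengths, including:
- Efficient asynchronous sending: the producer buffers messages in the RecordAccumulator and uses the Sender thread to send them asynchronously, which improves send throughput.
- Batching: the producer sends multiple messages together in batches, reducing network overhead and the load on the brokers.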
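- Reliable retries: when a send fails, the producer retries automatically until the send succeeds, the maximum number of retries is reached, or the batch expires (see the expired-batch handling in sendProducerData).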
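- Dynamic partition selection: when the partition is left unknown (RecordMetadata.UNKNOWN_PARTITION), the RecordAccumulator chooses one with its built-in logic, which may take broker load and the amount of data produced to each partition into account. This helps balance load and make better use of the network.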
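- Configurable compression: the producer supports several compression algorithms that can be chosen to fit the workload, reducing the amount of data sent over the network.
Open items
- The way RecordAccumulator manages memory has not been analyzed in depth
- The underlying logic of selector#poll has not been analyzed in depth either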