Original article link
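In Kafka Source Code Analysis, Part 2, we presented the architecture diagram of the whole Producer client.
We have already covered the other components in earlier parts; today we turn to the last one, RecordAccumulator.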
Batch sending
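In the older Kafka client, each message was called a "Message"; in the Java client it is called a "Record". Because the component accumulates records for batch sending, it is named RecordAccumulator.
The most important feature of RecordAccumulator is batching: several messages placed in the queue may be grouped into one RecordBatch, which the Sender then sends in a single request.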
One queue per TopicPartition
The internal structure of RecordAccumulator is shown below. Each TopicPartition has its own message queue, so only messages for the same TopicPartition can be batched together.
public final class RecordAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    ...
}
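To make the one-queue-per-partition layout concrete, here is a minimal sketch. It is a simplified model, not Kafka code: the hypothetical class `PartitionQueues` keys partitions by a plain `"topic-partition"` string instead of `TopicPartition`, a batch is just a `List<String>`, and `ConcurrentHashMap.computeIfAbsent` is swapped in for the get/putIfAbsent pattern that `dequeFor` uses in the source.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical simplified model of RecordAccumulator.batches (not Kafka's real classes):
// a partition is a "topic-partition" string and a batch is a list of record values.
class PartitionQueues {
    // One deque of batches per partition.
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    // Returns the partition's deque, creating it atomically the first time the partition is seen.
    Deque<List<String>> dequeFor(String partition) {
        return batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
    }

    int partitionCount() {
        return batches.size();
    }
}

class PartitionQueuesDemo {
    public static void main(String[] args) {
        PartitionQueues queues = new PartitionQueues();
        Deque<List<String>> a = queues.dequeFor("orders-0");
        Deque<List<String>> b = queues.dequeFor("orders-0");
        Deque<List<String>> c = queues.dequeFor("orders-1");
        System.out.println(a == b);                  // true: same partition, same deque
        System.out.println(a == c);                  // false: partitions never share a deque
        System.out.println(queues.partitionCount()); // 2
    }
}
```

Because a batch only ever lives inside one partition's deque, two records can share a batch only if they target the same partition.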
Batching strategy
So when are messages batched, and when are they not? Let's start from KafkaProducer's send method:
// KafkaProducer
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    try {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        ...
        // core call: put the record into its partition's queue
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        ...
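The wake-up rule can be isolated in a tiny sketch. `AppendFlags` is a hypothetical stand-in for `RecordAppendResult` that keeps only the two flags `send` inspects; `flagsFor` computes `batchIsFull` the same way the `append` code does (`dq.size() > 1 || records.isFull()`). Why a newly created batch also triggers a wake-up is our reading, not something the source states: the Sender may be blocked in a long poll when it had nothing to send, and waking it lets it start tracking the new batch.

```java
// Hypothetical stand-in for RecordAccumulator.RecordAppendResult: only the two flags send() checks.
final class AppendFlags {
    final boolean batchIsFull;
    final boolean newBatchCreated;

    AppendFlags(boolean batchIsFull, boolean newBatchCreated) {
        this.batchIsFull = batchIsFull;
        this.newBatchCreated = newBatchCreated;
    }

    // Mirrors how append() fills batchIsFull: the deque holds more than one batch
    // (so the older ones no longer receive appends), or the batch just appended to is full.
    static AppendFlags flagsFor(int dequeSize, boolean lastBatchFull, boolean newBatchCreated) {
        return new AppendFlags(dequeSize > 1 || lastBatchFull, newBatchCreated);
    }

    // The condition send() checks before calling sender.wakeup().
    boolean shouldWakeSender() {
        return batchIsFull || newBatchCreated;
    }
}

final class WakeupDemo {
    public static void main(String[] args) {
        // Record joined the only, not-yet-full batch: no wake-up needed.
        System.out.println(AppendFlags.flagsFor(1, false, false).shouldWakeSender()); // false
        // Record opened a second batch behind an older one: the older one counts as full.
        System.out.println(AppendFlags.flagsFor(2, false, true).shouldWakeSender());  // true
        // Record filled up the last batch.
        System.out.println(AppendFlags.flagsFor(1, true, false).shouldWakeSender());  // true
    }
}
```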
As the code shows, all of the batching logic lives in accumulator.append:
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    try {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        Deque<RecordBatch> dq = dequeFor(tp); // find the message queue for this TopicPartition
        synchronized (dq) {
            RecordBatch last = dq.peekLast(); // take the last element of the queue
            if (last != null) {
                // the last element (a RecordBatch) exists: try to append this Record to it
                FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                if (future != null)
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }
        // no batch with room: allocate a buffer outside the lock (this may block)
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                if (future != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            // no RecordBatch in the queue, or the last one is full: create a new one and put the Record into it
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

private Deque<RecordBatch> dequeFor(TopicPartition tp) {
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
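The control flow of `append` (try the last batch under the lock, allocate a buffer outside the lock, then re-check under the lock and hand the buffer back if another thread created a batch meanwhile) can be modelled without Kafka's classes. This is a simplified sketch under assumptions: `SimpleBatch`, `SimpleAccumulator` and the `allocatedBytes` counter are hypothetical stand-ins for `RecordBatch`, `RecordAccumulator` and the buffer pool, record sizes are plain byte counts (no `LOG_OVERHEAD`), and there is a single partition. As in the source, a new batch is sized `max(batchSize, recordSize)`, so a record larger than `batchSize` still fits into a batch of its own.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RecordBatch: tracks bytes used against a fixed capacity.
final class SimpleBatch {
    private final int capacity;
    private int used;

    SimpleBatch(int capacity) {
        this.capacity = capacity;
    }

    // Like RecordBatch.tryAppend: refuse the record if it does not fit.
    boolean tryAppend(int recordSize) {
        if (used + recordSize > capacity) return false;
        used += recordSize;
        return true;
    }
}

// Hypothetical single-partition model of the append() flow.
final class SimpleAccumulator {
    private final Deque<SimpleBatch> dq = new ArrayDeque<>();
    private final int batchSize;
    private final AtomicInteger allocatedBytes = new AtomicInteger(); // stand-in for buffer pool bookkeeping

    SimpleAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Appends a record of the given size; returns true if a new batch had to be created. */
    boolean append(int recordSize) {
        // 1. Fast path: try the last batch while holding the deque lock.
        synchronized (dq) {
            SimpleBatch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) return false;
        }
        // 2. Allocate outside the lock (in Kafka, free.allocate may block waiting for memory).
        int size = Math.max(batchSize, recordSize);
        allocatedBytes.addAndGet(size);
        synchronized (dq) {
            // 3. Re-check: another thread may have added a batch while we were allocating.
            SimpleBatch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) {
                allocatedBytes.addAndGet(-size); // give the buffer back, like free.deallocate(buffer)
                return false;
            }
            // 4. Still no room: open a new batch at the tail, sized so the record always fits.
            SimpleBatch batch = new SimpleBatch(size);
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return true;
        }
    }

    int batchCount() {
        synchronized (dq) {
            return dq.size();
        }
    }

    int allocatedBytes() {
        return allocatedBytes.get();
    }
}

final class AppendDemo {
    public static void main(String[] args) {
        SimpleAccumulator acc = new SimpleAccumulator(100);
        System.out.println(acc.append(40));  // true: empty deque, first batch created
        System.out.println(acc.append(40));  // false: 80 of 100 bytes used
        System.out.println(acc.append(40));  // true: 120 > 100, second batch created
        System.out.println(acc.append(250)); // true: oversized record gets its own 250-byte batch
        System.out.println(acc.batchCount() + " batches, " + acc.allocatedBytes() + " bytes"); // 3 batches, 450 bytes
    }
}
```

Allocating outside the lock keeps a thread that is blocked waiting for memory from holding up every other producer thread writing to the same partition; the cost is the re-check in step 3.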
From the code above we can derive the batching strategy:
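1. With synchronous sending, each send waits until the previous record has been sent, so every time append looks at the queue it finds no RecordBatch. Messages are then not batched: each Record forms its own RecordBatch.
2. If the Producer enqueues more slowly than the Sender dequeues, and lingerMs = 0, messages are not batched either, because each batch is drained before the next record arrives.
3. If the Producer enqueues faster than the Sender dequeues, messages are batched.
4. If lingerMs > 0, the Sender waits, and sends only once lingerMs has elapsed, or the queue is "full" (it holds more than one batch), or the batch has reached the maximum RecordBatch size (or the buffer pool is exhausted, the producer is closing, or a flush is in progress). This logic lives in RecordAccumulator's ready function.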
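Cases 2 and 3 can be reproduced with a toy, tick-based simulation. Everything here is an assumption made for illustration, not Kafka's real timing: one partition, `lingerMs = 0` (so any non-empty batch is immediately sendable), no batch size limit, the producer appends one record every `produceIntervalMs`, and each request keeps the Sender busy for `sendMs`. The hypothetical `simulate` method returns the number of records in each batch sent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class LingerZeroSimulation {
    // Returns the size (in records) of each batch the Sender sends, in order.
    static List<Integer> simulate(int records, int produceIntervalMs, int sendMs) {
        if (produceIntervalMs < 1 || sendMs < 1) throw new IllegalArgumentException("intervals must be >= 1 ms");
        Deque<int[]> dq = new ArrayDeque<>(); // each batch is a one-element record counter
        List<Integer> sent = new ArrayList<>();
        int produced = 0;
        long senderFreeAtMs = 0;
        for (long t = 0; produced < records || !dq.isEmpty(); t++) {
            // Producer: append to the last batch, or open one if the deque is empty.
            if (produced < records && t == (long) produced * produceIntervalMs) {
                if (dq.isEmpty()) dq.addLast(new int[] {0});
                dq.peekLast()[0]++;
                produced++;
            }
            // Sender: with lingerMs = 0 a non-empty batch is ready at once, so send whenever idle.
            if (t >= senderFreeAtMs && !dq.isEmpty()) {
                sent.add(dq.pollFirst()[0]);
                senderFreeAtMs = t + sendMs;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // Producer slower than Sender (case 2): every record travels alone.
        System.out.println(simulate(3, 10, 2)); // [1, 1, 1]
        // Producer faster than Sender (case 3): records pile up while a request is in flight.
        System.out.println(simulate(6, 1, 3));  // [1, 3, 2]
    }
}
```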
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<Node>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress(); // the key line: when a batch may be sent
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
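The per-batch decision inside `ready` can be pulled out into a pure function, which makes it easy to see which condition wins. The sketch is hypothetical: `ReadyCheck.evaluate` copies the arithmetic of the loop body in `ready`, but takes plain values instead of a `RecordBatch`, a `Deque` and the buffer pool, and returns either "ready now" or the delay that would feed `nextReadyCheckDelayMs`.

```java
final class ReadyCheck {
    /** Decision for one partition's head batch: ready now, or how long until it should be checked again. */
    static final class Decision {
        final boolean ready;
        final long timeLeftMs;

        Decision(boolean ready, long timeLeftMs) {
            this.ready = ready;
            this.timeLeftMs = timeLeftMs;
        }
    }

    // Same arithmetic as the body of ready() for the batch at the head of a deque.
    static Decision evaluate(int attempts, long lastAttemptMs, long nowMs, long retryBackoffMs, long lingerMs,
                             int dequeSize, boolean batchFull, boolean exhausted, boolean closed, boolean flushing) {
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        long waitedTimeMs = nowMs - lastAttemptMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
        boolean full = dequeSize > 1 || batchFull;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushing;
        return new Decision(sendable && !backingOff, timeLeftMs);
    }

    public static void main(String[] args) {
        // Batch created at t=100, lingerMs=5, checked at t=102: not ready, check again in 3 ms.
        Decision d1 = evaluate(0, 100, 102, 100, 5, 1, false, false, false, false);
        System.out.println(d1.ready + " " + d1.timeLeftMs); // false 3
        // Same batch, but a second batch is queued behind it: "full", so ready before linger expires.
        Decision d2 = evaluate(0, 100, 102, 100, 5, 2, false, false, false, false);
        System.out.println(d2.ready); // true
        // lingerMs = 0: a fresh batch counts as expired immediately.
        Decision d3 = evaluate(0, 100, 100, 100, 0, 1, false, false, false, false);
        System.out.println(d3.ready); // true
        // Retried batch still inside its backoff window: not ready, even though it is full.
        Decision d4 = evaluate(1, 100, 150, 100, 5, 2, true, false, false, false);
        System.out.println(d4.ready + " " + d4.timeLeftMs); // false 50
    }
}
```

Note that `backingOff` overrides every other condition: a retried batch waits out `retryBackoffMs` even if the deque behind it is full.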
Why a Deque?
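Above we saw that the message queue is a double-ended queue (Deque) rather than an ordinary queue.
One end produces and the other end consumes, so wouldn't an ordinary queue be enough? Why does it need to be double-ended?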
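Part of this is already visible in the code above: `append` works on the tail (`peekLast`, `addLast`) while `ready` inspects the head (`peekFirst`), and `java.util.Queue` on its own only exposes the head. The sketch below shows both ends in use, plus one more head operation. The retry step is an assumption for illustration, with hypothetical batch names: a batch that must be sent again is put back with `addFirst` so it stays ahead of newer batches for the same partition (as far as we know, the Kafka client's `reenqueue` does this, but the sketch only models the deque).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class DequeEnds {
    // Returns the partition's batch order after a failed batch is put back, at the head or at the tail.
    static List<String> afterRetry(boolean putBackAtHead) {
        Deque<String> dq = new ArrayDeque<>();
        dq.addLast("batch-1");            // producer side: new batches go to the tail
        dq.addLast("batch-2");
        String inFlight = dq.pollFirst(); // sender side: the oldest batch leaves from the head
        // ... assume the request for batch-1 fails and it must be sent again
        if (putBackAtHead) {
            dq.addFirst(inFlight);        // back at the head: still the next batch to go out
        } else {
            dq.addLast(inFlight);         // all a plain queue allows: batch-2 would overtake it
        }
        return new ArrayList<>(dq);
    }

    public static void main(String[] args) {
        System.out.println(afterRetry(true));  // [batch-1, batch-2]
        System.out.println(afterRetry(false)); // [batch-2, batch-1]
    }
}
```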