Kafka 3.0 Source Code Analysis: Implementation Details of the Producer

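Introduction

In Kafka, the producer is responsible for sending messages to the Kafka cluster and is one of the key components for moving data efficiently. This article analyzes the implementation of the Kafka producer at the source-code level, to help readers better understand how the producer works and what shapes its performance characteristics.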

Notes:

This analysis is based on Kafka 3.x.

Since at least Kafka 0.10.2, the client has been implemented in Java and the server in Scala.

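What you will learn

  1. How does the Kafka producer send messages and distribute them across partitions?
  2. Which details and techniques in the producer's code deserve attention?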

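Glossary

  • ProducerMetadata: manages the metadata the producer needs, such as the topics and partitions in the cluster and the broker nodes acting as partition leaders.
  • Partitioner: computes the partition for a given record.
  • Serializers: the key and value serializers of a record. A serializer converts an object into a byte array.
  • ProducerInterceptors: interceptors that may modify a record.
  • RecordAccumulator: accumulates records and groups them into batches per topic partition.
  • TransactionManager: manages transactions and maintains the state required for idempotent production.
  • Sender: the background thread that sends data to the Kafka cluster.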
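To make the Partitioner entry more concrete, here is a minimal sketch of key-based partitioning. It is an illustration under assumptions, not Kafka's implementation: the class and method names are hypothetical, and `Arrays.hashCode` is only a stand-in for the murmur2 hash that Kafka's built-in logic applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch; not part of the Kafka client.
final class KeyHashPartitionerSketch {

    // Maps a serialized key to a partition in [0, numPartitions).
    // Arrays.hashCode is a stand-in hash chosen for brevity.
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        int hash = Arrays.hashCode(serializedKey);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("partition = " + partitionFor(key, 6));
    }
}
```

The property that matters is that equal keys always map to the same partition, which is what preserves per-key ordering.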

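Architecture diagram

[Figure: Kafka producer architecture]

From the figure above we can see that:

  1. The Kafka producer follows the producer-consumer pattern, and sending a message happens in two stages. In the first stage, the message to be sent is buffered in the RecordAccumulator (record accumulator). In the second stage, messages are taken out of the RecordAccumulator and sent over the network.
  2. The Kafka producer consists of three parts: the KafkaProducer main thread, the RecordAccumulator, and the Sender thread.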
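The two-stage hand-off described above can be sketched with nothing but the JDK. This is an illustration only, not Kafka code: the class `TwoStageHandOff` and its methods are hypothetical, and a plain `BlockingQueue` stands in for the RecordAccumulator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer's two-stage hand-off; not Kafka code.
final class TwoStageHandOff {
    private final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
    private final List<String> sentToNetwork = new ArrayList<>();
    private volatile boolean running = true;

    // Stage 1: the caller's thread only buffers the record and returns.
    void send(String record) {
        accumulator.add(record);
    }

    // Stage 2: a background thread drains the buffer and "sends" each record.
    Thread startSender() {
        Thread sender = new Thread(() -> {
            try {
                while (running || !accumulator.isEmpty()) {
                    String record = accumulator.poll(10, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        synchronized (sentToNetwork) {
                            sentToNetwork.add(record);
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-sender");
        sender.setDaemon(true);
        sender.start();
        return sender;
    }

    // Stops the sender once it has drained everything buffered so far.
    List<String> closeAndGetSent(Thread sender) {
        running = false;
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (sentToNetwork) {
            return new ArrayList<>(sentToNetwork);
        }
    }

    public static void main(String[] args) {
        TwoStageHandOff producer = new TwoStageHandOff();
        Thread sender = producer.startSender();
        producer.send("a");
        producer.send("b");
        System.out.println(producer.closeAndGetSent(sender));
    }
}
```

The caller never touches the network; it only pays the cost of an in-memory append, which is the main reason for splitting the work across two threads.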

Analysis

Producer example

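import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// KafkaProperties (server URL and port constants) is assumed to come from the Kafka examples module.
public class Producer {
    private final KafkaProducer<Integer, String> producer;
    // The topic is kept as a field because sendAsync needs it.
    private final String topic;

    public Producer(final String topic,
                    final String transactionalId,
                    final boolean enableIdempotency,
                    final int transactionTimeoutMs
                    ) {
        this.topic = topic;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        if (transactionTimeoutMs > 0) {
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
        }
        if (transactionalId != null) {
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        }
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
        // Key point 1: the KafkaProducer constructor
        producer = new KafkaProducer<>(props);
    }


    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
        // Key point 2: producer.send
        this.producer.send(new ProducerRecord<>(topic,
                        messageKey,
                        messageStr),
                new DemoCallBack(currentTimeMs, messageKey, messageStr));
    }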

}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

           

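Notes:

  • The two key points in the example above are the KafkaProducer constructor and, most importantly, the producer.send call.

Introduction to KafkaProducer

Components

[Figure: components of KafkaProducer]

Notes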

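  1. producerConfig: holds the producer's configuration, including the addresses of the Kafka cluster to connect to, the serializers, the acknowledgement mode, and other parameters.
  2. metrics: holds the producer's metrics, such as the number of messages sent, the number sent successfully, and the number that failed.
  3. sender: the component that sends messages to the Kafka cluster. It turns buffered batches into produce requests and sends them to the broker that leads the target partition.
  4. recordAccumulator: buffers messages waiting to be sent. The producer appends messages to the recordAccumulator, and the sender takes them from it and sends them to the Kafka cluster.
  5. metadata: holds cluster metadata for the topics and partitions the producer uses, including each partition's leader and ISR (in-sync replicas) list.
  6. interceptors: the list of interceptors. Several interceptors can be configured to process a message before it is sent and after the send completes, for example to add a timestamp or to log.
  7. bufferMemory: the total amount of memory available for buffering messages waiting to be sent. If the buffered messages would exceed bufferMemory, the producer waits for the sender to send data and free space in the recordAccumulator.
  8. maxBlockMs: the maximum time send() may block. When the recordAccumulator is full, the producer waits for the sender to free space; if no space becomes available within maxBlockMs, the producer throws an exception.
  9. requestTimeoutMs: the maximum time the producer waits for a response from the Kafka broker. If no response arrives in time, the producer retries the request or fails it with an exception.
  10. transactionManager: required for transactional producers. It manages the state of transactions and the messages sent within them.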
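The interplay between bufferMemory and maxBlockMs can be sketched as a bounded pool whose `allocate` call waits for at most a given time. This is a simplified model with hypothetical names; Kafka's real `BufferPool` is more elaborate and signals a timeout with an exception rather than a boolean.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a bounded buffer with a blocking, time-limited allocate.
final class BoundedBufferSketch {
    private final Semaphore freeBytes;

    BoundedBufferSketch(int totalBytes) {
        // One permit per byte; fair mode so waiting callers are served in order.
        this.freeBytes = new Semaphore(totalBytes, true);
    }

    // Reserves `size` bytes, waiting up to maxBlockMs. Returns false on timeout.
    boolean allocate(int size, long maxBlockMs) {
        try {
            return freeBytes.tryAcquire(size, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called once the data has been sent and the memory can be reused.
    void deallocate(int size) {
        freeBytes.release(size);
    }

    int availableBytes() {
        return freeBytes.availablePermits();
    }

    public static void main(String[] args) {
        BoundedBufferSketch pool = new BoundedBufferSketch(100);
        System.out.println("first allocation ok:  " + pool.allocate(60, 10));
        System.out.println("second allocation ok: " + pool.allocate(60, 10));
        pool.deallocate(60);
        System.out.println("after release ok:     " + pool.allocate(60, 10));
    }
}
```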

Constructor

KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
        try {
            // Producer configuration
            this.producerConfig = config;
            this.time = time;
            // Transactional id
            String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
            // Client id
            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

            // Instantiate the configured partitioner (null when no custom partitioner is configured)
            this.partitioner = config.getConfiguredInstance(
                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    Partitioner.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            warnIfPartitionerDeprecated();
            this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
            // Back-off time between retries after a failure
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

            // Key and value serializers
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // Configure the producer interceptors
            List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
         
            // Listeners for cluster resource changes
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keySerializer,
                    this.valueSerializer, interceptorList, reporters);
            // Maximum request size (max.request.size): 1 MB by default; it can be raised, for example to 10 MB, in production
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            // Total memory available for buffering messages to send (buffer.memory)
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            // Compression type
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            // Maximum blocking time (max.block.ms)
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            // Delivery timeout
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = new ApiVersions();
            // Transaction manager
            this.transactionManager = configureTransactionState(config, logContext);
            // If a custom partitioner is used, the work needed for adaptive partitioning can be skipped.
            boolean enableAdaptivePartitioning = partitioner == null &&
                config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);

            // Partitioner configuration
            RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
                enableAdaptivePartitioning,
                config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
            );

            // Batch size from the producer configuration. It can be set to 0 to disable batching explicitly, which in practice means a batch size of 1.
            int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
            // Record accumulator
            this.accumulator = new RecordAccumulator(logContext,
                    batchSize,
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    partitionerConfig,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));

            // Parse the broker addresses
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));

            // Producer-side metadata
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                this.metadata.bootstrap(addresses);
            }
            // Sensor that records send errors
            this.errors = this.metrics.sensor("errors");

            // Create the Sender and start the I/O thread
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            config.logUnused();
            // Register application info as a JMX MBean
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

           

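Notes: the constructor performs the following steps.

  • Configure the producer's metrics
  • Set up the partitioner
  • Configure the retry back-off time for failed sends
  • Set up the key and value serializers
  • Configure the producer interceptors
  • Build the partitioner configuration
  • Initialize the accumulator
  • Parse the broker addresses
  • Set up the producer-side metadata
  • Create the Sender and start the I/O thread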
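The settings read by the constructor correspond to ordinary producer configuration keys. The sketch below collects the ones discussed here in a plain `java.util.Properties` object, using string keys so that it compiles without the Kafka client on the classpath. The class name, the broker address, and the chosen values are assumptions for illustration; only the key names are Kafka's.

```java
import java.util.Properties;

// Hypothetical helper; values are examples, not recommendations.
final class ProducerTuningSketch {

    static Properties tuningProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("batch.size", "16384");                 // bytes per batch
        props.put("linger.ms", "5");                      // wait up to 5 ms for a batch to fill
        props.put("buffer.memory", "33554432");           // total buffering memory in bytes
        props.put("max.block.ms", "60000");               // upper bound on how long send() may block
        props.put("max.request.size", "1048576");         // maximum size of one request in bytes
        props.put("compression.type", "lz4");             // compress batches with LZ4
        props.put("retry.backoff.ms", "100");             // pause between retries
        props.put("delivery.timeout.ms", "120000");       // overall time limit for delivering a record
        return props;
    }

    public static void main(String[] args) {
        tuningProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With the Kafka client available, the same object can be passed to `new KafkaProducer<>(props)` once the key and value serializers are added.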

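Sending messages with KafkaProducer

Notes

  • Sending a message happens in two stages: the message is buffered in the RecordAccumulator (record accumulator), and the Sender thread later takes messages out and sends them over the network. In between, we also look at how the RecordAccumulator stores messages.

[Figure: message sending flow]

Buffering messages in the record accumulator

Code

The send method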

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {  
 
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);  
    return doSend(interceptedRecord, callback);  
}
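The `interceptors.onSend(record)` call above runs the record through every configured interceptor in turn. A minimal sketch of such a chain follows, using plain strings as records. The class is hypothetical; the one behavior borrowed from Kafka is that a failing interceptor does not abort the send (Kafka logs the error and moves on, whereas this sketch simply skips it).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical interceptor chain; records are plain strings for brevity.
final class InterceptorChainSketch {

    // Passes the record through each interceptor; the output of one is the input of the next.
    static String onSend(List<UnaryOperator<String>> interceptors, String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // A failing interceptor is skipped; the record from the previous step is kept.
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = Arrays.asList(
                r -> r + "|traced",
                r -> { throw new IllegalStateException("broken interceptor"); },
                r -> r + "|audited");
        System.out.println(onSend(chain, "payload"));
    }
}
```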
           

The doSend method

private Future < RecordMetadata > doSend(ProducerRecord < K, V > record, Callback callback) {
    
    AppendCallbacks < K, V > appendCallbacks = new AppendCallbacks < K, V > (callback, this.interceptors, record);

    try {
       
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // Serialize the key and value
        ....
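        // Compute the partition. Note that the result may be RecordMetadata.UNKNOWN_PARTITION,
        // which means the RecordAccumulator will pick a partition with its built-in logic
        // (which may take broker load, the amount of data produced to each partition, and so on into account).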
        int partition = partition(record, serializedKey, serializedValue, cluster);

        // Append the record to the accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
            serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
       
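        // Add the partition to the transaction (if one is in progress) only after the accumulator has appended the record.
        // We cannot do this earlier because the partition may be unknown, or the initially selected partition may change
        // when the batch is closed (as indicated by abortForNewBatch). Note that the Sender will refuse to dequeue
        // batches from the accumulator until they have been added to the transaction.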
        if (transactionManager != null) {
            transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
        }
        // If the batch is full or a new batch was created
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
            // Wake up the Sender thread
            this.sender.wakeup();
        }
        return result.future;
        
    } catch (ApiException e) {
        // Handle exceptions and record errors. API exceptions are returned in the Future; other exceptions are thrown directly.
        if (callback != null) {
            TopicPartition tp = appendCallbacks.topicPartition();
            callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1), e);
        }
        this.errors.record();
        this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
        if (transactionManager != null) {
            transactionManager.maybeTransitionToErrorState(e);
        }
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
        throw e;
    }
}
           

Notes

[Figure: flow of the doSend method]

RecordAccumulator

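RecordAccumulator is one of the core components of the producer's message pipeline. Its main job is to pack multiple ProducerRecord objects into batches (ProducerBatch, each backed by a MemoryRecordsBuilder) and to queue those batches per partition until the Sender sends them.

Member variables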

    /**
     * Number of flushes currently in progress
     */
    private final AtomicInteger flushesInProgress;
    /**
     * Number of threads currently appending records
     */
    private final AtomicInteger appendsInProgress;
    /**
     * Batch size in bytes
     */
    private final int batchSize;
    /**
     * Compression type applied to batches (for example LZ4 or gzip), which reduces the bandwidth used when transferring data
     */
    private final CompressionType compression;
    /**
     * How long a batch may wait for more records before it is sent
     */
    private final int lingerMs;
    /**
     * Back-off time between retries
     */
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    
    /**
     * Buffer pool
     */
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    // Per-topic state
    private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
    // Per-node latency statistics
    private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>();
    // Batches that have not completed yet
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Set<TopicPartition> muted;
    private final Map<String, Integer> nodesDrainIndex;
    private final TransactionManager transactionManager;
           

How a record is appended

public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {
        // Get or create the TopicInfo for the topic. TopicInfo tracks per-topic state such as partition information and the batches of each partition.
        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

        // Track the number of appending threads so that abortIncompleteBatches() does not miss batches.
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // Loop to retry when we hit a race condition with the partitioner.
            while (true) {
                
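                // (Not shown in this excerpt: partitionInfo and effectivePartition are determined before this point.)
                // Get or create the Deque of batches for the effective partition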
                Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
                synchronized (dq) {
                    // After taking the lock, check that the partition has not changed; if it has, retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;
                    // Try to append the record to an existing batch
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                    if (appendResult != null) {
                        // Appended successfully
                        boolean enableSwitch = allBatchesFull(dq);
                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                        return appendResult;
                    }
                }

                // There is no in-progress batch with room; try to allocate a new batch
                if (abortOnNewBatch) {
                    return new RecordAppendResult(null, false, false, true, 0);
                }
                // Allocate a buffer
                if (buffer == null) {
                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                    // Take the larger of the batch size (16 KB by default) and the estimated size of this record
                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                    
                    // Allocate from the buffer pool; this call may block if the pool is exhausted.
                    buffer = free.allocate(size, maxTimeToBlock);
                    nowMs = time.milliseconds();
                }

                synchronized (dq) {
                    
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))  continue;
                    // Append the record to a new batch
                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                    
                    if (appendResult.newBatchCreated)
                        buffer = null;
                 
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }
            }
        } finally {
            free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
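The structure of `append` (try under the lock, allocate outside the lock, then re-check under the lock) can be isolated in a small sketch. All names below are hypothetical and a batch is reduced to a record counter; the point is only the locking pattern, which keeps a potentially slow or blocking allocation out of the critical section.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the append pattern; not Kafka code.
final class AppendPatternSketch {

    // Toy batch that holds at most `capacity` records.
    static final class Batch {
        private final int capacity;
        private int count;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        boolean tryAppend() {
            if (count >= capacity) return false; // no room: the caller must start a new batch
            count++;
            return true;
        }
    }

    private final Map<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchCapacity;

    AppendPatternSketch(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    // Appends one record; returns true if a new batch had to be created.
    boolean append(int partition) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend()) return false;
        }
        // Allocate outside the lock, because allocation may be slow or may block.
        Batch fresh = new Batch(batchCapacity);
        synchronized (dq) {
            // Re-check: another thread may have added a batch with room in the meantime.
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend()) return false;
            fresh.tryAppend();
            dq.addLast(fresh);
            return true;
        }
    }

    int batchCount(int partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null) return 0;
        synchronized (dq) {
            return dq.size();
        }
    }

    public static void main(String[] args) {
        AppendPatternSketch acc = new AppendPatternSketch(2);
        for (int i = 0; i < 5; i++) {
            System.out.println("record " + i + " created new batch: " + acc.append(0));
        }
        System.out.println("batches for partition 0: " + acc.batchCount(0));
    }
}
```

In the real method the unused buffer is returned to the pool in the `finally` block; the sketch simply lets the unused `Batch` be garbage-collected.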
           

ProducerBatch#tryAppend

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {  
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {  
        return null;  
    } else {  
        // Key step: append the record to the batch's records builder
        this.recordsBuilder.append(timestamp, key, value, headers);  
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(), recordsBuilder.compressionType(), key, value, headers));  
        this.lastAppendTime = now;  
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,  
            timestamp,  
            key == null ? -1 : key.length,  
            value == null ? -1 : value.length,  
            Time.SYSTEM);  

        thunks.add(new Thunk(callback, future));  
        this.recordCount++;  
        return future;  
    }  
}
           
  • recordsBuilder.append ends up calling MemoryRecordsBuilder#appendWithOffset, shown below.

MemoryRecordsBuilder#appendWithOffset

private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                              ByteBuffer value, Header[] headers) {
    try {
        // Control records may only be appended to control batches
        if (isControlRecord != isControlBatch) {
            throw new IllegalArgumentException("Control records can only be appended to control batches");
        }

        // Check that the new record's offset is valid (offsets must increase monotonically)
        if (lastOffset != null && offset <= lastOffset) {
            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
                    "(Offsets must increase monotonically).", offset, lastOffset));
        }

        // Check that the timestamp is valid
        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) {
            throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
        }

        // Record headers are only supported from magic v2 onwards
        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) {
            throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
        }

        // Set the base timestamp
        if (baseTimestamp == null) {
            baseTimestamp = timestamp;
        }

        // Dispatch on the magic value: v2 and later use the default record format, v0/v1 use the legacy format
        if (magic > RecordBatch.MAGIC_VALUE_V1) {
            appendDefaultRecord(offset, timestamp, key, value, headers);
        } else {
            appendLegacyRecord(offset, timestamp, key, value, magic);
        }
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}

           

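RecordBatch defines byte CURRENT_MAGIC_VALUE = MAGIC_VALUE_V2, so with the current message format the condition magic > MAGIC_VALUE_V1 holds and appendDefaultRecord is the branch that is taken. The appendLegacyRecord method shown below is only reached for the older v0/v1 formats; it is included here for reference.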

appendLegacyRecord

private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte magic) throws IOException {
    // Ensure the builder is still open for appends; otherwise an exception is thrown
    ensureOpenForRecordAppend();

    // If the compression type is NONE and the timestamp type is LOG_APPEND_TIME, use the log append time as the timestamp
    if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME) {
        timestamp = logAppendTime;
    }

    // Compute the size of the record
    int size = LegacyRecord.recordSize(magic, key, value);

    // Write the record header to the append stream
    AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);

    // If the timestamp type is LOG_APPEND_TIME, use the log append time as the timestamp
    if (timestampType == TimestampType.LOG_APPEND_TIME) {
        timestamp = logAppendTime;
    }

    // Write the record in the legacy format and return its CRC checksum
    long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);

    // Update the number of records and bytes written
    recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);

    return crc;
}

           

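Notes

  • The RecordAccumulator buffers at partition level: each partition has its own queue of batches. This limits contention between partitions and makes sending more efficient.
  • Compression is applied to batches rather than to individual messages. Letting records accumulate for a short time (linger.ms) puts more records into each batch, which generally improves the compression ratio.
  • When the Sender drains the accumulator, the batches destined for the same broker node are grouped into a single produce request. This reduces the number of network I/O operations.
  • The batch size itself is a fixed configuration value (batch.size); what varies with load is how full a batch is when it is sent. Under high throughput batches fill up and are sent full, while under low throughput they are sent once linger.ms has elapsed.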
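The trade-off between batch.size and linger.ms comes down to a readiness check: a batch can be sent when it is full, when it has waited long enough, or when a flush is forcing everything out. The sketch below is a simplified model with hypothetical names; the real check in `RecordAccumulator.ready` considers more conditions, such as exhausted buffer memory and retry back-off.

```java
// Hypothetical, simplified readiness check; not Kafka code.
final class BatchReadySketch {

    // A batch is sendable if it is full, has lingered long enough, or a flush is in progress.
    static boolean sendable(boolean full, long waitedMs, long lingerMs, boolean flushInProgress) {
        boolean expired = waitedMs >= lingerMs;
        return full || expired || flushInProgress;
    }

    // How long until a batch that is not yet full becomes sendable through lingering alone.
    static long nextReadyCheckDelayMs(long waitedMs, long lingerMs) {
        return Math.max(lingerMs - waitedMs, 0);
    }

    public static void main(String[] args) {
        System.out.println("half full, waited 3 of 5 ms: " + sendable(false, 3, 5, false));
        System.out.println("full, waited 0 ms:           " + sendable(true, 0, 5, false));
        System.out.println("check again in (ms):         " + nextReadyCheckDelayMs(3, 5));
    }
}
```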

The Sender thread takes messages out and sends them over the network

Notes

Recall that the KafkaProducer constructor initializes the Sender thread:

public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// Create the Sender

this.sender = newSender(logContext, kafkaClient, this.metadata);  
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;  
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);  
this.ioThread.start();
           
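  • The newSender method simply builds a Sender object.

Components of the Sender object

[Figure: components of the Sender object]

The Sender class implements the Runnable interface, so we go straight to its run method.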

Run

public void run() {

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in Kafka producer I/O thread: ", e);
        }
    }
    ... // remaining code omitted
   
}
           

runOnce

void runOnce() {
    if (transactionManager != null) {
        try {
            transactionManager.maybeResolveSequences();

            // If the transaction manager is in a failed state, stop sending messages
            if (transactionManager.hasFatalError()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            }

            // Check whether a new producerId is needed; if so, send an InitProducerId request
            transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

            if (maybeSendAndPollTransactionalRequest()) {
                return;
            }
        } catch (AuthenticationException e) {
          
            transactionManager.authenticationFailed(e);
        }
    }

    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}

           

Notes:

  • Here we only look at the sendProducerData and poll methods.

sendProducerData

private long sendProducerData(long now) {
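    // Fetch the current cluster metadata
    Cluster cluster = metadata.fetch();
    // Determine which nodes have partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // If any topic has a partition whose leader is unknown, add the topic to the metadata and request a metadata update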
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics) {
            this.metadata.add(topic, now);
        }
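        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);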
        this.metadata.requestUpdate();
    }

    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // Remove every node we are not ready to send to, and update the node's latency statistics.
        if (!this.client.ready(node, now)) {
            this.accumulator.updateNodeLatencyStats(node.id(), now, false);
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        } else {
            this.accumulator.updateNodeLatencyStats(node.id(), now, true);
        }
    }
    // Drain the sendable batches for each ready node and track them as in-flight batches.
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    // Preserve message ordering by muting the partitions that now have a batch in flight
    if (guaranteeMessageOrder) {
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList) {
                this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    }

    accumulator.resetNextBatchExpiryTime();
    // Collect all expired batches; they are removed and marked as failed below
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    if (!expiredBatches.isEmpty()) {
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    }
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
   

    // Compute the poll timeout
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);

    if (!result.readyNodes.isEmpty()) {
        pollTimeout = 0;
    }
    // Send the produce requests to the Kafka cluster
    sendProduceRequests(batches, now);
    return pollTimeout;
}
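The poll timeout computed at the end of `sendProducerData` is plain arithmetic and easy to check in isolation. The sketch below mirrors the four steps from the listing above in a standalone method; the class name and the sample numbers are made up for illustration.

```java
// Standalone restatement of the poll-timeout arithmetic; class name is hypothetical.
final class PollTimeoutSketch {

    static long pollTimeout(long nextReadyCheckDelayMs,
                            long notReadyTimeoutMs,
                            long nextExpiryTimeMs,
                            long nowMs,
                            boolean anyNodeReady) {
        // Wake up for whichever comes first: a batch becoming ready or a node becoming connectable.
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        // Also wake up in time to expire the next batch.
        timeout = Math.min(timeout, nextExpiryTimeMs - nowMs);
        // An expiry time in the past must not produce a negative timeout.
        timeout = Math.max(timeout, 0);
        // If data is about to be sent, do not block in poll at all.
        if (anyNodeReady) {
            timeout = 0;
        }
        return timeout;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(50, Long.MAX_VALUE, 1030, 1000, false));
        System.out.println(pollTimeout(50, Long.MAX_VALUE, 1030, 1000, true));
    }
}
```

Returning 0 when any node is ready lets the loop come straight back and send more data instead of sleeping in `poll`.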
           

sendProduceRequest (called by sendProduceRequests once per destination node)

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // Find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();

        ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
        if (tpData == null) {
            tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
            tpd.add(tpData);
        }

        tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                .setIndex(tp.partition())
                .setRecords(records));
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    // Build a ProduceRequest from the ProducerBatches
    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
            new ProduceRequestData()
                    .setAcks(acks)
                    .setTimeoutMs(timeout)
                    .setTransactionalId(transactionalId)
                    .setTopicData(tpd));

    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

    // Wrap the ProduceRequest in a ClientRequest
    ClientRequest clientRequest = client.newClientRequest(Integer.toString(destination), requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Hand the request to NetworkClient, which queues it for sending over the network
    client.send(clientRequest, now);
}
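Two things happen to the batches in the method above: the lowest magic value among them is determined, and the batches are regrouped by topic so that each topic appears once in the request. A sketch of both steps with a hypothetical `Batch` type (topic, partition, magic) in place of `ProducerBatch`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how batches for one node are regrouped; not Kafka code.
final class ProduceRequestGroupingSketch {

    static final class Batch {
        final String topic;
        final int partition;
        final byte magic;

        Batch(String topic, int partition, int magic) {
            this.topic = topic;
            this.partition = partition;
            this.magic = (byte) magic;
        }
    }

    // Starts from the highest usable magic and lowers it to the smallest one actually used.
    static byte minUsedMagic(byte maxUsableMagic, List<Batch> batches) {
        byte min = maxUsableMagic;
        for (Batch batch : batches) {
            if (batch.magic < min) min = batch.magic;
        }
        return min;
    }

    // Groups partitions under their topic, preserving first-seen topic order.
    static Map<String, List<Integer>> partitionsByTopic(List<Batch> batches) {
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (Batch batch : batches) {
            byTopic.computeIfAbsent(batch.topic, t -> new ArrayList<>()).add(batch.partition);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(
                new Batch("orders", 0, 2), new Batch("payments", 1, 1), new Batch("orders", 2, 2));
        System.out.println("min magic: " + minUsedMagic((byte) 2, batches));
        System.out.println("by topic:  " + partitionsByTopic(batches));
    }
}
```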
           
  • client.send calls NetworkClient#doSend to send the data.

NetworkClient#doSend

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    // Make sure the client is still active
    ensureActive();
    // Get the id of the destination node
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        if (!canSendRequest(nodeId, now))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
          
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(),
                                                       builder.oldestAllowedVersion(),
                                                       builder.latestAllowedVersion());
        }
        // The actual send
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                                                            clientRequest.callback(),
                                                            clientRequest.destination(), now, now, false,
                                                            unsupportedVersionException, null, null);
        if (!isInternalRequest)
            abortedSends.add(clientResponse);
        else if (clientRequest.apiKey() == ApiKeys.METADATA)
            metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
    }
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
   
    Send send = request.toSend(header);
    InFlightRequest inFlightRequest = new InFlightRequest(
        clientRequest,
        header,
        isInternalRequest,
        request,
        send,
        now
    );
    // Track the request as in flight: sent (or queued to send) but not yet answered
    this.inFlightRequests.add(inFlightRequest);
    selector.send(new NetworkSend(destination, send));
}
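
To make the role of the in-flight queue more concrete, here is a minimal sketch in plain Java. It is not Kafka's own InFlightRequests class: the class name InFlightSketch, the plain String request IDs and the method names are assumptions made for illustration. It only shows the idea of a per-node queue that caps how many unanswered requests a node may have (the role played by max.in.flight.requests.per.connection) and completes them oldest first. Details such as checking whether the previous send has been fully written are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified per-node in-flight tracking; not Kafka's actual class. */
final class InFlightSketch {
    // Assumed cap, standing in for max.in.flight.requests.per.connection
    private final int maxInFlightPerNode;
    // Node ID -> requests sent to that node that have no response yet (newest first)
    private final Map<String, Deque<String>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    /** True if another request may be sent to the node without exceeding the cap. */
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerNode;
    }

    /** Record a request as in flight; called just before handing it to the network layer. */
    void add(String node, String requestId) {
        if (!canSendMore(node))
            throw new IllegalStateException("Too many in-flight requests for node " + node);
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(requestId);
    }

    /** A response arrived: complete the oldest outstanding request for the node. */
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + node);
        return queue.pollLast();
    }

    /** Number of unanswered requests for the node. */
    int count(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null ? 0 : queue.size();
    }
}
```

With a cap of 2, a third add for the same node is rejected until completeNext has removed the oldest request, which is how the queue applies back-pressure per connection.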
           

Selector#send

/**
 * Queues the given request for sending. The request is only staged here; a later poll() call performs the actual network write.
 *
 * @param send The request to send
 */
public void send(NetworkSend send) {
    // ID of the target connection
    String connectionId = send.destinationId();
    // Look up the KafkaChannel (open or closing); fails if no such channel exists
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    // If the connection is closing, record the send as failed
    if (closingChannels.containsKey(connectionId)) {
        this.failedSends.add(connectionId);
    } else {
        try {
            // Stage the send on the KafkaChannel; nothing is written to the socket yet
            channel.setSend(send);
        } catch (Exception e) {
            // On failure, mark the channel FAILED_SEND, record the failed send and close the connection;
            // rethrow unless the exception is a CancelledKeyException
            channel.state(ChannelState.FAILED_SEND);
            this.failedSends.add(connectionId);
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                throw e;
            }
        }
    }
}
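
The "stage now, write on poll" split can be reproduced with plain Java NIO. The sketch below is an illustration only, not Kafka code: the class QueuedSendSketch, its methods and the use of a java.nio.channels.Pipe in place of a socket are all assumptions. send only stores the buffer and registers interest in OP_WRITE; the bytes leave the process when poll finds the channel writable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical sketch: a send is staged first and written only when poll() runs. */
final class QueuedSendSketch {
    private final Selector selector = Selector.open();
    private final Pipe pipe = Pipe.open();   // stands in for a socket connection
    private final SelectionKey key;
    private ByteBuffer pendingSend;          // at most one staged send at a time

    QueuedSendSketch() throws IOException {
        pipe.sink().configureBlocking(false);
        key = pipe.sink().register(selector, 0); // no interest until a send is staged
    }

    /** Stage the payload and ask to be told when the channel is writable. */
    void send(String payload) {
        if (pendingSend != null)
            throw new IllegalStateException("Previous send still in progress");
        pendingSend = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /** One poll round: select ready keys, write staged bytes, clear the ready set. */
    int poll(long timeoutMs) throws IOException {
        if (timeoutMs == 0) selector.selectNow(); else selector.select(timeoutMs);
        int written = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey ready = it.next();
            it.remove();
            if (ready.isWritable() && pendingSend != null) {
                written += pipe.sink().write(pendingSend);
                if (!pendingSend.hasRemaining()) {
                    pendingSend = null;      // send completed
                    ready.interestOps(0);    // stop asking for OP_WRITE
                }
            }
        }
        return written;
    }

    boolean hasPendingSend() { return pendingSend != null; }

    /** Blocking read of exactly `length` bytes from the other end of the pipe. */
    String readAll(int length) throws IOException {
        ByteBuffer in = ByteBuffer.allocate(length);
        while (in.hasRemaining()) pipe.source().read(in);
        return new String(in.array(), StandardCharsets.UTF_8);
    }

    void close() throws IOException {
        selector.close();
        pipe.sink().close();
        pipe.source().close();
    }

    /** Stages a payload, polls until it is written, and returns what the reader received. */
    static String demo(String payload) {
        try {
            QueuedSendSketch sketch = new QueuedSendSketch();
            try {
                sketch.send(payload);
                boolean stagedOnly = sketch.hasPendingSend(); // nothing written yet
                while (sketch.hasPendingSend()) sketch.poll(100);
                int length = payload.getBytes(StandardCharsets.UTF_8).length;
                return stagedOnly ? sketch.readAll(length) : null;
            } finally {
                sketch.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling QueuedSendSketch.demo("hello") stages the payload, polls until it has been written, and returns the bytes read from the other end of the pipe.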

           

Selector#poll

public void poll(long timeout) throws IOException {
    // Reject negative timeouts
    if (timeout < 0) {
        throw new IllegalArgumentException("timeout should be >= 0");
    }

    boolean madeReadProgressLastCall = madeReadProgressLastPoll;
    clear();

    boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

    // Do not block in select() when there is already work to do
    if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers)) {
        timeout = 0;
    }

    // Memory pressure has eased: unmute the channels that were not explicitly muted for other reasons
    if (!memoryPool.isOutOfMemory() && outOfMemory) {
        for (KafkaChannel channel : channels.values()) {
            if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                channel.maybeUnmute();
            }
        }
        outOfMemory = false;
    }

    /* Check for keys that are ready */
    long startSelect = time.nanoseconds();
    // Delegates to the Java NIO Selector (selectNow() when the timeout is 0)
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds(), false);

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll channels whose underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);

        // Clear all selected keys so that they are excluded from the ready count of the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; // no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds(), false);

    // Close channels whose close was delayed and that can now be closed
    completeDelayedChannelClose(endIo);

    // Use the time at the end of select so that connections just processed in pollSelectionKeys are not closed
    maybeCloseOldestConnection(endSelect);
}
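
The timeout handling at the top of poll is easy to miss, so here it is isolated as a pure function. This is a sketch only: the class PollTimeoutSketch and the method effectiveTimeout are hypothetical names, and the three booleans stand in for the selector state read by the code above.

```java
/** Hypothetical helper that mirrors the timeout decision at the start of poll(). */
final class PollTimeoutSketch {
    /**
     * Returns the timeout that select() should use.
     * If there is already work to do (connections that completed immediately, or
     * buffered data left over after a poll that made read progress), select()
     * must not block, so the timeout becomes 0.
     */
    static long effectiveTimeout(long timeoutMs,
                                 boolean hasImmediatelyConnectedKeys,
                                 boolean madeReadProgressLastCall,
                                 boolean dataInBuffers) {
        if (timeoutMs < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        if (hasImmediatelyConnectedKeys || (madeReadProgressLastCall && dataInBuffers))
            return 0;
        return timeoutMs;
    }
}
```

Buffered data alone does not force a zero timeout: the previous poll must also have made read progress. Otherwise a channel that cannot currently read (for example while memory is exhausted) would turn poll into a busy loop.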


           

Summary

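Several aspects of the Kafka producer's design stand out:

  • Efficient asynchronous sending: the producer buffers records in the RecordAccumulator and a separate Sender thread sends them asynchronously, which raises send throughput.
  • Batching: records for the same topic partition are grouped into batches and sent together, which reduces network overhead and the load on the brokers.
  • Reliable retries: when a send fails with a retriable error, the producer retries automatically until the send succeeds, the retry limit (retries) is reached, or delivery.timeout.ms expires.
  • Partition selection: the Partitioner picks a partition for each record, which spreads the load across the partitions of a topic.
  • Configurable compression: the producer supports several compression codecs (gzip, snappy, lz4, zstd), selected through compression.type, which reduces the amount of data sent over the network.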
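
The points above map onto ordinary producer settings. The sketch below collects them with plain string keys so that it runs without the Kafka client on the classpath. The config names are standard producer configs, but the values are illustrative assumptions rather than recommendations, and the class ProducerTuningSketch and its tunedProps method are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the settings discussed in the summary. */
final class ProducerTuningSketch {
    static Properties tunedProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Batching: wait up to 10 ms to fill batches of up to 32 KiB per partition
        props.put("batch.size", "32768");
        props.put("linger.ms", "10");
        // Compression is applied to whole batches
        props.put("compression.type", "lz4");
        // Retries are bounded both by a count and by an overall delivery deadline
        props.put("retries", "5");
        props.put("delivery.timeout.ms", "120000");
        // Durability: wait for all in-sync replicas and avoid duplicates on retry
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        return props;
    }
}
```

The resulting Properties could be passed to new KafkaProducer<>(props) once the key and value serializers from the example at the start of the article are added.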

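Open items

  • The memory-management logic in RecordAccumulator has not been analyzed in depth here.
  • The lower-level logic of Selector#poll has not been fully analyzed either.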
