天天看点

kafka 系列 -- 3.2、生产者客户端原理分析

文章目录

      • 生产者发送消息的整体流程
      • 消息追加器 `RecordAccumulator`
        • `ProducerBatch` 批量消息
      • `Sender` 线程
        • `InFlightRequests`
      • 重要参数
      • 总结
      • 与 `RocketMQ` 区别
      • 知识补充
        • `ByteBuffer`
          • `ByteBuffer` 重要属性
          • `ByteBuffer` 基本方法
          • `ByteBuffer` 食用DEMO
        • `ArrayDeque`
          • 插入
          • 删除
          • 扩容

生产者发送消息的整体流程

kafka 系列 -- 3.2、生产者客户端原理分析

消息追加器

RecordAccumulator

前面几个组件,在 3.1 的文章中,已经说清楚。现在来看

RecordAccumulator

组件

RecordAccumulator

主要用于缓存消息,以便

Sender

线程能够批量发送消息。

RecordAccumulator

会将消息放入缓存

BufferPool

(实际上就是

ByteBuffer

) 中。

BufferPool

默认最大为

33554432B

,即

32MB

, 可通过

buffer.memory

进行配置。

当生产者生产消息的速度大于

sender

线程的发送速度,那么

send

方法就会阻塞。默认阻塞

60000ms

,可通过

max.block.ms

配置。

RecordAccumulator

类的几个重要属性

public final class RecordAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    // 缓存空间,默认 32MB,可通过上面说的 buffer.memory 参数进行配置
    private final BufferPool free;
}
           

TopicPartition

为分区的抽象。定义如下所示

public final class TopicPartition implements Serializable {
    private int hash = 0;
    private final int partition;
    private final String topic;
}
           

主线程发送的消息,都会被放入

batcher

中,

batches

将发往不同

TopicPartition

的消息,存放到各自的

ArrayDeque<ProducerBatch>

中。

主线程

append

时,往队尾插入,

sender

线程取出时,则往队头取出。

ProducerBatch

批量消息

ProducerBatch

为批量消息的抽象。

在编写客户端发送消息时,客户端面向的类则是

ProducerRecord

kafka

客户端,在发送消息时,会将

ProducerRecord

放入

ProducerBatch

,使消息更加紧凑。

如果为每个消息都独自创建内存空间,那么内存空间的开辟和释放,则将会比较耗时。因此

ProducerBatch

内部有一个

ByteBufferOutputStream bufferStream

(实则为

ByteBuffer

), 使用

ByteBuffer

重复利用内存空间。

bufferStream

值的大小为:

public final class RecordAccumulator {
    
    // 该值大小,可通过 buffer.memory 配置
    private final BufferPool free;
    
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       
    }
}
           

其中,

batchSize

默认

16384B

,即

16kb

,可通过

batch.size

配置。第2个入参的值则为消息的大小。

需要注意的是,

bufferStream

的内存空间是从

free

内存空间中划出的。

上面有说到,

ProducerBatch

会使用

ByteBuffer

追加消息。但是,如果你看代码,你会发现

ProducerBatch

在做消息的追加时,会将消息放入

DataOutputStream appendStream

。好像跟我们说的 不一样! 但是实际上,就是利用

ByteBuffer

,这里还需要看

appendStream

是如何初始化的!

注:MemoryRecordsBuilder 为 ProducerBatch 中的一个属性

public class MemoryRecordsBuilder {
    private final ByteBufferOutputStream bufferStream;
    private DataOutputStream appendStream;
    
    private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                     Header[] headers) throws IOException {
        ensureOpenForRecordAppend();
        int offsetDelta = (int) (offset - baseOffset);
        long timestampDelta = timestamp - firstTimestamp;
        // 往 appendStream 中追加消息
        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
        recordWritten(offset, timestamp, sizeInBytes);
    }
}
           

MemoryRecordsBuilder

初始化

public class MemoryRecordsBuilder {
    private final ByteBufferOutputStream bufferStream;
    private DataOutputStream appendStream;
    
    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                byte magic,
                                CompressionType compressionType,
                                TimestampType timestampType,
                                long baseOffset,
                                long logAppendTime,
                                long producerId,
                                short producerEpoch,
                                int baseSequence,
                                boolean isTransactional,
                                boolean isControlBatch,
                                int partitionLeaderEpoch,
                                int writeLimit) {
           
        // ..省略部分代码
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;
        
        // 使用 bufferStream 包装
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }
}
           

可以看到实际上使用的还是

ByteBufferOutputStream bufferStream

Sender

线程

Sender

线程在发送消息时,会从

RecordAccumulator

中取出消息,并将放在

RecordAccumulator

中的

Deque<ProducerBatch>

转换成

Map<nodeId, List<ProducerBatch>>

,这里的

nodeId

kafka

节点的

id

。再发送给

kafka

之前,又会将消息封装成

Map<nodeId, ClientRequest>

请求在从

Sender

发往

kafka

时,还会被存入

InFlightRequests

public class NetworkClient implements KafkaClient {
    /* the set of requests currently being sent or awaiting a response */
    private final InFlightRequests inFlightRequests;
    
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            int latestClientVersion = clientRequest.apiKey().latestVersion();
            if (header.apiVersion() == latestClientVersion) {
                log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                        clientRequest.correlationId(), destination);
            } else {
                log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
            }
        }
        Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        
        // 将请求放入
        this.inFlightRequests.add(inFlightRequest);
        selector.send(send);
    }
}
           

InFlightRequests

/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 */
final class InFlightRequests {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
}
           

InFlightRequests

的作用是存储已经发送的,或者发送了,但是未收到响应的请求。

InFlightRequests

类中有一个属性

maxInFlightRequestsPerConnection

, 标识一个节点最多可以缓存多少个请求。该默认值为

5

, 可通过

max.in.flight.requests.per.connection

进行配置, 需要注意的是

InFlightRequests

对象是在创建

KafkaProducer

时就会被创建。

requests

参数的

key

nodeId

value

则为缓存的请求。

sender

线程 在发送消息时,会先判断

InFlightRequests

对应的请求缓存中是否超过了

maxInFlightRequestsPerConnection

的大小

代码入口:

Sender.sendProducerData

public class Sender implements Runnable {
    private long sendProducerData(long now) {
        // ... 省略部分代码
        while (iter.hasNext()) {
            Node node = iter.next();
            
            // todo 这里为代码入口
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
        // ... 省略部分代码
    }
}

public class NetworkClient implements KafkaClient {
    private boolean canSendRequest(String node, long now) {
        return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
            inFlightRequests.canSendMore(node);
    }
}

final class InFlightRequests {
    public boolean canSendMore(String node) {
        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
}
           

InFlightRequests

的设计中,可以看到,我们可以很轻松的就知道,哪个

kafka

节点的负载是最低。因为只需要判断

requests

中对应

node

集合的大小即可。

重要参数

  1. acks

    用于指定分区中需要有多少个副本收到消息,生产者才会认为消息是被写入的

    acks

    = 1。默认为1, 只要

    leader

    副本写入,则被认为已经写入。如果消息已经被写入

    leader

    副本,且已经返回给生产者

    ok

    ,但是在

    follower

    拉取

    leader

    消息之前,

    leader

    副本突然挂掉,那么此时消息也会丢失

    acks

    = 0。发送消息后,不需要等待服务端的响应,此配置,吞吐量最高。

    acks

    = -1 或者 all。需要等待所有

    ISR

    中的所有副本都成功写入消息之后,才会收到服务端的成功响应。

    需要注意的一点是

    acks

    入参是

    String

    ,而不是

    int

  2. max.request.size

    客户端允许发送的消息最大长度,默认为

    1MB

    .
  3. retries

    retry.backoff.ms

    retries

    配置生产者的重试次数,默认为 .

    retry.backoff.ms

    配置两次重试的间隔时间
  4. compression.type

    指定消息的压缩方式,默认为

    none

    。可选配置

    gzip

    ,

    snappy

    ,

    lz4

  5. connection.max.idle.ms

    指定在多久之后关闭闲置的连接,默认

    540000(ms)

    ,即

    9分钟

  6. linger.ms

    指定发送

    ProducerBatch

    之前等待更多的消息(

    ProducerRecord

    ) 加入

    ProducerBatch

    的时间,默认为 。生产者在

    ProducerBatch

    填充满时,或者等待时间超过

    linger.ms

    发送消息出去。
  7. receive.buffer.bytes

    设置

    Socket

    接收消息缓存区的大小,默认

    32678B

    ,

    32KB

    。如果设置为

    -1

    , 则表示使用 操作系统的默认值。如果

    Procuer

    kafka

    处于不同的机房,可以调大此参数。
  8. send.buffer.bytes

    设置

    Socket

    发送消息缓冲区大小。默认

    131072B

    , 即

    128KB

    。如果设置为

    -1

    ,则使用操作系统的默认值
  9. request.timeout.ms

    Producer

    等待响应的最长时间,默认

    30000ms

    。需要注意的是,该参数需要比

    replica.lag.time.max.ms

    值更大。可以减少因客户端重试,而造成的消息重复
  10. buffer.memory

    配置消息追加器,内存大小。默认最大为

    33554432B

    ,即

    32MB

  11. batch.size

    ProducerBatch

    ByteBuffer

    。默认

    16384B

    ,即

    16kb

  12. max.block.ms

    生产者生成消息过快时,客户端最多阻塞多少时间。

总结

  1. kafka

    将生产者生产消息,消息发送给服务端,拆成了 2 个过程。生产消息交由 主线程, 消息发送给服务端的任务交由

    sender

    线程。
  2. 通过

    RecordAccumulator

    的设计,将生产消息,与发送消息解耦。
  3. RecordAccumulator

    内部存储数据的数据结构是

    ArrayDeque

    . 队尾追加消息,队头取出消息
  4. 开发人员编写的

    ProducerRecord

    ,在消息发送之前会被转为

    ProducetBatch

    。为的是批量发送消息,提高网络 IO 效率
  5. 为了避免,每个节点负载过高,

    kafka

    设计了

    InFlightRequests

    , 将为响应的消息放入其中
  6. 从源码角度,

    buffer.memory

    最好是

    buffer.memory

    整数倍大小。因为

    ProducerBatch

    ByteBuffer

    是从

    RecordAccumulator

    ByteBuffer

    中划出的

RocketMQ

区别

  1. RocketMQ

    没有将生产消息与发送消息解耦。
  2. RocketMQ

    的消息发送,分为 同步,异步、单向。其中单向发送与

    kafka

    acks

    = 0 的配置效果一样。但是实际上,还得看

    broker

    的刷盘配置!
  3. kafka

    发送失败,默认不重试,

    RocketMQ

    默认重试 2 次。不过

    RocketMQ

    无法配置 2 次重试的间隔时间.

    kafka

    可以配置重试的间隔时间。
  4. RocketMQ

    默认消息最大为

    4MB

    ,

    kafka

    默认

    1MB

  5. RocketMQ

    在消息的发送上,是直接使用

    Netty

    kafka

    则是使用

    NIO

    自己实现通信。(虽说,

    Netty

    也是基于

    NIO

  6. 当然还有很多咯…, 因为设计完全不一样!,实际解决场景也不一样

知识补充

ByteBuffer

ByteBuffer

一般用于网络传输的缓冲区。

先来看下

ByteBuffer

的类继承体系

kafka 系列 -- 3.2、生产者客户端原理分析

ByteBuffer

主要的 2 个父类。

DirectByteBuffer

HeapByteBuffer

。一般而言,我们主要的是使用

HeapByteBuffer

ByteBuffer

重要属性
  1. position

    当前读取的位置
  2. mark

    为某一读过的位置做标记,便于某些时候回退到该位置
  3. limit

    读取的结束位置
  4. capacity

    buffer

    大小

ByteBuffer

基本方法
  1. put()

    buffer

    中写数据,并将

    position

    往前移动
  2. flip()

    position

    设置为0,

    limit

    设置为当前位置
  3. rewind()

    position

    设置为0,

    limit

    不变
  4. mark()

    mark

    设置为当前

    position

    值,调用

    reset()

    , 会将

    mark

    赋值给

    position

  5. clear()

    position

    设置为0,

    limit

    设置为

    capacity

ByteBuffer

食用DEMO
FileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt");
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);

int read = channel.read(buffer);
while (read != -1) {
    System.out.println(new String(buffer.array(), Charset.defaultCharset()));
    buffer.clear();
    read = channel.read(buffer);
}

           

ArrayDeque

ArrayDeque

,是一个双端队列。即可以从队头插入元素,也可以从队尾插入元素

对于双端队列,既可以使用 链表的方式实现,也可以使用数组的方式实现。

JDK

LinkedList

使用链表实现,

ArrayDeque

则使用数组的方式实现

来看

ArrayDeque

的实现。

ArrayDeque 中,有

head

,

tail

分别指向 头指针,和尾指针。可以把

ArrayDeque

想象成循环数组

插入
  1. 当往队尾插入元素时,tail 指针会往后走
    kafka 系列 -- 3.2、生产者客户端原理分析
  2. 当往队前插入元素时, head 指针会向前走
    kafka 系列 -- 3.2、生产者客户端原理分析
删除
  1. 从队头删除元素 4 ,

    head

    会往前走
    kafka 系列 -- 3.2、生产者客户端原理分析
  2. 从队尾删除元素 3,

    tail

    会往后走
    kafka 系列 -- 3.2、生产者客户端原理分析

可以看到,这里通过移动

head

,

tail

指针就可以删除元素了。

扩容

tail

head

都指向都一个位置时,则需要扩容

kafka 系列 -- 3.2、生产者客户端原理分析

扩容会将数组的大小扩充为原来的 2 倍,然后重新将

head

指向数组

下标,

tail

指向数组的最后一个元素位置。

上面的数组,在重新扩容后,会变成下面这个样子

kafka 系列 -- 3.2、生产者客户端原理分析
public class ArrayDeque<E> extends AbstractCollection<E>
                           implements Deque<E>, Cloneable, Serializable
{
    private void doubleCapacity() {
            assert head == tail;
            int p = head;
            int n = elements.length;
            int r = n - p; // number of elements to the right of p
            int newCapacity = n << 1;
            if (newCapacity < 0)
                throw new IllegalStateException("Sorry, deque too big");
            Object[] a = new Object[newCapacity];
            System.arraycopy(elements, p, a, 0, r);
            System.arraycopy(elements, 0, a, r, p);
            elements = a;
            head = 0;
            tail = n;
    }
}