文章目录
-
-
- 生产者发送消息的整体流程
- 消息追加器 `RecordAccumulator`
-
- `ProducerBatch` 批量消息
- `Sender` 线程
-
- `InFlightRequests`
- 重要参数
- 总结
- 与 `RocketMQ` 区别
- 知识补充
-
- `ByteBuffer`
-
- `ByteBuffer` 重要属性
- `ByteBuffer` 基本方法
- `ByteBuffer` 食用DEMO
- `ArrayDeque`
-
- 插入
- 删除
- 扩容
-
生产者发送消息的整体流程
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL6lEVPd3ZE9UeNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL0UDOzUjNwIjM5ADOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
消息追加器 RecordAccumulator
RecordAccumulator
前面几个组件,在 3.1 的文章中,已经说清楚。现在来看
RecordAccumulator
组件
RecordAccumulator
主要用于缓存消息,以便
Sender
线程能够批量发送消息。
RecordAccumulator
会将消息放入缓存
BufferPool
(实际上就是
ByteBuffer
) 中。
BufferPool
默认最大为
33554432B
,即
32MB
, 可通过
buffer.memory
进行配置。
当生产者生产消息的速度大于
sender
线程的发送速度,那么
send
方法就会阻塞。默认阻塞
60000ms
,可通过
max.block.ms
配置。
RecordAccumulator
类的几个重要属性
public final class RecordAccumulator {
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// 缓存空间,默认 32MB,可通过上面说的 buffer.memory 参数进行配置
private final BufferPool free;
}
TopicPartition
为分区的抽象。定义如下所示
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
}
主线程发送的消息,都会被放入
batcher
中,
batches
将发往不同
TopicPartition
的消息,存放到各自的
ArrayDeque<ProducerBatch>
中。
主线程
append
时,往队尾插入,
sender
线程取出时,则往队头取出。
ProducerBatch
批量消息
ProducerBatch
ProducerBatch
为批量消息的抽象。
在编写客户端发送消息时,客户端面向的类则是
ProducerRecord
,
kafka
客户端,在发送消息时,会将
ProducerRecord
放入
ProducerBatch
,使消息更加紧凑。
如果为每个消息都独自创建内存空间,那么内存空间的开辟和释放,则将会比较耗时。因此
ProducerBatch
内部有一个
ByteBufferOutputStream bufferStream
(实则为
ByteBuffer
), 使用
ByteBuffer
重复利用内存空间。
bufferStream
值的大小为:
public final class RecordAccumulator {
// 该值大小,可通过 buffer.memory 配置
private final BufferPool free;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
}
}
其中,
batchSize
默认
16384B
,即
16kb
,可通过
batch.size
配置。第2个入参的值则为消息的大小。
需要注意的是,
bufferStream
的内存空间是从
free
内存空间中划出的。
上面有说到,
ProducerBatch
会使用
ByteBuffer
追加消息。但是,如果你看代码,你会发现
ProducerBatch
在做消息的追加时,会将消息放入
DataOutputStream appendStream
。好像跟我们说的 不一样! 但是实际上,就是利用
ByteBuffer
,这里还需要看
appendStream
是如何初始化的!
注:MemoryRecordsBuilder 为 ProducerBatch 中的一个属性
public class MemoryRecordsBuilder {
private final ByteBufferOutputStream bufferStream;
private DataOutputStream appendStream;
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - firstTimestamp;
// 往 appendStream 中追加消息
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
}
MemoryRecordsBuilder
初始化
public class MemoryRecordsBuilder {
private final ByteBufferOutputStream bufferStream;
private DataOutputStream appendStream;
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
// ..省略部分代码
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
// 使用 bufferStream 包装
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
}
可以看到实际上使用的还是
ByteBufferOutputStream bufferStream
Sender
线程
Sender
Sender
线程在发送消息时,会从
RecordAccumulator
中取出消息,并将放在
RecordAccumulator
中的
Deque<ProducerBatch>
转换成
Map<nodeId, List<ProducerBatch>>
,这里的
nodeId
是
kafka
节点的
id
。再发送给
kafka
之前,又会将消息封装成
Map<nodeId, ClientRequest>
。
请求在从
Sender
发往
kafka
时,还会被存入
InFlightRequests
public class NetworkClient implements KafkaClient {
/* the set of requests currently being sent or awaiting a response */
private final InFlightRequests inFlightRequests;
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
clientRequest.correlationId(), destination);
} else {
log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
}
}
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
// 将请求放入
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
}
}
InFlightRequests
InFlightRequests
/**
* The set of requests which have been sent or are being sent but haven't yet received a response
*/
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
}
InFlightRequests
的作用是存储已经发送的,或者发送了,但是未收到响应的请求。
InFlightRequests
类中有一个属性
maxInFlightRequestsPerConnection
, 标识一个节点最多可以缓存多少个请求。该默认值为
5
, 可通过
max.in.flight.requests.per.connection
进行配置, 需要注意的是
InFlightRequests
对象是在创建
KafkaProducer
时就会被创建。
requests
参数的
key
为
nodeId
,
value
则为缓存的请求。
sender
线程 在发送消息时,会先判断
InFlightRequests
对应的请求缓存中是否超过了
maxInFlightRequestsPerConnection
的大小
代码入口:
Sender.sendProducerData
public class Sender implements Runnable {
private long sendProducerData(long now) {
// ... 省略部分代码
while (iter.hasNext()) {
Node node = iter.next();
// todo 这里为代码入口
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// ... 省略部分代码
}
}
public class NetworkClient implements KafkaClient {
private boolean canSendRequest(String node, long now) {
return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}
}
final class InFlightRequests {
public boolean canSendMore(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
}
从
InFlightRequests
的设计中,可以看到,我们可以很轻松的就知道,哪个
kafka
节点的负载是最低。因为只需要判断
requests
中对应
node
集合的大小即可。
重要参数
-
用于指定分区中需要有多少个副本收到消息,生产者才会认为消息是被写入的acks
= 1。默认为1, 只要acks
副本写入,则被认为已经写入。如果消息已经被写入leader
副本,且已经返回给生产者leader
,但是在ok
拉取follower
消息之前,leader
副本突然挂掉,那么此时消息也会丢失leader
= 0。发送消息后,不需要等待服务端的响应,此配置,吞吐量最高。acks
= -1 或者 all。需要等待所有acks
ISR
中的所有副本都成功写入消息之后,才会收到服务端的成功响应。
需要注意的一点是
入参是acks
,而不是String
int
-
客户端允许发送的消息最大长度,默认为max.request.size
.1MB
-
、retries
retry.backoff.ms
配置生产者的重试次数,默认为 .retries
配置两次重试的间隔时间retry.backoff.ms
-
指定消息的压缩方式,默认为compression.type
。可选配置none
,gzip
,snappy
lz4
-
指定在多久之后关闭闲置的连接,默认connection.max.idle.ms
,即540000(ms)
9分钟
-
指定发送linger.ms
之前等待更多的消息(ProducerBatch
) 加入ProducerRecord
的时间,默认为 。生产者在ProducerBatch
填充满时,或者等待时间超过ProducerBatch
发送消息出去。linger.ms
-
设置receive.buffer.bytes
接收消息缓存区的大小,默认Socket
,32678B
。如果设置为32KB
, 则表示使用 操作系统的默认值。如果-1
和Procuer
处于不同的机房,可以调大此参数。kafka
-
设置send.buffer.bytes
发送消息缓冲区大小。默认Socket
, 即131072B
。如果设置为128KB
,则使用操作系统的默认值-1
-
request.timeout.ms
等待响应的最长时间,默认Producer
。需要注意的是,该参数需要比30000ms
值更大。可以减少因客户端重试,而造成的消息重复replica.lag.time.max.ms
-
配置消息追加器,内存大小。默认最大为buffer.memory
,即33554432B
32MB
-
batch.size
ProducerBatch
。默认ByteBuffer
,即16384B
16kb
-
生产者生成消息过快时,客户端最多阻塞多少时间。max.block.ms
总结
-
将生产者生产消息,消息发送给服务端,拆成了 2 个过程。生产消息交由 主线程, 消息发送给服务端的任务交由kafka
线程。sender
- 通过
的设计,将生产消息,与发送消息解耦。RecordAccumulator
-
内部存储数据的数据结构是RecordAccumulator
. 队尾追加消息,队头取出消息ArrayDeque
- 开发人员编写的
,在消息发送之前会被转为ProducerRecord
。为的是批量发送消息,提高网络 IO 效率ProducetBatch
- 为了避免,每个节点负载过高,
设计了kafka
, 将为响应的消息放入其中InFlightRequests
- 从源码角度,
最好是buffer.memory
整数倍大小。因为buffer.memory
的ProducerBatch
是从ByteBuffer
的RecordAccumulator
中划出的ByteBuffer
与 RocketMQ
区别
RocketMQ
-
没有将生产消息与发送消息解耦。RocketMQ
-
的消息发送,分为 同步,异步、单向。其中单向发送与RocketMQ
的kafka
= 0 的配置效果一样。但是实际上,还得看acks
的刷盘配置!broker
-
发送失败,默认不重试,kafka
默认重试 2 次。不过RocketMQ
无法配置 2 次重试的间隔时间.RocketMQ
可以配置重试的间隔时间。kafka
-
默认消息最大为RocketMQ
,4MB
默认kafka
1MB
-
在消息的发送上,是直接使用RocketMQ
。Netty
则是使用kafka
自己实现通信。(虽说,NIO
也是基于Netty
)NIO
- 当然还有很多咯…, 因为设计完全不一样!,实际解决场景也不一样
知识补充
ByteBuffer
ByteBuffer
ByteBuffer
一般用于网络传输的缓冲区。
先来看下
ByteBuffer
的类继承体系
ByteBuffer
主要的 2 个父类。
DirectByteBuffer
、
HeapByteBuffer
。一般而言,我们主要的是使用
HeapByteBuffer
。
ByteBuffer
重要属性
ByteBuffer
-
当前读取的位置position
-
为某一读过的位置做标记,便于某些时候回退到该位置mark
-
读取的结束位置limit
-
capacity
大小buffer
ByteBuffer
基本方法
ByteBuffer
-
往put()
中写数据,并将buffer
往前移动position
-
将flip()
设置为0,position
设置为当前位置limit
-
将rewind()
设置为0,position
不变limit
-
将mark()
设置为当前mark
值,调用position
, 会将reset()
赋值给mark
position
-
将clear()
设置为0,position
设置为limit
capacity
ByteBuffer
食用DEMO
ByteBuffer
FileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt");
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = channel.read(buffer);
while (read != -1) {
System.out.println(new String(buffer.array(), Charset.defaultCharset()));
buffer.clear();
read = channel.read(buffer);
}
ArrayDeque
ArrayDeque
ArrayDeque
,是一个双端队列。即可以从队头插入元素,也可以从队尾插入元素
对于双端队列,既可以使用 链表的方式实现,也可以使用数组的方式实现。
JDK
中
LinkedList
使用链表实现,
ArrayDeque
则使用数组的方式实现
来看
ArrayDeque
的实现。
ArrayDeque 中,有
head
,
tail
分别指向 头指针,和尾指针。可以把
ArrayDeque
想象成循环数组
插入
- 当往队尾插入元素时,tail 指针会往后走
kafka 系列 -- 3.2、生产者客户端原理分析 - 当往队前插入元素时, head 指针会向前走
kafka 系列 -- 3.2、生产者客户端原理分析
删除
- 从队头删除元素 4 ,
会往前走head
kafka 系列 -- 3.2、生产者客户端原理分析 - 从队尾删除元素 3,
会往后走tail
kafka 系列 -- 3.2、生产者客户端原理分析
可以看到,这里通过移动
head
,
tail
指针就可以删除元素了。
扩容
当
tail
、
head
都指向都一个位置时,则需要扩容
扩容会将数组的大小扩充为原来的 2 倍,然后重新将
head
指向数组
下标,
tail
指向数组的最后一个元素位置。
上面的数组,在重新扩容后,会变成下面这个样子
public class ArrayDeque<E> extends AbstractCollection<E>
implements Deque<E>, Cloneable, Serializable
{
private void doubleCapacity() {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = a;
head = 0;
tail = n;
}
}