本篇博主带来的是Kafka的Producer API操作。
1. 消息发送流程
2. 无回调参数的API
3. 带回调函数的API
4. 同步发送API / 只是比异步多了一个.get()
Kafka的Producer发送消息采用的是<code>异步发送</code>的方式。在消息发送的过程中,涉及到了<code>两个线程——main线程和Sender线程</code>,以及<code>一个线程共享变量——RecordAccumulator</code>。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
下图为KafkaProducer发送消息流程![]()
Kafka快速入门系列(9) | Kafka的Producer API操作 相关参数: batch.size:只有数据积累到batch.size之后,sender才会发送数据。 linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。![]()
Kafka快速入门系列(9) | Kafka的Producer API操作
1. 导入依赖
2. 编写代码
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord对象
3. 完整代码
4. 群起zookeeper和kafka
5. 启动一个consumer控制台
6. 跑代码进行测试
为什么是无序的,这是因为我们只能保证分区内有序。其他无序。
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
<code>注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试</code>。
1. 代码
2. 结果
3. 大致流程图
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
1. 代码:
本次的分享就到这里了,
看
完
就
赞
,
养
成
习
惯
!
\color{#FF0000}{看完就赞,养成习惯!!!}
看完就赞,养成习惯!!!^ _ ^ ❤️ ❤️ ❤️
码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!