天天看点

Kafka快速入门系列(9) | Kafka的Producer API操作

  本篇博主带来的是Kafka的Producer API操作。

1. 消息发送流程

2. 无回调参数的API

3. 带回调函数的API

4. 同步发送API / 只是比异步多了一个.get()

  Kafka的Producer发送消息采用的是<code>异步发送</code>的方式。在消息发送的过程中,涉及到了<code>两个线程——main线程和Sender线程</code>,以及<code>一个线程共享变量——RecordAccumulator</code>。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

下图为KafkaProducer发送消息流程
Kafka快速入门系列(9) | Kafka的Producer API操作
Kafka快速入门系列(9) | Kafka的Producer API操作
相关参数: batch.size:只有数据积累到batch.size之后,sender才会发送数据。 linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。

1. 导入依赖

2. 编写代码

需要用到的类:

KafkaProducer:需要创建一个生产者对象,用来发送数据

ProducerConfig:获取所需的一系列配置参数

ProducerRecord:每条数据都要封装成一个ProducerRecord对象

3. 完整代码

4. 群起zookeeper和kafka

Kafka快速入门系列(9) | Kafka的Producer API操作

5. 启动一个consumer控制台

6. 跑代码进行测试

Kafka快速入门系列(9) | Kafka的Producer API操作
为什么是无序的,这是因为我们只能保证分区内有序。其他无序。

  回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

  <code>注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试</code>。

1. 代码

2. 结果

Kafka快速入门系列(9) | Kafka的Producer API操作

3. 大致流程图

Kafka快速入门系列(9) | Kafka的Producer API操作

  同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。

  由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。

1. 代码:

Kafka快速入门系列(9) | Kafka的Producer API操作

  本次的分享就到这里了,

Kafka快速入门系列(9) | Kafka的Producer API操作

  

\color{#FF0000}{看完就赞,养成习惯!!!}

看完就赞,养成习惯!!!^ _ ^ ❤️ ❤️ ❤️

  码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!