下面代码来自kafka源码包里的<code>examples</code>包
创建生产者的源码:
异步发送模式还传递了一个Callback回调类,当客户端发送完一条消息,并且这条消息成功地存储至Kafka集群后, 服务端会调用匿名回调类的回调方法( onCompletion)。 异步发送指的是生产者发送完一条消息后,不需要关心服务端处理完了没有 ,可以接着发送下一条消息 。 服务端在处理完每一条消息后,会自动触发回调函数,返回响应结果给客户端。 如果是以同步模式发送消息,生产者客户端应用程序不需要提供回调函数。 同步模式下,生产者发送完一条消息后,必须等待服务端返回响应结果,然后才能发送下一条消息 。
KafkaProducer 只用了一个 send 方法,就可以完成同步和l异步两种模式的消息发迭,这是因为 send方法返回的是一个 Future。 基于Future ,我们可以实现同步或异步的消息发送话义。
同步。
调用 send返回 Future时, 需要立即调用get ,因为Future.get在没有返回结果时会一直 阻塞。
异步。
提供一个回调,调用 send后可以继续发送消息而不用等待 。 当有结果运回时,会自动执行回调函数。