生产者api包含2个producers-<code>kafka.producer.syncproducer</code>和
<code>kafka.producer.async.asyncproducer。</code>示例代码如下:
其目的就是通过一个单一的api向客户端暴露所有的生产者功能。kafka生产者
可以处理多个生产者的排队以及缓冲请求以及异步地分发批量的数据:
<code>kafka.producer.producer</code>对于多个生产者的请求数据(<code>producer.type=async</code>),在序列化和分发它们至相应的kafka节点分区之前,其有能力对它们进行批量处理。而批量处理的大小可由少量的配置参数完成。当数据进入至队列,它们将被缓冲在队列里面,直到<code>queue.time</code>超时或者达到了配置(<code>batch.size</code>)的批量处理的最大值.后台的异步线程(<code>kafka.producer.async.producersendthread</code>)负责将队列里的数据批量取出并让<code>kafka.producer.eventhandler</code>进行序列化工作,且将数据发送至kafka相应的节点分区。通过设置<code>event.handler</code>配置参数,即可实现一个自定义的事件处理器(event handler)。不论对于植入自定义日志/跟踪代码,还是自定义监控逻辑,能在生产者队列管道的不同阶段注入回调函数是极其有帮助的。一种可能的方案是通过实现<code>kafka.producer.async.callbackhandler</code>接口并且对该类设置<code>callback.handler</code>配置参数。
通过用户自定义的<code>encoder</code>实现对数据的序列化操作:
默认的<code>encoder``是</code>kafka.serializer.defaultencoder“`
通过用户设置(可选)的<code>partitioner</code>提供基于软件层面的负载均衡(slb):
<code>kafka.producer.partitioner</code>会影响到数据传输时的路由策略。
分区api使用key以及可用节点分区来返回一个分区id。这个id通常用作有序<code>broker_ids</code>的索引,同时节点分区(partitions)将会用这个id挑选出一个分区去处理生产者的请求。默认的分区策略是是对key进行hash,并对分区数目取余,即<code>hash(key)%numpartitions</code>。如果key为null,那么将会挑选出一个随机的节点。如果想要实现自定义的分区策略,也可以通过设置<code>partitioner.class</code>配置参数实现。
kafka提供两种级别的消费者apis。对于普通、简单的消费者api,其仅包含对单个节点的连接,且可关闭发送给server网络请求。这个api是完全无状态的,每个网络请求将携带偏移量,用户可以根据自己的选择是否保留这些元数据。
高级的消费者api不仅隐藏了kafka集群的细节,而且可以消费集群中的任意一台机器而不用关心其背后的网络拓扑。同时,它也保留了消息是否被消费的状态。另外,高级别的消费者api还支持对依据过滤表达式来对订阅的topic进行过滤(譬如白名单或者黑名单等类似的正则表达式)
普通消费者api通常用于实现高级api,以及用于一些离线消费者,这些消费者对于保持状态有特殊的要求
这个api围绕迭代器并由kafkastream类实现。每个kafkastream表示从一个或多个服务器的一个或多个分区的信息流。每个流用于单线程处理,所以客户端可以在创建调用中提供所需的流数。因此,流可能代表多个服务器分区的合并(对应于处理线程的数量),但每个分区只会流向一个流。
createmessagestreams方法调用已在某个topic注册的consumer,这将导致消费者/kafka节点分配的再平衡。api鼓励在一个调用中创建多个主题流,以最小化这种重新平衡。createmessagestreamsbyfilter方法调用(额外的)注册的watcher去发现新的符合被过滤的topic。注意通过createmessagestreamsbyfilter方法返回的每个流可能会迭代多个topic的消息(譬如,过滤器中允许多个topic)
kafka网络层是一个相当简单的nio服务器,这个将不会进行详细的阐述。sendfile的实现是由<code>messageset</code>接口和<code>writeto</code>方法完成。这使得备份文件的信息集合,使用更有效的<code>transferto</code>实现而不是中间缓冲写。线程模型是一个单线程和用来处理每个固定连接数的n个处理器线程组成。这种设计已经在其他地方进行了充分的测试,并且被公认为是简单和快速的实现。该协议保持相当简洁的形式,以便将来更多其他类型语言的客户端实现。
消息由固定大小的head、可变长度的不透明密钥键字节数组和可变长度的不透明值字节数组组成.消息头包含如下的一些字段:
– crc32 用以检测消息的截取和损坏
– 格式版本
– 鉴别器的一个属性
– 时间戳
使键和值保持不透明是一个正确的确定:现在序列化包有很大的进展,任何特定的选择都不适合所有的使用.更不用说,一个特定的应用程序使用卡夫卡可能会指定一个特定的序列化类型作为其使用的一部分。<code>messageset</code>接口仅仅只是一个迭代器,用于迭代方法产生的消息,这个方法对nio通道进行批量读取和写入。