生産者api包含2個producers-<code>kafka.producer.syncproducer</code>和
<code>kafka.producer.async.asyncproducer。</code>示例代碼如下:
其目的就是通過一個單一的api向用戶端暴露所有的生産者功能。kafka生産者
可以處理多個生産者的排隊以及緩沖請求以及異步地分發批量的資料:
<code>kafka.producer.producer</code>對于多個生産者的請求資料(<code>producer.type=async</code>),在序列化和分發它們至相應的kafka節點分區之前,其有能力對它們進行批量處理。而批量處理的大小可由少量的配置參數完成。當資料進入至隊列,它們将被緩沖在隊列裡面,直到<code>queue.time</code>逾時或者達到了配置(<code>batch.size</code>)的批量處理的最大值.背景的異步線程(<code>kafka.producer.async.producersendthread</code>)負責将隊列裡的資料批量取出并讓<code>kafka.producer.eventhandler</code>進行序列化工作,且将資料發送至kafka相應的節點分區。通過設定<code>event.handler</code>配置參數,即可實作一個自定義的事件處理器(event handler)。不論對于植入自定義日志/跟蹤代碼,還是自定義監控邏輯,能在生産者隊列管道的不同階段注入回調函數是極其有幫助的。一種可能的方案是通過實作<code>kafka.producer.async.callbackhandler</code>接口并且對該類設定<code>callback.handler</code>配置參數。
通過使用者自定義的<code>encoder</code>實作對資料的序列化操作:
預設的<code>encoder``是</code>kafka.serializer.defaultencoder“`
通過使用者設定(可選)的<code>partitioner</code>提供基于軟體層面的負載均衡(slb):
<code>kafka.producer.partitioner</code>會影響到資料傳輸時的路由政策。
分區api使用key以及可用節點分區來傳回一個分區id。這個id通常用作有序<code>broker_ids</code>的索引,同時節點分區(partitions)将會用這個id挑選出一個分區去處理生産者的請求。預設的分區政策是是對key進行hash,并對分區數目取餘,即<code>hash(key)%numpartitions</code>。如果key為null,那麼将會挑選出一個随機的節點。如果想要實作自定義的分區政策,也可以通過設定<code>partitioner.class</code>配置參數實作。
kafka提供兩種級别的消費者apis。對于普通、簡單的消費者api,其僅包含對單個節點的連接配接,且可關閉發送給server網絡請求。這個api是完全無狀态的,每個網絡請求将攜帶偏移量,使用者可以根據自己的選擇是否保留這些中繼資料。
進階的消費者api不僅隐藏了kafka叢集的細節,而且可以消費叢集中的任意一台機器而不用關心其背後的網絡拓撲。同時,它也保留了消息是否被消費的狀态。另外,進階别的消費者api還支援對依據過濾表達式來對訂閱的topic進行過濾(譬如白名單或者黑名單等類似的正規表達式)
普通消費者api通常用于實作進階api,以及用于一些離線消費者,這些消費者對于保持狀态有特殊的要求
這個api圍繞疊代器并由kafkastream類實作。每個kafkastream表示從一個或多個伺服器的一個或多個分區的資訊流。每個流用于單線程處理,是以用戶端可以在建立調用中提供所需的流數。是以,流可能代表多個伺服器分區的合并(對應于處理線程的數量),但每個分區隻會流向一個流。
createmessagestreams方法調用已在某個topic注冊的consumer,這将導緻消費者/kafka節點配置設定的再平衡。api鼓勵在一個調用中建立多個主題流,以最小化這種重新平衡。createmessagestreamsbyfilter方法調用(額外的)注冊的watcher去發現新的符合被過濾的topic。注意通過createmessagestreamsbyfilter方法傳回的每個流可能會疊代多個topic的消息(譬如,過濾器中允許多個topic)
kafka網絡層是一個相當簡單的nio伺服器,這個将不會進行詳細的闡述。sendfile的實作是由<code>messageset</code>接口和<code>writeto</code>方法完成。這使得備份檔案的資訊集合,使用更有效的<code>transferto</code>實作而不是中間緩沖寫。線程模型是一個單線程和用來處理每個固定連接配接數的n個處理器線程組成。這種設計已經在其他地方進行了充分的測試,并且被公認為是簡單和快速的實作。該協定保持相當簡潔的形式,以便将來更多其他類型語言的用戶端實作。
消息由固定大小的head、可變長度的不透明密鑰鍵位元組數組和可變長度的不透明值位元組數組組成.消息頭包含如下的一些字段:
– crc32 用以檢測消息的截取和損壞
– 格式版本
– 鑒别器的一個屬性
– 時間戳
使鍵和值保持不透明是一個正确的确定:現在序列化包有很大的進展,任何特定的選擇都不适合所有的使用.更不用說,一個特定的應用程式使用卡夫卡可能會指定一個特定的序列化類型作為其使用的一部分。<code>messageset</code>接口僅僅隻是一個疊代器,用于疊代方法産生的消息,這個方法對nio通道進行批量讀取和寫入。