天天看點

Producer和Consumer

producer直接将資料發送到broker的leader(主節點),不需要在多個節點進行分發。為了幫助producer做到這點,所有的Kafka節點都可以及時的告知:哪些節點是活動的,目标topic目标分區的leader在哪。這樣producer就可以直接将消息發送到目的地了。

用戶端控制消息将被分發到哪個分區。可以通過負載均衡随機的選擇,或者使用分區函數。Kafka允許使用者實作分區函數,指定分區的key,将消息hash到不同的分區上(當然有需要的話,也可以覆寫這個分區函數自己實作邏輯).比如如果你指定的key是user id,那麼同一個使用者發送的消息都被發送到同一個分區上。經過分區之後,consumer就可以有目的的消費某個分區的消息。

批量發送可以很有效的提高發送效率。Kafka producer的異步發送模式允許進行批量發送,先将消息緩存在記憶體中,然後一次請求批量發送出去。這個政策可以配置的,比如可以指定緩存的消息達到某個量的時候就發出去,或者緩存了固定的時間後就發送出去(比如100條消息就發送,或者每5秒發送一次)。這種政策将大大減少服務端的I/O次數。

既然緩存是在producer端進行的,那麼當producer崩潰時,這些消息就會丢失。Kafka0.8.1的異步發送模式還不支援回調,就不能在發送出錯時進行處理。Kafka 0.9可能會增加這樣的回調函數。見Proposed Producer API.

Kafa consumer消費消息時,向broker發出"fetch"請求去消費特定分區的消息。consumer指定消息在日志中的偏移量(offset),就可以消費從這個位置開始的消息。customer擁有了offset的控制權,可以向後復原去重新消費之前的消息,這是很有意義的。

Kafka最初考慮的問題是,customer應該從brokes拉取消息還是brokers将消息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分消息系統共同的傳統的設計:producer将消息推送到broker,consumer從broker拉取消息。

一些消息系統比如Scribe和Apache Flume采用了push模式,将消息推送到下遊的consumer。這樣做有好處也有壞處:由broker決定消息推送的速率,對于不同消費速率的consumer就不太好處理了。消息系統都緻力于讓consumer以最大的速率最快速的消費消息,但不幸的是,push模式下,當broker推送的速率遠大于consumer消費的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統的pull模式。

Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取資料。Push模式必須在不知道下遊consumer消費能力和消費政策的情況下決定是立即推送每條消息還是緩存之後批量推送。如果為了避免consumer崩潰而采用較低的推送速率,将可能導緻一次隻推送較少的消息而造成浪費。Pull模式下,consumer就可以根據自己的消費能力去決定這些政策。

Pull有個缺點是,如果broker沒有可供消費的消息,将導緻consumer不斷在循環中輪詢,直到新消息到t達。為了避免這點,Kafka有個參數可以讓consumer阻塞知道新消息到達(當然也可以阻塞知道消息的數量達到某個特定的量這樣就可以批量發送)。

對消費消息狀态的記錄也是很重要的。

大部分消息系統在broker端的維護消息被消費的記錄:一個消息被分發到consumer後broker就馬上進行标記或者等待customer的通知後進行标記。這樣也可以在消息在消費後立馬就删除以減少空間占用。

但是這樣會不會有什麼問題呢?如果一條消息發送出去之後就立即被标記為消費過的,一旦consumer處理消息時失敗了(比如程式崩潰)消息就丢失了。為了解決這個問題,很多消息系統提供了另外一個個功能:當消息被發送出去之後僅僅被标記為已發送狀态,當接到consumer已經消費成功的通知後才标記為已被消費的狀态。這雖然解決了消息丢失的問題,但産生了新問題,首先如果consumer處理消息成功了但是向broker發送響應時失敗了,這條消息将被消費兩次。第二個問題時,broker必須維護每條消息的狀态,并且每次都要先鎖住消息然後更改狀态然後釋放鎖。這樣麻煩又來了,且不說要維護大量的狀态資料,比如如果消息發送出去但沒有收到消費成功的通知,這條消息将一直處于被鎖定的狀态,

Kafka采用了不同的政策。Topic被分成了若幹分區,每個分區在同一時間隻被一個consumer消費。這意味着每個分區被消費的消息在日志中的位置僅僅是一個簡單的整數:offset。這樣就很容易标記每個分區消費狀态就很容易了,僅僅需要一個整數而已。這樣消費狀态的跟蹤就很簡單了。

這帶來了另外一個好處:consumer可以把offset調成一個較老的值,去重新消費老的消息。這對傳統的消息系統來說看起來有些不可思議,但确實是非常有用的,誰規定了一條消息隻能被消費一次呢?consumer發現解析資料的程式有bug,在修改bug後再來解析一次消息,看起來是很合理的額呀!

進階的資料持久化允許consumer每個隔一段時間批量的将資料加載到線下系統中比如Hadoop或者資料倉庫。這種情況下,Hadoop可以将加載任務分拆,拆成每個broker或每個topic或每個分區一個加載任務。Hadoop具有任務管理功能,當一個任務失敗了就可以重新開機而不用擔心資料被重新加載,隻要從上次加載的位置繼續加載消息就可以了。

繼續閱讀