天天看點

從流處理來深入了解消息隊列

作者:搬山道猿

1. 流處理

“流” 是指随着時間的推移逐漸可用的資料,是以流處理認為資料是 無界限的,它們會随着時間的推移而逐漸達到。

流處理介于線上處理和批處理(離線處理)之間,是以又被稱為 準實時 或 準線上 處理。它和批處理相似:它們消費輸入并産生輸出(并不需要響應請求),不同的是流處理在 事件(event) 發生時會盡快處理,而批處理需要等待若幹資料準備好之後,才進行處理,這種差異使流處理系統比起批處理系統具有 更低的延遲。在流處理系統中,一個事件由 生産者 生成一次,然後可能被多個 消費者 進行處理,相關的事件通常被聚合為一個主題(topic)或流(stream)。

2. 消息系統

消息系統是典型的流處理系統,它能在新事件出現時立即通知消費者,這樣就能保證對新事件進行低延遲的連續處理,也是以避免了消費者通過輪詢機制檢查新事件産生開銷。

像TCP信道這種直接通信的形式是比較簡單的消息系統,它使用生産者和消費者直接進行網絡通信,不過這種形式的消息系統容錯程度極為有限:如果消費者當機,即使生産者有逾時重傳的機制也會導緻消息丢失;如果生産者當機,那麼需要它進行逾時重傳的消息和緩沖隊列中的消息都會丢失。

為了解決容錯程度較低的問題,可以采用 消息隊列(message queue) 來對消息進行管理,它的本質上是一種 針對消息流而優化的資料庫,生産者将消息寫入消息隊列,消費者從消息隊列進行讀取,通過将 資料持久化 轉移到消息隊列上,來提高生産者和消費者用戶端對消息丢失的容忍程度。

3. 消息隊列

消費者對消息隊列中消息的消費通常是 異步 的,當生産者發送消息時,通常隻會等待消息隊列确認消息已經被緩存,而不會等待消費者來處理消息。消費者對消息的消費通常在幾分之一秒内,如果發生消息積壓的情況,會出現明顯的延遲。

3.1 基于 JMS/AMQP 标準的消息隊列

我們所熟悉的 RabbitMQ 就是對 JMS/AMQP 标準的實作。如果在消息處理代價比較高昂,并且希望 并行處理 以及 消息的順序沒那麼重要 的情況下,這種消息隊列是非常合适的選擇。不過 JMS/AMQP 風格的消息隊列在消費者收到消息後可能會将該消息在消息隊列中移除,那麼如果此時再加入新的消費者,隻能接收到該消費者注冊之後的消息了。為了能夠消費先前的消息和獲得對消息持久化的能力,便提出了基于 日志 的消息隊列。

JMS 即Java消息服務(Java Message Service)應用程式接口,是一個 Java 平台中關于面向消息中間件(MOM)的 API,它是一種技術規範。用于在兩個應用程式之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的 API,絕大多數 MOM提供商都對 JMS提供支援。

AMQP 的全稱是 Advanced Message Queuing Protocol,一個提供統一消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。基于此協定的用戶端與消息中間件可傳遞消息,并不受用戶端/中間件不同産品,不同的開發語言等條件的限制。

3.2 基于日志的消息隊列

Apache Kafka 是基于日志的消息隊列。日志僅是在磁盤上簡單地追加記錄消息序列,生産者通過将消息追加到日志末尾來對消息進行持久化,而消費者則通過依次讀取日志來接收消息,當消費者讀取到日志末尾時,則會等待新消息追加的通知。

為了提高單個磁盤的吞吐量,可以進行分區處理,如下圖所示:

從流處理來深入了解消息隊列

這些分區對應的主題可以了解為攜帶了一組相同類型消息的分區,不同的分區可以托管在不同的伺服器上,這樣就使得每個分區都有一份能獨立于其他分區進行讀寫的日志,進而提高吞吐量。

在每個分區中,消息隊列為每條消息都會配置設定一個單調遞增的 偏移量,以此來保證消息在分區内的有序,但是并不支援跨分區的順序保證。消息隊列為每個消費者維護一個偏移量即可記錄消費者的消費進度,而無需跟蹤每一條消息,如果消費者節點失效,則消費者的分區将指派給其他消費者節點,并從最後記錄的偏移量開始消費;消費者也可以通過指定偏移量來對先前的消息進行消費,不過如果消費者已經處理了後續的消息,但還沒記錄它們的偏移量,那麼消費者節點發生失效重新開機後,這些消息将被消費兩次。

在消息吞吐量很高,消息能被迅速處理 且 順序很重要 的情況下,基于日志的消息隊列是合适的選擇。

3.3 消息的傳遞模式

當多個消費者從同一主題讀取消息時,有兩種主要的消息傳遞模式,如下圖所示:

從流處理來深入了解消息隊列
  • 負載均衡(load balancing):每個消息隻被傳遞給 消費者之一,是以處理該主題下的消息能被消費者共享。代理可以為消費者任意配置設定消息,當在處理消息代價比較高昂時,希望能并行處理消息時,此模式非常有用
  • 扇出(fan-out):每條消息都被傳遞給 所有消費者

以上兩種模式可以組合使用:兩個獨立的消費者組可以訂閱同一主題,每一組都共同收到所有消息,而在每一組内,隻由單個節點來處理消息。

如果采用的是消息隊列向消費者 推送 消息的模式,為了確定消息被消費,消費者在消費完消息時需要向消息隊列發送确認(ACK),供消息隊列判斷是否需要逾時重傳。

當發生消息積壓時(生産者發送消息的速率大于消費者消費消息的速率),可以采用如下三種方式解決:

  • 丢棄消息
  • 為積壓的消息建立緩沖區
  • 降低生産者發送消息的速率(流量控制)

3.4 消息隊列與資料庫的差異

我們在前文說過:消息隊列是一種 針對消息流而優化的資料庫,那麼它與資料庫又有什麼差別呢?

  • 在資料的儲存機制上:資料庫通常保留資料直至顯式删除;基于 JMS/AMQP 标準的消息隊列在消息成功發送給消費者時會自動删除消息,基于日志的消息隊列會在磁盤中以日志的形式對消息做持久化處理
  • 資料搜尋方式上:資料庫通常支援次級索引和各種搜尋資料的方式;消息隊列通常支援按照某種模式比對主題,訂閱其子集。雖然機制并不一樣,但對于用戶端來說都是選擇想要了解資料的一部分
  • 查詢結果的時效上:查詢資料庫時,結果通常基于某個時間點的資料快照,如果另一個用戶端随後向資料庫寫入一些改變了查詢結果的内容,則第一個用戶端不會發現其先前結果現已過期(快照隔離);消息隊列不支援任意查詢,當資料發生變化時(即新消息可用時),它們會通知用戶端