天天看點

RocketMQ 消息過濾

引言

前面我們已經簡單地介紹了 RocketMQ 的整體設計思路,本文着重其中消息過濾部分的實作細節,更多關于 RocketMQ 的文章均收錄于

<RocketMQ系列文章>

;

FilterServer 過濾器

RocketMQ 提供了基于表達式與基于類模式兩種過濾模式,前面已經詳細介紹了整個消息拉取、基于表達式(TAG)的過濾模式。基于類模式過濾是指在 Broker 端運作 1 個或多個消息過濾伺服器(FilterServer), RocketMQ 允許消息消費者自定義消息過濾實作類并将其代碼上傳到 FilterServer 上,消息消費者向 FilterServer 拉取消息,FilterServer将消息消費者的拉取指令轉發到 Broker,然後對傳回的消息執行消息過濾邏輯,最終将消息傳回給消費端,其工作原理如下圖所示。

RocketMQ 消息過濾
  1. Broker 程序所在的伺服器會啟動多個 FilterServer 程序
  2. 消費者在訂閱消息主題時會上傳一個自定義的消息過濾實作類,FilterServer 加載并執行個體化
  3. 消息消費者(Consume)向 FilterServer 發送消息拉取請求,FilterServer 接收到消費者消息拉取請求後,FilterServer 将消息拉取請求轉發給 Broker, Broker 傳回消息後在 FilterServer 端執行消息過濾邏輯,然後傳回符合條件的消息給消費者進行消費

FilterServer 注冊

FilterServer 從配置檔案中擷取 Broker位址,然後将自己的IP與端口發送到 Broker 伺服器。随後 Broker 會在其記憶體中維護一個 FilterServer 清單,此後 FilterServer 和 Broker 之間還會通過心跳來維持注冊關系,如果超過 30s 未收到心跳,則會删除關于該 FilterServer 的資訊。

為了防止 FilterServer 由于 Crash 而越來越少,Broker 也會定時檢查目前 FilterServer 的數量,如果數量小于門檻值,則自動建立一個 FilterServer。

還記得前面說過的 Broker 每隔 30s 會向 NameServer 發送心跳包麼,在心跳包中就包含該 Broker 的所有 FilterServer 資訊,消息的消費者就是從 NameServer 中擷取到該 Broker 的所有 FilterServer 資訊的。

總結一下:FilterServer 在啟動時向 Broker 注冊自己,在 Broker 端維護該 Broker 的 FilterServer 資訊,并定時監控 FilterServer 的狀态,然後 Broker 通過與所有 NameServer 的心跳包向 NameServer 注冊 Broker 上存儲的 FilterServer 清單,指引消息消費者正确從 FilterServer 上拉取消息。

類過濾機制

  1. 消費者查詢需要訂閱的主題所在的 Broker 和其對應的 FilterServer
  2. 周遊所有 FilterServer 并發送過濾代碼
  3. FilterServer 先通過CRC驗證源碼的正确性,然後根據消費組名+topic 儲存其過濾代碼,最後進行編譯

如果 FilterServer 設定為不允許直接編譯消費者上傳的類,則會開啟一個定時任務,每隔一段時間從指定的遠端伺服器下載下傳對應的過濾代碼。而遠端伺服器的過濾代碼上傳,就需要進行适當的檢查,防止圖謀不軌的代碼上傳。

消息拉取模式

RocketMQ 消息的過濾發生在消息消費的時候,PullMessageService 線程預設從 Broker 上拉取消息,執行相關的過濾邏輯,在 FilterServer 過濾模式下,PullMessageService 線程将拉取位址由原來的 Broker 位址轉換成随機一個 FilterServer 位址。

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
RocketMQ 消息過濾

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]《RocketMQ技術内幕》

[2]《RocketMQ實戰與原了解析》

[3]

老生常談——利用消息隊列處理分布式事務

[4]

RocketMQ架構解析

[5]

MappedByteBuffer VS FileChannel 孰強孰弱?

[6]

檔案 IO 操作的一些最佳實踐

[7]

海量資料處理之Bloom Filter詳解

[8]

rocketmq GitHub Wiki

繼續閱讀