天天看點

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

原文: 設計一個百萬級的消息推送系統

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

前言

首先遲到的祝大家中秋快樂。

最近一周多沒有更新了。其實我一直想憋一個大招,分享一些大家感興趣的幹貨。

鑒于最近我個人的工作内容,于是利用這三天小長假憋了一個出來(其實是玩了兩天)。

先簡單說下本次的主題,由于我最近做的是物聯網相關的開發工作,其中就不免會遇到和裝置的互動。

最主要的工作就是要有一個系統來支援裝置的接入、向裝置推送消息;同時還得滿足大量裝置接入的需求。

是以本次分享的内容不但可以滿足物聯網領域同時還支援以下場景:

  • 基于

    WEB

    的聊天系統(點對點、群聊)。
  • WEB

    應用中需求服務端推送的場景。
  • 基于 SDK 的消息推送平台。

技術選型

要滿足大量的連接配接數、同時支援雙全工通信,并且性能也得有保障。

在 Java 技術棧中進行選型首先自然是排除掉了傳統

IO

那就隻有選 NIO 了,在這個層面其實選擇也不多,考慮到社群、資料維護等方面最終選擇了 Netty。

最終的架構圖如下:

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

現在看着蒙沒關系,下文一一介紹。

協定解析

既然是一個消息系統,那自然得和用戶端定義好雙方的協定格式。

常見和簡單的是 HTTP 協定,但我們的需求中有一項需要是雙全工的互動方式,同時 HTTP 更多的是服務于浏覽器。我們需要的是一個更加精簡的協定,減少許多不必要的資料傳輸。

是以我覺得最好是在滿足業務需求的情況下定制自己的私有協定,在我這個場景下其實有标準的物聯網協定。

如果是其他場景可以借鑒現在流行的

RPC

架構定制私有協定,使得雙方通信更加高效。

不過根據這段時間的經驗來看,不管是哪種方式都得在協定中預留安全相關的位置。

協定相關的内容就不過讨論了,更多介紹具體的應用。

簡單實作

首先考慮如何實作功能,再來思考百萬連接配接的情況。

注冊鑒權

在做真正的消息上、下行之前首先要考慮的就是鑒權問題。

就像你使用微信一樣,第一步怎麼也得是登入吧,不能無論是誰都可以直接連接配接到平台。

是以第一步得是注冊才行。

如上面架構圖中的

注冊/鑒權

子產品。通常來說都需要用戶端通過

HTTP

請求傳遞一個唯一辨別,背景鑒權通過之後會響應一個

token

,并将這個

token

和用戶端的關系維護到

Redis

或者是 DB 中。

用戶端将這個 token 也儲存到本地,今後的每一次請求都得帶上這個 token。一旦這個 token 過期,用戶端需要再次請求擷取 token。

鑒權通過之後用戶端會直接通過

TCP 長連接配接

到圖中的

push-server

子產品。

這個子產品就是真正處理消息的上、下行。

儲存通道關系

在連接配接接入之後,真正處理業務之前需要将目前的用戶端和 Channel 的關系維護起來。

假設用戶端的唯一辨別是手機号碼,那就需要把手機号碼和目前的 Channel 維護到一個 Map 中。

這點和之前

SpringBoot 整合長連接配接心跳機制

類似。

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

同時為了可以通過 Channel 擷取到用戶端唯一辨別(手機号碼),還需要在 Channel 中設定對應的屬性:

public static void putClientId(Channel channel, String clientId) {
    channel.attr(CLIENT_ID).set(clientId);
}           

擷取時手機号碼時:

public static String getClientId(Channel channel) {
    return (String)getAttribute(channel, CLIENT_ID);
}           

這樣當我們用戶端下線的時便可以記錄相關日志:

String telNo = NettyAttrUtil.getClientId(ctx.channel());
NettySocketHolder.remove(telNo);
log.info("用戶端下線,TelNo=" +  telNo);           
這裡有一點需要注意:存放用戶端與 Channel 關系的 Map 最好是預設好大小(避免經常擴容),因為它将是使用最為頻繁同時也是占用記憶體最大的一個對象。

消息上行

接下來則是真正的業務資料上傳,通常來說第一步是需要判斷上傳消息輸入什麼業務類型。

在聊天場景中,有可能上傳的是文本、圖檔、視訊等内容。

是以我們得進行區分,來做不同的處理;這就和用戶端協商的協定有關了。

  • 可以利用消息頭中的某個字段進行區分。
  • 更簡單的就是一個

    JSON

    消息,拿出一個字段用于區分不同消息。

不管是哪種隻有可以區分出來即可。

消息解析與業務解耦

消息可以解析之後便是處理業務,比如可以是寫入資料庫、調用其他接口等。

我們都知道在 Netty 中處理消息一般是在

channelRead()

方法中。

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

在這裡可以解析消息,區分類型。

但如果我們的業務邏輯也寫在裡面,那這裡的内容将是巨多無比。

甚至我們分為好幾個開發來處理不同的業務,這樣将會出現許多沖突、難以維護等問題。

是以非常有必要将消息解析與業務處理完全分離開來。

這時面向接口程式設計就發揮作用了。

這裡的核心代碼和

「造個輪子」——cicada(輕量級 WEB 架構)

是一緻的。

都是先定義一個接口用于處理業務邏輯,然後在解析消息之後通過反射建立具體的對象執行其中的

處理函數

即可。

這樣不同的業務、不同的開發人員隻需要實作這個接口同時實作自己的業務邏輯即可。

僞代碼如下:

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結
設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

想要了解 cicada 的具體實作請點選這裡:

https://github.com/TogetherOS/cicada

上行還有一點需要注意;由于是基于長連接配接,是以用戶端需要定期發送心跳包用于維護本次連接配接。同時服務端也會有相應的檢查,N 個時間間隔沒有收到消息之後将會主動斷開連接配接節省資源。

這點使用一個

IdleStateHandler

就可實作,更多内容可以檢視

Netty(一) SpringBoot 整合長連接配接心跳機制

消息下行

有了上行自然也有下行。比如在聊天的場景中,有兩個用戶端連上了

push-server

,他們直接需要點對點通信。

這時的流程是:

  • A 将消息發送給伺服器。
  • 伺服器收到消息之後,得知消息是要發送給 B,需要在記憶體中找到 B 的 Channel。
  • 通過 B 的 Channel 将 A 的消息轉發下去。

這就是一個下行的流程。

甚至管理者需要給所有線上使用者發送系統通知也是類似:

周遊儲存通道關系的 Map,挨個發送消息即可。這也是之前需要存放到 Map 中的主要原因。

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

具體可以參考:

https://github.com/crossoverJie/netty-action/

分布式方案

單機版的實作了,現在着重講講如何實作百萬連接配接。

百萬連接配接其實隻是一個形容詞,更多的是想表達如何來實作一個分布式的方案,可以靈活的水準拓展進而能支援更多的連接配接。

再做這個事前首先得搞清楚我們單機版的能支援多少連接配接。影響這個的因素就比較多了。

  • 伺服器自身配置。記憶體、CPU、網卡、Linux 支援的最大檔案打開數等。
  • 應用自身配置,因為 Netty 本身需要依賴于堆外記憶體,但是 JVM 本身也是需要占用一部分記憶體的,比如存放通道關系的大

    Map

    。這點需要結合自身情況進行調整。

結合以上的情況可以測試出單個節點能支援的最大連接配接數。

單機無論怎麼優化都是有上限的,這也是分布式主要解決的問題。

架構介紹

在将具體實作之前首先得講講上文貼出的整體架構圖。

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

先從左邊開始。

上文提到的

注冊鑒權

子產品也是叢集部署的,通過前置的 Nginx 進行負載。之前也提過了它主要的目的是來做鑒權并傳回一個 token 給用戶端。

但是

push-server

叢集之後它又多了一個作用。那就是得傳回一台可供目前用戶端使用的

push-server

右側的

平台

一般指管理平台,它可以檢視目前的實時線上數、給指定用戶端推送消息等。

推送消息則需要經過一個推送路由(

push-server

)找到真正的推送節點。

其餘的中間件如:Redis、Zookeeper、Kafka、MySQL 都是為了這些功能所準備的,具體看下面的實作。

注冊發現

首先第一個問題則是

注冊發現

push-server

變為多台之後如何給用戶端選擇一台可用的節點是第一個需要解決的。

這塊的内容其實已經在

分布式(一) 搞定服務注冊與發現

中詳細講過了。

所有的

push-server

在啟動時候需要将自身的資訊注冊到 Zookeeper 中。

注冊鑒權

子產品會訂閱 Zookeeper 中的節點,進而可以擷取最新的服務清單。結構如下:

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

以下是一些僞代碼:

應用啟動注冊 Zookeeper。

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結
設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

對于

注冊鑒權

子產品來說隻需要訂閱這個 Zookeeper 節點:

設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

路由政策

既然能擷取到所有的服務清單,那如何選擇一台剛好合适的

push-server

給用戶端使用呢?

這個過程重點要考慮以下幾點:

  • 盡量保證各個節點的連接配接均勻。
  • 增删節點是否要做 Rebalance。

首先保證均衡有以下幾種算法:

  • 輪詢。挨個将各個節點配置設定給用戶端。但會出現新增節點配置設定不均勻的情況。
  • Hash 取模的方式。類似于 HashMap,但也會出現輪詢的問題。當然也可以像 HashMap 那樣做一次 Rebalance,讓所有的用戶端重新連接配接。不過這樣會導緻所有的連接配接出現中斷重連,代價有點大。
  • 由于 Hash 取模方式的問題帶來了

    一緻性 Hash

    算法
    ,但依然會有一部分的用戶端需要 Rebalance。
  • 權重。可以手動調整各個節點的負載情況,甚至可以做成自動的,基于監控當某些節點負載較高就自動調低權重,負載較低的可以提高權重。

還有一個問題是:

當我們在重新開機部分應用進行更新時,在該節點上的用戶端怎麼處理?

由于我們有心跳機制,當心跳不通之後就可以認為該節點出現問題了。那就得重新請求

注冊鑒權

子產品擷取一個可用的節點。在弱網情況下同樣适用。

如果這時用戶端正在發送消息,則需要将消息儲存到本地等待擷取到新的節點之後再次發送。

有狀态連接配接

在這樣的場景中不像是 HTTP 那樣是無狀态的,我們得明确的知道各個用戶端和連接配接的關系。

在上文的單機版中我們将這個關系儲存到本地的緩存中,但在分布式環境中顯然行不通了。

比如在平台向用戶端推送消息的時候,它得首先知道這個用戶端的通道儲存在哪台節點上。

借助我們以前的經驗,這樣的問題自然得引入一個第三方中間件用來存放這個關系。

也就是架構圖中的存放

路由關系的 Redis

,在用戶端接入

push-server

時需要将目前用戶端唯一辨別和服務節點的

ip+port

存進

Redis

同時在用戶端下線時候得在 Redis 中删掉這個連接配接關系。

這樣在理想情況下各個節點記憶體中的 map 關系加起來應該正好等于 Redis 中的資料。
設計一個百萬級的消息推送系統前言技術選型協定解析簡單實作分布式方案分布式問題總結

這裡存放路由關系的時候會有并發問題,最好是換為一個

lua

腳本。

推送路由

設想這樣一個場景:管理者需要給最近注冊的用戶端推送一個系統消息會怎麼做?

結合架構圖

假設這批用戶端有 10W 個,首先我們需要将這批号碼通過

平台

下的

Nginx

下發到一個推送路由中。

為了提高效率甚至可以将這批号碼再次分散到每個

push-route

中。

拿到具體号碼之後再根據号碼的數量啟動多線程的方式去之前的路由 Redis 中擷取用戶端所對應的

push-server

再通過 HTTP 的方式調用

push-server

進行真正的消息下發(Netty 也很好的支援 HTTP 協定)。

推送成功之後需要将結果更新到資料庫中,不線上的用戶端可以根據業務再次推送等。

消息流轉

也許有些場景對于用戶端上行的消息非常看重,需要做持久化,并且消息量非常大。

push-sever

做業務顯然不合适,這時完全可以選擇 Kafka 來解耦。

将所有上行的資料直接往 Kafka 裡丢後就不管了。

再由消費程式将資料取出寫入資料庫中即可。

其實這塊内容也很值得讨論,可以先看這篇了解下:

強如 Disruptor 也發生記憶體溢出?

後續談到 Kafka 再做詳細介紹。

分布式問題

分布式解決了性能問題但卻帶來了其他麻煩。

應用監控

比如如何知道線上幾十個

push-server

節點的健康狀況?

這時就得監控系統發揮作用了,我們需要知道各個節點目前的記憶體使用情況、GC。

以及作業系統本身的記憶體使用,畢竟 Netty 大量使用了堆外記憶體。

同時需要監控各個節點目前的線上數,以及 Redis 中的線上數。理論上這兩個數應該是相等的。

這樣也可以知道系統的使用情況,可以靈活的維護這些節點數量。

日志處理

日志記錄也變得異常重要了,比如哪天回報有個用戶端一直連不上,你得知道問題出在哪裡。

最好是給每次請求都加上一個 traceID 記錄日志,這樣就可以通過這個日志在各個節點中檢視到底是卡在了哪裡。

以及 ELK 這些工具都得用起來才行。

總結

本次是結合我日常經驗得出的,有些坑可能在工作中并沒有踩到,所有還會有一些遺漏的地方。

就目前來看想做一個穩定的推送系統其實是比較麻煩的,其中涉及到的點非常多,隻有真正做過之後才會知道。

看完之後覺得有幫助的還請不吝轉發分享。

歡迎關注公衆号一起交流: