作者:阿裡雲智能IoT事業部 進階技術專家 呂建文
随着接觸客戶越來越多,也越來越颠覆了我對“傳統隊列”(kafka、rocketmq、rabbitmq...)的看法。 當然本文不是說“傳統隊列”做得不好, 這些隊列系統經過多年打磨,在高性能、海量堆積、消息可靠性等諸多方面都已經做得非常極緻了,都做得非常的優秀。 但今天我覺得大家在設計方案時動不動任何一個異步、系統解耦等就來選用隊列,然後線上又頻繁出一些問題,這些問題的背後我們得看看到底什麼場景适合、什麼場景過渡使用了、有沒有更好解法, 尤其今天IoT領域場景複雜,既有面向自身SAAS業務又要承擔多租戶PAAS平台化模式,面臨更多的隊列方面問題,拿來和大家讨論分享。
本文讨論的産品:
阿裡雲物聯網平台IoT使用隊列場景

隊列在IoT領域是個極其重要部件,上到伺服器,下到一個月發一次消息的嵌入式晶片,都需要傳遞事件消息;比如共享充電寶的開櫃子、開燈指令從伺服器發到裝置、工業網關高頻消息流等, 隊列最大意義在于讓整個消息事件在不可控的環境因素變成一個平穩運作的系統。 比如裝置時不時會網絡抖動,某些裝置故障、某些抖動導緻大量消息飙漲。
我們把隊列用于了2個場景,一個是上行,一個是下行;我們在隊列網關上即複用了kafka、rocketmq,又加入了自研的實時隊列,盡量做到了即使在某一個隊列大範圍堆積時候,即時的業務也還是相對可用的。比如充電寶,哪怕使用者的隊列消息大量堆積,新使用者過來控制,也開能打開櫃子,即時在有大量堆積情況也能快速恢複業務,而不是一定要運維同學清理掉堆積才能快速讓業務可用。單純使用kafka或者rocketmq在大量堆積時很可能是個可怕故障。
裝置上行消息,總量qps非常高,通常需要彙集到一組消費者,這個場景适合使用市面上隊列kafka、mq等選型,但在IoT控制裝置場景對實時性要求較高,直接使用kafka等 在堆積時候 那和這一個隊列挂鈎的入口裝置都會有業務影響,并非最佳方式。
裝置下行消息,總量qps不高,一般是下行控制裝置開櫃子等人為觸發操作,這個場景要求到達率很高,而且裝置級别隔離要求也高,不能因為一個裝置消息擁塞幹擾其它裝置控制,這個情況比較适合IM 比如釘釘消息類似的情況,也是我們自研的IoT裝置級隊列。
我們做了哪些優化
1.上下行隔離拆分
下行尤其重要。 比如控制一個裝置,比如一個開鎖、支付成功要下發打開櫃子等等,上行出任何問題,千萬不能影響到下行業務。
2.IoT隊列-裝置級/海量topic
用于下行推送裝置消息情況的隊列。
圖1和圖2對比較明顯, 一個隊列擁塞盡量減少對其它裝置影響。
3.IoT隊列-伺服器訂閱
用于把使用者裝置消息發送給使用者伺服器的隊列。相當于一個隊列網關,我們即支援使用者規則流轉到kafka、rocketmq,也能使用IoT自研的AMQP實時隊列網關。即使在消息有堆積情況,把堆積态的消息降級處理,實時生成消息依然優先發送。
大家用隊列初衷是什麼?
提高業務吞吐、異步化
消峰填谷
系統間解耦
高息可靠、事務、有序消息、最終一緻性 .....
大家都用得非常多 都很了解了,隊列基本上是設計一個分布式系統裡面必備的元件,可能還有更多場景就不多舉了。
線上出現堆積你的反應是什麼?
假如沒有堆積,隊列就失去意義。
大部分使用隊列的系統,可能大家都遭遇過“堆積”的案例。 如果你還沒碰到堆積,那說明比較幸運,你的生産和消費都還比較穩定,也還沒遭受過異常抖動,但是你依然需要為線上系統的穩定性考慮堆積了你的預案是什麼,有什麼樣的影響。
不論是我們自己,還是使用者使用,都面臨過“堆積”案例。我們IoT這邊光内部系統之間流轉,使用了RocketMq(阿裡内部是metaq)有上W級别數量topic,還有個場景是我們要把使用者裝置消息流轉到使用者自己購買的隊列裡面去。
短暫抖動這種堆積一般自我消化。奇怪的現象是,絕大部分出現堆積的場景,比如有個幾十w條堆積的,使用者第一反應是什麼呢?
很多結論是: 有堆積幾乎已經是線上故障了, 需要先清理堆積!
堆積能力強是不是件好事情呢?
1.為什麼需要清理堆積?
a.先說IoT “實時性讓隊列始料不及”:
例子:一個快遞櫃業務的隊列堆積,然後“此時此刻”在櫃子旁邊的使用者死命的在旁邊用手機點開櫃子 怎麼也打不開,(後端系統都恢複了)可是他還是打不開,問題就是隊列裡面還有幾十w條的消息,新來的消息不好意思,要排隊哦。 之前等着的那些消息源源不斷的消費,鬼才知道到底這些消息還有沒有用。 然後這個“此時此刻”的人 又氣急敗壞走了, 再過來下一個“此時此刻”的人。。。 。 是以運維的人就要手工清理大量堆積态消息,先讓等在裝置旁邊的人能打卡他的快遞。
b.即便不是IoT場景, 我相信肯定有同學碰到過線上清理堆積、重置消費點位案例,可能發生于:業務雪崩、快一點恢複業務
丢失一些資料和快速恢複交易 到底哪個重要點... 等了一會,不行啊 客戶都投訴了,壓力山大 還是清理堆積吧, 然後後面再補一個設計資料一緻性方案。
前斷時間活生生碰到個使用者回報情況,kafka在消費端堆積時候寫入延時很高,最後發現由于堆積消息資料需要從硬碟加載冷資料導緻io打高。
2.不需要清理堆積有多少情況?
a.短暫抖動,能自我恢複 (大部分能分鐘内消費完可能都不是問題) -- 這種情況确實是比較多的
b.時間不重要的資料,比如日志
是以, “需要先清理線上堆積” 就是把雙刃劍,沒堆積不是隊列,堆積多了又影響你的業務;不能過渡擁抱堆積。
有辦法優化嗎?
先總結下問題
1.市面上所有隊列承諾的消息實時性,前提都是消費端消費能力足夠
也有同學說把實時的消息topic和其它topic分開不同優先不就行了嗎? 不好意思,這個跑題了,繼續看上面例子把。
2.IoT是一個PAAS平台,内部我可以要求其它團隊消費能力上去; 外部ISV使用者,客戶自己說了算,很難讓每個開發者按照你希望優化的方法去做。
雖然我們也可以做很多租戶隔離措施,甚至跟使用者說,你自己隊列問題,是你們業務處理能力不夠 或者直接說 你們自己有問題。 可是,使用者還是委屈呀,業務出問題就是他得背鍋的。
3.隊列的存儲和核心機制,幾乎都是FIFO (就是先進先出,後面進來的一定要等前面的出去)
3.1 這樣才能有序,要麼是分區有序,要麼是全局有序。
3.2 不論如何 一旦發生堆積,新來的消息一定是要前面堆積的出去了才能被消費。這種情況 一定會導緻後來的消息被動延時。
3.3 注意,有的隊列說的無序消息隻是發送方是同步還是異步無序,每個子消費隊列單元底層還是這個結構運作,底層存儲和分發上如果堆積狀态有新進來消息是追加在堆積後面的;
4.全局保序能降級嗎
在某些場景,保序真的很重要。 但我覺得肯定存在無需保序場景,或者業務上設計時間戳而不是依賴消息的序列也能解決很多場景問題,這2個加起來 至少目前我們了解到的業務80%以上都能滿足。
補充下,現實系統中一個業務流程通常鍊路比較多,要經過ABCD 多個系統->隊列0 ->E/F/G ->隊列1->XYZ。。。 這條完整鍊路才是一筆完整的業務交易,僅僅在一個隊列0這裡做個有序,是不夠的,要從A到Z整個都變成有序,代價是非常高的 已經不單是一個隊列有序就可以解決的問題。
為了保序,隊列産品花了很大開銷,就像上面提到活生生例子 kafka在堆積時候突然消費端恢複導緻kafka需要加載硬碟冷資料把io打崩,原本可以高效的把生産成功的資料直接發送出去,但因為堆積就得先加載硬碟資料,導緻新進去資料沒機會。
目前的隊列産品即使有無序隊列類型,每個消費隊列結構上依然是FIFO的, 并不是設計上的問題,是有史以來“隊列” 的基準。
一個大膽的嘗試-IoT隊列誕生
我們需要一個針對IoT場景,實時優先的“隊列”,而且思路和rocketMq、kafka都不一樣,得開發:
1.實時生成消息優先發送,堆積的消息進入降級模式
保序降級,不保障整體消息序列,僅實時消息相對有序
堆積消息和實時消息是并行任務發送,堆積速率降級
實時消息發送失敗馬上降級為堆積消息
2.海量topic
傳統隊列核心點是 不論堆積多少不影響它的性能;kafka topic一多 原本消息順序寫檔案優勢 就會導緻一個broker要退化到随機寫,失去優勢,另外要zk來協調這麼多topic也是有局限,是以這些隊列本身有提供一個外挂代理橋接器 對外入口是多個裝置topic,再橋接映射到少量的實際kafka topic,這方案有一定可行性,但做不到我們希望的隔離效果 隻是治标。
我們需要的是“海量topic 盡量互相隔離 并且不影響整體性能”,盡量做到裝置A的消息堆積topic,不影響裝置B,上面做法基本上一個實際topic問題 這一面的裝置就影響。 阿裡雲物聯網平台今天首先面臨的是多租戶, 10w條消息有影響,你是影響一個廠商的10w裝置,還是影響1w個廠商 每個廠商10個裝置 完全是不一樣的故障面。 我們要先保租戶隔離,再保裝置隔離,不能完全按裝置Id去分散多個隊列。理想情況是裝置級别的topic數量,那麼就是 億級别topic 了!
面向裝置端海量topic和IM聊天系統稍微類似,是以我們也參考了IM産品碰到過的問題,比如阿裡内部手淘、釘釘,但有個最大差別是已有産品基于自有協定和用戶端,IoT領域做硬體同學可能更體感,端不可控;需要支援開源,也需要IoT的SDK。
最後topic數量也要考慮成本,我們希望是topic數量是幾乎無成本的!
我暫時也沒辦法定義這個産物到底還是不是隊列,但我們得彌補目前隊列一些問題。
總體思路
目前IoT隊列還沒有好名字,對外叫
服務端訂閱,意思就是使用者用伺服器訂閱他們裝置消息,歡迎拍磚。 為了降低接入成本,使用者可以使用AMQP1.0協定接入 符合開源生态。
注意,AMQP本身是個消息協定,隊列可以用AMQP做協定,但協定本身不代表隊列。就像Kafka也可以用http消費,但不代表 http是個kafka隊列。AMQP1.0是最新的标準,不止于隊列本身,它更多是一個消息通信的統一消息标準,而核心是否是隊列還是僅僅作為管道由Provider來實作,AMQP1.0标準被逐漸用于消息網關,而隊列模型隻是網關裡的一個可選實作。
相容傳統隊列和新隊列,交給使用者按場景來推薦選擇,使用者即可選擇使用kafka、mq(ONS、MNS) 也可以選用iot隊列(物聯網平台的服務端訂閱),甚至組合模式,比如按消息特征規則來配置流轉隊列。
IoT隊列的設計思路
設計目标 :實時優先、使用便捷、海量topic支援的 隊列網關、Follow開源用戶端
這套東西其實已經不再是一個大家了解中的“隊列”,不在一個次元。 但是目前隊列産品也在做裝置接入,是以還是給大家一個對比把:
IoT隊列-服務端訂閱部分 | 流轉傳統隊列RocketMQ、KAFKA、AMQP for MQ | |
---|---|---|
堆積能力 | 海量堆積的消息會直接降級,堆積消息不會影響實時消息,優先恢複裝置即時可用性 | 海量堆積不影響性能,突出在性能點。 當同一個隊列有堆積時,實時生成的消息一般會排在隊末尾,直到堆積的邏輯處理完,或者進入了死信消息 |
實時處理能力 | 實時和堆積處理分離; 即使有堆積,實時消息的也會先推送,是以實時性會相對更優 | 隊列在堆積時,實時消息一生成就會變成堆積排隊 ,消息實時性要求“消費端必須快速恢複消費能力” |
功能對比 | 雲監控、消費者狀态、軌迹、多消費組、海量topic、不保序 -- (功能簡單) | 消息監控、消費者狀态、軌迹、多消費組、有限topic、可保序(或分區有序)、事務消息、廣播消息、死信等 -- (更豐富)Q |
為什麼更适合IoT實時優先場景 | 1.出現海量堆積時,裝置實時控制依然能保持一定可用性,無需特别懼怕堆積;2.海量topic,部分裝置消息擁塞不要影響整個隊列 | 1. 出現海量堆積時,使用者第一反應通常是需要人工運維去清理堆積,才能讓系統恢複可用;堆積是個“可怕”的線上問題 ;2.同一個業務場景要用多個topic去拆分來模拟實作消息優先級,一個topic堆積還是會出現多個裝置受到影響;3.外挂邏輯裝置多個topic映射到内部有限topic,治标不治本 |
性能差異 | 依賴分布式存儲OTS/HBASE,broker自己不做存儲 單機性能較弱 ,需要利用叢集發揮性能;堆積态消息性能有限,取決底層存儲單分區掃描能力*分區數; | broker本地異步/同步刷盤,IO性能非常高, 單機即可達到非常高QPS ;單機性能非常優秀; |
成本對比 | 1.topic無開銷,海量topic時成本較低、隔離性較好;2.消息QPS較高時成本比kafka高;3.不利于小型化部署 依賴多 | 1.topic因為要預配置設定資源,有成本開銷,有數量限制;“海量topic”通過 合用一個topic實作 底層無法較好隔離;2.Kafka按帶寬收費成本最低;3.小型化部署成本低,依賴少 |
消息模式
這個圖隻是最基本片段,從這個模式可以看出來機制差别,大家都沒有錯,出發點不同。
消息政策-推拉結合
這個應該是隊列的核心難點之一,和傳統隊列區分在于,我們考慮為平台化模式,獨享資源過于昂貴。 但帶來問題是消費端不可控,是以使用結合模式,隻有在消費者線上時會拉取堆積消息,而拉取是由AMQP隊列網關來做,給到使用者接口始終是推送過去的onMessage回調。
- broker不是直接讓consumer來連接配接,而是把隊列網關剝離出來, 這樣會更靈活,甚至對于部分使用者 我們的queue可以切換到ons、kafka等實作。 kafka、rocketmq做法是在連接配接時會配置設定給用戶端一個broker接入位址。
2.消息高可用:broker實時消息優先推送給consumer,失敗才會落到queue ;這是一個完整事件,如果沒有完成 則不給producer commit。如果當機,需要發送方逾時來決定重試。
3.異步ACK
線性擴充-離線消息部分
實時部分消息采用推方式,基本上不會成為瓶頸,消費不過來消息進入堆積模式。
底層使用KV存儲(
阿裡雲自研表格存儲産品ots、hbase)已經解決存儲的擴充,剩餘主要問題如何消除寫入熱點和消費熱點,這樣broker幾乎可以做到無狀态水準擴充。
1.為了更快速友善實作cleansession、清理堆積、消費點位等特性,消息id借鑒了snowflake 使用非自然序自增ID ,這麼做好處是 如果需要清理離線消息,那麼生成一個目前時間的id即可。
2.消費記錄按租戶的消費者ID (cid和sharding分區值決定),即同一個租戶消費組,可以調整sharding數量,一個sharding分區 是一個單線程任務處理,是以離線消息總消費速率取決于sharding數量,為了謹慎防止熱點,目前一個sharding跑1000QPS。 這裡碰到過一些坑,比如寫前讀,尤其消費狀态更新問題。阿裡雲表格存儲在自增ID上做了不少優化, Hbase版本目前正在驗證。
3.實時消息“看起來沒落盤 會丢嗎” ,不是不落,是先推後看場景落。 每個發送給消費者的消息,都需要對方ack,如果異常或者逾時沒有答複 ,一個是落到堆積消息; 另外一個是這個過程如果都失敗,将直接告訴發送方失敗。
如何解決海量topic問題
首先面對“大量”的問題 一般都是考慮分區,單元化,分組等隔離和拆分,這裡海量topic我們讨論針對一個單執行個體模式下如何盡可能做到更多topic,完全任意數量都能100%沒問題肯定是不現實的。
海量的标準
這個标準起碼一個執行個體(4c8g ecs虛拟機)支撐10W裝置規格,每個裝置最多50topic,那麼就是500w topic/起步。
由于broker和存儲已經隔離,broker和topic已經沒有什麼關系,或者說任何topic資料生成,broker做的事情就是寫入和分發。
1.海量topic,每個topic有限數量訂閱: topic和訂閱者關系使用ots存儲加載redis或本地緩存,針對mqtt topic比對有個topic tree的樹算法,hivemq有實作版本, 詳情參考
https://github.com/hivemqTopicTreeImpl.java
2.單個topic 海量訂閱: 這個場景其實是多點傳播和廣播, 我們不會考慮在隊列本身上面去做這個事情,而是在上層封裝廣播元件來協調任務和批量發送。 簡單說就是每個消費者永遠訂閱自己topic,或者每個人都是自己的收件箱,你不能直接拿别人的;但是我可以一份消息發到多個人的收件箱(topic)裡面去。如果是100w topic數量級别,一般使用上面的topic tree能在本地記憶體構造快速比對。但平台化這個量級是不夠的,是以全局的海量廣播需要在上層建構任務系統來做。這個廣播目前還在建設中。
回壓
實時消息推送fail,将會進入堆積,如果逾時或操作失敗,由于producer沒有收到commit可以決定重試。假如裝置是Mqtt協定可以利用qos1,如果borker未推送成功或者寫入堆積失敗 不給producer ack,那麼producer會重試。但是解決不了其它協定或者qos0消息, 這裡還有個優化點是在發送方網關來做推送消息的failover ack機制。 由于producer的用戶端是開源sdk,不受管控 這個地方是一個局限點。
尤其面向碎片化的裝置端協定,回壓不能完全依賴裝置端機制,有了發送方網關後,我們可以在這裡幹些活。篇幅有限這裡先不展開了。
監控運維
這塊雲上內建雲監控、專有雲使用雲生态Prometheus等構造
未來探索點
隊列網關除了支援AMQP,也支援KAFKA協定,面向開發者程式設計接口更貼切。 (注意隻是kafka的消息protocol,我們絕對沒有能力去做一個一樣的kafka隊列)
總結
IoT場景的隊列實踐,在現有mq隊列、kafka隊列融合互補之外,加了種自有的實時優先隊列實作;同時加入了隊列網關代理,即能讓使用者選擇市面上隊列,也可以選擇輕便的IoT隊列。可能我把這個也叫“隊列”不是很貼切,目前隻是實作了基本功能,還需要進一步完善和打磨。 再次強調,本文沒有說目前隊列做得不好,我們系統内部環節也在享受已有隊列産品的服務,比如我們系統内部自己某些分支的FO就可能利用了rocketmq或者kafka,隻是在某些場景化方向 我們做了些微小的探索,希望能解決更多使用者問題。