.net 業務消息隊列是應用于業務的解耦和分離,應具備分布式,高可靠性,高性能,高實時性,高穩定性,高擴充性等特性。大量的業務消息堆積能力;無單點故障及故障監控,異常提醒;.生産者端負載均衡,故障轉移,故障自動恢複,并行消息插入;.消費者端負載均衡,故障保持,故障自動恢複,并行消息消費;消息高可靠性持久化,較高性能,較高實時性,高穩定性,高擴張性;支援99*99個消息分區,單個消息分區單天支援近1億的消息存儲;消費者拉方式擷取消息,在高并發,大量消息湧入的情況下,隻要消費能力足夠,不會有消息延遲,消息越多性能越好。
開源QQ群: .net 開源基礎服務 238543768
開源位址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ
## 業務消息隊列 ##
業務消息隊列是應用于業務的解耦和分離,應具備分布式,高可靠性,高性能,高實時性,高穩定性,高擴充性等特性。
## 優點: ##
- 大量的業務消息堆積能力
- 無單點故障及故障監控,異常提醒
- 生産者端負載均衡,故障轉移,故障自動恢複,并行消息插入。
- 消費者端負載均衡,故障保持,故障自動恢複,并行消息消費。
- 消息高可靠性持久化,較高性能,較高實時性,高穩定性,高擴充性。
- 支援99*99個消息分區,單個消息分區單天支援近1億的消息存儲。
- 消費者拉方式擷取消息,在高并發,大量消息湧入的情況下,隻要消費能力足夠,不會有消息延遲,消息越多性能越好。
## 缺點: ##
- 能保證消息順序插入,保證相同分區的消息是順序的(排除網絡延遲),但是多個分區之間的可能是亂序的。
- 消息并行消費或者多個分區并行消費或者負載均衡情況下的,消息消費順序是亂序。
## 缺點原因: ##
- 消息的負載均衡是基于消息的分區存儲,故多個分區之間的消息是亂序的,但是相同分區的消息是順序的。
- 消息的消費者負載均衡也是基于消息的分區進行均衡的,同時單個消費者訂閱多個分區的情況下,也可并行進行消費。意味着不同分區的消息的消費是亂序的,但是相同分區的消息消費是順序的。
## 缺點解決方案: ##
- 生産者自定義負載均衡算法,按照業務次元(使用者,商戶)等進行分區(多個使用者之間可以消息亂序,單個使用者的消息必須是順序的),不同次元可以指向不同的分區,但是單個次元的消息是可以保證順序的。
- 本解決方案在故障的情況下,故障會移除某些故障節點,意味着故障節點會立即報錯(當然也可自己指定故障節點進行轉移,但是轉移的節點消息會被提前消費,故障的消息會在恢複故障後重新消費,這樣也會出現故障程度上的消息亂序消費)。
- 本解決方案線上上無縫擴容和擴充性能方面也會有限制,看要具體的負載均衡算法,但是一般情況下,如果要擴容還是會進行部分消息遷移的情況。
## 問答: ##
### *1.大量的業務消息堆積能力,如何實作?* ###
每個分區表支援約1億的消息存儲,可以通過增加分區表進行擴容。消費者進行消息消費,内部僅保留某個分區上一次消費的指針,是以不會影響消費者。
消息持久化到磁盤,不會在記憶體駐留,理論上不影響記憶體。
### *2.無單點故障及故障監控,異常提醒?* ###
故障一般會發生在redis,資料節點,管理中心,日志中心。
redis節點故障會影響消費者的消息消費響應及時度,一般延遲5s以内。不會影響消息消費速度和消息消費QPS
資料節點故障會影響生産者和消費者的消息,并造成消息暫時丢失(但是都是可恢複的,具體的看資料庫的高可用做到什麼程度)。
生産者端會無縫的進行節點移除,但是會預設1分鐘重新嘗試重連。消費者會持續報錯至日志中,但是不會影響其他分區消費。
管理中心故障會影響生産者和消費者的心跳檢測和新注冊的生産者,消費者,但不會影響生産者和消費者具體的消息存儲和發送接收。
日志中心故障不會影響生産者和消費者,但是影響日志的列印,日志中心故障會通知公司内部監控平台。
雖然故障不會影響線上已有的消息運作,但是還是會在高并發情況下出現性能問題,和系統穩定性,是以一旦發現要重視和及時處理。
### *3.生産者端負載均衡,故障轉移,故障自動恢複,并行消息插入?* ###
預設負載均衡采用多個分區順序輪詢插入,在并發情況下輪詢插入是并行插入到不同分區的;某個資料節點出現故障,會移除相關資料節點的所有分區;
預設1分鐘會重新載入故障分區進行重試。
### *4.消費者端負載均衡,故障保持,故障自動恢複,并行消息消費。* ###
預設消費者端負載均衡是根據消費者訂閱的分區進行的(一個消費者可以訂閱多個分區,多個相同業務的消費者可以訂閱多個不同分區進行負載)。
一個消費者訂閱多個分區,這個消費者可以開啟并行進行多分區消費。并行度=分區數,效果理論上最佳。
分區節點出現故障等,單個分區或者資料節點就會暫停消費,并通知日志中心列印錯誤日志。當故障恢複後,消費繼續進行。
### *5.消息高可靠性持久化,較高性能,較高實時性,高穩定性,高穩定性。* ###
消息傳遞到消息中心後,立即持久化到磁盤,故不會丢失消息。生産者可以采用多個分區進行并行插入,消費者可以采用并行進行消息消費,故理論上性能是可擴充無限量的。
消息是通過拉取的方式擷取的,發送消息會由redis進行即時通知消費者拉取(即時消息預設會合并在500ms内redis通知消息),一般在20ms内消息會被消費掉。
批量拉消息的方式相對push的消息推送方式在高并發和大量消息處理的情況下,消息發送性能應該是更優的。
穩定性是基于資料庫的穩定性和故障轉移層面來確定的,擴充性展現線上上無縫的遷移和擴容。
### *6.支援9999個消息分區,單個消息分區單天支援近1億的消息存儲。* ###
資料節點是01~99個,節點裡面的表分區是01~99個,是以可以支援近1萬個分區節點。單表的mqid最大應該是(1億-1)條,應該滿足一般的業務需求,
若不能滿足,可以通過多個分區的方式擴容。
### *7.消費者拉方式擷取消息,在高并發,大量消息湧入的情況下,隻要消費能力足夠,不會有消息延遲,消息越多性能越好。* ###
push推消息的模式能保證更高的實時性,但是在大量消息的情況下,消息堆積的情況更嚴重,性能會有所影響。
pull拉消息的模式在保證消息實時性方面會略差,但是在大量消息湧入的情況下,批量拉消息效率更加。而且會将消息分發的負載轉移到多個消費者端上。
## 未來改進: ##
1. 未來采用leveldb重寫存儲。
1. 剝離broker服務用于支援相對可靠的消息服務。
1. 消息完成标記本地緩存/持久化(或者存儲redis),每秒送出更新至資料庫,消除頻繁消費導緻的瓶頸。
## 架構示意圖 ##
## 使用demo示例 ##/// <summary>
/// 發送消息
/// </summary>
/// <param name="msg"></param>
public void SendMessageDemo(string msg)
{
//發送字元串示例
var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() {
ManageConnectString = "server=192.168.17.201;Initial
Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" },//管理中心資料庫
"dyd.mytest3");//隊列路徑 .分隔,類似類的namespace,是隊列的唯一辨別,要提前告知運維在消息中心注冊,方可使用。
p.SendMessage(@"1");
//發送對象示例
/* var obj = new message2 { text = "文字", num = 1 };
var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() {
ManageConnectString = "server=192.168.17.237;Initial
Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" },//管理中心資料庫
"test.diayadian.obj");//隊列路徑 .分隔,類似類的namespace,是隊列的唯一辨別,要提前告知運維在消息中心注冊,方可使用。
p.SendMessage<message>(obj);
*/
}
private ConsumerProvider Consumer;
/// <summary>
/// 接收消息
/// </summary>
/// <param name="action"></param>
public void ReceiveMessageDemo(Action<string> action)
{
if (Consumer == null)
{
Consumer = new ConsumerProvider();
Consumer.Client = "dyd.mytest3.customer1";//clientid,接收消息的(消費者)唯一标示,一旦注冊以後,不能更改,業務下線廢棄後必須要告知運維,删除消費者注冊。
Consumer.ClientName = "用戶端名稱";//這個相對随意些,主要是用來自己識别的,要簡短
Consumer.Config = new BusinessMQConfig() { ManageConnectString =
"server=192.168.17.201;Initial Catalog=dyd_bs_MQ_manage;User
ID=sa;Password=Xx~!@#;" };
Consumer.MaxReceiveMQThread = 1;//并行處理的線程數,一般為1足夠,若消息處理慢,又想并行消費,則考慮 正在使用的分區=并行處理線程數 為并行效率極端最優,但cpu消耗應該不小。
Consumer.MQPath = "dyd.mytest3";//接收的隊列要正确
Consumer.PartitionIndexs = new List<int>() { 1, 2, 3,4, 5, 6, 7, 8 };//消費者訂閱的分區順序号,從1開始
Consumer.RegisterReceiveMQListener<string>((r) =>
{
/*
* 這些編寫業務代碼
* 編寫的時候要注意考慮,業務處理失敗的情況。
* 1.重試失敗n次。
* 2.重試還不行,則标記消息已被處理。然後跳過該消息處理,自己另外文檔記錄這種情況。
* 消息被消費完畢,一定要調用MarkFinished,标記消息被消費完畢。
*/
action.Invoke(r.ObjMsg);
r.MarkFinished();
});
}
}
/// <summary>
/// 關閉消息訂閱連接配接
/// </summary>
public void CloseReceiveMessage()
{
//注冊消費者消息,消費者務必要在程式關閉後關掉(dispose)。否則導緻異常終止,要人工等待連接配接逾時後,方可重新注冊。
if (Consumer != null)
{
Consumer.Dispose();
Consumer = null;
}
}
}
部分截圖
備注:.net開源的消息隊列很少,特别是針對業務的高可靠性的消息隊列;希望這個開源的消息隊列,能夠為.net領域帶來更多解決方案,更多的思路和架構設計;同時也希望了解消息隊列的人能夠給于這個解決方案更多的建議和完善意見。
作者:車江毅
開源是一種态度,分享是一種精神,學習仍需堅持,進步仍需努力,.net生态圈因你我更加美好。