前言
本人覺得碼農的技術提升應該是從how to do到why do,而項目或産品都是從why do到how to do,按題來,是以呢下面先從大的方面介紹一下消息隊列。
消息隊列是分布式高并發面目中必不可少的一部分,随着網際網路、雲計算、大資料的使用,消息隊列的應用越來越多,消息隊列在系統的可伸縮性、穩定性、提升吞吐量等方面有着顯著的作用;它主要的作用一般如下:
1.通過異步處理提高系統性能
<img src="https://pic3.zhimg.com/50/v2-223c09f60af24f2b9eb10d1b60774109_hd.jpg" data-caption="" data-size="normal" data-rawwidth="910" data-rawheight="350" class="origin_image zh-lightbox-thumb" width="910" data-original="https://pic3.zhimg.com/v2-223c09f60af24f2b9eb10d1b60774109_r.jpg"> 如上圖,在不使用消息隊列伺服器的時候,使用者的請求資料直接寫入資料庫,在高并發的情況下資料庫壓力劇增,使得響應速度變慢。但是在使用消息隊列之後,使用者的請求資料發送給消息隊列之後立即 傳回,再由消息隊列的消費者程序從消息隊列中擷取資料,異步寫入資料庫。由于消息隊列伺服器處理速度快于資料庫(消息隊列也比資料庫有更好的伸縮性),是以響應速度得到大幅改善。
通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能——即通過異步處理,将短時間高并發産生的事務消息存儲在消息隊列中,進而削平高峰期的并發事務。 舉例:在電子商務一些秒殺、促銷活動中,合理使用消息隊列可以有效抵禦促銷活動剛開始大量訂單湧入對系統的沖擊。如下圖所示:
<img src="https://pic3.zhimg.com/50/v2-6b95a9281690c3d190f8bf84312ee536_hd.jpg" data-caption="" data-size="normal" data-rawwidth="780" data-rawheight="384" class="origin_image zh-lightbox-thumb" width="780" data-original="https://pic3.zhimg.com/v2-6b95a9281690c3d190f8bf84312ee536_r.jpg">
因為使用者請求資料寫入消息隊列之後就立即傳回給使用者了,但是請求資料在後續的業務校驗、寫資料庫等操作中可能失敗。是以使用消息隊列進行異步處理之後,需要适當修改業務流程進行配合,比如使用者在送出訂單之後,訂單資料寫入消息隊列,不能立即傳回使用者訂單送出成功,需要在消息隊列的訂單消費者程序真正處理完該訂單之後,甚至出庫後,再通過電子郵件或短信通知使用者訂單成功,以免交易糾紛。這就類似我們平時手機訂火車票和電影票。
2.降低系統耦合性
我們知道子產品分布式部署以後聚合方式通常有兩種:1.分布式消息隊列和2.分布式服務。先來簡單說一下分布式服務:目前使用比較多的用來建構SOA(Service Oriented Architecture面向服務體系結構)的分布式服務架構是阿裡巴巴開源的Dubbo.如果想深入了解Dubbo的可以看我寫的關于Dubbo的這一篇文章:《高性能優秀的服務架構-dubbo介紹》:https://juejin.im/post/5acadeb1f265da2375072f9c 再來談我們的分布式消息隊列:我們知道如果子產品之間不存在直接調用,那麼新增子產品或者修改子產品就對其他子產品影響較小,這樣系統的可擴充性無疑更好一些。
我們最常見的事件驅動架構類似生産者消費者模式,在大型網站中通常用利用消息隊列實作事件驅動結構。如下圖所示:
消息隊列使利用釋出-訂閱模式工作,消息發送者(生産者)釋出消息,一個或多個消息接受者(消費者)訂閱消息。 從上圖可以看到消息發送者(生産者)和消息接受者(消費者)之間沒有直接耦合,消息發送者将消息發送至分布式消息隊列即結束對消息的處理,消息接受者從分布式消息隊列擷取該消息後進行後續處理,并不需要知道該消息從何而來。對新增業務,隻要對該類消息感興趣,即可訂閱該消息,對原有系統和業務沒有任何影響,進而實作網站業務的可擴充性設計。
消息接受者對消息進行過濾、處理、包裝後,構造成一個新的消息類型,将消息繼續發送出去,等待其他消息接受者訂閱該消息。是以基于事件(消息對象)驅動的業務架構可以是一系列流程。
另外為了避免消息隊列伺服器當機造成消息丢失,會将成功發送到消息隊列的消息存儲在消息生産者伺服器上,等消息真正被消費者伺服器處理後才删除消息。在消息隊列伺服器當機後,生産者伺服器會選擇分布式消息隊列伺服器叢集中的其他伺服器釋出消息。
前面說了這麼多消息隊列的重要性、使用場景、工作模式,有很多人就可能會說了,現有的ActiveMQ、RabbitMQ、Kafka、RocketMQ等多了去了,那在項目架構的時候選一個用上去就不行了,完全沒有必要重複造輪子啊!本人認為對于重複造輪子的事情和其它任何事情都是一樣的——任何事情沒有絕對的好處或者壞處,比如是剛入門的碼農、又或者很急的項目,完全可以選用現有一種通用的、成熟的産品,沒必要去從零開始做;實際上沒有任何一個優秀的産品全部使用三方的産品來組裝完成的,任何一個好一點的項目發展到一定的時候都不約而同的進行底層開發。原因很簡單:第一個任何通用型的産品總用功能覆寫不到的場景;第二個任何通用型的産品為了實作通用必将做了一些性能或架構的犧牲;現在道理都講完了,開始動手了(都聽你逼半天,能動手就盡量少逼逼!)。
概述
動手前先構思一下,本人需要一個簡單的、可釋出訂閱的、高吞吐量的消息隊列,并将之簡單大的方面分成QServer、QClient;QServer主要有Exchange、Binding、MessageQueue構成;QClient和QServer共用一套相同的傳輸編解碼器QCoder ,主要實作Publish、Subscribe、Unsubcribe、Closes等功能;先想這麼多,開幹!
Exchange
主要在QServer中提供釋出、訂閱、連接配接、隊列資訊等管理
1 /****************************************************************************
2 *Copyright (c) 2018 Microsoft All Rights Reserved.
3 *CLR版本: 4.0.30319.42000
4 *機器名稱:WENLI-PC
5 *公司名稱:Microsoft
6 *命名空間:SAEA.QueueSocket.Model
7 *檔案名: Exchange
8 *版本号: V1.0.0.0
9 *唯一辨別:6a576aad-edcc-446d-b7e5-561a622549bf
10 *目前的使用者域:WENLI-PC
11 *建立人: yswenli
12 *電子郵箱:[email protected]
13 *建立時間:2018/3/5 16:36:44
14 *描述:
15 *
16 *=====================================================================
17 *修改标記
18 *修改時間:2018/3/5 16:36:44
19 *修改人: yswenli
20 *版本号: V1.0.0.0
21 *描述:
22 *
23 *****************************************************************************/
24
25 using SAEA.Commom;
26 using SAEA.Sockets.Interface;
27 using System;
28 using System.Collections.Generic;
29 using System.Linq;
30 using System.Text;
31 using System.Threading;
32 using System.Threading.Tasks;
33
34 namespace SAEA.QueueSocket.Model
35 {
36 class Exchange : ISyncBase
37 {
38 object _syncLocker = new object();
39
40 public object SyncLocker
41 {
42 get
43 {
44 return _syncLocker;
45 }
46 }
47
48 long _pNum = 0;
49
50 long _cNum = 0;
51
52 long _inNum = 0;
53
54 long _outNum = 0;
55
56 private Binding _binding;
57
58 private MessageQueue _messageQueue;
59
60 public Exchange()
61 {
62 this._binding = new Binding();
63
64 this._messageQueue = new MessageQueue();
65 }
66
67
68 public void AcceptPublish(string sessionID, QueueResult pInfo)
69 {
70 lock (_syncLocker)
71 {
72 this._binding.Set(sessionID, pInfo.Name, pInfo.Topic);
73
74 this._messageQueue.Enqueue(pInfo.Topic, pInfo.Data);
75
76 _pNum = this._binding.GetPublisherCount();
77
78 Interlocked.Increment(ref _inNum);
79 }
80 }
81
82 public void AcceptPublishForBatch(string sessionID, QueueResult[] datas)
83 {
84 if (datas != null)
85 {
86 foreach (var data in datas)
87 {
88 if (data != null)
89 {
90 AcceptPublish(sessionID, data);
91 }
92 }
93 }
94 }
95
96
97 public void GetSubscribeData(string sessionID, QueueResult sInfo, int maxSize = 500, int maxTime = 500, Action<List<string>> callBack = null)
98 {
99 lock (_syncLocker)
100 {
101 var result = this._binding.GetBingInfo(sInfo);
102
103 if (result == null)
104 {
105 this._binding.Set(sessionID, sInfo.Name, sInfo.Topic, false);
106
107 _cNum = this._binding.GetSubscriberCount();
108
109 Task.Factory.StartNew(() =>
110 {
111 while (this._binding.Exists(sInfo))
112 {
113 var list = this._messageQueue.DequeueForList(sInfo.Topic, maxSize, maxTime);
114 if (list != null)
115 {
116 list.ForEach(i => { Interlocked.Increment(ref _outNum); });
117 callBack?.Invoke(list);
118 list.Clear();
119 list = null;
120 }
121 }
122 });
123 }
124 }
125 }
126
127 public void Unsubscribe(QueueResult sInfo)
128 {
129 Interlocked.Decrement(ref _cNum);
130 this._binding.Del(sInfo.Name, sInfo.Topic);
131 }
132
133 public void Clear(string sessionID)
134 {
135 lock (_syncLocker)
136 {
137 var data = this._binding.GetBingInfo(sessionID);
138
139 if (data != null)
140 {
141 if (data.Flag)
142 {
143 Interlocked.Decrement(ref _pNum);
144 }
145 else
146 {
147 Interlocked.Decrement(ref _cNum);
148 }
149 this._binding.Remove(sessionID);
150 }
151 }
152 }
153
154 public Tuple<long, long, long, long> GetConnectInfo()
155 {
156 return new Tuple<long, long, long, long>(_pNum, _cNum, _inNum, _outNum);
157 }
158
159 public List<Tuple<string, long>> GetQueueInfo()
160 {
161 List<Tuple<string, long>> result = new List<Tuple<string, long>>();
162 lock (_syncLocker)
163 {
164 var list = this._messageQueue.ToList();
165 if (list != null)
166 {
167 var tlts = list.Select(b => b.Topic).Distinct().ToList();
168
169 if (tlts != null)
170 {
171 foreach (var topic in tlts)
172 {
173 var count = this._messageQueue.GetCount(topic);
174 var t = new Tuple<string, long>(topic, count);
175 result.Add(t);
176 }
177 tlts.Clear();
178 }
179 list.Clear();
180 }
181 }
182 return result;
183 }
184
185 }
186 }
思維發散:這裡可以增加全局消息隊列、指定連接配接消息隊列等;将連接配接通過類型redis cluster模式進行一個均衡分布等
Binding
主要功能是将連接配接、主題進行映射管理
1 /****************************************************************************
2 *Copyright (c) 2018 Microsoft All Rights Reserved.
3 *CLR版本: 4.0.30319.42000
4 *機器名稱:WENLI-PC
5 *公司名稱:Microsoft
6 *命名空間:SAEA.QueueSocket.Model
7 *檔案名: Binding
8 *版本号: V1.0.0.0
9 *唯一辨別:7472dabd-1b6a-4ffe-b19f-2d1cf7348766
10 *目前的使用者域:WENLI-PC
11 *建立人: yswenli
12 *電子郵箱:[email protected]
13 *建立時間:2018/3/5 17:10:19
14 *描述:
15 *
16 *=====================================================================
17 *修改标記
18 *修改時間:2018/3/5 17:10:19
19 *修改人: yswenli
20 *版本号: V1.0.0.0
21 *描述:
22 *
23 *****************************************************************************/
24
25 using SAEA.Commom;
26 using SAEA.Sockets.Interface;
27 using System;
28 using System.Collections.Generic;
29 using System.Linq;
30 using System.Text;
31
32 namespace SAEA.QueueSocket.Model
33 {
34 /// <summary>
35 /// 連接配接與主題的映射
36 /// </summary>
37 class Binding : ISyncBase, IDisposable
38 {
39 List<BindInfo> _list = new List<BindInfo>();
40
41 object _syncLocker = new object();
42
43 public object SyncLocker
44 {
45 get
46 {
47 return _syncLocker;
48 }
49 }
50
51 bool _isDisposed = false;
52
53 int _minutes = 10;
54
55 public Binding(int minutes = 10)
56 {
57 _minutes = minutes;
58
59 ThreadHelper.PulseAction(() =>
60 {
61 lock (_syncLocker)
62 {
63 var list = _list.Where(b => b.Expired <= DateTimeHelper.Now).ToList();
64 if (list != null)
65 {
66 list.ForEach(item =>
67 {
68 _list.Remove(item);
69 });
70 list.Clear();
71 list = null;
72 }
73 }
74 }, new TimeSpan(0, 0, 10), _isDisposed);
75 }
76
77
78 public void Set(string sessionID, string name, string topic, bool isPublisher = true)
79 {
80
81 lock (_syncLocker)
82 {
83 var result = _list.FirstOrDefault(b => b.Name == name && b.Topic == topic);
84 if (result == null)
85 {
86 _list.Add(new BindInfo()
87 {
88 SessionID = sessionID,
89 Name = name,
90 Topic = topic,
91 Flag = isPublisher,
92 Expired = DateTimeHelper.Now.AddMinutes(_minutes)
93 });
94 }
95 else
96 {
97 result.Expired = DateTimeHelper.Now.AddMinutes(_minutes);
98 }
99 }
100 }
101
102 public void Del(string sessionID, string topic)
103 {
104 lock (_syncLocker)
105 {
106 var result = _list.FirstOrDefault(b => b.Name == sessionID && b.Topic == topic);
107 if (result != null)
108 {
109 _list.Remove(result);
110 }
111 }
112 }
113
114 public void Remove(string sessionID)
115 {
116 lock (_syncLocker)
117 {
118 var result = _list.Where(b => b.SessionID == sessionID).ToList();
119 if (result != null)
120 {
121 result.ForEach((item) =>
122 {
123 _list.Remove(item);
124 });
125 result.Clear();
126 }
127 }
128 }
129
130 public BindInfo GetBingInfo(QueueResult sInfo)
131 {
132 lock (_syncLocker)
133 {
134 var bi = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic);
135
136 if (bi != null)
137 {
138 if (bi.Expired <= DateTimeHelper.Now)
139 {
140 Remove(bi.SessionID);
141 }
142 else
143 {
144 return bi;
145 }
146 }
147 return null;
148 }
149 }
150
151 public BindInfo GetBingInfo(string sessionID)
152 {
153 lock (_syncLocker)
154 {
155 return _list.FirstOrDefault(b => b.SessionID == sessionID);
156 }
157 }
158
159 public bool Exists(QueueResult sInfo)
160 {
161 lock (_syncLocker)
162 {
163 var data = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic);
164
165 if (data != null)
166 {
167 if (data.Expired <= DateTimeHelper.Now)
168 {
169 Remove(data.SessionID);
170
171 return false;
172 }
173
174 data.Expired = DateTimeHelper.Now.AddMinutes(_minutes);
175
176 return true;
177 }
178 }
179 return false;
180 }
181
182
183 public IEnumerable<BindInfo> GetPublisher()
184 {
185 lock (_syncLocker)
186 {
187 return _list.Where(b => b.Flag);
188 }
189 }
190
191 public int GetPublisherCount()
192 {
193 lock (_syncLocker)
194 {
195 return _list.Where(b => b.Flag).Count();
196 }
197 }
198
199 public IEnumerable<BindInfo> GetSubscriber()
200 {
201 lock (_syncLocker)
202 {
203 return _list.Where(b => !b.Flag);
204 }
205 }
206
207 public int GetSubscriberCount()
208 {
209 lock (_syncLocker)
210 {
211 return _list.Where(b => !b.Flag).Count();
212 }
213 }
214
215
216 public void Dispose()
217 {
218 _isDisposed = true;
219 lock (_syncLocker)
220 {
221 _list.Clear();
222 _list = null;
223 }
224 }
225 }
226 }
思維發散:實作多個QServer的主題與隊列映射克隆、或者隊列消息轉發實作容災叢集或大容量叢集等
MessageQueue
将主題與隊列形成一個映射,并對主題映射進行管理
1 /****************************************************************************
2 *Copyright (c) 2018 Microsoft All Rights Reserved.
3 *CLR版本: 4.0.30319.42000
4 *機器名稱:WENLI-PC
5 *公司名稱:Microsoft
6 *命名空間:SAEA.QueueSocket.Model
7 *檔案名: QueueCollection
8 *版本号: V1.0.0.0
9 *唯一辨別:89a65c12-c4b3-486b-a933-ad41c3db6621
10 *目前的使用者域:WENLI-PC
11 *建立人: yswenli
12 *電子郵箱:[email protected]
13 *建立時間:2018/3/6 10:31:11
14 *描述:
15 *
16 *=====================================================================
17 *修改标記
18 *修改時間:2018/3/6 10:31:11
19 *修改人: yswenli
20 *版本号: V1.0.0.0
21 *描述:
22 *
23 *****************************************************************************/
24
25 using SAEA.Commom;
26 using SAEA.Sockets.Interface;
27 using System;
28 using System.Collections.Concurrent;
29 using System.Collections.Generic;
30 using System.Linq;
31 using System.Threading.Tasks;
32
33 namespace SAEA.QueueSocket.Model
34 {
35 public class MessageQueue : ISyncBase, IDisposable
36 {
37 bool _isDisposed = false;
38
39 ConcurrentDictionary<string, QueueBase> _list;
40
41 object _syncLocker = new object();
42
43 public object SyncLocker
44 {
45 get
46 {
47 return _syncLocker;
48 }
49 }
50
51 public MessageQueue()
52 {
53 _list = new ConcurrentDictionary<string, QueueBase>();
54
55 ThreadHelper.Run(() =>
56 {
57 while (!_isDisposed)
58 {
59 var list = _list.Values.Where(b => b.Expired <= DateTimeHelper.Now);
60 if (list != null)
61 {
62 foreach (var item in list)
63 {
64 if (item.Length == 0)
65 {
66 _list.TryRemove(item.Topic, out QueueBase q);
67 }
68 }
69 }
70 ThreadHelper.Sleep(10000);
71 }
72 }, true, System.Threading.ThreadPriority.Highest);
73 }
74
75
76 public void Enqueue(string topic, string data)
77 {
78 var queue = _list.Values.FirstOrDefault(b => b.Topic.Equals(topic));
79 lock (_syncLocker)
80 {
81 if (queue == null)
82 {
83 queue = new QueueBase(topic);
84 _list.TryAdd(topic, queue);
85 }
86 }
87 queue.Enqueue(data);
88 }
89
90
91 public string Dequeue(string topic)
92 {
93 var queue = _list.Values.FirstOrDefault(b => b.Topic.Equals(topic));
94 if (queue != null)
95 {
96 return queue.Dequeue();
97 }
98 return null;
99 }
100
101 /// <summary>
102 /// 批量讀取資料
103 /// </summary>
104 /// <param name="topic"></param>
105 /// <param name="maxSize"></param>
106 /// <param name="maxTime"></param>
107 /// <returns></returns>
108 public List<string> DequeueForList(string topic, int maxSize = 500, int maxTime = 500)
109 {
110 List<string> result = new List<string>();
111 bool running = true;
112 var m = 0;
113 var task = Task.Factory.StartNew(() =>
114 {
115 while (running)
116 {
117 var data = Dequeue(topic);
118 if (data != null)
119 {
120 result.Add(data);
121 m++;
122 if (m == maxSize)
123 {
124 running = false;
125 }
126 }
127 else
128 {
129 ThreadHelper.Sleep(1);
130 }
131 }
132 });
133 Task.WaitAll(new Task[] { task }, maxTime);
134 running = false;
135 return result;
136 }
137
138 public string BlockDequeue(string topic)
139 {
140 var queue = _list.Values.FirstOrDefault(b => b.Topic == topic);
141 if (queue != null)
142 {
143 return queue.BlockDequeue();
144 }
145 return null;
146 }
147
148 public List<QueueBase> ToList()
149 {
150 lock (_syncLocker)
151 {
152 return _list.Values.ToList();
153 }
154 }
155
156 public long GetCount(string topic)
157 {
158 var queue = _list.Values.FirstOrDefault(b => b.Topic == topic);
159 if (queue != null)
160 return queue.Length;
161 return 0;
162 }
163
164 public void Dispose()
165 {
166 _isDisposed = true;
167 _list.Clear();
168 _list = null;
169 }
170 }
171 }
思維發散:增加硬碟持久化以實作down機容災、增加ack确認再移除以實作高可靠性等
QCoder
在QServer和QClient之間進行傳輸編解碼,這個編解碼的速度直接影響消息隊列的傳輸性能;本人使用了2種方案:1.使用類似redis傳輸方案,使用回車作為分隔符方式,這種方案結果要麼一個位元組一個位元組檢查分隔符,這種for操作還是C、C++屌,C#做這個真心不行;要麼先将位元組數組通過Encoding轉換成String再來for,雖說能提升幾倍性能,但是遇到不完整的位元組數組時,本人沒有找一個好的方法。2.使用自定義類型+長度+内容這種格式
1 /****************************************************************************
2 *Copyright (c) 2018 Microsoft All Rights Reserved.
3 *CLR版本: 4.0.30319.42000
4 *機器名稱:WENLI-PC
5 *公司名稱:Microsoft
6 *命名空間:SAEA.QueueSocket.Net
7 *檔案名: QCoder
8 *版本号: V1.0.0.0
9 *唯一辨別:88f5a779-8294-47bc-897b-8357a09f2fdb
10 *目前的使用者域:WENLI-PC
11 *建立人: yswenli
12 *電子郵箱:[email protected]
13 *建立時間:2018/3/5 18:01:56
14 *描述:
15 *
16 *=====================================================================
17 *修改标記
18 *修改時間:2018/3/5 18:01:56
19 *修改人: yswenli
20 *版本号: V1.0.0.0
21 *描述:
22 *
23 *****************************************************************************/
24
25 using SAEA.Commom;
26 using SAEA.QueueSocket.Model;
27 using SAEA.QueueSocket.Type;
28 using SAEA.Sockets.Interface;
29 using System;
30 using System.Collections.Generic;
31 using System.Text;
32
33 namespace SAEA.QueueSocket.Net
34 {
35 public sealed class QCoder : ICoder
36 {
37 static readonly int MIN = 1 + 4 + 4 + 0 + 4 + 0 + 0;
38
39 private List<byte> _buffer = new List<byte>();
40
41 private object _locker = new object();
42
43 public void Pack(byte[] data, Action<DateTime> onHeart, Action<ISocketProtocal> onUnPackage, Action<byte[]> onFile)
44 {
45
46 }
47
48 /// <summary>
49 /// 隊列編解碼器
50 /// </summary>
51 public QueueCoder QueueCoder { get; set; } = new QueueCoder();
52
53 /// <summary>
54 /// 包解析
55 /// </summary>
56 /// <param name="data"></param>
57 /// <param name="OnQueueResult"></param>
58 public void GetQueueResult(byte[] data, Action<QueueResult> OnQueueResult)
59 {
60 lock (_locker)
61 {
62 try
63 {
64 _buffer.AddRange(data);
65
66 if (_buffer.Count > (1 + 4 + 4 + 0 + 4 + 0 + 0))
67 {
68 var buffer = _buffer.ToArray();
69
70 QCoder.Decode(buffer, (list, offset) =>
71 {
72 if (list != null)
73 {
74 foreach (var item in list)
75 {
76 OnQueueResult?.Invoke(new QueueResult()
77 {
78 Type = (QueueSocketMsgType)item.Type,
79 Name = item.Name,
80 Topic = item.Topic,
81 Data = item.Data
82 });
83 }
84 _buffer.RemoveRange(0, offset);
85 }
86 });
87 }
88 }
89 catch (Exception ex)
90 {
91 ConsoleHelper.WriteLine("QCoder.GetQueueResult error:" + ex.Message + ex.Source);
92 _buffer.Clear();
93 }
94 }
95 }
96
97
98
99 /// <summary>
100 /// socket 傳輸位元組編碼
101 /// 格式為:1+4+4+x+4+x+4
102 /// </summary>
103 /// <param name="queueSocketMsg"></param>
104 /// <returns></returns>
105 public static byte[] Encode(QueueSocketMsg queueSocketMsg)
106 {
107 List<byte> list = new List<byte>();
108
109 var total = 4 + 4 + 4;
110
111 var nlen = 0;
112
113 var tlen = 0;
114
115 byte[] n = null;
116 byte[] tp = null;
117 byte[] d = null;
118
119 if (!string.IsNullOrEmpty(queueSocketMsg.Name))
120 {
121 n = Encoding.UTF8.GetBytes(queueSocketMsg.Name);
122 nlen = n.Length;
123 total += nlen;
124 }
125 if (!string.IsNullOrEmpty(queueSocketMsg.Topic))
126 {
127 tp = Encoding.UTF8.GetBytes(queueSocketMsg.Topic);
128 tlen = tp.Length;
129 total += tlen;
130 }
131 if (!string.IsNullOrEmpty(queueSocketMsg.Data))
132 {
133 d = Encoding.UTF8.GetBytes(queueSocketMsg.Data);
134 total += d.Length;
135 }
136
137 list.Add(queueSocketMsg.Type);
138 list.AddRange(BitConverter.GetBytes(total));
139 list.AddRange(BitConverter.GetBytes(nlen));
140 if (nlen > 0)
141 list.AddRange(n);
142 list.AddRange(BitConverter.GetBytes(tlen));
143 if (tlen > 0)
144 list.AddRange(tp);
145 if (d != null)
146 list.AddRange(d);
147 var arr = list.ToArray();
148 list.Clear();
149 return arr;
150 }
151
152 /// <summary>
153 /// socket 傳輸位元組解碼
154 /// </summary>
155 /// <param name="data"></param>
156 /// <param name="onDecode"></param>
157 public static bool Decode(byte[] data, Action<QueueSocketMsg[], int> onDecode)
158 {
159 int offset = 0;
160
161 try
162 {
163 if (data != null && data.Length > offset + MIN)
164 {
165 var list = new List<QueueSocketMsg>();
166
167 while (data.Length > offset + MIN)
168 {
169 var total = BitConverter.ToInt32(data, offset + 1);
170
171 if (data.Length >= offset + total + 1)
172 {
173 offset += 5;
174
175 var qm = new QueueSocketMsg((QueueSocketMsgType)data[0]);
176 qm.Total = total;
177
178 qm.NLen = BitConverter.ToInt32(data, offset);
179 offset += 4;
180
181
182 if (qm.NLen > 0)
183 {
184 var narr = new byte[qm.NLen];
185 Buffer.BlockCopy(data, offset, narr, 0, narr.Length);
186 qm.Name = Encoding.UTF8.GetString(narr);
187 }
188 offset += qm.NLen;
189
190 qm.TLen = BitConverter.ToInt32(data, offset);
191
192 offset += 4;
193
194 if (qm.TLen > 0)
195 {
196 var tarr = new byte[qm.TLen];
197 Buffer.BlockCopy(data, offset, tarr, 0, tarr.Length);
198 qm.Topic = Encoding.UTF8.GetString(tarr);
199 }
200 offset += qm.TLen;
201
202 var dlen = qm.Total - 4 - 4 - qm.NLen - 4 - qm.TLen;
203
204 if (dlen > 0)
205 {
206 var darr = new byte[dlen];
207 Buffer.BlockCopy(data, offset, darr, 0, dlen);
208 qm.Data = Encoding.UTF8.GetString(darr);
209 offset += dlen;
210 }
211 list.Add(qm);
212 }
213 else
214 {
215 break;
216 }
217 }
218 if (list.Count > 0)
219 {
220 onDecode?.Invoke(list.ToArray(), offset);
221 return true;
222 }
223 }
224 }
225 catch (Exception ex)
226 {
227 ConsoleHelper.WriteLine($"QCoder.Decode error:{ex.Message} stack:{ex.StackTrace} data:{data.Length} offset:{offset}");
228 }
229 onDecode?.Invoke(null, 0);
230 return false;
231 }
232
233
234 /// <summary>
235 /// dispose
236 /// </summary>
237 public void Dispose()
238 {
239 _buffer.Clear();
240 _buffer = null;
241 }
242
243
244
245 }
246 }
測試
簡單的How to do和Why do已經完成了,是時候定義個Producer、Consumer來測試一把了
1 using SAEA.QueueSocket;
2 using SAEA.Commom;
3 using SAEA.QueueSocket.Model;
4 using System;
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.Text;
8 using System.Threading;
9 using System.Threading.Tasks;
10
11 namespace SAEA.QueueSocketTest
12 {
13 class Program
14 {
15 static void Main(string[] args)
16 {
17 do
18 {
19 ConsoleHelper.WriteLine("輸入s啟動隊列伺服器,輸入p啟動生産者,輸入c啟動消費者");
20
21 var inputStr = ConsoleHelper.ReadLine();
22
23 if (!string.IsNullOrEmpty(inputStr))
24 {
25 var topic = "測試頻道";
26
27 switch (inputStr.ToLower())
28 {
29 case "s":
30 ConsoleHelper.Title = "SAEA.QueueServer";
31 ServerInit();
32 break;
33 case "p":
34 ConsoleHelper.Title = "SAEA.QueueProducer";
35 ConsoleHelper.WriteLine("輸入ip:port連接配接到隊列伺服器");
36 inputStr = ConsoleHelper.ReadLine();
37 ProducerInit(inputStr, topic);
38 break;
39 case "c":
40 ConsoleHelper.Title = "SAEA.QueueConsumer";
41 ConsoleHelper.WriteLine("輸入ip:port連接配接到隊列伺服器");
42 inputStr = ConsoleHelper.ReadLine();
43 ConsumerInit(inputStr, topic);
44 break;
45 default:
46 ServerInit();
47 inputStr = "127.0.0.1:39654";
48 ProducerInit(inputStr, topic);
49 ConsumerInit(inputStr, topic);
50 break;
51 }
52 ConsoleHelper.WriteLine("回車退出!");
53 ConsoleHelper.ReadLine();
54 return;
55 }
56 }
57 while (true);
58 }
59
60
61
62 static QServer _server;
63 static void ServerInit()
64 {
65 _server = new QServer();
66 _server.OnDisconnected += Server_OnDisconnected;
67 _server.CalcInfo((ci, qi) =>
68 {
69 var result = string.Format("生産者:{0} 消費者:{1} 收到消息:{2} 推送消息:{3}{4}", ci.Item1, ci.Item2, ci.Item3, ci.Item4, Environment.NewLine);
70
71 qi.ForEach((item) =>
72 {
73 result += string.Format("隊列名稱:{0} 堆積消息數:{1} {2}", item.Item1, item.Item2, Environment.NewLine);
74 });
75 ConsoleHelper.WriteLine(result);
76 });
77 _server.Start();
78 }
79
80 private static void Server_OnDisconnected(string ID, Exception ex)
81 {
82 _server.Clear(ID);
83 if (ex != null)
84 {
85 ConsoleHelper.WriteLine("{0} 已從伺服器斷開,err:{1}", ID, ex.ToString());
86 }
87 }
88
89 static void ProducerInit(string ipPort, string topic)
90 {
91 int pNum = 0;
92
93 //string msg = "主要原因是由于在高并發環境下,由于來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導緻無數的行鎖表鎖,甚至最後請求會堆積過多,進而觸發too many connections錯誤。通過使用消息隊列,我們可以異步處理請求,進而緩解系統的壓力。";
94 string msg = "123";
95 if (string.IsNullOrEmpty(ipPort)) ipPort = "127.0.0.1:39654";
96
97 QClient producer = new QClient("productor" + Guid.NewGuid().ToString("N"), ipPort);
98
99 producer.OnError += Producer_OnError;
100
101 producer.OnDisconnected += Client_OnDisconnected;
102
103 producer.ConnectAsync((s) =>
104 {
105 Task.Factory.StartNew(() =>
106 {
107 var old = 0;
108 var speed = 0;
109 while (producer.Connected)
110 {
111 speed = pNum - old;
112 old = pNum;
113 ConsoleHelper.WriteLine("生産者已成功發送:{0} 速度:{1}/s", pNum, speed);
114 Thread.Sleep(1000);
115 }
116 });
117
118 var list = new List<Tuple<string, byte[]>>();
119
120
121 while (producer.Connected)
122 {
123
124 producer.Publish(topic, msg);
125
126 Interlocked.Increment(ref pNum);
127 }
128 });
129
130
131 }
132
133 private static void Producer_OnError(string ID, Exception ex)
134 {
135 ConsoleHelper.WriteLine("id:" + ID + ",error:" + ex.Message);
136 }
137
138 static void ConsumerInit(string ipPort, string topic)
139 {
140 if (string.IsNullOrEmpty(ipPort)) ipPort = "127.0.0.1:39654";
141 QClient consumer = new QClient("subscriber-" + Guid.NewGuid().ToString("N"), ipPort);
142 consumer.OnMessage += Subscriber_OnMessage;
143 consumer.OnDisconnected += Client_OnDisconnected;
144 consumer.ConnectAsync((s) =>
145 {
146 Task.Factory.StartNew(() =>
147 {
148 var old = 0;
149 var speed = 0;
150 while (consumer.Connected)
151 {
152 speed = _outNum - old;
153 old = _outNum;
154 ConsoleHelper.WriteLine("消費者已成功接收:{0} 速度:{1}/s", _outNum, speed);
155 Thread.Sleep(1000);
156 }
157 });
158
159 consumer.Subscribe(topic);
160 });
161
162 }
163
164 private static void Client_OnDisconnected(string ID, Exception ex)
165 {
166 ConsoleHelper.WriteLine("目前連接配接已關閉");
167 }
168
169 static int _outNum = 0;
170
171 private static void Subscriber_OnMessage(QueueResult obj)
172 {
173 if (obj != null)
174 _outNum += 1;
175 }
176 }
177 }
單線程的、單生産者、單消費者、單隊列伺服器的測試結果如下圖:
到此一個自行實作的簡單的消息隊列完成了,雖說它離實際産品還很遙遠,但是本人還是覺的技術的提升離不開鑽研,路漫漫其修遠兮,吾将上下而求索!
轉載請标明本文來源:http://www.cnblogs.com/yswenli/p//9029587.html
更多内容歡迎star作者的github:https://github.com/yswenli/SAEA
如果發現本文有什麼問題和任何建議,也随時歡迎交流~
感謝您的閱讀,如果您對我的部落格所講述的内容有興趣,請繼續關注我的後續部落格,我是yswenli 。