天天看點

自已動手做高性能消息隊列

前言

        本人覺得碼農的技術提升應該是從how to do到why do,而項目或産品都是從why do到how to do,按題來,是以呢下面先從大的方面介紹一下消息隊列。

        消息隊列是分布式高并發面目中必不可少的一部分,随着網際網路、雲計算、大資料的使用,消息隊列的應用越來越多,消息隊列在系統的可伸縮性、穩定性、提升吞吐量等方面有着顯著的作用;它主要的作用一般如下:

       1.通過異步處理提高系統性能

自已動手做高性能消息隊列

<img src="https://pic3.zhimg.com/50/v2-223c09f60af24f2b9eb10d1b60774109_hd.jpg" data-caption="" data-size="normal" data-rawwidth="910" data-rawheight="350" class="origin_image zh-lightbox-thumb" width="910" data-original="https://pic3.zhimg.com/v2-223c09f60af24f2b9eb10d1b60774109_r.jpg">        如上圖,在不使用消息隊列伺服器的時候,使用者的請求資料直接寫入資料庫,在高并發的情況下資料庫壓力劇增,使得響應速度變慢。但是在使用消息隊列之後,使用者的請求資料發送給消息隊列之後立即 傳回,再由消息隊列的消費者程序從消息隊列中擷取資料,異步寫入資料庫。由于消息隊列伺服器處理速度快于資料庫(消息隊列也比資料庫有更好的伸縮性),是以響應速度得到大幅改善。

        通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能——即通過異步處理,将短時間高并發産生的事務消息存儲在消息隊列中,進而削平高峰期的并發事務。 舉例:在電子商務一些秒殺、促銷活動中,合理使用消息隊列可以有效抵禦促銷活動剛開始大量訂單湧入對系統的沖擊。如下圖所示:

<img src="https://pic3.zhimg.com/50/v2-6b95a9281690c3d190f8bf84312ee536_hd.jpg" data-caption="" data-size="normal" data-rawwidth="780" data-rawheight="384" class="origin_image zh-lightbox-thumb" width="780" data-original="https://pic3.zhimg.com/v2-6b95a9281690c3d190f8bf84312ee536_r.jpg">

自已動手做高性能消息隊列

        因為使用者請求資料寫入消息隊列之後就立即傳回給使用者了,但是請求資料在後續的業務校驗、寫資料庫等操作中可能失敗。是以使用消息隊列進行異步處理之後,需要适當修改業務流程進行配合,比如使用者在送出訂單之後,訂單資料寫入消息隊列,不能立即傳回使用者訂單送出成功,需要在消息隊列的訂單消費者程序真正處理完該訂單之後,甚至出庫後,再通過電子郵件或短信通知使用者訂單成功,以免交易糾紛。這就類似我們平時手機訂火車票和電影票。

       2.降低系統耦合性

        我們知道子產品分布式部署以後聚合方式通常有兩種:1.分布式消息隊列和2.分布式服務。先來簡單說一下分布式服務:目前使用比較多的用來建構SOA(Service Oriented Architecture面向服務體系結構)的分布式服務架構是阿裡巴巴開源的Dubbo.如果想深入了解Dubbo的可以看我寫的關于Dubbo的這一篇文章:《高性能優秀的服務架構-dubbo介紹》:https://juejin.im/post/5acadeb1f265da2375072f9c 再來談我們的分布式消息隊列:我們知道如果子產品之間不存在直接調用,那麼新增子產品或者修改子產品就對其他子產品影響較小,這樣系統的可擴充性無疑更好一些。

        我們最常見的事件驅動架構類似生産者消費者模式,在大型網站中通常用利用消息隊列實作事件驅動結構。如下圖所示:

自已動手做高性能消息隊列

        消息隊列使利用釋出-訂閱模式工作,消息發送者(生産者)釋出消息,一個或多個消息接受者(消費者)訂閱消息。 從上圖可以看到消息發送者(生産者)和消息接受者(消費者)之間沒有直接耦合,消息發送者将消息發送至分布式消息隊列即結束對消息的處理,消息接受者從分布式消息隊列擷取該消息後進行後續處理,并不需要知道該消息從何而來。對新增業務,隻要對該類消息感興趣,即可訂閱該消息,對原有系統和業務沒有任何影響,進而實作網站業務的可擴充性設計。

        消息接受者對消息進行過濾、處理、包裝後,構造成一個新的消息類型,将消息繼續發送出去,等待其他消息接受者訂閱該消息。是以基于事件(消息對象)驅動的業務架構可以是一系列流程。

        另外為了避免消息隊列伺服器當機造成消息丢失,會将成功發送到消息隊列的消息存儲在消息生産者伺服器上,等消息真正被消費者伺服器處理後才删除消息。在消息隊列伺服器當機後,生産者伺服器會選擇分布式消息隊列伺服器叢集中的其他伺服器釋出消息。

        前面說了這麼多消息隊列的重要性、使用場景、工作模式,有很多人就可能會說了,現有的ActiveMQ、RabbitMQ、Kafka、RocketMQ等多了去了,那在項目架構的時候選一個用上去就不行了,完全沒有必要重複造輪子啊!本人認為對于重複造輪子的事情和其它任何事情都是一樣的——任何事情沒有絕對的好處或者壞處,比如是剛入門的碼農、又或者很急的項目,完全可以選用現有一種通用的、成熟的産品,沒必要去從零開始做;實際上沒有任何一個優秀的産品全部使用三方的産品來組裝完成的,任何一個好一點的項目發展到一定的時候都不約而同的進行底層開發。原因很簡單:第一個任何通用型的産品總用功能覆寫不到的場景;第二個任何通用型的産品為了實作通用必将做了一些性能或架構的犧牲;現在道理都講完了,開始動手了(都聽你逼半天,能動手就盡量少逼逼!)。

 概述

  動手前先構思一下,本人需要一個簡單的、可釋出訂閱的、高吞吐量的消息隊列,并将之簡單大的方面分成QServer、QClient;QServer主要有Exchange、Binding、MessageQueue構成;QClient和QServer共用一套相同的傳輸編解碼器QCoder ,主要實作Publish、Subscribe、Unsubcribe、Closes等功能;先想這麼多,開幹!

Exchange

  主要在QServer中提供釋出、訂閱、連接配接、隊列資訊等管理

1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Model
  7 *檔案名: Exchange
  8 *版本号: V1.0.0.0
  9 *唯一辨別:6a576aad-edcc-446d-b7e5-561a622549bf
 10 *目前的使用者域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:[email protected]
 13 *建立時間:2018/3/5 16:36:44
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改标記
 18 *修改時間:2018/3/5 16:36:44
 19 *修改人: yswenli
 20 *版本号: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.Sockets.Interface;
 27 using System;
 28 using System.Collections.Generic;
 29 using System.Linq;
 30 using System.Text;
 31 using System.Threading;
 32 using System.Threading.Tasks;
 33 
 34 namespace SAEA.QueueSocket.Model
 35 {
 36     class Exchange : ISyncBase
 37     {
 38         object _syncLocker = new object();
 39 
 40         public object SyncLocker
 41         {
 42             get
 43             {
 44                 return _syncLocker;
 45             }
 46         }
 47 
 48         long _pNum = 0;
 49 
 50         long _cNum = 0;
 51 
 52         long _inNum = 0;
 53 
 54         long _outNum = 0;
 55 
 56         private Binding _binding;
 57 
 58         private MessageQueue _messageQueue;
 59 
 60         public Exchange()
 61         {
 62             this._binding = new Binding();
 63 
 64             this._messageQueue = new MessageQueue();
 65         }
 66 
 67 
 68         public void AcceptPublish(string sessionID, QueueResult pInfo)
 69         {
 70             lock (_syncLocker)
 71             {
 72                 this._binding.Set(sessionID, pInfo.Name, pInfo.Topic);
 73 
 74                 this._messageQueue.Enqueue(pInfo.Topic, pInfo.Data);
 75 
 76                 _pNum = this._binding.GetPublisherCount();
 77 
 78                 Interlocked.Increment(ref _inNum);
 79             }
 80         }
 81 
 82         public void AcceptPublishForBatch(string sessionID, QueueResult[] datas)
 83         {
 84             if (datas != null)
 85             {
 86                 foreach (var data in datas)
 87                 {
 88                     if (data != null)
 89                     {
 90                         AcceptPublish(sessionID, data);
 91                     }
 92                 }
 93             }
 94         }
 95 
 96 
 97         public void GetSubscribeData(string sessionID, QueueResult sInfo, int maxSize = 500, int maxTime = 500, Action<List<string>> callBack = null)
 98         {
 99             lock (_syncLocker)
100             {
101                 var result = this._binding.GetBingInfo(sInfo);
102 
103                 if (result == null)
104                 {
105                     this._binding.Set(sessionID, sInfo.Name, sInfo.Topic, false);
106 
107                     _cNum = this._binding.GetSubscriberCount();
108 
109                     Task.Factory.StartNew(() =>
110                     {
111                         while (this._binding.Exists(sInfo))
112                         {
113                             var list = this._messageQueue.DequeueForList(sInfo.Topic, maxSize, maxTime);
114                             if (list != null)
115                             {
116                                 list.ForEach(i => { Interlocked.Increment(ref _outNum); });
117                                 callBack?.Invoke(list);
118                                 list.Clear();
119                                 list = null;
120                             }
121                         }
122                     });
123                 }
124             }            
125         }
126 
127         public void Unsubscribe(QueueResult sInfo)
128         {
129             Interlocked.Decrement(ref _cNum);
130             this._binding.Del(sInfo.Name, sInfo.Topic);
131         }
132 
133         public void Clear(string sessionID)
134         {
135             lock (_syncLocker)
136             {
137                 var data = this._binding.GetBingInfo(sessionID);
138 
139                 if (data != null)
140                 {
141                     if (data.Flag)
142                     {
143                         Interlocked.Decrement(ref _pNum);
144                     }
145                     else
146                     {
147                         Interlocked.Decrement(ref _cNum);
148                     }
149                     this._binding.Remove(sessionID);
150                 }
151             }
152         }
153 
154         public Tuple<long, long, long, long> GetConnectInfo()
155         {
156             return new Tuple<long, long, long, long>(_pNum, _cNum, _inNum, _outNum);
157         }
158 
159         public List<Tuple<string, long>> GetQueueInfo()
160         {
161             List<Tuple<string, long>> result = new List<Tuple<string, long>>();
162             lock (_syncLocker)
163             {
164                 var list = this._messageQueue.ToList();
165                 if (list != null)
166                 {
167                     var tlts = list.Select(b => b.Topic).Distinct().ToList();
168 
169                     if (tlts != null)
170                     {
171                         foreach (var topic in tlts)
172                         {
173                             var count = this._messageQueue.GetCount(topic);
174                             var t = new Tuple<string, long>(topic, count);
175                             result.Add(t);
176                         }
177                         tlts.Clear();
178                     }
179                     list.Clear();
180                 }
181             }
182             return result;
183         }
184 
185     }
186 }      

  思維發散:這裡可以增加全局消息隊列、指定連接配接消息隊列等;将連接配接通過類型redis cluster模式進行一個均衡分布等

Binding

  主要功能是将連接配接、主題進行映射管理

1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Model
  7 *檔案名: Binding
  8 *版本号: V1.0.0.0
  9 *唯一辨別:7472dabd-1b6a-4ffe-b19f-2d1cf7348766
 10 *目前的使用者域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:[email protected]
 13 *建立時間:2018/3/5 17:10:19
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改标記
 18 *修改時間:2018/3/5 17:10:19
 19 *修改人: yswenli
 20 *版本号: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.Sockets.Interface;
 27 using System;
 28 using System.Collections.Generic;
 29 using System.Linq;
 30 using System.Text;
 31 
 32 namespace SAEA.QueueSocket.Model
 33 {
 34     /// <summary>
 35     /// 連接配接與主題的映射
 36     /// </summary>
 37     class Binding : ISyncBase, IDisposable
 38     {
 39         List<BindInfo> _list = new List<BindInfo>();
 40 
 41         object _syncLocker = new object();
 42 
 43         public object SyncLocker
 44         {
 45             get
 46             {
 47                 return _syncLocker;
 48             }
 49         }
 50 
 51         bool _isDisposed = false;
 52 
 53         int _minutes = 10;
 54 
 55         public Binding(int minutes = 10)
 56         {
 57             _minutes = minutes;
 58 
 59             ThreadHelper.PulseAction(() =>
 60             {
 61                 lock (_syncLocker)
 62                 {
 63                     var list = _list.Where(b => b.Expired <= DateTimeHelper.Now).ToList();
 64                     if (list != null)
 65                     {
 66                         list.ForEach(item =>
 67                         {
 68                             _list.Remove(item);
 69                         });
 70                         list.Clear();
 71                         list = null;
 72                     }
 73                 }
 74             }, new TimeSpan(0, 0, 10), _isDisposed);
 75         }
 76 
 77 
 78         public void Set(string sessionID, string name, string topic, bool isPublisher = true)
 79         {
 80 
 81             lock (_syncLocker)
 82             {
 83                 var result = _list.FirstOrDefault(b => b.Name == name && b.Topic == topic);
 84                 if (result == null)
 85                 {
 86                     _list.Add(new BindInfo()
 87                     {
 88                         SessionID = sessionID,
 89                         Name = name,
 90                         Topic = topic,
 91                         Flag = isPublisher,
 92                         Expired = DateTimeHelper.Now.AddMinutes(_minutes)
 93                     });
 94                 }
 95                 else
 96                 {
 97                     result.Expired = DateTimeHelper.Now.AddMinutes(_minutes);
 98                 }
 99             }
100         }
101 
102         public void Del(string sessionID, string topic)
103         {
104             lock (_syncLocker)
105             {
106                 var result = _list.FirstOrDefault(b => b.Name == sessionID && b.Topic == topic);
107                 if (result != null)
108                 {
109                     _list.Remove(result);
110                 }
111             }
112         }
113 
114         public void Remove(string sessionID)
115         {
116             lock (_syncLocker)
117             {
118                 var result = _list.Where(b => b.SessionID == sessionID).ToList();
119                 if (result != null)
120                 {
121                     result.ForEach((item) =>
122                     {
123                         _list.Remove(item);
124                     });
125                     result.Clear();
126                 }
127             }
128         }
129 
130         public BindInfo GetBingInfo(QueueResult sInfo)
131         {
132             lock (_syncLocker)
133             {
134                 var bi = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic);
135 
136                 if (bi != null)
137                 {
138                     if (bi.Expired <= DateTimeHelper.Now)
139                     {
140                         Remove(bi.SessionID);
141                     }
142                     else
143                     {
144                         return bi;
145                     }
146                 }
147                 return null;
148             }
149         }
150 
151         public BindInfo GetBingInfo(string sessionID)
152         {
153             lock (_syncLocker)
154             {
155                 return _list.FirstOrDefault(b => b.SessionID == sessionID);
156             }
157         }
158 
159         public bool Exists(QueueResult sInfo)
160         {
161             lock (_syncLocker)
162             {
163                 var data = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic);
164 
165                 if (data != null)
166                 {
167                     if (data.Expired <= DateTimeHelper.Now)
168                     {
169                         Remove(data.SessionID);
170 
171                         return false;
172                     }
173 
174                     data.Expired = DateTimeHelper.Now.AddMinutes(_minutes);
175 
176                     return true;
177                 }
178             }
179             return false;
180         }
181 
182 
183         public IEnumerable<BindInfo> GetPublisher()
184         {
185             lock (_syncLocker)
186             {
187                 return _list.Where(b => b.Flag);
188             }
189         }
190 
191         public int GetPublisherCount()
192         {
193             lock (_syncLocker)
194             {
195                 return _list.Where(b => b.Flag).Count();
196             }
197         }
198 
199         public IEnumerable<BindInfo> GetSubscriber()
200         {
201             lock (_syncLocker)
202             {
203                 return _list.Where(b => !b.Flag);
204             }
205         }
206 
207         public int GetSubscriberCount()
208         {
209             lock (_syncLocker)
210             {
211                 return _list.Where(b => !b.Flag).Count();
212             }
213         }
214 
215 
216         public void Dispose()
217         {
218             _isDisposed = true;
219             lock (_syncLocker)
220             {
221                 _list.Clear();
222                 _list = null;
223             }
224         }
225     }
226 }      

  思維發散:實作多個QServer的主題與隊列映射克隆、或者隊列消息轉發實作容災叢集或大容量叢集等

MessageQueue

  将主題與隊列形成一個映射,并對主題映射進行管理

1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Model
  7 *檔案名: QueueCollection
  8 *版本号: V1.0.0.0
  9 *唯一辨別:89a65c12-c4b3-486b-a933-ad41c3db6621
 10 *目前的使用者域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:[email protected]
 13 *建立時間:2018/3/6 10:31:11
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改标記
 18 *修改時間:2018/3/6 10:31:11
 19 *修改人: yswenli
 20 *版本号: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.Sockets.Interface;
 27 using System;
 28 using System.Collections.Concurrent;
 29 using System.Collections.Generic;
 30 using System.Linq;
 31 using System.Threading.Tasks;
 32 
 33 namespace SAEA.QueueSocket.Model
 34 {
 35     public class MessageQueue : ISyncBase, IDisposable
 36     {
 37         bool _isDisposed = false;
 38 
 39         ConcurrentDictionary<string, QueueBase> _list;
 40 
 41         object _syncLocker = new object();
 42 
 43         public object SyncLocker
 44         {
 45             get
 46             {
 47                 return _syncLocker;
 48             }
 49         }
 50 
 51         public MessageQueue()
 52         {
 53             _list = new ConcurrentDictionary<string, QueueBase>();
 54 
 55             ThreadHelper.Run(() =>
 56             {
 57                 while (!_isDisposed)
 58                 {
 59                     var list = _list.Values.Where(b => b.Expired <= DateTimeHelper.Now);
 60                     if (list != null)
 61                     {
 62                         foreach (var item in list)
 63                         {
 64                             if (item.Length == 0)
 65                             {
 66                                 _list.TryRemove(item.Topic, out QueueBase q);
 67                             }
 68                         }
 69                     }
 70                     ThreadHelper.Sleep(10000);
 71                 }
 72             }, true, System.Threading.ThreadPriority.Highest);
 73         }
 74 
 75 
 76         public void Enqueue(string topic, string data)
 77         {
 78             var queue = _list.Values.FirstOrDefault(b => b.Topic.Equals(topic));
 79             lock (_syncLocker)
 80             {
 81                 if (queue == null)
 82                 {
 83                     queue = new QueueBase(topic);
 84                     _list.TryAdd(topic, queue);
 85                 }                
 86             }
 87             queue.Enqueue(data);
 88         }
 89 
 90 
 91         public string Dequeue(string topic)
 92         {
 93             var queue = _list.Values.FirstOrDefault(b => b.Topic.Equals(topic));
 94             if (queue != null)
 95             {
 96                 return queue.Dequeue();
 97             }
 98             return null;
 99         }
100 
101         /// <summary>
102         /// 批量讀取資料
103         /// </summary>
104         /// <param name="topic"></param>
105         /// <param name="maxSize"></param>
106         /// <param name="maxTime"></param>
107         /// <returns></returns>
108         public List<string> DequeueForList(string topic, int maxSize = 500, int maxTime = 500)
109         {
110             List<string> result = new List<string>();
111             bool running = true;
112             var m = 0;
113             var task = Task.Factory.StartNew(() =>
114             {
115                 while (running)
116                 {
117                     var data = Dequeue(topic);
118                     if (data != null)
119                     {
120                         result.Add(data);
121                         m++;
122                         if (m == maxSize)
123                         {
124                             running = false;
125                         }
126                     }
127                     else
128                     {
129                         ThreadHelper.Sleep(1);
130                     }
131                 }
132             });
133             Task.WaitAll(new Task[] { task }, maxTime);
134             running = false;
135             return result;
136         }
137 
138         public string BlockDequeue(string topic)
139         {
140             var queue = _list.Values.FirstOrDefault(b => b.Topic == topic);
141             if (queue != null)
142             {
143                 return queue.BlockDequeue();
144             }
145             return null;
146         }
147 
148         public List<QueueBase> ToList()
149         {
150             lock (_syncLocker)
151             {
152                 return _list.Values.ToList();
153             }
154         }
155 
156         public long GetCount(string topic)
157         {
158             var queue = _list.Values.FirstOrDefault(b => b.Topic == topic);
159             if (queue != null)
160                 return queue.Length;
161             return 0;
162         }
163 
164         public void Dispose()
165         {
166             _isDisposed = true;
167             _list.Clear();
168                 _list = null;
169         }
170     }
171 }      

  思維發散:增加硬碟持久化以實作down機容災、增加ack确認再移除以實作高可靠性等

QCoder

  在QServer和QClient之間進行傳輸編解碼,這個編解碼的速度直接影響消息隊列的傳輸性能;本人使用了2種方案:1.使用類似redis傳輸方案,使用回車作為分隔符方式,這種方案結果要麼一個位元組一個位元組檢查分隔符,這種for操作還是C、C++屌,C#做這個真心不行;要麼先将位元組數組通過Encoding轉換成String再來for,雖說能提升幾倍性能,但是遇到不完整的位元組數組時,本人沒有找一個好的方法。2.使用自定義類型+長度+内容這種格式

1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Net
  7 *檔案名: QCoder
  8 *版本号: V1.0.0.0
  9 *唯一辨別:88f5a779-8294-47bc-897b-8357a09f2fdb
 10 *目前的使用者域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:[email protected]
 13 *建立時間:2018/3/5 18:01:56
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改标記
 18 *修改時間:2018/3/5 18:01:56
 19 *修改人: yswenli
 20 *版本号: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.QueueSocket.Model;
 27 using SAEA.QueueSocket.Type;
 28 using SAEA.Sockets.Interface;
 29 using System;
 30 using System.Collections.Generic;
 31 using System.Text;
 32 
 33 namespace SAEA.QueueSocket.Net
 34 {
 35     public sealed class QCoder : ICoder
 36     {
 37         static readonly int MIN = 1 + 4 + 4 + 0 + 4 + 0 + 0;
 38 
 39         private List<byte> _buffer = new List<byte>();
 40 
 41         private object _locker = new object();
 42 
 43         public void Pack(byte[] data, Action<DateTime> onHeart, Action<ISocketProtocal> onUnPackage, Action<byte[]> onFile)
 44         {
 45 
 46         }
 47 
 48         /// <summary>
 49         /// 隊列編解碼器
 50         /// </summary>
 51         public QueueCoder QueueCoder { get; set; } = new QueueCoder();
 52 
 53         /// <summary>
 54         /// 包解析
 55         /// </summary>
 56         /// <param name="data"></param>
 57         /// <param name="OnQueueResult"></param>
 58         public void GetQueueResult(byte[] data, Action<QueueResult> OnQueueResult)
 59         {
 60             lock (_locker)
 61             {
 62                 try
 63                 {
 64                     _buffer.AddRange(data);
 65 
 66                     if (_buffer.Count > (1 + 4 + 4 + 0 + 4 + 0 + 0))
 67                     {
 68                         var buffer = _buffer.ToArray();
 69 
 70                         QCoder.Decode(buffer, (list, offset) =>
 71                         {
 72                             if (list != null)
 73                             {
 74                                 foreach (var item in list)
 75                                 {
 76                                     OnQueueResult?.Invoke(new QueueResult()
 77                                     {
 78                                         Type = (QueueSocketMsgType)item.Type,
 79                                         Name = item.Name,
 80                                         Topic = item.Topic,
 81                                         Data = item.Data
 82                                     });
 83                                 }
 84                                 _buffer.RemoveRange(0, offset);
 85                             }
 86                         });
 87                     }
 88                 }
 89                 catch (Exception ex)
 90                 {
 91                     ConsoleHelper.WriteLine("QCoder.GetQueueResult error:" + ex.Message + ex.Source);
 92                     _buffer.Clear();
 93                 }
 94             }
 95         }
 96 
 97 
 98 
 99         /// <summary>
100         /// socket 傳輸位元組編碼
101         /// 格式為:1+4+4+x+4+x+4
102         /// </summary>
103         /// <param name="queueSocketMsg"></param>
104         /// <returns></returns>
105         public static byte[] Encode(QueueSocketMsg queueSocketMsg)
106         {
107             List<byte> list = new List<byte>();
108 
109             var total = 4 + 4 + 4;
110 
111             var nlen = 0;
112 
113             var tlen = 0;
114 
115             byte[] n = null;
116             byte[] tp = null;
117             byte[] d = null;
118 
119             if (!string.IsNullOrEmpty(queueSocketMsg.Name))
120             {
121                 n = Encoding.UTF8.GetBytes(queueSocketMsg.Name);
122                 nlen = n.Length;
123                 total += nlen;
124             }
125             if (!string.IsNullOrEmpty(queueSocketMsg.Topic))
126             {
127                 tp = Encoding.UTF8.GetBytes(queueSocketMsg.Topic);
128                 tlen = tp.Length;
129                 total += tlen;
130             }
131             if (!string.IsNullOrEmpty(queueSocketMsg.Data))
132             {
133                 d = Encoding.UTF8.GetBytes(queueSocketMsg.Data);
134                 total += d.Length;
135             }
136 
137             list.Add(queueSocketMsg.Type);
138             list.AddRange(BitConverter.GetBytes(total));
139             list.AddRange(BitConverter.GetBytes(nlen));
140             if (nlen > 0)
141                 list.AddRange(n);
142             list.AddRange(BitConverter.GetBytes(tlen));
143             if (tlen > 0)
144                 list.AddRange(tp);
145             if (d != null)
146                 list.AddRange(d);
147             var arr = list.ToArray();
148             list.Clear();
149             return arr;
150         }
151 
152         /// <summary>
153         /// socket 傳輸位元組解碼
154         /// </summary>
155         /// <param name="data"></param>
156         /// <param name="onDecode"></param>
157         public static bool Decode(byte[] data, Action<QueueSocketMsg[], int> onDecode)
158         {
159             int offset = 0;
160 
161             try
162             {
163                 if (data != null && data.Length > offset + MIN)
164                 {
165                     var list = new List<QueueSocketMsg>();
166 
167                     while (data.Length > offset + MIN)
168                     {
169                         var total = BitConverter.ToInt32(data, offset + 1);
170 
171                         if (data.Length >= offset + total + 1)
172                         {
173                             offset += 5;
174 
175                             var qm = new QueueSocketMsg((QueueSocketMsgType)data[0]);
176                             qm.Total = total;
177 
178                             qm.NLen = BitConverter.ToInt32(data, offset);
179                             offset += 4;
180 
181 
182                             if (qm.NLen > 0)
183                             {
184                                 var narr = new byte[qm.NLen];
185                                 Buffer.BlockCopy(data, offset, narr, 0, narr.Length);
186                                 qm.Name = Encoding.UTF8.GetString(narr);
187                             }
188                             offset += qm.NLen;
189 
190                             qm.TLen = BitConverter.ToInt32(data, offset);
191 
192                             offset += 4;
193 
194                             if (qm.TLen > 0)
195                             {
196                                 var tarr = new byte[qm.TLen];
197                                 Buffer.BlockCopy(data, offset, tarr, 0, tarr.Length);
198                                 qm.Topic = Encoding.UTF8.GetString(tarr);
199                             }
200                             offset += qm.TLen;
201 
202                             var dlen = qm.Total - 4 - 4 - qm.NLen - 4 - qm.TLen;
203 
204                             if (dlen > 0)
205                             {
206                                 var darr = new byte[dlen];
207                                 Buffer.BlockCopy(data, offset, darr, 0, dlen);
208                                 qm.Data = Encoding.UTF8.GetString(darr);
209                                 offset += dlen;
210                             }
211                             list.Add(qm);
212                         }
213                         else
214                         {
215                             break;
216                         }
217                     }
218                     if (list.Count > 0)
219                     {
220                         onDecode?.Invoke(list.ToArray(), offset);
221                         return true;
222                     }
223                 }
224             }
225             catch (Exception ex)
226             {
227                 ConsoleHelper.WriteLine($"QCoder.Decode error:{ex.Message} stack:{ex.StackTrace} data:{data.Length} offset:{offset}");
228             }
229             onDecode?.Invoke(null, 0);
230             return false;
231         }
232 
233 
234         /// <summary>
235         /// dispose
236         /// </summary>
237         public void Dispose()
238         {
239             _buffer.Clear();
240             _buffer = null;
241         }
242 
243 
244 
245     }
246 }      

測試

  簡單的How to do和Why do已經完成了,是時候定義個Producer、Consumer來測試一把了

1 using SAEA.QueueSocket;
  2 using SAEA.Commom;
  3 using SAEA.QueueSocket.Model;
  4 using System;
  5 using System.Collections.Generic;
  6 using System.Diagnostics;
  7 using System.Text;
  8 using System.Threading;
  9 using System.Threading.Tasks;
 10 
 11 namespace SAEA.QueueSocketTest
 12 {
 13     class Program
 14     {
 15         static void Main(string[] args)
 16         {
 17             do
 18             {
 19                 ConsoleHelper.WriteLine("輸入s啟動隊列伺服器,輸入p啟動生産者,輸入c啟動消費者");
 20 
 21                 var inputStr = ConsoleHelper.ReadLine();
 22 
 23                 if (!string.IsNullOrEmpty(inputStr))
 24                 {
 25                     var topic = "測試頻道";
 26 
 27                     switch (inputStr.ToLower())
 28                     {
 29                         case "s":
 30                             ConsoleHelper.Title = "SAEA.QueueServer";
 31                             ServerInit();
 32                             break;
 33                         case "p":
 34                             ConsoleHelper.Title = "SAEA.QueueProducer";
 35                             ConsoleHelper.WriteLine("輸入ip:port連接配接到隊列伺服器");
 36                             inputStr = ConsoleHelper.ReadLine();
 37                             ProducerInit(inputStr, topic);
 38                             break;
 39                         case "c":
 40                             ConsoleHelper.Title = "SAEA.QueueConsumer";
 41                             ConsoleHelper.WriteLine("輸入ip:port連接配接到隊列伺服器");
 42                             inputStr = ConsoleHelper.ReadLine();
 43                             ConsumerInit(inputStr, topic);
 44                             break;
 45                         default:
 46                             ServerInit();
 47                             inputStr = "127.0.0.1:39654";
 48                             ProducerInit(inputStr, topic);
 49                             ConsumerInit(inputStr, topic);
 50                             break;
 51                     }
 52                     ConsoleHelper.WriteLine("回車退出!");
 53                     ConsoleHelper.ReadLine();
 54                     return;
 55                 }
 56             }
 57             while (true);
 58         }
 59 
 60 
 61 
 62         static QServer _server;
 63         static void ServerInit()
 64         {
 65             _server = new QServer();
 66             _server.OnDisconnected += Server_OnDisconnected;
 67             _server.CalcInfo((ci, qi) =>
 68             {
 69                 var result = string.Format("生産者:{0} 消費者:{1} 收到消息:{2} 推送消息:{3}{4}", ci.Item1, ci.Item2, ci.Item3, ci.Item4, Environment.NewLine);
 70 
 71                 qi.ForEach((item) =>
 72                 {
 73                     result += string.Format("隊列名稱:{0} 堆積消息數:{1} {2}", item.Item1, item.Item2, Environment.NewLine);
 74                 });
 75                 ConsoleHelper.WriteLine(result);
 76             });
 77             _server.Start();
 78         }
 79 
 80         private static void Server_OnDisconnected(string ID, Exception ex)
 81         {
 82             _server.Clear(ID);
 83             if (ex != null)
 84             {
 85                 ConsoleHelper.WriteLine("{0} 已從伺服器斷開,err:{1}", ID, ex.ToString());
 86             }
 87         }
 88 
 89         static void ProducerInit(string ipPort, string topic)
 90         {
 91             int pNum = 0;
 92 
 93             //string msg = "主要原因是由于在高并發環境下,由于來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導緻無數的行鎖表鎖,甚至最後請求會堆積過多,進而觸發too many connections錯誤。通過使用消息隊列,我們可以異步處理請求,進而緩解系統的壓力。";
 94             string msg = "123";
 95             if (string.IsNullOrEmpty(ipPort)) ipPort = "127.0.0.1:39654";
 96 
 97             QClient producer = new QClient("productor" + Guid.NewGuid().ToString("N"), ipPort);
 98 
 99             producer.OnError += Producer_OnError;
100 
101             producer.OnDisconnected += Client_OnDisconnected;
102 
103             producer.ConnectAsync((s) =>
104             {
105                 Task.Factory.StartNew(() =>
106                 {
107                     var old = 0;
108                     var speed = 0;
109                     while (producer.Connected)
110                     {
111                         speed = pNum - old;
112                         old = pNum;
113                         ConsoleHelper.WriteLine("生産者已成功發送:{0} 速度:{1}/s", pNum, speed);
114                         Thread.Sleep(1000);
115                     }
116                 });
117 
118                 var list = new List<Tuple<string, byte[]>>();
119                 
120 
121                 while (producer.Connected)
122                 {                   
123 
124                     producer.Publish(topic, msg);
125 
126                     Interlocked.Increment(ref pNum);
127                 }
128             });
129 
130 
131         }
132 
133         private static void Producer_OnError(string ID, Exception ex)
134         {
135             ConsoleHelper.WriteLine("id:" + ID + ",error:" + ex.Message);
136         }
137 
138         static void ConsumerInit(string ipPort, string topic)
139         {
140             if (string.IsNullOrEmpty(ipPort)) ipPort = "127.0.0.1:39654";
141             QClient consumer = new QClient("subscriber-" + Guid.NewGuid().ToString("N"), ipPort);
142             consumer.OnMessage += Subscriber_OnMessage;
143             consumer.OnDisconnected += Client_OnDisconnected;
144             consumer.ConnectAsync((s) =>
145             {
146                 Task.Factory.StartNew(() =>
147                 {
148                     var old = 0;
149                     var speed = 0;
150                     while (consumer.Connected)
151                     {
152                         speed = _outNum - old;
153                         old = _outNum;
154                         ConsoleHelper.WriteLine("消費者已成功接收:{0} 速度:{1}/s", _outNum, speed);
155                         Thread.Sleep(1000);
156                     }
157                 });
158 
159                 consumer.Subscribe(topic);
160             });
161 
162         }
163 
164         private static void Client_OnDisconnected(string ID, Exception ex)
165         {
166             ConsoleHelper.WriteLine("目前連接配接已關閉");
167         }
168 
169         static int _outNum = 0;
170 
171         private static void Subscriber_OnMessage(QueueResult obj)
172         {
173             if (obj != null)
174                 _outNum += 1;
175         }
176     }
177 }      

  單線程的、單生産者、單消費者、單隊列伺服器的測試結果如下圖:

自已動手做高性能消息隊列

  到此一個自行實作的簡單的消息隊列完成了,雖說它離實際産品還很遙遠,但是本人還是覺的技術的提升離不開鑽研,路漫漫其修遠兮,吾将上下而求索!

轉載請标明本文來源:http://www.cnblogs.com/yswenli/p//9029587.html 

更多内容歡迎star作者的github:https://github.com/yswenli/SAEA

如果發現本文有什麼問題和任何建議,也随時歡迎交流~

感謝您的閱讀,如果您對我的部落格所講述的内容有興趣,請繼續關注我的後續部落格,我是yswenli 。

繼續閱讀