天天看點

實作延遲消息隊列

    交流

    個人部落格交流群:580749909 , 順便推廣一下自己和夥伴一起建立wpf交流群:130108655。

    簡要

&*bsp;因為在偶然的一次機會下,公司讓我着手開發一個資料分發端基于socket通訊的一個中間件。主要用來解決向用戶端分發資料的問題,後來多了一個需求就是未付費的使用者拿到的資料是有延遲的。

而付費使用者則是正常的。這個時候在網上搜了很久沒有找到合适的解決方案,其實能解決這個問題的方案有很多比如說用到一些大廠貢獻的xxMQ中間件之類的,确實能解決問題。但是目前項目比較小

根本用不上這麼重的架構,然後又搜尋了半天沒有暫時沒有發現有人用c#來實作,是以才動手寫了這個方案。

附上**thub源碼位址

    思路

這個方案是借鑒了另一位部落客的開發思路,受到這位部落客的啟發然後根據自己的了解寫了這個方案。附上該部落客的連結位址:&*bsp;&*bsp;1分鐘實作“延遲消息”功能

在此我就不多贅述裡面的内容了。

    代碼

首先寫一個方案要理清楚自己的項目結構,我做了如下分層。

<*m* src="https://*m*2018.c*blo*s.com/blo*/1214710/201809/1214710-20180923151631092-669553263.p**" alt="" />

&*bsp;

    I*terfaces , 這層裡主要限制延遲消息隊列的隊列和消息任務行。
1   publ*c **terface IR***Queue<T&*t;
 2     {
 3         /// <summary&*t;
 4         /// Add tasks [add tasks w*ll automat*cally *e*erate: task Id, task slot locat*o*, *umber of task cycles]
 5         /// </summary&*t;
 6         /// <param *ame="delayT*me"&*t;The spec*f*ed task *s executed after N seco*ds.</param&*t;
 7         /// <param *ame="act*o*"&*t;Def***t*o*s of callback</param&*t;
 8         vo*d Add(lo** delayT*me,Act*o*<T&*t; act*o*);
 9 
10         /// <summary&*t;
11         /// Add tasks [add tasks w*ll automat*cally *e*erate: task Id, task slot locat*o*, *umber of task cycles]
12         /// </summary&*t;
13         /// <param *ame="delayT*me"&*t;The spec*f*ed task *s executed after N seco*ds.</param&*t;
14         /// <param *ame="act*o*"&*t;Def***t*o*s of callback.</param&*t;
15         /// <param *ame="data"&*t;*arameters used ** the callback fu*ct*o*.</param&*t;
16         vo*d Add(lo** delayT*me, Act*o*<T&*t; act*o*, T data);
17 
18         /// <summary&*t;
19         /// Add tasks [add tasks w*ll automat*cally *e*erate: task Id, task slot locat*o*, *umber of task cycles]
20         /// </summary&*t;
21         /// <param *ame="delayT*me"&*t;</param&*t;
22         /// <param *ame="act*o*"&*t;Def***t*o*s of callback</param&*t;
23         /// <param *ame="data"&*t;*arameters used ** the callback fu*ct*o*.</param&*t;
24         /// <param *ame="*d"&*t;Task ID, used whe* delet*** tasks.</param&*t;
25         vo*d Add(lo** delayT*me, Act*o*<T&*t; act*o*, T data, lo** *d);
26 
27         /// <summary&*t;
28         /// Remove tasks [*eed to k*ow: where the task *s, wh*ch spec*f*c task].
29         /// </summary&*t;
30         /// <param *ame="**dex"&*t;Task slot locat*o*</param&*t;
31         /// <param *ame="*d"&*t;Task ID, used whe* delet*** tasks.</param&*t;
32         vo*d Remove(lo** *d);
33 
34         /// <summary&*t;
35         /// Lau*ch queue.
36         /// </summary&*t;
37         vo*d Start();
38     }      
1 publ*c **terface ITask
2     {
3     }      
    Ach*eves,這層裡實作之前定義的接口,這裡寫成抽象類是為了後面友善擴充。

<*m* *d="code_*m*_closed_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_*m*_closed" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Co*tractedBlock.**f" alt="" /><*m* *d="code_*m*_ope*ed_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_*m*_ope*ed" style="d*splay: *o*e;" o*cl*ck="c*blo*s_code_h*de('41bba8a5-767e-44e1-adae-d0d820aa4313',eve*t)" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Expa*dedBlockStart.**f" alt="" />

1    publ*c abstract class BaseQueue<T&*t; : IR***Queue<T&*t;
  2     {
  3         pr*vate lo** _po**ter = 0L;
  4         pr*vate Co*curre*tBa*<BaseTask<T&*t;&*t;[] _arraySlot;
  5         pr*vate **t ArrayMax;
  6 
  7         /// <summary&*t;
  8         /// R*** queue.
  9         /// </summary&*t;
 10         publ*c Co*curre*tBa*<BaseTask<T&*t;&*t;[] ArraySlot
 11         {
 12             *et { retur* _arraySlot ?? (_arraySlot = *ew Co*curre*tBa*<BaseTask<T&*t;&*t;[ArrayMax]); }
 13         }
 14         
 15         publ*c BaseQueue(**t arrayMax)
 16         {
 17             *f (arrayMax < 60 && arrayMax % 60 == 0)
 18                 throw *ew Except*o*("R*** queue le**th ca**ot be less tha* 60 a*d *s a mult*ple of 60 .");
 19 
 20             ArrayMax = arrayMax;
 21         }
 22 
 23         publ*c vo*d Add(lo** delayT*me, Act*o*<T&*t; act*o*)
 24         {
 25             Add(delayT*me, act*o*, default(T));
 26         }
 27 
 28         publ*c vo*d Add(lo** delayT*me,Act*o*<T&*t; act*o*,T data)
 29         {
 30             Add(delayT*me, act*o*, data,0);
 31         }
 32 
 33         publ*c vo*d Add(lo** delayT*me, Act*o*<T&*t; act*o*, T data,lo** *d)
 34         {
 35             NextSlot(delayT*me, out lo** cycle, out lo** po**ter);
 36             ArraySlot[po**ter] =  ArraySlot[po**ter] ?? (ArraySlot[po**ter] = *ew Co*curre*tBa*<BaseTask<T&*t;&*t;());
 37             var baseTask = *ew BaseTask<T&*t;(cycle, act*o*, data,*d);
 38             ArraySlot[po**ter].Add(baseTask);
 39         }
 40 
 41         /// <summary&*t;
 42         /// Remove tasks based o* ID.
 43         /// </summary&*t;
 44         /// <param *ame="*d"&*t;</param&*t;
 45         publ*c vo*d Remove(lo** *d)
 46         {
 47             try
 48             {
 49                 *arallel.ForEach(ArraySlot, (Co*curre*tBa*<BaseTask<T&*t;&*t; collect*o*, *arallelLoopState state) =&*t;
 50                 {
 51                     var resulTask = collect*o*.F*rstOrDefault(p =&*t; p.Id == *d);
 52                     *f (resulTask != *ull)
 53                     {
 54                         collect*o*.TryTake(out resulTask);
 55                         state.Break();
 56                     }
 57                 });
 58             }
 59             catch (Except*o* e)
 60             {
 61                 Co*sole.Wr*teL**e(e);
 62             }
 63         }
 64         
 65         publ*c vo*d Start()
 66         {
 67             wh*le (true)
 68             {
 69                 R**htMove*o**ter();
 70                 Thread.Sleep(1000);
 71                 Co*sole.Wr*teL**e(DateT*me.Now.ToStr***());
 72             }
 73         }
 74 
 75         /// <summary&*t;
 76         /// Calculate the **format*o* of the *ext slot.
 77         /// </summary&*t;
 78         /// <param *ame="delayT*me"&*t;Delayed execut*o* t*me.</param&*t;
 79         /// <param *ame="cycle"&*t;Number of tur*s.</param&*t;
 80         /// <param *ame="**dex"&*t;Task locat*o*.</param&*t;
 81         pr*vate vo*d NextSlot(lo** delayT*me, out lo** cycle,out lo** **dex)
 82         {
 83             try
 84             {
 85                 var c*rcle = delayT*me / ArrayMax;
 86                 var seco*d = delayT*me % ArrayMax;
 87                 var curre*t_po**ter = Get*o**ter();
 88                 var queue_**dex = 0L;
 89 
 90                 *f (delayT*me - ArrayMax &*t; ArrayMax)
 91                 {
 92                     c*rcle = 1;
 93                 }
 94                 else *f (seco*d &*t; ArrayMax)
 95                 {
 96                     c*rcle += 1;
 97                 }
 98 
 99                 *f (delayT*me - c*rcle * ArrayMax < ArrayMax)
100                 {
101                     seco*d = delayT*me - c*rcle * ArrayMax;
102                 }
103 
104                 *f (curre*t_po**ter + delayT*me &*t;= ArrayMax)
105                 {
106                     cycle = (**t)((curre*t_po**ter + delayT*me) / ArrayMax);
107                     *f (curre*t_po**ter + seco*d - ArrayMax < 0)
108                     {
109                         queue_**dex = curre*t_po**ter + seco*d;
110                     }
111                     else *f (curre*t_po**ter + seco*d - ArrayMax &*t; 0)
112                     {
113                         queue_**dex = curre*t_po**ter + seco*d - ArrayMax;
114                     }
115                 }
116                 else
117                 {
118                     cycle = 0;
119                     queue_**dex = curre*t_po**ter + seco*d;
120                 }
121                 **dex = queue_**dex;
122             }
123             catch (Except*o* e)
124             {
125                 Co*sole.Wr*teL**e(e);
126                 throw;
127             }
128         }
129 
130         /// <summary&*t;
131         /// Get the curre*t locat*o* of the po**ter.
132         /// </summary&*t;
133         /// <retur*s&*t;</retur*s&*t;
134         pr*vate lo** Get*o**ter()
135         {
136             retur* I*terlocked.Read(ref _po**ter);
137         }
138 
139         /// <summary&*t;
140         /// Reset po**ter pos*t*o*.
141         /// </summary&*t;
142         pr*vate vo*d ReSet*o**ter()
143         {
144             I*terlocked.Excha**e(ref _po**ter, 0);
145         }
146 
147         /// <summary&*t;
148         /// *o**ter moves clockw*se.
149         /// </summary&*t;
150         pr*vate vo*d R**htMove*o**ter()
151         {
152             try
153             {
154                 *f (Get*o**ter() &*t;= ArrayMax - 1)
155                 {
156                     ReSet*o**ter();
157                 }
158                 else
159                 {
160                     I*terlocked.I*creme*t(ref _po**ter);
161                 }
162 
163                 var po**ter = Get*o**ter();
164                 var taskCollect*o* = ArraySlot[po**ter];
165                 *f (taskCollect*o* == *ull || taskCollect*o*.Cou*t == 0) retur*;
166 
167                 *arallel.ForEach(taskCollect*o*, (BaseTask<T&*t; task) =&*t;
168                 {
169                     *f (task.Cycle &*t; 0)
170                     {
171                         task.SubCycleNumber();
172                     }
173 
174                     *f (task.Cycle <= 0)
175                     {
176                         taskCollect*o*.TryTake(out task);
177                         task.TaskAct*o*(task.Data);
178                     }
179                 });
180             }
181             catch (Except*o* e)
182             {
183                 Co*sole.Wr*teL**e(e);
184                 throw;
185             }
186         }
187     }      

BaseQueue

<*m* *d="code_*m*_closed_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_*m*_closed" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Co*tractedBlock.**f" alt="" /><*m* *d="code_*m*_ope*ed_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_*m*_ope*ed" style="d*splay: *o*e;" o*cl*ck="c*blo*s_code_h*de('ff60babe-8bc1-4bc0-882c-88cb2a24207c',eve*t)" src="http://*ma*es.c*blo*s.com/Outl*****I*d*cators/Expa*dedBlockStart.**f" alt="" />

1   publ*c class BaseTask<T&*t; : ITask
 2     {
 3         pr*vate lo** _cycle;
 4         pr*vate lo** _*d;
 5         pr*vate T _data;
 6 
 7         publ*c Act*o*<T&*t; TaskAct*o* { *et; set; }
 8 
 9         publ*c lo** Cycle
10         {
11             *et { retur* I*terlocked.Read(ref _cycle); }
12             set { I*terlocked.Excha**e(ref _cycle, value); }
13         }
14 
15         publ*c lo** Id
16         {
17             *et { retur* _*d; }
18             set { _*d = value; }
19         }
20 
21         publ*c T Data
22         {
23             *et { retur* _data; }
24             set { _data = value; }
25         }
26 
27         publ*c BaseTask(lo** cycle, Act*o*<T&*t; act*o*, T data,lo** *d)
28         {
29             Cycle = cycle;
30             TaskAct*o* = act*o*;
31             Data = data;
32             Id = *d;
33         }
34 
35         publ*c BaseTask(lo** cycle, Act*o*<T&*t; act*o*,T data)
36         {
37             Cycle = cycle;
38             TaskAct*o* = act*o*;
39             Data = data;
40         }
41 
42         publ*c BaseTask(lo** cycle, Act*o*<T&*t; act*o*)
43         {
44             Cycle = cycle;
45             TaskAct*o* = act*o*;
46         }
47         
48         publ*c vo*d SubCycleNumber()
49         {
50             I*terlocked.Decreme*t(ref _cycle);
51         }
52     }      

BaseTask

    Lo**c,這層主要實作調用邏輯,調用者最終隻需要關心把任務放進隊列并指定什麼時候執行就行了,根本不需要關心其它的任何資訊。
1  publ*c stat*c vo*d Start()
 2         {
 3             //1.I**t*al*ze queues of d*ffere*t *ra*ular*ty.
 4             IR***Queue<NewsModel&*t; m**uteR***Queue = *ew M**uteQueue<NewsModel&*t;();
 5 
 6             //2.Ope* thread.
 7             var lstTasks = *ew L*st<Task&*t;
 8             {
 9                 Task.Factory.StartNew(m**uteR***Queue.Start)
10             };
11 
12             //3.Add tasks performed ** d*ffere*t per*ods.
13             m**uteR***Queue.Add(5, *ew Act*o*<NewsModel&*t;((NewsModel *ewsObj) =&*t;
14             {
15                 Co*sole.Wr*teL**e(*ewsObj.News);
16             }), *ew NewsModel() { News = "Trump's v*s*t to Ch**a!" });
17 
18             m**uteR***Queue.Add(10, *ew Act*o*<NewsModel&*t;((NewsModel *ewsObj) =&*t;
19             {
20                 Co*sole.Wr*teL**e(*ewsObj.News);
21             }), *ew NewsModel() { News = "*ut** *u's v*s*t to Ch**a!" });
22 
23             m**uteR***Queue.Add(60, *ew Act*o*<NewsModel&*t;((NewsModel *ewsObj) =&*t;
24             {
25                 Co*sole.Wr*teL**e(*ewsObj.News);
26             }), *ew NewsModel() { News = "E*se*hower's v*s*t to Ch**a!" });
27 
28             m**uteR***Queue.Add(120, *ew Act*o*<NewsModel&*t;((NewsModel *ewsObj) =&*t;
29             {
30                 Co*sole.Wr*teL**e(*ewsObj.News);
31             }), *ew NewsModel() { News = "** ***p***'s v*s*t to the US!" });
32 
33             //3.Wa*t*** for all tasks to complete *s usually *ot completed. Because there *s a* **f***te loop.
34             //F5 Ru* the pro*ram a*d see the effect.
35             Task.Wa*tAll(lstTasks.ToArray());
36             Co*sole.Read();
37         }      
    Models,這層就是用來在延遲任務中帶入的資料模型類而已了。自己用的時候換成任意自定義類型都可以。
    截圖

<*m* src="https://*m*2018.c*blo*s.com/blo*/1214710/201809/1214710-20180923152944919-1577847621.p**" alt="" />