交流
個人部落格交流群:580749909 , 順便推廣一下自己和夥伴一起建立wpf交流群:130108655。
簡要
&nbsp;因為在偶然的一次機會下,公司讓我着手開發一個資料分發端基于socket通訊的一個中間件。主要用來解決向用戶端分發資料的問題,後來多了一個需求,就是未付費的使用者拿到的資料是有延遲的,
而付費使用者則是正常的。這個時候在網上搜了很久沒有找到合适的解決方案,其實能解決這個問題的方案有很多比如說用到一些大廠貢獻的xxMQ中間件之類的,确實能解決問題。但是目前項目比較小
根本用不上這麼重的架構,然後又搜尋了半天,暫時沒有發現有人用 C# 來實作,所以才動手寫了這個方案。
附上 GitHub 源碼位址
- 思路
這個方案是借鑒了另一位部落客的開發思路,受到這位部落客的啟發,然後根據自己的理解寫了這個方案。附上該部落客的連結位址:&nbsp;&nbsp;1分鐘實作“延遲消息”功能
在此我就不多贅述裡面的内容了。
- 代碼
首先寫一個方案要理清楚自己的項目結構,我做了如下分層。
<img src="https://img2018.cnblogs.com/blog/1214710/201809/1214710-20180923151631092-669553263.png" alt="" />
&nbsp;
- Interfaces,這層裡主要約束延遲消息隊列的隊列和消息任務的行為。
/// <summary>
/// Contract for a ring-shaped delay queue: callbacks are scheduled to run after a
/// delay and can be removed again by task ID before they run.
/// </summary>
/// <typeparam name="T">Type of the data handed to a task's callback.</typeparam>
public interface IRingQueue<T>
{
    /// <summary>
    /// Schedules a callback without data and without an explicit task ID.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to invoke when the delay has elapsed.</param>
    void Add(long delayTime, Action<T> action);

    /// <summary>
    /// Schedules a callback with data but without an explicit task ID.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to invoke when the delay has elapsed.</param>
    /// <param name="data">Argument passed to the callback.</param>
    void Add(long delayTime, Action<T> action, T data);

    /// <summary>
    /// Schedules a callback with data and an explicit task ID.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to invoke when the delay has elapsed.</param>
    /// <param name="data">Argument passed to the callback.</param>
    /// <param name="id">Task ID, used when deleting the task with <see cref="Remove"/>.</param>
    void Add(long delayTime, Action<T> action, T data, long id);

    /// <summary>
    /// Removes a scheduled task so that its callback is not invoked.
    /// </summary>
    /// <param name="id">Task ID that was supplied when the task was added.</param>
    void Remove(long id);

    /// <summary>
    /// Launches the queue.
    /// </summary>
    void Start();
}
/// <summary>
/// Marker interface for tasks stored in a delay queue; it declares no members.
/// </summary>
public interface ITask
{
}
- Achieves,這層裡實作之前定義的接口,這裡寫成抽象類是為了後面友善擴充。
<img id="code_img_closed_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_img_closed" src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt="" /><img id="code_img_opened_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_img_opened" style="display: none;" onclick="cnblogs_code_hide('41bba8a5-767e-44e1-adae-d0d820aa4313',event)" src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="" />
/// <summary>
/// Base implementation of <see cref="IRingQueue{T}"/>: a fixed-length ring of slots with a
/// pointer that advances one slot per tick. A task is stored in the slot the pointer will be
/// on when its delay elapses, together with the number of full revolutions it must wait.
/// </summary>
/// <typeparam name="T">Type of the data handed to a task's callback.</typeparam>
public abstract class BaseQueue<T> : IRingQueue<T>
{
    // Guards _pointer and every structural change to the slots, so that Add/Remove calls
    // from other threads always see a consistent pointer/slot pair relative to the tick.
    private readonly object _gate = new object();
    private readonly ConcurrentBag<BaseTask<T>>[] _arraySlot;
    private readonly int _arrayMax;
    private long _pointer;

    /// <summary>
    /// Ring queue. Slots that have never received a task are <c>null</c>.
    /// </summary>
    public ConcurrentBag<BaseTask<T>>[] ArraySlot
    {
        get { return _arraySlot; }
    }

    /// <summary>
    /// Creates a ring with <paramref name="arrayMax"/> slots.
    /// </summary>
    /// <param name="arrayMax">Number of slots; must be at least 60 and a multiple of 60.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="arrayMax"/> is less than 60 or is not a multiple of 60.
    /// </exception>
    public BaseQueue(int arrayMax)
    {
        // The original guard used "&& ... == 0", which let lengths such as 59 or 61 through.
        if (arrayMax < 60 || arrayMax % 60 != 0)
        {
            throw new ArgumentOutOfRangeException(nameof(arrayMax), arrayMax,
                "Ring queue length cannot be less than 60 and must be a multiple of 60.");
        }

        _arrayMax = arrayMax;
        // Allocated eagerly: lazy creation raced between the tick thread and callers of Add.
        _arraySlot = new ConcurrentBag<BaseTask<T>>[arrayMax];
    }

    /// <summary>
    /// Schedules <paramref name="action"/> with <c>default(T)</c> as data and 0 as task ID.
    /// </summary>
    /// <param name="delayTime">Delay in ticks (one tick per second while <see cref="Start"/> runs).</param>
    /// <param name="action">Callback to invoke.</param>
    public void Add(long delayTime, Action<T> action)
    {
        Add(delayTime, action, default(T));
    }

    /// <summary>
    /// Schedules <paramref name="action"/> with 0 as task ID.
    /// </summary>
    /// <param name="delayTime">Delay in ticks (one tick per second while <see cref="Start"/> runs).</param>
    /// <param name="action">Callback to invoke.</param>
    /// <param name="data">Argument passed to the callback.</param>
    public void Add(long delayTime, Action<T> action, T data)
    {
        Add(delayTime, action, data, 0);
    }

    /// <summary>
    /// Schedules <paramref name="action"/> to run once the pointer has advanced
    /// <paramref name="delayTime"/> ticks. A delay of 0 lands in the slot the pointer is
    /// currently on, which has already been processed, so it runs after one full revolution.
    /// </summary>
    /// <param name="delayTime">Delay in ticks (one tick per second while <see cref="Start"/> runs).</param>
    /// <param name="action">Callback to invoke.</param>
    /// <param name="data">Argument passed to the callback.</param>
    /// <param name="id">Task ID, used by <see cref="Remove"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delayTime"/> is negative.</exception>
    public void Add(long delayTime, Action<T> action, T data, long id)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        if (delayTime < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(delayTime), delayTime,
                "Delay cannot be negative.");
        }

        // Slot calculation and insertion happen under the gate so the pointer cannot move
        // between reading it and storing the task.
        lock (_gate)
        {
            NextSlot(delayTime, out long cycle, out long index);
            var bag = _arraySlot[index] ?? (_arraySlot[index] = new ConcurrentBag<BaseTask<T>>());
            bag.Add(new BaseTask<T>(cycle, action, data, id));
        }
    }

    /// <summary>
    /// Removes the first task found whose ID equals <paramref name="id"/>; does nothing when
    /// no such task is scheduled.
    /// </summary>
    /// <param name="id">Task ID supplied when the task was added.</param>
    public void Remove(long id)
    {
        lock (_gate)
        {
            foreach (var bag in _arraySlot)
            {
                if (bag == null)
                {
                    continue; // slot never used
                }

                var snapshot = bag.ToArray();
                var match = Array.FindIndex(snapshot, task => task.Id == id);
                if (match < 0)
                {
                    continue;
                }

                // ConcurrentBag.TryTake hands back an arbitrary item, so a specific task can
                // only be removed by emptying the bag and putting the others back.
                while (bag.TryTake(out _))
                {
                }

                for (var i = 0; i < snapshot.Length; i++)
                {
                    if (i != match)
                    {
                        bag.Add(snapshot[i]);
                    }
                }

                return;
            }
        }
    }

    /// <summary>
    /// Runs the ring: advances the pointer, executes the tasks that are due, then sleeps for
    /// one second, forever. This method never returns and blocks the calling thread.
    /// </summary>
    public void Start()
    {
        while (true)
        {
            RightMovePointer();
            Thread.Sleep(1000);
            Console.WriteLine(DateTime.Now.ToString());
        }
    }

    /// <summary>
    /// Calculates where a task with the given delay is stored. Must be called under the gate.
    /// </summary>
    /// <param name="delayTime">Delayed execution time in ticks (non-negative).</param>
    /// <param name="cycle">Number of times the pointer must pass the slot without running the task.</param>
    /// <param name="index">Slot the pointer is on after <paramref name="delayTime"/> ticks.</param>
    private void NextSlot(long delayTime, out long cycle, out long index)
    {
        index = (_pointer + delayTime) % _arrayMax;

        // The pointer reaches the slot for the first time after (delayTime % length) ticks and
        // then once per revolution; the task must sit out every visit before the last one.
        cycle = delayTime > 0 ? (delayTime - 1) / _arrayMax : 0;
    }

    /// <summary>
    /// Moves the pointer one slot clockwise (wrapping to 0) and runs every task in the new
    /// slot whose cycle count has reached zero; other tasks have their count decremented.
    /// </summary>
    private void RightMovePointer()
    {
        var dueTasks = new System.Collections.Generic.List<BaseTask<T>>();

        lock (_gate)
        {
            _pointer = (_pointer + 1) % _arrayMax;

            var bag = _arraySlot[_pointer];
            if (bag == null || bag.IsEmpty)
            {
                return;
            }

            // Empty the slot, then put back only the tasks that still have revolutions to wait.
            var snapshot = bag.ToArray();
            while (bag.TryTake(out _))
            {
            }

            foreach (var task in snapshot)
            {
                if (task.Cycle > 0)
                {
                    task.SubCycleNumber();
                    bag.Add(task);
                }
                else
                {
                    dueTasks.Add(task);
                }
            }
        }

        if (dueTasks.Count == 0)
        {
            return;
        }

        try
        {
            // Callbacks run outside the gate so they may call Add/Remove themselves.
            Parallel.ForEach(dueTasks, task => task.TaskAction(task.Data));
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }
}
BaseQueue
<img id="code_img_closed_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_img_closed" src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt="" /><img id="code_img_opened_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_img_opened" style="display: none;" onclick="cnblogs_code_hide('ff60babe-8bc1-4bc0-882c-88cb2a24207c',event)" src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="" />
/// <summary>
/// A unit of delayed work: a callback, the data to pass to it, an ID and a cycle counter.
/// </summary>
/// <typeparam name="T">Type of the data handed to the callback.</typeparam>
public class BaseTask<T> : ITask
{
    // Accessed only through Interlocked so reads and writes from different threads are atomic.
    private long _cycle;

    /// <summary>
    /// Callback to invoke for this task.
    /// </summary>
    public Action<T> TaskAction { get; set; }

    /// <summary>
    /// Cycle counter of the task, read and written atomically.
    /// </summary>
    public long Cycle
    {
        get { return Interlocked.Read(ref _cycle); }
        set { Interlocked.Exchange(ref _cycle, value); }
    }

    /// <summary>
    /// Task ID; 0 when none was supplied.
    /// </summary>
    public long Id { get; set; }

    /// <summary>
    /// Data passed to <see cref="TaskAction"/>; <c>default(T)</c> when none was supplied.
    /// </summary>
    public T Data { get; set; }

    /// <summary>
    /// Creates a task with every field specified.
    /// </summary>
    /// <param name="cycle">Initial cycle counter.</param>
    /// <param name="action">Callback to invoke.</param>
    /// <param name="data">Data passed to the callback.</param>
    /// <param name="id">Task ID.</param>
    public BaseTask(long cycle, Action<T> action, T data, long id)
    {
        Cycle = cycle;
        TaskAction = action;
        Data = data;
        Id = id;
    }

    /// <summary>
    /// Creates a task with ID 0.
    /// </summary>
    /// <param name="cycle">Initial cycle counter.</param>
    /// <param name="action">Callback to invoke.</param>
    /// <param name="data">Data passed to the callback.</param>
    public BaseTask(long cycle, Action<T> action, T data)
        : this(cycle, action, data, 0L)
    {
    }

    /// <summary>
    /// Creates a task with <c>default(T)</c> as data and ID 0.
    /// </summary>
    /// <param name="cycle">Initial cycle counter.</param>
    /// <param name="action">Callback to invoke.</param>
    public BaseTask(long cycle, Action<T> action)
        : this(cycle, action, default(T), 0L)
    {
    }

    /// <summary>
    /// Atomically decrements the cycle counter by one.
    /// </summary>
    public void SubCycleNumber()
    {
        Interlocked.Decrement(ref _cycle);
    }
}
BaseTask
- Logic,這層主要實作調用邏輯,調用者最終隻需要關心把任務放進隊列并指定什麼時候執行就行了,根本不需要關心其它的任何資訊。
/// <summary>
/// Demo entry point: starts a minute ring queue on a background task, schedules four news
/// messages with delays of 5, 10, 60 and 120, then waits on the queue task.
/// </summary>
public static void Start()
{
    // 1. Initialize queues of different granularity.
    IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();

    // 2. Open thread. The queue's Start is expected to loop indefinitely (see step 4), so
    //    ask for a dedicated thread instead of occupying a thread-pool worker.
    var lstTasks = new List<Task>
    {
        Task.Factory.StartNew(minuteRingQueue.Start, TaskCreationOptions.LongRunning)
    };

    // 3. Add tasks performed in different periods.
    Action<NewsModel> printNews = newsObj => Console.WriteLine(newsObj.News);

    minuteRingQueue.Add(5, printNews, new NewsModel() { News = "Trump's visit to China!" });
    minuteRingQueue.Add(10, printNews, new NewsModel() { News = "Putin Pu's visit to China!" });
    minuteRingQueue.Add(60, printNews, new NewsModel() { News = "Eisenhower's visit to China!" });
    minuteRingQueue.Add(120, printNews, new NewsModel() { News = "Xi Jinping's visit to the US!" });

    // 4. Waiting for all tasks to complete usually never finishes, because there is an infinite loop.
    //    F5 Run the program and see the effect.
    Task.WaitAll(lstTasks.ToArray());
    Console.Read();
}
- Models,這層就是用來在延遲任務中帶入的資料模型類而已了。自己用的時候換成任意自定義類型都可以。
- 截圖
<img src="https://img2018.cnblogs.com/blog/1214710/201809/1214710-20180923152944919-1577847621.png" alt="" />