
衆所周知垂直擴充是提升單機的性能的方式,比如提升雙路、四路的CPU運算能力,加大記憶體,更換速度更快的SSD,或者從代碼根本上進行優化和性能提升。水準擴充是提供多台多種伺服器分離單機性能的方式,比如叢集,主從,隊列,負載平衡等等。
白話的垂直擴充
現在伺服器都是雲伺服器,單純從單機的硬體性能提升整體性能,可能已經不太适用,而從代碼上,其實還有些功課可以做,即使不多:
- 優化多線程協調模式,優化多線程下資源共享問題,以免出現奇怪的運作時錯誤。
- 改同步為異步:此方法提升的是吞吐率,性能并不能提升,不過對于用戶端響應也算是件好事吧。
- 使用磁盤預讀模式,極小幅度提升IO性能。
- 使用單機任務隊列,強制任務有序進行:此方法在單機上不會提升性能,甚至會減少原本的單機吞吐率,但是卻能保證任務在同一時刻的完整性。
我們先從垂直擴充中壓榨單機的性能,同時還要保證穩定性,甚至穩定性比單機極限性能更加重要,為何?因為多線程(web伺服器都是多線程模型)資源互斥問題,會讓你查找問題的時候抓狂(當然,如果你要通路的資源隻是單個,就另當别論了)。是以,很多時候我們通常會加鎖來避免這類事情發生(鎖的問題和功能我們這裡不讨論),雖然犧牲了性能,但卻換來了每次高請求所帶來的穩定性。
其次,我們知道,web伺服器都屬于多線程模型,這樣設計的目的是為了提高該伺服器的整體吞吐量(不同伺服器語言采用不同的線程開辟模式,例如java使用的是系統級的線程),當一個線程正在接近滿負荷的處理目前的任務,緊接着馬上又來一個請求(系統不會因為目前正在運作任務而終止新的請求),那麼将是雪上加霜的,多個任務同時長時間在搶占同一個CPU資源,無疑是對整體影響甚大的。
言歸正傳,我們在單機上面針對這類問題,既要盡可能的減少處理時間,又要絕對保證整體運作期間的穩定性(後期會介紹如何使用熔斷機制提升多台伺服器系統的整體穩定性)。
上一節,我們已經建立了一個同步的接口,下面我們将這個接口稍作改動,使其成為包含異步任務方法的接口,整體代碼就不貼上來了,以免影響篇幅
- 如果你喜歡手動建立與管理任務,那麼你可以new一個Task<TResult>執行個體。
- 如果你喜歡讓系統為你管理該任務狀态,那麼你可以Task.Factary.StartNew來建立一個執行個體。
白話的水準擴充
當一條街道上的小區越來越多,用水越來越大,而住戶反應水壓卻越來越小,你是考慮增加主管道通水量大小、還是考慮增加每個小區的增壓泵的功率、還是考慮增加主管道的數量(目前筆者小區就遇到水壓不夠的情況)。
在軟體工程項目中,其實伺服器TPS跟水壓是同一個概念。
- 加大主水管道(如同提升CPU、記憶體)始終會有一個極限;
- 增加每個小區增壓泵的功率(如同用戶端使用大量的輪詢,可主管道出口就隻有那麼點點量)始終要求比得到的多得多;
是以換句話說:
- 增加伺服器數量(畢竟伺服器比自來水廠容易建設:-)),提升管道入口的處理能力;
- 增加不同伺服器類型,例如隊列伺服器,負載伺服器,緩存伺服器等等中間伺服器,分攤和分離不同功能分到而行,如同主管道的分流閥,節流閥,增壓泵等等;
- 增加帶寬(這個是肯定的,提升TPS帶寬肯定也是主要的);
一:點對點——原裝
當然,如果接口已經成為了異步模式(本質其實是提前傳回請求,但并沒有傳回請求所處理的結果),那麼還需要一個接口來告訴用戶端處理的結果,用戶端通過該接口的輪詢獲得實時的結果。
在筆者介紹的這個服務中,流程架構如下:
非常簡單的點對點模式,使用者請求一次,等待伺服器處理響應完成後釋放,所有内容均采用同步方式進行,得到結果是:
非常簡單的點對點模式,使用者請求一次,等待伺服器處理響應完成後釋放,所有内容均采用同步方式進行,得到結果是:
使用者等待時間 = 伺服器處理時間
如果使用者上傳10秒,而伺服器處理需要4秒,那麼這個等待對于使用者來說,是極為煎熬的。
也許聰明的你會說,在用戶端給個友好的提示,比如讓一個“風火輪”不停的轉動,當處理完成後隐藏掉。的确,這從另一個角度上看确實也行得通(比如成本因素),但我們不讨論使用者的視覺和等待等感覺上的東西,隻讨論從技術上如何讓這個響應時間更快,能快到幾乎讓使用者察覺不到。
二:使用并行任務——小幅提升
單機并行模式大家應該都明白,畢竟現在CPU都是多核的了,幹嘛要讓其他CPU閑着呢,不管是JAVA還是C#,目前主流語言都可以完美的執行并行任務(python開多程序其實也算),各種文法請自行Google,既然文章标題是Net,那麼筆者就少量的複制一下C#的代碼。
Parallel.Invoke(() => { },() => { });
哇撒,真的很少,就是C#中的一個并行執行的語句而已,自己需要并行執行的代碼放入花括号中就行,換成流程結構圖如下:
畫的很搓,歡迎拍磚。
筆者采用的CPU是I7-2700K,并行任務狀态确實使用了起來,但減少時間卻隻有1秒,很不可思議,或許是筆者的代碼優化不夠好吧(并行原理和理想結果為何有出入請自行Google),是以就不毛遂自薦的貼上來了:-),但是這個3秒時間我會跟他死磕到底——使用者不能等。
三:分離使用者請求和耗時處理——異步
當朋友們看到這裡的時候,或許心裡早就想到用異步的方式來實作C/S的接口請求了,對,但我們還是需要走一下流程,梳理一下思路。請繼續接着看。
異步其實就是多線程,隻是目前由于進階語言的發展,已經将線程的難點給隐藏掉了,在一個請求主線程中,建立一個異步線程(或任務),分離主線程的長時間處理耗時,将這塊難啃的骨頭交給子線程去做,自己隻管輕松的執行到return,是的确很舒服哦(筆者也夢想擁有這樣的碼磚方式,o(∩_∩)o 哈哈),new一個線程我們不做介紹,畢竟他的管理模式是純手動的、并且是複雜的,我們隻介紹new一個任務來分離主線程之間的關聯。
正如之前提到,微軟巴巴已經将這種模型給封裝好了,隻需要在接口處理函數内、将處理子產品塞進Task中即可,不用再去new一個線程、管理這個線程的狀态、什麼時候排程、什麼時候阻塞等等一些較底層的操作。萬事有好必有壞,多線程模型建立是很簡單了,相應的實作細節對于很多入門的朋友就看不懂了。
不過,當一個對外接口(或者内部函數)采用異步模式,那麼調用端也需要進行輪詢(異步同步無所謂,看調用如何實作)處理結果,這個模式相比原來正常同步複雜許多,需要建立任務、執行任務、存儲任務狀态和結果等等,不廢話,上圖:
通過将“伺服器處理耗時”進行分離,請求主線程隻需要将相應的參數傳遞給子線程(或任務),主線程就直接傳回到用戶端,如果忽略子任務之前的邏輯時間複雜度,完全可以達到瞬間傳回到用戶端,具體時長根據不同的平台和架構不同而不同,正如之前國外有人對NET CORE和GO進行過空業務響應對比(具體連結得找找),在請求數高于100W(包括并發)和沒有任何邏輯代碼的前提下,直接請求某個接口,NET CORE隻比GO慢了近40ms(相同單機)。這樣的性能還是非常看好的。
另外,如果處理時間過長,而且子任務不能及時傳回,那将産生越來越多的任務阻塞,畢竟一個CPU是有極限的,并且伴随着或多或少的運作時錯誤,而這種錯誤是最讓我們程式員頭疼的,是以,這時我們需要加入單機隊列,來限制和防止處理和請求達到瞬時波峰,文章結尾提供一份單機隊列的代碼供大家參考:
實際證明,這樣對于使用者來說,是瞬間的,不用等待的,極大的提升了使用者體驗。不過呢,如果請求數越多,那麼越後進來的請求,等待的時間将越長,對于用戶端輪詢的時間也将變得更長。
輪詢時間 = 請求數(單機隊列數) * 單個處理耗時時間
好像比原來的點對點更糟糕了,實際我們根據這個架構進行擴充,将得到更好的體驗,請繼續接着看。
四:讓多台機器一起工作吧——叢集
先看張圖:
哇塞,一下子變得這麼複雜,好捉急啊。其實并不難了解,我們來看一看做了哪些變化:
- 單機的隊列擴充為了使用伺服器做隊列叢集;
- 增加排程任務;
- 将多個處理服務配置設定到多台機器上運作;
- 單機緩存增加到緩存叢集;
其他也就沒什麼花頭了。當請求任務過高,放入隊列中,分離前級請求和後級處理,後級處理伺服器的數量将直接影響整個平台的異步處理時間。如果非要對比單機模式,性能是随處理伺服器的數量增加而提高的。下一節我們将詳細讨論這套架構方案。
感謝閱讀!
(附上單機隊列的實作,僅供參考)
1 /// <summary>
2 /// 異步任務隊列
3 /// </summary>
4 public class AsyncTaskQueue : IDisposable
5 {
6 private bool _isDisposed;
7 private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
8 private Thread _thread;
9 private AutoResetEvent _autoResetEvent;
10
11 /// <summary>
12 /// 異步任務隊列
13 /// </summary>
14 public AsyncTaskQueue()
15 {
16 _autoResetEvent = new AutoResetEvent(false);
17 _thread = new Thread(InternalRuning) {IsBackground = true};
18 _thread.Start();
19 }
20
21 private bool TryGetNextTask(out AwaitableTask task)
22 {
23 task = null;
24 while (_queue.Count > 0)
25 {
26 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true;
27 task.Cancel();
28 }
29
30 return false;
31 }
32
33 private AwaitableTask PenddingTask(AwaitableTask task, int maxQueueCount = 1000)
34 {
35 lock (_queue)
36 {
37 if (_queue.Count >= maxQueueCount)
38 {
39 throw new Exception($"超出最大隊列數量,maxQueueCount={maxQueueCount}");
40 }
41
42 Debug.Assert(task != null);
43 _queue.Enqueue(task);
44 _autoResetEvent.Set();
45 }
46
47 return task;
48 }
49
50 private void InternalRuning()
51 {
52 while (!_isDisposed)
53 {
54 if (_queue.Count == 0)
55 {
56 _autoResetEvent.WaitOne();
57 }
58
59 while (TryGetNextTask(out var task))
60 {
61 if (task.IsCancel) continue;
62
63 if (UseSingleThread)
64 {
65 task.RunSynchronously();
66 }
67 else
68 {
69 task.Start();
70 }
71 }
72 }
73 }
74
75 /// <summary>
76 /// 是否使用單線程完成任務.
77 /// </summary>
78 public bool UseSingleThread { get; set; } = true;
79
80 /// <summary>
81 /// 自動取消以前的任務。
82 /// </summary>
83 public bool AutoCancelPreviousTask { get; set; } = false;
84
85 /// <summary>
86 /// 執行任務
87 /// </summary>
88 /// <param name="action"></param>
89 /// <param name="maxQueueCount"></param>
90 /// <returns></returns>
91 public AwaitableTask Run(Action action, int maxQueueCount = 1000)
92 => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))), maxQueueCount);
93
94 /// <summary>
95 /// 執行任務
96 /// </summary>
97 /// <typeparam name="TResult"></typeparam>
98 /// <param name="function"></param>
99 /// <param name="maxQueueCount"></param>
100 /// <returns></returns>
101 public AwaitableTask<TResult> Run<TResult>(Func<TResult> function, int maxQueueCount = 1000)
102 => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)),
103 maxQueueCount);
104
105
106 /// <inheritdoc />
107 public void Dispose()
108 {
109 Dispose(true);
110 GC.SuppressFinalize(this);
111 }
112
113 /// <summary>
114 /// 析構任務隊列
115 /// </summary>
116 ~AsyncTaskQueue() => Dispose(false);
117
118 private void Dispose(bool disposing)
119 {
120 if (_isDisposed) return;
121 if (disposing)
122 {
123 _autoResetEvent.Dispose();
124 }
125
126 _thread = null;
127 _autoResetEvent = null;
128 _isDisposed = true;
129 }
130
131 /// <summary>
132 /// 可等待的任務
133 /// </summary>
134 public class AwaitableTask
135 {
136 private readonly Task _task;
137
138 /// <summary>
139 /// 初始化可等待的任務。
140 /// </summary>
141 /// <param name="task"></param>
142 public AwaitableTask(Task task) => _task = task;
143
144 /// <summary>
145 /// 任務的Id
146 /// </summary>
147 public int TaskId => _task.Id;
148
149 /// <summary>
150 /// 任務是否取消
151 /// </summary>
152 public bool IsCancel { get; private set; }
153
154 /// <summary>
155 /// 開始任務
156 /// </summary>
157 public void Start() => _task.Start();
158
159 /// <summary>
160 /// 同步執行開始任務
161 /// </summary>
162 public void RunSynchronously() => _task.RunSynchronously();
163
164 /// <summary>
165 /// 取消任務
166 /// </summary>
167 public void Cancel() => IsCancel = true;
168
169 /// <summary>
170 /// 擷取任務等待器
171 /// </summary>
172 /// <returns></returns>
173 public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
174
175 /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
176 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
177 public struct TaskAwaiter : INotifyCompletion
178 {
179 private readonly AwaitableTask _task;
180
181 /// <summary>
182 /// 任務等待器
183 /// </summary>
184 /// <param name="awaitableTask"></param>
185 public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
186
187 /// <summary>
188 /// 任務是否完成.
189 /// </summary>
190 public bool IsCompleted => _task._task.IsCompleted;
191
192 /// <inheritdoc />
193 public void OnCompleted(Action continuation)
194 {
195 var This = this;
196 _task._task.ContinueWith(t =>
197 {
198 if (!This._task.IsCancel) continuation?.Invoke();
199 });
200 }
201
202 /// <summary>
203 /// 擷取任務結果
204 /// </summary>
205 public void GetResult() => _task._task.Wait();
206 }
207 }
208
209 /// <summary>
210 /// 可等待的任務
211 /// </summary>
212 /// <typeparam name="TResult"></typeparam>
213 public class AwaitableTask<TResult> : AwaitableTask
214 {
215 /// <summary>
216 /// 初始化可等待的任務
217 /// </summary>
218 /// <param name="task">需要執行的任務</param>
219 public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
220
221
222 private readonly Task<TResult> _task;
223
224 /// <summary>
225 /// 擷取任務等待器
226 /// </summary>
227 /// <returns></returns>
228 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
229
230 /// <summary>
231 /// 任務等待器
232 /// </summary>
233 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
234 public new struct TaskAwaiter : INotifyCompletion
235 {
236 private readonly AwaitableTask<TResult> _task;
237
238 /// <summary>
239 /// 初始化任務等待器
240 /// </summary>
241 /// <param name="awaitableTask"></param>
242 public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
243
244 /// <summary>
245 /// 任務是否已完成
246 /// </summary>
247 public bool IsCompleted => _task._task.IsCompleted;
248
249 /// <inheritdoc />
250 public void OnCompleted(Action continuation)
251 {
252 var This = this;
253 _task._task.ContinueWith(t =>
254 {
255 if (!This._task.IsCancel) continuation?.Invoke();
256 });
257 }
258
259 /// <summary>
260 /// 擷取任務結果
261 /// </summary>
262 /// <returns></returns>
263 public TResult GetResult() => _task._task.Result;
264 }
265 }
266 }
View Code