天天看點

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?

衆所周知垂直擴充是提升單機的性能的方式,比如提升雙路、四路的CPU運算能力,加大記憶體,更換速度更快的SSD,或者從代碼根本上進行優化和性能提升。水準擴充是提供多台多種伺服器分離單機性能的方式,比如叢集,主從,隊列,負載平衡等等。

 白話的垂直擴充

現在伺服器都是雲伺服器,單純從單機的硬體性能提升整體性能,可能已經不太适用,而從代碼上,其實還有些功課可以做,即使不多:

  1. 優化多線程協調模式,優化多線程下資源共享問題,以免出現奇怪的運作時錯誤。
  2. 改同步為異步:此方法提升的是吞吐率,性能并不能提升,不過對于用戶端響應也算是件好事吧。
  3. 使用磁盤預讀模式,極小幅度提升IO性能。
  4. 使用單機任務隊列,強制任務有序進行:此方法在單機上不會提升性能,甚至會減少原本的單機吞吐率,但是卻能保證任務在同一時刻的完整性。

我們先從垂直擴充中壓榨單機的性能,同時還要保證穩定性,甚至穩定性比單機極限性能更加重要,為何?因為多線程(web伺服器都是多線程模型)資源互斥問題,會讓你查找問題的時候抓狂(當然,如果你要通路的資源隻是單個,就另當别論了)。是以,很多時候我們通常會加鎖來避免這類事情發生(鎖的問題和功能我們這裡不讨論),雖然犧牲了性能,但卻換來了每次高請求所帶來的穩定性。

其次,我們知道,web伺服器都屬于多線程模型,這樣設計的目的是為了提高該伺服器的整體吞吐量(不同伺服器語言采用不同的線程開辟模式,例如java使用的是系統級的線程),當一個線程正在接近滿負荷的處理目前的任務,緊接着馬上又來一個請求(系統不會因為目前正在運作任務而終止新的請求),那麼将是雪上加霜的,多個任務同時長時間在搶占同一個CPU資源,無疑是對整體影響甚大的。

言歸正傳,我們在單機上面針對這類問題,既要盡可能的減少處理時間,又要絕對保證整體運作期間的穩定性(後期會介紹如何使用熔斷機制提升多台伺服器系統的整體穩定性)。

上一節,我們已經建立了一個同步的接口,下面我們将這個接口稍作改動,使其成為包含異步任務方法的接口,整體代碼就不貼上來了,以免影響篇幅

  1. 如果你喜歡手動建立與管理任務,那麼你可以new一個Task<TResult>執行個體。
  2. 如果你喜歡讓系統為你管理該任務狀态,那麼你可以Task.Factary.StartNew來建立一個執行個體。

 白話的水準擴充

當一條街道上的小區越來越多,用水越來越大,而住戶反應水壓卻越來越小,你是考慮增加主管道通水量大小、還是考慮增加每個小區的增壓泵的功率、還是考慮增加主管道的數量(目前筆者小區就遇到水壓不夠的情況)。

在軟體工程項目中,其實伺服器TPS跟水壓是同一個概念。

  1. 加大主水管道(如同提升CPU、記憶體)始終會有一個極限;
  2. 增加每個小區增壓泵的功率(如同用戶端使用大量的輪詢,可主管道出口就隻有那麼點點量)始終要求比得到的多得多;

是以換句話說:

  1. 增加伺服器數量(畢竟伺服器比自來水廠容易建設:-)),提升管道入口的處理能力;
  2. 增加不同伺服器類型,例如隊列伺服器,負載伺服器,緩存伺服器等等中間伺服器,分攤和分離不同功能分到而行,如同主管道的分流閥,節流閥,增壓泵等等;
  3. 增加帶寬(這個是肯定的,提升TPS帶寬肯定也是主要的);

一:點對點——原裝

當然,如果接口已經成為了異步模式(本質其實是提前傳回請求,但并沒有傳回請求所處理的結果),那麼還需要一個接口來告訴用戶端處理的結果,用戶端通過該接口的輪詢獲得實時的結果。

在筆者介紹的這個服務中,流程架構如下:

  非常簡單的點對點模式,使用者請求一次,等待伺服器處理響應完成後釋放,所有内容均采用同步方式進行,得到結果是:

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?

 非常簡單的點對點模式,使用者請求一次,等待伺服器處理響應完成後釋放,所有内容均采用同步方式進行,得到結果是:

使用者等待時間 = 伺服器處理時間

如果使用者上傳10秒,而伺服器處理需要4秒,那麼這個等待對于使用者來說,是極為煎熬的。

也許聰明的你會說,在用戶端給個友好的提示,比如讓一個“風火輪”不停的轉動,當處理完成後隐藏掉。的确,這從另一個角度上看确實也行得通(比如成本因素),但我們不讨論使用者的視覺和等待等感覺上的東西,隻讨論從技術上如何讓這個響應時間更快,能快到幾乎讓使用者察覺不到。

二:使用并行任務——小幅提升

單機并行模式大家應該都明白,畢竟現在CPU都是多核的了,幹嘛要讓其他CPU閑着呢,不管是JAVA還是C#,目前主流語言都可以完美的執行并行任務(python開多程序其實也算),各種文法請自行Google,既然文章标題是Net,那麼筆者就少量的複制一下C#的代碼。

Parallel.Invoke(() => { },() => { });      

哇撒,真的很少,就是C#中的一個并行執行的語句而已,自己需要并行執行的代碼放入花括号中就行,換成流程結構圖如下:

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?

畫的很搓,歡迎拍磚。

筆者采用的CPU是I7-2700K,并行任務狀态确實使用了起來,但減少時間卻隻有1秒,很不可思議,或許是筆者的代碼優化不夠好吧(并行原理和理想結果為何有出入請自行Google),是以就不毛遂自薦的貼上來了:-),但是這個3秒時間我會跟他死磕到底——使用者不能等。

三:分離使用者請求和耗時處理——異步

 當朋友們看到這裡的時候,或許心裡早就想到用異步的方式來實作C/S的接口請求了,對,但我們還是需要走一下流程,梳理一下思路。請繼續接着看。

異步其實就是多線程,隻是目前由于進階語言的發展,已經将線程的難點給隐藏掉了,在一個請求主線程中,建立一個異步線程(或任務),分離主線程的長時間處理耗時,将這塊難啃的骨頭交給子線程去做,自己隻管輕松的執行到return,是的确很舒服哦(筆者也夢想擁有這樣的碼磚方式,o(∩_∩)o 哈哈),new一個線程我們不做介紹,畢竟他的管理模式是純手動的、并且是複雜的,我們隻介紹new一個任務來分離主線程之間的關聯。

正如之前提到,微軟巴巴已經将這種模型給封裝好了,隻需要在接口處理函數内、将處理子產品塞進Task中即可,不用再去new一個線程、管理這個線程的狀态、什麼時候排程、什麼時候阻塞等等一些較底層的操作。萬事有好必有壞,多線程模型建立是很簡單了,相應的實作細節對于很多入門的朋友就看不懂了。

不過,當一個對外接口(或者内部函數)采用異步模式,那麼調用端也需要進行輪詢(異步同步無所謂,看調用如何實作)處理結果,這個模式相比原來正常同步複雜許多,需要建立任務、執行任務、存儲任務狀态和結果等等,不廢話,上圖:

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?

通過将“伺服器處理耗時”進行分離,請求主線程隻需要将相應的參數傳遞給子線程(或任務),主線程就直接傳回到用戶端,如果忽略子任務之前的邏輯時間複雜度,完全可以達到瞬間傳回到用戶端,具體時長根據不同的平台和架構不同而不同,正如之前國外有人對NET CORE和GO進行過空業務響應對比(具體連結得找找),在請求數高于100W(包括并發)和沒有任何邏輯代碼的前提下,直接請求某個接口,NET CORE隻比GO慢了近40ms(相同單機)。這樣的性能還是非常看好的。

另外,如果處理時間過長,而且子任務不能及時傳回,那将産生越來越多的任務阻塞,畢竟一個CPU是有極限的,并且伴随着或多或少的運作時錯誤,而這種錯誤是最讓我們程式員頭疼的,是以,這時我們需要加入單機隊列,來限制和防止處理和請求達到瞬時波峰,文章結尾提供一份單機隊列的代碼供大家參考:

實際證明,這樣對于使用者來說,是瞬間的,不用等待的,極大的提升了使用者體驗。不過呢,如果請求數越多,那麼越後進來的請求,等待的時間将越長,對于用戶端輪詢的時間也将變得更長。

輪詢時間 = 請求數(單機隊列數) * 單個處理耗時時間

好像比原來的點對點更糟糕了,實際我們根據這個架構進行擴充,将得到更好的體驗,請繼續接着看。

四:讓多台機器一起工作吧——叢集

先看張圖:

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?

哇塞,一下子變得這麼複雜,好捉急啊。其實并不難了解,我們來看一看做了哪些變化:

  1. 單機的隊列擴充為了使用伺服器做隊列叢集;
  2. 增加排程任務;
  3. 将多個處理服務配置設定到多台機器上運作;
  4. 單機緩存增加到緩存叢集;

其他也就沒什麼花頭了。當請求任務過高,放入隊列中,分離前級請求和後級處理,後級處理伺服器的數量将直接影響整個平台的異步處理時間。如果非要對比單機模式,性能是随處理伺服器的數量增加而提高的。下一節我們将詳細讨論這套架構方案。

感謝閱讀!

(附上單機隊列的實作,僅供參考)

使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?
使用.NET Core搭建分布式音頻效果處理服務(四)選擇垂直擴充還是水準擴充?
1 /// <summary>
  2 /// 異步任務隊列
  3 /// </summary>
  4 public class AsyncTaskQueue : IDisposable
  5 {
  6     private bool _isDisposed;
  7     private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
  8     private Thread _thread;
  9     private AutoResetEvent _autoResetEvent;
 10 
 11     /// <summary>
 12     /// 異步任務隊列
 13     /// </summary>
 14     public AsyncTaskQueue()
 15     {
 16         _autoResetEvent = new AutoResetEvent(false);
 17         _thread = new Thread(InternalRuning) {IsBackground = true};
 18         _thread.Start();
 19     }
 20 
 21     private bool TryGetNextTask(out AwaitableTask task)
 22     {
 23         task = null;
 24         while (_queue.Count > 0)
 25         {
 26             if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true;
 27             task.Cancel();
 28         }
 29 
 30         return false;
 31     }
 32 
 33     private AwaitableTask PenddingTask(AwaitableTask task, int maxQueueCount = 1000)
 34     {
 35         lock (_queue)
 36         {
 37             if (_queue.Count >= maxQueueCount)
 38             {
 39                 throw new Exception($"超出最大隊列數量,maxQueueCount={maxQueueCount}");
 40             }
 41 
 42             Debug.Assert(task != null);
 43             _queue.Enqueue(task);
 44             _autoResetEvent.Set();
 45         }
 46 
 47         return task;
 48     }
 49 
 50     private void InternalRuning()
 51     {
 52         while (!_isDisposed)
 53         {
 54             if (_queue.Count == 0)
 55             {
 56                 _autoResetEvent.WaitOne();
 57             }
 58 
 59             while (TryGetNextTask(out var task))
 60             {
 61                 if (task.IsCancel) continue;
 62 
 63                 if (UseSingleThread)
 64                 {
 65                     task.RunSynchronously();
 66                 }
 67                 else
 68                 {
 69                     task.Start();
 70                 }
 71             }
 72         }
 73     }
 74 
 75     /// <summary>
 76     /// 是否使用單線程完成任務.
 77     /// </summary>
 78     public bool UseSingleThread { get; set; } = true;
 79 
 80     /// <summary>
 81     /// 自動取消以前的任務。
 82     /// </summary>
 83     public bool AutoCancelPreviousTask { get; set; } = false;
 84 
 85     /// <summary>
 86     /// 執行任務
 87     /// </summary>
 88     /// <param name="action"></param>
 89     /// <param name="maxQueueCount"></param>
 90     /// <returns></returns>
 91     public AwaitableTask Run(Action action, int maxQueueCount = 1000)
 92         => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))), maxQueueCount);
 93 
 94     /// <summary>
 95     /// 執行任務
 96     /// </summary>
 97     /// <typeparam name="TResult"></typeparam>
 98     /// <param name="function"></param>
 99     /// <param name="maxQueueCount"></param>
100     /// <returns></returns>
101     public AwaitableTask<TResult> Run<TResult>(Func<TResult> function, int maxQueueCount = 1000)
102         => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)),
103             maxQueueCount);
104 
105 
106     /// <inheritdoc />
107     public void Dispose()
108     {
109         Dispose(true);
110         GC.SuppressFinalize(this);
111     }
112 
113     /// <summary>
114     /// 析構任務隊列
115     /// </summary>
116     ~AsyncTaskQueue() => Dispose(false);
117 
118     private void Dispose(bool disposing)
119     {
120         if (_isDisposed) return;
121         if (disposing)
122         {
123             _autoResetEvent.Dispose();
124         }
125 
126         _thread = null;
127         _autoResetEvent = null;
128         _isDisposed = true;
129     }
130 
131     /// <summary>
132     /// 可等待的任務
133     /// </summary>
134     public class AwaitableTask
135     {
136         private readonly Task _task;
137 
138         /// <summary>
139         /// 初始化可等待的任務。
140         /// </summary>
141         /// <param name="task"></param>
142         public AwaitableTask(Task task) => _task = task;
143 
144         /// <summary>
145         /// 任務的Id
146         /// </summary>
147         public int TaskId => _task.Id;
148 
149         /// <summary>
150         /// 任務是否取消
151         /// </summary>
152         public bool IsCancel { get; private set; }
153 
154         /// <summary>
155         /// 開始任務
156         /// </summary>
157         public void Start() => _task.Start();
158 
159         /// <summary>
160         /// 同步執行開始任務
161         /// </summary>
162         public void RunSynchronously() => _task.RunSynchronously();
163 
164         /// <summary>
165         /// 取消任務
166         /// </summary>
167         public void Cancel() => IsCancel = true;
168 
169         /// <summary>
170         /// 擷取任務等待器
171         /// </summary>
172         /// <returns></returns>
173         public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
174 
175         /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
176         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
177         public struct TaskAwaiter : INotifyCompletion
178         {
179             private readonly AwaitableTask _task;
180 
181             /// <summary>
182             /// 任務等待器
183             /// </summary>
184             /// <param name="awaitableTask"></param>
185             public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
186 
187             /// <summary>
188             /// 任務是否完成.
189             /// </summary>
190             public bool IsCompleted => _task._task.IsCompleted;
191 
192             /// <inheritdoc />
193             public void OnCompleted(Action continuation)
194             {
195                 var This = this;
196                 _task._task.ContinueWith(t =>
197                 {
198                     if (!This._task.IsCancel) continuation?.Invoke();
199                 });
200             }
201 
202             /// <summary>
203             /// 擷取任務結果
204             /// </summary>
205             public void GetResult() => _task._task.Wait();
206         }
207     }
208 
209     /// <summary>
210     /// 可等待的任務
211     /// </summary>
212     /// <typeparam name="TResult"></typeparam>
213     public class AwaitableTask<TResult> : AwaitableTask
214     {
215         /// <summary>
216         /// 初始化可等待的任務
217         /// </summary>
218         /// <param name="task">需要執行的任務</param>
219         public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
220 
221 
222         private readonly Task<TResult> _task;
223 
224         /// <summary>
225         /// 擷取任務等待器
226         /// </summary>
227         /// <returns></returns>
228         public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
229 
230         /// <summary>
231         /// 任務等待器
232         /// </summary>
233         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
234         public new struct TaskAwaiter : INotifyCompletion
235         {
236             private readonly AwaitableTask<TResult> _task;
237 
238             /// <summary>
239             /// 初始化任務等待器
240             /// </summary>
241             /// <param name="awaitableTask"></param>
242             public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
243 
244             /// <summary>
245             /// 任務是否已完成
246             /// </summary>
247             public bool IsCompleted => _task._task.IsCompleted;
248 
249             /// <inheritdoc />
250             public void OnCompleted(Action continuation)
251             {
252                 var This = this;
253                 _task._task.ContinueWith(t =>
254                 {
255                     if (!This._task.IsCancel) continuation?.Invoke();
256                 });
257             }
258 
259             /// <summary>
260             /// 擷取任務結果
261             /// </summary>
262             /// <returns></returns>
263             public TResult GetResult() => _task._task.Result;
264         }
265     }
266 }      

View Code