傳回該系列目錄《基于Task的異步模式--全面介紹》
把一個流拷貝到另一個流是有用且常見的操作。Stream.CopyTo 方法在.Net 4中就已經加入來滿足要求這個功能的場景,例如在一個指定的URL處下載下傳資料:
public static byte[] DownloadData(string url)
{
using(var request = WebRequest.Create(url))
using(var response = request.GetResponse())
using(var responseStream = response.GetResponseStream())
using(var result = new MemoryStream())
{
responseStream.CopyTo(result);
return result.ToArray();
}
}
為了提高響應能力和伸縮性,我們想使用基于TAP模式來實作上面的功能。可以嘗試按下面的來做:
public static async Task<byte[]> DownloadDataAsync(string url) { using(var request = WebRequest.Create(url)) { return await Task.Run(() => { using(var response = request.GetResponse()) using(var responseStream = response.GetResponseStream()) using(var result = new MemoryStream()) { responseStream.CopyTo(result); return result.ToArray(); } } } }
此實作如果用于UI線程會提升響應能力,因為它脫離了從網絡流下載下傳資料任務的調用線程以及把該網絡流複制到最終将下載下傳的資料轉成一個數組的記憶體流。然而,該實作對伸縮性沒有效果,因為它在等待資料下載下傳的過程中,仍舊執行同步I/O和阻塞線程池線程。反之,我們想要的是下面的功能代碼:
public static async Task<byte[]> DownloadDataAsync(string url)
{
using(var request = WebRequest.Create(url))
using(var response = await request.GetResponseAsync())
using(var responseStream = response.GetResponseStream())
using(var result = new MemoryStream())
{
await responseStream.CopyToAsync(result);
return result.ToArray();
}
}
不幸的是,在.Net 4中缺少異步的CopyToAsync方法,隻有Stream類有一個同步的CopyTo方法。現在我們就自己提供一個實作:
public static void CopyTo(this Stream source, Stream destination)
{
var buffer = new byte[0x1000];
int bytesRead;
while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
{
destination.Write(buffer, 0, bytesRead);
}
}
為了提供一個異步的CopyTo實作,我們可以利用編譯器實作TAP的能力,稍微地修改這個實作:
public static async Task CopyToAsync(this Stream source, Stream destination)
{
var buffer = new byte[0x1000];
int bytesRead;
while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
await destination.WriteAsync(buffer, 0, bytesRead);
}
}
這裡我們将傳回類型從void改成了Task,将Read和Write分别換成了ReadAsync和WriteAsync,并且在ReadAsync和WriteAsync的調用前加了與上下文相關的await關鍵字字首。.Net 4 中不存在ReadAsycn和WriteAsync,但是可以通過基于Task.Factory.FromAsync實作,關于這個描述在上一篇随筆中的“Tasks和APM”章節講過:
public static Task<int> ReadAsync(
this Stream source, byte [] buffer, int offset, int count)
{
return Task<int>.Factory.FromAsync(source.BeginRead, source.EndRead,
buffer, offset, count, null);
}
public static Task WriteAsync(
this Stream destination, byte [] buffer, int offset, int count)
{
return Task.Factory.FromAsync(
destination.BeginWrite, destination.EndWrite,
buffer, offset, count, null);
}
有了這些方法,我們可以成功地實作CopyToAsync方法。我們也可以通過添加一個CancellationToken到方法中以支援撤銷請求,該CancellationToken将會在複制過程中的每次讀寫之後被監控到(如果ReadAsync和WriteAsync支援撤銷,那麼也可以将CancellationToken線程化到那些調用中):
public static async Task CopyToAsync( this Stream source, Stream destination, CancellationToken cancellationToken) { var buffer = new byte[0x1000]; int bytesRead; while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0) { await destination.WriteAsync(buffer, 0, bytesRead); cancellationToken.ThrowIfCancellationRequested(); } }
【注意這種撤銷在同步的CopyTo實作中也是有用的,傳入的CancellationToken會啟用撤銷。實作會依賴一個從該方法傳回的可取消的對象,但實作接收到那麼對象已經太晚了,因為同步調用完成時,已經沒有留下要取消的東西了。】
我們也加入了進度通知的支援,包括至今已經複制了多少資料:
public static async Task CopyToAsync(
this Stream source, Stream destination,
CancellationToken cancellationToken,
IProgress<long> progress)
{
var buffer = new byte[0x1000];
int bytesRead;
long totalRead = 0;
while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
await destination.WriteAsync(buffer, 0, bytesRead);
cancellationToken.ThrowIfCancellationRequested();
totalRead += bytesRead;
progress.Report(totalRead);
}
}
有了該方法,我們現在可以完全實作我們的DownloadDataAsync方法了,包括加入撤銷和進度支援:
public static async Task<byte[]> DownloadDataAsync(
string url,
CancellationToken cancellationToken,
IProgress<long> progress)
{
using(var request = WebRequest.Create(url))
using(var response = await request.GetResponseAsync())
using(var responseStream = response.GetResponseStream())
using(var result = new MemoryStream())
{
await responseStream.CopyToAsync(
result, cancellationToken, progress);
return result.ToArray();
}
}
給我們的CopyToAsync方法做進一步的優化也是可能的。比如,如果我們要使用兩個buffer而不是一個,就可以在讀取下一片資料時寫入之前讀取的資料,是以如果讀取和寫入都使用了異步了I/O就會産生交叉延遲:
public static async Task CopyToAsync(this Stream source, Stream destination)
{
int i = 0;
var buffers = new [] { new byte[0x1000], new byte[0x1000] };
Task writeTask = null;
while(true)
{
var readTask = source.ReadAsync(buffers[i], 0, buffers[i].Length))>0;
if (writeTask != null) await Task.WhenAll(readTask, writeTask);
int bytesRead = await readTask;
if (bytesRead == 0) break;
writeTask = destination.WriteAsync(buffers[i], 0, bytesRead);
i ^= 1; // swap buffers
}
}
消除不必要的上下文轉換是另一個優化。正如之前提到的,預設await一個Task開始執行的時候,會傳輸回到目前的SynchronizationContext。在CopyToAsynch實作的情況下,使用這樣的轉換時沒必要的,因為我們沒有操作任何UI狀态。我們可以發揮Task.ConfigureAwait的優勢類關閉這個自動的轉換。為了簡化,上面的原始異步的實作修改如下:
public static Task CopyToAsync(this Stream source, Stream destination)
{
var buffer = new byte[0x1000];
int bytesRead;
while((bytesRead = await
source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
{
await destination.WriteAsync(buffer, 0, bytesRead)
.ConfigureAwait(false);
}
}
如果您認為這篇文章還不錯或者有所收獲,您可以通過右邊的“打賞”功能 打賞我一杯咖啡【物質支援】,也可以點選右下角的【好文要頂】按鈕【精神支援】,因為這兩種支援都是我繼續寫作,分享的最大動力!
作者:tkb至簡
來源:http://farb.cnblogs.com/
聲明:原創部落格請在轉載時保留原文連結或者在文章開頭加上本人部落格位址,如發現錯誤,歡迎批評指正。凡是轉載于本人的文章,不能設定打賞功能,如有特殊需求請與本人聯系!
已将所有贊助者統一放到單獨頁面!簽名處隻保留最近10條贊助記錄!檢視贊助者清單
衷心感謝打賞者的厚愛與支援!也感謝點贊和評論的園友的支援! | ||
---|---|---|
打賞者 | 打賞金額 | 打賞日期 |
微信:匿名 | 10.00 | 2017-08-03 |
2017-08-04 | ||
5.00 | 2017-06-15 | |
支付寶:一個名字499***@qq.com | 2017-06-14 | |
16.00 | 2017-04-08 | |
支付寶:向京劉 | 2017-04-13 | |
2017-003-08 | ||
2017-03-08 | ||
支付寶:lll20001155 | 2017-03-03 | |
支付寶:她是一個弱女子 | 2017-03-02 |