TPL 資料流庫向具有高吞吐量和低滞後時間的占用大量 CPU 和 I/O 操作的應用程式的并行化和消息傳遞提供了基礎。 它還能顯式控制緩存資料的方式以及在系統中移動的方式。 為了更好地了解資料流程式設計模型,請考慮一個以異步方式從磁盤加載圖像并建立複合圖像的應用程式。 傳統程式設計模型通常需要使用回調和同步對象(例如鎖)來協調任務和通路共享資料。 通過使用資料流程式設計模型,您可以從磁盤讀取時建立處理圖像的資料流對象。 在資料流模型下,您可以聲明當資料可用時的處理方式,以及資料之間的所有依賴項。 由于運作時管理資料之間的依賴項,是以通常可以避免這種要求來同步通路共享資料。 此外,因為運作時計劃基于資料的異步到達,是以資料流可以通過有效管理基礎線程提高響應能力和吞吐量。
System.Threading.Tasks.Dataflow 命名空間提供基于角色的程式設計模型,用以支援粗粒度資料流和流水線操作任務的程序内消息傳遞。TDP的主要作用就是Buffering Data和Processing Data,在TDF中,有兩個非常重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。繼承于ISourceBlock<T>的對象時作為提供資料的資料源對象-生産者,而繼承于ITargetBlock<T>接口類主要是扮演目标對象-消費者。在這個類庫中,System.Threading.Tasks.Dataflow名稱空間下,提供了很多以Block名字結尾的類,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9個Block,我們在開發中通常使用單個或多個Block組合的方式來實作一些功能,以下逐個來簡單介紹一下。
BufferBlock
BufferBlock是TDF中最基礎的Block。BufferBlock提供了一個有界限或沒有界限的Buffer,該Buffer中存儲T。該Block很像BlockingCollection<T>。可以用過Post往裡面添加資料,也可以通過Receive方法阻塞或異步的的擷取資料,資料處理的順序是FIFO的。它也可以通過Link向其他Block輸出資料。

private static BufferBlock<int> m_buffer = new BufferBlock<int>();
// Producer
private static void Producer()
{
while(true)
{
int item = Produce();
m_buffer.Post(item);
}
}
// Consumer
private static void Consumer()
{
while(true)
{
int item = m_buffer.Receive();
Process(item);
}
}
// Main
public static void Main()
{
var p = Task.Factory.StartNew(Producer);
var c = Task.Factory.StartNew(Consumer);
Task.WaitAll(p,c);
}
ActionBlock
ActionBlock實作ITargetBlock,說明它是消費資料的,也就是對輸入的一些資料進行處理。它在構造函數中,允許輸入一個委托,來對每一個進來的資料進行一些操作。如果使用Action(T)委托,那說明每一個資料的處理完成需要等待這個委托方法結束,如果使用了Func<TInput, Task>)來構造的話,那麼資料的結束将不是委托的傳回,而是Task的結束。預設情況下,ActionBlock會FIFO的處理每一個資料,而且一次隻能處理一個資料,一個處理完了再處理第二個,但也可以通過配置來并行的執行多個資料。
先看一個例子:
public ActionBlock<int> abSync = new ActionBlock<int>((i) =>
{
Thread.Sleep(1000);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
}
);
public void TestSync()
{
for (int i = 0; i < 10; i++)
{
abSync.Post(i);
}
Console.WriteLine("Post finished");
}
可見,ActionBlock是順序處理資料的,這也是ActionBlock一大特性之一。主線程在往ActionBlock中Post資料以後馬上傳回,具體資料的處理是另外一個線程來做的。資料是異步處理的,但處理本身是同步的,這樣在一定程度上保證資料處理的準确性。下面的例子是使用async和await。
public ActionBlock<int> abSync2 = new ActionBlock<int>(async (i) =>
{
await Task.Delay(1000);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
}
雖然還是1秒鐘處理一個資料,但是處理資料的線程會有不同。
如果你想異步處理多個消息的話,ActionBlock也提供了一些接口,讓你輕松實作。在ActionBlock的構造函數中,可以提供一個ExecutionDataflowBlockOptions的類型,讓你定義ActionBlock的執行選項,在下面了例子中,我們定義了MaxDegreeOfParallelism選項,設定為3。目的的讓ActionBlock中的Item最多可以3個并行處理。
public ActionBlock<int> abAsync = new ActionBlock<int>((i) =>
{
Thread.Sleep(1000);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
}
, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 });
public void TestAsync()
{
for (int i = 0; i < 10; i++)
{
abAsync.Post(i);
}
Console.WriteLine("Post finished");
}
運作程式,我們看見,每3個資料幾乎同時處理,并且他們的線程ID也是不一樣的。
ActionBlock也有自己的生命周期,所有繼承IDataflowBlock的類型都有Completion屬性和Complete方法。調用Complete方法是讓ActionBlock停止接收資料,而Completion屬性則是一個Task,是在ActionBlock處理完所有資料時候會執行的任務,我們可以使用Completion.Wait()方法來等待ActionBlock完成所有的任務,Completion屬性隻有在設定了Complete方法後才會有效。
public void TestAsync()
{
for (int i = 0; i < 10; i++)
{
abAsync.Post(i);
}
abAsync.Complete();
Console.WriteLine("Post finished");
abAsync.Completion.Wait();
Console.WriteLine("Process finished");
}
TransformBlock
TransformBlock是TDF提供的另一種Block,顧名思義它常常在資料流中充當資料轉換處理的功能。在TransformBlock内部維護了2個Queue,一個InputQueue,一個OutputQueue。InputQueue存儲輸入的資料,而通過Transform處理以後的資料則放在OutputQueue,OutputQueue就好像是一個BufferBlock。最終我們可以通過Receive方法來阻塞的一個一個擷取OutputQueue中的資料。TransformBlock的Completion.Wait()方法隻有在OutputQueue中的資料為0的時候才會傳回。
舉個例子,我們有一組網址的URL,我們需要對每個URL下載下傳它的HTML資料并存儲。那我們通過如下的代碼來完成:
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) =>
{
WebClient webClient = new WebClient();
return webClient.DownloadString(new Uri(url));
}
public void TestDownloadHTML()
{
tbUrl.Post("www.baidu.com");
tbUrl.Post("www.sina.com.cn");
string baiduHTML = tbUrl.Receive();
string sinaHTML = tbUrl.Receive();
}
當然,Post操作和Receive操作可以在不同的線程中進行,Receive操作同樣也是阻塞操作,在OutputQueue中有可用的資料時,才會傳回。
TransformManyBlock
TransformManyBlock和TransformBlock非常類似,關鍵的不同點是,TransformBlock對應于一個輸入資料隻有一個輸出資料,而TransformManyBlock可以有多個,及可以從InputQueue中取一個資料出來,然後放多個資料放入到OutputQueue中。
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; });
ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));
public void TestSync()
{
tmb.LinkTo(ab);
for (int i = 0; i < 4; i++)
{
tmb.Post(i);
}
Console.WriteLine("Finished post");
}
BroadcastBlock
BroadcastBlock的作用不像BufferBlock,它是使命是讓所有和它相聯的目标Block都收到資料的副本,這點從它的命名上面就可以看出來了。還有一點不同的是,BroadcastBlock并不儲存資料,在每一個資料被發送到所有接收者以後,這條資料就會被後面最新的一條資料所覆寫。如沒有目标Block和BroadcastBlock相連的話,資料将被丢棄。但BroadcastBlock總會儲存最後一個資料,不管這個資料是不是被發出去過,如果有一個新的目标Block連上來,那麼這個Block将收到這個最後一個資料。
BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });
ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));
ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));
ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));
public void TestSync()
{
bb.LinkTo(displayBlock);
bb.LinkTo(saveBlock);
bb.LinkTo(sendBlock);
for (int i = 0; i < 4; i++)
{
bb.Post(i);
}
Console.WriteLine("Post finished");
}
如果我們在Post以後再添加連接配接Block的話,那些Block就隻會收到最後一個資料了。
public void TestSync()
{
for (int i = 0; i < 4; i++)
{
bb.Post(i);
}
Thread.Sleep(5000);
bb.LinkTo(displayBlock);
bb.LinkTo(saveBlock);
bb.LinkTo(sendBlock);
Console.WriteLine("Post finished");
}
WriteOnceBlock
如果說BufferBlock是最基本的Block,那麼WriteOnceBock則是最最簡單的Block。它最多隻能存儲一個資料,一旦這個資料被發送出去以後,這個資料還是會留在Block中,但不會被删除或被新來的資料替換,同樣所有的接收者都會收到這個資料的備份。
和BroadcastBlock同樣的代碼,但是結果不一樣:
WriteOnceBlock<int> bb = new WriteOnceBlock<int>((i) => { return i; });
ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));
ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));
ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));
public void TestSync()
{
bb.LinkTo(displayBlock);
bb.LinkTo(saveBlock);
bb.LinkTo(sendBlock);
for (int i = 0; i < 4; i++)
{
bb.Post(i);
}
Console.WriteLine("Post finished");
}
WriteOnceBock隻會接收一次資料。而且始終保留那個資料。
同樣使用Receive方法來擷取資料也是一樣的結果,擷取到的都是第一個資料:
public void TestReceive()
{
for (int i = 0; i < 4; i++)
{
bb.Post(i);
}
Console.WriteLine("Post finished");
Console.WriteLine("1st Receive:" + bb.Receive());
Console.WriteLine("2nd Receive:" + bb.Receive());
Console.WriteLine("3rd Receive:" + bb.Receive());
}
BatchBlock
BatchBlock提供了能夠把多個單個的資料組合起來處理的功能,如上圖。應對有些需求需要固定多個資料才能處理的問題。在構造函數中需要制定多少個為一個Batch,一旦它收到了那個數量的資料後,會打包放在它的OutputQueue中。當BatchBlock被調用Complete告知Post資料結束的時候,會把InputQueue中餘下的資料打包放入OutputQueue中等待處理,而不管InputQueue中的資料量是不是滿足構造函數的數量。
BatchBlock<int> bb = new BatchBlock<int>(3);
ActionBlock<int[]> ab = new ActionBlock<int[]>((i) =>
{
string s = string.Empty;
foreach (int m in i)
{
s += m + " ";
}
Console.WriteLine(s);
});
public void TestSync()
{
bb.LinkTo(ab);
for (int i = 0; i < 10; i++)
{
bb.Post(i);
}
bb.Complete();
Console.WriteLine("Finished post");
}
BatchBlock執行資料有兩種模式:貪婪模式和非貪婪模式。貪婪模式是預設的。貪婪模式是指任何Post到BatchBlock,BatchBlock都接收,并等待個數滿了以後處理。非貪婪模式是指BatchBlock需要等到構造函數中設定的BatchSize個數的Source都向BatchBlock發資料,Post資料的時候才會處理。不然都會留在Source的Queue中。也就是說BatchBlock可以使用在每次從N個Source那個收一個資料打包處理或從1個Source那裡收N個資料打包處理。這裡的Source是指其他的繼承ISourceBlock的,用LinkTo連接配接到這個BatchBlock的Block。
在另一個構造參數中GroupingDataflowBlockOptions,可以通過設定Greedy屬性來選擇是否貪婪模式和MaxNumberOfGroups來設定最大産生Batch的數量,如果到達了這個數量,BatchBlock将不會再接收資料。
JoinBlock
JoinBlock一看名字就知道是需要和兩個或兩個以上的Source Block相連接配接的。它的作用就是等待一個資料組合,這個組合需要的資料都到達了,它才會處理資料,并把這個組合作為一個Tuple傳遞給目标Block。舉個例子,如果定義了JoinBlock<int, string>類型,那麼JoinBlock内部會有兩個ITargetBlock,一個接收int類型的資料,一個接收string類型的資料。那隻有當兩個ITargetBlock都收到各自的資料後,才會放到JoinBlock的OutputQueue中,輸出。
JoinBlock<int, string> jb = new JoinBlock<int, string>();
ActionBlock<Tuple<int, string>> ab = new ActionBlock<Tuple<int, string>>((i) =>
{
Console.WriteLine(i.Item1 + " " + i.Item2);
});
public void TestSync()
{
jb.LinkTo(ab);
for (int i = 0; i < 5; i++)
{
jb.Target1.Post(i);
}
for (int i = 5; i > 0; i--)
{
Thread.Sleep(1000);
jb.Target2.Post(i.ToString());
}
Console.WriteLine("Finished post");
}
BatchedJoinBlock
BatchedJoinBlock一看就是BacthBlock和JoinBlick的組合。JoinBlick是組合目标隊列的一個資料,而BatchedJoinBlock是組合目标隊列的N個資料,當然這個N可以在構造函數中配置。如果我們定義的是BatchedJoinBlock<int, string>, 那麼在最後的OutputQueue中存儲的是Tuple<IList<int>, IList<string>>,也就是說最後得到的資料是Tuple<IList<int>, IList<string>>。它的行為是這樣的,還是假設上文的定義,BatchedJoinBlock<int, string>, 構造BatchSize輸入為3。那麼在這個BatchedJoinBlock種會有兩個ITargetBlock,會接收Post的資料。那什麼時候會生成一個Tuple<IList<int>,IList<string>>到OutputQueue中呢,測試下來并不是我們想的需要有3個int資料和3個string資料,而是隻要2個ITargetBlock中的資料個數加起來等于3就可以了。3和0,2和1,1和2或0和3的組合都會生成Tuple<IList<int>,IList<string>>到OutputQueue中。可以參看下面的例子:
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3);
ActionBlock<Tuple<IList<int>, IList<string>>> ab = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) =>
{
Console.WriteLine("-----------------------------");
foreach (int m in i.Item1)
{
Console.WriteLine(m);
};
foreach (string s in i.Item2)
{
Console.WriteLine(s);
};
});
public void TestSync()
{
bjb.LinkTo(ab);
for (int i = 0; i < 5; i++)
{
bjb.Target1.Post(i);
}
for (int i = 5; i > 0; i--)
{
bjb.Target2.Post(i.ToString());
}
Console.WriteLine("Finished post");
}
最後剩下的一個資料1,由于沒有滿3個,是以一直被保留在Target2中。
TDF中最有用的功能之一就是多個Block之間可以組合應用。ISourceBlock可以連接配接ITargetBlock,一對一,一對多,或多對多。下面的例子就是一個TransformBlock和一個ActionBlock的組合。TransformBlock用來把資料*2,并轉換成字元串,然後把資料扔到ActionBlock中,而ActionBlock則用來最後的處理資料列印結果。
public ActionBlock<string> abSync = new ActionBlock<string>((i) =>
{
Thread.Sleep(1000);
Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
}
);
public TransformBlock<int, string> tbSync = new TransformBlock<int, string>((i) =>
{
i = i * 2;
return i.ToString();
}
);
public void TestSync()
{
tbSync.LinkTo(abSync);
for (int i = 0; i < 10; i++)
{
tbSync.Post(i);
}
tbSync.Complete();
Console.WriteLine("Post finished");
tbSync.Completion.Wait();
Console.WriteLine("TransformBlock process finished");
}
TDF提供的一些Block,通過對這些Block配置群組合,可以滿足很多的資料處理的場景。這一篇将繼續介紹與這些Block配置的相關類,和挖掘一些進階功能。
在一些Block的構造函數中,我們常常可以看見需要你輸入DataflowBlockOptions 類型或者它的兩個派生類型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。
DataflowBlockOptions
DataflowBlockOptions有五個屬性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。
用BoundedCapacity來限定容量
這個屬性用來限制一個Block中最多可以緩存資料項的數量,大多數Block都支援這個屬性,這個值預設是DataflowBlockOptions.Unbounded = -1,也就是說沒有限制。開發人員可以制定這個屬性設定數量的上限。那後面的新資料将會延遲。比如說用一個BufferBlock連接配接一個ActionBlock,如果在ActionBlock上面設定了上限,ActionBlock處理的操作速度比較慢,留在ActionBlock中的資料到達了上限,那麼餘下的資料将留在BufferBlock中,直到ActionBlock中的資料量低于上限。這種情況常常會發生在生産者生産的速度大于消費者速度的時候,導緻的問題是記憶體越來越大,資料操作越來越延遲。我們可以通過一個BufferBlock連接配接多個ActionBlock來解決這樣的問題,也就是負載均衡。一個ActionBlock滿了,就會放到另外一個ActionBlock中去了。
用CancellationToken來取消操作
TPL中常用的類型。在Block的構造函數中放入CancellationToken,Block将在它的整個生命周期中全程監控這個對象,隻要在這個Block結束運作(調用Complete方法)前,用CancellationToken發送取消請求,該Block将會停止運作,如果Block中還有沒有處理的資料,那麼将不會再被處理。
用MaxMessagesPerTask控制公平性
每一個Block内部都是異步處理,都是使用TPL的Task。TDF的設計是在保證性能的情況下,盡量使用最少的任務對象來完成資料的操作,這樣效率會高一些,一個任務執行完成一個資料以後,任務對象并不會銷毀,而是會保留着去處理下一個資料,直到沒有資料處理的時候,Block才會回收掉這個任務對象。但是如果資料來自于多個Source,公平性就很難保證。從其他Source來的資料必須要等到早前的那些Source的資料都處理完了才能被處理。這時我們就可以通過MaxMessagesPerTask來控制。這個屬性的預設值還是DataflowBlockOptions.Unbounded=-1,表示沒有上限。假如這個數值被設定為1的話,那麼單個任務隻會處理一個資料。這樣就會帶來極緻的公平性,但是将帶來更多的任務對象消耗。
用NameFormat來定義Block名稱
MSDN上說屬性NameFormat用來擷取或設定查詢塊的名稱時要使用的格式字元串。
Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。是以當我們輸入”{0}”的時候,名字就是block.GetType ().Name,如果我們資料的是”{1}”,那麼名字就是block.Completion.Id。如果是“{2}”,那麼就會抛出異常。
用TaskScheduler來排程Block行為
TaskScheduler是非常重要的屬性。同樣這個類型來源于TPL。每個Block裡面都使用TaskScheduler來排程行為,無論是源Block和目标Block之間的資料傳遞,還是使用者自定義的執行資料方法委托,都是使用的TaskScheduler。如果沒有特别設定的話,将使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)來排程。我們可以使用其他的一些繼承于TaskScheduler的類型來設定這個排程器,一旦設定了以後,Block中的所有行為都會使用這個排程器來執行。.Net Framework 4中内建了兩個Scheduler,一個是預設的ThreadPoolTaskScheduler,另一個是用于UI線程切換的SynchronizationContextTaskScheduler。如果你使用的Block設計到UI的話,那可以使用後者,這樣在UI線程切換上面将更加友善。
.Net Framework 4.5 中,還有一個類型被加入到System.Threading.Tasks名稱空間下:ConcurrentExclusiveSchedulerPair。這個類是兩個TaskScheduler的組合。它提供兩個TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;我們可以把這兩個TaskScheduler構造進要使用的Block中。他們保證了在沒有排他任務的時候(使用ExclusiveScheduler的任務),其他任務(使用ConcurrentScheduler)可以同步進行,當有排他任務在運作的時候,其他任務都不能運作。其實它裡面就是一個讀寫鎖。這在多個Block操作共享資源的問題上是一個很友善的解決方案。
public ActionBlock<int> readerAB1;
public ActionBlock<int> readerAB2;
public ActionBlock<int> readerAB3;
public ActionBlock<int> writerAB1;
public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });
public void Test()
{
ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair();
readerAB1 = new ActionBlock<int>((i) =>
{
Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now);
Thread.Sleep(500);
}
, new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });
readerAB2 = new ActionBlock<int>((i) =>
{
Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now);
Thread.Sleep(500);
}
, new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });
readerAB3 = new ActionBlock<int>((i) =>
{
Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now);
Thread.Sleep(500);
}
, new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler });
writerAB1 = new ActionBlock<int>((i) =>
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now);
Console.ResetColor();
Thread.Sleep(3000);
}
, new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
bb.LinkTo(readerAB1);
bb.LinkTo(readerAB2);
bb.LinkTo(readerAB3);
Task.Run(() =>
{
while (true)
{
bb.Post(1);
Thread.Sleep(1000);
}
});
Task.Run(() =>
{
while (true)
{
Thread.Sleep(6000);
writerAB1.Post(1);
}
});
}
用MaxDegreeOfParallelism來并行處理
通常,Block中處理資料都是單線程的,一次隻能處理一個資料,比如說ActionBlock中自定義的代理。使用MaxDegreeOfParallelism可以讓你并行處理這些資料。屬性的定義是最大的并行處理個數。如果定義成-1的話,那就是沒有限制。使用者需要在實際情況中選擇這個值的大小,并不是越大越好。如果是平行處理的話,還應該考慮是否有共享資源。
TDF中的負載均衡
我們可以使用Block很友善的構成一個生産者消費者的模式來處理資料。當生産者産生資料的速度快于消費者的時候,消費者Block的Buffer中的資料會越來越多,消耗大量的記憶體,資料處理也會延時。這時,我們可以用一個生産者Block連接配接多個消費者Block來解決這個問題。由于多個消費者Block一定是并行處理,是以對共享資源的處理一定要做同步處理。
使用BoundedCapacity屬性來實作
當連接配接多個ActionBlock的時候,可以通過設定ActionBlock的BoundedCapacity屬性。當第一個滿了,就會放到第二個,第二個滿了就會放到第三個。
public BufferBlock<int> bb = new BufferBlock<int>();
public ActionBlock<int> ab1 = new ActionBlock<int>((i) =>
{
Thread.Sleep(1000);
Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now);
}
, new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });
public ActionBlock<int> ab2 = new ActionBlock<int>((i) =>
{
Thread.Sleep(1000);
Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now);
}
, new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });
public ActionBlock<int> ab3 = new ActionBlock<int>((i) =>
{
Thread.Sleep(1000);
Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now);
}
, new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 });
public void Test()
{
bb.LinkTo(ab1);
bb.LinkTo(ab2);
bb.LinkTo(ab3);
for (int i = 0; i < 9; i++)
{
bb.Post(i);
}
}
感謝您的閱讀,如果您對我的部落格所講述的内容有興趣,請繼續關注我的後續部落格,我是yswenli 。