天天看點

MediatR 知多少

引言

首先不用查字典了,詞典查無此詞。猜測是作者筆誤将mediator寫成mediatr了。廢話少說,轉入正題。

MediatR 知多少

先來簡單了解下這個開源項目mediatr(作者jimmy bogard,也是開源項目automapper的建立者,在此表示膜拜):

simple mediator implementation in .net. in-process messaging with no dependencies. supports request/response, commands, queries, notifications and events, synchronous and async with intelligent dispatching via c# generic variance. .net中的簡單中介者模式實作,一種程序内消息傳遞機制(無其他外部依賴)。 支援以同步或異步的形式進行請求/響應,指令,查詢,通知和事件的消息傳遞,并通過c#泛型支援消息的智能排程。

如上所述,其核心是一個中介者模式的.net實作,其目的是消息發送和消息處理的解耦。它支援以單點傳播和多點傳播形式使用同步或異步的模式來釋出消息,建立和偵聽事件。

中介者模式

既然是對中介者模式的一種實作,那麼我們就有必要簡要介紹下中介者這個設計模式,以便後續展開。

MediatR 知多少
中介者模式:用一個中介對象封裝一系列的對象互動,中介者使各對象不需要顯示地互相作用,進而使耦合松散,而且可以獨立地改變它們之間的互動。

看上面的官方定義可能還是有點繞,那麼下面這張圖應該能幫助你對中介者模式有個直覺了解。

MediatR 知多少

使用中介模式,對象之間的互動将封裝在中介對象中。對象不再直接互相互動(解耦),而是通過中介進行互動。這減少了對象之間的依賴性,進而減少了耦合。

那其優缺點也在圖中很容易看出:

優點:中介者模式的優點就是減少類間的依賴,把原有的一對多的依賴變成了一對一的依賴,同僚類隻依賴中介者,減少了依賴,當然同時也降低了類間的耦合 缺點:中介者模式的缺點就是中介者會膨脹得很大,而且邏輯複雜,原本n個對象直接的互相依賴關系轉換為中介者和同僚類的依賴關系,同僚類越多,中介者的邏輯就越複雜。

hello meidatr

在開始之前,我們先來了解下其基本用法。

單點傳播消息傳輸,也就是一對一的消息傳遞,一個消息對應一個消息處理。其通過 ​<code>​irequest​</code>​來抽象單點傳播消息,用 ​<code>​irequesthandler​</code>​進行消息處理。

//建構 消息請求

public class ping : irequest&lt;string&gt; { }

//建構 消息處理

public class pinghandler : irequesthandler&lt;ping, string&gt; {

   public task&lt;string&gt; handle(ping request, cancellationtoken cancellationtoken) {

       return task.fromresult("pong");

   }

}

//發送 請求

var response = await mediator.send(new ping());

debug.writeline(response); // "pong"

多點傳播消息傳輸,也就是一對多的消息傳遞,一個消息對應多個消息處理。其通過 ​<code>​inotification​</code>​來抽象多點傳播消息,對應的消息處理類型為 ​<code>​inotificationhandler​</code>​。

//建構 通知消息

public class ping : inotification { }

//建構 消息處理器1

public class pong1 : inotificationhandler&lt;ping&gt; {

   public task handle(ping notification, cancellationtoken cancellationtoken) {

       debug.writeline("pong 1");

       return task.completedtask;

//建構 消息處理器2

public class pong2 : inotificationhandler&lt;ping&gt; {

       debug.writeline("pong 2");

//釋出消息

await mediator.publish(new ping());

源碼解析

對mediatr有了基本認識後,我們來看看源碼,研究下其如何實作的。

MediatR 知多少

從代碼圖中我們可以看到其核心的對象主要包括:

irequest vs irequesthandler

inotification vs inoticifaitonhandler

imediator vs mediator

unit

ipipelinebehavior

其中 ​<code>​irequest​</code>​和 ​<code>​inotification​</code>​分别對應單點傳播和多點傳播消息的抽象。 對于單點傳播消息可以決定是否需要傳回值選用不同的接口:

irequest - 有傳回值

irequest - 無傳回值

這裡就不得不提到其中巧妙的設計,通過引入結構類型 ​<code>​unit​</code>​來代表無傳回的情況。

/// &lt;summary&gt;

/// 代表無需傳回值的請求

/// &lt;/summary&gt;

public interface irequest : irequest&lt;unit&gt; { }

/// 代表有傳回值的請求

/// &lt;typeparam name="tresponse"&gt;response type&lt;/typeparam&gt;

public interface irequest&lt;out tresponse&gt; : ibaserequest { }

/// allows for generic type constraints of objects implementing irequest or irequest{tresponse}

public interface ibaserequest { }

同樣對于 ​<code>​irequesthandler​</code>​也是通過結構類型 ​<code>​unit​</code>​來處理不需要傳回值的情況。

public interface irequesthandler&lt;in trequest, tresponse&gt;

   where trequest : irequest&lt;tresponse&gt;

{

   task&lt;tresponse&gt; handle(trequest request, cancellationtoken cancellationtoken);

public interface irequesthandler&lt;in trequest&gt; : irequesthandler&lt;trequest, unit&gt;

   where trequest : irequest&lt;unit&gt;

從上面我們可以看出定義了一個方法名為 ​<code>​handle​</code>​傳回值為 ​<code>​task​</code>​的包裝類型,而是以賦予了其具有以同步和異步的方式進行消息處理的能力。我們再看一下其以異步方式進行消息處理(無傳回值)的預設實作 ​<code>​asyncrequesthandler​</code>​:

public abstract class asyncrequesthandler&lt;trequest&gt; : irequesthandler&lt;trequest&gt;

   where trequest : irequest

   async task&lt;unit&gt; irequesthandler&lt;trequest, unit&gt;.handle(trequest request, cancellationtoken cancellationtoken)

   {

       await handle(request, cancellationtoken).configureawait(false);

       return unit.value;

   protected abstract task handle(trequest request, cancellationtoken cancellationtoken);

從上面的代碼來看,我們很容易看出這是裝飾模式的實作方式,是不是很巧妙的解決了無需傳回值的場景。

最後我們來看下結構類型 ​<code>​unit​</code>​的定義:

public struct unit : iequatable&lt;unit&gt;, icomparable&lt;unit&gt;, icomparable

   public static readonly unit value = new unit();

   public static readonly task&lt;unit&gt; task = system.threading.tasks.task.fromresult(value);

   // some other code

MediatR 知多少

​<code>​imediator​</code>​主要定義了兩個方法 ​<code>​send​</code>​和 ​<code>​publish​</code>​,分别用于發送消息和釋出通知。其預設實作mediator中定義了兩個集合,分别用來儲存請求與請求處理的映射關系。

//mediator.cs

//儲存request和requesthandler的映射關系,1對1。

private static readonly concurrentdictionary&lt;type, object&gt; _requesthandlers = new concurrentdictionary&lt;type, object&gt;();

//儲存notification與notificationhandler的映射關系,

private static readonly concurrentdictionary&lt;type, notificationhandlerwrapper&gt; _notificationhandlers = new concurrentdictionary&lt;type, notificationhandlerwrapper&gt;();

這裡面其又引入了兩個包裝類: ​<code>​requesthandlerwrapper​</code>​和 ​<code>​notificationhandlerwrapper​</code>​。這兩個包裝類的作用就是用來傳遞 ​<code>​servicefactory​</code>​委托進行依賴解析。

是以說 ​<code>​mediator​</code>​借助 ​<code>​publicdelegateobjectservicefactory(typeservicetype);​</code>​完成對ioc容器的一層抽象。這樣就可以對接任意你喜歡用的ioc容器,比如:autofac、windsor或asp.net core預設的ioc容器,隻需要在注冊 ​<code>​imediator​</code>​時指定 ​<code>​servicefactory​</code>​類型的委托即可,比如asp.net core中的做法:

MediatR 知多少

在使用asp.net core提供的原生ioc容器有些問題:service registration crashes when registering generic handlers

MediatR 知多少

meidatr支援按需配置請求管道進行消息處理。即支援在請求處理前和請求處理後添加額外行為。僅需實作以下兩個接口,并注冊到ioc容器即可。

irequestpreprocessor 請求處理前接口

irequestpostprocessor 請求處理後接口

其中 ​<code>​ipipelinebehavior​</code>​的預設實作: ​<code>​requestpreprocessorbehavior​</code>​和 ​<code>​requestpostprocessorbehavior​</code>​分别用來處理所有實作 ​<code>​irequestpreprocessor​</code>​和 ​<code>​irequestpostprocessor​</code>​接口定義的管道行為。

而處理管道是如何建構的呢?我們來看下 ​<code>​requesthandlerwrapperimpl​</code>​的具體實作:

internal class requesthandlerwrapperimpl&lt;trequest, tresponse&gt; : requesthandlerwrapper&lt;tresponse&gt;

   public override task&lt;tresponse&gt; handle(irequest&lt;tresponse&gt; request, cancellationtoken cancellationtoken,

       servicefactory servicefactory)

       task&lt;tresponse&gt; handler() =&gt; gethandler&lt;irequesthandler&lt;trequest, tresponse&gt;&gt;(servicefactory).handle((trequest) request, cancellationtoken);

       return servicefactory

           .getinstances&lt;ipipelinebehavior&lt;trequest, tresponse&gt;&gt;()

           .reverse()

           .aggregate((requesthandlerdelegate&lt;tresponse&gt;) handler, (next, pipeline) =&gt; () =&gt; pipeline.handle((trequest)request, cancellationtoken, next))();

就這樣一個簡單的函數,涉及的知識點還真不少,說實話我花了不少時間來理清這個邏輯。 那都涉及到哪些知識點呢?我們一個一個的來理一理。

c# 7.0的新特性 - 局部函數

c# 6.0的新特性 - 表達式形式的成員函數

linq高階函數 - <code>aggregate</code>

匿名委托

構造委托函數鍊

關于第1、2個知識點,請看下面這段代碼:

public delegate int sumdelegate();//定義委托

public static void main()

   //局部函數(在函數内部定義函數)

   //表達式形式的成員函數, 相當于 int sum() { return 1 + 2;}

   int sum() =&gt; 1 + 2;

   var sumdelegate = (sumdelegate)sum;//轉換為委托

   console.writeline(sumdelegate());//委托調用,輸出:3

再看第4個知識點,匿名委托:

public delegate int sumdelegate();

sumdelegate delegater1 = delegate(){ return 1+2; }

//也相當于

sumdelegate delegater2 =&gt; 1+2;

下面再來介紹一下 ​<code>​aggregate​</code>​這個linq高階函數。 ​<code>​aggregate​</code>​是對一個集合序列進行累加操作,通過指定初始值,累加函數,以及結果處理函數完成計算。

函數定義:

public static tresult aggregate&lt;tsource,taccumulate,tresult&gt;

(this ienumerable&lt;tsource&gt; source,

taccumulate seed,

func&lt;taccumulate,tsource,taccumulate&gt; func,

func&lt;taccumulate,tresult&gt; resultselector);

根據函數定義我們來寫個簡單的demo:

var nums = enumerable.range(2, 3);//[2,3,4]

// 計算1到5的累加之和,再将結果乘以2

var sum = nums.aggregate(1, (total, next) =&gt; total + next, result =&gt; result * 2);// 相當于 (((1+2)+3)+4)*2=20

console.writeline(sum);//20

和函數參數進行一一對應:

seed : 1

func func : (total, next) =&gt; total + next

func resultselector : result =&gt; result * 2

基于上面的認識,我們再來回過頭梳理一下 ​<code>​requesthandlerwrapperimpl​</code>​。 其主要是借助委托: ​<code>​publicdelegatetask&lt;tresponse&gt;requesthandlerdelegate&lt;tresponse&gt;();​</code>​來構造委托函數鍊來建構處理管道。

對 ​<code>​aggregate​</code>​函數了解後,我們就不難了解處理管道的建構了。請看下圖中的代碼解讀:

MediatR 知多少
MediatR 知多少

那如何保證先執行 ​<code>​irequestpreprocessor​</code>​再執行 ​<code>​irequestpostprocessor​</code>​呢? 就是在注冊到ioc容器時必須保證順序,先注冊 ​<code>​irequestpreprocessor​</code>​再注冊 ​<code>​irequestpostprocessor​</code>​。(這一點很重要!!!)

看到這裡有沒有想到asp.net core中請求管道中中間件的建構呢?是不是很像俄羅斯套娃?先由内而外建構管道,再由外而内執行!

至此,mediatr的實作思路算是理清了。

應用場景

如文章開頭提到:mediatr是一種程序内消息傳遞機制。 支援以同步或異步的形式進行請求/響應,指令,查詢,通知和事件的消息傳遞,并通過c#泛型支援消息的智能排程。

那麼我們就應該明白,其核心是消息的解耦。因為我們幾乎都是在與消息打交道,那是以它的應用場景就很廣泛,比如我們可以基于mediatr實作cqrs、eventbus等。

另外,還有一種應用場景:我們知道借助依賴注入的好處是,就是解除依賴,但我們又不得不思考一個問題,随着業務邏輯複雜度的增加,構造函數可能要注入更多的服務,當注入的依賴太多時,其會導緻構造函數膨脹。比如:

public dashboardcontroller(

   icustomerrepository customerrepository,

   iorderservice orderservice,

   icustomerhistoryrepository historyrepository,

   iorderrepository orderrepository,

   iproductrespoitory productrespoitory,

   irelatedproductsrepository relatedproductsrepository,

   isupportservice supportservice,

   ilog logger

   )  

如果借助 ​<code>​mediatr​</code>​進行改造,也許僅需注入 ​<code>​imediatr​</code>​就可以了。

總結

看到這裡,也許你應該明白mediatr實質上并不是嚴格意義上的中介者模式實作,我更傾向于其是基于ioc容器的一層抽象,根據請求定位相應的請求處理器進行消息處理,也就是服務定位。 那到這裡似乎也恍然大悟mediatr這個筆誤可能是有意為之了。序員,你怎麼看?

參考資料: cqrs/mediatr implementation patterns mediatr when and why i should use it?  abp cqrs 實作案例:基于 mediatr 實作

繼續閱讀