天天看點

C#基于消息釋出訂閱模型詳解(上)

  在我們的開發過程中,我們經常會遇到這樣的場景就是一個對象的其中的一些狀态依賴于另外的一個對象的狀态,而且這兩個對象之間彼此是沒有關聯的,及兩者之間的耦合性非常低,特别是在這種基于容器模型的開發中遇到的會非常多,比如Prism架構或者MEF這種架構中,而我們會發現在這樣的系統中我們經常使用一種Publish和Subscribe的模式來進行互動,這種互動有什麼好處呢?基于帶着這些問題的思考,我們來一步步來剖析!

  首先第一步就是定義一個叫做IEventAggregator的接口,裡面定義了一些重載的Subscribe和Publish方法,我們具體來看一看這個接口:

/// <summary>
    ///   Enables loosely-coupled publication of and subscription to events.
    /// </summary>
    public interface IEventAggregator 
    {
        /// <summary>
        ///   Gets or sets the default publication thread marshaller.
        /// </summary>
        /// <value>
        ///   The default publication thread marshaller.
        /// </value>
        Action<System.Action> PublicationThreadMarshaller { get; set; }

        /// <summary>
        ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
        /// </summary>
        /// <param name = "instance">The instance to subscribe for event publication.</param>
        void Subscribe(object instance);

        /// <summary>
        ///   Unsubscribes the instance from all events.
        /// </summary>
        /// <param name = "instance">The instance to unsubscribe.</param>
        void Unsubscribe(object instance);

        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <remarks>
        ///   Uses the default thread marshaller during publication.
        /// </remarks>
        void Publish(object message);

        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        void Publish(object message, Action<System.Action> marshal);
    }
      

  有了這個接口,接下來就是怎樣去實作這個接口中的各種方法,我們來看看具體的實作過程。

/// <summary>
    ///   Enables loosely-coupled publication of and subscription to events.
    /// </summary>
    public class EventAggregator : IEventAggregator 
    {
        /// <summary>
        ///   The default thread marshaller used for publication;
        /// </summary>
        public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();

        readonly List<Handler> handlers = new List<Handler>();

        /// <summary>
        ///   Initializes a new instance of the <see cref = "EventAggregator" /> class.
        /// </summary>
        public EventAggregator()
        {
            PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
        }

        /// <summary>
        ///   Gets or sets the default publication thread marshaller.
        /// </summary>
        /// <value>
        ///   The default publication thread marshaller.
        /// </value>
        public Action<System.Action> PublicationThreadMarshaller { get; set; }

        /// <summary>
        ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
        /// </summary>
        /// <param name = "instance">The instance to subscribe for event publication.</param>
        public virtual void Subscribe(object instance) 
        {
            lock(handlers) 
            {
                if (handlers.Any(x => x.Matches(instance)))
                {
                    return;
                }                    
                handlers.Add(new Handler(instance));
            }
        }

        /// <summary>
        ///   Unsubscribes the instance from all events.
        /// </summary>
        /// <param name = "instance">The instance to unsubscribe.</param>
        public virtual void Unsubscribe(object instance) 
        {
            lock(handlers) 
            {
                var found = handlers.FirstOrDefault(x => x.Matches(instance));
                if (found != null)
                { 
                   handlers.Remove(found);
                }                   
            }
        }

        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <remarks>
        ///   Does not marshall the the publication to any special thread by default.
        /// </remarks>
        public virtual void Publish(object message) 
        {
            Publish(message, PublicationThreadMarshaller);
        }

        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            } 
            marshal(() => 
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();

                if(dead.Any()) 
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }

        protected class Handler
        {
            readonly WeakReference reference;
            readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();

            public Handler(object handler) 
            {
                reference = new WeakReference(handler);

                var interfaces = handler.GetType().GetInterfaces()
                    .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);

                foreach(var @interface in interfaces) 
                {
                    var type = @interface.GetGenericArguments()[0];
                    var method = @interface.GetMethod("Handle");
                    supportedHandlers[type] = method;
                }
            }

            public bool Matches(object instance) 
            {
                return reference.Target == instance;
            }

            public bool Handle(Type messageType, object message) 
            {
                var target = reference.Target;
                if(target == null)
                    return false;

                foreach(var pair in supportedHandlers) 
                {
                    if(pair.Key.IsAssignableFrom(messageType)) 
                    {
                        pair.Value.Invoke(target, new[] { message });
                        return true;
                    }
                }
                return true;
            }
        }
    }
      

  首先在EventAggregator的内部維護了一個LIst<Handler>的List對象,用來存放一系列的Handle,那麼這個嵌套類Handler到底起什麼作用呢?

  我們會發現在每一次當執行這個Subscribe的方法的時候,會将目前object類型的參數instance傳入到Handler這個對象中,在Handler這個類的構造函數中,首先将這個instance放入到一個弱引用中去,然後再擷取這個對象所有繼承的接口,并檢視是否繼承了IHandle<TMessage>這個泛型的接口,如果能夠擷取到,那麼就通過反射擷取到目前instance中定義的Handle方法,并擷取到其中定義的表示泛型類型的類型實參或泛型類型定義的類型形參,并把這兩個對象放到内部定義的一個Dictionary<Type, MethodInfo>字典之中,這樣就把這樣一個活得具體的處理方法的Handler對象放到了一個List<Handler>集合中,這個就是訂閱消息的核心部分,是以目前的對象要想訂閱一個消息,那麼必須實作泛型接口IHandle<TMessage>,并且實作接口中的方法,同時最重要的就是在目前對象的構造函數函數中去訂閱消息(即執行Subscribe(this),我們來看一看這個泛型接口IHandle<TMessage>  

public interface IHandle {}

    /// <summary>
    ///   Denotes a class which can handle a particular type of message.
    /// </summary>
    /// <typeparam name = "TMessage">The type of message to handle.</typeparam>
    public interface IHandle<TMessage> : IHandle 
    {
        /// <summary>
        ///   Handles the message.
        /// </summary>
        /// <param name = "message">The message.</param>
        void Handle(TMessage message);
    }
      

  在看完了Subscribe這個方法後,後面我們就來看看Unsubscribe方法吧,這個思路其實很簡單就是找到List<Handler>中的這個對象,并且移除目前的對象就可以了,那麼下面我們關注的重點就是Publish這個方法中到底實作了什麼?首先來看看代碼,然後再來做一步步分析。  

/// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            } 
            marshal(() => 
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();

                if(dead.Any()) 
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }
      

  我們看到,在釋出一個object類型的message的時候,必然對應着另外的一個對象來處理這個消息,那麼怎樣找到這個消息的處理這呢?

  對,我們在Subscribe一個對象的時候不是已經通過反射将訂閱這個消息的對象及方法都存在了一個List<Handler>中去了嗎?那麼我們隻需要在這個List中找到對應的和message類型一緻的那個對象并執行裡面的Handle方法不就可以了嗎?确實是一個很好的思路,這裡我們看代碼也是這樣實行的。

  這裡面還有一個要點就是,如果執行的方法傳回了false,就是執行不成功,那麼就從目前的List<Handler>中移除掉這個對象,因為這樣的操作是沒有任何意義的,通過這樣的過程我們就能夠完沒地去實作兩個對象之間的消息傳遞了,另外我們通過總結以後就能夠發現,這個思路實作的重點包括以下方面:

  1 所有消息訂閱的對象必須實作統一的接口IHandle<TMessage>,并實作裡面的Handel方法。

  2 整個EventAggregator必須是單執行個體或者是靜态的,這樣才能夠在統一的集合中去實作上述的各種操作。

  最後還是按照之前的慣例,最後給出一個具體的執行個體來做相關的說明,請點選此處進行下載下傳,在下篇中我們将介紹一種簡單版的基于事件的釋出和訂閱模式的例子。