天天看點

Wcf 雙工通信的應用

概述

雙工(Duplex)模式的消息交換方式展現在消息交換過程中,參與的雙方均可以向對方發送消息。基于雙工MEP消息交換可以看成是多個基本模式下(比如請求-回複模式和單項模式)消息交換的組合。雙工MEP又具有一些變體,比如典型的訂閱-釋出模式就可以看成是雙工模式的一種表現形式。雙工消息交換模式使服務端回調(Callback)用戶端操作成為可能。

在Wcf中不是所有的綁定協定都支援回調操作,BasicHttpBinding,WSHttpBinding綁定協定不支援回調操作;NetTcpBinding和NetNamedPipeBinding綁定支援回調操作;WSDualHttpBinding綁定是通過設定兩個HTTP信道來支援雙向通信,是以它也支援回調操作。

兩種典型的雙工MEP

1.請求過程中的回調

這是一種比較典型的雙工消息交換模式的表現形式,用戶端在進行服務調用的時候,附加上一個回調對象;服務在對處理該進行中,通過用戶端附加的回調對象(實際上是調用回調服務的代理對象)回調用戶端的操作(該操作在用戶端執行)。整個消息交換的過程實際上由兩個基本的消息交換構成,其一是用戶端正常的服務請求,其二則是服務端對用戶端的回調。兩者可以采用請求-回複模式,也可以采用單向(One-way)的MEP進行消息交換。下圖描述了這樣的過程,服務調用和回調都采用請求-回複MEP。

Wcf 雙工通信的應用

2.訂閱-釋出

訂閱-釋出模式是雙工模式的一個典型的變體。在這個模式下,消息交換的雙方變成了訂閱者和釋出者,若幹訂閱者就某個主題向釋出者申請訂閱,釋出者将所有的訂閱者儲存在一個訂閱者清單中,在某個時刻将主題發送給該主題的所有訂閱者。實際上基于訂閱-釋出模式的消息交換也可以看成是兩個基本模式下消息交換的組合,申請訂閱是一個單向模式的消息交換(如果訂閱者行為得到訂閱的回饋,該消息交換也可以采用請求-回複模式);而主題釋出也是一個基于單向模式的消息交換過程。訂閱-釋出消息交換模式如下圖所示。

Wcf 雙工通信的應用

示例

接下來我們将會建立一個簡單的Wcf通信服務,包括使使用NetTcpBinding實作雙工通信,和監控雙工通信過程中的用戶端和服務端一方斷開後的捕捉事件。

項目如圖所示

Wcf 雙工通信的應用

第一步:

先建立IGateWayService和INotifyCallBack接口

[ServiceContract(CallbackContract = typeof(INotifyCallBack))]
    public interface IGateWayService
    {
        [OperationContract]
        void RegisterClient(string clientName);
        [OperationContract]
        string GetData(int value);

        [OperationContract]
        CompositeType GetDataUsingDataContract(CompositeType composite);
    }


    // 使用下面示例中說明的資料約定将複合類型添加到服務操作。
    [DataContract]
    public class CompositeType
    {
        bool boolValue = true;
        string stringValue = "Hello ";

        [DataMember]
        public bool BoolValue
        {
            get { return boolValue; }
            set { boolValue = value; }
        }

        [DataMember]
        public string StringValue
        {
            get { return stringValue; }
            set { stringValue = value; }
        }
    }      

INotifyCallBack.cs如下:

public interface INotifyCallBack
    {
        [OperationContract(IsOneWay = true)]
        void NotifyFunction(string sender);
    }      

記住在IGateWayService接口上方設定Attribute [ServiceContract(CallbackContract = typeof(INotifyCallBack))] 這樣設定表示這個接口是支援回調的。

接下來定義一個ClientRegisterInfo.cs來定義用戶端的名字和用戶端的INotifyCallBack屬性,再定義一個Timer 來調用INotifyCallBack給用戶端發送消息。再通過

wcf 的ICommunicationObject來定義通信出錯和關閉的事件。

public class ClientRegisterInfo
    {
        public ClientRegisterInfo()
        {
            _senderTimer.Elapsed += OnSenderMessage;
            _senderTimer.Start();
        }

        private void OnSenderMessage(object sender, ElapsedEventArgs e)
        {
            if (_notifyCallBack != null)
            {
                 var communication = _notifyCallBack as ICommunicationObject;
                if(communication.State==CommunicationState.Opened)
                    _notifyCallBack.NotifyFunction(DateTime.Now.ToString());
            }
        }

        public Timer _senderTimer=new Timer(10*1000);

        private INotifyCallBack _notifyCallBack;

        public INotifyCallBack NotifyCallBack
        {
            get { return _notifyCallBack; }
            set
            {
                lock (_syncNotifyObj)
                {
                    _notifyCallBack = value;
                    if (_notifyCallBack != null)
                    {
                        var communication = _notifyCallBack as ICommunicationObject;
                        if (communication != null)
                        {
                            communication.Closed += OnChannelClose;
                            communication.Faulted += OnChannelFault;
                        }
                    }
                }
            }
        }

        private readonly object _syncNotifyObj = new object();

        private void OnChannelFault(object sender, EventArgs e)
        {
     
            ClientInfoCache.Instance.Remove(this);
        }

        private void OnChannelClose(object sender, EventArgs e)
        {
  
            ClientInfoCache.Instance.Remove(this);
        }

        public string ClientName { get; set; }
    }      

再定義一個單例來儲存用戶端的資訊。

public class ClientInfoCache
    {
        private static readonly object SyncObj = new object();

        private static ClientInfoCache _instance;

        public static ClientInfoCache Instance
        {
            get
            {
                lock (SyncObj)
                {
                    if (_instance == null)
                        _instance = new ClientInfoCache();
                }
                return _instance;
            }
        }

        private ClientInfoCache()
        {
            _clientList = new List<ClientRegisterInfo>();
        }

        private List<ClientRegisterInfo> _clientList;

        private static object SyncOperator = new object();

        /// <summary>
        /// Add client entity
        /// </summary>
        /// <param name="entity">client entity</param>
        public void Add(ClientRegisterInfo entity)
        {
            if (entity == null) return;
            lock (SyncOperator)
            {
                var findClient =
                    _clientList.FirstOrDefault(
                        t => t.ClientName.Equals(entity.ClientName, StringComparison.OrdinalIgnoreCase));
                if (findClient == null)
                    _clientList.Add(entity);
                else
                {
                    findClient.NotifyCallBack = entity.NotifyCallBack;
                }
            }
        }

        /// <summary>
        /// Remove client
        /// </summary>
        /// <param name="entity">Client entity</param>
        public void Remove(ClientRegisterInfo entity)
        {
            lock (SyncOperator)
            {
                _clientList.Remove(entity);
            }
        }
    }      

再建立個控制台運應程式來啟動Wcf,代碼如下:

public class Program
    {
        static void Main(string[] args)
        {
            StartListener();
        }

        private static void StartListener()
        {
            try
            {
                using (var host = new ServiceHost(typeof(GateWayService)))
                {
                    host.Opened += delegate
                    {
                        Console.WriteLine("[Server] Begins to listen request on " + host.BaseAddresses[0]);
                    };

                    host.Open();
                    Console.Read();
                }
            }
            catch (Exception ex)
            {
             
            }
        }
    }      

在App.config設定配置如下:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
 <system.serviceModel>
    <bindings>
      <netTcpBinding>
        <binding name="longTimeoutBinding" closeTimeout="01:10:00" openTimeout="01:10:00"
 receiveTimeout="10:10:00" sendTimeout="10:10:00" maxBufferPoolSize="655350000"
 maxBufferSize="655350000" maxReceivedMessageSize="655350000">
          <readerQuotas maxDepth="32" maxStringContentLength="655350000"
            maxArrayLength="655350000" maxBytesPerRead="655350000" maxNameTableCharCount="655350000" />
          <reliableSession inactivityTimeout="23:59:59" />
          <security mode="None" />
        </binding>
      </netTcpBinding>
    </bindings>
    <behaviors>
      <serviceBehaviors>
        <behavior name="NewBehavior">
          <serviceMetadata httpGetEnabled="True" httpGetUrl="Http://localhost:7789/" httpsGetEnabled="True"/>
          <serviceDebug includeExceptionDetailInFaults="False" />
          <serviceThrottling maxConcurrentCalls="1000"  maxConcurrentSessions="1000"  maxConcurrentInstances="1000" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <services>
      <service name="WcfService.GateWayService" behaviorConfiguration="NewBehavior" >
        <endpoint  address="net.tcp://localhost:7788/GatewayService.svc"  binding="netTcpBinding"  contract="WcfService.IGateWayService" name="WcfService_GateWayService"  bindingConfiguration="longTimeoutBinding" >
        </endpoint>
        <endpoint  address="mex" binding="mexTcpBinding"   contract="IMetadataExchange" ></endpoint>
        <host >
          <baseAddresses >
            <add baseAddress="net.tcp://localhost:7788/GatewayService.svc" />
            <add baseAddress="Http://localhost:7789/" />
          </baseAddresses>
        </host >
      </service>
    </services>
  </system.serviceModel>
</configuration>      

longTimeoutBinding是設定傳輸的屬性,如最大傳輸大小,TimeOut的時間等。

在用戶端建立個WcfCallBack.cs 繼承IGateWayServiceCallback接口,代碼如下。

[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class WcfCallBack : IGateWayServiceCallback
    {
        public void NotifyFunction(string sender)
        {
            Console.WriteLine("Get a message,message info is {0}", sender);
        }
    }      

設定屬性[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]表示伺服器是通過并發的給用戶端來發送消息的。

控制台代碼如下

class Program
    {
        private static GateWayServiceClient _client;
        static void Main(string[] args)
        {
            var cb = new WcfCallBack();
            var context = new InstanceContext(cb);
            _client = new GateWayServiceClient(context);
            _client.RegisterClient("Test1");
            ((ICommunicationObject)_client).Closed += OnChannelClose;
            ((ICommunicationObject)_client).Faulted += OnChannelFaulted;
            Console.WriteLine("Input Q to exit.");
            while (string.Compare(Console.ReadLine(), ConsoleKey.Q.ToString(), StringComparison.OrdinalIgnoreCase) != 0)
            {

            }
        }

        private static  void OnChannelFaulted(object sender, EventArgs e)
        {
            if (FaultedEvent != null)
                FaultedEvent(sender, e);
        }

        private static  void OnChannelClose(object sender, EventArgs e)
        {
            if (CloseEvent != null)
                CloseEvent(sender, e);
        }

        public static  EventHandler CloseEvent;

        public static  EventHandler FaultedEvent;
    }      

運作的結果如下圖:

Wcf 雙工通信的應用

當我關閉用戶端時,能捕捉到Closed和Faulted事件

Wcf 雙工通信的應用

當我關閉服務端時,在用戶端能捕捉到Faulted事件

Wcf 雙工通信的應用

總結:

Wcf 通信使用簡單,功能豐富。