概述
雙工(Duplex)模式的消息交換方式展現在消息交換過程中,參與的雙方均可以向對方發送消息。基于雙工MEP消息交換可以看成是多個基本模式下(比如請求-回複模式和單項模式)消息交換的組合。雙工MEP又具有一些變體,比如典型的訂閱-釋出模式就可以看成是雙工模式的一種表現形式。雙工消息交換模式使服務端回調(Callback)用戶端操作成為可能。
在Wcf中不是所有的綁定協定都支援回調操作,BasicHttpBinding,WSHttpBinding綁定協定不支援回調操作;NetTcpBinding和NetNamedPipeBinding綁定支援回調操作;WSDualHttpBinding綁定是通過設定兩個HTTP信道來支援雙向通信,是以它也支援回調操作。
兩種典型的雙工MEP
1.請求過程中的回調
這是一種比較典型的雙工消息交換模式的表現形式,用戶端在進行服務調用的時候,附加上一個回調對象;服務在對處理該進行中,通過用戶端附加的回調對象(實際上是調用回調服務的代理對象)回調用戶端的操作(該操作在用戶端執行)。整個消息交換的過程實際上由兩個基本的消息交換構成,其一是用戶端正常的服務請求,其二則是服務端對用戶端的回調。兩者可以采用請求-回複模式,也可以采用單向(One-way)的MEP進行消息交換。下圖描述了這樣的過程,服務調用和回調都采用請求-回複MEP。

2.訂閱-釋出
訂閱-釋出模式是雙工模式的一個典型的變體。在這個模式下,消息交換的雙方變成了訂閱者和釋出者,若幹訂閱者就某個主題向釋出者申請訂閱,釋出者将所有的訂閱者儲存在一個訂閱者清單中,在某個時刻将主題發送給該主題的所有訂閱者。實際上基于訂閱-釋出模式的消息交換也可以看成是兩個基本模式下消息交換的組合,申請訂閱是一個單向模式的消息交換(如果訂閱者行為得到訂閱的回饋,該消息交換也可以采用請求-回複模式);而主題釋出也是一個基于單向模式的消息交換過程。訂閱-釋出消息交換模式如下圖所示。
示例
接下來我們将會建立一個簡單的Wcf通信服務,包括使使用NetTcpBinding實作雙工通信,和監控雙工通信過程中的用戶端和服務端一方斷開後的捕捉事件。
項目如圖所示
第一步:
先建立IGateWayService和INotifyCallBack接口
[ServiceContract(CallbackContract = typeof(INotifyCallBack))]
public interface IGateWayService
{
[OperationContract]
void RegisterClient(string clientName);
[OperationContract]
string GetData(int value);
[OperationContract]
CompositeType GetDataUsingDataContract(CompositeType composite);
}
// 使用下面示例中說明的資料約定将複合類型添加到服務操作。
[DataContract]
public class CompositeType
{
bool boolValue = true;
string stringValue = "Hello ";
[DataMember]
public bool BoolValue
{
get { return boolValue; }
set { boolValue = value; }
}
[DataMember]
public string StringValue
{
get { return stringValue; }
set { stringValue = value; }
}
}
INotifyCallBack.cs如下:
public interface INotifyCallBack
{
[OperationContract(IsOneWay = true)]
void NotifyFunction(string sender);
}
記住在IGateWayService接口上方設定Attribute [ServiceContract(CallbackContract = typeof(INotifyCallBack))] 這樣設定表示這個接口是支援回調的。
接下來定義一個ClientRegisterInfo.cs來定義用戶端的名字和用戶端的INotifyCallBack屬性,再定義一個Timer 來調用INotifyCallBack給用戶端發送消息。再通過
wcf 的ICommunicationObject來定義通信出錯和關閉的事件。
public class ClientRegisterInfo
{
public ClientRegisterInfo()
{
_senderTimer.Elapsed += OnSenderMessage;
_senderTimer.Start();
}
private void OnSenderMessage(object sender, ElapsedEventArgs e)
{
if (_notifyCallBack != null)
{
var communication = _notifyCallBack as ICommunicationObject;
if(communication.State==CommunicationState.Opened)
_notifyCallBack.NotifyFunction(DateTime.Now.ToString());
}
}
public Timer _senderTimer=new Timer(10*1000);
private INotifyCallBack _notifyCallBack;
public INotifyCallBack NotifyCallBack
{
get { return _notifyCallBack; }
set
{
lock (_syncNotifyObj)
{
_notifyCallBack = value;
if (_notifyCallBack != null)
{
var communication = _notifyCallBack as ICommunicationObject;
if (communication != null)
{
communication.Closed += OnChannelClose;
communication.Faulted += OnChannelFault;
}
}
}
}
}
private readonly object _syncNotifyObj = new object();
private void OnChannelFault(object sender, EventArgs e)
{
ClientInfoCache.Instance.Remove(this);
}
private void OnChannelClose(object sender, EventArgs e)
{
ClientInfoCache.Instance.Remove(this);
}
public string ClientName { get; set; }
}
再定義一個單例來儲存用戶端的資訊。
public class ClientInfoCache
{
private static readonly object SyncObj = new object();
private static ClientInfoCache _instance;
public static ClientInfoCache Instance
{
get
{
lock (SyncObj)
{
if (_instance == null)
_instance = new ClientInfoCache();
}
return _instance;
}
}
private ClientInfoCache()
{
_clientList = new List<ClientRegisterInfo>();
}
private List<ClientRegisterInfo> _clientList;
private static object SyncOperator = new object();
/// <summary>
/// Add client entity
/// </summary>
/// <param name="entity">client entity</param>
public void Add(ClientRegisterInfo entity)
{
if (entity == null) return;
lock (SyncOperator)
{
var findClient =
_clientList.FirstOrDefault(
t => t.ClientName.Equals(entity.ClientName, StringComparison.OrdinalIgnoreCase));
if (findClient == null)
_clientList.Add(entity);
else
{
findClient.NotifyCallBack = entity.NotifyCallBack;
}
}
}
/// <summary>
/// Remove client
/// </summary>
/// <param name="entity">Client entity</param>
public void Remove(ClientRegisterInfo entity)
{
lock (SyncOperator)
{
_clientList.Remove(entity);
}
}
}
再建立個控制台運應程式來啟動Wcf,代碼如下:
public class Program
{
static void Main(string[] args)
{
StartListener();
}
private static void StartListener()
{
try
{
using (var host = new ServiceHost(typeof(GateWayService)))
{
host.Opened += delegate
{
Console.WriteLine("[Server] Begins to listen request on " + host.BaseAddresses[0]);
};
host.Open();
Console.Read();
}
}
catch (Exception ex)
{
}
}
}
在App.config設定配置如下:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<bindings>
<netTcpBinding>
<binding name="longTimeoutBinding" closeTimeout="01:10:00" openTimeout="01:10:00"
receiveTimeout="10:10:00" sendTimeout="10:10:00" maxBufferPoolSize="655350000"
maxBufferSize="655350000" maxReceivedMessageSize="655350000">
<readerQuotas maxDepth="32" maxStringContentLength="655350000"
maxArrayLength="655350000" maxBytesPerRead="655350000" maxNameTableCharCount="655350000" />
<reliableSession inactivityTimeout="23:59:59" />
<security mode="None" />
</binding>
</netTcpBinding>
</bindings>
<behaviors>
<serviceBehaviors>
<behavior name="NewBehavior">
<serviceMetadata httpGetEnabled="True" httpGetUrl="Http://localhost:7789/" httpsGetEnabled="True"/>
<serviceDebug includeExceptionDetailInFaults="False" />
<serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="1000" maxConcurrentInstances="1000" />
</behavior>
</serviceBehaviors>
</behaviors>
<services>
<service name="WcfService.GateWayService" behaviorConfiguration="NewBehavior" >
<endpoint address="net.tcp://localhost:7788/GatewayService.svc" binding="netTcpBinding" contract="WcfService.IGateWayService" name="WcfService_GateWayService" bindingConfiguration="longTimeoutBinding" >
</endpoint>
<endpoint address="mex" binding="mexTcpBinding" contract="IMetadataExchange" ></endpoint>
<host >
<baseAddresses >
<add baseAddress="net.tcp://localhost:7788/GatewayService.svc" />
<add baseAddress="Http://localhost:7789/" />
</baseAddresses>
</host >
</service>
</services>
</system.serviceModel>
</configuration>
longTimeoutBinding是設定傳輸的屬性,如最大傳輸大小,TimeOut的時間等。
在用戶端建立個WcfCallBack.cs 繼承IGateWayServiceCallback接口,代碼如下。
[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
public class WcfCallBack : IGateWayServiceCallback
{
public void NotifyFunction(string sender)
{
Console.WriteLine("Get a message,message info is {0}", sender);
}
}
設定屬性[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]表示伺服器是通過并發的給用戶端來發送消息的。
控制台代碼如下
class Program
{
private static GateWayServiceClient _client;
static void Main(string[] args)
{
var cb = new WcfCallBack();
var context = new InstanceContext(cb);
_client = new GateWayServiceClient(context);
_client.RegisterClient("Test1");
((ICommunicationObject)_client).Closed += OnChannelClose;
((ICommunicationObject)_client).Faulted += OnChannelFaulted;
Console.WriteLine("Input Q to exit.");
while (string.Compare(Console.ReadLine(), ConsoleKey.Q.ToString(), StringComparison.OrdinalIgnoreCase) != 0)
{
}
}
private static void OnChannelFaulted(object sender, EventArgs e)
{
if (FaultedEvent != null)
FaultedEvent(sender, e);
}
private static void OnChannelClose(object sender, EventArgs e)
{
if (CloseEvent != null)
CloseEvent(sender, e);
}
public static EventHandler CloseEvent;
public static EventHandler FaultedEvent;
}
運作的結果如下圖:
當我關閉用戶端時,能捕捉到Closed和Faulted事件
當我關閉服務端時,在用戶端能捕捉到Faulted事件
總結:
Wcf 通信使用簡單,功能豐富。