4 基于RoundTrip(往返)的通讯协议设计
通讯服务器插件的核心为3部分:(1)与通讯方式、业务逻辑无关的通讯协议实现;(2)和通讯方式、业务逻辑有关的通讯业务逻辑的实现;(3)远程通讯消息队列。在这里我将重点描述通讯协议的实现。这个通讯协议的实现比较灵巧。
4.1 通讯协议基本单元——消息
通讯协议的通讯单元是消息,以下是来自硬件开发工程师编写的协议,消息包由前导符、起始符、消息头、校验码、消息体、结束符等部分组成。不同的通讯指令,发出的消息和接收到消息均不相同。

通讯协议必须能够发出正确的消息和解析响应的消息包,此外,硬件能接受的消息是字节格式,而通讯服务器软件能够正确识别的则是各个有意义的字段。为此,我们为消息设计了如下的基类。消息较小的单元是一个MessagePart,它提供了ToContent和ToMessage方法分别用于转换成字节码和字符串,此外,它还定义了TryParse方法用于将字节码解析成有意义的MessagePart对象。这里定义了ParseMessageException异常,当消息解析失败时,抛出该异常。下面是消息头、消息体以及消息基类的定义。消息基类由前缀、起始、头、体和后缀部分组成。
接着我们根据硬件开发工程师提供的SCATA 3.0协议,定义与通讯协议相关的消息基类Scata30Message。这个消息提供了一个默认的消息头的实现,但是消息体则需要根据指令进一步实现。下图是通讯协议涉及的大部分消息体的实现,消息体基本都是一对的,即消息体和响应消息体。
消息体一般是指服务器发出给硬件的指令,这样的消息体需要构造所有的字段,并要实现ToContent方法,将消息转换成字节码,发送给硬件;而响应消息一般是由硬件发送给服务器的消息,它至少需要实现TryParse方法,将硬件字节码解析成有意义的字段,供业务逻辑层访问。
下面是一个消息的定义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<code>using</code> <code>System;</code>
<code>using</code> <code>System.Collections.Generic;</code>
<code>using</code> <code>System.Linq;</code>
<code>using</code> <code>System.Text;</code>
<code>using</code> <code>UIShell.CommServerService.Utility;</code>
<code>using</code> <code>System.ComponentModel;</code>
<code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30.Message</code>
<code>{</code>
<code> </code><code>[Description(</code><code>"读取单一表"</code><code>)]</code>
<code> </code><code>public</code> <code>class</code> <code>Scata30ReadMeterMessageBody : Scata30MessageBody</code>
<code> </code><code>{</code>
<code> </code><code>public</code> <code>byte</code> <code>MeterProtocolCategory;</code>
<code> </code><code>public</code> <code>byte</code> <code>Channel;</code>
<code> </code><code>public</code> <code>byte</code><code>[] MeterAddressBCD;</code>
<code> </code><code>public</code> <code>long</code> <code>MeterAddress;</code>
<code> </code><code>internal</code> <code>Scata30ReadMeterMessageBody()</code>
<code> </code><code>{</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>Scata30ReadMeterMessageBody(</code><code>byte</code> <code>meterProtocol, </code><code>byte</code> <code>channel,</code>
<code>long</code> <code>meterAddress)</code>
<code> </code><code>MeterProtocolCategory = meterProtocol;</code>
<code> </code><code>Channel = channel;</code>
<code> </code><code>MeterAddress = meterAddress;</code>
<code> </code><code>MeterAddressBCD = ProtocolUtility.MeterAddressFromLong(meterAddress, </code><code>true</code><code>);</code>
<code> </code><code>protected</code> <code>override</code> <code>bool</code> <code>TryParseWithoutCheckCode(</code><code>byte</code><code>[] bodyContent)</code>
<code> </code><code>throw</code> <code>new</code> <code>NotImplementedException();</code>
<code> </code><code>protected</code> <code>override</code> <code>byte</code><code>[] ToContentWithoutCheckCode()</code>
<code> </code><code>return</code> <code>new</code> <code>byte</code><code>[] {</code>
<code>MeterProtocolCategory, Channel }.Concat(MeterAddressBCD).ToArray();</code>
<code> </code><code>public</code> <code>override</code> <code>string</code> <code>ToString()</code>
<code> </code><code>return</code> <code>string</code><code>.Format(</code><code>"协议类型={0},通道号={1},表地址={2}"</code><code>,</code>
<code>MeterProtocolCategory, Channel, MeterAddress);</code>
<code> </code><code>}</code>
<code>}</code>
下面则是响应消息的实现。
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<code> </code><code>[Description(</code><code>"读取单一表响应"</code><code>)]</code>
<code> </code><code>public</code> <code>class</code> <code>Scata30ReadMeterResponseMessageBody : Scata30MessageBody</code>
<code> </code><code>public</code> <code>Scata30ResponseStatus ResponseStatus;</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 表数据,同读取多表的数据类似。</code>
<code> </code><code>/// </summary></code>
<code> </code><code>public</code> <code>byte</code><code>[] MeterBodyContent;</code>
<code> </code><code>public</code> <code>Scata30ReadMeterResponseMessageBody()</code>
<code> </code>
<code> </code><code>if</code> <code>(bodyContent == </code><code>null</code> <code>|| bodyContent.Length == 0)</code>
<code> </code><code>{</code>
<code> </code><code>_log.Error(</code><code>string</code><code>.Format(UIShell.CommServerService.Properties.Resources.</code>
<code>ParseMessageBodyFailed, ProtocolUtility.BytesToHexString(bodyContent)));</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>if</code> <code>(bodyContent.Length == 1)</code>
<code> </code><code>if</code> <code>(bodyContent[0] != (</code><code>byte</code><code>)Scata30ResponseStatus.Failed)</code>
<code> </code><code>{</code>
<code>_log.Error(</code><code>string</code><code>.Format(UIShell.CommServerService.Properties.Resources.</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>else</code>
<code> </code><code>ResponseStatus = (Scata30ResponseStatus)bodyContent[0];</code>
<code> </code><code>else</code>
<code> </code><code>ResponseStatus = Scata30ResponseStatus.Success;</code>
<code> </code><code>MeterBodyContent = bodyContent;</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>if</code> <code>(ResponseStatus == Scata30ResponseStatus.Failed)</code>
<code> </code><code>return</code> <code>new</code> <code>byte</code><code>[] { (</code><code>byte</code><code>)ResponseStatus };</code>
<code> </code><code>return</code> <code>MeterBodyContent;</code>
<code> </code><code>return</code> <code>string</code><code>.Format(</code><code>"状态={0},表数据={1}"</code><code>,</code>
<code>EnumDescriptionHelper.GetDescription(ResponseStatus), ProtocolUtility.BytesToHexString(MeterBodyContent));</code>
4.2 通讯协议的组成——RoundTrip(往返)
通讯服务器与硬件的通讯过程是由一组的对话来实现的,每一组对话都是问答式的方式来完成。我们把一次问答式的对话用RoundTripBase这个类型来表示。问答式的对话又分成主动式(ActiveRoundTrip)和被动式(PassiveRoundTrip),即服务器发起然后硬件响应,或者硬件发起服务器响应。有时,一次问答式的对话可能需要由若干组的子对话来实现,我们称其为组合对话(CompositeRoundTripBase)。有关通讯协议对话过程涉及的基类设计如下。
对话RoundTripBase的详细设计如下所示,它由优先级、时间戳属性组成,提供了Start方法表示会话开始,以及OnCompleted和OnError事件。RoundTripQueue则是对话队列,它严格限制通讯协议每次只能执行一个RoundTrip,不能交叉运行,这个RoundTripQueue是一个线程安全的,因为通讯协议会被远程通讯线程、协议线程、UI线程等线程来访问。
4.3 协议的RoundTrip实现
在本系统中,我们使用SCATA 3.0通讯协议,这里我们实现了2个基类:Scata30ActiveRoundTrip和Scata30PassiveRoundTrip。
在Scata30ActiveRoundTrip中,它在Start方法中,将利用StreamAdapter来从通讯信道中获取一条消息,一旦消息解析成功后,将发送响应消息包。这个对话,一旦中间发生错误或者超时,将重试若干次。同理,Scata30PassiveRoundTrip也是如此实现。
接下来,我们根据通讯协议,定义了如下的对话。
下面我们来看一个对话的实现。
<code> </code><code>using</code> <code>System.Collections.Generic;</code>
<code> </code><code>using</code> <code>System.Linq;</code>
<code> </code><code>using</code> <code>System.Text;</code>
<code> </code><code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.Message;</code>
<code> </code><code>using</code> <code>UIShell.CommServerService.Utility;</code>
<code> </code><code>using</code> <code>System.ComponentModel;</code>
<code> </code>
<code> </code><code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30.RoundTrip</code>
<code> </code><code>{</code>
<code> </code><code>[Description(</code><code>"读取指定时间点表数据"</code><code>)]</code>
<code> </code><code>public</code> <code>class</code> <code>Scata30ReadHistoricalMeterRoundTrip : Scata30HasNextActiveRoundTrip<Scata30ReadHistoricalMeterMessageBody, Scata30ReadHistoricalMeterResponseMessageBody></code>
<code> </code><code>{</code>
<code> </code><code>public</code> <code>DateTime HistoricalDateTime;</code>
<code> </code><code>public</code> <code>Scata30ReadHistoricalMeterRoundTrip(</code>
<code> </code><code>ushort</code> <code>destinationAddress,</code>
<code> </code><code>ushort</code> <code>destinationZigbeeAddress,</code>
<code> </code><code>DateTime timeStamp,</code>
<code> </code><code>Scata30Protocol protocol)</code>
<code> </code><code>: </code><code>base</code><code>(destinationAddress, destinationZigbeeAddress, </code><code>new</code> <code>Scata30Message<Scata30ReadHistoricalMeterMessageBody>(Scata30MessageType.ReadMeterByDate, protocol.MasterStationAddress, destinationAddress, 0, DateTime.Now, </code><code>new</code> <code>Scata30ReadHistoricalMeterMessageBody(timeStamp)), Scata30MessageType.ReadMeterByDateResponse, protocol)</code>
<code> </code><code>{</code>
<code> </code><code>HistoricalDateTime = timeStamp;</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>override</code> <code>void</code> <code>ReceiveResponseMessages()</code>
<code> </code><code>base</code><code>.ReceiveResponseMessages();</code>
<code> </code><code>foreach</code> <code>(</code><code>var</code> <code>message </code><code>in</code> <code>ReceivedResponseMessages)</code>
<code> </code><code>{</code>
<code> </code><code>if</code> <code>(!message.Body.HistoricalDateTime.Equals(HistoricalDateTime))</code>
<code> </code><code>{</code>
<code> </code><code>_log.Error(</code><code>string</code><code>.Format(</code><code>"Read the historical meter content error since the date time mismatched. The require date time is '{0}', return by concentrator is '{1}'"</code><code>, HistoricalDateTime.ToString(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>), message.Body.HistoricalDateTime.ToString(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>)));</code>
<code> </code><code>// throw new Exception("Parse message error since historical date time mismatched.");</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
4.4 通讯协议的实现
通讯协议的实现类图如下所示,由于通讯协议与通讯方式、业务逻辑无关,因此,在这里我们引入StreamAdapter和StreamProvider来屏蔽这些上下文。StreamAdapter的功能是获取一条消息和发送一条消息,StreamProvider则是为不同通讯方式提供通讯流。
下面我来描述协议类的关键实现。协议类内部有一个线程来实现与硬件的通讯。这个线程会一直运行,然后从对话队列中不停获取RoundTrip,一旦获取的RoundTrip不会空,则运行这个RoundTrip,否则线程进入休眠状态。
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
<code>public</code> <code>bool</code> <code>Start()</code>
<code> </code><code>if</code> <code>(_started)</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>FireOnStarting();</code>
<code> </code><code>try</code>
<code> </code><code>CommStreamProvider.Start();</code>
<code> </code><code>catch</code> <code>(Exception ex)</code>
<code> </code><code>_log.Error(</code><code>"Start the communication provider failed."</code><code>, ex);</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>_thread = </code><code>new</code> <code>Thread(() => {</code>
<code> </code><code>RoundTripBase roundTrip;</code>
<code> </code><code>while</code> <code>(!_exited)</code>
<code> </code><code>Monitor.Enter(_queue.SyncRoot);</code>
<code> </code><code>roundTrip = Dequeue();</code>
<code> </code><code>if</code> <code>(roundTrip != </code><code>null</code><code>)</code>
<code> </code><code>try</code>
<code> </code><code>Monitor.Exit(_queue.SyncRoot);</code>
<code> </code><code>OnRoundTripStartingHandler(</code><code>this</code><code>,</code>
<code>new</code> <code>RoundTripEventArgs() { RoundTrip = roundTrip });</code>
<code> </code><code>roundTrip.Start();</code>
<code> </code><code>catch</code> <code>(ThreadAbortException)</code>
<code> </code><code>Trace(</code><code>"通讯线程被终止。"</code><code>);</code>
<code> </code><code>throw</code><code>;</code>
<code> </code><code>catch</code> <code>(Scata30StreamException ex) </code><code>// 无法获取Stream的时候,直接退出</code>
<code> </code><code>_exited = </code><code>true</code><code>;</code>
<code> </code><code>roundTrip.Trace(</code><code>"会话失败,因为:连接已经关闭。"</code><code>);</code>
<code> </code><code>catch</code> <code>(Exception ex)</code>
<code> </code><code>string</code> <code>error = GetErrorMessage(ex);</code>
<code> </code><code>roundTrip.Trace(</code><code>string</code><code>.Format(</code><code>"会话失败,因为:{0}。"</code><code>, error));</code>
<code> </code><code>if</code> <code>(!_exited)</code>
<code> </code><code>roundTrip.Trace(Environment.NewLine);</code>
<code> </code><code>OnRoundTripStartedHandler(</code><code>this</code><code>,</code>
<code> </code><code>// 1 将当前失败的RoundTrip保存入队</code>
<code> </code><code>FailedRoundTrips.Enqueue(roundTrip);</code>
<code> </code><code>// 2 保存其它没有处理的RoundTrip</code>
<code> </code><code>do</code>
<code> </code><code>{</code>
<code> </code><code>roundTrip = _queue.Dequeue();</code>
<code> </code><code>if</code> <code>(roundTrip != </code><code>null</code><code>)</code>
<code> </code><code>{</code>
<code> </code><code>FailedRoundTrips.Enqueue(roundTrip);</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>while</code> <code>(roundTrip != </code><code>null</code><code>);</code>
<code> </code><code>// 3 停止当前协议</code>
<code> </code><code>Stop();</code>
<code> </code><code>// 执行完RoundTrip后,开始清理资源</code>
<code> </code><code>roundTrip.Dispose();</code>
<code> </code><code>Monitor.Exit(_queue.SyncRoot);</code>
<code> </code><code>OnIdleHandler(</code><code>this</code><code>, </code><code>new</code> <code>RoundTripEventArgs());</code>
<code> </code><code>_autoResetEvent.WaitOne();</code>
<code> </code><code>});</code>
<code> </code><code>_thread.Start();</code>
<code> </code><code>_started = </code><code>true</code><code>;</code>
<code> </code><code>FireOnStarted();</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
执行对话,是以异步的方式来进行,通过事件进行通知。如下所示。
<code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.RoundTrip;</code>
<code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.Message;</code>
<code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30</code>
<code> </code><code>public</code> <code>partial</code> <code>class</code> <code>Scata30Protocol</code>
<code> </code><code>public</code> <code>Scata30SetConcentratorTimeRoundTrip SetConcentratorTime(</code>
<code> </code><code>ushort</code> <code>concentratorAddress,</code>
<code> </code><code>ushort</code> <code>concentratorZigbeeAddress,</code>
<code> </code><code>DateTime timeStamp,</code>
<code> </code><code>EventHandler<RoundTripEventArgs> onMessageSend,</code>
<code> </code><code>EventHandler<RoundTripEventArgs> onCompleted,</code>
<code> </code><code>EventHandler<RoundTripEventArgs> onError)</code>
<code> </code><code>var</code> <code>roundTrip = </code><code>new</code> <code>Scata30SetConcentratorTimeRoundTrip(</code>
<code> </code><code>concentratorAddress,</code>
<code> </code><code>concentratorZigbeeAddress,</code>
<code> </code><code>timeStamp,</code>
<code> </code><code>this</code><code>);</code>
<code> </code><code>if</code> <code>(onMessageSend != </code><code>null</code><code>)</code>
<code> </code><code>roundTrip.OnMessageSend += onMessageSend;</code>
<code> </code><code>if</code> <code>(onCompleted != </code><code>null</code><code>)</code>
<code> </code><code>roundTrip.OnCompleted += onCompleted;</code>
<code> </code><code>if</code> <code>(onError != </code><code>null</code><code>)</code>
<code> </code><code>roundTrip.OnError += onError;</code>
<code> </code><code>Enqueue(roundTrip);</code>
<code> </code><code>return</code> <code>roundTrip;</code>
这个通讯协议的实现非常优雅,在维护的过程中,通讯指令的变更和通讯方式的转变,都不需要再修改协议和RoundTrip本身,只需要对消息体进行变更并增加新的StreamProvider,并在上层的业务逻辑进行实现。关于通讯协议的设计,限于篇幅描述到这,在下文将继续描述。
本文转自道法自然博客园博客,原文链接:http://www.cnblogs.com/baihmpgy/archive/2012/12/27/2836122.html,如需转载请自行联系原作者