4 基于RoundTrip(往返)的通訊協定設計
通訊伺服器插件的核心為3部分:(1)與通訊方式、業務邏輯無關的通訊協定實作;(2)和通訊方式、業務邏輯有關的通訊業務邏輯的實作;(3)遠端通訊消息隊列。在這裡我将重點描述通訊協定的實作。這個通訊協定的實作比較靈巧。
4.1 通訊協定基本單元——消息
通訊協定的通訊單元是消息,以下是來自硬體開發工程師編寫的協定,消息包由前導符、起始符、消息頭、校驗碼、消息體、結束符等部分組成。不同的通訊指令,發出的消息和接收到消息均不相同。

通訊協定必須能夠發出正确的消息和解析響應的消息包,此外,硬體能接受的消息是位元組格式,而通訊伺服器軟體能夠正确識别的則是各個有意義的字段。為此,我們為消息設計了如下的基類。消息較小的單元是一個MessagePart,它提供了ToContent和ToMessage方法分别用于轉換成位元組碼和字元串,此外,它還定義了TryParse方法用于将位元組碼解析成有意義的MessagePart對象。這裡定義了ParseMessageException異常,當消息解析失敗時,抛出該異常。下面是消息頭、消息體以及消息基類的定義。消息基類由字首、起始、頭、體和字尾部分組成。
接着我們根據硬體開發工程師提供的SCATA 3.0協定,定義與通訊協定相關的消息基類Scata30Message。這個消息提供了一個預設的消息頭的實作,但是消息體則需要根據指令進一步實作。下圖是通訊協定涉及的大部分消息體的實作,消息體基本都是一對的,即消息體和響應消息體。
消息體一般是指伺服器發出給硬體的指令,這樣的消息體需要構造所有的字段,并要實作ToContent方法,将消息轉換成位元組碼,發送給硬體;而響應消息一般是由硬體發送給伺服器的消息,它至少需要實作TryParse方法,将硬體位元組碼解析成有意義的字段,供業務邏輯層通路。
下面是一個消息的定義。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<code>using</code> <code>System;</code>
<code>using</code> <code>System.Collections.Generic;</code>
<code>using</code> <code>System.Linq;</code>
<code>using</code> <code>System.Text;</code>
<code>using</code> <code>UIShell.CommServerService.Utility;</code>
<code>using</code> <code>System.ComponentModel;</code>
<code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30.Message</code>
<code>{</code>
<code> </code><code>[Description(</code><code>"讀取單一表"</code><code>)]</code>
<code> </code><code>public</code> <code>class</code> <code>Scata30ReadMeterMessageBody : Scata30MessageBody</code>
<code> </code><code>{</code>
<code> </code><code>public</code> <code>byte</code> <code>MeterProtocolCategory;</code>
<code> </code><code>public</code> <code>byte</code> <code>Channel;</code>
<code> </code><code>public</code> <code>byte</code><code>[] MeterAddressBCD;</code>
<code> </code><code>public</code> <code>long</code> <code>MeterAddress;</code>
<code> </code><code>internal</code> <code>Scata30ReadMeterMessageBody()</code>
<code> </code><code>{</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>Scata30ReadMeterMessageBody(</code><code>byte</code> <code>meterProtocol, </code><code>byte</code> <code>channel,</code>
<code>long</code> <code>meterAddress)</code>
<code> </code><code>MeterProtocolCategory = meterProtocol;</code>
<code> </code><code>Channel = channel;</code>
<code> </code><code>MeterAddress = meterAddress;</code>
<code> </code><code>MeterAddressBCD = ProtocolUtility.MeterAddressFromLong(meterAddress, </code><code>true</code><code>);</code>
<code> </code><code>protected</code> <code>override</code> <code>bool</code> <code>TryParseWithoutCheckCode(</code><code>byte</code><code>[] bodyContent)</code>
<code> </code><code>throw</code> <code>new</code> <code>NotImplementedException();</code>
<code> </code><code>protected</code> <code>override</code> <code>byte</code><code>[] ToContentWithoutCheckCode()</code>
<code> </code><code>return</code> <code>new</code> <code>byte</code><code>[] {</code>
<code>MeterProtocolCategory, Channel }.Concat(MeterAddressBCD).ToArray();</code>
<code> </code><code>public</code> <code>override</code> <code>string</code> <code>ToString()</code>
<code> </code><code>return</code> <code>string</code><code>.Format(</code><code>"協定類型={0},通道号={1},表位址={2}"</code><code>,</code>
<code>MeterProtocolCategory, Channel, MeterAddress);</code>
<code> </code><code>}</code>
<code>}</code>
下面則是響應消息的實作。
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<code> </code><code>[Description(</code><code>"讀取單一表響應"</code><code>)]</code>
<code> </code><code>public</code> <code>class</code> <code>Scata30ReadMeterResponseMessageBody : Scata30MessageBody</code>
<code> </code><code>public</code> <code>Scata30ResponseStatus ResponseStatus;</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 表資料,同讀取多表的資料類似。</code>
<code> </code><code>/// </summary></code>
<code> </code><code>public</code> <code>byte</code><code>[] MeterBodyContent;</code>
<code> </code><code>public</code> <code>Scata30ReadMeterResponseMessageBody()</code>
<code> </code>
<code> </code><code>if</code> <code>(bodyContent == </code><code>null</code> <code>|| bodyContent.Length == 0)</code>
<code> </code><code>{</code>
<code> </code><code>_log.Error(</code><code>string</code><code>.Format(UIShell.CommServerService.Properties.Resources.</code>
<code>ParseMessageBodyFailed, ProtocolUtility.BytesToHexString(bodyContent)));</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>if</code> <code>(bodyContent.Length == 1)</code>
<code> </code><code>if</code> <code>(bodyContent[0] != (</code><code>byte</code><code>)Scata30ResponseStatus.Failed)</code>
<code> </code><code>{</code>
<code>_log.Error(</code><code>string</code><code>.Format(UIShell.CommServerService.Properties.Resources.</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>else</code>
<code> </code><code>ResponseStatus = (Scata30ResponseStatus)bodyContent[0];</code>
<code> </code><code>else</code>
<code> </code><code>ResponseStatus = Scata30ResponseStatus.Success;</code>
<code> </code><code>MeterBodyContent = bodyContent;</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>if</code> <code>(ResponseStatus == Scata30ResponseStatus.Failed)</code>
<code> </code><code>return</code> <code>new</code> <code>byte</code><code>[] { (</code><code>byte</code><code>)ResponseStatus };</code>
<code> </code><code>return</code> <code>MeterBodyContent;</code>
<code> </code><code>return</code> <code>string</code><code>.Format(</code><code>"狀态={0},表資料={1}"</code><code>,</code>
<code>EnumDescriptionHelper.GetDescription(ResponseStatus), ProtocolUtility.BytesToHexString(MeterBodyContent));</code>
4.2 通訊協定的組成——RoundTrip(往返)
通訊伺服器與硬體的通訊過程是由一組的對話來實作的,每一組對話都是問答式的方式來完成。我們把一次問答式的對話用RoundTripBase這個類型來表示。問答式的對話又分成主動式(ActiveRoundTrip)和被動式(PassiveRoundTrip),即伺服器發起然後硬體響應,或者硬體發起伺服器響應。有時,一次問答式的對話可能需要由若幹組的子對話來實作,我們稱其為組合對話(CompositeRoundTripBase)。有關通訊協定對話過程涉及的基類設計如下。
對話RoundTripBase的詳細設計如下所示,它由優先級、時間戳屬性組成,提供了Start方法表示會話開始,以及OnCompleted和OnError事件。RoundTripQueue則是對話隊列,它嚴格限制通訊協定每次隻能執行一個RoundTrip,不能交叉運作,這個RoundTripQueue是一個線程安全的,因為通訊協定會被遠端通訊線程、協定線程、UI線程等線程來通路。
4.3 協定的RoundTrip實作
在本系統中,我們使用SCATA 3.0通訊協定,這裡我們實作了2個基類:Scata30ActiveRoundTrip和Scata30PassiveRoundTrip。
在Scata30ActiveRoundTrip中,它在Start方法中,将利用StreamAdapter來從通訊信道中擷取一條消息,一旦消息解析成功後,将發送響應消息包。這個對話,一旦中間發生錯誤或者逾時,将重試若幹次。同理,Scata30PassiveRoundTrip也是如此實作。
接下來,我們根據通訊協定,定義了如下的對話。
下面我們來看一個對話的實作。
<code> </code><code>using</code> <code>System.Collections.Generic;</code>
<code> </code><code>using</code> <code>System.Linq;</code>
<code> </code><code>using</code> <code>System.Text;</code>
<code> </code><code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.Message;</code>
<code> </code><code>using</code> <code>UIShell.CommServerService.Utility;</code>
<code> </code><code>using</code> <code>System.ComponentModel;</code>
<code> </code>
<code> </code><code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30.RoundTrip</code>
<code> </code><code>{</code>
<code> </code><code>[Description(</code><code>"讀取指定時間點表資料"</code><code>)]</code>
<code> </code><code>public</code> <code>class</code> <code>Scata30ReadHistoricalMeterRoundTrip : Scata30HasNextActiveRoundTrip<Scata30ReadHistoricalMeterMessageBody, Scata30ReadHistoricalMeterResponseMessageBody></code>
<code> </code><code>{</code>
<code> </code><code>public</code> <code>DateTime HistoricalDateTime;</code>
<code> </code><code>public</code> <code>Scata30ReadHistoricalMeterRoundTrip(</code>
<code> </code><code>ushort</code> <code>destinationAddress,</code>
<code> </code><code>ushort</code> <code>destinationZigbeeAddress,</code>
<code> </code><code>DateTime timeStamp,</code>
<code> </code><code>Scata30Protocol protocol)</code>
<code> </code><code>: </code><code>base</code><code>(destinationAddress, destinationZigbeeAddress, </code><code>new</code> <code>Scata30Message<Scata30ReadHistoricalMeterMessageBody>(Scata30MessageType.ReadMeterByDate, protocol.MasterStationAddress, destinationAddress, 0, DateTime.Now, </code><code>new</code> <code>Scata30ReadHistoricalMeterMessageBody(timeStamp)), Scata30MessageType.ReadMeterByDateResponse, protocol)</code>
<code> </code><code>{</code>
<code> </code><code>HistoricalDateTime = timeStamp;</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>override</code> <code>void</code> <code>ReceiveResponseMessages()</code>
<code> </code><code>base</code><code>.ReceiveResponseMessages();</code>
<code> </code><code>foreach</code> <code>(</code><code>var</code> <code>message </code><code>in</code> <code>ReceivedResponseMessages)</code>
<code> </code><code>{</code>
<code> </code><code>if</code> <code>(!message.Body.HistoricalDateTime.Equals(HistoricalDateTime))</code>
<code> </code><code>{</code>
<code> </code><code>_log.Error(</code><code>string</code><code>.Format(</code><code>"Read the historical meter content error since the date time mismatched. The require date time is '{0}', return by concentrator is '{1}'"</code><code>, HistoricalDateTime.ToString(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>), message.Body.HistoricalDateTime.ToString(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>)));</code>
<code> </code><code>// throw new Exception("Parse message error since historical date time mismatched.");</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
4.4 通訊協定的實作
通訊協定的實作類圖如下所示,由于通訊協定與通訊方式、業務邏輯無關,是以,在這裡我們引入StreamAdapter和StreamProvider來屏蔽這些上下文。StreamAdapter的功能是擷取一條消息和發送一條消息,StreamProvider則是為不同通訊方式提供通訊流。
下面我來描述協定類的關鍵實作。協定類内部有一個線程來實作與硬體的通訊。這個線程會一直運作,然後從對話隊列中不停擷取RoundTrip,一旦擷取的RoundTrip不會空,則運作這個RoundTrip,否則線程進入休眠狀态。
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
<code>public</code> <code>bool</code> <code>Start()</code>
<code> </code><code>if</code> <code>(_started)</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>FireOnStarting();</code>
<code> </code><code>try</code>
<code> </code><code>CommStreamProvider.Start();</code>
<code> </code><code>catch</code> <code>(Exception ex)</code>
<code> </code><code>_log.Error(</code><code>"Start the communication provider failed."</code><code>, ex);</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>_thread = </code><code>new</code> <code>Thread(() => {</code>
<code> </code><code>RoundTripBase roundTrip;</code>
<code> </code><code>while</code> <code>(!_exited)</code>
<code> </code><code>Monitor.Enter(_queue.SyncRoot);</code>
<code> </code><code>roundTrip = Dequeue();</code>
<code> </code><code>if</code> <code>(roundTrip != </code><code>null</code><code>)</code>
<code> </code><code>try</code>
<code> </code><code>Monitor.Exit(_queue.SyncRoot);</code>
<code> </code><code>OnRoundTripStartingHandler(</code><code>this</code><code>,</code>
<code>new</code> <code>RoundTripEventArgs() { RoundTrip = roundTrip });</code>
<code> </code><code>roundTrip.Start();</code>
<code> </code><code>catch</code> <code>(ThreadAbortException)</code>
<code> </code><code>Trace(</code><code>"通訊線程被終止。"</code><code>);</code>
<code> </code><code>throw</code><code>;</code>
<code> </code><code>catch</code> <code>(Scata30StreamException ex) </code><code>// 無法擷取Stream的時候,直接退出</code>
<code> </code><code>_exited = </code><code>true</code><code>;</code>
<code> </code><code>roundTrip.Trace(</code><code>"會話失敗,因為:連接配接已經關閉。"</code><code>);</code>
<code> </code><code>catch</code> <code>(Exception ex)</code>
<code> </code><code>string</code> <code>error = GetErrorMessage(ex);</code>
<code> </code><code>roundTrip.Trace(</code><code>string</code><code>.Format(</code><code>"會話失敗,因為:{0}。"</code><code>, error));</code>
<code> </code><code>if</code> <code>(!_exited)</code>
<code> </code><code>roundTrip.Trace(Environment.NewLine);</code>
<code> </code><code>OnRoundTripStartedHandler(</code><code>this</code><code>,</code>
<code> </code><code>// 1 将目前失敗的RoundTrip儲存入隊</code>
<code> </code><code>FailedRoundTrips.Enqueue(roundTrip);</code>
<code> </code><code>// 2 儲存其它沒有處理的RoundTrip</code>
<code> </code><code>do</code>
<code> </code><code>{</code>
<code> </code><code>roundTrip = _queue.Dequeue();</code>
<code> </code><code>if</code> <code>(roundTrip != </code><code>null</code><code>)</code>
<code> </code><code>{</code>
<code> </code><code>FailedRoundTrips.Enqueue(roundTrip);</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>while</code> <code>(roundTrip != </code><code>null</code><code>);</code>
<code> </code><code>// 3 停止目前協定</code>
<code> </code><code>Stop();</code>
<code> </code><code>// 執行完RoundTrip後,開始清理資源</code>
<code> </code><code>roundTrip.Dispose();</code>
<code> </code><code>Monitor.Exit(_queue.SyncRoot);</code>
<code> </code><code>OnIdleHandler(</code><code>this</code><code>, </code><code>new</code> <code>RoundTripEventArgs());</code>
<code> </code><code>_autoResetEvent.WaitOne();</code>
<code> </code><code>});</code>
<code> </code><code>_thread.Start();</code>
<code> </code><code>_started = </code><code>true</code><code>;</code>
<code> </code><code>FireOnStarted();</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
執行對話,是以異步的方式來進行,通過事件進行通知。如下所示。
<code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.RoundTrip;</code>
<code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.Message;</code>
<code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30</code>
<code> </code><code>public</code> <code>partial</code> <code>class</code> <code>Scata30Protocol</code>
<code> </code><code>public</code> <code>Scata30SetConcentratorTimeRoundTrip SetConcentratorTime(</code>
<code> </code><code>ushort</code> <code>concentratorAddress,</code>
<code> </code><code>ushort</code> <code>concentratorZigbeeAddress,</code>
<code> </code><code>DateTime timeStamp,</code>
<code> </code><code>EventHandler<RoundTripEventArgs> onMessageSend,</code>
<code> </code><code>EventHandler<RoundTripEventArgs> onCompleted,</code>
<code> </code><code>EventHandler<RoundTripEventArgs> onError)</code>
<code> </code><code>var</code> <code>roundTrip = </code><code>new</code> <code>Scata30SetConcentratorTimeRoundTrip(</code>
<code> </code><code>concentratorAddress,</code>
<code> </code><code>concentratorZigbeeAddress,</code>
<code> </code><code>timeStamp,</code>
<code> </code><code>this</code><code>);</code>
<code> </code><code>if</code> <code>(onMessageSend != </code><code>null</code><code>)</code>
<code> </code><code>roundTrip.OnMessageSend += onMessageSend;</code>
<code> </code><code>if</code> <code>(onCompleted != </code><code>null</code><code>)</code>
<code> </code><code>roundTrip.OnCompleted += onCompleted;</code>
<code> </code><code>if</code> <code>(onError != </code><code>null</code><code>)</code>
<code> </code><code>roundTrip.OnError += onError;</code>
<code> </code><code>Enqueue(roundTrip);</code>
<code> </code><code>return</code> <code>roundTrip;</code>
這個通訊協定的實作非常優雅,在維護的過程中,通訊指令的變更和通訊方式的轉變,都不需要再修改協定和RoundTrip本身,隻需要對消息體進行變更并增加新的StreamProvider,并在上層的業務邏輯進行實作。關于通訊協定的設計,限于篇幅描述到這,在下文将繼續描述。
本文轉自道法自然部落格園部落格,原文連結:http://www.cnblogs.com/baihmpgy/archive/2012/12/27/2836122.html,如需轉載請自行聯系原作者