天天看點

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

4 基于RoundTrip(往返)的通訊協定設計

通訊伺服器插件的核心為3部分:(1)與通訊方式、業務邏輯無關的通訊協定實作;(2)和通訊方式、業務邏輯有關的通訊業務邏輯的實作;(3)遠端通訊消息隊列。在這裡我将重點描述通訊協定的實作。這個通訊協定的實作比較靈巧。

4.1 通訊協定基本單元——消息

通訊協定的通訊單元是消息,以下是來自硬體開發工程師編寫的協定,消息包由前導符、起始符、消息頭、校驗碼、消息體、結束符等部分組成。不同的通訊指令,發出的消息和接收到消息均不相同。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

通訊協定必須能夠發出正确的消息和解析響應的消息包,此外,硬體能接受的消息是位元組格式,而通訊伺服器軟體能夠正确識别的則是各個有意義的字段。為此,我們為消息設計了如下的基類。消息較小的單元是一個MessagePart,它提供了ToContent和ToMessage方法分别用于轉換成位元組碼和字元串,此外,它還定義了TryParse方法用于将位元組碼解析成有意義的MessagePart對象。這裡定義了ParseMessageException異常,當消息解析失敗時,抛出該異常。下面是消息頭、消息體以及消息基類的定義。消息基類由字首、起始、頭、體和字尾部分組成。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

接着我們根據硬體開發工程師提供的SCATA 3.0協定,定義與通訊協定相關的消息基類Scata30Message。這個消息提供了一個預設的消息頭的實作,但是消息體則需要根據指令進一步實作。下圖是通訊協定涉及的大部分消息體的實作,消息體基本都是一對的,即消息體和響應消息體。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

消息體一般是指伺服器發出給硬體的指令,這樣的消息體需要構造所有的字段,并要實作ToContent方法,将消息轉換成位元組碼,發送給硬體;而響應消息一般是由硬體發送給伺服器的消息,它至少需要實作TryParse方法,将硬體位元組碼解析成有意義的字段,供業務邏輯層通路。

下面是一個消息的定義。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

<code>using</code> <code>System;</code>

<code>using</code> <code>System.Collections.Generic;</code>

<code>using</code> <code>System.Linq;</code>

<code>using</code> <code>System.Text;</code>

<code>using</code> <code>UIShell.CommServerService.Utility;</code>

<code>using</code> <code>System.ComponentModel;</code>

<code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30.Message</code>

<code>{</code>

<code>    </code><code>[Description(</code><code>"讀取單一表"</code><code>)]</code>

<code>    </code><code>public</code> <code>class</code> <code>Scata30ReadMeterMessageBody : Scata30MessageBody</code>

<code>    </code><code>{</code>

<code>        </code><code>public</code> <code>byte</code> <code>MeterProtocolCategory;</code>

<code>        </code><code>public</code> <code>byte</code> <code>Channel;</code>

<code>        </code><code>public</code> <code>byte</code><code>[] MeterAddressBCD;</code>

<code>        </code><code>public</code> <code>long</code> <code>MeterAddress;</code>

<code>        </code><code>internal</code> <code>Scata30ReadMeterMessageBody()</code>

<code>        </code><code>{</code>

<code>        </code><code>}</code>

<code>        </code><code>public</code> <code>Scata30ReadMeterMessageBody(</code><code>byte</code> <code>meterProtocol, </code><code>byte</code> <code>channel,</code>

<code>long</code> <code>meterAddress)</code>

<code>            </code><code>MeterProtocolCategory = meterProtocol;</code>

<code>            </code><code>Channel = channel;</code>

<code>            </code><code>MeterAddress = meterAddress;</code>

<code>            </code><code>MeterAddressBCD = ProtocolUtility.MeterAddressFromLong(meterAddress, </code><code>true</code><code>);</code>

<code>        </code><code>protected</code> <code>override</code> <code>bool</code> <code>TryParseWithoutCheckCode(</code><code>byte</code><code>[] bodyContent)</code>

<code>            </code><code>throw</code> <code>new</code> <code>NotImplementedException();</code>

<code>        </code><code>protected</code> <code>override</code> <code>byte</code><code>[] ToContentWithoutCheckCode()</code>

<code>            </code><code>return</code> <code>new</code> <code>byte</code><code>[] {</code>

<code>MeterProtocolCategory, Channel }.Concat(MeterAddressBCD).ToArray();</code>

<code>        </code><code>public</code> <code>override</code> <code>string</code> <code>ToString()</code>

<code>            </code><code>return</code> <code>string</code><code>.Format(</code><code>"協定類型={0},通道号={1},表位址={2}"</code><code>,</code>

<code>MeterProtocolCategory, Channel, MeterAddress);</code>

<code>    </code><code>}</code>

<code>}</code>

  

下面則是響應消息的實作。

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

<code>    </code><code>[Description(</code><code>"讀取單一表響應"</code><code>)]</code>

<code>    </code><code>public</code> <code>class</code> <code>Scata30ReadMeterResponseMessageBody : Scata30MessageBody</code>

<code>        </code><code>public</code> <code>Scata30ResponseStatus ResponseStatus;</code>

<code>        </code><code>/// &lt;summary&gt;</code>

<code>        </code><code>/// 表資料,同讀取多表的資料類似。</code>

<code>        </code><code>/// &lt;/summary&gt;</code>

<code>        </code><code>public</code> <code>byte</code><code>[] MeterBodyContent;</code>

<code>        </code><code>public</code> <code>Scata30ReadMeterResponseMessageBody()</code>

<code>            </code> 

<code>            </code><code>if</code> <code>(bodyContent == </code><code>null</code> <code>|| bodyContent.Length == 0)</code>

<code>            </code><code>{</code>

<code>                </code><code>_log.Error(</code><code>string</code><code>.Format(UIShell.CommServerService.Properties.Resources.</code>

<code>ParseMessageBodyFailed, ProtocolUtility.BytesToHexString(bodyContent)));</code>

<code>                </code><code>return</code> <code>false</code><code>;</code>

<code>            </code><code>}</code>

<code>            </code><code>if</code> <code>(bodyContent.Length == 1)</code>

<code>                </code><code>if</code> <code>(bodyContent[0] != (</code><code>byte</code><code>)Scata30ResponseStatus.Failed)</code>

<code>                </code><code>{</code>

<code>_log.Error(</code><code>string</code><code>.Format(UIShell.CommServerService.Properties.Resources.</code>

<code>                    </code><code>return</code> <code>false</code><code>;</code>

<code>                </code><code>}</code>

<code>                </code><code>else</code>

<code>                    </code><code>ResponseStatus = (Scata30ResponseStatus)bodyContent[0];</code>

<code>            </code><code>else</code>

<code>                </code><code>ResponseStatus = Scata30ResponseStatus.Success;</code>

<code>                </code><code>MeterBodyContent = bodyContent;</code>

<code>            </code><code>return</code> <code>true</code><code>;</code>

<code>            </code><code>if</code> <code>(ResponseStatus == Scata30ResponseStatus.Failed)</code>

<code>                </code><code>return</code> <code>new</code> <code>byte</code><code>[] { (</code><code>byte</code><code>)ResponseStatus };</code>

<code>            </code><code>return</code> <code>MeterBodyContent;</code>

<code>            </code><code>return</code> <code>string</code><code>.Format(</code><code>"狀态={0},表資料={1}"</code><code>,</code>

<code>EnumDescriptionHelper.GetDescription(ResponseStatus), ProtocolUtility.BytesToHexString(MeterBodyContent));</code>

4.2 通訊協定的組成——RoundTrip(往返)

通訊伺服器與硬體的通訊過程是由一組的對話來實作的,每一組對話都是問答式的方式來完成。我們把一次問答式的對話用RoundTripBase這個類型來表示。問答式的對話又分成主動式(ActiveRoundTrip)和被動式(PassiveRoundTrip),即伺服器發起然後硬體響應,或者硬體發起伺服器響應。有時,一次問答式的對話可能需要由若幹組的子對話來實作,我們稱其為組合對話(CompositeRoundTripBase)。有關通訊協定對話過程涉及的基類設計如下。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

對話RoundTripBase的詳細設計如下所示,它由優先級、時間戳屬性組成,提供了Start方法表示會話開始,以及OnCompleted和OnError事件。RoundTripQueue則是對話隊列,它嚴格限制通訊協定每次隻能執行一個RoundTrip,不能交叉運作,這個RoundTripQueue是一個線程安全的,因為通訊協定會被遠端通訊線程、協定線程、UI線程等線程來通路。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

4.3 協定的RoundTrip實作

在本系統中,我們使用SCATA 3.0通訊協定,這裡我們實作了2個基類:Scata30ActiveRoundTrip和Scata30PassiveRoundTrip。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

在Scata30ActiveRoundTrip中,它在Start方法中,将利用StreamAdapter來從通訊信道中擷取一條消息,一旦消息解析成功後,将發送響應消息包。這個對話,一旦中間發生錯誤或者逾時,将重試若幹次。同理,Scata30PassiveRoundTrip也是如此實作。

接下來,我們根據通訊協定,定義了如下的對話。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

下面我們來看一個對話的實作。

<code> </code><code>using</code> <code>System.Collections.Generic;</code>

<code> </code><code>using</code> <code>System.Linq;</code>

<code> </code><code>using</code> <code>System.Text;</code>

<code> </code><code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.Message;</code>

<code> </code><code>using</code> <code>UIShell.CommServerService.Utility;</code>

<code> </code><code>using</code> <code>System.ComponentModel;</code>

<code> </code> 

<code> </code><code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30.RoundTrip</code>

<code> </code><code>{</code>

<code>     </code><code>[Description(</code><code>"讀取指定時間點表資料"</code><code>)]</code>

<code>     </code><code>public</code> <code>class</code> <code>Scata30ReadHistoricalMeterRoundTrip : Scata30HasNextActiveRoundTrip&lt;Scata30ReadHistoricalMeterMessageBody, Scata30ReadHistoricalMeterResponseMessageBody&gt;</code>

<code>     </code><code>{</code>

<code>         </code><code>public</code> <code>DateTime HistoricalDateTime;</code>

<code>         </code><code>public</code> <code>Scata30ReadHistoricalMeterRoundTrip(</code>

<code>             </code><code>ushort</code> <code>destinationAddress,</code>

<code>             </code><code>ushort</code> <code>destinationZigbeeAddress,</code>

<code>             </code><code>DateTime timeStamp,</code>

<code>             </code><code>Scata30Protocol protocol)</code>

<code>             </code><code>: </code><code>base</code><code>(destinationAddress, destinationZigbeeAddress, </code><code>new</code> <code>Scata30Message&lt;Scata30ReadHistoricalMeterMessageBody&gt;(Scata30MessageType.ReadMeterByDate, protocol.MasterStationAddress, destinationAddress, 0, DateTime.Now, </code><code>new</code> <code>Scata30ReadHistoricalMeterMessageBody(timeStamp)), Scata30MessageType.ReadMeterByDateResponse, protocol)</code>

<code>         </code><code>{</code>

<code>             </code><code>HistoricalDateTime = timeStamp;</code>

<code>         </code><code>}</code>

<code>         </code><code>public</code> <code>override</code> <code>void</code> <code>ReceiveResponseMessages()</code>

<code>             </code><code>base</code><code>.ReceiveResponseMessages();</code>

<code>             </code><code>foreach</code> <code>(</code><code>var</code> <code>message </code><code>in</code> <code>ReceivedResponseMessages)</code>

<code>             </code><code>{</code>

<code>                 </code><code>if</code> <code>(!message.Body.HistoricalDateTime.Equals(HistoricalDateTime))</code>

<code>                 </code><code>{</code>

<code>                     </code><code>_log.Error(</code><code>string</code><code>.Format(</code><code>"Read the historical meter content error since the date time mismatched. The require date time is '{0}', return by concentrator is '{1}'"</code><code>, HistoricalDateTime.ToString(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>), message.Body.HistoricalDateTime.ToString(</code><code>"yyyy-MM-dd HH:mm:ss"</code><code>)));</code>

<code>                     </code><code>// throw new Exception("Parse message error since historical date time mismatched.");</code>

<code>                 </code><code>}</code>

<code>             </code><code>}</code>

<code>     </code><code>}</code>

<code> </code><code>}</code>

4.4 通訊協定的實作

通訊協定的實作類圖如下所示,由于通訊協定與通訊方式、業務邏輯無關,是以,在這裡我們引入StreamAdapter和StreamProvider來屏蔽這些上下文。StreamAdapter的功能是擷取一條消息和發送一條消息,StreamProvider則是為不同通訊方式提供通訊流。

分享一個與硬體通訊的分布式監控與遠端控制程式的設計(中:通訊協定設計與實作)

下面我來描述協定類的關鍵實作。協定類内部有一個線程來實作與硬體的通訊。這個線程會一直運作,然後從對話隊列中不停擷取RoundTrip,一旦擷取的RoundTrip不會空,則運作這個RoundTrip,否則線程進入休眠狀态。

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

<code>public</code> <code>bool</code> <code>Start()</code>

<code>    </code><code>if</code> <code>(_started)</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>FireOnStarting();</code>

<code>    </code><code>try</code>

<code>        </code><code>CommStreamProvider.Start();</code>

<code>    </code><code>catch</code> <code>(Exception ex)</code>

<code>        </code><code>_log.Error(</code><code>"Start the communication provider failed."</code><code>, ex);</code>

<code>        </code><code>return</code> <code>false</code><code>;</code>

<code>    </code><code>_thread = </code><code>new</code> <code>Thread(() =&gt; {</code>

<code>        </code><code>RoundTripBase roundTrip;</code>

<code>        </code><code>while</code> <code>(!_exited)</code>

<code>            </code><code>Monitor.Enter(_queue.SyncRoot);</code>

<code>            </code><code>roundTrip = Dequeue();</code>

<code>            </code><code>if</code> <code>(roundTrip != </code><code>null</code><code>)</code>

<code>                </code><code>try</code>

<code>                    </code><code>Monitor.Exit(_queue.SyncRoot);</code>

<code>                    </code><code>OnRoundTripStartingHandler(</code><code>this</code><code>,</code>

<code>new</code> <code>RoundTripEventArgs() { RoundTrip = roundTrip });</code>

<code>                    </code><code>roundTrip.Start();</code>

<code>                </code><code>catch</code> <code>(ThreadAbortException)</code>

<code>                    </code><code>Trace(</code><code>"通訊線程被終止。"</code><code>);</code>

<code>                    </code><code>throw</code><code>;</code>

<code>                </code><code>catch</code> <code>(Scata30StreamException ex) </code><code>// 無法擷取Stream的時候,直接退出</code>

<code>                    </code><code>_exited = </code><code>true</code><code>;</code>

<code>                    </code><code>roundTrip.Trace(</code><code>"會話失敗,因為:連接配接已經關閉。"</code><code>);</code>

<code>                </code><code>catch</code> <code>(Exception ex)</code>

<code>                    </code><code>string</code> <code>error = GetErrorMessage(ex);</code>

<code>                    </code><code>roundTrip.Trace(</code><code>string</code><code>.Format(</code><code>"會話失敗,因為:{0}。"</code><code>, error));</code>

<code>                </code><code>if</code> <code>(!_exited)</code>

<code>                    </code><code>roundTrip.Trace(Environment.NewLine);</code>

<code>                    </code><code>OnRoundTripStartedHandler(</code><code>this</code><code>,</code>

<code>                    </code><code>// 1 将目前失敗的RoundTrip儲存入隊</code>

<code>                    </code><code>FailedRoundTrips.Enqueue(roundTrip);</code>

<code>                    </code><code>// 2 儲存其它沒有處理的RoundTrip</code>

<code>                    </code><code>do</code>

<code>                    </code><code>{</code>

<code>                        </code><code>roundTrip = _queue.Dequeue();</code>

<code>                        </code><code>if</code> <code>(roundTrip != </code><code>null</code><code>)</code>

<code>                        </code><code>{</code>

<code>                            </code><code>FailedRoundTrips.Enqueue(roundTrip);</code>

<code>                        </code><code>}</code>

<code>                    </code><code>} </code><code>while</code> <code>(roundTrip != </code><code>null</code><code>);</code>

<code>                    </code><code>// 3 停止目前協定</code>

<code>                    </code><code>Stop();</code>

<code>                </code><code>// 執行完RoundTrip後,開始清理資源</code>

<code>                </code><code>roundTrip.Dispose();</code>

<code>                </code><code>Monitor.Exit(_queue.SyncRoot);</code>

<code>                </code><code>OnIdleHandler(</code><code>this</code><code>, </code><code>new</code> <code>RoundTripEventArgs());</code>

<code>                </code><code>_autoResetEvent.WaitOne();</code>

<code>    </code><code>});</code>

<code>    </code><code>_thread.Start();</code>

<code>    </code><code>_started = </code><code>true</code><code>;</code>

<code>    </code><code>FireOnStarted();</code>

<code>    </code><code>return</code> <code>true</code><code>;</code>

執行對話,是以異步的方式來進行,通過事件進行通知。如下所示。

<code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.RoundTrip;</code>

<code>using</code> <code>UIShell.CommServerService.Protocol.Scata30.Message;</code>

<code>namespace</code> <code>UIShell.CommServerService.Protocol.Scata30</code>

<code>    </code><code>public</code> <code>partial</code> <code>class</code> <code>Scata30Protocol</code>

<code>        </code><code>public</code> <code>Scata30SetConcentratorTimeRoundTrip SetConcentratorTime(</code>

<code>            </code><code>ushort</code> <code>concentratorAddress,</code>

<code>            </code><code>ushort</code> <code>concentratorZigbeeAddress,</code>

<code>            </code><code>DateTime timeStamp,</code>

<code>            </code><code>EventHandler&lt;RoundTripEventArgs&gt; onMessageSend,</code>

<code>            </code><code>EventHandler&lt;RoundTripEventArgs&gt; onCompleted,</code>

<code>            </code><code>EventHandler&lt;RoundTripEventArgs&gt; onError)</code>

<code>            </code><code>var</code> <code>roundTrip = </code><code>new</code> <code>Scata30SetConcentratorTimeRoundTrip(</code>

<code>                </code><code>concentratorAddress,</code>

<code>                </code><code>concentratorZigbeeAddress,</code>

<code>                </code><code>timeStamp,</code>

<code>                </code><code>this</code><code>);</code>

<code>            </code><code>if</code> <code>(onMessageSend != </code><code>null</code><code>)</code>

<code>                </code><code>roundTrip.OnMessageSend += onMessageSend;</code>

<code>            </code><code>if</code> <code>(onCompleted != </code><code>null</code><code>)</code>

<code>                </code><code>roundTrip.OnCompleted += onCompleted;</code>

<code>            </code><code>if</code> <code>(onError != </code><code>null</code><code>)</code>

<code>                </code><code>roundTrip.OnError += onError;</code>

<code>            </code><code>Enqueue(roundTrip);</code>

<code>            </code><code>return</code> <code>roundTrip;</code>

這個通訊協定的實作非常優雅,在維護的過程中,通訊指令的變更和通訊方式的轉變,都不需要再修改協定和RoundTrip本身,隻需要對消息體進行變更并增加新的StreamProvider,并在上層的業務邏輯進行實作。關于通訊協定的設計,限于篇幅描述到這,在下文将繼續描述。

本文轉自道法自然部落格園部落格,原文連結:http://www.cnblogs.com/baihmpgy/archive/2012/12/27/2836122.html,如需轉載請自行聯系原作者