netty作為一個高性能的異步網絡開發架構,可以作為各種服務的開發架構。
前段時間的一個項目涉及到硬體裝置實時資料的采集,采用netty作為采集服務的實作架構,同時使用rabbitmq作為采集服務和各個其他子產品的通信消息隊列,整個服務架構圖如下:
将業務代碼和實際協定解析部分的代碼抽離,得到以上一個簡單的設計圖,代碼開源在github上,簡單介紹下nettymqserver采集服務涉及到的幾個關鍵技術點:
1、裝置tcp消息解析:
nettymqserver和采集裝置device之間采用tcp通信,tcp消息的解析可以使用lengthfieldbasedframedecoder(消息頭和消息體),可以有效的解決tcp消息“粘包”問題。
消息包解析圖如下:bytes length field at offset 0, do not strip header, the length field represents the length of the whole message
代碼中消息長度的存儲采用了4個位元組,采用lengthfieldbasedframedecoder(65535,0,4,-4,0)解碼,netty會從接收的資料中頭4個位元組中得到消息的長度,進而得到一個tcp消息包。
2、給裝置發消息:
首先在連接配接建立時,要保留tcp的連接配接:
在每次一個channel active(連接配接建立)的時候用channelgroup儲存這個channel連接配接,當需要給某個裝置發消息的時候,可以周遊該channelgroup,找到對應的channel,給該channel發送消息:
這裡是給所有的連接配接的裝置都發。當連接配接斷開的時候,channelgroup會自動remove掉這個連接配接,不需要我們手動管理。
3、心跳檢測
當某個裝置device由于斷電或是其他原因導緻裝置不正常無法采集資料,netty服務端需要知道該裝置是否在正常工作,可以使用netty的idlestatehandler,示例代碼如下:
上面設定3分鐘沒有讀到資料,則觸發一個reader_idle事件。
4、rabbitmq消息接收與發送
nettymqserver消息發送采用了spring amqp,隻需要在配置檔案中簡單配置一下,就可以友善使用。
nettymqserver消息接收同樣可以采用spring amqp,但由于對spring相關的配置不是很熟悉,為了更靈活的使用mq,這裡使用了rabbitmq client java api來實作:
以上代碼從一個queue中讀取資料,為了有效處理資料,防止異常資料丢失,使用了手動ack。
參考:
http://netty.io/
http://netty.io/4.0/api/io/netty/handler/codec/lengthfieldbasedframedecoder.html
http://netty.io/4.0/api/io/netty/handler/timeout/idlestatehandler.html
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。