天天看點

基于Netty與RabbitMQ的消息服務

netty作為一個高性能的異步網絡開發架構,可以作為各種服務的開發架構。

前段時間的一個項目涉及到硬體裝置實時資料的采集,采用netty作為采集服務的實作架構,同時使用rabbitmq作為采集服務和各個其他子產品的通信消息隊列,整個服務架構圖如下:

基于Netty與RabbitMQ的消息服務

将業務代碼和實際協定解析部分的代碼抽離,得到以上一個簡單的設計圖,代碼開源在github上,簡單介紹下nettymqserver采集服務涉及到的幾個關鍵技術點:

1、裝置tcp消息解析:

nettymqserver和采集裝置device之間采用tcp通信,tcp消息的解析可以使用lengthfieldbasedframedecoder(消息頭和消息體),可以有效的解決tcp消息“粘包”問題。

消息包解析圖如下:bytes length field at offset 0, do not strip header, the length field represents the length of the whole message

代碼中消息長度的存儲采用了4個位元組,采用lengthfieldbasedframedecoder(65535,0,4,-4,0)解碼,netty會從接收的資料中頭4個位元組中得到消息的長度,進而得到一個tcp消息包。

2、給裝置發消息:

首先在連接配接建立時,要保留tcp的連接配接:

基于Netty與RabbitMQ的消息服務
基于Netty與RabbitMQ的消息服務

在每次一個channel active(連接配接建立)的時候用channelgroup儲存這個channel連接配接,當需要給某個裝置發消息的時候,可以周遊該channelgroup,找到對應的channel,給該channel發送消息:

這裡是給所有的連接配接的裝置都發。當連接配接斷開的時候,channelgroup會自動remove掉這個連接配接,不需要我們手動管理。

3、心跳檢測

當某個裝置device由于斷電或是其他原因導緻裝置不正常無法采集資料,netty服務端需要知道該裝置是否在正常工作,可以使用netty的idlestatehandler,示例代碼如下:

基于Netty與RabbitMQ的消息服務
基于Netty與RabbitMQ的消息服務

上面設定3分鐘沒有讀到資料,則觸發一個reader_idle事件。

4、rabbitmq消息接收與發送

nettymqserver消息發送采用了spring amqp,隻需要在配置檔案中簡單配置一下,就可以友善使用。

nettymqserver消息接收同樣可以采用spring amqp,但由于對spring相關的配置不是很熟悉,為了更靈活的使用mq,這裡使用了rabbitmq client java api來實作:

基于Netty與RabbitMQ的消息服務
基于Netty與RabbitMQ的消息服務

以上代碼從一個queue中讀取資料,為了有效處理資料,防止異常資料丢失,使用了手動ack。

參考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/lengthfieldbasedframedecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/idlestatehandler.html

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。