天天看點

實戰Netty!基于私有協定,怎樣快速開發網絡通信服務

作者:Java機械師

私有協定

編寫目的

本文檔用于描述邊緣計算單元(以下簡稱邊緣盒)與上位機配置軟體(以下簡稱上位機)之間進行的資料互動通信協定。

通信方式

邊緣盒作為服務端,上位機作為用戶端,采用TCP/IP協定的socket連接配接,端口号預設為6000,資料包采用位元組的二進制資料傳輸。

資料包

標頭(10位元組) 負載(消息内容)

資料包由標頭和消息内容組成,標頭固定10個位元組,其内容如下:

标志(4) 負載長度(2) 協定版本号(1) 包類型(1) 校驗位(1) Reserve(1)

标志: 包的前導字元,固定CYRC;

負載長度: 負載位元組數(不包括標頭的長度);

協定版本号: 辨別通信協定的版本,初版值為0x10;

包類型: 辨別資料包的操作類型,具體見下表:

取值 含義 說明
1 查詢 上位機發送查詢消息。
2 設定 上位機發送設定消息。
3 查詢應答 邊緣盒對查詢請求的應答。
4 設定應答 邊緣盒對設定請求的應答。
5 訂閱 上位機發送給邊緣盒訂閱資料主動上報請求。
6 主動上報 邊緣盒主動向上位機發送資料。
7 心跳 上位機發送心跳消息
8 心跳應答 邊緣盒對心跳消息的應答
其他 保留

校驗位: 負載資料所有位元組之和;

Reserve: 預留,值填0;

包體負載(消息内容)表示具體的資料對象,其内容如下:

對象辨別(1) 對象資料内容(0…n)

對于查詢、心跳等包類型,包體負載(消息内容)隻需要對象辨別,對象資料内容省略。

對象辨別: 辨別資料表的操作對象,具體如下:

取值 含義 說明
心跳 上位機連接配接後間隔時間發送心跳消息給邊緣盒。

具體的協定内容就不做展示了,下面就開始服務的編寫。

服務開發

這裡我們開發一個上位機的配置軟體(用戶端),我們首先要來分析,怎麼對資料包進行編解碼,其實工作中,這個也是服務開發的核心所在,也是難點所在。

編寫消息類

public class MyProtocol
{
    /**
     * 消息的開頭的資訊标志
     */
    private String head = "CYRC";
    /**
     * 消息的長度
     */
    private int contentLength;
    /**
     * 消息的内容
     */
    private byte[] content;

    public MyProtocol(int contentLength, byte[] content)
    {
        this.contentLength = contentLength;
        this.content = content;
    }

    public String getHead()
    {
        return head;
    }

    public void setHead(String head)
    {
        this.head = head;
    }

    public int getContentLength()
    {
        return contentLength;
    }

    public void setContentLength(int contentLength)
    {
        this.contentLength = contentLength;
    }

    public byte[] getContent()
    {
        return content;
    }

    public void setContent(byte[] content)
    {
        this.content = content;
    }

    public String byteToHex(byte[] bytes, int cnt)
    {
        String strHex;
        StringBuilder sb = new StringBuilder();
        for (int n = 0; n < cnt; n++)
        {
            strHex = Integer.toHexString(bytes[n] & 0xFF);
            sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
            sb.append(" ");
        }
        return sb.toString().trim();
    }

    @Override
    public String toString()
    {
        return "MyProtocol [head=" + head + ", contentLength="
                + contentLength + ", content=" + byteToHex(content, contentLength) + "]";
    }
}
複制代碼           

MyDecoder解碼器

@Slf4j
public class MyDecoder extends ByteToMessageDecoder
{
    /**
     * <pre>
     * 協定開始的标準head_data,CYRC,占據4個位元組.
     * 表示資料的長度contentLength,占據2個位元組.
     * </pre>
     */
    public final int BASE_LENGTH = 10;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
    {
        if (buffer.readableBytes() >= BASE_LENGTH)
        {
            if (buffer.readableBytes() > 2048)
            {
                buffer.skipBytes(buffer.readableBytes());
            }

            // 記錄標頭開始的index
            int beginReader;
            //CYRC   43 59 52 43
            while (true)
            {
                // 擷取標頭開始的index
                beginReader = buffer.readerIndex();
                // 标記標頭開始的index
                buffer.markReaderIndex();
                // 讀到了協定的開始标志,結束while循環
                int head1 = buffer.readUnsignedShort();
                int head2 = buffer.readUnsignedShort();
                if (head1 == 17241 && head2 == 21059)
                {
                    break;
                }

                // 未讀到標頭,略過一個位元組
                // 每次略過,一個位元組,去讀取,標頭資訊的開始标記
                buffer.resetReaderIndex();
                buffer.readByte();
                // 當略過,一個位元組之後,資料包的長度,又變得不滿足
                // 此時,應該結束。等待後面的資料到達
                if (buffer.readableBytes() < BASE_LENGTH)
                {
                    return;
                }
            }

            // 消息的長度
            int length = buffer.readUnsignedShort() + 4;
            // 判斷請求資料包資料是否到齊
            if (buffer.readableBytes() < length)
            {
                // 還原讀指針
                buffer.readerIndex(beginReader);
                return;
            }

            // 讀取data資料
            byte[] data = new byte[length];
            buffer.readBytes(data);

            MyProtocol protocol = new MyProtocol(data.length, data);
            out.add(protocol);
        }
    }
}
複制代碼           

MyEncoder編碼器

public class MyEncoder extends MessageToByteEncoder<MyProtocol>
{
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf out) throws Exception
    {
        // 1.寫入消息的開頭的資訊标志(CYCR)
        out.writeBytes(myProtocol.getHead().getBytes());
        // 2.寫入消息的長度(負載長度)
        out.writeShort(myProtocol.getContentLength() - 4);
        // 3.寫入消息的内容(byte[]類型)
        out.writeBytes(myProtocol.getContent());
    }
}
複制代碼           

自定義ChannelInboundHandlerAdapter

@Slf4j
public class BootNettyClientChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter
{
    public BootNettyClientChannelInboundHandlerAdapter()
    {

    }

    /**
     * 從服務端收到新的資料時,這個方法會在收到消息時被調用
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        MyProtocol protocol = (MyProtocol) msg;
        log.info("接收到服務端的消息:" + protocol);
    }

    /**
     * 從服務端收到新的資料、讀取完成時調用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException
    {
        ctx.flush();
    }

    /**
     * 當出現 Throwable 對象才會被調用,即當 Netty 由于 IO 錯誤或者處理器在處理事件時抛出的異常時
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws InterruptedException
    {
        log.error("exceptionCaught:{}", cause.getMessage());
        ctx.close();//抛出異常,斷開與用戶端的連接配接
    }

    /**
     * 用戶端與服務端第一次建立連接配接時 執行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelActive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        log.info("channelActive------TCP用戶端建立連接配接------clientIp:{}", clientIp);
    }

    /**
     * 用戶端與服務端 斷連時 執行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close(); //斷開連接配接時,必須關閉,否則造成資源浪費
        log.info("channelInactive------TCP用戶端斷開連接配接----------clientIp:{}", clientIp);
    }
}
複制代碼           

BootNettyClient用戶端

@Slf4j
public class BootNettyClient
{
    public void connect(String host,int port)
    {
        /**
         * 用戶端的NIO線程組
         *
         */
        EventLoopGroup group = new NioEventLoopGroup();
        try
        {
            /**
             * Bootstrap 是一個啟動NIO服務的輔助啟動類 用戶端的
             */
            Bootstrap bootstrap = new Bootstrap();
            /**
             * 設定group
             */
            bootstrap = bootstrap.group(group);
            /**
             * 關聯用戶端通道
             */
            bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
            /**
             * 設定 I/O處理類,主要用于網絡I/O事件,記錄日志,編碼、解碼消息
             */
            bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception
                {
                    ChannelPipeline pipeline = channel.pipeline();
                    // 添加自定義協定的編解碼工具
                    pipeline.addLast(new MyDecoder());
                    pipeline.addLast(new MyEncoder());
                    /**
                     * 自定義ChannelInboundHandlerAdapter
                     */
                    pipeline.addLast(new BootNettyClientChannelInboundHandlerAdapter());
                }
            });
            /**
             * 連接配接服務端
             */
            ChannelFuture f = bootstrap.connect(host, port).sync();
            log.info("TCP用戶端連接配接成功, 位址是: " + host + ":" + port);
            /**
             * 等待連接配接端口關閉
             */
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            log.error("啟動netty client失敗:", e);
        }
        finally
        {
            /**
             * 退出,釋放資源
             */
            group.shutdownGracefully();
        }
    }
}
複制代碼           

NettyClientApplication程式啟動類

@SpringBootApplication
public class NettyClientApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(NettyClientApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		new BootNettyClient().connect("172.16.1.100", 6000);
	}
}
複制代碼           

測試

利用網絡調試助手工具,開啟一個服務端,模拟發送資料
實戰Netty!基于私有協定,怎樣快速開發網絡通信服務
發送一個完整的包(43 59 52 43 00 01 10 02 00 00 0f),如下圖,用戶端完整接收資料。
實戰Netty!基于私有協定,怎樣快速開發網絡通信服務
半包測試資料(43 59 52 43 00 01 10 02 00),無日志列印,說明用戶端沒有接收該不完整資料。
粘包資料測試,兩個包一起發送(43 59 52 43 00 01 10 02 00 00 0f 43 59 52 43 00 01 10 02 00 00 0f),如下圖,用戶端同時接收到兩條資料。
實戰Netty!基于私有協定,怎樣快速開發網絡通信服務
粘包資料測試,一個半包發送(43 59 52 43 00 01 10 02 00 00 0f 43 59 52 43 00 01 10 02),如下圖,可以看出,隻接收到前面完整包的資料,後面的半包資料被忽略。
實戰Netty!基于私有協定,怎樣快速開發網絡通信服務

業務代碼編寫

業務代碼,無非就是将收到的資料進行一些邏輯處理,資料的解析。編寫一個接收消息處理類即可。示例如下

通信參數對象:

序号 名稱 位元組數 取值範圍 備注
1 對象辨別 1 3 對象辨別号:3
2 IP位址 4 每個位元組表示一段位址值(A.B.C.D,第一位元組對應A,依次類推)
3 端口 2
4 标志 1 [0-1] 0:網絡通信,1: 485通信(端口賦波特率,IP指派0)
@Slf4j
public class ClientService {

  /** 接收邊緣盒子消息 */
  public void receiveData(MyProtocol myProtocol) {
    try {
      byte[] data = myProtocol.getContent();
      int type = data[1];
      int objId = data[4];

      // 心跳應答
      if (type == PackageTypeConstant.HEART_BEAT_REPLY) {
        log.info("--------收到心跳回複----------");
      }
      // 查詢應答
      else if (type == PackageTypeConstant.QUERY_RESULT) {
        switch (objId) {
          case ObjectIdConstant.SIGNAL:
            {
              reSignalParameter(data); // 接收通信參數
              break;
            }
    
  					//.....
     
          default:
            {
              break;
            }
        }
      }
    } catch (Exception e) {
      log.error("錯誤的消息指令..", e);
    }
  }

  private void reSignalParameter(byte[] data) {
    EventCenterService.getInstance()
        .submitEvent(
            new IEvent() {
              @Override
              public void execute() {
                try {
                  SignalParameter sp = new SignalParameter();

                  int idx = 5;
                  byte temp1;
                  byte temp2;
                  int a = (data[idx++] & 0xFF);
                  int b = (data[idx++] & 0xFF);
                  int c = (data[idx++] & 0xFF);
                  int d = (data[idx++] & 0xFF);
                  String ip = a + "." + b + "." + c + "." + d;

                  temp1 = data[idx++];
                  temp2 = data[idx++];
                  // 端口号
                  int port = ((char) (temp1 & 0xFF) << 8) | (char) (temp2 & 0xFF);
                  int sign = data[idx];

                  sp.setIp(ip);
                  sp.setPort(port);
                  sp.setSign(sign);

                  DataConfig.signalParameterList.add(sp);
                } catch (Exception e) {
                  log.error("通信參數解析出錯:", e);
                }
              }
            });
  }
}
複制代碼           

後記

工作中,利用netty開發網絡通信服務,資料的編解碼處理好了,後面的業務代碼相對就很容易了。

本篇文章,是我在工作中的一些實戰經驗,希望對netty感興趣的小夥伴有點幫助。關于netty的原理這篇文章就不做過多介紹了,前面的文章也講了很多,後面時間主要講講netty實際的運用。

繼續閱讀