天天看點

netty系列之:在netty中使用protobuf協定

簡介

netty中有很多适配不同協定的編碼工具,對于流行的google出品的protobuf也不例外。netty為其提供了ProtobufDecoder和ProtobufEncoder兩個工具還有對應的frame detection,接下來我們會通過一個例子來詳細講解如何在netty中使用protobuf。

定義protobuf

我們舉個最簡單的例子,首先定義一個需要在網絡中進行傳輸的message,這裡我們定義一個student對象,他有一個age和一個name屬性,如下所示:

syntax = "proto3";

package com.flydean17.protobuf;

option java_multiple_files = true;
option java_package = "com.flydean17.protobuf";
option java_outer_classname = "StudentWrapper";

message Student {
  optional int32 age = 1;
  optional string name =2;
}
           

使用下面的指令,對其進行編譯:

protoc --experimental_allow_proto3_optional  -I=. --java_out=. student.proto

           

可以看到生成了3個檔案,分别是Student,StudentOrBuilder和StudentWrapper。其中Student和StudentOrBuilder是我們真正需要用到的對象。

定義handler

在handler中,我們主要進行對消息進行處理,這裡我們在clientHandler中進行消息的建構和發送,StudentClientHandler繼承SimpleChannelInboundHandler并重新channelActive方法, 在該方法中我們使用protobuf的文法,建構一個新的Student執行個體,并給他設定好age和name兩個屬性。

然後使用ctx.write和ctx.flush方法将其發送到server端:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // channel活躍
        //建構一個Student,并将其寫入到channel中
        Student student= Student.newBuilder().setAge(22).setName("flydean").build();
        log.info("client發送消息{}",student);
        ctx.write(student);
        ctx.flush();
    }
           

StudentServerHandler也是繼承SimpleChannelInboundHandler,并重寫channelRead0方法,當server端讀取到student消息的時候,日志輸出,并将其回寫到channel中,供clientHandler讀取:

public void channelRead0(ChannelHandlerContext ctx, Student student) throws Exception {
        log.info("server收到消息{}",student);
        // 寫入消息
        ChannelFuture future = ctx.write(student);
    }
           

當client讀取到消息之後,直接日志輸出,不再做進一步處理,到此,一輪client和server端的互動就完成了:

public void channelRead0(ChannelHandlerContext ctx, Student student) throws Exception {
        log.info("client收到消息{}",student);
    }
           

設定ChannelPipeline

在上一節,不管是在StudentClientHandler還是在StudentServerHandler中,我們都假設channel中傳遞的對象就是Student,而不是原始的ByteBuf。這是怎麼做到的呢?

這裡我們需要使用到netty提供的frame detection,netty為protobuf協定專門提供了ProtobufDecoder和ProtobufEncoder,用于對protobuf對象進行編碼和解碼。

但是這兩個編碼和解碼器分别是MessageToMessageEncoder和MessageToMessageDecoder,他們是消息到消息的編碼和解碼器,是以還需要和frame detection配合使用。

netty同樣提供了和protobuf配合使用的frame detector,他們是ProtobufVarint32FrameDecoder和ProtobufVarint32LengthFieldPrepender。

Varint32指的是protobuf的編碼格式,第一個位元組使用的是可變的varint。

有了frame detector和編碼解碼器,我們隻需要将其順序加入ChannelPipeline即可。

在用戶端,StudentClientInitializer繼承自ChannelInitializer,我們需要重寫其initChannel方法:

public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new ProtobufVarint32FrameDecoder());
        p.addLast(new ProtobufDecoder(Student.getDefaultInstance()));

        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufEncoder());

        p.addLast(new StudentClientHandler());
    }
           

在伺服器端,同樣StudentServerInitializer也繼承自ChannelInitializer,也需要重寫其initChannel方法:

public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new ProtobufVarint32FrameDecoder());
        p.addLast(new ProtobufDecoder(Student.getDefaultInstance()));

        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufEncoder());

        p.addLast(new StudentServerHandler());
    }
           

這樣ChannelPipeline也設定完成了。

建構client和server端并運作

最後好做的就是建構client和server端并運作,這和普通的netty用戶端和伺服器端并沒有什麼差別:

建構StudentClient:

public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new StudentClientInitializer());
            // 建立連接配接
            Channel ch = b.connect(HOST, PORT).sync().channel();
            // 等待關閉
            ch.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
           

建構StudentServer:

public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new StudentServerInitializer());

            b.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
           

運作可得:

server端:
[nioEventLoopGroup-3-1] INFO  c.f.protobuf.StudentServerHandler - server收到消息age: 22
name: "flydean"

[nioEventLoopGroup-2-1] INFO  c.f.protobuf.StudentClientHandler - client發送消息age: 22
name: "flydean"

client端:
[nioEventLoopGroup-2-1] INFO  c.f.protobuf.StudentClientHandler - client收到消息age: 22
name: "flydean"
           

可見Student消息已經發送和接收成功了。

總結

netty提供了很多和協定适配的工具類,這樣我們就可以專注于業務邏輯,不需要考慮具體的編碼轉換的問題,非常好用。

本文的例子可以參考:learn-netty4