天天看点

netty与protobuf结合开发小demo

最近在学习netty,于是就有了以下例子(为什么用protobuf,可以在我前一篇文章里面找找):

1.先写一个proto文件(msg.proto),再用它生成java代码(到msg.proto所在的项目目录下执行 protoc.exe msg.proto --java_out=./):

// Request message sent from the client to the server (used from Java as Msg.Client).
message Client {  
    // Header text; the demo client fills in a Content-Type style string.
    required string head = 1;  
    // Message payload; the demo server prints this field when a request arrives.
    required string body = 2;  
}

// Response message sent from the server to the client (used from Java as Msg.Server).
message Server {
    // Status code; the demo server sends 0 (message received), -1 (illegal message)
    // and 101 (welcome message on connect).
    required int32 code=1;
    // Human-readable text that accompanies the code.
    required string message=2;
}      

便可自动生成java代码;

由于我也是初学netty,有些东西可能不太熟悉,若有错误,请多包涵;

这是Client类

/**
 * Entry point of the demo client.
 *
 * <p>Connects to the demo server, installs {@link ClientInitializer} as the
 * channel handler, and blocks until the connection is closed.
 */
public class Client {
    /** Host the client connects to. */
    public static String host = "127.0.0.1";
    /**
     * Port the client connects to. It has to be the port the demo {@code Server}
     * binds (8787); the previous value 8087 could never reach that server.
     */
    public static int port = 8787;

    /**
     * Connects to {@link #host}:{@link #port} and waits for the channel to close.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap()
                .group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ClientInitializer());
        try {
            // Block until the connection attempt has completed.
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the event loop threads, even when connecting failed.
            worker.shutdownGracefully();
        }
    }
}
/**
 * Sets up the pipeline of every client channel: length-prefixed protobuf
 * framing in both directions, followed by the business handler.
 */
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    /** Largest frame accepted from the server, in bytes. */
    private static final int MAX_FRAME_LENGTH = 1024;
    /** Size in bytes of the length prefix that precedes every message. */
    private static final int LENGTH_FIELD_BYTES = 4;

    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Inbound: cut the byte stream into frames using the length prefix
                // (offset 0, no adjustment) and strip the prefix from each frame.
                .addLast(new LengthFieldBasedFrameDecoder(
                        MAX_FRAME_LENGTH, 0, LENGTH_FIELD_BYTES, 0, LENGTH_FIELD_BYTES))
                // Inbound: frames come from the server, so decode them as Msg.Server.
                .addLast(new ProtobufDecoder(Msg.Server.getDefaultInstance()))
                // Outbound: prepend the length of the encoded message.
                .addLast(new LengthFieldPrepender(LENGTH_FIELD_BYTES))
                // Outbound: serialize protobuf messages to bytes.
                .addLast(new ProtobufEncoder())
                // Business handler.
                .addLast(new ClientHandler("AQ"));
    }
}
public class ClientHandler extends SimpleChannelInboundHandler<Message> {
    public String clientName;
    public ClientHandler(String name){
        this.clientName = name;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        System.out.println("Server say : " + msg.toString());
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client active ");
        Msg.Client msg = Msg.Client.newBuilder().setHead("Content-Type:application/json;charset=UTF-8").setBody("hello world!").build();
        ctx.writeAndFlush(msg);
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client close ");
        super.channelInactive(ctx);
    }

}
      

server端:

/**
 * Entry point of the demo server.
 *
 * <p>Binds the listening port, hands every accepted connection to
 * {@link ServerInitializer}, and blocks until the listening channel is closed.
 */
public class Server {
    /** Port the server listens on. */
    private static final int PORT = 8787;

    /**
     * Starts the server and waits for the listening channel to close.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerInitializer())
                // Option for the listening channel.
                .option(ChannelOption.SO_BACKLOG, 128)
                // Option for every accepted (child) channel.
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        try {
            // Bind the port and wait synchronously until the bind has completed.
            ChannelFuture f = server.bind(PORT).sync();
            // Wait until the listening channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
        } finally {
            // Always release both event loop groups.
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
/**
 * Sets up the pipeline of every accepted channel: length-prefixed protobuf
 * framing in both directions, followed by the business handler.
 */
public class ServerInitializer  extends ChannelInitializer<SocketChannel> {
    /** Largest frame accepted from a client, in bytes. */
    private static final int MAX_FRAME_LENGTH = 1024;
    /** Size in bytes of the length prefix that precedes every message. */
    private static final int LENGTH_FIELD_BYTES = 4;

    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Inbound: cut the byte stream into frames using the length prefix
                // (offset 0, no adjustment) and strip the prefix from each frame.
                .addLast(new LengthFieldBasedFrameDecoder(
                        MAX_FRAME_LENGTH, 0, LENGTH_FIELD_BYTES, 0, LENGTH_FIELD_BYTES))
                // Inbound: frames come from a client, so decode them as Msg.Client.
                .addLast(new ProtobufDecoder(Msg.Client.getDefaultInstance()))
                // Outbound: prepend the length of the encoded message.
                .addLast(new LengthFieldPrepender(LENGTH_FIELD_BYTES))
                // Outbound: serialize protobuf messages to bytes.
                .addLast(new ProtobufEncoder())
                // Business handler.
                .addLast(new ServerHandler());
    }
}
      
/**
 * Server-side business handler: greets every new connection and acknowledges
 * each message received from a client.
 */
public class ServerHandler extends SimpleChannelInboundHandler<Message> {
    /**
     * Prints the incoming message and answers with a status response:
     * code 0 for a {@code Msg.Client} message, code -1 for anything else.
     */
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // Print the runtime type of whatever arrived.
        System.out.println(msg.getClass());

        if (!(msg instanceof Msg.Client)) {
            System.out.println("client message is illegal");
            ctx.writeAndFlush(reply(-1, "client message is illegal"));
            return;
        }

        Msg.Client request = (Msg.Client) msg;
        System.out.println(ctx.channel().remoteAddress() + " Say : " + request.getBody());
        // Tell the client that its message has been received.
        ctx.writeAndFlush(reply(0, "Received client message success"));
    }

    /**
     * Called when the channel becomes active (the connection is established):
     * logs the remote address and sends a welcome response with code 101.
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
        String hostName = InetAddress.getLocalHost().getHostName();
        ctx.writeAndFlush(reply(101, "Welcome to " + hostName + " service!"));
        super.channelActive(ctx);
    }

    /** Builds a {@code Msg.Server} response from a status code and a text. */
    private static Msg.Server reply(int code, String message) {
        return Msg.Server.newBuilder().setCode(code).setMessage(message).build();
    }
}
      

(文中很多代码是参考来的)