天天看點

簡單的Java實作Netty進行通信

使用Java搭建一個簡單的Netty通信例子

看過dubbo源碼的同學應該都清楚,使用dubbo協定的底層通信是使用的netty進行互動,而最近看了dubbo的Netty部分後,自己寫了個簡單的Netty通信例子。

本文源位址:實作Netty進行通信

準備

工程截圖

子產品詳解

rpc-common

rpc-common作為各個子產品都需使用的子產品,工程中出現的是一些通信時請求的參數以及傳回的參數,還有一些序列化的工具。

rpc-client

rpc-client中目前隻是單單的一個NettyClient啟動類。

rpc-server

rpc-client中目前也隻是單單的一個NettyServer服務啟動類。

需要的依賴

目前所有的依賴項都出現在 rpc-common 下的 pom.xml中。

<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

<!-- Protostuff -->
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.9</version>
</dependency>

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.9</version>
</dependency>

<!-- Objenesis -->
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>

<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.38</version>
</dependency>           

實作

首先我們在common中先定義本次的Request和Response的基類對象。

public class Request {

private String requestId;

private Object parameter;

public String getRequestId() {
    return requestId;
}

public void setRequestId(String requestId) {
    this.requestId = requestId;
}

public Object getParameter() {
    return parameter;
}

public void setParameter(Object parameter) {
    this.parameter = parameter;
}           

}

public class Response {

private String requestId;

private Object result;

public String getRequestId() {
    return requestId;
}

public void setRequestId(String requestId) {
    this.requestId = requestId;
}

public Object getResult() {
    return result;
}

public void setResult(Object result) {
    this.result = result;
}           

使用fastJson進行本次序列化

Netty對象的序列化轉換很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分别隻要繼承它們,重寫方法後,擷取到Object和Byte,各自轉換就OK。

不過如果是有要用到生産上的同學,建議不要使用 fastJson,因為它的漏洞更新檔真的是太多了,可以使用google的 protostuff。

public class RpcDecoder extends ByteToMessageDecoder {

// 目标對象類型進行解碼
private Class<?> target;

public RpcDecoder(Class target) {
    this.target = target;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 4) {   // 不夠長度丢棄
        return;
    }
    in.markReaderIndex();   // 标記一下目前的readIndex的位置
    int dataLength = in.readInt();  // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4

    if (in.readableBytes() < dataLength) {  // 讀到的消息體長度如果小于我們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
        in.resetReaderIndex();
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);

    Object obj = JSON.parseObject(data, target);    // 将byte資料轉化為我們需要的對象
    out.add(obj);
}           

public class RpcEncoder extends MessageToByteEncoder {

//目标對象類型進行編碼
private Class<?> target;

public RpcEncoder(Class target) {
    this.target = target;
}

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    if (target.isInstance(msg)) {
        byte[] data = JSON.toJSONBytes(msg);    // 使用fastJson将對象轉換為byte
        out.writeInt(data.length);  // 先将消息長度寫入,也就是消息頭
        out.writeBytes(data);   // 消息體中包含我們要發送的資料
    }
}
           

NetyServer

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Request request = (Request) msg;

    System.out.println("Client Data:" + JSON.toJSONString(request));

    Response response = new Response();
    response.setRequestId(request.getRequestId());
    response.setResult("Hello Client !");

    // client接收到資訊後主動關閉掉連接配接
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
}           

public class NettyServer {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

private String ip;
private int port;

public NettyServer(String ip, int port) {
    this.ip = ip;
    this.port = port;
}

public void server() throws Exception {

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {

        final ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                                .addLast(new RpcEncoder(Response.class))
                                .addLast(new NettyServerHandler());
                    }
                });

        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);  // 開啟長連接配接

        ChannelFuture future = serverBootstrap.bind(ip, port).sync();
           

// if (future.isSuccess()) {

//

// new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);

// }

future.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public static void main(String[] args) throws Exception {
    new NettyServer("127.0.0.1", 20000).server();
}           

關鍵名詞:

EventLoopGroup

workerGroup

bossGroup

Server端的EventLoopGroup分為兩個,一般workerGroup作為處理請求,bossGroup作為接收請求。

ChannelOption

SO_BACKLOG

SO_SNDBUF

SO_RCVBUF

SO_KEEPALIVE

以上四個常量作為TCP連接配接中的屬性。

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

NettyServerHandler中出現的 ChannelFutureListener.CLOSE ,作為Server端主動關閉與Client端的通信,如果沒有主動Close,那麼NettyClient将會一直處于阻塞狀态,得不到NettyServer的傳回資訊。

NettyClient

public class NettyClient extends SimpleChannelInboundHandler {

private final String ip;
private final int port;
private Response response;

public NettyClient(String ip, int port) {
    this.ip = ip;
    this.port = port;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
    this.response = response;
}

public Response client(Request request) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();

    try {

        // 建立并初始化 Netty 用戶端 Bootstrap 對象
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();

                pipeline.addLast(new RpcDecoder(Response.class));
                pipeline.addLast(new RpcEncoder(Request.class));
                pipeline.addLast(NettyClient.this);
            }
        });
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
           

// String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

// 連接配接 RPC 伺服器
        ChannelFuture future = bootstrap.connect(ip, port).sync();

        // 寫入 RPC 請求資料并關閉連接配接
        Channel channel = future.channel();

        channel.writeAndFlush(request).sync();
        channel.closeFuture().sync();

        return response;
    } finally {
        group.shutdownGracefully();
    }
}

public static void main(String[] args) throws Exception {
    Request request = new Request();
    request.setRequestId(UUID.randomUUID().toString());
    request.setParameter("Hello Server !");
    System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
}           

測試

如果以上所有内容都準備就緒,那麼就可以進行調試了。

啟動順序,先啟動NettyServer,再啟動NettyClient。

總結

記得剛出來工作時,有工作很多年的同僚問我了不了解Netty,當時工作太短,直說聽過Putty,現在回想起來真的挺丢人的,哈哈。😋

Netty作為通信架構,如果你了解TCP,而且項目中有類似傳輸資訊的需求,又不想內建HTTP或者Socket,那麼Netty真的挺實用的。

參考資料:

Dubbo-Netty

Netty.io

本項目Github位址:Netty-RPC

原文位址

https://www.cnblogs.com/yanzhenyidai/p/12901527.html