天天看点

Netty使用指南

目录

      • netty的使用
      • netty的核心模块
        • 线程模型EventLoop、EventLoopGroup
        • 启动引导类Bootstrap
        • 通道Channel
        • 缓冲ByteBuf
        • 编码、解码
      • tcp粘包、拆包
        • 现象
        • 发生原因
        • 解决方案
      • netty用到的设计模式
      • 构建万量级连接的注意点
      • 互联网架构之数据链路分析

netty用于应用之间的网络通信,eg. dubbo的2个服务之间使用netty通信,交换数据。

netty和传统的http通信一样分为客户端、服务端2部分,客户端发送数据,服务端接收、处理数据,很多时候一个应用既是客户端又是服务端。

唯一确定一个连接:源ip、源端口、目的ip、目的端口。

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>
           

客户端

handler

/**
 * 客户端的handler,用于向服务端发送消息、接收服务端返回的数据
 */
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static ChannelHandlerContext ctx;

    /**
     * 建立连接时channel被激活,会自动调用此方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ClientHandler.ctx = ctx;
    }

    /**
     * 接收到服务端返回的数据时自动调用
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端返回的数据: " + msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 处理完服务端返回的数据时自动调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * 发生异常时自动调用
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        e.printStackTrace();
        ctx.close();
    }

    /**
     * 向服务端发送消息,暴露出去给外部调用
     */
    public static void sendMsg(String msg) {
        // 不能直接发送,要转换为ByteBuf发送
        ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        ClientHandler.ctx.writeAndFlush(buf);
    }

}
           

启动类

/**
 * 客户端的启动类
 */
public class Client {

    /**
     * 此处的host、port指的是服务端的
     */
    private String host;
    private int port;

    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * 启动客户端
     */
    public void start() {
        System.out.println("客户端启动中...");

        //EventLoop可以看做线程,EventLoopGroup可以看做线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //Bootstrap是启动引导类
        Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                //指定服务端地址
                .remoteAddress(new InetSocketAddress(host, port))
                //允许复用host、port,默认false
                .option(ChannelOption.SO_REUSEADDR,true)
                //指定客户端使用的handler
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //可以指定多个handler,handler的添加顺序即消息处理顺序
                        ch.pipeline().addLast(new ClientHandler());
                        // ch.pipeline().addLast(new XxxHandler());
                    }
                });

        try {
            //连接到服务端。connect是异步连接,需要调用sync同步等待连接成功
            ChannelFuture channelFuture = bootstrap.connect().sync();
            System.out.println("客户端启动成功");
            //阻塞线程直到客户端channel关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("客户端启动失败!");
            e.printStackTrace();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }

    }


    public static void main(String[] args) throws InterruptedException {
        //启动一个客户端。因为启动后会阻塞所在线程,所以单独用一个线程来启动
        new Thread(()->{
            new Client("127.0.0.1", 8000).start();
        }).start();

        //向服务端发送一条消息作为测试
        Thread.sleep(1000);
        ClientHandler.sendMsg("hello");
    }


}
           

服务端

@ChannelHandler.Sharable  //允许多个线程使用此handler
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        System.out.println("服务端收到数据: "+ data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        e.printStackTrace();
        ctx.close();
    }


}
           
/**
 * 服务端的启动类
 */
public class Server {

    /**
     * 服务端使用的端口
     */
    private int port;

    public Server(int port) {
        this.port = port;
    }

    /**
     * 启动服务端
     */
    public void start(){
        System.out.println("服务端启动中...");

        //配置服务端的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                //设置存放已完成tcp三次握手的请求的等待队列的最大长度
                .option(ChannelOption.SO_BACKLOG, 1024)
                //允许复用host、port
                .option(ChannelOption.SO_REUSEADDR,true)
                //指定要使用的handler,同样可以指定多个handler
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        try {
            //绑定端口,同步等待绑定成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("服务端启动成功");
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("服务端启动失败!");
            e.printStackTrace();
        } finally {
            //优雅退出,释放线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }


    public static void main(String[] args) {
        //启动一个服务端
        new Server(8000).start();
    }


}
           

先启动服务端,再启动客户端。

高性能RPC框架的3要素:IO模型、数据协议、线程模型

EventLoop相当于一个线程,一个EventLoop对应一个 Selector,可以创建、管理多个Channel。

EventLoopGroup相当于线程组,包含多个EventLoop。

默认创建的线程数量:cpu核心数*2

Bootstrap用于引导启动一个netty应用(客户端或者服务端)。

Channel:客户端、服务端之间的一个连接通道

ChannelHandler: 负责处理Channel的业务逻辑

ChannelPipeline: 作为ChannelHandler的有序容器,管理多个ChannelHandler的执行顺序

一个Channel对应一个ChannelPipeline,创建Channel时会自动创建一个对应的ChannelPipeline。

Channel的事件、状态

  • channelRegistered: channel已注册到某个EventLoop上
  • channelUnregistered: channel已创建,但未注册到EventLoop上,没有和Selector绑定
  • channelActive: channel连接到了远程主机(服务端|客户端),变为激活状态,可以发送、接收数据
  • channelInactive: channel没有连接到远程主机,处于非活跃状态

Channel的生命周期

Netty使用指南

ChannelHandler的事件、状态

  • handlerAdded : ChannelHandler 已添加到 ChannelPipeline 中
  • handlerRemoved : ChannelHandler 已从 ChannelPipeline 中移除
  • exceptionCaught : 发生异常

ChannelHandler接口

ChannelHandler有两个子接口

  • ChannelInboundHandler:入站,处理输入的数据, 常用的实现类有ChannelInboundHandlerAdapter、SimpleChannelInboundHandler。其中SimpleChannelInboundHandler可以指定泛型,会自动将消息转换为指定类型。
  • ChannelOutboundHandler:出站,处理输出的数据,常用的实现类有ChannelOutboundHandlerAdapter

ChannelPipeline接口

类似流水线,把多个handler串起来,按一定的顺序执行。入站执行的是InboundHandler队列,出站执行的是OutboundHandler队列。

Netty使用指南
  • 如果有多个InboundHandler,一个handler处理完后,要调用ctx.fireChannelRead(msg)或调用父类同名的方法传给下一个InboundHandler处理
  • InboundHandler调用ctx.write(msg),则会传递给OutboundHandler

ChannelHandlerContext接口

ChannelHandlerContext是连接ChannelHandler、ChannelPipeline的桥梁,三者有一些同名的方法,比如都有write()方法,区别是:Channel、ChannelPipeline会在整个管道流中传播,而ChannelHandlerContext只在后续的handler中传播。

常见的抽象实现类是AbstractChannelHandlerContext,使用双向链表存储handler,此抽象类的常见实现类是DefaultChannelHandlerContext。

服务端返回数据的三种方式

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf data = (ByteBuf) msg;
    System.out.println("服务端收到数据: "+ data.toString(CharsetUtil.UTF_8));

    //第一种,直接使用ChannelHandlerContext,最方便也最常用
    ctx.writeAndFlush(data);

	//第二种,使用ChannelPipeline
    ChannelPipeline pipeline = ctx.pipeline();
    pipeline.writeAndFlush(data);

    //第三种,使用Channel
    Channel channel = ctx.channel();
    channel.writeAndFlush(data);
}
           

数据容器(字节容器),与jdk原生的ByteBuffer相比

  • JDK ByteBuffer:共用读写索引,每次读写操作都需要flip()使索引复位,扩容麻烦,扩容后容易造成空间浪费。
  • Netty ByteBuf: 读写使用不同的索引,自动扩容,使用便捷

ByteBuf的2种创建方式

  • 池化:使用PooledByteBufAllocator获取ByteBuf,从池中获取ByteBuf实例,可以提高性能、减少内存碎片,4.x版本分配buffer默认使用池化方式。
  • 非池化:使用UnpooledByteBufAllocator获取ByteBuf,每次返回新的ByteBuf实例
// 池化方式。4.x版本分配buffer默认使用池化方式
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes(msg.getBytes(Charset.forName("utf-8")));

// 非池化方式
ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
           

池化、非池化都可以创建堆内存缓冲区、直接内存缓冲区。

我在示例中使用的是非池化,且每次调用方法都是重新获取ByteBuf,没有复用ByteBuf;如果使用池化或者复用了ByteBuf,每次使用完或使用前需要buf.clear();清除ByteBuf中数据、重置索引。

ByteBuf的3种使用模式

1、堆缓冲区 heap buffer

优点:使用堆的内存空间,可以快速分配、释放

缺点:每次使用前都会拷贝到直接缓存区(也叫堆外内存)

2、直接缓冲区 direct buffer

优点:使用堆外内存(直接内存),可直接使用,不占用堆的内存空间

缺点:内存的分配、释放比堆缓冲区复杂

3、复合缓冲区 composite buffer

创建多个不同的ByteBuf,组合使用

选择:如果要进行大量数据的IO读写,用直接缓冲区,速度快;如果只是消息的编码、解码,用堆缓存区,方便管理,内存碎片少。

jdk有编解码,为什么netty还要自己开发编解码?因为jdk自带的编码器编码后数据包可能太大,且编解码性能较差。

netty的解码器负责处理InboundHandler入站的数据,编码器负责处理OutboundHandler出站的数据。

Encoder 编码器

在OutboundHandler中使用,将Object格式的消息转换为其它格式。常见的编码器如下

  • MessageToByteEncoder:转换为byte[ ]
  • MessageToMessageEncoder:转换为其它类型,eg. User -> UserVO,转换为另一种实体类。

Decoder 解码器

在InboundHandler中使用,将byte[ ]或其它格式的消息转换为指定类型的消息。

常见的抽象解码器

  • ByteToMessageDecoder:将字节转为指定类型的消息,会检查ByteBuf中的可用字节
  • ReplayingDecoder :继承自ByteToMessageDecoder,不检查ByteBuf中的可用字节
  • MessageToMessageDecoder :转换为其它类型,eg. User -> UserVO,转换为另一种实体类。

常见的实现类

  • DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
  • LineBasedFrameDecoder: 以换行符作为消息分隔符的解码器
  • FixedLengthFrameDecoder:定长消息的解码器,会把固定长度的字节作为一条消息解析
  • LengthFieldBasedFrameDecoder:指定消息长度的解码器,message = header+body,在header中指定body的长度
  • StringDecoder:文本解码器,将接收到的消息转化为String

前4种都可以用来解决tcp的粘包、拆包问题,最后一种

Codec

编解码器,组合了一对编码器、解码器,既可以编码、又可以解码,不常用。

粘包: 多个小数据包被粘成一个大包,eg. 发送5条短消息,被粘成一条大消息,服务端只收到一条大消息(包含5条小消息的内容)。

拆包: 一个大数据包被拆分为多个小包,eg. 发送一条大消息,被拆成多条小消息,服务端收到多条消息(合起来才是一条完整的大消息)。

粘包、拆包问题又称为半包读写问题。

udp是有边界的数据流,不会出现粘包、拆包问题;tcp是无边界的数据流,没有处理粘包、拆包的机制, 会出现粘包、拆包问题。

tcp出现粘包、拆包的原因

  • 发送方:发送数据包使默认使用Nagle算法优化数据包的发送,凑足一批才会发送
  • 接收方: 接收到的消息会先放到缓冲区中,应用从缓冲区中读取消息,数据包可能比缓冲区大很多或小很多

方案一 发送方禁用Nagle算法

发送发的handler

// 禁用Nagle算法,立刻发送消息
.option(ChannelOption.TCP_NODELAY, true)
           

在一定程度上可以解决粘包问题,不能解决拆包问题。

如果2次消息发送的间隔时间极短,比如发送完一条消息立刻发送下一条消息,仍可能出现粘包问题。

方案二 使用消息分割符(推荐)

使用换行符作为消息分隔符 DelimiterBasedFrameDecoder ,接收方的handler

//指定要使用的handler
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ServerHandler());
        //使用换行符作为消息分隔符,参数指定帧的最大长度
        ch.pipeline().addLast(new LineBasedFrameDecoder(10240));
    }
});
           

发送消息时,在每条消息的末尾加上换行符 \n 。

Encoder继承了InboundHandler,Decoder继承了OutboundHandler,都属于handler。

如果消息中含有换行符,可以使用自定义的消息分隔符 DelimiterBasedFrameDecoder ,接收方的handler

//指定要使用的handler
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ServerHandler());
        //使用^^作为消息分隔符
        ByteBuf delimiter = Unpooled.copiedBuffer("^^".getBytes());
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, delimiter));
    }
});
           

发送消息时,在每条消息的末尾加上分隔符 ^^

设置帧的最大长度要足够大,不然消息内容超过帧长度时会报错:TooLongFrameException。

方案三 使用定长消息

设置一个固定的、足够大的消息长度,消息长度不够时补空格。缺点是浪费空间。

FixedLengthFrameDecoder 定长消息解码器。

方案四 在消息头中指定消息长度

message=header+body,body携带消息内容,header中指定消息长度,根据消息长度从缓冲区读取消息。缺点是编码复杂。

LengthFieldBasedFrameDecoder 基于长度字段的解码器

主要有4种

  • 构造器模式:ServerBootstap
  • 工厂模式:Channel的创建
  • 适配器模式:HandlerAdapter
  • 责任链模式:pipeline的事件传播

1、硬件要好,cpu、内存、带宽

2、linux内核参数优化

一个连接的信息会保存在一个文件中,建立连接时会打开这个文件,生成一个文件描述符(fd)。

单个进程可以打开的fd最大数量有限制,建立几千及以上量级的连接时,需要修改单个进程可以打开的fd最大数量。

操作系统允许打开的fd最大数量也有限制,当操作系统建立的连接数为几十万量级时,需要修改操作系统允许打开的fd最大数量。

(1)修改单个进程可以打开的fd最大数量

#查看
ulimit -n

#修改,先切换到root
vim /etc/security/limits.conf

#末尾加上以下代码, 可以指定特定用户,也可以用*表示所有用户
root soft nofile 1000000
root hard nofile 1000000
* soft nofile 1000000
* hard nofile 1000000
           

(2)修改操作系统可以打开的fd最大数量

#查看
cat /proc/sys/fs/file-max

#修改,切换到root
vim  /etc/sysctl.conf

#末尾加上
fs.file-max = 1000000

#立刻生效。此命令会使全局的fd限制修改会立刻生效,但单个进程的fd限制修改仍需要重启linux才会生效。
#sysctl -p
           

(3)修改somaxconn参数

somaxconn指定linux内核可以建立的最大连接数,默认为1024

vim /proc/sys/net/core/somaxconn
           

3、在程序中指定等待队列的最大长度

//设置存放已完成tcp三次握手的请求的等待队列的最大长度
.option(ChannelOption.SO_BACKLOG, 10240)
           

此参数受somaxconn参数大小的限制,大于somaxconn时,自动取somaxconn的值。

java -jar xxx.jar -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g
           

  • 输入域名 -> 浏览器内核调度 -> 本地dns解析 -> 远程dns解析 -> 获取到ip -> 路由多层跳转 -> 目的服务器(负载均衡等应用所在的服务器)
  • 服务器内核 -> nginx等代理|网关 -> 目的服务器(应用程序所在服务器)
  • 服务器内核 -> 应用程序(springboot)-> redis -> mysql