天天看点

Java并发编程学习-日记5、Netty从DiscardServer实践开始

DiscardServer所示,创建netty服务端,一共有8步,重点在:监控事件的线程组、通道及通道参数、出入站的处理器(绑定在流水线上)、异步监控事件处理

public class NettyDiscardServer {

    private final int serverPort;

    ServerBootstrap b = new ServerBootstrap();//Netty服务端启动器

    public NettyDiscardServer(int port) {

        this.serverPort = port;

    }

    public void runServer() {

        //创建reactor 线程组,创建两个线程组仅仅是为了能够及时的接收到新建连接,

        // 如果将连接监听和事件处理放在同一线程组中,可能会因为事件处理较为耗时,而堵塞连接

        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); //负责服务器通道新连接的IO事件监听

        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();//负责传输通道的IO事件处理

        try {

            b.group(bossLoopGroup, workerLoopGroup); //1 设置reactor 线程组

            //2 设置nio类型的channel,当然也可设置为堵塞通道b.channel(OioServerSocketChannel.class);

            b.channel(NioServerSocketChannel.class);

            //3 设置监听端口

            b.localAddress(serverPort);

            //4 设置通道的参数,option()用于给【父通道】选项设置,childOption()用于【子通道】选项设置

            b.option(ChannelOption.SO_KEEPALIVE, true);

            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配【子通道】流水线:这个流水线日后笔记中会详细说明,流水线用于组织处理器,使得消息在入站后,出站前的一些列处理。类似流与java异常链。

            b.childHandler(new ChannelInitializer<SocketChannel>() {

                //有连接到达时会创建一个channel

                protected void initChannel(SocketChannel ch) throws Exception {

                    // pipeline管理子通道channel中的Handler, 向子channel流水线添加一个handler处理器

                    ch.pipeline().addLast(new NettyDiscardHandler());

                }

            });

            // 6 开始绑定server,通过调用sync同步方法阻塞直到绑定成功,ChannelFuture继承Java的Future接口,用于异步监控,后续笔记中会详细说明。

            ChannelFuture channelFuture = b.bind().sync();

            Logger.info(" 服务器启动成功,监听端口: " + channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束

            // 服务监听通道会一直等待通道关闭的异步任务结束

            ChannelFuture closeFuture = channelFuture.channel().closeFuture();

            closeFuture.sync();

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            // 8 优雅关闭EventLoopGroup,

            // 释放掉所有资源包括创建的线程

            workerLoopGroup.shutdownGracefully();

            bossLoopGroup.shutdownGracefully();

        }

    }

    public static void main(String[] args) throws InterruptedException {

        new NettyDiscardServer(9000).runServer();

    }

}