天天看點

Netty 線程模型與基本使用

原創 Se7en Se7en的架構筆記 2021-03-27 20:52

為什麼使用 Netty

Netty 是一個異步事件驅動的網絡應用程式架構,用于快速開發可維護的高性能和高伸縮性的伺服器和用戶端。Netty 擁有高性能,吞吐量更高,延遲更低,減少資源消耗,最小化不必要的記憶體複制等優點。

Netty 和 NIO

NIO 的缺點

  • NIO 的類庫和 API 繁雜,學習成本高,你需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 需要熟悉 Java 多線程程式設計。這是因為 NIO 程式設計涉及到 Reactor 模式,你必須對多線程和網絡程式設計非常熟悉,才能寫出高品質的 NIO 程式。
  • 臭名昭著的 epoll bug。它會導緻 Selector 空輪詢,最終導緻 CPU 使用率飙升至 100%。直到 JDK1.7 版本依然沒得到根本性的解決。

Netty 的優點

  • Netty 對 JDK 自帶的 NIO 的 API 進行了良好的封裝,API使用簡單,學習成本低。
  • 功能強大,内置了多種解碼編碼器,支援多種協定。
  • 性能高,對比其他主流的 NIO 架構,Netty 的性能最優。
  • 社群活躍,發現 BUG 會及時修複,疊代版本周期短,不斷加入新的功能。Dubbo、Elasticsearch 都采用了 Netty,品質得到驗證。

Netty線程模型

Netty 線程模型與基本使用

模型解釋

  • 1.Netty 抽象出兩組線程池 BossGroup 和 WorkerGroup,BossGroup 專門負責接收用戶端的連接配接, WorkerGroup 專門負責網絡的讀寫。
  • 2.BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup。NioEventLoopGroup 相當于是一個事件循環線程組,這個組内含有多個事件循環線程,每一個事件循環線程都是 NioEventLoop。
  • 3.每個 NioEventLoop 都有一個 Selector,用于監聽注冊在其上的 SocketChannel 的網絡通訊。
  • 4.每個 Boss NioEventLoop 線程内部循環執行的3個步驟:
    • 處理 Accept 事件,與用戶端建立連接配接,生成 NioSocketChannel。
    • 将 NioSocketChannel 注冊到某個 Worker NioEventLoop 上的 Selector。
    • 處理任務隊列的任務,即 runAllTask。
  • 5.每個 Worker NioEventLoop 線程内部循環執行的3個步驟:
    • 輪詢注冊到自己 Selector 上的所有 NioSocketChannel 的 read,write 事件。
    • 處理 I/O 事件,即 read,write 事件,在對應的 NioSocketChannel 處理業務。
    • runAllTasks 處理任務隊列 TaskQueue 的任務,一些耗時的業務處理一般可以放入 TaskQueue 中慢慢處理,這樣不影響資料在 Pipeline 中的流動處理。
  • 6.每個 Worker NioEventLoop 處理 NioSocketChannel 業務時,會使用 Pipeline(管道),Pipeline 中維護了很多的 handler 處理器用來處理 NioSocketChannel 中的資料。

Netty 用戶端 & 伺服器開發

建立并配置伺服器啟動器

Bootstrap、ServerBootstrap

Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程式,串聯各個元件,Netty 中 Bootstrap 類是用戶端程式的啟動引導類,ServerBootstrap 是服務端啟動引導類。

Bootstrap 和 ServerBootStrap 是 Netty 提供的一個建立用戶端和服務端啟動器的工廠類,使用這個工廠類非常便利地建立啟動類。

Netty 線程模型與基本使用

group()

服務端要使用兩個線程組:

  • bossGroup 用于監聽用戶端連接配接,專門負責與用戶端建立連接配接,并把連接配接注冊到 workerGroup 的 Selector 中。
  • workerGroup 用于處理每一個連接配接發生的讀寫事件。

一般建立線程組直接使用以下new就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();      

預設的線程數是cpu核數的兩倍。假設想自定義線程數,可以使用有參構造器:

//設定bossGroup線程數為1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//設定workerGroup線程數為8
EventLoopGroup workerGroup = new NioEventLoopGroup(8);      

channel()

這個方法用于設定通道類型,當建立連接配接後,會根據這個設定建立對應的 Channel 執行個體。

常用的就是這兩個通道類型,因為是異步非阻塞的。是以是首選:

  • NioSocketChannel:異步非阻塞的用戶端 TCP Socket 連接配接。
  • NioServerSocketChannel:異步非阻塞的伺服器端 TCP Socket 連接配接。

還有就是同步阻塞的,一般沒什麼人用:

  • OioSocketChannel:同步阻塞的用戶端 TCP Socket 連接配接。
  • OioServerSocketChannel:同步阻塞的伺服器端 TCP Socket 連接配接。

option() 與 childOption()

option() 設定的是服務端用于接收進來的連接配接,也就是 boosGroup 線程。option() 常用參數:

SO_BACKLOG //Socket 參數,服務端接受連接配接的隊列長度,如果隊列已滿,用戶端連接配接将被拒絕。預設值,Windows 為200,其他為128。      

childOption() 是提供給父管道接收到的連接配接,也就是 workerGroup 線程。childOption() 常用的參數:

SO_RCVBUF  //Socket 參數,TCP 資料接收緩沖區大小。
TCP_NODELAY //TCP 參數,立即發送資料,預設值為 True。
SO_KEEPALIVE //Socket 參數,連接配接保活,預設值為 False。啟用該功能時,TCP 會主動探測空閑連接配接的有效性。      

pipeline(ChannelPipeline)

ChannelPipeline 是 Netty 處理請求的責任鍊,ChannelHandler 則是具體處理請求的處理器。在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關系如下:

Netty 線程模型與基本使用

處理器 Handler 分為兩種:ChannelInboundHandlerAdapter(入站處理器)、ChannelOutboundHandler(出站處理器)。

一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向連結清單,并且每個 ChannelHandlerContext 中又關聯着一個 ChannelHandler,通過 ChannelHandlerContext 上下文對象,就可以拿到 Channel、Pipeline 等對象,就可以進行讀寫等操作。

read事件(入站事件)和write事件(出站事件)在一個雙向連結清單中,入站事件會從連結清單 head 往後傳遞到最後一個入站的 handler,出站事件會從連結清單 tail 往前傳遞到最前一個出站的 Handler。兩種類型的 Handler 互不幹擾,相同類型的 Handler 的處理順序是有影響的。

在 Bootstrap 中 childHandler() 方法需要初始化通道,執行個體化一個 ChannelInitializer,這時候需要重寫 initChannel() 初始化通道的方法,裝配流水線就是在這個地方進行。代碼如下:

//使用匿名内部類的形式初始化通道對象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //建立通道初始化對象,設定初始化參數,在 SocketChannel 建立起來之前執行
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
    //對 workerGroup的SocketChannel 設定處理器,調用我們自定義的 NettyServerHandler
      ch.pipeline().addLast(new NettyServerHandler());
   }
 });      

自定義 NettyServerHandler 代碼如下:

package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * 自定義 Handler 需要繼承 netty 規定好的某個 HandlerAdapter(規範)
 * @author 程治玮
 * @since 2021/3/25 9:31 下午
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 讀取用戶端發送的資料
     *
     * @param ctx 上下文對象, 含有通道 channel,管道 pipeline
     * @param msg 就是用戶端發送的資料
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("伺服器讀取線程 " + Thread.currentThread().getName());
        //将 msg 轉成一個 ByteBuf,類似 NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("用戶端發送的消息是:" + buf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 資料讀取完畢後的處理方法
     *
     * @param ctx 上下文對象, 含有通道 channel,管道 pipeline
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }
    /**
     * 處理異常, 一般是需要關閉通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}      

bind()

提供用于服務端或者用戶端綁定伺服器位址和端口号,預設是異步啟動,如果加上 sync() 方法則是同步。

// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 異步
// 給cf注冊監聽器,監聽我們關心的事件
channelFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
          if (channelFuture.isSuccess()) {
             System.out.println("監聽端口9000成功");
          } else {
             System.out.println("監聽端口9000失敗");
          }
      }
 });      

優雅地關閉 EventLoopGroup

//釋放掉所有的資源,包括建立的線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();      

服務端啟動器完整代碼

package com.chengzw.netty.base;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty 服務端
 * @author 程治玮
 * @since 2021/3/25 9:31 下午
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        // 建立兩個線程組 bossGroup 和 workerGroup, 含有的子線程 NioEventLoop 的個數預設為cpu核數的兩倍
        // bossGroup 隻是處理連接配接請求 ,真正的和用戶端業務處理,會交給 workerGroup 完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1個線程
        EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8個線程
        try {
            // 建立伺服器端的啟動對象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用鍊式程式設計來配置參數
            bootstrap.group(bossGroup, workerGroup) //設定兩個線程組
                    // 使用 NioServerSocketChannel 作為伺服器的通道實作,該類用于執行個體化新的 Channel 來接收用戶端的連接配接
                    .channel(NioServerSocketChannel.class)
                    // 初始化伺服器連接配接隊列大小,服務端處理用戶端連接配接請求是順序處理的,是以同一時間隻能處理一個用戶端連接配接。
                    // 多個用戶端同時來的時候,服務端将不能處理的用戶端連接配接請求放在隊列中等待處理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() { //建立通道初始化對象,設定初始化參數,在 SocketChannel 建立起來之前執行
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //對 workerGroup的SocketChannel 設定處理器,調用我們自定義的 NettyServerHandler
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start ...");
            // 綁定一個端口并且同步, 生成了一個 ChannelFuture 異步對象,通過 isDone() 等方法可以判斷異步事件的執行情況
            // 啟動伺服器(并綁定端口),bind 是異步操作,sync 方法是等待異步操作執行完畢
            // sync 同步
            ChannelFuture channelFuture = bootstrap.bind(9000).sync();
            // 異步
            // 給cf注冊監聽器,監聽我們關心的事件
            /*channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("監聽端口9000成功");
                    } else {
                        System.out.println("監聽端口9000失敗");
                    }
                }
            });*/
            // 等待服務端監聽端口關閉,closeFuture是異步操作
            // 通過sync方法同步等待通道關閉處理完畢,這裡會阻塞等待通道關閉完成,内部調用的是Object的wait()方法
            // 在這裡面cf.channel().closeFuture().sync();這個語句的主要目的是,如果缺失上述代碼,則main方法所在的線程,
            // 即主線程會在執行完bind().sync()方法後,會進入finally 代碼塊,之前的啟動的nettyserver也會随之關閉掉,整個程式都結束了。
            // 原文的例子有英文注釋:
            // Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server.
            // 線程進入wait狀态,也就是main線程暫時不會執行到finally裡面,nettyserver也持續運作,如果監聽到關閉事件,可以優雅的關閉通道和nettyserver,
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 資源優雅釋放
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}      

建立并配置用戶端啟動器

用戶端隻需要一個 NioEventLoopGroup,其餘代碼和伺服器類似。首先自定義 NettyClientHandler 用于處理用戶端 ChannelPipeline 的業務。

package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * 自定義 Handler 需要繼承 netty 規定好的某個 HandlerAdapter(規範)
 * @author 程治玮
 * @since 2021/3/25 9:50 下午
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 當用戶端連接配接伺服器完成就會觸發該方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }
    /**
     * 當通道有讀取事件時會觸發,即服務端發送資料給用戶端
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服務端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服務端的位址: " + ctx.channel().remoteAddress());
    }
    /**
     * 處理異常, 一般是需要關閉通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}      

然後配置用戶端啟動器:

package com.chengzw.netty.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Netty 用戶端
 * @author 程治玮
 * @since 2021/3/25 9:52 下午
 */
public class NettyClient {
    public static void main(String[] args) throws Exception {
        //用戶端需要一個事件循環組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //建立用戶端啟動對象
            //注意用戶端使用的不是ServerBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設定相關參數
            bootstrap.group(group) //設定線程組
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作為用戶端的通道實作
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //加入處理器
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start..");
            //啟動用戶端去連接配接伺服器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            //對通道關閉進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}      

參考連結