天天看點

【Netty】Netty 入門案例分析 ( Netty 模型解析 | Netty 伺服器端代碼 | Netty 用戶端代碼 )

文章目錄

  • ​​一、 Netty 模型代碼解析​​
  • ​​二、 Netty 案例伺服器端代碼​​
  • ​​1 . 伺服器主程式​​
  • ​​2 . 伺服器自定義 Handler 處理者​​
  • ​​三、 Netty 案例用戶端代碼​​
  • ​​1 . 用戶端主程式​​
  • ​​2 . 用戶端自定義 Handler 處理者​​
  • ​​四、 Netty 案例運作​​

一、 Netty 模型代碼解析

​1 . 線程池 NioEventLoopGroup :​

​① NioEventLoopGroup 線程池使用場景 :​ Netty 模型中的 BossGroup 和 WorkerGroup 都是 NioEventLoopGroup 類型的線程池 ;

​② NioEventLoopGroup 預設線程個數 :​ 系統預設每個線程池中的 NioEventLoop 線程數是 CPU 核數

×

\times

× 2 , 下面的代碼可以擷取運作 Netty 程式的裝置的 CPU 核數 ;

// 擷取裝置的 CPU 核數
NettyRuntime.availableProcessors()      

​③ 指定 NioEventLoopGroup 線程個數 :​ 如果不想使用 Netty 線程池的預設線程個數 , 可以在 NioEventLoopGroup 構造函數中子星設定線程數 ;

// BossGroup 線程池 : 負責用戶端的連接配接
// 指定線程個數 : 用戶端個數很少, 不用很多線程維護, 這裡指定線程池中線程個數為 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);      

​2 . NioEventLoopGroup 線程池線程配置設定 :​

以用戶端連接配接完成後 , 資料讀寫場景舉例 ;

​在 雙核 CPU 的伺服器上 , NioEventLoopGroup 預設有

4

4

4 個線程 ; 按照順序循環配置設定 , 為第

n

n

n 個用戶端配置設定第

n

%

4

n \% 4

n%4 個 NioEventLoop 線程 ;​

  • 用戶端

    0 與伺服器進行資料互動在 NioEventLoop

    0 線程中 ;

  • 用戶端

    1

    1

    1 與伺服器進行資料互動在 NioEventLoop

    1

    1

    1 線程中 ;

  • 用戶端

    2

    2

    2 與伺服器進行資料互動在 NioEventLoop

    2

    2

    2 線程中 ;

  • 用戶端

    3

    3

    3 與伺服器進行資料互動在 NioEventLoop

    3

    3

    3 線程中 ;

  • 用戶端

    4

    4

    4 與伺服器進行資料互動在 NioEventLoop

    0 線程中 ;

  • 用戶端

    5

    5

    5 與伺服器進行資料互動在 NioEventLoop

    1

    1

    1 線程中 ;

  • 用戶端

    6

    6

    6 與伺服器進行資料互動在 NioEventLoop

    2

    2

    2 線程中 ;

  • 用戶端

    7

    7

    7 與伺服器進行資料互動在 NioEventLoop

    3

    3

    3 線程中 ;

​3 . NioEventLoopGroup 線程池封裝内容 :​

​① NioEventLoopGroup 中的若幹個 NioEventLoop 線程​都封裝在 children 中 , 線程個數是 CPU 核數 2 倍 ;

​② 每個 NioEventLoop 線程中封裝了如下内容 :​

  • ​選擇器 ( Selector )​ , 用于監聽用戶端的讀寫 IO 事件 ;
  • ​任務隊列 ( taskQueue )​ , 用于存儲事件對應的業務邏輯任務 ;
  • ​線程執行器 ( executor )​ , 用于執行線程 ;

​4 . ChannelHandlerContext 通道處理者上下文對象封裝内容 :​

​① 使用者自定義的 處理者 ( Handler ) ,​ 這裡指的是 伺服器端的 ServerHandr ( 自定義 ) , 用戶端的 ClientHandler ( 自定義 ) ;

​② 管道 ( ChannelPipeline ) :​ 其本質是雙向連結清單 , 該 ChannelHandlerContext 可以擷取該連結清單的前一個 ( prev ) , 後一個管道對象 ( next ) ;

​③ 管道 與 通道 :​

  • 二者都可以通過 通道處理者上下文 ( ChannelHandlerContext ) 擷取 ;
  • 管道 與 通道 都可以互相從對方擷取 ;
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
channel = pipeline.channel();
pipeline = channel.pipeline();      

​④ 管道 ( Pipeline ) 與 通道 ( Channel ) 關聯 :​ 通過管道可以擷取通道 , 通過通道也可以擷取其對應的管道 ;

​5 . 處理者 ( Handler ) :​

​① 設定 Handler :​ 給 WorkerGroup 線程池中的 EventLoop 線程對應的管道設定 處理器 ( Handler ) ;

​② 自定義 Handler :​ 一般這個 Handler 都是使用者自定義的類 , 繼承 ChannelInboundHandlerAdapter 類 ;

​③ 運作機制 :​ 在 BossGroup 中連接配接用戶端成功後 , 将 NioSocketChannel 注冊給 WorkerGroup 中的 EventLoop 中的 選擇器 ( Selector ) , 如果監聽到用戶端資料 IO 事件 , 就會調用 管道 ( Pipeline ) 處理該事件 , 管道 ( Pipeline ) 中調用 處理器 ( Handler ) 處理相應的事件 , 該 處理器 ( Handler ) 可以是 Netty 提供的 , 也可以是開發者自定義的 ;

​特别注意 : 自定義 Handler 中 , 重寫的 ChannelInboundHandlerAdapter 方法 , 将 super() 語句都删除 ;​

二、 Netty 案例伺服器端代碼

1 . 伺服器主程式

package kim.hsl.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty 案例伺服器端
 */
public class Server {
    public static void main(String[] args) {

        // 1. 建立 BossGroup 線程池 和 WorkerGroup 線程池, 其中維護 NioEventLoop 線程
        //     NioEventLoop 線程中執行無限循環操作

        // BossGroup 線程池 : 負責用戶端的連接配接
        // 指定線程個數 : 用戶端個數很少, 不用很多線程維護, 這裡指定線程池中線程個數為 1
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // WorkerGroup 線程池 : 負責用戶端連接配接的資料讀寫
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 2. 伺服器啟動對象, 需要為該對象配置各種參數
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup) // 設定 主從 線程組 , 分别對應 主 Reactor 和 從 Reactor
                .channel(NioServerSocketChannel.class)  // 設定 NIO 網絡套接字通道類型
                .option(ChannelOption.SO_BACKLOG, 128)  // 設定線程隊列維護的連接配接個數
                .childOption(ChannelOption.SO_KEEPALIVE, true)  // 設定連接配接狀态行為, 保持連接配接狀态
                .childHandler(  // 為 WorkerGroup 線程池對應的 NioEventLoop 設定對應的事件 處理器 Handler
                        new ChannelInitializer<SocketChannel>() {// 建立通道初始化對象
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                // 為 管道 Pipeline 設定處理器 Hanedler
                                ch.pipeline().addLast(new ServerHandr());
                            }
                        }
                );
        System.out.println("伺服器準備完畢 ...");

        ChannelFuture cf = null;
        try {
            // 綁定本地端口, 進行同步操作 , 并傳回 ChannelFuture
            cf = bootstrap.bind(8888).sync();
            System.out.println("伺服器開始監聽 8888 端口 ...");
            // 關閉通道 , 開始監聽操作
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 出現異常後, 優雅的關閉
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}      

2 . 伺服器自定義 Handler 處理者

package kim.hsl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

/**
 * Handler 處理者, 是 NioEventLoop 線程中處理業務邏輯的類
 *
 * 繼承 : 該業務邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類
 * 才可以設定給 NioEventLoop 線程
 *
 * 規範 : 該 Handler 類中需要按照業務邏輯處理規範進行開發
 */
public class ServerHandr extends ChannelInboundHandlerAdapter {

    /**
     * 讀取資料 : 在伺服器端讀取用戶端發送的資料
     * @param ctx
     *      通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 用戶端位址資訊
     *      管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler
     *      通道 ( Channel ) : 注重資料讀寫
     * @param msg
     *      用戶端上傳的資料
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 檢視 ChannelHandlerContext 中封裝的内容
        System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);

        // 将用戶端上傳的資料轉為 ByteBuffer
        // 這裡注意該類是 Netty 中的 io.netty.buffer.ByteBuf 類
        // 不是 NIO 中的 ByteBuffer
        // io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
        ByteBuf byteBuf = (ByteBuf) msg;
        // 将 ByteBuf 緩沖區資料轉為字元串, 列印出來
        System.out.println(ctx.channel().remoteAddress() + " 接收到用戶端發送的資料 : " + 
            byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 伺服器端讀取資料完畢後回調的方法
     * @param ctx
     *      通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 用戶端位址資訊
     *      *      管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler
     *      *      通道 ( Channel ) : 注重資料讀寫
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 資料編碼 : 将字元串編碼, 存儲到 io.netty.buffer.ByteBuf 緩沖區中
        ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);

        // 寫出并重新整理操作 : 寫出資料到通道的緩沖區 ( write ), 并執行重新整理操作 ( flush )
        ctx.writeAndFlush(byteBuf);
    }

    /**
     * 異常處理 , 上面的方法中都抛出了 Exception 異常, 在該方法中進行異常處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("通道異常, 關閉通道");
        //如果出現異常, 就關閉該通道
        ctx.close();
    }
}      

三、 Netty 案例用戶端代碼

1 . 用戶端主程式

package kim.hsl.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
    public static void main(String[] args) {
        // 用戶端隻需要一個 時間循環組 , 即 NioEventLoopGroup 線程池
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        // 用戶端啟動對象
        Bootstrap bootstrap = new Bootstrap();
        // 設定相關參數
        bootstrap.group(eventLoopGroup)     // 設定用戶端的線程池
                .channel(NioSocketChannel.class)    // 設定用戶端網絡套接字通道類型
                .handler(   // 設定用戶端的線程池對應的 NioEventLoop 設定對應的事件處理器 Handler
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ClientHandr());
                            }
                        }
                );

        try {
            // 開始連接配接伺服器, 并進行同步操作
            // ChannelFuture 類分析 , Netty 異步模型
            // sync 作用是該方法不會再次阻塞
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
            System.out.println("用戶端連接配接伺服器成功 ...");

            // 關閉通道, 開始監聽
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 優雅的關閉
            eventLoopGroup.shutdownGracefully();
        }
    }
}      

2 . 用戶端自定義 Handler 處理者

package kim.hsl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Handler 處理者, 是 NioEventLoop 線程中處理業務邏輯的類
 *
 * 繼承 : 該業務邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類
 * 才可以設定給 NioEventLoop 線程
 *
 * 規範 : 該 Handler 類中需要按照業務邏輯處理規範進行開發
 */
public class ClientHandr extends ChannelInboundHandlerAdapter {

    /**
     * 通道就緒後觸發該方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 檢視 ChannelHandlerContext 中封裝的内容
        System.out.println("channelActive : ChannelHandlerContext ctx = " + ctx);

        // 資料編碼 : 将字元串編碼, 存儲到 io.netty.buffer.ByteBuf 緩沖區中
        ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8);

        // 寫出并重新整理操作 : 寫出資料到通道的緩沖區 ( write ), 并執行重新整理操作 ( flush )
        ctx.writeAndFlush(byteBuf);
        System.out.println("用戶端向伺服器端發送 Hello Server 成功");
    }

    /**
     * 讀取資料 : 在伺服器端讀取用戶端發送的資料
     * @param ctx
     *      通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 用戶端位址資訊
     *      管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler
     *      通道 ( Channel ) : 注重資料讀寫
     * @param msg
     *      伺服器傳回的資料
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 檢視 ChannelHandlerContext 中封裝的内容
        System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);

        // 将伺服器下發的資料轉為 ByteBuffer
        // 這裡注意該類是 Netty 中的 io.netty.buffer.ByteBuf 類
        // 不是 NIO 中的 ByteBuffer
        // io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
        ByteBuf byteBuf = (ByteBuf) msg;
        // 将 ByteBuf 緩沖區資料轉為字元串, 列印出來
        System.out.println(ctx.channel().remoteAddress() + " 伺服器傳回的資料 : " + 
            byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 異常處理 , 上面的方法中都抛出了 Exception 異常, 在該方法中進行異常處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("通道異常, 關閉通道");
        //如果出現異常, 就關閉該通道
        ctx.close();
    }
}      

四、 Netty 案例運作

​1 . 運作伺服器端 :​ 伺服器啟動 , 監聽 8888 端口 ;

【Netty】Netty 入門案例分析 ( Netty 模型解析 | Netty 伺服器端代碼 | Netty 用戶端代碼 )

​2 . 運作用戶端 :​ 用戶端連接配接伺服器的 8888 端口 , 并向伺服器端寫出 Hello Server 字元串 , 之後便接到伺服器端回送的 Hello Client 字元串資訊 ;

【Netty】Netty 入門案例分析 ( Netty 模型解析 | Netty 伺服器端代碼 | Netty 用戶端代碼 )

​3 . 檢視用戶端 :​ 伺服器端接收到用戶端資訊 , 向用戶端寫出 Hello Client 字元串 ;

繼續閱讀