天天看點

網絡程式設計-netty

netty
1.netty介紹:
	netty是由jboss提供的基于nio的網絡程式設計開源架構,它采用異步,事件驅動的方式用來快速開發一個高性能高可靠的網絡IO程式;elasticsearch,dubbox内部采用的就是netty
2.netty線程模型:
* 單線程模型:
	單線程多路複用的方式來完成伺服器端包括建立用戶端連接配接,讀寫的所有操作,編碼簡單,但是無法滿足大量用戶端連接配接的需求,傳統的nio程式設計便是這種方式
* 線程池模型:
	伺服器端使用單線程管理用戶端的連接配接,同時使用一個線程池來管理其他的網絡IO操作
* netty模型; NioEventLoopGroup
	伺服器端使用bossGroup線程池管理用戶端連接配接請求,同時使用workerGroup管理其他的所有IO操作;bossGroup和workerGroup對應的執行個體是NioEventLoopGroup類,每個NioEventLoopGrou中對應有多個NioEventLoop,而每個NioEventLoop對應線程池中的單個線程,每個NioEventLoop都有自己的Selector并在Selector上注冊各種事件和Channel,以及taskQueue;
	bossGroup對應多個Selector和ServerSocketChannel,workerGroup對應多個Selector和SocketChannel;ChannelPipeline是貫穿整個netty的一條處理鍊,所有的入站出站操作都需要挂載到ChannelPipeline上才能生效
3.netty異步模型:
	netty異步模型基于future和callback,它的核心思想:當需要調用方法時,直接傳回一個future,使用future和callbackk來監控方法的處理過程
	
4.netty API:
* 業務邏輯處理類(workerGroup使用)
    自定義一個Handler繼承ChannelInboundHandlerAdapter類,重寫通道入站處理器擴充卡中的預設空實作的方法
public void channelActive(ChannelHandlerContext ctx),通道就緒事件
public void channelRead(ChannelHandlerContext ctx, Object msg),通道讀取資料事件
public void channelReadComplete(ChannelHandlerContext ctx) ,資料讀取完畢事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道發生異常
事件

* Pipeline 和 ChannelPipeline
	ChannelPipeline是一個Handler集合,是貫穿整個netty的一條處理鍊
* ChannelHandlerContext
	這是事件處理器上下文對象,代表ChannelPipeline中每個Handler的處理節點
 	ctx.writeAndFlush(Unpooled.copiedBuffer("你好",CharsetUtil.UTF_8));
* ChannelOption 設定socket标準參數
* ChannelFuture 監控nettyI/O操作的結果
* ServerBootstrap 伺服器端啟動助手類,用于設定參數
* Bootstrap 用戶端啟動助手類
* Unpooled netty提供的用來操作緩沖區的工具類,netty的位元組緩沖區叫ByteBuf,不是ButeBuffer
	ByteBuf byteBuf = Unpooled.copiedBuffer("你好",CharsetUtil.UTF_8);

5.netty坐标
<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.8.Final</version>
</dependency>

6.編碼和解碼
* 方式一: 采用jdk自帶的序列化方式
缺點:無法跨語言,且序列化後體積是二進制的5倍
* 方式二: netty提供的編碼器解碼器,ObjectDecoder/ObjectEncoder
缺點:内部使用的是jdk序列化方式,同上
* 方式三:google提供的Protobuf序列化
跨語言,且高性能高可靠

7.Protobuf序列化使用步驟
第一步:導入依賴
<dependency>
     <groupId>com.google.protobuf</groupId>
     <artifactId>protobuf-java</artifactId>
     <version>3.6.1</version>
</dependency>
第二步:編寫proto檔案
# syntax 設定版本号
# BookMessage 設定生成java檔案的類名
# Book 生成内部類的類名(實際需要使用的實體類)
# string name = 2 設定實體類屬性,并指定序号為2(不是實體類屬性)
syntax = "proto3";
option java_outer_classname = "BookMessage";
message Book{
    int32 id = 1;
    string name = 2;
}
第三步:使用protoc.exe工具軟體,cmd輸入指令,生成BookMessage.java檔案,并拷貝到項目中
protoc --java_out=. Book.proto
第四步:在netty中使用
Client: socketChannel.addLast("encoder",new ProtobufEncoder());
MessageBook book = MessageBook.Book.newBuilder().setId(1).setName("張三").build();
Server: socketChannel.addLast("decoder",new ProtobufDecoder(BookMessage.Book.getDefaultInstance()));
           
netty demo
案例: 實作用戶端向伺服器端對話
1.伺服器端
public class TestNettyServer {
    public static void main(String[] args) throws Exception {
        //1. 建立一個線程組:接收用戶端連接配接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //2. 建立一個線程組:處理網絡操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3. 建立伺服器端啟動助手來配置參數
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //配置啟動資訊
        serverBootstrap
        		 //4.設定兩個線程組
        		.group(bossGroup, workerGroup)
        		//5.使用NioServerSocketChannel作為伺服器端通道的實作
                .channel(NioServerSocketChannel.class) 
                //6.設定線程隊列中等待連接配接的個數
                .option(ChannelOption.SO_BACKLOG, 128) 
                //7.保持活動連接配接狀态
                .childOption(ChannelOption.SO_KEEPALIVE, true) 
                //8. 建立一個通道初始化對象
                .childHandler(new ChannelInitializer<SocketChannel>() {  
                    public void initChannel(SocketChannel socketChannel) {
                     //9. 往PipelineChannel鍊中添加自定義的handler類
                     socketChannel.pipeline().addLast(new TestNettyServerHandler());
                   }
                });
        //異步啟動服務,并同步等待
        System.out.println("......Server is ready......");
        //10. 綁定端口 bind方法是異步的,sync方法是同步阻塞的,即等待連接配接成功代碼才繼續往下執行
        ChannelFuture future = serverBootstrap.bind(9999).sync();
        System.out.println("......Server is starting......");
        //11. 關閉通道,關閉線程組
        future.channel().closeFuture().sync(); //closeFuture異步 sync同步阻塞
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

2.伺服器業務處理類
public class TestNettyServerHandler extends ChannelInboundHandlerAdapter {
    //讀取事件
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("Client Say : " + byteBuf.toString(CharsetUtil.UTF_8));
    }
    //讀取完成事件
    @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello Client ...", CharsetUtil.UTF_8));
    }
    //異常事件
    @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.用戶端
public class TestNettyClient {
    public static void main(String[] args) throws Exception {
        //1. 建立一個線程組
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //2. 建立用戶端的啟動助手,完成相關配置
        Bootstrap bootstrap = new Bootstrap();
        //3. 設定線程組
        bootstrap.group(workerGroup)
                //4. 設定用戶端通道的實作類
                .channel(NioSocketChannel.class)
                //5. 建立一個通道初始化對象
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                     //6.往Pipeline鍊中添加自定義的handler
                     socketChannel.pipeline().addLast(new TestNettyClientHandler());
                    }
                });
        System.out.println("Client is  ready......");
        //7.啟動用戶端去連接配接伺服器端  connect方法是異步的   sync方法是同步阻塞的
        ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
        //8.關閉連接配接(異步非阻塞)
        future.channel().closeFuture().sync();
    }
}

4.用戶端業務邏輯處理類
public class TestNettyClientHandler extends ChannelInboundHandlerAdapter {

    //用戶端連接配接伺服器完成事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server ...", CharsetUtil.UTF_8));
    }

    //讀取事件
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("Server Say : " + byteBuf.toString(CharsetUtil.UTF_8));
    }
}
           

繼續閱讀