netty
1.netty介紹:
netty是由jboss提供的基于nio的網絡程式設計開源架構,它采用異步,事件驅動的方式用來快速開發一個高性能高可靠的網絡IO程式;elasticsearch,dubbox内部采用的就是netty
2.netty線程模型:
* 單線程模型:
單線程多路複用的方式來完成伺服器端包括建立用戶端連接配接,讀寫的所有操作,編碼簡單,但是無法滿足大量用戶端連接配接的需求,傳統的nio程式設計便是這種方式
* 線程池模型:
伺服器端使用單線程管理用戶端的連接配接,同時使用一個線程池來管理其他的網絡IO操作
* netty模型; NioEventLoopGroup
伺服器端使用bossGroup線程池管理用戶端連接配接請求,同時使用workerGroup管理其他的所有IO操作;bossGroup和workerGroup對應的執行個體是NioEventLoopGroup類,每個NioEventLoopGrou中對應有多個NioEventLoop,而每個NioEventLoop對應線程池中的單個線程,每個NioEventLoop都有自己的Selector并在Selector上注冊各種事件和Channel,以及taskQueue;
bossGroup對應多個Selector和ServerSocketChannel,workerGroup對應多個Selector和SocketChannel;ChannelPipeline是貫穿整個netty的一條處理鍊,所有的入站出站操作都需要挂載到ChannelPipeline上才能生效
3.netty異步模型:
netty異步模型基于future和callback,它的核心思想:當需要調用方法時,直接傳回一個future,使用future和callbackk來監控方法的處理過程
4.netty API:
* 業務邏輯處理類(workerGroup使用)
自定義一個Handler繼承ChannelInboundHandlerAdapter類,重寫通道入站處理器擴充卡中的預設空實作的方法
public void channelActive(ChannelHandlerContext ctx),通道就緒事件
public void channelRead(ChannelHandlerContext ctx, Object msg),通道讀取資料事件
public void channelReadComplete(ChannelHandlerContext ctx) ,資料讀取完畢事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道發生異常
事件
* Pipeline 和 ChannelPipeline
ChannelPipeline是一個Handler集合,是貫穿整個netty的一條處理鍊
* ChannelHandlerContext
這是事件處理器上下文對象,代表ChannelPipeline中每個Handler的處理節點
ctx.writeAndFlush(Unpooled.copiedBuffer("你好",CharsetUtil.UTF_8));
* ChannelOption 設定socket标準參數
* ChannelFuture 監控nettyI/O操作的結果
* ServerBootstrap 伺服器端啟動助手類,用于設定參數
* Bootstrap 用戶端啟動助手類
* Unpooled netty提供的用來操作緩沖區的工具類,netty的位元組緩沖區叫ByteBuf,不是ButeBuffer
ByteBuf byteBuf = Unpooled.copiedBuffer("你好",CharsetUtil.UTF_8);
5.netty坐标
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.8.Final</version>
</dependency>
6.編碼和解碼
* 方式一: 采用jdk自帶的序列化方式
缺點:無法跨語言,且序列化後體積是二進制的5倍
* 方式二: netty提供的編碼器解碼器,ObjectDecoder/ObjectEncoder
缺點:内部使用的是jdk序列化方式,同上
* 方式三:google提供的Protobuf序列化
跨語言,且高性能高可靠
7.Protobuf序列化使用步驟
第一步:導入依賴
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
第二步:編寫proto檔案
# syntax 設定版本号
# BookMessage 設定生成java檔案的類名
# Book 生成内部類的類名(實際需要使用的實體類)
# string name = 2 設定實體類屬性,并指定序号為2(不是實體類屬性)
syntax = "proto3";
option java_outer_classname = "BookMessage";
message Book{
int32 id = 1;
string name = 2;
}
第三步:使用protoc.exe工具軟體,cmd輸入指令,生成BookMessage.java檔案,并拷貝到項目中
protoc --java_out=. Book.proto
第四步:在netty中使用
Client: socketChannel.addLast("encoder",new ProtobufEncoder());
MessageBook book = MessageBook.Book.newBuilder().setId(1).setName("張三").build();
Server: socketChannel.addLast("decoder",new ProtobufDecoder(BookMessage.Book.getDefaultInstance()));
netty demo
案例: 實作用戶端向伺服器端對話
1.伺服器端
public class TestNettyServer {
public static void main(String[] args) throws Exception {
//1. 建立一個線程組:接收用戶端連接配接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2. 建立一個線程組:處理網絡操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3. 建立伺服器端啟動助手來配置參數
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置啟動資訊
serverBootstrap
//4.設定兩個線程組
.group(bossGroup, workerGroup)
//5.使用NioServerSocketChannel作為伺服器端通道的實作
.channel(NioServerSocketChannel.class)
//6.設定線程隊列中等待連接配接的個數
.option(ChannelOption.SO_BACKLOG, 128)
//7.保持活動連接配接狀态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//8. 建立一個通道初始化對象
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel socketChannel) {
//9. 往PipelineChannel鍊中添加自定義的handler類
socketChannel.pipeline().addLast(new TestNettyServerHandler());
}
});
//異步啟動服務,并同步等待
System.out.println("......Server is ready......");
//10. 綁定端口 bind方法是異步的,sync方法是同步阻塞的,即等待連接配接成功代碼才繼續往下執行
ChannelFuture future = serverBootstrap.bind(9999).sync();
System.out.println("......Server is starting......");
//11. 關閉通道,關閉線程組
future.channel().closeFuture().sync(); //closeFuture異步 sync同步阻塞
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
2.伺服器業務處理類
public class TestNettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("Client Say : " + byteBuf.toString(CharsetUtil.UTF_8));
}
//讀取完成事件
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello Client ...", CharsetUtil.UTF_8));
}
//異常事件
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.用戶端
public class TestNettyClient {
public static void main(String[] args) throws Exception {
//1. 建立一個線程組
EventLoopGroup workerGroup = new NioEventLoopGroup();
//2. 建立用戶端的啟動助手,完成相關配置
Bootstrap bootstrap = new Bootstrap();
//3. 設定線程組
bootstrap.group(workerGroup)
//4. 設定用戶端通道的實作類
.channel(NioSocketChannel.class)
//5. 建立一個通道初始化對象
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
//6.往Pipeline鍊中添加自定義的handler
socketChannel.pipeline().addLast(new TestNettyClientHandler());
}
});
System.out.println("Client is ready......");
//7.啟動用戶端去連接配接伺服器端 connect方法是異步的 sync方法是同步阻塞的
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
//8.關閉連接配接(異步非阻塞)
future.channel().closeFuture().sync();
}
}
4.用戶端業務邏輯處理類
public class TestNettyClientHandler extends ChannelInboundHandlerAdapter {
//用戶端連接配接伺服器完成事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server ...", CharsetUtil.UTF_8));
}
//讀取事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("Server Say : " + byteBuf.toString(CharsetUtil.UTF_8));
}
}