dubbo 做為 RPC 架構,需要進行跨 JVM 通信,要保證高性、穩定的進行遠端通信。dubbo 底層通信選擇了 netty 這個 nio 架構做為預設的網絡通信架構并且通過自定義協定進行通信。dubbo 支援以下網絡通信架構:
- Netty(預設)
- Mina
- Grizzly
1、 netty 簡介
在網絡編碼領悟,Netty 是 Java 的卓越架構。它封裝了 Java NIO 操作的複雜性,使得開發者能夠使用其提供的簡易 API 就能夠開發出高效的網絡程式。
首先我們來看一下 netty 裡面的關鍵特性:
很多公司包含:Apple、Twitter、Facebook、Google 都在使用 Netty,并且很多流行的開源項目也在使用 Netty如:Vert.x 、Apache Cassandra 和 Elasticsearch,它們所有的核心代碼都是利用了 Netty 強大的網絡抽象。
2、netty 的核心元件
在 netty 中主要包括以下主要的核心構件塊:
- Channel
- EventLoop
- ChannelHandler
- ChannelFuture
2.1 Channel
Channel 是 Java NIO 裡面的一個基本構造。可以把它看做是傳入或傳出資料的載體,然後通過 NIO 裡面的 Buffer 來進行資料傳遞。
而在 Netty 中自己也定義了一個 Channel,它的主要作用是:
連接配接到網絡套接字或能夠進行輸入輸出的元件的連接配接操作,如讀、寫、連接配接和綁定。
通道提供給使用者以下功能:
- 通道的目前狀态(例如,open?connected?)
- 通道的 ChannelConfig 配置參數(例如,接收緩沖區大小)
- 通道支援的輸入/輸出操作(如讀、寫、連接配接和綁定)
- 通道處理與通道相關聯的所有輸入/輸出事件和請求
具體的方法可以參看 Netty 的 javadoc 中的 Channel
2.2 EventLoop
EventLoop 是 Netty 的核心概念, netty 裡面的網絡程式設計處理都是基于事件回調。下面的圖說明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之間的關系
它們的關系是:
- 一個 EventLoopGroup 包含一個或者多個 EventLoop;
- 一個 EventLoop 在它的生命周期内隻和一個 Thread 綁定;
- 所有的 EventLoop 處理的 I/O 事件都将在它專門的 Thread 被處理;
- 一個 Channel 在它的生命周期内隻注冊于一個 EventLoop;
- 一個 EventLoop 可能會被配置設定一個或者多個 Channel;
netty 這樣的設計,一個操作 I/O 的 Channel 都會由相同的 Thread 執行的,這樣就能夠消除線程之間的同步。是以 netty 内部是通過回調來處理事件。當一個回調被觸發時相關的事件。
2.3 ChannelHandler
ChannelHandler 在 Netty 裡面的地位,大家可以了解成 Bean 在 Spring 裡面的位址。Spring 把需要操作的 POJO 抽象成 Bean,然後對于對象的管理都是可以通過 spring bean 的生命周期來進行管理。同樣的,我們在操作 Netty 的時候,其實就是操作的就是 ChannelHandler。
在上一個小節闡述了 EventLoop 在一個線程裡面管理 I/O 操作 Channel 的生命周期。而 netty 的内部是通過回調來處理相關的事件。了解 ChannelHandler 其實可以了解成 Java Servlet 裡面的 Filter.
而 Netty 在處理 Channel 的時候也是類似的。隻不過它差別得更加明确, 入站的時候使用是 ChannelHandler 的子接口 ChannelInboundHandler,而出站使用的 ChannelHandler 的子接口 ChannelOutboundHandler,然後 ChannelPipeline 提供了 ChannelHandler 鍊的容器。
具體使用 Netty 的時候可以注冊哪些回調事件,大家可以參看以下接口對應的 API:
- ChannelHandler:ChannelHandler 的注冊與删除
- ChannelInboundHandler:ChannelHandler 的入站操作
- ChannelOutboundHandler:ChannelHandler 的出站操作
2.4 ChannelFuture
因為 Netty 裡面的操作都是異步的,是以一個操作可能不會立即傳回。這樣我們就需要一種用于在之後的某個時間點确定其結果的方法。是以, Netty 提供了 ChannelFuture 接口。它提供了 addListener() 方法來注冊一個 ChannelFutureListener,以便在某個操作完成時(無論是否成功) 得到通知。
可以将 ChannelFuture 看做是将來要執行的操作的結果的占位符。它究竟什麼時候被執行則可能取決于若幹因素,是以不可能準确的預測,但是可以肯定的是它将會被執行。并且,所有屬于同一個 Channel 的操作都被保證其将以它們被調用的順序被執行
3、Netty Demo
在上面的章節介紹了 Netty 裡面的核心概念,下面我們就來編寫一個 Hello World 來加深一下對上面概念的了解。
3.1 Netty Server
編寫 Server 端業務處理類,因為需要響應用戶端傳入的資訊,是以需要實作 ChannelInboundHandler 接口。作為一個簡單的 hello world 程式,我們可以繼承 ChannelInboundHandlerAdapter 它提供了 ChannelInboundHandler 的預設實作,我們隻需要重寫需要關注的方法就行了。
EchoServerHandler
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 将接收到的消息寫給發送者,而不沖刷出站消息
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将未決消息沖刷到遠端節點,并且關閉該 Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 列印異常堆棧
cause.printStackTrace();
// 關閉該 Channel
ctx.close();
}
下面就需要建立引導伺服器,綁定伺服器并且監聽并且接收傳入連接配接請求的端口,配置 Channel ,用來将傳入的入站消息通知給 EchoServerHandler 執行個體。
EchoServer
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
new EchoServer(8080).start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
ChannelFuture future = bootstrap.bind().sync();
future.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
}
}
}
上面的服務主要做了以下幾件事:
- main() 方法引導了伺服器,引導過程中所需要以下步驟
- 建立一個 ServerBootstrap 的執行個體用于引導和綁定服務
- 建立兩個 EventLoopGroup 執行個體,boss 用于接收請求,而 worker 用于真正的請求處理
- 指定伺服器綁定到本地的 InetSocketAddress
- 使用一個 EventLoopGroup 的執行個體初始化每一個新的 Channel
- 調用 ServerBootstrap.bind() 方法綁定伺服器
這個時候伺服器已經初始化好了,可以 接收用戶端發送過來的請求了。
3.2 Netty Client
首先編碼用戶端處理類,用戶端處理類需要發送一個或者多個消息到伺服器。并且在發送消息之後接收伺服器發回的響應,最後關閉連接配接。
EchoClientHandler
@ChannelHandler.Sharable
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 當被通知 Channel 活躍的時候,發送消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock ! ", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 列印服務端響應的資訊
ByteBuf in = (ByteBuf) msg;
System.out.println("Client received : " + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 列印異常,并關閉 Channel
cause.printStackTrace();
ctx.close();
}
}
下面我們就需要編寫用戶端引導類,用于連接配接服務端并且與服務端進行通信。
EchoClient
public class EchoClient {
private final String host;
private final int prot;
public EchoClient(String host, int prot) {
this.host = host;
this.prot = prot;
}
public static void main(String[] args) throws Exception {
new EchoClient("localhost", 8080).start();
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, prot))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
以上代碼做的事如下:
- 建立一個 Bootstrap 執行個體以初始化用戶端
- 建立一個 EventLoopGroup 執行個體處理建立新的連接配接以及處理入站和出站資料
- 為連接配接伺服器建立一個 InetSocketAddress 執行個體
- 當連接配接被建立時,一個 EchoClientHandler 執行個體會被安裝到 ChannelPipeline 中
- 一切設定完成後,調用 Bootstrap.connect() 方法連接配接到遠端節點
下面我們就可以測試程式的正确性了。
3.3 Test
運作服務端引導類 EchoServer,然後運作用戶端引導類 EchoClient。此時在服務端的控制台列印以下結果:
接着在用戶端的控制台列印以下結果: