一、背景
Netty 是一款優秀的高性能網絡架構,内部通過 NIO 的方式來處理網絡請求,在高負載下也能可靠和高效地處理 I/O 操作
作為較底層的網絡通信架構,其被廣泛應用在各種中間件的開發中,比如 RPC架構、MQ、Elasticsearch等,這些中間件架構的底層網絡通信子產品大都利用到了 Netty 強大的網絡抽象
下面這篇文章将主要對 Netty 中的各個元件進行分析,并在介紹完了各個元件之後,通過 JSF 這個 RPC 架構為例來分析 Netty 的使用,希望讓大家對 Netty 能有一個清晰的了解
二、Netty Server
通過 Netty 來建構一個簡易服務端是比較簡單的,代碼如下:
public class NettyServer {
public static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("Handler Added");
}
})
.childHandler(new ServerChannelInitializer())
.bind(8100);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("Netty Server Start !");
}
}
});
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
上面代碼的主要邏輯如下:
- 建立服務端引導啟動類 ServerBootstrap,内部封裝了各個元件,用來進行服務端的啟動
- 建立了兩個 EventLoopGroup 用來進行連接配接處理,此時可以簡單的将 EventLoopGroup 了解為多個線程的集合。bossGroup 中的線程用來處理新連接配接的建立,當新連接配接建立後,workerGroup 中的每個線程則都會和唯一的用戶端 Channel 連接配接進行綁定,用來處理該 Channel 上的讀、寫事件
- 指定服務端建立的 Channel 類型為 NioServerSocketChannel
- childOption 用來配置用戶端連接配接的 NioSocketChannel 底層網絡參數
- handler 用來指定針對服務端 Channel 的處理器,内部定義了一系列的回調方法,會在服務端 Channel 發生指定事件時進行回調
- childHandler 用來指定用戶端 Channel 的處理器,當用戶端 Channel 中發生指定事件時,會進行回調
- bind 指定服務端監聽端口号
三、Netty Client
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 1. 啟動類
ChannelFuture channelFuture = new Bootstrap()
// 2. 添加 EventLoop
.group(workGroup)
// 3. 選擇用戶端 channel 實作
.channel(NioSocketChannel.class)
// 4. 添加處理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在連接配接建立後被調用
protected void initChannel(NioSocketChannel ch) throws Exception {
ZAS ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 連接配接到伺服器
.connect(new InetSocketAddress("localhost", 8100));
channelFuture.addListener(future -> {
if (future.isSuccess()) {
((ChannelFuture) future).channel().writeAndFlush("hello");
}
});
channelFuture.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
上面代碼的主要邏輯如下:
- 建立 Bootstrap 用來進行用戶端啟動
- group() 指定一個 NioEventLoopGroup 執行個體,用來處理用戶端連接配接的建立和後續事件處理
- handler() 指定 Channel 處理器,
- 當将用戶端啟動類中的各個屬性都設定完畢後,調用 connect() 方法進行服務端連接配接
從上面的的兩個例子可以看出,如果想通過 Netty 實作一個簡易的伺服器其實是非常簡單的,隻需要在啟動引導類中設定好對應屬性,然後完成端口綁定就可以實作。但也正是因為這種簡易的實作方式,導緻很多人在學習 Netty 的過程中,發現代碼是寫的出來,但是對内部的元件有什麼作用以及為什麼這麼寫可能就不是很清楚了,是以希望通過這一系列文章來加深大家對 Netty 的了解
四、Netty 基本元件
Channel
Netty 中的 Channel 可以看成網絡程式設計中的 Socket,其提供了一系列 IO 操作的 API,比如 read、write、bind、connect 等,大大降低了直接使用 Socket 類的複雜性
整體類繼承關系如下:
從上面的繼承關系可以看出,NioSocketChannel 和 NioServerSocketChannel 分别對應用戶端和服務端的 Channel,兩者的直接父類不一緻,是以對外提供的功能也是不相同的。比如當發生 read 事件時,NioServerSocketChannel 的主要邏輯就是建立新的連接配接,而 NioSocketChannel 則是讀取傳輸的位元組進行業務處理
下面就以 NioServerSocketChannel 為例,帶大家了解下該類的初始化過程,整體流程如下:
- 啟動引導類中通過 channel() 指定底層建立的 Channel 類型
- 根據指定的 Channel 類型建立出 ChannelFactory,後續通過該工廠類進行 Channel 的執行個體化
- 執行個體化 Channel
channel() 指定 ChannelFactory 類型
在上面的服務端啟動過程中,ServerBootstrap 調用 channel() 方法并傳入 NioServerSocketChannel,其底層代碼邏輯為:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// ReflectiveChannelFactory 構造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
整體邏輯很簡單,通過傳入的 Class 對象指定一個 Channel 反射工廠,後續調用工廠方法擷取指定類型的 Channel 對象
channel 執行個體化
當服務端啟動引導類 ServerBootstrap 調用 bind() 方法之後,内部會走到 Channel 的執行個體化過程,代碼精簡如下:
// channel 初始化流程,内部通過 channelFactory 構造
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
}
// channelFactory 的 newChannel 方法邏輯
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
ChannelFactory 的整體邏輯就是通過反射的方式建立 Channel 對象,而 Channel 對象的類型則是在啟動引導類中通過 channel() 方法進行指定的
在執行個體化 Channel 的過程中,會對其内部的一些屬性進行初始化,而對這些屬性的了解,可以使我們對 Netty 中各個元件的作用範圍有一個更加清晰的了解,下面看下 NioServerSocketChannel 的構造函數源碼
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
上述源碼就是一層一層的父類構造,可以對照前面的類關系圖進行閱讀
NioServerSocketChannel 執行個體化過程中主要完成了以下内部屬性的初始化:
- unsafe 屬性進行指派為 NioMessageUnsafe,後續 Channel 上事件處理的主要邏輯都是由該類完成
- pipeline 屬性進行初始化指派,pipeline 是 Channel 中特别重要的一個屬性,後續的所有業務處理器都是通過該 pipeline 組織的
- 指定目前 Channel 的 readInterestOp 屬性為 SelectionKey.OP_ACCEPT,用于後續綁定到 Selector 時指定目前 Channel 監聽的事件類型
- 指定目前 Channel 非阻塞,ch.configureBlocking(false)
總結
對于 Channel 的執行個體化流程可以總結如下:
- 啟動引導類中通過 channel() 方法指定生成的 ChannelFactory 類型
- 通過 ChannelFactory 來構造對應 Channel,并在執行個體化的過程中初始化了一些重要屬性,比如 pipeline
ChannelPipeline
ChannelPipeline 也是 Netty 中的一個比較重要的元件,從上面的 Channel 執行個體化過程可以看出,每一個 Channel 執行個體中都會包含一個對應的 ChannelPipeline 屬性
ChannelPipeline 初始化
ChannelPipeline 底層初始化源碼:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
從 ChannelPipeline 的構造函數可以看出,每一個 ChannelPipeline 底層都是一個雙向連結清單結構,預設會包含 head 和 tail 頭尾節點,用來進行一些預設的邏輯處理,處理細節會在後續文章中展現
addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回調 ChannelHandler 中的 handlerAdded() 方法
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用來進行業務處理
整個方法的邏輯為:
- 判斷目前 ChannelHandler 是否已經添加
- 将目前 ChannelHandler 包裝成 ChannelHandlerContext,并将其添加到 ChannelPipeline 的雙向連結清單中
- 回調添加的 ChannelHandler 中的 handlerAdded() 方法
Channel、ChannelPipeline、ChannelHandler 關系
Channel、ChannelPipeline和 ChannelHandler 三者的關系如圖所示:
- 每一個 Channel 中都會包含一個 ChannelPipeline 屬性
- ChannelPipeline 是一個雙向連結清單結構,預設會包含 HeadContext 和 TailContext 兩個節點
- 當向 ChannelPipeline 中添加 ChannelHandler 時,會包裝成 ChannelContext 插入到 ChannelPipeline 連結清單中
- 當 Channel 中發生指定事件時,該事件就會在 ChannelPipeline 中沿着雙向連結清單進行傳播,調用各個 ChannelHandler 中的指定方法,完成相應的業務處理
Netty 正是通過 ChannelPipeline 這一結構為使用者提供了自定義業務邏輯的擴充點,使用者隻需要向 ChannelPipeline 中添加處理對應業務邏輯的 ChannelHandler,之後當指定事件發生時,該 ChannelHandler 中的對應方法就會進行回調,實作業務的處理
ChannelHandler
ChannelHandler 是 Netty 中業務處理的核心類,當有 IO 事件發生時,該事件會在 ChannelPipeline 中進行傳播,并依次調用到 ChannelHandler 中的指定方法
ChannelHandler 的類繼承關系如下:
從上面的類繼承關系可以看出,ChannelHandler 大緻可以分為 ChannelInboundHandler 和 ChannelOutboundHandler,分别用來處理讀、寫事件
ChannInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
在 ChannelInboundHandler 中定義了一系列的回調方法,使用者可以實作該接口并重寫相應的方法來自定義的業務邏輯。
重寫方法邏輯是簡單的,但很多人其實不清楚的是這些回調方法到底在什麼場景下會被調用,如何調用,隻有了解了這些回調方法的調用時機,才能在更适宜的地方完成相應功能
channelRegistered
channelRegistered() 從方法名了解是當 Channel 完成注冊之後會被調用,那麼何為 Channel 注冊?
下面就以 Netty 服務端啟動過程中的部分源碼為例(詳細源碼分析會在後續文章中),看下 channelRegistered() 的調用時機
在 Netty 服務端啟動時,會調用到 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法,精簡代碼如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// neverRegistered 初始值為 true
boolean firstRegistration = neverRegistered;
// 将 Channel 綁定到對應 eventLoop 中的 Selector 上
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 調用 ChannelHandler 中的 ChannelRegistered()
pipeline.fireChannelRegistered();
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
從 Netty 底層的 register() 方法可以看出,ChannelHandler 中的 ChannelRegistered() 調用時機是在調用 pipeline.fireChannelRegistered() 時觸發的,此時已經完成的邏輯為:
- 通過傳入的 EventLoopGroup 得到了該 Channel 對應的 EventLoop,并與Channel 中的對應屬性完成了綁定;AbstractChannel.this.eventLoop = eventLoop 邏輯
- 目前 Channel 已經綁定到了對應 EventLoop 中的 Selector 上;doRegister() 邏輯
- ChannelHandler 中的 handlerAdded() 方法已經完成了回調;pipeline.invokeHandlerAddedIfNeeded() 邏輯
是以當 Channel 和對應的 Selector 完成了綁定,Channel 中 pipeline 上綁定的 ChannelHandler 的channelRegisted() 方法就會進行回調
channelActive
上面已經分析了channelRegistered() 方法的調用時機,也就是當 Channel 綁定到了對應 Selector 上之後就會進行回調,下面開始分析 channelActive() 方法的調用時機
對于服務端 Channel,前面還隻是将 Channel 注冊到了 Selector 上,還沒有調用到 bind() 方法完成真正的底層端口綁定,那麼有沒有可能當服務端 Channel 完成端口監聽之後,就會調用到 channelActive() 方法呢?
下面繼續分析,在上面完成了 Channel 和 Selector 的注冊之後,Netty 服務端啟動過程中會繼續調用到 io.netty.channel.AbstractChannel.AbstractUnsafe#bind 邏輯:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
在該方法中完成了以下邏輯:
- 完成了 Channel 和本地端口的綁定
- 綁定成功後,isActive() 方法傳回 true,此時釋出 ChannelActive 事件,進行方法回調
- safeSetSuccess() 中會回調到服務端啟動過程中添加的 listener 方法,表明目前 Channel 完成了端口綁定
總結:
當 Channel 調用了 bind() 方法完成端口綁定之後,channelActive() 方法會進行回調
channelRead
該方法的調用時機,服務端和用戶端是不一緻的
服務端 channelRead
服務端 Channel 綁定到 Selector 上時監聽的是 Accept 事件,當用戶端有新連接配接接入時,會回調 channelRead() 方法,完成新連接配接的接入
Netty 在服務端啟動過程中,會預設添加一個 ChannelHandler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 來處理新連接配接的接入
用戶端 channelRead
當服務端處理完 Accept 事件後,會生成一個和用戶端通信的 Channel,該 Channel 也會注冊到對應的 Selector 上,并監聽 read 事件
當用戶端向該 Channel 中發送資料時就會觸發 read 事件,調用到 channelRead() 方法(Netty 内部的源碼處理會在後續的文章中進行分析)
exceptionCaught
目前 ChannelHandler 中各回調方法處理過程中如果發生了異常就會回調該方法