開始之前,還是先貼一段netty用戶端的經典使用姿勢,如下:
try {
// 通過無參構造函數,建立一個Bootstrap執行個體
Bootstrap b = new Bootstrap();
// 設定EventLoop線程組
b.group(new NioEventLoopGroup())
// 設定具體使用的Channel子類
.channel(NioSocketChannel.class)
// 設定tcp的參數
.option(ChannelOption.TCP_NODELAY, true)
// 設定資料處理的handler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
// 建立與服務的的連接配接
ChannelFuture f = b.connect(HOST, PORT).sync();
// 等待channel關閉
f.channel().closeFuture().sync();
} finally {
// 關閉EventLoop線程組
group.shutdownGracefully();
}
從上述代碼可以看出,netty用戶端的使用很友善,由于用戶端需要配置的參數較多,是以Bootstrap提供了一個無參構造函數,而具體的參數配置,則通過build模式進行各自獨立配置。
group配置的就是用來執行I/O操作的線程池,每個線程以EventLoop的形式存在,配置代碼如下:
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
做了一些非null的校驗,如果傳入的group為null,以及如果已經被設定過,則都會抛出異常,校驗通過之後,則進行屬性指派。
對于channel的配置,會根據傳入的class,建構一個工廠類,然後指派給channelFactory這個屬性,指派過程與group指派過程類似。
而工廠類,則會根據需要,通過反射的機制,得到一個channel具體子類的執行個體,用于建立連接配接。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
option就是配置tcp相關的參數,就是一系列的key-value對。
handler配置的就是用戶端想要對I/O過程中的資料的處理邏輯,就是一系列的ChannelHandler構成的清單。
Bootstrap在經過上述一系列配置後,各項準備工作已經就緒,接下來重點分析其connect過程。
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validate();
return doConnect(remoteAddress, localAddress());
}
connect的調用鍊如上,最終會通過調用doConnect方法進行connect操作,在doConnect之前,會調用validate方法進行校驗操作,主要是對group、channelFactory、handler這三個必須配置進行校驗,看是否做了配置,代碼如下:
public Bootstrap validate() {
super.validate();
if (handler() == null) {
throw new IllegalStateException("handler not set");
}
return this;
}
public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}
在上述校驗通過後,繼續看connect真正發生的地方,也就是doConnect方法裡做了什麼。
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
/**
* 建立 NioSocketChannel , 并将該channel 注冊到一個 eventLoop 上去
* 傳回的 ChannelFuture 子類為 DefaultChannelPromise
* 這步的操作都是本機操作,還不涉及到網絡操作
*/
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
/**
* 根據注冊時,傳回的channelFuture的狀态來決定,進行connect操作
* 這裡都是完全異步的,如果注冊還沒有完成,則會監聽注冊的狀态,在注冊完成時,才會進行connect操作
*/
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
這段代碼很短,是進行connect操作的入口,主要分為兩大步驟,register和connect操作。
1、register就是将建立的NioSocketChannel注冊到NioEventLoop上,最終就是将socket注冊到一個selector上,
2、connect則是建立與遠端伺服器的網絡連接配接,我的了解,就是與遠端主機完成了3次握手的操作。
從上述這段代碼可以看出,register和connect操作都是異步操作,如調用initAndRegister方法後,傳回的是一個regFutrure,這是一個異步調用的傳回結果,接着對其做了判斷,如果其中的異常不為null,說明register過程出錯,則直接就講regFutrure作為整個doConnect方法的傳回值,也就是結束了connect操作。判斷如下:
if (regFuture.cause() != null) {
return regFuture;
}
如果regFutrure沒有異常,則會再判斷是否已經done,如果done,則直接調用doConnect0方法進行connect操作,如果沒有完成,則在regFutrure上添加了一個ChannelFutureListener,主要邏輯就是在regFuture完成時,調用doConnect0方法,這裡需要注意下,在這之前,通過如下代碼:
建立了一個ChannelPromise的執行個體promise,并作為doConnect0的第五個入參,然後doConnect方法的傳回值也是這個promise。
從這個過程可以看出,register和connect過程都是異步操作,後面具體分析裡面的過程時,會更加明确這一點。
這裡簡單說明下ChannelFuture和ChannelPromise,這兩個都是異步執行的方法結果,其中ChannelPromise是ChannelFuture的子類,ChannelFuture隻能擷取結果,不能對異步傳回結果進行操作,而ChannelPromise對其進行了擴充,可以對異步傳回結果進行設定等操作。
接下裡,先來看register的過程,主要邏輯在initAndRegister方法中,代碼如下:
final ChannelFuture initAndRegister() {
// 1、通過channelFactory建立一個NioSocketChannel的執行個體
final Channel channel = channelFactory().newChannel();
try {
// 2、初始化channel
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
// 3、對channel進行注冊
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
1、根據Bootstrap的配置,用channelFactory,利用反射建立一個NioSocketChannel的執行個體;
2、對建立好的NioSocketChannel的執行個體進行初始化操作,代碼如下:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
初始化的邏輯很明确,
a、把配置的handler添加到channel的pipeline中,用于後續資料處理; b、把配置的tcp的參數添加到channel的配置中; c、把配置的屬性添加到channel的屬性中。
3、上述初始化完成之後,則進行channel的注冊過程,代碼如下:
ChannelFuture regFuture = group().register(channel);
其中group方法傳回的就是Bootstrap在初始時傳入的NioEventLoopGroup對象,其調用register方法,最終是調用了SingleThreadEventLoop的register方法,如下:
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
在register的入參中建立了一個DefaultChannelPromise執行個體,該執行個體也就是該register方法後面會傳回的ChannelFuture。
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
在調用了unsafte的register方法後,就會把這個promise傳回。接下來繼續看register裡做了什麼。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 1
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 2
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 3
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 4
AbstractChannel.this.eventLoop = eventLoop;
// 5
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 6
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
1、判斷eventloop是否為null,如果為null,則抛出異常;
2、判斷該channel是否已經被注冊過,一個channel隻能被注冊一次,不能注冊到多個eventloop上,是以如果注冊,則抛出異常;
3、相容性判斷,看eventloop是否是一個NioEventLoop的執行個體;
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioEventLoop;
}
4、将eventloop指派給channel的eventloop屬性;
5、這裡就是register實作異步的地方,判斷目前執行現場是否就是eventloop的線程,如果是,則進行調用register0方法,如果不是,則将register0方法封裝成一個task,送出給eventloop來執行,注意register0的入參就是前面傳入的promise,是以可以根據這個promise來判斷register是否完成,以及是否成功等;
6、如果注冊出現異常,則會做一些收尾工作,如關閉channel,promise中設定fail标志等。
是以到這裡,使用者執行的register的動作已經完成了,但真正的register操作還沒有發生,被送出給eventloop去異步執行了,但是會傳回一個ChannelFuture的子類執行個體promise,可以用來檢測注冊的完成情況。
再繼續跟進到register0裡,看看做了什麼操作。
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 1
doRegister();
// 2
registered = true;
// 3
safeSetSuccess(promise);
// 4
pipeline.fireChannelRegistered();
if (isActive()) {
// 5
pipeline.fireChannelActive();
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
1、doRegister方法,進行真正的注冊,實質就是調用了java nio中的channel的register方法,将其注冊到selector上去,這裡需要注意的是,注冊時,register的第二個參數是設定感興趣的操作,這裡設定的是0,說明沒有設定任何感興趣的操作,這裡隻是簡單的完成了注冊的動作,對于感興趣的操作的設定是在fireChannelActive中設定的,後續會分析到。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, , this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
2、注冊成功後,會将channel的registered标志設定為true;
3、在promise中設定注冊成功的辨別,這樣在前面doConnect方法中,監聽注冊情況的那個listener就可以開始進行doConnect0方法的執行了;
4、上述動作完成之後,就會調用pipeline的fireChannelRegistered方法,在pipeline的ChannelHandler鍊中,依次處理channel注冊完成的操作;
5、接着判斷該channel是否處于活躍狀态,也就是該channel中包含的ch是否處于open和connect的狀态,一般情況下,第3步結束後才觸發了doConnect0操作,是以一般這裡的判斷都是false,也就是不會觸發fireChannelActive操作,但也不是絕對。
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
上述就是注冊的過程,在注冊完成之後,就會觸發connect操作,來繼續看下doConnect0方法中做了什麼。
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
這裡做的更直白,直接封裝了一個task,然後送出給eventloop去異步執行,然後就傳回了。
前面我們分析過,在register完成之後,才會觸發doConnect0操作,當這裡封裝的任務開始執行時,首先就是對regFuture的狀态進行判斷,看是成功還是失敗,成功了,才會繼續執行connect操作,調用channel的connect操作,就是調用的該channel的pipeline的connect操作,對于connect是一個使用者發起的動作,是以是一個outbound的操作,outbound操作都是從tail開始,傳遞到head,是以真正的操作是在head的connect方法中。
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
而head的connect方法中,則是直接調用了unsafe的connect方法,繼續向下看。
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
unsafe的connect的方法如下:
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 1、設定channel不能取消标志,并判斷channel是否處于open狀态
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
// 2、如果connectPromise不為null,則說明已經做過連接配接,則這裡就要抛出異常了
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
// 3、判斷目前channel是否活躍,也就是要open和connect都完成,才是true,才開始連接配接,是以wasActive應該是false
boolean wasActive = isActive();
// 4、進行連接配接操作,下面會詳細分析裡面内容
if (doConnect(remoteAddress, localAddress)) {
// 5、連接配接成功,則會觸發channelActive事件,以及将selectionKey的監聽事件中加入read事件
fulfillConnectPromise(promise, wasActive);
} else {
// 6、如果目前沒有連接配接成功
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > ) {
/**
* 如果逾時時間大于0, 則設定一個定時任務, 在逾時時間時, 檢查連接配接是否成功,
* 如果還沒有連接配接上, 則會抛出連接配接逾時異常, 這是netty自己做的一個逾時檢查任務
*/
connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.tryFailure(t);
closeIfClosed();
}
}
4、進行連接配接操作的具體邏輯如下,如果連接配接成功了,則傳回true,由于channel是非阻塞的,是以暫時沒有連接配接成功的,會注冊OP_CONNECT事件,等待其連接配接成功。
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
5、如果連接配接當場就成功了,則會fullFillConnectPromise操作,如下:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
// 1、嘗試設定連接配接成功的辨別,讓使用者調用的地方,知道連接配接成功了
boolean promiseSet = promise.trySuccess();
// 2、如果在連接配接之前不處于活躍狀态,而現在處于活躍狀态了,則觸發fireChannelActive操作
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// 3、如果第2步設定失敗,則會關閉channel
if (!promiseSet) {
close(voidPromise());
}
}
6、在目前沒有連接配接成功的情況下,netty對這種異步連接配接的操作,做了一個連接配接逾時的檢測,就是設定了一個定時任務,用來判斷連接配接是否成功。
if (connectTimeoutMillis > ) {
/**
* 如果逾時時間大于0, 則設定一個定時任務, 在逾時時間時, 檢查連接配接是否成功,
* 如果還沒有連接配接上, 則會抛出連接配接逾時異常, 這是netty自己做的一個逾時檢查任務
*/
connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
在設定netty的用戶端時,有一個參數 CONNECT_TIMEOUT_MILLIS 就是用來設定這裡的逾時時間的,如果設定過小,而網絡較差時,則有可能會出現 ConnectTimeoutException 的異常,可以根據網絡環境,适當設定該參數的大小。
在channel被注冊的selector上去時,并沒有設定OP_READ事件,那在哪裡設定的哪?
在第5步中,講到了fireChannelActive這個操作,來繼續看看裡面做了什麼。
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
可以看到,先會調用pipeline上的所有ChannelHandler的channelActive方法。
然後會判斷是否自動讀,該配置預設是true,是以會進入channel的read方法,繼續看下去。
public Channel read() {
pipeline.read();
return this;
}
直接調用了pipeline的read方法,由于是一個outbound事件,是以最終是調用head的read方法,進入看看。
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
public void beginRead() {
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
經過一層層的調用,最終是調用了doBeginRead方法,實作如下:
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == ) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
這裡找到了設定感興趣事件的地方。
selectionKey.interestOps(interestOps | readInterestOp);
至此,netty用戶端建立與服務端的連接配接操作就結束了。
總結如下: