類似于 java.nio包 的 Channel,Netty 提供了自己的 Channel 和其子類實作,用于異步 I/O操作 等。Unsafe 是 Channel 的内部接口,聚合在 Channel 中協助進行網絡讀寫相關的操作,因為它的設計初衷就是 Channel 的内部輔助類,不應該被 Netty架構 的上層使用者調用,是以被命名為 Unsafe。
Channel 元件
Netty 的 Channel元件 是 Netty 對網絡操作的封裝,如 網絡資料的讀寫,與用戶端建立連接配接,主動關閉連接配接 等,也包含了 Netty架構 相關的一些功能,如 擷取該 Chanel 的 EventLoop、ChannelPipeline 等。另外,Netty 并沒有直接使用 java.nio包 的 SocketChannel和ServerSocketChannel,而是使用 NioSocketChannel和NioServerSocketChannel 對其進行了進一步的封裝。下面我們先從 Channel接口 的API開始分析,然後看一下其重要子類的源碼實作。
為了便于後面的閱讀源碼,我們先看下 NioSocketChannel 和 NioServerSocketChannel 的繼承關系類圖。

Channel 接口
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
/**
* Channel 需要注冊到 EventLoop 的多路複用器上,用于處理 I/O事件,
* EventLoop 實際上就是處理網絡讀寫事件的 Reactor線程。
*/
EventLoop eventLoop();
/**
* ChannelMetadata 封裝了 TCP參數配置
*/
ChannelMetadata metadata();
/**
* 對于服務端Channel而言,它的父Channel為空;
* 對于用戶端Channel,它的 父Channel 就是建立它的 ServerSocketChannel
*/
Channel parent();
/**
* 每個 Channel 都有一個全局唯一辨別
*/
ChannelId id();
/**
* 擷取目前 Channel 的配置資訊,如 CONNECT_TIMEOUT_MILLIS
*/
ChannelConfig config();
/**
* 目前 Channel 是否已經打開
*/
boolean isOpen();
/**
* 目前 Channel 是否已注冊進 EventLoop
*/
boolean isRegistered();
/**
* 目前 Channel 是否已激活
*/
boolean isActive();
/**
* 目前 Channel 的本地綁定位址
*/
SocketAddress localAddress();
/**
* 目前 Channel 的遠端綁定位址
*/
SocketAddress remoteAddress();
/**
* 目前 Channel 是否可寫
*/
boolean isWritable();
/**
* 目前 Channel 内部的 Unsafe對象
*/
Unsafe unsafe();
/**
* 目前 Channel 持有的 ChannelPipeline
*/
ChannelPipeline pipeline();
/**
* 從目前 Channel 中讀取資料到第一個 inbound緩沖區 中,如果資料被成功讀取,
* 觸發ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。
* 讀取操作API調用完成之後,緊接着會觸發ChannelHandler.channelReadComplete(ChannelHandlerContext)事件,
* 這樣業務的ChannelHandler可以決定是否需要繼續讀取資料。如果己經有讀操作請求被挂起,則後續的讀操作會被忽略。
*/
@Override
Channel read();
/**
* 将之前寫入到發送環形數組中的消息全部寫入到目标Chanel中,發送給通信對方
*/
@Override
Channel flush();
}
AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
// 父Channel
private final Channel parent;
// Channel的全局唯一辨別
private final ChannelId id;
// 内部輔助類 Unsafe
private final Unsafe unsafe;
// Netty 會為每一個 channel 建立一個 pipeline
private final DefaultChannelPipeline pipeline;
// 本地位址
private volatile SocketAddress localAddress;
// 遠端主機位址
private volatile SocketAddress remoteAddress;
// 注冊到了哪個 EventLoop 上
private volatile EventLoop eventLoop;
// 是否已注冊
private volatile boolean registered;
/**
* channnel 會将 網絡IO操作 觸發到 ChannelPipeline 對應的事件方法。
* Netty 基于事件驅動,我們也可以了解為當 Chnanel 進行 IO操作 時會産生對應的IO 事件,
* 然後驅動事件在 ChannelPipeline 中傳播,由對應的 ChannelHandler 對事件進行攔截和處理,
* 不關心的事件可以直接忽略
*/
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return pipeline.disconnect(promise);
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return pipeline.close(promise);
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(promise);
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return pipeline.write(msg, promise);
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return pipeline.writeAndFlush(msg, promise);
}
}
AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
// AbstractNioChannel 是 NioSocketChannel和NioServerSocketChannel 的公共父類,是以定義
// 了一個 java.nio 的 SocketChannel 和 ServerSocketChannel 的公共父類 SelectableChannel,
// 用于設定 SelectableChannel參數 和進行 IO操作
private final SelectableChannel ch;
// 它代表了 JDK 的 SelectionKey.OP_READ
protected final int readInterestOp;
// 該 SelectionKey 是 Channel 注冊到 EventLoop 後傳回的,
// 由于 Channel 會面臨多個業務線程的并發寫操作,當 SelectionKey 被修改了,
// 需要讓其他業務線程感覺到變化,是以使用volatile保證修改的可見性
volatile SelectionKey selectionKey;
/**
* Channel 的注冊
*/
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
protected SelectableChannel javaChannel() {
return ch;
}
@Override
protected void doBeginRead() throws Exception {
// Channel.read() 或 ChannelHandlerContext.read() 被調用
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
// java.nio 包的内容,用于擷取 java.nio.channels.ServerSocketChannel 執行個體
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* 擷取的是 java.nio.channels.ServerSocketChannel 執行個體
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* 在父類中完成了 非阻塞IO的配置,及事件的注冊
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
/**
* 對 NioServerSocketChannel 來說,它的讀取操作就是接收用戶端的連接配接,建立 NioSocketChannel對象
*/
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 首先通過 ServerSocketChannel 的 accept()方法 接收新的用戶端連接配接,
// 擷取 java.nio.channels.SocketChannel 對象
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
// 如果擷取到用戶端連接配接對象 SocketChannel,則利用目前的 NioServerSocketChannel、EventLoop
// 和 SocketChannel 建立新的 NioSocketChannel,并添加到 buf 中
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
NioSocketChannel
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
// 與 NioServerSocketChannel 一樣,也依賴了 java.nio包 的API
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
* 從這裡可以看出,NioSocketChannel 對 java.nio.channels.SocketChannel 做了進一步封裝
* 使其 适用于 Netty架構
*/
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
/**
* Create a new instance
*/
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
// 在父類中完成 非阻塞IO的配置,注冊事件
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
@Override
protected SocketChannel javaChannel() {
return (SocketChannel) super.javaChannel();
}
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
/**
* 與遠端伺服器建立連接配接
*/
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
// 根據遠端位址建立TCP連接配接,對連接配接結果進行判斷
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
/**
* 關閉 Channel
*/
@Override
protected void doClose() throws Exception {
super.doClose();
javaChannel().close();
}
/**
* 從 Channel 中讀取資料
*/
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
/**
* 向 Channel 中寫資料
*/
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
}
Unsafe 功能簡介
Unsafe接口 實際上是 Channel接口 的輔助接口,它不應該被使用者代碼直接調用。實際的 IO讀寫操作 都是由 Unsafe接口 負責完成的。
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
interface Unsafe {
/**
* 傳回綁定的 本地位址
*/
SocketAddress localAddress();
/**
* 傳回綁定的 遠端位址
*/
SocketAddress remoteAddress();
/**
* 将 Channel 注冊到 EventLoop 上
*/
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* 綁定 本地位址 到 Channel 上
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* 連接配接到遠端伺服器
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* 斷開連接配接
*/
void disconnect(ChannelPromise promise);
/**
* 關閉 Channel
*/
void close(ChannelPromise promise);
/**
* 讀就緒 網絡事件
*/
void beginRead();
/**
* 發送資料
*/
void write(Object msg, ChannelPromise promise);
/**
* 将緩沖區的資料 刷到 Channel
*/
void flush();
}
}
AbstractUnsafe
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
/**
* 将目前 Unsafe 對應的 Channel 注冊到 EventLoop 的多路複用器上,
* 然後調用 DefaultChannelPipeline 的 fireChannelRegistered()方法,
* 如果 Channel 被激活 則調用 DefaultChannelPipeline 的 fireChannelActive()方法
*/
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
/**
* 綁定指定的端口,對于服務端 用于綁定監聽端口,
* 對于用戶端,主要用于指定 用戶端Channel 的本地綁定Socket位址。
*/
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
/**
* 用戶端 或 服務端,主動關閉連接配接
*/
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
/**
* 在鍊路關閉之前需要首先判斷是否處于重新整理狀态,如果處于重新整理狀态說明還有消息尚
* 未發送出去,需要等到所有消息發送完成再關閉鍊路,是以,将關閉操作封裝成Runnable稍後再執行
*/
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
/**
* 本方法實際上将消息添加到環形發送數組中,并不是真正的寫Channel
*/
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newWriteException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
/**
* 将緩沖區中待發送的消息全部寫入 Channel,并發送給通信對方
*/
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newFlush0Exception(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newFlush0Exception(t), false);
}
}
} finally {
inFlush0 = false;
}
}
}
}
AbstractNioUnsafe
AbstractNioUnsafe 是 AbstractUnsafe類 的 NIO實作,它主要實作了 connect 、finishConnect 等方法。
public abstract class AbstractNioChannel extends AbstractChannel {
/**
* 擷取目前的連接配接狀态進行緩存,然後發起連接配接操作。
*/
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
/**
* 對 TCP三向交握連接配接結果 進行判斷
*/
@Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
}