編寫一個NIO Server用JDK NIO包實作非常繁瑣,要綁定端口、監聽用戶端連接配接、監聽資料、接收資料、處理資料。用Netty了了二三十行代碼就實作了這些功能,我們知道Netty對JDK NIO進行了封裝和改進,接下來從官方的Demo分析Netty的實作
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if(args.length > 0){
port = Integer.valueOf(args[0]);
}
new DiscardServer(port).run();
}
}
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
簡化上邊代碼Netty的建立NIO Server的過程隻需要幾個步驟,按照以下幾個步驟逐漸解析
serverBootstrap
.group() // 1
.channel() // 2
.childHandler() // 3
.option() // 4
.childOption() // 5
.bind(); // 6
1.group() 設定處理器
這裡用來設定【處理使用者請求的線程組】 和 【處理讀寫請求的線程組】。這裡是reactor的核心部分,參見https://zhuanlan.zhihu.com/p/87630368
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
ServerBootstrap類持有group、childGroup倆個線程組,其中group在父類AbstractBootstrap中聲明,這裡調用group參數隻是簡單的給内部成員指派。parentGroup用來異步處理使用者注冊請求;childGroup用來做IO事件回調處理,特别注意的是這裡一個channel對應一個EvenLoop,netty巧妙規避了多線程并發問題提高了性能
2.channel() 設定channel類型
// 設定channel類型
public B channel(Class<? extends C> channelClass) {
// 實際建構了一個channel工廠
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// 反射工廠的構造方法,實際儲存了channel類的構造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
3.childHandler() 設定處理器
這裡注冊的是ChannelInitializer實作的,在服務端接受到連接配接的時候會調用initChannel方法給建立的channel設定一個處理器
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
4.option()、childOption() channel參數配置
配置channel參數,option對應的是boss線程組,childOption對應worker線程組,實作方别在AbstractBootstrap和ServerBootstrap,前者是後者的父類是個抽象類。boss線程組是在AbstractBootstrap裡聲明的,worker線程組是在ServerBootstrap中聲明的。
5.bind() 端口綁定
這裡比較關鍵,執行bind方法,
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并注冊,這裡很關鍵
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
// 如果抛出異常,這裡直接退出注冊流程
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
看下關鍵方法AbstractBootstrap.initAndRegister()
final ChannelFuture initAndRegister() {
// 1.初始化channel
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 2.注冊channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
- channelFactory.newChannel() 調用在第二步channel(Class<? extends C> channelClass)方法裡建構的channelFactory來建立channel,然後調用init(channel)加載配置
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
- config().group().register(channel)是注冊channel到selector的方法。
config().group()實則擷取的boss線程,實作如下
public final ServerBootstrapConfig config() {
return config;
}
public final EventLoopGroup group() {
return bootstrap.group();
}
這裡有很多類都實作了register()方法,由于我們調用group方法傳入的NioEventLoopGroup,而NioEventLoopGroup又繼承自MultithreadEventLoopGroup,是以我們應該看MultithreadEventLoopGroup的實作。
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
next()方法實作很關鍵,用來選取一個EventLoop線程。調用鍊為MultithreadEventLoopGroup.next() -> MultithreadEventExecutorGroup.next() -> chooser.next() ,MultithreadEventExecutorGroup.next是MultithreadEventLoopGroup.next的父類,chooser是一個選擇器封裝了選取EventLoop線程的政策,netty自帶實作有GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser倆種
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
選出一個EventLoop線程了,那接下來就看綁定方法register()的實作,實作在SingleThreadEventLoop裡
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
這裡調用的AbstractChannel的内部類abstarctUnsafe,register0會被封裝成異步讓我
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 擷取一個線程,這裡是單線程實作的線程池
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 異步注冊
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
register0()通過調用doRegister()方法進行注冊
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 這個方法很關鍵,真正執行注冊實作在這裡
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doRegister()實作在AbstractNioChannel中
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 這裡調用了JDK的NIO實作
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
至此注冊流程分析結束。最後做個回顧,使用Netty建構一個NIO server大緻需要如下幾個步驟:
- 配置線程組,boss線程組負責處理使用者請求、worker線程組負責處理IO
- 配置Channel類型,并設定相關參數,非阻塞等
- 設定處理器,不同方法分别對應建立連接配接、讀寫操作等事件
- 綁定端口,由選舉器chooser 從NiOEventLoopGroup裡選出一個EventLoop異步進行端口綁定。
Netty的封裝極大的簡化了開發,同時boss線程組、worker線程組把accepter和reactor解耦分别用線程組來實作提升了性能。boss線程組異步設計使得能夠處理更多的使用者請求、worker線程組隻需要比連接配接少的多的線程就可以處理IO回調。每個Channel和一個EventLoop綁定消除了多線程資料同步問題,無所設計也極大的提升了性能,使得netty更順滑的處理大量IO請求。