尊重原創,轉載注明出處,原文位址:http://www.cnblogs.com/cishengchongyan/p/6129971.html
本文将不會對netty中每個點分類講解,而是一個服務端啟動的代碼走讀,在這個過程中再去了解和學習,這也是部落客自己的學習曆程。下面開始正文~~~~
衆所周知,在寫netty服務端應用的時候一般會有這樣的啟動代碼:
(代碼一)
1 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2 EventLoopGroup workerGroup = new NioEventLoopGroup();
3 try {
4 ServerBootstrap bootStrap = new ServerBootstrap();
5 bootStrap.group(bossGroup, workerGroup)
6 .channel(NioServerSocketChannel.class)
7 .childHandler(new WebsocketChatServerInitializer())
8 .option(ChannelOption.SO_BACKLOG, 128)
9 .childOption(ChannelOption.SO_KEEPALIVE, true);
10
11 ChannelFuture f = bootStrap.bind(port).sync();
12 f.channel().closeFuture().sync();
13 } finally {
14 ...
15 }
本文将沿着這條主線來走讀代碼,但是在走讀之前首先要先認識一下Netty中的reactor模式是怎麼玩的。
首先先借用Doug Lea在Scalable IO in Java中的經典的圖示:
這張圖是經典的運用了多路複用的Reactor模式,也大緻說明了在netty中各線程的工作模式,mainReactor負責處理用戶端的請求,subReacor負責處理I/O的讀寫操作,同時還會有一些使用者的線程,用于異步處理I/O資料,在整個過程中通過角色細化,有效地将線程資源充分利用起來,建構了一條無阻塞通道,最後将耗時的業務邏輯交由業務線程去處理。本文不會對reactor做過多的解讀,而是結合netty的線程池模式來學習。
回到剛剛的主題,在服務端啟動的時候首先會new兩個NioEventLoopGroup,一個叫bossGroup(boss線程池),一個叫workerGroup(worker線程池),而這兩個就分别對應了上述的mainReactor和subReacor。接下來我們來看在new的過程中發生了什麼。
代碼走到MultithreadEventLoopGroup的構造方法中:
(代碼二)
1 public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
2
3 private static final int DEFAULT_EVENT_LOOP_THREADS;
4
5 static {
6 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
7 "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
8
9 if (logger.isDebugEnabled()) {
10 logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
11 }
12 }
13
14 protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
15 super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
16 }
17 ...
18 }
可以看到如果參數傳入了thread個數就取這個數目,如果沒有傳入就取可用處理器(CPU)個數的2倍。是以【代碼一】中boss隻有1個線程,而worker有2*cpu個數個線程。
繼續往下走到了核心代碼MultithreadEventExecutorGroup中:
(代碼三)
1 public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
2
3 private final EventExecutor[] children;
4 private final AtomicInteger childIndex = new AtomicInteger();
5 private final AtomicInteger terminatedChildren = new AtomicInteger();
6 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
7 private final EventExecutorChooser chooser;
8
9 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
10 if (nThreads <= 0) {
11 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
12 }
13
14 if (threadFactory == null) {
15 threadFactory = newDefaultThreadFactory();
16 }
17
18 children = new SingleThreadEventExecutor[nThreads];
19 if (isPowerOfTwo(children.length)) {
20 chooser = new PowerOfTwoEventExecutorChooser();
21 } else {
22 chooser = new GenericEventExecutorChooser();
23 }
24
25 for (int i = 0; i < nThreads; i ++) {
26 boolean success = false;
27 try {
28 children[i] = newChild(threadFactory, args);
29 success = true;
30 } catch (Exception e) {
31 // TODO: Think about if this is a good exception type
32 throw new IllegalStateException("failed to create a child event loop", e);
33 } finally {
34 if (!success) {
35 ...50 }
51 }
52 }
53
54 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
55 @Override
56 public void operationComplete(Future<Object> future) throws Exception {
57 if (terminatedChildren.incrementAndGet() == children.length) {
58 terminationFuture.setSuccess(null);
59 }
60 }
61 };
62
63 for (EventExecutor e: children) {
64 e.terminationFuture().addListener(terminationListener);
65 }
66 }
首先new一個線程工廠newDefaultThreadFactory,然後給變量children指派【PS:children是線程執行器的集合,幾個線程就會有幾個EventExecutor。是以EventExecutor是Reactor模式中真正執行工作的對象,它繼承自ScheduledExecutorService,是以應該明白它本質上是什麼了吧】
children是指派new了給定線程數數量的SingleThreadEventExecutor,看其内部代碼,SingleThreadEventExecutor構造方法:
(代碼四)
1 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
2 ...
3 private final EventExecutorGroup parent;
4 private final Queue<Runnable> taskQueue;
5 private final Thread thread;
6 ...
7 protected SingleThreadEventExecutor(
8 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
9
10 if (threadFactory == null) {
11 throw new NullPointerException("threadFactory");
12 }
13
14 this.parent = parent;
15 this.addTaskWakesUp = addTaskWakesUp;
16
17 thread = threadFactory.newThread(new Runnable() {
18 @Override
19 public void run() {
20 boolean success = false;
21 updateLastExecutionTime();
22 try {
23 SingleThreadEventExecutor.this.run();
24 success = true;
25 } catch (Throwable t) {
26 logger.warn("Unexpected exception from an event executor: ", t);
27 } finally {
28 ...
29 }
30 }
31 });
32 threadProperties = new DefaultThreadProperties(thread);
33 taskQueue = newTaskQueue();
34 }
35 ...
36 }
回到剛剛的主題(代碼三),發現在children[i] = newChild(threadFactory, args);而newChild是抽象方法,由于最開始我們初始化的是NioEventLoopGroup,是以是在NioEventLoopGroup中調用的:
(代碼五)
1 protected EventExecutor newChild(
2 ThreadFactory threadFactory, Object... args) throws Exception {
3 return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
4 }
是以相當于我們有多少個work或boss線程就有多少個NioEventLoop,而每一個NioEventLoop都綁定了一個selector。是以,相當于一個NioEventLoopGroup有自定義線程數量的NioEventLoop。
【PS:EventLoopGroup顧名思義是EventLoop的group,即包含了一組EventGroup。在實際的業務進行中,EventLoopGroup會通過EventLoop next()方法選擇一個 EventLoop,然後将實際的業務處理交給這個被選出的EventLoop去做。對于 NioEventLoopGroup來說,其真實功能都會交給EventLoopGroup去實作。】
接下來我們重點去看一下EventLoop和EventLoopGroup,自己畫了這一塊的UML圖來理一下類關系:
可以看出,EventLoop也繼承自EventLoopGroup,是以也是EventLoopGroup的一種。同時看到,這一堆類都實作自ScheduledExecutorService,那麼大家應該了解EventLoop和EventLoopGroup本質上是什麼東西了吧。這裡先不鋪展開,下文中在講注冊邏輯時會對EventLoopGroup做一個更詳細的了解。
我們先回到【代碼五主線】,我們接下來繼續看初始化邏輯:
(代碼六)
1 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
2 super(parent, threadFactory, false);
3 if (selectorProvider == null) {
4 throw new NullPointerException("selectorProvider");
5 }
6 provider = selectorProvider;
7 selector = openSelector();
8 }
初始化NioEventLoop時調用了openSelector來打開目前作業系統中一個預設的selector實作。
回到【代碼一主線】,服務端初始化了boss和worker線程之後調用ServerBootstrap.group()來綁定兩個線程池排程器。接下來調用ServerBootstrap.channel(NioServerSocketChannel.class)。這塊邏輯很簡單就是在bootstrap内部初始化了一個class類型是NioServerSocketChannel的ChannelFactory,【PS:ChannelFactory不會指定生産對象的具體類型,隻要繼承自Channel就可以了】。
接下來,ServerBootstrap.childHandler()作用就是設定ChannelHandler來響應Channel的請求。一般這裡都會設定抽象類ChannelInitializer,并且實作模闆方法initChannel,在ChannelHandler注冊(初始化)的時候會調用initChannel來完成ChannelPipeline的初始化。
(代碼七)
1 public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
2
3 protected abstract void initChannel(C ch) throws Exception;
4
5 @Override
6 @SuppressWarnings("unchecked")
7 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
8 initChannel((C) ctx.channel());
9 ctx.pipeline().remove(this);
10 ctx.fireChannelRegistered();
11 }
12 ...
13 }
關于ChannelHandler我們後面會做詳細的介紹,這裡隻需要了解到此就可以了。
回到【代碼一主線】,接下來bootStrap.option()和childOption()分别是給boss線程和worder線程設定參數,這裡先忽略。
然後是綁定端口ChannelFuture f = bootStrap.bind(port);在這一步中不僅僅是綁定端口,實際上需要做大量的初始化工作。我們先看一下AbstractBootstrap中的核心代碼:
(代碼八)
1 private ChannelFuture doBind(final SocketAddress localAddress) {
2 final ChannelFuture regFuture = initAndRegister();
3 final Channel channel = regFuture.channel();
4 if (regFuture.cause() != null) {
5 return regFuture;
6 }
7
8 if (regFuture.isDone()) {
9 ChannelPromise promise = channel.newPromise();
10 doBind0(regFuture, channel, localAddress, promise);
11 return promise;
12 } else {
13 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
14 regFuture.addListener(new ChannelFutureListener() {
15 @Override
16 public void operationComplete(ChannelFuture future) throws Exception {
17 Throwable cause = future.cause();
18 if (cause != null) {
19 promise.setFailure(cause);
20 } else {
21 promise.executor = channel.eventLoop();
22 }
23 doBind0(regFuture, channel, localAddress, promise);
24 }
25 });
26 return promise;
27 }
28 }
【代碼八主線】首先是initAndRegister(),看一下代碼:
(代碼九)
1 final ChannelFuture initAndRegister() {
2 final Channel channel = channelFactory().newChannel();
3 try {
4 init(channel);
5 } catch (Throwable t) {
6 channel.unsafe().closeForcibly();
7 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
8 }
9
10 ChannelFuture regFuture = group().register(channel);
11 if (regFuture.cause() != null) {
12 if (channel.isRegistered()) {
13 channel.close();
14 } else {
15 channel.unsafe().closeForcibly();
16 }
17 }
18
19 return regFuture;
20 }
首先調用工廠方法生成一個新Channel,我們剛剛說過,ChannelFactory不限定Channel的具體類型,而我們注冊的是NioServerSocketChannel,那麼這裡生産的就是該類型的Channel,然後調用init(),具體實作在ServerBootstrap中:
(代碼十)
1 @Override
2 void init(Channel channel) throws Exception {
3 final Map<ChannelOption<?>, Object> options = options();
4 synchronized (options) {
5 channel.config().setOptions(options);
6 }
7
8 final Map<AttributeKey<?>, Object> attrs = attrs();
9 synchronized (attrs) {
10 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
11 @SuppressWarnings("unchecked")
12 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
13 channel.attr(key).set(e.getValue());
14 }
15 }
16
17 ChannelPipeline p = channel.pipeline();
18
19 final EventLoopGroup currentChildGroup = childGroup;
20 final ChannelHandler currentChildHandler = childHandler;
21 final Entry<ChannelOption<?>, Object>[] currentChildOptions;
22 final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
23 synchronized (childOptions) {
24 currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
25 }
26 synchronized (childAttrs) {
27 currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
28 }
29
30 p.addLast(new ChannelInitializer<Channel>() {
31 @Override
32 public void initChannel(Channel ch) throws Exception {
33 ChannelPipeline pipeline = ch.pipeline();
34 ChannelHandler handler = handler();
35 if (handler != null) {
36 pipeline.addLast(handler);
37 }
38 pipeline.addLast(new ServerBootstrapAcceptor(
39 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
40 }
41 });
42 }
在init()中做了大緻這麼幾件事:1,配置channel的option;2,配置channel的attr;3,ChannelPipeline增加兩個Handler,一個是bootstrap中的私有handler,一個是ServerBootstrapAcceptor(這個Handler用于接收客戶連接配接後設定其初始化參數)。
【代碼九主線】完成了init之後調用EventLoopGroup.register(channel)完成了channel的注冊,實際上就是将channel注冊到EventLoop中的selector上。這塊我們可以了解一下其中的實作:
先看一下EventLoopGroup接口:
(代碼十一)
1 public interface EventLoopGroup extends EventExecutorGroup {
2
3 @Override
4 EventLoop next();
5
6 ChannelFuture register(Channel channel);
7
8 ChannelFuture register(Channel channel, ChannelPromise promise);
9 }
其中next方法傳回EventLoopGroup裡的一個EventLoop,register用于注冊Channel到EventLoop裡。【PS:EventLoopGroup顧名思義是EventLoop的group,即包含了一組EventGroup。在實際的業務進行中,EventLoopGroup會通過EventLoop next()方法選擇一個 EventLoop,然後将實際的業務處理交給這個被選出的EventLoop去做。對于 NioEventLoopGroup來說,其真實功能都會交給EventLoopGroup去實作】
我們詳細看一下register到底如何實作的,往下看是在SingleThreadEventLoop裡實作了該方法:
(代碼十二)
1 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
2 ...
3 @Override
4 public ChannelFuture register(Channel channel) {
5 return register(channel, new DefaultChannelPromise(channel, this));
6 }
7
8 @Override
9 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
10 if (channel == null) {
11 throw new NullPointerException("channel");
12 }
13 if (promise == null) {
14 throw new NullPointerException("promise");
15 }
16
17 channel.unsafe().register(this, promise);
18 return promise;
19 }
20 ...
21 }
注意,在這裡調用了Channel的Unsafe内部類完成了注冊,是以接下來的東西都是NIO中的 【PS:Unsafe是定義在Channel中的内部接口,是不會被使用者代碼調用到的,但是在channel的I/O操作中實際上都是由unsafe來完成的。Unsafe不論是接口還是類,都會定義到channel的内部(例如Channel接口中定義了Unsafe接口,AbstractChannel抽象類中定義了AbstractUnsafe抽象類),是以如果将nio類比為一個linux系統的話,那麼unsafe就是其中的核心空間】
具體的register操作是在AbstractUnsafe中完成,在register()方法中調用了模闆方法,我們看一下在AbstractNioChannel中的核心實作:
(代碼十三)
1 @Override
2 protected void doRegister() throws Exception {
3 boolean selected = false;
4 for (;;) {
5 try {
6 selectionKey = javaChannel().register(eventLoop().selector, 0, this);
7 return;
8 } catch (CancelledKeyException e) {
9 if (!selected) {
10 eventLoop().selectNow();
11 selected = true;
12 } else {
13 throw e;
14 }
15 }
16 }
17 }
18 }
這裡實際上調用的是SelectableChannel中的register方法,作用就是将本channel注冊到本channel的eventLoop的Selector中,那麼問題又來了,什麼是SelectableChannel?【PS:它實作Channel接口,代碼注釋說明其是一種可以被Selector使用用于多路複用的Channel,SelectableChannel可以通過 register方法将自己注冊在Selector上,并提供其所關注的事件類型。是以,繼承自SelectableChannel的Channel才可以真正和Selector打交道,例如ServerSocketChannel和SocketChannel】
繼續看其中的SelectableChannel中的實作:
(代碼十四)
1 public final SelectionKey register(Selector sel, int ops,
2 Object att)throws ClosedChannelException{
3 synchronized (regLock) {
4 ...
5 SelectionKey k = findKey(sel);
6 if (k != null) {
7 k.interestOps(ops);
8 k.attach(att);
9 }
10 if (k == null) {
11 // New registration
12 synchronized (keyLock) {
13 if (!isOpen())
14 throw new ClosedChannelException();
15 k = ((AbstractSelector)sel).register(this, ops, att);
16 addKey(k);
17 }
18 }
19 return k;
20 }
21 }
這裡的邏輯很清晰,如果該channel有在Selector中注冊過(有對應的SelectionKey),那麼将這個key強制綁定到入參的Channel中(可能會導緻之前綁定失效),如果該channel沒有在Selector中注冊過,那麼調用AbstractSelector(底層JDK實作)該register邏輯。至此我們完成了register邏輯代碼的走讀。
繼續回歸【代碼八主線】,我們已經完成了initAndRegister邏輯,如果不出意外那麼regFuture.isDone()将是true,接下來調用了doBind0():
(代碼十五)
1 private static void doBind0(
2 final ChannelFuture regFuture, final Channel channel,
3 final SocketAddress localAddress, final ChannelPromise promise) {
4
5 channel.eventLoop().execute(new Runnable() {
6 @Override
7 public void run() {
8 if (regFuture.isSuccess()) {
9 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
10 } else {
11 promise.setFailure(regFuture.cause());
12 }
13 }
14 });
15 }
這裡有必要了解一下ChannelPromise,它擴充了Promise和ChannelFuture,是一個可寫入的ChannelFuture。我再在網上搜了很多資料裡說它具備監聽器的功能。但是我自己不這麼認為,我們看Promise接口在future的基礎上增加了setSuccess(), setFailure()這些方法,而ChannelFuture裡success和failuer都是不可寫的。為什麼呢?從定義上來看,ChannelFuture本來就是異步執行的結果,既然已經異步了那麼在傳回的時候本來就無法确定其成功或者失敗,而有的時候我們做校驗或者寫一些業務邏輯的時候可以确定其結果,是以我覺得ChannelPromise作為一個可寫的ChannelFuture是對其的一個補充,可以标記異步任務成功或者失敗,是以它是netty異步架構中實際使用的異步執行結果。在這裡調用channel.bind(localAddress, promise);作用很明确就是給該channel綁定端口,然後該方法會立即傳回一個ChannelPromise(不論這個實際的異步操作有沒有做完)。一般用法也是這樣的,方法定義時傳回值都是ChannelFuture,而實作時實際傳回的都是ChannelPromise。
最後給立即傳回的這個ChannelFuture添加一個listener。netty中有兩種方式擷取異步執行的真正結果,一種是調用老祖宗Future的get方法來擷取(阻塞等待),一種是添加listener(異步回調),netty中推薦使用第二種方式,在整個的netty異步架構中也大量使用了這種方式。剛剛添加的那個listener的作用是:如果注冊失敗了,那麼就關閉該Channel。最後bind傳回異步的ChannelPromise,完成整個bind流程。
至此【代碼一主線】走讀完畢,我們大緻浏覽了一遍server端bootstrap啟動流程。
最後大緻總結一下服務端啟動的主流程:
- 初始化boss和worker線程排程器NioEventLoopGroup,打開其中的Selector對象并配置相關參數。
- ServerBootstrap綁定這兩個NioEventLoopGroup。
- 為server端确定綁定Channel的class類型(即将要使用什麼類型),在本文的例子中綁定的是NioServerSocketChannel,實質上隻是初始化ChannelFactory。(此時還沒有初始化該Channel,也沒有為Selector注冊該Channel)。
- 初始化使用者定義的ChannelInitializer,也就是在ChannelPipeline中添加使用者自己的ChannelHandler(此時還沒有注冊,隻是初始化變量而已)。
- 調用bind(port)啟動監聽,整個bind的過程非常複雜,做了最核心的初始化工作:
1) ChannelFactory生成核心的NioServerSocketChannel執行個體,為該Channel初始化參數,然後為NioServerSocketChannel的pipeline中再添加兩個netty架構的Handler。
2) 将NioServerSocketChannel執行個體綁定到boss線程排程器的Selector中,此時boss線程被激活并開始接受I/O請求,同時所有的Pipeline中的Handler也會完成注冊。
3) 異步為NioServerSocketChannel綁定注冊的端口。
至此,ServerBootstrap啟動完畢,開始接收I/O請求。本文大緻走讀了一遍服務端啟動的代碼,在走讀的過程中對一些概念進行解讀,相信大家在大腦中對netty的基本成員已經有了一個輪廓。那麼服務端啟動之後,netty是如何接收并分發socket請求,pipeline中又是如何組織并調用handler,以及boss和worker如何協同工作将在下一篇部落格中進行解讀。
轉載于:https://www.cnblogs.com/cishengchongyan/p/6129971.html