天天看點

Java NIO之選擇器

1.簡介

前面的文章說了緩沖區,說了通道,本文就來說說 NIO 中另一個重要的實作,即選擇器 Selector。在更早的文章中,我簡述了幾種 IO 模型。如果大家看過之前的文章,并動手寫過代碼的話。再看 Java 的選擇器大概就會知道它是什麼了,以及怎麼用了。選擇器是 Java 多路複用模型的一個實作,可以同時監控多個非阻塞套接字通道。示意圖大緻如下:

Java NIO之選擇器

如果大家了解過多路複用模型,那應該也會知道幾種複用模型的實作。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的選擇器并非憑空創造,而是在底層作業系統提供的接口的基礎上封裝而來。相關的細節,我随後會進行分析。

關于 Java 選擇器的簡介這裡先說到這,接下來進入正題。

2.基本操作及實作

本章我将對 Selector 的建立,通道的注冊,Selector 的選擇過程進行分析。内容篇幅較大,希望大家耐心看完。由于 Selector 相關類在不同作業系統下的實作是不同的,加之個人對 Linux epoll 更為熟悉,是以本文所分析的源碼也是和 epoll 相關的。好了,進入正題吧。

2.1 建立選擇器

選擇器 Selector 是一個抽象類,是以不能直接建立。Selector 提供了一個 open 方法,通過 open 方法既可以建立選擇器執行個體。示例代碼如下:

Selector selector = Selector.open();           

上面的代碼比較簡單,隻有一行。不過不要被表象迷惑,這行代碼僅是完整實作的冰山一角,更複雜的邏輯則隐藏在水面之下。

在簡介一節,我已經說了 Java 選擇器是對底層多路複用接口的一個包裝,這裡的 open 方法也不例外。假設我們的 Java 運作在 Linux 平台下,那麼 open 最終所做的事情應該是調用作業系統的

epoll_create

函數,用于建立 epoll 執行個體。真實情況是不是如此呢?答案就在冰山深處,接下來就讓我們一起去求索吧。下面我們将沿着 open 方法一路走下去,如下:

public abstract class Selector implements Closeable {         public static Selector open() throws IOException {             // 建立 SelectorProvider,再通過其 openSelector 方法建立 Selector             return SelectorProvider.provider().openSelector();         }         // 省略無關代碼     }     public abstract class SelectorProvider {         public static SelectorProvider provider() {             synchronized (lock) {                 if (provider != null)                     return provider;                 return AccessController.doPrivileged(                     new PrivilegedAction<SelectorProvider>() {                         public SelectorProvider run() {                                 if (loadProviderFromProperty())                                     return provider;                                 if (loadProviderAsService())                                     return provider;                                 // 建立預設的 SelectorProvider                                 provider = sun.nio.ch.DefaultSelectorProvider.create();                                 return provider;                             }                         });             }         }     }     public class DefaultSelectorProvider {         private DefaultSelectorProvider() { }         /**          * 根據系統名稱建立相應的 SelectorProvider          */         public static SelectorProvider create() {             String osname = AccessController                 .doPrivileged(new GetPropertyAction("os.name"));             if (osname.equals("SunOS"))                 return createProvider("sun.nio.ch.DevPollSelectorProvider");             if (osname.equals("Linux"))                 return createProvider("sun.nio.ch.EPollSelectorProvider");             //              return new sun.nio.ch.PollSelectorProvider();         }         /**          * 加載 SelectorProvider 類,并建立執行個體          */         @SuppressWarnings("unchecked")         private static SelectorProvider createProvider(String cn) {             Class<SelectorProvider> c;             try {                 c = (Class<SelectorProvider>)Class.forName(cn);             } catch (ClassNotFoundException x) {                 throw new AssertionError(x);             }             try {                 return c.newInstance();             } catch (IllegalAccessException | InstantiationException x) {                 throw new AssertionError(x);             }         }     }     /**      * 建立完 SelectorProvider,接下來要調用 openSelector 方法      * 建立 Selector 的繼承類了。      */     public class EPollSelectorProvider extends SelectorProviderImpl {         public AbstractSelector openSelector() throws IOException {             return new EPollSelectorImpl(this);         }     }     class EPollSelectorImpl extends SelectorImpl {         EPollSelectorImpl(SelectorProvider sp) throws IOException {             // 調用父類構造方法             super(sp);             long pipeFds = IOUtil.makePipe(false);             fd0 = (int) (pipeFds >>> 32);             fd1 = (int) pipeFds;             // 建立 EPollArrayWrapper,EPollArrayWrapper 是一個重要的實作             pollWrapper = new EPollArrayWrapper();             pollWrapper.initInterrupt(fd0, fd1);             fdToKey = new HashMap<>();         }     }     public abstract class SelectorImpl extends AbstractSelector {         protected SelectorImpl(SelectorProvider sp) {             super(sp);             keys = new HashSet<SelectionKey>();             selectedKeys = new HashSet<SelectionKey>();             /* 初始化 publicKeys 和 publicSelectedKeys,              * publicKeys 即 selector.keys() 方法所傳回的集合,              * publicSelectedKeys 則是 selector.selectedKeys() 方法傳回的集合              */             if (Util.atBugLevel("1.4")) {                 publicKeys = keys;                 publicSelectedKeys = selectedKeys;             } else {                 publicKeys = Collections.unmodifiableSet(keys);                 publicSelectedKeys = Util.ungrowableSet(selectedKeys);             }         }     }     /**      * EPollArrayWrapper 一個重要的實作,這一層再往下就是 C 代碼了      */     class EPollArrayWrapper {         EPollArrayWrapper() throws IOException {             // 調用 epollCreate 方法建立 epoll 檔案描述符             epfd = epollCreate();             // the epoll_event array passed to epoll_wait             // 初始化 pollArray,該對象用于存儲就緒檔案描述符和事件             int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;             pollArray = new AllocatedNativeObject(allocationSize, true);             pollArrayAddress = pollArray.address();             // eventHigh needed when using file descriptors > 64k             if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)                 eventsHigh = new HashMap<>();         }         // epollCreate 方法是 native 類型的         private native int epollCreate();     }           

以上代碼時 Java 層面的,Java 層調用棧最下面的類是 EPollArrayWrapper(源碼路徑可以在附錄中查找)。EPollArrayWrapper 是一個重要的實作,起着承上啟下的作用。上層是 Java 代碼,下層是 C 代碼。上層的代碼看完了,接下來看看冰山深處的 C 代碼:

JNIEXPORT jint JNICALL     Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)     {         // 調用 epoll_create 函數建立 epoll 執行個體,并傳回檔案描述符 epfd         int epfd = epoll_create(256);         if (epfd < 0) {            JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");         }         return epfd;     }           

上面的代碼很簡單,僅做了建立 epoll 執行個體這一件事。看到這裡,答案就明了了。最後在附一張時序圖幫助大家理清代碼調用順序,如下:

Java NIO之選擇器

2.2 選擇鍵

2.2.1 幾種事件

選擇鍵 SelectionKey 包含4種事件,分别是:

public static final int OP_READ = 1 << 0;     public static final int OP_WRITE = 1 << 2;     public static final int OP_CONNECT = 1 << 3;     public static final int OP_ACCEPT = 1 << 4;           

事件之間可以通過或運算進行組合,比如:

int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;           

2.2.2 兩種事件集合:interestOps 和 readyOps

interestOps 即感興趣的事件集合,通道調用 register 方法注冊時會設定此值,interestOps 可通過 SelectionKey interestOps() 方法擷取。readyOps 是就緒事件集合,可通過 SelectionKey readyOps() 擷取。

interestOps 和 readyOps 被聲明在 SelectionKey 子類 SelectionKeyImpl 中,代碼如下:

public class SelectionKeyImpl extends AbstractSelectionKey {         private volatile int interestOps;         private int readyOps;     }           

接下來再來看看與 readyOps 事件集合相關的幾個方法,如下:

selectionKey.isAcceptable();     selectionKey.isConnectable();     selectionKey.isReadable();     selectionKey.isWritable();           

以上方法從字面意思上就可以知道有什麼用,這裡就不解釋了。接下來以 isReadable 方法為例,簡單看一下這個方法是如何實作。

public final boolean isReadable() {         return (readyOps() & OP_READ) != 0;     }           

上面說到可以通過或運算組合事件,這裡則是通過與運算來測試某個事件是否在事件集合中。比如

readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101,     readyOps & OP_READ = 0101 & 0001 = 0001,     readyOps & OP_CONNECT = 0101 & 1000 = 0           

readyOps & OP_READ != 0

,是以 OP_READ 在事件集合中。

readyOps & OP_CONNECT == 0

,是以 OP_CONNECT 不在事件集合中。

2.2.3 attach 方法

attach 是一個好用的方法,通過這個方法,可以将對象暫存在 SelectionKey 中,待需要的時候直接取出來即可。比如本文對應的練習代碼實作了一個簡單的 HTTP 伺服器,在讀取使用者請求資料後(即 selectionKey.isReadable() 為 true),會去解析請求頭,然後将請求頭資訊通過 attach 方法放入 selectionKey 中。待通道可寫後,再從 selectionKey 中取出請求頭,并根據請求頭回複用戶端不同的消息。當然,這隻是一個應用場景,attach 可能還有其他的應用場景,比如辨別通道。不過其他的場景我沒使用過,就不說了。attach 使用方式如下:

selectionKey.attach(obj);     Object attachedObj = selectionKey.attachment();           

2.3 通道注冊

通道注冊即将感興趣的事件告知 Selector,待事件發生時,Selector 即可傳回就緒事件,我們就可以去做後續的事情了。比如 ServerSocketChannel 通道通常對 OP_ACCEPT 事件感興趣,那麼我們就可以把這個事件注冊給 Selector。待事件發生,即服務端接受用戶端連接配接後,我們即可擷取這個就緒的事件并做相應的操作。通道注冊的示例代碼如下:

channel.configureBlocking(false);     SelectionKey key = channel.register(selector, SelectionKey.OP_READ);           

起初我以為通道注冊操作會調用作業系統的 epoll_ctl 函數,但最終通過看源碼,發現自己的了解是錯的。既然通道注冊階段不調用 epoll_ctl 函數。那麼,epoll_ctl 什麼時候才會被調用呢?如果不調用 epoll_ctl,那麼注冊過程都幹了什麼事情呢?關于第一個問題,本節還無法解答,不過第二個問題則可以說說。接下來讓我們深入通道類 register 方法的調用棧中去探尋答案吧。

public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {         public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {             return register(sel, ops, null);         }         public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;     }     public abstract class AbstractSelectableChannel extends SelectableChannel {         private SelectionKey[] keys = null;         public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {             synchronized (regLock) {                 // 省去一些校驗代碼                 // 從 keys 數組中查找,查找條件為 k.selector() == sel                 SelectionKey k = findKey(sel);                 // 如果 k 不為空,則修改 k 所感興趣的事件                 if (k != null) {                     k.interestOps(ops);                     k.attach(att);                 }                 // k 為空,則建立一個 SelectionKey,并存儲到 keys 數組中                 if (k == null) {                     // New registration                     synchronized (keyLock) {                         if (!isOpen())                             throw new ClosedChannelException();                         k = ((AbstractSelector)sel).register(this, ops, att);                         addKey(k);                     }                 }                 return k;             }         }     }     public abstract class AbstractSelector extends Selector {         protected abstract SelectionKey register(AbstractSelectableChannel ch,                                              int ops, Object att);     }     public abstract class SelectorImpl extends AbstractSelector {         protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {             if (!(ch instanceof SelChImpl))                 throw new IllegalSelectorException();             // 建立 SelectionKeyImpl 執行個體             SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);             k.attach(attachment);             synchronized (publicKeys) {                 implRegister(k);             }             k.interestOps(ops);             return k;         }     }     class EPollSelectorImpl extends SelectorImpl {         protected void implRegister(SelectionKeyImpl ski) {             if (closed)                 throw new ClosedSelectorException();             SelChImpl ch = ski.channel;             int fd = Integer.valueOf(ch.getFDVal());             // 存儲 fd 和 SelectionKeyImpl 的映射關系             fdToKey.put(fd, ski);             pollWrapper.add(fd);             // 将 SelectionKeyImpl 執行個體存儲到 keys 中(這裡的 keys 聲明在 SelectorImpl 類中),keys 集合可由 selector.keys() 方法擷取             keys.add(ski);         }     }     public class SelectionKeyImpl extends AbstractSelectionKey {         public SelectionKey interestOps(int ops) {             ensureValid();             return nioInterestOps(ops);         }         public SelectionKey nioInterestOps(int ops) {             if ((ops & ~channel().validOps()) != 0)                 throw new IllegalArgumentException();             // 轉換并設定感興趣的事件             channel.translateAndSetInterestOps(ops, this);             // 設定 interestOps 變量             interestOps = ops;             return this;         }     }     class SocketChannelImpl extends SocketChannel implements SelChImpl {         public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {             int newOps = 0;             // 轉換事件             if ((ops & SelectionKey.OP_READ) != 0)                 newOps |= PollArrayWrapper.POLLIN;             if ((ops & SelectionKey.OP_WRITE) != 0)                 newOps |= PollArrayWrapper.POLLOUT;             if ((ops & SelectionKey.OP_CONNECT) != 0)                 newOps |= PollArrayWrapper.POLLCONN;             // 設定事件             sk.selector.putEventOps(sk, newOps);         }     }     class class EPollSelectorImpl extends SelectorImpl {         public void putEventOps(SelectionKeyImpl ski, int ops) {             if (closed)                 throw new ClosedSelectorException();             SelChImpl ch = ski.channel;             // 設定感興趣的事件             pollWrapper.setInterest(ch.getFDVal(), ops);         }     }     class EPollArrayWrapper {         void setInterest(int fd, int mask) {             synchronized (updateLock) {                 // 擴容 updateDescriptors 數組,并存儲檔案描述符 fd                 int oldCapacity = updateDescriptors.length;                 if (updateCount == oldCapacity) {                     int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;                     int[] newDescriptors = new int[newCapacity];                     System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);                     updateDescriptors = newDescriptors;                 }                 updateDescriptors[updateCount++] = fd;                 // events are stored as bytes for efficiency reasons                 byte b = (byte)mask;                 assert (b == mask) && (b != KILLED);                 // 存儲事件                 setUpdateEvents(fd, b, false);             }         }         private void setUpdateEvents(int fd, byte events, boolean force) {             if (fd < MAX_UPDATE_ARRAY_SIZE) {                 if ((eventsLow[fd] != KILLED) || force) {                     eventsLow[fd] = events;                 }             } else {                 Integer key = Integer.valueOf(fd);                 if (!isEventsHighKilled(key) || force) {                     eventsHigh.put(key, Byte.valueOf(events));                 }             }         }     }           

到 setUpdateEvents 這個方法,整個調用棧就結束了。但是我們并未在調用棧中看到調用 epoll_ctl 函數的地方,也就是說,通道注冊時,并不會立即調用 epoll_ctl,而是先将事件集合 events 存放在 eventsLow。至于 epoll_ctl 函數何時調用的,需要大家繼續往下看了。

2.4 選擇過程

2.4.1 選擇方法

Selector 包含3種不同功能的選擇方法,分别如下:

  • int select()
  • int select(long timeout)
  • int selectNow()

select() 是一個阻塞方法,僅在至少一個通道處于就緒狀态時才傳回。

select(long timeout) 同樣也是阻塞方法,不過可對該方法設定逾時時間(timeout > 0),使得線程不會被一直阻塞。如果 timeout = 0,會一直阻塞線程。

selectNow() 為非阻塞方法,調用後立即傳回。

以上3個方法均傳回 int 類型值,表示每次調用 select 或 selectNow 方法後,新就緒通道的數量。如果某個通道在上一次調用 select 方法時就已經處于就緒狀态,但并未将該通道對應的 SelectionKey 對象從 selectedKeys 集合中移除。假設另一個的通道在本次調用 select 期間處于就緒狀态,此時,select 傳回1,而不是2。

2.4.2 選擇過程

選擇方法用起來雖然簡單,但方法之下隐藏的邏輯還是比較複雜的。大緻分為下面幾個步驟:

  1. 檢查已取消鍵集合 cancelledKeys 是否為空,不為空則将 cancelledKeys 的鍵從 keys 和 selectedKeys 中移除,并将鍵和通道登出。
  2. 調用作業系統的 epoll_ctl 函數将通道感興趣的事件注冊到 epoll 執行個體中
  3. 調用作業系統的 epoll_wait 函數監聽事件
  4. 再次執行步驟1
  5. 更新 selectedKeys 集合,并傳回就緒通道數量

上面五個步驟對應于 EPollSelectorImpl 類中 doSelect 方法的邏輯,如下:

protected int doSelect(long timeout) throws IOException {         if (closed)             throw new ClosedSelectorException();         // 處理已取消鍵集合,對應步驟1         processDeregisterQueue();         try {             begin();             // select 方法的核心,對應步驟2和3             pollWrapper.poll(timeout);         } finally {             end();         }         // 處理已取消鍵集合,對應步驟4         processDeregisterQueue();         // 更新 selectedKeys 集合,并傳回就緒通道數量,對應步驟5         int numKeysUpdated = updateSelectedKeys();         if (pollWrapper.interrupted()) {             // Clear the wakeup pipe             pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);             synchronized (interruptLock) {                 pollWrapper.clearInterrupted();                 IOUtil.drain(fd0);                 interruptTriggered = false;             }         }         return numKeysUpdated;     }           

接下來,我們按照上面的步驟順序去分析代碼實作。先來看看步驟1對應的代碼:

+----SelectorImpl.java     void processDeregisterQueue() throws IOException {         // Precondition: Synchronized on this, keys, and selectedKeys         Set<SelectionKey> cks = cancelledKeys();         synchronized (cks) {             if (!cks.isEmpty()) {                 Iterator<SelectionKey> i = cks.iterator();                 // 周遊 cancelledKeys,執行登出操作                 while (i.hasNext()) {                     SelectionKeyImpl ski = (SelectionKeyImpl)i.next();                     try {                         // 執行登出邏輯                         implDereg(ski);                     } catch (SocketException se) {                         throw new IOException("Error deregistering key", se);                     } finally {                         i.remove();                     }                 }             }         }     }     +----EPollSelectorImpl.java     protected void implDereg(SelectionKeyImpl ski) throws IOException {         assert (ski.getIndex() >= 0);         SelChImpl ch = ski.channel;         int fd = ch.getFDVal();         // 移除 fd 和選擇鍵鍵的映射關系         fdToKey.remove(Integer.valueOf(fd));         // 從 epoll 執行個體中删除事件         pollWrapper.remove(fd);         ski.setIndex(-1);         // 從 keys 和 selectedKeys 中移除選擇鍵         keys.remove(ski);         selectedKeys.remove(ski);         // 登出選擇鍵         deregister((AbstractSelectionKey)ski);         // 登出通道         SelectableChannel selch = ski.channel();         if (!selch.isOpen() && !selch.isRegistered())             ((SelChImpl)selch).kill();     }           

上面的代碼代碼邏輯不是很複雜,首先是擷取 cancelledKeys 集合,然後周遊集合,并對每個選擇鍵及其對應的通道執行登出操作。接下來再來看看步驟2和3對應的代碼,如下:

+----EPollArrayWrapper.java     int poll(long timeout) throws IOException {         // 調用 epoll_ctl 函數注冊事件,對應步驟3         updateRegistrations();         // 調用 epoll_wait 函數等待事件發生,對應步驟4         updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);         for (int i=0; i<updated; i++) {             if (getDescriptor(i) == incomingInterruptFD) {                 interruptedIndex = i;                 interrupted = true;                 break;             }         }         return updated;     }     /**      * Update the pending registrations.      */     private void updateRegistrations() {         synchronized (updateLock) {             int j = 0;             while (j < updateCount) {                 // 擷取 fd 和 events,這兩個值在調用 register 方法時被存儲到數組中                 int fd = updateDescriptors[j];                 short events = getUpdateEvents(fd);                 boolean isRegistered = registered.get(fd);                 int opcode = 0;                 if (events != KILLED) {                     // 确定 opcode 的值                     if (isRegistered) {                         opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;                     } else {                         opcode = (events != 0) ? EPOLL_CTL_ADD : 0;                     }                     if (opcode != 0) {                         // 注冊事件                         epollCtl(epfd, opcode, fd, events);                         // 設定 fd 的注冊狀态                         if (opcode == EPOLL_CTL_ADD) {                             registered.set(fd);                         } else if (opcode == EPOLL_CTL_DEL) {                             registered.clear(fd);                         }                     }                 }                 j++;             }             updateCount = 0;         }         // 下面兩個均是 native 方法         private native void epollCtl(int epfd, int opcode, int fd, int events);         private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;     }           

看到 updateRegistrations 方法的實作,大家現在知道 epoll_ctl 這個函數是在哪裡調用的了。在 3.2 節通道注冊的結尾給大家埋了一個疑問,這裡就是答案了。注冊通道實際上隻是先将事件收集起來,等調用 select 方法時,在一起通過 epoll_ctl 函數将事件注冊到 epoll 執行個體中。

上面 epollCtl 和 epollWait 方法是 native 類型的,接下來我們再來看看這兩個方法是如何實作的。如下:

+----EPollArrayWrapper.c     JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {         struct epoll_event event;         int res;         event.events = events;         event.data.fd = fd;         // 調用 epoll_ctl 注冊事件         RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);         if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {             JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");         }     }     JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {         struct epoll_event *events = jlong_to_ptr(address);         int res;         if (timeout <= 0) {           /* Indefinite or no wait */             // 調用 epoll_wait 等待事件             RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);         } else {                      /* Bounded wait; bounded restarts */             res = iepoll(epfd, events, numfds, timeout);         }         if (res < 0) {             JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");         }         return res;     }           

上面的C代碼沒什麼複雜的邏輯,這裡就不多說了。如果大家對 epoll_ctl 和 epoll_wait 函數不了解,可以參考 Linux man-page。關于 epoll 的示例,也可以參考我的另一篇文章“基于epoll實作簡單的web伺服器”。

說完步驟2和3對應的代碼,接下來再來說說步驟4和5。由于步驟4和步驟1是一樣的,這裡不再贅述。最後再來說說步驟5的邏輯。代碼如下:

+----EPollSelectorImpl.java     private int updateSelectedKeys() {         int entries = pollWrapper.updated;         int numKeysUpdated = 0;         for (int i=0; i<entries; i++) {             /* 從 pollWrapper 成員變量的 pollArray 中擷取檔案描述符,              * pollArray 中的資料由 epoll_wait 設定              */             int nextFD = pollWrapper.getDescriptor(i);             SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));             // ski is null in the case of an interrupt             if (ski != null) {                 // 從 pollArray 中擷取就緒事件集合                 int rOps = pollWrapper.getEventOps(i);                 /* 如果 selectedKeys 已包含選擇鍵,則選擇鍵必須由新的事件發生時,                  * 才會将 numKeysUpdated + 1                  */                  if (selectedKeys.contains(ski)) {                     if (ski.channel.translateAndSetReadyOps(rOps, ski)) {                         numKeysUpdated++;                     }                 } else {                     // 轉換并設定就緒事件集合                     ski.channel.translateAndSetReadyOps(rOps, ski);                     if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {                         // 更新 selectedKeys 集合,并将 numKeysUpdated + 1                         selectedKeys.add(ski);                         numKeysUpdated++;                     }                 }             }         }         // 傳回 numKeysUpdated         return numKeysUpdated;     }     +----SocketChannelImpl.java     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {         int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes         int oldOps = sk.nioReadyOps();         int newOps = initialOps;         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {             return false;         }         if ((ops & (PollArrayWrapper.POLLERR                     | PollArrayWrapper.POLLHUP)) != 0) {             newOps = intOps;             sk.nioReadyOps(newOps);             // No need to poll again in checkConnect,             // the error will be detected there             readyToConnect = true;             return (newOps & ~oldOps) != 0;         }         /*           * 轉換事件          */         if (((ops & PollArrayWrapper.POLLIN) != 0) &&             ((intOps & SelectionKey.OP_READ) != 0) &&             (state == ST_CONNECTED))             newOps |= SelectionKey.OP_READ;         if (((ops & PollArrayWrapper.POLLCONN) != 0) &&             ((intOps & SelectionKey.OP_CONNECT) != 0) &&             ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {             newOps |= SelectionKey.OP_CONNECT;             readyToConnect = true;         }         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&             ((intOps & SelectionKey.OP_WRITE) != 0) &&             (state == ST_CONNECTED))             newOps |= SelectionKey.OP_WRITE;         // 設定事件         sk.nioReadyOps(newOps);         // 如果新的就緒事件和老的就緒事件不相同,則傳回true,否則傳回 false         return (newOps & ~oldOps) != 0;     }           

上面就是步驟5的邏輯了,簡單總結一下。首先是擷取就緒通道數量,然後再擷取這些就緒通道對應的檔案描述符 fd,以及就緒事件集合 rOps。之後調用 translateAndSetReadyOps 轉換并設定就緒事件集合。最後,将選擇鍵添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之後傳回該值。

以上就是選擇過程的代碼講解,貼了不少代碼,可能不太好了解。Java NIO 和作業系統接口關聯比較大,是以在學習 NIO 相關原理時,也應該去了解諸如 epoll 等系統調用的知識。沒有這些背景知識,很多東西看起來不太好懂。好了,本節到此結束。

2.5 模闆代碼

使用 NIO 選擇器程式設計時,主幹代碼的結構一般比較固定。是以把主幹代碼寫好後,就可以往裡填業務代碼了。下面貼一個服務端的模闆代碼,如下:

ServerSocketChannel ssc = ServerSocketChannel.open();     ssc.socket().bind(new InetSocketAddress("localhost", 8080));     ssc.configureBlocking(false);     Selector selector = Selector.open();     ssc.register(selector, SelectionKey.OP_ACCEPT);     while(true) {         int readyNum = selector.select();         if (readyNum == 0) {             continue;         }         Set<SelectionKey> selectedKeys = selector.selectedKeys();         Iterator<SelectionKey> it = selectedKeys.iterator();         while(it.hasNext()) {             SelectionKey key = it.next();             if(key.isAcceptable()) {                 // 接受連接配接             } else if (key.isReadable()) {                 // 通道可讀             } else if (key.isWritable()) {                 // 通道可寫             }             it.remove();         }     }           

2.6 執行個體示範

原本打算将示例示範的代碼放在本節中展示,奈何文章篇幅已經很大了,是以決定把本節的内容獨立成文。在下一篇文章中,我将會示範使用 Java NIO 完成一個簡單的 HTTP 伺服器。這裡先貼張效果圖,如下:

Java NIO之選擇器

3.總結

到這裡,本文差不多就要結束了。原本隻是打算簡單說說 Selector 的用法,然後再寫一份執行個體代碼。但是後來發現這樣寫顯得比較空洞,沒什麼深度。是以後來翻了一下 Selector 的源碼,大緻了解了 Selector 的邏輯,然後就有了上面的分析。不過 Selector 的邏輯并不止我上面所說的那些,還有一些内容我現在還沒看,是以就沒有講。對于已寫出來的分析,由于我個人水準有限,難免會有錯誤。如果有錯誤,也歡迎大家指出來,共同進步!

好了,本文到此結束,感謝大家的閱讀。

參考

  • Java NIO Selector - jenkov.com
  • Java NIO(6): Selector - 知乎

附錄

文中貼的一些代碼是沒有包含在 JDK src.zip 包裡的,這裡單獨列舉出來,友善大家查找。

檔案名 路徑
DefaultSelectorProvider.java jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
EPollSelectorProvider.java jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java
SelectorImpl.java jdk/src/share/classes/sun/nio/ch/SelectorImpl.java
EPollSelectorImpl.java jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java
EPollArrayWrapper.java jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java
SelectionKeyImpl.java jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java
SocketChannelImpl.java jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java
EPollArrayWrapper.c jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

本文在知識共享許可協定 4.0 下釋出,轉載需在明顯位置處注明出處

作者:coolblog

本文同步釋出在我的個人部落格:http://www.coolblog.xyz/?r=cb

Java NIO之選擇器

本作品采用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協定進行許可。