1.簡介
前面的文章說了緩沖區,說了通道,本文就來說說 NIO 中另一個重要的實作,即選擇器 Selector。在更早的文章中,我簡述了幾種 IO 模型。如果大家看過之前的文章,并動手寫過代碼的話。再看 Java 的選擇器大概就會知道它是什麼了,以及怎麼用了。選擇器是 Java 多路複用模型的一個實作,可以同時監控多個非阻塞套接字通道。示意圖大緻如下:

如果大家了解過多路複用模型,那應該也會知道幾種複用模型的實作。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的選擇器并非憑空創造,而是在底層作業系統提供的接口的基礎上封裝而來。相關的細節,我随後會進行分析。
關于 Java 選擇器的簡介這裡先說到這,接下來進入正題。
2.基本操作及實作
本章我将對 Selector 的建立,通道的注冊,Selector 的選擇過程進行分析。内容篇幅較大,希望大家耐心看完。由于 Selector 相關類在不同作業系統下的實作是不同的,加之個人對 Linux epoll 更為熟悉,是以本文所分析的源碼也是和 epoll 相關的。好了,進入正題吧。
2.1 建立選擇器
選擇器 Selector 是一個抽象類,是以不能直接建立。Selector 提供了一個 open 方法,通過 open 方法既可以建立選擇器執行個體。示例代碼如下:
Selector selector = Selector.open();
上面的代碼比較簡單,隻有一行。不過不要被表象迷惑,這行代碼僅是完整實作的冰山一角,更複雜的邏輯則隐藏在水面之下。
在簡介一節,我已經說了 Java 選擇器是對底層多路複用接口的一個包裝,這裡的 open 方法也不例外。假設我們的 Java 運作在 Linux 平台下,那麼 open 最終所做的事情應該是調用作業系統的
epoll_create
函數,用于建立 epoll 執行個體。真實情況是不是如此呢?答案就在冰山深處,接下來就讓我們一起去求索吧。下面我們将沿着 open 方法一路走下去,如下:
public abstract class Selector implements Closeable { public static Selector open() throws IOException { // 建立 SelectorProvider,再通過其 openSelector 方法建立 Selector return SelectorProvider.provider().openSelector(); } // 省略無關代碼 } public abstract class SelectorProvider { public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; // 建立預設的 SelectorProvider provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } } } public class DefaultSelectorProvider { private DefaultSelectorProvider() { } /** * 根據系統名稱建立相應的 SelectorProvider */ public static SelectorProvider create() { String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); if (osname.equals("SunOS")) return createProvider("sun.nio.ch.DevPollSelectorProvider"); if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); // return new sun.nio.ch.PollSelectorProvider(); } /** * 加載 SelectorProvider 類,并建立執行個體 */ @SuppressWarnings("unchecked") private static SelectorProvider createProvider(String cn) { Class<SelectorProvider> c; try { c = (Class<SelectorProvider>)Class.forName(cn); } catch (ClassNotFoundException x) { throw new AssertionError(x); } try { return c.newInstance(); } catch (IllegalAccessException | InstantiationException x) { throw new AssertionError(x); } } } /** * 建立完 SelectorProvider,接下來要調用 openSelector 方法 * 建立 Selector 的繼承類了。 */ public class EPollSelectorProvider extends SelectorProviderImpl { public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } } class EPollSelectorImpl extends SelectorImpl { EPollSelectorImpl(SelectorProvider sp) throws IOException { // 調用父類構造方法 super(sp); long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); fd1 = (int) pipeFds; // 建立 EPollArrayWrapper,EPollArrayWrapper 是一個重要的實作 pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap<>(); } } public abstract class SelectorImpl extends AbstractSelector { protected SelectorImpl(SelectorProvider sp) { super(sp); keys = new HashSet<SelectionKey>(); selectedKeys = new HashSet<SelectionKey>(); /* 初始化 publicKeys 和 publicSelectedKeys, * publicKeys 即 selector.keys() 方法所傳回的集合, * publicSelectedKeys 則是 selector.selectedKeys() 方法傳回的集合 */ if (Util.atBugLevel("1.4")) { publicKeys = keys; publicSelectedKeys = selectedKeys; } else { publicKeys = Collections.unmodifiableSet(keys); publicSelectedKeys = Util.ungrowableSet(selectedKeys); } } } /** * EPollArrayWrapper 一個重要的實作,這一層再往下就是 C 代碼了 */ class EPollArrayWrapper { EPollArrayWrapper() throws IOException { // 調用 epollCreate 方法建立 epoll 檔案描述符 epfd = epollCreate(); // the epoll_event array passed to epoll_wait // 初始化 pollArray,該對象用于存儲就緒檔案描述符和事件 int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); pollArrayAddress = pollArray.address(); // eventHigh needed when using file descriptors > 64k if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE) eventsHigh = new HashMap<>(); } // epollCreate 方法是 native 類型的 private native int epollCreate(); }
以上代碼時 Java 層面的,Java 層調用棧最下面的類是 EPollArrayWrapper(源碼路徑可以在附錄中查找)。EPollArrayWrapper 是一個重要的實作,起着承上啟下的作用。上層是 Java 代碼,下層是 C 代碼。上層的代碼看完了,接下來看看冰山深處的 C 代碼:
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) { // 調用 epoll_create 函數建立 epoll 執行個體,并傳回檔案描述符 epfd int epfd = epoll_create(256); if (epfd < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed"); } return epfd; }
上面的代碼很簡單,僅做了建立 epoll 執行個體這一件事。看到這裡,答案就明了了。最後在附一張時序圖幫助大家理清代碼調用順序,如下:
2.2 選擇鍵
2.2.1 幾種事件
選擇鍵 SelectionKey 包含4種事件,分别是:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
事件之間可以通過或運算進行組合,比如:
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
2.2.2 兩種事件集合:interestOps 和 readyOps
interestOps 即感興趣的事件集合,通道調用 register 方法注冊時會設定此值,interestOps 可通過 SelectionKey interestOps() 方法擷取。readyOps 是就緒事件集合,可通過 SelectionKey readyOps() 擷取。
interestOps 和 readyOps 被聲明在 SelectionKey 子類 SelectionKeyImpl 中,代碼如下:
public class SelectionKeyImpl extends AbstractSelectionKey { private volatile int interestOps; private int readyOps; }
接下來再來看看與 readyOps 事件集合相關的幾個方法,如下:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
以上方法從字面意思上就可以知道有什麼用,這裡就不解釋了。接下來以 isReadable 方法為例,簡單看一下這個方法是如何實作。
public final boolean isReadable() { return (readyOps() & OP_READ) != 0; }
上面說到可以通過或運算組合事件,這裡則是通過與運算來測試某個事件是否在事件集合中。比如
readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101, readyOps & OP_READ = 0101 & 0001 = 0001, readyOps & OP_CONNECT = 0101 & 1000 = 0
readyOps & OP_READ != 0
,是以 OP_READ 在事件集合中。
readyOps & OP_CONNECT == 0
,是以 OP_CONNECT 不在事件集合中。
2.2.3 attach 方法
attach 是一個好用的方法,通過這個方法,可以将對象暫存在 SelectionKey 中,待需要的時候直接取出來即可。比如本文對應的練習代碼實作了一個簡單的 HTTP 伺服器,在讀取使用者請求資料後(即 selectionKey.isReadable() 為 true),會去解析請求頭,然後将請求頭資訊通過 attach 方法放入 selectionKey 中。待通道可寫後,再從 selectionKey 中取出請求頭,并根據請求頭回複用戶端不同的消息。當然,這隻是一個應用場景,attach 可能還有其他的應用場景,比如辨別通道。不過其他的場景我沒使用過,就不說了。attach 使用方式如下:
selectionKey.attach(obj); Object attachedObj = selectionKey.attachment();
2.3 通道注冊
通道注冊即将感興趣的事件告知 Selector,待事件發生時,Selector 即可傳回就緒事件,我們就可以去做後續的事情了。比如 ServerSocketChannel 通道通常對 OP_ACCEPT 事件感興趣,那麼我們就可以把這個事件注冊給 Selector。待事件發生,即服務端接受用戶端連接配接後,我們即可擷取這個就緒的事件并做相應的操作。通道注冊的示例代碼如下:
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
起初我以為通道注冊操作會調用作業系統的 epoll_ctl 函數,但最終通過看源碼,發現自己的了解是錯的。既然通道注冊階段不調用 epoll_ctl 函數。那麼,epoll_ctl 什麼時候才會被調用呢?如果不調用 epoll_ctl,那麼注冊過程都幹了什麼事情呢?關于第一個問題,本節還無法解答,不過第二個問題則可以說說。接下來讓我們深入通道類 register 方法的調用棧中去探尋答案吧。
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel { public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { return register(sel, ops, null); } public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; } public abstract class AbstractSelectableChannel extends SelectableChannel { private SelectionKey[] keys = null; public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { // 省去一些校驗代碼 // 從 keys 數組中查找,查找條件為 k.selector() == sel SelectionKey k = findKey(sel); // 如果 k 不為空,則修改 k 所感興趣的事件 if (k != null) { k.interestOps(ops); k.attach(att); } // k 為空,則建立一個 SelectionKey,并存儲到 keys 數組中 if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } } } public abstract class AbstractSelector extends Selector { protected abstract SelectionKey register(AbstractSelectableChannel ch, int ops, Object att); } public abstract class SelectorImpl extends AbstractSelector { protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); // 建立 SelectionKeyImpl 執行個體 SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; } } class EPollSelectorImpl extends SelectorImpl { protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; int fd = Integer.valueOf(ch.getFDVal()); // 存儲 fd 和 SelectionKeyImpl 的映射關系 fdToKey.put(fd, ski); pollWrapper.add(fd); // 将 SelectionKeyImpl 執行個體存儲到 keys 中(這裡的 keys 聲明在 SelectorImpl 類中),keys 集合可由 selector.keys() 方法擷取 keys.add(ski); } } public class SelectionKeyImpl extends AbstractSelectionKey { public SelectionKey interestOps(int ops) { ensureValid(); return nioInterestOps(ops); } public SelectionKey nioInterestOps(int ops) { if ((ops & ~channel().validOps()) != 0) throw new IllegalArgumentException(); // 轉換并設定感興趣的事件 channel.translateAndSetInterestOps(ops, this); // 設定 interestOps 變量 interestOps = ops; return this; } } class SocketChannelImpl extends SocketChannel implements SelChImpl { public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { int newOps = 0; // 轉換事件 if ((ops & SelectionKey.OP_READ) != 0) newOps |= PollArrayWrapper.POLLIN; if ((ops & SelectionKey.OP_WRITE) != 0) newOps |= PollArrayWrapper.POLLOUT; if ((ops & SelectionKey.OP_CONNECT) != 0) newOps |= PollArrayWrapper.POLLCONN; // 設定事件 sk.selector.putEventOps(sk, newOps); } } class class EPollSelectorImpl extends SelectorImpl { public void putEventOps(SelectionKeyImpl ski, int ops) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; // 設定感興趣的事件 pollWrapper.setInterest(ch.getFDVal(), ops); } } class EPollArrayWrapper { void setInterest(int fd, int mask) { synchronized (updateLock) { // 擴容 updateDescriptors 數組,并存儲檔案描述符 fd int oldCapacity = updateDescriptors.length; if (updateCount == oldCapacity) { int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE; int[] newDescriptors = new int[newCapacity]; System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity); updateDescriptors = newDescriptors; } updateDescriptors[updateCount++] = fd; // events are stored as bytes for efficiency reasons byte b = (byte)mask; assert (b == mask) && (b != KILLED); // 存儲事件 setUpdateEvents(fd, b, false); } } private void setUpdateEvents(int fd, byte events, boolean force) { if (fd < MAX_UPDATE_ARRAY_SIZE) { if ((eventsLow[fd] != KILLED) || force) { eventsLow[fd] = events; } } else { Integer key = Integer.valueOf(fd); if (!isEventsHighKilled(key) || force) { eventsHigh.put(key, Byte.valueOf(events)); } } } }
到 setUpdateEvents 這個方法,整個調用棧就結束了。但是我們并未在調用棧中看到調用 epoll_ctl 函數的地方,也就是說,通道注冊時,并不會立即調用 epoll_ctl,而是先将事件集合 events 存放在 eventsLow。至于 epoll_ctl 函數何時調用的,需要大家繼續往下看了。
2.4 選擇過程
2.4.1 選擇方法
Selector 包含3種不同功能的選擇方法,分别如下:
- int select()
- int select(long timeout)
- int selectNow()
select() 是一個阻塞方法,僅在至少一個通道處于就緒狀态時才傳回。
select(long timeout) 同樣也是阻塞方法,不過可對該方法設定逾時時間(timeout > 0),使得線程不會被一直阻塞。如果 timeout = 0,會一直阻塞線程。
selectNow() 為非阻塞方法,調用後立即傳回。
以上3個方法均傳回 int 類型值,表示每次調用 select 或 selectNow 方法後,新就緒通道的數量。如果某個通道在上一次調用 select 方法時就已經處于就緒狀态,但并未将該通道對應的 SelectionKey 對象從 selectedKeys 集合中移除。假設另一個的通道在本次調用 select 期間處于就緒狀态,此時,select 傳回1,而不是2。
2.4.2 選擇過程
選擇方法用起來雖然簡單,但方法之下隐藏的邏輯還是比較複雜的。大緻分為下面幾個步驟:
- 檢查已取消鍵集合 cancelledKeys 是否為空,不為空則将 cancelledKeys 的鍵從 keys 和 selectedKeys 中移除,并将鍵和通道登出。
- 調用作業系統的 epoll_ctl 函數将通道感興趣的事件注冊到 epoll 執行個體中
- 調用作業系統的 epoll_wait 函數監聽事件
- 再次執行步驟1
- 更新 selectedKeys 集合,并傳回就緒通道數量
上面五個步驟對應于 EPollSelectorImpl 類中 doSelect 方法的邏輯,如下:
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); // 處理已取消鍵集合,對應步驟1 processDeregisterQueue(); try { begin(); // select 方法的核心,對應步驟2和3 pollWrapper.poll(timeout); } finally { end(); } // 處理已取消鍵集合,對應步驟4 processDeregisterQueue(); // 更新 selectedKeys 集合,并傳回就緒通道數量,對應步驟5 int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
接下來,我們按照上面的步驟順序去分析代碼實作。先來看看步驟1對應的代碼:
+----SelectorImpl.java void processDeregisterQueue() throws IOException { // Precondition: Synchronized on this, keys, and selectedKeys Set<SelectionKey> cks = cancelledKeys(); synchronized (cks) { if (!cks.isEmpty()) { Iterator<SelectionKey> i = cks.iterator(); // 周遊 cancelledKeys,執行登出操作 while (i.hasNext()) { SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); try { // 執行登出邏輯 implDereg(ski); } catch (SocketException se) { throw new IOException("Error deregistering key", se); } finally { i.remove(); } } } } } +----EPollSelectorImpl.java protected void implDereg(SelectionKeyImpl ski) throws IOException { assert (ski.getIndex() >= 0); SelChImpl ch = ski.channel; int fd = ch.getFDVal(); // 移除 fd 和選擇鍵鍵的映射關系 fdToKey.remove(Integer.valueOf(fd)); // 從 epoll 執行個體中删除事件 pollWrapper.remove(fd); ski.setIndex(-1); // 從 keys 和 selectedKeys 中移除選擇鍵 keys.remove(ski); selectedKeys.remove(ski); // 登出選擇鍵 deregister((AbstractSelectionKey)ski); // 登出通道 SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); }
上面的代碼代碼邏輯不是很複雜,首先是擷取 cancelledKeys 集合,然後周遊集合,并對每個選擇鍵及其對應的通道執行登出操作。接下來再來看看步驟2和3對應的代碼,如下:
+----EPollArrayWrapper.java int poll(long timeout) throws IOException { // 調用 epoll_ctl 函數注冊事件,對應步驟3 updateRegistrations(); // 調用 epoll_wait 函數等待事件發生,對應步驟4 updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; } /** * Update the pending registrations. */ private void updateRegistrations() { synchronized (updateLock) { int j = 0; while (j < updateCount) { // 擷取 fd 和 events,這兩個值在調用 register 方法時被存儲到數組中 int fd = updateDescriptors[j]; short events = getUpdateEvents(fd); boolean isRegistered = registered.get(fd); int opcode = 0; if (events != KILLED) { // 确定 opcode 的值 if (isRegistered) { opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; } else { opcode = (events != 0) ? EPOLL_CTL_ADD : 0; } if (opcode != 0) { // 注冊事件 epollCtl(epfd, opcode, fd, events); // 設定 fd 的注冊狀态 if (opcode == EPOLL_CTL_ADD) { registered.set(fd); } else if (opcode == EPOLL_CTL_DEL) { registered.clear(fd); } } } j++; } updateCount = 0; } // 下面兩個均是 native 方法 private native void epollCtl(int epfd, int opcode, int fd, int events); private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException; }
看到 updateRegistrations 方法的實作,大家現在知道 epoll_ctl 這個函數是在哪裡調用的了。在 3.2 節通道注冊的結尾給大家埋了一個疑問,這裡就是答案了。注冊通道實際上隻是先将事件收集起來,等調用 select 方法時,在一起通過 epoll_ctl 函數将事件注冊到 epoll 執行個體中。
上面 epollCtl 和 epollWait 方法是 native 類型的,接下來我們再來看看這兩個方法是如何實作的。如下:
+----EPollArrayWrapper.c JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) { struct epoll_event event; int res; event.events = events; event.data.fd = fd; // 調用 epoll_ctl 注冊事件 RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res); if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) { JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed"); } } JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) { struct epoll_event *events = jlong_to_ptr(address); int res; if (timeout <= 0) { /* Indefinite or no wait */ // 調用 epoll_wait 等待事件 RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res); } else { /* Bounded wait; bounded restarts */ res = iepoll(epfd, events, numfds, timeout); } if (res < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed"); } return res; }
上面的C代碼沒什麼複雜的邏輯,這裡就不多說了。如果大家對 epoll_ctl 和 epoll_wait 函數不了解,可以參考 Linux man-page。關于 epoll 的示例,也可以參考我的另一篇文章“基于epoll實作簡單的web伺服器”。
說完步驟2和3對應的代碼,接下來再來說說步驟4和5。由于步驟4和步驟1是一樣的,這裡不再贅述。最後再來說說步驟5的邏輯。代碼如下:
+----EPollSelectorImpl.java private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { /* 從 pollWrapper 成員變量的 pollArray 中擷取檔案描述符, * pollArray 中的資料由 epoll_wait 設定 */ int nextFD = pollWrapper.getDescriptor(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { // 從 pollArray 中擷取就緒事件集合 int rOps = pollWrapper.getEventOps(i); /* 如果 selectedKeys 已包含選擇鍵,則選擇鍵必須由新的事件發生時, * 才會将 numKeysUpdated + 1 */ if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { // 轉換并設定就緒事件集合 ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { // 更新 selectedKeys 集合,并将 numKeysUpdated + 1 selectedKeys.add(ski); numKeysUpdated++; } } } } // 傳回 numKeysUpdated return numKeysUpdated; } +----SocketChannelImpl.java public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) { int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes int oldOps = sk.nioReadyOps(); int newOps = initialOps; if ((ops & PollArrayWrapper.POLLNVAL) != 0) { return false; } if ((ops & (PollArrayWrapper.POLLERR | PollArrayWrapper.POLLHUP)) != 0) { newOps = intOps; sk.nioReadyOps(newOps); // No need to poll again in checkConnect, // the error will be detected there readyToConnect = true; return (newOps & ~oldOps) != 0; } /* * 轉換事件 */ if (((ops & PollArrayWrapper.POLLIN) != 0) && ((intOps & SelectionKey.OP_READ) != 0) && (state == ST_CONNECTED)) newOps |= SelectionKey.OP_READ; if (((ops & PollArrayWrapper.POLLCONN) != 0) && ((intOps & SelectionKey.OP_CONNECT) != 0) && ((state == ST_UNCONNECTED) || (state == ST_PENDING))) { newOps |= SelectionKey.OP_CONNECT; readyToConnect = true; } if (((ops & PollArrayWrapper.POLLOUT) != 0) && ((intOps & SelectionKey.OP_WRITE) != 0) && (state == ST_CONNECTED)) newOps |= SelectionKey.OP_WRITE; // 設定事件 sk.nioReadyOps(newOps); // 如果新的就緒事件和老的就緒事件不相同,則傳回true,否則傳回 false return (newOps & ~oldOps) != 0; }
上面就是步驟5的邏輯了,簡單總結一下。首先是擷取就緒通道數量,然後再擷取這些就緒通道對應的檔案描述符 fd,以及就緒事件集合 rOps。之後調用 translateAndSetReadyOps 轉換并設定就緒事件集合。最後,将選擇鍵添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之後傳回該值。
以上就是選擇過程的代碼講解,貼了不少代碼,可能不太好了解。Java NIO 和作業系統接口關聯比較大,是以在學習 NIO 相關原理時,也應該去了解諸如 epoll 等系統調用的知識。沒有這些背景知識,很多東西看起來不太好懂。好了,本節到此結束。
2.5 模闆代碼
使用 NIO 選擇器程式設計時,主幹代碼的結構一般比較固定。是以把主幹代碼寫好後,就可以往裡填業務代碼了。下面貼一個服務端的模闆代碼,如下:
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("localhost", 8080)); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); while(true) { int readyNum = selector.select(); if (readyNum == 0) { continue; } Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); if(key.isAcceptable()) { // 接受連接配接 } else if (key.isReadable()) { // 通道可讀 } else if (key.isWritable()) { // 通道可寫 } it.remove(); } }
2.6 執行個體示範
原本打算将示例示範的代碼放在本節中展示,奈何文章篇幅已經很大了,是以決定把本節的内容獨立成文。在下一篇文章中,我将會示範使用 Java NIO 完成一個簡單的 HTTP 伺服器。這裡先貼張效果圖,如下:
3.總結
到這裡,本文差不多就要結束了。原本隻是打算簡單說說 Selector 的用法,然後再寫一份執行個體代碼。但是後來發現這樣寫顯得比較空洞,沒什麼深度。是以後來翻了一下 Selector 的源碼,大緻了解了 Selector 的邏輯,然後就有了上面的分析。不過 Selector 的邏輯并不止我上面所說的那些,還有一些内容我現在還沒看,是以就沒有講。對于已寫出來的分析,由于我個人水準有限,難免會有錯誤。如果有錯誤,也歡迎大家指出來,共同進步!
好了,本文到此結束,感謝大家的閱讀。
參考
- Java NIO Selector - jenkov.com
- Java NIO(6): Selector - 知乎
附錄
文中貼的一些代碼是沒有包含在 JDK src.zip 包裡的,這裡單獨列舉出來,友善大家查找。
檔案名 | 路徑 |
---|---|
DefaultSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java |
EPollSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java |
SelectorImpl.java | jdk/src/share/classes/sun/nio/ch/SelectorImpl.java |
EPollSelectorImpl.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java |
EPollArrayWrapper.java | jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java |
SelectionKeyImpl.java | jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java |
SocketChannelImpl.java | jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java |
EPollArrayWrapper.c | jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c |
本文在知識共享許可協定 4.0 下釋出,轉載需在明顯位置處注明出處
作者:coolblog
本文同步釋出在我的個人部落格:http://www.coolblog.xyz/?r=cb
本作品采用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協定進行許可。