1)常用資料結構

- EPollSelectorImpl
-
- 維護了3個鍵set
-
- keys【已經注冊的鍵的集合】
- selectedKeys【已選擇的鍵的集合】
- cancelledKeys【已取消的鍵的集合】
- EPollArrayWrapper【真正的對linux epoll的封裝】
-
- 包含了3個重要的native方法epollCreate、epollCtl、epollWait分别對應庫函數epoll_create、epoll_ctl、epoll_wait
- 一個native執行個體pollArray模拟庫中struct epoll_event
- SelectionKeyImpl
-
- 表示了一個特定的通道對象和一個特定的選擇器對象之間的注冊 關系
- 包含兩個以整數形式進行編碼的比特掩碼
-
- 關心的操作(interestOps)
- 表示通道準備好要執行的操作(readyOps)
- SocketChannelImpl/ServerSocketChannelImpl: 分别是連接配接socket和監聽socket
2)一段代碼
有了上面的概念,接下來看一段簡單的代碼,采用select實作的echo server
public class SelectTest {
public static int PORT_NUMBER = 1234;
public static void main(String args[]) throws Exception {
new SelectTest().go();
}
public void go() throws Exception {
int port = PORT_NUMBER;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
// Set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
// Register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the // selected
// set contains keys of the ready channels.
int n = selector.select();
if (n == 0) {
// nothing to do
continue;
}
// Get an iterator over the set of selected keys
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// Look at each key in the selected set while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// step1 Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel, SelectionKey.OP_READ);
sayHello(channel);
}
// step2 Is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// step3 Remove key from selected set; it's been handled
it.remove();
}
}
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception {
if (channel == null) {
return; // could happen
}
// Set the new channel nonblocking
channel.configureBlocking(false);
// Register it with the selector
channel.register(selector, ops);
}
// Use the same byte buffer for all channels. A single thread is //
// servicing all the channels, so no danger of concurrent acccess.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it's writing back to the same nonblocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you'd do something more useful than this.
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF, invalidates the key
socketChannel.close();
}
}
private void sayHello(SocketChannel channel) throws Exception {
buffer.clear();
buffer.put("Hi there!\r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}
這段代碼摘自JAVA NIO這本書,代碼做了很簡單的幾件事
- 起一個server socket監聽子1234端口
- 起一個selector
- 将server socket注冊到epoll,感興趣的事件為SelectionKey.OP_ACCEPT,即來了新的連接配接
- 開始一個輪詢過程,不斷的通過selector來探測到底有沒有新的網絡事件
- 如果有監聽事件,那麼取出連接配接socket,然後給将這個連接配接socket注冊到epoll,感興趣的事件為 SelectionKey.OP_READ
- 如果有讀事件,那麼就把這個讀的内容寫回
- 注意出于示範的目的,沒有注冊寫事件,這樣的話會導緻一個問題就如上面注釋中提到的邪惡代碼
3) selector的機制
selector最關鍵的上個點就是初始化/注冊/以及select過程,以上面的代碼為例,分别說明這3個關鍵點
3.1)selector的初始化
Selector selector = Selector.open();
- 根據作業系統執行個體化不同Selector(通常見sun.nio.ch.DefaultSelectorProvider.create())
- 常見的Linux且 kernels >= 2.6,會使用sun.nio.ch.EPollSelectorImpl
- 執行個體化EPollSelectorImpl
- 執行個體化EPollArrayWrapper
-
- 調用epollCreate産生epoll FD
- 執行個體化AllocatedNativeObject,得到上文提到的pollArray
3.2)注冊
ServerSocketChannel.register
- 如果該通道曾經注冊過那麼
-
- SelectionKeyImpl.interestOps[SelectionKey.OP_ACCEPT] -->
-
- SelectionKeyImpl.nioInterestOps[SelectionKey.OP_ACCEPT]-->
-
- ServerSocketChannelImpl.translateAndSetInterestOps[SelectionKey.OP_ACCEPT]--> :将SelectionKey.OP_ACCEPT轉化為PollArrayWrapper.POLLIN
-
- EPollSelectorImpl.putEventOps[PollArrayWrapper.POLLIN]-->
-
- EPollArrayWrapper.setInterest[fd,PollArrayWrapper.POLLIN] :加入updateList
- 如果沒有注冊過
-
- EPollSelectorImpl.register-->:僅僅是将key所對應的fd加入epoll
-
- EPollSelectorImpl.implRegister-->
-
- EPollArrayWrapper.add:加入updateList
- 将該key加入到keys集合中
- SelectionKeyImpl.interestOps:調用棧見上面,功能和上面一樣就是更新fd感興趣的事件
- 抛開上面的代碼細節,注冊會
-
- 往EPollArrayWrapper的updateList添加記錄,updateList會在select的時候使用
- 如果沒有注冊過,會将該key加入到keys集合中即所有注冊過的key都會在keys中,除非以後取消掉了
- 應用這邊感興趣的事件為
-
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- 底層的epoll接受的事件
-
- PollArrayWrapper.POLLIN
- PollArrayWrapper.POLLOUT
- PollArrayWrapper.POLLERR
- PollArrayWrapper.POLLHUP
- PollArrayWrapper.POLLNVAL
- PollArrayWrapper.POLLREMOVE
- 由于存在上面提到的兩種事件類型:應用級别和系統(epoll)級别,是以需要轉換一下,見SocketChannelImpl及ServerSocketChannelImpl的translateAndSetInterestOps和translateReadyOps方法,前者是将應用-->系統,後者是系統->應用
- 注意到上面的注冊實際上分兩步
-
- 現将key轉換成一個内部資料結構EPollArrayWrapper$Updator添加到updateList,此時事件為空
- 再更新EPollArrayWrapper$Updator的事件為感興趣的事件
- 為什麼要分兩步??
3.3)selector.select()
- EPollSelectorImpl.doSelect
-
- 登出cancelledKeys【已取消的鍵的集合】中的key
- EPollArrayWrapper.poll
-
- EPollArrayWrapper.updateRegistrations:周遊上面的updateList,調用epollCtl真正到向epoll fd注冊
- 調用epollWait等待事件發生,可能會阻塞,傳回更新的事件
- 此時telnet 127.0.0.1 1234發起連接配接,上面的方法傳回
- 再次登出cancelledKeys【已取消的鍵的集合】中的key
- EPollSelectorImpl.updateSelectedKeys
-
- 如果selectedKeys【已選擇的鍵的集合】包含該鍵
-
- ServerSocketChannelImpl.translateReadyOps
-
- 将PollArrayWrapper.POLLIN轉化為SelectionKey.OP_ACCEPT
- 更新readyOps
- 判斷老的readyOps是否和新的readyOps,如果不一緻事件數+1
- 如果selectedKeys【已選擇的鍵的集合】不包含該鍵
-
- ServerSocketChannelImpl.translateReadyOps
- 将該鍵填入selectedKeys
- 事件數+1
- 如果事件數>1,周遊事件對應的key,開始相應處理。。。
- Java NIO這本書提到“一旦鍵 被放置于選擇器的已選擇的鍵的集合中,它的 ready 集合(即readyOps)将是累積的。比特位隻會被設定,不會被 清理”,實際并非如此,就我的了解和觀察的結果,readyOps并不會積累而是每次更新,不明白為什麼作者會這麼說???,無論如何對整體的程式了解不會有影響
- java使用epoll預設會使用水準觸發,即如果有事件發生,如果你不處理,那麼下次還會觸發
- 但經過java 中EPollSelectorImpl實作之後有了小小的變化,如果某個事件發生,你不做任何處理,那麼下次調用select的時候,雖然底層epoll仍然會傳回事件,但上面的代碼會判斷本次事件和上次事件是否一緻,如果是一樣,java認為沒有事件發生,如果要做到一緻,必須将selectedKeys中的key删掉,否則會有差别,是以請注意selectedKeys删除的重要性!否則會死循環!
- 上面已經解釋了ServerSocketChannel的注冊,以及select過程,代碼樣例還有SocketChannel的注冊和select,其實和前面很雷同就不再贅述了