天天看點

談談java selector的機制

1)常用資料結構

談談java selector的機制
  • EPollSelectorImpl
    • 維護了3個鍵set
      • keys【已經注冊的鍵的集合】
      • selectedKeys【已選擇的鍵的集合】
      • cancelledKeys【已取消的鍵的集合】
  • EPollArrayWrapper【真正的對linux epoll的封裝】
    • 包含了3個重要的native方法epollCreate、epollCtl、epollWait分别對應庫函數epoll_create、epoll_ctl、epoll_wait
    • 一個native執行個體pollArray模拟庫中struct epoll_event
  • SelectionKeyImpl
    • 表示了一個特定的通道對象和一個特定的選擇器對象之間的注冊 關系
    • 包含兩個以整數形式進行編碼的比特掩碼
      • 關心的操作(interestOps)
      • 表示通道準備好要執行的操作(readyOps)
  • SocketChannelImpl/ServerSocketChannelImpl: 分别是連接配接socket和監聽socket

2)一段代碼

有了上面的概念,接下來看一段簡單的代碼,采用select實作的echo server

public class SelectTest {

	public static int PORT_NUMBER = 1234;

	public static void main(String args[]) throws Exception {
		new SelectTest().go();
	}

	public void go() throws Exception {
		int port = PORT_NUMBER;

		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		ServerSocket serverSocket = serverChannel.socket();
		// Set the port the server channel will listen to
		serverSocket.bind(new InetSocketAddress(port));
		// Set nonblocking mode for the listening socket
		serverChannel.configureBlocking(false);

		Selector selector = Selector.open();

		// Register the ServerSocketChannel with the Selector
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		while (true) {
			// This may block for a long time. Upon returning, the // selected
			// set contains keys of the ready channels.
			int n = selector.select();
			if (n == 0) {
				// nothing to do
				continue;
			}

			// Get an iterator over the set of selected keys
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			// Look at each key in the selected set while (it.hasNext()) {
			SelectionKey key = (SelectionKey) it.next();

			// step1 Is a new connection coming in?
			if (key.isAcceptable()) {
				ServerSocketChannel server = (ServerSocketChannel) key
						.channel();
				SocketChannel channel = server.accept();
				registerChannel(selector, channel, SelectionKey.OP_READ);
				sayHello(channel);
			}

			// step2 Is there data to read on this channel?
			if (key.isReadable()) {
				readDataFromSocket(key);
			}

			// step3 Remove key from selected set; it's been handled
			it.remove();

		}

	}

	protected void registerChannel(Selector selector,
			SelectableChannel channel, int ops) throws Exception {
		if (channel == null) {

			return; // could happen
		}
		// Set the new channel nonblocking
		channel.configureBlocking(false);
		// Register it with the selector
		channel.register(selector, ops);
	}

	// Use the same byte buffer for all channels. A single thread is //
	// servicing all the channels, so no danger of concurrent acccess.
	private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

	protected void readDataFromSocket(SelectionKey key) throws Exception {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		int count;
		buffer.clear(); // Empty buffer
		// Loop while data is available; channel is nonblocking
		while ((count = socketChannel.read(buffer)) > 0) {
			buffer.flip(); // Make buffer readable
			// Send the data; don't assume it goes all at once
			while (buffer.hasRemaining()) {
				socketChannel.write(buffer);
			}
			// WARNING: the above loop is evil. Because
			// it's writing back to the same nonblocking
			// channel it read the data from, this code can
			// potentially spin in a busy loop. In real life
			// you'd do something more useful than this.
			buffer.clear(); // Empty buffer

		}
		if (count < 0) {
			// Close channel on EOF, invalidates the key
			socketChannel.close();
		}
	}

	private void sayHello(SocketChannel channel) throws Exception {
		buffer.clear();
		buffer.put("Hi there!\r\n".getBytes());
		buffer.flip();
		channel.write(buffer);
	}
}      

這段代碼摘自JAVA NIO這本書,代碼做了很簡單的幾件事

  • 起一個server socket監聽子1234端口
  • 起一個selector
  • 将server socket注冊到epoll,感興趣的事件為SelectionKey.OP_ACCEPT,即來了新的連接配接
  • 開始一個輪詢過程,不斷的通過selector來探測到底有沒有新的網絡事件
  • 如果有監聽事件,那麼取出連接配接socket,然後給将這個連接配接socket注冊到epoll,感興趣的事件為​ SelectionKey.OP_READ
  • 如果有讀事件,那麼就把這個讀的内容寫回
  • 注意出于示範的目的,沒有注冊寫事件,這樣的話會導緻一個問題就如上面注釋中提到的邪惡代碼

​3) selector的機制

selector最關鍵的上個點就是初始化/注冊/以及select過程,以上面的代碼為例,分别說明這3個關鍵點

3.1)selector的初始化

 Selector selector = Selector.open();

  • 根據作業系統執行個體化不同Selector(通常見sun.nio.ch.DefaultSelectorProvider.create())
  • 常見的Linux且 kernels >= 2.6,會使用sun.nio.ch.EPollSelectorImpl
  • 執行個體化EPollSelectorImpl
  • 執行個體化EPollArrayWrapper
    • 調用epollCreate産生epoll FD
    • 執行個體化AllocatedNativeObject,得到上文提到的pollArray

3.2)注冊

    ServerSocketChannel.register​

  • 如果該通道曾經注冊過那麼
    • SelectionKeyImpl.interestOps[SelectionKey.OP_ACCEPT] -->
      • SelectionKeyImpl.nioInterestOps[SelectionKey.OP_ACCEPT]-->
        • ServerSocketChannelImpl.translateAndSetInterestOps[SelectionKey.OP_ACCEPT]--> :将SelectionKey.OP_ACCEPT轉化為PollArrayWrapper.POLLIN
          • EPollSelectorImpl.putEventOps[PollArrayWrapper.POLLIN]-->
            • EPollArrayWrapper.setInterest[fd,PollArrayWrapper.POLLIN] :加入updateList
  • 如果沒有注冊過
    • EPollSelectorImpl.register-->:僅僅是将key所對應的fd加入epoll
      • EPollSelectorImpl.implRegister-->
        • EPollArrayWrapper.add:加入updateList
        • 将該key加入到keys集合中
    • SelectionKeyImpl.interestOps:調用棧見上面,功能和上面一樣就是更新fd感興趣的事件
  • 抛開上面的代碼細節,注冊會
    • 往EPollArrayWrapper的updateList添加記錄,updateList會在select的時候使用
    • 如果沒有注冊過,會将該key加入到keys集合中即所有注冊過的key都會在keys中,除非以後取消掉了
    • 應用這邊感興趣的事件為
      • SelectionKey.OP_READ
      • SelectionKey.OP_WRITE
      • SelectionKey.OP_CONNECT
      • SelectionKey.OP_ACCEPT
    • 底層的epoll接受的事件
      • PollArrayWrapper.POLLIN
      • PollArrayWrapper.POLLOUT
      • PollArrayWrapper.POLLERR
      • PollArrayWrapper.POLLHUP
      • PollArrayWrapper.POLLNVAL
      • PollArrayWrapper.POLLREMOVE​
    • 由于存在上面提到的兩種事件類型:應用級别和系統(epoll)級别,是以需要轉換一下,見SocketChannelImpl及ServerSocketChannelImpl的translateAndSetInterestOps和translateReadyOps方法,前者是将應用-->系統,後者是系統->應用
    • 注意到上面的注冊實際上分兩步
      • ​​現将key轉換成一個内部資料結構EPollArrayWrapper$Updator添加到updateList,此時事件為空
      • 再更新EPollArrayWrapper$Updator的事件為感興趣的事件
      • 為什麼要分兩步??

3.3)selector.select()

  • EPollSelectorImpl.doSelect
    • 登出cancelledKeys【已取消的鍵的集合】中的key
    • EPollArrayWrapper.poll
      • EPollArrayWrapper.updateRegistrations:周遊上面的updateList,調用epollCtl真正到向epoll fd注冊
      • 調用epollWait等待事件發生,可能會阻塞,傳回更新的事件
      • 此時telnet 127.0.0.1 1234發起連接配接,上面的方法傳回
    • 再次登出cancelledKeys【已取消的鍵的集合】中的key
    • EPollSelectorImpl.updateSelectedKeys
      • 如果selectedKeys【已選擇的鍵的集合】包含該鍵
        • ServerSocketChannelImpl.translateReadyOps
          • 将PollArrayWrapper.POLLIN轉化為SelectionKey.OP_ACCEPT
          • 更新readyOps
        • 判斷老的readyOps是否和新的readyOps,如果不一緻事件數+1
      • 如果selectedKeys【已選擇的鍵的集合】不包含該鍵
        • ServerSocketChannelImpl.translateReadyOps
        • 将該鍵填入selectedKeys
        • 事件數+1
  • 如果事件數>1,周遊事件對應的key,開始相應處理。。。
  • Java NIO這本書提到“一旦鍵 被放置于選擇器的已選擇的鍵的集合中,它的 ready 集合(即readyOps)将是累積的。比特位隻會被設定,不會被 清理”,實際并非如此,就我的了解和觀察的結果,readyOps并不會積累而是每次更新,不明白為什麼作者會這麼說???,無論如何對整體的程式了解不會有影響
  • java使用epoll預設會使用水準觸發,即如果有事件發生,如果你不處理,那麼下次還會觸發
  • 但經過java 中EPollSelectorImpl實作之後有了小小的變化,如果某個事件發生,你不做任何處理,那麼下次調用select的時候,雖然底層epoll仍然會傳回事件,但上面的代碼會判斷本次事件和上次事件是否一緻,如果是一樣,java認為沒有事件發生,如果要做到一緻,必須将selectedKeys中的key删掉,否則會有差别,是以請注意selectedKeys删除的重要性!否則會死循環!
  • 上面已經解釋了ServerSocketChannel的注冊,以及select過程,代碼樣例還有SocketChannel的注冊和select,其實和前面很雷同就不再贅述了

繼續閱讀