天天看點

JAVA 非阻塞IO原理

1.  基本概念

IO是主存和外部裝置(硬碟、終端和網絡等)傳輸資料的過程。IO是作業系統的底層功能實作,底層通過I/O指令進行完成。

2.  nio簡介

nio是java New IO的簡稱(并不隻是指非阻塞IO),在jdk1.4裡提供的新api。Sun官方标榜的特性如下:

–   為所有的原始類型提供(Buffer)緩存支援。

–   字元集編碼解碼解決方案。

–   Channel:一個新的原始I/O抽象。

–   支援鎖和記憶體映射檔案的檔案通路接口。

–   提供多路(non-bloking)非阻塞式的高伸縮性網絡I/O。

詳細介紹可見 http://www.iteye.com/topic/834447

3.   非阻塞 IO

何為阻塞、何為非阻塞,非阻塞原理。

何為阻塞?

一個常見的網絡IO通訊流程如下:

JAVA 非阻塞IO原理

何為非阻塞?

下面有個隐喻:

一輛從 A 開往 B 的公共汽車上,路上有很多點可能會有人下車。司機不知道哪些點會有哪些人會下車,對于需要下車的人,如何處理更好?

1. 司機過程中定時詢問每個乘客是否到達目的地,若有人說到了,那麼司機停車,乘客下車。 ( 類似阻塞式 )

2. 每個人告訴售票員自己的目的地,然後睡覺,司機隻和售票員互動,到了某個點由售票員通知乘客下車。 ( 類似非阻塞 )

很顯然,每個人要到達某個目的地可以認為是一個線程,司機可以認為是 CPU 。在阻塞式裡面,每個線程需要不斷的輪詢,上下文切換,以達到找到目的地的結果。而在非阻塞方式裡,每個乘客 ( 線程 ) 都在睡覺 ( 休眠 ) ,隻在真正外部環境準備好了才喚醒,這樣的喚醒肯定不會阻塞。

socket的操作都有一個共同的結構:

1. Read request

2. Decode request

3. Process service

4. Encode reply

5. Send reply

經典的網絡服務的設計如下圖,在每個線程中完成對資料的處理:

JAVA 非阻塞IO原理

但這種模式在使用者負載增加時,性能将下降非常的快。我們需要重新尋找一個新的方案,保持資料處理的流暢,很顯然,事件觸發機制是最好的解決辦法,當有事件發生時,會觸動handler,然後開始資料的處理。

Reactor模式類似于AWT中的Event處理:

JAVA 非阻塞IO原理

Reactor模式參與者

1.Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理,這類似于AWT的thread

2.Handler 是負責非堵塞行為,類似于AWT ActionListeners;同時負責将handlers與event事件綁定,類似于AWT addActionListener

  非阻塞的原理

把整個過程切換成小的任務,通過任務間協作完成。

由一個專門的線程來處理所有的 IO 事件,并負責分發

事件驅動機制:事件到的時候觸發,而不是同步的去監視事件。

線程通訊:線程之間通過 wait,notify 等方式通訊。保證每次上下文切換都是有意義的。減少無謂的程序切換。

Reactor就是上面隐喻的售票員角色。每個線程的處理流程大概都是讀取資料、解碼、計算處理、編碼、發送響應

如圖:

JAVA 非阻塞IO原理

Java的NIO為reactor模式提供了實作的基礎機制,它的Selector當發現某個channel有資料時,會通過SlectorKey來告知我們,在此我們實作事件和handler的綁定。

我們來看看Reactor模式代碼:

public class Reactor implements Runnable{

  final Selector selector;
  final ServerSocketChannel serverSocket;

  Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
    serverSocket.socket().bind(address);

    serverSocket.configureBlocking(false);
    //向selector注冊該channel
     SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

    logger.debug("-->Start serverSocket.register!");

    //利用sk的attache功能綁定Acceptor 如果有事情,觸發Acceptor
    sk.attach(new Acceptor());
    logger.debug("-->attach(new Acceptor()!");
  }


  public void run() { // normally in a new Thread
    try {
    while (!Thread.interrupted())
    {
      selector.select();
      Set selected = selector.selectedKeys();
      Iterator it = selected.iterator();
      //Selector如果發現channel有OP_ACCEPT或READ事件發生,下列周遊就會進行。
      while (it.hasNext())
        //來一個事件 第一次觸發一個accepter線程
        //以後觸發SocketReadHandler
        dispatch((SelectionKey)(it.next()));
        selected.clear();
      }
    }catch (IOException ex) {
        logger.debug("reactor stop!"+ex);
    }
  }

  //運作Acceptor或SocketReadHandler
  void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null){
      // r.run();

    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
    try {
      logger.debug("-->ready for accept!");
      SocketChannel c = serverSocket.accept();
      if (c != null)
        //調用Handler來處理channel
        new SocketReadHandler(selector, c);
      }
    catch(IOException ex) {
      logger.debug("accept stop!"+ex);
    }
    }
  }
}
           

以上代碼中巧妙使用了SocketChannel的attach功能,将Hanlder和可能會發生事件的channel連結在一起,當發生事件時,可以立即觸發相應連結的Handler。

将資料讀出後,可以将這些資料處理線程做成一個線程池,這樣,資料讀出後,立即扔到線程池中,這樣加速處理速度:

JAVA 非阻塞IO原理

可參考:

http://www.jdon.com/concurrent/reactor.htm

http://javag.iteye.com/blog/221641