天天看點

《Scalable IO in Java》翻譯版

作者:滴水穿石K1

定義

《Scalable IO in Java》 是Doug Lea關于分析與建構可伸縮的高性能IO服務的經典文章。 原文位址: https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

目錄

  • 可擴充的網絡服務
  • 事件驅動處理
  • 反應堆模式
    • 基本版本
    • 多線程版本
    • 其他變體
  • java.nio非阻塞IO API示範

網絡服務

  • Web服務、分布式對象等大多具有相同的基本結構:
    • 讀取請求
    • 解碼請求
    • 處理服務
    • 編碼回複
    • 發送回複。
  • 但每個步驟的成本不同,包括XML解析、檔案傳輸、網頁生成和計算服務等。

經典的ServerSocket循環

《Scalable IO in Java》翻譯版
  • 每個處理程式可以在自己的線程中啟動。
class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();

            or, single-threaded, or a thread pool
        } catch (IOException ex) { 
        }
    }

    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);

                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { 
            }
        }

        private byte[] process(byte[] cmd) { 
        }
    }
}
           

可擴充性目标

  • 在負載增加(更多用戶端)的情況下優雅降級
  • 随着資源的增加(CPU、記憶體、磁盤、帶寬)
  • 同時滿足可用性和性能目标
    • 低延遲
    • 滿足峰值需求
    • 可調整服務品質
  • "分而治之"通常是實作任何可擴充性目标的最佳方法

分而治之處理機制

  • 将處理過程分成小任務,每個任務以非阻塞的方式執行操作
  • 當啟用每個任務時執行它。在這裡,通常使用IO事件作為觸發器。
  • Java.nio支援的基本機制
    • 非阻塞讀寫,
    • 感覺IO事件分派相關的任務執行。
  • 結合事件驅動設計,可以有更多的變化.

事件驅動設計

  • 事件驅動通常比其他選擇更有效率。
    • 更少的資源
      • 通常不需要為每個用戶端建立一個線程
    • 更少的開銷
      • 減少的上下文切換,通常意味着需要更少的鎖
    • 分派可能會更慢。
      • 必須手動将操作綁定到事件
  • 通常更難程式設計
    • 必須分解成簡單的非阻塞動作。
      • 類似于圖形使用者界面事件驅動的操作
      • 無法消除所有阻塞:GC、頁面錯誤等。
  • 必須跟蹤服務的邏輯狀态

背景:AWT中的事件

《Scalable IO in Java》翻譯版
  • 事件驅動的IO使用類似的思想,但采用不同的設計。

反應器模式

  • 反應堆通過分派處理程式來響應IO事件,類似于 AWT 線程
  • 處理程式執行非阻塞操作, 類似于 AWT 的 ActionListeners
  • 通過将處理程式綁定到事件來管理, 類似于 AWT addActionListener

基本反應堆設計

《Scalable IO in Java》翻譯版
  • 單線程版本

java.nio 支援

  • 通道----支援非阻塞讀取的檔案、套接字等連接配接。
  • 緩存區---類似數組的對象,可以直接被通道讀取或寫入
  • 選擇器---告訴我哪些通道有IO事件。
  • 選擇鍵集合---負責IO事件狀态和綁定

反應堆1:設定

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);

        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }
}
           

反應堆2:循環排程

public void run() { //通常在一個新的線程中
    try {
        while (!Thread.interrupted()) {
            selector.select();
            Set selected = selector.selectedKeys();
            Iterator it = selected.iterator();
            while (it.hasNext())
                dispatch((SelectionKey)(it.next());
            selected.clear();
        }
    } catch (IOException ex) { 

     }
}
    
void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null)
        r.run();
}
           

反應堆3:接受者

class Acceptor implements Runnable { // inner
    public void run() {
        try {
            SocketChannel c = serverSocket.accept();

            if (c != null) {
                new Handler(selector, c);
            }
        } catch (IOException ex) { /* ... */
        }
    }
}
           
《Scalable IO in Java》翻譯版

反應堆4:處理程式設定

final class Handler implements Runnable {
    static final int READING = 0;
    static final int SENDING = 1;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }
}
           

反應堆5:請求處理

public void run() {
    try {
        if (state == READING)read();
        else if (state == SENDING) send();
    } catch (IOException ex) { /* ... */ }
}

void read() throws IOException {
    socket.read(input);
    if (inputIsComplete()) {
        process();
        state = SENDING;
    // Normally also do first write now
        sk.interestOps(SelectionKey.OP_WRITE);
    }
}

void send() throws IOException {
    socket.write(output);
    if (outputIsComplete()) sk.cancel();
}
           

Per-State 處理程式

  • 使用狀态模式(GoF)進行優化,不需要再進行狀态的判斷
class Handler {
    // ...
    public void run() {
        // initial state is reader
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender());
            sk.interest(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }
     class Sender implements Runnable {
        public void run() {
            // ...
            socket.write(output);
            if (outputIsComplete())
                sk.cancel();
        }
    }
}
           

多線程設計

  • 為了提升可擴充性增加加線程,主要适用于多處理器
  • 工作線程
    • 反應器應該快速觸發處理程式,處理程式的處理會減慢反應器的速度,将非IO處了解除安裝到其他線程上
  • 多個反應器線程
    • 反應器線程可能會因為IO操作而飽和,
    • 将負載配置設定到其他反應器上
    • "負載均衡以比對CPU和IO速率

工作者線程

  • 将非I/O處了解除安裝以加速反應器線程
  • “比重新設計計算綁定處理為事件驅動形式更簡單
    • 應該仍然是純非阻塞計算,“足夠的處理能夠超過開銷,“但是與IO重疊處理更加困難
  • 最好的方法是先将所有輸入讀入緩沖區
  • “使用線程池以進行調整和控制
  • 通常需要的線程比用戶端少得多
《Scalable IO in Java》翻譯版

使用線程池處理

class Handler implements Runnable {
    // uses util.concurrent thread pool
    static PooledExecutor pool = new PooledExecutor(...);
    static final int PROCESSING = 3;
    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer());
        }
    }
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interest(SelectionKey.OP_WRITE);
    }
    class Processer implements Runnable {
        public void run() { processAndHandOff(); }
    }
}
           

協調任務

  • 任務間互動
    • 每個任務都會啟用、觸發或調用下一個任務,傳遞通常是最快的,但難以控制
  • 每個處理器中分發器的回調設定狀态,傳回值等(中介者模式的變體)
  • 不同線程的緩沖區問題
  • 需要傳回值時,線程需要通過join,wait/notify等方法進行協調s

使用 PooledExecutor

  • 可調節的工作線程池
  • 主方法執行(Runnable r)
  • 控制項:
    • 任務隊列類型
    • 最大線程數
    • 最小線程數
    • 按需配置設定線程
    • 保持活動狀态的時間間隔,直到空閑線程死亡
    • 飽和度政策

reactor線程的池化處理

  • 使用反應堆池,用于比對CPU和IO速率
  • 每個reactor靜态或動态構造
  • 在主接收器(acceptor)中分發給其他reactor.
Selector[] selectors;
int next = 0;
class Acceptor {
    // ...
    public synchronized void run() {
        // ...
        Socket connection = serverSocket.accept();
        if (connection != null)
            new Handler(selectors[next], connection);
        if (++next == selectors.length)
            next = 0;
    }
}           
《Scalable IO in Java》翻譯版

使用其他的java.nio功能

  • 每個反應器多個選擇器
    • 将不同的處理程式綁定到不同的IO事件可能需要仔細協調以進行同步。
  • 檔案傳輸
    • 自動檔案到網絡或網絡到檔案的複制
  • 記憶體映射檔案
    • 通過緩沖區通路檔案
  • 直接緩沖區

基于連接配接的擴充

  • 不是單一的服務請求
    • 用戶端連接配接
    • 用戶端發送一系列消息/請求
    • 用戶端斷開連接配接
  • 範例
    • 資料庫和事務監控器
    • 多參與者遊戲、聊天等
  • 可以擴充基本網絡服務模式
    • 處理許多相對長期的用戶端
    • 跟蹤用戶端和會話狀态(包括掉線)
    • 将服務分布在多個主機上