定義
《Scalable IO in Java》 是Doug Lea關于分析與建構可伸縮的高性能IO服務的經典文章。 原文位址: https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
目錄
- 可擴充的網絡服務
- 事件驅動處理
- 反應堆模式
- 基本版本
- 多線程版本
- 其他變體
- java.nio非阻塞IO API示範
網絡服務
- Web服務、分布式對象等大多具有相同的基本結構:
- 讀取請求
- 解碼請求
- 處理服務
- 編碼回複
- 發送回複。
- 但每個步驟的成本不同,包括XML解析、檔案傳輸、網頁生成和計算服務等。
經典的ServerSocket循環
- 每個處理程式可以在自己的線程中啟動。
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
or, single-threaded, or a thread pool
} catch (IOException ex) {
}
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) {
}
}
private byte[] process(byte[] cmd) {
}
}
}
可擴充性目标
- 在負載增加(更多用戶端)的情況下優雅降級
- 随着資源的增加(CPU、記憶體、磁盤、帶寬)
- 同時滿足可用性和性能目标
- 低延遲
- 滿足峰值需求
- 可調整服務品質
- "分而治之"通常是實作任何可擴充性目标的最佳方法
分而治之處理機制
- 将處理過程分成小任務,每個任務以非阻塞的方式執行操作
- 當啟用每個任務時執行它。在這裡,通常使用IO事件作為觸發器。
- Java.nio支援的基本機制
- 非阻塞讀寫,
- 感覺IO事件分派相關的任務執行。
- 結合事件驅動設計,可以有更多的變化.
事件驅動設計
- 事件驅動通常比其他選擇更有效率。
- 更少的資源
- 通常不需要為每個用戶端建立一個線程
- 更少的開銷
- 減少的上下文切換,通常意味着需要更少的鎖
- 分派可能會更慢。
- 必須手動将操作綁定到事件
- 通常更難程式設計
- 必須分解成簡單的非阻塞動作。
- 類似于圖形使用者界面事件驅動的操作
- 無法消除所有阻塞:GC、頁面錯誤等。
- 必須跟蹤服務的邏輯狀态
背景:AWT中的事件
- 事件驅動的IO使用類似的思想,但采用不同的設計。
反應器模式
- 反應堆通過分派處理程式來響應IO事件,類似于 AWT 線程
- 處理程式執行非阻塞操作, 類似于 AWT 的 ActionListeners
- 通過将處理程式綁定到事件來管理, 類似于 AWT addActionListener
基本反應堆設計
- 單線程版本
java.nio 支援
- 通道----支援非阻塞讀取的檔案、套接字等連接配接。
- 緩存區---類似數組的對象,可以直接被通道讀取或寫入
- 選擇器---告訴我哪些通道有IO事件。
- 選擇鍵集合---負責IO事件狀态和綁定
反應堆1:設定
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
}
反應堆2:循環排程
public void run() { //通常在一個新的線程中
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) {
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
反應堆3:接受者
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {
new Handler(selector, c);
}
} catch (IOException ex) { /* ... */
}
}
}
反應堆4:處理程式設定
final class Handler implements Runnable {
static final int READING = 0;
static final int SENDING = 1;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
}
反應堆5:請求處理
public void run() {
try {
if (state == READING)read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
Per-State 處理程式
- 使用狀态模式(GoF)進行優化,不需要再進行狀态的判斷
class Handler {
// ...
public void run() {
// initial state is reader
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run() {
// ...
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
}
多線程設計
- 為了提升可擴充性增加加線程,主要适用于多處理器
- 工作線程
- 反應器應該快速觸發處理程式,處理程式的處理會減慢反應器的速度,将非IO處了解除安裝到其他線程上
- 多個反應器線程
- 反應器線程可能會因為IO操作而飽和,
- 将負載配置設定到其他反應器上
- "負載均衡以比對CPU和IO速率
工作者線程
- 将非I/O處了解除安裝以加速反應器線程
- “比重新設計計算綁定處理為事件驅動形式更簡單
- 應該仍然是純非阻塞計算,“足夠的處理能夠超過開銷,“但是與IO重疊處理更加困難
- 最好的方法是先将所有輸入讀入緩沖區
- “使用線程池以進行調整和控制
- 通常需要的線程比用戶端少得多
使用線程池處理
class Handler implements Runnable {
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}
協調任務
- 任務間互動
- 每個任務都會啟用、觸發或調用下一個任務,傳遞通常是最快的,但難以控制
- 每個處理器中分發器的回調設定狀态,傳回值等(中介者模式的變體)
- 不同線程的緩沖區問題
- 需要傳回值時,線程需要通過join,wait/notify等方法進行協調s
使用 PooledExecutor
- 可調節的工作線程池
- 主方法執行(Runnable r)
- 控制項:
- 任務隊列類型
- 最大線程數
- 最小線程數
- 按需配置設定線程
- 保持活動狀态的時間間隔,直到空閑線程死亡
- 飽和度政策
reactor線程的池化處理
- 使用反應堆池,用于比對CPU和IO速率
- 每個reactor靜态或動态構造
- 在主接收器(acceptor)中分發給其他reactor.
Selector[] selectors;
int next = 0;
class Acceptor {
// ...
public synchronized void run() {
// ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length)
next = 0;
}
}
使用其他的java.nio功能
- 每個反應器多個選擇器
- 将不同的處理程式綁定到不同的IO事件可能需要仔細協調以進行同步。
- 檔案傳輸
- 自動檔案到網絡或網絡到檔案的複制
- 記憶體映射檔案
- 通過緩沖區通路檔案
- 直接緩沖區
基于連接配接的擴充
- 不是單一的服務請求
- 用戶端連接配接
- 用戶端發送一系列消息/請求
- 用戶端斷開連接配接
- 範例
- 資料庫和事務監控器
- 多參與者遊戲、聊天等
- 可以擴充基本網絡服務模式
- 處理許多相對長期的用戶端
- 跟蹤用戶端和會話狀态(包括掉線)
- 将服務分布在多個主機上