深入淺出 NIO 網絡程式設計
- JAVA NIO概述
- NIO 和 BIO的對比
- NIO的三大核心元件
-
- NIO的三大核心元件—Buffer緩沖區
-
- Buffer記憶體類型
- 堆外方式的使用場景和注意
- NIO的三大核心元件—Channel通道
-
- NIO Channel VS Java Stream
- Channel的實作
- SocketChannel
- ServerSocketChannel
- 模拟代碼
- Selector選擇器
-
- Selector監聽事件
- Selector的使用
- NIO 與多線程結合改進方案
-
- reactor模式
- 總結
JAVA NIO概述
Java NIO是始于java1.4版本後提出的新的Java IO操作的
非阻塞
API,用于替換标準( 傳統,或者 Blocking IO ,簡稱為 BIO ) Java IO API 的 IO API 。
NIO 和 BIO的對比
NIO | BIO |
---|---|
非阻塞IO | 阻塞IO |
基于緩沖區(Buffer) | 基于流(stream) |
擁有選擇器Selector,是 NIO 實作非阻塞的基礎 | 無選擇器 |

NIO的三大核心元件
- Buffer緩沖區,本質上市一個可以寫入的記憶體塊(類似數組),可以再次讀取,該記憶體塊包含在NIO Buffer對象中,該對象提供了一系列的方法,用于操作使用記憶體塊,後文會詳細解讀Buffer緩沖區的使用。
- Channel通道
- Selector選擇器
NIO的三大核心元件—Buffer緩沖區
Buffer的基礎屬性:
capacity容量:buffer具有一定的固定的大小,也成為容量;
position位置:寫入模式下代表寫資料的位置,讀模式下代表讀取資料的位置;
limit限制:寫入模式,限制大小等于buffer的容量大小,limit=capacity,讀取模式下,limit等于寫入的資料量的大小。
mark标記: 記錄目前 position
![]()
深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件
從上述可知
position 屬性代表位置,初始值為 0
。在寫模式下,每 Buffer 中寫入一個值,position 就加 1 ,代表下一次的寫入位置。在讀模式下,每從 Buffer 中讀取一個值,position 就自動加 1 ,代表下一次的讀取位置。
limit 屬性代表上限限制
。在寫模式下,代表最大能寫入的資料上限位置,此時 limit 等于 capacity 。在讀模式下,在 Buffer 完成所有資料寫入後,通過調用 flip() 方法,切換到讀模式。此時,limit 等于 Buffer 中實際的資料大小。因為 Buffer 不一定被寫滿,是以不能使用 capacity 作為實際的資料大小。
mark 屬性為标記
,通過 mark() 方法,記錄目前 position ;通過 reset() 方法,恢複 position 為标記。
寫模式下,标記上一次寫位置。
讀模式下,标記上一次讀位置。
使用Buffer進行資料讀取和寫入操作,步驟如下
1.将資料寫入緩沖區
2.調用buffer.filp(),轉換為讀取模式
3.緩沖區讀取資料
4.調用buffer.clear() 或buffer.compact()清除緩沖區
代碼示例
package nio;
import java.nio.ByteBuffer;
/**
* @author 潇兮
* @date 2019/10/13 16:58
**/
public class BufferTest {
public static void main(String[] args){
//基于堆内( Non-Direct )記憶體的實作類 HeapByteBuffer 的對象,allocateDirect(int capacity)堆外記憶體
ByteBuffer byteBuffer=ByteBuffer.allocate(5);
//預設是寫入模式
System.out.println(String.format("初始化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));
//寫入操作
System.out.println("開始讀資料");
byteBuffer.put((byte) 1);
byteBuffer.put((byte) 2);
byteBuffer.put((byte) 3);
//寫入資料後屬性變化
System.out.println(String.format("寫入資料後屬性變化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));
//轉換為讀模式,讀取資料若是不掉用flip方法,position的位置不正确
byteBuffer.flip();
byte a=byteBuffer.get();
System.out.println(a);
byte b=byteBuffer.get();
System.out.println(b);
//讀取資料後屬性的變化
System.out.println(String.format("讀取資料後屬性的變化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));
}
}
Buffer記憶體類型
ByteBuffer提供了直接記憶體(direct堆外記憶體)和直接記憶體(heap堆)兩種實作。
堆外記憶體擷取的方式:ByteBuffer byteBuffer=ByteBuffer.allocateDirect(int capacity);
使用堆外記憶體的好處:
1.進行網絡 IO 比heapBuffer少一次拷貝。(file/socket——OS memory——jvm heap)因為GC會移動對象記憶體,是以在寫file或者socket時,JVM的實作會先把資料複制到堆外 ,再進行寫入。
2.GC範圍之外,降低GC壓力,實作了自動管理。DirectByteBuffer中有一個Cleaner對象(PhantomReference),Cleaner被GC前會執行clean方法,觸發DirectByteBuffer中定義的回收函數 Deallocator
堆外方式的使用場景和注意
1.在性能确實可觀的情況下才去使用堆外記憶體的方式;例如配置設定給大型、長壽命的對象或應用(網絡傳輸、檔案讀寫場景),減少記憶體拷貝的次數;
2.通過虛拟機參數MaxDirectMemorySize限制大小,防止耗盡整個機器的記憶體
NIO的三大核心元件—Channel通道
Channel的API涵蓋了UDP/TCP網絡和檔案 IO,例如FileChannel、DatagramChannel、SocketChannel
ServerSocketChannel。
NIO Channel VS Java Stream
1.對于同一個channel,我們可以在同一通道内讀取和寫入操作。而對于同一個stream中,要麼隻讀,要麼隻寫,二選一,即是單向操作的;
2.Channel可以非阻塞的讀寫 IO 操作,而stream隻能阻塞的讀寫IO操作;
3.Channel的使用必須搭配Buffer,即是總是先讀取到一個buffer中或向一個buffer中寫入,再寫入Channel。
Channel的實作
Channel 在 Java 中,作為一個接口(java.nio.channels.Channel ),定義了 IO 操作的連接配接與關閉。主要有代碼如下
Channel 最為重要的四個 Channel 實作類如下:
:用戶端用于發起 TCP 的 Channel 。
SocketChannel
:服務端用于監聽新進來的連接配接的 TCP 的 Channel 。對于新進來的連接配接,都會建立一個對應的 SocketChannel 。
ServerSocketChannel
:通過 UDP 讀寫資料。
DatagramChannel
:從檔案中,讀寫資料。
FileChannel
SocketChannel
SocketChannel用于建立TCP網絡連接配接,類似java.net.Socket。有兩種建立方式。參考《Java NIO 系列教程(八) SocketChannel》
- 用戶端主動發起和伺服器的連接配接
- 服務端擷取新連接配接
//用戶端主動發起連接配接
SocketChannel socketChannel=SocketChannel.open();
socketChannel.configureBlocking(false);//設定為非阻塞模式,預設是阻塞模式
socketChannel.connect(new InetSocketAddress("http://xxxx.com",80));
channel.write(byteBuffer);//發送請求資料——向通道内寫入資料(循環中調用)
int bytesRead=socketChannel.read(byteBuffer);//讀取服務端傳回的資料——讀取緩沖區的資料
socketChannel.close();//關閉連接配接
注意:write寫:write()在還沒寫入任何内容的時候就有可能傳回(非阻塞),是以需要循環中調用write().
read讀:read()方法可能直接傳回而讀不到任何資料,根據傳回的int值判斷讀取了多少位元組。
ServerSocketChannel
ServerSocketChannel可以監聽建立的TCP連接配接通道,類似ServerSocket。詳細介紹可以參考《Java NIO系列教程(九) ServerSocketChannel》
//建立網絡服務端
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//設定非阻塞
//綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while(true){
//擷取新的TCP連結端口
SocketChannel socketChannel=serverSocketChannel.accept();
if(socketChannel !=null){
//tcp請求,讀取響應
}
}
serverSocketChannel.accept():如果該通道處于非阻塞模式,那麼如果沒有挂起連接配接,該方法立即傳回null,是以SocketChannel 要進行非null校驗。
模拟代碼
用戶端代碼:
package nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
* @Author 作者 :@潇兮
* @Date 建立時間:2019/10/14 12:09
* 類說明:
*/
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (!socketChannel.finishConnect()) {
// 判斷是否連接配接完成,若沒連接配接上,則一直等待
Thread.yield();
}
Scanner scanner = new Scanner(System.in);
System.out.println("請輸入:");
// 發送内容
String msg = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 讀取響應
System.out.println("收到服務端響應:");
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
if (requestBuffer.position() > 0) break;
}
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
scanner.close();
socketChannel.close();
}
}
服務端代碼(一):
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* @Author 作者 :@潇兮
* @Date 建立時間:2019/10/14 11:06
* 類說明:用于初步了解,直接基于非阻塞的寫法,後文繼續待改進
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//建立網絡服務端
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//設定阻塞
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("端口啟動");
while (true){
SocketChannel socketChannel=serverSocketChannel.accept();//擷取新連接配接
//判斷socketChannel
if (socketChannel!=null){
System.out.println("擷取的新連接配接:"+socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);//設定為非阻塞
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
try {
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) continue;//若無資料,不繼續處理後續
requestBuffer.flip();//切換為讀模式
byte[] content = new byte[requestBuffer.limit()];//初始化數組大小
requestBuffer.get(content);//擷取内容
System.out.println("收到消息:"+new String(content));
System.out.println("收到資料,來自:"+ socketChannel.getRemoteAddress());
// 響應結果 200,模拟請求響應
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Yes,He is";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());//資料存放在byte數組
while (buffer.hasRemaining()) {
// hasRemaining() 傳回是否有剩餘的可用長度
socketChannel.write(buffer);// 非阻塞
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
從服務端代碼(一)中,我們運作NIOServer,同時打開多個client,發現隻有當一個client發送讀取結束後第二個client才與Server建立連接配接。即是當58729端口完成發送資訊後,58737端口的用戶端才會與server建立連接配接。和 BIO 的代碼實作差別不大,此處隻是将方式改為非阻塞的形式,改進代碼,見
服務端代碼(改進一)
,服務端代碼(一)運作結果如下圖:
服務端代碼(改進一)
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
/**
* @Author 作者 :@潇兮
* @Date 建立時間:2019/10/14 11:54
* 類說明:直接基于非阻塞的寫法,一個線程處理輪詢所有請求, 問題: 輪詢通道的方式,低效,浪費CPU。後文繼續改進
*/
public class NIOServer1 {
//已經建立連接配接的集合
private static ArrayList<SocketChannel> channels = new ArrayList<>();
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("端口啟動");
while (true){
SocketChannel socketChannel=serverSocketChannel.accept();
if (socketChannel!=null){
System.out.println("收到新連接配接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 預設是阻塞的,一定要設定為非阻塞
channels.add(socketChannel);
}else {
//若無新連接配接,處理現有連接配接後删除
Iterator<SocketChannel> iterator = channels.iterator();
while (iterator.hasNext()){
SocketChannel ch=iterator.next();
try {
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
if (ch.read(byteBuffer)==0){
//若通道内無資料處理,退出目前循環
continue;
}
while (ch.isOpen() && ch.read(byteBuffer)!=-1){
// 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
if (byteBuffer.position() > 0) break;
}
if (byteBuffer.position()==0) continue; // 如果沒資料了, 則不繼續後面的處理
//切換為讀模式
byteBuffer.flip();
byte[] content = new byte[byteBuffer.limit()];
byteBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到資料,來自:" + ch.getRemoteAddress());
// 響應結果 200,模拟響應
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
// hasRemaining() 傳回是否有剩餘的可用長度
ch.write(buffer);
}
//處理後删除
iterator.remove();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
}
運作server開啟服務端後運作多個用戶端client,發現通過
輪詢通道
的方式可以一次建立多個連接配接,如同代碼所示,當一個程式與服務端建立連接配接的時候,fu服務端收到響—accept(),收到響應後将它加入channels中,保留多個連接配接,當無新連接配接建立時,線程處理已經建立好的連接配接。但是存在不足,上述代碼的實作方式是為低效的循環檢查的方式,NIO提供了Selector的方式解決此類問題,避免循環檢查,具體代碼見
服務端代碼(改進二)
,服務端代碼(改進一)運作結果如下圖:
Selector選擇器
Selector是一個Java NIO 元件,可以檢查一個或者多個NIO通道,确定一個或多個 NIO Channel 的狀态是否處于可讀、可寫。實作了單個線程可以管理多個通道,進而管理多個網絡連接配接。是以,Selector 也被稱為多路複用器。
Selector監聽事件
一個線程使用Selector監聽多個channel的不同僚件:四個事件分别對應SelectionKey四個常量
1.Connet連接配接:連接配接完成事件( TCP 連接配接 ),僅适用于用戶端, SelectionKey.OP_CONNECT
2.Accept:接受新連接配接事件,僅适用于服務端,SelectionKey.OP_ACCEPT
3.Read讀取:讀事件,适用于兩端,表示Buffer可讀,SelectionKey.OP_READ
4.Write寫入:讀事件,适用于兩端,表示Buffer可寫,SelectionKey.OP_WRITE
Selector的使用
建立Selector可以通過
Selector selector = Selector.open();
來建立一個Selector對象。為了Selector能夠管理Channel,将Channel注冊到Selector中(一個 Channel 要注冊到 Selector 中,那麼該 Channel 必須是非阻塞,FileChannel是阻塞的,是以不能夠注冊)
channel.configureBlocking(false); //
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
若Selector 可以對 Channel 的多個事件感興趣,要注冊 Channel 的多個事件到 Selector 中時,可以使用或運算
|
來組合多個事件。示例代碼如下:
channel.configureBlocking(false); //
SelectionKey key = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
服務端代碼(改進二)
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @Author 作者 :@潇兮
* @Date 建立時間:2019/10/14 15:12
* 類說明: 此處一個selector監聽所有事件,一個線程處理所有請求事件. 會成為瓶頸! 要有多線程的運用,後文繼續改進
*/
public class NIOServer2 {
public static void main(String[] args) throws IOException {
// 1. 建立網絡服務端ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設定為非阻塞模式
// 2. 建構一個Selector選擇器,将channel注冊上去
Selector selector = Selector.open();
// 将serverSocketChannel注冊到selector
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
// 對serverSocketChannel上面的accept事件監聽(serverSocketChannel隻能支援accept操作)
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
// 3. 綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("端口啟動");
while (true){
//不輪詢通道,改用輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有傳回
selector.select();
Set<SelectionKey> selectionKeys=selector.selectedKeys();
//周遊查詢結果
Iterator<SelectionKey> iterator=selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key=iterator.next();
iterator.remove();
//監聽讀和寫事件
if (key.isAcceptable()){
ServerSocketChannel server= (ServerSocketChannel) key.attachment();
//将通道注冊到selector上
SocketChannel clientSocketChannel=server.accept();//mainReactor 輪詢accept
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
System.out.println("收到新連接配接 : " + clientSocketChannel.getRemoteAddress());
}
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.attachment();
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
if (requestBuffer.position() > 0) break;
}
if(requestBuffer.position() == 0) continue; // 如果沒資料了, 則不繼續後面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到資料,來自:" + socketChannel.getRemoteAddress());
// TODO 業務操作 資料庫 接口調用等等
// 響應結果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
} catch (IOException e) {
// e.printStackTrace();
key.cancel(); // 取消事件訂閱
}
}
}
selector.selectNow();
}
}
}
NIO 與多線程結合改進方案
reactor模式
reactor模式稱之為響應器模式,常用于nio的網絡通信架構。下圖來源:《Scalable IO in Java》
單Reactor模式
概括上圖:Reactor線程接收請求->分發給線程池處理請求
單Reactor模式,定義了兩種線程,一種線程是Reator線程。Reator線程主要負責網絡的資料接收(accept())以及網絡連接配接的處理(比如TCP連接配接中接收的資料),接收的資料處理操作(如讀資料、解析協定等)由單獨的線程池執行。 實際上就是将底層的基礎網絡處理和應用層的邏輯處理做了分離,提高效率。
多Reactor模式
概括上圖:mainReactor->分發給subReactor讀寫->具體業務邏輯分發給單獨的線程池處理
多Reactor模式,是将Reactor分為了多種,将處理網絡連接配接的交由mainReactor去做,資料的讀取處理交由另外一個Reactor去做,其他的和單Reactor模式無較大差別。本質上就是在網絡底層多了一次分發,将資料處理交由另外一個線程去做。
根據Reactor模型改進服務端代碼如下:、
服務端代碼(三)
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 潇兮
* @date 2019/10/15 21:26
**/
public class NIOServer3 {
/** 處理業務操作的線程 */
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**
* 封裝了selector.select()等事件輪詢的代碼
*/
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* Selector監聽到有事件後,調用這個方法
*/
public abstract void handler(SelectableChannel channel) throws Exception;
private ReactorThread() throws IOException {
selector = Selector.open();
}
volatile boolean running = false;
@Override
public void run() {
// 輪詢Selector事件
while (running) {
try {
// 執行隊列中的任務
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
// 擷取查詢結果
Set<SelectionKey> selected = selector.selectedKeys();
// 周遊查詢結果
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
// 被封裝的查詢結果
SelectionKey key = iter.next();
iter.remove();
int readyOps = key.readyOps();
// 關注 Read 和 Accept兩個事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);
if (!channel.isOpen()) {
key.cancel(); // 如果關閉了,就取消這個KEY的訂閱
}
} catch (Exception ex) {
key.cancel(); // 如果有異常,就取消這個KEY的訂閱
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 為什麼register要以任務送出的形式,讓reactor線程去處理?
// 因為線程在執行channel注冊到selector的過程中,會和調用selector.select()方法的線程争用同一把鎖
// 而select()方法實在eventLoop中通過while循環調用的,争搶的可能性很高,為了讓register能更快的執行,就放到同一個線程來處理
FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
private ServerSocketChannel serverSocketChannel;
// 1、建立多個線程 - accept處理reactor線程 (accept線程)
private ReactorThread[] mainReactorThreads = new ReactorThread[1];
// 2、建立多個線程 - io處理reactor線程 (I/O線程)
private ReactorThread[] subReactorThreads = new ReactorThread[8];
/**
* 初始化線程組
*/
private void newGroup() throws IOException {
// 建立IO線程,負責處理用戶端連接配接以後socketChannel的IO讀寫
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws IOException {
// work線程隻負責處理IO處理,不處理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) return; // 如果沒資料了, 則不繼續後面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到資料,來自:" + ch.getRemoteAddress());
// TODO 業務操作 資料庫、接口...
workPool.submit(() -> {
});
// 響應結果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
// 建立mainReactor線程, 隻負責處理serverSocketChannel
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
// 隻做請求分發,不做具體的資料讀取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到連接配接建立的通知之後,分發給I/O線程繼續去讀取資料
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新連接配接 : " + socketChannel.getRemoteAddress());
}
};
}
}
/**
* 初始化channel,并且綁定一個eventLoop線程
*
* @throws IOException IO異常
*/
private void initAndRegister() throws Exception {
// 1、 建立ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、 将serverSocketChannel注冊到selector
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
/**
* 綁定端口
*
* @throws IOException IO異常
*/
private void bind() throws IOException {
// 1、 正式綁定端口,對外服務
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("啟動完成,端口8080");
}
public static void main(String[] args) throws Exception {
NIOServer3 nioServerV3 = new NIOServer3();
nioServerV3.newGroup(); // 1、 建立main和sub兩組線程
nioServerV3.initAndRegister(); // 2、 建立serverSocketChannel,注冊到mainReactor線程上的selector上
nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
}
}
附上參考連結Reactor模型了解【NIO系列】——Reactor模式
總結
Java NIO是一種新的Java IO操作的
非阻塞
API。要想提升性能,需要與多線程技術結合使用。由于網絡程式設計的複雜性,在開源社群中湧現了多款對JDK NIO封裝和增強的架構如netty、Mina等架構。