三人行,必有吾師,歡迎加入星球,一起讨論技術點滴

淺談NIO
-
-
-
-
-
- 三人行,必有吾師,歡迎加入星球,一起讨論技術點滴
-
-
-
-
- 淺談NIO(Non-blocking I/O)
-
-
- 1> BIO (Blocking I/O)
- 2> 同步,異步,阻塞、非阻塞
- 3> 程式在請求網絡時,到底做了什麼?和IO有什麼關系?
- 4> NIO原理
- 5> NIO示例
- 6> NIO适用場景
-
- 其他内容:
-
-
- 優化線程模型
- 事件分發器
- EPoll(linux大于 2.6) 和 Poll(linux 小于2.6)
- read()和write()
- Buffer的建立,及各種操作
- Netty:
-
-
-
- 三人行,必有吾師,歡迎加入星球,一起讨論技術點滴
-
-
-
淺談NIO(Non-blocking I/O)
是一種同步非阻塞I/O模型
主要從以下幾個方面談一下對NIO的了解:
- 在NIO之前,傳統IO是什麼樣的,有什麼弊端?
- 同步、異步、阻塞、非阻塞的概念
- 程式在請求網絡時,到底做了什麼?和IO有什麼關系?
- NIO的原理
- NIO的示例
- NIO的适用場景
1> BIO (Blocking I/O)
傳統IO(Blocking I/O):阻塞IO,常用的是異步阻塞IO,使用場景:一般我們請求網絡會開了個新的線程或者從線程池中選一個空閑的線程去執行網絡請求,在送出請求後、響應到來前,這個線程一直是等待(不繼續幹别的)是阻塞的,直到響應到來後,回調給調用線程後,該線程才會完成,不在占用CPU。
下面看一個例子:僞代碼如下:
{
ExecutorService executor = Executors.newFixedThreadPool(100);
ServerSocket serverSocket = null;
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8088));//監聽 8088端口
while (true) {
Socket socket = serverSocket.accept(); // 這個是阻塞的
executor.submit(new ConnectIOHandler(socket)); //為新的連接配接建立新的線程
}
}
class ConnectIOHandler implements Runnable {
private Socket socket;
public ConnectIOHandler(Socket socket) {
this.socket = socket;
}
public void run() {
if (!socket.isClosed()) {
int read = socket.getInputStream().read();
............
socket.getOutputStream().write(bytes);
}
}
}
/**
socket.accept()、socket.read()、socket.write()都是同步阻塞的,當一個連接配接在處理I/O時,系統是阻塞的,但cpu是釋放的(瓶頸在I/O)必須使用多線程,
多線程本質:1.利用多核 2. 當I/O阻塞時,多線程使用CPU資源
注:現在多線程一般使用線程池,可以讓線程的建立和回收成本相對較低,在活動連接配接數不是特别高(小于單機1000)的情況下,這種模型是比較不錯的,可以讓每一個連接配接專注于自己的I/O并且程式設計模型簡單,不用過多的考慮系統的過載,限流等問題 ,線程池本身也是一個天然的漏鬥,可以緩沖一些處理不了的連接配接或請求。
問題 :
最本質的問題 ,嚴重依賴于線程,但線程是很貴的資源,主要表現在:
1. 線程的建立和銷毀成本高,在linux中,線程本身是一個程序,建立和銷毀都是重量級的系統函數
2. 線程本身占用較大的記憶體,像Java線程棧,一般至少配置設定 512K~1M的空間,如果系統中的線程數過多,恐怕整個jvm的記憶體會被吃掉一半
3. 線程切換的成本很高,作業系統在發生線程切換時,需要保留線程的上下文,然後執行系統調用,如果線程數過多,可能執行線程切換的時間會大于線程執行的時間,這時候帶來的表現往往是系統load偏高,cpu sy使用率高,導緻系統幾乎不可用的狀态
4. 容易造成鋸齒狀的系統負載,因為系統負載是用活動線程數或CPU核心數,一旦線程數量高但外部網絡環境不是很穩定,就很容易造成大量請求的結果同時傳回,激活大量阻塞線程進而使系統負載壓力過大。
*/
2> 同步,異步,阻塞、非阻塞
- 同步:關注消息通訊機制,調用者進行調用後,會等待結果,有結果後才傳回:調用者檢查結果是否就緒
- 異步:也是關注消息通訊機制,調用後立刻傳回,可能沒有結果。等有結果後,由被調用者通過知調用者:被調用者檢查調用結果是否就緒
- 阻塞:就等待結果時的狀态,阻塞是指在調用結果傳回之前,目前線程會被挂起,調用線程隻有在得到結果之後才會繼續執行
- 非阻塞:等 待結果時,調用線程不被挂起,還執行其他的事情
- 同步阻塞、同步非阻塞、異步非阻塞,異步阻塞
- 同步阻塞,一個線程請求網絡,并等到請求結果回來
- 同步非阻塞:一個線程請求網絡後,先去執行别的,不間斷的來檢視網絡下載下傳結果。
- 異步阻塞:調用線程支請求網絡,一直等待請求線程的下載下傳完成通知
- 異步非阻塞:調用請求網絡線程後,去幹别的,等待來通知後在繼續執行對應的邏輯,我們常用的網絡請求方式
3> 程式在請求網絡時,到底做了什麼?和IO有什麼關系?
[外鍊圖檔轉存失敗(img-ShI7JtrW-1562476986964)(https://raw.githubusercontent.com/winrainbow/imageRepository/master/IO.jpg)]
4> NIO原理
- 所有的系統的I/O都分為兩個階段:等待就緒和操作 如:讀分為等待系統可讀和真正的讀,寫分為等待網卡可以和真正的寫
- 需要說明的是等待就緒的阻塞是不使用cpu的,在空等,而真正的讀寫操作的阻塞是使用cpu的,是真正幹活的,而且這個過程非常快,屬于memory copy 帶寬通常是在1GB/s級别以上,可以了解為基本不耗時
socket.read() 在BIO中,socket.read()如果TCP recvBuffer裡沒有資料,函數會一直阻塞,直到收到資料,傳回讀到的資料
對于NIO,如果TCP RecvBuffer有資料,就把資料從網卡讀到記憶體中,并且傳回給使用者;反之直接傳回0,永遠不會阻塞
AIO(Async I/O)中,不但等待就緒是非阻塞的,連資料從網卡到記憶體的過程也是異步的
BIO:我要讀;NIO:我可以讀了 ;AIO:我讀完了
- NIO重要特點:socket主要的讀、寫、注冊和接收函數,在等待就緒階段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)
- BIO模型中,之是以需要多線程,是因為進行I/O操作時,一是沒有辦法知道能不能寫,能不能讀,隻能“傻等”,即使通過各種估算,算出來作業系統沒有能力進行讀寫,也沒有辦法在socket.read()和socket.write()方法中傳回,這兩個方法無法進行有效的中斷是以除了多開線程另起爐竈,沒有好的辦法利用CPU
- NIO的讀寫方法可以立刻傳回,這就給了我們不開線程利用CPU的最好機會,如果一個連接配接不能讀寫(socket.read()或者socket.write()傳回0) 我們就可以把這件事記下來,記錄的方式通常是在 Selector上注冊标記位,然後切換到其他就緒的連接配接(channel)繼續進行讀寫
- NIO幾個事件:讀就緒、寫就緒、有新連接配接到來
select是阻塞的,無論是通過作業系統的通知,還是不停的輪詢,這個函數是阻塞的,是以可以放心大膽的在一個while(true)裡面調用這個方法,而不用擔心cpu空轉
- 首先注冊當這幾個事件到來的時候所對應的處理器,然後在合适的時機告訴事件選擇器:我對這個事件有興趣;
- 對于寫操作,就是寫不出去的時候對寫事件有興趣,
- 對于讀操作,就是完成連接配接和系統沒有辦法承載新讀入的資料時
- 對于accept,一般是伺服器剛啟動的時候,
- 對于connect,一般是connect失敗需要重連或者直接異步調用connect的時候
- 用一個死循環選擇就緒的事件,會執行系統調用(epoll,Windows:IOCP),還會阻塞等待事件的到來,新事件到來的時候,會在selector上注冊标記位,标示可讀、可寫、有連接配接到來
5> NIO示例
package com.company;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO server
*/
public class Main {
public static void main(String[] args) throws IOException {
System.out.println("hello World");
new Server().start();
}
static class Server {
/**
* selector
*/
private Selector selector;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
private String tempContent;
/**
* 開啟服務
* @throws IOException
*/
public void start() throws IOException{
/**
* 打開伺服器 套接字通道
*/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/**
* 設定為非阻塞
*/
serverSocketChannel.configureBlocking(false);
/**
* 綁定要監聽的端口号
*/
serverSocketChannel.bind(new InetSocketAddress("localhost",8001));
/**
* 找到selector
*/
selector = Selector.open();
/**
* socketServerChannel告之selector 我對 接入感興趣,如果有準備好的接入,通知我
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()){
selector.select(); // 阻塞的
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 準備好的事件,接入、讀、寫
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(!key.isValid()){
return;
}
if(key.isAcceptable()){
accept(key);
}else if(key.isReadable()){
read(key);
}else if(key.isWritable()){
write(key);
}
iterator.remove();// 事件處理完,丢棄
}
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel(); // 辨別用戶端過來的一個連結
sendBuffer.clear(); // 清空
sendBuffer.put(tempContent.getBytes());
sendBuffer.flip(); // 改變position位置
channel.write(sendBuffer); // 寫資料
channel.register(selector,SelectionKey.OP_READ); // 監聽該連結 用戶端發過來的資料
System.out.println("發出:"+tempContent); // 要響應的内容
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel(); // 辨別用戶端過來的一個連結
tempContent = "";
readBuffer.clear();
int readLength = channel.read(readBuffer);// 讀資料
tempContent = new String(readBuffer.array(),0,readLength);
System.out.println("收到:"+tempContent);
readBuffer.flip();
channel.register(selector,SelectionKey.OP_WRITE); // 監聽該連結,開始寫資料
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
channel.register(selector,SelectionKey.OP_READ);
System.out.println("a new client connected "+channel.getRemoteAddress());
}
}
}
NioClient
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class Main {
public static void main(String[] args) throws IOException {
System.out.println("I am client !");
new Client().start();
}
static class Client {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
public void start() throws IOException{
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("localhost",8001));
Selector selector = Selector.open();
sc.register(selector, SelectionKey.OP_CONNECT);
Scanner scanner = new Scanner(System.in);
while(true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys.length:"+selectionKeys.size());
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isConnectable()){
System.out.println("selectionKeys_connectable");
// 連接配接成功
sc.finishConnect();
sc.register(selector,SelectionKey.OP_WRITE);
System.out.println("連接配接成功");
}else if(key.isWritable()){
System.out.println("selectionKeys_writable");
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress();
String ip = localAddress.getAddress().getLocalHost().toString();
String input = scanner.nextLine();
String writeStr = "hello i am client:["+ip+"]"+input;
writeBuffer.clear();
writeBuffer.put(writeStr.getBytes());
writeBuffer.flip();
channel.write(writeBuffer);
channel.register(selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
System.out.println("selectionKeys_readable");
SocketChannel channel = (SocketChannel) key.channel();
readBuffer.clear();
int readLength = channel.read(readBuffer);
System.out.println("收到:"+new String(readBuffer.array(),0,readLength));
sc.register(selector,SelectionKey.OP_WRITE);
}
iterator.remove();
}
}
}
}
}
NIOClient
6> NIO适用場景
如果你需要管理數以千計的同時打開的連接配接,每一個隻發送一點點的資料,例如聊天伺服器,在NIO中實作伺服器可能是一個優勢。 同樣,如果你需要保持與其他計算機的大量開放連接配接,例如 在P2P網絡中,使用單個線程來管理所有出站連接配接可能是一個優勢
其他内容:
優化線程模型
NIO由原來的阻塞讀寫(占用線程)變程單線程輪詢事件,找到可以進行讀寫的網絡描述符進行讀寫,除了事件的輪詢是阻塞的(沒有可幹的事情,必須阻塞),剩餘的I/O操作都是純CPU操作 ,沒有必要開啟多線程
并且由于線程的節約,連接配接數大的時候因為線程切帶來的性 能問題也解決了,進而為海量連接配接提供了可能
單線程處理I/O的效率确實非常高,沒有線程切換,隻是拼命的讀、寫、選擇事件。但現在的伺服器一般是多核的處理器,如果能利用多核心進行I/O無疑對效率會有更大的提升
我們需要的線程:
- 事件分發器:單線程選擇就緒的事件
- I/O處理器:包括connect、read、write等,這種純cpu操作,一般開啟的線程的個數和cpu核心個數相同
- 業務線程:在處理完I/O後,業務一般會有自己的業務邏輯,有的還會有其他的阻塞I/O,如DB操作、RPC等,隻要有阻塞,就需要單獨的線程
- Java的selector對于Linux系統來說有個限制,同一個channel的select不能被并發的調用。是以,如果有多個I/O線程,必須保證:一個socket隻能屬于一個IOthread,而一個IOThread可以管理多個socket。
事件分發器
一般情況下,I/O 複用機制需要事件分發器(event dispatcher)。 事件分發器的作用,即将那些讀寫事件源分發給各讀寫事件的處理者,就像送快遞的在樓下喊: 誰誰誰的快遞到了, 快來拿吧!開發人員在開始的時候需要在分發器那裡注冊感興趣的事件,并提供相應的處理者(event handler),或者是回調函數;事件分發器在适當的時候,會将請求的事件分發給這些handler或者回調函數。
涉及到事件分發器的兩種模式稱為:Reactor和Proactor。 Reactor模式是基于同步I/O的,而Proactor模式是和異步I/O相關的。在Reactor模式中,事件分發器等待某個事件或者可應用或個操作的狀态發生(比如檔案描述符可讀寫,或者是socket可讀寫),事件分發器就把這個事件傳給事先注冊的事件處理函數或者回調函數,由後者來做實際的讀寫操作。
而在Proactor模式中,事件處理者(或者代由事件分發器發起)直接發起一個異步讀寫操作(相當于請求),而實際的工作是由作業系統來完成的。發起時,需要提供的參數包括用于存放讀到資料的緩存區、讀的資料大小或用于存放外發資料的緩存區,以及這個請求完後的回調函數等資訊。事件分發器得知了這個請求,它默默等待這個請求的完成,然後轉發完成事件給相應的事件處理者或者回調。舉例來說,在Windows上事件處理者投遞了一個異步IO操作(稱為overlapped技術),事件分發器等IO Complete事件完成。這種異步模式的典型實作是基于作業系統底層異步API的,是以我們可稱之為“系統級别”的或者“真正意義上”的異步,因為具體的讀寫是由作業系統代勞的。
舉個例子,将有助于了解Reactor與Proactor二者的差異,以讀操作為例(寫操作類似)。
在Reactor中實作讀
- 注冊讀就緒事件和相應的事件處理器。
- 事件分發器等待事件。
- 事件到來,激活分發器,分發器調用事件對應的處理器。
- 事件處理器完成實際的讀操作,處理讀到的資料,注冊新的事件,然後返還控制權。
在Proactor中實作讀:
- 處理器發起異步讀操作(注意:作業系統必須支援異步IO)。在這種情況下,處理器無視IO就緒事件,它關注的是完成事件。
- 事件分發器等待操作完成事件。
- 在分發器等待過程中,作業系統利用并行的核心線程執行實際的讀操作,并将結果資料存入使用者自定義緩沖區,最後通知事件分發器讀操作完成。
- 事件分發器呼喚處理器。
- 事件處理器處理使用者自定義緩沖區中的資料,然後啟動一個新的異步操作,并将控制權傳回事件分發器。
EPoll(linux大于 2.6) 和 Poll(linux 小于2.6)
底層的實作,具體可以去看作業系統的底層實作
JavaNio 在java自身的語言層面上,使用了緩存和記憶體映射來提高IO的讀寫效率
read()和write()
- read()兩個階段:1,等待資料準備(時間長)2.将資料從核心記憶體空間拷貝到程序的記憶體空間中
Buffer的建立,及各種操作
Netty:
三人行,必有吾師,歡迎加入星球,一起讨論技術點滴
