了解 IO
輸入輸出(IO)是指計算機同任何外部裝置之間的資料傳遞。常見的輸入輸出裝置有檔案、鍵盤、列印機、螢幕等。資料可以按記錄(或稱資料塊)的方式傳遞,也可以 流的方式傳遞 。
所謂記錄,是指有着内部結構的資料塊。記錄内部除了有需要處理的實際資料之外,還可能包含附加資訊,這些附加資訊通常是對本記錄資料的描述。
同步和異步
同步
- 概念:指的是使用者程序觸發 IO 操作并等待或者輪詢的去檢視 IO 操作是否就緒。
- 例子:每天要吃飯,下班後自己跑去菜市場買菜,回來在做飯,所有的事情都是自己一件接着一件執行的。
異步
- 概念:異步是指使用者程序觸發IO操作以後便開始做自己的事情,而當 IO 操作已經完成的時候會得到 IO 完成的通知(異步的特點就是通知)。
- 例子:在某超市網上APP上選中食材後下單,委托快遞員送菜上門,超市給你備貨,快遞員送貨,而你在他們備貨送菜的階段還可以做其他事情。(使用異步 IO 時,Java 将 IO 讀寫委托給 OS 處理,需要将資料緩沖區位址和大小傳給 OS )。
差別
IO 操作主要分為兩個步驟,即發起 IO 請求和實際 IO 操作,同步與異步的差別就在于第二個步驟是否阻塞。
若實際 IO 操作阻塞請求程序,即請求程序需要等待或者輪詢檢視 IO 操作是否就緒,則為同步 IO;若實際 IO 操作并不阻塞請求程序,而是由作業系統來進行實際 IO 操作并将結果傳回,則為異步 IO。
計算把記憶體分為使用者記憶體和系統記憶體兩部分,同步和異步是針對應用程式(使用者記憶體)和核心(系統記憶體)的互動而言的。
阻塞和非阻塞
阻塞
- 概念:所謂阻塞方式就是指,當視圖對檔案描述符或者網絡套接字進行讀寫時,如果當時沒有東西可讀,或者暫時不可寫,程式就進入等待狀态,直到有東西讀或者寫。
- 例子:去公交站充值,發現這個時候,充值員不在(可能上廁所去了),然後我們就在這裡等待,一直等到充值員回來為止。
非阻塞
- 概念:所謂的非阻塞方式就是指,當視圖對檔案描述符或者網絡套接字進行讀寫時,如果沒有東西可讀,或者不可寫,讀寫函數馬上傳回,無須等待。
- 例子:銀行裡取款辦業務時,領取一張發票,領取完後我們自己可以玩玩手機,或者與别人聊聊天,當輪我們時,銀行的喇叭會通知,這時候我們就可以去了。
IO 操作主要分為兩個步驟,即發起 IO 請求和實際 IO 操作,阻塞與非阻塞的差別就在于第一個步驟是否阻塞。
若發起 IO 請求後請求線程一直等待實際 IO 操作完成,則為阻塞 IO;若發起 IO 請求後請求線程傳回而不會一直等待,即為非阻塞 IO。
阻塞和非阻塞是針對于程序在通路資料的時候,根據 IO 操作的就緒狀态來采取的不同方式,說白了是一種讀取或者寫入操作函數的實作方式,阻塞方式下讀取或者寫入函數将一直等待,而非阻塞方式下,讀取或者寫入函數會立即傳回一個狀态值。
同步阻塞和同步非阻塞
同步阻塞(JAVA BIO)
同步并阻塞 IO,伺服器實作模式一個連接配接一個線程,即用戶端有連接配接請求時服務端就需要啟動一個線程進行處理,如果這個連接配接不做任何事情,就會造成不必要的線程開銷,當然可以通過線遲(Thread-Pool)程機制改善。

圖1 同步阻塞
Note: linux 核心中通常基于 Netlink 協定簇實作使用者空間和核心空間的通訊。這裡的系統調用指代是
recvfrom
過程,其實作邏輯大緻如下:
圖2 Netlink協定簇使用者态和核心态通訊流程 圖檔來源:
https://www.cnblogs.com/mosp/p/3558506.html● 使用者空間:
建立流程大體如下:
- ① 建立 socket 套接字
- ② 調用 bind 函數完成位址的綁定,不過同通常意義下 server 端的綁- 定還是存在一定的差别的,server端通常綁定某個端口或者位址,而此處的綁定則是 将 socket 套接口與本程序的 pid 進行綁定 ;
- ③ 通過 sendto 或者sendmsg函數發送消息;
- ④ 通過 recvfrom 或者 rcvmsg 函數接受消息。
● 核心空間:
核心空間主要完成以下三方面的工作:
- ① 建立 netlinksocket,并注冊回調函數,注冊函數将會在有消息到達 netlinksocket 時會執行;
- ② 根據使用者請求類型,處理使用者空間的資料;
- ③ 将資料發送回使用者。
同步非阻塞(JAVA NIO)
同步非阻塞,伺服器實作模式一個請求一個線程,即用戶端發送的連接配接請求都會注冊到多路複用器上,多路複用器輪詢到連接配接有 I/O 請求時才啟動一個線程處理。使用者程序也需要時不時地詢問IO操作是否就緒,這就要求使用者程序不停的去詢問。
圖3 同步非阻塞
異步阻塞和異步非阻塞
異步阻塞(Java NIO)
異步阻塞,應用發起一個 IO 操作以後,不需要等待核心 IO 操作完成,等待核心完成 IO 操作以後會通知應用程式,這其實就是異步和同步的關鍵差別,同步必須等待或者主動去詢問 IO 操作是否完成。那為什麼說阻塞呢?因為此時是通過 select 系統調用來完成的,而 select 函數本身的實作方式就是阻塞的,但采用 select 函數有個好處就是它可以同時監聽多個檔案句柄(如果從UNP的角度看,select 屬于同步操作。因為 select 之後,程序還需要讀寫資料),進而提高系統的并發性。
圖4 異步阻塞
異步非阻塞(Java AIO)
異步非阻塞,此種方式下,使用者程序隻需要發起一個IO操作便立即傳回,等 IO 操作真正完成以後,應用程式會得到IO操作完成的通知,此時使用者程序隻需要對資料處理就好了,不需要進行實際的 IO 讀寫操作,因為真正的 IO 操作已經由作業系統核心完成了。
圖5 異步非阻塞
BIO、NIO、AIO适用場景分析
BIO 适用于連接配接數目比較小且固定的結構。它對伺服器資源要求比較高,并發局限于應用中,JDK1.4之前唯一選擇,但程式直覺簡單易了解,如之前在 Apache 中使用。
NIO 适用于連接配接數目多且連接配接比較短的架構,比如聊天伺服器,并發局限于應用中,變成比較複雜。JDK1.4開始支援,如在 Nginx、Netty 中使用。
AIO 适用于連接配接數目多且連接配接比較長(重操作)的架構,比如相冊伺服器,充分調用 OS 參與并發操作,程式設計比較複雜,JDK7 開始支援,在成長中,Netty 曾經使用過,後來放棄。
在大多數場景下,不建議直接使用 JDK 的 NIO 類庫(門檻很高),除非精通 NIO 程式設計或者有特殊的需求。在絕大多數的業務場景中,可以使用 NIO 架構 Netty 來進行 NIO 程式設計,其既可以作為用戶端也可以作為服務端,且支援 UDP 和異步檔案傳輸,功能非常強大。
NIO 比 BIO 把一些無效的連接配接擋在了啟動線程之前,減少了這部分資源的浪費。因為我們都知道每建立一個線程,就要為這個線程配置設定一定的記憶體空間。
AIO 比 NIO 進一步改善是,将一些暫時可能無效的請求擋在了啟動線程之前,比如在 NIO 的處理方式中,當一個請求來的話,開啟線程進行處理,但這個請求所需要的資源還沒有就緒,此時必須等待後端的應用資源,這時線程就被阻塞了。
Java對BIO、NIO、AIO的支援
Java IO(Java 資料流)主要就是 Java 用來讀取和輸出資料流。它有對應的一系列API。主要是
java.io.*
和
java.nio.*
。
Java 中 IO 主要有兩類:位元組流(讀寫以位元組(8bit)為機關,InputStream 和 OutputStream 為主要代表;字元流(讀寫以字元為機關,Reader 和 Writer 為主要代表) 。
讀取純文字資料優選用字元流,其他使用位元組流。因為字元流是以虛拟機的 encode 來處理,一次能讀多個位元組;而圖檔和視訊類的檔案是根據二進制進行存儲,當用字元流處理的時候有些會找不到映射的碼表,會造成資料缺失。下面介紹下經常用到的io類:
- 基本流:從特定的地方讀寫的流類,如磁盤或一塊記憶體區域。既有來源。
FileInputStream/FileOutputStream(檔案位元組輸出流)、
- 處理流(進階流,過濾流):沒有資料源,不能獨立存在,它的存在是用于處理基本流的,是使用一個已經存在的輸入流或輸出流連接配接建立。
- 緩沖位元組進階流:BufferedOutputStream/BufferdInputStream,内部維護者一個緩沖區,每次都盡可能的讀取更多的位元組放入到緩沖區,再将緩沖區中的内容部分或全部傳回給使用者,是以可以提高讀寫效率。
- 基本資料類型進階流:DataOutputStream/DataInputStream
- 字元進階流:OutputStreamWriter/InputStreamReader
- 緩沖字元進階流:BufferWriter/BufferReader
- 檔案字元進階流:FileWriter/FileReader,用于讀寫“文本檔案”
BIO、NIO、AIO 通信機制
BIO 程式設計模型
采用 BIO 通信模型的服務端,通常有一個獨立的 Acceptor 線程負責監聽用戶端的連接配接,它接收到用戶端的連接配接請求之後,為每個用戶端建立一個新的線程進行鍊路處理,處理完之後,通過輸出流傳回應答用戶端,線程銷毀。這就是典型的
一請求一應答
通信模型。這個是在多線程情況下執行的。當在單線程環境條件下時,在 while 循環中服務端會調用 accept 方法等待接收用戶端的連接配接請求,一旦收到這個連接配接請求,就可以建立 socket,并在 socket 上進行讀寫操作,此時不能再接收其他用戶端的連接配接請求,隻能等待同目前服務端連接配接的用戶端的操作完成或者連接配接斷開。
該模型最大的問題就是缺乏彈性伸縮能力,當用戶端并發通路量增加後,服務端的線程個數和用戶端并發通路數呈 1:1 的正比關系,由于線程是 Java 虛拟機非常寶貴的系統資源,當線程數膨脹之後,系統的性能将急劇下降,随着并發通路量的繼續增大,系統會發生線程堆棧溢出、建立新線程失敗等問題,并最終導緻程序當機或者僵死,不能對外提供服務。
僞異步 I/O 程式設計
為了解決同步阻塞I/O面臨的一個鍊路需要一個線程處理的問題,後來有人對它的線程模型進行了優化,後端通過一個線程池來處理多個用戶端的請求接入,形成用戶端個數M:線程池最大線程數N的比例關系,其中M可以遠遠大于N,通過線程池可以靈活的調配線程資源。設定線程的最大值,防止由于海量并發接入導緻線程耗盡。
采用線程池和任務隊列可以實作一種叫做僞異步的I/O通信架構。
當有新的用戶端接入時,将用戶端的 Socket 封裝成一個 Task(該任務實作 Java.lang.Runnablle 接口)投遞到後端的線程池中進行處理,JDK 的線程池維護一個消息隊列和N個活躍線程對消息隊列中的任務進行處理。由于線程池可以設定消息隊列的大小和最大線程數,是以,它的資源占用是可控的,無論多少個用戶端并發通路,都不會導緻資源的耗盡和當機。
由于線程池和消息隊列都是有界的,是以,無論用戶端并發連接配接數多大,它都不會導緻線程個數過于膨脹或者記憶體溢出,相對于傳統的一連接配接一線程模型,是一種改良。
僞異步I/O通信架構采用了線程池實作,是以避免了為每個請求都建立一個獨立線程造成的線程資源耗盡問題。但是由于它底層的通信依然采用同步阻塞模型,是以無法從根本上解決問題。
通過對輸入和輸出流的 API 文檔進行分析,我們了解到讀和寫操作都是同步阻塞的,阻塞的時間取決于對方 IO 線程的處理速度和網絡 IO 的傳輸速度,本質上講,我們無法保證生産環境的網絡狀況和對端的應用程式能足夠快,如果我們的應用程式依賴對方的處理速度,它的可靠性就會非常差。
NIO程式設計模型
與 Socket 類和 ServerSocket 類對應,NIO 也提供了 SocketChannel 和 ServerSocketChannel 兩種不同的套接字通道實作,在 JDK1.4 中引入。這兩種新增的通道都支援阻塞和非阻塞兩種模式。阻塞模式非常簡單,但性能和可靠性都不好,非阻塞模式正好相反。我們可以根據自己的需求來選擇合适的模式,一般來說,低負載、低并發的應用程式可以選擇同步阻塞 IO 以降低程式設計複雜度,但是對于高負載、高并發的網絡應用,需要使用 NIO 的非阻塞模式進行開發。
- (1)緩沖區 Buffer
Buffer 是一個對象,它包含一些要寫入或者要讀出的資料,在 NIO 庫中,所有資料都是用緩沖區處理的。在讀取資料時,它是直接讀到緩沖區中的;在寫入資料時,寫入到緩沖區中,任何時候通路 NIO 中的資料,都是通過緩沖區進行操作。
緩沖區實質上是一個數組。通常它是一個位元組數組(ByteBuffer),也可以使用其他種類的數組,但是一個緩沖區不僅僅是一個數組,緩沖區提供了對資料的結構化通路以及維護讀寫位置(limit)等資訊。常用的有ByteBuffer,其它還有CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。
- (2)通道 Channel
Channel 是一個通道,可以通過它讀取和寫入資料,它就像自來水管一樣,網絡資料通過 Channel 讀取和寫入。通道與流的不同之處在于通道是雙向的,流隻是一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),而且通道可以用于讀、寫或者用于讀寫。同時Channel 是全雙工的,是以它可以比流更好的映射底層作業系統的API。特别是在Unix網絡程式設計中,底層作業系統的通道都是全雙工的,同時支援讀寫操作。我們常用到的 ServerSocketChannnel 和 SocketChannel 都是SelectableChannel 的子類。
- (3)多路複用器Selector
多路複用器 Selector 是 Java NIO 程式設計的基礎,多路複用器提供選擇已經就緒的任務的能力,簡單的說,Selector 會不斷的輪詢注冊在其上的 Channel,如果某個 Channel 上面有新的 TCP 連接配接接入、讀和寫事件,這個 Channel 就處于就緒狀态,會被 Selector 輪詢出來,然後通過 SelectionKey 可以擷取就緒 Channel 的集合,進行後續的 I/O 操作。
一個多用複用器 Selector 可以同時輪詢多個 Channel,由于 JDK 使用了 epoll() 代替傳統的 select() 實作,是以它并沒有最大連接配接句柄 1024/2048 的限制,這也意味着隻需要一個線程負責 Selector 的輪詢,就可以接入成千上萬的用戶端。
盡管 NIO 程式設計難度确實比同步阻塞 BIO 大很多,但是我們要考慮到它的優點:
(1)用戶端發起的連接配接操作是異步的,可以通過在多路複用器注冊 OP_CONNECT 等後續結果,不需要像之前的用戶端那樣被同步阻塞。
(2)SocketChannel 的讀寫操作都是異步的,如果沒有可讀寫的資料它不會同步等待,直接傳回,這樣IO通信線程就可以處理其它的鍊路,不需要同步等待這個鍊路可用。
(3)線程模型的優化:由于 JDK 的 Selector 在 Linux 等主流作業系統上通過 epoll 實作,它沒有連接配接句柄數的限制(隻受限于作業系統的最大句柄數或者對單個程序的句柄限制),這意味着一個 Selector 線程可以同時處理成千上萬個用戶端連接配接,而且性能不會随着用戶端的增加而線性下降,是以,它非常适合做高性能、高負載的網絡伺服器。
AIO程式設計模型
JDK1.7 更新了 NIO 類庫,更新後的 NIO 類庫被稱為NIO2.0。也就是我們要介紹的 AIO。NIO2.0 引入了新的異步通道的概念,并提供了異步檔案通道和異步套接字通道的實作。異步通道提供兩種方式擷取操作結果。
(1)通過
Java.util.concurrent.Future
類來表示異步操作的結果;
(2)在執行異步操作的時候傳入一個
Java.nio.channels.CompletionHandler
接口的實作類作為操作完成的回調。
NIO2.0 的異步套接字通道是真正的異步非阻塞 IO,它對應 UNIX 網絡程式設計中的事件驅動 IO(AIO),它不需要通過多路複用器(Selector)對注冊的通道進行輪詢操作即可實作異步讀寫,進而簡化了 NIO 的程式設計模型。
我們可以得出結論:異步 Socket Channel是被動執行對象,我們不需要想NIO程式設計那樣建立一個獨立的IO線程來處理讀寫操作。對于
AsynchronousServerSocketChannel
AsynchronousSocketChannel
,它們都由 JDK 底層的線程池負責回調并驅動讀寫操作。正因為如此,基于 NIO2.0 新的異步非阻塞 Channel 進行程式設計比 NIO 程式設計更為簡單。
BIO、NIO和AIO使用樣例
BIO Demo
- 原始BIO
package com.openmind.io.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* ${name}
*
* @author zhoujunwen
* @date 2019-11-05
* @time 16:14
* @desc
*/
public class BIODemo {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
InputStream is = socket.getInputStream();
byte[] data = new byte[1024];
is.read(data);
System.out.println(new String(data, UTF_8));
OutputStream out = socket.getOutputStream();
out.write(data);
socket.close();
}
}
}
- 新線程處理用戶端讀寫
package com.openmind.io.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* ${name}
*
* @author zhoujunwen
* @date 2019-11-05
* @time 16:19
* @desc
*/
public class NewThreadBIODemo {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
final Socket clientSocket = socket;
new Thread(new Runnable() {
@Override
public void run() {
try {
InputStream is = clientSocket.getInputStream();
byte[] data = new byte[1024];
is.read(data);
OutputStream out = clientSocket.getOutputStream();
out.write(data);
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
- 線程池處理用戶端讀寫
package com.openmind.io.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ${name}
*
* @author zhoujunwen
* @date 2019-11-05
* @time 16:25
* @desc
*/
public class ThreadPoolBIODemo {
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
final Socket clientSocket = socket;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
InputStream is = clientSocket.getInputStream();
byte[] data = new byte[1024];
is.read(data);
OutputStream out = clientSocket.getOutputStream();
out.write(data);
clientSocket.close();
} catch (Exception e) {
;
}
}
});
}
}
}
NIO Demo
package com.openmind.io.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* ${name}
*
* @author zhoujunwen
* @date 2019-11-05
* @time 16:54
* @desc
*/
public class NIODemo {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
SocketChannel clientChannel = (SocketChannel) key.channel();
int read = clientChannel.read(buffer);
if (read == -1) {
key.cancel();
clientChannel.close();
} else {
buffer.flip();
clientChannel.write(buffer);
}
}
}
iterator.remove();
}
}
}
AIO Demo
package com.openmind.io.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* AIO服務端
*
* @author zhoujunwen
* @date 2019-11-08
* @time 17:50
* @desc
*/
public class AIO_Server {
static Charset charset = Charset.forName("UTF-8");
public static void main(String[] args) throws InterruptedException {
int port = 7890;
new Thread(new AioServer(port)).start();
TimeUnit.MINUTES.sleep(60);
}
static class AioServer implements Runnable {
int port;
AsynchronousChannelGroup group;
AsynchronousServerSocketChannel serverSocketChannel;
public AioServer(int port) {
this.port = port;
init();
}
public void init() {
try {
// 建立處理線程池
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
// 建立服務channel
serverSocketChannel = AsynchronousServerSocketChannel.open(group);
// 丙丁端口
serverSocketChannel.bind(new InetSocketAddress(port));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
// 接收請求
// accept的第一個參數附件,第二個參數是收到請求後的接收處理器
// 接收處理器AcceptHandler泛型的第一個參數的處理結果,這裡是AsynchronousSocketChannel,即接收到的請求的channel
// 第二個參數是附件,這裡是AioServer,即其執行個體
serverSocketChannel.accept(this, new AcceptHandler());
}
}
/**
* 接收請求處理器
* completionHandler泛型的第一個參數的處理結果,這裡是AsynchronousSocketChannel,即接收到的請求的channel,
* 第二個參數是附件,這裡是AioServer,即其執行個體
*/
static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {
@Override
public void completed(AsynchronousSocketChannel result, AioServer attachment) {
// 繼續接收下一個請求,構成循環調用
attachment.serverSocketChannel.accept(attachment, this);
try {
System.out.println("接收到連接配接請求:" + result.getRemoteAddress().toString());
// 定義資料讀取緩存
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
// 讀取資料,并傳入資料到達時的處理器
// read的第一個參數資料讀取到目标緩存,第二個參數是附件,第三個傳輸的讀取結束後的處理器
// 讀取處理器泛型的第一個參數是讀取的位元組數,第二個參數輸附件對象
result.read(buffer, buffer, new ReadHandler(result));
// 新開新城發送資料
new Thread(new WriteThread(result)).start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AioServer attachment) {
}
}
/**
* 讀取資料處理器
* completionHandler第一個參數是讀取的位元組數,第二個參數輸附件對象
*/
static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel socketChannel;
public ReadHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result == -1) {
attachment.clear();
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return;
}
attachment.flip();
String readMsg = charset.decode(attachment).toString();
System.out.println("服務端接收到的資料:" + readMsg);
attachment.compact();
// 繼續接收資料,構成循環
socketChannel.read(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
/**
* 寫出資料處理器
*/
static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel socketChannel;
Scanner scanner;
public WriteHandler(AsynchronousSocketChannel socketChannel, Scanner scanner) {
this.socketChannel = socketChannel;
this.scanner = scanner;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.compact();
String msg = scanner.nextLine();
System.out.println("服務端即将發送的資料:" + msg);
attachment.put(charset.encode(msg));
attachment.flip();
// 繼續寫資料,構成循環
socketChannel.write(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
static class WriteThread implements Runnable {
private AsynchronousSocketChannel channel;
public WriteThread(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
// 第一緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
String msg = scanner.nextLine();
System.out.println("服務端輸入資料:" + msg);
buffer.put(charset.encode(msg + System.lineSeparator()));
buffer.flip();
// 寫入資料,并有寫資料時的處理器
// write的第一個參數是資料寫入的緩存,第二個參數是附件,第三個參數寫結束後的處理器
// 讀取處理器泛型的第一個參數是寫入的位元組數,第二個是附件類型
channel.write(buffer, buffer, new WriteHandler(channel, scanner));
}
}
}
package com.openmind.io.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* AIO用戶端
*
* @author zhoujunwen
* @date 2019-11-11
* @time 09:31
* @desc
*/
public class AIO_Client {
static Charset charset = Charset.forName("UTF-8");
public static void main(String[] args) throws InterruptedException {
int port = 7890;
String host = "127.0.0.1";
// 啟動用戶端
new Thread(new AIOClient(port, host)).start();
TimeUnit.MINUTES.sleep(100);
}
static class AIOClient implements Runnable {
int port;
String host;
AsynchronousChannelGroup group;
AsynchronousSocketChannel channel;
InetSocketAddress address;
public AIOClient(int port, String host) {
this.port = port;
this.host = host;
// 初始化
init();
}
private void init() {
try {
// 建立處理線程組
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
// 建立用戶端channel
channel = AsynchronousSocketChannel.open(group);
address = new InetSocketAddress(host, port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
// 接收請求,并傳入收到請求後的處理器
// connect 方法的第一二個參數是目标位址,第二個參數是附件對象,第三個參數是連接配接處理器
// 連接配接處理器的泛型的第一個參數為空(即Void),第二個參數為附件
channel.connect(address, channel, new ConnectHandler());
}
}
/**
* 連接配接處理器
*/
static class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {
@Override
public void completed(Void result, AsynchronousSocketChannel attachment) {
try {
System.out.println("connect server: " + attachment.getRemoteAddress().toString());
// 定義資料讀取緩存
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 讀取資料,并傳入到資料到達時的處理器
attachment.read(buffer, buffer, new ReadHandler(attachment));
// 新開線程,發送資料
new WriteThread(attachment).start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
}
}
/**
* 讀處理器
*/
static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String readMsg = charset.decode(attachment).toString();
System.out.println("client receive msg: " + readMsg);
attachment.compact();
// 繼續接收資料,構成循壞
channel.read(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
/**
* 寫處理器
*/
static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
Scanner scanner;
public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) {
this.channel = channel;
this.scanner = scanner;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.compact();
System.out.print("client input data: ");
String msg = scanner.nextLine();
System.out.println("clinet will send msg:" + msg);
attachment.put(charset.encode(msg));
attachment.flip();
// 繼續寫入資料,構成循環
channel.write(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
/**
* 寫處理獨立建立線程
*/
static class WriteThread extends Thread {
private AsynchronousSocketChannel channel;
public WriteThread(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
System.out.print("client input data:");
String msg = scanner.nextLine();
System.out.println("client send msg:" + msg);
buffer.put(charset.encode(msg));
buffer.flip();
channel.write(buffer, buffer, new WriteHandler(channel, scanner));
}
}
}