java之NIO
1 什麼是NIO
Java NIO (New IO,Non-Blocking IO)是從Java 1.4版本開始引入的一套新的IO API。NIO支援面向緩沖區的、基于通道的IO操作。NIO的三大核心部分:通道(Channel),緩沖區(Buffer), 選擇器(Selector),資料總是從通道讀取到緩沖區,或者從緩沖區寫入到通道中,選擇器用于監聽多個通道事件,如連接配接打開,資料到達等。Java NIO系統的核心在于:通道(Channel)和緩沖區(Buffer),Channel負責傳輸,Buffer負責存儲資料。
BIO與NIO的了解:傳統IO即BIO在進行資料傳輸時必須要建立一個連接配接才能進行資料的寫入和讀取。可以吧資料了解為水流,需要有管道,可以認為應用程式和檔案之間的連接配接就是一個管道用來運輸水流。輸入流和輸出流是不同的管道,他們是單向的。NIO在程序應用程式和檔案之間資料傳輸時他們的連接配接不能了解為管道,他有個概念為“通道”,可以了解為鐵軌,還有“緩沖區”可以了解為火車。起到運輸作用,但是本事不能進行運輸資料,資料的運輸需要借助于火車。當我們要讀取磁盤檔案的時候資料會先加載到緩沖區,然後傳輸到應用程式。
2 BIO與NIO的差別
(1)BIO是面向流,流是單向的。每次從流中讀取一個或者多個位元組,直到讀取完所有位元組,沒有被緩存起來,不能前後移動流中的資料,如果想要能前後移動的話需要将他緩存到另外一個緩沖區;NIO是面向緩沖區的,通道可以将資料讀取到緩存區實作雙向傳輸。NIO是将資料讀取到一個稍後處理的緩沖區,并且在需要的時候可以前後移動。
(2)BIO是阻塞式,一個線程調用read()或者write()的時候這個線程被阻塞,直到資料被讀取或者完全寫入,不能再幹其他事情;NIO是非阻塞式,一個線程從一個通道發送請求讀取資料,隻能擷取到目前可用的,沒資料可用就什麼都不會擷取,不保持阻塞,直到資料變得可以讀取之前,這個線程可以做其他事,寫也是這樣。非阻塞IO的線程在空閑時間作用在其他通道上執行IO操作,那麼一個線程就可以管理多個輸入輸出通道。
(3)BIO傳輸的是位元組流或字元流,NIO是通過塊傳輸。
面向檔案IO的差別:BIO是面向流的,NIO是面向緩沖區的。面向網絡IO的差別:BIO是阻塞的,NIO是非阻塞的,并且NIO有選擇器
3 緩沖區(Buffer)
3.1 緩沖區相關概念
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR5UMJpmT0MmeNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3EzM3AjM0YTMyIDMxAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
通道表示IO源到 IO 裝置(例如:檔案、套接字)的連接配接。若需要使用 NIO 系統,需要擷取用于連接配接 IO 裝置的通道以及用于容納資料的緩沖區。然後操作緩沖區,對資料進行處理。
緩沖區(Buffer):Buffer主要用于與Channel互動,資料從Channel寫入Buffer,然後再從Buffer寫出到Channel。是一個用于特定基本資料類型(除boolean型外)的容器,底層使用數組存儲,可以儲存多個相同類型的資料。所有緩沖區都是java.nio.buffer的子類,常見的子類有ByteBuffer,CharBuffer,IntBuffer,DoubleBuffer,ShortBuffer,LongBuffer,FloatBuffer等,他們管理資料的方法都相似,管理的類型不同而已。
Buffer的實作類都是通過allocate(int,capacity)建立一個容量為capacity的對象。Buffer有以下基本屬性:
容量(capacity) | 辨別Buffer存儲的最大資料容量,聲明後不能更改,不能為負,通過capacity()擷取 |
限制(limit) | 第一個不應該讀取或寫入的資料的索引,也就是limit後的資料不可以讀寫,不能為負,不能大于capacity,通過limit()擷取 |
位置(position) | 目前要讀取或者寫入資料的索引,不能為負,不能大于limit,通過position()擷取 |
标記(mark) | 标記是一個索引,通過mark()标記後,可以通過調用reset()将position恢複到标記的mark處 |
上述屬性的範圍大小為: 0 <= mark <= position <= limit <= capacity
3.2 緩沖區的基本操作
緩沖區為所有的子類提供了兩個用于資料操作的方法put和get方法,如ByteBuffer的這兩個方法如下
方法 | 說明 |
put(byte b) | 将指定的單個位元組寫入緩沖區的目前位置 |
put(byte[] buf) | 将buf中的位元組寫入緩沖區的目前位置 |
put(int index,byte b) | 将指定位元組寫入緩沖區的索引位置,不移動position |
get() | 讀取單個位元組 |
get(byte[] buf) | 批量讀取多個位元組到buf中 |
get(int index) | 讀取指定索引位置的位元組,不移動position |
Buffer其他常用方法
方法 | 說明 |
Buffer flip() | 将limit設定為目前position,position設定為0,mark設定為-1 |
Buffer rewind() | 将position設定為0,mark設定為-1,可以重複讀 |
Buffer clear() | 将limit設定為capacity,position設定為0,mark設定為-1,資料沒有清空 |
Buffer mark() | 設定緩沖區的mark |
Buffer reset() | 将目前位置的position轉到之前設定的mark的位置 |
Buffer hasRemaining() | 判斷緩沖區中是否還有元素 |
int remaining | 傳回position和limit之間元素的個數 |
Xxx[] array() | 傳回XxxBuffer底層的Xxx數組 |
int capacity() | 傳回Buffer的capacity大小 |
int limit() | 傳回Buffer的limit位置 |
Buffer limit(int n) | 将設定緩沖區界限為 n, 并傳回一個具有新 limit 的緩沖區對象 |
int position() | 傳回Buffer的position位置 |
Buffer position(int n) | 将設定緩沖區的目前位置為 n , 并傳回修改後的 Buffer 對象 |
說明:①當我們調用ByteBuffer.allocate(10)方法建立了一個10個byte的數組的緩沖區,position的位置為0,capacity和limit預設都是數組長度。②當通過put方法寫入5個位元組到緩沖區時,position更新為5。③需要将緩沖區中的5個位元組資料寫入Channel的通信信道,調用ByteBuffer.flip()方法,變化為position設回0,并将limit設成之前的position的值④這時底層作業系統就可以從緩沖區中正确讀取這個5個位元組資料并發送出去了。在下一次寫資料之前我們再調用clear()方法,緩沖區的索引位置又回到了初始位置。
注意:clear()是把position設回0,limit設定成capacity,換句話說,其實Buffer中的資料沒有清空,隻是這些标記告訴我們可以從哪裡開始往Buffer裡寫資料。如果Buffer中有一些未讀的資料,調用clear()方法,資料将丢棄,那就沒有标記說明哪些資料讀過,哪些還沒有。如果還需要Buffer中未讀的資料,但是還想要先寫些資料,那麼使用compact()方法。compact()方法将所有未讀的資料拷貝到Buffer起始處。然後将position設到最後一個未讀元素正後面。limit屬性依然像clear()方法一樣,設定成capacity。現在Buffer準備好寫資料了,但是不會覆寫未讀的資料。
public static void main(String[] args) {
/**
* 通過allocate()擷取緩沖區,緩沖區主要有2個核心方法:put()将輸入存入緩沖區,get()擷取緩沖區資料
*/
// 擷取緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("capacity:"+ buffer.capacity()+"\t position:"+buffer.position()+"\t limit:"+buffer.limit());
// 将資料存入緩沖區
String str = "hello";
buffer.put(str.getBytes());
System.out.println("capacity:"+ buffer.capacity()+"\t position:"+buffer.position()+"\t limit:"+buffer.limit());
// 擷取緩沖區資料,要擷取緩存區的資料需要flip()切換緩沖區的模式
buffer.flip();
System.out.println("capacity:"+ buffer.capacity()+"\t position:"+buffer.position()+"\t limit:"+buffer.limit());
// 建立位元組資料接收資料
byte[] b = new byte[buffer.limit()];
buffer.get(b);
System.out.println(new String(b,0,buffer.limit()));
System.out.println("capacity:"+ buffer.capacity()+"\t position:"+buffer.position()+"\t limit:"+buffer.limit());
// rewind()可重複讀
buffer.rewind();
System.out.println("capacity:"+ buffer.capacity()+"\t position:"+buffer.position()+"\t limit:"+buffer.limit());
}
-----------------------------------
輸出結果:
capacity:1024 position:0 limit:1024
capacity:1024 position:5 limit:1024
capacity:1024 position:0 limit:5
hello
capacity:1024 position:5 limit:5
capacity:1024 position:0 limit:5
3.3 直接緩沖區和非直接緩沖區
緩沖區分為直接緩沖區和非直接緩存區:①非直接緩沖區:硬碟-->系統的緩沖區-->copy-->JVM緩沖區-->程式②直接緩沖區:需要copy,JVM和緩沖區實作映射。
直接位元組緩沖區, Java 虛拟機會盡最大努力直接在此緩沖區上執行本機 I/O 操作。也就是說在每次調用基礎作業系統的一個本機 I/O 操作之前(或之後),虛拟機都會盡量避免将緩沖區的内容複制到中間緩沖區中(或從中間緩沖區中複制内容)。
直接位元組緩沖區可以通過調用ByteBuffer的 allocateDirect() 工廠方法來建立。此方法傳回的緩沖區進行配置設定和取消配置設定所需成本通常高于非直接緩沖區。直接緩沖區的内容可以駐留在正常的垃圾回收堆之外,是以,它們對應用程式的記憶體需求量造成的影響可能并不明顯。是以,建議将直接緩沖區主要配置設定給那些易受基礎系統的本機 I/O 操作影響的大型、持久的緩沖區。一般情況下,最好僅在直接緩沖區能在程式性能方面帶來明顯好處時配置設定它們。
直接位元組緩沖區還可以通過 FileChannel 的 map() 方法 将檔案區域直接映射到記憶體中來建立。該方法傳回ByteBuffer的子類:MappedByteBuffer 。Java 平台的實作有助于通過 JNI 從本機代碼建立直接位元組緩沖區。如果以上這些緩沖區中的某個緩沖區執行個體指的是不可通路的記憶體區域,則試圖通路該區域不會更改該緩沖區的内容,并且将會在通路期間或稍後的某個時間導緻抛出不确定的異常。
非直接緩沖區如上,假設應用程式想要在磁盤中讀取一些資料的話。應用程式首先發起一個請求,要去讀取目前實體磁盤裡面的資料,這個時候需要将實體磁盤的資料首先讀到核心空間中,然後拷貝一份到使用者空間,然後才能通過read的方法将資料讀到應用程式中。同樣的應用程式中有資料的話需要先寫入到使用者位址空間,然後複制到核心位址空間,再由核心空間寫入到實體磁盤。在這個過程中,這兩個複制的操作比較多餘,是以他的效率比較低一些,也就是将我們的緩沖區建立在jvm的記憶體中相對效率更低。
直接位元組緩沖區如上圖,直接緩沖區不需要拷貝,是将我們的資料直接在實體記憶體中建立一個映射檔案,将資料寫到這個檔案裡面去,這個時候我們應用程式要寫一些資料的話直接寫到這個映射檔案。作業系統就會将這個寫到實體磁盤中。讀磁盤資料同理。這個過程就沒有中間的copy就會比較高。
位元組緩沖區是直接緩沖區還是非直接緩沖區可通過調用其 isDirect() 方法來确定。非直接緩沖區:通過allocate方法分區緩存區,将緩存區建立在JVM的記憶體中
直接緩存區:通過allocateDircet方法分區緩沖區,将緩沖區建立在實體記憶體中,效率更高。
到allocateDirect和allocate建立的源碼中,發現allocate建立的是一個HeapByteBuffer,Heap堆其實就是表示使用者空間,在jvm記憶體中建立的一個緩沖區。
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
3.4 緩沖區使用
Buffer使用一般遵循以下幾個原則:
①配置設定空間,如ByteBuffer buffer = ByteBuffer.allocate(1024);或者使用allocateDirector
②将資料寫入到Buffer中 int readBuffer = inChannel.read(buffer);
③調用flip()方法,将limit設定為目前position,position設定為0,mark設定為-1
④從Buffer中讀取資料 readBuffer = inChannel.read(buffer);
⑤調用clear()(将limit設定為capacity,position設定為0,mark設定為-1,資料沒有清空)方法或者compact()方法
4 通道(Channel)
Channel表示IO源于目标節點打開的連接配接,類似于傳統的流,但是Channel不直接存儲資料,Channel隻能與Buffer進行互動。
以非直接緩沖區為例,應用程式擷取資料需要經過使用者空間,然後核心空間,再讀取資料,所有的讀取操作在NIO是直接由CPU負責的。這個流程會存在一些問題,當我們有大量的檔案讀取操作的時候cpu他的利用就很低,因為IO操作直接搶占CPU的資源,就不能夠去做其他的事情,是以他的效率就會變低。
計算機CPU和記憶體的互動是最頻繁的,記憶體是我們的高速緩存區,CPU運轉速度越來越快,磁盤遠遠跟不上CPU的讀寫速度,才設計了記憶體。這裡把CPU的連接配接幹掉了,變成了DMA(Direct Memory Access,直接記憶體存取器),就是直接記憶體存儲。如果要讀取資料,是以的操作是直接在目前DMA這裡直接完成,不再有CPU去進行負責。但是得到DMA還是需要由目前的CPU進行相關的排程。在這裡交給了DMA之後,CPU就能做其他的事,但是如果依然有大量的IO操作的時候又會造成DMA總線的擁堵,因為最終沒有直接和CPU撇開關系。導緻在大量的檔案讀取請求的時候依然使用率比較低,這個時候就出現了新的資料讀寫流程,這個時候就出現了channel通道。
把DMA換成了通道channel。通道channel可以認為他是一個完全獨立的處理器,他就是用來專門負責檔案的IO操作的,也就是說以後所有的資料直接交給channel去進行負責讀取。這個時候CPU才算是真正的解放了。
java為Channel接口提供的最主要的實作類如下:①FileChannel:用于讀取,寫入、映射和操作檔案的通道②SocketChannel:通過TCP讀取網絡中的資料③ServerSocketChannel:可以監聽新進來的TCP連接配接,對每個新進來的連接配接都會建立一個SocketChannel④DatagramChannel:通過UDP讀寫網絡中的資料通道
擷取通道的三種方式:①對支援通道的對象調用getChannel(),支援通道的類有:FileInputStream,FileOutputStream,RandomAccessFile,Socket,ServerSocket,DatagramSocket②通過XxxChannel的靜态方法open()打開并傳回指定的XxxChannel③使用Files工具類的靜态方法newByteChannel()擷取位元組通道。
FileChannel常用方法
方法 | 描述 |
int read(ByteBuffer dst) | 從Channel中讀取資料到ByteBuffer |
long read(ByteBuffer[] dsts) | 将Channel中的資料“分散”到ByteBuffer[] |
int write(ByteBuffer src) | 将ByteBuffer的資料寫入到Channel |
long write(ByteBuffer[] srcs) | 将ByteBuffer[]的資料"聚集"到Channel |
MappedByteBuffer map(MapMode mode,long position,long size) | 将Channel對應的部分資料或者全部資料映射到ByteBuffer |
long position() | 傳回次通道的檔案位置 |
FileChannel position(long p) | 設定此通道的檔案位置 |
long size() | 傳回此通道的檔案大小 |
FileChannel truncate(long s) | 将此通道的檔案截取為給定大小 |
void force(boolean metadata) | 強制将所有對此通道的檔案更新寫入到儲存設備中 |
分散(Scatter)讀取和聚集(Gather)寫入:①分散讀取(Scattering Reads是指從Channel中讀取的資料“分散”到多個Buffer中,注意:按照緩沖區的順序,從 Channel 中讀取的資料依次将Buffer填滿。②聚集寫入(Gathering Writes)是指将多個Buffer中的資料“聚集”到Channel,注意:按照緩沖區的順序,寫入position和limit之間的資料到Channel。
NIO的強大功能部分來自于Channel的非阻塞特性,套接字的某些操作可能會無限期地阻塞。如對accept()方法的調用可能會因為等待一個用戶端連接配接而阻塞;對read()方法的調用可能會因為沒有資料可讀而阻塞,直到連接配接的另一端傳來新的資料。總的來說,建立/接收連接配接或讀寫資料等I/O調用,都可能無限期地阻塞等待,直到底層的網絡實作發生了什麼。慢速的,有損耗的網絡,或僅僅是簡單的網絡故障都可能導緻任意時間的延遲。然而不幸的是,在調用一個方法之前無法知道其是否阻塞。NIO的channel抽象的一個重要特征就是可以通過配置它的阻塞行為,以實作非阻塞式的信道。 channel.configureBlocking(false)
在非阻塞式信道上調用一個方法總是會立即傳回。這種調用的傳回值訓示了所請求的操作完成的程度。例如,在一個非阻塞式ServerSocketChannel上調用accept()方法,如果有連接配接請求來了,則傳回用戶端SocketChannel,否則傳回null。
對比傳統IO和NIO的代碼
/**
* 傳統IO
*/
public static void IO_FileInputStream(){
BufferedInputStream bis = null;
BufferedOutputStream bos = null;
try {
bis = new BufferedInputStream(new FileInputStream(new File("a.txt")));
bos = new BufferedOutputStream(new FileOutputStream(new File("b.txt")));
byte[] buffer = new byte[1024];
int len;
while ((len=bis.read(buffer))!=-1){
bos.write(buffer,0,len);
bos.flush();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if(bis != null){
bis.close();
}
if(bos != null){
bos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* NIO
*/
public static void NIO_FileInputStream(){
FileInputStream fis = null;
FileOutputStream fos = null;
try {
fis = new FileInputStream(new File("a.txt"));
fos = new FileOutputStream(new File("b.txt"));
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readBuffer = inChannel.read(buffer);
while (readBuffer!=-1){
buffer.flip();
while(buffer.hasRemaining()){
outChannel.write(buffer);
}
buffer.compact();
readBuffer = inChannel.read(buffer);
}
} catch (IOException e) {
e.printStackTrace();
}finally{
try{
if(fis != null){
fis.close();
}
if(fos != null){
fos.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
對比直接緩沖區與記憶體映射檔案操作
public class NioTest {
public static void main(String[] args) {
nioBuffer();
nioDirectBuffer();
}
private static void nioBuffer() {
long start = System.currentTimeMillis();
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
// 擷取通道
inChannel = FileChannel.open(Paths.get("D:\\test\\doneFile0comlog_20201117_01.log.gz"), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get("D:\\test\\doneFile0comlog_20201117_01.log.gz.bak"),StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 建立緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 将輸入通道的資料寫入緩沖區
while (inChannel.read(buffer)!=-1){
buffer.flip();
// 将緩沖區資料寫入輸出通道
outChannel.write(buffer);
// 清空緩沖區
buffer.clear();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally {
if (outChannel!=null){
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inChannel!=null){
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
long end = System.currentTimeMillis();
System.out.println("nioBuffer:"+(end-start));
}
private static void nioDirectBuffer() {
long start = System.currentTimeMillis();
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
// 擷取通道
inChannel = FileChannel.open(Paths.get("D:\\test\\doneFile0comlog_20201117_01.log.gz"), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get("D:\\test\\doneFile0comlog_20201117_01.log.gz.bak"),StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 進行記憶體映射檔案
MappedByteBuffer inMapBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMapBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, outChannel.size());
// 對緩沖區進行讀寫操作
byte[] b = new byte[inMapBuffer.limit()];
inMapBuffer.get(b);
outMapBuffer.put(b);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (outChannel != null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inChannel != null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
long end = System.currentTimeMillis();
System.out.println("nioDirectBuffer:" + (end - start));
}
}
----------------------------------
結果為:
nioBuffer:94
nioDirectBuffer:7
5 選擇器(Selector )
5.1 相關概念
NIO和BIO有一個非常大的差別是BIO是阻塞的,NIO是非阻塞的。阻塞與非阻塞是相對于網絡通信而言的。網絡通信就會有用戶端的概念。用戶端要向服務端發送資料的話必須建立連接配接,在這個過程中會做一些相關的事情,如accpet等待連接配接,然後用戶端write資料,服務端read資料。這些操作在傳統的套接字socket裡面都是阻塞式的。服務端一次隻能接待一個用戶端,不能一下多個的用戶端。也就是用戶端請求伺服器做些事情的時候,這個用戶端沒有處理完,其他用戶端的請求是進不來的。這種就是阻塞式的,是以服務端如果是這種模型的話,他的效率是非常低的。
要解決這種阻塞就要通過多線程的方式解決,但是線程資源是有限的,那就極大的限制了服務端他的處理效率。這就是經典的C10K問題,假如有C10K,就需要建立1W個程序。在NIO中非阻塞的網絡通信模型Selector就能解決這個問題。
系統線程的切換是消耗系統資源的,如果我們每一個連接配接都用一個線程來管理,資源的開銷會非常大,這個時候就可以用Selector。通過Selector可以實作一個線程管理多個Channel,如果你的應用打開了多個通道,但每個連接配接的流量都很低,使用Selector就會很友善。例如在一個聊天伺服器中。要使用Selector, 得向Selector注冊Channel,然後調用它的select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒。一旦這個方法傳回,線程就可以處理這些事件(如新的連接配接進來、資料接收等)。Selector 的意義在于隻通過一個線程就可以管理成千上萬個 I/O 請求, 相比使用多個線程,避免了線程上下文切換帶來的開銷。
Selector是怎麼工作的呢?有了Selector之後,Selector會把每一個用戶端和服務端傳輸資料的通道都到Selector上去注冊一下。也就是以後你想向服務端發送資料,通道先到Selector選擇器上注冊一下,那麼Selector就會監控目前channel的IO狀況(讀,寫,連接配接,接受處理等情況)隻有當某個channel上的資料完全準備就緒,Selector才會把這樣一個channel裡面的任務配置設定到服務端來進行運作。當我們用戶端要給服務端發送資料的時候,channel需要在Selector上進行注冊,當channel的資料完全準備就緒的時候Selector才會将任務配置設定給服務端的一個線程進行處理。這種非阻塞式的相較于阻塞式的就能非常好的利用cpu的資源,提高cpu的工作效率。
一個Selector執行個體可以同時檢查一組信道的I/O狀态。用專業術語來說,選擇器就是一個多路開關選擇器,因為一個選擇器能夠管理多個信道上的I/O操作。然而如果用傳統的方式來處理這麼多用戶端,使用的方法是循環地一個一個地去檢查所有的用戶端是否有I/O操作,如果目前用戶端有I/O操作,則可能把目前用戶端扔給一個線程池去處理,如果沒有I/O操作則進行下一個輪詢,當所有的用戶端都輪詢過了又接着從頭開始輪詢;這種方法是非常笨而且也非常浪費資源,因為大部分用戶端是沒有I/O操作,我們也要去檢查;而Selector就不一樣了,它在内部可以同時管理多個I/O,當一個信道有I/O操作的時候,他會通知Selector,Selector就是記住這個信道有I/O操作,并且知道是何種I/O操作,是讀呢?是寫呢?還是接受新的連接配接;是以如果使用Selector,它傳回的結果隻有兩種結果,一種是0,即在你調用的時刻沒有任何用戶端需要I/O操作,另一種結果是一組需要I/O操作的用戶端,這時你就根本不需要再檢查了,因為它傳回給你的肯定是你想要的。這樣一種通知的方式比那種主動輪詢的方式要高效得多。
使用選擇器,首先建立一個Selector執行個體(使用靜态工廠方法open())并将其注冊(register)到想要監控的信道上(通過channel的方法實作,而不是使用selector的方法)。最後,調用選擇器的select()方法。該方法會阻塞等待,直到有一個或更多的信道準備好了I/O操作或等待逾時。select()方法将傳回可進行I/O操作的信道數量。現在,在一個單獨的線程中,通過調用select()方法就能檢查多個信道是否準備好進行I/O操作。如果經過一段時間後仍然沒有信道準備好,select()方法就會傳回0,并允許程式繼續執行其他任務。
Selector 隻能與非阻塞模式下的通道一起使用(即需要實作 SelectableChannel 接口),否則會抛出 IllegalBlockingModeException 異常
5.2 Selector使用
(1)使用步驟
①建立Selector
②向Selector注冊通道,一個Selector可以注冊多個通道
③通過Selector選擇就緒的通道
// 通過open()方法建立 Selector
Selector selector = Selector.open();
// 建立一個通道,以ServerSockeetChannel為例,并且将通道設定為非阻塞模式
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
// 通過register()方法注冊通道
channel.register(selector, SelectionKey.OP_ACCEPT);
// 通過select()方法從多個通道中以輪詢的方式選擇已經準備就緒的通道。根據之前register()方法中設定的興趣,将可以進行對應操作的通道選擇出來
selector.select();
// 通過Selector的selectedKeys()方法獲得已選擇鍵集(selected-key set)
Set key = selector.selectedKeys();
//通過Iterator疊代器依次擷取key中的SelectionKey對象,并通過SelectionKey中的判斷方法執行對應的操作
Iterator<SelectionKey> iterator = key.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
//TODO
}
if (selectionKey.isReadable()){
//TODO
}
if(selectionKey.isWritable()&&key.isValid()){
//TODO
}
if (selectionKey.isConnectable()){
//TODO
}
iterator.remove();
}
(2)register()方法
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
register() 方法傳回SelectionKey對象,在SelectableChannel抽象類中定義如上。參數說明如下
Selector sel | 通道注冊的選擇器 |
int ops | interest集合,表示通過Selector監聽Channel時對什麼事件感興趣 |
Object att | 這是一個可選參數,在注冊通道時可以附加一個對象,用于之後便于識别某個通道 |
interest集合有下面4種操作
操作類型 | 值 | 描述 |
SelectionKey.OP_ACCEPT | 1<<4 | 接收Socket操作 |
SelectionKey.OP_READ | 1<<0 | 讀操作 |
SelectionKey.OP_WRITE | 1<<2 | 寫操作 |
SelectionKey.OP_CONNECT | 1<<3 | 接收Socket操作 |
注意:通道一般并不會同時支援這四種操作類型,我們可以通過 validOps() 方法擷取通道支援的類型。
(3)select()方法
select有2個重載方法:
①int select():選擇已準備就緒的通道,傳回值表示自上一次選擇後有多少新增通道準備就緒;當沒有通道準備就緒時,會一直阻塞下去,直到至少一個通道被選擇、該選擇器的 wakeup() 方法被調用或目前線程被中斷時。select() 方法實際上調用了 select(0L) 方法傳回
②int select(long timeout):選擇已準備就緒的通道;當沒有通道準備就緒時,會一直阻塞下去,直到至少一個通道被選擇、該選擇器的 wakeup() 方法被調用、目前線程被中斷或給定時間到期時傳回。
除此紫外還可以選擇 selectNow() 方法,該方法為非阻塞方法,無論有無通道就緒都會立即傳回。如果自前一次 select 操作後沒有新的通道準備就緒,則會立即傳回 0。
(4)SelectionKey
SelectionKey中有下面幾種判斷方法,與操作類型相對應:
boolean isReadable() | 是否可讀,是傳回 true |
boolean isWritable() | 是否可寫,是傳回 true |
boolean isConnectable() | 是否可連接配接,是傳回 true |
boolean isAcceptable() | 是否可接收,是傳回 true |
selectedKeys() 獲得的是已就緒的通道對應的 SelectionKey。如果想獲得該選擇器上所有通道對應的 SelectionKey,可以通過 keys() 方法擷取。
(5)使用例子
public class NIOServer {
public static void main(String[] args) throws IOException {
// 擷取通道,并設定為非阻塞
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
// 綁定端口号
ssChannel.bind(new InetSocketAddress(9999));
// 建立選擇器對象
Selector selector = Selector.open();
// 将通道注冊到選擇器上,那麼選擇器就會監聽通道的接收時間,如果有接收,并且接收準備就緒才開始進行下一步操作
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
// 通過輪訓的方式擷取選擇器上準備就緒的事件
// selector.select()>0表示至少有個selectionKey準備就緒
while (selector.select()>0){
// 擷取目前選擇器中所有注冊的選擇鍵
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 疊代擷取已經準備好的選擇鍵
while (iterator.hasNext()){
// 擷取已經準備就是的事件
SelectionKey sk = iterator.next();
if(sk.isAcceptable()){
// 調用accpet
SocketChannel sChannel = ssChannel.accept();
// 将sChannel設定為非阻塞的
sChannel.configureBlocking(false);
// 将該通道注冊到選擇器上
sChannel.register(selector,SelectionKey.OP_READ);
}else if(sk.isReadable()){
// 如果讀狀态已經準備就是,那麼開始讀取資料
// 擷取目前選擇器上讀狀态準備就緒的通道
SocketChannel sChannel = (SocketChannel)sk.channel();
// 建立緩沖區接收用戶端發送過來的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 讀取緩沖區的資料
int len =0;
while ((len=sChannel.read(buffer))>0){
buffer.flip();
System.out.println(new String(buffer.array(),0,len));
buffer.clear();
}
}
// 當selectKey使用完之後要溢出,否則會一直優先
iterator.remove();
}
}
}
}
public class NIOClient {
public static void main(String[] args) throws IOException {
// 擷取通道,預設是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
// 設定通道為非阻塞的
sChannel.configureBlocking(false);
// 建立緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello".getBytes());
// 将緩沖區資料寫入到sChannel中
buffer.flip();
sChannel.write(buffer);
buffer.clear();
sChannel.close();
}
}