天天看點

java Nio 異步操作(四)channel

java nio 的核心組成部分:

1.channels

2.buffers

3.selectors

  我們首先來學習channels(java.nio.channels):

通道

  1)通道基礎

  通道(channel)是java.nio的第二個主要創新。它們既不是一個擴充也不是一項增強,而是全新、極好的java i/o示例,提供與i/o服務的直接連接配接。channel用于在位元組緩沖區和位于通道另一側的實體(通常是一個檔案或套接字)之間有效地傳輸資料。

  channel的jdk源碼:

1

2

3

4

5

6

<code>package</code> <code>java.nio.channels;</code>

<code>    </code><code>public</code>

<code>interface</code> <code>channel;</code>

<code>    </code><code>{</code>

<code>        </code><code>public</code>

<code>boolean</code> <code>isopen();</code>

<code>void</code> <code>close()</code><code>throws</code>

<code>ioexception;</code>

<code>    </code><code>}</code>

  與緩沖區不同,通道api主要由接口指定。不同的作業系統上通道實作(channel implementation)會有根本性的差異,是以通道api僅僅描述了可以做什麼。是以很自然地,通道實作經常使用作業系統的本地代碼。通道接口允許您以一種受控且可移植的方式來通路底層的i/o服務。

<code>  channel</code>是一個對象,可以通過它讀取和寫入資料。拿 nio 與原來的 i/o 做個比較,通道就像是流。所有資料都通過 <code>buffer</code> 對象來處理。您永遠不會将位元組直接寫入通道中,相反,您是将資料寫入包含一個或者多個位元組的緩沖區。同樣,您不會直接從通道中讀取位元組,而是将資料從通道讀入緩沖區,再從緩沖區擷取這個位元組。

java nio 的通道類似流,但又有些不同:

既可以從通道中讀取資料,又可以寫資料到通道。但流的讀寫通常是單向的。

通道可以異步地讀寫。

通道中的資料總是要先讀到一個 buffer,或者總是要從一個 buffer 中寫入。

基本上,所有的 io 在nio 中都從一個channel 開始。channel 有點象流。 資料可以從channel讀到buffer中,也可以從buffer 寫到channel中。這裡有個圖示: 

                                                      

java Nio 異步操作(四)channel

下面是java nio中的一些主要channel的實作: java.nio.channels包中的channel接口的已知實作類

filechannel:從檔案中讀寫資料。

datagramchannel:能通過udp讀寫網絡中的資料。

socketchannel:能通過tcp讀寫網絡中的資料。

serversocketchannel:可以監聽新進來的tcp連接配接,像web伺服器那樣。對每一個新進來的連接配接都會建立一個socketchannel。

正如你所看到的,這些通道涵蓋了udp 和 tcp 網絡io,以及檔案io。 

  2)通道api

  1.檔案通道API

  filechannel類可以實作常用的read,write以及scatter/gather操作,同時它也提供了很多專用于檔案的新方法。這些方法中的許多都是我們所熟悉的檔案操作。

  filechannel類的jdk源碼:

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

<code>abstract</code> <code>class</code>

<code>filechannel</code><code>extends</code>

<code>abstractchannel</code><code>implements</code>

<code>bytechannel, gatheringbytechannel, scatteringbytechannel</code>

<code>        </code><code>// this is a partial api listing</code>

<code>        </code><code>// all methods listed here can throw java.io.ioexception</code>

<code>abstract</code> <code>int</code> <code>read (bytebuffer dst,</code><code>long</code> <code>position);</code>

<code>abstract</code> <code>int</code> <code>write (bytebuffer src,</code><code>long</code> <code>position);</code>

<code>abstract</code> <code>long</code>

<code>size();</code>

<code>position();</code>

<code>abstract</code> <code>void</code>

<code>position (</code><code>long</code> <code>newposition);</code>

<code>truncate (</code><code>long</code> <code>size);</code>

<code>force (</code><code>boolean</code> <code>metadata);</code>

<code>final</code> <code>filelock lock();</code>

<code>abstract</code> <code>filelock lock (</code><code>long</code>

<code>position,</code><code>long</code> <code>size,</code><code>boolean</code> <code>shared);</code>

<code>final</code> <code>filelock trylock();</code>

<code>abstract</code> <code>filelock trylock (</code><code>long</code>

<code>abstract</code> <code>mappedbytebuffer map (mapmode mode,</code><code>long</code> <code>position,</code>

<code>long</code> <code>size);</code>

<code>static</code> <code>class</code> <code>mapmode;</code>

<code>static</code> <code>final</code> <code>mapmode read_only;</code>

<code>static</code> <code>final</code> <code>mapmode read_write;</code>

<code>static</code> <code>final</code> <code>mapmode private;</code>

<code>transferto (</code><code>long</code>

<code>position,</code><code>long</code> <code>count, writablebytechannel target);</code>

<code>transferfrom (readablebytechannel src,</code><code>long</code>

<code>position,</code><code>long</code> <code>count);</code>

<code>    </code><code>} </code>

  檔案通道總是阻塞式的,是以不能被置于非阻塞模式。現代作業系統都有複雜的緩存和預取機制,使得本地磁盤i/o操作延遲很少。網絡檔案系統一般而言延遲會多些,不過卻也因該優化而受益。面向流的i/o的非阻塞範例對于面向檔案的操作并無多大意義,這是由檔案i/o本質上的不同性質造成的。對于檔案i/o,最強大之處在于異步i/o(asynchronous i/o),它允許一個程序可以從作業系統請求一個或多個i/o操作而不必等待這些操作的完成。發起請求的程序之後會收到它請求的i/o操作已完成的通知。

  filechannel對象是線程安全(thread-safe)的。多個程序可以在同一個執行個體上并發調用方法而不會引起任何問題,不過并非所有的操作都是多線程的(multithreaded)。影響通道位置或者影響檔案大小的操作都是單線程的(single-threaded)。如果有一個線程已經在執行會影響通道位置或檔案大小的操作,那麼其他嘗試進行此類操作之一的線程必須等待。并發行為也會受到底層的作業系統或檔案系統影響。

  每個filechannel對象都同一個檔案描述符(file descriptor)有一對一的關系,是以上面列出的api方法與在您最喜歡的posix(可移植作業系統接口)相容的作業系統上的常用檔案i/o系統調用緊密對應也就不足為怪了。本質上講,randomaccessfile類提供的是同樣的抽象内容。在通道出現之前,底層的檔案操作都是通過randomaccessfile類的方法來實作的。filechannel模拟同樣的i/o服務,是以它的api自然也是很相似的。

  三者之間的方法對比:

  

filechannel

randomaccessfile

posix system call

read( )

write( )

size( )

length( )

fstat( )

position( )

getfilepointer( )

lseek( )

position (long newposition)

seek( )

truncate( )

setlength( )

ftruncate( )

force( )

getfd().sync( )

fsync( )

  2.socket通道

  新的socket通道類可以運作非阻塞模式并且是可選擇的。這兩個性能可以激活大程式(如網絡伺服器和中間件元件)巨大的可伸縮性和靈活性。本節中我們會看到,再也沒有為每個socket連接配接使用一個線程的必要了,也避免了管理大量線程所需的上下文交換總開銷。借助新的nio類,一個或幾個線程就可以管理成百上千的活動socket連接配接了并且隻有很少甚至可能沒有性能損失。所有的socket通道類(datagramchannel、socketchannel和serversocketchannel)都繼承了位于java.nio.channels.spi包中的abstractselectablechannel。這意味着我們可以用一個selector對象來執行socket通道的就緒選擇(readiness

selection)。

  請注意datagramchannel和socketchannel實作定義讀和寫功能的接口而serversocketchannel不實作。serversocketchannel負責監聽傳入的連接配接和建立新的socketchannel對象,它本身從不傳輸資料。

  在我們具體讨論每一種socket通道前,您應該了解socket和socket通道之間的關系。之前的章節中有寫道,通道是一個連接配接i/o服務導管并提供與該服務互動的方法。就某個socket而言,它不會再次實作與之對應的socket通道類中的socket協定api,而java.net中已經存在的socket通道都可以被大多數協定操作重複使用。

  全部socket通道類(datagramchannel、socketchannel和serversocketchannel)在被執行個體化時都會建立一個對等socket對象。這些是我們所熟悉的來自java.net的類(socket、serversocket和datagramsocket),它們已經被更新以識别通道。對等socket可以通過調用socket( )方法從一個通道上擷取。此外,這三個java.net類現在都有getchannel( )方法。

  socket通道将與通信協定相關的操作委托給相應的socket對象。socket的方法看起來好像在通道類中重複了一遍,但實際上通道類上的方法會有一些新的或者不同的行為。

  要把一個socket通道置于非阻塞模式,我們要依靠所有socket通道類的公有超級類:selectablechannel。就緒選擇(readiness selection)是一種可以用來查詢通道的機制,該查詢可以判斷通道是否準備好執行一個目标操作,如讀或寫。非阻塞i/o和可選擇性是緊密相連的,那也正是管理阻塞模式的api代碼要在selectablechannel超級類中定義的原因。

  設定或重新設定一個通道的阻塞模式是很簡單的,隻要調用configureblocking( )方法即可,傳遞參數值為true則設為阻塞模式,參數值為false值設為非阻塞模式。真的,就這麼簡單!您可以通過調用isblocking( )方法來判斷某個socket通道目前處于哪種模式。

  非阻塞socket通常被認為是服務端使用的,因為它們使同時管理很多socket通道變得更容易。但是,在用戶端使用一個或幾個非阻塞模式的socket通道也是有益處的,例如,借助非阻塞socket通道,gui程式可以專注于使用者請求并且同時維護與一個或多個伺服器的會話。在很多程式上,非阻塞模式都是有用的。

  偶爾地,我們也會需要防止socket通道的阻塞模式被更改。api中有一個blockinglock( )方法,該方法會傳回一個非透明的對象引用。傳回的對象是通道實作修改阻塞模式時内部使用的。隻有擁有此對象的鎖的線程才能更改通道的阻塞模式。

2.2.1 serversocketchannel

讓我們從最簡單的serversocketchannel來開始對socket通道類的讨論。以下是serversocketchannel的完整api:

<code>public</code> <code>abstract</code> <code>class</code> <code>serversocketchannel</code><code>extends</code> <code>abstractselectablechannel</code>

<code>  </code><code>{</code>

<code>      </code><code>public</code>

<code>static</code> <code>serversocketchannel open()</code><code>throws</code> <code>ioexception;</code>

<code>abstract</code> <code>serversocket socket();</code>

<code>abstract</code> <code>serversocket accept()</code><code>throws</code>

<code>final</code> <code>int</code> <code>validops();</code>

<code>  </code><code>}  </code>

serversocketchannel是一個基于通道的socket監聽器。它同我們所熟悉的java.net.serversocket執行相同的基本任務,不過它增加了通道語義,是以能夠在非阻塞模式下運作。

由于serversocketchannel沒有bind( )方法,是以有必要取出對等的socket并使用它來綁定到一個端口以開始監聽連接配接。我們也是使用對等serversocket的api來根據需要設定其他的socket選項。

同它的對等體java.net.serversocket一樣,serversocketchannel也有accept( )方法。一旦您建立了一個serversocketchannel并用對等socket綁定了它,然後您就可以在其中一個上調用accept( )。如果您選擇在serversocket上調用accept( )方法,那麼它會同任何其他的serversocket表現一樣的行為:總是阻塞并傳回一個java.net.socket對象。如果您選擇在serversocketchannel上調用accept(

)方法則會傳回socketchannel類型的對象,傳回的對象能夠在非阻塞模式下運作。

如果以非阻塞模式被調用,當沒有傳入連接配接在等待時,serversocketchannel.accept( )會立即傳回null。正是這種檢查連接配接而不阻塞的能力實作了可伸縮性并降低了複雜性。可選擇性也是以得到實作。我們可以使用一個選擇器執行個體來注冊一個serversocketchannel對象以實作新連接配接到達時自動通知的功能。以下代碼示範了如何使用一個非阻塞的accept( )方法:

25

26

27

28

29

30

31

32

33

<code>package</code> <code>com.ronsoft.books.nio.channels;</code>

<code>   </code><code>import</code>

<code>java.nio.bytebuffer;</code>

<code>java.nio.channels.serversocketchannel;</code>

<code>java.nio.channels.socketchannel;</code>

<code>java.net.inetsocketaddress;</code>

<code>   </code><code>public</code>

<code>class</code> <code>channelaccept</code>

<code>   </code><code>{</code>

<code>       </code><code>public</code>

<code>static</code> <code>final</code> <code>string greeting =</code><code>"hello i must be going.\r\n"</code><code>;</code>

<code>static</code> <code>void</code> <code>main (string [] argv)</code><code>throws</code> <code>exception</code>

<code>       </code><code>{</code>

<code>           </code><code>int</code>

<code>port =</code><code>1234</code><code>;</code><code>// default</code>

<code>           </code><code>if</code>

<code>(argv.length &gt;</code><code>0</code><code>) {</code>

<code>               </code><code>port = integer.parseint (argv [</code><code>0</code><code>]);</code>

<code>           </code><code>}</code>

<code>           </code><code>bytebuffer buffer = bytebuffer.wrap (greeting.getbytes());</code>

<code>           </code><code>serversocketchannel ssc = serversocketchannel.open();</code>

<code>           </code><code>ssc.socket().bind (</code><code>new</code>

<code>inetsocketaddress (port));</code>

<code>           </code><code>ssc.configureblocking (</code><code>false</code><code>);</code>

<code>           </code><code>while</code>

<code>(</code><code>true</code><code>) {</code>

<code>               </code><code>system.out.println (</code><code>"waiting for connections"</code><code>);</code>

<code>               </code><code>socketchannel sc = ssc.accept();</code>

<code>               </code><code>if</code>

<code>(sc ==</code><code>null</code><code>) {</code>

<code>                   </code><code>thread.sleep (</code><code>2000</code><code>);</code>

<code>               </code><code>}</code><code>else</code> <code>{</code>

<code>                   </code><code>system.out.println (</code><code>"incoming connection from: "</code>

<code>+ sc.socket().getremotesocketaddress());</code>

<code>                   </code><code>buffer.rewind();</code>

<code>                   </code><code>sc.write (buffer);</code>

<code>                   </code><code>sc.close();</code>

<code>               </code><code>}</code>

<code>       </code><code>}</code>

<code>   </code><code>}</code>

2.2.2 socketchannel

下面開始學習socketchannel,它是使用最多的socket通道類:

java nio中的socketchannel是一個連接配接到tcp網絡套接字的通道。可以通過以下2種方式建立socketchannel:

打開一個socketchannel并連接配接到網際網路上的某台伺服器。

一個新連接配接到達serversocketchannel時,會建立一個socketchannel。

下面是socketchannel的打開方式:

<code>socketchannel socketchannel = socketchannel.open();</code>

<code>socketchannel.connect(</code><code>new</code>

<code>inetsocketaddress(</code><code>"http://jenkov.com"</code><code>,</code><code>80</code><code>));</code>

關閉 socketchannel

當用完socketchannel之後調用socketchannel.close()關閉socketchannel:

<code>socketchannel.close();  </code>

要從socketchannel中讀取資料,調用一個read()的方法之一。以下是例子:

<code>bytebuffer buf = bytebuffer.allocate(</code><code>48</code><code>);</code>

<code>int</code> <code>bytesread = socketchannel.read(buf);</code>

  首先,配置設定一個buffer。從socketchannel讀取到的資料将會放到這個buffer中。然後,調用socketchannel.read()。該方法将資料從socketchannel 讀到buffer中。read()方法傳回的int值表示讀了多少位元組進buffer裡。如果傳回的是-1,表示已經讀到了流的末尾(連接配接關閉了)。

寫資料到socketchannel用的是socketchannel.write()方法,該方法以一個buffer作為參數。示例如下:

<code>string newdata =</code>

<code>"new string to write to file..."</code> <code>+ system.currenttimemillis();</code>

<code>buf.clear();</code>

<code>buf.put(newdata.getbytes());</code>

<code>buf.flip();</code>

<code>while</code><code>(buf.hasremaining()) {</code>

<code>    </code><code>channel.write(buf);</code>

<code>}</code>

  注意socketchannel.write()方法的調用是在一個while循環中的。write()方法無法保證能寫多少位元組到socketchannel。是以,我們重複調用write()直到buffer沒有要寫的位元組為止。

可以設定 socketchannel 為非阻塞模式(non-blocking mode).設定之後,就可以在異步模式下調用connect(), read() 和write()了。

如果socketchannel在非阻塞模式下,此時調用connect(),該方法可能在連接配接建立之前就傳回了。為了确定連接配接是否建立,可以調用finishconnect()的方法。像這樣:

<code>socketchannel.configureblocking(</code><code>false</code><code>);</code>

<code>while</code><code>(! socketchannel.finishconnect() ){</code>

<code>    </code><code>//wait, or do something else...</code>

write()

非阻塞模式下,write()方法在尚未寫出任何内容時可能就傳回了。是以需要在循環中調用write()。前面已經有例子了,這裡就不贅述了。

非阻塞模式下,read()方法在尚未讀取到任何資料時可能就傳回了。是以需要關注它的int傳回值,它會告訴你讀取了多少位元組。

非阻塞模式與選擇器搭配會工作的更好,通過将一或多個socketchannel注冊到selector,可以詢問選擇器哪個通道已經準備好了讀取,寫入等。selector與socketchannel的搭配使用會在後面詳講。

2.2.3 datagramchannel

最後一個socket通道是datagramchannel。正如socketchannel對應socket,serversocketchannel對應serversocket,每一個datagramchannel對象也有一個關聯的datagramsocket對象。不過原命名模式在此并未适用:“datagramsocketchannel”顯得有點笨拙,是以采用了簡潔的“datagramchannel”名稱。

正如socketchannel模拟連接配接導向的流協定(如tcp/ip),datagramchannel則模拟包導向的無連接配接協定(如udp/ip)。

datagramchannel是無連接配接的。每個資料報(datagram)都是一個自包含的實體,擁有它自己的目的位址及不依賴其他資料報的資料負載。與面向流的的socket不同,datagramchannel可以發送單獨的資料報給不同的目的位址。同樣,datagramchannel對象也可以接收來自任意位址的資料包。每個到達的資料報都含有關于它來自何處的資訊(源位址)。

最後給出一個基本的channel執行個體:

<code>randomaccessfile afile =</code><code>new</code> <code>randomaccessfile(</code><code>"data/nio-data.txt"</code><code>,</code><code>"rw"</code><code>);</code>

<code>filechannel inchannel = afile.getchannel();</code>

<code>int</code> <code>bytesread = inchannel.read(buf);</code>

<code>//讀取的位元組數,可能為零,如果該通道已到達流的末尾,則傳回 -1</code>

<code>while</code> <code>(bytesread != -</code><code>1</code><code>) { system.out.println(</code><code>"read "</code>

<code>+ bytesread);</code>

<code>//反轉緩沖區</code>

<code>while</code><code>(buf.hasremaining()){</code>

<code>system.out.print((</code><code>char</code><code>) buf.get());</code>

<code>//讀取此緩沖區目前位置的位元組,然後該位置遞增。</code>

<code>} buf.clear();</code>

<code>bytesread = inchannel.read(buf);</code>

<code>//從緩沖區讀取資料</code>

<code>} afile.close();</code>