非阻塞I/O是基于事件機制驅動設計的,通道會事先注冊在一個監聽器上,并且注明其所要關注的事件,當事件觸發的時候會自動的由系統來調用這些符合條件的通道。
要擷取一個伺服器端通道,一般需要利用ServerSocketChannel.open()建立一個伺服器端通道。然後利用該伺服器端通道擷取其對等的ServerSocket對象,設定該對象的監聽端口。伺服器端通道ServerSocketChannel會調用accept來監聽所有的入站請求,預設情況下accept會阻塞的,可以設定會非阻塞,利用serverChannel.configureBlocking(false),此時就會馬上傳回一個null,是以必須要進行判斷。對于accept擷取的用戶端通道需要設定其為非阻塞模式,因為這樣可以讓伺服器不必在因為該用戶端通道中的讀操作和寫操作而一直等待,進而去處理别的并發連接配接,當用戶端讀操作或寫操作準備好後,根據事件驅動會進而來随機選擇那些已經準備好的通道。
一個伺服器通道會建立多個打開的非阻塞用戶端通道,這裡不必為每次的連接配接配置設定一個線程,而是建立一個Selector,使得程式能夠對于所有準備好的連接配接進行循環處理。這裡準備好的連接配接有伺服器準備好了接受,用戶端通道準備好了讀或者寫操作。
為每一個通道在監視通道的選擇器上進行注冊,并且要表面要所關注的動作。伺服器一般所要關心的操作就是OP_ACCEPT,即是否準備好接受一個新連接配接。用戶端通道一般所要關注的是是否準備好寫入通道了,OP_WRITE。一般都會傳回一個SelectionKey,該對象有一個Object類型的附件,可以保持一個将要寫入網絡的緩沖區對象。
選擇器不斷的循環,找到一個就緒的通道的集合。
利用選擇器來選擇一個就續通道,如果是伺服器通道,則程式接受一個新的Socket通道,将其添加到選擇器中。如果是一個用戶端通道,程式就會向通道寫入盡可能多的資料。一般一個線程可以同時就處理多個連接配接。
假如是用戶端通道,就可以擷取該傳回的SelectionKey中的附件,将其轉換為byteBuffer,進而來檢測緩沖區是否有資料,如果有就寫入到通道中。
流與緩沖區和通道的差異
1 利用非阻塞I/O進行互動的時候,不再獲得Socket的輸入流或者輸出流了,而是都要從緩沖區中讀寫資料
2 流是基于位元組的,而通道是基于塊的,在利用非阻塞中都是将緩沖區中的資料寫入到通道中進而傳輸,通道傳輸會傳輸緩沖區中的資料塊。
伺服器端非阻塞I/O伺服器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
<code>package</code> <code>com.nio.server;</code>
<code>import</code> <code>java.io.IOException;</code>
<code>import</code> <code>java.net.InetSocketAddress;</code>
<code>import</code> <code>java.net.ServerSocket;</code>
<code>import</code> <code>java.nio.ByteBuffer;</code>
<code>import</code> <code>java.nio.channels.SelectionKey;</code>
<code>import</code> <code>java.nio.channels.Selector;</code>
<code>import</code> <code>java.nio.channels.ServerSocketChannel;</code>
<code>import</code> <code>java.nio.channels.SocketChannel;</code>
<code>import</code> <code>java.util.Iterator;</code>
<code>import</code> <code>java.util.Set;</code>
<code>public</code> <code>class</code> <code>ChargenServer {</code>
<code> </code><code>private</code> <code>static</code> <code>int</code> <code>DEFAULT_PORT=</code><code>2122</code><code>;</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>
<code> </code><code>// TODO Auto-generated method stub</code>
<code> </code><code>int</code> <code>port=DEFAULT_PORT;</code>
<code> </code><code>byte</code><code>[] rotation=</code><code>new</code> <code>byte</code><code>[</code><code>95</code><code>*</code><code>2</code><code>];</code>
<code> </code><code>for</code><code>( </code><code>byte</code> <code>i=</code><code>' '</code><code>;i<=</code><code>'~'</code><code>;i++)</code>
<code> </code><code>{</code>
<code> </code><code>rotation[i-</code><code>' '</code><code>]=i;</code>
<code> </code><code>rotation[i+</code><code>95</code><code>-</code><code>' '</code><code>]=i;</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>ServerSocketChannel serverChannel;</code>
<code> </code><code>Selector selector;</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>//建立一個伺服器端通道,綁定到指定的端口</code>
<code> </code><code>serverChannel=ServerSocketChannel.open();</code>
<code> </code><code>ServerSocket ss=serverChannel.socket();</code>
<code> </code><code>InetSocketAddress address=</code><code>new</code> <code>InetSocketAddress(port);</code>
<code> </code><code>ss.bind(address);</code>
<code> </code>
<code> </code><code>//設定伺服器通道為非阻塞即調用accept會立即傳回,預設是阻塞的即調用accept會阻塞的</code>
<code> </code><code>serverChannel.configureBlocking(</code><code>false</code><code>);</code>
<code> </code><code>//擷取選擇器</code>
<code> </code><code>selector=Selector.open();</code>
<code> </code><code>//将伺服器通道注冊到選擇器,監聽的事件為準備好接受新的連接配接</code>
<code> </code><code>serverChannel.register(selector, SelectionKey.OP_ACCEPT);</code>
<code> </code><code>}</code><code>catch</code><code>(IOException e)</code>
<code> </code><code>System.out.println(e);</code>
<code> </code><code>return</code> <code>;</code>
<code> </code><code>while</code><code>(</code><code>true</code><code>)</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>//這裡會一直阻塞,直到有一個連接配接準備好了</code>
<code> </code><code>selector.select();</code>
<code> </code><code>}</code><code>catch</code><code>(IOException e)</code>
<code> </code><code>{</code>
<code> </code><code>System.err.println(e);</code>
<code> </code><code>break</code><code>;</code><code>//發生異常退出循環</code>
<code> </code><code>}</code>
<code> </code><code>//當有通道已經準備好處理時,此時selectedKeys就會擷取就緒的通道</code>
<code> </code><code>//該集合中每一項都是SelectionKey</code>
<code> </code><code>Set readyKeys=selector.selectedKeys();</code>
<code> </code><code>Iterator iterator=readyKeys.iterator();</code>
<code> </code><code>//處理就緒的通道</code>
<code> </code><code>while</code><code>(iterator.hasNext())</code>
<code> </code><code>SelectionKey key=(SelectionKey)iterator.next();</code>
<code> </code><code>//處理該SelectionKey,必須先移除掉</code>
<code> </code><code>iterator.remove();</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>//判斷key的動作,這裡是伺服器端通道準備好接受連接配接</code>
<code> </code><code>//這裡的接受,代表的是伺服器端通道得到了連接配接請求,已經準備好來連接配接該請求</code>
<code> </code><code>if</code><code>(key.isAcceptable())</code>
<code> </code><code>{</code>
<code> </code><code>ServerSocketChannel server=(ServerSocketChannel)key.channel();</code>
<code> </code><code>//由于已經在注冊的時候設定伺服器端通道是非阻塞的,這裡accept會立馬傳回的,</code>
<code> </code><code>//不過這裡傳回的client是已經申請連接配接的socket</code>
<code> </code><code>SocketChannel client=server.accept();</code>
<code> </code><code>System.out.println(</code><code>"Accepted connection from :"</code><code>+client);</code>
<code> </code><code>//設定用戶端通道是非阻塞的</code>
<code> </code><code>client.configureBlocking(</code><code>false</code><code>);</code>
<code> </code><code>//用戶端通道注冊到選擇器中,監聽寫入</code>
<code> </code><code>SelectionKey key2=client.register(selector, SelectionKey.OP_WRITE);</code>
<code> </code>
<code> </code><code>ByteBuffer buffer=ByteBuffer.allocate(</code><code>74</code><code>);</code>
<code> </code><code>buffer.put(rotation,</code><code>0</code><code>,</code><code>72</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\r'</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\n'</code><code>);</code>
<code> </code><code>//每行以\r\n結束</code>
<code> </code><code>//回繞到緩沖區開頭再次進行</code>
<code> </code><code>buffer.flip();</code>
<code> </code><code>//将該緩沖區作為附件加在該key2中,這樣下次就可以繼續擷取該緩沖區中的資料</code>
<code> </code><code>key2.attach(buffer);</code>
<code> </code><code>}</code>
<code> </code><code>//用戶端通道準備好寫入資料發送給用戶端</code>
<code> </code><code>else</code> <code>if</code><code>(key.isWritable())</code>
<code> </code><code>//擷取用戶端通道</code>
<code> </code><code>SocketChannel client=(SocketChannel)key.channel();</code>
<code> </code><code>//擷取該用戶端通道的key中的附件,即已經準備好的緩沖區</code>
<code> </code><code>ByteBuffer buffer=(ByteBuffer)key.attachment();</code>
<code> </code><code>//緩沖區已經沒有剩餘資料需要寫入通道的時候</code>
<code> </code><code>//判斷目前緩沖區的位置與限制之間是否有資料,将未填滿的資料繼續填充</code>
<code> </code><code>if</code><code>(!buffer.hasRemaining())</code>
<code> </code><code>{</code>
<code> </code><code>//将緩沖區将要被讀取或者寫入的位置變為0,同時限度不變</code>
<code> </code><code>buffer.rewind();</code>
<code> </code><code>//得到緩沖區目前的位置,并且自增位置</code>
<code> </code><code>int</code> <code>first=buffer.get();</code>
<code> </code><code>//尋找rotation中的新的首字元位置</code>
<code> </code><code>int</code> <code>position=first-</code><code>' '</code><code>+</code><code>1</code><code>;</code>
<code> </code><code>//資料從rotation複制到緩沖區</code>
<code> </code><code>buffer.put(rotation,position,</code><code>72</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\r'</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\n'</code><code>);</code>
<code> </code><code>buffer.flip();</code>
<code> </code><code>}</code>
<code> </code><code>//将資料發送給用戶端</code>
<code> </code><code>client.write(buffer);</code>
<code> </code><code>}</code><code>catch</code><code>(IOException e)</code>
<code> </code><code>{</code>
<code> </code><code>key.cancel();</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code>}</code>
本文轉自 zhao_xiao_long 51CTO部落格,原文連結:http://blog.51cto.com/computerdragon/1197556