非阻塞I/O是基于事件机制驱动设计的,通道会事先注册在一个监听器上,并且注明其所要关注的事件,当事件触发的时候会自动的由系统来调用这些符合条件的通道。
要获取一个服务器端通道,一般需要利用ServerSocketChannel.open()创建一个服务器端通道。然后利用该服务器端通道获取其对等的ServerSocket对象,设置该对象的监听端口。服务器端通道ServerSocketChannel会调用accept来监听所有的入站请求,默认情况下accept会阻塞的,可以设置会非阻塞,利用serverChannel.configureBlocking(false),此时就会马上返回一个null,所以必须要进行判断。对于accept获取的客户端通道需要设置其为非阻塞模式,因为这样可以让服务器不必在因为该客户端通道中的读操作和写操作而一直等待,进而去处理别的并发连接,当客户端读操作或写操作准备好后,根据事件驱动会进而来随机选择那些已经准备好的通道。
一个服务器通道会创建多个打开的非阻塞客户端通道,这里不必为每次的连接分配一个线程,而是创建一个Selector,使得程序能够对于所有准备好的连接进行循环处理。这里准备好的连接有服务器准备好了接受,客户端通道准备好了读或者写操作。
为每一个通道在监视通道的选择器上进行注册,并且要表面要所关注的动作。服务器一般所要关心的操作就是OP_ACCEPT,即是否准备好接受一个新连接。客户端通道一般所要关注的是是否准备好写入通道了,OP_WRITE。一般都会返回一个SelectionKey,该对象有一个Object类型的附件,可以保持一个将要写入网络的缓冲区对象。
选择器不断的循环,找到一个就绪的通道的集合。
利用选择器来选择一个就续通道,如果是服务器通道,则程序接受一个新的Socket通道,将其添加到选择器中。如果是一个客户端通道,程序就会向通道写入尽可能多的数据。一般一个线程可以同时就处理多个连接。
假如是客户端通道,就可以获取该返回的SelectionKey中的附件,将其转换为byteBuffer,进而来检测缓冲区是否有数据,如果有就写入到通道中。
流与缓冲区和通道的差异
1 利用非阻塞I/O进行交互的时候,不再获得Socket的输入流或者输出流了,而是都要从缓冲区中读写数据
2 流是基于字节的,而通道是基于块的,在利用非阻塞中都是将缓冲区中的数据写入到通道中进而传输,通道传输会传输缓冲区中的数据块。
服务器端非阻塞I/O服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
<code>package</code> <code>com.nio.server;</code>
<code>import</code> <code>java.io.IOException;</code>
<code>import</code> <code>java.net.InetSocketAddress;</code>
<code>import</code> <code>java.net.ServerSocket;</code>
<code>import</code> <code>java.nio.ByteBuffer;</code>
<code>import</code> <code>java.nio.channels.SelectionKey;</code>
<code>import</code> <code>java.nio.channels.Selector;</code>
<code>import</code> <code>java.nio.channels.ServerSocketChannel;</code>
<code>import</code> <code>java.nio.channels.SocketChannel;</code>
<code>import</code> <code>java.util.Iterator;</code>
<code>import</code> <code>java.util.Set;</code>
<code>public</code> <code>class</code> <code>ChargenServer {</code>
<code> </code><code>private</code> <code>static</code> <code>int</code> <code>DEFAULT_PORT=</code><code>2122</code><code>;</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>
<code> </code><code>// TODO Auto-generated method stub</code>
<code> </code><code>int</code> <code>port=DEFAULT_PORT;</code>
<code> </code><code>byte</code><code>[] rotation=</code><code>new</code> <code>byte</code><code>[</code><code>95</code><code>*</code><code>2</code><code>];</code>
<code> </code><code>for</code><code>( </code><code>byte</code> <code>i=</code><code>' '</code><code>;i<=</code><code>'~'</code><code>;i++)</code>
<code> </code><code>{</code>
<code> </code><code>rotation[i-</code><code>' '</code><code>]=i;</code>
<code> </code><code>rotation[i+</code><code>95</code><code>-</code><code>' '</code><code>]=i;</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>ServerSocketChannel serverChannel;</code>
<code> </code><code>Selector selector;</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>//创建一个服务器端通道,绑定到指定的端口</code>
<code> </code><code>serverChannel=ServerSocketChannel.open();</code>
<code> </code><code>ServerSocket ss=serverChannel.socket();</code>
<code> </code><code>InetSocketAddress address=</code><code>new</code> <code>InetSocketAddress(port);</code>
<code> </code><code>ss.bind(address);</code>
<code> </code>
<code> </code><code>//设置服务器通道为非阻塞即调用accept会立即返回,默认是阻塞的即调用accept会阻塞的</code>
<code> </code><code>serverChannel.configureBlocking(</code><code>false</code><code>);</code>
<code> </code><code>//获取选择器</code>
<code> </code><code>selector=Selector.open();</code>
<code> </code><code>//将服务器通道注册到选择器,监听的事件为准备好接受新的连接</code>
<code> </code><code>serverChannel.register(selector, SelectionKey.OP_ACCEPT);</code>
<code> </code><code>}</code><code>catch</code><code>(IOException e)</code>
<code> </code><code>System.out.println(e);</code>
<code> </code><code>return</code> <code>;</code>
<code> </code><code>while</code><code>(</code><code>true</code><code>)</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>//这里会一直阻塞,直到有一个连接准备好了</code>
<code> </code><code>selector.select();</code>
<code> </code><code>}</code><code>catch</code><code>(IOException e)</code>
<code> </code><code>{</code>
<code> </code><code>System.err.println(e);</code>
<code> </code><code>break</code><code>;</code><code>//发生异常退出循环</code>
<code> </code><code>}</code>
<code> </code><code>//当有通道已经准备好处理时,此时selectedKeys就会获取就绪的通道</code>
<code> </code><code>//该集合中每一项都是SelectionKey</code>
<code> </code><code>Set readyKeys=selector.selectedKeys();</code>
<code> </code><code>Iterator iterator=readyKeys.iterator();</code>
<code> </code><code>//处理就绪的通道</code>
<code> </code><code>while</code><code>(iterator.hasNext())</code>
<code> </code><code>SelectionKey key=(SelectionKey)iterator.next();</code>
<code> </code><code>//处理该SelectionKey,必须先移除掉</code>
<code> </code><code>iterator.remove();</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>//判断key的动作,这里是服务器端通道准备好接受连接</code>
<code> </code><code>//这里的接受,代表的是服务器端通道得到了连接请求,已经准备好来连接该请求</code>
<code> </code><code>if</code><code>(key.isAcceptable())</code>
<code> </code><code>{</code>
<code> </code><code>ServerSocketChannel server=(ServerSocketChannel)key.channel();</code>
<code> </code><code>//由于已经在注册的时候设置服务器端通道是非阻塞的,这里accept会立马返回的,</code>
<code> </code><code>//不过这里返回的client是已经申请连接的socket</code>
<code> </code><code>SocketChannel client=server.accept();</code>
<code> </code><code>System.out.println(</code><code>"Accepted connection from :"</code><code>+client);</code>
<code> </code><code>//设置客户端通道是非阻塞的</code>
<code> </code><code>client.configureBlocking(</code><code>false</code><code>);</code>
<code> </code><code>//客户端通道注册到选择器中,监听写入</code>
<code> </code><code>SelectionKey key2=client.register(selector, SelectionKey.OP_WRITE);</code>
<code> </code>
<code> </code><code>ByteBuffer buffer=ByteBuffer.allocate(</code><code>74</code><code>);</code>
<code> </code><code>buffer.put(rotation,</code><code>0</code><code>,</code><code>72</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\r'</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\n'</code><code>);</code>
<code> </code><code>//每行以\r\n结束</code>
<code> </code><code>//回绕到缓冲区开头再次进行</code>
<code> </code><code>buffer.flip();</code>
<code> </code><code>//将该缓冲区作为附件加在该key2中,这样下次就可以继续获取该缓冲区中的数据</code>
<code> </code><code>key2.attach(buffer);</code>
<code> </code><code>}</code>
<code> </code><code>//客户端通道准备好写入数据发送给客户端</code>
<code> </code><code>else</code> <code>if</code><code>(key.isWritable())</code>
<code> </code><code>//获取客户端通道</code>
<code> </code><code>SocketChannel client=(SocketChannel)key.channel();</code>
<code> </code><code>//获取该客户端通道的key中的附件,即已经准备好的缓冲区</code>
<code> </code><code>ByteBuffer buffer=(ByteBuffer)key.attachment();</code>
<code> </code><code>//缓冲区已经没有剩余数据需要写入通道的时候</code>
<code> </code><code>//判断当前缓冲区的位置与限制之间是否有数据,将未填满的数据继续填充</code>
<code> </code><code>if</code><code>(!buffer.hasRemaining())</code>
<code> </code><code>{</code>
<code> </code><code>//将缓冲区将要被读取或者写入的位置变为0,同时限度不变</code>
<code> </code><code>buffer.rewind();</code>
<code> </code><code>//得到缓冲区当前的位置,并且自增位置</code>
<code> </code><code>int</code> <code>first=buffer.get();</code>
<code> </code><code>//寻找rotation中的新的首字符位置</code>
<code> </code><code>int</code> <code>position=first-</code><code>' '</code><code>+</code><code>1</code><code>;</code>
<code> </code><code>//数据从rotation复制到缓冲区</code>
<code> </code><code>buffer.put(rotation,position,</code><code>72</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\r'</code><code>);</code>
<code> </code><code>buffer.put((</code><code>byte</code><code>)</code><code>'\n'</code><code>);</code>
<code> </code><code>buffer.flip();</code>
<code> </code><code>}</code>
<code> </code><code>//将数据发送给客户端</code>
<code> </code><code>client.write(buffer);</code>
<code> </code><code>}</code><code>catch</code><code>(IOException e)</code>
<code> </code><code>{</code>
<code> </code><code>key.cancel();</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code>}</code>
本文转自 zhao_xiao_long 51CTO博客,原文链接:http://blog.51cto.com/computerdragon/1197556