我們首先還是看下如何對timeclient進行改造:
<code>01</code>
<code>public</code> <code>class</code> <code>timeclient {</code>
<code>02</code>
<code>03</code>
<code> </code><code>/**</code>
<code>04</code>
<code> </code><code>* @param args</code>
<code>05</code>
<code> </code><code>*/</code>
<code>06</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
<code>07</code>
<code>08</code>
<code> </code><code>int</code> <code>port = </code><code>8080</code><code>;</code>
<code>09</code>
<code> </code><code>if</code> <code>(args != </code><code>null</code> <code>&& args.length > </code><code>0</code><code>) {</code>
<code>10</code>
<code> </code><code>try</code> <code>{</code>
<code>11</code>
<code> </code><code>port = integer.valueof(args[</code><code>0</code><code>]);</code>
<code>12</code>
<code> </code><code>} </code><code>catch</code> <code>(numberformatexception e) {</code>
<code>13</code>
<code> </code><code>// 采用預設值</code>
<code>14</code>
<code> </code><code>}</code>
<code>15</code>
<code> </code><code>}</code>
<code>16</code>
<code> </code><code>new</code> <code>thread(</code><code>new</code> <code>timeclienthandle(</code><code>"127.0.0.1"</code><code>, port), </code><code>"timeclient-001"</code><code>)</code>
<code>17</code>
<code> </code><code>.start();</code>
<code>18</code>
<code>19</code>
<code>}</code>
與之前唯一不同的就是我們通過建立timeclienthandle線程來處理異步連接配接、讀寫操作,由于timeclient非常簡單且變更不大,我們重點分析timeclienthandle,代碼如下:
<code>001</code>
<code>public</code> <code>class</code> <code>timeclienthandle </code><code>implements</code> <code>runnable {</code>
<code>002</code>
<code> </code><code>private</code> <code>string host;</code>
<code>003</code>
<code> </code><code>private</code> <code>int</code> <code>port;</code>
<code>004</code>
<code> </code><code>private</code> <code>selector selector;</code>
<code>005</code>
<code> </code><code>private</code> <code>socketchannel socketchannel;</code>
<code>006</code>
<code> </code><code>private</code> <code>volatile</code> <code>boolean</code> <code>stop;</code>
<code>007</code>
<code>008</code>
<code> </code><code>public</code> <code>timeclienthandle(string host, </code><code>int</code> <code>port) {</code>
<code>009</code>
<code> </code><code>this</code><code>.host = host == </code><code>null</code> <code>? </code><code>"127.0.0.1"</code> <code>: host;</code>
<code>010</code>
<code> </code><code>this</code><code>.port = port;</code>
<code>011</code>
<code> </code><code>try</code> <code>{</code>
<code>012</code>
<code> </code><code>selector = selector.open();</code>
<code>013</code>
<code> </code><code>socketchannel = socketchannel.open();</code>
<code>014</code>
<code> </code><code>socketchannel.configureblocking(</code><code>false</code><code>);</code>
<code>015</code>
<code> </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>
<code>016</code>
<code> </code><code>e.printstacktrace();</code>
<code>017</code>
<code> </code><code>system.exit(</code><code>1</code><code>);</code>
<code>018</code>
<code>019</code>
<code>020</code>
<code>021</code>
<code> </code><code>/*</code>
<code>022</code>
<code> </code><code>* (non-javadoc)</code>
<code>023</code>
<code> </code><code>*</code>
<code>024</code>
<code> </code><code>* @see java.lang.runnable#run()</code>
<code>025</code>
<code>026</code>
<code> </code><code>@override</code>
<code>027</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code>028</code>
<code>029</code>
<code> </code><code>doconnect();</code>
<code>030</code>
<code>031</code>
<code>032</code>
<code>033</code>
<code>034</code>
<code> </code><code>while</code> <code>(!stop) {</code>
<code>035</code>
<code>036</code>
<code> </code><code>selector.select(</code><code>1000</code><code>);</code>
<code>037</code>
<code> </code><code>set<selectionkey> selectedkeys = selector.selectedkeys();</code>
<code>038</code>
<code> </code><code>iterator<selectionkey> it = selectedkeys.iterator();</code>
<code>039</code>
<code> </code><code>selectionkey key = </code><code>null</code><code>;</code>
<code>040</code>
<code> </code><code>while</code> <code>(it.hasnext()) {</code>
<code>041</code>
<code> </code><code>key = it.next();</code>
<code>042</code>
<code> </code><code>it.remove();</code>
<code>043</code>
<code> </code><code>try</code> <code>{</code>
<code>044</code>
<code> </code><code>handleinput(key);</code>
<code>045</code>
<code> </code><code>} </code><code>catch</code> <code>(exception e) {</code>
<code>046</code>
<code> </code><code>if</code> <code>(key != </code><code>null</code><code>) {</code>
<code>047</code>
<code> </code><code>key.cancel();</code>
<code>048</code>
<code> </code><code>if</code> <code>(key.channel() != </code><code>null</code><code>)</code>
<code>049</code>
<code> </code><code>key.channel().close();</code>
<code>050</code>
<code> </code><code>}</code>
<code>051</code>
<code>052</code>
<code>053</code>
<code> </code><code>} </code><code>catch</code> <code>(exception e) {</code>
<code>054</code>
<code>055</code>
<code>056</code>
<code>057</code>
<code>058</code>
<code>059</code>
<code> </code><code>// 多路複用器關閉後,所有注冊在上面的channel和pipe等資源都會被自動去注冊并關閉,是以不需要重複釋放資源</code>
<code>060</code>
<code> </code><code>if</code> <code>(selector != </code><code>null</code><code>)</code>
<code>061</code>
<code>062</code>
<code> </code><code>selector.close();</code>
<code>063</code>
<code> </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>
<code>064</code>
<code>065</code>
<code>066</code>
<code>067</code>
<code>068</code>
<code> </code><code>private</code> <code>void</code> <code>handleinput(selectionkey key) </code><code>throws</code> <code>ioexception {</code>
<code>069</code>
<code>070</code>
<code> </code><code>if</code> <code>(key.isvalid()) {</code>
<code>071</code>
<code> </code><code>// 判斷是否連接配接成功</code>
<code>072</code>
<code> </code><code>socketchannel sc = (socketchannel) key.channel();</code>
<code>073</code>
<code> </code><code>if</code> <code>(key.isconnectable()) {</code>
<code>074</code>
<code> </code><code>if</code> <code>(sc.finishconnect()) {</code>
<code>075</code>
<code> </code><code>sc.register(selector, selectionkey.op_read);</code>
<code>076</code>
<code> </code><code>dowrite(sc);</code>
<code>077</code>
<code> </code><code>} </code><code>else</code>
<code>078</code>
<code> </code><code>system.exit(</code><code>1</code><code>);</code><code>// 連接配接失敗,程序退出</code>
<code>079</code>
<code>080</code>
<code> </code><code>if</code> <code>(key.isreadable()) {</code>
<code>081</code>
<code> </code><code>bytebuffer readbuffer = bytebuffer.allocate(</code><code>1024</code><code>);</code>
<code>082</code>
<code> </code><code>int</code> <code>readbytes = sc.read(readbuffer);</code>
<code>083</code>
<code> </code><code>if</code> <code>(readbytes > </code><code>0</code><code>) {</code>
<code>084</code>
<code> </code><code>readbuffer.flip();</code>
<code>085</code>
<code> </code><code>byte</code><code>[] bytes = </code><code>new</code> <code>byte</code><code>[readbuffer.remaining()];</code>
<code>086</code>
<code> </code><code>readbuffer.get(bytes);</code>
<code>087</code>
<code> </code><code>string body = </code><code>new</code> <code>string(bytes, </code><code>"utf-8"</code><code>);</code>
<code>088</code>
<code> </code><code>system.out.println(</code><code>"now is : "</code> <code>+ body);</code>
<code>089</code>
<code> </code><code>this</code><code>.stop = </code><code>true</code><code>;</code>
<code>090</code>
<code> </code><code>} </code><code>else</code> <code>if</code> <code>(readbytes < </code><code>0</code><code>) {</code>
<code>091</code>
<code> </code><code>// 對端鍊路關閉</code>
<code>092</code>
<code> </code><code>key.cancel();</code>
<code>093</code>
<code> </code><code>sc.close();</code>
<code>094</code>
<code>095</code>
<code> </code><code>; </code><code>// 讀到0位元組,忽略</code>
<code>096</code>
<code>097</code>
<code>098</code>
<code>099</code>
<code>100</code>
<code>101</code>
<code> </code><code>private</code> <code>void</code> <code>doconnect() </code><code>throws</code> <code>ioexception {</code>
<code>102</code>
<code> </code><code>// 如果直接連接配接成功,則注冊到多路複用器上,發送請求消息,讀應答</code>
<code>103</code>
<code> </code><code>if</code> <code>(socketchannel.connect(</code><code>new</code> <code>inetsocketaddress(host, port))) {</code>
<code>104</code>
<code> </code><code>socketchannel.register(selector, selectionkey.op_read);</code>
<code>105</code>
<code> </code><code>dowrite(socketchannel);</code>
<code>106</code>
<code> </code><code>} </code><code>else</code>
<code>107</code>
<code> </code><code>socketchannel.register(selector, selectionkey.op_connect);</code>
<code>108</code>
<code>109</code>
<code>110</code>
<code> </code><code>private</code> <code>void</code> <code>dowrite(socketchannel sc) </code><code>throws</code> <code>ioexception {</code>
<code>111</code>
<code> </code><code>byte</code><code>[] req = </code><code>"query time order"</code><code>.getbytes();</code>
<code>112</code>
<code> </code><code>bytebuffer writebuffer = bytebuffer.allocate(req.length);</code>
<code>113</code>
<code> </code><code>writebuffer.put(req);</code>
<code>114</code>
<code> </code><code>writebuffer.flip();</code>
<code>115</code>
<code> </code><code>sc.write(writebuffer);</code>
<code>116</code>
<code> </code><code>if</code> <code>(!writebuffer.hasremaining())</code>
<code>117</code>
<code> </code><code>system.out.println(</code><code>"send order 2 server succeed."</code><code>);</code>
<code>118</code>
<code>119</code>
與服務端類似,我們通過對關鍵步驟的源碼進行分析和解讀,讓大家深入了解如何建立nio用戶端以及如何使用nio的api。
8-19行構造函數用于初始化nio的多路複用器和socketchannel對象,需要注意的是建立socketchannel之後,需要将其設定為異步非阻塞模式。就像在2.3.3章節中所講的,我們可以設定socketchannel的tcp參數,例如接收和發送的tcp緩沖區大小
28-33行用于發送連接配接請求,作為示例,連接配接是成功的,是以不需要做重連操作,是以将其放到循環之前。下面我們具體看看doconnect的實作,代碼跳到第116-123行,首先對socketchannel的connect()操作進行判斷,如果連接配接成功,則将socketchannel注冊到多路複用器selector上,注冊selectionkey.op_read,如果沒有直接連接配接成功,說明服務端沒有傳回tcp握手應答消息,這并不代表連接配接失敗,我們需要将socketchannel注冊到多路複用器selector上,注冊selectionkey.op_connect,當服務端傳回tcp syn-ack消息後,selector就能夠輪詢到這個socketchannel處于連接配接就緒狀态
4-72行在循環體中輪詢多路複用器selector,當有就緒的channel時,執行第59行的handleinput(key)方法,下面我們就對handleinput方法進行分析。
跳到第68行,我們首先對selectionkey進行判斷,看它處于什麼狀态。如果是處于連接配接狀态,說明服務端已經傳回ack應答消息,我們需要對連接配接結果進行判斷,調用socketchannel的finishconnect()方法,如果傳回值為true,說明用戶端連接配接成功,如果傳回值為false或者直接抛出ioexception,說明連接配接失敗。在本例程中,傳回值為true,說明連接配接成功。将socketchannel注冊到多路複用器上,注冊selectionkey.op_read操作位,監聽網絡讀操作。然後發送請求消息給服務端,下面我們對dowrite(sc)進行分析。代碼跳到110行,我們構造請求消息體,然後對其編碼,寫入到發送緩沖區中,最後調用socketchannel的write方法進行發送,由于發送是異步的,是以會存在“半包寫”問題,此處不再贅述。最後通過hasremaining()方法對發送結果進行判斷,如果緩沖區中的消息全部發送完成,列印”send order 2 server succeed.
代碼傳回第80行,我們繼續分析下用戶端是如何讀取時間伺服器應答消息的。如果用戶端接收到了服務端的應答消息,則socketchannel是可讀的,由于無法事先判斷應答碼流的大小,我們就預配置設定1m的接收緩沖區用于讀取應答消息,調用socketchannel的read()方法進行異步讀取操作,由于是異步操作,是以必須對讀取的結果進行判斷,這部分的處理邏輯已經在2.3.3章節詳細介紹過,此處不再贅述。如果讀取到了消息,則對消息進行解碼,最後列印結果。執行完成後将stop置為true,線程退出循環
線程退出循環後,我們需要對連接配接資源進行釋放,以實作“優雅退出”。60-66行用于多路複用器的資源釋放,由于多路複用器上可能注冊成千上萬的channel或者pipe,如果一一對這些資源進行釋放顯然不合适。是以,jdk底層會自動釋放所有跟此多路複用器關聯的資源,jdk的api doc如下:
多路複用器selector的資源釋放
到此為止,我們已經将時間伺服器通過nio完成了改造,并對源碼進行了分析和解讀,下面分别執行時間伺服器的服務端和用戶端,看執行結果。
服務端執行結果:
nio時間伺服器服務端執行結果
用戶端執行結果:
nio時間伺服器用戶端執行結果
通過源碼對比分析,我們發現nio程式設計難度确實比同步阻塞bio大很多,我們的nio例程并沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼将會更加複雜。nio代碼既然這麼複雜,為什麼它的應用卻越來越廣泛呢,使用nio程式設計的優點總結如下:
1) 用戶端發起的連接配接操作是異步的,可以通過在多路複用器注冊op_connect等待後續結果,不需要像之前的用戶端那樣被同步阻塞;
2) socketchannel的讀寫操作都是異步的,如果沒有可讀寫的資料它不會同步等待,直接傳回,這樣io通信線程就可以處理其它的鍊路,不需要同步等待這個鍊路可用;
3) 線程模型的優化:由于jdk的selector在linux等主流作業系統上通過epoll實作,它沒有連接配接句柄數的限制(隻受限于作業系統的最大句柄數或者對單個程序的句柄限制),這意味着一個selector線程可以同時處理成千上萬個用戶端連接配接,而且性能不會随着用戶端的增加而線性下降,是以,它非常适合做高性能、高負載的網絡伺服器。
jdk1.7更新了nio類庫,更新後的nio類庫被稱為nio2.0,引人注目的是java正式提供了異步檔案io操作,同時提供了與unix網絡程式設計事件驅動io對應的aio,下面的2.4章節我們學習下如何利用nio2.0編寫aio程式,我們還是以時間伺服器為例進行講解。