天天看點

《Netty 權威指南》—— NIO建立的TimeClient源碼分析

我們首先還是看下如何對timeclient進行改造:

<code>01</code>

<code>public</code> <code>class</code> <code>timeclient {</code>

<code>02</code>

<code>03</code>

<code>    </code><code>/**</code>

<code>04</code>

<code>     </code><code>* @param args</code>

<code>05</code>

<code>     </code><code>*/</code>

<code>06</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>07</code>

<code>08</code>

<code>    </code><code>int</code> <code>port = </code><code>8080</code><code>;</code>

<code>09</code>

<code>    </code><code>if</code> <code>(args != </code><code>null</code> <code>&amp;&amp; args.length &gt; </code><code>0</code><code>) {</code>

<code>10</code>

<code>        </code><code>try</code> <code>{</code>

<code>11</code>

<code>        </code><code>port = integer.valueof(args[</code><code>0</code><code>]);</code>

<code>12</code>

<code>        </code><code>} </code><code>catch</code> <code>(numberformatexception e) {</code>

<code>13</code>

<code>        </code><code>// 采用預設值</code>

<code>14</code>

<code>        </code><code>}</code>

<code>15</code>

<code>    </code><code>}</code>

<code>16</code>

<code>    </code><code>new</code> <code>thread(</code><code>new</code> <code>timeclienthandle(</code><code>"127.0.0.1"</code><code>, port), </code><code>"timeclient-001"</code><code>)</code>

<code>17</code>

<code>        </code><code>.start();</code>

<code>18</code>

<code>19</code>

<code>}</code>

與之前唯一不同的就是我們通過建立timeclienthandle線程來處理異步連接配接、讀寫操作,由于timeclient非常簡單且變更不大,我們重點分析timeclienthandle,代碼如下:

<code>001</code>

<code>public</code> <code>class</code> <code>timeclienthandle </code><code>implements</code> <code>runnable {</code>

<code>002</code>

<code>    </code><code>private</code> <code>string host;</code>

<code>003</code>

<code>    </code><code>private</code> <code>int</code> <code>port;</code>

<code>004</code>

<code>    </code><code>private</code> <code>selector selector;</code>

<code>005</code>

<code>    </code><code>private</code> <code>socketchannel socketchannel;</code>

<code>006</code>

<code>    </code><code>private</code> <code>volatile</code> <code>boolean</code> <code>stop;</code>

<code>007</code>

<code>008</code>

<code>    </code><code>public</code> <code>timeclienthandle(string host, </code><code>int</code> <code>port) {</code>

<code>009</code>

<code>    </code><code>this</code><code>.host = host == </code><code>null</code> <code>? </code><code>"127.0.0.1"</code> <code>: host;</code>

<code>010</code>

<code>    </code><code>this</code><code>.port = port;</code>

<code>011</code>

<code>    </code><code>try</code> <code>{</code>

<code>012</code>

<code>        </code><code>selector = selector.open();</code>

<code>013</code>

<code>        </code><code>socketchannel = socketchannel.open();</code>

<code>014</code>

<code>        </code><code>socketchannel.configureblocking(</code><code>false</code><code>);</code>

<code>015</code>

<code>    </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>016</code>

<code>        </code><code>e.printstacktrace();</code>

<code>017</code>

<code>        </code><code>system.exit(</code><code>1</code><code>);</code>

<code>018</code>

<code>019</code>

<code>020</code>

<code>021</code>

<code>    </code><code>/*</code>

<code>022</code>

<code>     </code><code>* (non-javadoc)</code>

<code>023</code>

<code>     </code><code>*</code>

<code>024</code>

<code>     </code><code>* @see java.lang.runnable#run()</code>

<code>025</code>

<code>026</code>

<code>    </code><code>@override</code>

<code>027</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>028</code>

<code>029</code>

<code>        </code><code>doconnect();</code>

<code>030</code>

<code>031</code>

<code>032</code>

<code>033</code>

<code>034</code>

<code>    </code><code>while</code> <code>(!stop) {</code>

<code>035</code>

<code>036</code>

<code>        </code><code>selector.select(</code><code>1000</code><code>);</code>

<code>037</code>

<code>        </code><code>set&lt;selectionkey&gt; selectedkeys = selector.selectedkeys();</code>

<code>038</code>

<code>        </code><code>iterator&lt;selectionkey&gt; it = selectedkeys.iterator();</code>

<code>039</code>

<code>        </code><code>selectionkey key = </code><code>null</code><code>;</code>

<code>040</code>

<code>        </code><code>while</code> <code>(it.hasnext()) {</code>

<code>041</code>

<code>            </code><code>key = it.next();</code>

<code>042</code>

<code>            </code><code>it.remove();</code>

<code>043</code>

<code>            </code><code>try</code> <code>{</code>

<code>044</code>

<code>            </code><code>handleinput(key);</code>

<code>045</code>

<code>            </code><code>} </code><code>catch</code> <code>(exception e) {</code>

<code>046</code>

<code>            </code><code>if</code> <code>(key != </code><code>null</code><code>) {</code>

<code>047</code>

<code>                </code><code>key.cancel();</code>

<code>048</code>

<code>                </code><code>if</code> <code>(key.channel() != </code><code>null</code><code>)</code>

<code>049</code>

<code>                </code><code>key.channel().close();</code>

<code>050</code>

<code>            </code><code>}</code>

<code>051</code>

<code>052</code>

<code>053</code>

<code>        </code><code>} </code><code>catch</code> <code>(exception e) {</code>

<code>054</code>

<code>055</code>

<code>056</code>

<code>057</code>

<code>058</code>

<code>059</code>

<code>    </code><code>// 多路複用器關閉後,所有注冊在上面的channel和pipe等資源都會被自動去注冊并關閉,是以不需要重複釋放資源</code>

<code>060</code>

<code>    </code><code>if</code> <code>(selector != </code><code>null</code><code>)</code>

<code>061</code>

<code>062</code>

<code>        </code><code>selector.close();</code>

<code>063</code>

<code>        </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>064</code>

<code>065</code>

<code>066</code>

<code>067</code>

<code>068</code>

<code>    </code><code>private</code> <code>void</code> <code>handleinput(selectionkey key) </code><code>throws</code> <code>ioexception {</code>

<code>069</code>

<code>070</code>

<code>    </code><code>if</code> <code>(key.isvalid()) {</code>

<code>071</code>

<code>        </code><code>// 判斷是否連接配接成功</code>

<code>072</code>

<code>        </code><code>socketchannel sc = (socketchannel) key.channel();</code>

<code>073</code>

<code>        </code><code>if</code> <code>(key.isconnectable()) {</code>

<code>074</code>

<code>        </code><code>if</code> <code>(sc.finishconnect()) {</code>

<code>075</code>

<code>            </code><code>sc.register(selector, selectionkey.op_read);</code>

<code>076</code>

<code>            </code><code>dowrite(sc);</code>

<code>077</code>

<code>        </code><code>} </code><code>else</code>

<code>078</code>

<code>            </code><code>system.exit(</code><code>1</code><code>);</code><code>// 連接配接失敗,程序退出</code>

<code>079</code>

<code>080</code>

<code>        </code><code>if</code> <code>(key.isreadable()) {</code>

<code>081</code>

<code>        </code><code>bytebuffer readbuffer = bytebuffer.allocate(</code><code>1024</code><code>);</code>

<code>082</code>

<code>        </code><code>int</code> <code>readbytes = sc.read(readbuffer);</code>

<code>083</code>

<code>        </code><code>if</code> <code>(readbytes &gt; </code><code>0</code><code>) {</code>

<code>084</code>

<code>            </code><code>readbuffer.flip();</code>

<code>085</code>

<code>            </code><code>byte</code><code>[] bytes = </code><code>new</code> <code>byte</code><code>[readbuffer.remaining()];</code>

<code>086</code>

<code>            </code><code>readbuffer.get(bytes);</code>

<code>087</code>

<code>            </code><code>string body = </code><code>new</code> <code>string(bytes, </code><code>"utf-8"</code><code>);</code>

<code>088</code>

<code>            </code><code>system.out.println(</code><code>"now is : "</code> <code>+ body);</code>

<code>089</code>

<code>            </code><code>this</code><code>.stop = </code><code>true</code><code>;</code>

<code>090</code>

<code>        </code><code>} </code><code>else</code> <code>if</code> <code>(readbytes &lt; </code><code>0</code><code>) {</code>

<code>091</code>

<code>            </code><code>// 對端鍊路關閉</code>

<code>092</code>

<code>            </code><code>key.cancel();</code>

<code>093</code>

<code>            </code><code>sc.close();</code>

<code>094</code>

<code>095</code>

<code>            </code><code>; </code><code>// 讀到0位元組,忽略</code>

<code>096</code>

<code>097</code>

<code>098</code>

<code>099</code>

<code>100</code>

<code>101</code>

<code>    </code><code>private</code> <code>void</code> <code>doconnect() </code><code>throws</code> <code>ioexception {</code>

<code>102</code>

<code>    </code><code>// 如果直接連接配接成功,則注冊到多路複用器上,發送請求消息,讀應答</code>

<code>103</code>

<code>    </code><code>if</code> <code>(socketchannel.connect(</code><code>new</code> <code>inetsocketaddress(host, port))) {</code>

<code>104</code>

<code>        </code><code>socketchannel.register(selector, selectionkey.op_read);</code>

<code>105</code>

<code>        </code><code>dowrite(socketchannel);</code>

<code>106</code>

<code>    </code><code>} </code><code>else</code>

<code>107</code>

<code>        </code><code>socketchannel.register(selector, selectionkey.op_connect);</code>

<code>108</code>

<code>109</code>

<code>110</code>

<code>    </code><code>private</code> <code>void</code> <code>dowrite(socketchannel sc) </code><code>throws</code> <code>ioexception {</code>

<code>111</code>

<code>    </code><code>byte</code><code>[] req = </code><code>"query time order"</code><code>.getbytes();</code>

<code>112</code>

<code>    </code><code>bytebuffer writebuffer = bytebuffer.allocate(req.length);</code>

<code>113</code>

<code>    </code><code>writebuffer.put(req);</code>

<code>114</code>

<code>    </code><code>writebuffer.flip();</code>

<code>115</code>

<code>    </code><code>sc.write(writebuffer);</code>

<code>116</code>

<code>    </code><code>if</code> <code>(!writebuffer.hasremaining())</code>

<code>117</code>

<code>        </code><code>system.out.println(</code><code>"send order 2 server succeed."</code><code>);</code>

<code>118</code>

<code>119</code>

與服務端類似,我們通過對關鍵步驟的源碼進行分析和解讀,讓大家深入了解如何建立nio用戶端以及如何使用nio的api。

8-19行構造函數用于初始化nio的多路複用器和socketchannel對象,需要注意的是建立socketchannel之後,需要将其設定為異步非阻塞模式。就像在2.3.3章節中所講的,我們可以設定socketchannel的tcp參數,例如接收和發送的tcp緩沖區大小

28-33行用于發送連接配接請求,作為示例,連接配接是成功的,是以不需要做重連操作,是以将其放到循環之前。下面我們具體看看doconnect的實作,代碼跳到第116-123行,首先對socketchannel的connect()操作進行判斷,如果連接配接成功,則将socketchannel注冊到多路複用器selector上,注冊selectionkey.op_read,如果沒有直接連接配接成功,說明服務端沒有傳回tcp握手應答消息,這并不代表連接配接失敗,我們需要将socketchannel注冊到多路複用器selector上,注冊selectionkey.op_connect,當服務端傳回tcp syn-ack消息後,selector就能夠輪詢到這個socketchannel處于連接配接就緒狀态

4-72行在循環體中輪詢多路複用器selector,當有就緒的channel時,執行第59行的handleinput(key)方法,下面我們就對handleinput方法進行分析。

跳到第68行,我們首先對selectionkey進行判斷,看它處于什麼狀态。如果是處于連接配接狀态,說明服務端已經傳回ack應答消息,我們需要對連接配接結果進行判斷,調用socketchannel的finishconnect()方法,如果傳回值為true,說明用戶端連接配接成功,如果傳回值為false或者直接抛出ioexception,說明連接配接失敗。在本例程中,傳回值為true,說明連接配接成功。将socketchannel注冊到多路複用器上,注冊selectionkey.op_read操作位,監聽網絡讀操作。然後發送請求消息給服務端,下面我們對dowrite(sc)進行分析。代碼跳到110行,我們構造請求消息體,然後對其編碼,寫入到發送緩沖區中,最後調用socketchannel的write方法進行發送,由于發送是異步的,是以會存在“半包寫”問題,此處不再贅述。最後通過hasremaining()方法對發送結果進行判斷,如果緩沖區中的消息全部發送完成,列印”send order 2 server succeed.

代碼傳回第80行,我們繼續分析下用戶端是如何讀取時間伺服器應答消息的。如果用戶端接收到了服務端的應答消息,則socketchannel是可讀的,由于無法事先判斷應答碼流的大小,我們就預配置設定1m的接收緩沖區用于讀取應答消息,調用socketchannel的read()方法進行異步讀取操作,由于是異步操作,是以必須對讀取的結果進行判斷,這部分的處理邏輯已經在2.3.3章節詳細介紹過,此處不再贅述。如果讀取到了消息,則對消息進行解碼,最後列印結果。執行完成後将stop置為true,線程退出循環

線程退出循環後,我們需要對連接配接資源進行釋放,以實作“優雅退出”。60-66行用于多路複用器的資源釋放,由于多路複用器上可能注冊成千上萬的channel或者pipe,如果一一對這些資源進行釋放顯然不合适。是以,jdk底層會自動釋放所有跟此多路複用器關聯的資源,jdk的api doc如下:

《Netty 權威指南》—— NIO建立的TimeClient源碼分析

多路複用器selector的資源釋放

到此為止,我們已經将時間伺服器通過nio完成了改造,并對源碼進行了分析和解讀,下面分别執行時間伺服器的服務端和用戶端,看執行結果。

服務端執行結果:

《Netty 權威指南》—— NIO建立的TimeClient源碼分析

nio時間伺服器服務端執行結果

用戶端執行結果:

《Netty 權威指南》—— NIO建立的TimeClient源碼分析

nio時間伺服器用戶端執行結果

通過源碼對比分析,我們發現nio程式設計難度确實比同步阻塞bio大很多,我們的nio例程并沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼将會更加複雜。nio代碼既然這麼複雜,為什麼它的應用卻越來越廣泛呢,使用nio程式設計的優點總結如下:

1)  用戶端發起的連接配接操作是異步的,可以通過在多路複用器注冊op_connect等待後續結果,不需要像之前的用戶端那樣被同步阻塞;

2) socketchannel的讀寫操作都是異步的,如果沒有可讀寫的資料它不會同步等待,直接傳回,這樣io通信線程就可以處理其它的鍊路,不需要同步等待這個鍊路可用;

3)  線程模型的優化:由于jdk的selector在linux等主流作業系統上通過epoll實作,它沒有連接配接句柄數的限制(隻受限于作業系統的最大句柄數或者對單個程序的句柄限制),這意味着一個selector線程可以同時處理成千上萬個用戶端連接配接,而且性能不會随着用戶端的增加而線性下降,是以,它非常适合做高性能、高負載的網絡伺服器。

jdk1.7更新了nio類庫,更新後的nio類庫被稱為nio2.0,引人注目的是java正式提供了異步檔案io操作,同時提供了與unix網絡程式設計事件驅動io對應的aio,下面的2.4章節我們學習下如何利用nio2.0編寫aio程式,我們還是以時間伺服器為例進行講解。