天天看點

《Netty 權威指南》—— AIO 建立的TimeServer源碼分析

nio2.0引入了新的異步通道的概念,并提供了異步檔案通道和異步套接字通道的實作。異步通道提供兩種方式擷取擷取操作結果:

通過java.util.concurrent.future類來表示異步操作的結果;

在執行異步操作的時候傳入一個java.nio.channels.

completionhandler接口的實作類作為操作完成的回調。

nio2.0的異步套接字通道是真正的異步非阻塞io,它對應unix網絡程式設計中的事件驅動io(aio),它不需要通過多路複用器(selector)對注冊的通道進行輪詢操作即可實作異步讀寫,簡化了nio的程式設計模型。

下面還是通過代碼來熟悉nio2.0 aio的相關類庫,我們仍舊以時間伺服器為例程進行講解。

首先看下時間伺服器的主函數:

<code>01</code>

<code>public</code> <code>class</code> <code>timeserver {</code>

<code>02</code>

<code>03</code>

<code>/**</code>

<code>04</code>

<code>* @param args</code>

<code>05</code>

<code>* @throws ioexception</code>

<code>06</code>

<code>*/</code>

<code>07</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>ioexception {</code>

<code>08</code>

<code>int</code> <code>port = </code><code>8080</code><code>;</code>

<code>09</code>

<code>if</code> <code>(args != </code><code>null</code> <code>&amp;&amp; args.length &gt; </code><code>0</code><code>) {</code>

<code>10</code>

<code>try</code> <code>{</code>

<code>11</code>

<code>port = integer.valueof(args[</code><code>0</code><code>]);</code>

<code>12</code>

<code>} </code><code>catch</code> <code>(numberformatexception e) {</code>

<code>13</code>

<code>// 采用預設值</code>

<code>14</code>

<code>}</code>

<code>15</code>

<code>16</code>

<code>asynctimeserverhandler timeserver = </code><code>new</code> <code>asynctimeserverhandler(port);</code>

<code>17</code>

<code>new</code> <code>thread(timeserver, </code><code>"aio-asynctimeserverhandler-001"</code><code>).start();</code>

<code>18</code>

<code>19</code>

我們直接從第16行開始看,首先建立異步的時間伺服器處理類,然後啟動線程将asynctimeserverhandler拉起,代碼如下:

<code>public</code> <code>class</code> <code>asynctimeserverhandler </code><code>implements</code> <code>runnable {</code>

<code>    </code><code>private</code> <code>int</code> <code>port;</code>

<code>    </code><code>countdownlatch latch;</code>

<code>    </code><code>asynchronousserversocketchannel asynchronousserversocketchannel;</code>

<code>    </code><code>public</code> <code>asynctimeserverhandler(</code><code>int</code> <code>port) {</code>

<code>    </code><code>this</code><code>.port = port;</code>

<code>    </code><code>try</code> <code>{</code>

<code>        </code><code>asynchronousserversocketchannel = asynchronousserversocketchannel</code>

<code>            </code><code>.open();</code>

<code>        </code><code>asynchronousserversocketchannel.bind(</code><code>new</code> <code>inetsocketaddress(port));</code>

<code>        </code><code>system.out.println(</code><code>"the time server is start in port : "</code> <code>+ port);</code>

<code>    </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>        </code><code>e.printstacktrace();</code>

<code>    </code><code>}</code>

<code>20</code>

<code>    </code><code>/*</code>

<code>21</code>

<code>     </code><code>* (non-javadoc)</code>

<code>22</code>

<code>     </code><code>*</code>

<code>23</code>

<code>     </code><code>* @see java.lang.runnable#run()</code>

<code>24</code>

<code>     </code><code>*/</code>

<code>25</code>

<code>    </code><code>@override</code>

<code>26</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>27</code>

<code>28</code>

<code>    </code><code>latch = </code><code>new</code> <code>countdownlatch(</code><code>1</code><code>);</code>

<code>29</code>

<code>    </code><code>doaccept();</code>

<code>30</code>

<code>31</code>

<code>        </code><code>latch.await();</code>

<code>32</code>

<code>    </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>36</code>

<code>37</code>

<code>    </code><code>public</code> <code>void</code> <code>doaccept() {</code>

<code>38</code>

<code>    </code><code>asynchronousserversocketchannel.accept(</code><code>this</code><code>,</code>

<code>39</code>

<code>        </code><code>new</code> <code>acceptcompletionhandler());</code>

<code>40</code>

我們重點對asynctimeserverhandler進行分析,首先看8-15行,在構造方法中,我們首先建立一個異步的服務端通道asynchronousserversocketchannel,然後調用它的bind方法綁定監聽端口,如果端口合法且沒被占用,綁定成功,列印啟動成功提示到控制台。

線上程的run方法中,第26行我們初始化countdownlatch對象,它的作用是在完成一組正在執行的操作之前,允許目前的線程一直阻塞。在本例程中,我們讓線程在此阻塞,防止服務端執行完成退出。在實際項目應用中,不需要啟動獨立的線程來處理asynchronousserversocketchannel,這裡僅僅是個demo示範。

第24行用于接收用戶端的連接配接,由于是異步操作,我們可以傳遞一個

completionhandler&lt;asynchronoussocketchannel,? super a&gt;類型的handler執行個體接收accept操作成功的通知消息,在本例程中我們通過acceptcompletionhandler執行個體作為handler接收通知消息,下面,我們繼續對acceptcompletionhandler進行分析:

<code>public</code> <code>class</code> <code>acceptcompletionhandler </code><code>implements</code>

<code>    </code><code>completionhandler&lt;asynchronoussocketchannel, asynctimeserverhandler&gt; {</code>

<code>    </code><code>public</code> <code>void</code> <code>completed(asynchronoussocketchannel result,</code>

<code>        </code><code>asynctimeserverhandler attachment) {</code>

<code>    </code><code>attachment.asynchronousserversocketchannel.accept(attachment, </code><code>this</code><code>);</code>

<code>    </code><code>bytebuffer buffer = bytebuffer.allocate(</code><code>1024</code><code>);</code>

<code>    </code><code>result.read(buffer, buffer, </code><code>new</code> <code>readcompletionhandler(result));</code>

<code>    </code><code>public</code> <code>void</code> <code>failed(throwable exc, asynctimeserverhandler attachment) {</code>

<code>    </code><code>exc.printstacktrace();</code>

<code>    </code><code>attachment.latch.countdown();</code>

completionhandler有兩個方法,分别是:

1)  public void completed(asynchronoussocketchannel result,

asynctimeserverhandler attachment);

2) public void failed(throwable exc, asynctimeserverhandler attachment);

下面我們分别對這兩個接口的實作進行分析:首先看completed接口的實作,代碼7-10行,我們從attachment擷取成員變量asynchronousserversocketchannel,然後繼續調用它的accept方法。可能讀者在此可能會心存疑惑,既然已經接收用戶端成功了,為什麼還要再次調用accept方法呢?原因是這樣的:當我們調用asynchronousserversocketchannel的accept方法後,如果有新的用戶端連接配接接入,系統将回調我們傳入的completionhandler執行個體的completed方法,表示新的用戶端已經接入成功,因為一個asynchronousserversocketchannel可以接收成千上萬個用戶端,是以我們需要繼續調用它的accept方法,接收其它的用戶端連接配接,最終形成一個循環。每當接收一個客戶讀連接配接成功之後,再異步接收新的用戶端連接配接。

鍊路建立成功之後,服務端需要接收用戶端的請求消息,代碼第8行我們建立新的bytebuffer,預配置設定1m的緩沖區。第8行我們通過調用asynchronoussocketchannel的read方法進行異步讀操作。下面我們看看異步read方法的參數:

1)   bytebuffer dst:接收緩沖區,用于從異步channel中讀取資料包;

2)  a attachment:異步channel攜帶的附件,通知回調的時候作為入參使用;

3)  completionhandler&lt;integer,? super a&gt;:接收通知回調的業務handler,本例程中為readcompletionhandler。

下面我們繼續對readcompletionhandler進行分析:

<code>public</code> <code>class</code> <code>readcompletionhandler </code><code>implements</code>

<code>    </code><code>completionhandler&lt;integer, bytebuffer&gt; {</code>

<code>    </code><code>private</code> <code>asynchronoussocketchannel channel;</code>

<code>    </code><code>public</code> <code>readcompletionhandler(asynchronoussocketchannel channel) {</code>

<code>    </code><code>if</code> <code>(</code><code>this</code><code>.channel == </code><code>null</code><code>)</code>

<code>        </code><code>this</code><code>.channel = channel;</code>

<code>    </code><code>public</code> <code>void</code> <code>completed(integer result, bytebuffer attachment) {</code>

<code>    </code><code>attachment.flip();</code>

<code>    </code><code>byte</code><code>[] body = </code><code>new</code> <code>byte</code><code>[attachment.remaining()];</code>

<code>    </code><code>attachment.get(body);</code>

<code>        </code><code>string req = </code><code>new</code> <code>string(body, </code><code>"utf-8"</code><code>);</code>

<code>        </code><code>system.out.println(</code><code>"the time server receive order : "</code> <code>+ req);</code>

<code>        </code><code>string currenttime = </code><code>"query time order"</code><code>.equalsignorecase(req) ? </code><code>new</code><code>java.util.date(</code>

<code>            </code><code>system.currenttimemillis()).tostring() : </code><code>"bad order"</code><code>;</code>

<code>        </code><code>dowrite(currenttime);</code>

<code>    </code><code>} </code><code>catch</code> <code>(unsupportedencodingexception e) {</code>

<code>    </code><code>private</code> <code>void</code> <code>dowrite(string currenttime) {</code>

<code>    </code><code>if</code> <code>(currenttime != </code><code>null</code> <code>&amp;&amp; currenttime.trim().length() &gt; </code><code>0</code><code>) {</code>

<code>        </code><code>byte</code><code>[] bytes = (currenttime).getbytes();</code>

<code>        </code><code>bytebuffer writebuffer = bytebuffer.allocate(bytes.length);</code>

<code>        </code><code>writebuffer.put(bytes);</code>

<code>        </code><code>writebuffer.flip();</code>

<code>        </code><code>channel.write(writebuffer, writebuffer,</code>

<code>            </code><code>new</code> <code>completionhandler&lt;integer, bytebuffer&gt;() {</code>

<code>            </code><code>@override</code>

<code>            </code><code>public</code> <code>void</code> <code>completed(integer result, bytebuffer buffer) {</code>

<code>                </code><code>// 如果沒有發送完成,繼續發送</code>

<code>                </code><code>if</code> <code>(buffer.hasremaining())</code>

<code>                </code><code>channel.write(buffer, buffer, </code><code>this</code><code>);</code>

<code>            </code><code>}</code>

<code>41</code>

<code>42</code>

<code>43</code>

<code>            </code><code>public</code> <code>void</code> <code>failed(throwable exc, bytebuffer attachment) {</code>

<code>44</code>

<code>                </code><code>try</code> <code>{</code>

<code>45</code>

<code>                </code><code>channel.close();</code>

<code>46</code>

<code>                </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>47</code>

<code>                </code><code>// ingnore on close</code>

<code>48</code>

<code>                </code><code>}</code>

<code>49</code>

<code>50</code>

<code>            </code><code>});</code>

<code>51</code>

<code>52</code>

<code>53</code>

<code>54</code>

<code>55</code>

<code>    </code><code>public</code> <code>void</code> <code>failed(throwable exc, bytebuffer attachment) {</code>

<code>56</code>

<code>57</code>

<code>        </code><code>this</code><code>.channel.close();</code>

<code>58</code>

<code>59</code>

<code>60</code>

<code>61</code>

<code>62</code>

首先看構造方法,我們将asynchronoussocketchannel通過參數傳遞到readcompletionhandler中當作成員變量來使用,主要用于讀取半包消息和發送應答。本例程不對半包讀寫進行具體解說,對此感興趣的可以關注後續章節對netty半包處理的專題介紹。我們繼續看代碼,第12-25行是讀取到消息後的處理,首先對attachment進行flip操作,為後續從緩沖區讀取資料做準備。根據緩沖區的可讀位元組數建立byte數組,然後通過new string方法建立請求消息,對請求消息進行判斷,如果是”query time order”則擷取目前系統伺服器的時間,調用dowrite方法發送給用戶端。下面我們對dowrite方法進行詳細分析。

跳到代碼第28行,首先對目前時間進行合法性校驗,如果合法,調用字元串的解碼方法将應答消息編碼成位元組數組,然後将它拷貝到發送緩沖區writebuffer中,最後調用asynchronoussocketchannel的異步write方法。正如前面介紹的異步read方法一樣,它也有三個與read方法相同的參數,在本例程中我們直接實作write方法的異步回調接口completionhandler,代碼跳到第24行,對發送的writebuffer進行判斷,如果還有剩餘的位元組可寫,說明沒有發送完成,需要繼續發送,直到發送成功。

最後,我們關注下failed方法,它的實作很簡單,就是當發生異常的時候,我們對異常throwable進行判斷,如果是io異常,就關閉鍊路,釋放資源,如果是其它異常,按照業務自己的邏輯進行處理。本例程作為簡單demo,沒有對異常進行分類判斷,隻要發生了讀寫異常,就關閉鍊路,釋放資源。

異步非阻塞io版本的時間伺服器服務端已經介紹完畢,下面我們繼續看用戶端的實作。