天天看點

Java并發程式設計【1.2時代】

    本文介紹了java原生的多線程技術(1.2),通過詳細介紹wait和notify相關的機制、基礎的多線程技術以及基于這些技術的等待逾時、線程間的通信技術和線程池高階技術,最後通過一個基于線程池的簡單文本web伺服器—mollyserver,來闡明多線程帶來好處。通過介紹這些技術,展示了在沒有使用java并發包的時代(1.5-)是如何完成java的多線程程式設計,為了解java5提供了良好幫助。

Java并發程式設計【1.2時代】

       java從誕生開始就明智的選擇内置對多線程的支援,這将java語言同其他同一時期的語言相比,具有明顯優勢。線程作為作業系統最小的排程單元,多個線程同時執行,将會改善我們的代碼,在多核環境中具有更加明顯的好處,但是過多的建立線程和對線程的不當管理也容易造成問題。

       java中啟動線程必須要先行的構造一個thread對象,然後調用這個對象的start方法。

<code>01</code>

<code>this</code><code>.group = g;</code>

<code>02</code>

<code>    </code><code>this</code><code>.daemon = parent.isdaemon();</code>

<code>03</code>

<code>    </code><code>this</code><code>.priority = parent.getpriority();</code>

<code>04</code>

<code>    </code><code>this</code><code>.name = name.tochararray();</code>

<code>05</code>

<code>    </code><code>if</code> <code>(security ==</code><code>null</code> <code>|| isccloverridden(parent.getclass()))</code>

<code>06</code>

<code>        </code><code>this</code><code>.contextclassloader = parent.getcontextclassloader();</code>

<code>07</code>

<code>    </code><code>else</code>

<code>08</code>

<code>        </code><code>this</code><code>.contextclassloader = parent.contextclassloader;</code>

<code>09</code>

<code>    </code><code>this</code><code>.inheritedaccesscontrolcontext = accesscontroller.getcontext();</code>

<code>10</code>

<code>    </code><code>this</code><code>.target = target;</code>

<code>11</code>

<code>    </code><code>setpriority(priority);</code>

<code>12</code>

<code>        </code><code>if</code> <code>(parent.inheritablethreadlocals !=</code><code>null</code><code>)</code>

<code>13</code>

<code>        </code><code>this</code><code>.inheritablethreadlocals =</code>

<code>14</code>

<code>        </code><code>threadlocal.createinheritedmap(parent.inheritablethreadlocals);</code>

<code>15</code>

<code>        </code><code>/* stash the specified stack size in case the vm cares */</code>

<code>16</code>

<code>        </code><code>this.stacksize = stacksize;</code>

<code>17</code>

<code>18</code>

<code>        </code><code>/* set thread id */</code>

<code>19</code>

<code>        </code><code>tid = nextthreadid();</code>

線程的構造,最主要或者說也就是線程對象的初始化過程,在上述過程中,一個新構造的線程對象是由其parent線程來進行配置設定空間的,而child線程繼承了parent的是否daemon,優先級和加載資源的classloader,棧空間的大小并且還會配置設定一個唯一的id來辨別這個child線程,至此一個能夠運作的線程對象就初始化好了,在堆記憶體中等待着運作。

          調用thread對象的start方法,就可啟動一個新的線程,parent線程同步告知java vm,隻要線程規劃器空閑,應立即啟動這個線程。

Java并發程式設計【1.2時代】

         而啟動線程,也是交給作業系統來完成,這裡就是一個本地方法了。

         啟動一個線程時,最好設定名稱,這樣在jstack分析時,就會好很多,自定義的線程最好能夠起個名字。

<code>/**</code>

<code> </code><code>* @author weipeng</code>

<code> </code><code>*</code>

<code> </code><code>*/</code>

<code>public</code> <code>class</code> <code>threadname {</code>

<code>    </code><code>/**</code>

<code>     </code><code>* @param args</code>

<code>     </code><code>*/</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>thread t =</code><code>new</code> <code>thread(</code><code>new</code> <code>job());</code>

<code>        </code><code>t.setname(</code><code>"threadnamejob"</code><code>);</code>

<code>        </code><code>t.start();</code>

<code>    </code><code>}</code>

<code>    </code><code>static</code> <code>class</code> <code>job</code><code>implements</code> <code>runnable {</code>

<code>        </code><code>@override</code>

<code>        </code><code>public</code> <code>void</code> <code>run() {</code>

<code>20</code>

<code>            </code><code>try</code> <code>{</code>

<code>21</code>

<code>                </code><code>thread.sleep(</code><code>10000</code><code>);</code>

<code>22</code>

<code>            </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>23</code>

<code>                </code><code>e.printstacktrace();</code>

<code>24</code>

<code>            </code><code>}</code>

<code>25</code>

<code>        </code><code>}</code>

<code>26</code>

<code>27</code>

<code>28</code>

<code>29</code>

<code>}</code>

        上述代碼直接運作,可以通過jstack pid來觀察棧資訊,結果如下:

<code>2012-05-05 23:50:07</code>

<code>full thread dump java hotspot(tm) 64-bit server vm (20.1-b02 mixed mode):</code>

<code>"attach listener" daemon prio=10 tid=0x00007f4c38001000 nid=0x30b5 waiting on condition [0x0000000000000000]</code>

<code>   </code><code>java.lang.thread.state: runnable</code>

<code>"destroyjavavm" prio=10 tid=0x00007f4c60007800 nid=0x3086 waiting on condition [0x0000000000000000]</code>

<code>"threadnamejob" prio=10 tid=0x00007f4c600a2800 nid=0x3097 waiting on condition [0x00007f4c37cfb000]</code>

<code>   </code><code>java.lang.thread.state: timed_waiting (sleeping)</code>

<code>    </code><code>at java.lang.thread.sleep(native method)</code>

<code>    </code><code>at com.murdock.books.multithread.example.threadname$job.run(threadname.java:26)</code>

<code>    </code><code>at java.lang.thread.run(thread.java:662)</code>

<code>"low memory detector" daemon prio=10 tid=0x00007f4c60091800 nid=0x3095 runnable [0x0000000000000000]</code>

<code>"c2 compilerthread1" daemon prio=10 tid=0x00007f4c6008f000 nid=0x3094 waiting on condition [0x0000000000000000]</code>

<code>"c2 compilerthread0" daemon prio=10 tid=0x00007f4c6008c000 nid=0x3093 waiting on condition [0x0000000000000000]</code>

<code>"signal dispatcher" daemon prio=10 tid=0x00007f4c6008a000 nid=0x3092 runnable [0x0000000000000000]</code>

<code>"finalizer" daemon prio=10 tid=0x00007f4c6006e000 nid=0x3091 in object.wait() [0x00007f4c5c860000]</code>

<code>   </code><code>java.lang.thread.state: waiting (on object monitor)</code>

<code>30</code>

<code>    </code><code>at java.lang.object.wait(native method)</code>

<code>31</code>

<code>    </code><code>- waiting on &lt;0x00000000ec6b1300&gt; (a java.lang.ref.referencequeue$lock)</code>

<code>32</code>

<code>    </code><code>at java.lang.ref.referencequeue.remove(referencequeue.java:118)</code>

<code>33</code>

<code>    </code><code>- locked &lt;0x00000000ec6b1300&gt; (a java.lang.ref.referencequeue$lock)</code>

<code>34</code>

<code>    </code><code>at java.lang.ref.referencequeue.remove(referencequeue.java:134)</code>

<code>35</code>

<code>    </code><code>at java.lang.ref.finalizer$finalizerthread.run(finalizer.java:159)</code>

<code>36</code>

<code>37</code>

<code>"reference handler" daemon prio=10 tid=0x00007f4c6006c000 nid=0x3090 in object.wait() [0x00007f4c5c961000]</code>

<code>38</code>

<code>39</code>

<code>40</code>

<code>    </code><code>- waiting on &lt;0x00000000ec6b11d8&gt; (a java.lang.ref.reference$lock)</code>

<code>41</code>

<code>    </code><code>at java.lang.object.wait(object.java:485)</code>

<code>42</code>

<code>    </code><code>at java.lang.ref.reference$referencehandler.run(reference.java:116)</code>

<code>43</code>

<code>    </code><code>- locked &lt;0x00000000ec6b11d8&gt; (a java.lang.ref.reference$lock)</code>

<code>44</code>

<code>45</code>

<code>"vm thread" prio=10 tid=0x00007f4c60065800 nid=0x308f runnable</code>

<code>46</code>

<code>47</code>

<code>"gc task thread#0 (parallelgc)" prio=10 tid=0x00007f4c6001a800 nid=0x3087 runnable</code>

<code>48</code>

<code>49</code>

<code>"gc task thread#1 (parallelgc)" prio=10 tid=0x00007f4c6001c800 nid=0x3088 runnable</code>

<code>50</code>

<code>51</code>

<code>"gc task thread#2 (parallelgc)" prio=10 tid=0x00007f4c6001e800 nid=0x3089 runnable</code>

<code>52</code>

<code>53</code>

<code>"gc task thread#3 (parallelgc)" prio=10 tid=0x00007f4c60020000 nid=0x308a runnable</code>

<code>54</code>

<code>55</code>

<code>"vm periodic task thread" prio=10 tid=0x00007f4c6009c000 nid=0x3096 waiting on condition</code>

<code>56</code>

<code>57</code>

<code>jni global references: 882</code>

         可以看到一個java程式在運作時,背景建立了很多的線程,是以一個java程式,縱使隻有main,它也是多線程的,其中可以看到threadnamejob這個線程,也可以看到本地以吞吐量優先的parallelgc的線程,它的數量預設是和cpu相同的,其中有4個對新生代進行gc的線程。

Java并發程式設計【1.2時代】

       線程從執行runnalbe開始到結束。

        中斷是一種狀态,它使一個運作中的線程能夠感覺到其他線程對自身作出了中斷操作,也就是影響到了自己。線程工作檢查自身是否被中斷來作出響應的行為。而該狀态并沒有維護在thread中,是通過native方法獲得。

         可以通過目前線程對象的isinterrupted來判斷是否被中斷了。

<code>public</code> <code>class</code> <code>interrupted {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>exception {</code>

<code>        </code><code>interruptedjob ij =</code><code>new</code> <code>interruptedjob();</code>

<code>        </code><code>ij.setname(</code><code>"interruptedjobthread "</code><code>);</code>

<code>        </code><code>ij.start();</code>

<code>        </code><code>thread.sleep(</code><code>2000</code><code>);</code>

<code>        </code><code>// 中斷</code>

<code>        </code><code>ij.interrupt();</code>

<code>        </code><code>system.out.println(</code><code>"interrupted ij"</code><code>);</code>

<code>    </code><code>static</code> <code>class</code> <code>interruptedjob</code><code>extends</code> <code>thread {</code>

<code>                </code><code>while</code> <code>(</code><code>true</code><code>) {</code>

<code>                    </code><code>thread.sleep(</code><code>1000</code><code>);</code>

<code>                </code><code>}</code>

<code>                </code><code>system.out.println(</code><code>"current interrupt status is "</code>

<code>                        </code><code>+ thread.currentthread().getname()</code>

<code>                        </code><code>+ thread.currentthread().isinterrupted());</code>

<code>                </code><code>// 再次進行中斷</code>

<code>                </code><code>thread.currentthread().interrupt();</code>

上述程式輸出:

interrupted ij

current interrupt status is interruptedjobthread false

current interrupt status is interruptedjobthread true

可以看出一旦抛出interruptedexception,目前線程的中斷狀态就被清除,但是也可以調用thread.interrupted()來清除目前的中斷狀态。

Java并發程式設計【1.2時代】

        java中建立的線程均會映射為作業系統層面的線程,在java線程對象中有部分屬性可以提供通路。線程狀态是了解線程運作的關鍵。

<code>public</code>

<code>class</code> <code>thread</code><code>implements</code> <code>runnable {</code>

<code>    </code><code>/* make sure registernatives is the first thing &lt;clinit&gt; does. */</code>

<code>    </code><code>private</code> <code>static</code> <code>native</code> <code>void</code> <code>registernatives();</code>

<code>    </code><code>static</code> <code>{</code>

<code>        </code><code>registernatives();</code>

<code>    </code><code>private</code> <code>char</code>    <code>name[];</code>

<code>    </code><code>private</code> <code>int</code>         <code>priority;</code>

         可以看到priority,這個代表着優先級,優先級的範圍從1到10,優先級高的線程占有cpu時間長一些,這當然是在長時間運作時展現出來的,但是不能做為程式執行的依據。

         對priority可以通過對線程對象進行設定,使用setpriority來完成對線程優先級的設定。

下面的例子中,建構了三個不同的線程,它們的優先級不一樣,從1到10,然後運作,優先級高的線程對times++執行的會多一些。

<code>public</code> <code>class</code> <code>priority {</code>

<code>    </code><code>private</code> <code>static</code> <code>countdownlatch countdownlatch =</code><code>new</code> <code>countdownlatch(</code><code>10000000</code><code>);</code>

<code>    </code><code>private</code> <code>static</code> <code>countdownlatch start =</code><code>new</code> <code>countdownlatch(</code><code>1</code><code>);</code>

<code>        </code><code>countjob job1 =</code><code>new</code> <code>countjob();</code>

<code>        </code><code>thread lingdao =</code><code>new</code> <code>thread(job1);</code>

<code>        </code><code>lingdao.setpriority(</code><code>10</code><code>);</code>

<code>        </code><code>lingdao.start();</code>

<code>        </code><code>countjob job2 =</code><code>new</code> <code>countjob();</code>

<code>        </code><code>thread pming =</code><code>new</code> <code>thread(job2);</code>

<code>        </code><code>pming.setpriority(</code><code>1</code><code>);</code>

<code>        </code><code>pming.start();</code>

<code>        </code><code>countjob job3 =</code><code>new</code> <code>countjob();</code>

<code>        </code><code>thread zhongchan =</code><code>new</code> <code>thread(job3);</code>

<code>        </code><code>zhongchan.setpriority(</code><code>5</code><code>);</code>

<code>        </code><code>zhongchan.start();</code>

<code>        </code><code>start.countdown();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>countdownlatch.await();</code>

<code>        </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>            </code><code>e.printstacktrace();</code>

<code>        </code><code>system.out.println(</code><code>"lingdao : have "</code> <code>+ job1.gettimes());</code>

<code>        </code><code>system.out.println(</code><code>"pming : have"</code> <code>+ job2.gettimes());</code>

<code>        </code><code>system.out.println(</code><code>"zhongchan : have"</code> <code>+ job3.gettimes());</code>

<code>    </code><code>static</code> <code>class</code> <code>countjob</code><code>implements</code> <code>runnable {</code>

<code>        </code><code>private</code> <code>int</code> <code>times =</code><code>0</code><code>;</code>

<code>            </code><code>// 等待開始</code>

<code>                </code><code>start.await();</code>

<code>            </code><code>while</code> <code>(countdownlatch.getcount() &gt;</code><code>0</code><code>) {</code>

<code>                </code><code>synchronized</code> <code>(countjob.</code><code>class</code><code>) {</code>

<code>                    </code><code>if</code> <code>(countdownlatch.getcount() &gt;</code><code>0</code><code>) {</code>

<code>                        </code><code>countdownlatch.countdown();</code>

<code>                        </code><code>times++;</code>

<code>58</code>

<code>                    </code><code>}</code>

<code>59</code>

<code>60</code>

<code>61</code>

<code>62</code>

<code>63</code>

<code>        </code><code>public</code> <code>int</code> <code>gettimes() {</code>

<code>64</code>

<code>            </code><code>return</code> <code>times;</code>

<code>65</code>

<code>66</code>

<code>67</code>

      執行結果如下:

lingdao : have 4347635

pming : have2661562

zhongchan : have2990803

       每次執行的可能都不一樣,但是總的趨勢是高優先級的線程對cpu的占用時間會多一些。

        線程在運作的生命周期中可能處于下面的6種不同的狀态,在一個時刻,線程可能處于cpu上處于運作,或者暫時的沒有配置設定到cpu資源而處于就緒(準備運作),或者處于阻塞的狀态。具體内容如下面的表格所示:

<b>狀态名稱</b>

<b>阻塞</b>

<b>可以中斷</b>

<b>說明</b>

運作中

n

正在cpu上進行執行

準備運作(就緒)

暫時的失去cpu資源處于就緒隊列中,可能随時被線程排程器排程執行

休眠

y

讓出cpu資源的就緒隊列,等待一段時間後再次被放入隊列,可以被中斷提前進入就緒隊列

等待

接受到通知或者等待逾時會進入到就緒隊列,可以被中斷

阻塞于i/o

i/o條件滿足後,例如讀入了一些字元,準備運作

阻塞于同步

當獲得同步鎖後準備運作

        可以使用如下狀态遷移來描述線程的狀态:

Java并發程式設計【1.2時代】

        線程在一個時刻将會處于上述的三種狀态之一,這個模型将有效的了解java線程對象,但是其中處于等待狀态的線程可能會在等待i/o和等待同步時無法被中斷,雖然運作的線程已經被中斷辨別,但是不會像休眠和等待一樣通過interruptedexception來直接傳回。

<code> </code><code>* &lt;pre&gt;</code>

<code> </code><code>* 處于同步讀取的線程被中斷,不會抛出異常</code>

<code> </code><code>* &lt;/pre&gt;</code>

<code>public</code> <code>class</code> <code>readinterrupted {</code>

<code>        </code><code>// 使用父線程,也就是main-thread</code>

<code>        </code><code>thread thread =</code><code>new</code> <code>thread(</code><code>new</code> <code>interruptedjob(thread.currentthread()));</code>

<code>        </code><code>thread.start();</code>

<code>        </code><code>inputstream is = system.in;</code>

<code>            </code><code>is.read();</code>

<code>        </code><code>}</code><code>catch</code> <code>(ioexception e) {</code>

<code>        </code><code>system.out.println(</code><code>"main thread is interrupted ? "</code> <code>+ thread.currentthread().isinterrupted());</code>

<code>    </code><code>static</code> <code>class</code> <code>interruptedjob</code><code>implements</code> <code>runnable {</code>

<code>        </code><code>thread interruptedthread;</code>

<code>        </code><code>public</code> <code>interruptedjob(thread thread) {</code>

<code>            </code><code>this</code><code>.interruptedthread = thread;</code>

<code>                </code><code>thread.sleep(</code><code>2000</code><code>);</code>

<code>            </code><code>interruptedthread.interrupt();</code>

       運作的結果是:

      這時整個線程挂在is.read上,這時随意從控制台輸入一個字元,主線程退出:

     123

     main thread is interrupted ? true

       可以看出對阻塞于同步i/o的線程被中斷後,中斷辨別被打上,但是不會抛出異常退出。

        對高i/o的線程盡量給予高優先級的設定,對于低i/o以cpu運算為主的線程盡量降低優先級,避免過多的占用cpu。是以,不能依據線程優先級的高低來運作程式,需要保證每個線程都有運作的機會。

Java并發程式設計【1.2時代】

      java支援多個線程同時的通路一個對象,或者對象的變量,由于每個線程可以擁有這個變量的拷貝(這麼做的目的是能夠快速的執行,雖然變量配置設定的記憶體在共享記憶體中,但是每個執行的線程還是可以擁有一份拷貝,這樣做的目的是加速程式的執行,這是現代多核處理器的一個顯著特性)。是以,程式在執行過程中,可能一個線程看到的變量并不一定是最新的。

     volatile關鍵字,就是告知任何對該變量的通路均需要從共享記憶體中擷取,而對它的改變必須同步重新整理會共享記憶體。

       比如,表示一個程式是否運作的變量,boolean on = true,那麼可能是另一個線程來對它進行關閉動作,是以将其設定成為volatile boolean on,這樣就會再其他線程對它進行改變時,能夠讓原有的線程立刻感覺到。

       但是過多的使用volatile是不必要的,相反它會降低程式執行的效率。

        同步,在帶來可見性的同時,它主要是對多個線程在同一個時刻,隻能有一個處于方法或者塊中。

        可以通過将synchronized關鍵字加在方法前面或者采用同步快的方式來進行表現:

<code>static</code> <code>synchronized</code> <code>void</code> <code>m() {</code>

<code>        </code><code>system.out.println(</code><code>"t"</code><code>);</code>

<code>        </code><code>m();</code>

<code>        </code><code>synchronized</code><code>(synchronized.</code><code>class</code><code>) {</code>

<code>            </code><code>m();</code>

}

        java同步是針對普通的java對象而言的,每個java對象均有一把“鎖”,這個鎖在一個線程進入時會排斥其他線程進入,是一個排他鎖。通過javap來觀察位元組碼,可以看到:

<code>public static void main(java.lang.string[]);</code>

<code>  </code><code>code:</code>

<code>   </code><code>stack=2, locals=2, args_size=1</code>

<code>   </code><code>0:   invokestatic    #31; //method m:()v</code>

<code>   </code><code>3:   ldc #1; //class com/murdock/books/multithread/example/synchronized</code>

<code>   </code><code>5:   dup</code>

<code>   </code><code>6:   astore_1</code>

<code>   </code><code>7:   monitorenter</code>

<code>   </code><code>8:   invokestatic    #31; //method m:()v</code>

<code>   </code><code>11:  aload_1</code>

<code>   </code><code>12:  monitorexit</code>

<code>   </code><code>13:  goto    19</code>

<code>   </code><code>16:  aload_1</code>

<code>   </code><code>17:  monitorexit</code>

<code>   </code><code>18:  athrow</code>

<code>   </code><code>19:  return</code>

          當出現指令monitorenter時代獲得了該對象的鎖,當運作指令monitorexit時代表釋放了該對象的鎖。

        在java的集合api中有非常多的同步集合,比如:vector和hashtable,這些集合的所有方法都是synchronized,也就是說對這些集合的通路是同步的,但是如果每個接口都有一個專屬的同步集合實作是非常不現實的,是以用過使用collections.synchronizedxxx方法,可以包裝一個同步的集合對象進行使用。

        比如,摘自collections

<code>1</code>

<code>public</code> <code>static</code> <code>&lt;t&gt; list&lt;t&gt; synchronizedlist(list&lt;t&gt; list) {</code>

<code>2</code>

<code>    </code><code>return</code> <code>(list</code><code>instanceof</code> <code>randomaccess ?</code>

<code>3</code>

<code>                </code><code>new</code> <code>synchronizedrandomaccesslist&lt;t&gt;(list) :</code>

<code>4</code>

<code>                </code><code>new</code> <code>synchronizedlist&lt;t&gt;(list));</code>

<code>5</code>

        該方法傳回的就是一個實作了list接口的同步資料結構,這個同步的資料結構每個方法均是同步的,但是如果需要對其進行額外的操作,需要将其加入到同步塊中。

<code>synchronizedcollection(collection&lt;e&gt; c) {</code>

<code>            </code><code>if</code> <code>(c==</code><code>null</code><code>)</code>

<code>                </code><code>throw</code> <code>new</code> <code>nullpointerexception();</code>

<code>        </code><code>this</code><code>.c = c;</code>

<code>            </code><code>mutex =</code><code>this</code><code>;</code>

<code>6</code>

<code>           </code><code>}</code>

        上面可以看到同步集合均是對自身進行同步。

<code>public</code> <code>class</code> <code>synchronized {</code>

<code>    </code><code>static</code> <code>synchronized</code> <code>void</code> <code>m() {</code>

<code>        </code><code>list&lt;string&gt; s =</code><code>new</code> <code>arraylist&lt;string&gt;();</code>

<code>        </code><code>s.add(</code><code>"1"</code><code>);</code>

<code>        </code><code>list&lt;string&gt; synchronizedlist = collections.synchronizedlist(s);</code>

<code>        </code><code>thread t =</code><code>new</code> <code>thread(</code><code>new</code> <code>accesssynchronizedcollections(</code>

<code>                </code><code>synchronizedlist));</code>

<code>        </code><code>synchronized</code> <code>(synchronizedlist) {</code>

<code>            </code><code>thread.sleep(</code><code>5000</code><code>);</code>

<code>            </code><code>system.out.println(</code><code>"main-thread"</code> <code>+ synchronizedlist.size());</code>

<code>     </code><code>* 這個線程将會首先休息2000ms,然後喚醒後去請求鎖,并執行操作</code>

<code>    </code><code>static</code> <code>class</code> <code>accesssynchronizedcollections</code><code>implements</code> <code>runnable {</code>

<code>        </code><code>list&lt;string&gt; list;</code>

<code>        </code><code>public</code> <code>accesssynchronizedcollections(list&lt;string&gt; list) {</code>

<code>            </code><code>this</code><code>.list = list;</code>

<code>            </code><code>system.out.println(</code><code>"accesssynchronizedcollections"</code> <code>+ list.size());</code>

<code>            </code><code>list.add(</code><code>"2"</code><code>);</code>

         上述執行的結果:

       main-thread1

       accesssynchronizedcollections1

         可以看到,在自定義對集合操作,比如缺少就添加,就需要将集合進行同步,然後在進行操作,否則很容易在判定過程中加入了其他線程對集合的操作。

         有時一個集合對象是程序内共享的,可能會發生一些變化,是以在作出一些操作的時候,希望能夠拿到一份瞬時的拷貝,這個拷貝可能和執行中的這一時刻的集合有了變化,但是能夠保證是穩定的。就像我們出門買了一份報紙,我們回家閱讀報紙的時候,上面的新聞可能随時會發生變化,但是這并不妨礙我們去閱讀它。

第一種複制的方式:

<code>list&lt;string&gt; synchronizedlist = collections.synchronizedlist(list);</code>

<code>        </code><code>long</code> <code>currenttime = system.currenttimemillis();</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10000</code><code>; i++) {</code>

<code>            </code><code>string[] array = synchronizedlist.toarray(</code><code>new</code> <code>string[</code><code>0</code><code>]);</code>

<code>7</code>

<code>        </code><code>system.out.println(system.currenttimemillis() - currenttime);</code>

第二種複制的方式:

<code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10000</code><code>; i++) {</code>

<code>            </code><code>synchronized</code> <code>(synchronizedlist) {</code>

<code>                </code><code>int</code> <code>size = synchronizedlist.size();</code>

<code>                </code><code>string[] array =</code><code>new</code> <code>string[size];</code>

<code>                </code><code>synchronizedlist.toarray(array);</code>

         第一種比較簡單,第二種對于new string[0]沒有做過多的浪費,但是時間測算,第二種沒有第一種好,因為主要比拼的是toarray的實作,在給定的數組大于等于清單時,将會使用給定的數組,否則将會通過反射構造一個數組,而這個還是很高效的。

         是以對于集合的數組複制,使用第一種方式是比較适合的。

          兩個線程或者多個線程在請求其永遠無法擷取資源的鎖時,就是死鎖狀态。這裡不示範死鎖産生的範例。

          避免死鎖的主要原則:

          首先,對于資源的加鎖時間必須足夠短,也就是必要時進行鎖;

          其次,通路資源過程中的鎖需要按照一緻的順序進行擷取,否則需要提升出一個更大的鎖來確定資源的擷取;

          最後,盡量通過封裝的形式,避免将鎖暴露給外部,進而造成不必要的資源死鎖。

Java并發程式設計【1.2時代】

         線程開始運作,就如同一個腳本一樣,有自己的棧空間,按照既定的代碼一步一步的執行,直到最後的終結。但是每個運作中的線程,如果僅僅是孤立的運作,那麼沒有一點用處,或者說用處很少,但是多個運作的線程能夠互相配合,各司其職将會帶來巨大的好處。

         一個運作的腳本(線程)修改了一個對象的值,另一個線程捕獲到這個對象的變化,然後進行對應的操作,這個過程事件的觸發啟于一個線程,而最終的執行又是一個線程。是以前者好比生産者,後者就是消費者,這樣的模式隔開了生産和消費,在功能上和架構上具有良好的伸縮性。但是在java語言中怎樣能夠做到上述的過程呢?

         當然,簡單的辦法是不斷的循環去檢視,比如:

while (value != desire) {

thread.sleep(1000);

doxxx

        這段僞碼就是相當與如果值不是這個消費線程所要的,那麼就睡眠一段時間,這樣的方式看似能夠解決這個問題,但是有兩個沖突的問題。

        第一個,在睡眠時,基本不消耗cpu,但是如果睡得久,那麼就不能及時的發現value已經變化,也就是及時性難以保證;

        第二個,如果降低睡眠的時間,比如睡1毫秒,這樣消費者能更加迅速的捕獲出變化,但是它卻占用了更多的cpu時間,造成了無端的浪費。

        面對這個沖突,java通過固有的wait/notify機制能夠很好的實作這個模式。

         等待通知機制,是指一個線程調用了對象a上的wait方法,而另外的一個線程在進行了某些操作後,在對象a上的notify或者notifyall方法,這樣完成了兩個線程之間的互動。而這個wait和notify之間的關系就像一個信号量一樣來完成二者之間的互動工作。

        一個标準的wait和notify的例子,這個例子有兩個線程,第一個等待共享的一個值為false,當為false時它進行print,另外一個在睡眠了一段時間後,将這個值由原有的true改為false并notify。

<code>public</code> <code>class</code> <code>waitnotify {</code>

<code>    </code><code>static</code> <code>boolean</code> <code>flag =</code><code>true</code><code>;</code>

<code>    </code><code>static</code> <code>object obj =</code><code>new</code> <code>object();</code>

<code>        </code><code>thread t1 =</code><code>new</code> <code>thread(</code><code>new</code> <code>waiter());</code>

<code>        </code><code>t1.start();</code>

<code>            </code><code>thread.sleep(</code><code>1000</code><code>);</code>

<code>        </code><code>thread t2 =</code><code>new</code> <code>thread(</code><code>new</code> <code>notifier());</code>

<code>        </code><code>t2.start();</code>

<code>     </code><code>* 等待,如果flag為false則列印</code>

<code>    </code><code>static</code> <code>class</code> <code>waiter</code><code>implements</code> <code>runnable {</code>

<code>            </code><code>// 加鎖,擁有obj的monitor</code>

<code>            </code><code>synchronized</code> <code>(obj) {</code>

<code>                </code><code>// 當條件不滿足時,繼續wait,同時釋放了obj的鎖</code>

<code>                </code><code>while</code> <code>(flag) {</code>

<code>                    </code><code>try</code> <code>{</code>

<code>                        </code><code>system.out.println(thread.currentthread()</code>

<code>                                </code><code>+</code><code>" still true. wait......"</code><code>);</code>

<code>                        </code><code>obj.wait();</code>

<code>                    </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>                        </code><code>e.printstacktrace();</code>

<code>                </code><code>// 條件滿足時,完成工作</code>

<code>                </code><code>system.out</code>

<code>                        </code><code>.println(thread.currentthread() +</code><code>" is false. doxxx."</code><code>);</code>

<code>    </code><code>static</code> <code>class</code> <code>notifier</code><code>implements</code> <code>runnable {</code>

<code>                </code><code>// 擷取obj的鎖,然後進行通知,通知時不會釋放obj的鎖</code>

<code>                </code><code>// 這也類似于過早通知</code>

<code>                </code><code>obj.notifyall();</code>

<code>                </code><code>try</code> <code>{</code>

<code>                    </code><code>thread.sleep(</code><code>100</code><code>);</code>

<code>                </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>                    </code><code>e.printstacktrace();</code>

<code>                </code><code>flag =</code><code>false</code><code>;</code>

      從上面的例子中能夠提煉出經典的等待和通知機制,對于等待的一方,遵循如下的原則:

(1)獲得對象的鎖;

(2)如果條件不滿足,那麼調用對象的wait,釋放鎖,被通知後繼續檢查(2)

(3)條件已經滿足,執行對應的邏輯。

synchronized(obj) {

while(condition not hold) {

obj.wait();

// condition hold

do xxx;

       通知的一方,遵循如下原則:

(2)更新變量或者條件,然後通知。

value = newvalue;

obj.notifyall();

等待和通知機制被深深植入了java語言中,在object方法中有5個final的方法,也就是子類不能複寫的方法。

<b>方法名稱</b>

<b>簡介</b>

notify()

随機通知調用notify對象上正在等待的線程,注意這個通知沒有放棄對對象的鎖,僅在通知notify完成之後直到釋放了對象的鎖才在對方線程的wait方法處傳回;

notifyall()

這個方法會依次通知所有的正在等待在該對象上的線程,是一種比較保險的做法;

wait()

該方法會讓調用線程進入休眠狀态,隻有等待另外線程的notify或者被中斷才會傳回,注意的是,調用wait後,會釋放對象的鎖;

wait(long)

等待,這裡的參數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就逾時傳回,但是這裡很難區分出是其他線程的notify還是逾時傳回;

wait(long, int)

對于逾時更細粒度的控制,達到納秒,但是這個方法用的不多。

        這裡要說明notify方法不會釋放對象的鎖,而也隻有釋放了對象的鎖,另一個線程才能從wait中競争獲得對象的鎖并從wait方法中傳回。

<code>                                </code><code>+</code><code>" still true. wait......"</code> <code>+</code><code>new</code> <code>date());</code>

<code>                        </code><code>.println(thread.currentthread() +</code><code>" is false. doxxx."</code> <code>+</code><code>new</code> <code>date());</code>

<code>                </code><code>// 擷取obj的鎖,然後進行通知,不會在notify調用中,釋放obj的鎖</code>

<code>                </code><code>// 直到目前線程釋放了obj後,waiter才能從wait方法中傳回</code>

<code>                    </code><code>thread.sleep(</code><code>10000</code><code>);</code>

<code>68</code>

<code>69</code>

<code>70</code>

<code>71</code>

       程式的輸出:

thread[thread-0,5,main] still true. wait……sun jun 24 20:53:03 cst 2012

thread[thread-0,5,main] is false. doxxx.sun jun 24 20:53:14 cst 2012

        可以看到,二者之間相差了10秒,也就是thread.sleep(10000)這段代碼造成的,可以看出notifier沒有釋放obj的鎖,而waiter在對方沒有釋放前是不會傳回的。

      piped這個詞就是管道,相當于從一端入一端出的輸入輸出流。隻是不是從網絡和檔案上讀入内容,而是線上程之間傳遞資料,而傳輸的媒介為記憶體。

       管道主要包括了:

pipedoutputstream、pipedinputstream、pipedreader和pipedwriter四個,面向的處理内容為位元組和字元。

<code>public</code> <code>class</code> <code>pipedtest {</code>

<code>    </code><code>static</code> <code>class</code> <code>print</code><code>implements</code> <code>runnable {</code>

<code>        </code><code>private</code> <code>pipedinputstream in;</code>

<code>        </code><code>public</code> <code>print(pipedinputstream in) {</code>

<code>            </code><code>this</code><code>.in = in;</code>

<code>            </code><code>int</code> <code>receive =</code><code>0</code><code>;</code>

<code>                </code><code>while</code> <code>((receive = in.read()) != -</code><code>1</code><code>) {</code>

<code>                    </code><code>system.out.println(receive);</code>

<code>            </code><code>}</code><code>catch</code> <code>(ioexception ex) {</code>

<code>                </code><code>ex.printstacktrace();</code>

<code>        </code><code>pipedoutputstream out =</code><code>new</code> <code>pipedoutputstream();</code>

<code>        </code><code>pipedinputstream in =</code><code>new</code> <code>pipedinputstream();</code>

<code>        </code><code>// out ==&gt; in</code>

<code>        </code><code>out.connect(in);</code>

<code>        </code><code>thread t =</code><code>new</code> <code>thread(</code><code>new</code> <code>print(in));</code>

<code>        </code><code>int</code> <code>receive =</code><code>0</code><code>;</code>

<code>        </code><code>while</code> <code>((receive = system.in.read()) != -</code><code>1</code><code>) {</code>

<code>            </code><code>out.write(receive);</code>

        上述程式,以main線程作為輸入,而另外的print作為輸出。對于piped類型的流,必須要進行connect,如果沒有綁定,對于該流的通路會抛出異常。

       threadlocal線程變量,這是一個以threadlocal對象為key,一個object為value的存儲結構。它被附帶線上程上,也就是說一個線程可以根據一個threadlocal擁有一個變量。

       線上程對象中,有一個成員變量,類型如下:

<code>static</code> <code>class</code> <code>threadlocalmap {</code>

<code>        </code><code>/**</code>

<code>         </code><code>* the entries in this hash map extend weakreference, using</code>

<code>         </code><code>* its main ref field as the key (which is always a</code>

<code>         </code><code>* threadlocal object).  note that null keys (i.e. entry.get()</code>

<code>         </code><code>* == null) mean that the key is no longer referenced, so the</code>

<code>         </code><code>* entry can be expunged from table.  such entries are referred to</code>

<code>         </code><code>* as "stale entries" in the code that follows.</code>

<code>         </code><code>*/</code>

<code>        </code><code>static</code> <code>class</code> <code>entry</code><code>extends</code> <code>weakreference&lt;threadlocal&gt; {</code>

<code>            </code><code>/** the value associated with this threadlocal. */</code>

<code>            </code><code>object value;</code>

<code>            </code><code>entry(threadlocal k, object v) {</code>

<code>                </code><code>super</code><code>(k);</code>

<code>                </code><code>value = v;</code>

        可以看到線程對象中的這個threadlocalmap是以threadlocal作為key的。那麼對于一個threadlocal線上程對其調用get方法時,會擷取對應的object,下面是get方法。

<code>public</code> <code>t get() {</code>

<code>        </code><code>thread t = thread.currentthread();</code>

<code>        </code><code>threadlocalmap map = getmap(t);</code>

<code>        </code><code>if</code> <code>(map !=</code><code>null</code><code>) {</code>

<code>            </code><code>threadlocalmap.entry e = map.getentry(</code><code>this</code><code>);</code>

<code>            </code><code>if</code> <code>(e !=</code><code>null</code><code>)</code>

<code>                </code><code>return</code> <code>(t)e.value;</code>

<code>        </code><code>return</code> <code>setinitialvalue();</code>

       下面對這些代碼做些說明:

       首先調用方會獲得掉用線程thread t = thread.currentthread();

      其次會獲得線程對象的threadlocalmap對象;

      然後在threadlocalmap對象上,以this,也就是threadlocal為key去獲得對應的值;

      如果threadlocalmap這個對象為null,這裡做延遲加載,通過setinitialvalue()方法來初始化線程對象的threadlocalmap變量。

      可以看出隻有線程執行了任意threadlocal的get方法後,才會擁有threadlocalmap這個對象,而該變量又是包通路級别的,是以不會擔心被其他類修改。

Java并發程式設計【1.2時代】

             有時我們需要在調用一個方法時等待一段時間(一般來說是設定一個值,有更改),等待條件的滿足,而等待是有時限的,比如:1000ms,如果在1000ms後無法滿足條件那麼傳回,否則在時限内如果成功則立刻傳回。

        之前提到了基于wait的經典模式,即:同步,while,wait加doxxx的邏輯,那麼這種模式無法做到一點,就是能夠讓用戶端逾時傳回。

        如果加入逾時的話,對于經典模式的修改其實不會很複雜,假設逾時時間是t ms,那麼可以推知在now + t之後就會逾時,則定義:

remaining = t;

future = now + t;

          這時僅需要wait(remaining)即可,在醒來之後會将future – now,這個會設定到remaining上,但是如果remaining為負數,則直接退出。

<code>public</code> <code>synchronized</code> <code>object get(</code><code>long</code> <code>mills)</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>long</code> <code>future = system.currenttimemillis() + mills;</code>

<code>        </code><code>long</code> <code>remained = mills;</code>

<code>        </code><code>// 當結果為空并沒有逾時</code>

<code>        </code><code>while</code> <code>((result ==</code><code>null</code><code>) &amp;&amp; remained &gt;</code><code>0</code><code>) {</code>

<code>            </code><code>wait(remained);</code>

<code>            </code><code>remained = future - system.currenttimemillis();</code>

<code>        </code><code>return</code> <code>result;</code>

         在while的判斷中加入了remained &gt; 0的限制。這個模式就可以實作等待逾時,在mills毫秒内無法擷取到result或者result已經擷取到了,都會傳回。

        這裡我們模拟一個資料庫連結擷取的過程,這是一個消費者和生産者的案例。

         生産者每1000ms生産一個連結到池子中,每個消費者從池子中擷取一個連結,如果在800ms擷取不到,那麼就傳回,并告知擷取連結逾時。初始的池子裡有10個連結,消費者有5個,生産者有2個。

<code>public</code> <code>class</code> <code>connection {</code>

<code>    </code><code>public</code> <code>void</code> <code>sendstatement() {</code>

<code>            </code><code>thread.sleep(</code><code>10</code><code>);</code>

<code>            </code><code>system.out.println(thread.currentthread() +</code><code>" send statement"</code><code>);</code>

<code>            </code><code>thread.currentthread().interrupt();</code>

<code>public</code> <code>class</code> <code>connectionpool {</code>

<code>    </code><code>private</code> <code>linkedlist&lt;connection&gt; pool     =</code><code>new</code> <code>linkedlist&lt;connection&gt;();</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>int</code>       <code>max_size =</code><code>20</code><code>;</code>

<code>    </code><code>public</code> <code>connectionpool(</code><code>int</code> <code>initialsize){</code>

<code>        </code><code>if</code> <code>(initialsize &gt;</code><code>0</code><code>) {</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; initialsize; i++) {</code>

<code>                </code><code>pool.addlast(</code><code>new</code> <code>connection());</code>

<code>    </code><code>public</code> <code>void</code> <code>releaseconnection()</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>synchronized</code> <code>(pool) {</code>

<code>            </code><code>while</code> <code>(pool.size() &gt;= max_size) {</code>

<code>                </code><code>pool.wait();</code>

<code>            </code><code>// 添加後需要進行通知,這樣其他消費者能夠感覺到連結池中已經增加了一個連結</code>

<code>            </code><code>pool.addlast(</code><code>new</code> <code>connection());</code>

<code>            </code><code>pool.notifyall();</code>

<code>    </code><code>public</code> <code>connection fetchconnection(</code><code>long</code> <code>mills)</code><code>throws</code> <code>interruptedexception {</code>

<code>            </code><code>// 完全逾時</code>

<code>            </code><code>if</code> <code>(mills &lt;=</code><code>0</code><code>) {</code>

<code>                </code><code>while</code> <code>(pool.isempty()) {</code>

<code>                    </code><code>pool.wait();</code>

<code>                </code><code>return</code> <code>pool.removefirst();</code>

<code>            </code><code>}</code><code>else</code> <code>{</code>

<code>                </code><code>long</code> <code>futuretime = system.currenttimemillis() + mills;</code>

<code>                </code><code>long</code> <code>deltatime = mills;</code>

<code>                </code><code>while</code> <code>(pool.isempty() &amp;&amp; deltatime &gt;</code><code>0</code><code>) {</code>

<code>                    </code><code>pool.wait(deltatime);</code>

<code>                    </code><code>deltatime = futuretime - system.currenttimemillis();</code>

<code>                </code><code>connection result =</code><code>null</code><code>;</code>

<code>                </code><code>if</code> <code>(!pool.isempty()) {</code>

<code>                    </code><code>result = pool.removefirst();</code>

<code>                </code><code>return</code> <code>result;</code>

          這裡主要看一下fecthconnection,它提供了完全逾時的實作,主要是通過計算出将要逾時的時間點futuretime,和逾時的時間距離deltatime,在這個基礎上複用了僅點的同步、while和do的結構,隻不過是在while的不通過條件中增加了時間距離的消耗判斷,如果小于0直接傳回,當然面對過早通知,将會更新deltatime。

           當執行從pool.wait方法中傳回後,有可能是逾時,也有可能是已經滿足了池中有連接配接的狀況,是以如果有連接配接則直接傳回,否則傳回空。

<code>001</code>

<code>public</code> <code>class</code> <code>connectionpooltest {</code>

<code>002</code>

<code>003</code>

<code>    </code><code>static</code> <code>connectionpool pool  =</code><code>new</code> <code>connectionpool(</code><code>10</code><code>);</code>

<code>004</code>

<code>005</code>

<code>    </code><code>static</code> <code>countdownlatch latch =</code><code>new</code> <code>countdownlatch(</code><code>1</code><code>);</code>

<code>006</code>

<code>007</code>

<code>008</code>

<code>     </code><code>* &lt;pre&gt;</code>

<code>009</code>

<code>     </code><code>* thread[thread-5,5,main] put a connection.</code>

<code>010</code>

<code>     </code><code>* thread[thread-6,5,main] put a connection.</code>

<code>011</code>

<code>     </code><code>* thread[thread-4,5,main] got a connection</code>

<code>012</code>

<code>     </code><code>* thread[thread-3,5,main] got a connection</code>

<code>013</code>

<code>014</code>

<code>015</code>

<code>     </code><code>* thread[thread-1,5,main] got a connection</code>

<code>016</code>

<code>017</code>

<code>     </code><code>* &lt;/pre&gt;</code>

<code>018</code>

<code>     </code><code>*</code>

<code>019</code>

<code>020</code>

<code>021</code>

<code>022</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>5</code><code>; i++) {</code>

<code>023</code>

<code>            </code><code>consumer p =</code><code>new</code> <code>consumer(latch);</code>

<code>024</code>

<code>            </code><code>thread t =</code><code>new</code> <code>thread(p);</code>

<code>025</code>

<code>            </code><code>t.start();</code>

<code>026</code>

<code>027</code>

<code>028</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>2</code><code>; i++) {</code>

<code>029</code>

<code>            </code><code>producer p =</code><code>new</code> <code>producer(latch);</code>

<code>030</code>

<code>031</code>

<code>032</code>

<code>033</code>

<code>034</code>

<code>        </code><code>latch.countdown();</code>

<code>035</code>

<code>036</code>

<code>037</code>

<code>    </code><code>static</code> <code>class</code> <code>producer</code><code>implements</code> <code>runnable {</code>

<code>038</code>

<code>039</code>

<code>        </code><code>private</code> <code>countdownlatch latch;</code>

<code>040</code>

<code>041</code>

<code>        </code><code>public</code> <code>producer(countdownlatch latch){</code>

<code>042</code>

<code>            </code><code>this</code><code>.latch = latch;</code>

<code>043</code>

<code>044</code>

<code>045</code>

<code>046</code>

<code>047</code>

<code>                </code><code>latch.await();</code>

<code>048</code>

<code>049</code>

<code>050</code>

<code>051</code>

<code>            </code><code>while</code> <code>(</code><code>true</code><code>) {</code>

<code>052</code>

<code>053</code>

<code>054</code>

<code>055</code>

<code>056</code>

<code>057</code>

<code>058</code>

<code>059</code>

<code>                    </code><code>pool.releaseconnection();</code>

<code>060</code>

<code>061</code>

<code>062</code>

<code>063</code>

<code>064</code>

<code>                </code><code>system.out.println(thread.currentthread() +</code><code>" put a connection."</code><code>);</code>

<code>065</code>

<code>066</code>

<code>067</code>

<code>068</code>

<code>069</code>

<code>    </code><code>static</code> <code>class</code> <code>consumer</code><code>implements</code> <code>runnable {</code>

<code>070</code>

<code>071</code>

<code>072</code>

<code>073</code>

<code>        </code><code>public</code> <code>consumer(countdownlatch latch){</code>

<code>074</code>

<code>075</code>

<code>076</code>

<code>077</code>

<code>078</code>

<code>079</code>

<code>080</code>

<code>081</code>

<code>082</code>

<code>083</code>

<code>084</code>

<code>085</code>

<code>086</code>

<code>087</code>

<code>088</code>

<code>089</code>

<code>090</code>

<code>091</code>

<code>                    </code><code>connection connection = pool.fetchconnection(</code><code>0</code><code>);</code>

<code>092</code>

<code>093</code>

<code>                    </code><code>if</code> <code>(connection ==</code><code>null</code><code>) {</code>

<code>094</code>

<code>                        </code><code>system.out.println(thread.currentthread() +</code><code>" can not got a connection"</code><code>);</code>

<code>095</code>

<code>                    </code><code>}</code><code>else</code> <code>{</code>

<code>096</code>

<code>                        </code><code>system.out.println(thread.currentthread() +</code><code>" got a connection"</code><code>);</code>

<code>097</code>

<code>098</code>

<code>099</code>

<code>100</code>

<code>101</code>

<code>102</code>

<code>103</code>

<code>104</code>

<code>105</code>

這是一個執行了一段時間的結果:

<code>thread[thread-5,5,main] put a connection.</code>

<code>thread[thread-0,5,main] got a connection</code>

<code>thread[thread-6,5,main] put a connection.</code>

<code>thread[thread-4,5,main] got a connection</code>

          可以看到,因為生産者少,是以每次生産連接配接後,都被等待的消費者取走,而逾時是完全逾時,如果我們吧等待的時間長度調整到2000ms,就可以看到如下結果:

<code>thread[thread-2,5,main] got a connection</code>

<code>thread[thread-1,5,main] can not got a connection</code>

         有部分消費者,等待了2000ms沒有得到連接配接後,就傳回了,這裡就非常類似資料庫連結池的實作。

         阻塞隊列是對于資源擷取和釋放的一個良好資料結構,比如:作為資源的生産方,如果生産方生産的資料沒有位置存放,那麼生産方将會阻塞在生産的這個方法上,當然也可以選擇阻塞多少毫秒。消費方也是同樣的道理。

<code> </code><code>* @author weipeng 2012-7-24 下午4:34:22</code>

<code>public</code> <code>class</code> <code>blockingqueue&lt;e&gt; {</code>

<code>     </code><code>* 預設隊列長度</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>int</code> <code>default_size =</code><code>10</code><code>;</code>

<code>     </code><code>* 隊列數組</code>

<code>    </code><code>private</code> <code>object[]         array;</code>

<code>     </code><code>* 目前的長度</code>

<code>    </code><code>private</code> <code>int</code>              <code>size;</code>

<code>     </code><code>* 将要放置的位置</code>

<code>    </code><code>private</code> <code>int</code>              <code>head;</code>

<code>     </code><code>* 将要移除的位置</code>

<code>    </code><code>private</code> <code>int</code>              <code>tail;</code>

<code>    </code><code>public</code> <code>blockingqueue(</code><code>int</code> <code>size){</code>

<code>        </code><code>array = size &gt;</code><code>0</code> <code>?</code><code>new</code> <code>object[size] :</code><code>new</code> <code>object[default_size];</code>

<code>    </code><code>public</code> <code>blockingqueue(){</code>

<code>        </code><code>this</code><code>(default_size);</code>

<code>    </code><code>public</code> <code>int</code> <code>getcapacity() {</code>

<code>        </code><code>return</code> <code>array.length;</code>

<code>     </code><code>* @return</code>

<code>    </code><code>public</code> <code>int</code> <code>getsize() {</code>

<code>        </code><code>synchronized</code> <code>(array) {</code>

<code>            </code><code>return</code> <code>size;</code>

<code>    </code><code>@suppresswarnings</code><code>(</code><code>"unchecked"</code><code>)</code>

<code>    </code><code>public</code> <code>e take(</code><code>long</code> <code>millis)</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>long</code> <code>waittime = millis &gt;</code><code>0</code> <code>? millis :</code><code>0</code><code>;</code>

<code>            </code><code>object result =</code><code>null</code><code>;</code>

<code>            </code><code>if</code> <code>(waittime ==</code><code>0</code><code>) {</code>

<code>                </code><code>while</code> <code>(size &lt;=</code><code>0</code><code>) {</code>

<code>                    </code><code>array.wait();</code>

<code>                </code><code>result = array[tail];</code>

<code>                </code><code>size--;</code>

<code>                </code><code>tail = (tail +</code><code>1</code><code>) % getcapacity();</code>

<code>                </code><code>long</code> <code>future = system.currenttimemillis() + waittime;</code>

<code>                </code><code>long</code> <code>remain = waittime;</code>

<code>                </code><code>while</code> <code>(size &lt;=</code><code>0</code> <code>&amp;&amp; remain &gt;</code><code>0</code><code>) {</code>

<code>                    </code><code>array.wait(remain);</code>

<code>                    </code><code>remain = future - system.currenttimemillis();</code>

<code>                </code><code>if</code> <code>(size &gt;</code><code>0</code><code>) {</code>

<code>                    </code><code>result = array[tail];</code>

<code>                    </code><code>size--;</code>

<code>                    </code><code>tail = (tail +</code><code>1</code><code>) % getcapacity();</code>

<code>            </code><code>array.notifyall();</code>

<code>            </code><code>return</code> <code>(e) result;</code>

<code>    </code><code>public</code> <code>e take()</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>return</code> <code>take(</code><code>0</code><code>);</code>

<code>    </code><code>public</code> <code>boolean</code> <code>offer(e e,</code><code>long</code> <code>mills)</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>long</code> <code>waittime = mills &gt;</code><code>0</code> <code>? mills :</code><code>0</code><code>;</code>

<code>        </code><code>boolean</code> <code>result =</code><code>false</code><code>;</code>

<code>        </code><code>if</code> <code>(e !=</code><code>null</code><code>) {</code>

<code>            </code><code>synchronized</code> <code>(array) {</code>

<code>                </code><code>if</code> <code>(waittime &lt;=</code><code>0</code><code>) {</code>

<code>                    </code><code>while</code> <code>(size &gt;= getcapacity()) {</code>

<code>                        </code><code>array.wait();</code>

<code>                    </code><code>array[head] = e;</code>

<code>                    </code><code>size++;</code>

<code>                    </code><code>head = (head +</code><code>1</code><code>) % getcapacity();</code>

<code>                    </code><code>result =</code><code>true</code><code>;</code>

<code>                </code><code>}</code><code>else</code> <code>{</code>

<code>                    </code><code>long</code> <code>future = system.currenttimemillis() + waittime;</code>

<code>106</code>

<code>                    </code><code>long</code> <code>remain = waittime;</code>

<code>107</code>

<code>108</code>

<code>                    </code><code>while</code> <code>(size &gt;= getcapacity() &amp;&amp; remain &gt;</code><code>0</code><code>) {</code>

<code>109</code>

<code>                        </code><code>array.wait(remain);</code>

<code>110</code>

<code>                        </code><code>remain = future - system.currenttimemillis();</code>

<code>111</code>

<code>112</code>

<code>113</code>

<code>                    </code><code>if</code> <code>(size &lt; getcapacity()) {</code>

<code>114</code>

<code>                        </code><code>array[head] = e;</code>

<code>115</code>

<code>                        </code><code>size++;</code>

<code>116</code>

<code>                        </code><code>head = (head +</code><code>1</code><code>) % getcapacity();</code>

<code>117</code>

<code>118</code>

<code>                        </code><code>result =</code><code>true</code><code>;</code>

<code>119</code>

<code>120</code>

<code>121</code>

<code>122</code>

<code>                </code><code>array.notifyall();</code>

<code>123</code>

<code>124</code>

<code>125</code>

<code>126</code>

<code>127</code>

<code>128</code>

<code>129</code>

<code>    </code><code>public</code> <code>boolean</code> <code>offer(e e)</code><code>throws</code> <code>interruptedexception {</code>

<code>130</code>

<code>        </code><code>return</code> <code>offer(e,</code><code>0</code><code>);</code>

<code>131</code>

<code>132</code>

<code>133</code>

<code>    </code><code>public</code> <code>void</code> <code>printqueue() {</code>

<code>134</code>

<code>135</code>

<code>            </code><code>system.out.println(</code><code>"======================"</code><code>);</code>

<code>136</code>

<code>            </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; size; i++) {</code>

<code>137</code>

<code>                </code><code>system.out.println(</code><code>"["</code> <code>+ i +</code><code>"]"</code> <code>+ array[i]);</code>

<code>138</code>

<code>139</code>

<code>            </code><code>system.out.println(</code><code>"[head]"</code> <code>+ head);</code>

<code>140</code>

<code>            </code><code>system.out.println(</code><code>"[tail] "</code> <code>+ tail);</code>

<code>141</code>

<code>            </code><code>system.out.println(</code><code>"[size]"</code> <code>+ size);</code>

<code>142</code>

<code>143</code>

<code>144</code>

<code>145</code>

        其中 head是插入的位置,tail是移除的位置。下面是測試用例:

<code>@test</code>

<code>    </code><code>public</code> <code>void</code> <code>offer()</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10</code><code>; i++) {</code>

<code>            </code><code>queue.offer(</code><code>new</code> <code>object());</code>

<code>        </code><code>queue.printqueue();</code>

<code>        </code><code>system.out.println(queue.offer(</code><code>new</code> <code>object(),</code><code>1000</code><code>));</code>

輸出結果:

<code>======================</code>

<code>[0]java.lang.object@78ce5b1c</code>

<code>[1]java.lang.object@33bfc93a</code>

<code>[2]java.lang.object@74341960</code>

<code>[3]java.lang.object@86e293a</code>

<code>[4]java.lang.object@7854a328</code>

<code>[5]java.lang.object@7ca3d4cf</code>

<code>[6]java.lang.object@67e8a1f6</code>

<code>[7]java.lang.object@59e152c5</code>

<code>[8]java.lang.object@5801319c</code>

<code>[9]java.lang.object@366025e7</code>

<code>[head]0</code>

<code>[tail] 0</code>

<code>[size]10</code>

<code>false</code>

         可以看到第11次添加被阻塞了,在1秒内沒有添加成功,那麼直接傳回false。

<code>    </code><code>public</code> <code>void</code> <code>take()</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>thread t =</code><code>new</code> <code>thread() {</code>

<code>            </code><code>thread thread;</code>

<code>            </code><code>{</code>

<code>                </code><code>thread = thread.currentthread();</code>

<code>            </code><code>@override</code>

<code>            </code><code>public</code> <code>void</code> <code>run() {</code>

<code>                    </code><code>thread.sleep(</code><code>500</code><code>);</code>

<code>                </code><code>thread.interrupt();</code>

<code>        </code><code>};</code>

<code>        </code><code>system.out.println(queue.take(</code><code>2000</code><code>));</code>

      結果是在2秒内,還沒有擷取到,主線程被中斷,而take能夠感覺到中斷,就提前傳回了。

<code>    </code><code>public</code> <code>void</code> <code>interactive()</code><code>throws</code> <code>exception {</code>

<code>        </code><code>final</code> <code>atomiclong offer =</code><code>new</code> <code>atomiclong();</code>

<code>        </code><code>final</code> <code>atomiclong take =</code><code>new</code> <code>atomiclong();</code>

<code>        </code><code>final</code> <code>atomiclong nottake =</code><code>new</code> <code>atomiclong();</code>

<code>                        </code><code>queue.offer(</code><code>new</code> <code>object());</code>

<code>                        </code><code>offer.incrementandget();</code>

<code>        </code><code>thread t1 =</code><code>new</code> <code>thread() {</code>

<code>                        </code><code>if</code> <code>(queue.take(</code><code>1</code><code>) ==</code><code>null</code><code>) {</code>

<code>                            </code><code>nottake.incrementandget();</code>

<code>                        </code><code>}</code><code>else</code> <code>{</code>

<code>                            </code><code>take.incrementandget();</code>

<code>                        </code><code>}</code>

<code>        </code><code>thread t2 =</code><code>new</code> <code>thread() {</code>

<code>        </code><code>thread.sleep(</code><code>10000</code><code>);</code>

<code>        </code><code>t.interrupt();</code>

<code>        </code><code>t1.interrupt();</code>

<code>        </code><code>t2.interrupt();</code>

<code>        </code><code>system.out.println(offer.get());</code>

<code>        </code><code>system.out.println(take.get());</code>

<code>        </code><code>system.out.println(nottake.get());</code>

           運作了10秒鐘,1個生産方,2個消費方,每個消費者在1ms内沒有擷取到的時候,就會将nottake加1。

           結果輸出:

<code>java.lang.interruptedexception</code>

<code>    </code><code>at com.murdock.controller.blockingqueue.take(blockingqueue.java:</code><code>74</code><code>)</code>

<code>    </code><code>at com.murdock.controller.blockingqueuetest$</code><code>3</code><code>.run(blockingqueuetest.java:</code><code>81</code><code>)</code>

<code>    </code><code>at com.murdock.controller.blockingqueuetest$</code><code>4</code><code>.run(blockingqueuetest.java:</code><code>99</code><code>)</code>

<code>    </code><code>at java.lang.object.wait(object.java:</code><code>485</code><code>)</code>

<code>    </code><code>at com.murdock.controller.blockingqueue.offer(blockingqueue.java:</code><code>103</code><code>)</code>

<code>    </code><code>at com.murdock.controller.blockingqueue.offer(blockingqueue.java:</code><code>137</code><code>)</code>

<code>    </code><code>at com.murdock.controller.blockingqueuetest$</code><code>2</code><code>.run(blockingqueuetest.java:</code><code>65</code><code>)</code>

8828338

6283

======================

[head]8

[tail] 8

[size]0

          可以看到有6283次沒有擷取到,生産了8828338次,消費了8828338次,一緻的,但是有6283次沒有擷取到資料,因為逾時傳回了。

Java并發程式設計【1.2時代】

       對于服務端的程式,經常處理的場景是:

       面對用戶端傳入的短小任務,快速的處理并傳回。

       如果每次接受到一個任務,建立一個線程,然後進行執行,這種模式在原型階段是個不錯的選擇,但是如果面對的是成千上萬的任務遞交進伺服器時,如果還是采用一個任務一個線程的方式,那麼将會建立數以萬記的線程,進而是作業系統進入到頻繁上下文切換的狀态,而如文中第一章所述,線程的建立和消亡是需要耗費系統資源的,這樣無疑是無法滿足要求的。

        而線程池技術能夠很好的解決這個問題,它預先的建立了若幹的線程,也就是說線程的建立是托管的,并不能由使用者直接完全控制,進而使用固定或較為固定數目的線程來完成任務的執行,一方面消除了頻繁建立和消亡線程的開銷,另一方面,随着任務的請求多少能夠平緩的進行響應。

        在最優的狀态下,系統面臨大量的請求和較小的請求時,總體線程數量水準波動不大,當請求的規模變大時,響應處于平緩的劣化。

<code>public</code> <code>interface</code> <code>threadpool&lt;job</code><code>extends</code> <code>runnable&gt; {</code>

<code>     </code><code>* 執行一個job,這個job需要實作runnable</code>

<code>     </code><code>* @param job</code>

<code>    </code><code>void</code> <code>execute(job job);</code>

<code>     </code><code>* 關閉線程池</code>

<code>    </code><code>void</code> <code>shutdown();</code>

<code>     </code><code>* 增加工作線程</code>

<code>     </code><code>* @param workernum</code>

<code>    </code><code>void</code> <code>addworkers(</code><code>int</code> <code>workernum);</code>

<code>     </code><code>* 減少工作線程</code>

<code>    </code><code>void</code> <code>removeworker(</code><code>int</code> <code>workernum);</code>

<code>     </code><code>* 得到jobs的清單</code>

<code>    </code><code>int</code> <code>getjobsize();</code>

       可以看到上面的接口可以完成一個runnable的執行,并且能夠将線程池中的工作線程進行增加和減少,同時可以支援優雅的關閉。

<code> </code><code>* 預設的線程池實作,可以新增工作線程也可以減少工作線程</code>

<code> </code><code>* 當然送出job後會進入隊列中,而worker進行消費</code>

<code> </code><code>* 這是一個簡單的生産和消費者模式</code>

<code>public</code> <code>class</code> <code>defaultthreadpool&lt;job</code><code>extends</code> <code>runnable&gt;</code><code>implements</code> <code>threadpool&lt;job&gt; {</code>

<code>     </code><code>* 線程池最大限制數</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>int</code> <code>max_worker_numbers =</code><code>10</code><code>;</code>

<code>     </code><code>* 線程池預設的數量</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>int</code> <code>default_worker_numbers =</code><code>5</code><code>;</code>

<code>     </code><code>* 線程池最小的數量</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>int</code> <code>min_worker_numbers =</code><code>1</code><code>;</code>

<code>     </code><code>* 這是一個工作清單,将會向裡面插入工作</code>

<code>    </code><code>private</code> <code>final</code> <code>linkedlist&lt;job&gt; jobs =</code><code>new</code> <code>linkedlist&lt;job&gt;();</code>

<code>     </code><code>* 工作者清單</code>

<code>    </code><code>private</code> <code>final</code> <code>list&lt;worker&gt; workers = collections</code>

<code>            </code><code>.synchronizedlist(</code><code>new</code> <code>arraylist&lt;worker&gt;());</code>

<code>     </code><code>* 工作者線程的數量</code>

<code>    </code><code>private</code> <code>int</code> <code>workernum = default_worker_numbers;</code>

<code>    </code><code>public</code> <code>defaultthreadpool() {</code>

<code>        </code><code>initializewokers(default_worker_numbers);</code>

<code>    </code><code>public</code> <code>defaultthreadpool(</code><code>int</code> <code>num) {</code>

<code>        </code><code>workernum = num &gt; max_worker_numbers ? max_worker_numbers</code>

<code>                </code><code>: num &lt; min_worker_numbers ? min_worker_numbers : num;</code>

<code>        </code><code>initializewokers(workernum);</code>

<code>    </code><code>/*</code>

<code>     </code><code>* (non-javadoc)</code>

<code>     </code><code>* @see</code>

<code>     </code><code>* com.murdock.books.multithread.example.threadpool#execute(java.lang.runnable</code>

<code>     </code><code>* )</code>

<code>    </code><code>@override</code>

<code>    </code><code>public void execute(job job) {</code>

<code>        </code><code>if (job != null) {</code>

<code>            </code><code>// 添加一個工作,然後進行通知</code>

<code>            </code><code>synchronized (jobs) {</code>

<code>                </code><code>jobs.addlast(job);</code>

<code>                </code><code>jobs.notify();</code>

<code>     </code><code>* @see com.murdock.books.multithread.example.threadpool#shutdown()</code>

<code>    </code><code>public void shutdown() {</code>

<code>        </code><code>for (worker worker : workers) {</code>

<code>            </code><code>worker.shutdown();</code>

<code>    </code><code>public void addworkers(int workernum) {</code>

<code>        </code><code>int addednum = workernum;</code>

<code>        </code><code>if (workernum + this.workernum &gt; max_worker_numbers) {</code>

<code>            </code><code>addednum = max_worker_numbers - this.workernum;</code>

<code>        </code><code>synchronized (jobs) {</code>

<code>            </code><code>initializewokers(addednum);</code>

<code>            </code><code>this.workernum = this.workernum + addednum;</code>

<code>    </code><code>public void removeworker(int workernum) {</code>

<code>        </code><code>if (workernum &gt;= this.workernum) {</code>

<code>            </code><code>throw new illegalargumentexception(</code>

<code>                    </code><code>"can not remove beyond workernum. now num is "</code>

<code>                            </code><code>+ this.workernum);</code>

<code>            </code><code>int count = 0;</code>

<code>            </code><code>while (count &lt; workernum) {</code>

<code>                </code><code>workers.get(count).shutdown();</code>

<code>                </code><code>count++;</code>

<code>            </code><code>this.workernum = this.workernum - count;</code>

<code>    </code><code>public int getjobsize() {</code>

<code>        </code><code>return jobs.size();</code>

<code>     </code><code>* 初始化線程工作者</code>

<code>    </code><code>private void initializewokers(int num) {</code>

<code>        </code><code>for (int i = 0; i &lt; num; i++) {</code>

<code>            </code><code>worker worker = new worker();</code>

<code>            </code><code>workers.add(worker);</code>

<code>            </code><code>thread thread = new thread(worker);</code>

<code>            </code><code>thread.start();</code>

<code>     </code><code>* 工作者,負責消費任務</code>

<code>    </code><code>class worker implements runnable {</code>

<code>         </code><code>* 工作</code>

<code>        </code><code>private</code> <code>volatile</code> <code>boolean</code> <code>running =</code><code>true</code><code>;</code>

<code>146</code>

<code>            </code><code>while</code> <code>(running) {</code>

<code>147</code>

<code>148</code>

<code>                </code><code>job job =</code><code>null</code><code>;</code>

<code>149</code>

<code>                </code><code>synchronized</code> <code>(jobs) {</code>

<code>150</code>

<code>                    </code><code>// 如果工作者清單是空的,那麼就wait,放棄cpu執行占用</code>

<code>151</code>

<code>                    </code><code>while</code> <code>(jobs.isempty()) {</code>

<code>152</code>

<code>                        </code><code>try</code> <code>{</code>

<code>153</code>

<code>                            </code><code>jobs.wait();</code>

<code>154</code>

<code>                        </code><code>}</code><code>catch</code> <code>(interruptedexception ex) {</code>

<code>155</code>

<code>                            </code><code>thread.currentthread().interrupt();</code>

<code>156</code>

<code>                            </code><code>return</code><code>;</code>

<code>157</code>

<code>158</code>

<code>159</code>

<code>160</code>

<code>                    </code><code>// 取出一個job</code>

<code>161</code>

<code>                    </code><code>job = jobs.removefirst();</code>

<code>162</code>

<code>163</code>

<code>                </code><code>if</code> <code>(job !=</code><code>null</code><code>) {</code>

<code>164</code>

<code>165</code>

<code>                        </code><code>job.run();</code>

<code>166</code>

<code>                    </code><code>}</code><code>catch</code> <code>(exception ex) {</code>

<code>167</code>

<code>                        </code><code>ex.printstacktrace();</code>

<code>168</code>

<code>169</code>

<code>170</code>

<code>171</code>

<code>172</code>

<code>173</code>

<code>        </code><code>public</code> <code>void</code> <code>shutdown() {</code>

<code>174</code>

<code>            </code><code>running =</code><code>false</code><code>;</code>

<code>175</code>

<code>176</code>

<code>177</code>

<code>178</code>

          上面的邏輯中,用戶端調用execute時,會不斷的向jobs中添加工作,而每個worker在不斷将jobs取出并執行,當jobs為空時,worker進行阻塞狀态。

          這裡有一點需要注意,也就是execute時,使用了notify,而不是notifyall,因為我能夠确定有消費者worker被喚醒,這時使用notify将會比notifyall獲得更小的開銷,這在高性能的并發進行中是非常重要的。

<code>public</code> <code>void</code> <code>testexe() {</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>1000</code><code>; i++) {</code>

<code>            </code><code>threadpoolnoprint.execute(</code><code>new</code> <code>noprint());</code>

<code>        </code><code>sleep(</code><code>20</code><code>);</code>

<code>        </code><code>system.out.println(threadpoolnoprint.getjobsize());</code>

<code>        </code><code>sleep(</code><code>5000</code><code>);</code>

執行結果:

991

985

980

        可以看到送出後,每個20ms,檢視已經堆積的任務,發現在不斷的減少。

<code>    </code><code>public</code> <code>void</code> <code>addexe() {</code>

<code>        </code><code>system.out.println(</code><code>"============add worker============"</code><code>);</code>

<code>        </code><code>threadpoolnoprint.addworkers(</code><code>5</code><code>);</code>

990

============add worker============

967

955

           在起初的5個線程運作時,可以看到每隔一段時間,消耗了5個工作,而增加了線程(并發度增加)後,沒個間隔消耗量12個左右工作,提升了1倍多。

<code>    </code><code>public</code> <code>void</code> <code>reduceexe() {</code>

<code>        </code><code>system.out.println(</code><code>"==============reduce worker=============="</code><code>);</code>

<code>        </code><code>threadpoolnoprint.removeworker(</code><code>7</code><code>);</code>

965

==============reduce worker==============

952

949

         可以看到5個線程開始執行,然後增加到了10個,最後減少到了3個,執行的機關時間完成工作出現了先上揚再回落的過程。

<code>    </code><code>public</code> <code>void</code> <code>gracefulshutdown() {</code>

<code>            </code><code>threadpoolprint.execute(</code><code>new</code> <code>print());</code>

<code>        </code><code>sleep(</code><code>50</code><code>);</code>

<code>        </code><code>threadpoolprint.shutdown();</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521118</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521118</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521118</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521118</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521118</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521124</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521124</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521124</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521124</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521124</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521129</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521129</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521129</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521129</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521129</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521134</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521134</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521135</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521135</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521135</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521140</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521140</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521140</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521140</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521140</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521145</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521145</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521145</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521145</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521145</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521150</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521150</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521150</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521151</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521151</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521155</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521156</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521156</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521156</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521156</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521161</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521161</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521161</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521161</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521161</code>

<code>thread[thread-</code><code>1</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521166</code>

<code>thread[thread-</code><code>3</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521166</code>

<code>thread[thread-</code><code>0</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521166</code>

<code>thread[thread-</code><code>4</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521167</code>

<code>thread[thread-</code><code>2</code><code>,</code><code>5</code><code>,main], time=</code><code>1347615521166</code>

          可以看到1000個工作,在50ms後消耗了上圖所示的工作,而非1000個全部,整個關閉過程沒有異常發生,俗稱“優雅關閉”。

          我們将一個http請求作為一個工作,送出到線程池中,然後由線程池的工作者來完成對請求的分析以及響應的回複,這樣做能夠極大的提升服務的效率,這也是傳統、經典的web伺服器運作方式。

<code>package</code> <code>com.murdock.books.multithread.example;</code>

<code>import</code> <code>java.io.bufferedreader;</code>

<code>import</code> <code>java.io.fileinputstream;</code>

<code>import</code> <code>java.io.inputstreamreader;</code>

<code>import</code> <code>java.io.printwriter;</code>

<code>import</code> <code>java.net.serversocket;</code>

<code>import</code> <code>java.net.socket;</code>

<code> </code><code>* 請求:</code>

<code> </code><code>* get /p/1845211588 http/1.1</code>

<code> </code><code>* 響應:</code>

<code> </code><code>* http/1.1 200 ok</code>

<code> </code><code>* date: fri, 14 sep 2012 11:39:26 gmt</code>

<code> </code><code>* content-type: text/html; charset=gbk</code>

<code> </code><code>* transfer-encoding: chunked</code>

<code> </code><code>* connection: keep-alive</code>

<code> </code><code>* vary: accept-encoding</code>

<code> </code><code>* tracecode: 23665957650539960842091419, 23665874971177305354091419</code>

<code> </code><code>* content-encoding: gzip</code>

<code> </code><code>* server: apache</code>

<code>public</code> <code>class</code> <code>httptextserver {</code>

<code>    </code><code>static</code> <code>threadpool&lt;texthandler&gt; threadpool =</code><code>new</code> <code>defaultthreadpool&lt;texthandler&gt;(</code>

<code>            </code><code>10</code><code>);</code>

<code>    </code><code>static</code> <code>string basepath =</code><code>"/home/weipeng/project/multithread"</code><code>;</code>

<code>        </code><code>serversocket ss =</code><code>new</code> <code>serversocket(</code><code>8080</code><code>);</code>

<code>        </code><code>socket socket =</code><code>null</code><code>;</code>

<code>        </code><code>while</code> <code>((socket = ss.accept()) !=</code><code>null</code><code>) {</code>

<code>            </code><code>threadpool.execute(</code><code>new</code> <code>texthandler(socket));</code>

<code>        </code><code>ss.close();</code>

<code>    </code><code>static</code> <code>class</code> <code>texthandler</code><code>implements</code> <code>runnable {</code>

<code>        </code><code>private</code> <code>socket socket;</code>

<code>        </code><code>public</code> <code>texthandler(socket socket) {</code>

<code>            </code><code>this</code><code>.socket = socket;</code>

<code>            </code><code>string line =</code><code>null</code><code>;</code>

<code>            </code><code>bufferedreader br =</code><code>null</code><code>;</code>

<code>            </code><code>bufferedreader reader =</code><code>null</code><code>;</code>

<code>            </code><code>printwriter out =</code><code>null</code><code>;</code>

<code>                </code><code>reader =</code><code>new</code> <code>bufferedreader(</code><code>new</code> <code>inputstreamreader(</code>

<code>                        </code><code>socket.getinputstream()));</code>

<code>                </code><code>string header = reader.readline();</code>

<code>                </code><code>string filepath = basepath + header.split(</code><code>" "</code><code>)[</code><code>1</code><code>];</code>

<code>                </code><code>br =</code><code>new</code> <code>bufferedreader(</code><code>new</code> <code>inputstreamreader(</code>

<code>                        </code><code>new</code> <code>fileinputstream(filepath)));</code>

<code>                </code><code>out =</code><code>new</code> <code>printwriter(socket.getoutputstream());</code>

<code>                </code><code>out.println(</code><code>"http/1.1 200 ok"</code><code>);</code>

<code>                </code><code>out.println(</code><code>"content-type: text/html; charset=utf-8"</code><code>);</code>

<code>                </code><code>out.println(</code><code>"server: simplemolly"</code><code>);</code>

<code>                </code><code>out.println(</code><code>""</code><code>);</code>

<code>                </code><code>while</code> <code>((line = br.readline()) !=</code><code>null</code><code>) {</code>

<code>                    </code><code>out.println(line);</code>

<code>                </code><code>out.println(</code><code>"current-thread ===&gt; "</code> <code>+ thread.currentthread());</code>

<code>                </code><code>out.flush();</code>

<code>            </code><code>}</code><code>catch</code> <code>(exception ex) {</code>

<code>            </code><code>}</code><code>finally</code> <code>{</code>

<code>                </code><code>if</code> <code>(br !=</code><code>null</code><code>) {</code>

<code>                        </code><code>br.close();</code>

<code>                    </code><code>}</code><code>finally</code> <code>{</code>

<code>                        </code><code>br =</code><code>null</code><code>;</code>

<code>                </code><code>if</code> <code>(reader !=</code><code>null</code><code>) {</code>

<code>                        </code><code>reader.close();</code>

<code>                        </code><code>reader =</code><code>null</code><code>;</code>

<code>                </code><code>if</code> <code>(out !=</code><code>null</code><code>) {</code>

<code>                        </code><code>out.close();</code>

<code>                        </code><code>out =</code><code>null</code><code>;</code>

<code>                </code><code>if</code> <code>(socket !=</code><code>null</code><code>) {</code>

<code>                        </code><code>socket.close();</code>

<code>                        </code><code>socket =</code><code>null</code><code>;</code>

        實作簡介:

(1)服務端監聽8080端口;

(2)當一個socket連結上來後,将其放置入線程池;

(3)線程池中的worker也就是texthandler從socket中擷取需要通路的資源;

(4)根據資源的路徑找到資源并讀取同時輸出到socket的輸出流;

(5)關閉輸出流和相關資源。

       通路效果:

第一次通路:

Java并發程式設計【1.2時代】

第二次通路:

Java并發程式設計【1.2時代】

          可以看到一個線程2提供的服務,一個是線程3的,證明是多個線程交替的提供服務。