天天看點

線程池

線程池(thread pool)對于限制應用程式中同一時刻運作的線程數很有用。因為每啟動一個新線程都會有相應的性能開銷,每個線程都需要給棧配置設定一些記憶體等等。

線程池經常應用在多線程伺服器上。每個通過網絡到達伺服器的連接配接都被包裝成一個任務并且傳遞給線程池。線程池的線程會并發的處理連接配接上的請求。以後會再深入有關 java 實作多線程伺服器的細節。

這裡有一個簡單的線程池實作:

<a href="http://ifeve.com/thread-pools/#viewsource">檢視源代碼</a>

<code>01</code>

<code>public</code> <code>class</code> <code>threadpool {</code>

<code>02</code>

<code>03</code>

<code>  </code><code>private</code> <code>blockingqueue taskqueue =</code><code>null</code><code>;</code>

<code>04</code>

<code>  </code><code>private</code> <code>list&lt;poolthread&gt; threads =</code><code>new</code> <code>arraylist&lt;poolthread&gt;();</code>

<code>05</code>

<code>  </code><code>private</code> <code>boolean</code> <code>isstopped =</code><code>false</code><code>;</code>

<code>06</code>

<code>07</code>

<code>  </code><code>public</code> <code>threadpool(</code><code>int</code> <code>noofthreads,</code><code>int</code> <code>maxnooftasks) {</code>

<code>08</code>

<code>    </code><code>taskqueue =</code><code>new</code> <code>blockingqueue(maxnooftasks);</code>

<code>09</code>

<code>10</code>

<code>    </code><code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;noofthreads; i++) {</code>

<code>11</code>

<code>      </code><code>threads.add(</code><code>new</code> <code>poolthread(taskqueue));</code>

<code>12</code>

<code>    </code><code>}</code>

<code>13</code>

<code>    </code><code>for</code> <code>(poolthread thread : threads) {</code>

<code>14</code>

<code>      </code><code>thread.start();</code>

<code>15</code>

<code>16</code>

<code>  </code><code>}</code>

<code>17</code>

<code>18</code>

<code>  </code><code>public</code> <code>void</code> <code>synchronized</code> <code>execute(runnable task) {</code>

<code>19</code>

<code>    </code><code>if</code><code>(</code><code>this</code><code>.isstopped)</code><code>throw</code>

<code>20</code>

<code>      </code><code>new</code> <code>illegalstateexception(</code><code>"threadpool is stopped"</code><code>);</code>

<code>21</code>

<code>22</code>

<code>    </code><code>this</code><code>.taskqueue.enqueue(task);</code>

<code>23</code>

<code>24</code>

<code>25</code>

<code>  </code><code>public</code> <code>synchronized</code> <code>boolean</code> <code>stop() {</code>

<code>26</code>

<code>    </code><code>this</code><code>.isstopped =</code><code>true</code><code>;</code>

<code>27</code>

<code>28</code>

<code>      </code><code>thread.stop();</code>

<code>29</code>

<code>30</code>

<code>31</code>

<code>32</code>

<code>}</code>

(校注:原文有編譯錯誤,我修改了下)

<code>public</code> <code>class</code> <code>poolthread</code><code>extends</code> <code>thread {</code>

<code>  </code><code>private</code> <code>blockingqueue&lt;runnable&gt; taskqueue =</code><code>null</code><code>;</code>

<code>  </code><code>private</code> <code>boolean</code>       <code>isstopped =</code><code>false</code><code>;</code>

<code>  </code><code>public</code> <code>poolthread(blockingqueue&lt;runnable&gt; queue) {</code>

<code>    </code><code>taskqueue = queue;</code>

<code>  </code><code>public</code> <code>void</code> <code>run() {</code>

<code>    </code><code>while</code> <code>(!isstopped()) {</code>

<code>      </code><code>try</code> <code>{</code>

<code>        </code><code>runnable runnable =taskqueue.take();</code>

<code>        </code><code>runnable.run();</code>

<code>      </code><code>}</code><code>catch</code><code>(exception e) {</code>

<code>        </code><code>// 寫日志或者報告異常,</code>

<code>        </code><code>// 但保持線程池運作.</code>

<code>      </code><code>}</code>

<code>  </code><code>public</code> <code>synchronized</code> <code>void</code> <code>tostop() {</code>

<code>    </code><code>isstopped =</code><code>true</code><code>;</code>

<code>    </code><code>this</code><code>.interrupt();</code><code>// 打斷池中線程的 dequeue() 調用.</code>

<code>  </code><code>public</code> <code>synchronized</code> <code>boolean</code> <code>isstopped() {</code>

<code>    </code><code>return</code> <code>isstopped;</code>

線程池的實作由兩部分組成。類 <code>threadpool</code> 是線程池的公開接口,而類 <code>poolthread</code> 用來實作執行任務的子線程。

一個空閑的 <code>poolthread</code> 線程會把 <code>runnable</code> 對象從隊列中取出并執行。你可以在 <code>poolthread.run()</code> 方法裡看到這些代碼。執行完畢後,<code>poolthread</code> 進入循環并且嘗試從隊列中再取出一個任務,直到線程終止。

調用 <code>threadpool.stop()</code> 方法可以停止 <code>threadpool</code>。在内部,調用 stop 先會标記 <code>isstopped</code> 成員變量(為 true)。然後,線程池的每一個子線程都調用 <code>poolthread.stop()</code> 方法停止運作。注意,如果線程池的 <code>execute()</code> 在 <code>stop()</code> 之後調用,<code>execute()</code> 方法會抛出 <code>illegalstateexception</code> 異常。

子線程會在完成目前執行的任務後停止。注意 <code>poolthread.stop()</code> 方法中調用了 <code>this.interrupt()</code>。它確定阻塞在 <code>taskqueue.dequeue()</code> 裡的 <code>wait()</code> 調用的線程能夠跳出 <code>wait() 調用(校對注:因為執行了中斷interrupt,它能夠打斷這個調用)</code>,并且抛出一個 <code>interruptedexception</code> 異常離開 <code>dequeue()</code> 方法。這個異常在 <code>poolthread.run()</code> 方法中被截獲、報告,然後再檢查 <code>isstopped</code> 變量。由于 <code>isstopped</code> 的值是 true, 是以 <code>poolthread.run()</code> 方法退出,子線程終止。

上一篇: 重入鎖死
下一篇: 避免死鎖