天天看點

《Java并發程式設計的藝術》第一章第1章并發程式設計的挑戰

并發程式設計的目的是為了讓程式運作的更快,但是并不是啟動更多的線程,就能讓程式最大限度的并發執行。在進行并發程式設計時,如果希望通過多線程執行任務讓程式運作的更快,會面臨非常多的挑戰,比如上下文切換的問題,死鎖的問題,以及受限于硬體和軟體的資源限制問題,本章會介紹幾種并發程式設計的挑戰,以及解決方案。

即使是單核處理器也支援多線程執行代碼,cpu通過給每個線程配置設定cpu時間片來實作這個機制。時間片是cpu配置設定給各個線程的時間,因為時間片非常短,是以cpu通過不停的切換線程執行,讓我們感覺多個線程是同時執行的,時間片一般是幾十毫秒(ms)。

cpu通過時間片配置設定算法來循環執行任務,目前任務執行一個時間片後會切換到下個任務,但是在切換前會儲存上一個任務的狀态,以便下次切換回這個任務時,可以再加載這個任務的狀态。是以任務的儲存到再加載的過程就是一次上下文切換。

就像我們同時在讀兩本書,比如當我們在讀一本英文的技術書時,發現某個單詞不認識,于是便打開中英文字典,但是在放下英文技術書之前,大腦必需首先記住這本書讀到了多少頁的第多少行,等查完單詞之後,能夠繼續讀這本書,這樣的切換是會影響讀書效率的,同樣上下文切換也會影響到多線程的執行速度。

下面的代碼示範串行和并發執行累加操作的時間,請思考下面的代碼并發執行一定比串行執行快些嗎?

<code>01</code>

<code>package</code> <code>chapter01;</code>

<code>02</code>

<code>03</code>

<code>/**</code>

<code>04</code>

<code> </code><code>* 并發和單線程執行測試</code>

<code>05</code>

<code> </code><code>* @author tengfei.fangtf</code>

<code>06</code>

<code> </code><code>* @version $id: concurrencytest.java, v 0.1 2014-7-18 下午10:03:31 tengfei.fangtf exp $</code>

<code>07</code>

<code> </code><code>*/</code>

<code>08</code>

<code>public</code> <code>class</code> <code>concurrencytest {</code>

<code>09</code>

<code>10</code>

<code>    </code><code>/** 執行次數 */</code>

<code>11</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>long</code> <code>count = 10000l;</code>

<code>12</code>

<code>13</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>interruptedexception {</code>

<code>14</code>

<code>        </code><code>//并發計算</code>

<code>15</code>

<code>        </code><code>concurrency();</code>

<code>16</code>

<code>        </code><code>//單線程計算</code>

<code>17</code>

<code>        </code><code>serial();</code>

<code>18</code>

<code>    </code><code>}</code>

<code>19</code>

<code>20</code>

<code>    </code><code>private</code> <code>static</code> <code>void</code> <code>concurrency() </code><code>throws</code> <code>interruptedexception {</code>

<code>21</code>

<code>        </code><code>long</code> <code>start = system.currenttimemillis();</code>

<code>22</code>

<code>        </code><code>thread thread = </code><code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>

<code>23</code>

<code>            </code><code>@override</code>

<code>24</code>

<code>            </code><code>public</code> <code>void</code> <code>run() {</code>

<code>25</code>

<code>                </code><code>int</code> <code>a = </code><code>0</code><code>;</code>

<code>26</code>

<code>                </code><code>for</code> <code>(</code><code>long</code> <code>i = </code><code>0</code><code>; i &lt; count; i++) {</code>

<code>27</code>

<code>                    </code><code>a += </code><code>5</code><code>;</code>

<code>28</code>

<code>                </code><code>}</code>

<code>29</code>

<code>                </code><code>system.out.println(a);</code>

<code>30</code>

<code>            </code><code>}</code>

<code>31</code>

<code>        </code><code>});</code>

<code>32</code>

<code>        </code><code>thread.start();</code>

<code>33</code>

<code>        </code><code>int</code> <code>b = </code><code>0</code><code>;</code>

<code>34</code>

<code>        </code><code>for</code> <code>(</code><code>long</code> <code>i = </code><code>0</code><code>; i &lt; count; i++) {</code>

<code>35</code>

<code>            </code><code>b--;</code>

<code>36</code>

<code>        </code><code>}</code>

<code>37</code>

<code>        </code><code>long</code> <code>time = system.currenttimemillis() - start;</code>

<code>38</code>

<code>        </code><code>thread.join();</code>

<code>39</code>

<code>        </code><code>system.out.println(</code><code>"concurrency :"</code> <code>+ time + </code><code>"ms,b="</code> <code>+ b);</code>

<code>40</code>

<code>41</code>

<code>42</code>

<code>    </code><code>private</code> <code>static</code> <code>void</code> <code>serial() {</code>

<code>43</code>

<code>44</code>

<code>        </code><code>int</code> <code>a = </code><code>0</code><code>;</code>

<code>45</code>

<code>46</code>

<code>            </code><code>a += </code><code>5</code><code>;</code>

<code>47</code>

<code>48</code>

<code>49</code>

<code>50</code>

<code>51</code>

<code>52</code>

<code>53</code>

<code>        </code><code>system.out.println(</code><code>"serial:"</code> <code>+ time + </code><code>"ms,b="</code> <code>+ b + </code><code>",a="</code> <code>+ a);</code>

<code>54</code>

<code>55</code>

<code>56</code>

<code>}</code>

答案是不一定,測試結果如表1-1所示:

表1-1 測試結果

循環次數

串行執行耗時(機關ms)

并發執行耗時

并發比串行快多少

1億

130

77

約1倍

1千萬

18

9

1百萬

5

差不多

10萬

4

3

1萬

1

從表1-1可以發現當并發執行累加操作不超過百萬次時,速度會比串行執行累加操作要慢。那麼為什麼并發執行的速度還比串行慢呢?因為線程有建立和上下文切換的開銷。

下面我們來看看有什麼工具可以度量上下文切換帶來的消耗。

使用vmstat可以測量上下文切換的次數。

下面是利用vmstat測量上下文切換次數的示例。

<code>$ vmstat 1</code>

<code>procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----</code>

<code>r b   swpd   free   buff cache   si   so   bi   bo   in   cs us sy id wa st</code>

<code>0 0     0 127876 398928 2297092   0   0     0     4   2   2 0 0 99 0 0</code>

<code>0 0     0 127868 398928 2297092   0   0     0     0 595 1171 0 1 99 0 0</code>

<code>0 0     0 127868 398928 2297092   0   0     0     0 590 1180 1 0 100 0 0</code>

<code>0 0     0 127868 398928 2297092   0   0     0     0 567 1135 0 1 99 0 0</code>

cs(content switch)表示上下文切換的次數,從上面的測試結果中,我們可以看到其中上下文的每一秒鐘切換1000多次。

減少上下文切換的方法有無鎖并發程式設計、cas算法、單線程程式設計和使用協程。

無鎖并發程式設計。多線程競争鎖時,會引起上下文切換,是以多線程處理資料時,可以用一些辦法來避免使用鎖,如将資料用id進行hash算法後分段,不同的線程處理不同段的資料。

cas算法。java的atomic包使用cas算法來更新資料,而不需要加鎖。

使用最少線程。避免建立不需要的線程,比如任務很少,但是建立了很多線程來處理,這樣會造成大量線程都處于等待狀态。

協程:在單線程裡實作多任務的排程,并在單線程裡維持多個任務間的切換。

本節描述通過減少線上大量waiting的線程,來減少上下文切換次數。

第一步:用jstack指令 dump線程資訊,看看pid是3117程序裡的線程都在做什麼。

<code>1</code>

<code>sudo -u admin /opt/ifeve/java/bin/jstack 31177 &amp;gt; /home/tengfei.fangtf/dump17</code>

第二步:統計下所有線程分别處于什麼狀态,發現300多個線程處于waiting(onobjectmonitor)狀态。

<code>[tengfei.fangtf@ifeve ~]$ grep java.lang.thread.state dump17 | awk '{print $2$3$4$5}' | sort | uniq -c</code>

<code>2</code>

<code>39 runnable</code>

<code>3</code>

<code>21 timed_waiting(onobjectmonitor)</code>

<code>4</code>

<code>6 timed_waiting(parking)</code>

<code>5</code>

<code>51 timed_waiting(sleeping)</code>

<code>6</code>

<code>305 waiting(onobjectmonitor)</code>

<code>7</code>

<code>3 waiting(parking)</code>

第三步:打開dump檔案檢視處于waiting(onobjectmonitor)的線程在做什麼。發現這些線程基本全是jboss的工作線程在await。說明jboss線程池裡線程接收到的任務太少,大量線程都閑着。

<code>"http-0.0.0.0-7001-97" daemon prio=10 tid=0x000000004f6a8000 nid=0x555e in object.wait() [0x0000000052423000]</code>

<code> </code><code>java.lang.thread.state: waiting (on object monitor)</code>

<code> </code><code>at java.lang.object.wait(native method)</code>

<code> </code><code>- waiting on &lt;0x00000007969b2280&gt; (a org.apache.tomcat.util.net.aprendpoint$worker)</code>

<code> </code><code>at java.lang.object.wait(object.java:485)</code>

<code> </code><code>at org.apache.tomcat.util.net.aprendpoint$worker.await(aprendpoint.java:1464)</code>

<code> </code><code>- locked &lt;0x00000007969b2280&gt; (a org.apache.tomcat.util.net.aprendpoint$worker)</code>

<code>8</code>

<code> </code><code>at org.apache.tomcat.util.net.aprendpoint$worker.run(aprendpoint.java:1489)</code>

<code>9</code>

<code> </code><code>at java.lang.thread.run(thread.java:662)</code>

第四步:減少jboss的工作線程數,找到jboss的線程池配置資訊,将maxthreads降低到100。

<code>&lt;maxthreads="250" maxhttpheadersize="8192"</code>

<code>emptysessionpath="false" minsparethreads="40" maxsparethreads="75" maxpostsize="512000" protocol="http/1.1"</code>

<code>enablelookups="false" redirectport="8443" acceptcount="200" buffersize="16384"</code>

<code>connectiontimeout="15000" disableuploadtimeout="false" usebodyencodingforuri="true"&gt;</code>

第五步:重新開機jboss,再dump線程資訊,然後再統計waiting(onobjectmonitor)的線程,發現減少了175。waiting的線程少了,系統上下文切換的次數就會少,因為從waitting到runnable會進行一次上下文的切換。讀者也可以使用vmstat指令測試下。

<code>44 runnable</code>

<code>22 timed_waiting(onobjectmonitor)</code>

<code>9 timed_waiting(parking)</code>

<code>36 timed_waiting(sleeping)</code>

<code>130 waiting(onobjectmonitor)</code>

<code>1 waiting(parking)</code>

鎖是個非常有用的工具,運用場景非常多,因為其使用起來非常簡單,而且易于了解。但同時它也會帶來一些困擾,那就是可能會引起死鎖,一旦産生死鎖,會造成系統功能不可用。讓我們先來看一段代碼,這段代碼會引起死鎖,線程t1和t2互相等待對方釋放鎖。

<code> </code><code>* 死鎖例子</code>

<code> </code><code>*</code>

<code> </code><code>* @version $id: deadlockdemo.java, v 0.1 2015-7-18 下午10:08:28 tengfei.fangtf exp $</code>

<code>public</code> <code>class</code> <code>deadlockdemo {</code>

<code>    </code><code>/** a鎖 */</code>

<code>    </code><code>private</code> <code>static</code> <code>string a = </code><code>"a"</code><code>;</code>

<code>    </code><code>/** b鎖 */</code>

<code>    </code><code>private</code> <code>static</code> <code>string b = </code><code>"b"</code><code>;</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>        </code><code>new</code> <code>deadlockdemo().deadlock();</code>

<code>    </code><code>private</code> <code>void</code> <code>deadlock() {</code>

<code>        </code><code>thread t1 = </code><code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>

<code>                </code><code>synchronized</code> <code>(a) {</code>

<code>                    </code><code>try</code> <code>{</code>

<code>                        </code><code>thread.sleep(</code><code>2000</code><code>);</code>

<code>                    </code><code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>                        </code><code>e.printstacktrace();</code>

<code>                    </code><code>}</code>

<code>                    </code><code>synchronized</code> <code>(b) {</code>

<code>                        </code><code>system.out.println(</code><code>"1"</code><code>);</code>

<code>        </code><code>thread t2 = </code><code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>

<code>                </code><code>synchronized</code> <code>(b) {</code>

<code>                    </code><code>synchronized</code> <code>(a) {</code>

<code>                        </code><code>system.out.println(</code><code>"2"</code><code>);</code>

<code>        </code><code>t1.start();</code>

<code>        </code><code>t2.start();</code>

這段代碼隻是示範死鎖的場景,在現實中你可能很難會寫出這樣的代碼。但是一些更為複雜的場景中你可能會遇到這樣的問題,比如t1拿到鎖之後,因為一些異常情況沒有釋放鎖,比如死循環。又或者是t1拿到一個資料庫鎖,釋放鎖的時候抛了異常,沒釋放掉。

一旦出現死鎖,業務是可感覺的,因為不能繼續提供服務了,那麼隻能通過dump線程看看到底是哪個線程出現了問題,以下線程資訊告訴我們是deadlockdemo類的42行和31号引起的死鎖:

<code>"thread-2"</code> <code>prio=</code><code>5</code> <code>tid=7fc0458d1000 nid=</code><code>0x116c1c000</code> <code>waiting </code><code>for</code> <code>monitor entry [116c1b000]</code>

<code>   </code><code>java.lang.thread.state: blocked (on object monitor)</code>

<code>        </code><code>at com.ifeve.book.forkjoin.deadlockdemo$</code><code>2</code><code>.run(deadlockdemo.java:</code><code>42</code><code>)</code>

<code>        </code><code>- waiting to lock &lt;7fb2f3ec0&gt; (a java.lang.string)</code>

<code>        </code><code>- locked &lt;7fb2f3ef8&gt; (a java.lang.string)</code>

<code>        </code><code>at java.lang.thread.run(thread.java:</code><code>695</code><code>)</code>

<code>"thread-1"</code> <code>prio=</code><code>5</code> <code>tid=7fc0430f6800 nid=</code><code>0x116b19000</code> <code>waiting </code><code>for</code> <code>monitor entry [116b18000]</code>

<code>        </code><code>at com.ifeve.book.forkjoin.deadlockdemo$</code><code>1</code><code>.run(deadlockdemo.java:</code><code>31</code><code>)</code>

<code>        </code><code>- waiting to lock &lt;7fb2f3ef8&gt; (a java.lang.string)</code>

<code>        </code><code>- locked &lt;7fb2f3ec0&gt; (a java.lang.string)</code>

現在我們介紹下如何避免死鎖的幾個常見方法。

避免一個線程同時擷取多個鎖。

避免一個線程在鎖内同時占用多個資源,盡量保證每個鎖隻占用一個資源。

嘗試使用定時鎖,使用trylock(timeout)來替代使用内部鎖機制。

對于資料庫鎖,加鎖和解鎖必須在一個資料庫連接配接裡,否則會出現解鎖失敗。

資源限制是指在進行并發程式設計時,程式的執行速度受限于計算機硬體資源或軟體資源的限制。比如伺服器的帶寬隻有2m,某個資源的下載下傳速度是1m每秒,系統啟動十個線程下載下傳資源,下載下傳速度不會變成10m每秒,是以在進行并發程式設計時,要考慮到這些資源的限制。硬體資源限制有帶寬的上傳下載下傳速度,硬碟讀寫速度和cpu的處理速度。軟體資源限制有資料庫的連接配接數和sorket連接配接數等。

并發程式設計将代碼執行速度加速的原則是将代碼中串行執行的部分變成并發執行,但是如果某段串行的代碼并發執行,但是因為受限于資源的限制,仍然在串行執行,這時候程式不僅不會執行加快,反而會更慢,因為增加了上下文切換和資源排程的時間。例如,之前看到一段程式使用多線程在辦公網并發的下載下傳和處理資料時,導緻cpu使用率100%,任務幾個小時都不能運作完成,後來修改成單線程,一個小時就執行完成了。

對于硬體資源限制,可以考慮使用叢集并行執行程式,既然單機的資源有限制,那麼就讓程式在多機上運作,比如使用odps,hadoop或者自己搭建伺服器叢集,不同的機器處理不同的資料,比如将資料id%機器數,得到一個機器編号,然後由對應編号的機器處理這筆資料。

對于軟體資源限制,可以考慮使用資源池将資源複用,比如使用連接配接池将資料庫和sorket連接配接複用,或者調用對方webservice接口擷取資料時,隻建立一個連接配接。

那麼如何在資源限制的情況下,讓程式執行的更快呢?根據不同的資源限制調整程式的并發度,比如下載下傳檔案程式依賴于兩個資源,帶寬和硬碟讀寫速度。有資料庫操作時,要資料庫連接配接數,如果sql語句執行非常快,而線程的數量比資料庫連接配接數大很多,則某些線程會被阻塞住,等待資料庫連接配接。

本章介紹了在進行并發程式設計的時候,大家可能會遇到的幾個挑戰,并給出了一些解決建議。有的并發程式寫的不嚴謹,在并發下如果出現問題,定位起來會比較耗時和棘手。是以對于java開發工程師,筆者強烈建議多使用jdk并發包提供的并發容器和工具類來幫你解決并發問題,因為這些類都已經通過了充分的測試和優化,解決了本章提到的幾個挑戰。