天天看點

《實戰Java高并發程式設計》讀後感

寫在前面無關的内容

        白駒過隙,看下月曆已經畢業4年多,加上在大學裡的4年,算算在計算機界也躺了八年,按照格拉德韋爾的1萬小時定律差不多我也該成為行業的專家了,然後并沒有。當看着“什麼是Java?”、“什麼是程式?”、“多線程是什麼?”、“怎麼建構一個合理的大型網站?”、“怎麼保證系統的穩定運作”這些耳熟能詳的問題時,就知道前方的路還有很遠很遠,這些問題也許我一直無法給出确切的回答,但至少希望給别人解釋多線程時,不會說“多線程就是很多線程一起跑”這種文字級别的說明。

初讀此書的感受

        每天晚上看一點,零零碎碎的花二十天左右差不多算是翻完了。總的來說講的是通俗易懂,用例和代碼選擇得當;但不管怎麼說,這是一本純理論的解釋書籍,雖說中間穿插了很多樣例,也都基本上是為了示範理論構造出來的,自然就少了些生動;這種純理論的介紹對我這種記憶不佳的人簡直就是成噸的傷害,以至于前天才看完,今天上面的很多細節就被忘的差不多了,甚至懷疑要不了多久我都是否記得看過這本書。

        言歸正傳,看書本來就不是為了記住書名和作者名,真正要記住的是作者想通過書來表達的東西,或者是看書者想通過看書得到的收獲,反正我是這麼認為的。當時買這本書看就是想系統性的了解Java高并發這個概念,什麼是Java高并發,Java高并發的原理、怎麼實作Java高并發,哪裡用到Java高并發,我們怎麼使用Java高并發等,關于這些核心問題書中也都或多或少的給出了一定的回答;可能是此書專注于理論解釋的緣故,對“Java高并發的原理”、“怎麼實作Java高并發”講的比較詳細,而其它的則講的粗略些。對于書上的理論、方法是不是最好的,我沒做考究,當然目前我的水準也不夠;純從感覺上講,書中的理論嚴謹,方法簡潔,至少我是還沒發現有什麼不妥,或者是需要改進的地方,目前權且當它是最好的吧,以後會是啥樣,那就是以後的事了。

一些個人的了解

什麼是Java高并發?       

        Java高并發就是,用Java語言在短時間内處理大量的業務。

Java高并發的原理?

        并行的設計模式和對多線程的良好支援,後面詳細講到,也是全文的重點。

 高并發的背景:

        計算機經過這麼多年的發展,如果目前的計算理論或者晶片的生産工藝沒有本質的突破,那麼單個CUP的計算能力可能已經走到盡頭了,而且目前我們還沒有看到突破的可能。CUP的性能是遇到了不可逾越的瓶頸,但我們的硬體工程師并沒有放棄對性能的提升,于是他們想出了一個十分“機智”的辦法,一個CPU的性能是固定的,那我多弄幾個CPU放一起組成一個多核CPU,性能不就提高了幾倍麼。

并行模式:        

        一個16核CPU,總的來看性能确實比單核CPU性能提高了16倍,這很顯而易見;但CPU性能提高16倍的前提是這CPU的16個核都同時勤奮的工作,然後目前計算機顯然還沒這麼智能,并不會把我們的給它的任務拆分到16個不同的CPU核心上執行,而是選一個CPU核心來執行完成任務;如果這樣來看,那CPU核心再多也沒用啊,幹活的就那幾個,隻是多了些吃瓜的群衆。為了更好的将這些吃瓜的群衆也派上用場,加快我們的任務執行速度,既然計算不會自動幫我們分拆任務,那我們先将任務分拆好,然後再給計算機運作,這樣總能接受了吧。

        通俗的說,就是在業務邏輯層将業務拆分成很多子業務,然後再将這些大量的子業務交給計算機處理,這樣計算機就可以用每個CPU核心分别去處理這些子業務,隻要拆分的合理,那麼就能将合成的多核CPU的性能發揮出來了。由于是CPU的每個核心都在同時運作,于是就把這種模式稱之為--并行模式。

Java的多線程:

        我們都知道線程是程式運作的最小機關,也就是說一個CPU核心在一個時刻隻能運作一個線程。我們把任務拆分成子任務,然後交給CPU運作,實質上就是把一個大的程式(可以看做是一個需要運作很久的線程),拆分成多個小的線程,交給作業系統,然後由作業系統排程配置設定CPU核心來分别運作這些線程,當這些線程都執行完成後,我們的程式也就運作完成了。而Java提供一整套對線程進行管理的方法,包括建立、啟動、運作、挂起、結束等,我們通過Java提供的方法,能夠較友善的實作我們對線程的拆分,以及協同運作的控制,對于這一整體解決方案,稱之為Java多線程。Java多線程是實作Java程式高并發的核心,比較複雜,下節詳細講解。

總結:

        這一節我們談到了三個重要概念,高并發、并行模式、多線程。下面總結下這三者的關系。

        高并發是我們要實作的目标,要在較短的時間内處理掉大量的業務;

        并行模式是實作我們高并發的一種方案,這個主要是基于現實情況考慮得來的。假如我們的CPU運作能力能提高幾個數量級,一個CPU在短時間内就能處理掉大量業務,那麼就沒有并行的必要了;主要是目前單個CPU的運作能力有限,隻能通過多個CPU同時運作,才能在短時間能完成大量業務,是以就有了并行;

        多線程是實作并行模式的軟體基礎(多核CPU是實作并行的實體基礎),因為線程是CPU核運作的最小機關,如果隻有一個線程,再多CPU也無法并行;

        由于我們是軟體開發,是以高并發問題也就具體化到了解決程式的多線程問題,而在這些前面加上限定詞Java,就是說我們是用Java這個工具來解決這些問題的。

怎麼實作Java高并發?

        高并發的原理我們已經明白,簡單的說就是把一個大任務拆分成很多小任務,然後完成這些小的任務就可以了。大任務:簡單的了解就是要做很多、或者是很複雜的事,這裡可以等同于我們的高并發;小任務:簡單的了解就是做一件簡單的事,這裡可以等同于我們的線程;這種等價是十分不嚴謹的,但這也算是對複雜概念的一種白話解釋,友善記憶和了解。

        聽這麼一說,感覺要實作高并發也很簡單啊,就把事情拆分下交給計算機就ok了,然而難就難在這拆分上。至于為啥很難拆分,首先讓我們了解下程式的執行流程:

        1讀取資料--》2緩存到記憶體--》3緩存到cache--》4CPU運算--》5寫回cahce--》6寫回記憶體--》7持久化資料,重複上述流程,直到完成程式完成。

        從執行流程我們不難看出,程式下一步的執行是依賴上一步的執行結果,比如2沒有執行完成,3是無法運作的。如果不對我任務進行拆分我們是很容易做到這一點的,因為整個軟體設計就是嚴格要求程式是有序的,為此還要求程式隻能有順序、選擇、循環這3種結構;但一旦對任務進行了拆分,那麼一切都變了。比如我把一個任務拆分成兩個子任務A和B,假如B的運算資料是A的運算結果,那麼B就需要A執行完成6後,才能開始3,否則就會出現錯誤。而我們不管對A、或者B的程式單獨做什麼樣的設計,都無法保證B能正确執行,要保證B的正确執行,我們就需要實作A、B兩個程式的交叉控制;我們都知道,一個程式的耦合的越低越好,那麼我們不難得出:在拆分的時候要盡可能的減小子任務間的耦合度。

        減少耦合是我們拆分時的第一目标,如果能消除耦合當然是最好的,但這是很難、幾乎不可能做到(或者是消除耦合的代價很大不能接受)。這個時候我們就不僅僅是對大任務的拆分,還要考慮拆分後的運作;反映到Java高并發上,就是把高并發的任務合理拆分成多線程,并處理好線程間的資源配置設定、線程的協同運作等問題。要保證高效率,又要n多線程運作不沖突,這是一件很難辦到的事;不過好消息就是,Java提供了大量的方法和工具來幫我們解決這個問題,下面将重點介紹這些方法和工具。

Java多線程的基礎

        首先我們了解下線程的生命周期,建立---》運作--》挂起--》....運作--》結束;

        建立線程Java提供了2種方法:extends Thread; implements Runnable(推薦使用); 

        啟動線程:Object.start();啟動後線程進入運作狀态;

        挂起線程:分為主動挂起,Object.wait()、Thread.sleep();被動挂起,synchronized、lock等;

        喚醒線程:不同的挂起方式,分别有對于激活方式wait(notify),sleep(時間參數),synchronized(鎖對象),lock(unlock);

        結束線程:執行完成後自然退出(常用);設定狀态量,讓線程結束執行而退出;

        有了這些基礎的方法後,我們理論上是能開發多線程程式了,但如果用這麼原始的方法去開發幾萬、幾十萬行代碼的程式,這是一件讓人無法想象的事。于是Java再進一步,給你我們提供了大量實用的API和架構。

線程間的資源共享

        程序是作業系統配置設定和排程的基本機關,在當代面向線程設計的計算機結構中,程序是線程的容器;簡單的說就是作業系統隻負責把資源配置設定給程序,然後線程使用程序裡的資源。既然線程使用的是程序裡的資源,而不是僅僅使用線程獨有的資源,如果有多個線程,那麼自然就會存在一個資源共享的問題,否則臨界資源同時被多個線程使用,那肯定是要出錯了。為了解決資源的共享問題,JDK提供了很多方法,不過我隻寫我認為重要、有用的。

1、Lock:

        解決臨界資源問題最直接的方法就是加鎖,Lock作為synchronized功能的擴充,比synchronized更加靈活好用,并且也擴充了一些相關的子類針對特定場景的使用,在JDK底層中有大量應用;不過這還是屬于基礎方法,使用複雜,建議謹慎使用。

2、線程池:

        為了避免頻繁的建立和消耗線程,我們可以對建立的線程進行複用。這裡和資料庫連接配接池類似,當系統使用資料庫時并不是建立一個新的連接配接,而是從連接配接池中獲得一個可用的連接配接;反之,關閉連接配接時也不是真的關閉這個連接配接,而是将連接配接還給連接配接池,通過這種方式,既可以節約建立和銷毀對象的時間,又可以控制系統連接配接的數量,保證系統穩定運作。使用線程池我們也可以獲得類似的好處,将線程統一管理,既有利于系統運作效率的提高,也可以減少不同使用者獨立建立、銷毀線程可能帶來的風險,進而提高系統運作的穩定性。

        為了實作線程池的功能,JDK提供了一套Executor架構,幫助我們實作線程池的功能。在實際應用中我們也應該使用線程池來對我們的線程進行管理,避免對線程的手動管理。關于Executor具體内容和使用會在附錄中有一篇單獨介紹。

3、并發集合:

        在Java基礎裡我們知道,JDK給我們提供了一套常用的集合類,比如其中List、Map就給我們的程式設計帶來了極大的友善。很不幸的是,這些常用的集合類都是非線程安全的,比如兩個線程都在使用map.put(k, v)時,可能隻有一個線程成功,這對于我們程式肯定是無法接受的。為了在多線程中我們也能友善的使用這些常用的集合類的功能,JDK提供了幾套不同的解決方案。

        方法一:collections.synchronizedMap(new HashMap());、collections.synchronizedList(new LinkedList<String>()),使用collections集合工具包裡的方法将非線程安全的集合類包裝成線程安全的集合類。該方法的内部實作是mutex狀态鎖,可以滿足線程安全要求,但性能不優;

        方法二:JDK單獨為實作線程安全重新提供了線程安全的集合類,ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue、BlockingQueue等,這些集合類對多線程的支援良好,性能優異,唯一不好的就是實作複雜。

        關于并發集合的具體内容和使用也會在附錄中有一篇單獨介紹。

4、鎖的優化:

        鎖是最常用的同步方法之一,在高并發的環境下,激烈的鎖競争會導緻程式性能下降。這裡解釋下高并發下,鎖與性能的關系。通過上面論述我們知道,高并發其實就是通過多線程,讓多個CPU核同時運作,來加快運作速度;但是由于鎖的存在,線程需要擷取鎖資源後才能運作,如果有多個線程同時需要鎖資源,那麼同一時刻隻能有擷取鎖的那個線程運作,其它沒能擷取鎖資源的線程則需要等待;這樣就不能完全發揮出多線程的優勢,系統性能也就降低了。是以為了提高高并發的性能,需要我們對鎖進行合理優化。

        下面介紹幾條鎖優化的建議。

        減小持鎖時間:比如,fun(){ A(); B(); C(); },如果隻有B()方法是需要同步的,那就隻對B()加鎖就可以了,而不需要對整個fun()加鎖,這樣線程對鎖的持有時間明顯變短。

        減小鎖粒度:比如,ConcurrentHashMap,我們HashMap内部是分段,當我們get()、put()時,一次隻會操作其中的一段,這個時候顯然我們隻需要對需要操作的這一小段加鎖就可以保證線程安全了;相對于對HashMap加鎖,對段的加鎖顆粒更小,鎖的競争也就降低了。

        讀寫分離鎖:比如,i是臨界資源,但相對于讀取,i并不是臨界資源(如果不修改i,n個線程同時讀取i,讀到的都是i,不會有線程安全問題);但是寫i就是臨界資源了,需要鎖的控制(因為存在一個寫的覆寫問題,以及因為寫,造成的髒讀的問題);是以将讀和寫分離,能減少鎖的競争。

        鎖分離:比如,LinkedBlockingQueue,take()和put()方法,雖說都修改了隊列,但它們修改的位置不同,理論上兩者不存在沖突,是以用兩個鎖分别控制take()和put(),這樣削弱了鎖的競争。

        鎖粗化:因為申請鎖、釋放鎖都是要花時間的,比如,一次操作裡确實是一直需要鎖A,那麼就沒必要一會申請鎖A,一會兒又釋放鎖A,就一直拿着用完就得了。

        關于鎖的優化,就兩個方向,一個是怎麼樣讓鎖更小(鎖的時間、鎖的對象、鎖的功能),讓鎖更小的目的就是降低系統對鎖的競争;另一個方向就是讓鎖更大,讓鎖更大的目的是減少不必要的加鎖、釋放鎖的時間。性能優化需要我們根據運作的實際情況來改進,那個地方阻礙了系統性能的發揮,我們就應該做相應的優化。

        一種很好的新思路ThreadLocal。

        之前我們一直講鎖的優化,但想想如果我們直接都不用鎖了,那不就不存在競争,性能最好了。這要從鎖的起源說起,加鎖我們是為了解決臨界資源的問題,如果我增加臨界資源的數量,一直增加到臨界資源的數量和線程數量一樣多,這樣不就不存在臨界資源競争的問題了,于是就可以不需要鎖,自然就不需要解決鎖的問題了。關于ThreadLocal的方式,說出來很容易懂,隻是有時在實際中往往會忽視掉。還有一點要注意的是,ThreadLocal隻是提供一個簡單的容器,為每個線程都配置設定資源需要業務層代碼來完成。這裡也展現了計算機裡的一個重要思想,即時間和空間是可以互換的,隻是使用者需要控制好轉換的方向和量。

5、無鎖政策:

        無鎖政策實質上是一種樂觀鎖思想,即認為線程是不會沖突的,既然不會發生沖突,那自然就不需要等待、加鎖來保證線程安全了,所有的線程都直接執行就完了。如果真的發生沖突了,那發生沖突的線程重新執行,直到沒發生沖突執行完成。

        那問題是我們怎麼實作沒沖突就執行完成線程,有沖突就重新執行線程呢?這裡就使用到一個重要技術:比較交換技術(CAS Compare And Swap)。

        CAS算法:參數CAS(V,E,N),V要更新的變量,E變量的預測值,N新值;當V的值和E的值相同時,則将V的值更新為N的值,不同則放棄更新。通俗的說就是隻有當變量目前的值和我預測是一緻時,我才修改。而且現在大多數處理器已經支援原子化CAS指令,性能優異,而且線程安全。

        有了CAS技術後,就可以來實作我們的無鎖政策了:當我線上程中需要修改一個共享變量時,可以先讀出變量,緩存到線程獨有的記憶體中,再對變量運算後;把變量、緩存值、運算後的結果作為參數傳給CAS;如果CAS執行成功,那我們線程就完成了,如果CAS執行失敗,那麼我們就放棄本次操作,重新執行線程。

        從無鎖政策原理我們知道,無鎖政策既沒有鎖的開銷,也不會切換線程,并且還有處理器的底層支援,顯然在高并發性能上是優于有鎖的程式;而且CAS政策也保證了每次一定會有一個線程執行成功,這樣也不存在死鎖、阻塞的問題。

        寫了一大堆無鎖政策的優點,确實在并發性能上很有優勢;缺點呢就是我沒寫,并且也不打算寫的,實作複雜。JDK提供了無鎖政策實作的并發包,在java.util.concurrent.atomic工具包裡有相關方法,可以直接使用;關于無鎖政策的具體實作,參照書上Vector吧。

線程的協同運作(第5章)

        在上節我們詳細講解了怎麼解決線程間的競争問題,保證了多線程的安全,使多個線程能夠同時穩定運作。那麼怎麼讓這些線程更好的協同運作來完成我們大并發的任務呢?這節我們将講解一些常見的并行模式的設計方法,通過對這些經典的設計模式的學習,來提高我們并行程式的設計能力,讓我們設計出來的多線程運作的更好,最大的發揮出多線程的性能;最後通過介紹Java重要的NIO和AIO讓我們對多線程的使用有個更深入的認識。

1、常見的并行模式

        單例模式:作為最常見的、最簡單的模式之一。核心思想就是将構造方法私有化,保證無法建立該類對象;然後對外提供一個擷取靜态執行個體的靜态方法,将靜态執行個體的建立隐藏到類裡面完成;這樣就確定了系統隻有一個該靜态執行個體。

        不變模式:不變模式的核心是對象一旦建立,就不允許修改。實作的關鍵字是final,用來修飾class時表示該類不能被繼承,防止通過繼承子類的修改;用來修飾變量時,表示該變量隻許在對象構造時被指派一次,不允許再修改。不變模式天生對多線程友好,比如說Java的基礎資料類型和String使用的都是不變模式,使用不變模式後,所有的執行個體方法均是不需要進行同步操作(例如:String.subString(),在截取字元串時就不用擔心别的線程此時修改了該字元串;Long l=0L,也不用擔心受别的線程影響,但long l=0L就沒那麼幸運了),保證了多線程下的性能。

        生産者-消費者模式:生産者-消費者的核心思想就是通過共享記憶體緩沖區,避免了兩者的直接通信,實作生産者、消費者解耦。這也是并行模式的核心設計思想之一,實作線程的解耦。通常用來充當共享緩沖區資料結構有BlockQueue(BlockQueue是阻塞隊列,ConCurrentLinkedQueue為非線程阻塞隊列),即生産者負責将資料寫到BlockQueue裡,消費者直接到BlockQueue裡讀取資料,BlockQueue本身是JDK提供的一款多線程安全的阻塞隊列,能較好的實作線程安全和同步。但BlockQueue底層還是基于鎖的實作機制,完全使用鎖和阻塞等待來實作線程的同步,高并發性能并不是十分優異。我們知道ConCurrentLinkedQueue底層是用CAS政策實作的,性能優異,但這裡并不适合用來做共享緩沖區,主要是因為ConCurrentLinkedQueue是非線程阻塞,如果用來做緩沖區,需要業務層代碼控制循環監控隊列,使用不友善,也影響了性能。那麼有沒有一種使用簡便,底層也是基于CAS政策實作的共享資料通道呢?Disrupt無緩存架構提供了我們需要的共享資料通道,具體實作太複雜,就不研究了,使用十分友善,參加下官方API和樣例即可。

        Future模式:Future模式核心思想是異步調用,說到異步調用大家最熟悉的肯定是$.ajax();Future模式原理與之相同,唯一不同的是Future會立即傳回一個僞造的資料,等實際業務處理完成後再傳回真實的資料。這種異步調用,完全無阻塞,在大并發下能更好的提高系統性能。關于異步與同步的好處,本身就是一個很大的話題,這裡我們知道它帶來的好處,以及提高系統性能的原因就好了。

2、網絡NIO和AIO

        IO:IO是計算機經典的問題之一,我們知道計算機運作需要連接配接不同的裝置,但這些裝置的速度完全不在一個等級,其中IO是最突出的例子。例如CPU一個時鐘周期0.3ns,cache(1-10ns),記憶體100ns,SSD固态硬碟50*1000ns,普通磁盤500*1000ns,網絡IO速度和普通磁盤一個等級。上面是整理的目前各種電腦器件的大緻速度。CPU的速度與磁盤速度相差都達到了6個數量級,在Java标準IO中,一個線程負責網絡資料的讀取、資料的運算、結果的傳回,這樣如果程式中有大量的IO操作,那麼CPU基本上就長時間處于無效的等待狀态了(等待IO裝置讀取資料),極大的降低了系統的性能。

        NIO:NIO是替代Java IO的一套新機制,核心思想就是通過實作IO準備(即資料寫到緩沖區,但不一定全部讀寫完成)和線程處理的分離,來解決IO速度與CPU速度不比對的問題。具體實作政策就是,用Channel作為資料緩存工具,單獨建立一個或者少量的線程專門負責Channel的管理,當Channel裡的資料準備好了後會通知對應的線程進行資料處理。這樣就避免了每個線程都去等待IO的問題,把IO等待操作交給少量的線程去處理,實際的線程隻是等到IO資料準備好了後再開始執行,這裡很大的提高了系統的性能。

        AIO:NIO在網絡操作中提供了非阻塞方法,但NIO的IO行為還是同步的,隻是這個非阻塞是由少數的“管理線程”來實作的。AIO則在NIO的基礎上更進一步,它不是IO準備好時通知線程,而是在IO操作已經完成後(完成了整個IO操作),在給線程發通知。此時線程是完全不是阻塞,我們的業務邏輯将變成一個回調函數,等待IO完成後,由系統自動觸發。很明顯AIO設計模式更符合IO業務場景的需求,實作起來也更簡單,是我們應該重點了解的。插說一句,實作AIO回調很明顯的用到了Future模式,這也是對理論的很好的一個應用。

哪裡用到Java高并發?

        這裡借用下Linus Torvalds關于并行模式的觀點,Linus認為并行模式隻有在伺服器端開發和圖檔處理這兩個領域有廣泛的應用。而Java作為最廣泛的伺服器端開發語言之一,Java高并發在伺服器端的開發自然是大有用處。目前網際網路系統主流采用的都是《伺服器--用戶端》模式,這時候往往是一個伺服器對應成千上萬的用戶端,此時對伺服器的性能要求極高,要求伺服器在短時間内處理大量的用戶端的請求,于是也就有了我們高并發的要求,這也是Java高并發目前的主要應用。

怎麼使用Java高并發?

        即使我們學習并掌握了上面關于Java高并發這麼多的知識,但寫出一個正确的、高性能并且可擴充的高并發程式依然是很困難的,在實際開發中更多的還是使用優質的架構來做開發,這樣可以大大的降低我們開發難度和提高程式的穩定性、安全性、性能等等,在附錄中會講解Akka這個高并發架構。可能初學者會好奇,既然不用,那還大篇幅的去将高并發的實作原理、實作方法幹嘛,直接上Akka架構不就得了。其實這裡是對程式設計的一個誤解,以為程式設計就是寫代碼,就是調API,其實不然,核心還是設計、分析、綜合處理等,代碼隻是具體的一種實作方法。比如如果對多線程沒有一個實質的了解,可能在業務的拆分上就很不合理,這樣是用什麼先進的架構也彌補不了的,這也是花大篇幅去解釋基本原理的原因。

書中重要知識附錄

        這部分是單獨講解某個内容的,以講清核心原理為主,中間的部分方法、以及知識點有很多的忽略,詳情請參數書上的對應章節。

線程池

Executor架構

        Executor采用工廠模式提供了各種類型的線程池,是實際使用中我們就直接從Executor中擷取我們想要的線程池,拿來直接使用即可。下面簡單的介紹下Executor提供的五大類線程池。

        newFixedThreadPool()方法:該方法傳回一個固定數量的線程池;

        newSingleThreadExecutor()方法:該方法傳回隻有一個線程的線程池;

        newCachedThreadPool()方法:該方法傳回一個可根據實際情況調整線程數量的線程池;

        newScheduledThreadPool()方法:該方法傳回一個ScheduleExecutorService對象,可以指定線程數量;

        newSingleThreadScheduledExecutor()方法:該方法傳回一個ScheduleExecutorService對象,線程數量為1;

        上面線程池分兩大類,一類是無計劃的任務,送出就會執行,這類業務應用很廣泛;另一類是有計劃的任務,送出後會按照設定的規則去執行,這種應用的場景相對少一些,不過對有些業務是必須的,比如:我們系統晚上需要清空使用者的狀态、優惠券到期了自動提醒等等,用到的就是這類計劃任務,常見的有spring task。下面分别舉例示範兩種線程池的使用。

1、newFixedThreadPool()固定大小的線程池

package concurrent.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadDemo {

	public static void main(String args[]) {
		// 建立任務對象
		MyTask task = new MyTask();
		// 建立固定數量線程池
		ExecutorService es = Executors.newFixedThreadPool(5);
		for(int i=0; i<10; i++) {
			// 向線程池裡送出任務
			es.submit(task);
		}
	}
	
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + ": Thread ID: " + Thread.currentThread().getId());	
			try {
				Thread.sleep(1000);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}	
}
           

2、newScheduledThreadPool()計劃任務

package concurrent.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class ScheduleThreadDemo {

	public static void main(String args[]) {
		// 建立任務對象
		MyTask task = new MyTask();
		// 建立任務排程計劃對象
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(100);
		// 設定任務與執行計劃
		ses.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
	}
	
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis()/1000 + ": Thread ID: " + Thread.currentThread().getId());	
			try {
				Thread.sleep(1000);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
           

        大家可以看的出來這兩段代碼都非常簡單,都是建立任務對象、擷取Executors提供的線程池對象、将任務對象綁定到線程池對象上。通過Executors提供的不同政策的對象,就能快速實作我們對線程的控制。

線程池的内部實作

        Executor為我們提供了功能各異的線程池,其實其内部均是由ThreadPoolExecutor實作的,我們詳細了解下ThreadPoolExecutor實作原理不但對我們使用了解Executor提供的線程池大有幫助,也讓我們能根據實際情況自定義特定的線程池。

        首先介紹下ThreadPoolExecutor類最核心的構造方法,

public ThreadPoolExecutor(
		int corePoolSize,		// 指定線程池中線程的數量(總線程量可大于等于這個值)
		int maximumPoolSize,    // 指定線程池中最大線程數量(總線程量不可能超越這個數值)
		long keepAliveTime,		// 超過corePoolSize數量的空閑線程,存活的時間
		TimeUtil unit,			// keepAliveTime的機關
		BlockingQueue<Runnable> workQueue,    // 任務隊列,被送出但為執行的任務
		ThreadFactory threadFactory,		  // 線程工廠,用于建立線程
		RejectedExecutionHandler handler	  // 當workQueue隊列滿的時候的拒絕政策
		)
           

        看到corePoolSize和maximumPoolSize的含義,應該很容易通過設定參數的不同,得到Executors提供的線程池對象。該方法一共七個參數,前四個很簡單,我們都會使用,第六個一般使用的是JDK預設提供的,剩下的就隻有workQueue和handler了。

        workQueue:存放的是送出的任務,例如:es.submit(task);在樣例中送出了10次task,但線程隻有5個,于是就有5個送出但沒開始執行的任務存到了workQueue裡啦。既然是一個存放任務的隊列,我們知道實作隊列的方式有多種,比如:ArrayBlockQueue、LinkedBlockQueue、PriorityBlockQueue等等,選擇不同的隊列就會帶來不同的問題。ArrayBlockQueue,存在一個任務過多超出隊列長度;LinkedBlockQueue,接受過多的任務可能會占用太多記憶體,造成記憶體崩潰等等。這裡介紹下,newFixedThreadPool和newSingleFixedThreadPool使用的都是LinkedBlockQueue,newCacheThreadExecutor使用的SynchronousQueue隊列。關于隊列的選擇是要根據實際情況來确定,這也是自定義線程池的核心。

        handler:拒絕政策,實際上是一種補救措施,就是當超出了workQueue臨界值了,我們怎麼讓我們的系統不至于崩潰。JDK内置的處理方法有AbortPolicy,抛出異常阻止程式(除非是安全性要求極高,否則在大并發情況下使用這種做法不是很明智);DiscardPolicy,丢棄無法處理的任務(如果允許丢棄,這是不錯方案);DiscardOledesPolicy:也是丢棄任務,隻不過丢棄的是隊列最前的一個任務。由于上面政策都是實作RejectExecutionHandler接口,我們也可以實作該接口自定義拒絕政策。

自定義線程建立

package concurrent.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {

	public static void main(String args[]) {
		// 建立任務對象
		MyTask task = new MyTask();
		// 擷取自定義線程池
		ExecutorService es = getMyThreadPool();
		for(int i=0; i<20; i++) {
			// 向線程池送出任務
			es.submit(task);
		}
	}
	
	// 自定義線程池,我們建立一個線程數固定的線程池
	public static ExecutorService getMyThreadPool() {
		ExecutorService es = new ThreadPoolExecutor(
			// 設定線程池大小
			5, 5, 0L, TimeUnit.MILLISECONDS, 
			// 設定緩存隊列
			new LinkedBlockingQueue<Runnable>(5),
			// 設定線程工廠
			Executors.defaultThreadFactory(),
			// 設定拒絕政策
			new RejectedExecutionHandler() {
				@Override
				public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
					System.out.println(r.toString() + " is discard! ");    // 輸出日志後直接丢棄任務
				}
			});
		return es;
	}
	
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + ": Thread ID: " + Thread.currentThread().getId());	
			try {
				Thread.sleep(1000);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}	
}
           

        從上面程式的運作結果我們可以看到,10任務被執行(因為線程池有5個線程,緩存隊列也能緩存5個),10任務被丢棄,符合我們預期。這個樣例十分簡單,隻是通過這個樣例展示怎麼去自定義一個線程池,具體的線程池定義,我們要根據實際情況,設定傳入的參數即可。

Fork/Join架構

        上面我們詳細介紹了線程池的原理,還是那句話,學底層原理是拿來做設計,并不是讓直接去使用。Fork/Join線上程池的基礎上,做了更近一步的封裝,對線程的開啟、分發做了優化,使系統更穩定。另外補充下,Fork/Join還涉及到關于多線程的一個重要思想:“分而治之”,通俗的将就是将複雜的問題拆分成多個簡單問題,分開處理。下面通過一段樣例了解下Fork/Join。

package concurrent.threadpool;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask<Long> {
	
	private static final int THRESHOLD = 10000;	   // 門閥值,當大于這個值才進行拆分處理
	private long start;								// 數列的起始值
	private long end;								// 數列的結束值

	public ForkJoinDemo(long start, long end) {
		this.start = start;
		this.end = end;
	}
	
	// 對數列進行求和
	@Override
	protected Long compute() {
		// 定義求和對象
		long sum = 0;
		if((end - start) < THRESHOLD) {
			// 當求和數列的數量小于門閥值,則直接計算不需要分拆
			for(long i=start; i<=end; i++) {
				sum += i;
			}
		}
		else {
			// 當求和數列的數量大于門閥值,則拆分成100個小任務
			ArrayList<ForkJoinDemo> subTasks = new ArrayList<ForkJoinDemo>(); 
			long step = (start + end) / 100;	// 計算每個小任務的數列的數量
			long firstOne = start;				// 動态記錄小任務數列的起始值
			// 将任務拆分,并送出給架構
			for(int i=0; i<100; i++) {
				long lastOne = firstOne + step;	// 動态記錄小任務數列的結束值
				if(lastOne > end) {
					// 當隊列結束值大于end,需要将lastOne設定為end
					lastOne = end;
				}
				ForkJoinDemo subTask = new ForkJoinDemo(firstOne, lastOne);
				firstOne = firstOne + step + 1;
				// 将子任務添加到數組中
				subTasks.add(subTask);
				// 執行子任務,這裡是将子任務交個Fork/Join架構,什麼時候開始執行由架構自己決定
				subTask.fork();
			}
			// 将子任務的計算結果彙總
			for(ForkJoinDemo st : subTasks) {
				sum += st.join();
			}
		}
		return sum;
	}
	
	public static void main(String args[]) {
		// 建立任務對象
		ForkJoinDemo task = new ForkJoinDemo(0, 500000L);
		// 建立Fork/Join線程池
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		// 将任務送出給線程池
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			// 取出運算結果
			long sum = result.get();
			System.out.println("sum: " + sum);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
	}	
}
           

        這段多線程求和代碼本身沒啥用,主要是通過這段代碼了解分拆的思想,和熟悉架構的使用。

并發集合:

        有一種說法,程式就是“算法+資料結構”,而這類并發集合就是JDK為我們提供的資料結構,這些線程安全的并發集合主要有連結清單、HashMap、隊列等。

1、concurrentHashMap

        concurrentHashMap主要是使用了“鎖的優化方案--減小鎖顆粒度方案”對mutex互斥變量進行了優化,提高了并發性。

        我們知道為了讓HashMap線程安全,JDK采用的政策是,對HashMap添加mutex變量,所有的對HashMap的操作均要先獲得鎖,這樣就實作的HashMap多線程的同步;concurrentHashMap也是使用相同的政策,也是添加互斥變量mutex來實作線程的同步,不同的是concurrentHashMap的資料結構和HashMap有略微的差别。

        我們可以了解為concurrentHashMap是由16(預設為16個)個小HashMap來組成的,然後對每個小HashMap添加互斥變量來實作小HashMap的多線程安全,這樣concurrentHashMap也就是線程安全的了。因為Map的重要方法get()和put()方法一次隻會操作一個元素,也就是隻會修改16個中的某一個HashMap,這樣我們隻需對目前被修改的HashMap加鎖就能保證線程安全了。

        但是Map也有操作全局的方法,比如size()方法,要擷取目前Map裡元素的總數,這時候就要對全局加鎖,相當于是16個HashMap的鎖都擷取到了才能計算(當然在實際中JDK并沒有這麼去做,而是使用了優化方案,但concurrentHashMap的size()方法也比HashMap的size()方法慢)。這裡也揭示了一個問題,就是使用減小鎖顆粒的優化方案時,過多的使用擷取全局資訊的方法會降低性能。

2、concurrentLinkedQueue

        concurrentLinkedQueue是在高并發環境中性能最好的隊列。之是以有很好的性能,是因為采用了CAS無鎖政策來保證線程安全的。關于CAS實作原理、性能好的原因在鎖的優化裡面有詳細論述,這裡就不再重複了。

3、CopyOnWriteArrayList

        CopyOnWriteArrayList是适合用于讀操作遠遠大于寫操作場景的連結清單。首先我們來了解下CopyOnWriteArrayList的實作原理,其實在名字上已經展現了大半;連結清單采用無鎖機制,不過這裡使用的不是CAS政策,而是不變模式,當有讀操作,直接讀連結清單就可以了;當有寫操作,不去直接修改連結清單,而是将連結清單複制一份,然後修改複制的連結清單,修改完成後直接替換掉之前的連結清單,這類似于Java中String的處理。

        這樣确實保證多線程安全,不過我們也看的出來,這樣做代價很大;每修改一次,就要對連結清單全部複制,這是非常占記憶體和浪費CPU的;隻有當寫操作很少時,這種方案才有價值,當寫操作較多時,這種方案就會因為頻繁複制對象而大大降低系統性能。

4、BlockingQueue

        BlockingQueue是一個專門用來做資料共享通道的接口。上面提到concurrentLinkedQueue是性能最好的隊列,但它并不适用用來做資料共享通道;因為資料共享通道并不僅僅提供資料存儲的功能,還要做為線程的橋梁,實作線程間的協同運作等功能,concurrentLinkedQueue顯然沒有這樣的功能。

        BlockingQueue之是以适合用來做資料共享通道,其核心還是在Blocking(阻塞)上。BlockingQueue提供了阻塞的put()和take()方法,當向通道寫、讀資料時,如果資料通道為空或者已滿,它們會等待滿足條件了後再執行操作,這樣就較友善的實作了讀和寫線程的協同運作。生産者-消費者就是BlockingQueue使用的經典案例。

5、SkipList

        SkipList是一種可以用來快速查找的資料結構,結構比較奇特,總的來講采用的是分層的連結清單,可以看做是為适用多線程改良的平衡二叉樹吧。在功能上concurrentShipListMap和concurrentHashMap類似,但用ShipList實作的Map是有序的,在有順序要求的業務中,當然首推适用concurrentShipListMap。

        在性能上,當并發量不高、資料量不大時,concurrentHashMap總體性能略好些;當并發量很高、資料量很大時,concurrentShipListMap性能更好;也就是說concurrentShipListMap更适合大并發。concurrentShipListMap一個缺點就是占記憶體較多,因為其實作需要備援記錄一些節點,這裡也是使用計算機裡總要的思想:用空間換時間。

        總的來說這些常見的集合都有自己的優勢和缺陷,在使用時我們要根據實際業務場景選擇合适的集合,我們需要的是熟知其中的原理,這樣才能讓我們做出合理的選擇。

Akka介紹:

        還是回到高并發這個起點上來,通過上面對Java多線程的了解,我們知道怎麼去合理使用伺服器上的多核CPU;但是實際高并發環境中,往往一台伺服器是不夠的,需要多台伺服器的聯合使用才能完成任務,這個時候或許你會想到分布式、叢集等。而Akka就提供了分布式這樣的功能,通過Akka不僅可以在單機上建構高并發程式,也可以在網絡中建構分布式程式,并提供位置透明的Actor定位服務。總的來說Akka采用的是Actor和消息系統來實作分布式的,下面就簡單的介紹下Actor模型、消息投遞、消息接受、消息路由等知識,最後用一個簡單樣例示範下Akka的使用方式。

        插說一段:多線程、分布式、叢集,作為解決高并發問題的方案(注意:并行模式是設計模式,并不是解決方案),經常一起出現,其實給初學者帶來了很大的困惑,很多初學者并不了解它們間的關系,很長一段時間我也是把它們當同一個東西來看的,其實不然。既然上面提到了這些概念,那我還是對它們分别做一個簡單說明吧。

        多線程:一台伺服器上運作多個線程,解決的重點是怎樣提高單個伺服器記憶體、CPU等資源的使用率問題;

        分布式:一個業務拆分成多個子業務,部署在不同的伺服器上,解決的重點是業務在多台伺服器高效協同運作的問題;

        叢集:同一個業務,部署在多個伺服器上,指的是系統對多硬體的組合使用方式;

        多線程我們可以看做是對程式的優化,提高程式本身的性能;分布式可以看做是一個架構,因為隻有是分布式架構的系統才支援利用叢集的方式來對系統擴充;叢集就是對分布式系統橫向擴充;是以一般叢集與分布式是一起出現的,而兩者與多線程的關系則不大。因為這三者都提高了系統的性能,而且解決的問題的方向也不同,是以實際生産中都是将它們一起使用,來解決大高發問題。

        Actor模型:A Model of Concurrent Computing in Distributed Systems(分布式系統中的并行計算模型)。Actor模型我們可以看做是一個函數,隻不過比函數要複雜,因為還包含内置狀态,但它具有函數最重要的特征,就是無論怎麼執行,也無論在哪裡執行,隻有參數、環境一定,執行結果就是确定的,這也是實作分布式的基礎。

        消息系統:整個Akka應用是由消息來驅動的,我們可以簡單的了解,Actor是具體的執行函數,消息則是去調用這些函數;但這與一般的函數調用又不同,不是通過方法名來調用,而是給需要調用的Actor發一個消息就可以了,Actor收到消息後,就會自己去執行業務。這裡就涉及到消息投遞、消息接受、消息路由等。

        消息投遞:函數我們知道,參數确定了執行結果也就确定了,Actor模型類似于函數,我們想要一個确定的結果,那麼我們投遞的消息就應該是确定的,是以這裡強烈建議消息使用不變模式,防止消息在傳遞的過程中被修改。另外一個重要問題就是消息的投遞政策,Akka采用的是至多一次投遞政策,這種政策中,每條消息最多會被投遞一次。在這種情況下,可能偶爾會出現消息的投遞失敗,而導緻消息丢失,這就需要我們在業務層維護消息的可靠性,采用這種政策主要是因為性能最高、成本最低。

        消息收件箱:Akka架構為我們提供了一個叫做“收件箱”的元件,使用收件箱,可以很友善地對Actor進行消息的發送和接受。

        消息路由:其實就是用來群發消息的,通過收件箱我們可以友善的給某個Actor發送消息,但是在高并發環境中,往往存在大量的功能相同的Actor在并行處理業務,這個時候我們需要把消息合理的發送給這些Actor,首選就是群發了,而消息路由就是用來幫我們實作群發消息功能的。消息路由提供了不同的發送規則,在建立消息路由時可以指定合适的規則。

1、粒子群算法

        粒子群算法與Actor模型有着天生的切合度,下面就正好用這經典的算法來示範Akka的使用。當然Akka應用場景非常多,粒子群算法隻是某一個應用而已,介于我對Akka的了解也不深入,就不去标新立異,還是老老實實的先模仿吧。

        粒子群最典型的應用就是用來尋找最優化方案。本質就是就是窮舉法,比如解決一個問題的方案有1億種,如果我們把這1億種方案的結果都算出來,然後對結果進行比較,自然是能找出最優方案的;但由于計算機的運算能力有限,隻能運算出一萬種方案的結果;這個時候我們就隻能随機計算一萬種方案,然後找出其中最好的結果,做為我們的次優解,雖說這并不是最好的方案,但這是我們能擷取的最接近最優解的方案,在很多應用中,這種方案也是能被接受的(很典型的就是天氣預報,雖說不能100%準确,但也有很大的價值)。

        随機方案的選取其實是這種最優化計算的核心,Akka架構提供的運算支援隻是基礎,但我們這裡主要是為了示範Akka的使用方法,就不去讨論優化算法了。

        比如有這麼一個問題:

        假設有400萬資金,要求4年内使用完。若在任意一年使用x萬元,則可以擷取√x萬元的收益(本金和收益均不能再使用);當年不用的資金可以存入銀行,年利率為10%,這部分的本金和收益均能拿去使用。數學好的使用拉格朗日公式對方程組求解很快就能算出第一年使用86.19萬元,第二年使用104.29萬元,第三年使用126.29萬元,第四年使用152.69萬元,能獲得43.09萬元的最優收益。數學不好的也沒事,現在不有了計算機麼,下面讓我們的計算機試試,看看能得到什麼樣的結果。

package concurrent.akka;

import java.util.Collections;
import java.util.List;

/**
 * 可行的解決方案類,記錄每年投資的錢,和該種方案獲得的收益;
 * 因為每種方案都是固定的,不會改變,是以這裡都設定成不變模式;
 */
public final class PsoValue {
	// 該方案的收益
	final double value;
	// 該方案每年投資的錢
	final List<Double> x;
	
	public PsoValue(double v, List<Double> x2) {
		this.value = v;
		this.x = Collections.unmodifiableList(x2);
	}
	
	public double getValue() {
		return value;
	}
	
	public List<Double> getX() {
		return x;
	}
	
	public String toString() {
		StringBuffer sb = new StringBuffer();
		sb.append("value: ").append("-->").append(x.toString());
		return sb.toString();
	}
}
           
package concurrent.akka;

/**
 * 全局最優方案類,記錄全局最優的方案;
 */
public final class GBestMsg {
	
	final PsoValue value;
	
	public GBestMsg(PsoValue v) {
		value = v;
	}
	
	public PsoValue getValue() {
		return value;
	}
}
           
package concurrent.akka;

/**
 * 個體最優方案類,記錄個體最優的方案;
 */
public final class PBestMsg {
	
	final PsoValue value;
	
	public PBestMsg(PsoValue v) {
		value = v;
	}
	
	public PsoValue getValue() {
		return value;
	}	
}
           
package concurrent.akka;

import java.util.List;

/**
 * 投資方案适應度類,計算出投資方案的收益,收益越大,我們認為适應度越高
 */
public class Fitness {
	
	public static double fitness(List<Double> x) {
		double sum = 0;
		for(int i=1; i<x.size(); i++) {
			// 擷取對于年投資的收益
			sum += Math.sqrt(x.get(i));
		}
		return sum;
	}	
}
           
package concurrent.akka;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import akka.actor.ActorSelection;
import akka.actor.UntypedActor;

/**
 * 粒子類,也是粒子群算法中最核心的類;
 * 在粒子群算法中,為了提高效率,選擇執行方案時并不是完全随機的;
 * 而是讓粒子先随機分布在整個區域内單獨查找,看誰查找的方案适配度最優,
 * 并将這最優方案發送給大家,大家再以這個最優方案為方向,
 * 按一定算法選擇執行方案,繼續查找,以此類推,直到結束。
 * 
 * 如果不明白的,就直接把Bird看做是一個執行了多種方案的類,
 * 重點了解怎麼使用Akka讓這些Bird傳遞消息就可以了。
 */
public class Bird extends UntypedActor {
	// Bird個體最優方案
	private PsoValue pBest = null;
	// 全局最優方案
	private PsoValue gBest = null;
	// 粒子在各個次元上的移動速度;每一年的投資認為是一個次元,一共4個次元(這裡數組長度為5,是為從1-4的角标,友善代碼閱讀)
	private List<Double> velocity = new ArrayList<Double>(5);
	// 粒子的初始化位置
	private List<Double> x = new ArrayList<Double>(5);
	// 建立生成随機數對象
	private Random r = new Random();
	
	// 建立粒子時,初始化粒子的位置,每一個位置可以看做是一種方案
	@Override
	public void preStart() {
		// 初始化velocity和x
		for(int i=0; i<5; i++) {
			velocity.add(Double.NEGATIVE_INFINITY);
			x.add(Double.NEGATIVE_INFINITY);
		}
		
		// 第一年投資資金 x1<=400
		x.set(1, (double)r.nextInt(401));
		
		// 第二年投資資金 x2<=440-1.1*x1(x1:第一年投資資金)
		double max = 440 - 1.1 * x.get(1);
		if(max < 0){
			max = 0;
		}
		x.set(2, r.nextDouble() * max);
		
		// 第三年投資資金 x3<=484-1.21*x1-1.1*x2
		max = 484 - 1.21 * x.get(1) - 1.1 * x.get(2);
		if(max < 0) {
			max = 0;
		}
		x.set(3, r.nextDouble() * max);
		
		// 第四年投資資金 x4<=532.4-1.331*x1-1.21*x2-1.1*x3
		max = 532.4 - 1.331 * x.get(1) - 1.21 * x.get(2) - 1.1 * x.get(3);
		if(max < 0) {
			max = 0;
		}
		x.set(4, r.nextDouble() * max);
		
		// 計算出該方案的适應度(收益)
		double newFit = Fitness.fitness(x);
		// 得到局部最優方案(因為是第一個方案,肯定是目前最優方案)
		pBest = new PsoValue(newFit, x);
		// 建立局部最優方案消息
		PBestMsg pBestMsg = new PBestMsg(pBest);
		// 通過工廠擷取消息發送對象
		ActorSelection selection = getContext().actorSelection("/user/masterbird");
		// 将局部最優方案消息發送給Master
		selection.tell(pBestMsg, getSelf());
		
	}
	
	@Override
	public void onReceive(Object msg) throws Exception {
		// 如果接受到的是全局最優方案消息,則記錄最優方案,并根據全局最優方案更新自己的運作速度
		if(msg instanceof GBestMsg) {
			gBest = ((GBestMsg) msg).getValue();
			// 更新速度
			for(int i=1; i<velocity.size(); i++) {
				updateVelocity(i);
			}
			// 更新位置
			for(int i=1; i<x.size(); i++) {
				updateX(i);
			}
			// 有效性檢測,防止粒子超出了邊界
			validateX();
			// 重新計算适應度,如果産生了新的個體最優,就發送給Master
			double newFit = Fitness.fitness(x);
			if(newFit > pBest.value) {
				pBest = new PsoValue(newFit, x);
				PBestMsg pBestMsg = new PBestMsg(pBest);
				getSender().tell(pBestMsg, getSelf());
			}
		}
		else {
			unhandled(msg);
		}
	}

	// 更新速度
	public double updateVelocity(int i) {
		double v = Math.random() * velocity.get(i)
				+ 2 * Math.random() * (pBest.getX().get(i) - x.get(i))
				+ 2 * Math.random() * (gBest.getX().get(i) - x.get(i));
		v = v > 0 ? Math.min(v, 5) : Math.max(v, -5);
		velocity.set(i, v);
		return v;
	}

	// 更新位置
	public double updateX(int i) {
		double newX = x.get(i) + velocity.get(i);
		x.set(i, newX);
		return newX;
	}
	
	// 有效性檢測,防止粒子超出了邊界
	public void validateX() {
		// x1
		if(x.get(1) > 400) {
			x.set(1, (double) r.nextInt(401));
		}
		// x2
		double max = 440 - 1.1 * x.get(1);
		if((x.get(2) > max) || (x.get(2) < 0)) {
			x.set(2, r.nextDouble() * max);
		}
		// x3
		max = 484 - 1.21 * x.get(1) - 1.1 * x.get(2);;
		if((x.get(3) > max) || (x.get(3) < 0)) {
			x.set(3, r.nextDouble() * max);
		}
		// x4
		max = 532.4 - 1.331 * x.get(1) - 1.21 * x.get(2) - 1.1 * x.get(3);
		if((x.get(4) > max) || (x.get(4) < 0)) {
			x.set(4, r.nextDouble() * max);
		}		
	}		
}
           
package concurrent.akka;

import akka.actor.ActorSelection;
import akka.actor.UntypedActor;

/**
 * 主粒子類,使用者管理和通知全局最優方案
 */
public class MasterBird extends UntypedActor {
	// 全局最優方案
	private PsoValue gBest = null;
	
	@Override
	public void onReceive(Object msg) throws Exception {
		if(msg instanceof PBestMsg) {
			PsoValue pBest = ((PBestMsg) msg).getValue();
			if((gBest == null) || (gBest.getValue() < pBest.getValue())) {
				// 更新全局最優方案,并通知所有粒子
				gBest = pBest;
				ActorSelection selection = getContext().actorSelection("/user/bird_*");
				selection.tell(new GBestMsg(gBest), getSelf());
				// 列印最優方案
				System.out.println(gBest.getValue());
			}
		}
	}	
}
           
package concurrent.akka;

import akka.actor.ActorSystem;
import akka.actor.Props;

public class PSOMain {

	public static final int BIRD_COUNT = 1000000;
	
	public static void main(String args[]) {
		
		// 建立Actor的管理和維護系統
		ActorSystem system = ActorSystem.create("psoSystem");
		// 建立Master粒子
		system.actorOf(Props.create(MasterBird.class), "masterbird");
		// 建立Bird粒子群
		for(int i=0; i<BIRD_COUNT; i++) {
			system.actorOf(Props.create(Bird.class), "bird_" + i);
		}		
	}	
}
           

        當粒子群的數量達到100w時,基本每次的最大收益都能達到43.05萬之上了,與我們計算的理論值43.09非常接近,基本能滿足我們的要求。還有一個值得注意的問題,這裡建立了100w個粒子,如果是建立這麼多線程,我這ThinkPad的筆記本估計早卡死了,但在測試的時候,電腦完全不卡,并且一分鐘不到也就運算完了,可見在系統中是可以啟用大量的Actor,提高了我們對并發的支援。