天天看點

java并發知識彙總

項目中用到并發的地方不多,一次是多個規則線程并發校驗文檔,一個是多個并發監控應用,比較簡單,但查找并發資料後,發現有更好的方式。于是抽空想全面了解并發,可查到網上相關并發的技術文章很多,但是對我來說有這些問題:

[list]

[*]有的隻講一個技術點

[*]有的代碼多缺少了解

[*]有的層次不夠深

[/list]

是以自己總結一下了,不是百科全書也不是字典,隻是一定知識的索引,需要記在腦子裡的。此次彙總的目标:

[list]

[*]對并發常用的東西有個概念

[*]對并發的基本例子心中有數

[*]對并發的原理,甚至設計思想有所了解

[/list]

這些彙總的東西都是需要記住的,但需要了解而不需要記太多的代碼,等到用的時候能想到解決辦法,能通過快速查找适當的資料來實作功能。

[b][size=x-large]一、最常用的同步工具[/size][/b]

[b]1.synchronized[/b]是Java中的關鍵字,是一種同步鎖。可以修飾一段代碼,一個方法...這個不展開了,初級使用。

我的了解:一個對象或者類天然有一個鎖,用法:synchronized(某個對象)。這個對象也許是自己this,也可以是其它對象:private byte[] lock = new byte[0];據說這個開銷比較少。有時候辨別在方法前面的方法,可以了解為使用this的鎖。

這個鎖的鎖定與釋放都由系統控制,不用自己管理。這具鎖對象有wait、notify 和 notifyAll方法,用于線程之間的通訊。常用于代碼結構如:for(;;){如果不滿足條件就等,如果滿足就執行,并通知其它等待的線程},這裡注重留意一下interrupt概念,要求中斷與可被中斷以及中斷後執行什麼。暫時不展開,回頭補充一下。

最好記住一個例子,比如《thinking in java》中的一個廚師放入空盤子與顧客拿走食物的這個例子。

後面開始是複雜的并發包中的内容。

[color=violet][b][size=x-large]二、并發包中常用的進階工具[/size][/b][/color]

[b]2.ReentrantLock[/b]是java.util.concurrent包中的,擁有與 synchronized 相同的并發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈争用情況下更佳的性能。lock後必須在finally塊中釋放。

[b]3.ReentrantLock與條件的配合[/b]使用

要記住這個例子,一個籃子可以放蘋果,可以有多個線程生産蘋果,多個線程拿走蘋果。籃子對象有個容量,而且有一個鎖與兩個條件。生産線程放的時候調用籃子的put方法,如果籃子滿了就等待,如果不滿就放個蘋果,同時通知等待在【空了條件】上的拿蘋果的線程可以拿了,也許通知的時候,拿蘋果的線程沒有等待,而是正常運作着。

網上可以找到例子,但先記住對象關系以及鎖與方法在誰身上:獨立的籃子對象,生産者對象(引用籃子),消費者對象(引用籃子)。

籃子對象上有條件鎖,籃子對象提供PUT、GET方法,與條件鎖有關系。而生産者與消費者不斷執行線程,也就是不斷調用籃子對象上的方法。

Lock lock = new ReentrantLock();
        //條件鎖與Lock是相關的
       Condition isEmptity =lock.newCondition();
       Condition isFull = lock.newCondition();
           

如果是拿走的get()方法,就是先擷取Lock,再判斷,籃子空了就在【isEmptity】條件上等待,如果不空就拿一個,再通知可能的等待在【isFull】條件上的線程,最後釋放Lock。使用get()的線程可以在循環中調用get()方法,一般一個循環中要sleep()一會。

應用執行個體:之前在看阿裡的資料庫源工具druid中發現了使用這個條件鎖的情況。上面的例子用于把握原理的代碼架構,而真實的使用例子可以幫助你做自己代碼時考慮的更全面。

druid中的使用是連接配接池對象,它持有兩個守護程序(主線程結束就可以退出JVM,不用考慮守護程序存在)。其中一個線程是産生新的連接配接,一個線程是删除連接配接。連接配接池有一定的容量,如果不夠了就需要多的線程就産生,如果不使用的連接配接多了,就删除掉,維持一個最小池子,但又可以動态擴容的。

Condition 的方法與wait、notify和notifyAll方法類似,分别命名為await、signal和 signalAll,因為它們不能覆寫Object上的對應方法。

[b]4.ReentrantReadWriteLock[/b]

看名字就知道是讀寫鎖。這個用的應該蠻多的,不過估計都封閉在緩存工具裡了。

比如一個cache,持有一個map。那可以用此鎖來控制對map的讀寫,讀取資料的方法用讀鎖,修改資料的方法用寫鎖。讀鎖可以多個線程都擷取,如果有其它線程有寫的鎖的時候就不行。寫要等待沒有讀的鎖,也沒有其它寫的鎖,才能寫。

static Map<String, Object> map = new HashMap<String, Object>();
  static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  static Lock r = rwl.readLock();
  static Lock w = rwl.writeLock();
           

每個讀鎖或者寫鎖擷取後都要在finally中釋放。更深入的可以了解鎖降級,主要是有了寫鎖,自己寫好後,釋放前再擷取讀鎖。防止中間被其它寫鎖鑽空子,造成自己寫的内容,立馬讀出來又不對了的情況。

[b]5.CountDownLatch[/b]。Java并發包中有三個類用于同步一批線程的行為,分别是CountDownLatch、 Semaphore和CyclicBarrier,這個就是其中之一。

從名字上看就是一個倒計數的控制。有多個線程在上面等等待着,另外有多個線程會讓計數減少。當減為0後,所有等待的線程就開始動起來了。(關于等待的線程怎麼動,後面有提到更底層AQS裡的隊列。)

多對多就不講了,記住一對多與多對一例子。比如主線程同時啟動一組線程時,主線程先持有一個CountDownLatch(1),再可以循環new出一組線程,他們會在CountDownLatch上等待。主線程sleep一定時間後(等一組都進行等待中),突然讓CountDownLatch來一個countDown()。這時候一組線程就都可以動起來了。另一個情況是主線程等一組線程做完了再接着做事。主線程new一個CountDownLatch(5)後啟動一組5個線程開始運作,自己在CountDownLatch上等着,每個子線程最後來一個countDown()操作,那最後計數為0時,就激活等待的主線程繼續運作。

應用執行個體:還是在阿裡的資料庫源工具druid中看到過這個,就是init的時候,主線程産生了CountDownLatch(2),而生産連接配接的守護線程啟動後countDown(),删除連接配接的守護線程countDown(),表明兩個需要的線程都啟動了,接着做其它的事情了。生活執行個體:比如汽車啟動時要做5個自查,每完成一個減少一個,都檢查完了就正式可以開了,否則可能報警。再比如大家去吃飯,每到一個人就報數,人數夠了,等待中的上菜主線程就可以啟動了。而幾個炒菜師傅都等着指令呢,突然一個指令,幾個師傅都開始幹活了。前者是主線程等子線程的條件滿足後開工,後者是主線程下指令後子線程開工。

項目執行個體:之前那個多規則線程校驗一批文檔時,最後一步都校驗後要置文檔已經被校驗過了,正好需要些功能。其它人寫的原代碼比較老,這裡有同時運作的線程數限制,于是先是循環檢測所有線程的狀态,如果有State.NEW的,并且沒超過限制就start它,如果沒有State.NEW了就跳出循環。再并一個循環所有的線程,就把沒有State.TERMINATED的都join到主線程中來。最後又出現了與主線程的串行。如要用上面的工具就非常簡單了。另外,還可以用線程池來做,當線程池shutdown後,主線程可以循環(中間sleep一會)檢查線程池的isTerminated(),如果OK就可以再做後面的工作了。

[b]6.Semaphore[/b]

Semaphore與CountDownLatch相似,不同的地方在于Semaphore的值被擷取到後是可以釋放的,并不像CountDownLatch那樣一直減到底。它也被更多地用來限制流量,類似閥門的 功能。如果限定某些資源最多有N個線程可以通路,那麼超過N個主不允許再有線程來通路,同時當現有線程結束後,就會釋放,然後允許新的線程進來。有點類似于鎖的lock與 unlock過程。相對來說他也有兩個主要的方法:

[list]

[*]用于擷取權限的acquire(),其底層實作與CountDownLatch.countdown()類似;

[*]用于釋放權限的release(),其底層實作與acquire()是一個互逆的過程。

[/list]

這個沒有見到過例子,隻能與生活中的餐館舉例。比如有5個桌子,多了顧客隻能等待,少了就可以進去吃飯。有限流的功能,如果碰到類似的需要再來研究。

[b]7.CyclicBarrier[/b]

CyclicBarrier是用來一個關卡來阻擋住所有線程,等所有線程全部執行到關卡處時,再統一執行下一步操作,它裡面最重要的方法是await()方法。

即每個線程執行完後調用await(),然後在await()裡,線程先将計數器減1,如果計數器為0,則執行定義好的操作,然後再繼續執行原線程的内容。

代碼中就是先new一個CyclicBarrier(計數,統一操作)。前一個參數是多少個線程等待了就可以啟動了,後一個是啟動前做些其它的統一操作。生活中的場景:比如警--察抓行人闖紅燈,抓住10個人(行人等待湊夠數)後現場開班學習交通法規(統一操作),之後這批行人再出發。

上面一些東西用起來也不難,關鍵是記住使用模型是什麼樣的。

[b][size=x-large]三、更深入的了解java共享鎖模型[/size][/b]

[b]8.AQS[/b]。

在java5提供的并發包下,有一個AbstractQueuedSynchronizer抽象類,也叫AQS,此類根據大部分并發共性作了一些抽象,便于開發者實作如排他鎖,共享鎖,條件等待等更進階的業務功能。它通過使用CAS(compare and swap,比較和交換,更底層的)和隊列模型,出色的完成了抽象任務。

仔細想想上面的那些工具,有些什麼共性呢?

[b]9.cas[/b]。compare and swap的縮寫,中文翻譯成比較并交換。

CAS指令在Intel CPU上稱為CMPXCHG指令,它的作用是将指定記憶體位址的内容與所給的某個值相比,如果相等,則将其内容替換為指令中提供的新值,如果不相等,則更新失敗。這一比較并交換的操作是原子的,不可以被中斷。

這不是java特有的,而是作業系統需要保證的。利用CPU的CAS指令,同時借助JNI來完成Java的非阻塞算法。允許算法執行讀-修改-寫操作,而無需害怕其他線程同時 修改變量,因為如果其他線程修改變量,那麼 CAS 會檢測它(并失敗),算法可以對該操作重新計算。

作為樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,無限循環執行(稱為自旋),直到成功為止,[color=red]這個自旋的過程算是線程在等待嗎[/color]?。AtomicInteger也是用樂觀鎖cas原子操作實作的。

是以上面很多工具首先是有一個計數,而這個計數是共享的變量。比如countDown中的計數,比如Semaphore中的閘門數,比如CyclicBarrier中的阻隔數。線程調用共享變量正好用到cas。

[b]10.等待線程隊列[/b]

另外發現上面的工具都有很多線程處于等待狀态,這些線程資訊必然要存下來,應該是按順序存,而且可能如餐廳限流一樣不斷的産生等待和啟動,是以一定用的隊列這種結構。

在countDown中通過CAS成功置為0的那個線程将會同時承擔起喚醒等待線程隊列中第一個節點線程的任務,而第一個節點任務又會發現自身為通知狀态,又會把隊列中的head指向後一個等待線程的節點,然後删除自身節點,并喚醒它。一個線程在阻塞之前,就會把它前面的節點設定為通知狀态,這樣便可以實作鍊式喚醒機制了。

[b]11.引伸知識簡單了解[/b]

java的CAS同時具有 volatile 讀和volatile寫的記憶體語義,Java的CAS會使用現代處理器上提供的高效機器級别原子指令,這些原子指令以原子方式對記憶體執行讀-改-寫操作,這是在多處理器中實作同步的關鍵。同時,volatile變量的讀/寫和CAS可以實作線程之間的通信(CAS中的不相等時的自旋看做等待?)。把這些特性整合在一起,就形成了整個concurrent包得以實作的基石。如果我們仔細分析concurrent包的源代碼實作,會發現一個通用化的實作模式:

[list]

[*]首先,聲明共享變量為volatile;

[*]然後,使用CAS的原子條件更新來實作線程之間的同步;

[*]同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的記憶體語義來實作線程之間的通信。

[/list]

AQS,非阻塞資料結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實作的,而concurrent包中的高層類又是依賴于這些基礎類來實作的。

[b][size=large]12.有一個疑問[/size][/b]

CountDownLatch、 Semaphore和CyclicBarrier的設計中,是不是可以讓調用工具方法的線程由于條件不足時,都鎖定在同一個對象中。而工具持有一個atomInteger的數字,而另外的線程調用工具的調整數字的方法時,如果滿足條件再通知那些鎖定的線程啟動。這樣當然不是用更底層的AQS方式來做,等于繞了一個彎而已。不過,這樣可以/不可以嗎?

前面有提出等待是否底層就是自旋的問題?有文章說:

“自旋和阻塞:像同步模式會分為兩個陣營,他們的實作也會分為兩個陣營:他們都采用自旋或者阻塞。自旋是一個簡單的例子。比如條件同步,他采用一個一般的循環。自旋的明顯缺點就是它浪費了cpu的執行周期。在一個多應用程式系統,經常會使用阻塞——讓處理器去執行其他的可執行的線程。之前的線程也許會不久之後又會執行。阻塞不用不停的去檢視條件和鎖的狀态,但是他會在來回切換程式的時候有性能花費。如果線程等待的平均時間小上下文切換時間的兩倍,則可以優先考慮輪詢。當每個cpu核上之後一個線程在執行的時候,輪訓也是不錯的選擇,這通常是發生生在嵌入式或者高性能的系統中。[color=red]最終我們會發現,阻塞(基于排程的同步)一定是基于輪詢(自旋)實作的[/color],因為排程器使用的資料結構本身也需要同步。”

引伸:前陣子又看了一點andriod開發的例子,特别是對于UI線程的handlerMessage方式了解了一下。才明白為什麼UI的設計都是單線程,如何處理其中耗時的操作。之前講JS也是單線程模式,另外node.js當然也是單線程模式,NIO也是單線程模式。而其中也必然有一個LOOP環處理事件驅動的操作,而且由于是喚醒機制(FD),并不會造成CPU空轉,是以也是高效的。而多線程都用于處理耗時的操作,操作是通過發送消息與主線程互動的,比如NID是注冊各種事件,由選擇器處理。看來很多技術的原始的思路都是相通的。

[b][size=x-large]四、線程池[/size][/b]

13.差點漏了這個重要内容了,開始寫的内容比較多,考慮到本文的目的,進行精簡。

如同連接配接池一樣,如果是比較大的開銷進行生成與銷毀,就要考慮一個池子。如果并發的線程數量很多,并且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大降低系統的效率,因為頻繁建立線程和銷毀線程需要時間。

JDK中強烈建議程式員使用較為友善的Executors工廠方法Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個背景線程)它們均為大多數使用場景預定義了設定。

newScheduledThreadPool 建立一個定長線程池,支援定時及周期性任務執行。這個在業務中用的也比較多,比如定時交換資料,同步資料。有時候用spring的定時任務,也可以啟動容器時用線程池,估計底層都差不多。

14.線程池配置的主要的參數

重點是要了解核心線程數量corePoolSize 與最大線程數量maximumPoolSize的關系。線程超過核心了,先進隊列。再多了就擴大線程數。如果無界隊列,就永遠不會用到最大數量。keepAliveTime是大于核心時,超過此時間的空閑線程要殺死。線程池實在來不及處理的時候用到handler來。

送出給池子的線程可能放隊列中緩存,有界的ArrayBlockingQueue放滿了就擴大核心數。無界就一直放,不會擴大核心數。[b]說想SynchronousQueue,你一定是碰到了假隊列[/b],它是管理直接線上程間移交資訊的機制,它會直接送出給空線程做事,沒有空的就建新線程,是以最大數量要設定為integer.maxvalue了吧。

15.線程池任務監控

線程池裡有一些屬性可以直接看到,比較任務總數,完成的任務數之類的。但到ThreadPoolExecutor時,我們是無法知道這些任務是在什麼時候才真正的執行的,為了實作這個需求,我們需要擴充ThreadPoolExecutor,重寫beforeExecute和afterExecute,在這兩個方法裡分别做一些任務執行前和任務執行後的相關監控邏輯,還有個terminated方法,是線上程池關閉後回調(這個是否可以把主線程傳進去回調整呢?),另外,我們可以通過getLargestPoolSize()和getCompletedTaskCount()來分别擷取線程池數的峰值和線程池已完成的任務數。重寫就是繼承老的,具體查資料不展開,知道可以這樣就行了。

16.線程池來不及處理的政策

一共有四種政策為,AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy. 第一個AbortPolicy是預設政策,拒絕政策就是抛異常。這個在構造ThreadPoolExecutor時,傳入一個政策對象就行了。如果傳CallerRunsPolicy對象,它本身構造時還要有一個線程對象,來執行來不及處理時,你想做的事情。