天天看點

Java并發程式設計的藝術6之Java中的13個原子操作類、并發工具類、線程池、Executor架構7 Java中的并發工具類8Java中的線程池9 Executor架構

Java從JDK1.5開始提供了java.util.concurrent.atomic包,這個包中的原子操作類提供了一種用法簡單、性能高效、線程安全地更新變量地方式。

6.Java中的13個原子操作類

6.1.原子更新基本類型

使用原子方式更新基本類型,Atomic包提供了三個類:AtomicBoolean、AtomicInteger、AtomicLong。

幾種常用方法: int addAndGet(int delta)、boolean compareAndSet(int expect,int update)、int getAndTncrement()、void laszSet()、int getAndSet().

Atomic包裡的類基本都是使用Unsafe實作的。

6.2.原子更新數組

通過原子的方式更新數字裡的某個元素,Atomic包提供了以下4個類:

AtomicIntegerArray:原子更新整型數組裡的元素。

AtomicLongArray:原子更新長整型數組裡的元素。

AtomicReferenceArray:原子更新引用類型數組裡的元素。

6.3.原子更新引用類型

如果要原子更新多個變量,就需要使用這個原子更新類型提供的類:

AtomicReference:原子更新引用類型

AtomicReferenceFieldUpdater:原子更新引用類型裡的字段

AtomicMarkableReference:原子更新帶有标記位的引用類型。可以原子更新一個布爾類型的标記為和引用類型。

6.4.原子更新字段類

三個類:AtomicIntegerFieldUpdater:原子更新整形字段的更新器

AtomicLongFieldUpdater:原子更新長整形字段的更新器

AtomicStampedReference:原子更新帶有版本号的引用類型該類将整數值與引用關聯起來,可用于原子的更新資料和資料的版本号,可以解決使用CAS進行原子更新可能出現的ABA問題。

要想原子的更新字段類需要兩步:1.因為原子更新字段類都是抽象類,每次使用的時候必須使用靜态方法newUpdate()建立一個更新器,并且需要設定想要更新的類和屬性。2.更新類的字段(屬性)必須使用public volatile修飾符。

7 Java中的并發工具類

在JDK開發包裡提供的幾種有效的并發工具類,CountDownLatch,CyclicBarrier和Semaphore工具類提供了一種并發流程控制的手段,Exchanger工具類則提供了線上程間交換資料的一種手段。

7.1 等待多線程完成的CountDownLatch

CountDownLatch允許一個或多個線程等待其他線程完成操作

7.2 同步屏障 CyclicBarrier

CyclicBarrier:讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運作。

預設構造方法:CyclicBarrier(int parties) 參數表示屏障攔截的線程數量。

更進階的構造函數:CyclicBarrier(int parties,Runnable barrierAction)用于線上程到達屏障時,有限執行barrierAction,友善處理更複雜的業務場景。

CyclicBarrier可以用于多線程計算資料場景。

CyclicBarrier和CountDownLatch的差別:CountDownLatch的計數器隻能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。是以CyclicBarrier可以處理更為複雜的業務場景。CyclicBarrier還有其他方法:getNUmberWaiting方法可以擷取CyclicBarrier阻塞的線程數量。isBloken方法用來了解阻塞的線程是否被中斷。

7.3 控制并發線程數的Semaphore

Semaphore 用來控制同時通路特定資源的線程數量。它通過協調各個線程,以保證合理的使用公共資源。

應用場景在于流量控制,特别是公共資源有限的應用場景,比如資料庫連接配接。

構造方法Semaphore(int permits)接受一個整形數字,表示可用的許可證數量。首先線程使用Semphore的acquire方法擷取一個許可證,使用完之後調用release方法歸還許可證。還可以用tryAcquire方法擷取許可證。

Eemaphore還有其他方法:intavailablePermits()傳回此信号量中目前可用的許可證數。

intgetQueueLength()傳回正在等待擷取許可證的線程數。

booleanhasQueueThreads:是否有線程正在等待擷取許可證。

7.4 線程間交換資料的EXchanger

Exchanger是一個用于線程間協作的工具類,用于線程間的資料交換。通過提供一個同步點,使得在這個同步點,兩個線程可以交換彼此的資料,通過exchange()方法。第一個線程執行exchange方法,然後會一直等待第二個線程也執行exchange方法,當兩個線程都達到同步點時,這兩個線程就可以交換資料。

應用場景:遺傳算法、校對工作。

8Java中的線程池

合理使用線程池有3個好處:

1.降低資源消耗 2.提高相應速度 3.提高線程的可管理性。

8.1 線程的實作原理

看書上的流程圖

8.2 線程池的使用

8.2.1 線程池的建立

使用ThreadPoolExecutor ,new  ThreadPoolExecutor(coolPoolSize,maxiumPoolSize,keepAliveTime,milliseconds,runnableTaskQueue,handler);

(1)coolPoolSize :線程池的基本大小,當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即其他空閑的基本線程能夠執行新任務也會建立線程,等到需要執行的任務數大于線程池基本大小時就不再建立。如果調用線程池的prestartAllCoreThreads方法,線程池會提前建立并啟動所有基本線程。

(2)runnableTaskQueue:任務隊列,用于儲存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlovkingQueue。

(3)maximumPoolSize 線程池最大數量

如果使用了無界的任務隊列這個參數就沒什麼效果。

(4)ThreadFactory用于設定建立線程的工廠,可以給線程起名字。

(5)RejectedExecutionHandler 飽和政策。當隊列和線程池都滿了後,采取的處理新送出任務的政策。一共有4種:AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardOilicy.

8.2.2 向線程池送出任務

兩個方法execute(不需要傳回值)和submit(需要傳回值)。submit方法中,線程池會傳回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,并且可以通過future的get方法來擷取傳回值,get方法會阻塞目前線程直到任務完成。

8.2.3 關閉線程池

shutdown或shutdownNow。原理:周遊線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,是以無法響應終端的任務可能永遠無法中止。

shutdown:隻是将線程池的狀态設定成SHUTDOWN狀态,然後中斷所有沒有正在執行任務的線程。

shutdownNow:首先将線程池的狀态設定成STOP,然後嘗試停止所有正在執行或暫停任務的線程,并傳回等待執行任務的清單。

調用後,isShutDown方法會傳回true,隻有所有任務都關閉後,才表示線程池關閉成功,調用isTerminaed方法會傳回true。

通常調用shutdown方法,如果線程不用執行完,調用shutdownNow方法。

8.2.4 合理地配置線程池

需要先分析任務特性,看書上截圖

建議使用有界隊列。

8.2.5 線程池的監控

taskCount:線程池需要執行的任務數量

completedTaskCount:線程池在運作過程中已完成的任務數量。

largestPoolSize:線程池曾經建立過的最大線程數量《=taskCount。

getPoolSize:線程池的線程數量

getActiveCount:擷取活動的線程數。

9 Executor架構

Java的線程即是工作單元也是執行機制。從JDK1.5開始,把工作單元與執行機制分離開。工作單元包括Runnable和Callable,而執行機制由Executor提供。

9.1 Executor架構簡介

 在上層,Java多線程程式通常會把應用分解為若幹個認為,然後使用使用者級的排程器(Executor架構)将這些任務映射為固定數量的線程;在底層,作業系統将這些線程映射到硬體處理器上。

Executor架構的結構:主要由3大部分組成:

  • 任務: 包括被執行任務需要實作的接口:Runnable接口或Callable接口。
  • 任務的執行:包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor架構有兩個關鍵類實作了ExecutorService接口。(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
  • 異步計算的結果 包括接口Future和實作Future接口的FutureTask類。

Executor架構的成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。

(1)ThreadPoolExecutor (參數是線程池的建立中的參數)

通常使用工廠類Executors來建立,可以建立3種類型的ThreadPoolExecutor:

  • FixedThreadPool(适用于需要限制目前線程數量的場景,适合負載比較重的伺服器)、

它的coolPoolSize,maxiumPoolSize都被設定為建立FixedThreadPool時指定的參數nThreads。當線程池中的線程數大于coolPoolSize時,keepAliveTime為多餘的空閑線程等待新任務的最長時間,超過這個時間後多餘的線程将被終止。keepAliveTime=0L,意味着多餘的空閑線程立刻被終止。(看書上的流程圖)

  • SingleThreadExecutor (适合需要保證順序的執行各個任務,并且在任意時間點都不會有多個線程活動的應用場景)

它的coolPoolSize,maxiumPoolSize都被設定為1.其他的與FixedThreadPool相同。使用無界隊列LinkedBlockingQueue作為線程池的工作隊列。(看書上的流程圖)

  • CachedThreadPool(大小無界,适合執行很多的異步短期任務的小程式,或者負載較輕的伺服器)

CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊列,且maxiumPoolSize無界,意味着如果主線程送出任務的速度高于maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端情況下,CachedThreadPool會由于建立過多的線程而耗盡CPU和記憶體資源。

(仔細揣摩下流程圖)

(2)ScheduledThreadPoolExecutor

繼承自ThreadPoolExecutor,通常使用工廠類Executors來建立,可以建立2種類型的ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor(包含若幹個線程,适用于多個背景線程執行周期任務,同時為了滿足資源管理的需求而限制背景線程數量的應用場景)、SingleThreadScheduledExecutor(包含一個線程,且需要保證順序的執行各個任務)

ScheduledThreadPoolExecutor功能與Time類似,但ScheduledThreadPoolExecutor的功能更加強大,靈活。Time如對應的是單個背景線程,而ScheduledThreadPoolExecutor可以在構造函數中指定多個對應的背景線程數。

  • ScheduledThreadPoolExecutor的運作機制

基于DelayQueue,是個無界隊列,是以maximumPoolSize在這裡沒有什麼用處。

它的執行主要分為兩個部分:

        1)當調用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDealy()方法時,會向Delayqueue添加一個實作了RunnableScheduledFuture接口的ScheduleFutureTask。

         2)線程池中的線程從DelayQueue中擷取ScheduledFutureTask,然後執行任務

  • ScheduledThreadPoolExecutor的實作

ScheduleFutureTask(待排程任務)會被放到一個Delayqueue中,待排程任務主要包含3個成員變量,long型time,表示這個任務将要被執行的具體時間。long型sequenceNumber,表示這個任務被添加到ScheduledThreadPolExecutor中的序号。

long型period,表示任務執行的間隔周期。

DelayQueue封裝了一個PriorityQueue,它會對隊列中的ScheduledFutureTask進行排序,首先比較time小的,在前面,time相同就比較sequenceNumber,sequenceNumber小的在前面。

(3)Future接口和實作Future接口的FutureTask類

Future接口和實作Future接口的FutureTask類表示異步計算的結果。

當我們把Runnable接口或Callable接口的實作類(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會傳回一個FutureTask對象。到目前最新的JDK1.8中Java通過上述API傳回的是一個FutureTask對象。但這個僅僅是一個實作了Future的接口,是以以後的JDK中,傳回的不一定是FutureTask對象。

FutureTask類除了實作Future接口,還實作了Runnable接口,是以FutureTask可以較給executor執行,也可以調用線程直接執行(FutureTask.run())。根據FutureTask.run()被執行的時機,FutureTask可以分為以下三種狀态:

        1)未啟動。FutureTask.run()還沒有被執行,FutureTask處于未啟動狀态。

         2)已啟動。FutureTask.run()被執行的過程中,FutureTask處于已啟動狀态。

          3)已完成。FutureTask.run()方法執行完後正常結束,或被取消,或抛出異常而結束,FutureTask處于已完成狀态。

當FutureTask處于未啟動或已啟動狀态時,執行FutureTask.get()方法将導緻線程阻塞,執行FutureTask.cancel(true)方法将以中斷執行此任務線程的方式來試圖停止任務。處于已完成狀态時,執行FutureTask.get()方法會導緻調用線程立即傳回結果或抛出異常,執行FutureTask.cancel(。。。)方法會傳回false。已啟動狀态時,執行FutureTask.cancel(false)方法将不會對正在執行此任務的線程産生影響。

FutureTask的使用:一個線程需要等待另一個線程把某個任務執行完成後它才能繼續執行。

FutureTask的實作:基于AbstractQueuedSynchronizer(AQS)。AQS是一個同步架構,它提供通用機制來原子性管理同步狀态、阻塞或喚醒線程以及維護被阻塞線程的隊列。基于AQS的同步器會包含兩種類型的操作:acquire操作和release操作。

FutureTask.run()的執行過程:1)執行在構造函數中指定的任務 2)以原子方式類更新同步狀态,調用AQS。compareANdSetState(int expect,int update)設定state為執行完成狀态RAN)。如果這個原子操作成功,就設定代表計算劫奪的result變量為Callable.call()的值,然後調用AQS.releaseShared(int   arg)。3)AQS.releaseShared(int arg)首先會回調在子類Sync中實作的tryReleaseShared(arg)來執行release操作(設定運作任務的線程runner為null,然後傳回true);AQS.releaseShared(int arg),然後喚醒線程等待隊列中的第一個線程。4)調用FutureTask.done()

(4)Runnable接口或Callable接口:

Runnable接口或Callable接口的實作類都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。差別是Runnable不會傳回結果,Callable可以傳回結果。

除了可以自己建立實作Callable接口的對象外,還可以使用工廠類Executors來把一個Runnable包裝成Callable。

9.2 ThreadPoolExecutor 詳解

繼續閱讀