本文試圖從一個更高的視角來總結Java語言中的并發程式設計内容,希望閱讀完本文之後,可以收獲一些内容,至少應該知道在Java中做并發程式設計實踐的時候應該注意什麼,應該關注什麼,如何保證線程安全,以及如何選擇合适的工具來滿足需求。
當然,更深層次的内容就會涉及到JVM層面的知識,包括底層對Java記憶體的管理,對線程的管理等較為核心的問題,當然,本文的定位在于抽象與總結,更為具體而深入的内容就需要自己去實踐,考慮到可能篇幅過長、重複描述某些内容,以及自身技術深度等原因,本文将在深度和廣度上做一些權衡,某些内容會做一些深入的分析,而有些内容會一帶而過,點到為止。
總之,本文就當是對學習Java并發程式設計内容的一個總結,以及給那些希望快速了解Java并發程式設計内容的讀者抛磚引玉,不足之處還望指正。
Java線程
一般來說,在java中實作高并發是基于多線程程式設計的,所謂并發,也就是多個線程同時工作,來處理我們的業務,在機器普遍多核心的今天,并發程式設計的意義極為重大,因為我們有多個cpu供線程使用,如果我們的應用依然隻使用單線程模式來工作的話,對極度浪費機器資源的。是以,學習java并發知識的首要問題是:如何建立一個線程,并且讓這個線程做一些事情?
這是java并發程式設計内容的起點,下面将分别介紹多個建立線程,并且讓線程做一些事情的方法。
繼承Thread類
繼承Thread類,然後重寫run方法,這是第一種建立線程的方法。run方法裡面就是我們要做的事情,可以在run方法裡面寫我們想要在新的線程裡面運作的任務,下面是一個小例子,我們繼承了Thread類,并且在run方法裡面列印出了當然線程的名字,然後sleep1秒中之後就退出了:
/* \ Created by hujian06 on 2017/10/31. * * the demo of thread */ public class ThreadDemo {undefined
public static void main(String ... args) {undefined
AThread aThread = new AThread();
//start the thread aThread.start();
}
}
class AThread extends Thread { @Override public void run() { System.out.println("Current Thread Name:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
如果我們想要啟動這個線程,隻需要像上面代碼中那樣,調用Thread類的start方法就可以了。
實作Runnable接口
啟動一個線程的第二種方法是實作Runnable接口,然後實作其run方法,将你想要在新線程裡面執行的業務代碼寫在run方法裡面,下面的例子展示了這種方法啟動線程的示例,實作的功能和上面的第一種示例是一樣的:
/* \ Created by hujian06 on 2017/10/31. * * the demo of Runnable */ public class ARunnableaDemo {undefined
ARunnanle aRunnanle = new ARunnanle(); Thread thread = new Thread(aRunnanle);
thread.start();
class ARunnanle implements Runnable {undefined
@Override public void run() { System.out.println("Current Thread Name:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
在啟動線程的時候,依然還是使用了Thread這個類,隻是我們在構造函數中将我們實作的Runnable對象傳遞進去了,是以在我們執行Thread類的start方法的時候,實際執行的内容是我們的Runnable的run方法。
使用FutureTask
啟動一個新的線程的第三種方法是使用FutureTask,下面來看一下FutureTask的類圖,就可以明白為什麼可以使用FutureTask來啟動一個新的線程了:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iMxE2MhFWNxEjYmFjNwgTY0IWM0UmYkdTYxUmZ5AjMz8CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
從FutureTask的類圖中可以看出,FutureTask實作了Runnable接口和Future接口,是以它兼備Runnable和Future兩種特性,下面先來看看如何使用FutureTask來啟動一個新的線程:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;
/* \ Created by hujian06 on 2017/10/31. * * the demo of FutureTask */ public class FutureTaskDemo {undefined
ACallAble callAble = new ACallAble();
FutureTask futureTask = new FutureTask<>(callAble);
Thread thread = new Thread(futureTask);
do {undefined
}while (!futureTask.isDone());
try { String result = futureTask.get();
System.out.println("Result:" + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
class ACallAble implements Callable {
@Override public String call() throws Exception { Thread.sleep(1000); return "Thread-Name:" + Thread.currentThread().getName(); } }
可以看到,使用FutureTask來啟動一個線程之後,我們可以監控這個線程是否完成,上面的示例中主線程會一直等待這個新建立的線程直到它傳回,其實隻要是Future提供的接口,我們在FutureTask中都可以使用,這極大的友善了我們,Future在并發程式設計中的意義極為重要,Future代表一個未來會發生的東西,它是一種暗示,一種占位符,它示意我們它可能不會立即得到結果,因為它的任務還在運作,但是我們可以得到一個對這個線程的監控對象。
我們可以對線程的執行做一些判斷,甚至是控制,比如,如果我們覺得我們等了太久,并且我們覺得沒有必要再等待下去的時候,就可以将這個Task取消,還有一點需要提到的是,Future代表它可能正在運作,也可能已經傳回,當然Future更多的暗示你可以在等待這個結果的同時可以使用其他的線程做一些其他的事情,當你真的需要這個結果的時候再來擷取就可以了,這就是并發,了解這一點非常重要。
本小節通過介紹三種建立并啟動一個新線程的方法,為進行并發程式設計開了一個頭,目前,我們還隻是在能建立多個線程,然後讓多個線程做不同個的事情的階段,當然,這是學習并發程式設計最為基礎的,無論如何,現在,我們可以讓我們的應用運作多個線程了,下面的文章将會基于這個假設(一個應用開啟了多個線程)讨論一些并發程式設計中值得關注的内容。
線程模型
我們現在可以啟動多個線程,但是好像并沒有形成一種類似于模型的東西,非常混亂,并且到目前為止我們的多個線程依然隻是各自做各自的事情,互不相幹,多個線程之間并沒有互動(通信),這是最簡單的模型,也是最基礎的模型,本小節試圖介紹線程模型,一種指導我們的代碼組織的思想,線程模型确定了我們需要處理那些多線程的問題,在一個系統中,多個線程之間沒有通信是不太可能的,更為一般的情況是,多個線程共享一些資源,然後互相競争來擷取資源權限,多個線程互相配合,來提高系統的處理能力。
正因為多個線程之間會有通信互動,是以本文接下來的讨論才有了意義,如果我們的系統裡面有幾百個線程在工作,但是這些線程互不相幹,那麼這樣的系統要麼實作的功能非常單一,要麼毫無意義(當然不是絕對的,比如Netty的線程模型)。
繼續來讨論線程模型,上面說到線程模型是一種指導代碼組織的思想,這是我自己的了解,不同的線程模型需要我們使用不同的代碼組織,好的線程模型可以提高系統的并發度,并且可以使得系統的複雜度降低,這裡需要提一下Netty 4的線程模型,Netty 4的線程模型使得我們可以很容易的了解Netty的事件處理機制,這種優秀的設計基于Reactor線程模型,Reactor線程模型分為單線程模型、多線程模型以及主從多線程模型,Netty的線程模型類似于Reactor主從多線程模型。
當然線程模型是一種更進階别的并發程式設計内容,它是一種程式設計指導思想,尤其在我們進行底層架構設計的時候特别需要注意線程模型,因為一旦線程模型設計不合理,可能會導緻後面架構代碼過于複雜,并且可能因為線程同步等問題造成問題不可控,最終導緻系統運作失控。類似于Netty的線程模型是一種好的線程模型,下面展示了這種模型:
Netty線程模型
簡單來說,Netty為每個建立立的Channel配置設定一個NioEventLoop,而每個NioEventLoop内部僅使用一個線程,這就避免了多線程并發的同步問題,因為為每個Channel處理的線程僅有一個,是以不需要使用鎖等線程同步手段來做線程同步,在我們的系統設計的時候應該借鑒這種線程模型的設計思路,可以避免我們走很多彎路。
Java線程池
池化技術是一種非常有用的技術,對于線程來說,建立一個線程的代價是很高的,如果我們在建立了一個線程,并且讓這個線程做一個任務之後就回收的話,那麼下次要使用線程來執行我們的任務的時候又需要建立一個新的線程,是否可以在建立完成一個線程之後一直緩沖,直到系統關閉的時候再進行回收呢?教你如何監控 Java 線程池運作狀态。
線程池就是這樣的元件,使用線程池,就沒必要頻繁建立線程,線程池會為我們管理線程,當我們需要一個新的線程來執行我們的任務的時候,就向線程池申請,而線程池會從池子裡面找到一個空閑的線程傳回給請求者,如果池子裡面沒有可用的線程,那麼線程池會根據一些參數名額來建立一個新的線程,或者将我們的任務送出到任務隊列中去,等待一個空閑的線程來執行這個任務。
細節内容在下文中進行分析,目前我們隻需要明白,線程池裡面有很多線程,這些線程會一直到系統關系才會被回收,否則一直會處于處理任務或者等待處理任務的狀态。
首先,如何使用線程池呢?下面的代碼展示了如何使用java線程池的例子:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;
/* \ Created by hujian06 on 2017/10/31. * * the demo of Executors */ public class ExecutorsDemo {undefined
int cpuCoreCount = Runtime.getRuntime().availableProcessors(); AThreadFactory threadFactory = new AThreadFactory(); ARunnanle runnanle = new ARunnanle();
ExecutorService fixedThreadPool= Executors.newFixedThreadPool(cpuCoreCount, threadFactory);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(threadFactory);
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(cpuCoreCount, threadFactory);
ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
fixedThreadPool.submit(runnanle); cachedThreadPool.submit(runnanle); newScheduledThreadPool.scheduleAtFixedRate(runnanle, 0, 1, TimeUnit.SECONDS); singleThreadExecutor.scheduleWithFixedDelay(runnanle, 0, 100, TimeUnit.MILLISECONDS);
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
fixedThreadPool.shutdownNow(); cachedThreadPool.shutdownNow(); newScheduledThreadPool.shutdownNow(); singleThreadExecutor.shutdownNow(); }
class ARunnable implements Runnable {undefined
@Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("Current Thread Name:" + Thread.currentThread().getName()); } }
/* \ the thread factory */ class AThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread("aThread-" + threadNumber.incrementAndGet()); } }
更為豐富的應用應該自己去探索,結合自身的需求來借助線程池來實作,下面來分析一下Java線程池實作中幾個較為重要的内容。
ThreadPoolExecutor和ScheduledThreadPoolExecutor
ThreadPoolExecutor和ScheduledThreadPoolExecutor是java實作線程池的核心類,不同類型的線程池其實就是在使用不同的構造函數,以及不同的參數來構造出ThreadPoolExecutor或者ScheduledThreadPoolExecutor,是以,學習java線程池的重點也在于學習這兩個核心類。
前者适用于構造一般的線程池,而後者繼承了前者,并且很多内容是通用的,但是ScheduledThreadPoolExecutor增加了schedule功能,也就是說,ScheduledThreadPoolExecutor使用于構造具有排程功能的線程池,在需要周期性排程執行的場景下就可以使用ScheduledThreadPoolExecutor。
下面展示了ThreadPoolExecutor和ScheduledThreadPoolExecutor的類圖,可以看出他們的關系,以及他們的繼承關系:
ThreadPoolExecutor類圖
ScheduledThreadPoolExecutor類圖
關于較為細節的内容不再本文的叙述範圍之内,如果想要了解這些内容的詳細内容,可以參考文章中給出的連結,這些文章較為深入的分析和總結了相關的内容。
上文中提到,線程池會管理着一些線程,這些線程要麼處于運作狀态,要麼處于等待任務的狀态,當然這隻是我們較為形象的描述,一個線程的狀态不僅有運作态與等待狀态,還有其他的狀态,但是對我我們來說,線程池裡面的線程确實是要麼處于運作狀态,要麼處于等待任務的狀态,這展現在,當我們向一個線程池送出一個任務的時候,可能會被等待任務的線程立即執行,但是可能線程池裡面的線程都處于忙碌狀态,那麼我們送出的任務就會被加入到等待運作的任務隊列中去,當有空閑線程了,或者隊列也滿了,那麼線程池就會采用一些政策來執行任務,并且在某些時刻會拒絕送出的任務,這些細節都可以在ThreadPoolExecutor的實作中找到。
線上程池的實作中,有一個角色特别重要,那就是任務隊列,當線程池裡面沒有空閑的線程來執行我們的任務的時候,我們的任務就會被添加到任務隊列中去等待執行,而這個任務隊列可能會被多個線程并發讀寫,是以需要支援多線程安全通路,java提供了一類支援并發環境的隊列,稱為阻塞隊列,這是一類特殊的隊列,他們的使用時非常廣泛的,特别是在jdk自身的類庫建設上,當然在我們實際的工作中也是有很多使用場景的。
關于ThreadPoolExecutor是如何處理一個送出的任務的細節,可以參考下面的代碼:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
下面來看一下java中借助ThreadPoolExecutor來構造的幾個線程池的特性:
1、newFixedThreadPool
使用ThreadPoolExecutor構造一個newCachedThreadPool的流程如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ()); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
在任意時刻,newFixedThreadPool構造出來的線程池中最多隻可能存活着nThreads個線程,如果所有的線程都在運作任務,那麼這個時候送出的任務将會被添加到任務隊列中去等待執行。
我們可以控制corePoolSize和maximumPoolSize來使得通過ThreadPoolExecutor構造出來的線程池具有一些不一樣的特性,但是需要注意的是,當我們設定的maximumPoolSize大于corePoolSize的時候,如果目前線程池裡面的線程數量已經達到了corePoolSize了,并且目前是以線程都處于運作任務的狀态,那麼在這個時候送出的任務會被添加到任務隊列中去,隻有在任務隊列滿了的時候,才會去建立新的線程,如果線程數量已經達到了maximumPoolSize了,那麼到此就會拒絕送出的任務,這些流程可以參考上面展示出來的execute方法的實作。該類型的線程池使用的任務隊列是LinkedBlockingQueue類型的阻塞隊列。
2、newCachedThreadPool
通過ThreadPoolExecutor構造一個newCachedThreadPool線程池的流程如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue ()); }
newCachedThreadPool适合于類似秒殺系統中,它可以按需建立線程。每個線程在空閑了一段時間之後會被回收,然後需要建立的時候再建立出來,在使用的時候應該使用合适的構造參數。
該類型使用的任務隊列是SynchronousQueue這種同步隊列,這是一種特别的隊列,每個線程都是有使命的,每個線程都會等待另外一個線程和自己交易,在交易完成之前都會阻塞住線程,他們之間有一種傳遞關系,資料是從一個線程直接傳遞到例外一個線程中去的,SynchronousQueue這種隊列不存儲實際的資料,而是存儲着一些線程的資訊,而SynchronousQueue管理着這些線程之間的交易,更為詳細的細節參考後面的文章。
上面提到,ScheduleThreadPoolExecutor是繼承自ThreadPoolExecutor的,而且從類圖中也可以看出來這種關系,是以其實ScheduleThreadPoolExecutor是對ThreadPoolExecutor的增強,它增加了schedule功能,使用與那些需要周期性排程執行,或者是延時執行的任務,在ScheduleThreadPoolExecutor中使用了一種阻塞隊列稱為延時阻塞隊列,這種隊列有能力持有一段時間資料,我們可以設定這種時間,時間沒到的時候嘗試擷取資料的線程會被阻塞,直到設定的時間到了,線程才會被喚醒來消費資料。而關于ScheduleThreadPoolExecutor是如何運作的,包括他的周期性任務排程是如何工作的,可以參考上面提到的連結。
Future
Future代表一種未來某個時刻會發生的事情,在并發環境下使用Future是非常重要的,使用Future的前提是我們可以容許線程執行一段時間來完成這個任務,但是需要在我們送出了任務的時候就傳回一個Future,這樣在接下來的時間程式員可以根據實際情況來取消任務或者擷取任務,在多個任務沒有互相依賴關系的時候,使用Future可以實作多線程的并發執行,多個線程可以執行在不同的處理器上,然後在某個時間點來統一擷取結果就可以了。
上文中已經提到了FutureTask,FutureTask既是一種Runnable,也是一種Future,并且結合了兩種類型的特性。下面展示了Future提供的一些方法,使用這些方法可以很友善的進行任務控制:
public interface Future {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
在java 8中增加了一個新的類CompletableFuture,這是對Future的極大增強,CompletableFuture提供了非常豐富的操作可以來控制我們的任務,并且可以根據多種規則來關聯多個Future。
Fork/Join架構
Fork/Join架構是一種并行架構,它可以将一個較大的任務切分成一些小任務來執行,并且多個線程之間會互相配合,每個線程都會有一個任務隊列,對于某些線程來說它們可能很快完成了自己的任務隊列中的任務,但是其他的線程還沒有完成,那麼這些線程就會去竊取那些還沒有完成任務執行的線程的任務來執行,這成為“工作竊取”算法,關于Fork/Join中的工作竊取,其實作還是較為複雜的,下面展示了Fork/Join架構的工作模式:
Fork/Join工作模式
可以從上面的圖中看出,一個較大的任務會被切分為一個小任務,并且小任務還會繼續切分,直到符合我們設定的執行門檻值,然後就會執行,執行完成之後會進行join,也就是将小任務的結果組合起來,組裝出我們送出的整個任務的結果,這是一種非常先進的工作模式,非常有借鑒意義。當然,使用Fork/Join架構的前提是我們的任務時可以拆分成小任務來執行的,并且小人物的結果可以組裝出整個大任務的結果,歸并排序是一種可以借助Fork/Join架構來提供處理速度的算法,下面展示了使用Fork/Join架構來執行歸并排序的代碼,可以試着調整參數來進行性能測試:
import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction;
/* \ Created by hujian06 on 2017/10/23. * * merge sort by fork/join */ public class ForkJoinMergeSortDemo {undefined
public static void main(String ... args) { new Worker().runWork(); }
class Worker {undefined
private static final boolean isDebug = false;
public void runWork() {undefined
int[] array = mockArray(200000000, 1000000); // mock the data
forkJoinCase(array); normalCase(array);
private void printArray(int[] arr) {undefined
if (isDebug == false) { return; }
for (int i = 0; i < arr.length; i ++) { System.out.print(arr[i] + " "); }
System.out.println(); }
private void forkJoinCase(int[] array) { ForkJoinPool pool = new ForkJoinPool();
MergeSortTask mergeSortTask = new MergeSortTask(array, 0, array.length - 1);
long start = System.currentTimeMillis();
pool.invoke(mergeSortTask);
long end = System.currentTimeMillis();
printArray(array);
System.out.println("[for/join mode]Total cost: " + (end - start) / 1000.0 + " s, for " + array.length + " items' sort work."); }
private void normalCase(int[] array) {undefined
new MergeSortWorker().sort(array, 0, array.length - 1);
System.out.println("[normal mode]Total cost: " + (end - start) / 1000.0 + " s, for " + array.length + " items' sort work."); }
private static final int[] mockArray(int length, int up) { if (length <= 0) { return null; }
int[] array = new int[length];
Random random = new Random(47);
for (int i = 0; i < length; i ++) { array[i] = random.nextInt(up); }
return array; } }
class MergeSortTask extends RecursiveAction {undefined
private static final int threshold = 100000; private final MergeSortWorker mergeSortWorker = new MergeSortWorker();
private int[] data;
private int left; private int right;
public MergeSortTask(int[] array, int l, int r) { this.data = array; this.left = l; this.right = r; }
@Override protected void compute() { if (right - left < threshold) { mergeSortWorker.sort(data, left, right); } else { int mid = left + (right - left) / 2; MergeSortTask l = new MergeSortTask(data, left, mid); MergeSortTask r = new MergeSortTask(data, mid + 1, right);
invokeAll(l, r);
mergeSortWorker.merge(data, left, mid, right); } } }
class MergeSortWorker {undefined
// Merges two subarrays of arr[]. // First subarray is arr[l..m] // Second subarray is arr[m+1..r] void merge(int arr[], int l, int m, int r) { // Find sizes of two subarrays to be merged int n1 = m - l + 1; int n2 = r - m;
/* Create temp arrays */ int L[] = new int[n1]; int R[] = new int[n2];
/*Copy data to temp arrays*/ for (int i = 0; i < n1; ++i) L[i] = arr[l + i]; for (int j = 0; j < n2; ++j) R[j] = arr[m + 1 + j];
/* Merge the temp arrays */
// Initial indexes of first and second subarrays int i = 0, j = 0;
// Initial index of merged subarry array int k = l; while (i < n1 && j < n2) { if (L[i] <= R[j]) { arr[k ++] = L[i ++]; } else { arr[k ++] = R[j ++]; } }
/* Copy remaining elements of L[] if any */ while (i < n1) { arr[k ++] = L[i ++]; }
/* Copy remaining elements of R[] if any */ while (j < n2) { arr[k ++] = R[j ++]; } }
// Main function that sorts arr[l..r] using // merge() void sort(int arr[], int l, int r) { if (l < r) { // Find the middle point int m = l + (r - l) / 2;
// Sort first and second halves sort(arr, l, m); sort(arr, m + 1, r);
// Merge the sorted halves merge(arr, l, m, r); } } }
在jdk中,使用Fork/Join架構的一個典型案例是Streams API,Streams API試圖簡化我們的并發程式設計,可以使用很簡單的流式API來處理我們的資料流,在我們無感覺的狀态下,其實Streams的實作上借助了Fork/Join架構來實作了并發計算,是以強烈建議使用Streams API來處理我們的流式資料,這樣可以充分的利用機器的多核心資源,來提高資料處理的速度。鑒于Fork/Join架構的先進思想,了解并且學會使用Fork/Join架構來處理我們的實際問題是非常有必要的。
Java volatile關鍵字
volatile解決的問題是多個線程的記憶體可見性問題,在并發環境下,每個線程都會有自己的工作空間,每個線程隻能通路各自的工作空間,而一些共享變量會被加載到每個線程的工作空間中,是以這裡面就有一個問題,記憶體中的資料什麼時候被加載到線程的工作緩存中,而線程工作空間中的内容什麼時候會回寫到記憶體中去。這兩個步驟處理不當就會造成記憶體可加性問題,也就是資料的不一緻,比如某個共享變量被線程A修改了,但是沒有回寫到記憶體中去,而線程B在加載了記憶體中的資料之後讀取到的共享變量是髒資料,正确的做法應該是線程A的修改應該對線程B是可見的,更為通用一些,就是在并發環境下共享變量對多個線程是一緻的。volatile關鍵字解析~進階java必問。
對于記憶體可見性的一點補充是,之是以會造成多個線程看到的共享變量的值不一樣,是因為線程在占用CPU時間的時候,cpu為了提高處理速度不會直接和記憶體互動,而是會先将記憶體中的共享内容讀取到内部緩存中(L1,L2),然後cpu在處理的過程中就隻會和内部緩存互動,在多核心的機器中這樣的處理方式就會造成記憶體可見性問題。
volatile可以解決并發環境下的記憶體可見性問題,隻需要在共享變量前面加上volatile關鍵字就可以解決,但是需要說明的是,volatile僅僅是解決記憶體可見性問題,對于像i++這樣的問題還是需要使用其他的方式來保證線程安全。使用volatile解決記憶體可見性問題的原理是,如果對被volatile修飾的共享變量執行寫操作的話,JVM就會向cpu發送一條Lock字首的指令,cpu将會這個變量所在的緩存行(緩存中可以配置設定的最小緩存機關)寫回到記憶體中去。但是在多處理器的情況下,将某個cpu上的緩存行寫回到系統記憶體之後,其他cpu上該變量的緩存還是舊的,這樣再進行後面的操作的時候就會出現問題,是以為了使得所有線程看到的内容都是一緻的,就需要實作緩存一緻性協定,cpu将會通過監控總線上傳遞過來的資料來判斷自己的緩存是否過期,如果過期,就需要使得緩存失效,如果cpu再來通路該緩存的時候,就會發現緩存失效了,這時候就會重新從記憶體加載緩存。
總結一下,volatile的實作原則有兩條:
1、JVM的Lock字首的指令将使得cpu緩存寫回到系統記憶體中去 2、為了保證緩存一緻性原則,在多cpu的情景下,一個cpu的緩存回寫記憶體會導緻其他的cpu上的緩存都失效,再次通路會重新從系統記憶體加載新的緩存内容。
原子操作CAS
原子操作表達的意思是要麼一個操作成功,要麼失敗,中間過程不會被其他的線程中斷,這一點對于并發程式設計來說非常重要,在java中使用了大量的CAS來做并發程式設計,包括jdk的ConcurrentHsahMap的實作,還有AtomicXXX的實作等其他一些并發工具的實作都使用了CAS這種技術,CAS包括兩部分,也就是Compare and swap,首先是比較,然後再互動,這樣做的原因是,在并發環境下,可能不止一個線程想要來改變某個共享變量的值,那麼在進行操作之前使用一個比較,而這個比較的值是目前線程認為(知道)該共享變量最新的值,但是可能其他線程已經改變了這個值,那麼此時CAS操作就會失敗,隻有在共享變量的值等于線程提供的用于比較的值的時候才會進行原子改變操作。
java中有一個類是專門用于提供CAS操作支援的,那就是Unsafe類,但是我們不能直接使用Unsafe類,因為Unsafe類提供的一些底層的操作,需要非常專業的人才能使用好,并且Unsafe類可能會造成一些安全問題,是以不建議直接使用Unsafe類,但是如果想使用Unsafe類的話還是有方法的,那就是通過反射來擷取Unsafe執行個體,類似于下面的代碼:
class UnsafeHolder {undefined
private static Unsafe U = null;
public static Unsafe getUnsafe() { if (U == null) { synchronized (UnsafeHolder.class) { if (U == null) {undefined
List exception = null; try { Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
try { U = (Unsafe) field.get(null); } catch (IllegalAccessException e) {undefined
exception.add(e); } } catch (NoSuchFieldException e) {undefined
exception.add(e); } finally {undefined
if (exception != null) { reportException(exception); }
} } } }
return U; }
/* \ handler the exception in this method . * @param e The exception */ private static void reportException(List e) { e.forEach(System.out::println); }
如果想要了解Unsafe類到底提供了哪些較為底層的操作,可以直接參考Unsafe的源碼。CAS操作解決了原子操作問題,隻要進行操作,CAS就會保證操作會成功,不會被中斷,這是一種非常好非常強大的特性,下面就java 8中的ConcurrentHashMap的size實作來談談CAS操作在并發環境下的使用案例。
在java 7中,ConcurrentHashMap的實作是基于分段鎖協定的實作,本質上還是使用了鎖,隻是基于一種考慮,就是多個線程通路哈希桶具有随機性,基于這種考慮來将資料存儲在不同的哈希段上面,然後每一個段配有一把鎖,在需要寫某個段的時候需要加鎖,而在這個時候,其他通路其他段的線程是不需要阻塞的,但是對于該段的線程通路就需要等待,直到這個加鎖的線程釋放了鎖,其他線程才能進行通路。在java 8中,ConcurrentHashMap的實作抛棄了這種複雜的架構設計,但是繼承了這種分散線程競争壓力的思想,其實就提高系統的并發度這一次元來說,分散競争壓力是一種最為直接明了的解決方案,而java 8在實作ConcurrentHashMap的時候大量使用了CAS操作,減少了使用鎖的頻度來提高系統的響應度,其實使用鎖和使用CAS來做并發在複雜度上不是一個數量級的,使用鎖在很大程度上假設了多個線程的排斥性,并且使用鎖會将線程阻塞等待,也就是說使用鎖來做線程同步的時候,線程的狀态是會改變的,但是使用CAS是不會改變線程的狀态的(不太嚴謹的說),是以使用CAS比起使用synchronized或者使用Lcok來說更為輕量級。Java Map集合面試題彙總。
現在就ConcurrentHashMap的size方法來分析一下如何将線程競争的壓力分散出去。在java 7的實作上,在調用size方法之後,ConcurrentHashMap會進行兩次對哈希桶中的記錄累加的操作,這兩次累加的操作是不加鎖的,然後判斷兩次結果是否一緻,如果一緻就說明目前的系統是讀多寫少的場景,并且可能目前沒有線程競争,是以直接傳回就可以,這就避免了使用鎖,但是如果兩次累加結果不一緻,那就說明此時可能寫的線程較多,或者線程競争較為嚴重,那麼此時ConcurrentHashMap就會進行一個重量級的操作,對所有段進行加鎖,然後對每一個段進行記錄計數,然後求得最終的結果傳回。在最有情況下,size方法需要做兩次累加計數,最壞情況需要三次,并且會涉及全局加鎖這種重量級的加鎖操作,性能肯定是不高的。而在java 8的實作上,ConcurrentHashMap的size方法實際上是與ConcurrentHashMap是解耦的,size方法更像是接入了一個額外的并發計數系統,在進行size方法調用的時候是不會影響資料的存取的,這其實是一種非常先進的思想,就是一個系統子產品化,然後子產品可以進行更新,系統解耦,比如java 8中接入了并發計數元件Striped64來作為size方法的支撐,可能未來出現了比Striped64更為高效的算法來計數,那麼隻需要将Striped64子產品換成新的子產品就可以了,對原來的核心操作是不影響的,這種子產品化系統設定的思想應該在我們的項目中具體實踐。
上面說到java 8在進行size方法的設計上引入了Striped64這種并發計數元件,這種元件的計數思想其實也是分散競争,Striped64的實作上使用了volatile和CAS,在Striped64的實作中是看不到鎖的使用的,但是Striped64确實是一種高效的适用于并發環境下的計數元件,它會基于請求計數的線程,Striped64的計數會根據兩部分的内容來得到最後的結果,類似于java 7中ConcurrentHashMap的size方法的實作,在Striped64的實作上也是借鑒了這種思想的,Striped64會首先嘗試将某個線程的計數請求累加到一個base共享變量上,如果成功了,那麼說明目前的競争不是很激烈,也就沒必要後面的操作了,但是很多情況下,并發環境下的線程競争是很激烈的,是以嘗試累加到base上的計數請求很大機率是會失敗的,那麼Striped64會維護一個Cell數組,每個Cell是一個計數元件,Striped64會為每個請求計數的線程計算一個哈希值,然後哈希到Cell數組中的某個位置上,然後這個線程的計數就會累加到該Cell上面去。
并發同步架構AQS
AQS是java中實作Lock的基礎,也是實作線程同步的基礎,AQS提供了鎖的語義,并且支援獨占模式和共享模式,對應于悲觀鎖和樂觀鎖,獨占模式的含義是說同一時刻隻能有一個線程擷取鎖,而其他試圖擷取鎖的線程都需要阻塞等待,而共享鎖的含義是說可以有多個線程獲得鎖,兩種模式在不同的場景下使用。
而鎖在并發程式設計中的地位不言而喻,多個線程的同步很多時候是需要鎖來做同步的,比如對于某些資源,我們希望可以有多個線程獲得鎖來讀取,但是隻允許有一個線程獲得鎖來執行寫操作,這種鎖稱為讀寫鎖,它的實作上結合了AQS的共享模式和獨占模式,共享模式對應于可以使得多個線程獲得鎖來進行讀操作,獨占模式對應于隻允許有一個線程獲得鎖來進行寫操作。該文章詳細講述了多個Lock接口的實作類,以及他們是如何借助AQS來實作的具體細節。
某些時候,我們需要定制我們自己的線程同步政策,個性化的線程同步借助AQS可以很容易的實作,比如我們的需求是允許限定個數的線程獲得鎖來進行一些操作,想要實作這樣的語義,隻需要實作一個類,繼承AQS,然後重寫方法下面兩個方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
還需要提到的一點是,鎖分為公平鎖和非公平鎖,java中大多數時候會使用隊列來實作公平鎖,而使用棧來實作非公平鎖,當然這是基于隊列和棧這兩種資料結構的特點來實作的,直覺的來說,使用隊列的FIFO的特性就可以實作類似排隊的效果,也就保證了公平性,而棧是一個後進先出的資料結構,它的這種結構造成的結果就是,最新進入的線程可能比那些等待過一段時間的線程更早的獲得鎖,更為具體的内容可以參考上面的文章進行了解。
synchronized(同步鎖)
相對于volatile,synchronized就顯得比較重量級了。
首先,我們應該知道,在java中,所有的對象都可以作為鎖。可以分為下面三種情況:
1、普通方法同步,鎖是目前對象 2、靜态方法同步,鎖是目前類的Class對象 3、普通塊同步,鎖是synchronize裡面配置的對象
當一個線程試圖通路同步代碼時,必須要先獲得鎖,退出或者抛出異常時必須要釋放鎖。
JVM基于進入和退出Monitor對象來實作方法同步和代碼塊同步,可以使用monitorenter和monitorexit指令實作。monitorenter指令是在編譯後插入到同步代碼塊的開始位置,而monitorexit指令則插入到方法結束和異常處,JVM保證每個monitorenter都有一個monitorexit門檻值相對應。線程執行到monitorenter的時候,會嘗試獲得對象所對應的monitor的鎖,然後才能獲得通路權限,synchronize使用的鎖儲存在Java對象頭中。
并發隊列(阻塞隊列,同步隊列)
并發隊列,也就是可以在并發環境下使用的隊列,為什麼一般的隊列不能再并發環境下使用呢?因為在并發環境下,可能會有多個線程同時來通路一個隊列,這個時候因為上下文切換的原因可能會造成資料不一緻的情況,并發隊列解決了這個問題,并且java中的并發隊列的使用時非常廣泛的,比如在java的線程池的實作上使用了多種不同特性的阻塞隊列來做任務隊列,對于阻塞隊列來說,它要解決的首要的兩個問題是:
1. 多線程環境支援,多個線程可以安全的通路隊列 2. 支援生産和消費等待,多個線程之間互相配合,當隊列為空的時候,消費線程會阻塞等待隊列不為空;當隊列滿了的時候,生産線程就會阻塞直到隊列不滿。
Java中提供了豐富的并發隊列實作,下面展示了這些并發隊列的概覽:
java并發隊列概覽
根據上面的圖可以将java中實作的并發隊列分為幾類:
1. 一般的阻塞隊列 2. 支援雙端存取的并發隊列 3. 支援延時擷取資料的延時阻塞隊列 4. 支援優先級的阻塞隊列
這些隊列的差別就在于從隊列中存取資料時的具體表現,比如對于延時隊列來說,擷取資料的線程可能被阻塞等待一段時間,也可能立刻傳回,對于優先級阻塞隊列,擷取的資料是根據一定的優先級取到的。下面展示了一些隊列操作的具體表現:
Throws Exception 類型的插入和取出在不能立即被執行的時候就會抛出異常。
Special Value 類型的插入和取出在不能被立即執行的情況下會傳回一個特殊的值(true 或者 false)
Blocked 類型的插入和取出操作在不能被立即執行的時候會阻塞線程直到可以操作的時候會被其他線程喚醒
Timed out 類型的插入和取出操作在不能立即執行的時候會被阻塞一定的時候,如果在指定的時間内沒有被執行,那麼會傳回一個特殊值
總結
本文總結了Java并發程式設計中的若幹核心技術,并且對每一個核心技術都做了一些分析,并給出了參考連結,可以在參考連結中查找到更為具體深入的分析總結内容。
Java并發程式設計需要解決一些問題,比如線程間同步問題,如何保證資料可見性問題,以及如何高效的協調多個線程工作等内容,本文在這些次元上都有所設計。
本文作為對閱讀java.util.Concurrent包的源碼閱讀的一個總結,同時本文也作為一個起點,一個開始更高層次分析總結的起點,之前的分析都是基于JDK源碼來進行的,并且某些細節的内容還沒有完全搞明白,其實在閱讀了一些源碼之後就會發現。
如果想要深入分析某個方面的内容,就需要一些底層的知識,否則很難完整的分析總結出來,但是這種不徹底的分析又是很有必要的,至少可以對這些内容有一些大概的了解,并且知道自己的不足,以及未來需要了解的底層内容。
對于Java并發包的分析研究,深入到底層就是對JVM如何管理内容,如何管理線程的分析,在深入下去,就是作業系統對記憶體的管理,對線程的管理等内容,從作業系統再深入下去,就是去了解CPU的指令系統,學習磁盤知識等内容。
當然,知識的關聯是無止境的,學習也是無止境的,目前來說,首要解決的問題是可以熟練的使用Java提供的并發包内容來進行并發程式設計,在業務上提高并發處理能力,在出現問題的時候可以很快找到問題并且解決問題,在達到這個要求之後,可以去了解一些JVM層次的内容,比如JVM的記憶體模型,以及線程的實作,并且可以與學習作業系統的相關内容并行進行。