此文被筆者收錄在系列文章 架構師必備(系列) 中,在實踐中,委托是建立線程安全類最有效的政策之一:用已有的線程安全類來管理所有狀态即可。
一、同步容器-串行
同步容器包括Vector和Hashtable、以及Collections.synchronziedXXX工廠方法建立的容器類,這些類通過封裝它們的狀态,并對每一個公共方法進行同步而實作了線程安全。這種封裝對于單線程程式是線程安全的,對于複合操作,隻能采用用戶端鎖的方式來解決了。
public class UnsafeVectorHelpers {
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
public class SafeVectorHelpers {
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
//用戶端加鎖的實作方式
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
}
//這種調用方式是最可靠的,但是效率也是最低的
synchronized(list){
for (int i=0; i<100;i++){
list.get(i);
}
}
疊代器和ConcurrentModificationException
現在的容器類也都沒有消除複合操作産生的問題。在疊代時都需要在疊代期間對容器加鎖。有兩種方式可以解決:
- 及時失敗,就是當它們察覺容器在疊代開始後修改,會抛出一個CME異常;被設計用來善意使用;設計思路是将狀态變化比如計數器與容器關聯起來;
- 複制容器,但有時會很影響性能,主要取決于容器的大小,每一個元素的工作量等等。用這種方式可以避免抛出此異常。
隐藏疊代器
當容器本身做為一個元素時,我們把關鍵的方法設定了同步,但有些比如hashCode和equals這樣的方法都會間接地調用疊代。可能都會引起ConcurrentModificationException如果把這些元素設定成synchronizedXXX的方式就不會出現這樣的問題。比如把下面的HashSet換成synchronizedSet。
public class HiddenIterator {
@GuardedBy("this")
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {
set.add(i);
}
public synchronized void remove(Integer i) {
set.remove(i);
}
public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG: added ten elements to " + set);
}
}
二、并發容器-并行
同步容器通過對容器的所有狀态進行串行通路進而實作了它們的線程安全。這樣的代碼是削弱了并發性,當多個線程共同競争容器級的鎖時,吞吐量就會降低。同步容器類在每個操作的執行期間都持有一個鎖。
用并發容器替換同步容器,java5添加了兩個新的容器:Queue和BlockingQueue,LinkedList就實作了Queue(有序隊列),BlockingQueue擴充了Queue增加了可阻塞的get和set操作。
ConcurrentHashMap
ConcurrentHashMap擴充了Map接口,和HashMap一樣是一個哈希表,但使用完全不同的鎖政策。可以提供更好的并發性和可伸縮性。它使用一個更加細化的鎖,叫分離鎖。這種鎖機制允許更深層次的共享通路、任意數量的讀線程可以并發通路Map、有限數量的寫線程可以并發修改Map。也正因不提供獨占鎖的方式,是以不能使用用戶端加鎖來建立新的原子操作。
ConcurrentHashMap與其他的并發容器一起,進一步改進了同步容器類,提供不會抛出ConcurrentModificationException的疊代器,是以不需要在容器疊代中加鎖,它傳回的疊代器具有弱一緻改,而非“及時失敗”的。
這類的容器隻是保證了重要的功能如get、put、containsKey、remove等,像size和isEmpty這樣的功能被弱化了,隻會提供一個估算的值,因為并發的重要意義在于運動。
如果程式需要同步時優先考慮使用ConcurrentHashMap,隻有程式在通路時需要獨占鎖時才會使用Hashtable和SynchronizedMap。
CopyOnWriteArrayList
CopyOnWriteArrayList是同步List的一個并發替代品,通常情況下它提供了更好的并發性,并避免了在疊代期間對容器加鎖和複制。
寫入時複制容器的線程安全性來源于這樣一個事實:隻要有效的不可變對象被正确釋出,那麼通路它将不再需要更多的同步,在每次需要修改時,它們會建立并重新釋出一個新的容器拷貝,以此來實作可變性。
當對容器疊代操作的頻率遠遠高于對容器修改的頻率時,使用CopyOnWriteArrayList是個合理的選擇,比如事件listener。
三、阻塞隊列和生産者-消費者模式
阻塞隊列(Blocking queue)提供了可阻塞的put和take方法,以及定時的offer和poll方法。如果Queue滿了,put方法會被阻塞真到有空間可用,同時提供了offer方法來傳回失敗狀态,如果Queue是空的,那麼take會被阻塞,直到有元素可用,定義上分為有界和無界隊列。通用的作法是限制隊列的長度,控制生産和消費者的速率,超負荷的工作條目序列化後寫入磁盤或用其它方法遏制生産者線程。
生産者--消費者設計是圍繞阻塞隊列展開的,它實作了生産和消費代碼的解耦,隻通過一個隊列來進行溝通。如果阻塞隊列不能滿足你的設計需要也可以用信号量(Semaphore)建立其他的阻塞資料結構。
1、類庫中有一些阻塞隊列的實作,其中LinkedBlockingQueue和ArrayBlockQueue是FIFO隊列,與LinkedList和ArrayList類似,但是有比同步List更好的并發性能。
2、PriorityBlockingQueue是一個按優先級順序排序的隊列,它本身可以自然排序(如果元素本身實作了Comparable),也可以定義一個Comparator排序。
3、SynchronousQueue,它其實不是一個真正的隊列,因為它不會為隊列元素維護存儲空間,但它維護一個排列的線程清單,這些線程等待把元素加入或移入隊列,就好比生産者--消費者模式中,洗盤子的直接把盤子放進烘幹機,不需要盤子架一樣。當移交被接受,它就知道消費者已得到了任務。因為 SynchronousQueue沒有存儲的能力,是以除非另一個線程已經準備好參與移交工作,否則put和take會一直阻塞, SynchronousQueue這類隊列隻在消費者充足的時候比較合适。
桌面搜尋
比如掃描本地驅動器并歸檔檔案,為之後的搜尋建立索引的代理。類似于Google Desktop或windows索引服務。這有利于把一個大活動分為很多小的活動再進行組合,每個活動隻有一個單一的任務。由阻塞隊列掌控所有的控制流。很多生産--消費者設計都可以通過使用Executor任務執行架構來實作,它自身就應用了生産--消費者模式。
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
File root) {
this.fileQueue = fileQueue;
this.root = root;
}
public void run() {
crawl(root);
}
private void crawl(File root) throws InterruptedException {
fileQueue.put(entry);
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
while (true)
indexFile(queue.take());
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10; //隊列的長度,不緻于占用太多的内容
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); //CPU個數,保證CPU忙
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
for (File root : roots) //多線程啟動,共享queue,且每次都不會重複,通過共享的queue來完成了所有權的互動。
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
連續的線程限制
在java.util.concurrent中實作的阻塞隊列,全部都包含充分的内部同步,進而能安全地将對象從生産者線程釋出至消費者線程。
對象池拓展了連續的線程限制,把對象“借給”一個請求線程,隻要對象池中含有充足的内部同步,使對象池能安全地釋出,并且用戶端本身不會釋出對象池或者在傳回對象池後不再繼續使用,所有權可以線上程間安全地傳遞。這種多個線程協作控制的方式稱為連續的線程限制。
雙端隊列和竊取工作
Deque和BlockingDeque分别擴充了Queue和BlockQueue,允許高效是在頭和尾分别進行插入和移除,具體實作ArrayDeque和LinkedBlockingDeque。
雙端阻塞隊列與一種叫竊取工作的模式相關聯,一個消費者設計中,所有的消費者隻共享一個工作隊列,在竊取工作的設計中,每一個消費者都有一個自己的雙端隊列,如果一個消費者完成了自己雙端隊列中的全部工作,它可以偷取其它消費者的雙端隊列中末尾的任務,這樣可以防止競争一個共享的任務隊列,也有了更佳的伸縮性。
竊取工作恰好适合用于解決消費者與生産者同體的問題--當運作到一個任務的某單元時,可能會識别出更多的任務。通過把任務做上記号後,把新任務放在隊列的末尾,這樣就可以保證所有的線程都保持忙碌狀态。
四、阻塞和可中斷的方法
線程可能會因為幾種原因阻塞或暫停:等待I/O操作結束、等待另一個鎖、等待從Thread.sleep喚醒、等待另一個線程的計算結果。當一個線程阻塞挂起時,通常設定阻塞狀态blocked、waiting、timed_waiting中的一個。這個等待通常是不受他自己控制的,由外部事件來決定,當外部事件發生後,線程被置回runable狀态,重新獲得被排程的機會。
BlockingQueue的put和take方法都會抛出一個受檢查的InterruptedException。當一個方法能抛出這個異常的時候,說明這個方法是一個阻塞方法。
Thread提供了interrupt方法,用來中斷一個線程,每個線程都有一個boolean類型的屬性,代表線程的中斷狀态,中斷線程時需要設定這個值。中斷是一種協作機制。一個線程不能迫使其他線程停止正在做的事情,中斷可以取消比較耗時的操作。當你的代碼調用一個可以抛出InterruptedException的方法時。你自己的方法也就成為了一個阻塞方法,要為響應中斷做好準備,在類庫代碼中有兩種基本選擇:
- 傳遞InterruptedException:如果能避開異常的話,這是明智的選擇,如果不能隻需要把InterruptedException傳遞給你的調用者,然後對特定活動進行簡單清理後再抛出。
- 恢複中斷:有時你不能抛出InterruptedException,這時你可能必須捕獲InterruptedException,并在目前線程中調用interrupt方法從中斷中恢複,這樣調用棧中更高層的代碼要以發現中斷已經發生。
不允許在捕獲InterruptedException後,不做任務響應。除非你擴充了Thread,并是以控制了所有處于調用堆棧上層的代碼,可以掩蓋InterruptedException。見下例:
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
//恢複中斷的例子
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
}
五、同步工具類
阻塞隊列在容器類中是特有的,它不僅可做為容器還可以協調生産者和消費者線程之間的控制流。Synchronizer是一個對象。它根據本身的狀态調節線程的控制流,阻塞隊列可以扮演一個Synchronizer角色,其它的還有信号量semaphore、關卡barrier、以及閉鎖latch。還可以自己創造一個。他們都有類似的結構:封裝狀态,這些狀态決定着線程執行到了某一點時是通過還是被迫等待,它們還提供操控狀态的方法,以及高效地等待Synchronizer進入到期望狀态的方法。
latch閉鎖
它可以延遲線程的進度直到線程到達終止狀态。閉鎖工作起來像一個大門,直到閉鎖達到終點狀态之前,門一直關閉,沒有線程能通過,在終點狀态到來的時候,門開了,允許所有線程通過,一旦閉鎖到達了終點狀态,就不能再改變狀态了永遠處于敞開狀态。閉鎖可以用來確定特定活動直到其他的活動完成後才發生:
- 確定一個服務不會開始,直到它依賴的其他服務都已經開始。。
- 等待,直到活動的所有部分都為繼續處理作好充分準備,比如多玩家的遊戲。
CountDownLatch是一個靈活的閉鎖實作,閉鎖的狀态包括一個計數器,初始化為一個正數,用來表示需要等待的事件數。countDown方法對計數器做減操作,表示一個事件發生了。而await方法等待計數器達到零,此時所有需要等待的事件都已發生,如果為非零,await會一直阻塞直到計數器為零,或等待線程中斷以及逾時。
這種方法可以測試線程N倍并發的情況下執行一個任務的時間。開始閥門在最後用countDown()方法減1,導緻startGate.awarit()終止中斷狀态。所有線程開始執行。最後在endGate.countDown()減1.到0時endGate.await();終止中斷狀态計算響應時間。
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();//等待開始閥門打開,計數器達到0
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
FutureTask
FutureTask的計算是通過Callable實作的,它等價于一個可攜帶結果的Runnable。并有3個狀态:等待、運作、完成(完成包括正常結束、取消、異常)。一旦 FutureTask進入完成狀态,它會永遠停止在這個狀态上。
Future.get的行為依賴于任務的狀态,如果它已經完成,get可以立刻得到傳回結果,否則會被阻塞直到任務轉入完成狀态,然後會傳回結果或抛出異常。 FutureTask把計算的結果從運作計算的線程傳送到需要這個結果的線程, FutureTask的規約保證了這種傳遞建立在結果的安全釋出基礎之上。
Excecutor架構利用 FutureTask來完成異步任務,并可以用來進行任何潛在的耗時計算,而且可以在真正需要計算結果之前就啟動它們開始計算。
public class Preloader {
private final FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() throws Exception {
return "test";
}
});
private final Thread thread = new Thread(future);
public void start() {
thread.start();
}
public String get() throws Exception {
try {
return future.get();//如果future執行完成,則直接傳回,否則此方法阻塞
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) throws Exception {
Preloader p = new Preloader();
p.start();
p.get();
}
}
Semaphore計數信号量
用來控制能夠同時通路某特定資源活動的資料、同時執行某一給定操作的數量、實作資源池或給一個容器限定邊界。
/*使用Semaphore為容器設定邊界*/
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
//Semaphore管理一組虛拟的許可,通過構造函數來設定許可的數量
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();//這個方法會阻塞直到有許可可用
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();//使用完成後,釋放許可給Semaphore
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
Barrier栅欄
栅欄類似于閉鎖,不同的閉鎖等待的是事件,關卡等待的是其它線程,閉鎖是一次使用,而關卡是可以重置狀态的。可以用于疊代計算
CyclicBarrier允許一個給定數量的成員多次集中在一個關卡點,這在并行疊代算法中很有用,這個算法會把一個問題拆分成一系列互相獨立的子問題。當線程到達關卡點時,調用await阻塞,直到所有線程都到達關卡點,如果所有線程都到達了關卡點,關卡就被成功地突破,所有的線程都被釋放,關卡會重置以備下一次使用。如果對await的調用逾時,或是阻塞中的線程被中斷,那麼關卡是失敗的,所有對await未完成的調用都通過BrokenBarrierException終止。如果成功通過,為每一個線程傳回一個唯一的到達索引号,可以用于下一次操作。關卡通常用來模拟:一個步驟的計算可以并行完成,但要求必須完成所有與一個步驟相關的工作後才能進入下一步。
Exchanger是關卡的另一種形式,它是一種兩步關卡,在關卡點會交換資料。當兩方進行的活動不對稱時,這非常有用。比如當一個線程向緩沖區寫入資料,這時另一個線程充當消費者使用這個資料。這些線程可以使用Exchanger進行會面,并用完整的緩沖與空緩沖進行交換,當發生交換對象時,交換為雙方的對象建立了一個安全的釋出。
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 10;
}
}
public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
public static void main(String []args)
{
Board bb = (Board) new BoardImpl();
new CellularAutomata(bb).start();
}
為計算結果建立高效、可伸縮的緩存
幾乎每一個伺服器就用程式都使用某種形式的高速緩存,複用已有的計算結果可以縮短等待時間,提高吞吐量,代價是占用更多的記憶體。下面的例子就解決了并發緩存的問題:
//A代表輸入,V代表輸出
public class Memoizer <A, V> implements Computable<A, V> {
//這處之是以這麼寫,是要注意java的引用傳遞,調用者在外面建立一個空的map做為形參傳進來,
//這樣生産-消費者都用共享此map
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
//實作的Computable方法
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval(arg, ft);
if (f == null) {
f = ft;
ft.run(); //c.compute()是在此處運作的,是以if可以去掉
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f); //FutureTask取消時移除
} catch (ExecutionException e) {//FutureTask異常時也移除
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
六、小結
- 所有并發問題都歸結為如何協調通路并發狀态,可變狀态越少,保證線程安全就越容易。
- 盡量将域聲明為final類型,除非需要是可變的。
- 不可變對象天生是線程安全的,它們簡單而安全,可以在沒有鎖或防禦性複制的情況下自由的共享
- 封裝使管理複雜度變的更可行,在對象中封裝資料,讓它們能夠更加容易地保持不變;在對象中封裝同步,使它能夠更容易地遵守同步政策
- 用鎖來守護每一個可變變量
- 對同一不變限制中的所有變量都使用相同的鎖
- 在非同步的多線程情況下,通路可變變量的程式是存在隐患的
- 不要依賴于可以需要同步的小聰明
- 在設計過程中就考慮線程安全,或者在文檔中明确地說明它不是線程安全的
- 文檔化你的同步政策