本文轉載自http://shift-alt-ctrl.iteye.com/blog/1841084
請首先參考:http://shift-alt-ctrl.iteye.com/blog/1839142
一.BlockingDeque阻塞雙端隊列(線程安全):
注意ArrayDeque和LinkedList僅僅擴充了Deque,是非阻塞類型的雙端隊列。
BlockingQueue單向隊列,其内部基于ReentrantLock + Condition來控制同步和"阻塞"/"喚醒"的時機;有如下幾個實作類:
ArrayBlockingQueue: “浮動相對遊标”的數組,來實作有界的阻塞隊列。
DelayQueue:支援“可延遲”的隊列,DelayQueue還隻接受Delayed類型的元素,Delayed接口繼承自Compare接口并提供了一個long getDelay(TimeUnit),來擷取指定時間到now的時間剩餘量。DelayQueue底層就是使用PriorityQueue作為支撐的。
PriorityBlockingQueue:有權重的隊列,此隊列時可以根據指定的comparator進行排序的。
SynchronousQueue://
LinkedBlockingDeque:有界或者無界阻塞隊列
PriorityQueue為非線程安全非阻塞,有權重的隊列,其權重需要根據特定的compartor來産生。
二.ConcurrentMap(接口):支援并發的map,支援多線程環境中安全的通路。
其提供了幾個獨特的方法:
V putIfAbsent(K,V):如果map中不存在此key,則put,否則傳回現有的此key關聯的值。此過程有Lock同步:
//等價于:
if (!map.containsKey(key))
return map.put(key, value);
else
return map.get(key);
Map<String,Object> map = new ConcurrentHashMap<String, Object>();
if(map.containsKey("key")){
map.put("key", new Object());
}
//注意,concurrentHashMap并不保證contains方法和put方法直接保持"原子性",即有可能contains方法傳回false之後,在put之前,可能其他線程已經put成功,即在目前線程put時,此時資料已經不一緻了.建議采用putIfAbsent
boolean remove(Object key,Object value):比較并删除指定的key和value。
boolean replace(K,V oldValue,V newValue):比較并替換。
目前實作ConcurrentMap的類有ConcurrentHashMap,一種基于鎖分段技術實作的并發hashMap,鎖采取了ReentrantLock。
三.ConcurrentLinkedQueue:
基于單向連結清單實作的,線程安全的并發隊列,無界非阻塞隊列,當隊列需要在多線程環境中被使用,可以考慮使用它。記住,這是個非阻塞隊列,不過支援阻塞的隊列,貌似都是線程安全的。
此隊列的size不是時間固定的,它的iterator也會被不斷調整。ConcurrentLinkedQueue并沒有使用Lock,而是采用了CAS的方式,對tail.next進行指派操作。因為tail.next永遠是null,且隊列不接受null的元素。
注意,非并發集合(list,queue,set)的iterator以及forEach循環在并發環境中是不能正常工作的,如果原始集合被外部修改(其他線程的add,remove),将會導緻異常。對于并發集合的iterator,沒有做相關的size校驗。
Lock(鎖)是控制操作(action)的,可以讓一個操作或者一個子操作被串行的處理。。。CAS其實隻是對記憶體資料的變更時使用,如果想知道資料變更在并發環境中是否符合預期,才會使用到CAS。
四.ConcurrentSkipListMap/ConcurrentSkipListSet
兩個基于SkipList(跳躍表)方式實作的、支援并發通路的資料結構。依據跳躍表的思想,可以提高資料通路的效率。其中ConcurrentSkipListSet底層使用ConcurrentSkipListMap支撐。
ConcurrentSkipListMap也是ConcurrentNavigableMap的實作類,對于SkipList,其内部元素,必須是可排序的。
跳躍表是一個很簡單的表,(參見跳躍表概念),對底層的線性存儲結構,加入類似“多級索引”的概念,“索引”的添加時基于随即化。一個跳躍表,整體設計上(設計思路很多)分為表左端head索引,右端tail索引(邊界),底端存儲層(排序的線性連結清單),和一個随機化、散列化的不同高度的多級索引“指針”。head索引是高度最高的索引,它是整個連結清單中值最小的元素鎖産生的索引;右端為邊界索引,索引指向null或者任意設計的邊界值(bound).
跳躍表的底端是一個和普通的連結清單沒啥差別,單向或者雙向的均可,前提是必須是排序的。索引節點,就是一個有向路徑的标,每個索引節點,都分别有right、down指向,對于雙向跳躍表,就具有left、right、up、down四個方向指針;指針就是為了友善尋路。每個新增元素時,首先會導緻底層連結清單的改動;根據自定義的随即算法,來決定此元素的索引高度,如果高度不為0,則依次建立相應層高的索引,并調整各個層高的是以指向。
跳躍表之是以這麼設計,實事上就是在做一件事情:基于簡單的設計思路和算法,來實作較為高效的查詢政策。相對于二分查找有一定的優勢.
五.CopyOnWriteArrayList/CopyOnWriteArraySet:
均是CopyOnWrite思想,在資料修改時(happen-before),對資料進行Copy(),read操作可以在原資料結構上繼續進行,待write操作結束後,調整資料結構指針。基于這種設計思路的資料結構,通常是read操作頻率遠大于write操作,可以在并發環境中,支撐較高的吞吐量;避免了因為同步而帶來的瓶頸,同時也能確定資料安全操作。同時需要注意,copy操作将會帶來多餘的空間消耗。注意,此API時支援并發的,多個線程add操作(即CopyOnWrite)将會被隊列化,内部采取了ReentrantLock機制來控制.
CopyOnWriteArrayList底層基于數組實作,在進行write操作時(add,remove),将會導緻Arrays.copy操作,建立一個新的數組;待write操作成功後,将原數組的指針替換成新數組指針.
CopyOnWriteArraySet底層直接使用CopyOnWriteArrayList作為支撐,隻不過在add操作時會周遊整個數組結構并進行equals比較(確定具有Set的特性),隻有發現此新元素不存在時才會"替換指針".
java中這兩個API,支援并發操作時,仍然可以進行周遊而無需額外的同步;即不會抛出ConcurrentModificationException。事實上,疊代器所持有的數組隻是一個"建立iterator時底層數組的引用",是以在周遊期間,即使CopyOnWriteArrayList已經新增或者删除了某些元素,仍不會發生沖突,因為iterator持有的是舊數組的引用,而CopyOnWriteArrayList持有的是Copy操作時建立的新數組引用..由此可見,iterator也無法反應實時的數組變化(周遊期間,實際數組的添加、删除),但是原始數組中對象内容發生改變還是可以在疊代器中反應出來。CopyOnWrite的周遊器的remove/add/set操作不被支援,這差別于ArrayList.
CopyOnWriteArrayList、CopyOnWriteArraySet,底層基于數組實作,采取ReentrantLock來同步add/remove/clear等操作。并采取了snapshot的簡單手段:
//例如add:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//數組copy
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
//修改結束後,指針轉換
setArray(newElements);
return true;
} finally {
lock.unlock();
}
六.CountDownLatch:
同步類,用于多個線程協調工作。共享鎖,當鎖計數器較少到0時,将釋放等待的線程。使用場景是,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。當CountDownLatch的鎖計數器為1時,可以作為一種“開關”來使用。計數器無法被重置,如果需要重複計數,可以使用CyclicBarrier。
CountDownLatch内部基于AQS來控制線程通路。這個API很簡單,隻有2個核心方法:
void await():如果計數器不為0,則擷取鎖失敗,加入同步隊列;即線程阻塞。
void countDown():釋放鎖,導緻計數器遞減,如果此時計數器為0,則表示鎖釋放成功,AQS會幫助“發射”因為await阻塞的線程(組)。
public class CountDownLatchTestMain {
/**
* @param args
*/
public static void main(String[] args) throws Exception{
System.out.println("Begin");
CountDownLatch latch = new CountDownLatch(2);
for(int i=0;i<4;i++){
CThread c = new CThread(i,latch);
c.start();
//Thread.sleep(500);
}
Thread.sleep(1000);
System.out.println("End");
static class CThread extends Thread{
CountDownLatch latch;
int count;
CThread(int count,CountDownLatch latch){
this.count = count;
this.latch = latch;
@Override
public void run(){
try{
System.out.println("---"+count);
if(count % 2 == 0){
latch.await();
System.out.println("//" + count + "await--!");
}else{
latch.countDown();
System.out.println("//" + count + "down!");
}
}catch(Exception e){
e.printStackTrace();
七.CyclicBarrier:
循環屏障,它允許一組線程互相等待,直到到達某個公共屏障點;線程(組)數量固定,線程之間需要不時的互相等待,CyclicBarrier和CountDownLatch相比,它可以在釋放等待線程後被再次“重用”,是以稱為循環屏障。它提供了類似“關卡”的功能。對于失敗的同步嘗試,CyclicBarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者逾時等原因,導緻線程過早地離開了屏障點,那麼在該屏障點等待的其他所有線程也将通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。
CyclicBarrier(int parties):指定參與者個數
CyclicBarrier(int parties,Runnable barrierAction):指定一個屏障操作,此操作将會有最後一個進入barrier的線程執行。
int await():在所有的線程達到barrier之前,一直等待。此方法可以抛出InterrutedExeception(此線程被中斷),可以抛出BrokenBarrierExeception(當其他參與者在wait期間中斷,導緻屏障完整性被破壞),在方法被await時,如果抛出上述異常,需要做補救的相應操作。此方法傳回目前線程到達屏障時的索引。(第一個到達的,為0,最後一個為getParties() - 1);根據傳回值的不同可以做一些操作,比如最先/最後達到的做一些前置、後置操作等。
boolean isBroken():屏障是否處于損壞狀态。
void reset():重置屏障為其初始狀态;如果此時有線程在await,其線程将會抛出BrokenBarrierExeception。對于reset操作,需要線程的執行方法有相應的配合(比如支援操作輪訓等),否則重置會帶來一些不必要的麻煩。。。如果你需要重置,尚不如重建立一個CyclicBarrier。
底層基于ReentrantLock實作。線程阻塞基于Condition方式(注意Condition底層也是通過AQS架構實作);大概僞代碼:
ReentrantLock lock = new ReentrantLock();
Condition trip = lock.newCondition();
////await方法:
if(count!=0){
trip.await();//AQS:目前線程隊列化,LockSupport.park
count--;
}else{
trip.signalAll();
//////////////////代碼執行個體
public class CyclicBarrierTestMain {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("Barrier action!!");
});
for(int i=0;i<5;i++){
CThread c = new CThread(barrier);
CyclicBarrier barrier;
CThread(CyclicBarrier barrier){
this.barrier = barrier;
int count = 0;
while(true){
try{
System.out.print("---" + count);
int index = barrier.await();
System.out.println("+++" + count);
count++;
if(index == barrier.getParties() - 1){
//barrier.reset();
}
}catch(Exception e){
e.printStackTrace();
break;
}
八.Exchanger
Exchanger:同步交換器,2個互相比對(協調的對象),互相交換資料。2個線程需要把相同類型的資料,以互相等待的方式交換。比如線程1将資料A交換給B,此時線程1等待直到線程B将資料交換出去。Exchanger有一個方法,就是exchange(V x):其作用就是等待另一個線程到達交換點,然後将資料傳遞給線程。
如果沒有其他線程到達交換點,處于排程的目的,禁用目前線程,直到某個線程到達或者某個線程中斷。
僞代碼:
void exchange(V item){
//如果有線程已經到達
for(;;){
Node e = get();
if(e != null){
V i = e.getItem();
CAS(e,i,null);//将等待比對者移除
Thread t = e.waiter;
LockSupport.unpark(t);
//
Node ne = new Node(currentThread,ne);
set();//将目前需要交換的資料加入,當其他線程unpart之後,可以get,并擷取資料
return i;//傳回需要交換的資料
}else{
Node e = new Now(currentThread,item);
set(node);
LockSupport.park(currentThread);
//重新回到頂層for循環,并擷取交換資料
如下的例子是基于一個簡單的Productor和Consumer模式,一個線程負責生産資料,當資料滿時,交換給consumer消費;當consumer消費完時,也申請交換。
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Exchanger;
public class ExchangerTestMain {
Exchanger<Queue<Integer>> exchanger = new Exchanger<Queue<Integer>>();
CThread c = new CThread(exchanger);
PThread p = new PThread(exchanger);
c.start();
p.start();
Thread.sleep(2000);
Exchanger<Queue<Integer>> exchanger ;
Queue<Integer> current;
CThread(Exchanger<Queue<Integer>> exchanger){
this.exchanger = exchanger;
if(current == null){
current = new ArrayDeque<Integer>(10);
while(true){
//productor
if(current.size() == 0){
current = exchanger.exchange(current);//交換出去fullList,希望獲得EmptyList
System.out.println("C:" + current.poll());
return;
static class PThread extends Thread{
PThread(Exchanger<Queue<Integer>> exchanger){
Random r = new Random();
//productor
if(current.size() >= 10){
current = exchanger.exchange(current);//交換出去fullList,希望獲得EmptyList
Integer i = r.nextInt(20);
System.out.println("P:" + i);
current.add(i);
九.Semaphore:信号量
我們需要把semaphore真的看成“信号量”,它是可以被“增減”的鎖引用,“0”是判斷信号“過剩”的界限。
我們通常使用semaphore來控制資源通路并發量。它底層使用“共享”模式鎖實作,提供了“公平”“非公平”2中政策。當“信号量”大于0時,允許擷取鎖;否則将阻塞直到信号量恢複。
将信号量初始化為 1,使得它在使用時最多隻有一個可用的許可,進而可用作一個互相排斥的鎖。這通常也稱為二進制信号量,因為它隻能有兩種狀态:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信号量具有某種屬性(與很多 Lock 實作不同),即可以由線程釋放“鎖”,而不是由所有者(因為信号量沒有所有權的概念)。在某些專門的上下文(如死鎖恢複)中這會很有用。
Semaphore(int permits, boolean fair):指定信号量,指定公平政策。
void acquire():擷取一個信号,如果信号量<=0,則阻塞;在非公平模式下,允許闖入。
void acquire(int permits).
上面2個方法都會抛出InterruptException,即在等待線程被“中斷時”,将會抛出異常而傳回。底層基于AQS.acquireSharedInterruptibly()
void acquireUninterruptibly():擷取一個信号,不支援中斷,當線程被中斷時,此線程将繼續等待,當線程确實從此方法傳回後,将設定其中斷狀态。底層基于AQS.acquireShared();
void release():釋放一個信号,直接導緻信号量++。
boolean tryAcquire():擷取一個信号,如果擷取成功,則傳回true。