天天看點

并發資料結構

一.blockingdeque阻塞雙端隊列(線程安全):

注意arraydeque和linkedlist僅僅擴充了deque,是非阻塞類型的雙端隊列。

blockingqueue單向隊列,其内部基于reentrantlock + condition來控制同步和"阻塞"/"喚醒"的時機;有如下幾個實作類:

arrayblockingqueue: “浮動相對遊标”的數組,來實作有界的阻塞隊列。

delayqueue:支援“可延遲”的隊列,delayqueue還隻接受delayed類型的元素,delayed接口繼承自compare接口并提供了一個long getdelay(timeunit),來擷取指定時間到now的時間剩餘量。delayqueue底層就是使用priorityqueue作為支撐的。

priorityblockingqueue:有權重的隊列,此隊列時可以根據指定的comparator進行排序的。

synchronousqueue://

linkedblockingdeque:有界或者無界阻塞隊列

priorityqueue為非線程安全非阻塞,有權重的隊列,其權重需要根據特定的compartor來産生。

二.concurrentmap(接口):支援并發的map,支援多線程環境中安全的通路。

其提供了幾個獨特的方法:

v putifabsent(k,v):如果map中不存在此key,則put,否則傳回現有的此key關聯的值。此過程有lock同步:

并發資料結構

//等價于:  

if (!map.containskey(key))   

      return map.put(key, value);  

  else  

return map.get(key);  

并發資料結構

map<string,object> map = new concurrenthashmap<string, object>();  

if(map.containskey("key")){  

    map.put("key", new object());  

}  

//注意,concurrenthashmap并不保證contains方法和put方法直接保持"原子性",即有可能contains方法傳回false之後,在put之前,可能其他線程已經put成功,即在目前線程put時,此時資料已經不一緻了.建議采用putifabsent  

boolean remove(object key,object value):比較并删除指定的key和value。

boolean replace(k,v oldvalue,v newvalue):比較并替換。

目前實作concurrentmap的類有concurrenthashmap,一種基于鎖分段技術實作的并發hashmap,鎖采取了reentrantlock。

三.concurrentlinkedqueue:

基于單向連結清單實作的,線程安全的并發隊列,無界非阻塞隊列,當隊列需要在多線程環境中被使用,可以考慮使用它。記住,這是個非阻塞隊列,不過支援阻塞的隊列,貌似都是線程安全的。

此隊列的size不是時間固定的,它的iterator也會被不斷調整。concurrentlinkedqueue并沒有使用lock,而是采用了cas的方式,對tail.next進行指派操作。因為tail.next永遠是null,且隊列不接受null的元素。

注意,非并發集合(list,queue,set)的iterator以及foreach循環在并發環境中是不能正常工作的,如果原始集合被外部修改(其他線程的add,remove),将會導緻異常。對于并發集合的iterator,沒有做相關的size校驗。

lock(鎖)是控制操作(action)的,可以讓一個操作或者一個子操作被串行的處理。。。cas其實隻是對記憶體資料的變更時使用,如果想知道資料變更在并發環境中是否符合預期,才會使用到cas。

四.concurrentskiplistmap/concurrentskiplistset

兩個基于skiplist(跳躍表)方式實作的、支援并發通路的資料結構。依據跳躍表的思想,可以提高資料通路的效率。其中concurrentskiplistset底層使用concurrentskiplistmap支撐。

concurrentskiplistmap也是concurrentnavigablemap的實作類,對于skiplist,其内部元素,必須是可排序的。

跳躍表是一個很簡單的表,(參見跳躍表概念),對底層的線性存儲結構,加入類似“多級索引”的概念,“索引”的添加時基于随即化。一個跳躍表,整體設計上(設計思路很多)分為表左端head索引,右端tail索引(邊界),底端存儲層(排序的線性連結清單),和一個随機化、散列化的不同高度的多級索引“指針”。head索引是高度最高的索引,它是整個連結清單中值最小的元素鎖産生的索引;右端為邊界索引,索引指向null或者任意設計的邊界值(bound).

跳躍表的底端是一個和普通的連結清單沒啥差別,單向或者雙向的均可,前提是必須是排序的。索引節點,就是一個有向路徑的标,每個索引節點,都分别有right、down指向,對于雙向跳躍表,就具有left、right、up、down四個方向指針;指針就是為了友善尋路。每個新增元素時,首先會導緻底層連結清單的改動;根據自定義的随即算法,來決定此元素的索引高度,如果高度不為0,則依次建立相應層高的索引,并調整各個層高的是以指向。

跳躍表之是以這麼設計,實事上就是在做一件事情:基于簡單的設計思路和算法,來實作較為高效的查詢政策。相對于二分查找有一定的優勢.

五.copyonwritearraylist/copyonwritearrayset:

均是copyonwrite思想,在資料修改時(happen-before),對資料進行copy(),read操作可以在原資料結構上繼續進行,待write操作結束後,調整資料結構指針。基于這種設計思路的資料結構,通常是read操作頻率遠大于write操作,可以在并發環境中,支撐較高的吞吐量;避免了因為同步而帶來的瓶頸,同時也能確定資料安全操作。同時需要注意,copy操作将會帶來多餘的空間消耗。注意,此api時支援并發的,多個線程add操作(即copyonwrite)将會被隊列化,内部采取了reentrantlock機制來控制.

copyonwritearraylist底層基于數組實作,在進行write操作時(add,remove),将會導緻arrays.copy操作,建立一個新的數組;待write操作成功後,将原數組的指針替換成新數組指針.

copyonwritearrayset底層直接使用copyonwritearraylist作為支撐,隻不過在add操作時會周遊整個數組結構并進行equals比較(確定具有set的特性),隻有發現此新元素不存在時才會"替換指針".

    java中這兩個api,支援并發操作時,仍然可以進行周遊而無需額外的同步;即不會抛出concurrentmodificationexception。事實上,疊代器所持有的數組隻是一個"建立iterator時底層數組的引用",是以在周遊期間,即使copyonwritearraylist已經新增或者删除了某些元素,仍不會發生沖突,因為iterator持有的是舊數組的引用,而copyonwritearraylist持有的是copy操作時建立的新數組引用..由此可見,iterator也無法反應實時的數組變化(周遊期間,實際數組的添加、删除),但是原始數組中對象内容發生改變還是可以在疊代器中反應出來。copyonwrite的周遊器的remove/add/set操作不被支援,這差別于arraylist.

    copyonwritearraylist、copyonwritearrayset,底層基于數組實作,采取reentrantlock來同步add/remove/clear等操作。并采取了snapshot的簡單手段:

并發資料結構

//例如add:  

public boolean add(e e) {  

    final reentrantlock lock = this.lock;  

    lock.lock();  

    try {  

       object[] elements = getarray();  

       int len = elements.length;  

        //數組copy  

       object[] newelements = arrays.copyof(elements, len + 1);  

       newelements[len] = e;  

        //修改結束後,指針轉換  

       setarray(newelements);  

       return true;  

    } finally {  

       lock.unlock();  

    }  

六.countdownlatch:

同步類,用于多個線程協調工作。共享鎖,當鎖計數器較少到0時,将釋放等待的線程。使用場景是,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。當countdownlatch的鎖計數器為1時,可以作為一種“開關”來使用。計數器無法被重置,如果需要重複計數,可以使用cyclicbarrier。

countdownlatch内部基于aqs來控制線程通路。這個api很簡單,隻有2個核心方法:

void await():如果計數器不為0,則擷取鎖失敗,加入同步隊列;即線程阻塞。

void countdown():釋放鎖,導緻計數器遞減,如果此時計數器為0,則表示鎖釋放成功,aqs會幫助“發射”因為await阻塞的線程(組)。

并發資料結構

public class countdownlatchtestmain {  

    /** 

    * @param args 

    */  

    public static void main(string[] args) throws exception{  

        system.out.println("begin");  

        countdownlatch latch = new countdownlatch(2);  

        for(int i=0;i<4;i++){  

            cthread c = new cthread(i,latch);  

            c.start();  

            //thread.sleep(500);  

        }  

        thread.sleep(1000);  

        system.out.println("end");  

    static class cthread extends thread{  

        countdownlatch latch;  

        int count;  

        cthread(int count,countdownlatch latch){  

            this.count = count;  

            this.latch = latch;  

        @override  

        public void run(){  

            try{  

                system.out.println("---"+count);  

            if(count % 2 == 0){  

                latch.await();  

                system.out.println("//" + count + "await--!");  

            }else{  

                latch.countdown();  

                system.out.println("//" + count + "down!");  

            }  

            }catch(exception e){  

                e.printstacktrace();  

七.cyclicbarrier:

循環屏障,它允許一組線程互相等待,直到到達某個公共屏障點;線程(組)數量固定,線程之間需要不時的互相等待,cyclicbarrier和countdownlatch相比,它可以在釋放等待線程後被再次“重用”,是以稱為循環屏障。它提供了類似“關卡”的功能。對于失敗的同步嘗試,cyclicbarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者逾時等原因,導緻線程過早地離開了屏障點,那麼在該屏障點等待的其他所有線程也将通過 brokenbarrierexception(如果它們幾乎同時被中斷,則用 interruptedexception)以反常的方式離開。

cyclicbarrier(int parties):指定參與者個數

cyclicbarrier(int parties,runnable barrieraction):指定一個屏障操作,此操作将會有最後一個進入barrier的線程執行。

int await():在所有的線程達到barrier之前,一直等待。此方法可以抛出interrutedexeception(此線程被中斷),可以抛出brokenbarrierexeception(當其他參與者在wait期間中斷,導緻屏障完整性被破壞),在方法被await時,如果抛出上述異常,需要做補救的相應操作。此方法傳回目前線程到達屏障時的索引。(第一個到達的,為0,最後一個為getparties() - 1);根據傳回值的不同可以做一些操作,比如最先/最後達到的做一些前置、後置操作等。

boolean isbroken():屏障是否處于損壞狀态。

void reset():重置屏障為其初始狀态;如果此時有線程在await,其線程将會抛出brokenbarrierexeception。對于reset操作,需要線程的執行方法有相應的配合(比如支援操作輪訓等),否則重置會帶來一些不必要的麻煩。。。如果你需要重置,尚不如重建立一個cyclicbarrier。

底層基于reentrantlock實作。線程阻塞基于condition方式(注意condition底層也是通過aqs架構實作);大概僞代碼:

并發資料結構

reentrantlock lock = new reentrantlock();  

condition trip = lock.newcondition();  

////await方法:  

if(count!=0){  

    trip.await();//aqs:目前線程隊列化,locksupport.park  

    count--;  

}else{  

    trip.signalall();  

并發資料結構

//////////////////代碼執行個體  

public class cyclicbarriertestmain {  

        cyclicbarrier barrier = new cyclicbarrier(5, new runnable() {  

            @override  

            public void run() {  

                system.out.println("barrier action!!");  

            });  

        for(int i=0;i<5;i++){  

            cthread c = new cthread(barrier);  

        cyclicbarrier barrier;  

        cthread(cyclicbarrier barrier){  

            this.barrier = barrier;  

            int count = 0;  

            while(true){  

                try{  

                    system.out.print("---" + count);  

                    int index = barrier.await();  

                    system.out.println("+++" + count);  

                    count++;  

                    if(index == barrier.getparties() - 1){  

                        //barrier.reset();  

                    }  

                }catch(exception e){  

                    e.printstacktrace();  

                    break;  

                }  

 八.exchanger

exchanger:同步交換器,2個互相比對(協調的對象),互相交換資料。2個線程需要把相同類型的資料,以互相等待的方式交換。比如線程1将資料a交換給b,此時線程1等待直到線程b将資料交換出去。exchanger有一個方法,就是exchange(v x):其作用就是等待另一個線程到達交換點,然後将資料傳遞給線程。

如果沒有其他線程到達交換點,處于排程的目的,禁用目前線程,直到某個線程到達或者某個線程中斷。

僞代碼:

并發資料結構

void exchange(v item){  

    //如果有線程已經到達  

    for(;;){  

        node e = get();  

        if(e != null){  

            v i = e.getitem();  

            cas(e,i,null);//将等待比對者移除  

            thread t = e.waiter;  

            locksupport.unpark(t);  

            //  

            node ne = new node(currentthread,ne);  

            set();//将目前需要交換的資料加入,當其他線程unpart之後,可以get,并擷取資料  

            return i;//傳回需要交換的資料  

        }else{  

            node e = new now(currentthread,item);  

            set(node);  

            locksupport.park(currentthread);  

        //重新回到頂層for循環,并擷取交換資料  

如下的例子是基于一個簡單的productor和consumer模式,一個線程負責生産資料,當資料滿時,交換給consumer消費;當consumer消費完時,也申請交換。

并發資料結構

import java.util.arraydeque;  

import java.util.queue;  

import java.util.random;  

import java.util.concurrent.exchanger;  

public class exchangertestmain {  

        exchanger<queue<integer>> exchanger = new exchanger<queue<integer>>();  

        cthread c = new cthread(exchanger);  

        pthread p = new pthread(exchanger);  

        c.start();  

        p.start();  

        thread.sleep(2000);  

        exchanger<queue<integer>> exchanger ;  

        queue<integer> current;  

        cthread(exchanger<queue<integer>> exchanger){  

            this.exchanger = exchanger;  

            if(current == null){  

                current = new arraydeque<integer>(10);  

                while(true){  

                //productor  

                if(current.size() == 0){  

                    current = exchanger.exchange(current);//交換出去fulllist,希望獲得emptylist  

                    system.out.println("c:" + current.poll());  

                return;  

    static class pthread extends thread{  

        pthread(exchanger<queue<integer>> exchanger){  

            random r = new random();  

                    //productor  

                    if(current.size() >= 10){  

                        current = exchanger.exchange(current);//交換出去fulllist,希望獲得emptylist  

                    integer i = r.nextint(20);  

                    system.out.println("p:" + i);  

                    current.add(i);  

九.semaphore:信号量

我們需要把semaphore真的看成“信号量”,它是可以被“增減”的鎖引用,“0”是判斷信号“過剩”的界限。

我們通常使用semaphore來控制資源通路并發量。它底層使用“共享”模式鎖實作,提供了“公平”“非公平”2中政策。當“信号量”大于0時,允許擷取鎖;否則将阻塞直到信号量恢複。

将信号量初始化為 1,使得它在使用時最多隻有一個可用的許可,進而可用作一個互相排斥的鎖。這通常也稱為二進制信号量,因為它隻能有兩種狀态:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信号量具有某種屬性(與很多 lock 實作不同),即可以由線程釋放“鎖”,而不是由所有者(因為信号量沒有所有權的概念)。在某些專門的上下文(如死鎖恢複)中這會很有用。

semaphore(int permits, boolean fair):指定信号量,指定公平政策。

void acquire():擷取一個信号,如果信号量<=0,則阻塞;在非公平模式下,允許闖入。

void acquire(int permits).

上面2個方法都會抛出interruptexception,即在等待線程被“中斷時”,将會抛出異常而傳回。底層基于aqs.acquiresharedinterruptibly()

void acquireuninterruptibly():擷取一個信号,不支援中斷,當線程被中斷時,此線程将繼續等待,當線程确實從此方法傳回後,将設定其中斷狀态。底層基于aqs.acquireshared();

void release():釋放一個信号,直接導緻信号量++。

boolean tryacquire():擷取一個信号,如果擷取成功,則傳回true。

原文連結:[http://wely.iteye.com/blog/2228720]