天天看点

并发数据结构

一.blockingdeque阻塞双端队列(线程安全):

注意arraydeque和linkedlist仅仅扩展了deque,是非阻塞类型的双端队列。

blockingqueue单向队列,其内部基于reentrantlock + condition来控制同步和"阻塞"/"唤醒"的时机;有如下几个实现类:

arrayblockingqueue: “浮动相对游标”的数组,来实现有界的阻塞队列。

delayqueue:支持“可延迟”的队列,delayqueue还只接受delayed类型的元素,delayed接口继承自compare接口并提供了一个long getdelay(timeunit),来获取指定时间到now的时间剩余量。delayqueue底层就是使用priorityqueue作为支撑的。

priorityblockingqueue:有权重的队列,此队列时可以根据指定的comparator进行排序的。

synchronousqueue://

linkedblockingdeque:有界或者无界阻塞队列

priorityqueue为非线程安全非阻塞,有权重的队列,其权重需要根据特定的compartor来产生。

二.concurrentmap(接口):支持并发的map,支持多线程环境中安全的访问。

其提供了几个独特的方法:

v putifabsent(k,v):如果map中不存在此key,则put,否则返回现有的此key关联的值。此过程有lock同步:

并发数据结构

//等价于:  

if (!map.containskey(key))   

      return map.put(key, value);  

  else  

return map.get(key);  

并发数据结构

map<string,object> map = new concurrenthashmap<string, object>();  

if(map.containskey("key")){  

    map.put("key", new object());  

}  

//注意,concurrenthashmap并不保证contains方法和put方法直接保持"原子性",即有可能contains方法返回false之后,在put之前,可能其他线程已经put成功,即在当前线程put时,此时数据已经不一致了.建议采用putifabsent  

boolean remove(object key,object value):比较并删除指定的key和value。

boolean replace(k,v oldvalue,v newvalue):比较并替换。

目前实现concurrentmap的类有concurrenthashmap,一种基于锁分段技术实现的并发hashmap,锁采取了reentrantlock。

三.concurrentlinkedqueue:

基于单向链表实现的,线程安全的并发队列,无界非阻塞队列,当队列需要在多线程环境中被使用,可以考虑使用它。记住,这是个非阻塞队列,不过支持阻塞的队列,貌似都是线程安全的。

此队列的size不是时间固定的,它的iterator也会被不断调整。concurrentlinkedqueue并没有使用lock,而是采用了cas的方式,对tail.next进行赋值操作。因为tail.next永远是null,且队列不接受null的元素。

注意,非并发集合(list,queue,set)的iterator以及foreach循环在并发环境中是不能正常工作的,如果原始集合被外部修改(其他线程的add,remove),将会导致异常。对于并发集合的iterator,没有做相关的size校验。

lock(锁)是控制操作(action)的,可以让一个操作或者一个子操作被串行的处理。。。cas其实只是对内存数据的变更时使用,如果想知道数据变更在并发环境中是否符合预期,才会使用到cas。

四.concurrentskiplistmap/concurrentskiplistset

两个基于skiplist(跳跃表)方式实现的、支持并发访问的数据结构。依据跳跃表的思想,可以提高数据访问的效率。其中concurrentskiplistset底层使用concurrentskiplistmap支撑。

concurrentskiplistmap也是concurrentnavigablemap的实现类,对于skiplist,其内部元素,必须是可排序的。

跳跃表是一个很简单的表,(参见跳跃表概念),对底层的线性存储结构,加入类似“多级索引”的概念,“索引”的添加时基于随即化。一个跳跃表,整体设计上(设计思路很多)分为表左端head索引,右端tail索引(边界),底端存储层(排序的线性链表),和一个随机化、散列化的不同高度的多级索引“指针”。head索引是高度最高的索引,它是整个链表中值最小的元素锁产生的索引;右端为边界索引,索引指向null或者任意设计的边界值(bound).

跳跃表的底端是一个和普通的链表没啥区别,单向或者双向的均可,前提是必须是排序的。索引节点,就是一个有向路径的标,每个索引节点,都分别有right、down指向,对于双向跳跃表,就具有left、right、up、down四个方向指针;指针就是为了方便寻路。每个新增元素时,首先会导致底层链表的改动;根据自定义的随即算法,来决定此元素的索引高度,如果高度不为0,则依次建立相应层高的索引,并调整各个层高的所以指向。

跳跃表之所以这么设计,实事上就是在做一件事情:基于简单的设计思路和算法,来实现较为高效的查询策略。相对于二分查找有一定的优势.

五.copyonwritearraylist/copyonwritearrayset:

均是copyonwrite思想,在数据修改时(happen-before),对数据进行copy(),read操作可以在原数据结构上继续进行,待write操作结束后,调整数据结构指针。基于这种设计思路的数据结构,通常是read操作频率远大于write操作,可以在并发环境中,支撑较高的吞吐量;避免了因为同步而带来的瓶颈,同时也能确保数据安全操作。同时需要注意,copy操作将会带来多余的空间消耗。注意,此api时支持并发的,多个线程add操作(即copyonwrite)将会被队列化,内部采取了reentrantlock机制来控制.

copyonwritearraylist底层基于数组实现,在进行write操作时(add,remove),将会导致arrays.copy操作,创建一个新的数组;待write操作成功后,将原数组的指针替换成新数组指针.

copyonwritearrayset底层直接使用copyonwritearraylist作为支撑,只不过在add操作时会遍历整个数组结构并进行equals比较(确保具有set的特性),只有发现此新元素不存在时才会"替换指针".

    java中这两个api,支持并发操作时,仍然可以进行遍历而无需额外的同步;即不会抛出concurrentmodificationexception。事实上,迭代器所持有的数组只是一个"创建iterator时底层数组的引用",所以在遍历期间,即使copyonwritearraylist已经新增或者删除了某些元素,仍不会发生冲突,因为iterator持有的是旧数组的引用,而copyonwritearraylist持有的是copy操作时创建的新数组引用..由此可见,iterator也无法反应实时的数组变化(遍历期间,实际数组的添加、删除),但是原始数组中对象内容发生改变还是可以在迭代器中反应出来。copyonwrite的遍历器的remove/add/set操作不被支持,这区别于arraylist.

    copyonwritearraylist、copyonwritearrayset,底层基于数组实现,采取reentrantlock来同步add/remove/clear等操作。并采取了snapshot的简单手段:

并发数据结构

//例如add:  

public boolean add(e e) {  

    final reentrantlock lock = this.lock;  

    lock.lock();  

    try {  

       object[] elements = getarray();  

       int len = elements.length;  

        //数组copy  

       object[] newelements = arrays.copyof(elements, len + 1);  

       newelements[len] = e;  

        //修改结束后,指针转换  

       setarray(newelements);  

       return true;  

    } finally {  

       lock.unlock();  

    }  

六.countdownlatch:

同步类,用于多个线程协调工作。共享锁,当锁计数器较少到0时,将释放等待的线程。使用场景是,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。当countdownlatch的锁计数器为1时,可以作为一种“开关”来使用。计数器无法被重置,如果需要重复计数,可以使用cyclicbarrier。

countdownlatch内部基于aqs来控制线程访问。这个api很简单,只有2个核心方法:

void await():如果计数器不为0,则获取锁失败,加入同步队列;即线程阻塞。

void countdown():释放锁,导致计数器递减,如果此时计数器为0,则表示锁释放成功,aqs会帮助“发射”因为await阻塞的线程(组)。

并发数据结构

public class countdownlatchtestmain {  

    /** 

    * @param args 

    */  

    public static void main(string[] args) throws exception{  

        system.out.println("begin");  

        countdownlatch latch = new countdownlatch(2);  

        for(int i=0;i<4;i++){  

            cthread c = new cthread(i,latch);  

            c.start();  

            //thread.sleep(500);  

        }  

        thread.sleep(1000);  

        system.out.println("end");  

    static class cthread extends thread{  

        countdownlatch latch;  

        int count;  

        cthread(int count,countdownlatch latch){  

            this.count = count;  

            this.latch = latch;  

        @override  

        public void run(){  

            try{  

                system.out.println("---"+count);  

            if(count % 2 == 0){  

                latch.await();  

                system.out.println("//" + count + "await--!");  

            }else{  

                latch.countdown();  

                system.out.println("//" + count + "down!");  

            }  

            }catch(exception e){  

                e.printstacktrace();  

七.cyclicbarrier:

循环屏障,它允许一组线程互相等待,直到到达某个公共屏障点;线程(组)数量固定,线程之间需要不时的互相等待,cyclicbarrier和countdownlatch相比,它可以在释放等待线程后被再次“重用”,所以称为循环屏障。它提供了类似“关卡”的功能。对于失败的同步尝试,cyclicbarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 brokenbarrierexception(如果它们几乎同时被中断,则用 interruptedexception)以反常的方式离开。

cyclicbarrier(int parties):指定参与者个数

cyclicbarrier(int parties,runnable barrieraction):指定一个屏障操作,此操作将会有最后一个进入barrier的线程执行。

int await():在所有的线程达到barrier之前,一直等待。此方法可以抛出interrutedexeception(此线程被中断),可以抛出brokenbarrierexeception(当其他参与者在wait期间中断,导致屏障完整性被破坏),在方法被await时,如果抛出上述异常,需要做补救的相应操作。此方法返回当前线程到达屏障时的索引。(第一个到达的,为0,最后一个为getparties() - 1);根据返回值的不同可以做一些操作,比如最先/最后达到的做一些前置、后置操作等。

boolean isbroken():屏障是否处于损坏状态。

void reset():重置屏障为其初始状态;如果此时有线程在await,其线程将会抛出brokenbarrierexeception。对于reset操作,需要线程的执行方法有相应的配合(比如支持操作轮训等),否则重置会带来一些不必要的麻烦。。。如果你需要重置,尚不如重新建一个cyclicbarrier。

底层基于reentrantlock实现。线程阻塞基于condition方式(注意condition底层也是通过aqs框架实现);大概伪代码:

并发数据结构

reentrantlock lock = new reentrantlock();  

condition trip = lock.newcondition();  

////await方法:  

if(count!=0){  

    trip.await();//aqs:当前线程队列化,locksupport.park  

    count--;  

}else{  

    trip.signalall();  

并发数据结构

//////////////////代码实例  

public class cyclicbarriertestmain {  

        cyclicbarrier barrier = new cyclicbarrier(5, new runnable() {  

            @override  

            public void run() {  

                system.out.println("barrier action!!");  

            });  

        for(int i=0;i<5;i++){  

            cthread c = new cthread(barrier);  

        cyclicbarrier barrier;  

        cthread(cyclicbarrier barrier){  

            this.barrier = barrier;  

            int count = 0;  

            while(true){  

                try{  

                    system.out.print("---" + count);  

                    int index = barrier.await();  

                    system.out.println("+++" + count);  

                    count++;  

                    if(index == barrier.getparties() - 1){  

                        //barrier.reset();  

                    }  

                }catch(exception e){  

                    e.printstacktrace();  

                    break;  

                }  

 八.exchanger

exchanger:同步交换器,2个互相匹配(协调的对象),互相交换数据。2个线程需要把相同类型的数据,以互相等待的方式交换。比如线程1将数据a交换给b,此时线程1等待直到线程b将数据交换出去。exchanger有一个方法,就是exchange(v x):其作用就是等待另一个线程到达交换点,然后将数据传递给线程。

如果没有其他线程到达交换点,处于调度的目的,禁用当前线程,直到某个线程到达或者某个线程中断。

伪代码:

并发数据结构

void exchange(v item){  

    //如果有线程已经到达  

    for(;;){  

        node e = get();  

        if(e != null){  

            v i = e.getitem();  

            cas(e,i,null);//将等待匹配者移除  

            thread t = e.waiter;  

            locksupport.unpark(t);  

            //  

            node ne = new node(currentthread,ne);  

            set();//将当前需要交换的数据加入,当其他线程unpart之后,可以get,并获取数据  

            return i;//返回需要交换的数据  

        }else{  

            node e = new now(currentthread,item);  

            set(node);  

            locksupport.park(currentthread);  

        //重新回到顶层for循环,并获取交换数据  

如下的例子是基于一个简单的productor和consumer模式,一个线程负责生产数据,当数据满时,交换给consumer消费;当consumer消费完时,也申请交换。

并发数据结构

import java.util.arraydeque;  

import java.util.queue;  

import java.util.random;  

import java.util.concurrent.exchanger;  

public class exchangertestmain {  

        exchanger<queue<integer>> exchanger = new exchanger<queue<integer>>();  

        cthread c = new cthread(exchanger);  

        pthread p = new pthread(exchanger);  

        c.start();  

        p.start();  

        thread.sleep(2000);  

        exchanger<queue<integer>> exchanger ;  

        queue<integer> current;  

        cthread(exchanger<queue<integer>> exchanger){  

            this.exchanger = exchanger;  

            if(current == null){  

                current = new arraydeque<integer>(10);  

                while(true){  

                //productor  

                if(current.size() == 0){  

                    current = exchanger.exchange(current);//交换出去fulllist,希望获得emptylist  

                    system.out.println("c:" + current.poll());  

                return;  

    static class pthread extends thread{  

        pthread(exchanger<queue<integer>> exchanger){  

            random r = new random();  

                    //productor  

                    if(current.size() >= 10){  

                        current = exchanger.exchange(current);//交换出去fulllist,希望获得emptylist  

                    integer i = r.nextint(20);  

                    system.out.println("p:" + i);  

                    current.add(i);  

九.semaphore:信号量

我们需要把semaphore真的看成“信号量”,它是可以被“增减”的锁引用,“0”是判断信号“过剩”的界限。

我们通常使用semaphore来控制资源访问并发量。它底层使用“共享”模式锁实现,提供了“公平”“非公平”2中策略。当“信号量”大于0时,允许获取锁;否则将阻塞直到信号量恢复。

将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。

semaphore(int permits, boolean fair):指定信号量,指定公平策略。

void acquire():获取一个信号,如果信号量<=0,则阻塞;在非公平模式下,允许闯入。

void acquire(int permits).

上面2个方法都会抛出interruptexception,即在等待线程被“中断时”,将会抛出异常而返回。底层基于aqs.acquiresharedinterruptibly()

void acquireuninterruptibly():获取一个信号,不支持中断,当线程被中断时,此线程将继续等待,当线程确实从此方法返回后,将设置其中断状态。底层基于aqs.acquireshared();

void release():释放一个信号,直接导致信号量++。

boolean tryacquire():获取一个信号,如果获取成功,则返回true。

原文链接:[http://wely.iteye.com/blog/2228720]