天天看点

从生产者消费者窥探线程同步(下)

欢迎转载,转载请注明出处。尊重他人的一丢丢努力,谢谢啦!

阅读本篇之前,如果你还没有看过从生产者消费者窥探线程同步(上)

,那不妨先戳一下,两篇一起嚼才更好呢。

上一篇分析了使用BlockQueue和synchronized来实现生产者消费者模式。这一篇来看一下其他的实现,闲言少叙。

(3)Lock实现

核心:Lock的用法中规中矩,有点类似于非静态同步方法,只是前者是对lock对象显式加锁,而后者是对当前对象隐式加锁。

我相信大多数人在第一次接触Lock锁的时候,内心都会有这样的疑惑:明明提供了lock就能实现加锁解锁,而这多出来的Condition是干鸡毛的?怎么用?适用场合是啥?

事实上,Java 5之所以提供了Condition接口(可通过Lock.newCondition()来产生),主要是用来解决线程间的通信问题,通过condition的signalAll()和await()方法可以很方便地实现线程间的“广播”或者“悄悄话”。咦那不对吖,用Lock自身的方法岂不更方便?何必又引进Condition呢?

很不幸的是,Lock只是接口,它并不是Object的子类,而这个接口里面也没有提供类似于wait()和notify()的方法。虽然Condition也是个接口,但人家里面提供了signalAll()和await()哇。想来它的这种设计,可能基于这样的考虑:让锁的实现更加细颗粒度,更加精准。

怎么理解呢?不妨先来看张图。

从生产者消费者窥探线程同步(下)

椭圆代表了等待lock锁的所有线程集合,而集合A、B和C则是执行不同任务的线程,此刻它们都在原地休整,等待任务的重新开始。

假设Lock里面提供了notify()方法,现在前一个线程执行完毕,Lock的notify()被调用了,它的方法受体是整个椭圆,接下来的场景就是椭圆内的所有线程立刻打了鸡血,原地复活,都去争夺唯一的一把锁,那可真是人仰马翻哭声震天,惨烈程度可想而知。并且很有可能出现的情况是,我只想让执行任务A的线程获取锁资源,偏偏被执行任务B的线程拿到了,然而B检查了一下发现自己并不具备运行的条件,那怎么办?只好放弃锁继续等待呗。这样的情况显然不是我们想看到的。

而Condition很好地处理了这个问题。现在仍然是椭圆里所有的线程都在等待,线程S在使用完锁资源之后,发现目前的条件适合A运行,那就调用A对应Condition实例的notify()方法,于是A内的线程开始躁动,然而B和C却是静悄悄的,就像不知道这回事一样。A中的小a拿到锁之后,离开就开始生产活动,小a在执行结束的时候,发现目前的条件适合B运行,于是她就不动声色地调用B对应Condition实例的notify()方法,然后自己又回去等待休息。B集合中的线程听到调动号令,自然要开始新一轮的争抢,无辜的C集合依然在漫长的等待,等待…

说了这么多,不妨通过代码来看一下。

public class StorageLock {
    public static Integer MAX = 50;
    public static List<Product> list = new ArrayList<>();
    public static Lock lock = new ReentrantLock();
    public static Condition notEmpty = lock.newCondition();
    public static Condition notFull = lock.newCondition();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            s.submit(new Producer(10));
            s.submit(new Consumer(5));
        }
    }

    public static void produce(Integer num) {
        lock.lock();
        try {
            while (list.size() + num > MAX) {
                try {
                    System.out.println("数量过大,无法生产");
                    //队列太满了,生产的就先等一下吧,自动释放lock
                    notFull.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println("生产后,存储量为: " + list.size());
            //生产后,队列里面有产品,消费的赶紧去消费吧。
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void consume(Integer num) {
        lock.lock();
        try {
            while (list.size() < num) {
                System.out.println("数量过大,无法消费");
                try {
                    //消费的太多,没那么多货呃,那先等等吧,自动释放lock
                    notEmpty.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println("消费后,存储量为: " + list.size());
            //消费完了,仓库里有空间,要生产的赶紧去生产吧
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}           

部分输出:

生产后,存储量为: 10
消费后,存储量为: 5
生产后,存储量为: 15
消费后,存储量为: 10
生产后,存储量为: 20
消费后,存储量为: 15
消费后,存储量为: 10
生产后,存储量为: 20
...
           

这里采用了两个条件notEmpty和notFull,来控制不同角色的线程,前者控制消费行为,后者控制生产行为。当然,你也可以只是用一个条件来控制,比如把程序换成这样:

public static void produce(Integer num) {
        lock.lock();
        try {
            while (list.size() + num > MAX) {
                try {
                    System.out.println("数量过大,无法生产");
                    notFull.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }

            System.out.println("生产后,存储量为: " + list.size());
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void consume(Integer num) {
        lock.lock();
        try {
            while (list.size() < num) {
                System.out.println("数量过大,无法消费");
                try {
                    notFull.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println("消费后,存储量为: " + list.size());
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
           

部分输出:

生产后,存储量为: 10
生产后,存储量为: 20
消费后,存储量为: 15
消费后,存储量为: 10
消费后,存储量为: 5
消费后,存储量为: 0
数量过大,无法消费
...           

结果也完全没问题,只是每执行一次signalAll(),所有线程都来竞争锁资源,增加了锁获取的难度。

(4)不常用的PV实现

PV,高中同学的外号就叫PV,算一算毕业许久,还真有点想念他们呐。

相比于synchronized,信号量Semaphore和互斥量Mutex使用的就少多了。而他们的侧重点也有所不同。synchronized偏重对资源的直接控制。而Semaphore更像是控制访问资源的并发量,它基于CAS(compareAndSwap,一种非阻塞的同步手段)实现,支持公平和不公平机制。Mutex,多用在链表中的锁传递,且这个类不在java.util路径下暂且不提。

拿著名的厕所理论来说吧。假如有5个人要上厕所,厕所有3个坑位。

现在这5个人死脑筋,都要用靠近窗户风景好的那个1号坑,谁也不让谁,只好请synchronized来裁决,synchronized对1号坑位加了锁,你拿到钥匙就去,否则就干等着。

假如这5个人比较现实,都说无所谓用几号坑。但是坑位只有3个,不能全部都进去,也是谁都不让谁,只好请Semaphore来裁决,Semaphore决定发号牌,注意不是对坑位加锁,领到号牌的进去,没领到的干等着。至于进去之后的事,Semaphore就不管了。这个时候进去的A和B看见唯一的公共厕纸质量炒鸡好,都争着去抢,又发生了竟态,就导致另外一个同步问题了。

说了这么多,相信对他们的区别已经有所了解,如果不,请仔细体味后,再继续往下看。

public class StorageSemaphore {
    public static Integer MAX = 50;
    public static List<Product> list = new ArrayList<>();
    // 公平锁,所谓的公平锁,就是讲究顺序,先申请先得。反之不公平锁,就是与大家机会均等,与顺序无关。
    public static Semaphore semaphore = new Semaphore(3, true);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 200; i++) {
            s.submit(new Producer(10));
            s.submit(new Consumer(5));
        }
    }

    public static void produce(Integer num) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // synchronized (StorageSemaphore.class) {

        while (list.size() + num < MAX) {
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println(Thread.currentThread().getName() + "\t已经生产数:"
                    + num + "\t现仓储量为:" + list.size());
        }
        System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
                + "\t暂时不能执行生产任务!");
        // }
        semaphore.release();
    }

    public static void consume(Integer num) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // synchronized (StorageSemaphore.class) {

        while (list.size() > num) {
            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println(Thread.currentThread().getName() + "\t已经消费数:"
                    + num + "\t现仓储量为:" + list.size());
        }
        System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
                + "\t暂时不能执行消费任务!");
        // }
        semaphore.release();
    }
}
           

部分运行结果:

...
pool-1-thread-107   已经消费数:5 现仓储量为:35
pool-1-thread-108   已经消费数:5 现仓储量为:30
pool-1-thread-110   已经消费数:5 现仓储量为:36
pool-1-thread-111   已经生产数:10    现仓储量为:36
pool-1-thread-109   已经生产数:10    现仓储量为:46
pool-1-thread-112   已经消费数:5 现仓储量为:41
pool-1-thread-113   已经消费数:5 现仓储量为:36
pool-1-thread-115   已经生产数:10    现仓储量为:46
...           

敏感的同学可能已经注意到了,怎么出现36和41这样的数据呢,我们代码中无论生产和消费都是5的倍数呀(注意不同的机器上运行效果可能不一样,请多运行几次,肯定会有类似情况)。唯一的解释就是出现了内部争夺”厕纸”的情况。不妨把注释同步的部分去掉,再运行一下看看。

这次无论你运行多少次,输出都不会有异常的数据。那,完美了?

并没有。狂汗…

先来看个问题,前面我们已经分析过使用if()会导致的后果,而在这段程序里如果你使用if(),得到的效果其实是一样的。究其原因,这里的程序并没有使用拥塞控制策略,也就是说一个线程如果不满足生产条件,那么它将结束自己,而不是循环中等待条件合适后继续生产,所以也就没什么区别了。若有疑问的话,可以自己动手尝试验证。

除了加synchronized同步块之外,那就没有别的办法了?我们还有个互斥量Mutex没用呢。下面用binary semaphore来模拟一下。将上面代码的

public static Semaphore semaphore = new Semaphore(3, true);

修改为

public static Semaphore semaphore = new Semaphore(1, true);

,其他的都不用动,同步问题就解决了。

但问题到这里并没有结束,上面已经说过,程序里没有采用拥塞控制,虽然解决了同步,但它用的实际上是一种抛弃策略:想生产的不能保证都能生产到,该消费的也不保证都能消费到。这是它的弊端,但这并不影响它满足生产者消费者模式的6个条件(啥条件?忘记的话,请回到上一篇查看)。

如果你精力充沛,完全可以在synchronized里面添加对象的notify()和wait()操作(天呐,人生都如此艰难了,何必为难自己呐…),不过要注意多重锁极易造成死锁,倒不如放弃semaphore直接使用synchronized来得直接。

(5)总是被误用的volatile

volatile很诱人,也很容易被用错。关于它,你只要记住一句话:volatile能保证从主内存读取到最新的值,但它并不是线程安全的。

这里还是说一下,Java内存模型讨论的线程安全一般都是围绕这原子性、可见性、有序性这三个方面来考虑的(速记规则:”可有缘”)。

volatile的内存同步机制保证了可见性,本身的禁止指令重排语义保证了有序性,但致命的是,它保证不了原子性。

一般说来JVM提供的原子操作,只有8个,分别是lock、unlock、read、load、assign、use、store和write(速记规则”allrsuuw”)。

常用的synchronized三个条件统统满足,也是jvm比较倚重的锁实现方式。相比volatile来说,synchronized在字节码层面提供了monitorenter(lock)和monitorexit(unlock)操作,从而保证了操作的原子性。

回到我们的话题。

得,直接看代码吧。

public class StorageVolatile {

    public static Integer MAX = 50;
    public static volatile List<Product> list = new ArrayList<>();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 200; i++) {
            s.submit(new Producer(5));
            s.submit(new Consumer(10));
        }
    }

    public static void produce(Integer num) {
        if (list.size() + num > MAX) {
            System.out.println(Thread.currentThread().getName()
                    + " 我是生产,我在等待... ");
        } else {
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println(Thread.currentThread().getName() + " 库存: "
                    + list.size());
        }
    }

    public static void consume(Integer num) {
        if (list.size() < num) {
            System.out.println(Thread.currentThread().getName()
                    + " 我是消费,我在等待... ");
        } else {
            for (int i = 0; i < num; i++) {
                list.remove(i);
            }
            System.out.println(Thread.currentThread().getName() + " 库存: "
                    + list.size());
        }
    }
}           

部分输出

pool-1-thread-1 库存: 5
pool-1-thread-1 库存: 10
pool-1-thread-1 库存: 15
pool-1-thread-1 库存: 20
pool-1-thread-1 库存: 10
pool-1-thread-1 库存: 10
pool-1-thread-1 库存: 12
pool-1-thread-1 库存: 12
pool-1-thread-1 库存: 17
pool-1-thread-4 我是消费,我在等待... 
pool-1-thread-1 库存: 13
pool-1-thread-4 库存: 18           

从输出可以发现,我们的生产和消费都是5的倍数,结果却出现了12,17这样的“异类”,也印证了我们的分析。

要用它来实现生产者和消费者模式,就必须借助synchronized或者lock锁,那样的话实际上又回到前三种方案了,感兴趣的娃娃请自行尝试吧。

(6)温和而低调的ThreadLocal

从实现原理上说,ThreadLocal与生产者消费者模式根本就不搭界。把它放在此处说,算是探究线程同步的延伸。

如果说前面的synchronized、Lock,还是BlockQueue(内部也是Lock实现的)、semaphore等PV操作,甚至是volatile关键字,它们都是从或者试图从(volatile就不是安全的)资源的访问权限上(存在竟态)来处理线程同步问题,那ThreadLocal则提供了另外一种完全不同的思路:从资源本身下手消除竟态。

具体来说,就是ThreadLocal为每个线程都提供了一份单独的资源,维护在它的静态内部类ThreadLocalMap中,和平共处互不干扰。它的思路就是,你们不是吃不饱吗你们不是抢吗,来,我就给你们每个人各搞一份,这下都满足了吧,你们这群难伺候的程序,汗…

ThreadLocal类本身使用起来也很方便,主要是get()和set()操作。所以在遇到不需要保证资源一致性的并发场景时,使用ThreadLocal类不失为一种优雅的解决方案。

那它到底该怎么使用呢?不妨来看一下代码(与生产者消费者模式无关)。

public class StorageThreadLocal {
    public static Integer MAX = 50;
    public static List<Product> listInit = new ArrayList<Product>();
    public static ThreadLocal<List<Product>> list = new ThreadLocal<List<Product>>() {
        @Override
        protected List<Product> initialValue() {
            // TODO Auto-generated method stub
            return new ArrayList<Product>();
        }
    };

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            s.submit(new Producer(10));
            s.submit(new Consumer(10));
        }
    }

    public static void produce(Integer num) {
        List<Product> listP = list.get();
        while (listP.size() + num <= MAX) {
            listP.add(new Product(1, ""));
            System.out.println(Thread.currentThread().getName() + " 库存: "
                    + listP.size());
        }
        list.set(listP);
    }

    public static void consume(Integer num) {
        List<Product> listC = list.get();
        while (listC.size() < num) {
            listC.remove(0);        System.out.println(Thread.currentThread().getName() + " 库存: "
                    + listC.size());
        }
        list.set(listC);
    }
}           

部分输出:

pool-1-thread-1 库存: 1
pool-1-thread-1 库存: 2
pool-1-thread-1 库存: 3
pool-1-thread-1 库存: 4
pool-1-thread-1 库存: 5
pool-1-thread-1 库存: 6
pool-1-thread-1 库存: 7
pool-1-thread-1 库存: 8
pool-1-thread-3 库存: 1
pool-1-thread-3 库存: 2
pool-1-thread-3 库存: 3
pool-1-thread-2 库存: 1
pool-1-thread-2 库存: 2
pool-1-thread-2 库存: 3
pool-1-thread-2 库存: 4
pool-1-thread-2 库存: 5
pool-1-thread-2 库存: 6
pool-1-thread-2 库存: 7
pool-1-thread-2 库存: 8
pool-1-thread-2 库存: 9
pool-1-thread-2 库存: 10
pool-1-thread-2 库存: 11
pool-1-thread-2 库存: 12
pool-1-thread-2 库存: 13
...           

很明显,各个线程都有自己队列,大家互不干涉内政,和和气气,愉快生产,再也没有脸红耳赤的尴尬场面了…

总结:

常见的线程同步方案大概就这几种吧,而它们也分别代表了文章开头说的线程同步的三种实现方式,阻塞方式(synchronized、java.util.concurrent包中提供的各种集合(Lock同步))、非阻塞方式(Lock的内部实现)和本身就是安全的代码(ThreadLocal)。各有各的应用特点和应用场景,总的来说,synchronized的效率要高于Lock,毕竟是JVM原生的,但也是应用最广泛最泛滥的;而ThreadLocal的实现机理与前两者都不同,不好直接做比较。

回到生产者和消费者模式上,虽然文中给了四种实现方式,但PV操作的实现必须借助synchronized,所以还是要以前三种为主。

一窥之见,难免疏漏,有不到或待商榷之处,还请来往诸君在留言区热心提出,热烈讨论,热情补充,大家一块学习一起进步,何尝不是人生一乐事呢。

代码示例