天天看點

使用阻塞隊列實作并發資料安全

最近一段時間在學習golang,golang的一個亮點,就是并發程式設計。在并發程式設計中,雖然也支援顯式的對并發代碼進行加鎖(這一點看來和java并無太大差別)。但go推薦使用channel進行線程間的通信,channel是一個線程安全的阻塞隊列,使用channel,可以實作多線程(協程)之間的資料一緻性。

在java中,多線程實作資料一緻性,一般會使用 synchronized或者volatile或者Lock,那麼能不能使用java中的阻塞隊列來保證資料的并發安全呢?

下面是一段使用多線程求和的代碼,使用synchronized關鍵字修飾代碼塊:

public class Main2 {
    private static long sum;
    private static final int COUNT=10;
    //使用CountDownLatch是為了子線程和主線程之間保持同步,便于主線程輸出計算結果(使用join也可以實作類似功能)
    static CountDownLatch latch = new CountDownLatch(COUNT);

    public static void main(String[] args) throws Exception {
        long startTime= System.currentTimeMillis();
        for (int i = 1; i <= COUNT; i++) {
            MyRunnable runnable = new MyRunnable((i - 1) * 1000000 + 1, i * 1000000);
            Thread thread = new Thread(runnable);
            thread.start();
        }
        latch.await();
        System.out.println("計算結果是:"+sum);
        System.out.println("花費的時間:"+(System.currentTimeMillis()-startTime));
    }

    static class MyRunnable implements Runnable {
        int startValue;
        int endValue;

        public MyRunnable(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
            System.out.println(startValue + "--------" + endValue);
        }

        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (Main.class) {
                for (; startValue <= endValue; startValue++) {
                    sum += startValue;
                }
            }
            latch.countDown();
        }
    }
}
           

輸出:

計算結果是:50000005000000
花費的時間:47
           

下面是使用JUC下的阻塞隊列實作的多線程求和代碼:

public class Main {
    private static ArrayBlockingQueue<Long> queue;
    private static final int COUNT=10;
    //使用CountDownLatch是為了子線程和主線程之間保持同步,便于主線程輸出計算結果(使用join也可以實作類似功能)
    static  CountDownLatch latch = new CountDownLatch(COUNT);

    public static void main(String[] args) throws Exception {
        //隊列長度為1,存放計算結果。如果有兩個多個線程同時讀取資料或者寫入資料,隻會有一個成功,另外的線程阻塞
        queue = new ArrayBlockingQueue<Long>(1);
        queue.put(0L);

        long startTime= System.currentTimeMillis();
        for (int i = 1; i <= COUNT; i++) {
            MyRunnable runnable = new MyRunnable((i - 1) * 1000000 + 1, i * 1000000);
            Thread thread = new Thread(runnable);
            thread.start();
        }
        latch.await();
        System.out.println("計算結果是:"+queue.peek());
        System.out.println("花費的時間:"+(System.currentTimeMillis()-startTime));

    }

    static class MyRunnable implements Runnable {
        int startValue;
        int endValue;
        public MyRunnable(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
            System.out.println(startValue + "--------" + endValue);
        }

        @Override
        public void run() {
            Long sum = 0L;
            try {
                //休眠的目的是為了更好的觀測是否發生了多線程導緻的資料不一緻現象
                Thread.sleep(10);
                //隊列取元素,阻塞方法
                sum = queue.take();
                for (; startValue <= endValue; startValue++) {
                    sum += startValue;
                }
                //隊列放元素,阻塞方法
                queue.put(sum);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                latch.countDown();
            }
        }
    }
}
           

輸出:

計算結果是:50000005000000
花費的時間:153
           

總結:使用synchronized關鍵字比使用阻塞隊列效率高得多,這個主要和阻塞隊列的内部實作有關( ArrayBlockingQueue内部使用了 ReentrantLock,take和put這兩個方法都加了鎖,是以效率比較低)