天天看點

同步工具(未完待更新)CountDownLatch一次性栅欄Semaphore信号量CyclicBarrier循環同步栅欄Exchange線程間變量交換

CountDownLatch一次性栅欄

1 概念與用法

CountDownLatch是一個用來同步多個線程的并發工具,n個線程啟動後,分别調用CountDownLatch的await方法來等待其m個條件滿足(m在初始化時指定);

每當有條件滿足時,目前線程調用CountDownLatch的countDown方法,使得其m值減1;

直至m值為0時,所有等待的線程被喚醒,繼續執行。

注意,CountDownLatch是一次性的,當條件滿足後,它不能再回到初始狀态,也不能阻止後續線程了。

若要循環的阻塞多個線程,則考慮使用CyclicBarrier。

例如5匹馬參加賽馬比賽,需等待3個裁判到位後才能啟動,代碼如下:

public class CountDownLatchExam {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            service.submit(new Horse("horse" + i, latch));
        }
        for (int i = 0; i < 3; i++) {
            service.submit(new Judge("judge" + i, latch));
        }
        service.shutdown();
    }


    private static class Horse implements Runnable {
        private final String name;
        private final CountDownLatch latch;

        Horse(String name, CountDownLatch latch) {
            this.name = name;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " is ready,wait for all judges.");
                latch.await();
                System.out.println(name + " is running.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class Judge implements Runnable {
        private final String name;
        private final CountDownLatch latch;
        private static Random random = new Random(System.currentTimeMillis());

        Judge(String name, CountDownLatch latch) {
            this.name = name;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(name + " is ready.");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
      
horse0 is ready,wait for all judges.
horse1 is ready,wait for all judges.
horse2 is ready,wait for all judges.
horse3 is ready,wait for all judges.
horse4 is ready,wait for all judges.
judge2 is ready.
judge1 is ready.
judge0 is ready.
horse0 is running.
horse1 is running.
horse2 is running.
horse3 is running.
horse4 is running.      

CountDownLatch的原理在上一篇的4.7節“一次喚醒所有阻塞線程的共享鎖”中已經詳細闡述了。簡要複述一下,CountDownLatch使用AQS的子類Sync作為内部的同步器,并由Sync複寫了AQS的tryAcquireShared和tryReleaseShared方法。它将AQS中的state當做需要滿足的條件個數,生成了一個共享鎖。

每當調用await方法時,内部調用了tryAcquireShared方法,由于state>0,是以調用的線程會阻塞在共享鎖的循環架構中。

每當調用countDown方法時,内部調用了releaseShared方法,而此方法将會把state的值減1,當state的值為0時,tryAcquireShared中的循環将會喚醒所有等待線程,使之繼續運作。由于tryAcquireShared方法中沒有修改state值,是以CountDownLatch隻能一次性使用,不能循環使用。

若需知道更多細節,請直接閱讀CountDownLatch和AQS的源代碼。提醒一句,CountDownLatch的源代碼是所有AQS的應用中最簡單的,應當從它讀起。

Semaphore信号量

Semaphore信号量,在多個任務争奪幾個有限的共享資源時使用。

調用acquire方法擷取一個許可,成功擷取的線程繼續執行,否則就阻塞;

調用release方法釋放一個許可。每當有空餘的許可時,阻塞的線程和其他線程可競争許可。

下面的例子中,10輛車競争3個許可證,有了許可證的車就可以入内通路資源,通路完成後釋放許可證:

作者:Alex Wang
連結:https://zhuanlan.zhihu.com/p/27829595
來源:知乎
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

public class SemaphoreExam {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        ExecutorService service = Executors.newCachedThreadPool();
        // 10 cars wait for 3 semaphore
        for (int i = 0; i < 10; i++) {
            service.submit(new Car("Car" + i, semaphore));
        }

        service.shutdown();
    }
    private static class Car implements Runnable {
        private final String name;
        private final Semaphore semaphore;
        private static Random random = new Random(System.currentTimeMillis());

        Car(String name, Semaphore semaphore) {
            this.name = name;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " is waiting for a permit");
                semaphore.acquire();
                System.out.println(name+" get a permit to access, available permits:"+semaphore.availablePermits());
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(name + " release a permit, available permits:"+semaphore.availablePermits());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}      

注意,運作時semaphore.availablePermits()方法會傳回目前空餘的許可證數量。但由于線程擷取許可證的速度往往快于IO的速度,是以很多時刻看到這個數字都是0。

2 原理

Semaphore的原理在上一篇的4.8節“擁有多個許可證的共享鎖”中已經詳細闡述了。

簡要複述一下,Semaphore使用AQS的子類Sync作為内部的同步器,并由Sync複寫了AQS的tryAcquireShared和tryReleaseShared方法。

它将AQS中的state當做許可證的個數,生成了一個共享鎖。state的值在Semaphore的構造函數中指定,必須大于0。

1.每當調用acquire方法時,内部調用了tryAcquireShared方法,此方法檢測state的值是否>0,若是則将state減1,并繼續運作,否則線程就阻塞在共享鎖的循環架構中。

2.每當調用release方法時,内部調用了releaseShared方法,而此方法将會把state的值加1,當state的值大于0時,tryAcquireShared中的循環将會喚醒所有等待線程,使之繼續運作,重新競争許可證。

若需知道更多細節,請直接閱讀Semaphore和AQS的源代碼。

CyclicBarrier循環同步栅欄

CyclicBarrier可用來在某些栅欄點處同步多個線程,且可以多次使用,每次在栅欄點同步後,還可以激發一個事件。

例如三個旅遊者(線程)各自出發,依次到達三個城市,必須每個人都到達某個城市後(栅欄點),才能再次出發去向下一個城市,當他們每同步一次時,激發一個事件,輸出一段文字。代碼如下:

public class CyclicBarrierExam {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("======== all threads have arrived at the checkpoint ===========");
            }
        });
        ExecutorService service = Executors.newFixedThreadPool(3);
        service.submit(new Traveler("Traveler1", barrier));
        service.submit(new Traveler("Traveler2", barrier));
        service.submit(new Traveler("Traveler3", barrier));
        service.shutdown();
    }
    private static class Traveler implements Runnable {
        private final String name;
        private final CyclicBarrier barrier;
        private static Random rand = new Random(47);

        Traveler(String name, CyclicBarrier barrier) {
            this.name = name;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                System.out.println(name + " arrived at Beijing.");
                barrier.await();
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                System.out.println(name + " arrived at Shanghai.");
                barrier.await();
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                System.out.println(name + " arrived at Guangzhou.");
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}      

CyclicBarrier是依賴一個可重入鎖ReentrantLock和它的一個Condition實作的,

在構造時,CyclicBarrier得到了一個parties數值,它代表參與的線程數量,以及一個Runnable的執行個體,它代表被激發的事件。

每當有線程調用await時,parties減1。若此時parties大于0,線程就在Condition處阻塞,若parties等于0,則此Condition會調用signalAll釋放所有等待線程,并觸發事件,同時将parties複原。是以所有的線程又進入下一輪循環。

CyclicBarrier代碼非常簡單,複雜之處在于它還要處理線程中斷、逾時等情況。

Exchange線程間變量交換

Exchange專門用于成對的線程間同步的交換一個同類型的變量,這種交換是線程安全且高效的。直接來看一個例子:

public class ExchangerExam {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new StringHolder("LeftHand", "LeftValue", exchanger));
        service.submit(new StringHolder("RightHand", "RightValue", exchanger));
        service.shutdown();
    }

    private static class StringHolder implements Runnable {
        private final String name;
        private final String val;
        private final Exchanger<String> exchanger;
        private static Random rand = new Random(System.currentTimeMillis());

        StringHolder(String name, String val, Exchanger<String> exchanger) {
            this.name = name;
            this.val = val;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " hold the val:" + val);
                TimeUnit.SECONDS.sleep(rand.nextInt(5));
                String str = exchanger.exchange(val);
                System.out.println(name + " get the val:" + str);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}      

可以看到,代碼中兩個線程同步的交換了一個String。先執行exchange方法的線程會阻塞直到後一個線程也執行了exchange方法,然後同步的完成資料的交換。再看一個例子:

public class ExchangerExam2 {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService service = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        service.submit(new StringHolder("LeftHand", "LeftValue", exchanger));
        service.submit(new StringHolder("RightHand", "RightValue", exchanger));
        service.shutdown();
        service.awaitTermination(1, TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        System.out.println("time span is " + (end - start) + " milliseconds");
    }


    private static class StringHolder implements Runnable {
        private final String name;
        private final String val;
        private final Exchanger<String> exchanger;
        private static Random rand = new Random(System.currentTimeMillis());

        StringHolder(String name, String val, Exchanger<String> exchanger) {
            this.name = name;
            this.val = val;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10000; i++) {
//                    System.out.println(name + "-" + i + ": hold the val:" + val + i);
//                    TimeUnit.NANOSECONDS.sleep(rand.nextInt(5));
                    String str = exchanger.exchange(val + i);
//                    System.out.println(name + "-" + i + ": get the val:" + str);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}      

代碼中,兩個線程交換了10000組資料,用時僅41ms,這說明Exchanger的同步效率是非常高的。

再看一段代碼:

作者:Alex Wang
連結:https://zhuanlan.zhihu.com/p/27829595
來源:知乎
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

public class ExchangerExam3 {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new StringHolder("North", "NorthValue", exchanger));
        service.submit(new StringHolder("East", "EastValue", exchanger));
        service.submit(new StringHolder("West", "WestValue", exchanger));
        service.submit(new StringHolder("South", "SouthValue", exchanger));
        service.shutdown();
    }

    private static class StringHolder implements Runnable {
        private final String name;
        private final String val;
        private final Exchanger<String> exchanger;
        private static Random rand = new Random(System.currentTimeMillis());

        StringHolder(String name, String val, Exchanger<String> exchanger) {
            this.name = name;
            this.val = val;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10000; i++) {
                    System.out.println(name + "-" + i + ": hold the val:" + val + i);
                    TimeUnit.NANOSECONDS.sleep(rand.nextInt(5));
                    String str = exchanger.exchange(val + i);
                    System.out.println(name + "-" + i + ": get the val:" + str);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}      

這段代碼在運作時有很大的機率會死鎖,原因就是Exchanger是用來在“成對”的線程之間交換資料的,像上面這樣在四個線程之間交換資料,Exchanger很有可能将多個線程互相阻塞在其Slot中,造成死鎖。

Exchanger這個類初看非常簡單,其公開的接口僅有一個無參構造函數,兩個重載的泛型exchange方法:

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException      

第一個方法用來持續阻塞的交換資料;第二個方法用來在一個時間範圍内交換資料,若逾時則抛出TimeoutException後傳回,同時喚醒另一個阻塞線程。

Exchanger的基本原理是維持一個槽(Slot),這個Slot中存儲一個Node的引用,這個Node中儲存了一個用來交換的Item和一個用來擷取對象的洞Hole。如果一個來“占有”的線程看見Slot為null,則調用CAS方法使一個Node對象占據這個Slot,并等待另一個線程前來交換。如果第二個來“填充”的線程看見Slot不為null,則調用CAS方法将其設定為null,同時使用CAS與Hole交換Item,然後喚醒等待的線程。注意所有的CAS操作都有可能失敗,是以CAS必須是循環調用的。

看看JDK1.7中Exchanger的資料結構相關源代碼:

// AtomicReference中存儲的是Hole對象
private static final class Node extends AtomicReference<Object> {
    /** 用來交換的對象. */
    public final Object item;

    /** The Thread waiting to be signalled; null until waiting. */
    public volatile Thread waiter;

    /**
     * Creates node with given item and empty hole.
     * @param item the item
     */
    public Node(Object item) {
        this.item = item;
    }
}
//Slot中存儲的是Node
private static final class Slot extends AtomicReference<Object> {
    //這一行是為了防止僞共享而加入的緩沖行,與具體算法無關
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
//一個Slot數組,數組中有32個Slot,隻在必要時才建立
private volatile Slot[] arena = new Slot[CAPACITY];      

下面是進行交換操作的核心算法:/

private Object doExchange(Object item, boolean timed, long nanos) {
    Node me = new Node(item);               // 建立一個Node,預備在“占用”時使用
    int index = hashIndex();                   // 目前Slot的哈希值
    int fails = 0;                            // CAS失敗次數

    for (;;) {
        Object y;                          // 目前Slot中的内容
        Slot slot = arena[index];              //得到目前的Slot
        if (slot == null)                     // 延遲加載slots
            createSlot(index);                // 建立Slot并重入循環
        else if ((y = slot.get()) != null &&  // 如果Hole不為null,準備“填充”
                 slot.compareAndSet(y, null)) {
            Node you = (Node)y;               // 從這裡開始交換資料
            if (you.compareAndSet(null, item)) {
                LockSupport.unpark(you.waiter);  //喚醒等待線程
                return you.item;                //“填充”線程從這裡傳回值
            }                                // 上面條件不滿足,重入循環
        }
        else if (y == null &&                 // 如果Hole為null,準備“占有”
                 slot.compareAndSet(null, me)) {
            if (index == 0)                   // 在slot 0上等待交換
                return timed ?
                    awaitNanos(me, slot, nanos) :
                    await(me, slot);
            Object v = spinWait(me, slot);    // Slot位置不為0時,自旋等待交換
            if (v != CANCEL)
                return v;                 //“占有”線程從這裡傳回值
            me = new Node(item);              // 抛棄被取消的Node,建立新Node
            int m = max.get();
            if (m > (index >>>= 1))           // index右移1位,相當于arena中slot向右1位
                max.compareAndSet(m, m - 1);  // 縮表
        }
        else if (++fails > 1) {               // 在第一個Slot上運作兩次失敗
            int m = max.get();
            if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                index = m + 1;                // 第三次失敗時index增加
            else if (--index < 0)
                index = m;                    // 當index小于0時,指派為m
        }
    }
}      

繼續閱讀