天天看點

Java并發程式設計中的同步器

1. CyclicBarrier【參考文獻】

  • 和 CountDownLatch 類似
  • 線程會等待,直到足夠多線程達到了事先規定的資料。一旦觸發條件,就可以進行下一步的操作
  • 适用于線程之間互相等待處理結果就緒的場景
  • CyclicBarrier

    可以構造一個集結點,當某一個線程執行完畢,它就會到集結點等待,直到所有線程都到了集結點,那麼該栅欄就會被撤銷,所有線程再統一出發,繼續執行剩下的任務

用法一:等待所有人到達指定地點,再統一出發

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到了,大家統一出發");
            }
        });

        for (int i = 0; i < 5; i++) {
            new Thread(new Task(cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {

        private final CyclicBarrier cyclicBarrier;

        public Task(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 現在前往集合地點");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(Thread.currentThread().getName() + " 已經到了集合地點,等待其他人到達");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
           

用法二:👉 CyclicBarrier簡單使用示例

2. CountDownLatch【參考文獻】

  • 和 CyclicBarrier 類似,數量遞減到0時,觸發動作
  • 但是不可重複使用

主要方法:

  • CountDownLatch(int count) ,僅有一個構造函數,參數 count 為需要倒數的數值
  • await() ,調用 await() 方法的現場會被挂起,它會等待直到 count 值為0才繼續執行
  • countDown() ,講 count 值減1,直到為0時,等待的現場會被喚起

用法一:一個線程等待多個線程都執行完畢,再繼續自己的工作(一等多)

import java.util.concurrent.*;

public class Test {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("Num:" + no);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待5個線程執行完....");
        latch.await();
        System.out.println("所有線程都已經執行完,可以進入下一個環節了。");
    }
}
           

用法二:多個線程等待某一個線程的信号,然後同時開始執行(多等一)

import java.util.concurrent.*;

/**
 * 模拟跑步比賽,5名選手等待1名裁判發令,發令後所有人同時開始跑步
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {
        // 1名裁判
        CountDownLatch latch = new CountDownLatch(1);
        // 5名選手
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("NO." + no + "号選手準備完畢,等待發令");
                        latch.await();
                        System.out.println("NO." + no + "開始跑步");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("裁判檢查發令槍....");
        Thread.sleep(2000);
        System.out.println("裁判檢查完畢,比賽開始....");
        latch.countDown();
    }
}
           

CyclicBarrier 和 CountDownLatch 的差別

作用不同:CyclicBarrier要等固定數量的線程都到達了栅欄位置才能繼續執行,而CountDownLatch隻需等待數字到0,也就是說,CountDownLatch用于事件,但是CyclicBarrier是用于線程的

可重用性不同:CountDownLatch在倒數到0并觸發門闩打開後,就不能再次使用了,除非建立新的執行個體;而CyclicBarrier可以重複使用

3. Semaphore【參考文獻】

  • 信号量,可以通過控制“許可證”的數量,來保證線程之間的配合
  • 線程隻有在拿到“許可證”後才能繼續運作,相比于其他的同步器,更靈活

主要方法:

  1. new Semaphore(int permits, boolean

    fair),這裡可以設定是否要設定公平政策,如果傳入true,那麼Semaphore會把之前等待的線程放到FIFO的隊列裡,以便于當有了新的許可證,可以分發給之前等了最長時間的線程;

  2. acquire(),擷取許可證,沒有的話會陷入阻塞狀态;
  3. acquireUninterruptibly(),同上,但是可以響應中斷;
  4. tryAcquire(),看看現在有沒有空閑的許可證,如果有的話就擷取,沒有的話,不用陷入阻塞狀态,可以去做其它的事,過一會再來檢視許可證的空閑情況
  5. tryAcquire(long timeout, TimeUnit unit),和 tryAcquire()

    一樣,但是多了一個逾時時間,比如“在3秒内擷取不到許可證,就去做别的事情”

  6. release(),操作完成後,歸還許可證
  7. 可以指定擷取和釋放的許可證數量,擷取和釋放的數量必須一緻

用法:

import java.util.concurrent.*;

public class Test {

    static Semaphore semaphore = new Semaphore(3, true);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 拿到了許可證");

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 釋放了許可證");
            semaphore.release();
        }
    }
}
           

4. Condition【參考文獻】

  • 控制線程的“等待”和“喚醒”,Object.wait()的更新版

實作生産者消費者

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {

    private final int queueSize = 10;
    private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo demo = new ConditionDemo();
        Produce produce = demo.new Produce();
        Consume consume = demo.new Consume();
        produce.start();
        consume.start();
    }

    /**
     * 消費者
     */
    class Consume extends Thread {
        @Override
        public void run() {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void consume() throws InterruptedException {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("隊列空,等待資料");
                        notEmpty.await();
                    }

                    // 從隊列中取資料
                    queue.poll();
                    // 并且喚醒生産者
                    notFull.signal();

                    System.out.println("從隊列中取走了一條資料,隊列剩餘:" + queue.size());

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * 生産者
     */
    class Produce extends Thread {
        @Override
        public void run() {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        private void produce() throws InterruptedException {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("隊列滿,等待消費者進行消費");
                        notFull.await();
                    }

                    // 開始生産資料
                    queue.offer(1);
                    // 并且喚醒消費者
                    notEmpty.signal();

                    System.out.println("向隊列添加了一條資料,隊列剩餘:" + queue.size());

                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
           

5. Phaser【參考文獻 】

和 CyclicBarrier 類似

示例:4個線程執行完後做一次同步操作

import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserTest {

    Phaser phaser = new Phaser();
    ExecutorService executorService = Executors.newCachedThreadPool();

    class Worker implements Runnable {
        @Override
        public void run() {
            phaser.register();
            while (true) {
                try {
                    Thread.sleep(500);
                    System.out.println("working:" + phaser.getPhase());
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void run() throws InterruptedException {
        phaser.register();
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        while (true) {
            phaser.arriveAndAwaitAdvance();
            System.out.println("Sync..." + phaser.getPhase());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var test = new PhaserTest();
        test.run();
    }
}
           

6. Exchanger

讓兩個線程在合适時交換資料

适用場景:當兩個線程工作在同一個類的不同執行個體上時,用于交換資料

解決了什麼問題:線程間高效交換資料

最後來自小編的福利

小編整理了一份大廠真題的面試資料,以及2021最新Java核心技術整理的資料集錦,需要領取的小夥伴可以 私聊關注我 免費領取 ,程式設計的世界永遠向所有熱愛程式設計的人開放,這是一個自由,平等,共享的世界,我始終是這樣堅信的。

喜歡小編的分享可以點贊關注哦,小編持續為你分享最新文章 和 福利領取哦