1. CyclicBarrier【參考文獻】
- 和 CountDownLatch 類似
- 線程會等待,直到足夠多線程達到了事先規定的資料。一旦觸發條件,就可以進行下一步的操作
- 适用于線程之間互相等待處理結果就緒的場景
-
CyclicBarrier
可以構造一個集結點,當某一個線程執行完畢,它就會到集結點等待,直到所有線程都到了集結點,那麼該栅欄就會被撤銷,所有線程再統一出發,繼續執行剩下的任務
用法一:等待所有人到達指定地點,再統一出發
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到了,大家統一出發");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new Task(cyclicBarrier)).start();
}
}
static class Task implements Runnable {
private final CyclicBarrier cyclicBarrier;
public Task(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 現在前往集合地點");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " 已經到了集合地點,等待其他人到達");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
用法二:👉 CyclicBarrier簡單使用示例
2. CountDownLatch【參考文獻】
- 和 CyclicBarrier 類似,數量遞減到0時,觸發動作
- 但是不可重複使用
主要方法:
- CountDownLatch(int count) ,僅有一個構造函數,參數 count 為需要倒數的數值
- await() ,調用 await() 方法的現場會被挂起,它會等待直到 count 值為0才繼續執行
- countDown() ,講 count 值減1,直到為0時,等待的現場會被喚起
用法一:一個線程等待多個線程都執行完畢,再繼續自己的工作(一等多)
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("Num:" + no);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
service.submit(runnable);
}
System.out.println("等待5個線程執行完....");
latch.await();
System.out.println("所有線程都已經執行完,可以進入下一個環節了。");
}
}
用法二:多個線程等待某一個線程的信号,然後同時開始執行(多等一)
import java.util.concurrent.*;
/**
* 模拟跑步比賽,5名選手等待1名裁判發令,發令後所有人同時開始跑步
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
// 1名裁判
CountDownLatch latch = new CountDownLatch(1);
// 5名選手
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("NO." + no + "号選手準備完畢,等待發令");
latch.await();
System.out.println("NO." + no + "開始跑步");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
System.out.println("裁判檢查發令槍....");
Thread.sleep(2000);
System.out.println("裁判檢查完畢,比賽開始....");
latch.countDown();
}
}
CyclicBarrier 和 CountDownLatch 的差別
作用不同:CyclicBarrier要等固定數量的線程都到達了栅欄位置才能繼續執行,而CountDownLatch隻需等待數字到0,也就是說,CountDownLatch用于事件,但是CyclicBarrier是用于線程的
可重用性不同:CountDownLatch在倒數到0并觸發門闩打開後,就不能再次使用了,除非建立新的執行個體;而CyclicBarrier可以重複使用
3. Semaphore【參考文獻】
- 信号量,可以通過控制“許可證”的數量,來保證線程之間的配合
- 線程隻有在拿到“許可證”後才能繼續運作,相比于其他的同步器,更靈活
主要方法:
-
new Semaphore(int permits, boolean
fair),這裡可以設定是否要設定公平政策,如果傳入true,那麼Semaphore會把之前等待的線程放到FIFO的隊列裡,以便于當有了新的許可證,可以分發給之前等了最長時間的線程;
- acquire(),擷取許可證,沒有的話會陷入阻塞狀态;
- acquireUninterruptibly(),同上,但是可以響應中斷;
- tryAcquire(),看看現在有沒有空閑的許可證,如果有的話就擷取,沒有的話,不用陷入阻塞狀态,可以去做其它的事,過一會再來檢視許可證的空閑情況
-
tryAcquire(long timeout, TimeUnit unit),和 tryAcquire()
一樣,但是多了一個逾時時間,比如“在3秒内擷取不到許可證,就去做别的事情”
- release(),操作完成後,歸還許可證
- 可以指定擷取和釋放的許可證數量,擷取和釋放的數量必須一緻
用法:
import java.util.concurrent.*;
public class Test {
static Semaphore semaphore = new Semaphore(3, true);
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
service.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 拿到了許可證");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 釋放了許可證");
semaphore.release();
}
}
}
4. Condition【參考文獻】
- 控制線程的“等待”和“喚醒”,Object.wait()的更新版
實作生産者消費者
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
private final int queueSize = 10;
private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo demo = new ConditionDemo();
Produce produce = demo.new Produce();
Consume consume = demo.new Consume();
produce.start();
consume.start();
}
/**
* 消費者
*/
class Consume extends Thread {
@Override
public void run() {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void consume() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("隊列空,等待資料");
notEmpty.await();
}
// 從隊列中取資料
queue.poll();
// 并且喚醒生産者
notFull.signal();
System.out.println("從隊列中取走了一條資料,隊列剩餘:" + queue.size());
} finally {
lock.unlock();
}
}
}
}
/**
* 生産者
*/
class Produce extends Thread {
@Override
public void run() {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void produce() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("隊列滿,等待消費者進行消費");
notFull.await();
}
// 開始生産資料
queue.offer(1);
// 并且喚醒消費者
notEmpty.signal();
System.out.println("向隊列添加了一條資料,隊列剩餘:" + queue.size());
} finally {
lock.unlock();
}
}
}
}
}
5. Phaser【參考文獻 】
和 CyclicBarrier 類似
示例:4個線程執行完後做一次同步操作
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class PhaserTest {
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
class Worker implements Runnable {
@Override
public void run() {
phaser.register();
while (true) {
try {
Thread.sleep(500);
System.out.println("working:" + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void run() throws InterruptedException {
phaser.register();
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
while (true) {
phaser.arriveAndAwaitAdvance();
System.out.println("Sync..." + phaser.getPhase());
}
}
public static void main(String[] args) throws InterruptedException {
var test = new PhaserTest();
test.run();
}
}
6. Exchanger
讓兩個線程在合适時交換資料
适用場景:當兩個線程工作在同一個類的不同執行個體上時,用于交換資料
解決了什麼問題:線程間高效交換資料
最後來自小編的福利
小編整理了一份大廠真題的面試資料,以及2021最新Java核心技術整理的資料集錦,需要領取的小夥伴可以 私聊關注我 免費領取 ,程式設計的世界永遠向所有熱愛程式設計的人開放,這是一個自由,平等,共享的世界,我始終是這樣堅信的。
喜歡小編的分享可以點贊關注哦,小編持續為你分享最新文章 和 福利領取哦