天天看點

Java并發包--JUC學習(上)JUC

文章目錄

  • JUC
    • 程序與線程
    • 賣票問題(ReentanrLock)
    • 生産者與消費者(Condition)
      • Condition 的精确通知
    • “8 鎖的現象”
    • 多線程下的集合
      • List
      • Set
      • HashMap
    • Callable
    • JUC 輔助類
      • CountDownLatch
      • CyclicBarrier
      • Semaphore
    • 讀寫鎖
    • 阻塞隊列
      • 種類分析
      • ArrayBlockingQueue
      • LinkedBlockingQueue
      • PriorityBlockingQueue
      • DelayQueue
      • SynchronousQueue
      • LinkedTransferQueue
      • LinkedBlockingDeque
      • 阻塞的實作
    • 線程池
      • ThreadPoolExecutor

JUC

java.util.concurrent 是 Java 的并發工具包;

程序與線程

程序(Process) 是系統進行資源配置設定和排程的基本機關,是作業系統結構的基礎。

線程(thread) **是作業系統能夠進行運算排程的最小機關。**它被包含在程序之中,是程序中的實際運作機關。一條線程指的是程序中一個單一順序的控制流,一個程序中可以并發多個線程,每條線程并行執行不同的任務。

  • 線程是程式執行的最小機關,而程序是作業系統配置設定資源的最小機關;
  • 一個程序由一個或多個線程組成,線程是一個程序中代碼的不同執行路線;
  • 程序之間互相獨立,但同一個程序下的各個線程之間共享程式的記憶體空間(包括代碼段,資料集,堆等)及一些程序級的資源(如打開檔案和信号等),某程序内的線程在其他程序不可見;
  • 排程和切換:線程上下文切換比程序上下文切換要快得多;

線程和程序一樣分為五個階段:建立(New)、就緒(Runnable)、運作(Running)、阻塞(Blocked)和死亡(Dead)。

在Java的線程中(Thread.State)存在六種基本狀态:建立(NEW),運作(RUNNABLE),阻塞(BLOCKED),等待(WAITING),時間等待(TIMED_WAITING,存在時間,逾時後不再等待),死亡(TERMINATED)。

Java并發包--JUC學習(上)JUC

JUC 其實也就是 java.util.concurrent 包,是一個對于線程的工具包

賣票問題(ReentanrLock)

三個核心:線程,公共資源,操作(對外方法);

java.util.concurrent.locks

  • Lock

    實作提供比使用

    synchronized

    方法和語句可以獲得的更廣泛的鎖定操作。 它們允許更靈活的結構化,可能具有完全不同的屬性,并且可以支援多個相關聯的對象

    Condition

class Ticket{
    private int ticketNum = 30;
    private Lock lock = new ReentrantLock();
    public void saleTicket(){
        lock.lock();
        try {
            if(ticketNum > 0){
                System.out.println(Thread.currentThread().getName() + "\t 正在銷售第" + ticketNum + "張票,剩餘" + --ticketNum + "張票");
            }
        }finally{
            lock.unlock();
        }
    }
}

public class Test{
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(()->{for(int i = 1; i <=40; i++) ticket.saleTicket();},"A").start();;
        new Thread(()->{for(int i = 1; i <=40; i++) ticket.saleTicket();},"B").start();;
        new Thread(()->{for(int i = 1; i <=40; i++) ticket.saleTicket();},"C").start();;
    }
}
           

生産者與消費者(Condition)

Condition 接口中的重要方法:

void await() throws InterruptedException;
void signal();
void signalAll();
           

之前的生産者與消費者模式:(注意:虛假喚醒問題)

核心:判斷,操作(生産/消費),通知;

//資源類
class AirConditioner{
    private int number = 0;
    public synchronized void increment() throws InterruptedException{
        //1. 判斷
        if(number != 0){
            this.wait();
        }
        //2. 生産
        number++;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        //3. 通知
        this.notifyAll();
    }
    public synchronized void decrement() throws InterruptedException{
        //1. 判斷
        if(number == 0){
            this.wait();
        }
        //2. 消費
        number--;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        //3. 通知
        this.notifyAll();
    }
}

public class ThreadWaitNotifyDemo{
    public static void main(String[] args) {
        AirConditioner airConditioner = new AirConditioner();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    airConditioner.increment();
                } catch (InterruptedException e) {
                    
                }
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    airConditioner.decrement();
                } catch (InterruptedException e) {
                    
                }
            }
        },"B").start();
    }
}
           

上述代碼雖然可以正常運作,但是在多個生産者和消費者的情況下可能會出現虛假喚醒的情況;是以,我們必須解決虛假喚醒的狀況;

在資源類判斷時,不能使用 if 語句,而是 while 循環的判斷才能解決該問題;

原因: if 語句時,當生産者判斷為 true 時,進入等待狀态并釋放鎖資源,但下次進入的線程可能不是消費者而是生産者,這時生産者又進入等待狀态,再到下一次時肯定是消費者線程進入,當消費者消費完成後調用 notifyAll 方法會将之前等待的兩個消費者線程全部喚醒,緊接着就回去執行“生産語句”,最終就會出現列印的結果有 2 的情況了;

但如果是 while 循環判斷的話,當兩個消費者線程運作後,就會先再次判斷一次循環條件,而這時 number 公共資源已經不滿足條件,然後就會讓其中一個消費者線程去執行“生産語句”,另一個繼續等待,這時就會成功的避免上述的問題。

public synchronized void increment() throws InterruptedException{
        //1. 判斷
        while(number != 0){
            this.wait();
        }
        //2. 生産
        number++;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        //3. 通知
        this.notifyAll();
    }
           

使用 JUC 中的 locks 下的 Condition 接口專注的去解決 wait 和 notify 等功能;

Lock

替換

synchronized

方法和語句的使用,

Condition

取代了對象螢幕方法的使用;

//資源類
class AirConditioner{
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    public void increment() throws InterruptedException{
        lock.lock();
        try {
            //1. 判斷
        while(number != 0){
            condition.await();
        }
        //2. 生産
        number++;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        //3. 通知
        condition.signalAll();
        } finally{
            lock.unlock();
        }
    }
    public void decrement() throws InterruptedException{
        lock.lock();
        try {
            //1. 判斷
        while(number == 0){
            condition.await();
        }
        //2. 生産
        number--;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        //3. 通知
        condition.signalAll();
        } finally{
            lock.unlock();
        }
    }
}
           

Condition 的精确通知

當然, Condition 接口不僅僅是簡單的将 wait 和 notify 替換,它還能做精确通知(有順序的等待和喚醒);

示例:三個線程 A , B 和 C ,A 線程列印 A 5 次,B 線程列印 B 10 次, C 線程列印 C 15 次,共列印 10 輪;

相當于 A 線程對用的 number 值為1,以此類推;并且,在判斷時,隻要不等于線程對應的值時,就讓該線程進行等待;注意: condition1/2/3 隻是代表 condition 對象,而不是對應的線程的 condition ;

比如,方法 print5 :如果當 A 線程進行執行時并且 numner 也為 1 時(為 1 時,其他線程就會進行等待,如,B 線程的方法中的判斷就會為 true 進而執行 condition2.await() ,是以在 A 線程列印時,B 和 C 線程就會等待 A),判斷傳回 false 不會進行等待,就會去執行下面的 for 循環,當 for 執行完後,就需要 B 線程執行它的 for 循環(記住,是有順序的執行),是以将 number 的值修改為 2 ,并且讓正在等待的 B 線程喚醒;

//資源類
class ShareResource {
    // 1:A ,2:B , 3:C
    private int number = 1;
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    public void print5() throws InterruptedException {
        lock.lock();
        try {
            // 1. 判斷
            while (number != 1) {
                 //隻要 number 不為 1 時,說明是其它線程對應的 number 值,是以讓該線程進行等待
                condition1.await();
            }
            // 2. 操作
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            // 3. 通知
           number = 2;
           //喚醒 condition2
           condition2.signal();
        } finally {
            lock.unlock();
        }
    }
    public void print10() throws InterruptedException {
        lock.lock();
        try {
            // 1. 判斷
            while (number != 2) {
                //隻要 number 不為 2 時,說明是其它線程對應的 number 值,是以讓該線程進行等待
                condition2.await();
            }
            // 2. 操作
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            // 3. 通知
            number = 3;
            condition3.signal();
        } finally {
            lock.unlock();
        }
    }
    public void print15() throws InterruptedException {
        lock.lock();
        try {
           // 1. 判斷
           while (number != 3) {
            //隻要 number 不為 3 時,說明是其它線程對應的 number 值,是以讓該線程進行等待
            condition3.await();
        }
        // 2. 操作
        for (int i = 1; i <= 15; i++) {
            System.out.println(Thread.currentThread().getName() + "\t" + i);
        }
        // 3. 通知
        number = 1;
        condition1.signal();
        } finally {
            lock.unlock();
        }
    }
}
public class ConditionTest {
    public static void main(String[] args) {
        ShareResource shareResource = new ShareResource();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareResource.print5();
                } catch (InterruptedException e) {

                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareResource.print10();
                } catch (InterruptedException e) {

                }
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareResource.print15();
                } catch (InterruptedException e) {

                }
            }
        }, "C").start();
    }
}
           

“8 鎖的現象”

示例:

class Phone{
    public synchronized void sendEmail() throws Exception{
        System.out.println("-----sendEmail");
    }
    public synchronized void sendSMS() throws Exception{
        System.out.println("-----sendSMS");
    }
}
public class Lock8{
    public static void main(String[] args) throws Exception{
        Phone phone = new Phone();
        new Thread(()->{
            phone.sendEmail();
        },"A").start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(()->{
            phone.sendSMS();
        },"B").start();
    }
}
           

問:

  1. 正常通路,先列印郵件還是短信?

    郵件—>短信

  2. 郵件方法睡眠 3 秒,先列印郵件還是短信?(下面的問題都會在加了睡眠時間的基礎上進行)

    郵件—>短信

    上述 1,2 問題:

    一個對象裡面如果存在多個 synchronized 方法,某一時刻内,隻要有一個線程去調用其中的一個 synchronized 方法,其它的線程都隻能等待,換句話說,某一時刻内,隻能有唯一一個線程去通路這些 synchronized 方法,并且鎖的是目前對象 this ,被鎖之後,其它的線程都不能進入到目前對象的其它的 synchronized 方法。

  3. 新增一個普通方法 hello() ,先列印郵件還是 hello ?

    hello()—>郵件

    class Phone{
        public synchronized void sendEmail(){
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {}
            System.out.println("-----sendEmail");
        }
        public synchronized void sendSMS(){
            System.out.println("-----sendSMS");
        }
        public void hello(){
            System.out.println("-----hello");
        }
    }
    public class Lock8{
        public static void main(String[] args) throws Exception{
            Phone phone = new Phone();
            new Thread(()->{
                phone.sendEmail();
            },"A").start();
            TimeUnit.MILLISECONDS.sleep(200);
            new Thread(()->{
                phone.hello();
            },"B").start();
        }
    }
               
  4. “兩部手機”,先列印郵件還是短信?

    短信—>郵件

    上述 3,4 問題:

    普通方法和加同步鎖的方法無關;換成兩個對象後,它們鎖的不是同一個對象,即不是同一把鎖,是以情況就會不同;

    public static void main(String[] args) throws Exception{
            Phone phone = new Phone();
            Phone phone2 = new Phone();
            new Thread(()->{
                phone.sendEmail();
            },"A").start();
            TimeUnit.MILLISECONDS.sleep(200);
            new Thread(()->{
                phone2.sendSMS();
            },"B").start();
        }
               
  5. 發送郵件和短信的方法修改為靜态同步方法,“同一部手機”,先列印郵件還是短信?

    郵件—>短信

  6. 發送郵件和短信的方法修改為靜态同步方法,“兩部手機”,先列印郵件還是短信?

    郵件—>短信

  7. 一個普通同步方法發送短信 sendSMS() ,一個靜态同步方法 sendEmail() ,“一部手機”,先列印郵件還是短信?

    短信—>郵件

  8. 一個普通同步方法發送短信 sendSMS() ,一個靜态同步方法 sendEmail() ,“兩部手機”,先列印郵件還是短信?

    短信—>郵件

通過上述的問題,就可以很清晰地看出來 synchronized 實作同步的基礎:

Java 中的每一個對象都可以作為鎖;有 3 種表現形式:

  • 對于普通同步方法,鎖的是目前執行個體對象;
  • 對于靜态同步方法,鎖的是目前類的 class 對象;
  • 對于同步代碼塊,鎖的是 synchronized 括号裡配置的對象。

    是以這兩把鎖(this/class)是兩個不同的對象,是以靜态同步方法和普通同步方法之間是不會有競争條件的。

多線程下的集合

List

示例:

public class ListTest{
    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
    }
}
           

上述代碼運作結果如下,而且每次運作的結果“類型”可能都不同;

[null, 697f0ec8, b502b230]
[null, 697f0ec8, b502b230]
[null, 697f0ec8, b502b230]
           

如果再将線程數量調大,會發現有如下結果:

[null, c3d3c6ae, 34f4af7e, 9aa99807, 04816aec] 
...
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
        at java.util.ArrayList$Itr.next(ArrayList.java:851)[null, c3d3c6ae, 34f4af7e, 9aa99807, 04816aec]
        at java.util.AbstractCollection.toString(AbstractCollection.java:461)
        at java.lang.String.valueOf(String.java:2994)
        at java.io.PrintStream.println(PrintStream.java:821)
        at ListTest.lambda$0(ListTest.java:11)
        at java.lang.Thread.run(Thread.java:745)
java.util.ConcurrentModificationException
           
  • 出現異常

    java.util.ConcurrentModificationException ,并發修改異常

  • 導緻原因

    并發情況下多線程對集合進行增,删,改等操作,進而使給集合出現修改錯誤;

  • 解決方案
    • 使用 Vector 代替;
    • 使用 Collections.synchronizedList() ;
    • 使用 JUC 的 CopyOnWriteArrayList ;
  • 優化建議

CopyOnWriteArrayList 的 add 方法:

public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
           

CopyOnWrite 容器,即寫時複制;往一個容器添加元素的時候,不直接往目前容器 Object[] 添加,而是先将目前的容器 Object[] 進行 copy 複制,複制出一個新的容器 Object[] newElements ,然後在新的容器 Object[] newElements 中添加元素,添加完畢後再将原容器的引用指向新的容器,也就是 setArray(newElements) ;這樣的好處就是可以對 CopyOnWrite 容器進行并發的讀取而不需要加鎖,因為目前容器不會添加任何元素。是以, CopyOnWrite 容器也是一種讀寫分離的思想。

Set

HashSet 也不是線程安全的,如果在并發情況下也會報出 java.util.ConcurrentModificationException 異常,使用 Collections 和 JUC 中的 CopyOnWriteArraySet 也解決該問題;

HashSet 底層是由 HashMap 實作的,而該 Map 的 value 值是一個 Object 的常量值 PRESENT ;

HashMap

Collections 和 JUC 的 ConcurrentHashMap 解決并發問題;

ConcurrentHashMap 單獨講解;

Callable

自從Java 1.5開始,就提供了接口 Callable 和 接口 Future ,通過它們可以在任務執行完畢之後得到任務執行結果;

Callable 接口中有未實作方法 V call() throws Exception ;

Callable 和 Runnable 接口的差別:

  • Callable 接口可以在任務結束的時候提供一個傳回值,Runnable 無法提供這個功能;
  • Callable 的 call 方法分可以抛出異常,而 Runnable 的 run 方法不能抛出異常;

示例

class MyThread implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("call 方法......");
        return 666;
    }
}
class CallableTest{
    public static void main(String[] args) throws Exception{
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
        new Thread(futureTask,"A").start();
        System.out.println(futureTask.get());
    }
}
           

将實作了 Callable 接口的類中的 call 方法運作起來,就會使用到 FutureTask 類,而該類實作了 Runnable 接口和 Future 接口,是以在啟動該類的 call 方法時,要将該類作為參數傳入 Thread 的構造器中(Thread 構造器參數中可以傳入 Runnable 接口的實作類);

Thread 的構造方法有如下

public Thread(Runnable target, String name) {
        init(null, target, name, 0);
    }
           

傳入的是 Runnable ,而 FutureTask 其實也是實作了 Runnable 接口,這就是為什麼建立 FutureTask 對象了;

Java并發包--JUC學習(上)JUC

如果 new 多個 Thread 後接着啟動 call 方法,那麼最後隻能執行一個 call 方法;

JUC 輔助類

CountDownLatch

  • CountDownLatch 這個類使一個線程等待其他線程各自執行完畢後再執行;
  • 該類是通過一個計數器來實作的,計數器的初始值是線程的數量。每當一個線程執行完畢後,計數器的值就 -1 ,當計數器的值為 0 時,表示所有線程都執行完畢,因 await 方法阻塞的線程會被喚醒,然後繼續執行;

示例

public static void main(String[] args) {
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println("Test......");
            }).start();
        }
        System.out.println(Thread.currentThread().getName() + " 結束");
    }
           

上述的結果如下:

Test......
Test......
Test......
main 結束
Test......
Test......
Test......
           

會發現 main 線程在所有的列印未完成的情況下已經結束了,如果想讓 main 線程最後一個完成,可使用 CountDownLatch

public static void main(String[] args) throws InterruptedException {
        //構造器傳入的值為線程的數量
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                //每次執行完成後就減一,直到為 0 就喚醒被阻塞的 main 線程
                countDownLatch.countDown();
                System.out.println("Test......");
            }).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + " 結束");
    }
           

CyclicBarrier

示例

public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("七顆龍珠已集齊,召喚神龍");
        });
        for (int i = 1; i <= 7; i++) {
            final int t = i;
            new Thread(()->{
                System.out.println("收集到第"+ t +"顆龍珠");
                try {
                    //線程等待
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },i+"").start();
        }
    }
           

CountDownLatch 和 CyclicBarrier 差別:

  • CountDownLatch 是一個計數器,線程完成一個記錄一個,計數器遞減,隻能隻用一次;
  • CyclicBarrier 的計數器更像一個閥門,需要所有線程都到達,然後繼續執行,計數器遞增,提供 reset 功能,可以多次使用;

Semaphore

在信号量上,有定義兩種操作

  • acquire(擷取)目前一個線程調用 acquire 操作時,它要麼通過成功擷取信号量(信号量減 1 ),要麼一直等待下去,直到有線程釋放信号量或逾時;
  • release(釋放)實際上會将信号量的值加 1 ,然後喚醒等待的線程;

信号量主要用于兩個目的:

  1. 多個共享資源的互斥使用;
  2. 并發線程數量的控制;

示例

public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "\t搶到車位");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "\t離開車位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    semaphore.release();
                }
            },"" + i).start();
        }
    }
           

讀寫鎖

多個線程讀取一個共享資源無需加鎖無任何問題,但其中一個線程對共享資源進行寫操作時,就不會讓其它線程對該資源進行讀或寫了;隻要存在一個寫操作時,就會被加鎖;

示例:模拟緩存讀寫

class TestCache {
    private volatile Map<String,Object> map = new HashMap<>();
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    public void put(String key,Object val){
        Lock lock = this.lock.writeLock();
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "\t開始寫入資料");
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        map.put(key, val);
        System.out.println(Thread.currentThread().getName() + "\t寫入資料結束");
    }
    public void get(String key){
        Lock lock = this.lock.readLock();
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "\t開始讀取資料");
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName() + "\t讀取資料結束,結果是:" + o);
    }
}
public class ReadWriteLockTest {
    public static void main(String[] args) {
        TestCache testCache = new TestCache();
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            new Thread(()->{
                testCache.put(finalI+"", finalI);
            },"" + i).start();
        }
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            new Thread(()->{
                testCache.get(finalI+"");
            },"" + i).start();
        }
    }
}
           

阻塞隊列

阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列;這兩個附加的操作支援阻塞的插入和移除方法:

  1. 支援阻塞的插入方法:當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿;
  2. 支援阻塞的移除方法:在隊列為空時,擷取元素的線程會等待隊列變為非空;

阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者是從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素、消費者用來擷取元素的容器;

種類分析

ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列;

LinkedBlockingQueue :一個由連結清單結構組成的無界(其實有界,是整數最大值)阻塞隊列;

PriorityBlockingQueue :一個支援優先級排序的無界阻塞隊列;

DelayQueue:一個使用優先級隊列實作的無界阻塞隊列;

SynchronousQueue:一個不存儲元素的阻塞隊列;

LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列;

LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列;

在阻塞隊列不可用時,這兩個附加操作提供了4種處理方式:

Java并發包--JUC學習(上)JUC

抛出異常:是指當阻塞隊列滿時候,再往隊列裡插入元素,會抛出 **IllegalStateException Queue full ** 異常;當隊列為空時,從隊列裡擷取元素時會抛出 NoSuchElementException 異常 ;

示例(添加)

public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.add("d"));
    }
           
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
	at 多線程.BlockingQueueTest.main(BlockingQueueTest.java:17)
           

傳回特殊值:插入方法會傳回是否成功,成功則傳回 true;移除方法,則是從隊列裡拿出一個元素,如果沒有則傳回 null ;

一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡 put 元素,隊列會一直阻塞生産者線程,直到拿到資料,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裡 take 元素,隊列也會阻塞消費者線程,直到隊列可用;

逾時退出:當阻塞隊列滿時,隊列會阻塞生産者線程一段時間,如果超過一定的時間,生産者線程就會退出;

ArrayBlockingQueue

ArrayBlockingQueue 是一個用數組實作的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序;

預設情況下不保證通路者公平的通路隊列;

所謂公平通路隊列是指阻塞的所有生産者線程或消費者線程,當隊列可用時,可以按照阻塞的先後順序通路隊列,即先阻塞的生産者線程,可以先往隊列裡插入元素,先阻塞的消費者線程,可以先從隊列裡擷取元素。通常情況下為了保證公平性會降低吞吐量。

可以使用以下代碼建立一個公平的阻塞隊列:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3,true);
	//有參構造器構造器
	public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //使用 ReentrantLock 建立一個公平鎖
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
	//ReentrantLock 的構造器(建立一個公平鎖)
	public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
           

LinkedBlockingQueue

LinkedBlockingQueue 是一個用連結清單實作的有界阻塞隊列。隊列的預設和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序;

PriorityBlockingQueue

PriorityBlockingQueue 是一個支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。繼承 Comparable 類實作compareTo() 方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。需要注意的是不能保證同優先級元素的順序。

DelayQueue

DelayQueue 是一個支援延時擷取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實作。隊列中的元素必須實作 Delayed 接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素

DelayQueue 非常有用,可以将 DelayQueue 運用在以下應用場景。

  • 緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了;
  • 定時任務排程:使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行,比如TimerQueue就是使用DelayQueue實作的;

實作DelayQueue的三個步驟

第一步:繼承Delayed接口;

第二步:實作getDelay(TimeUnit unit),該方法傳回目前元素還需要延時多長時間,機關是納秒;

第三步:實作compareTo方法來指定元素的順序;

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。它支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。使用以下構造方法可以建立公平性通路的 SynchronousQueue,如果設定為true,則等待的線程會采用先進先出的順序通路隊列;

SynchronousQueue的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue ;

LinkedTransferQueue

LinkedTransferQueue是一個由連結清單結構組成的無界阻塞 TransferQueue 隊列。相對于其他阻塞隊列,LinkedTransferQueue 多了tryTransfer 和 transfer 方法;

LinkedBlockingDeque

LinkedBlockingDeque 是一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素;

種類分析引用簡書 高默思

阻塞的實作

ReentranLock + Condition 實作隊列的阻塞,ReentranLock 是鎖,Condition是條件狀态,通過等待/通知機制,來實作線程之間的通信;

ReentranLock + Condition的等待/通知機制和Object的wait()與notify()是類似的,通過synchronized,在鎖中使用wait()與notify()達到線程之間通信,在ReentranLock 的lock()和unlock()之間通過類似的await()和signal()達到線程之間的通信;

線程池

線程池主要是控制運作的線程數量,處理過程中将任務放入隊列,然後線上程建立後啟動這些任務,如果線程數量超過了最大線程數量(maxPoolSize),超出數量的線程就會排隊等待,等待其它線程執行完畢,再從隊列中取出任務來執行;

主要特點是:線程複用,控制量大的并發數,管理線程(降低系統資源消耗,提高系統響應速度);

使用 Executors 線程池的工具類可以提供工廠方法用來建立不同類型的線程池;

  • newFixedThreadPool:建立固定線程數的線程池;
  • newSingleThreadExecutor:建立一個隻有一個線程的線程池;
  • newCachedThreadPool:可以根據需要建立新的線程,但如果已有線程是空閑的會重用已有線程(一池 N 個工作線程,類似擴容);

上述三個建立的線程池,底層都是由 ThreadPoolExecutor 來進行建立的;關閉線程池調用 shutdown 方法;

并且上述三個建立線程池的方法在實際中都不可采取(阿裡巴巴開發手冊),可能會導緻 OOM;一般我們自定義線程池;
Java并發包--JUC學習(上)JUC
public class ExecutorDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 1; i <= 10; i++) {
                int num = i;
                threadPool.execute(()->{
                    System.out.println(num);
                });
            }
        } finally {
            threadPool.shutdown();
        }
    }
}
           
Java并發包--JUC學習(上)JUC

ThreadPoolExecutor

public ThreadPoolExecutor(	  int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){...}
           
  • corePoolSize:線程池中保持的線程數量(常駐核心線程數),包括空閑線程在内,也就是線程池釋放的最小線程數量界限;
  • maximumPoolSize:線程池中能夠容納最大線程數量(必須大于等于 1);
  • keepAliveTime:目前線程數量大于核心線程數量,并且空閑線程保持線上程池中的時間大于存活時間 keepAliveTime 時,就釋放空閑的線程(maximumPoolSize - corePoolSize);
  • TimeUnit(枚舉類) unit:是 keepAliveTime 參數時間的機關,可以是分鐘,秒,毫秒等等;
  • BlockingQueue< Runnable > workQueue:任務隊列,當線程任務送出到線程池以後,首先放入隊列中,然後線程池按照該任務隊列依次執行相應的任務;可以使用的 workQueue 有很多,比如:LinkedBlockingQueue 等等;
  • ThreadFactory threadFactory:表示生成線程池中工作線程的線程工廠,用于建立線程,一般使用預設即可;
  • RejectedExecutionHandler handler:如果隊列已滿,并且工作線程大于了最大線程數(maximumPoolSize),就會按照指定的拒絕政策拒絕執行任務;(注意都是靜态内部類)
    • ThreadPoolExecutor.AbortPolicy: 丢棄任務并抛出 RejectedExecutionException 異常(預設的);
    • ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常;
    • ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列中等待最久的任務,然後把目前任務加入到隊列中嘗試再次送出目前任務;
    • ThreadPoolExecutor.CallerRunsPolicy:不會抛棄任務,也不抛出異常,而是将某些任務

      回退到調用者

      ,進而降低新任務的流量;

工作原理

Java并發包--JUC學習(上)JUC
Java并發包--JUC學習(上)JUC
  1. 在建立了線程池後,開始等待請求;
  2. 當調用 execute() 方法添加一個請求任務時,線程池會做出如下判斷:
    1. 如果正在運作的線程數量小于 corePoolSize ,那麼就會立刻建立線程運作這個任務;
    2. 如果正在運作的線程數量大于或等于 corePoolSize ,那麼将這個任務放入隊列;
    3. 如果這個時候隊列滿了且正在運作的線程數量還小于 maximumPoolSize ,那麼還是要建立非核心線程立刻運作這個任務;
    4. 如果隊列滿了且正在運作的線程數量大于或等于 maximumPoolSize ,那麼線程池會啟動飽和拒絕政策來執行;
  3. 當一個線程完成任務時,它會從隊列中取出一個任務來執行;
  4. 當一個線程無事可做超過一定的時間(keepAliveTime)時,線程會判斷:
    1. 如果目前運作的線程數大于 corePoolSize ,那麼這個線程就被停掉;是以線程池的所有任務完成後,它最終會收縮到corePoolSize的大小;
一般,編寫線程池時,如果該項目或該服務,方法使用到的 CPU 比較多,那麼最大線程數最好是 CPU 核數 + 1(CPU 密集型);而如果是 IO 密集型的,最大線程數一般為 CPU 核數 / 阻塞系數;