天天看點

多線程之并發工具類(七)

“工欲善其事必先利其器”,有了這些并發工具,多線程控制變得So easy。

多線程之并發工具類(七)

與文無關

JDK中已經給我們内置了很多并發工具,都屬于應用類型,知道具體如何使用就好,主要講以下幾個類:

  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • LockSupport
  • BlockingQueue

這次的幾個案例都需要實際運作,看運作效果才明白怎麼回事,代碼可以直接複制粘貼。

多線程控制類,計數器栅欄,當計數器滿足條件的時候,再開始執行接下來的操作。

public class CountDownLatchTest {
    static final int THREAD_COUNT = 10;
    static final CountDownLatch end = new CountDownLatch(THREAD_COUNT);

    public static void main(String[] args) throws InterruptedException {
        Runnable demo = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("檢查完成");
                end.countDown();
            }
        };
        
        //線程池内有5個線程友善看效果
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < THREAD_COUNT; i++) {
            executorService.submit(demo);
        }

        end.await();
        System.out.println("一切就緒");
        executorService.shutdown();
    }
}
           
多線程之并發工具類(七)

CountDownLatch 運作效果

循環栅欄,可以看做CountDownLatch的重複利用。當滿足一定的條件時候,才開始執行某線程。

// 當線程的數量滿足條件時候,才開始執行。
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("一切就緒,準備出發");
            }
        });

        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getId() + ":就緒");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            executorService.submit(task);
        }


        executorService.shutdown();

    }
}
           
多線程之并發工具類(七)

CyclicBarrier運作結果

所有的線程都在等待,當等待的線程達到一定的數量,然後開始執行接下來的操作。

Semaphore,也是控制線程的一種手段,可以控制并發線程的數量,某些時候我們線程數過多,在通路有限的資源時候,可以使用Semaphore來控制線程的數量。

public class SemaphoreDemo implements Runnable {
    Semaphore mSemaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            mSemaphore.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + " done!");
            mSemaphore.release();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        SemaphoreDemo demo = new SemaphoreDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
        executorService.shutdown();
        ;
    }
}
           
多線程之并發工具類(七)

Semaphore 運作案例

LockSupport提供了一些靜态方法用于阻塞線程,和喚醒線程的功能。

處于park()挂起狀态的線程是Waiting狀态,park()方法阻塞的線程還支援中斷,不抛出中斷異常的同時設定中斷标志位,然後我們可以通過中斷标志位來檢查。

public class LockDemo implements Runnable{
    public static Object sObject = new Object();

    @Override
    public void run() {
        synchronized (sObject){
            System.out.println("目前線程名稱:" + Thread.currentThread().getName());
            LockSupport.park();

            if (Thread.currentThread().isInterrupted()){
                System.out.println( Thread.currentThread().getName() +  "被中斷了");
            }
            System.out.println("執行結束");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockDemo demo = new LockDemo();
        Thread t1 = new Thread(demo,"t1");
        Thread t2 = new Thread(demo,"t2");
        t1.start();
        Thread.sleep(3000);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
    }
}
           
多線程之并發工具類(七)

LockSupport的demo

Java的Queue也是面試中經常提到的知識點,這次因為我們隻涉及到并發相關知識,是以隻提一些并發相關的Queue,關于Queue的具體分析等後面的資料結構系列的時候再詳細解說。

BlockingQueue是Java中的阻塞隊列,JDK中提供了7個阻塞隊列

  • ArrayBlockingQueue : 數組實作的有界隊列,對元素進行FIFO(先進先出)的原則排序。
  • LinkedBlockingQueue: 連結清單組成的有界隊列,長度預設最大值為Integer.MAX_VALUE,元素按FIFO原則排序,性能好于ArrayBlockingQueue。
  • PriorityBlockingQueue:支援優先級的無界阻塞隊列。
  • DelayQueue: 支援延遲擷取元素的無界阻塞隊列
  • SynchronousQueue:不存儲元素的阻塞隊列。每一個put操作必須等待take操作,否則不能繼續添加元素。
  • LinkedTransferQueue:連結清單組成的無界傳輸隊列
  • LinkedBlockingDeque:由連結清單組成的雙向阻塞隊列。可以從兩段插入和移除元素。

帶大家看一下LinkedBlockingQueue的幾個關鍵方法:

//LinkedBlockingQueue 方法探索
  // 添加元素
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //如果隊列滿了,直接傳回false
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        // 建立新的節點
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 如果隊列不滿的話,就讓元素加入隊列。
            //然後判斷,目前隊列元素是否滿了,不滿的話,通知notFull條件。
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 假如添加的是第一個元素,通知隊列不為空了。
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
       
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 當隊列滿的時候進行等待。若不滿入隊
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
          
           // 同offer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 同offer
        if (c == 0)
            signalNotEmpty();
    }
           

可以看出添加元素上:

  • 當隊列滿的時候,offer不添加元素,立刻傳回。put則會阻塞操作,直到隊列為不滿。
  • 還有一個帶參數的offer方法,和put的唯一差別就是有逾時時間,在一段時間内隊列還不空的話,就傳回。
//LinkedBlockingQueue 方法探索
  // 移除
    public E poll() {
        final AtomicInteger count = this.count;
        // 隊列為空,傳回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //隊列有元素的話,取出元素
            //取出元素後如果隊列是不為空,發出不為空的信号。
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //如果取出元素之前,隊列是滿的,因為取出了元素,現在發出不滿的信号
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
           // 隊列為空的話,就等待隊列不為空。
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
           

可以看出LinkedBlockingQueue的移除操作poll和take方法:

  • poll不阻塞,take會阻塞
  • poll(long timeout, TimeUnit unit),當隊列為空的時候,等待指定的時間,如果隊列扔為空,那麼就傳回。

這次是帶領大家一起看了下LinkedBlockingQueue的關鍵方法,其它的隊列的操作也都類似,望大家自行檢視,JDK中Queue的實作并不難了解。

最後

這次主要介紹了幾個并發中可能會用到的工具類,最後說了下JDK并發包中的阻塞隊列,阻塞隊列相對比較重要,就簡單的分析了其實作。

希望能幫助到大家。

參考

  • 《Java并發實戰》
  • 《Java高并發程式設計》
  • 《并發程式設計的藝術》