我們都知道在Java多線程應用中,多數 生産/消費 模型的首選資料結構就是隊列(FIFO - 先進先出)。Java提供的線程安全的隊列可分為:阻塞隊列、非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實際應用中我們需要根據他們的優缺點來選用阻塞隊列或者非阻塞隊列。
1.ConcurrentLinkedQueue
它是Queue的一個安全實作,元素按FIFO原則進行排序.采用CAS操作來保證元素的一緻性。
package com.leitan.architect.threads.threadpool;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ConcurrentLinkedQueue 是Queue的一個安全實作.Queue中元素按FIFO原則進行排序.采用CAS操作,來保證元素的一緻性。
* @Author: lei.tan
* @Date: 2018-11-14 16:16
*/
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
private static int count = 2; // 線程個數,計數器
//CountDownLatch 是同步輔助類,在完成某一組線程的任務之前,它允許一個或多個線程一直等待
private static CountDownLatch latch = new CountDownLatch(count) {
// 當計數器減為0的時候回調執行這個方法
@Override
public void await() throws InterruptedException {
super.await();
System.out.println(Thread.currentThread().getName() + ": count down is ok");
}
};
/**
* 生産
*/
public static void offer() {
System.out.println(Thread.currentThread().getName() + ": 開始生産...");
for (int i = 0; i < 100; i++) {
try {
queue.offer(i);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ": 生産完畢!");
}
static class offer implements Runnable {
@Override
public void run() {
offer();
}
}
/**
* 消費
*/
static class Poll implements Runnable {
public void run() {
// 這裡兩種判斷方式都是準确的,都有可能消費到null值
// while (queue.size()>0) {
while (!queue.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": 消費" + queue.poll());
}
latch.countDown();// 一個線程執行完計數器減一
}
}
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
// 最多允許四個線程同時進行
ExecutorService es = Executors.newFixedThreadPool(4);
// 最好是能保證現生産再消費
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i < count; i++) {
// es.submit(new offer());
es.submit(new Poll());
}
latch.await(); //阻塞主線程(main),等到latch.countDown()減為零才繼續執行
System.out.println("剩餘:" + queue.size());
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
}
}
2.BlockingQueue
由于LinkedBlockingQueue實作是線程安全的,實作了FIFO(先進先出),是 生産/消費 模型的首選,LinkedBlockingQueue 可以指定容量也可不指定,不指定預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
package com.leitan.architect.threads.threadpool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 多線程模拟實作 生産者/消費者 模型
*
* @Author: lei.tan
* @Date: 2018-11-14 15:51
*/
public class BlockingQueueTest {
/**
* 定義裝蘋果的籃子
*/
public class Basket {
private static final int APPLE_NUMBER = 5;// 籃子能夠容納幾個蘋果
BlockingQueue<String> basket = new LinkedBlockingQueue<>(APPLE_NUMBER);
// 生産蘋果,放入籃子
public void produce() throws InterruptedException {
// put方法放入一個蘋果,若basket滿了,等到basket有位置
System.out.println(Thread.currentThread().getName() + ": 準備産生Apple,目前籃子裡有" + (APPLE_NUMBER - basket.remainingCapacity()) + "個apple");
basket.put("An apple");
System.out.println(Thread.currentThread().getName() + ": 産生一個Apple,目前籃子裡有" + (APPLE_NUMBER - basket.remainingCapacity()) + "個apple");
}
// 消費蘋果,從籃子中取走
public String consume() throws InterruptedException {
// take方法取出一個蘋果,若basket為空,等到basket有蘋果為止(擷取并移除此隊列的頭部)
System.out.println(Thread.currentThread().getName() + ": 準備消費Apple,目前籃子裡有" + (APPLE_NUMBER - basket.remainingCapacity()) + "個apple");
String apple = basket.take();
System.out.println(Thread.currentThread().getName() + ": 消費一個Apple,目前籃子裡有" + (APPLE_NUMBER - basket.remainingCapacity()) + "個apple");
return apple;
}
}
// 定義蘋果生産者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生産蘋果
// basket.produce();
System.out.println(instance + "準備生産蘋果...");
basket.produce();
System.out.println(instance + "生産蘋果完畢!");
Thread.sleep(200);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定義蘋果消費者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消費蘋果
// basket.consume();
System.out.println(instance + "準備消費蘋果...");
System.out.println(basket.consume());
System.out.println(instance + "消費蘋果完畢!");
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
BlockingQueueTest test = new BlockingQueueTest();
// 建立一個裝蘋果的籃子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer1 = test.new Producer("生産者1", basket);
Producer producer2 = test.new Producer("生産者2", basket);
Consumer consumer = test.new Consumer("消費者1", basket);
service.submit(producer1);
service.submit(producer2);
service.submit(consumer);
// 運作7s後停止
try {
Thread.sleep(1000 * 7);
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdownNow();
}
}
3.小結
從上面兩個的運作結果我們可以總結出以下幾點:
ConcurrentLinkedQueue 消費原則FIFO,無容量限制
offer: 生産(不管已有多少,有指令我就去生産)
poll: 消費(不管桶内有沒有東西,我直接去取來消費,有可能取到null,不過實際應用的業務邏輯中,你判斷如果取到是null值可以不作處理,不影響這個api的本身使用)
BlockingQueue 消費原則FIFO,可以指定容量也可以不指定,不指定預設最大為Integer.MAX_VALUE
put: 生産(在容量還沒有滿的情況下我就生産,容量如果滿了我就等待,等有空位了我在生産)
take: 消費(桶子裡面有東西我才消費,沒有東西我就等到有東西我再去消費)