我们都知道在Java多线程应用中,多数 生产/消费 模型的首选数据结构就是队列(FIFO - 先进先出)。Java提供的线程安全的队列可分为:阻塞队列、非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中我们需要根据他们的优缺点来选用阻塞队列或者非阻塞队列。
1.ConcurrentLinkedQueue
它是Queue的一个安全实现,元素按FIFO原则进行排序.采用CAS操作来保证元素的一致性。
package com.leitan.architect.threads.threadpool;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ConcurrentLinkedQueue 是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性。
* @Author: lei.tan
* @Date: 2018-11-14 16:16
*/
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
private static int count = 2; // 线程个数,计数器
//CountDownLatch 是同步辅助类,在完成某一组线程的任务之前,它允许一个或多个线程一直等待
private static CountDownLatch latch = new CountDownLatch(count) {
// 当计数器减为0的时候回调执行这个方法
@Override
public void await() throws InterruptedException {
super.await();
System.out.println(Thread.currentThread().getName() + ": count down is ok");
}
};
/**
* 生产
*/
public static void offer() {
System.out.println(Thread.currentThread().getName() + ": 开始生产...");
for (int i = 0; i < 100; i++) {
try {
queue.offer(i);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ": 生产完毕!");
}
static class offer implements Runnable {
@Override
public void run() {
offer();
}
}
/**
* 消费
*/
static class Poll implements Runnable {
public void run() {
// 这里两种判断方式都是准确的,都有可能消费到null值
// while (queue.size()>0) {
while (!queue.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": 消费" + queue.poll());
}
latch.countDown();// 一个线程执行完计数器减一
}
}
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
// 最多允许四个线程同时进行
ExecutorService es = Executors.newFixedThreadPool(4);
// 最好是能保证现生产再消费
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i < count; i++) {
// es.submit(new offer());
es.submit(new Poll());
}
latch.await(); //阻塞主线程(main),等到latch.countDown()减为零才继续执行
System.out.println("剩余:" + queue.size());
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
}
}
2.BlockingQueue
由于LinkedBlockingQueue实现是线程安全的,实现了FIFO(先进先出),是 生产/消费 模型的首选,LinkedBlockingQueue 可以指定容量也可不指定,不指定默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
package com.leitan.architect.threads.threadpool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 多线程模拟实现 生产者/消费者 模型
*
* @Author: lei.tan
* @Date: 2018-11-14 15:51
*/
public class BlockingQueueTest {
/**
* 定义装苹果的篮子
*/
public class Basket {
private static final int APPLE_NUMBER = 5;// 篮子能够容纳几个苹果
BlockingQueue<String> basket = new LinkedBlockingQueue<>(APPLE_NUMBER);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
System.out.println(Thread.currentThread().getName() + ": 准备产生Apple,当前篮子里有" + (APPLE_NUMBER - basket.remainingCapacity()) + "个apple");
basket.put("An apple");
System.out.println(Thread.currentThread().getName() + ": 产生一个Apple,当前篮子里有" + (APPLE_NUMBER - basket.remainingCapacity()) + "个apple");
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
System.out.println(Thread.currentThread().getName() + ": 准备消费Apple,当前篮子里有" + (APPLE_NUMBER - basket.remainingCapacity()) + "个apple");
String apple = basket.take();
System.out.println(Thread.currentThread().getName() + ": 消费一个Apple,当前篮子里有" + (APPLE_NUMBER - basket.remainingCapacity()) + "个apple");
return apple;
}
}
// 定义苹果生产者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生产苹果
// basket.produce();
System.out.println(instance + "准备生产苹果...");
basket.produce();
System.out.println(instance + "生产苹果完毕!");
Thread.sleep(200);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消费苹果
// basket.consume();
System.out.println(instance + "准备消费苹果...");
System.out.println(basket.consume());
System.out.println(instance + "消费苹果完毕!");
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
BlockingQueueTest test = new BlockingQueueTest();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer1 = test.new Producer("生产者1", basket);
Producer producer2 = test.new Producer("生产者2", basket);
Consumer consumer = test.new Consumer("消费者1", basket);
service.submit(producer1);
service.submit(producer2);
service.submit(consumer);
// 运行7s后停止
try {
Thread.sleep(1000 * 7);
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdownNow();
}
}
3.小结
从上面两个的运行结果我们可以总结出以下几点:
ConcurrentLinkedQueue 消费原则FIFO,无容量限制
offer: 生产(不管已有多少,有命令我就去生产)
poll: 消费(不管桶内有没有东西,我直接去取来消费,有可能取到null,不过实际应用的业务逻辑中,你判断如果取到是null值可以不作处理,不影响这个api的本身使用)
BlockingQueue 消费原则FIFO,可以指定容量也可以不指定,不指定默认最大为Integer.MAX_VALUE
put: 生产(在容量还没有满的情况下我就生产,容量如果满了我就等待,等有空位了我在生产)
take: 消费(桶子里面有东西我才消费,没有东西我就等到有东西我再去消费)