代码思路
- 因为需要固定容量的阻塞队列,底层可以使用数组实现;
- 使用两个指针,分别指向插入和取出的索引位置;
- 使用ReentrantLock和Condition实现线程锁和线程调度。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @ClssName:BlockingQueue100Elements.java
* @Description:编码实现一个容量为100的阻塞队列,队列中可以放任何对象,不一定是简单对象,也可以是Thread或者其他bean对象
* @createtime:2021/2/22 11:19 上午
* @author:Joker
*/
public class BlockingQueue100Elements {
// 具体存放阻塞队列元素的数组
private Object[] items;
// 向阻塞队列中增加元素时的位置
int putptr;
// 从阻塞队列中获取元素时的位置
int takeptr;
// 记录阻塞队列的中元素的个数
int count;
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BlockingQueue100Elements() {
this(100);
}
public BlockingQueue100Elements(int capacity) {
this.items = new Object[capacity];
}
/**
* 插入元素,当满的时候,需要将notFull等待,因为生产者是通过这个条件进行生产,放进去后,通知消费者进行消费:notEmpty.signal();
*
* @param item
*/
public void put(Object item) {
lock.lock();
try {
while (count == items.length) {
System.out.println("满了....");
// 阻塞生产者线程,调用await(),这个时候线程就被notFull堵了
notFull.await();
}
items[putptr++] = item;
// 如果插入的位置到头了,就从0重新开始
if (putptr == items.length) {
putptr = 0;
}
count++;
// 激活消费者线程进行消费,激活被notEmpty堵住的线程
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 获取元素,当空的时候,执行notEmpty.await()阻塞消费者线程,暂时停止消费,取出元素后,notFull.signal();激活生产者进行生产
*
* @return
*/
public Object take() {
lock.lock();
try {
while (count == 0) {
System.out.println("空了...");
// 阻塞消费者消费,此线程被notEmpty堵住了
notEmpty.await();
}
Object ret = items[takeptr++];
if (takeptr == items.length) {
takeptr = 0;
}
count--;
// 激活生产者线程进行生产,激活被notFull堵住的线程
notFull.signal();
return ret;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
}
测试代码
public static void main(String[] args) throws InterruptedException {
BlockingQueue100Elements queue = new BlockingQueue100Elements(5);
new Thread(() -> {
for (int i = 0; i < 20; i++) {
queue.put(i + 1);
System.out.println("生产:" + (i + 1));
}
}).start();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
new Thread(() -> {
for (int i = 0; i < 10; i++) {
Integer value = (Integer) queue.take();
System.out.println("A消费:" + value);
}
}).start();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
System.out.println("=========");
new Thread(() -> {
for (int i = 0; i < 30; i++) {
Integer value = (Integer) queue.take();
System.out.println("B消费:" + value);
}
}).start();
}