天天看点

编码实现一个容量为100的阻塞队列

代码思路
  • 因为需要固定容量的阻塞队列,底层可以使用数组实现;
  • 使用两个指针,分别指向插入和取出的索引位置;
  • 使用ReentrantLock和Condition实现线程锁和线程调度。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClssName:BlockingQueue100Elements.java
 * @Description:编码实现一个容量为100的阻塞队列,队列中可以放任何对象,不一定是简单对象,也可以是Thread或者其他bean对象
 * @createtime:2021/2/22 11:19 上午
 * @author:Joker
 */
public class BlockingQueue100Elements {
    // 具体存放阻塞队列元素的数组
    private Object[] items;
    // 向阻塞队列中增加元素时的位置
    int putptr;
    // 从阻塞队列中获取元素时的位置
    int takeptr;
    // 记录阻塞队列的中元素的个数
    int count;

    private ReentrantLock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public BlockingQueue100Elements() {
        this(100);
    }

    public BlockingQueue100Elements(int capacity) {
        this.items = new Object[capacity];
    }

    /**
     * 插入元素,当满的时候,需要将notFull等待,因为生产者是通过这个条件进行生产,放进去后,通知消费者进行消费:notEmpty.signal();
     *
     * @param item
     */
    public void put(Object item) {
        lock.lock();
        try {
            while (count == items.length) {
                System.out.println("满了....");
                // 阻塞生产者线程,调用await(),这个时候线程就被notFull堵了
                notFull.await();
            }

            items[putptr++] = item;
            // 如果插入的位置到头了,就从0重新开始
            if (putptr == items.length) {
                putptr = 0;
            }
            count++;
            // 激活消费者线程进行消费,激活被notEmpty堵住的线程
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 获取元素,当空的时候,执行notEmpty.await()阻塞消费者线程,暂时停止消费,取出元素后,notFull.signal();激活生产者进行生产
     *
     * @return
     */
    public Object take() {
        lock.lock();
        try {
            while (count == 0) {
                System.out.println("空了...");
                // 阻塞消费者消费,此线程被notEmpty堵住了
                notEmpty.await();
            }

            Object ret = items[takeptr++];
            if (takeptr == items.length) {
                takeptr = 0;
            }
            count--;
            // 激活生产者线程进行生产,激活被notFull堵住的线程
            notFull.signal();

            return ret;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return null;
    }
}
           

测试代码

public static void main(String[] args) throws InterruptedException {
        BlockingQueue100Elements queue = new BlockingQueue100Elements(5);
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                queue.put(i + 1);
                System.out.println("生产:" + (i + 1));
            }
        }).start();
		TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                Integer value = (Integer) queue.take();
                System.out.println("A消费:" + value);
            }
        }).start();

        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
        System.out.println("=========");

        new Thread(() -> {
            for (int i = 0; i < 30; i++) {
                Integer value = (Integer) queue.take();
                System.out.println("B消费:" + value);
            }
        }).start();
    }