天天看点

实习成长之路:DelayQueue多线程下的延迟队列的使用

DelayQueue

什么是DelayQueue,有什么作用呢?

DelayQueue提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。

作用就是在我们规定的时间后去执行这个线程,处理我们的业务逻辑。

使用DelayQueue的步骤

我这里是定义了一个DelayQueue的管理类DelayQueueManager

首先我们肯定要创建一个DelayQueue,我理解的它就是一个延迟队列

/**
     * 延时队列
     */
    private DelayQueue<DelayTask<?>> delayQueue;
    private static DelayQueueManager instance = new DelayQueueManager();

    private DelayQueueManager() {
        delayQueue = new DelayQueue<>();
        init();
    }
     public static DelayQueueManager getInstance() {
        return instance;
    }

    /**
     * 初始化
     */
    public void init() {
        //创建守护线程处理队列
        Thread daemonThread = new Thread(() -> execute());
        daemonThread.setName("DelayQueueMonitor");
        daemonThread.setDaemon(true);
        daemonThread.start();
    }
           

它可能长这个样子

实习成长之路:DelayQueue多线程下的延迟队列的使用

我的使用方法就是,在我们处理业务的时候,先要获取DelayQueueManager的一个实例,让我们的DelayQueueManager执行init方法进行初始化

DelayQueueManager manager = DelayQueueManager.getInstance();
        xxx业务Worker delayWorker = new xxx业务Worker(xxx参数);
        manager.put(delayWorker, 100, TimeUnit.MILLISECONDS);
           

这个时候我们的业务就已经放进去了,上面的例子是延迟100s再去处理这个业务。

不理解?没关系看看这个xxx业务是做啥的

@Slf4j
public class xxx业务Worker implements Runnable {

    /**
     * 托管微信ID
     */
    private String xxx参数;  就是刚才传过来的那个参数啦

    public xxx业务Worker (String xxx参数){
        this.xxx = xxxx;
    }
	上面就是普通的类的参数和构造方法啦
	//充血run方法,真正执行的时候,是执行的run方法
    @Override
    public void run() {
        log.info("【xxx业务延迟队列处理】xxxx={}",this.xxxx);
        IAutoSendMsgTaskService iAutoSendMsgTaskService = ApplicationContextUtil.getBean(IAutoSendMsgTaskService.class);
        iAutoSendMsgTaskService.checkTaskStatusByWxId(this.Id);
        //大概意思就是:
        	//1.先通过上下文工具类,拿到这个接口的实例类对象
        	//2.调用这个接口的某个具体方法(我这里是根据id修改状态)
    }
}

也就是说:过100s之后,我就会有一个线程去执行这个run方法,然后就去调用了具体的业务方法,那他是怎么拿到这个run方法或者怎么维护这个线程呢?
           
put方法实现
public void put(Runnable task, long time, TimeUnit unit) {
        int taskNum = delayQueue.size();
        if(taskNum>= CommonConfig.MAX_DELAY_QUERY_SIZE){
            log.error("【虚拟机中当前存活线程数量】当前存活:{},超过阀值:{},启动拒绝策略丢弃任务",taskNum,CommonConfig.MAX_DELAY_QUERY_SIZE);
            return;
        }
        // 获取延时时间
        long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
        // 将任务封装成实现Delayed接口的消息体
        DelayTask<?> delayOrder = new DelayTask<>(timeout, task);
        // 将消息体放到延时队列中
        delayQueue.put(delayOrder);
    }
           

这个put就是我们那个DelayQueueManager的put方法,由他放进我们的delayQueue的延迟队列中,怎么放的呢?我们来看看delayQueue的put方法

public void put(E e) {
        offer(e);
    }
    将指定元素插入此延迟队列。
	参数:
	e - 要添加的元素
	返回:
	true
	抛出:
	NullPointerException – 如果指定的元素为空
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;   //这是一个Thread
                available.signal();//private final Condition available = lock.newCondition();聪明的人在这就看出来了,没错他用的也是AQS来实现的
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

           

这样他就把一个实现了Runable的一个任务放进了队列中

怎么执行我们延迟队列中的任务?
private void execute() {
        while (true) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            log.info("虚拟机中当前存活线程数量:" + map.size());
            //延迟队列大小
            int taskNum = delayQueue.size();
            if(taskNum>= CommonConfig.MAX_DELAY_QUERY_SIZE){
                log.error("【虚拟机中当前存活线程数量】当前存活:{},超过阀值:{}",taskNum,CommonConfig.MAX_DELAY_QUERY_SIZE);
            }
            log.info("延迟队列当前延时任务数量:" + taskNum);
            try {
                // 阻塞从延时队列中获取任务
                DelayTask<?> delayTask = delayQueue.take();
                Runnable task = delayTask.getTask();
                if (null == task) {
                    continue;
                }
                // 提交到线程池执行task
                ThreadExecutor.executorDelayTask(task, "延迟队列");

            } catch (Exception e) {
                log.error("延迟队列执行线程异常",e);
            }
        }
    }
           

这个take()方法是怎么拿到任务的

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
           

1.首先看看延迟队列是不是null,是null,说明延迟队列没有元素,就让他阻塞等待,有过期元素继续

2.如果有元素,那就getDelay(NANOSECONDS),这里是一个差值,就是过期时间和当前时间的差值

3.如果这个差值<=0 说明这个元素已经过期了,也就是说,这个任务的倒计时完了,你要执行了。poll()方法就会返回一个DelayTask对象(T在约束)

//自己封装的DelayTask
public class DelayTask<T extends Runnable> implements Delayed {
   private final long time;
   private final T task;

           
3.1 接着就是getTask拿到任务,直接提交给线程池去执行了
           

4.否则first=null,往下走