DelayQueue
什么是DelayQueue,有什么作用呢?
DelayQueue提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。
作用就是在我们规定的时间后去执行这个线程,处理我们的业务逻辑。
使用DelayQueue的步骤
我这里是定义了一个DelayQueue的管理类DelayQueueManager
首先我们肯定要创建一个DelayQueue,我理解的它就是一个延迟队列
/**
* 延时队列
*/
private DelayQueue<DelayTask<?>> delayQueue;
private static DelayQueueManager instance = new DelayQueueManager();
private DelayQueueManager() {
delayQueue = new DelayQueue<>();
init();
}
public static DelayQueueManager getInstance() {
return instance;
}
/**
* 初始化
*/
public void init() {
//创建守护线程处理队列
Thread daemonThread = new Thread(() -> execute());
daemonThread.setName("DelayQueueMonitor");
daemonThread.setDaemon(true);
daemonThread.start();
}
它可能长这个样子
我的使用方法就是,在我们处理业务的时候,先要获取DelayQueueManager的一个实例,让我们的DelayQueueManager执行init方法进行初始化
DelayQueueManager manager = DelayQueueManager.getInstance();
xxx业务Worker delayWorker = new xxx业务Worker(xxx参数);
manager.put(delayWorker, 100, TimeUnit.MILLISECONDS);
这个时候我们的业务就已经放进去了,上面的例子是延迟100s再去处理这个业务。
不理解?没关系看看这个xxx业务是做啥的
@Slf4j
public class xxx业务Worker implements Runnable {
/**
* 托管微信ID
*/
private String xxx参数; 就是刚才传过来的那个参数啦
public xxx业务Worker (String xxx参数){
this.xxx = xxxx;
}
上面就是普通的类的参数和构造方法啦
//充血run方法,真正执行的时候,是执行的run方法
@Override
public void run() {
log.info("【xxx业务延迟队列处理】xxxx={}",this.xxxx);
IAutoSendMsgTaskService iAutoSendMsgTaskService = ApplicationContextUtil.getBean(IAutoSendMsgTaskService.class);
iAutoSendMsgTaskService.checkTaskStatusByWxId(this.Id);
//大概意思就是:
//1.先通过上下文工具类,拿到这个接口的实例类对象
//2.调用这个接口的某个具体方法(我这里是根据id修改状态)
}
}
也就是说:过100s之后,我就会有一个线程去执行这个run方法,然后就去调用了具体的业务方法,那他是怎么拿到这个run方法或者怎么维护这个线程呢?
put方法实现
public void put(Runnable task, long time, TimeUnit unit) {
int taskNum = delayQueue.size();
if(taskNum>= CommonConfig.MAX_DELAY_QUERY_SIZE){
log.error("【虚拟机中当前存活线程数量】当前存活:{},超过阀值:{},启动拒绝策略丢弃任务",taskNum,CommonConfig.MAX_DELAY_QUERY_SIZE);
return;
}
// 获取延时时间
long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
// 将任务封装成实现Delayed接口的消息体
DelayTask<?> delayOrder = new DelayTask<>(timeout, task);
// 将消息体放到延时队列中
delayQueue.put(delayOrder);
}
这个put就是我们那个DelayQueueManager的put方法,由他放进我们的delayQueue的延迟队列中,怎么放的呢?我们来看看delayQueue的put方法
public void put(E e) {
offer(e);
}
将指定元素插入此延迟队列。
参数:
e - 要添加的元素
返回:
true
抛出:
NullPointerException – 如果指定的元素为空
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null; //这是一个Thread
available.signal();//private final Condition available = lock.newCondition();聪明的人在这就看出来了,没错他用的也是AQS来实现的
}
return true;
} finally {
lock.unlock();
}
}
这样他就把一个实现了Runable的一个任务放进了队列中
怎么执行我们延迟队列中的任务?
private void execute() {
while (true) {
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
log.info("虚拟机中当前存活线程数量:" + map.size());
//延迟队列大小
int taskNum = delayQueue.size();
if(taskNum>= CommonConfig.MAX_DELAY_QUERY_SIZE){
log.error("【虚拟机中当前存活线程数量】当前存活:{},超过阀值:{}",taskNum,CommonConfig.MAX_DELAY_QUERY_SIZE);
}
log.info("延迟队列当前延时任务数量:" + taskNum);
try {
// 阻塞从延时队列中获取任务
DelayTask<?> delayTask = delayQueue.take();
Runnable task = delayTask.getTask();
if (null == task) {
continue;
}
// 提交到线程池执行task
ThreadExecutor.executorDelayTask(task, "延迟队列");
} catch (Exception e) {
log.error("延迟队列执行线程异常",e);
}
}
}
这个take()方法是怎么拿到任务的
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
1.首先看看延迟队列是不是null,是null,说明延迟队列没有元素,就让他阻塞等待,有过期元素继续
2.如果有元素,那就getDelay(NANOSECONDS),这里是一个差值,就是过期时间和当前时间的差值
3.如果这个差值<=0 说明这个元素已经过期了,也就是说,这个任务的倒计时完了,你要执行了。poll()方法就会返回一个DelayTask对象(T在约束)
//自己封装的DelayTask
public class DelayTask<T extends Runnable> implements Delayed {
private final long time;
private final T task;
3.1 接着就是getTask拿到任务,直接提交给线程池去执行了
4.否则first=null,往下走