背景
我们在实际项目开发中,会遇到调用第三方的接口,有时候会因为网络原因或者第三方接口不稳定等因素导致接口调用失败,我们需要考虑增加重试机制,增加重试次数和重试时间间隔,保证我们业务的进行。
方案
通过对redis的了解,redis中有种数据类型zset支持延迟队列,Zset中存储数据结构也是K-V的数据,其中V中包含memmber和score。通过score可以排序。
Redis 的有序集合保留了集合的特性(元素不能重复),而且在此基础上的元素是可以排序的(分数可以重复)。
但是,redis 的有序集合的排序和列表的排序不同,有序集合并非使用索引下标来排序,而是使用每个元素设置了一个分数(score),来做为排序的依据。
原理设计
有序集合中键值对的键被称为成员,值被称为分值,分值必须为浮点数。
命令 | 行为 |
---|---|
ZADD | 将一个带有给定分值的成员添加到有序集合中,返回添加元素的个数 |
ZRANGE | 根据元素在有序排列中的位置,从有序集合里面获取多个元素 |
ZRANGEBYSCORE | zrangebyscore key min max [withscores] [LIMIT offset count] 根据一个分值段来获取在该分值段的指定数量的元素 |
ZREM | ZREM key member-------如果给定成员存在于该有序集合,则删除该成员 |
ZCARD | ZCARD key--------返回有序集合包含的成员数量 |
ZCOUNT | ZCOUNT key min max----------返回分值介于min 和max之间的成员数量 |
ZSCORE | ZSOCRE key member --------返回成员的分值 |
ZINCRBY | ZINCRBY key increment member -----将member成员的分值加上increment |
原理和思想
原理:
- 生产者将需要延迟发送的数据放入redis zset中
# key为队列的名称,score为当前的时间戳+延迟时间,member为消息体
zadd key score member
2.消费者一直循环从redis的zset队列获取数据
# key为队列的名称,min为0,max为当前的时间戳,limit为单次个数
zrangebyscore key min max limit
- 然后将该消息体删除 zrem key member
# key为队列的名称,member为消息体
zrem key member
流程图

代码实现
MessageDelyQueueService 延迟队列核心业务类
@Slf4j
@Service
public class MessageDelyQueueService {
@Autowired
StringRedisTemplate redisTemplate;
/**
* 延迟队列名称,可以根据不通的业务处理设置不同的队列
*/
private static final String DELY_QUEUE_NAME="dely_queue";
/**
* 锁key
*/
public static final String LOCK_KEY="message_lock_key";
/**
* 发送数据
* @param message 消息
* @param dely 延迟多久(秒)
*/
public Boolean pushMessage(Message message,int dely){
long score= System.currentTimeMillis()+dely*1000;
String msg= JSONObject.toJSONString(message);
Boolean add = redisTemplate.opsForZSet().add(DELY_QUEUE_NAME, msg, score);
return add;
}
/**
* 拉取最新需要
* 被消费的消息
* rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
*
* @return
*/
public List<Message> pull() {
List<Message> msgList =new ArrayList<>();
try {
Set<String> strings = redisTemplate.opsForZSet().rangeByScore(DELY_QUEUE_NAME, 0, System.currentTimeMillis());
if (strings == null) {
return null;
}
msgList = strings.stream().map(msg -> {
Message message = null;
try {
message = JSONObject.parseObject(msg, Message.class);
} catch (Exception e) {
e.printStackTrace();
}
return message;
}).collect(Collectors.toList());
} catch (Exception e) {
log.error(e.toString());
}
return msgList;
}
/**
* 移除消息
*
* @param message
*/
@SneakyThrows
public Boolean remove(Message message) {
Long remove = redisTemplate.opsForZSet().remove(DELY_QUEUE_NAME, JSONObject.toJSONString(message));
return remove > 0 ? true : false;
}
/**
* 获取锁,这是使用的锁的方式比较简单 ,reids
* 实现分布式锁比较复杂,这里不介绍
* @return
*/
public Boolean getLock(){
boolean lock = false;
//获得锁
lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,DELY_QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS);
return lock;
}
/**
* 释放锁
*/
public void releaseLock(){
redisTemplate.delete(LOCK_KEY);
}
生产者
@Slf4j
@Service
public class MessageProducer {
@Autowired
MessageDelyQueueService delyQueueService;
/**
* 发送数据
* @param message 消息
* @param dely 延迟多久(秒)
*/
public Boolean pushMessage(Message message,int dely){
log.info("push mesage:{},now:{},dely:{}",message, DateUtil.now(),dely);
return delyQueueService.pushMessage(message,dely);
}
}
消费者
@Slf4j
@Service
public class MessageConsumer {
public static ExecutorService executorService = Executors.newFixedThreadPool(5);
@Autowired
MessageDelyQueueService delyQueueService;
private boolean stopFlag=false;
public void setStopFlag(boolean stopFlag) {
this.stopFlag = stopFlag;
}
@SneakyThrows
public void consumer(){
executorService.execute(()->{
while(!stopFlag){
List<Message> messageList = delyQueueService.pull();
if(messageList.size()==0){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
handleMessage(messageList);
}
});
}
/**
* 消息处理
* @param messageList
*/
public void handleMessage( List<Message> messageList){
for (Message message : messageList) {
log.info("consumer message:{},date:{}",message, DateUtil.now());
this.delyQueueService.remove(message);
}
}
}
实体类
@Data
@AllArgsConstructor
public class Message {
private String id;
private String msg;
}
执行结果
2022-11-03 15:04:02.759 delyqueue.MessageProducer : push mesage:Message(id=0, msg=msg0),now:2022-11-03 15:04:02,dely:0秒执行
2022-11-03 15:04:03.725 delyqueue.MessageProducer : push mesage:Message(id=1, msg=msg1),now:2022-11-03 15:04:03,dely:3秒执行
2022-11-03 15:04:03.728 delyqueue.MessageProducer : push mesage:Message(id=2, msg=msg2),now:2022-11-03 15:04:03,dely:6秒执行
2022-11-03 15:04:03.730 delyqueue.MessageProducer : push mesage:Message(id=3, msg=msg3),now:2022-11-03 15:04:03,dely:9秒执行
2022-11-03 15:04:03.732 delyqueue.MessageProducer : push mesage:Message(id=4, msg=msg4),now:2022-11-03 15:04:03,dely:12秒执行
2022-11-03 15:04:03.734 delyqueue.MessageProducer : push mesage:Message(id=5, msg=msg5),now:2022-11-03 15:04:03,dely:15秒执行
2022-11-03 15:04:03.736 delyqueue.MessageProducer : push mesage:Message(id=6, msg=msg6),now:2022-11-03 15:04:03,dely:18秒执行
2022-11-03 15:04:03.738 delyqueue.MessageProducer : push mesage:Message(id=7, msg=msg7),now:2022-11-03 15:04:03,dely:21秒执行
2022-11-03 15:04:03.740 delyqueue.MessageProducer : push mesage:Message(id=8, msg=msg8),now:2022-11-03 15:04:03,dely:24秒执行
2022-11-03 15:04:03.741 delyqueue.MessageProducer : push mesage:Message(id=9, msg=msg9),now:2022-11-03 15:04:03,dely:27秒执行
2022-11-03 15:04:04.249 delyqueue.MessageConsumer : consumer message:Message(id=0, msg=msg0),date:2022-11-03 15:04:04
2022-11-03 15:04:06.809 delyqueue.MessageConsumer : consumer message:Message(id=1, msg=msg1),date:2022-11-03 15:04:06
2022-11-03 15:04:09.856 delyqueue.MessageConsumer : consumer message:Message(id=2, msg=msg2),date:2022-11-03 15:04:09
2022-11-03 15:04:12.916 delyqueue.MessageConsumer : consumer message:Message(id=3, msg=msg3),date:2022-11-03 15:04:12
2022-11-03 15:04:15.978 delyqueue.MessageConsumer : consumer message:Message(id=4, msg=msg4),date:2022-11-03 15:04:15
2022-11-03 15:04:19.040 delyqueue.MessageConsumer : consumer message:Message(id=5, msg=msg5),date:2022-11-03 15:04:19
2022-11-03 15:04:22.102 delyqueue.MessageConsumer : consumer message:Message(id=6, msg=msg6),date:2022-11-03 15:04:22
2022-11-03 15:04:25.163 delyqueue.MessageConsumer : consumer message:Message(id=7, msg=msg7),date:2022-11-03 15:04:25
2022-11-03 15:04:28.218 delyqueue.MessageConsumer : consumer message:Message(id=8, msg=msg8),date:2022-11-03 15:04:28
2022-11-03 15:04:30.777 delyqueue.MessageConsumer : consumer message:Message(id=9, msg=msg9),date:2022-11-03 15:04:30