這個其實不怎麼推薦,當然她使用的話,也是沒有問題的,隻是因為用到了定時器
是以不如RMapCache好用,大家可以當作是了解Redisson延時隊列來看待這篇部落格
源碼連結:
https://github.com/HuskyCorps/distributeMiddleware
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNCM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR1UMJRkT1smaNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZwpmLxUTOzEjN1EjM5ATOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
0.
application.properties
#使用者會員到期提醒
vip.expire.first.subject=會員即将到期提醒【泰達便民服務平台-http://www.tjxstech.com/】
vip.expire.first.content=手機為:%s 的使用者,您好!您的會員有效期即将失效,請您前往平台續費~祝您生活愉快【泰達便民服務平台-http://www.tjxstech.com/】
vip.expire.end.subject=會員到期提醒【泰達便民服務平台-http://www.tjxstech.com/】
vip.expire.end.content=手機為:%s 的使用者,您好!您的會員有效期已經失效,為了您有更好的體驗,請您前往平台繼續續費~祝您生活愉快【泰達便民服務平台-http://www.tjxstech.com/】
1.controller
/**
* Redisson延遲隊列DelayedQueue,實作會員到期前N天提醒
*
* @author Yuezejian
* @date 2020年 09月09日 19:57:53
*/
@RestController
@RequestMapping("vip")
public class VipController {
private static final Logger log = LoggerFactory.getLogger(VipController.class);
@Autowired
private VipService vipService;
@RequestMapping(value = "put" ,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)application/json;charset=UTF-8
public BaseResponse putVip(@RequestBody @Validated UserVip userVip, BindingResult result) {
String checkRes = ValidatorUtil.checkResult(result);
if (StringUtils.isNotBlank(checkRes)) {
return new BaseResponse(StatusCode.InvalidParams.getCode(),checkRes);
}
BaseResponse response = new BaseResponse(StatusCode.Success);
try {
vipService.addVip(userVip);
log.info("————成功充值會員 "+userVip.getVipDay()+" 天");
} catch (Exception e) {
log.error("——————————充值會員-發生異常:",e.fillInStackTrace());
response = new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
}
2.service
/**
* 基于Redisson延遲隊列DelayQueue,實作會員到期前N天提醒
*
* @author Yuezejian
* @date 2020年 09月09日 20:16:35
*/
@Service
public class VipService {
private static final Logger log = LoggerFactory.getLogger(VipService.class);
@Autowired
private UserVipMapper userVipMapper;
@Autowired
private RedissonClient redissonClient;
@Transactional(rollbackFor = Exception.class)
public void addVip(UserVip vip) throws Exception {
vip.setVipTime(DateTime.now().toDate());
int res = userVipMapper.insertSelective(vip);
//TODO:充值成功(現實一般是需要走支付的..在這裡以成功插入db為準) - 設定兩個過期提醒時間,
//TODO:一個是vipDay後的;一個是在到達vipDay前 x 的時間
//TODO:如,vipDay=10天,x=2,即代表vip到期 前2天 提醒一次,vip到期時提醒一次,即
//TODO:第一次提醒的時間點為:ttl=10-2=8,即距離現在開始的8天後進行第一次提醒;
//TODO:第二次提醒的時間點是:ttl=10,即距離現在開始的10天後進行第二次提醒 -- 以此類推
//TODO: (時間的話,建議轉化為s;當然啦,具體業務具體設定即可)
//TODO:基于redisson的延遲隊列實作,重點就在于 ttl 的計算
// (為了測試友善,在這裡我們以 s 為機關);
//TODO:如果是多次提醒的話,需要做标記
if (res > 0 ) {
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(Constant.RedissonUserVipQueue);
RDelayedQueue<String> rDelayedQueue = redissonClient.getDelayedQueue(blockingDeque);
//TODO:第一次提醒
//這個value,實際是這樣的value=vipId+"-"+"1",{value="14-1"}
// 下面的處理隻是為了構架考慮,是以外提了出去。
//為了友善大家了解,本汪把相關代碼都加到了注釋裡
// public static final String SplitCharUserVip="-";
// 使用者會員到期前的多次提醒的辨別
// public enum VipExpireFlg{
// First(1),
// End(2),
// ;
//
// private Integer type;
//
// VipExpireFlg(Integer type) {
// this.type = type;
// }
//
// public Integer getType() {
// return type;
// }
//
// public void setType(Integer type) {
// this.type = type;
// }
// }
String value = vip.getId()+Constant.SplitCharUserVip+Constant.VipExpireFlg.First.getType();
Long firstTTL = Long.valueOf(String.valueOf(vip.getVipDay()-Constant.x));
if ( firstTTL > 0 ) {
//在firstTTL秒内,把value對象移動到目标隊列中去
rDelayedQueue.offer(value,firstTTL,TimeUnit.SECONDS);
}
//TODO:第二次提醒
//這個value,實際是value=vipId+"-"+"2",{value="14-2"}
value = vip.getId()+Constant.SplitCharUserVip+Constant.VipExpireFlg.End.getType();
Long secondTTL = Long.valueOf(vip.getVipDay());
//在secondTTL秒内,把value對象移動到目标隊列中去
rDelayedQueue.offer(value,secondTTL,TimeUnit.SECONDS);
//本汪帶大家看一下官方的執行個體
//Java的基于Redis的DelayedQueue對象允許将每個元素以指定的延遲傳輸到目标隊列。
// 對于将消息傳遞給消費者的指數退避政策可能很有用。目标隊列可以是任何隊列實作的RQueue接口。
//RBlockingQueue<String> distinationQueue = ...
//RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
//--》move object to distinationQueue in 10 seconds
//delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
//--》move object to distinationQueue in 1 minutes
//delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
//
//
//--》 msg1 will appear in 15 seconds
//distinationQueue.poll(15, TimeUnit.SECONDS);
//
//--》msg2 will appear in 2 seconds
//distinationQueue.poll(2, TimeUnit.SECONDS);
}
}
}
3.監聽器
/**
* Redisson的延時隊列DelayQueue,Vip提前N天提醒——Listener
*
* @author Yuezejian
* @date 2020年 09月09日 21:07:25
*/
@Component
public class VipQueueListener {
private static final Logger log = LoggerFactory.getLogger(VipQueueListener.class);
@Autowired
private RedissonClient redissonClient;
@Autowired
private UserVipMapper vipMapper;
@Autowired
private MailService mailService;
@Autowired
private Environment env;//環境變量執行個體
//實時監聽延時隊列中的代理消息
@Async("threadPoolTaskExecutor")
@Scheduled(cron = "0/5 * * * * ?")
public void listenExpireVip() {
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(Constant.RedissonUserVipQueue);
if (blockingDeque != null && !blockingDeque.isEmpty()) {
//本汪說下,此處我們所取到的element,就是我們
//rDelayedQueue.offer(value,secondTTL,TimeUnit.SECONDS);
//放進去的value了,他的格式是“14-1”
String element = blockingDeque.poll();
if (StringUtils.isNotBlank(element)) {
log.info("Vip提前N天提醒,Redisson的延時隊列DelayQueue監聽器——Listener,監聽到 element={}",element);
//這時候,你應該知道為什麼把分隔符提出去,就是為了使用的統一
// public static final String SplitCharUserVip="-";
String[] arr = StringUtils.split(element,Constant.SplitCharUserVip);
Integer id = Integer.valueOf( arr[0]);
Integer type = Integer.valueOf(arr[1]);
UserVip vip = vipMapper.selectByPrimaryKey(id);
if (vip != null && 1==vip.getIsActive() && StringUtils.isNotBlank(vip.getEmail())) {
//TODO:區分第幾次提醒,發送對應消息
if (Constant.VipExpireFlg.First.getType().equals(type)) {
log.info("Vip提前N天提醒,第一次提醒");
String content=String.format(env.getProperty("vip.expire.first.content"),vip.getPhone());
mailService.sendSimpleEmail(env.getProperty("vip.expire.first.subject"),content,vip.getEmail());
} else {
//設定資料庫內會員資訊失效,就是把isActive由“1”變為“0”
int res = vipMapper.updateExpireVip(id);
if (res > 0) {
log.info("Vip提前N天提醒,第二次提醒");
String content=String.format(env.getProperty("vip.expire.end.content"),vip.getPhone());
mailService.sendSimpleEmail(env.getProperty("vip.expire.end.subject"),content,vip.getEmail());
}
}
}
}
}
}
4.線程池配置
/**
* 線程池配置類
*
* @author Yuezejian
* @date 2020年 08月22日 22:09:26
*/
@Configuration
public class ThreadConfig {
@Bean("threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor(){
//ThreadPoolTaskExecutor 底層直接使用了一個BlockingQueue,
// 初始容量為2147483647(0x7fffffff,2的31次方-1),即無界隊列
//線程池維護線程所允許的空閑時間,預設為60s, keepAliveSeconds = 60
//其内部使用隊列:BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
//createQueue()方法底層使用了new LinkedBlockingQueue(内部基于連結清單來存放元素)
//本汪說下:
//LinkedBlockingQueue内部分别使用了takeLock 和 putLock 對并發進行控制,也就是說,
//添加和删除操作并不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量
ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
/*executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setKeepAliveSeconds(10);
executor.setQueueCapacity(8);*/
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setKeepAliveSeconds(10);
executor.setQueueCapacity(4);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
5.運作結果