天天看點

咖啡汪日志————使用Redisson延遲隊列,實作會員到期前N天提醒

這個其實不怎麼推薦,當然她使用的話,也是沒有問題的,隻是因為用到了定時器

是以不如RMapCache好用,大家可以當作是了解Redisson延時隊列來看待這篇部落格

源碼連結:

https://github.com/HuskyCorps/distributeMiddleware

咖啡汪日志————使用Redisson延遲隊列,實作會員到期前N天提醒

0.

application.properties

#使用者會員到期提醒
vip.expire.first.subject=會員即将到期提醒【泰達便民服務平台-http://www.tjxstech.com/】
vip.expire.first.content=手機為:%s 的使用者,您好!您的會員有效期即将失效,請您前往平台續費~祝您生活愉快【泰達便民服務平台-http://www.tjxstech.com/】

vip.expire.end.subject=會員到期提醒【泰達便民服務平台-http://www.tjxstech.com/】
vip.expire.end.content=手機為:%s 的使用者,您好!您的會員有效期已經失效,為了您有更好的體驗,請您前往平台繼續續費~祝您生活愉快【泰達便民服務平台-http://www.tjxstech.com/】


           

1.controller

/**
 * Redisson延遲隊列DelayedQueue,實作會員到期前N天提醒
 *
 * @author Yuezejian
 * @date 2020年 09月09日 19:57:53
 */
@RestController
@RequestMapping("vip")
public class VipController {
    private static final Logger log = LoggerFactory.getLogger(VipController.class);

    @Autowired
    private VipService vipService;

    @RequestMapping(value = "put" ,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)application/json;charset=UTF-8
    public BaseResponse putVip(@RequestBody @Validated UserVip userVip, BindingResult result) {
        String checkRes = ValidatorUtil.checkResult(result);
        if (StringUtils.isNotBlank(checkRes)) {
            return new BaseResponse(StatusCode.InvalidParams.getCode(),checkRes);
        }
        BaseResponse response = new BaseResponse(StatusCode.Success);
        try {
            vipService.addVip(userVip);
            log.info("————成功充值會員  "+userVip.getVipDay()+"  天");

        } catch (Exception e) {
            log.error("——————————充值會員-發生異常:",e.fillInStackTrace());
            response = new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
        }
        return response;
    }
}
           

2.service

/**
 * 基于Redisson延遲隊列DelayQueue,實作會員到期前N天提醒
 *
 * @author Yuezejian
 * @date 2020年 09月09日 20:16:35
 */
@Service
public class VipService {
    private static final Logger log = LoggerFactory.getLogger(VipService.class);

    @Autowired
    private UserVipMapper userVipMapper;

    @Autowired
    private RedissonClient redissonClient;

    @Transactional(rollbackFor = Exception.class)
    public void addVip(UserVip vip) throws Exception {
        vip.setVipTime(DateTime.now().toDate());
        int res = userVipMapper.insertSelective(vip);
        //TODO:充值成功(現實一般是需要走支付的..在這裡以成功插入db為準) - 設定兩個過期提醒時間,
        //TODO:一個是vipDay後的;一個是在到達vipDay前 x 的時間
        //TODO:如,vipDay=10天,x=2,即代表vip到期 前2天 提醒一次,vip到期時提醒一次,即
        //TODO:第一次提醒的時間點為:ttl=10-2=8,即距離現在開始的8天後進行第一次提醒;
        //TODO:第二次提醒的時間點是:ttl=10,即距離現在開始的10天後進行第二次提醒   -- 以此類推
        //TODO: (時間的話,建議轉化為s;當然啦,具體業務具體設定即可)
        //TODO:基于redisson的延遲隊列實作,重點就在于 ttl 的計算
        // (為了測試友善,在這裡我們以 s 為機關);
        //TODO:如果是多次提醒的話,需要做标記
        if (res > 0 ) {
            RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(Constant.RedissonUserVipQueue);
            RDelayedQueue<String> rDelayedQueue = redissonClient.getDelayedQueue(blockingDeque);

            //TODO:第一次提醒
            //這個value,實際是這樣的value=vipId+"-"+"1",{value="14-1"}
            // 下面的處理隻是為了構架考慮,是以外提了出去。
            //為了友善大家了解,本汪把相關代碼都加到了注釋裡
            // public static final String SplitCharUserVip="-";
            // 使用者會員到期前的多次提醒的辨別
            //    public enum VipExpireFlg{
            //        First(1),
            //        End(2),
            //        ;
            //
            //        private Integer type;
            //
            //        VipExpireFlg(Integer type) {
            //            this.type = type;
            //        }
            //
            //        public Integer getType() {
            //            return type;
            //        }
            //
            //        public void setType(Integer type) {
            //            this.type = type;
            //        }
            //    }
            String value = vip.getId()+Constant.SplitCharUserVip+Constant.VipExpireFlg.First.getType();
            Long firstTTL = Long.valueOf(String.valueOf(vip.getVipDay()-Constant.x));
            if ( firstTTL > 0 ) {
                //在firstTTL秒内,把value對象移動到目标隊列中去
                rDelayedQueue.offer(value,firstTTL,TimeUnit.SECONDS);
            }
            //TODO:第二次提醒
            //這個value,實際是value=vipId+"-"+"2",{value="14-2"}
            value = vip.getId()+Constant.SplitCharUserVip+Constant.VipExpireFlg.End.getType();
            Long secondTTL = Long.valueOf(vip.getVipDay());
            //在secondTTL秒内,把value對象移動到目标隊列中去
            rDelayedQueue.offer(value,secondTTL,TimeUnit.SECONDS);
            //本汪帶大家看一下官方的執行個體
            //Java的基于Redis的DelayedQueue對象允許将每個元素以指定的延遲傳輸到目标隊列。
            // 對于将消息傳遞給消費者的指數退避政策可能很有用。目标隊列可以是任何隊列實作的RQueue接口。
            //RBlockingQueue<String> distinationQueue = ...
            //RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
            //--》move object to distinationQueue in 10 seconds
            //delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
            //--》move object to distinationQueue in 1 minutes
            //delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
            //
            //
            //--》 msg1 will appear in 15 seconds
            //distinationQueue.poll(15, TimeUnit.SECONDS);
            //
            //--》msg2 will appear in 2 seconds
            //distinationQueue.poll(2, TimeUnit.SECONDS);



        }

    }
}
           

3.監聽器

/**
 * Redisson的延時隊列DelayQueue,Vip提前N天提醒——Listener
 *
 * @author Yuezejian
 * @date 2020年 09月09日 21:07:25
 */
@Component
public class VipQueueListener {
    private static final Logger log = LoggerFactory.getLogger(VipQueueListener.class);


    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private UserVipMapper vipMapper;

    @Autowired
    private MailService mailService;

    @Autowired
    private Environment env;//環境變量執行個體


    //實時監聽延時隊列中的代理消息
    @Async("threadPoolTaskExecutor")
    @Scheduled(cron = "0/5 * * * * ?")
    public void listenExpireVip() {
        RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(Constant.RedissonUserVipQueue);
        if (blockingDeque != null && !blockingDeque.isEmpty()) {
            //本汪說下,此處我們所取到的element,就是我們
            //rDelayedQueue.offer(value,secondTTL,TimeUnit.SECONDS);
            //放進去的value了,他的格式是“14-1”
            String element = blockingDeque.poll();

            if (StringUtils.isNotBlank(element)) {
                log.info("Vip提前N天提醒,Redisson的延時隊列DelayQueue監聽器——Listener,監聽到 element={}",element);
                //這時候,你應該知道為什麼把分隔符提出去,就是為了使用的統一
                // public static final String SplitCharUserVip="-";
                String[] arr = StringUtils.split(element,Constant.SplitCharUserVip);
                Integer id = Integer.valueOf( arr[0]);
                Integer type = Integer.valueOf(arr[1]);
                UserVip vip = vipMapper.selectByPrimaryKey(id);
                if (vip != null && 1==vip.getIsActive() && StringUtils.isNotBlank(vip.getEmail())) {
                    //TODO:區分第幾次提醒,發送對應消息
                    if (Constant.VipExpireFlg.First.getType().equals(type)) {
                        log.info("Vip提前N天提醒,第一次提醒");
                        String content=String.format(env.getProperty("vip.expire.first.content"),vip.getPhone());
                        mailService.sendSimpleEmail(env.getProperty("vip.expire.first.subject"),content,vip.getEmail());
                    } else {
                        //設定資料庫內會員資訊失效,就是把isActive由“1”變為“0”
                        int res = vipMapper.updateExpireVip(id);
                        if (res > 0) {
                            log.info("Vip提前N天提醒,第二次提醒");
                            String content=String.format(env.getProperty("vip.expire.end.content"),vip.getPhone());
                            mailService.sendSimpleEmail(env.getProperty("vip.expire.end.subject"),content,vip.getEmail());
                        }
                    }
                }

            }
        }
    }
           

4.線程池配置

/**
 * 線程池配置類
 *
 * @author Yuezejian
 * @date 2020年 08月22日 22:09:26
 */
@Configuration
public class ThreadConfig {

    @Bean("threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor(){
        //ThreadPoolTaskExecutor 底層直接使用了一個BlockingQueue,
        // 初始容量為2147483647(0x7fffffff,2的31次方-1),即無界隊列
        //線程池維護線程所允許的空閑時間,預設為60s, keepAliveSeconds = 60
        //其内部使用隊列:BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
        //createQueue()方法底層使用了new LinkedBlockingQueue(内部基于連結清單來存放元素)
        //本汪說下:
        //LinkedBlockingQueue内部分别使用了takeLock 和 putLock 對并發進行控制,也就是說,
        //添加和删除操作并不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量
        ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
        /*executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setKeepAliveSeconds(10);
        executor.setQueueCapacity(8);*/

        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(4);
        executor.setKeepAliveSeconds(10);
        executor.setQueueCapacity(4);

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
           

5.運作結果

咖啡汪日志————使用Redisson延遲隊列,實作會員到期前N天提醒
咖啡汪日志————使用Redisson延遲隊列,實作會員到期前N天提醒
咖啡汪日志————使用Redisson延遲隊列,實作會員到期前N天提醒
咖啡汪日志————使用Redisson延遲隊列,實作會員到期前N天提醒