天天看点

RabbitMq的延时队列以及本地延迟队列

之前对延迟队列业务场景的理解,应该就是像延时订单未付款的自动取消、短信重发通知、或者其他终结状态的触发。但是也有另外一种场景可以用:比如场景切换,服务复用的时候可以考虑一下。比如讲讲开发中碰到的问题吧。是一个信息授权的场景。以前新增一个下级渠道商,需要手动去授权本渠道的销售信息给下一级渠道。然后现在需要改造,新增的时候同时授权过去。以前授权信息的消费者是定时(几分钟一次)去读渠道商的数据库信息,其他时候是拿到缓存里的渠道商去同步授权。所以就存在一个问题,即刻创建下一级渠道即刻去触发授权,授权消费者还没重新去捞渠道商,目前渠道商的缓存里没有新增的那一个,就没有授权信息下去。然而捞不到就不要同步,没毛病页不用报错。只能说这设计不适用当前我们改造的场景罢了。其实也很纳闷,消息带过去的渠道ID消费者在当前缓存找不到渠道,就去数据库查,如果有就直接加载到缓存一起同步授权不就好了,不就能适应更多场景,毕竟作为一个公用同步模块。但是已有的没办法改造,想想别的办法把。那就。。。等消费者去执行下一次捞渠道商再去触发他吧,那就是延迟发送消息了,延迟队列想法就这样用上了。

尝试了两种延迟队列,分别是java.util.concurrent.Delayed包下的本地延迟队列和RabbitMq的死信队列。对比了下肯定是要是要使用RabbitMq异步中间件的。DelayQueue是一个无界的BlockingQueue,实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机(即服务重启也会丢失)的时、消息消费异常的时候做相应的逻辑处理。但是也附上本地延迟队列的实现:

(1)先定义一个实现Delayed是队列消息

RabbitMq的延时队列以及本地延迟队列
(2)再定义消费者,实现线程Runnable,等待线程池调用
RabbitMq的延时队列以及本地延迟队列

(3)发送延迟队列,等待消费

DelayQueue queue = new DelayQueue();

AuthGoodsMessage message = new AuthGoodsMessage(new Random().nextInt(), dto, 120);

queue.offer(message);

ExecutorService exec = Executors.newFixedThreadPool(1);

exec.execute(new AuthGoodsConsumer(queue));

exec.shutdown();

对于延迟消息RabbitMq的实战:

RabbitMq的延时队列以及本地延迟队列
(1)定义队列以及路由、交换机的配置(类加上注解@Configuration)

@Bean
public Queue businessTestDeadQueue(){
    Map<String,Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange","business.test.dead.exchange.name");
    args.put("x-dead-letter-routing-key","business.test.dead.routing.key.name");
    args.put("x-message-ttl",120 * 1000);
    return new Queue("business.test.dead.queue.name",true,false,false,args);
}

@Bean
 public TopicExchange businessTestDeadExchange(){
    return new TopicExchange("business.test.dead.produce.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadBinding() {
    return BindingBuilder.bind(goodsAuthDeadQueue()).to(goodsAuthDeadExchange()).with("business.test.dead.produce.routing.key.name");
}

@Bean
public Queue businessTestDeadRealQueue(){
    return new Queue("business.test.dead.real.queue.name",true);
}

@Bean
public TopicExchange businessTestDeadRealExchange(){
    return new TopicExchange("business.test.dead.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadRealBinding() {
    return BindingBuilder.bind(goodsAuthDeadRealQueue()).to(goodsAuthDeadRealExchange()).with("business.test.dead.routing.key.name");
}           

(2)定义真实消费者(类加上注解@RabbitListener(queues = "business.test.dead.real.queue.name", containerFactory = ListenerSelector.multiThread)

public void exe(@Payload byte[] body) {
    String data = new String(body);
    logger.info("----------接受延时信息:{}------------------", data);
    具体实体 object = JSONObject.parseObject(data, 具体实体.class);

 }
           

(3)发送延迟消息

RabbitMq的延时队列以及本地延迟队列