分享知识 传递快乐
因为需要处理死信队列问 RocketMQ 官方死信队列配置,将死信队列配置到服务中,代码配置啥的都没问题,可就是不消费。经过多从磨难终于抛开云雾见明月,下面记录一下。
pom.xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
yml
spring:
cloud:
### Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
### Binding 配置项,对应 BindingProperties Map
bindings:
# 事务消费者
tx-input:
destination: tx-topic
contentType: application/json
group: tx-group
consumer:
#多线程
concurrency: 20
#重试1次
maxAttempts: 2
# 死信队列配置
dlq-input:
destination: '%DLQ%${spring.cloud.stream.bindings.tx-input.group}'
contentType: application/json
group: dlq-group
consumer:
concurrency: 20
### Spring Cloud Stream RocketMQ 配置项
rocketmq:
binder:
name-server: 172.16.1.100:9876 # RocketMQ 服务器地址
bindings:
tx-input:
consumer:
# 异步消费消息模式下消费失败重试策略,默认为 0,重试失败直接进入死信队列
delayLevelWhenNextConsume: -1
java
@StreamListener(ConsumeConfig.ConsumeSink.DLQ_INPUT)
void dlqInput(Message message);
在使用控制台查询死信消息队列的时候会报错:
org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, %DLQ%tx-group See http://rocketmq.apache.org/docs/faq/ for further details.
第一次遇到这种问题TMD无从下手,为了解决问题,不停的查阅资料、尝试,最后终于在 github 上的 RocketMQ 官方评论中知道上面的问题,竟然是是 RocketMQ 权限问题。虽然知道是权限问题了,接下来怎么改?改成什么?都是一无所知。有陷入沉思......
通过 RocketMQ 的管理命令查看所有的 topic,%DLQ%tx-group 确实是存在,那么为啥不消费呢?
在查看该 topic 信息时,发现 perm 为 2,
bin/mqadmin topicRoute -n 172.16.1.100:9876 -t %DLQ%tx-group
当看到这里时终于看见的黎明的太阳,于是修改 topic 的 perm 为 6:
bin/mqadmin updateTopic -b 172.16.1.100:10911 -n 172.16.1.100:9876 -t %DLQ%tx-group -p 6
当消费者再次连续达到指定失败次数时,就会走到死信队列中消费。
RocketMQ权限
- PermName.PERM_INHERIT:Topic继承,value:1
- PermName.PERM_READ:Topic读,value:4
- PermName.PERM_WRITE:Topic写,value:2
perm=7 #可继承、读、写
perm=4 #可读
perm=2 #可写
RocketMQ的Topic自动新建是通过继承TBW102实现的,如果把TBW102的perm改为6,将不会自动创建Topic。