当队列中的消息过期,或者达到最大长度而被删除,或者达到最大空间时而被删除时,可以将这些被删除的信息推送到其他交换机中,让其他消费者订阅这些被删除的消息,处理这些消息。
准备要素:一个生产者 一个正常消费者(带一个正常交换机,一个路由键,一个正常队列) 一个处理过期消息的消费者(带一个死亡交换机,死亡路由键,死亡队列)
生产者MsgProducer :
@Component
public class MsgProducer {
//交换机名称
private static final String EXCHANGE_NAME = "amqp-hello";
private static final String EXCHANGE_NAME2 = "amqp-hello2";
//交换机和交换机绑定的key "mqkey"
private static final String ExCHANEROUTING_KEY = "mqkey";
//对列名称
private static final String QUEUE_NAME = "Hello-Queue1";
//routingkey 名称
private static final String ROUTING_KEY = "key-Queue";
//死亡路由键
private static final String DEADROUTING_KEY = "deadkey-Queue";
//死亡交换机
private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
//处理死亡消息的对列名称
private static final String DEADQUEUE_NAME = "deadHello-Queue1";
public static void main(String[] args) throws Exception{
String message;
//1.获取连接
Connection connection = RabbitConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2.声明通道
Channel channel = connection.createChannel();
//交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 15000);
//通过这两个参数把两交换机连在一块!!
arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
//3.声明(创建)队列 b:durable是否持久化, 队列的声明默认是存放到内存中的,如果true :rabbitmq重启队列不会丢失但是里面消息没了 false:队列和消息全没了
//b1:exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除
//b2:autoDelete:是否自动删除,当最后一个消费者断开连接之后队列里面消息是否自动被删除,当consumers = 0时队列里面消息就会自动删除
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
//将交换器与队列通过路由键绑定 EXCHANGE_NAME 为与队列连接的交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
//死亡交换机
channel.exchangeDeclare(DEADEXCHANGE_NAME, "direct", false, false, null);
//死亡信息的队列
channel.queueDeclare(DEADQUEUE_NAME,true,false,false,null);
//将死亡交换器与死信队列通过死亡路由键绑定 DEADEXCHANGE_NAME 为与死信队列连接的死亡交换机
channel.queueBind(DEADQUEUE_NAME,DEADEXCHANGE_NAME,DEADROUTING_KEY);
for (int i=0;i<20;i++){
//4.定义消息内容
message = "hello rabbitmq-";
//5.发布消息 routingkey 是应用于多个消费者的时候明确如何给多个消费者分配信息
//BasicProperties props -- 消息的基本属性,例如路由头等。byte[] body -- 消息体
//void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
//注意此处EXCHANGE_NAME 为与生产者相连的交换机!! MessageProperties.PERSISTENT_TEXT_PLAIN 代表消息持久化(用别的方法还有单独为几条消息设置持久化的)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null,(message+i).getBytes());
System.out.println("[x] send'" + (message+i).toString() + "'");
};
//6.关闭通道和连接
channel.close();
connection.close();
}
}
}
关键代码如下:
死亡交换机:DEADEXCHANGE_NAME
死亡路由键:DEADROUTING_KEY
上图显示TTL DLX DLK都是map中的参数
Map<String, Object> arguments = new HashMap<String, Object>();
//简称TTL
arguments.put("x-message-ttl", 15000);
//通过这两个参数把两交换机连在一块!!
//简称 DLX
arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
//简称 DLK 其实这个路由键用正常交换机的也可以
arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
//死亡交换机,死亡路由键放入正常的queueDeclare中
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
处理正常消息的消费者MsgReceiver1:
@Component
public class MsgReceiver1 {
//交换机名称
private static final String EXCHANGE_NAME = "amqp-hello";
private static final String EXCHANGE_NAME2 = "amqp-hello2";
//交换机和交换机绑定的key "mqkey"
private static final String ExCHANEROUTING_KEY = "mqkey";
//对列名称
private static final String QUEUE_NAME = "Hello-Queue1";
//routingkey 名称
private static final String ROUTING_KEY = "key-Queue";
//死亡路由键
private static final String DEADROUTING_KEY = "deadkey-Queue";
//死亡交换机
private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
//处理死亡消息的对列名称
private static final String DEADQUEUE_NAME = "deadHello-Queue1";
public static void main(String[] args) throws Exception {
Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 15000);
//通过这两个参数把两交换机连在一块!!
arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
//3.声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//需要设置每次从队列获取消息的数量 能者多劳的原则不至于做的快的做完了就歇着了!
channel.basicQos(1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
//4.定义队列的消费者
QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
/*
true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息成功消费.
false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,
直到该消费者反馈.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
*/
//b:ack
channel.basicConsume(QUEUE_NAME, false,queueingConsumer1);
// channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
//6.获取消息
while (true) {
anInt += 1;
QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
// QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
String message1 = new String(delivery1.getBody());
// String message2 = String.valueOf(delivery2.getBody());
System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
//手动确认ack 是否批量处理.true:将一次性ack所有小于deliveryTag的消息
channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
// System.out.println("[x] Received '" + message2 + "'");
Thread.sleep(500);
}
}
}
处理过期消息的消费者MsgReceiver2
@Component
public class MsgReceiver2 {
private static int anInt = 0;
//交换机名称
private static final String EXCHANGE_NAME = "amqp-hello";
private static final String EXCHANGE_NAME2 = "amqp-hello2";
//交换机和交换机绑定的key "mqkey"
private static final String ExCHANEROUTING_KEY = "mqkey";
//对列名称
private static final String QUEUE_NAME = "Hello-Queue1";
//routingkey 名称
private static final String ROUTING_KEY = "key-Queue";
//死亡路由键
private static final String DEADROUTING_KEY = "deadkey-Queue";
//死亡交换机
private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
//处理死亡消息的对列名称
private static final String DEADQUEUE_NAME = "deadHello-Queue1";
//routingkey 名称
// private static final String ROUTING_KEY1 = "key1-Queue";
private static final String ROUTING_KEY2 = "key2-Queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(DEADQUEUE_NAME, true, false, false, null);
//轮询时需要设置每次从队列获取消息的数量 能者多劳的原则不至于做的快的做完了就歇着了!轮询时需要
//这个不起作用
channel.basicQos(1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
//4.定义队列的消费者
QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
/*
true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息成功消费.
false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,
直到该消费者反馈.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
*/
//b:ack
channel.basicConsume(DEADQUEUE_NAME, false, queueingConsumer1);
// channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
//6.获取消息
while (true) {
anInt += 1;
QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
// QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
String message1 = new String(delivery1.getBody());
// String message2 = String.valueOf(delivery2.getBody());
System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
//手动确认ack 是否批量处理.true:将一次性ack所有小于deliveryTag的消息
channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
// System.out.println("[x] Received '" + message2 + "'");
Thread.sleep(500);
}
}
先开启多个消费者在开启生产者。最后运行生产者后,发现MsgReceiver2在等待,当MsgReceiver1把正常消息消费完后,出现过期的消息MsgReceiver2才开始消费
运行结果:
生产者50条
[x] send'hello rabbitmq-0'
[x] send'hello rabbitmq-1'
[x] send'hello rabbitmq-2'
[x] send'hello rabbitmq-3'
[x] send'hello rabbitmq-4'
[x] send'hello rabbitmq-5'
[x] send'hello rabbitmq-6'
[x] send'hello rabbitmq-7'
[x] send'hello rabbitmq-8'
[x] send'hello rabbitmq-9'
[x] send'hello rabbitmq-10'
[x] send'hello rabbitmq-11'
[x] send'hello rabbitmq-12'
[x] send'hello rabbitmq-13'
[x] send'hello rabbitmq-14'
[x] send'hello rabbitmq-15'
[x] send'hello rabbitmq-16'
[x] send'hello rabbitmq-17'
[x] send'hello rabbitmq-18'
[x] send'hello rabbitmq-19'
[x] send'hello rabbitmq-20'
[x] send'hello rabbitmq-21'
[x] send'hello rabbitmq-22'
[x] send'hello rabbitmq-23'
[x] send'hello rabbitmq-24'
[x] send'hello rabbitmq-25'
[x] send'hello rabbitmq-26'
[x] send'hello rabbitmq-27'
[x] send'hello rabbitmq-28'
[x] send'hello rabbitmq-29'
[x] send'hello rabbitmq-30'
[x] send'hello rabbitmq-31'
[x] send'hello rabbitmq-32'
[x] send'hello rabbitmq-33'
[x] send'hello rabbitmq-34'
[x] send'hello rabbitmq-35'
[x] send'hello rabbitmq-36'
[x] send'hello rabbitmq-37'
[x] send'hello rabbitmq-38'
[x] send'hello rabbitmq-39'
[x] send'hello rabbitmq-40'
[x] send'hello rabbitmq-41'
[x] send'hello rabbitmq-42'
[x] send'hello rabbitmq-43'
[x] send'hello rabbitmq-44'
[x] send'hello rabbitmq-45'
[x] send'hello rabbitmq-46'
[x] send'hello rabbitmq-47'
[x] send'hello rabbitmq-48'
[x] send'hello rabbitmq-49'
MsgReceiver1正常处理31条
[1]:receve msg:hello rabbitmq-0
[2]:receve msg:hello rabbitmq-1
[3]:receve msg:hello rabbitmq-2
[4]:receve msg:hello rabbitmq-3
[5]:receve msg:hello rabbitmq-4
[6]:receve msg:hello rabbitmq-5
[7]:receve msg:hello rabbitmq-6
[8]:receve msg:hello rabbitmq-7
[9]:receve msg:hello rabbitmq-8
[10]:receve msg:hello rabbitmq-9
[11]:receve msg:hello rabbitmq-10
[12]:receve msg:hello rabbitmq-11
[13]:receve msg:hello rabbitmq-12
[14]:receve msg:hello rabbitmq-13
[15]:receve msg:hello rabbitmq-14
[16]:receve msg:hello rabbitmq-15
[17]:receve msg:hello rabbitmq-16
[18]:receve msg:hello rabbitmq-17
[19]:receve msg:hello rabbitmq-18
[20]:receve msg:hello rabbitmq-19
[21]:receve msg:hello rabbitmq-20
[22]:receve msg:hello rabbitmq-21
[23]:receve msg:hello rabbitmq-22
[24]:receve msg:hello rabbitmq-23
[25]:receve msg:hello rabbitmq-24
[26]:receve msg:hello rabbitmq-25
[27]:receve msg:hello rabbitmq-26
[28]:receve msg:hello rabbitmq-27
[29]:receve msg:hello rabbitmq-28
[30]:receve msg:hello rabbitmq-29
[31]:receve msg:hello rabbitmq-30
MsgReceiver2处理过期消息19条
[1]:receve msg:hello rabbitmq-31
[2]:receve msg:hello rabbitmq-32
[3]:receve msg:hello rabbitmq-33
[4]:receve msg:hello rabbitmq-34
[5]:receve msg:hello rabbitmq-35
[6]:receve msg:hello rabbitmq-36
[7]:receve msg:hello rabbitmq-37
[8]:receve msg:hello rabbitmq-38
[9]:receve msg:hello rabbitmq-39
[10]:receve msg:hello rabbitmq-40
[11]:receve msg:hello rabbitmq-41
[12]:receve msg:hello rabbitmq-42
[13]:receve msg:hello rabbitmq-43
[14]:receve msg:hello rabbitmq-44
[15]:receve msg:hello rabbitmq-45
[16]:receve msg:hello rabbitmq-46
[17]:receve msg:hello rabbitmq-47
[18]:receve msg:hello rabbitmq-48
[19]:receve msg:hello rabbitmq-49