當隊列中的消息過期,或者達到最大長度而被删除,或者達到最大空間時而被删除時,可以将這些被删除的資訊推送到其他交換機中,讓其他消費者訂閱這些被删除的消息,處理這些消息。
準備要素:一個生産者 一個正常消費者(帶一個正常交換機,一個路由鍵,一個正常隊列) 一個處理過期消息的消費者(帶一個死亡交換機,死亡路由鍵,死亡隊列)
生産者MsgProducer :
@Component
public class MsgProducer {
//交換機名稱
private static final String EXCHANGE_NAME = "amqp-hello";
private static final String EXCHANGE_NAME2 = "amqp-hello2";
//交換機和交換機綁定的key "mqkey"
private static final String ExCHANEROUTING_KEY = "mqkey";
//對列名稱
private static final String QUEUE_NAME = "Hello-Queue1";
//routingkey 名稱
private static final String ROUTING_KEY = "key-Queue";
//死亡路由鍵
private static final String DEADROUTING_KEY = "deadkey-Queue";
//死亡交換機
private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
//處理死亡消息的對列名稱
private static final String DEADQUEUE_NAME = "deadHello-Queue1";
public static void main(String[] args) throws Exception{
String message;
//1.擷取連接配接
Connection connection = RabbitConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2.聲明通道
Channel channel = connection.createChannel();
//交換機
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 15000);
//通過這兩個參數把兩交換機連在一塊!!
arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
//3.聲明(建立)隊列 b:durable是否持久化, 隊列的聲明預設是存放到記憶體中的,如果true :rabbitmq重新開機隊列不會丢失但是裡面消息沒了 false:隊列和消息全沒了
//b1:exclusive:是否排外的,有兩個作用,一:當連接配接關閉時connection.close()該隊列是否會自動删除
//b2:autoDelete:是否自動删除,當最後一個消費者斷開連接配接之後隊列裡面消息是否自動被删除,當consumers = 0時隊列裡面消息就會自動删除
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
//将交換器與隊列通過路由鍵綁定 EXCHANGE_NAME 為與隊列連接配接的交換機
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
//死亡交換機
channel.exchangeDeclare(DEADEXCHANGE_NAME, "direct", false, false, null);
//死亡資訊的隊列
channel.queueDeclare(DEADQUEUE_NAME,true,false,false,null);
//将死亡交換器與死信隊列通過死亡路由鍵綁定 DEADEXCHANGE_NAME 為與死信隊列連接配接的死亡交換機
channel.queueBind(DEADQUEUE_NAME,DEADEXCHANGE_NAME,DEADROUTING_KEY);
for (int i=0;i<20;i++){
//4.定義消息内容
message = "hello rabbitmq-";
//5.釋出消息 routingkey 是應用于多個消費者的時候明确如何給多個消費者配置設定資訊
//BasicProperties props -- 消息的基本屬性,例如路由頭等。byte[] body -- 消息體
//void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
//注意此處EXCHANGE_NAME 為與生産者相連的交換機!! MessageProperties.PERSISTENT_TEXT_PLAIN 代表消息持久化(用别的方法還有單獨為幾條消息設定持久化的)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null,(message+i).getBytes());
System.out.println("[x] send'" + (message+i).toString() + "'");
};
//6.關閉通道和連接配接
channel.close();
connection.close();
}
}
}
關鍵代碼如下:
死亡交換機:DEADEXCHANGE_NAME
死亡路由鍵:DEADROUTING_KEY
上圖顯示TTL DLX DLK都是map中的參數
Map<String, Object> arguments = new HashMap<String, Object>();
//簡稱TTL
arguments.put("x-message-ttl", 15000);
//通過這兩個參數把兩交換機連在一塊!!
//簡稱 DLX
arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
//簡稱 DLK 其實這個路由鍵用正常交換機的也可以
arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
//死亡交換機,死亡路由鍵放入正常的queueDeclare中
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
處理正常消息的消費者MsgReceiver1:
@Component
public class MsgReceiver1 {
//交換機名稱
private static final String EXCHANGE_NAME = "amqp-hello";
private static final String EXCHANGE_NAME2 = "amqp-hello2";
//交換機和交換機綁定的key "mqkey"
private static final String ExCHANEROUTING_KEY = "mqkey";
//對列名稱
private static final String QUEUE_NAME = "Hello-Queue1";
//routingkey 名稱
private static final String ROUTING_KEY = "key-Queue";
//死亡路由鍵
private static final String DEADROUTING_KEY = "deadkey-Queue";
//死亡交換機
private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
//處理死亡消息的對列名稱
private static final String DEADQUEUE_NAME = "deadHello-Queue1";
public static void main(String[] args) throws Exception {
Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 15000);
//通過這兩個參數把兩交換機連在一塊!!
arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
//3.聲明隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//需要設定每次從隊列擷取消息的數量 能者多勞的原則不至于做的快的做完了就歇着了!
channel.basicQos(1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
//4.定義隊列的消費者
QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
/*
true:表示自動确認,隻要消息從隊列中擷取,無論消費者擷取到消息後是否成功消費,都會認為消息成功消費.
false:表示手動确認,消費者擷取消息後,伺服器會将該消息标記為不可用狀态,等待消費者的回報,
如果消費者一直沒有回報,那麼該消息将一直處于不可用狀态,并且伺服器會認為該消費者已經挂掉,不會再給其發送消息,
直到該消費者回報.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
*/
//b:ack
channel.basicConsume(QUEUE_NAME, false,queueingConsumer1);
// channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
//6.擷取消息
while (true) {
anInt += 1;
QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
// QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
String message1 = new String(delivery1.getBody());
// String message2 = String.valueOf(delivery2.getBody());
System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
//手動确認ack 是否批量處理.true:将一次性ack所有小于deliveryTag的消息
channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
// System.out.println("[x] Received '" + message2 + "'");
Thread.sleep(500);
}
}
}
處理過期消息的消費者MsgReceiver2
@Component
public class MsgReceiver2 {
private static int anInt = 0;
//交換機名稱
private static final String EXCHANGE_NAME = "amqp-hello";
private static final String EXCHANGE_NAME2 = "amqp-hello2";
//交換機和交換機綁定的key "mqkey"
private static final String ExCHANEROUTING_KEY = "mqkey";
//對列名稱
private static final String QUEUE_NAME = "Hello-Queue1";
//routingkey 名稱
private static final String ROUTING_KEY = "key-Queue";
//死亡路由鍵
private static final String DEADROUTING_KEY = "deadkey-Queue";
//死亡交換機
private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
//處理死亡消息的對列名稱
private static final String DEADQUEUE_NAME = "deadHello-Queue1";
//routingkey 名稱
// private static final String ROUTING_KEY1 = "key1-Queue";
private static final String ROUTING_KEY2 = "key2-Queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//3.聲明隊列
channel.queueDeclare(DEADQUEUE_NAME, true, false, false, null);
//輪詢時需要設定每次從隊列擷取消息的數量 能者多勞的原則不至于做的快的做完了就歇着了!輪詢時需要
//這個不起作用
channel.basicQos(1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
//4.定義隊列的消費者
QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
/*
true:表示自動确認,隻要消息從隊列中擷取,無論消費者擷取到消息後是否成功消費,都會認為消息成功消費.
false:表示手動确認,消費者擷取消息後,伺服器會将該消息标記為不可用狀态,等待消費者的回報,
如果消費者一直沒有回報,那麼該消息将一直處于不可用狀态,并且伺服器會認為該消費者已經挂掉,不會再給其發送消息,
直到該消費者回報.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
*/
//b:ack
channel.basicConsume(DEADQUEUE_NAME, false, queueingConsumer1);
// channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
//6.擷取消息
while (true) {
anInt += 1;
QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
// QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
String message1 = new String(delivery1.getBody());
// String message2 = String.valueOf(delivery2.getBody());
System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
//手動确認ack 是否批量處理.true:将一次性ack所有小于deliveryTag的消息
channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
// System.out.println("[x] Received '" + message2 + "'");
Thread.sleep(500);
}
}
先開啟多個消費者在開啟生産者。最後運作生産者後,發現MsgReceiver2在等待,當MsgReceiver1把正常消息消費完後,出現過期的消息MsgReceiver2才開始消費
運作結果:
生産者50條
[x] send'hello rabbitmq-0'
[x] send'hello rabbitmq-1'
[x] send'hello rabbitmq-2'
[x] send'hello rabbitmq-3'
[x] send'hello rabbitmq-4'
[x] send'hello rabbitmq-5'
[x] send'hello rabbitmq-6'
[x] send'hello rabbitmq-7'
[x] send'hello rabbitmq-8'
[x] send'hello rabbitmq-9'
[x] send'hello rabbitmq-10'
[x] send'hello rabbitmq-11'
[x] send'hello rabbitmq-12'
[x] send'hello rabbitmq-13'
[x] send'hello rabbitmq-14'
[x] send'hello rabbitmq-15'
[x] send'hello rabbitmq-16'
[x] send'hello rabbitmq-17'
[x] send'hello rabbitmq-18'
[x] send'hello rabbitmq-19'
[x] send'hello rabbitmq-20'
[x] send'hello rabbitmq-21'
[x] send'hello rabbitmq-22'
[x] send'hello rabbitmq-23'
[x] send'hello rabbitmq-24'
[x] send'hello rabbitmq-25'
[x] send'hello rabbitmq-26'
[x] send'hello rabbitmq-27'
[x] send'hello rabbitmq-28'
[x] send'hello rabbitmq-29'
[x] send'hello rabbitmq-30'
[x] send'hello rabbitmq-31'
[x] send'hello rabbitmq-32'
[x] send'hello rabbitmq-33'
[x] send'hello rabbitmq-34'
[x] send'hello rabbitmq-35'
[x] send'hello rabbitmq-36'
[x] send'hello rabbitmq-37'
[x] send'hello rabbitmq-38'
[x] send'hello rabbitmq-39'
[x] send'hello rabbitmq-40'
[x] send'hello rabbitmq-41'
[x] send'hello rabbitmq-42'
[x] send'hello rabbitmq-43'
[x] send'hello rabbitmq-44'
[x] send'hello rabbitmq-45'
[x] send'hello rabbitmq-46'
[x] send'hello rabbitmq-47'
[x] send'hello rabbitmq-48'
[x] send'hello rabbitmq-49'
MsgReceiver1正常處理31條
[1]:receve msg:hello rabbitmq-0
[2]:receve msg:hello rabbitmq-1
[3]:receve msg:hello rabbitmq-2
[4]:receve msg:hello rabbitmq-3
[5]:receve msg:hello rabbitmq-4
[6]:receve msg:hello rabbitmq-5
[7]:receve msg:hello rabbitmq-6
[8]:receve msg:hello rabbitmq-7
[9]:receve msg:hello rabbitmq-8
[10]:receve msg:hello rabbitmq-9
[11]:receve msg:hello rabbitmq-10
[12]:receve msg:hello rabbitmq-11
[13]:receve msg:hello rabbitmq-12
[14]:receve msg:hello rabbitmq-13
[15]:receve msg:hello rabbitmq-14
[16]:receve msg:hello rabbitmq-15
[17]:receve msg:hello rabbitmq-16
[18]:receve msg:hello rabbitmq-17
[19]:receve msg:hello rabbitmq-18
[20]:receve msg:hello rabbitmq-19
[21]:receve msg:hello rabbitmq-20
[22]:receve msg:hello rabbitmq-21
[23]:receve msg:hello rabbitmq-22
[24]:receve msg:hello rabbitmq-23
[25]:receve msg:hello rabbitmq-24
[26]:receve msg:hello rabbitmq-25
[27]:receve msg:hello rabbitmq-26
[28]:receve msg:hello rabbitmq-27
[29]:receve msg:hello rabbitmq-28
[30]:receve msg:hello rabbitmq-29
[31]:receve msg:hello rabbitmq-30
MsgReceiver2處理過期消息19條
[1]:receve msg:hello rabbitmq-31
[2]:receve msg:hello rabbitmq-32
[3]:receve msg:hello rabbitmq-33
[4]:receve msg:hello rabbitmq-34
[5]:receve msg:hello rabbitmq-35
[6]:receve msg:hello rabbitmq-36
[7]:receve msg:hello rabbitmq-37
[8]:receve msg:hello rabbitmq-38
[9]:receve msg:hello rabbitmq-39
[10]:receve msg:hello rabbitmq-40
[11]:receve msg:hello rabbitmq-41
[12]:receve msg:hello rabbitmq-42
[13]:receve msg:hello rabbitmq-43
[14]:receve msg:hello rabbitmq-44
[15]:receve msg:hello rabbitmq-45
[16]:receve msg:hello rabbitmq-46
[17]:receve msg:hello rabbitmq-47
[18]:receve msg:hello rabbitmq-48
[19]:receve msg:hello rabbitmq-49