過期時間(TTL)
TTL
(
Time To Live
),也就是過期時間。
RabbitMQ
可以對消息和隊列設定
TTL
。
設定消息的TTL
設定消息
TTL
的兩種方式:
- 通過隊列屬性設定,隊列中所有消息都有一樣的
。TTL
- 對消息本身進行單獨設定
。TTL
如果兩種方式一起使用,則消息的
TTL
以兩者之間較小的那個數值為準。
消息在隊列中的生存時間一旦超過設定的
TTL
值時,就會變成死信(
Dead Message
),消費端将無法再收到該消息。如果隊列設定了死信隊列,那麼這條消息就會被轉發到死信隊列上,後面還可以正常消費。
第一種方式
在定義隊列時(調用
channel.queueDeclare()
時),在
arguments
中添加
x-message-ttl
參數,機關是毫秒,代碼如下:
// 建立Queue
Map<String , Object> arguments = new HashMap<>();
arguments.put("x-message-ttl" , 10000);
channel.queueDeclare(queueName , true , false , false , arguments);
第二種方式
在發送消息時(調用
channel.basicPublish()
時),在
properties
中設定
expiration
屬性,機關也是毫秒,代碼如下:
// 4 發送消息
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
String msg = "RabbitMQ: TTL message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
這裡隻測試一種方式(第一種),另一種大家可以自己測試一下。
生産端
package com.kaven.rabbitmq.api.ttl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 發送消息
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = null;
// properties = new AMQP.BasicProperties.Builder()
// .deliveryMode(2)
// .contentEncoding("UTF-8")
// .expiration("10000")
// .build();
String msg = "RabbitMQ: TTL message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
// 5 關閉連接配接
channel.close();
connection.close();
}
}
消費端
package com.kaven.rabbitmq.api.ttl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer {
public Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------ consumer message -----------");
System.out.println("body:" + new String(body));
}
}
這裡我們把消費端的消費消息給關閉了,友善測試消息的過期。
package com.kaven.rabbitmq.api.ttl;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 隊列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 建立Queue
Map<String , Object> arguments = new HashMap<>();
arguments.put("x-message-ttl" , 10000);
channel.queueDeclare(queueName , true , false , false , arguments);
// 5 先不消費消息
// channel.basicConsume(queueName , true, new MyConsumer(channel));
}
}
測試
啟動生産端和消費端,看看
RabbitMQ Management
,我們消費端并沒有消費消息,因為我們關閉了,但消息過了
10
秒種就過期了(
14:17:50-14:18:00
),這符合我們的預期。
消息未過期之前還在隊列裡面。