过期时间(TTL)
TTL
(
Time To Live
),也就是过期时间。
RabbitMQ
可以对消息和队列设置
TTL
。
设置消息的TTL
设置消息
TTL
的两种方式:
- 通过队列属性设置,队列中所有消息都有一样的
。TTL
- 对消息本身进行单独设置
。TTL
如果两种方式一起使用,则消息的
TTL
以两者之间较小的那个数值为准。
消息在队列中的生存时间一旦超过设置的
TTL
值时,就会变成死信(
Dead Message
),消费端将无法再收到该消息。如果队列设置了死信队列,那么这条消息就会被转发到死信队列上,后面还可以正常消费。
第一种方式
在定义队列时(调用
channel.queueDeclare()
时),在
arguments
中添加
x-message-ttl
参数,单位是毫秒,代码如下:
// 创建Queue
Map<String , Object> arguments = new HashMap<>();
arguments.put("x-message-ttl" , 10000);
channel.queueDeclare(queueName , true , false , false , arguments);
第二种方式
在发送消息时(调用
channel.basicPublish()
时),在
properties
中设置
expiration
属性,单位也是毫秒,代码如下:
// 4 发送消息
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
String msg = "RabbitMQ: TTL message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
这里只测试一种方式(第一种),另一种大家可以自己测试一下。
生产端
package com.kaven.rabbitmq.api.ttl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己服务器的IP
private static String ip = "IP";
// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一个 "/" 的虚拟主机
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由规则: routingKey(test) 将匹配同名的 queue(test)
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 4 发送消息
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = null;
// properties = new AMQP.BasicProperties.Builder()
// .deliveryMode(2)
// .contentEncoding("UTF-8")
// .expiration("10000")
// .build();
String msg = "RabbitMQ: TTL message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
// 5 关闭连接
channel.close();
connection.close();
}
}
消费端
package com.kaven.rabbitmq.api.ttl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer {
public Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------ consumer message -----------");
System.out.println("body:" + new String(body));
}
}
这里我们把消费端的消费消息给关闭了,方便测试消息的过期。
package com.kaven.rabbitmq.api.ttl;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己服务器的IP
private static String ip = "IP";
// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一个 "/" 的虚拟主机
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 队列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 4 创建Queue
Map<String , Object> arguments = new HashMap<>();
arguments.put("x-message-ttl" , 10000);
channel.queueDeclare(queueName , true , false , false , arguments);
// 5 先不消费消息
// channel.basicConsume(queueName , true, new MyConsumer(channel));
}
}
测试
启动生产端和消费端,看看
RabbitMQ Management
,我们消费端并没有消费消息,因为我们关闭了,但消息过了
10
秒种就过期了(
14:17:50-14:18:00
),这符合我们的预期。
消息未过期之前还在队列里面。