天天看點

RabbitMQ:過期時間(TTL)

過期時間(TTL)

​TTL​

​​(​

​Time To Live​

​​),也就是過期時間。​

​RabbitMQ​

​可以對消息和隊列設定​

​TTL​

​。

設定消息的TTL

設定消息​

​TTL​

​的兩種方式:

  1. 通過隊列屬性設定,隊列中所有消息都有一樣的​

    ​TTL​

    ​。
  2. 對消息本身進行單獨設定​

    ​TTL​

    ​。

如果兩種方式一起使用,則消息的​

​TTL​

​​以兩者之間較小的那個數值為準。

消息在隊列中的生存時間一旦超過設定的​​

​TTL​

​​值時,就會變成死信(​

​Dead Message​

​),消費端将無法再收到該消息。如果隊列設定了死信隊列,那麼這條消息就會被轉發到死信隊列上,後面還可以正常消費。

第一種方式

在定義隊列時(調用​

​channel.queueDeclare()​

​​時),在​

​arguments​

​​中添加​

​x-message-ttl​

​參數,機關是毫秒,代碼如下:

// 建立Queue
        Map<String , Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl" , 10000);
        channel.queueDeclare(queueName , true , false , false , arguments);      

第二種方式

在發送消息時(調用​

​channel.basicPublish()​

​​時),在​

​properties​

​​中設定​

​expiration​

​屬性,機關也是毫秒,代碼如下:

// 4 發送消息
        for (int i = 0; i < 5; i++) {

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();

            String msg = "RabbitMQ: TTL message" + i;
            channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
        }      

這裡隻測試一種方式(第一種),另一種大家可以自己測試一下。

生産端

package com.kaven.rabbitmq.api.ttl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 發送消息
        for (int i = 0; i < 5; i++) {

            AMQP.BasicProperties properties = null;
//            properties = new AMQP.BasicProperties.Builder()
//                    .deliveryMode(2)
//                    .contentEncoding("UTF-8")
//                    .expiration("10000")
//                    .build();

            String msg = "RabbitMQ: TTL message" + i;
            channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
        }

        // 5 關閉連接配接
        channel.close();
        connection.close();
    }
}      

消費端

package com.kaven.rabbitmq.api.ttl;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("body:" + new String(body));
    }
}      

這裡我們把消費端的消費消息給關閉了,友善測試消息的過期。

package com.kaven.rabbitmq.api.ttl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 隊列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 建立Queue
        Map<String , Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl" , 10000);
        channel.queueDeclare(queueName , true , false , false , arguments);

        // 5 先不消費消息
//        channel.basicConsume(queueName , true, new MyConsumer(channel));
    }
}      

測試

啟動生産端和消費端,看看​

​RabbitMQ Management​

​​,我們消費端并沒有消費消息,因為我們關閉了,但消息過了​

​10​

​​秒種就過期了(​

​14:17:50-14:18:00​

​),這符合我們的預期。

RabbitMQ:過期時間(TTL)

消息未過期之前還在隊列裡面。

RabbitMQ:過期時間(TTL)