天天看點

RabbitMQ

目錄

  • 一、什麼是 RabbitMQ
  • 二、RabbitMQ 相關概念
    • 1、RabbitMQ 核心概念
    • 2、RabbitMQ 運作流程
    • 3、RabbitMQ 支援消息的模式
    • 4、RabbitMQ 使用場景
  • 三、以代碼入門
    • 1、簡單模式
    • 2、釋出訂閱模式
    • 3、路由模式
    • 4、主題Topic模式
    • 5、工作模式
      • 輪詢模式
      • 公平分發模式
  • SpringBoot 中使用 RabbitMQ
    • 1、釋出訂閱模式
    • 2、路由模式
    • 3、主題模式
    • 4、過期隊列 TTL
    • 5、死信隊列 DLX
    • 6、延時隊列
    • 7、消息确認機制的配置
    • 8、消息重發次數與手動應答

RabbitMQ是一個開源的遵循AMQP協定實作的基于Erlang語言編寫,支援多種用戶端(語言)。用于在分布式系統中存儲消息,轉發消息,具有高可用,高可擴性,易用性等特征的中間件。

RabbitMQ
  • Server:又稱Broker ,接受用戶端的連接配接,實作AMQP實體服務。 安裝rabbitmq-server
  • Connection:連接配接,應用程式與Broker的網絡連接配接 TCP/IP/ 三次握手和四次揮手
  • Channel:網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道,用戶端可以建立對各Channel,每個Channel代表一個會話任務。
  • Message :消息,服務與應用程式之間傳送的資料,由Properties和body組成,Properties可是對消息進行修飾,比如消息的優先級,延遲等進階特性,Body則就是消息體的内容。
  • Virtual Host 虛拟位址,用于進行邏輯隔離,最上層的消息路由,一個虛拟主機裡可以有若幹個Exhange和- Queueu,同一個虛拟主機裡面不能有相同名字的Exchange
  • Exchange:交換機,接受消息,根據路由鍵發送消息到綁定的隊列。(不具備消息存儲的能力)
  • Bindings:Exchange和Queue之間的虛拟連接配接,binding中可以儲存多個routing key.
  • Routing key:是一個路由規則,虛拟機可以用它來确定如何路由一個特定消息。
  • Queue:隊列:也成為Message Queue,消息隊列,儲存消息并将它們轉發給消費者。

RabbitMQ 的管理界面中就可以看到這些相關的概念:

RabbitMQ

RabbitMQ

參考官網:https://www.rabbitmq.com/getstarted.html

  • 簡單模式 Simple:不指定交換機,會使用預設交換機
    RabbitMQ
  • 工作模式 Work

    類型:無 ;特點:分發機制

  • 釋出訂閱模式

    類型:fanout;特點:Fanout—釋出與訂閱模式,是一種廣播機制,它是沒有路由key的模式。

  • 路由模式

    類型:direct;特點:有routing-key的比對模式

  • 主題Topic模式

    類型:topic;特點:模糊的routing-key的比對模式

  • 參數模式

    類型:headers;特點:參數比對模式

解耦:RabbitMQ可以實作不同應用之間的解耦,應用之間不直接進行通信,而是通過MQ來建立橋接。

削峰:以秒殺場景為例,會瞬時産生大量的請求,應用本身一時無法處理得完,是以可以先将請求都放入消息隊列,引用再去慢慢處理。

異步:将比較耗時而且不需要即時(同步)傳回結果的操作,作為消息放入消息隊列,以此減少請求響應時間,提高系統性能。

先建立一個maven工程,pom 引入RabbitMQ:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
           

使用MQ一般主要就是兩個角色,生産者(producer)和消費者(consumer),RabbitMQ建立生産者和消費者步驟類似,主要分為一下幾步:

  • 1、建立連接配接工廠
  • 2、建立連接配接Connection
  • 3、通過連結擷取通道channel
  • 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
  • 5、主備消息内容
  • 6、發送消息給隊列queue
  • 7、關閉連接配接
  • 8、關閉通道

下面講解下各個模式下得代碼實作。

不聲明交換機(exchange),使用預設交換機

RabbitMQ

生産者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class Producer {
    public static void main(String[] args) {
        // 1、建立連接配接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、建立連接配接 Connection
            connection = factory.newConnection("生産者");
            // 3、通過連結擷取通道 Channel
            channel = connection.createChannel();
            // 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
            // 這裡我們聲明一個隊列
            String queueName = "queue-jinsh";
            /*
              @params1:隊列名稱,
              @params2:是否持久化,就是消息是否存盤。其實非持久化也會存盤,但會伴随服務重新開機丢失
              @params3:排他性,是否是一個獨占隊列
              @params4:是否自動删除,随着最後一個消費者消費完畢消息後是否把隊列删除
              @params5:攜帶一些附加參數
             */
            channel.queueDeclare(queueName, false, false, false, null);
            // 5、準備消息内容
            String message = "信其雌蛙一次莫黑多刺";
            // 6、發送消息給隊列queue
            /*
              @params1:交換機,這裡沒有指定,會使用預設的交換機
              @params2:隊列、路由key
              @params3:消息的狀态控制
              @params4:消息主題
             */
            channel.basicPublish("", queueName, null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

消費者

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {

    /**
     * 1、建立連接配接工廠
     * 2、建立連接配接Connection
     * 3、通過連結擷取通道channel
     * 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
     * 5、主備消息内容
     * 6、發送消息給隊列queue
     * 7、關閉連接配接
     * 8、關閉通道
     */
    public static void main(String[] args) {
        // 1、建立連接配接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、建立連接配接 Connection
            connection = factory.newConnection("消費者");
            // 3、通過連結擷取通道 Channel
            channel = connection.createChannel();
            // 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
            // 消費消息
            String queueName = "queue1";
            channel.basicConsume(queueName, true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息是:" + new String(message.getBody(), "utf-8"));
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收失敗");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

釋出訂閱模式在聲明交換機(exchange)時需要指定交換機類型(type)為fanout,當生産者釋出消息,綁定(binding)得所有消費者都将收到消息。

RabbitMQ
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection("生産者");
            channel = connection.createChannel();
            channel.queueDeclare("queue4", false, false, false, null);
            channel.queueDeclare("queue5", false, false, false, null);
            String message = "信其雌蛙一次莫黑多刺";
            // 準備交換機
            String exchangeName = "fanout-exchange";
            // 定義路由key
            String routeKey = "";
            // 交換機類型
            String type = "fanout";
            channel.exchangeDeclare(exchangeName, type, false, false, false, null);
            channel.queueBind("queue4", exchangeName, "");
            channel.queueBind("queue5", exchangeName, "");
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer{

    private static Runnable runnable = () -> {
        // 1、建立連接配接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、建立連接配接 Connection
            connection = factory.newConnection("消費者");
            // 3、通過連結擷取通道 Channel
            channel = connection.createChannel();
            // 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
            // 消費消息
            String queueName = Thread.currentThread().getName();
            channel.basicConsume(queueName, true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(message.getBody(), "utf-8"));
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收失敗");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable, "queue4").start();
        new Thread(runnable, "queue5").start();
    }
}
           

路由模式在聲明交換機(exchange)時需要指定交換機類型(type)為direct,每個消費者和生産者綁定時需要指定一個路由key(routing-key),當生産者釋出消息時也需要指定路由key,隻有與釋出時的路由key相同的綁定時的路由key對應的消費者才能消費到消息。

RabbitMQ
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection("生産者");
            channel = connection.createChannel();
            // 申明queue
            channel.queueDeclare("queue2", false, false, false, null);
            channel.queueDeclare("queue3", false, false, false, null);
            String message = "信其雌蛙一次莫黑多刺";
            // 準備交換機
            String exchangeName = "direct-exchange";
            // 定義路由key
            String routeKey = "email";
            // 交換機類型
            String type = "direct";
            // 申明exchange
            channel.exchangeDeclare(exchangeName, type, false, false, false, null);

            channel.queueBind("queue2", exchangeName, "email");
            channel.queueBind("queue3", exchangeName, "sms");
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer{

    private static Runnable runnable = () -> {
        // 1、建立連接配接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、建立連接配接 Connection
            connection = factory.newConnection("消費者");
            // 3、通過連結擷取通道 Channel
            channel = connection.createChannel();
            // 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
            // 消費消息
            String queueName = Thread.currentThread().getName();
            channel.basicConsume(queueName, true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(message.getBody(), "utf-8"));
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收失敗");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }
}
           

路由模式在聲明交換機(exchange)時需要指定交換機類型(type)為topic,與路由模式相同,也需要routing-key的比對,隻不過主題模式的routing-key是模糊比對,比對規則如下:

  • *

    :必須比對一個單詞
  • #

    :比對0個或1個或多個單詞

例如生産者和消費者綁定時的routing-key="#.aaa.*" ,那麼生産者發送消息時的routing-key的值為"dd.aaa.b"可以比對上,或"dd.cc.aaa.b"也可以比對上,但"dd.aaa.b.cc"是比對不上的。

RabbitMQ
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection("生産者");
            channel = connection.createChannel();
            channel.queueDeclare("queue6", false, false, false, null);
            channel.queueDeclare("queue7", false, false, false, null);
            channel.queueDeclare("queue8", false, false, false, null);
            String message = "信其雌蛙一次莫黑多刺";
            // 準備交換機
            String exchangeName = "topic-exchange";
            // 定義路由key
            String routeKey = "com.jinsh.user";
            // 交換機類型
            String type = "topic";
            channel.exchangeDeclare(exchangeName, type, false, false, false, null);

            channel.queueBind("queue6", exchangeName, "#.jinsh.#");
            channel.queueBind("queue7", exchangeName, "com.*");
            channel.queueBind("queue8", exchangeName, "com.#");
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer{

    private static Runnable runnable = () -> {
        // 1、建立連接配接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、建立連接配接 Connection
            connection = factory.newConnection("消費者");
            // 3、通過連結擷取通道 Channel
            channel = connection.createChannel();
            // 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
            // 消費消息
            String queueName = Thread.currentThread().getName();
            channel.basicConsume(queueName, true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(message.getBody(), "utf-8"));
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收失敗");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable, "queue6").start();
        new Thread(runnable, "queue7").start();
        new Thread(runnable, "queue8").start();
    }
}
           

當有多個消費者時,我們的消息會被哪個消費者消費呢,我們又該如何均衡消費者消費資訊的多少呢?

主要有兩種模式:

1、輪詢模式的分發:一個消費者一條,按均配置設定;

2、公平分發:根據消費者的消費能力進行公平分發,處理快的處理的多,處理慢的處理的少;按勞配置設定;

RabbitMQ

public class Producer {
    public static void main(String[] args) {
        // 1: 建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2: 設定連接配接屬性
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接配接工廠中擷取連接配接
            connection = factory.newConnection("生産者");
            // 4: 從連接配接中擷取通道channel
            channel = connection.createChannel();
            // 6: 準備發送消息的内容
            for (int i = 1; i <= 20; i++) {
                //消息的内容
                String msg = "學相伴:" + i;
                // 7: 發送消息給中間件rabbitmq-server
                // @params1: 交換機exchange
                // @params2: 隊列名稱/routingkey
                // @params3: 屬性配置
                // @params4: 發送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息發送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發送消息出現異常...");
        } finally {
            // 7: 釋放連接配接關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
           

消費者1

public class Work1 {
    public static void main(String[] args) {
        // 1: 建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2: 設定連接配接屬性
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接配接工廠中擷取連接配接
            connection = factory.newConnection("消費者-Work1");
            // 4: 從連接配接中擷取通道channel
            channel = connection.createChannel();
            // 5: 申明隊列queue存儲消息
            // 這裡如果queue已經被建立過一次了,可以不需要定義
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一時刻,伺服器隻會推送一條消息給消費者
            // 6: 定義接受消息的回調
            Channel finalChannel = channel;
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work1-開始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發送消息出現異常...");
        } finally {
            // 7: 釋放連接配接關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
           

消費者2

public class Work2 {
    public static void main(String[] args) {
        // 1: 建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2: 設定連接配接屬性
        factory.setHost("192.168.10.136");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 從連接配接工廠中擷取連接配接
            connection = factory.newConnection("消費者-Work2");
            // 4: 從連接配接中擷取通道channel
            channel = connection.createChannel();
            // 5: 申明隊列queue存儲消息
            // 這裡如果queue已經被建立過一次了,可以不需要定義
            //channel.queueDeclare("queue1", false, true, false, null);
            // 同一時刻,伺服器隻會推送一條消息給消費者
            //channel.basicQos(1);
            // 6: 定義接受消息的回調
            Channel finalChannel = channel;
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work2-開始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發送消息出現異常...");
        } finally {
            // 7: 釋放連接配接關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
           

公平分發模式生産者和消費者代碼與輪詢模式一樣,隻是消費者的代碼略有修改:

  • 新增

    finalChannel.basicQos(1);

    ,表示每次消息的消費數量,即一次消費1個消息
  • basicConsume方法autoAck參數改為false,不自定應答
RabbitMQ

首先pom檔案引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

其次配置檔案配置連接配接

spring:
  rabbitmq:
    host: 192.168.10.136
    port: 5672
    username: guest
    password: guest
    virtual-host: /
           

聲明交換機、聲明隊列、交換機綁定隊列

這個類是即可以放在生産者項目這邊,也可以放在消費者項目那邊,一般選擇放消費者項目那邊寫。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class rabbitMqConfiguration {

    // 1、聲明交換機類型
    @Bean
    public Exchange exchange() {
        return new Exchange("order_exchange", true, false);
    }

    // 2、聲明隊列
    @Bean
    public Queue emailQueue() {
        return new Queue("email.queue", true);
    }

    // 3、綁定交換機和隊列
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(emailQueue()).to(exchange());
    }
}
           
@Service
public class ProduceService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

	public void sendMessage() {
		// 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
        rabbitTemplate.convertAndSend("order_exchange", "", orderId, "hahahahahahahaha");
	}
}
           
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@RabbitListener(queues = {"email.queue"})
@Component
public class SmsDirectConsumer {

    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("收到消息:" + message);
    }

}
           

對于上面“聲明交換機、聲明隊列、交換機綁定隊列”這一步,也可以通過注解的方式聲明,寫在消費類裡:

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.topic.queue", durable = "true", autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "*.email.#"
))
@Component
public class EmailTopicConsumer {

    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("email topic 收到訂單消息:" + message);
    }
}
           

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitMqConfiguration {

    // 1、聲明交換機類型
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    // 2、聲明隊列
    @Bean
    public Queue fanoutEmailQueue() {
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue fanoutSmsQueue() {
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue fanoutWxQueue() {
        return new Queue("wx.fanout.queue", true);
    }

    // 3、綁定交換機和隊列
    @Bean
    public Binding fanoutEmailBinding() {
        return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutSmsBinding() {
        return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutWxBinding() {
        return BindingBuilder.bind(fanoutWxQueue()).to(fanoutExchange());
    }
}
           

消費者這裡隻展示了1個其餘兩個類似

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class EmailFanoutConsumer {

    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("email fanout 收到訂單消息:" + message);
    }

}
           
public void makeOrderFanout() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("訂單生産成功:" + orderId);
        // 3、通過rabbitmq完成消息分發
        String exchange = "fanout_order_exchange";
        // 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
        rabbitTemplate.convertAndSend(exchange, "", orderId, postProcessor);
    }
           

@Configuration
public class DirectRabbitMqConfiguration {

    // 1、聲明交換機類型
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }

    // 2、聲明隊列
    @Bean
    public Queue directEmailQueue() {
        return new Queue("email.direct.queue", true, );
    }
    @Bean
    public Queue directSmsQueue() {
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue directWxQueue() {
        return new Queue("wx.direct.queue", true);
    }

    // 3、綁定交換機和隊列
    @Bean
    public Binding directEmailBinding() {
        return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
    }
    @Bean
    public Binding directSmsBinding() {
        return BindingBuilder.bind(directSmsQueue()).to(directExchange()).with("sms");
    }
    @Bean
    public Binding directWxBinding() {
        return BindingBuilder.bind(directWxQueue()).to(directExchange()).with("wx");
    }
}
           

消費者與上面類似這裡不展示了

public void makeOrderDirect() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("訂單生産成功:" + orderId);
        // 3、通過rabbitmq完成消息分發
        String exchange = "direct_order_exchange";
        // 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
        rabbitTemplate.convertAndSend(exchange, "email", orderId);
        rabbitTemplate.convertAndSend(exchange, "sms", orderId);
    }
           

@Configuration
public class TopicRabbitMqConfiguration {

    // 1、聲明交換機類型
    @Bean
    public DirectExchange topicExchange() {
        return new DirectExchange("topic_order_exchange", true, false);
    }

    // 2、聲明隊列
    @Bean
    public Queue topicEmailQueue() {
        return new Queue("email.topic.queue", true, );
    }
    @Bean
    public Queue directSmsQueue() {
        return new Queue("sms.topic.queue", true);
    }
    @Bean
    public Queue directWxQueue() {
        return new Queue("wx.topic.queue", true);
    }

    // 3、綁定交換機和隊列
    @Bean
    public Binding topicEmailBinding() {
        return BindingBuilder.bind(topicEmailQueue()).to(topicExchange()).with("#.email.#");
    }
    @Bean
    public Binding topicSmsBinding() {
        return BindingBuilder.bind(topicSmsQueue()).to(topicExchange()).with("#.sms.*");
    }
    @Bean
    public Binding topicWxBinding() {
        return BindingBuilder.bind(topicWxQueue()).to(topicExchange()).with("*.wx.*");
    }
}
           
public void makeOrderTopic() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("訂單生産成功:" + orderId);
        // 3、通過rabbitmq完成消息分發
        String exchange = "topic_order_exchange";
        // 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
        rabbitTemplate.convertAndSend(exchange, "com.sms.aa", orderId);
        rabbitTemplate.convertAndSend(exchange, "aa.email.cc.vv", orderId);
    }
           

過期隊列要在隊列聲明時設定

@Bean
    public Queue ttlQueue() {
        Map<String, Object> arg = new HashMap<String, Object>();
        // 設定過期時間5秒
        arg.put("x-message-ttl", 5000);
        // 設定隊列最多容納幾個消息
        // arg.put("x-max-length", 5);
        return new Queue("ttl.direct.queue", true, false, false, arg);
    }
           

即消息釋出到隊列中5秒内還沒有被消費,則将廢棄。

以上時對整個隊列中的消息過期設定,還有一種是對單個消息的過期時間設定,在生産者端消息發送時配置:

public void makeOrderFanout() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("訂單生産成功:" + orderId);
        String exchange = "fanout_order_exchange";
        // 給消息設定過期時間
        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
				// 設定5秒過期
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        // 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
        rabbitTemplate.convertAndSend(exchange, "", orderId, postProcessor);
    }
           

當兩種方式同時存在時,過期時間短的生效。

所謂死信隊列就是一個接盤用的隊列,聲明方式與普通隊列一樣

@Configuration
public class DeadQueueConfiguration {

    // 1、聲明交換機類型
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead_order_exchange", true, false);
    }

    // 2、聲明隊列
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }

    // 3、綁定交換機和隊列
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }
}
           

死信隊列的作用就是用于存放失效的消息,以便再次業務處理。例如過期未消費的消息,過期後會被從原隊列移動到死信隊列裡,需要在原隊列裡配置死信隊列:

@Bean
    public Queue ttlQueue() {
        Map<String, Object> arg = new HashMap<String, Object>();
        // 設定過期時間5秒
        arg.put("x-message-ttl", 5000);
        // 過期則或超過消息最大個數則進入死信隊列
        arg.put("x-dead-letter-exchange", "dead_order_exchange");
        arg.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttl.direct.queue", true, false, false, arg);
    }
           

延時隊列其實就是過期隊列與死信隊列的配合使用,達到消息延時處理的目的。例如,消息延時一分鐘後處理,那麼消息先釋出到逾時隊列中,逾時時間設定為60000毫秒,時間到了,消息就會進入死信隊列,此時我們再在死信隊列裡處理這條消息就行了。

消息确認就是當生産者釋出消息成功後,可以收到确認回調。

配置檔案中開啟publisher-confirm-type:

spring:
  rabbitmq:
    host: 192.168.10.136
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
           

publisher-confirm-type:

  • NONE值是禁用釋出确認模式,是預設值
  • CORRELATED值是釋出消息成功到交換器後會觸發回調方法,如1示例
  • SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在釋出消息成功後使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點傳回發送結果,根據傳回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果傳回false則會關閉channel,則接下來無法發送消息到broker;

确認回調類

package com.xuexiangban.rabbitmq.springbootorderrabbitmqproducer.callback;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("消息确認成功!!!!");
        }else{
            System.out.println("消息确認失敗!!!!");
        }
    }
}
           

生産者釋出消息時設定

public void makeOrderTopic(){
        String orderId = UUID.randomUUID().toString();
        System.out.println("儲存訂單成功:id是:" + orderId);
        // 設定消息确認機制
        rabbitTemplate.setConfirmCallback(new MessageConfirmCallback());
        rabbitTemplate.convertAndSend("topic_order_ex","com.email.sms.xxx",orderId);
    }
           

消息消費過程中出現異常預設會再次發送消息,繼續消費,然後繼續異常,以此類推發生死循環。

解決這個問題主要有兩種方案:

  • 配置消息的重發次數
  • try-catch捕捉異常,然後手動應答
spring:
  rabbitmq:
    host: 192.168.10.136
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        retry:
          enabled: true # 開啟重試
          max-attempts: 3 # 最大重試次數
          initial-interval: 2000ms # 重試間隔時間
       #  acknowledge-mode: manual # 手動應答
           
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class EmailDirectConsumer {

    @RabbitHandler
    public void receiveMessage(String message, Channel channel, CorrelationData correlationData,
                            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("email direct 收到訂單消息:" + message);

            // 消息消費過程中出現異常,會發生死循環
            int a = 1/0;

            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 如果出現異常情況,根據實際情況去進行重發
            // 參數1:消息的tag,參數2:多條處理,參數3:requeue 重發
            // requeue = false,消息不會重發,會把消息打入死信隊列
            // requeue = true,會死循環重發,如果使用true的話建議使用“解決方案1:控制重發次數”,通過重發次數去限制循環,
            // 不要使用try-catch方案,try-catch方案配置acknowledge-mode: manual,手動形式會使重發次數的配置失效。
            channel.basicNack(tag, false, false);
        }
    }
}