天天看點

RabbitMQ 3.9( 基礎 )

1、認識MQ

1.1、什麼是MQ?

  • MQ全稱:message queue 即 消息隊列
  • 隊列裡面存的就是message

1.2、為什麼要用MQ?

1.2.1、流量削峰

RabbitMQ 3.9( 基礎 )
  • 這種情況,要是通路 1020次 / s呢?這種肯定會讓支付系統當機了,因為太大了嘛,受不了,是以:流量削峰
RabbitMQ 3.9( 基礎 )
  • 這樣就讓message排着隊了,這樣支付系統就可以承受得了了

1.2.2、應用解耦

RabbitMQ 3.9( 基礎 )
  • 上面這種,隻要支付系統或庫存系統其中一個挂彩了,那麼訂單系統也要挂彩,是以:解耦呗
RabbitMQ 3.9( 基礎 )
  • 而采用了MQ之後,支付系統和庫存系統有一個出問題,那麼它的處理記憶體是在MQ中的,此時訂單系統就不會有影響,可以正常完成,等到故障恢複了,訂單系統再處理對應的事情,這就提高了系統的可用性

1.2.3、異步處理

RabbitMQ 3.9( 基礎 )
  • 如上圖,訂單系統要調用支付系統的API,而訂單系統并不知道支付系統處理對應的業務需要多久,要解決可以采用訂單系統隔一定時間又去通路支付系統,看處理完沒有,而使用MQ更容易解決。
RabbitMQ 3.9( 基礎 )

1.3、RabbitMQ的原理?

RabbitMQ 3.9( 基礎 )
  • 圖中的東西後續會慢慢見到
  • Broker實體:接收和分發消息的應用 / RabbitMQ Server / Message Broker
  • 而上圖中RabbitMQ的四個核心就是:Producer生産者、exchange交換機、queue隊列、Consumer消費者
    • Producer生産者:就是負責推送消息的程式
    • Exchange交換機:接收來自生産者的消息,并且把消息放到隊列中
    • queue隊列:就是一個資料結構,本質就是一個很大的消息緩沖區
    • Consumer消費者:就是接收消息的程式
    • 注意:生産者、消息中間件MQ、消費者大多時候并不是在同一台機器上的,是以:生産者有時可以是消費者;而消費者有時也可以是生産者
  • Connection連結:就是讓Producer生産者、Broker實體、Consumer消費者之間建立TCP連結
  • Virtual host虛拟機:處于多租戶和安全因素考慮而設計的,當多個不同的使用者使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個 vhost,每個使用者在自己的 vhost 建立 exchange/queue 等
  • Channel信道:就是發消息的通道,它是在Connection内部建立的邏輯連接配接
  • Routes路由政策 / binding綁定:交換機以什麼樣的政策将消息釋出到Queue。也就是exchange交換機 和 queue隊列之間的聯系,即 二者之間的虛拟連接配接,它裡面可以包含routing key 路由鍵

1.4、RabbitMQ的通訊方式

  • 這個玩意兒在官網中有圖,位址:https://www.rabbitmq.com/getstarted.html 學完之後這張圖最好放在自己腦海裡
  • 另外:下面放的是七種,實質上第六種RPC需要在特定的場景和技術下才會使用,是以目前就不示範這個模式了
RabbitMQ 3.9( 基礎 )
  • 1、hello word - 簡單模式
  • 2、work queues - 工作隊列模式
  • 3、publish / subscribe - 釋出訂閱模式
  • 4、Routing - 路由模式
  • 5、Topics - 主題模式
  • 6、RPC模式 - 目前不用了解也行
  • 7、publisher confirms - 釋出确認模式

2、安裝RabbitMQ

  • 以下的方式自行選擇一種即可

2.1、在Centos 7下安裝

  • 檢視自己的Linux版本
uname -a

           
RabbitMQ 3.9( 基礎 )

2.1.1、使用rpm紅帽軟體

準備工作
  • 1、下載下傳Erlang,因為:RabbitMQ是Erlang語言寫的,Erlang下載下傳位址【 ps:這是官網 】:https://www.erlang.org/downloads,選擇自己要的版本即可
    • 另外:RabbitMQ和Erlang的版本對應關系連結位址 https://www.rabbitmq.com/which-erlang.html
RabbitMQ 3.9( 基礎 )
  • 當然:上面這種是下載下傳gz壓縮包,配置挺煩的,可以直接去github中下載下傳rpm檔案,位址:https://github.com/rabbitmq/erlang-rpm/releases , 選擇自己需要的版本即可,注意一個問題:要看是基于什麼Linux的版本
RabbitMQ 3.9( 基礎 )
  • 要是github下載下傳慢的話,都有自己的文明上網加速方式,要是沒有的話,可以進入 https://github.com/fhefh2015/Fast-GitHub 下載下傳好了然後內建到自己浏覽器的擴充程式中即可,而如果進入github很慢的話,可以選擇去gitee中搜尋一個叫做:

    dev-sidecar

    的東西安裝,這樣以後進入github就很快了,還有另外的很多方式,不介紹了。
  • 2、執行

    rpm -ivh erlang檔案

    指令
    • i 就是 install的意思
    • vh 就是顯示安裝進度條
    • 注意:需要保證自己的Linux中有rpm指令,沒有的話,執行

      yum install rpm

      指令即可安裝rpm
RabbitMQ 3.9( 基礎 )
  • 3、安裝RabbitMQ需要的依賴環境
yum install socat -y

           
RabbitMQ 3.9( 基礎 )
  • 4、下載下傳RabbitMQ的rpm檔案,github位址:https://github.com/rabbitmq/rabbitmq-server/releases , 選擇自己要的版本即可
  • 5、安裝RabbitMQ
RabbitMQ 3.9( 基礎 )
  • 6、啟動RabbitMQ服務
    啟動服務
    	/sbin/service rabbitmq-server start
    
    	停止服務
    	/sbin/service rabbitmq-server stop
    
    	檢視啟動狀态
    	/sbin/service rabbitmq-server status
    
    	開啟開機自啟
    	chkconfig rabbitmq-server on
    
               
    RabbitMQ 3.9( 基礎 )
    • 檢視啟動狀态
    RabbitMQ 3.9( 基礎 )
    • 這表示正在啟動,需要等一會兒,看到下面的樣子就表示啟動成功
    RabbitMQ 3.9( 基礎 )
  • 7、安裝web管理插件
1、停止RabbitMQ服務
	service rabbitmq-server stop   // 使用上面的指令 /sbin/service rabbitmq-server stop也行

	2、安裝插件
	rabbitmq-plugins enable rabbitmq_management

	3、開啟RabbitMQ服務
	service rabbitmq-server start
           
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )
  • 要是通路不了,看看自己的防火牆關沒關啊
# 檢視防火牆狀态
systemctl status firewalld

# 關閉防火牆
systemctl stop firewalld

# 一勞永逸 禁用防火牆
systemctl disable firewalld

# ============================================

# 當然:上面的方式不建議用,可以用如下的方式

# 6379端口号是否開放
firewall-cmd --query-port=6379/tcp

# 開放6379端口
firewall-cmd --permanent --add-port=6379/tcp

#重新開機防火牆(修改配置後要重新開機防火牆)
firewall-cmd --reload

           
  • 同時檢視自己的伺服器有沒有開放15672端口,不同的東西有不同的處理方式,如我的雲伺服器直接在伺服器網址中添加規則即可,其他的方式自行百度

2.1.2、使用Docker安裝

  • 需要保證自己的Linux中有Docker容器,教程連結:https://www.cnblogs.com/xiegongzi/p/15621992.html
  • 使用下面的兩種方式都不需要進行web管理插件的安裝和erlang的安裝
  • 1、檢視自己的docker容器中是否已有了rabbitmq這個名字的鏡像
docker images

           
RabbitMQ 3.9( 基礎 )
  • 删除鏡像
docker rmi 鏡像ID  // 如上例的 dockerrmi 16c 即可删除鏡像

           
RabbitMQ 3.9( 基礎 )
  • 2、拉取RabbitMQ鏡像 并 啟動Docker容器
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

           
RabbitMQ 3.9( 基礎 )
  • 3、檢視Docker容器是否啟動
docker ps

           
RabbitMQ 3.9( 基礎 )
  • 4、再次在浏覽器進行通路就可以吃雞了,不需要再安裝插件啊,剛剛上一步拉鏡像和啟動時已經安裝好了
RabbitMQ 3.9( 基礎 )

2.1.3、使用Docker-compose安裝

  • 采用了第二種方式的話,記得把已經啟動的Docker容器關了,以下是附加的一些Docker的基操
# 拉取鏡像
docker pull 鏡像名稱

# 檢視全部鏡像
docker images

# 删除鏡像
docker rmi 鏡像ID

# 将本地的鏡像導出
docker save -o 導出的路徑 鏡像id

# 加載本地的鏡像檔案
docker load -i 鏡像檔案

# 修改鏡像名稱
docker tag 鏡像id 新鏡像名稱:版本

# 簡單運作操作
docker run 鏡像ID | 鏡像名稱

# 跟參數的運作
docker run -d -p 主控端端口:容器端口 --name 容器名稱 鏡像ID | 鏡像名稱
# 如:docker run -d -p 8081:8080 --name tomcat b8
# -d:代表背景運作容器 
# -p 主控端端口:容器端口:為了映射目前Linux的端口和容器的端口 
# --name 容器名稱:指定容器的名稱

# 檢視運作的容器
docker ps [-qa]
# -a:檢視全部的容器,包括沒有運作
# -q:隻檢視容器的辨別

# 檢視日志
docker logs -f 容器id
# -f:可以滾動檢視日志的最後幾行

# 進入容器内部
docker exec -it 容器id bash
# 退出容器:exit

# 将主控端的檔案複制到容器内部的指定目錄
docker cp 檔案名稱 容器id:容器内部路徑
docker cp index.html 982:/usr/local/tomcat/webapps/ROOT


=====================================================================


# 重新啟動容器 
docker restart 容器id

# 啟動停止運作的容器
docker start 容器id

# 停止指定的容器(删除容器前,需要先停止容器)
docker stop 容器id

# 停止全部容器
docker stop $(docker ps -qa)

# 删除指定容器 
docker rm 容器id

# 删除全部容器
docker rm $(docker ps -qa)

           
RabbitMQ 3.9( 基礎 )
  • 1、建立一個檔案夾,這些我很早之前就玩過了,是以建好了的
# 建立檔案夾
	mkdir 檔案夾名

           
RabbitMQ 3.9( 基礎 )
  • 2、進入檔案夾,建立docker-compose.yml檔案,注意:檔案名必須是這個
# 建立檔案
	touch docker-compose.yml

           
RabbitMQ 3.9( 基礎 )
  • 3、編輯docker-compose.yml檔案
# 編輯檔案
	vim docker-compose.yml

           
  • 裡面編寫的内容如下,編寫好儲存即可。注意:别用tab縮進啊,會出問題的,另外:每句的後面别有空格,嚴格遵循yml格式的
version: "3.1"
services:
  rabbitmq:
# 鏡像
    image: rabbitmq:3.9-management
# 自啟
    restart: always
# Docker容器名
    container_name: rabbitmq
# 端口号,docker容器内部端口 映射 外部端口
    ports:
      - 5672:5672
      - 15672:15672
# 資料卷映射 把容器裡面的東西映射到容器外面去 容易操作,否則每次都要進入容器
    volumes:
      - ./data:/opt/install/rabbitMQ-docker/

           
  • 4、在docker-compose.yml所在路徑執行如下指令,注意:一定要在此檔案路徑中才行,因為預設是在目前檔案夾下找尋docker-compose檔案
# 啟動
docker-compose up -d
# -d 背景啟動

=========================================================

# 附加内容:docker-compose的一些指令操作

# 1. 基于docker-compose.yml啟動管理的容器
docker-compose up -d

# 2. 關閉并删除容器
docker-compose down

# 3. 開啟|關閉|重新開機已經存在的由docker-compose維護的容器
docker-compose start|stop|restart

# 4. 檢視由docker-compose管理的容器
docker-compose ps 

# 5. 檢視日志 
docker-compose logs -f

# 有興趣的也可以去了解docker-file自定義鏡像

           
RabbitMQ 3.9( 基礎 )
  • 去浏覽器通路一樣的吃雞
  • 上面就是RabbitMQ的基操做完了,不過預設賬号是guest遊客狀态,很多事情還做不了呢,是以還得做一些操作

2.1.4、解決不能登入web管理界面的問題

2.1.4.1、使用rpm紅帽軟體安裝的RabbitMQ
  • 這種方式直接使用guest進行登入是不得吃的
RabbitMQ 3.9( 基礎 )
  • 這是因為guest是遊客身份,不能進入,需要添加新使用者
檢視目前使用者 / 角色有哪些
	rabbitmqctl list_users

	删除使用者
	rabbitmqctl delete_user 使用者名

	添加使用者
	rabbitmqctl add_user 使用者名 密碼

	設定使用者角色
	rabbitmqctl set_user_tags 使用者名 administrator

	設定使用者權限【 ps:guest角色就是沒有這一步 】
	rabbitmqctl set_permissions -p "/" 使用者名 ".*" ".*" ".*"
# 設定使用者權限指令解釋
				set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

           
RabbitMQ 3.9( 基礎 )
  • 現在使用admin去浏覽器登入就可以了
RabbitMQ 3.9( 基礎 )
2.1.4.2、使用docker 或 docker-compose安裝的RabbitMQ
  • 這兩種方式直接使用guest就可以進行登入,然後在Web管理界面添加一個新使用者就可以了,記得權限選成管理者
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )

3、開始玩RabbitMQ

  • 建立Maven項目 并導入如下依賴
<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

           
  • 回到前面的RabbitMQ原理圖
RabbitMQ 3.9( 基礎 )

3.1、Hello word 簡單模式

  • 對照原理圖來玩,官網中有Hello word的模式圖
RabbitMQ 3.9( 基礎 )
  • 即:一個生産者Producer、一個預設交換機Exchange、一個隊列queue、一個消費者Consumer
生産者
  • 就是下圖前面部分
RabbitMQ 3.9( 基礎 )
package cn.zixieqing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String HOST = "ip";		// 放RabbitMQ服務的伺服器ip
    private static final int PORT = 5672;		// 伺服器中RabbitMQ的端口号,在浏覽器用的15672是通過5672映射出來的15672
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String QUEUE_NAME = "hello word";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1、擷取連結工廠
        ConnectionFactory factory = new ConnectionFactory();

        // 2、設定連結資訊
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        /*
            當然:這裡還可以設定vhost虛拟機 - 前提是自己在web管理界面中添加了vhost
            factory.setVirtualHost();
         */

        // 3、擷取連結Connection
        Connection connection = factory.newConnection();

        // 4、建立channel信道 - 它才是去和交換機 / 隊列打交道的
        Channel channel = connection.createChannel();

        // 5、準備一個隊列queue
        // 這裡理論上是去和exchange打交道,但是:這裡是hello word簡單模式,是以直接使用預設的exchange即可
        /*
            下面這是參數的完整意思,源碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            參數1、隊列名字
            參數2、是否持久化( 儲存到磁盤 ),預設是在記憶體中的
            參數3、是否共享,即:是否隻供一個消費者消費,是否讓多個消費者共享這個隊列中的資訊
            參數4、是否自動删除,即:最後一個消費者擷取資訊之後,這個隊列是否自動删除
            參數5、其他配置項,這涉及到後面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("正在發送資訊!!!");
        // 6、推送資訊到隊列中
        // 準備發送的資訊内容
        String message = "it is hello word";
        /*
            basicPublish( exchangeName,queueName,properties,message )
            參數1、互動機名字 - 目前使用了預設的
            參數2、指定路由規則 - 目前使用隊列名字
            參數3、指定傳遞的消息所攜帶的properties
            參數4、推送的具體消息 - byte類型的
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        // 7、釋放資源 - 倒着關閉即可
        if ( null != channel ) channel.close();

        if ( null != connection ) connection.close();

        System.out.println("消息發送完畢");

    }
}


           
  • 運作之後,去浏覽器管理界面進行檢視
RabbitMQ 3.9( 基礎 )
消費者
RabbitMQ 3.9( 基礎 )
public class Consumer {

    private static final String HOST = "ip";   // 自己的伺服器ip
    private static final int PORT = 5672;
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String QUEUE_NAME = "hello word";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1、建立連結工廠
        ConnectionFactory factory = new ConnectionFactory();

        // 2、設定連結資訊
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        // 3、建立連結對象
        Connection connection = factory.newConnection();

        // 4、建立信道channel
        Channel channel = connection.createChannel();

        // 5、從指定隊列中擷取消息
        /*
            basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
            參數1、隊列名
            參數2、是否自動應答,為true時,消費者接收到消息後,會立即告訴RabbitMQ
            參數3、消費者如何消費消息的回調
            參數4、消費者取消消費的回調
         */
        System.out.println("開始接收消息!!!");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到了消息:" + new String(message.getBody(), StandardCharsets.UTF_8) );
        };

        CancelCallback cancelCallback = consumerTag -> System.out.println("消費者取消了消費資訊行為");

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

        // 6、釋放資源 - 但是這裡不能直接關閉啊,否則:看不到接收的結果的,可以選擇不關,也可以選擇加一句代碼System.in.read();

        // channel.close();
        // connection.close();

    }
}

           

3.2、work queue工作隊列模式

  • 流程圖就是官網中的
RabbitMQ 3.9( 基礎 )
  • 一個生産者批量生産消息
  • 一個預設交換機
  • 一個隊列
  • 多個消費者
  • 換言之:就是有大量的任務 / 密集型任務有待處理( 生産者生産的消息 ),此時我們就将這些任務推到隊列中去,然後使用多個工作線程( 消費者 )來進行處理,否則:一堆任務直接就跑來了,那消費者不得亂套了,是以:這種就需要讓這種模式具有如下的特點:
    • 1、消息是有序排好的( 也就是在隊列中 )
    • 2、工作線程 / 消費者不能同時接收同一個消息,換言之:生産者推送的任務必須是輪詢分發的,即:工作線程1接收第一個,工作線程2接收第二個;工作線程1再接收第三個,工作線程2接收第四個
抽取RabbitMQ連結的工具類
package cn.zixieqing.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQUtil {

    private static final String HOST = "自己的ip";
    private static final int PORT = 5672;
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin";

    public static Channel getChannel(String vHost ) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        if ( !vHost.isEmpty() ) factory.setVirtualHost(vHost);

        return factory.newConnection().createChannel();

    }
}

           
生産者
  • 和hello word沒什麼兩樣
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class WorkProducer {

    private static final String QUEUE_NAME = "work queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 1、聲明隊列
        /*
            下面這是參數的完整意思,源碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            參數1、隊列名字
            參數2、是否持久化( 儲存到磁盤 ),預設是在記憶體中的
            參數3、是否共享,即:是否隻供一個消費者消費,是否讓多個消費者共享這個隊列中的資訊
            參數4、是否自動删除,即:最後一個消費者擷取資訊之後,這個隊列是否自動删除
            參數5、其他配置項,這涉及到後面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 2、準備消息
        System.out.println("請輸入要推送的資訊,按回車确認:");
        Scanner input = new Scanner(System.in);

        // 3、推送資訊到隊列中
        while (input.hasNext()) {
            /*
                basicPublish( exchangeName,routing key,properties,message )
                參數1、互動機名字 - 目前是使用了預設的
                參數2、指定路由規則 - 目前使用隊列名字
                參數3、指定傳遞的消息所攜帶的properties
                參數4、推送的具體消息 - byte類型的
            */
            String message = input.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息====>" + message + "====>推送完畢!");
        }
    }
}

           
消費者
  • 消費者01
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class WorkConsumer {

    private static final String QUEUE_NAME = "work queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println( consumerTag + "消費者中斷了接收消息====>" );
        };

        System.out.println("消費者01正在接收消息......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}

           
  • 消費者02
package cn.zixieqing.workqueue;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class WorkConsumer {

    private static final String QUEUE_NAME = "work queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println( consumerTag + "消費者中斷了接收消息====>" );
        };

        System.out.println("消費者02正在接收消息......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}

           
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )

3.3、消息應答機制

  • 消費者在接收到消息并且處理該消息之後,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息删除了
  • 目的就是為了保證資料的安全,如果沒有這個機制的話,那麼就會造成下面的情況
RabbitMQ 3.9( 基礎 )
  • 消費者接收隊列中的消息時,沒接收完,出現異常了,然後此時MQ以為消費者已經把消息接收并處理了( MQ并沒有接收到消息有沒有被消費者處理完畢 ),然後MQ就把隊列 / 消息給删了,後續消費者異常恢複之後再次接收消息,就會出現:接收不到了

3.3.1、消息應答機制的分類

  • 這個東西已經見過了
/*
            basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
            參數1、隊列名
            參數2、是否自動應答,為true時,消費者接收到消息後,會立即告訴RabbitMQ
            參數3、消費者如何消費消息的回調
            參數4、消費者取消消費的回調
    */
	channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

           
3.3.1.1、自動應答
  • 指的是:消息發送後立即被認為已經傳送成功
  • 需要具備的條件:
    • 1、發送的消息很多,就是高吞吐量的那種
    • 2、發送的消息在傳輸方面是安全的
  • 優點:處理效率快,很高效
3.3.1.2、手動應答
  • 就是我們自己去設定,好處是可以批量應答并且減少網絡擁堵
  • 調用的API如下:
    • Channel.basicACK( long, boolean );		// 用于肯定确認,即:MQ已知道該消息 并且 該消息已經成功被處理了,是以MQ可以将其丢棄了
      
      	Channel.basicNack( long, boolena, boolean );	// 用于否定确認
      
      	Channel.basicReject( long, boolea );		// 用于否定确認
      		與Channel.basicNack( long, boolena, boolean )相比,少了一個參數,這個參數名字叫做:multiple
                 
  • multiple參數說明,它為true和false有着截然不同的意義【 ps:建議弄成false,雖然是挨個去處理,進而應答,效率慢,但是:資料安全,否則:很大可能造成資料丢失 】
    • true 代表批量應答MQ,channel 上未應答 / 消費者未被處理完畢的消息
    RabbitMQ 3.9( 基礎 )
    • false 隻會處理隊列放到channel信道中目前正在處理的消息告知MQ是否确認應答 / 消費者處理完畢了
    RabbitMQ 3.9( 基礎 )
3.3.1.3、消息重新入隊原理
  • 指的是:如果消費者由于某些原因失去連接配接(其通道已關閉,連接配接已關閉或 TCP 連接配接丢失),導緻消息未發送 ACK 确認,RabbitMQ 将了解到消息未完全處理,并将對其重新排隊。如果此時其他消費者可以處理,它将很快将其重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確定不會丢失任何消息
  • 如下圖:消息1原本是C1這個消費者來接收的,但是C1失去連結了,而C2消費者并沒有斷開連結,是以:最後MQ将消息重新入隊queue,然後讓C2來處理消息1
RabbitMQ 3.9( 基礎 )
3.3.1.4、手動應答的代碼示範
生産者
package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class AckProducer {

    private static final String QUEUE_NAME = "ack queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 聲明隊列
        /*
            下面這是參數的完整意思,源碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            參數1、隊列名字
            參數2、是否持久化( 儲存到磁盤 ),預設是在記憶體中的
            參數3、是否共享,即:是否隻供一個消費者消費,是否讓多個消費者共享這個隊列中的資訊
            參數4、是否自動删除,即:最後一個消費者擷取資訊之後,這個隊列是否自動删除
            參數5、其他配置項,這涉及到後面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println("請輸入要推送的消息:");
        Scanner input = new Scanner(System.in);
        while (input.hasNext()) {
            /*
                basicPublish( exchangeName,routing key,properties,message )
                參數1、互動機名字 - 使用了預設的
                參數2、指定路由規則,使用隊列名字
                參數3、指定傳遞的消息所攜帶的properties
                參數4、推送的具體消息 - byte類型的
         */
            String message = input.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息====>" + message + "推送完畢");
        }
    }
}

           
消費者01
package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;


public class AckConsumer {

    private static final String QUEUE_NAME = "ack queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {

                Thread.sleep(5*1000);

                System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));

                // 添加手動應答
                /*
                    basicAck( long, boolean )
                    參數1、消息的辨別tag,這個辨別就相當于是消息的ID
                    參數2、是否批量應答multiple
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        System.out.println("消費者01正在接收消息,需要5秒處理完");
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            System.out.println("觸發消費者取消消費消息行為的回調");
            System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));
        });
    }
}

           
消費者02
package cn.zixieqing.ACK;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;


public class AckConsumer {

    private static final String QUEUE_NAME = "ack queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {

                Thread.sleep(10*1000);

                System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));

                // 添加手動應答
                /*
                    basicAck( long, boolean )
                    參數1、消息的辨別tag,這個辨別就相當于是消息的ID
                    參數2、是否批量應答multiple
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        System.out.println("消費者02正在接收消息,需要10秒處理完");
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            System.out.println("觸發消費者取消消費消息行為的回調");
            System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));
        });
    }
}

           
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )

3.4、RabbitMQ的持久化 durable

3.4.1、隊列持久化

  • 這個玩意兒的配置吧,早就見過了,在生産者消息發送時,有一個聲明隊列的過程,那裡面就有一個是否持久化的配置
/*
            下面這是參數的完整意思,源碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            參數1、隊列名字
            參數2、是否持久化( 儲存到磁盤 ),預設是在記憶體中的
            參數3、是否共享,即:是否隻供一個消費者消費,是否讓多個消費者共享這個隊列中的資訊
            參數4、是否自動删除,即:最後一個消費者擷取資訊之後,這個隊列是否自動删除
            參數5、其他配置項,這涉及到後面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

           
  • 而如果沒有持久化,那麼RabbitMQ服務由于其他什麼原因導緻挂彩的時候,那麼重新開機之後,這個沒有持久化的隊列就灰飛煙滅了【 ps:注意和裡面的消息還沒關系啊,不是說隊列持久化了,那麼消息就持久化了 】
  • 在這個隊列持久化配置中,它的預設值就是false,是以要改成true時,需要注意一個點:選擇隊列持久化,那麼必須保證目前這個隊列是新的,即:RabbitMQ中沒有目前隊列,否則:需要進到web管理界面把已有的同名隊列删了,然後重新配置目前隊列持久化選項為true,不然:報錯
RabbitMQ 3.9( 基礎 )
  • 那麼:當我把持久化選項改為true,并 重新發送消息時
RabbitMQ 3.9( 基礎 )
  • inequivalent arg 'durable' for queue 'queue durable' in vhost '/': received 'true' but current is 'false'

  • 告知你:vhost虛拟機中已經有了這個叫做durable的隊列,要接收的選項值是true,但是它目前的值是false,是以報錯了呗
  • 解決方式就是去web管理界面,把已有的durable隊列删了,重新執行
RabbitMQ 3.9( 基礎 )
  • 再次執行就可以吃雞了,同時去web管理界面會發現它狀态變了,多了一個D辨別
RabbitMQ 3.9( 基礎 )
  • 有了這個玩意兒之後,那麼就算RabbitMQ出問題了,後續恢複之後,那麼這個隊列也不會丢失

3.4.2、消息持久化

  • 注意:這裡說的消息持久化不是說配置之後消息就一定不會丢失,而是:把消息标記為持久化,然後RabbitMQ盡量讓其持久化到磁盤
  • 但是:也會有意外,比如:RabbitMQ在将消息持久化到磁盤時,這是有一個時間間隔的,資料還沒完全刷寫到磁盤呢,RabbitMQ萬一出問題了,那麼消息 / 資料還是會丢失的,是以:消息持久化配置是一個弱持久化,但是:對于簡單隊列模式完全足夠了,強持久化的實作方式在後續的publisher / confirm釋出确認模式中
  • 至于配置極其地簡單,在前面都已經見過這個配置項,就是生産者發消息時做文章,就是下面的第三個參數,把它改為

    MessageProperties.PERSISTENT_TEXT_PLAIN

    即可
/*
            basicPublish( exchangeName,routing key,properties,message )
            參數1、互動機名字 - 使用了預設的
            參數2、指定路由規則,使用隊列名字
            參數3、指定傳遞的消息所攜帶的properties
            參數4、推送的具體消息 - byte類型的
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

		// 改成消息持久化
        channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

           
  • MessageProperties類的源碼如下:
public class MessageProperties {

    public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

    public MessageProperties() {
    }
}

           
  • 上面用到了BasicProperties類型,它的屬性如下:
public static class BasicProperties extends AMQBasicProperties {
        // 消息内容的類型
        private String contentType;
        // 消息内容的編碼格式
        private String contentEncoding;
        // 消息的header
        private Map<String, Object> headers;
        // 消息是否持久化,1:否,2:是
        private Integer deliveryMode;
        // 消息的優先級
        private Integer priority;
        // 關聯ID
        private String correlationId;
        // :用于指定回複的隊列的名稱
        private String replyTo;
        // 消息的失效時間
        private String expiration;
        // 消息ID
        private String messageId;
        // 消息的發送時間
        private Date timestamp;
        // 類型
        private String type;
        // 使用者ID
        private String userId;
        // 應用程式ID
        private String appId;
        // 叢集ID
        private String clusterId;
   }
           

3.5、不公平分發 和 預取值

不公平分發
  • 這個東西是在消費者那一方進行設定的
  • RabbitMQ預設是公平分發,即:輪詢分發
  • 輪詢分發有缺點:如前面消費者01( 設5秒的那個 )和 消費者02 ( 設10秒的那個 ),這種情況如果采用輪詢分發,那麼:01要快一點,而02要慢一點,是以01很快處理完了,然後處于空閑狀态,而02還在拼命奮鬥中,最後的結果就是02不停幹,而01悠悠閑閑的,浪費了時間,是以:應該壓榨一下01,讓它不能停
  • 設定方式:在消費者接收消息之前進行

    channel.basicQos( int prefetchCount )設定

// 不公平分發,就是在這裡接收消息之前做處理
        /* 
            basicQos( int prefetchCount )
            為0、輪詢分發 也是RabbitMQ的預設值
            為1、不公平分發
         */
        channel.basicQos(1);

        channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {
            System.out.println("消費者中斷了接收消息行為觸發的回調");
        });

           
預取值
  • 指的是:多個消費者在消費消息時,讓每一個消費者預計消費多少條消息
RabbitMQ 3.9( 基礎 )
  • 而要設定這種效果,和前面不公平分發的設定是一樣的,隻是把裡面的參數改一下即可
// 預取值,也是在這裡接收消息之前做處理,和不公平分發調的是同一個API
        /* 
            basicQos( int prefetchCount )	為0、輪詢分發 也是RabbitMQ的預設值;為1、不公平分發
            而當這裡的數字變成其他的,如:上圖中上面的那個消費者要消費20條消息,那麼把下面的數字改成對應的即可
            注意點:這是要設定哪個消費者的預取值,那就是在哪個消費者代碼中進行設定啊
         */
        channel.basicQos(10);		// 這樣就表示這個代碼所在的消費者需要消費10條消息了

        channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {
            System.out.println("消費者中斷了接收消息行為觸發的回調");
        });

           

3.6、publisher / confirms 釋出确認模式

3.6.1、釋出确認模式的原理

  • 這個玩意兒的目的就是為了持久化
RabbitMQ 3.9( 基礎 )
  • 在上面的過程中,想要讓資料持久化,那麼需要具備以下的條件
    • 1、隊列持久化
    • 2、消息持久化
    • 3、釋出确認
  • 而所謂的釋出确認指的就是:資料在刷寫到磁盤時,成功了,那麼MQ就回複生産者一下,資料确認刷寫到磁盤了,否則:隻具備前面的二者的話,那也有可能出問題,如:資料推到了隊列中,但是還沒來得及刷寫到磁盤呢,結果RabbitMQ當機了,那資料也有可能會丢失,是以:現在持久化的過程就是如下的樣子:
RabbitMQ 3.9( 基礎 )
開啟釋出确認
  • 在發送消息之前( 即:調basicPublish() 之前 )調一個API就可以了
channel.confirmSelect();		// 沒有參數

           

3.6.2、釋出确認的分類

3.6.2.1、單個确認釋出
  • 一句話:一手交錢一手交貨,即 生産者釋出一條消息,RabbitMQ就要回複确認狀态,否則不再發放消息,是以:這種模式是同步釋出确認的方式,缺點:很慢,優點:能夠實時地了解到那條消息出異常 / 哪些消息都釋出成功了
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {

        // 單個确認釋出
        singleConfirm();        // 單個确認釋出發送這些消息花費4797ms
    }

	public static void singleConfirm() throws IOException, TimeoutException, InterruptedException {

        Channel channel = MQUtil.getChannel("");

        // 開啟确認釋出
        channel.confirmSelect();

        // 聲明隊列 并 讓隊列持久化
        channel.queueDeclare("singleConfirm", true, false, false, null);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= 100; i++) {

            // 發送消息 并 讓消息持久化
            channel.basicPublish("","singleConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );

            // 釋出一個 确認一個 channel.waitForConfirms()
            if ( channel.waitForConfirms() )
                System.out.println("消息".concat( String.valueOf(i) ).concat( "發送成功") );

        }

        long end = System.currentTimeMillis();

        System.out.println("單個确認釋出發送這些消息花費".concat( String.valueOf( end-begin ) ).concat("ms") );
    }

           
3.6.2.2、批量确認釋出
  • 一句話:隻要結果,是怎麼一個批量管不着,隻需要把一堆消息釋出之後,回複一個結果即可,這種釋出也是同步的
  • 優點:效率相比單個釋出要高
  • 缺點:如果因為什麼系統故障而導緻釋出消息出現問題,那麼就會導緻是批量發了一些消息,然後再回複的,中間有哪個消息出問題了鬼知道
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        // 單個确認釋出
        // singleConfirm();        // 單個确認釋出發送這些消息花費4797ms

        // 批量釋出
        batchConfirm();         // 批量釋出發送的消息共耗時:456ms

    }

    public static void batchConfirm() throws IOException, TimeoutException, InterruptedException {

        Channel channel = MQUtil.getChannel("");

        // 開啟确認釋出
        channel.confirmSelect();

        // 聲明隊列 并 讓隊列持久化
        channel.queueDeclare("batchConfirm", true, false, false, null);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= 100; i++) {

            // 發送消息 并 讓消息持久化
            channel.basicPublish("","batchConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );

            // 批量釋出 并 回複批量釋出的結果 - 發了10條之後再确認
            if (i % 10 == 0) {

                channel.waitForConfirms();
                System.out.println("消息" + ( i-10 ) + "====>" + i + "的消息釋出成功");
            }
        }

        // 為了以防還有另外的消息未被确認,再次确認一下
        channel.waitForConfirms();

        long end = System.currentTimeMillis();

        System.out.println("批量釋出發送的消息共耗時:" + (end - begin) + "ms");

    }

           
3.6.2.3、異步确認釋出 - 必須會的一種
RabbitMQ 3.9( 基礎 )
  • 由上圖可知:所謂的異步确認釋出就是:
    • 1、生産者隻管發消息就行,不用管消息有沒有成功
    • 2、釋出的消息是存在一個map集合中的,其key就是消息的辨別tag / id,value就是消息内容
    • 3、如果消息成功釋出了,那麼實體broker會有一個ackCallback()回調函數來進行處理【 ps:裡面的處理邏輯是需要我們進行設計的 】
    • 4、如果消息未成功釋出,那麼實體broker會調用一個nackCallback()回調函數來進行處理【 ps:裡面的處理邏輯是需要我們進行設計的 】
    • 5、而需要異步處理,就是因為生産者隻管發就行了,是以:一輪的消息肯定是很快就釋出過去了,就可以做下一輪的事情了,至于上一輪的結果是怎麼樣的,那就需要等到兩個callback回調執行完了之後給結果,而想要能夠調取到兩個callback回調,那麼:就需要對發送的資訊進行監聽 / 對信道進行監聽
  • 而上述牽扯到一個map集合,那麼這個集合需要具備如下的條件:
    • 1、首先此集合應是一個安全且有序的,同時還支援高并發
    • 2、其次能夠将序列号( key ) 和 消息( value )輕松地進行關聯
代碼實作
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        // 單個确認釋出
        // singleConfirm();        // 單個确認釋出發送這些消息花費4797ms

        // 批量釋出
        // batchConfirm();         // 批量釋出發送的消息共耗時:456ms

        asyncConfirm();             // 異步釋出确認耗時:10ms

    }

    // 異步釋出确認
    public static void asyncConfirm() throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.confirmSelect();
        channel.queueDeclare("async confirm", true, false, false, null);

        // 1、準備符合條件的map
        ConcurrentSkipListMap<Long, Object> messagePoolMap = new ConcurrentSkipListMap<>();

        // 3、對信道channel進行監聽
        // 成功确認釋出回調
        ConfirmCallback ackCallback = (messageTag, multiple) -> {
            System.out.println("确認釋出了消息=====>" + messagePoolMap.headMap(messageTag) );

            // 4、把确認釋出的消息删掉,減少記憶體開銷
            // 判斷是否是批量删除
            if ( multiple ){
                // 通過消息辨別tag 把 确認釋出的消息取出
                messagePoolMap.headMap(messageTag).clear();
                /**
                 * 上面這句代碼拆分寫法
                 *    ConcurrentNavigableMap<Long, Object> confirmed = messagePoolMap.headMap(messageTag);
                 *    confirmed.clear();
                 */
            }else {
                messagePoolMap.remove(messageTag);
            }
        };

        // 沒成功釋出确認回調
        ConfirmCallback nackCallback = (messageTag, multiple) -> {
            System.out.println("未确認的消息是:" + messagePoolMap.get(messageTag) );
        };

        // 進行channel監聽 這是異步的
        /**
         * channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
         * 參數1、消息成功釋出的回調函數 ackCallback()
         * 參數2、消息未成功釋出的回調函數 nackCallback()
         */
        channel.addConfirmListener( ackCallback,nackCallback );

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= 100; i++) {

            // 2、将要釋出的全部資訊儲存到map中去
            /*
                channel.getNextPublishSeqNo() 擷取下一次将要發送的消息辨別tag
             */
            messagePoolMap.put(channel.getNextPublishSeqNo(),String.valueOf(i) );
            // 生産者隻管釋出就行
            channel.basicPublish("","async confirm",MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes());

            System.out.println("消息=====>" + i + "發送完畢");
        }


        long end = System.currentTimeMillis();

        System.out.println("異步釋出确認耗時:" + ( end-begin ) + "ms" );
    }

           

3.7、交換機

  • 正如前面一開始就畫的原理圖,交換機的作用就是為了接收生産者發送的消息 并 将消息發送到隊列中去
RabbitMQ 3.9( 基礎 )
  • 注意點:前面一直玩的那些模式,雖然沒有寫交換機,但并不是說RabbitMQ就沒用交換機【 ps:使用的是""空串,也就是使用了RabbitMQ的預設交換機 】,生産者發送的消息隻能發到交換機中,進而由交換機來把消息發給隊列

3.7.1、交換機exchange的分類

  • 直接( direct ) / routing 模式
  • 主題( topic )
  • 标題 ( heanders ) - 這個已經很少用了
  • 扇出( fancut ) / 釋出訂閱模式
臨時隊列
  • 所謂的臨時隊列指的就是:自動幫我們生成隊列名 并且 當生産者和隊列斷開之後,這個隊列會被自動删除
  • 是以這麼一說:前面玩過的一種就屬于臨時隊列,即:将下面的第四個參數改成true即可【 ps:當然讓隊列名随機生成就完全比對了 】
/*
            下面這是參數的完整意思,源碼中偷懶了,沒有見名知意
            queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
            參數1、隊列名字
            參數2、是否持久化( 儲存到磁盤 ),預設是在記憶體中的
            參數3、是否共享,即:是否隻供一個消費者消費,是否讓多個消費者共享這個隊列中的資訊
            參數4、是否自動删除,即:最後一個消費者擷取資訊之後,這個隊列是否自動删除
            參數5、其他配置項,這涉及到後面的知識,目前選擇null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

           
  • 而如果要更簡單的生成臨時隊列,那麼調用如下的API即可
String queueName = channel.queueDeclare().getQueue();

           
  • 這樣幫我們生成的隊列效果就和

    channel.queueDeclare(QUEUE_NAME, false, false, true, null);

    是一樣的了

3.7.2、fanout扇出 / 釋出訂閱模式

  • 這玩意兒吧,好比群發,一人發,很多人收到消息,就是原理圖的另一種樣子,生産者釋出的一個消息,可以供多個消費者進行消費
RabbitMQ 3.9( 基礎 )
  • 實作方式就是讓一個交換機binding綁定多個隊列
生産者
package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class FanoutProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        /**
         * 定義交換機
         * 參數1、交換機名字
         * 參數2、交換機類型
         */
        channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT);

        System.out.println("請輸入要發送的内容:");
        Scanner input = new Scanner(System.in);
        while (input.hasNext()){
            String message = input.next();
            channel.basicPublish("fanoutExchange","", null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息=====>" + message + "發送完畢");
        }
    }
}

           
消費者01
package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class FanoutConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 綁定隊列
        /**
         * 參數1、隊列名字
         * 參數2、交換機名字
         * 參數3、用于綁定的routing key / binding key
         */
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "fanoutExchange", "");

        System.out.println("01消費者正在接收消息........");
        channel.basicConsume(queueName,true,(consumerTag,message)->{
            // 這裡面接收到消息之後就可以用來做其他事情了,如:存到磁盤
            System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTage->{});
    }
}

           
消費者02
package cn.zixieqing.fanout;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class FanoutConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 綁定隊列
        /**
         * 參數1、隊列名字
         * 參數2、交換機名字
         * 參數3、用于綁定的routing key / binding key
         */
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "fanoutExchange", "");

        System.out.println("02消費者正在接收消息........");
        channel.basicConsume(queueName,true,(consumerTag,message)->{
            // 這裡面接收到消息之後就可以用來做其他事情了,如:存到磁盤
            System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTage->{});
    }
}

           
RabbitMQ 3.9( 基礎 )

3.7.3、direct交換機 / routing路由模式

  • 這個玩意兒吧就是釋出訂閱模式,也就是fanout類型交換機的變樣闆,即:多了一個routing key的配置而已,也就是說:生産者和消費者傳輸消息就通過routing key進行關聯起來,是以:現在就變成了生産者想把消息發給誰就發給誰
RabbitMQ 3.9( 基礎 )
生産者
package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class DirectProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT);

        System.out.println("請輸入要發送的消息:");
        Scanner input = new Scanner(System.in);

        while (input.hasNext()){
            String message = input.next();
            /**
             * 對第二個參數routing key做文章
             * 假如這裡的routing key為zixieqing 那麼:就意味着消費者隻能是綁定了zixieqing的隊列才可以進行接收這裡發的消息内容
             */
            channel.basicPublish("directExchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息=====>" + message + "====>發送完畢");
        }
    }
}

           
消費者01
package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class DirectConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.queueDeclare("direct", false, false, false, null);
        /**
         * 隊列綁定
         * 參數1、隊列名
         * 參數2、交換機名字
         * 參數3、routing key 這裡的routing key 就需要和生産者中的一樣了,這樣才可以通過這個routing key去對應的隊列中取消息
         */
        channel.queueBind("direct", "directExchange", "zixieqing");

        System.out.println("01消費者正在接收消息.......");
        channel.basicConsume("direct",true,(consumerTag,message)->{
            System.out.println("01消費者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTag->{});
    }
}

           
  • 上面這種,生産者的消息肯定能夠被01消費者給消費,因為:他們的交換機名字、隊列名字和routing key的值都是相同的
RabbitMQ 3.9( 基礎 )
  • 而此時再加一個消費者,讓它的routing key值和生産者中的不同
package cn.zixieqing.direct;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class DirectConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.queueDeclare("direct", false, false, false, null);
        /**
         * 隊列綁定
         * 參數1、隊列名
         * 參數2、交換機名字
         * 參數3、routing key 這裡的routing key 就需要和生産者中的一樣了,這樣才可以通過這個routing key去對應的隊列中取消息
         */
        // 搞點事情:這裡的routing key的值zixieqing和生産者的不同
        channel.queueBind("direct", "directExchange", "xiegongzi");

        System.out.println("02消費者正在接收消息.......");
        channel.basicConsume("direct",true,(consumerTag,message)->{
            System.out.println("02消費者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumerTag->{});
    }
}

           
RabbitMQ 3.9( 基礎 )

3.7.4、topic交換機 / topic主題模式 - 使用最廣的一個

  • 前面玩的fanout扇出類型的交換機 / 釋出訂閱模式是一個生産者釋出,多個消費者共享消息,和qq群類似;而direct直接交換機 / 路由模式是消費者隻能消費和消費者相同routing key的消息
  • 而上述這兩種還有局限性,如:現在生産者的routing key為zi.xie.qing,而一個消費者隻消費含xie的消息,一個消費者隻消費含qing的消息,另一個消費者隻消費第一個為zi的零個或無數個單詞的消息,甚至還有一個消費者隻消費最後一個單詞為qing,前面有三個單詞的routing key的消息呢?
  • 這樣一看,釋出訂閱模式和路由模式都不能解決,更别說前面玩的簡單模式、工作隊列模式、釋出确認模式了,這些和目前的這個需求更不搭了,是以:就來了這個topic主題模式
topic中routing key的要求
  • 隻要交換機類型是topic類型的,那麼其routing key就不能亂寫,要求:routing key隻能是一個單詞清單,多個單詞之間采用點隔開,如:cn.zixieqing.rabbit
  • 單詞清單的長度不能超過255個位元組
  • 在routing key的規則清單中有兩個替換符可以用
    • 1、

      *

      代表一個單詞
    • 2、

      #

      代表零活無數個單詞
  • 假如有如下的一個綁定關系圖
RabbitMQ 3.9( 基礎 )
  • Q1綁定的是:中間帶 orange 帶 3 個單詞的字元串(.orange.)
  • Q2綁定的是:
    • 最後一個單詞是 rabbit 的 3 個單詞(..rabbit)
    • 第一個單詞是 lazy 的多個單詞(lazy.#)
  • 熟悉一下這種綁定關系( 左為一些routes路由規則,右為能比對到上圖綁定關系的結果 )
quick.orange.rabbit 		被隊列 Q1Q2 接收到
	lazy.orange.elephant 		被隊列 Q1Q2 接收到
	quick.orange.fox 			被隊列 Q1 接收到
	lazy.brown.fox 				被隊列 Q2 接收到
	lazy.pink.rabbit 			雖然滿足兩個綁定,但隻被隊列 Q2 接收一次
	quick.brown.fox 			不滿足任何綁定關系,不會被任何隊列接收到,會被丢棄
	quick.orange.male.rabbit 	是四個單詞,不滿足任何綁定關系,會被丢棄
	lazy.orange.male.rabbit 	雖是四個單詞,但比對 Q2,因:符合lazy.#這個規則

           
  • 當隊列綁定關系是下列這種情況時需要引起注意
    • 當一個隊列綁定鍵是#,那麼這個隊列将接收所有資料,就有點像 fanout 了
    • 如果隊列綁定鍵當中沒有#和*出現,那麼該隊列綁定類型就是 direct 了

把上面的綁定關系和測試轉換成代碼玩一波

生産者

package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;


public class TopicProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);

        /**
         * 準備大量的routing key 和 message
         */
        HashMap<String, String> routesAndMessageMap = new HashMap<>();
        routesAndMessageMap.put("quick.orange.rabbit", "被隊列 Q1Q2 接收到");
        routesAndMessageMap.put("lazy.orange.elephant", "被隊列 Q1Q2 接收到");
        routesAndMessageMap.put("quick.orange.fox", "被隊列 Q1 接收到");
        routesAndMessageMap.put("lazy.brown.fox", "被隊列 Q2 接收到");
        routesAndMessageMap.put("lazy.pink.rabbit", "雖然滿足兩個綁定,但隻被隊列 Q2 接收一次");
        routesAndMessageMap.put("quick.brown.fox", "不滿足任何綁定關系,不會被任何隊列接收到,會被丢棄");
        routesAndMessageMap.put("quick.orange.male.rabbit", "是四個單詞,不滿足任何綁定關系,會被丢棄");
        routesAndMessageMap.put("lazy.orange.male.rabbit ", "雖是四個單詞,但比對 Q2,因:符合lazy.#這個規則");

        System.out.println("生産者正在發送消息.......");
        for (Map.Entry<String, String> routesAndMessageEntry : routesAndMessageMap.entrySet()) {
            String routingKey = routesAndMessageEntry.getKey();
            String message = routesAndMessageEntry.getValue();
            channel.basicPublish("topicExchange",routingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息====>" + message + "===>發送完畢");
        }
    }
}

           
消費者01
package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TopicConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("Q1", false, false, false, null);
        channel.queueBind("Q1", "topicExchange", "*.orange.*");

        System.out.println("消費者01正在接收消息......");
        channel.basicConsume("Q1",true,(consumerTage,message)->{
            System.out.println("01消費者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
            System.out.println("此條消息的交換機名為:" + message.getEnvelope().getExchange() + ",路由鍵為:" + message.getEnvelope().getRoutingKey());
        },consumerTag->{});
    }
}

           
消費者02
package cn.zixieqing.topic;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TopicConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("Q2", false, false, false, null);
        channel.queueBind("Q2", "topicExchange", "*.*.rabbit");
        channel.queueBind("Q2", "topicExchange", "lazy.#");

        System.out.println("消費者02正在接收消息......");
        channel.basicConsume("Q2",true,(consumerTage,message)->{
            System.out.println("02消費者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
            System.out.println("此條消息的交換機名為:" + message.getEnvelope().getExchange() + ",路由鍵為:" + message.getEnvelope().getRoutingKey());
        },consumerTag->{});
    }
}

           
RabbitMQ 3.9( 基礎 )

3.8、死信隊列

  • 死信隊列指的是:死了的消息,換言之就是:生産者把消息發送到交換機中,再由交換機把消息推到隊列中,但由于某些原因,隊列中的消息沒有被正常消費,進而就讓這些消息變成了死信,而專門用來放這種消息的隊列就是死信隊列
  • 讓消息成為死信的三大因素
    • 1、消息過期 即:TTL( time to live )過期
    • 2、超過隊列長度
    • 3、消息被消費者絕收了
  • 實作下圖的邏輯( 下圖成為死信的因素是隻要出現一個就成為死信 )
RabbitMQ 3.9( 基礎 )

3.8.1、消息過期 TTL

生産者
package cn.zixieqing.dead_letter_queue.ttl;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TtlProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT);

        // 設定消息的失效時間
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder()
                // 10s過期 expiration( String time ) 這裡的機關是ms值
                .expiration(String.valueOf(10 * 1000))
                .build();
        for (int i = 1; i < 11; i++) {

            String message = "生産者發送了消息" + i;

            channel.basicPublish("normal_exchange","zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

           
實作下面的消費者部分
RabbitMQ 3.9( 基礎 )
C1消費者
package cn.zixieqing.dead_letter_queue.ttl;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;


public class TtlConsumer01 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 聲明正常交換機、死信交換機
        channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT);

        // 聲明死信隊列
        channel.queueDeclare("dead-queue", false, false, false, null);
        // 死信隊列綁定死信交換機
        channel.queueBind("dead-queue", "dead_exchange", "lisi");

        // 聲明正常隊列
        /**
         * 但是:需要考慮消息過期之後,轉到死信隊列去,是以:用最後一個參數做文章
         */
        Map<String, Object> params = new HashMap<>();
        // 消息過期 那需要找到死信交換機 - 是以:讓正常隊列和死信交換機聯系起來,其中key值x-dead-letter-exchange是固定的
        params.put("x-dead-letter-exchange", "dead_exchange");
        // 知道了交換機,那還需要知道routing key路由鍵,其中:key值x-dead-letter-routing-key也是死的
        params.put("x-dead-letter-routing-key", "lisi");
        // 經過上面的參數配置之後,隻要TTL過期,那麼消息會跑到上面定義的dead_exchange,然後推到dead-queue中去
        channel.queueDeclare("normal-queue", false, false, false, params);

        // 讓正常隊列和正常交換機進行綁定
        channel.queueBind("normal-queue", "normal_exchange", "zhangsan");

        // 消費消息
        System.out.println("消費者01正在接收消息.......");
        channel.basicConsume("normal-queue",true,(consumeTage,message)->{
            System.out.println("01消費者從正常隊列中消費了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8 ) );
        },consumeTage->{});
    }
}

           
  • 啟動C1,然後把C1關了( 僞裝成消費者無法消費消息 ),最後啟動生産者
RabbitMQ 3.9( 基礎 )

現在來收個尾

C2消費者來消費死信隊列中的消息 - 就是一個正常的消費者消費,隻是跑到死信隊列中去找了而已

RabbitMQ 3.9( 基礎 )
package cn.zixieqing.dead_letter_queue.ttl;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TtlConsumer02 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT);

        System.out.println("02消費者正在消費死信隊列中的消息.......");
        channel.basicConsume("dead-queue",true,(consumeTage,message)->{
            System.out.println("02消費者接收到了死信隊列中的===>" + new String(message.getBody(), StandardCharsets.UTF_8));
        },consumeTage->{});
    }
}

           
RabbitMQ 3.9( 基礎 )
RabbitMQ 3.9( 基礎 )

3.8.2、隊列超過最大長度

3.8.2.1、隊列超過所限制的最大個數
  • 意思就是:某一個隊列要求隻能放N個消息,但是放了N+1個消息,這就超過隊列的最大個數了
生産者
  • 就是一個正常的生産者發送消息而已
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        channel.exchangeDeclare("messageNumber_normal_exchange", BuiltinExchangeType.DIRECT);

        for (int i = 1; i < 11; i++) {
            String message = "生産者發送了消息" + i;
            channel.basicPublish("messageNumber_normal_exchange","zi",null,
                    message.getBytes(StandardCharsets.UTF_8) );
            System.out.println("消息====>" + message + "====>發送完畢");
        }
    }
}

           
01消費者
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;


public class Consumer01 {

    /**
     * 正常交換機名稱
     */
    public static final String NORMAL_EXCHANGE = "messageNumber_normal_exchange";

    /**
     * 正常隊列名稱
     */
    public static final String NORMAL_QUEUE = "messageNumber_queue";

    /**
     * 死信交換機名稱
     */
    public static final String DEAD_EXCHANGE = "messageNumber_dead_exchange";

    /**
     * 死信隊列名稱
     */
    public static final String DEAD_QUEUE = "messageNumber_dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 聲明正常交換機、死信交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 聲明死信隊列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 死信交換機和死信隊列進行綁定
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");

        // 聲明正常隊列 并 考慮達到條件時和死信交換機進行聯系
        HashMap<String, Object> params = new HashMap<>();
        // 死信交換機
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 死信路由鍵
        params.put("x-dead-letter-routing-key", "xie");
        // 達到隊列能接受的最大個數限制就多了如下的配置
        params.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        // 正常隊列和正常交換機進行綁定
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zi");

        System.out.println("01消費者正在接收消息......");
        channel.basicConsume(NORMAL_QUEUE,true,(consumeTag,message)->{
            System.out.println("01消費者接收到了消息:" + new String( message.getBody(), StandardCharsets.UTF_8));
        },consumeTag->{});
    }
}

           
  • 啟動01消費者,然後關掉( 模仿異常 ),最後啟動生産者,那麼:生産者發送了10個消息,由于01消費者這邊做了配置,是以有6個消息是在正常隊列中,餘下的4個消息就會進入死信隊列
RabbitMQ 3.9( 基礎 )
3.8.2.2、超過隊列能接受消息的最大位元組長度
  • 和前面一種相比,在01消費者方做另一個配置即可
params.put("x-max-length-bytes", 255);

           
RabbitMQ 3.9( 基礎 )
注意:關于兩種情況同時使用的問題
  • 如配置的如下兩個
params.put("x-max-length", 6);
        params.put("x-max-length-bytes", 255);

           
  • 那麼先達到哪個上限設定就執行哪個

3.8.3、消息被拒收

  • 注意點:必須開啟手動應答
// 第二個參數改成false
	channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{},consumeTag->{});

           
生産者
package cn.zixieqing.dead_letter_queue.reack;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");
        channel.exchangeDeclare("reack_normal_exchange", BuiltinExchangeType.DIRECT);

        for (int i = 1; i < 11; i++) {
            String message = "生産者發送的消息" + i;
            channel.basicPublish("reack_normal_exchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息===>" + message + "===>發送完畢");
        }
    }
}

           
消費者
package cn.zixieqing.dead_letter_queue.reack;

import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;


public class Consumer01 {

    public static final String NORMAL_EXCHANGE = "reack_normal_exchange";
    public static final String DEAD_EXCHANGE = "reack_dead_exchange";

    public static final String DEAD_QUEUE = "reack_dead_queue";
    public static final String NORMAL_QUEUE = "reack_normal_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = MQUtil.getChannel("");

        // 聲明正常交換機、死信交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 聲明死信隊列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 死信隊列綁定死信交換機
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");

        // 聲明正常隊列
        HashMap<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        params.put("x-dead-letter-routing-key", "xie");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zixieqing");

        System.out.println("01消費者正在接收消息.....");
        // 1、注意:需要開啟手動應答( 第二個參數為false )
        channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);

            // 如果發送的消息為:生産者發送的消息5  則:拒收
            if ( "生産者發送的消息5".equals( msg ) ) {
                System.out.println("此消息====>" + msg + "===>是拒收的");
                // 2、做拒收處理 - 注意:第二個參數設為false,表示不再重新入正常隊列的隊,這樣消息才可以進入死信隊列
                channel.basicReject( message.getEnvelope().getDeliveryTag(),false);
            }else {
                System.out.println("01消費者接收到了消息=====>" + msg);
            }
        },consumeTag->{});
    }
}

           
RabbitMQ 3.9( 基礎 )

3.9、續篇連結

  • https://www.cnblogs.com/xiegongzi/p/16242291.html#top