天天看點

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

目錄

1.基礎知識:

1.0 前言:

 1.1 rabbitmq的使用者角色

1.2 rabbitmq的端口号

1.3 消息隊列的運作原理

1.4你必須知道的Rabbit

2.六種隊列

2.1 上圖1:  簡單隊列   

生産者與消費者一一對應 生産者将消息發送到隊列,消費者從隊列中擷取消息。

2.2 上圖2:Work模式 

一個生産者多個消費者,每個消費者擷取到的消息唯一

2.3上圖3: 訂閱模式  Fanout Exchange

2.4上圖4: 路由模式 Direct Exchange

2.5 上圖5:主題模式 Topic Exchange (通配符模式)

3.應用場景

3.1.延遲隊列

3.2.消息錯峰

3.3.消息分發

3.4.應用解耦

3.5.消息确認ACK

4.rabbitMq與Erlang版本對應關系

5.windows搭建

5.1安裝Erlang

5.1.1下載下傳erlang

5.1.2安裝erlang

5.2安裝rabbitmq

5.2.1下載下傳rabbitmq

5.2.2安裝Rabbitmq

5.2.3 啟用管理工具

6.linux搭建

6.1 說明:

6.2CentOs6安裝:

6.2.1下載下傳安裝包:

6.2.2centos6安裝:

6.3 centos7安裝:

6.4 浏覽器通路

6.5 移除queue

7.與spingboot內建

7.1.配置rabbitmq連接配接資訊:

7.2 添加AMQP支援

7.3定制化AMQP模闆

7.4 direct類型 

7.4.1定義direct類型隊列、交換機并綁定:

7.4.2 controller中發送消息

7.4.3 receiver端接收消息

7.5 fanout類型 

7.5.1定義fanout類型隊列、交換機并綁定:

7.5.2 controller中發送消息

7.5.3 receiver端接收消息

7.6 diedletter類型(延遲消費類型)

7.6.1定義diedletter類型隊列、交換機并綁定:

7.6.2 controller中發送消息

7.6.3 receiver端接收消息

7.7 topic類型

7.7.1定義topic類型隊列、交換機并綁定:

7.7.2 controller中發送消息

7.7.3 receiver端接收消息

8.上代碼

1.基礎知識:

1.0 前言:

RabbitMQ是一個由erlang語言開發的基于AMQP(Advanved Message Queue)隊列協定的開源實作。

支援的作業系統  linux、windows、macox等

支援的開發語言 java、python、ruby、.net、php、c/c++、node.js等

官網 https://www.rabbitmq.com/

 1.1 rabbitmq的使用者角色

           1、 超級管理者(administrator)

                 可登陸管理控制台,可檢視所有的資訊,并且可以對使用者,政策(policy)進行操作。

           2、 監控者(monitoring)

                 可登陸管理控制台,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁盤使用情況等)

           3、 政策制定者(policymaker)

                  可登陸管理控制台, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框辨別的部分)。

           4、 普通管理者(management)

                  僅可登陸管理控制台,無法看到節點資訊,也無法對政策進行管理。

           5、 其他

                 無法登陸管理控制台,通常就是普通的生産者和消費者。

1.2 rabbitmq的端口号

               5672: AMQP協定的端口号(與Java互動)

               15672: 管理工具的端口

               25672: 叢集

這些端口需要防火牆放行,詳細的操作細節後面的linux、windows搭建會有提及

1.3 消息隊列的運作原理

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

從上圖可以看出、生産者、消息隊列、消費者是最重要的三個概念:

    生産者(Producer):建立消息到消息隊列中(消息的建立者,負責建立和推送資料到消息伺服器)。

    消息隊列(Queue):存儲消息。

    消費者(Consumer):監聽指定消息隊列,當消息隊列接收到消息後,擷取消息并處理。

    RabbitMQ:建立消息隊列,并提供API給生産者和消費者進行存取消息,用于扮演“快遞”的角色,本身不生産消息,隻是扮演“快遞”的角色。

    生産者産生消息并調用RabbitMQ的API将消息加入到對應的消息隊列中,消費者通過RabbitMQ的API從消息隊列中擷取消息進行消費。

這裡面有一個虛拟機的概念,你可以把它了解為域名,在應用中需要通過配置或者代碼指定你要用哪個虛拟機(Vhost)。如果你不指定,那麼預設的會選用 "/".前提是你的目前使用者有對這個虛拟機 "/" 的使用權限。虛拟機可以在rabbitmq的可視化頁面裡面配置。

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

1.4你必須知道的Rabbit

 想要真正的了解Rabbit有些名詞是你必須知道的。

           包括:ConnectionFactory(連接配接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、            RoutingKey(路由鍵)、BindingKey(綁定鍵)。

           ConnectionFactory(連接配接管理器):應用程式與Rabbit之間建立連接配接的管理器,程式代碼中使用;

           Channel(信道):消息推送使用的通道;

           Exchange(交換器):用于接受、配置設定消息;

           Queue(隊列):用于存儲生産者的消息;

           RoutingKey(路由鍵):用于把生成者的資料配置設定到交換器上;

           BindingKey(綁定鍵):用于把交換器的消息綁定到隊列上;

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

2.六種隊列

rabbitmq一共有六種隊列,其中RPC已被淘汰:

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

2.1 上圖1:  簡單隊列   

生産者與消費者一一對應 生産者将消息發送到隊列,消費者從隊列中擷取消息。

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

在這種模式下一個生産者對應了多個消費者,但是一個消息隻能被一個消費者擷取。可以配置消息分發到消費者的路由規則

 “P”是我們的生産者,“C”是我們的消費者。中間的框是一個隊列。

  生産者将消息發送到隊列,消費者從隊列中消費。

2.2 上圖2:Work模式 

一個生産者多個消費者,每個消費者擷取到的消息唯一

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

2.3上圖3: 訂閱模式  Fanout Exchange

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

解讀:

1、1個生産者,多個消費者

2、每一個消費者都有自己的一個隊列

3、生産者沒有将消息直接發送到隊列,而是發送到了交換機

4、每個隊列都要綁定到交換機

5、生産者發送的消息,經過交換機,到達隊列,實作,一個消息被多個消費者擷取的目的

注意:一個消費者隊列可以有多個消費者執行個體,隻有其中一個消費者執行個體會消費

fanout(不處理路由鍵):每個和交換機綁定的隊列都會收到消息

2.4上圖4: 路由模式 Direct Exchange

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

發送消息到交換機并且要指定路由key ,消費者将隊列綁定到交換機時需要指定路由key。它會把消息路由到那些binding key與routing key完全比對的Queue中。
           

2.5 上圖5:主題模式 Topic Exchange (通配符模式)

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

這個可以了解為隊列的模糊比對。消息經過交換機推送到符合通配符規則的隊列中。

将路由鍵和某模式進行比對,此時隊列需要綁定在一個模式上,“#”比對一個詞或多個詞,“*”隻比對一個詞。當消息發送至交換機時,隻有routingKey能夠滿足通配規則的隊列才能夠接收到消息。

例如:routingKey 為   testqueue  的隊列 能夠接收到  test#  的消息

3.應用場景

3.1.延遲隊列

   使用者注冊後長時間不活躍定時推送提醒

   訂單下單後長時間未支付轉至推送提醒

   延遲重試,比如服務原因導緻業務執行終端通過延遲重試保障業務完整性等

3.2.消息錯峰

   秒殺、促銷等大流量資訊短時間注入是的錯峰處理以平衡上下遊吞吐量

3.3.消息分發

    統一消息源發送的消息應對不同的消費邏輯。如業務處理和日志采集

3.4.應用解耦

   通過釋出訂閱方式代替接口調用解耦項目

3.5.消息确認ACK

   保障業務完整性  提供了confirm 和 return 兩種消息确認回調方法。

   當消息由生産端成功發送至交換機 confirm回調方法會收到布爾類型的ack确認。true為推送至交換機成功。

   當消息由交換機根據routingKey推送至queue隊列中時,return回調方法會收到字元類型的replyText确認。

4.rabbitMq與Erlang版本對應關系

在搭建RabbitMQ環境過程中,由于版本問題導緻環境一直搭建不起來,以下是RabbitMQ與Erlang的版本對應關系

RabbitMQ版本 Erlang最低要求 Erlang最高要求
3.7.7 - 3.7.12 20.3.x 21.x
3.7.0 - 3.7.6 19.3 20.3.x
rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

5.windows搭建

5.1安裝Erlang

5.1.1下載下傳erlang

下載下傳:http://www.erlang.org/download/otp_win64_17.3.exe

 百度雲盤下載下傳:https://pan.baidu.com/s/1-5yJGzOl23BuQl1D2_n3SA

5.1.2安裝erlang

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼
rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼
rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

安裝完成。

5.2安裝rabbitmq

5.2.1下載下傳rabbitmq

官方下載下傳位址:http://www.rabbitmq.com/download.html

百度雲盤下載下傳: https://pan.baidu.com/s/18Dxi01D0sbjPZq-RWymnww

5.2.2安裝Rabbitmq

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼
rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

安裝完成。

5.2.3 啟用管理工具

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

cmd進入安裝目錄sbin檔案夾下 輸入指令:

  啟用管理頁面指令    rabbitmq-plugins enable rabbitmq_management

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

在浏覽器中輸入位址檢視:http://127.0.0.1:15672/

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

使用預設子賬号登入:guest    密碼  guest

操作界面不再贅述。 至此  windows 環境搭建已全部完成。

6.linux搭建

6.1 說明:

   搭建過程中遇到了mq包與linux系統版本沖突問題,故在此提供了 基于centos6 和centos7兩個版本系統的搭建步驟。

6.2CentOs6安裝:

6.2.1下載下傳安裝包:

rabbitmq安裝包:https://pan.baidu.com/s/1pAkK81dFFojnO6fTIxhHkw

erlang安裝包:https://pan.baidu.com/s/1jWl-YUSSv1GBUJntZNgXDw

socat安裝包:https://pan.baidu.com/s/1DfbX4a4dxtBFA3ZQ7NYDBQ

6.2.2centos6安裝:

1.安裝erlang的rpm庫。

rpm -ivh erlang-20.3.8.7-1.el6.x86_64.rpm

判斷erlang是否安裝成功:指令行輸入   erl 

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

有輸出即表示安裝成功

2.安裝socat依賴:

安裝socat的rpm庫。

yum install tcp_wrappers

rpm -ivh socat-1.7.3.0-1.el6.x86_64.rpm(不同版本号要修改)

3.安裝RabbitMQ

rpm -ivh rabbitmq-server-3.7.7-1.el6.noarch.rpm (不同版本号要修改)

4.啟動rabbitmq:service rabbitmq-server start/stop/restart

5.檢視rabbitmq的啟動狀态 service rabbitmq-server status

6.RabbitMQ配置   這裡主要是建立使用者和賦權,因為guest這個使用者 預設的隻能本機通路,而我們部署時一般不會放在同一台機器上。建立了admin使用者後就可以去浏覽器設定其他使用者了。

啟動RabbitMQ後執行:

1、rabbitmqctl add_user admin 123456(建立使用者)

2、rabbitmqctl set_user_tags admin administrator(将建立好的使用者加入管理者)

3、rabbitmqctl set_permissions -p "/" admin "." "." ".*"(授權)

4、rabbitmq-plugins enable rabbitmq_management 啟動RabbitMQ管理頁面(執行完這個才能用浏覽器可視化網頁)

5、重新開機MQ服務   service rabbitmq-server restart

6、開放5672/15672/25672端口。      

vi /etc/sysconfig/iptables

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

在紅框處的下一行依次添加開放端口語句後儲存:

-A RH-Firewall-1-INPUT -m state --state NEW -m tcp -p tcp --dport 5672-j ACCEPT

-A RH-Firewall-1-INPUT -m state --state NEW -m tcp -p tcp --dport 15672-j ACCEPT

-A RH-Firewall-1-INPUT -m state --state NEW -m tcp -p tcp --dport 25672-j ACCEPT

重新開機防火牆: 

service iptables restart

7、在浏覽器輸入ip:15672後出現RabbitMQ管理台。

6.3 centos7安裝:

方式一:  rpm包安裝

 1.1. 安裝依賴環境Erlang

使用Erlang Solutions源進行安裝

# 下載下傳rpm包

wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

# 從erlang-solutions中更新該包,并将erlang_solutions.repo添加到/etc/yum.repos.d

rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

# 安裝

yum install erlang

該包還需依賴到epel源,請確定已有該源,若沒有則可通過以下方式安裝:

wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm

rpm -ivh epel-release-latest-7.noarch.rpm

yum repolist # 檢視安裝是否成功

由于Erlang Solutions會進行不斷地更新,且RabbitMQ對Erlang的版本有一定的要求(官方版本要求對應表)。是以官方建議我們禁止Erlang版本的自動更新。方法如下:參考如何禁止某個軟體包的自動更新

# 安裝yum-versionlock

yum install yum-plugin-versionlock

# 禁止Erlang自動更新

yum versionlock erlang

 1.2. 安裝RabbitMQ Server

從官網下載下傳rpm包并上傳到伺服器上。官方下載下傳連結

# 導入簽名

rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc

# 或

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

# 安裝

yum install rabbitmq-server-3.7.7-1.el7.noarch.rpm

方式二:  使用腳本安裝

2.1安裝erlang

#建立erlang.repo庫

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

#安裝

yum install erlang

2.2安裝rabbitmq

#建立rabbitmq-server.repo庫

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

#安裝

yum install rabbitmq-server

這種方式雖然會簡單點,但我嘗試過,發現隻有翻牆才能安裝成功,是以不太推薦大家使用這種方法。

2.3. 啟動RabbitMQ Server

# 設定開啟啟動

chkconfig rabbitmq-server on

# 啟動服務

service rabbitmq-server start

#停止服務

service rabbitmq-server stop

#監測狀态

service rabbitmq-server status

#使用者賦權

rabbitmqctl add_user admin 123456(建立使用者)

rabbitmqctl set_user_tags admin administrator(将建立好的使用者加入管理者)

rabbitmqctl set_permissions -p "/" admin "." "." ".*"(授權)

2.4. 配置RabbitMQ

2.4.1 找到配置檔案

/usr/share/doc/rabbitmq-server-3.7.7/ 目錄下複制一份模闆到 /etc/rabbitmq 目錄下進行修改

cd /usr/share/doc/rabbitmq-server-3.7.7/

cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

2.4.2 開啟管理背景 centos7 防火牆 叫firewall了  不是iptables了啊 ,這個注意下。

rabbitmq-plugins enable rabbitmq_management

# 開放端口

firewall-cmd --add-port=5672/tcp --permanen

firewall-cmd --add-port=15672/tcp --permanen

firewall-cmd --add-port=25672/tcp --permanent

firewall-cmd --reload

6.4 浏覽器通路

 http://ip:15672 ,進入如下頁面就證明插件啟動成功了

6.5 移除queue

清除的指令是: rabbitmqctl reset

但是在使用此指令前,要先關閉應用,否則不能清除。

關閉應用的指令為: rabbitmqctl stop_app

執行了這兩條指令後再次啟動此應用。

指令為: rabbitmqctl start_app

再次執行指令: rabbitmqctl list_queues

這次可以看到 listing 及 queues都是空的

7.與spingboot內建

7.1.配置rabbitmq連接配接資訊:

application.properties中配置rabbitMQ的連接配接資訊:

#對于rabbitMQ的支援
spring.rabbitmq.host=192.168.189.136
spring.rabbitmq.port=5672
spring.rabbitmq.username=yolly
spring.rabbitmq.password=yolly
spring.rabbitmq.virtualHost=yolly
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

#并發消費者的初始化值,并發消費者的最大值,每個消費者每次監聽時可拉取處理的消息數量。
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5
           

host、使用者名、密碼還有虛拟機名稱這三個需要改成你自己的rabbitmq主機ip和賬号密碼虛拟機哈。

這個虛拟機spring.rabbitmq.virtualHost可以不寫,如果不寫或者寫“” 他會選擇使用 “/” 虛拟機,前提是你的目前賬号有 “/” 虛拟機的使用權限。

7.2 添加AMQP支援

因為rabbitmq是基于amqp協定開發的  是以需要在pom.xml中添加amqp dependencies:

同時用到了web測試效果,是以需要引用web starter

<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>RELEASE</version>
</dependency>
           

7.3定制化AMQP模闆

上面我們提到了rabbitmq的消息ack機制,在boot工程中這個機制生效的前提是聲明confirm和return為true ,且實作他們的回調方法:

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼
/**
     * 定制化amqp模版      可根據需要定制多個
     * <p>
     * <p>
     * 此處為模版類定義 Jackson消息轉換器
     * ConfirmCallback接口用于實作消息發送到RabbitMQ交換器後接收ack回調   即消息發送到exchange  ack
     * ReturnCallback接口用于實作消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調  即消息發送不到任何一個隊列中  ack
     *
     * @return the amqp template
     */
//    @Primary
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
//          使用jackson 消息轉換器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
//        開啟returncallback     yml 需要 配置    publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {}  路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        //        消息确認  yml 需要配置   publisher-returns: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息發送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息發送到exchange失敗,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }
           

至此內建工作已經全部實作完畢了

7.4 direct類型 

重定向模式 發送至exchange下滿足規則的queue

7.4.1定義direct類型隊列、交換機并綁定:

/* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */

    /**
     * 聲明直連交換機 支援持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 聲明一個隊列 支援持久化.
     *
     * @return the queue
     */
    @Bean("directQueueA")
    public Queue directQueueA() {
        return QueueBuilder.durable("DIRECT_QUEUE_A").build();
    }
    /**
     * 聲明一個隊列 支援持久化.
     *
     * @return the queue
     */
    @Bean("directQueueB")
    public Queue directQueueB() {
        return QueueBuilder.durable("DIRECT_QUEUE_B").build();
    }

    /**
     * 通過綁定鍵 将指定隊列綁定到一個指定的交換機 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBindingA(@Qualifier("directQueueA") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY_A").noargs();
    }
    /**
     * 通過綁定鍵 将指定隊列綁定到一個指定的交換機 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBindingB(@Qualifier("directQueueB") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY_B").noargs();
    }
           

7.4.2 controller中發送消息

/**
     * 測試Direct模式.
     *direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全比對的Queue中
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/direct")
    public ResponseEntity direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //routing key為一個句點号“. ”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
        //binding key與routing key一樣也是句點号“. ”分隔的字元串
        //binding key中可以存在兩種特殊字元“*”與“#”,用于做模糊比對,其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)
        //   如果寫成   "DIRECT_ROUTING_KEY_.*"  則為topic模式 即模糊比對
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_A", p, correlationData);
        return ResponseEntity.ok();
    }
           

7.4.3 receiver端接收消息

/**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裡異常需要處理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE_A"})
    public void messageA(Message message, Channel channel) throws IOException {
        log.debug("DIRECTA "+new String (message.getBody()));
    }
    /**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裡異常需要處理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE_B"})
    public void messageB(Message message, Channel channel) throws IOException {
        log.debug("DIRECTB "+new String (message.getBody()));
    }
           

隻有隊列A能夠存儲消息、消費者A能夠收到消息

7.5 fanout類型 

廣播模式 發送到exchange下所有的queue

7.5.1定義fanout類型隊列、交換機并綁定:

/* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */

    /**
     * 聲明 fanout 交換機.
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }

    /**
     * Fanout queue A.
     *
     * @return the queue
     */
    @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }

    /**
     * Fanout queue B .
     *
     * @return the queue
     */
    @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }

    /**
     * 綁定隊列A 到Fanout 交換機.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /**
     * 綁定隊列B 到Fanout 交換機.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
           

7.5.2 controller中發送消息

/**
     * 測試廣播模式.
     *  fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/fanout")
    public ResponseEntity send(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
        return ResponseEntity.ok();
    }
           

7.5.3 receiver端接收消息

/**
     * FANOUT廣播隊列監聽一.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裡異常需要處理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        log.debug("FANOUT_QUEUE_A "+new String(message.getBody()));
    }

    /**
     * FANOUT廣播隊列監聽二.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception   這裡異常需要處理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        log.debug("FANOUT_QUEUE_B "+new String(message.getBody()));
    }
           

AB 都能接收到消息

7.6 diedletter類型(延遲消費類型)

A隊列接收 過期後轉發至B隊列   B隊列被消費 實作消息的延遲消費

原理圖:

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼

7.6.1定義diedletter類型隊列、交換機并綁定:

/*----------------------------------------------------------------------------deadletter queue------------------------------------------------------------------------------*/

    /**
     * 死信隊列跟交換機類型沒有關系 不一定為directExchange  不影響該類型交換機的特性.
     *
     * @return the exchange
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    /**
     * 聲明一個死信隊列.
     * x-dead-letter-exchange   對應  死信交換機
     * x-dead-letter-routing-key  對應 死信隊列
     *
     * @return the queue
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    聲明  死信交換機
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//       x-dead-letter-routing-key    聲明 死信路由鍵
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定義死信隊列轉發隊列.
     *
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通過 DL_KEY 綁定鍵綁定到死信隊列上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通過 KEY_R 綁定鍵綁定到死信隊列上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }
           

7.6.2 controller中發送消息

/**
     * 測試死信隊列.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/dead")
    public ResponseEntity deadLetter(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        聲明消息處理器  這個對消息進行處理  可以設定一些參數   對消息進行一些定制化處理   我們這裡  來設定消息的編碼  以及消息的過期時間  因為在.net 以及其他版本過期時間不一緻   這裡的時間毫秒值 為字元串
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            設定編碼
            messageProperties.setContentEncoding("utf-8");
//            設定過期時間10*1000毫秒
            messageProperties.setExpiration("2000");
            return message;
        };
//         向DL_QUEUE 發送消息  10*1000毫秒後過期 形成死信
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
        return ResponseEntity.ok();
    }
           

7.6.3 receiver端接收消息

/**
     * 監聽替補隊列 來驗證死信.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裡異常需要處理
     */
    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(Message message, Channel channel) throws IOException {
        log.debug("dead message  10s 後 消費消息 {}",new String (message.getBody()));
    }
           

過一會替補隊列接收到了消息

7.7 topic類型

模糊比對模式  這個routingkey與bindingkey采用正則通配符形式模糊比對

7.7.1定義topic類型隊列、交換機并綁定:

同7.4.1

7.7.2 controller中發送消息

/**
     * 測試Direct模式.
     *direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全比對的Queue中
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/direct")
    public ResponseEntity direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //routing key為一個句點号“. ”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
        //binding key與routing key一樣也是句點号“. ”分隔的字元串
        //binding key中可以存在兩種特殊字元“*”與“#”,用于做模糊比對,其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)
        //   如果寫成   "DIRECT_ROUTING_KEY_.*"  則為topic模式 即模糊比對
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_.*", p, correlationData);
        return ResponseEntity.ok();
    }
           

7.7.3 receiver端接收消息

同7.4.3

這個時候AB隊列都能夠收到消息,且他們的監聽這也能監聽到同樣的消息

8.上代碼

這個源碼親測可用的哈,下載下傳後直接浏覽器調用controller對應的demo就能看驗證效果了

下載下傳位址:https://pan.baidu.com/s/1J3JQoyNv0_gfiy_p3hrZZg

rabbitMq 環境搭建 及 與 springboot 內建(完整提供了所有業務場景demo)1.基礎知識:2.六種隊列3.應用場景4.rabbitMq與Erlang版本對應關系5.windows搭建6.linux搭建7.與spingboot內建8.上代碼