RabbitMQ是基于AMQP協定的一款消息管理系統,是部署最廣泛的開源消息中間件,是最受歡迎的開源消息中間件之一:
- 官網: http://www.rabbitmq.com/
- 官方教程: http://www.rabbitmq.com/getstarted.html
- AMQP 協定:
AMQP(advanced message queuing protocol)在2003年時被提出,最早用于解決金融領不同平台之間的消息傳遞互動問題。顧名思義,AMQP是一種協定,更準确的說是一種binary wire-level protocol(連結協定)。這是其和JMS的本質差别,AMQP不從API層進行限定,而是直接定義網絡交換的資料格式。這使得實作了AMQP的provider天然性就是跨平台的。以下是AMQP協定模型;
不同MQ的差別:
ActiveMQ:ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。它是一個完全支援JMS規範的的消息中間件。豐富的API,多種叢集架構模式讓ActiveMQ在業界成為老牌的消息中間件,在中小型企業頗受歡迎!
Kafka:Kafka是LinkedIn開源的分布式釋出-訂閱消息系統,目前歸屬于Apache頂級項目。Kafka主要特點是基于Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支援複制,不支援事務,對消息的重複、丢失、錯誤沒有嚴格要求,适合産生大量資料的網際網路服務的資料收集業務。
RocketMQ:RocketMQ是阿裡開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、适合大規模分布式系統應用的特點。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿裡集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。
RabbitMQ:RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協定來實作。AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。AMQP協定更多用在企業系統内對資料一緻性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
總結:RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的處理,一般應用在大資料日志處理或對實時性(少量延遲),可靠性(少量丢資料)要求稍低的場景使用,比如ELK日志收集。
2. Linux中下載下傳和安裝
2.1 下載下傳位址
官網下載下傳位址:
https://www.rabbitmq.com/download.html
下載下傳安裝壓縮包:
百度雲下載下傳連結:
https://pan.baidu.com/s/17hZab0ZKZLtoi-6q14R27Q,提取碼:
dwti
注意:這裡的安裝包是centos7安裝的包!
2.2 安裝步驟
2.2.1 将rabbitmq安裝包上傳到linux系統中
這裡我使用XFTP工具上傳,使用XShell 連接配接雲伺服器,上傳位置為:=/usr/local/src/software/rabbitMQ/,如果對應的檔案夾不存在,自己建立一下!
2.2.2 安裝Erlang依賴包
進入目标檔案夾下:
cd /usr/local/src/software/rabbitMQ/
,并檢視安裝包資訊:
ls -l
執行安裝Erlang依賴包:
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
執行安裝Erlang記憶體管理的依賴包:
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
2.2.3 安裝RabbitMQ安裝包(需要聯網)
執行安裝rabbitmq:
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
2.2.4 複制配置檔案
執行指令:cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config,該指令是将rabbitmq.config.example 配置檔案複制到/etc/rabbitmq/ 下,并該名為rabbitmq.config!
注意:如果不知道rabbitmq.config.example 配置檔案的具體位置,可以使用指令:find / -name rabbitmq.config.example去查找其位置!
因為是從
/目錄開始查找可能會延遲幾秒!
2.2.5 檢視/修改配置檔案
檢視配置檔案位置:
ll /etc/rabbitmq/
修改配置檔案:
vim /etc/rabbitmq/rabbitmq.config
開放來賓賬戶權限loopback_users:
把注釋開放,注意尾部的逗号也去掉:
3. 相關指令
3.2 開啟web界面管理工具
指令:
rabbitmq-plugins enable rabbitmq_management
3.2 啟動、停止指令
服務啟動:systemctl start rabbitmq-server
服務重新開機:systemctl restart rabbitmq-server
服務關閉:systemctl stop rabbitmq-server
檢視服務運作狀态:systemctl status rabbitmq-server
4. RabbitMQ可視化管理頁面
4.1 開放端口
如果是本地虛拟機開放防火牆對應端口,如果是雲伺服器開放安全組端口:
4.2 通路管理頁面
浏覽器通路:
http://伺服器IP位址:15672使用者名和密碼預設是我們在配置檔案中開啟的來賓賬戶:guest/guest,輸入賬号密碼後進入管理頁:
有上圖可知,為了之後使用代碼整合RabbitMq,我們還需要在安全組中開放另外兩個端口:
4.3 管理界面導航介紹
4.4 Admin使用者和虛拟主機管理
4.4.1 添加使用者
上面的Tags選項,其實是指定使用者的角色,可選的有以下幾個:
超級管理者(administrator)
可登陸管理控制台,可檢視所有的資訊,并且可以對使用者,政策(policy)進行操作。
監控者(monitoring)
可登陸管理控制台,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁盤使用情況等)
政策制定者(policymaker)
可登陸管理控制台, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框辨別的部分)。
普通管理者(management)
僅可登陸管理控制台,無法看到節點資訊,也無法對政策進行管理。
其他
無法登陸管理控制台,通常就是普通的生産者和消費者。
4.4. 2 添加虛拟主機
點選給虛拟主機leyou 設定通路使用者以及其權限:
接下來登出,并以leyou使用者登入即可!
5. Java API 操作RabbitMQ
5 種消息模型:簡單消息模型:1個生産者 + 1個隊列 + 1個消費者;
工作隊列消息模型:1個生産者 + 1個隊列 + 多個消費者,一條消息隻能被消費一次;
訂閱消息模型之 fanout:1個生産者 + 1個交換機 + 多個隊列 + 多個消費者,一條消息可以被多個消費者消費;
訂閱消息模型之durect/router:1個生産者 + 1個交換機 + 多個隊列 + 多個消費者 ,routingKey ,一條消息發送給符合 routingKey 的隊列;
訂閱消息模型之topic:通配符,#:比對一個或者多個 *:一個詞;
官網教程位址:
https://www.rabbitmq.com/getstarted.html 參考傳智播客的Demo 案例:代碼案例,基本上是參考官網的quickStart的,可以快速上手!
6. Spring Boot 整合RabbiMQ
Spring AMQP 官網位址: https://spring.io/projects/spring-amqp SpringBoot整合官方文檔位址: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-messaging
6.1 pom.xml 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2 application.yml 中進行配置
# Spring 相關配置
spring:
# RabbitMQ 相關配置
rabbitmq:
# 主機ip位址
host: 8.xxx.xx.xx6
# 授權使用者名
username: leyou
# 授權使用者密碼
password: leyou
# 授權虛拟主機名稱
virtual-host: leyou
6.3 RabbitMQ 監聽器(消息消費者)元件注入IOC
/**
* RabbitMQ 監聽器元件:相當于消費者
*/
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
// value = "spring.test.queue" 隊列名稱
// durable = "true" 隊列消息持久化
value = @Queue(value = "spring.test.queue", durable = "true"),
// value = "spring.test.exchange" 交換機名稱
// durable = "true" 交換機消息持久化
// ignoreDeclarationExceptions = "true" 忽略聲明異常
// type = ExchangeTypes.TOPIC 交換機類型:TOPIC
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
// durable = "true"
),
// 消息接收的路由表達式
key = {"#.#"}))
public void listen(String msg) {
System.out.println("接收到消息:" + msg);
}
}
6.4 Test 測試模拟消息生産者
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {
// 注入AmqpTemplate模闆
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "hello, Spring boot amqp";
// 通過AmqpTemplate模闆發送消息
// 參數1:交換機
// 參數2:a.b是否符合監聽元件(消息消費者)中定義的消息接收的路由表達式
// 參數3:消息内容
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
// 等待10秒後再結束
Thread.sleep(10000);
}
}
6.5 執行測試
可以看到消息生産後,被消息監聽器(消費者)接收成功!