1. RabbitMQ是什麼?
MQ
是什麼?隊列是什麼,
MQ
我們可以了解為消息隊列,隊列我們可以了解為管道。以管道的方式做消息傳遞。
生活場景:
1.其實我們在雙11的時候,當我們淩晨大量的秒殺和搶購商品,然後去結算的時候,就會發現,界面會提醒我們,讓我們稍等,以及一些友好的圖檔文字提醒。而不是像前幾年的時代,動不動就頁面卡死,報錯等來呈現給使用者。
在這業務場景中,我們就可以采用隊列的機制來處理,因為同時結算就隻能達到這麼多。
2.在我們平時的超市中購物也是一樣,當我們在結算的時候,并不會一窩蜂一樣湧入收銀台,而是排隊結算。這也是隊列機制。
2. RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。消息中間件主要用于元件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。 下面将重點介紹RabbitMQ中的一些基礎概念,了解了這些概念,是使用好RabbitMQ的基礎。
-
使用了一些機制來保證可靠性,比如持久化、傳輸确認、釋出确認。可靠性(Reliablity):
-
在消息進入隊列之前,通過Exchange來路由消息。對于典型的路由功能,Rabbit已經提供了一些内置的Exchange來實作。針對更複雜的路由功能,可以将多個Exchange綁定在一起,也通過插件機制實作自己的Exchange。靈活的路由(Flexible Routing):
-
多個RabbitMQ伺服器可以組成一個叢集,形成一個邏輯Broker。消息叢集(Clustering):
-
隊列可以在叢集中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。高可用(Highly Avaliable Queues):
-
支援多種消息隊列協定,如STOMP、MQTT等。多種協定(Multi-protocol):
-
幾乎支援所有常用語言,比如Java、.NET、Ruby等。多種語言用戶端(Many Clients):
-
提供了易用的使用者界面,使得使用者可以監控和管理消息Broker的許多方面。管理界面(Management UI):
-
如果消息異常,RabbitMQ提供了消息的跟蹤機制,使用者可以找出發生了什麼。跟蹤機制(Tracing):
-
提供了許多插件,來從多方面進行擴充,也可以編輯自己的插件。插件機制(Plugin System):
2.1 定義和特征
- RbbitMQ是面向消息的中間件,用于元件之間的解耦,主要展現在消息的發送者和消費者之間無強依賴關系
- RabbitMQ特點:高可用,可擴充,多語言用戶端,管理界面等;
- 主要使用場景:流量削峰,異步處理,應用解耦等;
2.2 安裝
- ubuntu 的參照: https://gitee.com/zhangyafeii/rabbitmq
- 以下為centos7的安裝過程
安裝erlang
# centos7
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.17/rabbitmq-server-3.7.17-1.el7.noarch.rpm
yum install epel-release
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm
安裝rabbitmq
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-3.8.0-1.el7.noarch.rpm
yum -y install socat
rpm -ivh rabbitmq-server-3.8.0-1.el7.noarch.rpm
啟動
chkconfig rabbitmq-server on # 開機啟動
systemctl start rabbitmq-server.service # 啟動
systemctl stop rabbitmq-server.service # 停止
systemctl restart rabbitmq-server.service # 重新開機
rabbitmqctl status # 檢視狀态
rabbitmq-plugins enable rabbitmq_management # 啟動Web管理器
修改配置
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/ebin/rabbit.app
将:{loopback_users, [<<”guest”>>]}, 改為:{loopback_users, []}, 原因:rabbitmq從3.3.0開始禁止使用guest/guest權限通過 除localhost外的通路
systemctl restart rabbitmq-server.service # 重新開機服務
3. RabbitMQ核心概念
-
辨別消息隊列伺服器實體.Broker:
-
虛拟主機。辨別一批交換機、消息隊列和相關對象。虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域。每個vhost本質上就是一個mini版的RabbitMQ伺服器,擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在連結時指定,RabbitMQ預設的vhost是 /。Virtual Host:
-
交換器,用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列。Exchange:
-
消息隊列,用來儲存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接配接到這個隊列将其取走。Queue:
-
綁定,用于消息隊列和交換機之間的關聯。一個綁定就是基于路由鍵将交換機和消息隊列連接配接起來的路由規則,是以可以将交換器了解成一個由綁定構成的路由表。Banding:
-
信道,多路複用連接配接中的一條獨立的雙向資料流通道。新到是建立在真實的TCP連接配接内地虛拟連結,AMQP指令都是通過新到發出去的,不管是釋出消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于作業系統來說,建立和銷毀TCP都是非常昂貴的開銷,是以引入了信道的概念,以複用一條TCP連接配接。Channel:
-
網絡連接配接,比如一個TCP連接配接。Connection:
-
消息的生産者,也是一個向交換器釋出消息的用戶端應用程式。Publisher:
-
消息的消費者,表示一個從一個消息隊列中取得消息的用戶端應用程式。Consumer:
-
消息,消息是不具名的,它是由消息頭和消息體組成。消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。Message:
AMQP消息路由
AMQP中消息的路由過程和JMS存在一些差别。AMQP中增加了Exchange和Binging的角色。生産者把消息釋出到Exchange上,消息最終到達隊列并被消費者接收,而Binding決定交換器的消息應該發送到哪個隊列。
-
Exchange類型
Exchange分發消息時,根據類型的不同分發政策有差別。目前共四種類型:direct、fanout、topic、headers(headers比對AMQP消息的header而不是路由鍵(Routing-key),此外headers交換器和direct交換器完全一緻,但是性能差了很多,目前幾乎用不到了。是以直接看另外三種類型。)。
direct
消息中的路由鍵(routing key)如果和Binding中的binding key一緻,交換器就将消息發到對應的隊列中。路由鍵與隊列名完全比對。
fanout
每個發到fanout類型交換器的消息都會分到所有綁定的隊列上去。fanout交換器不處理該路由鍵,隻是簡單的将隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。fanout類型轉發消息是最快的。
topic
topic交換器通過模式比對配置設定消息的路由鍵屬性,将路由鍵和某個模式進行比對,此時隊列需要綁定到一個模式上。它将路由鍵(routing-key)和綁定鍵(bingding-key)的字元串切分成單詞,這些單詞之間用點隔開。它同樣也會識别兩個通配符:"#"和"*"。#比對0個或多個單詞,比對不多不少一個單詞。
4. RabbitMQ的運作模式
- 簡單模式
建立執行個體
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//url格式 amqp://賬号:密碼@rabbitmq伺服器位址:端口号/vhost
const MQURL = "amqp://zhangyafei:[email protected]:5672/imooc"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
// 隊列名稱
QueueName string
//交換機
Exchange string
//key
Key string
// 連接配接資訊
Mqurl string
}
//建立結構體執行個體
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
var err error
// 建立rabbitmq連接配接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "建立連接配接錯誤!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "擷取channel失敗!")
return rabbitmq
}
//斷開channel和connection
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//錯誤處理函數
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
// 簡單模式step1: 1.建立簡單模式下的rabbitmq執行個體
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
// 簡單模式step2: 2.簡單模式下生産
func (r *RabbitMQ) PublishSimple(message string) {
// 1. 申請隊列,如果隊列不存在則自動建立,如果存在則跳過建立
// 保證隊列存在,消息能發送到隊列中
_, err := r.channel.QueueDeclare(
r.QueueName,
// 是否持久化
false,
// 是否為自動删除
false,
// 是否具有排他性
false,
// 是否阻塞
false,
// 額外屬性
nil,
)
if err != nil {
fmt.Println(err)
}
// 2. 發送消息到隊列中
r.channel.Publish(
r.Exchange,
r.QueueName,
// 如果為true,根據exchange類型和routekey規則,如果無法找到符合條件的隊列,則會把發送的消息傳回給發送者
false,
// 如果為true,當exchange發送消息到隊列後發現隊列上沒有綁定消費者,則會把消息發還給發送者
false,
amqp.Publishing{ContentType: "text/plain", Body: []byte(message)},
)
}
// 簡單模式step3: 3.簡單模式下消費
func (r *RabbitMQ) ConsumeSimple() {
// 1. 申請隊列,如果隊列不存在則自動建立,如果存在則跳過建立
// 保證隊列存在,消息能發送到隊列中
_, err := r.channel.QueueDeclare(
r.QueueName,
// 是否持久化
false,
// 是否為自動删除
false,
// 是否具有排他性
false,
// 是否阻塞
false,
// 額外屬性
nil,
)
if err != nil {
fmt.Println(err)
}
// 2. 接收消息
msgs, err := r.channel.Consume(
r.QueueName,
// 用來區分多個消費者
"",
// 是否自動應答
true,
// 是否具有排他性
false,
// 如果為true,表示不能将同一個conn中的消息發送給這個conn中的消費者
false,
// 隊列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
// 3. 啟用協程處理消息
go func() {
for d := range msgs {
// 實作我們要處理的邏輯函數
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("[*] waiting for messages, to exit process CTRL+C")
<-forever
}
建立執行個體
生産者
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
"fmt"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
rabbitmq.PublishSimple("hello imooc!")
fmt.Println("發送成功")
}
mainSimplePublish.go
消費者
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
rabbitmq.ConsumeSimple()
}
mainSimpleReceive.go
- 工作模式
生産者
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
"fmt"
"strconv"
"time"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
for i := 0; i<= 100; i++ {
rabbitmq.PublishSimple("hello imooc!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
mainWorkPublish.go
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
rabbitmq.ConsumeSimple()
}
mainWorkReceive.go
- 訂閱模式
建立執行個體
// 訂閱模式下建立RabbitMQ執行個體
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
// 擷取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
// 擷取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// 訂閱模式下生産
func (r *RabbitMQ) PublishPub(message string) {
// 1. 嘗試建立交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", // 廣播類型
true, // 持久化
false, // 是否删除
false, // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
false,
nil,
)
r.failOnErr(err, "Failed to declare a exchange")
// 2. 發送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// 訂閱模式消費端的代碼
func (r *RabbitMQ) ReceiveSub() {
// 1. 嘗試建立交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"fanout", // 廣播類型
true, // 持久化
false, // 是否删除
false, // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
false,
nil,
)
r.failOnErr(err, "Failed to declare a exchange")
// 2. 試探性建立隊列
q, err := r.channel.QueueDeclare(
"", // 随機生産隊列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "failed to declare a queue")
// 綁定隊列到 exchange中
err = r.channel.QueueBind(
q.Name,
"", // 在訂閱模式下,這裡的key為空
r.Exchange,
false,
nil)
// 消費消息
messages, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
// 3. 啟用協程處理消息
go func() {
for d := range messages {
// 實作我們要處理的邏輯函數
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("[*] waiting for messages, to exit process CTRL+C")
<-forever
}
生産者
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
"fmt"
"strconv"
"time"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQPubSub("NewProduct")
for i := 0; i <= 100; i++ {
rabbitmq.PublishPub("訂閱模式生産第" + strconv.Itoa(i) + "條資料")
fmt.Println("訂閱模式生産第" + strconv.Itoa(i) + "條資料")
time.Sleep(1 * time.Second)
}
}
mainPub.go
消費者
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
)
func main() {
rabbitmq := RabbitMQ.NewRabbitMQPubSub("NewProduct")
rabbitmq.ReceiveSub()
}
mainSub.go
- 路由模式
建立執行個體、
// 路由模式下建立RabbitMQ執行個體
func NewRabbitMQRouting(exchangeName string, routingkey string) *RabbitMQ {
// 建立RabbitMQ執行個體
rabbitmq := NewRabbitMQ("", exchangeName, routingkey)
var err error
// 擷取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
// 擷取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// 路由模式發送消息
func (r *RabbitMQ) PublishRouting(message string) {
// 1. 嘗試建立交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct", // 定向類型
true, // 持久化
false, // 是否删除
false, // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
false,
nil,
)
r.failOnErr(err, "Failed to declare a exchange")
// 2. 發送消息
err = r.channel.Publish(
r.Exchange,
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// 路由模式消費端的代碼
func (r *RabbitMQ) ReceiveRouting() {
// 1. 試探性的建立交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct", // 廣播類型
true, // 持久化
false, // 是否删除
false, // true表示這個exchange不可以被client用來推送消息的,僅用來進行exchange和exchange之間的綁定
false,
nil,
)
r.failOnErr(err, "Failed to declare a exchange")
// 2. 試探性建立隊列
q, err := r.channel.QueueDeclare(
"", // 随機生産隊列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "failed to declare a queue")
// 綁定隊列到 exchange中
err = r.channel.QueueBind(
q.Name,
r.Key, // 在訂閱模式下,這裡的key為空
r.Exchange,
false,
nil)
// 消費消息
messages, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
// 3. 啟用協程處理消息
go func() {
for d := range messages {
// 實作我們要處理的邏輯函數
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("[*] waiting for messages, to exit process CTRL+C")
<-forever
}
package main
import (
"RabbitMQ/RabbitMQ/RabbitMQ"
"fmt"
"strconv"
"time"
)
func main() {
rabbit_imooc_one := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
rabbit_imooc_two := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")
for i := 0; i <= 100; i++ {
rabbit_imooc_one.PublishRouting("Hello imooc one!" + strconv.Itoa(i))
rabbit_imooc_two.PublishRouting("Hello imooc two!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
PublishRouting.go
package main
import "RabbitMQ/RabbitMQ/RabbitMQ"
func main() {
rabbitmq_imooc_one := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
rabbitmq_imooc_one.ReceiveRouting()
}
ReceiveRouting1.go
package main
import "RabbitMQ/RabbitMQ/RabbitMQ"
func main() {
rabbitmq_imooc_two := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")
rabbitmq_imooc_two.ReceiveRouting()
}
ReceiveRouting2.go
作者:張亞飛
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明。