文章目錄
- 寫在前面
- 1. 介紹
- 1.1 什麼是MQ
- 1.2 什麼是RabbitMQ
- 1.3 AMQP 協定
- 2. Go語言操作RabbitMQ
- 2.1 下載下傳
- 2.2 引入驅動
- 2.3 HelloWorld 模型
- 2.3.1 生産者
- 2.3.2 消費者
- 2.3.3 結果
- 2.4 Work Queues 模型
- 2.4.1 生産者
- 2.4.2 消費者
- 2.4.3 結果
- 2.5 Publish/Subscribe 模型
- 2.5.1 生産者
- 2.5.2 消費者
- 2.5.3 結果
- 2.6 Routing 模型
- 2.6.1 生産者
- 2.6.2 消費者
- 2.7 Topics 模型
- 2.7.1 生産者
- 2.7.2 消費者
- 2.8 RPC 模型
寫在前面
本文是使用Go語言實作各種RabbitMQ的中間件模型
1. 介紹
1.1 什麼是MQ
MQ
(Message Quene) : 翻譯為
消息隊列
,通過典型的
生産者
和
消費者
模型,生産者不斷向消息隊列中生産消息,消費者不斷的從隊列中擷取消息。因為消息的生産和消費都是異步的,而且隻關心消息的發送和接收,沒有業務邏輯的侵入,輕松的實作系統間解耦。
别名為
消息中間件
通過利用高效可靠的消息傳遞機制進行平台無關的資料交流,并基于資料通信來進行分布式系統的內建。
目前市面上有很多消息中間件:RabbitMQ,RocketMQ,Kafka等等…
1.2 什麼是RabbitMQ
RabbitMQ
是使用Erlang語言開發的開源消息隊列系統,基于
AMQP協定
來實作。AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。AMQP協定更多用在企業系統内對資料一緻性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求可能比較低了。
1.3 AMQP 協定
AMQP(advanced message queuing protocol)
在2003年時被提出,最早用于解決金融領不同平台之間的消息傳遞互動問題。
顧名思義,AMQP是一種協定,更準确的說是一種binary wire-level protocol(連結協定)。這是其和JMS的本質差别,AMQP不從API層進行限定,而是直接定義網絡交換的資料格式。這使得實作了AMQP的provider天然性就是跨平台的。以下是AMQP協定模型:

2. Go語言操作RabbitMQ
2.1 下載下傳
下載下傳rabbitmq過程就省了,可以直接到官網網站下載下傳安裝,像安裝qq一樣。
2.2 引入驅動
- 驅動
go get github.com/streadway/amqp
- 連接配接
var MQ *amqp.Connection
// RabbitMQ 連結
func RabbitMQ(connString string) {
conn, err := amqp.Dial(connString)
if err != nil {
panic(err)
}
MQ = conn
}
2.3 HelloWorld 模型
P代表生産者,C代表消費者,紅色部分是隊列。
生産者生成消息到隊列中,消費者進行消費,直連單點模式。
2.3.1 生産者
- 聲明連接配接對象
var ProductMQ *amqp.Connection
- 聲明通道
ch, err := ProductMQ.Channel()
- 建立隊列
q, err := ch.QueueDeclare("hello", // 隊列名字
false, // 是否持久化,
false, // 不用的時候是否自動删除
false, // 用來指定是否獨占隊列
false, // no-wait
nil, // 其他參數
)
參數1(name):隊列名字
參數2(durable):持久化,隊列中所有的資料都是在記憶體中的,如果為true的話,這個通道關閉之後,資料就會存在磁盤中持久化,false的話就會丢棄
參數3(autoDelete):不需要用到隊列的時候,是否将消息删除
參數4(exclusive):是否獨占隊列,true的話,就是隻能是這個程序獨占這個隊列,其他都不能對這個隊列進行讀寫
參數5(noWait):是否阻塞
參數6(args):其他參數
- 釋出消息
body := "Hello World!"
err = ch.Publish(
"", // 交換機
q.Name, // 隊列名字
false, // 是否強制性
// 當mandatory标志位設定為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麼會調用basic.return方法将消息傳回給生産者
// 當mandatory設定為false時,出現上述情形broker會直接将消息扔掉
false, //當immediate标志位設定為true時,如果exchange在将消息路由到queue(s)時發現對于的queue上麼有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生産者
// 是否立刻
/**
概括來說,mandatory标志告訴伺服器至少将該消息route到一個隊列中,否則将消息返還給生産者;immediate标志告訴伺服器如果該消息關聯的queue上有消費者,則馬上将消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生産者,不用将消息入隊列等待消費者了。
**/
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body), // 發送的消息
})
參數1(exchange):交換機,後續會講到
參數2(route-key):隊列名字
參數3(mandatory):是否強制性,
當mandatory标志位設定為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麼會調用
basic.return
方法将消息傳回給生産者
當mandatory設定為false時,出現上述情形
broker會直接将消息扔掉
參數4(immediate):是否立即處理
當immediate标志位設定為true時,如果exchange在将消息路由到queue(s)時發現對于的queue上麼有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生産者
也就是說,mandatory 标志告訴伺服器至少将該消息route到一個隊列中,否則将消息返還給生産者;immediate标志告訴伺服器如果該消息關聯的queue上有消費者,則馬上将消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生産者,不用将消息入隊列等待消費者了。
參數5(msg):釋出的消息,ContentType是傳輸類型,Body是發送的消息。
2.3.2 消費者
- 聲明通道
ch, err := ConsumerMQ.Channel()
- 建立隊列
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
- 讀取隊列消息
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
由于消費者端需要一直監聽,是以我們要用一個for循環+channel去阻塞主程序,使得主程序一直處于監聽狀态。
forever := make(chan bool)
go func() {
for d := range msgs {
fmt.Printf("Received a message: %s", d.Body)
}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
2.3.3 結果
- 生産者
- 消費者
2.4 Work Queues 模型
Work queues
,也被稱為(
Task queues
),任務模型。當消息處理比較耗時的時候,可能生産消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work queues模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,是以任務是不會被重複執行的。
2.4.1 生産者
生成10條消息到隊列中
body := "Hello World! "
for i := 0; i < 10; i++ {
msg := strconv.Itoa(i)
err = ch.Publish(
"", // 交換機
q.Name, // 隊列名字
false, // 是否強制性
false, // 是否立刻
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body+msg), // 發送的消息
})
}
2.4.2 消費者
建立兩個一樣的消費者進行監聽消費,與上面
2.3.2
的消費者保持一緻
2.4.3 結果
消費者1号
消費者2号
2.5 Publish/Subscribe 模型
fanout 扇出 也稱為廣播
在廣播模式下,消息發送流程如下:
- 可以有多個消費者
- 每個消費者有自己的queue(隊列)
- 每個隊列都要綁定到Exchange(交換機)
- 生産者發送的消息,隻能發送到交換機,交換機來決定要發給哪個隊列,生産者無法決定。
- 交換機把消息發送給綁定過的所有隊列
- 隊列的消費者都能拿到消息。實作一條消息被多個消費者消費
2.5.1 生産者
- 聲明交換機
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
參數1(name):交換機名稱
參數2(kind):交換機類型
- 生産消息
_ = ch.Publish("logs", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
2.5.2 消費者
- 聲明交換機
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )
- 聲明隊列
q, _ := ch.QueueDeclare("", false, false, true, false, nil, )
- 綁定交換機
_ = ch.QueueBind(q.Name, "", "logs", false, nil, )
- 消費消息
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
2.5.3 結果
- 生産者
- 消費者
2.6 Routing 模型
- P:生産者,向Exchange發送消息,發送消息時,會指定一個routing key。
- X:Exchange(交換機),接收生産者的消息,然後把消息遞交給 與routing key完全比對的隊列
- C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
- C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
在
fanout
模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到
Direct
類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
(路由key)RoutingKey
- 消息的發送方在 向 Exchange發送消息時,也必須指定消息的
。RoutingKey
- Exchange不再把消息交給每一個綁定的隊列,而是根據消息的
進行判斷,隻有隊列的Routing Key
與消息的Routingkey
完全一緻,才會接收到消息Routing key
2.6.1 生産者
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_direct", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
2.6.2 消費者
- 隻接受warn
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
- 隻接受info
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
2.7 Topics 模型
Topic
類型的
Exchange
與
Direct
相比,都是可以根據
RoutingKey
把消息路由到不同的隊列。隻不過
Topic
類型
Exchange
可以讓隊列在綁定
Routing key
的時候使用通配符!
這種模型
Routingkey
一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如:
item.insert
-
統配符
* 比對不多不少恰好1個詞
# 比對一個或多個詞
-
如:
fan.# 比對 fan.one.two 或者 fan.one 等
fan.* 隻能比對 fan.one
2.7.1 生産者
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_topic", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
2.7.2 消費者
- 隻接受*.one
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
- 隻接受*.fan
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )