天天看點

【RabbitMQ】Go語言實作六種消息中間件模型

文章目錄

  • ​​寫在前面​​
  • ​​1. 介紹​​
  • ​​1.1 什麼是MQ​​
  • ​​1.2 什麼是RabbitMQ​​
  • ​​1.3 AMQP 協定​​
  • ​​2. Go語言操作RabbitMQ​​
  • ​​2.1 下載下傳​​
  • ​​2.2 引入驅動​​
  • ​​2.3 HelloWorld 模型​​
  • ​​2.3.1 生産者​​
  • ​​2.3.2 消費者​​
  • ​​2.3.3 結果​​
  • ​​2.4 Work Queues 模型​​
  • ​​2.4.1 生産者​​
  • ​​2.4.2 消費者​​
  • ​​2.4.3 結果​​
  • ​​2.5 Publish/Subscribe 模型​​
  • ​​2.5.1 生産者​​
  • ​​2.5.2 消費者​​
  • ​​2.5.3 結果​​
  • ​​2.6 Routing 模型​​
  • ​​2.6.1 生産者​​
  • ​​2.6.2 消費者​​
  • ​​2.7 Topics 模型​​
  • ​​2.7.1 生産者​​
  • ​​2.7.2 消費者​​
  • ​​2.8 RPC 模型​​

寫在前面

本文是使用Go語言實作各種RabbitMQ的中間件模型

1. 介紹

1.1 什麼是MQ

​MQ​

​​(Message Quene) : 翻譯為 ​

​消息隊列​

​​,通過典型的 ​

​生産者​

​​和​

​消費者​

​模型,生産者不斷向消息隊列中生産消息,消費者不斷的從隊列中擷取消息。因為消息的生産和消費都是異步的,而且隻關心消息的發送和接收,沒有業務邏輯的侵入,輕松的實作系統間解耦。

别名為 ​

​消息中間件​

​ 通過利用高效可靠的消息傳遞機制進行平台無關的資料交流,并基于資料通信來進行分布式系統的內建。

目前市面上有很多消息中間件:RabbitMQ,RocketMQ,Kafka等等…

1.2 什麼是RabbitMQ

​RabbitMQ​

​​是使用Erlang語言開發的開源消息隊列系統,基于​

​AMQP協定​

​來實作。AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。AMQP協定更多用在企業系統内對資料一緻性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求可能比較低了。

1.3 AMQP 協定

​AMQP(advanced message queuing protocol)​

​ 在2003年時被提出,最早用于解決金融領不同平台之間的消息傳遞互動問題。

顧名思義,AMQP是一種協定,更準确的說是一種binary wire-level protocol(連結協定)。這是其和JMS的本質差别,AMQP不從API層進行限定,而是直接定義網絡交換的資料格式。這使得實作了AMQP的provider天然性就是跨平台的。以下是AMQP協定模型:

【RabbitMQ】Go語言實作六種消息中間件模型

2. Go語言操作RabbitMQ

2.1 下載下傳

下載下傳rabbitmq過程就省了,可以直接到官網網站下載下傳安裝,像安裝qq一樣。

2.2 引入驅動

  • 驅動

​go get github.com/streadway/amqp​

  • 連接配接
var MQ *amqp.Connection

// RabbitMQ 連結
func RabbitMQ(connString string) {
    conn, err := amqp.Dial(connString)
    if err != nil {
        panic(err)
    }
    MQ = conn
}      

2.3 HelloWorld 模型

【RabbitMQ】Go語言實作六種消息中間件模型

P代表生産者,C代表消費者,紅色部分是隊列。

生産者生成消息到隊列中,消費者進行消費,直連單點模式。

2.3.1 生産者

  • 聲明連接配接對象
var ProductMQ *amqp.Connection      
  • 聲明通道
ch, err := ProductMQ.Channel()      
  • 建立隊列
q, err := ch.QueueDeclare("hello",  // 隊列名字
    false,      // 是否持久化,
    false,   // 不用的時候是否自動删除
    false,      // 用來指定是否獨占隊列
    false,      // no-wait
    nil,            // 其他參數
)      

參數1(name):隊列名字

參數2(durable):持久化,隊列中所有的資料都是在記憶體中的,如果為true的話,這個通道關閉之後,資料就會存在磁盤中持久化,false的話就會丢棄

參數3(autoDelete):不需要用到隊列的時候,是否将消息删除

參數4(exclusive):是否獨占隊列,true的話,就是隻能是這個程序獨占這個隊列,其他都不能對這個隊列進行讀寫

參數5(noWait):是否阻塞

參數6(args):其他參數

  • 釋出消息
body := "Hello World!"

err = ch.Publish(
    "",     // 交換機
    q.Name, // 隊列名字
    false,  // 是否強制性
    // 當mandatory标志位設定為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麼會調用basic.return方法将消息傳回給生産者
    // 當mandatory設定為false時,出現上述情形broker會直接将消息扔掉
    false, //當immediate标志位設定為true時,如果exchange在将消息路由到queue(s)時發現對于的queue上麼有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生産者
    // 是否立刻
    /**
    概括來說,mandatory标志告訴伺服器至少将該消息route到一個隊列中,否則将消息返還給生産者;immediate标志告訴伺服器如果該消息關聯的queue上有消費者,則馬上将消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生産者,不用将消息入隊列等待消費者了。
    **/
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body), // 發送的消息
    })      

參數1(exchange):交換機,後續會講到

參數2(route-key):隊列名字

參數3(mandatory):是否強制性,

當mandatory标志位設定為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麼會調用​

​basic.return​

​方法将消息傳回給生産者

當mandatory設定為false時,出現上述情形​

​broker會直接将消息扔掉​

參數4(immediate):是否立即處理

當immediate标志位設定為true時,如果exchange在将消息路由到queue(s)時發現對于的queue上麼有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生産者

也就是說,mandatory 标志告訴伺服器至少将該消息route到一個隊列中,否則将消息返還給生産者;immediate标志告訴伺服器如果該消息關聯的queue上有消費者,則馬上将消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生産者,不用将消息入隊列等待消費者了。

參數5(msg):釋出的消息,ContentType是傳輸類型,Body是發送的消息。

2.3.2 消費者

  • 聲明通道
ch, err := ConsumerMQ.Channel()      
  • 建立隊列
q, err := ch.QueueDeclare(
    "hello",
    false,
    false,
    false,
    false,
    nil,
)      
  • 讀取隊列消息
msgs, err := ch.Consume(
    q.Name,
    "",    
    true,  
    false,  
    false,  
    false, 
    nil,    
)      

由于消費者端需要一直監聽,是以我們要用一個for循環+channel去阻塞主程序,使得主程序一直處于監聽狀态。

forever := make(chan bool)
go func() {
    for d := range msgs {
        fmt.Printf("Received a message: %s", d.Body)
    }
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever      

2.3.3 結果

  • 生産者
【RabbitMQ】Go語言實作六種消息中間件模型
  • 消費者
【RabbitMQ】Go語言實作六種消息中間件模型

2.4 Work Queues 模型

【RabbitMQ】Go語言實作六種消息中間件模型

​Work queues​

​​,也被稱為(​

​Task queues​

​),任務模型。當消息處理比較耗時的時候,可能生産消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。

此時就可以使用work queues模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,是以任務是不會被重複執行的。

2.4.1 生産者

生成10條消息到隊列中

body := "Hello World!  "
for i := 0; i < 10; i++ {
    msg := strconv.Itoa(i)
    err = ch.Publish(
        "",     // 交換機
        q.Name, // 隊列名字
        false,  // 是否強制性
        false,  // 是否立刻
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body+msg), // 發送的消息
        })
}      

2.4.2 消費者

建立兩個一樣的消費者進行監聽消費,與上面​

​2.3.2​

​的消費者保持一緻

【RabbitMQ】Go語言實作六種消息中間件模型

2.4.3 結果

消費者1号

【RabbitMQ】Go語言實作六種消息中間件模型

消費者2号

【RabbitMQ】Go語言實作六種消息中間件模型

2.5 Publish/Subscribe 模型

fanout 扇出 也稱為廣播
【RabbitMQ】Go語言實作六種消息中間件模型

在廣播模式下,消息發送流程如下:

  • 可以有多個消費者
  • 每個消費者有自己的queue(隊列)
  • 每個隊列都要綁定到Exchange(交換機)
  • 生産者發送的消息,隻能發送到交換機,交換機來決定要發給哪個隊列,生産者無法決定。
  • 交換機把消息發送給綁定過的所有隊列
  • 隊列的消費者都能拿到消息。實作一條消息被多個消費者消費

2.5.1 生産者

  • 聲明交換機
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)      

參數1(name):交換機名稱

參數2(kind):交換機類型

  • 生産消息
_ = ch.Publish("logs", "", false, false,
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    })      

2.5.2 消費者

  • 聲明交換機
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )      
  • 聲明隊列
q, _ := ch.QueueDeclare("", false, false, true, false, nil, )      
  • 綁定交換機
_ = ch.QueueBind(q.Name, "", "logs", false, nil, )      
  • 消費消息
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )      

2.5.3 結果

  • 生産者
【RabbitMQ】Go語言實作六種消息中間件模型
  • 消費者
【RabbitMQ】Go語言實作六種消息中間件模型

2.6 Routing 模型

【RabbitMQ】Go語言實作六種消息中間件模型
  • P:生産者,向Exchange發送消息,發送消息時,會指定一個routing key。
  • X:Exchange(交換機),接收生産者的消息,然後把消息遞交給 與routing key完全比對的隊列
  • C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
  • C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

在​

​fanout​

​​模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到​

​Direct​

​類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個​

    ​RoutingKey​

    ​(路由key)
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的​

    ​RoutingKey​

    ​。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的​

    ​Routing Key​

    ​​進行判斷,隻有隊列的​

    ​Routingkey​

    ​​與消息的​

    ​Routing key​

    ​完全一緻,才會接收到消息

2.6.1 生産者

_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_direct", "", false, false,
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    })      

2.6.2 消費者

  • 隻接受warn
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )      
  • 隻接受info
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )      

2.7 Topics 模型

【RabbitMQ】Go語言實作六種消息中間件模型

​Topic​

​​類型的​

​Exchange​

​​與​

​Direct​

​​相比,都是可以根據​

​RoutingKey​

​​把消息路由到不同的隊列。隻不過​

​Topic​

​​類型​

​Exchange​

​​可以讓隊列在綁定​

​Routing key​

​ 的時候使用通配符!

這種模型​

​Routingkey​

​​ 一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如: ​

​item.insert​

  • 統配符

    * 比對不多不少恰好1個詞

    # 比對一個或多個詞

  • 如:

    fan.# 比對 fan.one.two 或者 fan.one 等

    fan.* 隻能比對 fan.one

2.7.1 生産者

_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_topic", "", false, false,
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    })      

2.7.2 消費者

  • 隻接受*.one
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )      
  • 隻接受*.fan
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )      

2.8 RPC 模型

【RabbitMQ】Go語言實作六種消息中間件模型

繼續閱讀