天天看點

Golang之RabbitMQ練習一

從現在開始開始練習消息隊列RabbitMQ

自己練習消息隊列的時候,可以使用真機安裝RabbitMQ,也可以使用容器安裝RabbitMQ,這裡使用docker容器。

環境

docker 容器中的RabbitMQ,端口映射5673 , 預設的端口為5672

一:程式設計第一步,hello world

該案例是一個簡單的隻有兩端:發送-接收

概覽

發送方步驟有:

  1. 連接配接隊列伺服器Dial(“amqp://guest:[email protected]:5673”)
  2. 隊列聲明QueueDeclare
  3. 消息發送Publish

接收方步驟有:

  1. 連接配接隊列伺服器Dial(“amqp://guest:[email protected]:5673”)
  2. 隊列聲明QueueDeclare
  3. 消息消費Comsume

本階段兩端的配置隻有第三個步驟不一樣,一個發送一個接收。

1.1:發送消息

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	//guest是預設的賬号,一般隻有localhost、127.0.0.1可以使用該賬号,其它ip使用會報錯
	conn,err := amqp.Dial("amqp://guest:[email protected]:5673/")
	failOnError(err,"連接配接隊列服務失敗!")
	ch,err := conn.Channel()
	failOnError(err,"擷取channel失敗!")
	defer ch.Close()
	//聲明隊列
	//hello發送方隊列的名稱:發送方接收方可以不一緻,隻是用來存儲目前發送方的資料的
	queue,err := ch.QueueDeclare("hello",
		false,false,false,false,nil)
	//這裡第一個參數是交換類型,這裡為空表示使用預設的交換exchange
	//第二個參數queue.Name是路由鍵的名稱:發送方接收方要一緻,消息将會發送到路由鍵對應的消息隊列,
	//接收方需要有與這一樣的路由鍵,或者接收方consume第一個隊列參數置為"",否則取不到資料;
	//接收方的ch.QueueDeclare聲明的隊列名稱需要跟發送方路由鍵名稱一緻,表示為建立該路由鍵對應的隊列。
	err = ch.Publish("",queue.Name, 
		false,false,amqp.Publishing{
		ContentType: "application/json", //contentType沒有嚴格要求
		Body: []byte("hello世界"),
	})
	failOnError(err,"消息釋出失敗")
}
           

1.2:接收消息

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn,err := amqp.Dial("amqp://guest:[email protected]:5673")
	failOnError(err,"連接配接失敗")
	ch,err := conn.Channel()
	failOnError(err,"打開channel失敗")
	queue,err := ch.QueueDeclare("hello",//隊列的名稱:發送方接收方不需要一緻,但要跟發送方的路由鍵保持一緻
		false,false,false,false,nil)
	failOnError(err,"打開隊列失敗")
	//第一個參數queue.Name表示消費哪個隊列:要跟發送方的路由鍵保持一緻,或者""消費所有隊列。
	msgs,err := ch.Consume(queue.Name,
	"",true,//該參數為:AutoAck,設定為true之後,表示自動确認。RabbitMQ就認為為消息發送之後就被處理,是以直接将消息從緩沖區删除,消費者異常時,如果消息未被處理,則消息丢失。false時,如果消費者出現異常,重新啟動後,會将緩沖區的消息重發給消費者。
	false,false,false,nil)
	failOnError(err,"注冊消費者失敗")
	forever := make(chan bool) //設定一個channel,用來阻塞主程序,防止程式終止
	go func() {
		for msg := range msgs {
			fmt.Println(string(msg.Body))
		}
	}()
	fmt.Println("連接配接成功,等待資料..")
	<- forever
}
           

上面的這是一種最簡單的使用消息隊列的方式,簡單,卻不安全。消息的發送可能面臨着消息還未完全消費但服務已經奔潰造成消息丢失。

二:消息可靠性與channel預取

為了解決這一種情況我們有幾種方法來盡量保證消息的可靠性。

  1. 消費方AutoAck設定為false,在這種情況下,如果一個消息從發出後,消費方沒有使用msg.Ack(false)确認消息被消費的話,該條消息則一直存儲在記憶體中。當消費者因為異常中斷後重新開機,隊列将會将緩存中的消息再一次發送給消費者,如果調用Ack确認消息被消費後,該條消息則從緩存中清除。
  2. 設定AutoAck為false,也隻能保證消費者挂掉之後重新開機消息任然存在,但是如果隊列服務挂掉之後,任然會出現消息丢失的情況。是以為了解決這種情況,可以設定持久存儲;生産者跟消費者聲明隊列的時候durable設定為true,amqp.Publishing{}構造發送消息的時候增加一個DeliveryMode: amqp.Persistent屬性。設定成這種情況後,一般隊列服務挂掉後,大機率會将資料存儲到磁盤,重新啟動隊列服務跟消費者後,隊列将重新将之前儲存的資料發送給消費者,消費者消費後發送一個Ack确認消費,則消息被從緩存跟磁盤清除。
  3. 一般消息隊列隻是簡單的将入隊的消息轉發給消費者,它不會管目前消費者是否有未确認的消息。為了解決這種情況,我們可以在消費者端設定一個預取消息數量為1,當本消費者還未确認上條消息時,隊列将會把新入隊的消息轉發給其它空閑消費者。

    注意:手動确認ack,nack,reject一般都是與channel預取結合使用,确認一些消息後才繼續處理。

    接收方Consume方法中的autoAck設定确認模式,true被稱為自動模式,false表示手動确認模式,消息隊列能将大量的消息發送給某個消費者,是以,一般如果沒有使用手動确認消息時的消費者應當是具備高速處理業務的能力。

Ack(false)表示隻确認目前這條channel中的目前消息,如果設定為true,則表示把目前channel中所有未确認的消息全部設定為确認。

代碼:

發送方:
func main() {
	conn,err := amqp.Dial("amqp://guest:[email protected]:5673/")
	failOnError(err,"連接配接隊列服務失敗!")
	ch,err := conn.Channel()
	failOnError(err,"擷取channel失敗!")
	defer ch.Close()
	//由于前面隊列中已經有一個hello隊列了,是以更改配置後需要重新定義一個隊列hello_
	queue,err := ch.QueueDeclare("hello_", 
		true,false,false,false,nil)
	fmt.Println(queue.Name)
	err = ch.Publish("",queue.Name,
		false,false,amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType: "application/json",
		Body: []byte("hello世界"),
	})
	failOnError(err,"消息釋出失敗")
}
接收方---可以有多個
func main() {
	conn,err := amqp.Dial("amqp://guest:[email protected]:5673")
	failOnError(err,"連接配接失敗")
	ch,err := conn.Channel()
	failOnError(err,"打開channel失敗")
	queue,err := ch.QueueDeclare("hello_",
		true,false,false,false,nil)
	failOnError(err,"打開隊列失敗")
	//設定預取消息數量為100,在上條消息未确認時,将不再下發消息到該消費者
	//預取數量要根據場景反複測試可能才會比較準确
	 err = ch.Qos(
                100,     // prefetch count
                0,     // prefetch size
                false, // global
        )
	msgs,err := ch.Consume(queue.Name,"",false,false,false,false,nil)
	failOnError(err,"注冊消費者失敗")
	forever := make(chan bool)
	go func() {
		for msg := range msgs {
			fmt.Println(string(msg.Body))
			//Ack(false)表示隻确認目前這條channel中的目前消息,如果設定為true,
			//則表示把目前channel中所有未确認的消息全部設定為确認
			msg.Ack(false)
		}
	}()
	fmt.Println("連接配接成功,等待資料..")
	<- forever
}
           

參考:

https://www.rabbitmq.com/tutorials/tutorial-one-go.html