天天看點

go語言操作rabbitmq

作者:幹飯人小羽
go語言操作rabbitmq

安裝

go get "github.com/streadway/amqp"           

文檔:

https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go

栗子一(普通模式)

生産者:

package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
    "time"
)

/*
預設點對點模式
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    // 連接配接
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 打開一個并發伺服器通道來處理消息
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 申明一個隊列
    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // durable  持久性的,如果事前已經聲明了該隊列,不能重複聲明
        false,        // delete when unused
        false,        // exclusive 如果是真,連接配接一斷開,隊列删除
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body := bodyFrom(os.Args)

    // 釋出
    err = ch.Publish(
        "",     // exchange 預設模式,exchange為空
        q.Name,           // routing key 預設模式路由到同名隊列,即是task_queue
        false,  // mandatory
        false,
        amqp.Publishing{
            // 持久性的釋出,因為隊列被聲明為持久的,釋出消息必須加上這個(可能不用),但消息還是可能會丢,如消息到緩存但MQ挂了來不及持久化。
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = fmt.Sprintf("%s-%v","hello", time.Now())
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}           

消費者:

package main

import (
    "bytes"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

/*
預設點對點模式
工作方,多個,拿釋出方的消息
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 指定隊列!
    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // Fair dispatch 預取,每個工作方每次拿一個消息,确認後才拿下一次,緩解壓力
    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    // 消費根據隊列名
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack   設定為真自動确認消息
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")

            // 确認消息被收到!!如果為真的,那麼同在一個channel,在該消息之前未确認的消息都會确認,适合批量處理
            // 真時場景:每十條消息确認一次,類似
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}           

栗子二(訂閱模式)

訂閱 生産者:

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "os"
    "strings"
    "time"
)

/*
廣播模式
釋出方
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 預設模式有預設交換機,廣播自己定義一個交換機,交換機可與隊列進行綁定
    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type 廣播模式
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := bodyFrom(os.Args)

    // 釋出
    err = ch.Publish(
        "logs", // exchange 消息發送到交換機,這個時候沒隊列綁定交換機,消息會丢棄
        "",     // routing key  廣播模式不需要這個,它會把所有消息路由到綁定的所有隊列
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = fmt.Sprintf("%s-%v","hello", time.Now())
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}           

訂閱 消費者:

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)

/*
廣播模式
訂閱方
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 同樣要申明交換機
    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // 建立隊列,這個隊列沒名字,随機生成一個名字
    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when usused
        true,  // exclusive  表示連接配接一斷開,這個隊列自動删除
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // 隊列和交換機綁定,即是隊列訂閱了發到這個交換機的消息
    err = ch.QueueBind(
        q.Name, // queue name  隊列的名字
        "",     // routing key  廣播模式不需要這個
        "logs", // exchange  交換機名字
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    // 開始消費消息,可開多個訂閱方,因為隊列是臨時生成的,所有每個訂閱方都能收到同樣的消息
    msgs, err := ch.Consume(
        q.Name, // queue  隊列名字
        "",     // consumer
        true,   // auto-ack  自動确認
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}           

栗子三(RPC模式)

RPC 應答方:

package main

import (
    "fmt"
    "log"
    "strconv"

    "github.com/streadway/amqp"
)

/*
RPC模式
應答方
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func fib(n int) int {
    if n == 0 {
        return 0
    } else if n == 1 {
        return 1
    } else {
        return fib(n-1) + fib(n-2)
    }
}

func main() {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "rpc_queue", // name
        false,       // durable
        false,       // delete when usused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // 公平分發 沒有這個則round-robbin
    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    // 消費,等待請求
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        //請求來了
        for d := range msgs {
            n, err := strconv.Atoi(string(d.Body))
            failOnError(err, "Failed to convert body to integer")

            log.Printf(" [.] fib(%d)", n)

            // 計算
            response := fib(n)

            // 回答
            err = ch.Publish(
                "",        // exchange
                d.ReplyTo, // routing key
                false,     // mandatory
                false,     // immediate
                amqp.Publishing{
                    ContentType:   "text/plain",
                    CorrelationId: d.CorrelationId,  //序列号
                    Body:          []byte(strconv.Itoa(response)),
                })
            failOnError(err, "Failed to publish a message")

            // 确認回答完畢
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Awaiting RPC requests")
    <-forever
}           

RPC 請求方:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/streadway/amqp"
)

/*
RPC模式
請求方
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func randomString(l int) string {
    bytes := make([]byte, l)
    for i := 0; i < l; i++ {
        bytes[i] = byte(randInt(65, 90))
    }
    return string(bytes)
}

func randInt(min int, max int) int {
    return min + rand.Intn(max-min)
}

func fibonacciRPC(n int) (res int, err error) {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 隊列聲明
    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when usused
        true,  // exclusive 為真即連接配接斷開就删除
        false, // noWait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive   這個為真,伺服器會認為這是該隊列唯一的消費者
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    corrId := randomString(32)

    err = ch.Publish(
        "",          // exchange
        "rpc_queue", // routing key
        false,       // mandatory
        false,       // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: corrId,
            ReplyTo:       q.Name,
            Body:          []byte(strconv.Itoa(n)),
        })
    failOnError(err, "Failed to publish a message")

    for d := range msgs {
        if corrId == d.CorrelationId {
            res, err = strconv.Atoi(string(d.Body))
            failOnError(err, "Failed to convert body to integer")
            break
        }
    }

    return
}

func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    n := bodyFrom(os.Args)

    log.Printf(" [x] Requesting fib(%d)", n)
    res, err := fibonacciRPC(n)
    failOnError(err, "Failed to handle RPC request")

    log.Printf(" [.] Got %d", res)
}

func bodyFrom(args []string) int {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "30"
    } else {
        s = strings.Join(args[1:], " ")
    }
    n, err := strconv.Atoi(s)
    failOnError(err, "Failed to convert arg to integer")
    return n
}