天天看點

Golang使用協程進行mqtt的publish資訊性能測試需求話不多說,直接上代碼

需求

開發語言:golang

目的:并發10000個mqtt連接配接,循環發送publish資訊,當時間戳小于某個值的時候,中止循環,退出連接配接

publish内容是json格式的,未設定時,有預設值,可以通過golang代碼修改json内容

登入資訊存取在csv檔案中,csv檔案有多少列,就并發多少個裝置連接配接

話不多說,直接上代碼

main.go

package main
import (
    "encoding/csv"
    "encoding/json"
    "fmt"
    "os"
    "strconv"
    "time"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)
var total int
// 讀取csv檔案裡的第3列資料,存入一個string數組裡
func readcsv(filename string) []string {
    var userNameList []string
    f, _ := os.Open(filename)
    defer f.Close()
    w := csv.NewReader(f)
    data, err := w.ReadAll()
    if err != nil {
        fmt.Println(err)
    }
    for i := range data {
        userNameList = append(userNameList, data[i][2])
    }
    return userNameList
}
func mqttDevice(username string, end_number chan int) {
    // mqtt裝置連接配接,設定IP位址
    opts := mqtt.NewClientOptions().AddBroker("localhost:1883")
    // 設定連接配接的使用者名密碼
    opts.SetUsername(username)
    // 使用連接配接資訊進行連接配接
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    time.Sleep(1 * time.Second)
    fmt.Println("connect success:" + username)
    // 讀取json檔案,json檔案裡的是預設參數
    fileReader, _ := os.Open("test.json")
    var eiopJsonMap map[string]interface{}
    json.NewDecoder(fileReader).Decode(&eiopJsonMap)
    // 設定一個開始時間戳和結束時間戳
    startTime := 1598167852000
    endTime := 1598167852000
    // 循環發送遙測,每次遙測間隔時間戳為15分鐘
    for ; startTime < endTime; startTime = startTime + 900000 {
        // 定義一個ep的初始值為1,每循環一次就+1
        var ep int
        ep++
        eiopJsonMap["ts"] = startTime
        eiopJsonMap["values"].(map[string]interface{})["ep"] = ep
        // 把修改過的json内容從map轉換為json格式
        eiopJsonText, _ := json.Marshal(eiopJsonMap)
        fmt.Println(string(eiopJsonText))
        // 發送遙測,發完之後休眠1秒
        result := client.Publish("topic", 0, true, eiopJsonText)
        result.Wait()
        time.Sleep(1 * time.Second)
    }
    // 發送完資訊之後,退出連接配接
    fmt.Println("disconnect:" + username)
    client.Disconnect(250)
    total++
    end_number <- total
}
func main() {
    userNameList := readcsv("connect_info.csv")
    endNumber := make(chan int, len(userNameList))
    // 變量所有的username,通過go關鍵字并發多個裝置
    for _, userName := range userNameList {
        go mqttDevice(userName, endNumber)
    }
    // 當所有的裝置都發送完畢後,關閉程式
    for i := range endNumber {
        fmt.Println("已經有" + strconv.Itoa(i) + "個裝置發送完畢")
        if i == len(userNameList) {
            return
        }
    }
}      

test.json

{
    "ts": 1603088274000,
    "values": {
        "ep": 12
    }
}      

connect_info.csv

localhost,1883,XecUwSmMGiYJp2BspMK2
localhost,1883,GqVqoPP2wblDjS2P9pQ9