天天看點

EMQ百萬級MQTT消息服務(TLS Docker Golang)

附上:

喵了個咪的部落格:

w-blog.cn EMQ官方位址: http://emqtt.com/ EMQ中文文檔: http://emqtt.com/docs/v2/guide.html

1.TLS證書驗證

為了保障安全.我們常常會使用HTTPS來保障請求不被篡改,作為MQTT使用TLS加密的方式來保障傳輸安全

EMQ預設使用的TLS加密的端口是8883端口,預設證書在EMQ目錄下etc/certs:

對應的配置檔案在emq.conf中,可以修改你的端口和配置檔案路徑

listener.ssl.external = 8883
listener.ssl.external.keyfile = etc/certs/key.pem
listener.ssl.external.certfile = etc/certs/cert.pem           
PS:在連結的時候注意需要從之前的連結字首做如此修改 tcp:// -> ssl://

2.使用Docker部署EMQ

使用Docker部署EMQ很友善版本更新,端口管理和單節點多EMQ等,對于性能來說基本沒有帶來額外的開銷

但是官方并沒有提供對于直接可以用的Docker鏡像,但是提供的GIT大家可以自己打包以及壓縮包,筆者因為使用這裡打成了公共進行如下(2.3.5往上版本都會進維護):

registry.cn-hangzhou.aliyuncs.com/sunmi-base/sunmi-emq:2.3.5
registry.cn-hangzhou.aliyuncs.com/sunmi-base/sunmi-emq:2.3.6
registry.cn-hangzhou.aliyuncs.com/sunmi-base/sunmi-emq:2.3.7           

當然更重要的一點就是關于配置,EMQ提供通過環境變量的方式影響配置檔案,大家可以參考一下daocker-composer和Kubernetes的編排檔案,通過環境變量的方式進行配置檔案的修改,一下編排配置了mysql鑒權以及自定義鑒權語句和預設開啟mysql鑒權插件

version: '2'
services:
  emq:
    image: 'registry.cn-hangzhou.aliyuncs.com/sunmi-base/sunmi-emq:2.3.7'
    ports:
      - '31883:1883'
      - '31083:18083'
      - '38883:8883'
    environment:
      - EMQ_MQTT__ALLOW_ANONYMOUS=false
      - EMQ_AUTH__MYSQL__USERNAME=emq
      - EMQ_AUTH__MYSQL__PASSWORD=Emq666
      - EMQ_AUTH__MYSQL__DATABASE=emq
      - "EMQ_AUTH__MYSQL__SERVER=xxxxxx:3306"
      - "EMQ_AUTH__MYSQL__AUTH_QUERY=select password from mqtt_user where username = '%u' limit 1"
      - "EMQ_AUTH__MYSQL__SUPER_QUERY=select is_superuser from mqtt_user where username = '%u' limit 1"
      - "EMQ_AUTH__MYSQL__ACL_QUERY=select allow, ipaddr, username, clientid, access, REPLACE(topic,'$user','%u') from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
      - "EMQ_LOADED_PLUGINS=emq_auth_mysql,emq_recon,emq_modules,emq_retainer,emq_dashboard"
    restart: always           

以下是K8S編排檔案:

apiVersion: extensions/v1beta1                  # K8S對應的API版本
kind: Deployment                                # 對應的類型
metadata:
  name: emq-deployment
  labels:
    name: emq-deployment
spec:
  replicas: 1                                   # 鏡像副本數量
  template:
    metadata:
      labels:                                   # 容器的标簽 可和service關聯
        app: emq
    spec:
      containers:
        - name: emq                        # 容器名和鏡像
          image: registry.cn-hangzhou.aliyuncs.com/sunmi-base/sunmi-emq:2.3.6
          imagePullPolicy: Always
          env:                                  # 環境變量
          - name: EMQ_MQTT__ALLOW_ANONYMOUS
            value: "false"
          - name: EMQ_AUTH__MYSQL__USERNAME
            value: "emq"
          - name: EMQ_AUTH__MYSQL__PASSWORD
            value: "Emq666"
          - name: EMQ_AUTH__MYSQL__DATABASE
            value: "emq"
          - name: EMQ_AUTH__MYSQL__SERVER
            value: "xxxxxx:3306"
          - name: EMQ_AUTH__MYSQL__AUTH_QUERY
            value: "select password from mqtt_user where username = '%u' limit 1"
          - name: EMQ_AUTH__MYSQL__SUPER_QUERY
            value: "select is_superuser from mqtt_user where username = '%u' limit 1"
          - name: EMQ_AUTH__MYSQL__ACL_QUERY
            value: "select allow, ipaddr, username, clientid, access, REPLACE(topic,'$user','%u') from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
          - name: EMQ_LOADED_PLUGINS
            value: "emq_auth_mysql,emq_recon,emq_modules,emq_retainer,emq_dashboard"
---
apiVersion: v1
kind: Service
metadata:
  name: emq-service                          # 名稱
  labels:
    name: emq-service
spec:
  type: NodePort                                  # 開發端口的類型
  selector:                                       # service負載的容器需要有同樣的labels
    app: emq
  ports:
  - name: emq-service-1883-30111
    port: 1883                                    # 通過service來通路的端口
    targetPort: 1883                              # 對應容器的端口
    nodePort: 30111
  - name: emq-service-8883-30112
    port: 8883                                    # 通過service來通路的端口
    targetPort: 8883                              # 對應容器的端口
    nodePort: 30112
  - name: emq-service-18083-30113
    port: 18083                                    # 通過service來通路的端口
    targetPort: 18083                              # 對應容器的端口
    nodePort: 30113           
PS:需要在主控端做好TCP鍊路優化

3.Goalng用戶端

筆者這邊使用的是gobot庫基于

https://github.com/eclipse/paho.mqtt.golang

paho體系下的庫,例子如下:

package main

import (
  "gobot.io/x/gobot"
  "gobot.io/x/gobot/platforms/mqtt"
  "fmt"
  "time"
)

func main() {
  mqttAdaptor := mqtt.NewAdaptor("tcp://0.0.0.0:1883", "pinger")

  work := func() {
    mqttAdaptor.On("hello", func(msg mqtt.Message) {
      fmt.Println(msg)
    })
    mqttAdaptor.On("hola", func(msg mqtt.Message) {
      fmt.Println(msg)
    })
    data := []byte("o")
    gobot.Every(1*time.Second, func() {
      mqttAdaptor.Publish("hello", data)
    })
    gobot.Every(5*time.Second, func() {
      mqttAdaptor.Publish("hola", data)
    })
  }

  robot := gobot.NewRobot("mqttBot",
    []gobot.Connection{mqttAdaptor},
    work,
  )

  robot.Start()
}           

使用 mqttAdaptor.Publish可以發送消息 mqttAdaptor.On 可以訂閱消息,如果有使用者驗證可以使用如下方式:

mqttAdaptor = mqtt.NewAdaptorWithAuth(
        "EMQ.host",
        "EMQ.clientID",
        "EMQ.userName",
        "EMQ.passWordActive",
    )           

斷開也可以配置重連規則自動重連(預設未開啟,作為服務端強烈建議開啟)

mqttAdaptor.SetAutoReconnect(true)           

以及消息清理機制(預設斷開連接配接清理消息)

mqttAdaptor.SetCleanSession(false)           

也可以指定使用TLS證書連接配接

mqttAdaptor.SetUseSSL(true)
# 下面可以指定證書(如果EMQ使用了标準的CA憑證下面就不用配置了)
mqttAdaptor.SetClientKey(`/client/client-key.pem`)
mqttAdaptor.SetClientCert(`/client/client-cert.pem`)           

4. 總結

在EMQ和MQTT使用過程中還有很多的細節需要注意,關注細節才能走的更遠

注:筆者能力有限有說的不對的地方希望大家能夠指出,也希望多多交流!