天天看點

輕量級邊緣計算 EMQ X Kuiper 與 Azure IoT Hub 內建方案

背景

本文以一個常見的物聯網使用場景為案例,介紹了如何利用邊緣計算來實作對業務的快速、低成本和有效地處理。

在各類物聯網項目中,比如智能樓宇項目,需要将樓宇的資料(比如電梯、瓦斯、水電等)進行采集和分析。一種解決方案是将所有的裝置直接接入在雲端的物聯網平台,類似于像 Azure IoT Hub 或者 AWS IoT Hub。這種解決方案的問題在于,

  • 資料處理時延較長:通過 Internet 傳輸和雲端的處理後傳回給裝置,所需時間較長
  • 資料傳輸和存儲成本:通過 Internet 傳輸需要帶寬,對于大規模連接配接的物聯網項目來說,耗費的帶寬會相當可觀
  • 資料的安全性:有些物聯網的資料會相當敏感,全部通過物聯網傳輸的話會有風險

為了解決以上的問題,業界提出了邊緣計算的方案,邊緣計算的核心就在于把資料進行就近處理,避免不必要的時延、成本和安全問題。

業務場景

假設現有一組裝置,組中的每個裝置有一個 id,通過 MQTT 協定往 MQTT 消息伺服器上相應的主題發送資料。主題的設計如下,其中 {device_id} 為裝置的 id。

devices/{device_id}/messages
           

每個裝置發送的資料格式為 JSON,發送的通過該傳感器采集的溫度與濕度資料。

{
    "temperature": 30, 
    "humidity" : 20
}
           

現在需要實時分析資料,并提出以下的需求:對每個裝置的溫度資料按照每 10 秒鐘計算平均值(

t_av

),并且記下 10 秒鐘内的最大值 (

t_max

)、最小值(

t_min

) 和資料條數(

t_count

),計算完畢後将這 4 個結果進行儲存,以下為樣例結果資料:

[
    {
        "device_id" : "1", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    {
        "device_id" : "2", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    ...
]
           

方案介紹

如下圖所示,采用邊緣分析/流式資料處理的方式,在邊緣端我們采用了 EMQ X 的方案,最後将計算結果輸出到 Azure 的 IoT Hub 中。

輕量級邊緣計算 EMQ X Kuiper 與 Azure IoT Hub 內建方案
  • EMQ X Edge 可以接入各種協定類型的裝置,比如 MQTT、CoAP、LwM2M 等,這樣使用者可以不需要關心協定适配方面的問題;另外它本身也比較輕量級,适合部署在邊緣裝置上。
  • EMQ X Kuiper 是 EMQ 釋出的基于 SQL 的輕量級邊緣流式資料分析引擎,安裝包隻有約 7MB,非常适合于運作在邊緣裝置端
  • Azure IoT Hub 提供了比較全的裝置接入和資料分析的方案,此處用于雲端的結果資料接入,以及應用所需的結果資料分析

實作步驟

安裝 EMQ X Edge & Kuiper

  • 寫本文的時候,EMQ X Edge 的最新版本是4.0,使用者可以通過 Docker 來安裝和啟動 EMQ X Edge
    # docker pull emqx/emqx-edge
    # docker run -d --name emqx -p 1883:1883  emqx/emqx-edge:latest
    # docker ps
    CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                                                                           NAMES
    a348e3ac150c        emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago       Up 2 seconds        4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx
               
    使用者可以通過

    telnet

    指令來判斷是否啟動成功,如下所示。
    # telnet localhost 1883
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
               
  • 安裝、啟動 Kuiper

    點選這裡下載下傳最新版 Kuiper,并解壓。在寫本文的時候,Kuiper 最新版本為 0.0.3。

    # unzip kuiper-linux-amd64-0.0.3.zip
    # cd kuiper
    # bin/server
    Serving Kuiper server on port 20498
               
    如果無法啟動,請檢視日志檔案

    log/stream.log

建立流

Kuiper 提供了一個指令用于管理流和規則,使用者可以通過在指令行視窗中敲入

bin/cli

檢視有哪些子指令及其幫助。

cli

指令預設連接配接的是本地的 Kuiper 伺服器,

cli

指令也可以連接配接到别的 Kuiper 伺服器,使用者可以在

etc/client.yaml

配置檔案中修改連接配接的 Kuiper 伺服器。使用者如果想了解更多關于指令行的資訊,可以參考這裡。

建立流定義:建立流的目的是為了定義發送到該流上的資料格式,類似于在關系資料庫中定義表的結構。 Kuiper 中所有支援的資料類型,可以參考這裡。

# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
           

上述語句在 Kuiper 中建立了一個名為 demo 的流定義,包含了兩個字段,分别為 temperature 和 humidity,資料源為訂閱 MQTT 的主題

devices/+/messages

,這裡請注意采用了通配符

+

,用于訂閱不同裝置的消息。該資料源所對應的 MQTT 伺服器位址在配置檔案

etc/mqtt_source.yaml

中,可以根據所在的伺服器位址進行配置。如下圖所示,配置

servers

項目。

#Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]
           

使用者可以在指令行中敲入

describe

子指令來檢視剛建立好的流定義。

# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature	float
humidity	bigint

FORMAT: JSON
DATASOURCE: devices/+/messages
           

資料業務邏輯處理

Kuiper 采用 SQL 實作業務邏輯,每10秒鐘統計溫度的平均值、最大值、最小值和次數,并根據裝置 ID 進行分組,實作的 SQL 如下所示。

這裡的 SQL 用了四個聚合函數,用于統計在10秒鐘視窗期内的相關值。

  • avg

    :平均值
  • max

    :最大值
  • min

    :最小值
  • count

    :計數

另外還使用了兩個基本的函數

  • mqtt

    :消息中取出 MQTT 協定的資訊,

    mqtt(topic)

    就是取得目前取得消息的主題名稱
  • split_value

    :該函數将第一個參數使用第二個參數進行分割,然後第三個參數指定下标,取得分割後的值。是以函數

    split_value("devices/001/messages", "/", 1)

    調用就傳回

    001

GROUP BY

跟的是分組的字段,分别為計算字段

device_id

;時間視窗

TUMBLINGWINDOW(ss, 10)

,該時間視窗的含義為每10秒鐘生成一批統計資料。

調試 SQL

在正式寫規則之前,我們需要對規則進行調試,Kuiper 提供了 SQL 的調試工具,可以讓使用者非常友善地對 SQL 進行調試。

  • 進入 kuiper 安裝目錄,并運作

    bin/cli query

  • 在出現的指令行提示符中輸入前面準備好的 SQL 語句。
    # bin/cli query
    Connecting to 127.0.0.1:20498
    kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
    query is submit successfully.
    kuiper >
               
    在日志檔案

    log/stream.log

    中,可以看到建立了一個名為

    internal-kuiper_query_rule

    的臨時規則。
    ...
    time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
    time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule
               
    值得注意的是,這個名為

    internal-kuiper_query_rule

    的規則是通過

    query

    建立的,伺服器端每5秒鐘會檢測一下

    query

    用戶端是否線上,如果

    query

    用戶端發現有超過10秒鐘沒有反應(比如被關閉),那麼這個内部建立的

    internal-kuiper_query_rule

    規則會被自動删除,被删除的時候在日志檔案中會列印如下的資訊。
    ...
    time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
    time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
    time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
    ...
               
  • 發送測試資料

    通過任何的測試工具,向 EMQ X Edge 發送以下的測試資料。筆者在測試過程中用的是 JMeter 的 MQTT 插件,因為基于 JMeter 可以做一些比較靈活的自動資料生成,業務邏輯控制,以及大量裝置的模拟等。使用者也可以直接使用

    mosquitto

    等其它用戶端進行模拟。
    • 主題:

      devices/$device_id/messages

      ,其中

      $device_id

      為下面資料中的第一列
    • 消息:

      {"temperature": $temperature, "humidity" : $humidity}

      , 其中

      $temperature

      $humidity

      分别為下面資料中的第二列和第三列
    #device_id, temperature, humidity
    1,20,30
    2,31,40
    1,35,50
    2,20,30
    1,80,90
    2,45,20
    1,10,90
    2,12,30
    1,65,35
    2,55,32
               
    我們可以發現發送了模拟資料後,在

    query

    用戶端指令行裡在兩個10秒的時間視窗裡列印了兩組資料。這裡輸出的結果條數跟使用者發送資料的頻率有關系,如果 Kuiper 在一個時間視窗内接受到所有的資料,那麼隻列印一條結果。
    kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]
    
    [{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
               

建立、送出規則

完成了 SQL 的調試之後,開始配置規則檔案,将結果資料通過 Kuiper 的 MQTT Sink 發送到遠端的 Azure IoT Hub 中。 在 Azure IoT Hub 中,使用者需要先建立好以下内容,

  • IoT Hub:本文建立的名稱為

    rockydemo

    ,用于接入裝置
  • IoT Device:代表了一個裝置,此處為處理裝置資料的網關,該網關安裝了 Kuiper,網關在把相關相關資料處理完畢後,将結果發送到 Azure 雲端
  • 裝置連接配接使用者名和密碼:請參考 Azure 相關的文檔了解 Azure IoT MQTT 連接配接的使用者名和密碼;關于生成 SAS Token,使用者可以參考此文檔。

如下圖所示,在 Azure IoT Hub 中建立完成的相關裝置。

輕量級邊緣計算 EMQ X Kuiper 與 Azure IoT Hub 內建方案

編寫 Kuiper 規則檔案

規則檔案是一個文本檔案,描述了業務處理的邏輯(前面已經調試好的 SQL 語句),以及 sink 的配置(消息處理結果的發送目的地)。連接配接 Azure IoT Hub 的大部分資訊都已經在前文中描述,需要注意是必須設定

protocol_version

的值為

3.1.1

,而不能為

3.1

{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "ssl://rockydemo.azure-devices.net:8883",
        "topic": "devices/demo_001/messages/events/",
        "protocol_version": "3.1.1",
        "qos": 1,
        "clientId": "demo_001",
        "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30",
        "password": "SharedAccessSignature sr=*******************"
      }
    }
  ]
}
           

通過 Kuiper 指令行建立規則

# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt. 
Rule rule1 was created.
           

在日志檔案中可以檢視規則的運作連接配接情況,如果配置項都正确的話,應該可以看到到 Azure IoT Hub 的連接配接建立成功。

......
time="2019-11-12T14:30:34+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-12T14:30:34+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-12T14:30:35+08:00" level=info msg="The connection to server ssl://rockydemo.azure-devices.net:8883 was established successfully" rule=rule1
......
           
  • 通過指令

    az iot hub monitor-events -n rockydemo

    啟動 Azure IoT Hub 監控,并往本地的 EMQ X Edge 上發送跟調試 SQL 語句一樣的模拟資料。經過 Kuiper 處理後,相應的處理結果被發送到了 Azure IoT Hub 中。
    #az iot hub monitor-events -n rockydemo
    Starting event monitor, use ctrl-c to stop...
    {
        "event": {
            "origin": "demo_001",
            "payload": "[{\"device_id\":\"2\",\"t_av\":32,\"t_count\":3,\"t_max\":45,\"t_min\":20},{\"device_id\":\"1\",\"t_av\":45,\"t_count\":3,\"t_max\":80,\"t_min\":20}]"
        }
    }
    {
        "event": {
            "origin": "demo_001",
            "payload": "[{\"device_id\":\"2\",\"t_av\":33.5,\"t_count\":2,\"t_max\":55,\"t_min\":12},{\"device_id\":\"1\",\"t_av\":37.5,\"t_count\":2,\"t_max\":65,\"t_min\":10}]"
        }
    }
               

總結

通過本文,讀者可以了解到利用 EMQ X 在邊緣端的解決方案可以非常快速、靈活地開發出基于邊緣資料分析的系統,實作資料低延遲時間、低成本和安全的處理。Azure IoT 也提供了 IoT Edge 方案,與 Azure 的方案相比,

  • Kuiper 的運作時非常輕量級;Azure IoT Edge 方案需要提供相關語言的運作時,安裝包相對來說會比較大。
  • Kuiper 基于 SQL 實作業務邏輯的實作方式更加快速簡單,對複雜的業務邏輯處理缺乏一定的靈活性;Azure IoT Edge 在業務實作的靈活度上相對來說更佳。
  • Kuiper 在與第三方的 IoT Hub 進行內建的時候靈活性更好。Azure IoT Edge 一般隻跟 Azure IoT Hub 進行對接。

如果有興趣了解更多關于邊緣流式資料分析的内容,請參考 Kuiper 開源項目。

繼續閱讀