天天看點

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

作者:freewebsys

#頭條創作挑戰賽#

目錄

  • 前言
  • 1,關于 eKuiper 規則引擎
  • 2,使用 EdgeX 進行相關開發。
  • 3,在edgex 管理端上面進行操作:
  • 4,或者更新到最新的ui:2.2.0也可以解決問題
  • 5,主要是利用redis的topic訂閱釋出消息
  • 6,編寫beego的http服務接收資料
  • 7,總結EdgeX規則引擎使用

1,關于 ekuiper 規則引擎

LF Edge eKuiper 是一個輕量級的物聯網資料分析和流處理引擎,運作在資源受限的邊緣裝置上。

可以進行配置規則,使用的是golang 進行開發的。

使用 apache2.0 協定開源的。

https://ekuiper.org/zh

github 位址:

https://github.com/lf-edge/ekuiper

簡單地介紹:

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

主要就是編寫SQL Flow 語句,進行規則配置。

視訊參考:

https://www.bilibili.com/video/BV11B4y1V7pQ/

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

EdgeX 規則引擎 eKuiper 實戰

2,使用 EdgeX 進行相關開發。

可以增加官方的管理背景界面,修改docker-compose的配置。

rulesengine:
    container_name: edgex-kuiper
    depends_on:
    - database
    environment:
      CONNECTION__EDGEX__REDISMSGBUS__PORT: 6379
      CONNECTION__EDGEX__REDISMSGBUS__PROTOCOL: redis
      CONNECTION__EDGEX__REDISMSGBUS__SERVER: edgex-redis
      CONNECTION__EDGEX__REDISMSGBUS__TYPE: redis
      EDGEX__DEFAULT__PORT: 6379
      EDGEX__DEFAULT__PROTOCOL: redis
      EDGEX__DEFAULT__SERVER: edgex-redis
      EDGEX__DEFAULT__TOPIC: rules-events
      EDGEX__DEFAULT__TYPE: redis
      KUIPER__BASIC__CONSOLELOG: "true"
      KUIPER__BASIC__RESTPORT: 59720
    hostname: edgex-kuiper
    image: lfedge/ekuiper:1.4.4-alpine
    networks:
      edgex-network: {}
    ports:
    - 127.0.0.1:59720:59720/tcp
    read_only: true
    restart: always
    security_opt:
    - no-new-privileges:true
    user: kuiper:kuiper
    volumes:
    - kuiper-data:/kuiper/data:z
  rulesengine-manager:
    container_name: edgex-kuiper-manager
    image: emqx/ekuiper-manager:1.6
    networks:
      edgex-network: {}
    ports:
    - 9082:9082/tcp
           

特别注意端口是9082 ,鏡像位址是:emqx/ekuiper-manager:1.6,同時放到一個網絡裡面。就可以直接通路edgex的規則引擎了。

https://ekuiper.org/docs/zh/latest/operation/manager-ui/overview.html

# 位址:http://localhost:9082
使用者名:admin
密碼:public
           

預設進入管理端是空,需要增加服務:http://edgex-kuiper:59720

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上
使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

增加成功了,同時相關的配置也讀取到了,操作也很友善。

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上
使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

3,在edgex 管理端上面進行操作:

http://127.0.0.1:4000/

建立一個 edgex 共享的流

# 簡單配置
CREATE STREAM EdgexStream () WITH (
  FORMAT = "JSON",
  TYPE = "edgex"
)
# 或者複雜點的配置也加上
CREATE STREAM EdgexStream () WITH (
  DATASOURCE = "redis",
  KEY = "",
  FORMAT = "json",
  CONF_KEY = "default",
  TYPE = "edgex",
  STRICT_VALIDATION = "true",
  TIMESTAMP = "",
  TIMESTAMP_FORMAT = "",
  RETAIN_SIZE = "0",
  SHARED = "true"
)
           
使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上
SELECT * FROM  EdgexStream
           

建立規則

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

可以設定一個空消息:

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

也可以定制一個消息模闆,配置好位址和服務:

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

結果服務沒有啟動成功:

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

報錯:

Stopped: read properties map[method:GET retryInterval:1 sendSingle:false url:http://edgex-core-command:59882/edgex/api/getData] fail with error: 2 error(s) decoding: * 'retryInterval' expected type 'int', got unconvertible type 'string', value: '1' * 'sendSingle' expected type 'bool', got unconvertible type 'string', value: 'false'.
           

看樣子是界面傳遞的 false 在 edgex 中轉換錯誤了。

{
    "triggered": true, 
    "id": "rule1", 
    "sql": "SELECT * FROM  EdgexStream", 
    "actions": [
        {
            "rest": {
                "retryInterval": 1, 
                "sendSingle": false, 
                "url": "http://edgex-core-command:59882/edgex/api/getData", 
                "method": "GET"
            }
        }
    ], 
    "options": {
        "isEventTime": false, 
        "lateTolerance": 1000, 
        "concurrency": 1, 
        "bufferLength": 1024, 
        "sendMetaToSink": false, 
        "sendError": true, 
        "qos": 0, 
        "checkpointInterval": 300000
    }
}
           

直接使用 postman 送出資料:

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

然後可以了,規則服務終于可以了,估計需要更新 ui 和 規則引擎的版本不相容造成的。

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

更多詳細的配置參考:

https://ekuiper.org/docs/zh/latest/edgex/edgex_source_tutorial.html

SELECT * FROM EdgexStream WHERE meta (deviceName) = "device-virtual"
           

或者更新到最新的ui 版本也可以解決這個問題。

4,或者更新到最新的ui:2.2.0也可以解決問題

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

要保障 device-virtual 的裝置啟動成功,就開始往 edgex 當中發送資料。

配置好了規則引擎之後就可以接受觸發的資料了。

寫的假位址,這樣就可以算接受資料了。

2022/10/29 14:00:19.874 [D] [server.go:2836]  |    172.18.0.10| 404 |   1.156644ms| nomatch| GET      /edgex/api/getData
2022/10/29 14:00:19.876 [D] [server.go:2836]  |    172.18.0.10| 404 |    257.062µs| nomatch| GET      /edgex/api/getData
2022/10/29 14:00:19.878 [D] [server.go:2836]  |    172.18.0.10| 404 |    297.857µs| nomatch| GET      /edgex/api/getData
2022/10/29 14:00:19.879 [D] [server.go:2836]  |    172.18.0.10| 404 |    258.943µs| nomatch| GET      /edgex/api/getData
           

也可以配置成發送到edgex的其他服務上:

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

然後再看處理的消息數量,有所增加。

使用EdgeX的eKuiper規則引擎,配置處理資料,并轉發到http服務上

5,主要是利用redis的topic訂閱釋出消息

通過訂閱 SUBSCRIBE rules-events 消息是通過這個釋出的。格式

$ docker exec -it edgex-redis sh
/data # redis-cli 
127.0.0.1:6379> SUBSCRIBE rules-events
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "rules-events"
3) (integer) 1
1) "message"
2) "rules-events"
3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"9a7a86fd-eac7-43ea-881a-da4672592f1f\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjgwZjBjMjgwLTQwNjEtNDRlNy04MjJmLTQ3MGNiZjQ0N2M2OCIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0MzIiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTM2MzYwNjEsInJlYWRpbmdzIjpbeyJpZCI6Ijg3MzI2ZDE5LWM2NTEtNGE3NC04YWJkLTkxOTkyMjQ0ZTYzZSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5MzYzNjA2MSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDMyIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQzMiIsInZhbHVlIjoiMS42NTkwNzllKzM4In1dfQ==\",\"ContentType\":\"application/json\"}"
1) "message"
2) "rules-events"
3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"232d1863-be69-4cbc-971f-29c81e8b3d61\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjQ2ZGU0ZGE2LTcwMWEtNGQ3ZC1iZmRiLTNmMGI4OTRjNmVjYiIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0NjQiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTQwOTg3MDUsInJlYWRpbmdzIjpbeyJpZCI6IjBjYTkyZGVkLTIyNjUtNDYyMC1hMzc1LWUxMDllNDU2MThjNSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5NDA5ODcwNSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDY0IiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQ2NCIsInZhbHVlIjoiLTUuMzM1MTI4ZSszMDcifV19\",\"ContentType\":\"application/json\"}"
1) "message"

           

使用的是base64 加密了下,估計是防止json 資料問題。解析後的資料:

https://tool.oschina.net/encrypt?type=3

使用工具轉換下,可以看到是virtual裝置發送的資訊。

{"apiVersion":"v2","id":"80f0c280-4061-44e7-822f-470cbf447c68","deviceName":"Random-Float-Device","profileName":"Random-Float-Device","sourceName":"Float32","origin":1667055679793636061,"readings":[{"id":"87326d19-c651-4a74-8abd-91992244e63e","origin":1667055679793636061,"deviceName":"Random-Float-Device","resourceName":"Float32","profileName":"Random-Float-Device","valueType":"Float32","value":"1.659079e+38"}]}
           

6,編寫beego的http服務接收資料

配置好規則位址是 POST方法,然後資料格式是json。

使用beego 的POST 方法接受資料:

func (c *IndexController) GetEdgexData() {
	body := c.Ctx.Input.RequestBody

	log.Println("########### GetEdgexData :",string(body)," ###########")
	defer c.ServeJSON()
	return
}
           

本來以為是給json,但是是給數組,數組裡面是個json。

2022/10/29 15:59:30 ########### GetEdgexData : [{"Bool":true}]  ###########
2022/10/29 15:59:30.385 [D] [server.go:2836]  |     172.17.0.1| 200 |    308.432µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
2022/10/29 15:59:35 filterAdmin request url  :  /edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int8":-46}]  ###########
2022/10/29 15:59:35.163 [D] [server.go:2836]  |     172.17.0.1| 200 |    162.124µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData 
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int64":5618840669469908222}]  ###########
2022/10/29 15:59:35.164 [D] [server.go:2836]  |     172.17.0.1| 200 |    196.329µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int16":-11546}]  ###########
2022/10/29 15:59:35.165 [D] [server.go:2836]  |     172.17.0.1| 200 |     132.69µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData

           

7,總結EdgeX規則引擎使用

總體來說 EdgeX的eKuiper 規則引擎使用起來是非常的友善的。

通過和 EdgeX 深度整合,可以直接轉換調用成各種方法,也可以自定義轉發到 rest mqtt mq 等地方。

同時可以接收到device-virtual的資料。整個流程也非常清晰友善。

繼續閱讀