#頭條創作挑戰賽#
目錄
- 前言
- 1,關于 eKuiper 規則引擎
- 2,使用 EdgeX 進行相關開發。
- 3,在edgex 管理端上面進行操作:
- 4,或者更新到最新的ui:2.2.0也可以解決問題
- 5,主要是利用redis的topic訂閱釋出消息
- 6,編寫beego的http服務接收資料
- 7,總結EdgeX規則引擎使用
1,關于 ekuiper 規則引擎
LF Edge eKuiper 是一個輕量級的物聯網資料分析和流處理引擎,運作在資源受限的邊緣裝置上。
可以進行配置規則,使用的是golang 進行開發的。
使用 apache2.0 協定開源的。
https://ekuiper.org/zh
github 位址:
https://github.com/lf-edge/ekuiper
簡單地介紹:
主要就是編寫SQL Flow 語句,進行規則配置。
視訊參考:
https://www.bilibili.com/video/BV11B4y1V7pQ/
EdgeX 規則引擎 eKuiper 實戰
2,使用 EdgeX 進行相關開發。
可以增加官方的管理背景界面,修改docker-compose的配置。
rulesengine:
container_name: edgex-kuiper
depends_on:
- database
environment:
CONNECTION__EDGEX__REDISMSGBUS__PORT: 6379
CONNECTION__EDGEX__REDISMSGBUS__PROTOCOL: redis
CONNECTION__EDGEX__REDISMSGBUS__SERVER: edgex-redis
CONNECTION__EDGEX__REDISMSGBUS__TYPE: redis
EDGEX__DEFAULT__PORT: 6379
EDGEX__DEFAULT__PROTOCOL: redis
EDGEX__DEFAULT__SERVER: edgex-redis
EDGEX__DEFAULT__TOPIC: rules-events
EDGEX__DEFAULT__TYPE: redis
KUIPER__BASIC__CONSOLELOG: "true"
KUIPER__BASIC__RESTPORT: 59720
hostname: edgex-kuiper
image: lfedge/ekuiper:1.4.4-alpine
networks:
edgex-network: {}
ports:
- 127.0.0.1:59720:59720/tcp
read_only: true
restart: always
security_opt:
- no-new-privileges:true
user: kuiper:kuiper
volumes:
- kuiper-data:/kuiper/data:z
rulesengine-manager:
container_name: edgex-kuiper-manager
image: emqx/ekuiper-manager:1.6
networks:
edgex-network: {}
ports:
- 9082:9082/tcp
特别注意端口是9082 ,鏡像位址是:emqx/ekuiper-manager:1.6,同時放到一個網絡裡面。就可以直接通路edgex的規則引擎了。
https://ekuiper.org/docs/zh/latest/operation/manager-ui/overview.html
# 位址:http://localhost:9082
使用者名:admin
密碼:public
預設進入管理端是空,需要增加服務:http://edgex-kuiper:59720
增加成功了,同時相關的配置也讀取到了,操作也很友善。
3,在edgex 管理端上面進行操作:
http://127.0.0.1:4000/
建立一個 edgex 共享的流
# 簡單配置
CREATE STREAM EdgexStream () WITH (
FORMAT = "JSON",
TYPE = "edgex"
)
# 或者複雜點的配置也加上
CREATE STREAM EdgexStream () WITH (
DATASOURCE = "redis",
KEY = "",
FORMAT = "json",
CONF_KEY = "default",
TYPE = "edgex",
STRICT_VALIDATION = "true",
TIMESTAMP = "",
TIMESTAMP_FORMAT = "",
RETAIN_SIZE = "0",
SHARED = "true"
)
SELECT * FROM EdgexStream
建立規則
可以設定一個空消息:
也可以定制一個消息模闆,配置好位址和服務:
結果服務沒有啟動成功:
報錯:
Stopped: read properties map[method:GET retryInterval:1 sendSingle:false url:http://edgex-core-command:59882/edgex/api/getData] fail with error: 2 error(s) decoding: * 'retryInterval' expected type 'int', got unconvertible type 'string', value: '1' * 'sendSingle' expected type 'bool', got unconvertible type 'string', value: 'false'.
看樣子是界面傳遞的 false 在 edgex 中轉換錯誤了。
{
"triggered": true,
"id": "rule1",
"sql": "SELECT * FROM EdgexStream",
"actions": [
{
"rest": {
"retryInterval": 1,
"sendSingle": false,
"url": "http://edgex-core-command:59882/edgex/api/getData",
"method": "GET"
}
}
],
"options": {
"isEventTime": false,
"lateTolerance": 1000,
"concurrency": 1,
"bufferLength": 1024,
"sendMetaToSink": false,
"sendError": true,
"qos": 0,
"checkpointInterval": 300000
}
}
直接使用 postman 送出資料:
然後可以了,規則服務終于可以了,估計需要更新 ui 和 規則引擎的版本不相容造成的。
更多詳細的配置參考:
https://ekuiper.org/docs/zh/latest/edgex/edgex_source_tutorial.html
SELECT * FROM EdgexStream WHERE meta (deviceName) = "device-virtual"
或者更新到最新的ui 版本也可以解決這個問題。
4,或者更新到最新的ui:2.2.0也可以解決問題
要保障 device-virtual 的裝置啟動成功,就開始往 edgex 當中發送資料。
配置好了規則引擎之後就可以接受觸發的資料了。
寫的假位址,這樣就可以算接受資料了。
2022/10/29 14:00:19.874 [D] [server.go:2836] | 172.18.0.10| 404 | 1.156644ms| nomatch| GET /edgex/api/getData
2022/10/29 14:00:19.876 [D] [server.go:2836] | 172.18.0.10| 404 | 257.062µs| nomatch| GET /edgex/api/getData
2022/10/29 14:00:19.878 [D] [server.go:2836] | 172.18.0.10| 404 | 297.857µs| nomatch| GET /edgex/api/getData
2022/10/29 14:00:19.879 [D] [server.go:2836] | 172.18.0.10| 404 | 258.943µs| nomatch| GET /edgex/api/getData
也可以配置成發送到edgex的其他服務上:
然後再看處理的消息數量,有所增加。
5,主要是利用redis的topic訂閱釋出消息
通過訂閱 SUBSCRIBE rules-events 消息是通過這個釋出的。格式
$ docker exec -it edgex-redis sh
/data # redis-cli
127.0.0.1:6379> SUBSCRIBE rules-events
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "rules-events"
3) (integer) 1
1) "message"
2) "rules-events"
3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"9a7a86fd-eac7-43ea-881a-da4672592f1f\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjgwZjBjMjgwLTQwNjEtNDRlNy04MjJmLTQ3MGNiZjQ0N2M2OCIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0MzIiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTM2MzYwNjEsInJlYWRpbmdzIjpbeyJpZCI6Ijg3MzI2ZDE5LWM2NTEtNGE3NC04YWJkLTkxOTkyMjQ0ZTYzZSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5MzYzNjA2MSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDMyIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQzMiIsInZhbHVlIjoiMS42NTkwNzllKzM4In1dfQ==\",\"ContentType\":\"application/json\"}"
1) "message"
2) "rules-events"
3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"232d1863-be69-4cbc-971f-29c81e8b3d61\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjQ2ZGU0ZGE2LTcwMWEtNGQ3ZC1iZmRiLTNmMGI4OTRjNmVjYiIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0NjQiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTQwOTg3MDUsInJlYWRpbmdzIjpbeyJpZCI6IjBjYTkyZGVkLTIyNjUtNDYyMC1hMzc1LWUxMDllNDU2MThjNSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5NDA5ODcwNSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDY0IiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQ2NCIsInZhbHVlIjoiLTUuMzM1MTI4ZSszMDcifV19\",\"ContentType\":\"application/json\"}"
1) "message"
使用的是base64 加密了下,估計是防止json 資料問題。解析後的資料:
https://tool.oschina.net/encrypt?type=3
使用工具轉換下,可以看到是virtual裝置發送的資訊。
{"apiVersion":"v2","id":"80f0c280-4061-44e7-822f-470cbf447c68","deviceName":"Random-Float-Device","profileName":"Random-Float-Device","sourceName":"Float32","origin":1667055679793636061,"readings":[{"id":"87326d19-c651-4a74-8abd-91992244e63e","origin":1667055679793636061,"deviceName":"Random-Float-Device","resourceName":"Float32","profileName":"Random-Float-Device","valueType":"Float32","value":"1.659079e+38"}]}
6,編寫beego的http服務接收資料
配置好規則位址是 POST方法,然後資料格式是json。
使用beego 的POST 方法接受資料:
func (c *IndexController) GetEdgexData() {
body := c.Ctx.Input.RequestBody
log.Println("########### GetEdgexData :",string(body)," ###########")
defer c.ServeJSON()
return
}
本來以為是給json,但是是給數組,數組裡面是個json。
2022/10/29 15:59:30 ########### GetEdgexData : [{"Bool":true}] ###########
2022/10/29 15:59:30.385 [D] [server.go:2836] | 172.17.0.1| 200 | 308.432µs| match| POST /edgex/api/getData r:/edgex/api/getData
2022/10/29 15:59:35 filterAdmin request url : /edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int8":-46}] ###########
2022/10/29 15:59:35.163 [D] [server.go:2836] | 172.17.0.1| 200 | 162.124µs| match| POST /edgex/api/getData r:/edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int64":5618840669469908222}] ###########
2022/10/29 15:59:35.164 [D] [server.go:2836] | 172.17.0.1| 200 | 196.329µs| match| POST /edgex/api/getData r:/edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int16":-11546}] ###########
2022/10/29 15:59:35.165 [D] [server.go:2836] | 172.17.0.1| 200 | 132.69µs| match| POST /edgex/api/getData r:/edgex/api/getData
7,總結EdgeX規則引擎使用
總體來說 EdgeX的eKuiper 規則引擎使用起來是非常的友善的。
通過和 EdgeX 深度整合,可以直接轉換調用成各種方法,也可以自定義轉發到 rest mqtt mq 等地方。
同時可以接收到device-virtual的資料。整個流程也非常清晰友善。