背景
前面文章分享了如何安裝kuiper和kuiper-manager,本篇文章通過一個完整的例子來示範kuiper的一個比較完整的流式計算。
下圖仍舊使用了kuiper官網文檔中的圖,我在裡面稍微加了一些注釋:
流式計算建立操作過程
kuiper的流式計算建立于操作分為如下幾個步驟:
- 使用指令行/rest/控制台建立一個流(對應sources)
- 基于建立的流編寫路由規則(對應sql/rule部分)
- 使用mqtt工具給mqtt broker發送消息(上文中kuiper使用emqx作為其mqtt broker)
- kuiper将符合路由規則的資料轉發到目的地(sinks)
建立一個流
1)docker指令行方式
# 進入kuiper容器
docker exec -it kuiper /bin/bash
# 建立一個流,定義了temperature和humidity這兩個字段,後面會對應mqtt消息(payload)的兩個字段,其中DataSource可以了解為訂閱的topic
bin/kuiper create stream demo2 '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="demo2")'
2) rest方式
#-d指定了stream的具體資料
curl -H "Content-Type: application/json" -X POST -d '{"sql":"create stream demo2 (temperature float, humidity bigint) WITH ( datasource = \"demo2\",FORMAT = \"json\")"}' http://localhost:9081/streams
建立規則
1)docker指令行
#編寫rule規則檔案myRule,過濾temperature>30的資料,并輸出到log裡面(使用了kuiper的log插件),内容如下
{
"sql": "SELECT temperature from demo2 where temperature > 30",
"actions": [{
"log": {}
}]
}
#指令行建立規則,指定ruleid為ruleDemo
bin/kuiper create rule ruleDemo -f myRule
curl -H "Content-Type: application/json" -X POST -d '{"id":"ruleDemo","sql":"SELECT temperature from demo2 where temperature > 30","actions":[{"log":{}}]}' http://localhost:9081/rules
發送mqtt消息給emqx
#給emqx(192.168.200.2)發送{"temperature": 40, "humidity" : 20},指定topic為demo2
mosquitto_pub -h 192.168.200.2 -m '{"temperature": 40, "humidity" : 20}' -t demo2
檢視規則過濾後的資料
以上就是kuiper流式計算的例子。總結一下:kuiper運作在某個邊緣裝置(這裡是一台虛機)上,訂閱了emqx的topic:demo2,當有{"temperature": 40, "humidity" : 20}這樣的資料上報的時候,kuiper的規則引擎會通過sql将資料輸出到log中。
部落客:測試生财(一個不為996而996的測開碼農)
座右銘:專注測試開發與自動化運維,努力讀書思考寫作,為内卷的人生奠定财務自由。
内容範疇:技術提升,職場雜談,事業發展,閱讀寫作,投資理财,健康人生。
csdn:https://blog.csdn.net/ccgshigao
部落格園:https://www.cnblogs.com/qa-freeroad/
51cto:https://blog.51cto.com/14900374
微信公衆号:測試生财(定期分享獨家内容和資源)