天天看點

kuiper流式計算完整執行個體示範

背景

前面文章分享了如何安裝kuiper和kuiper-manager,本篇文章通過一個完整的例子來示範kuiper的一個比較完整的流式計算。

下圖仍舊使用了kuiper官網文檔中的圖,我在裡面稍微加了一些注釋:

kuiper流式計算完整執行個體示範

流式計算建立操作過程

kuiper的流式計算建立于操作分為如下幾個步驟:

  1. 使用指令行/rest/控制台建立一個流(對應sources)
  2. 基于建立的流編寫路由規則(對應sql/rule部分)
  3. 使用mqtt工具給mqtt broker發送消息(上文中kuiper使用emqx作為其mqtt broker)
  4. kuiper将符合路由規則的資料轉發到目的地(sinks)

建立一個流

1)docker指令行方式

# 進入kuiper容器
      
docker exec -it kuiper /bin/bash
      
# 建立一個流,定義了temperature和humidity這兩個字段,後面會對應mqtt消息(payload)的兩個字段,其中DataSource可以了解為訂閱的topic      
bin/kuiper create stream demo2 '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="demo2")'      

2) rest方式

#-d指定了stream的具體資料
curl -H "Content-Type: application/json" -X POST -d '{"sql":"create stream demo2 (temperature float, humidity bigint) WITH ( datasource = \"demo2\",FORMAT = \"json\")"}' http://localhost:9081/streams      

建立規則

1)docker指令行

#編寫rule規則檔案myRule,過濾temperature>30的資料,并輸出到log裡面(使用了kuiper的log插件),内容如下
{
    "sql": "SELECT temperature from demo2 where temperature > 30",
    "actions": [{
        "log":  {}
    }]
}

#指令行建立規則,指定ruleid為ruleDemo
bin/kuiper create rule ruleDemo -f myRule      
curl  -H "Content-Type: application/json"  -X POST -d '{"id":"ruleDemo","sql":"SELECT temperature from demo2 where temperature > 30","actions":[{"log":{}}]}'  http://localhost:9081/rules      

發送mqtt消息給emqx

#給emqx(192.168.200.2)發送{"temperature": 40, "humidity" : 20},指定topic為demo2
mosquitto_pub -h 192.168.200.2 -m '{"temperature": 40, "humidity" : 20}' -t demo2      

檢視規則過濾後的資料

kuiper流式計算完整執行個體示範

 以上就是kuiper流式計算的例子。總結一下:kuiper運作在某個邊緣裝置(這裡是一台虛機)上,訂閱了emqx的topic:demo2,當有{"temperature": 40, "humidity" : 20}這樣的資料上報的時候,kuiper的規則引擎會通過sql将資料輸出到log中。

部落客:測試生财(一個不為996而996的測開碼農)

座右銘:專注測試開發與自動化運維,努力讀書思考寫作,為内卷的人生奠定财務自由。

内容範疇:技術提升,職場雜談,事業發展,閱讀寫作,投資理财,健康人生。

csdn:https://blog.csdn.net/ccgshigao

部落格園:https://www.cnblogs.com/qa-freeroad/

51cto:https://blog.51cto.com/14900374

微信公衆号:測試生财(定期分享獨家内容和資源)

kuiper流式計算完整執行個體示範