天天看點

PostgreSQL "物聯網"應用 - 1 實時流式資料處理案例(萬億每天)

物聯網的一個特點是萬物聯網,會産生大量的資料。

例如 :

一盒藥,從生産,到運輸,到藥店,到售賣。每流經一個節點,都會記錄它的資訊。

又如 :

健康手環,兒童防丢手表,一些動物遷徙研究的傳感器(如中華鲟),水紋監測,電網監測,瓦斯管道監測,氣象監測等等這些資訊。

股價的實時預測。

車流實時資料統計,車輛軌迹實時合并。

商場人流實時統計。

資料監控實時處理,例如資料庫的監控,伺服器的監控,作業系統的監控等。

等等。。。。。。

傳感器種類繁多,采集的資料量已經達到了海量。

這些資料比電商雙十一的量有過之而不及,怎樣才能處理好這些資料呢?如何做到實時的流式資料處理?

postgresql提供了一個很好的基于流的資料處理産品,實時計算能力達到了單機10w記錄/s(普通x86伺服器)。

下面是應用case。

下載下傳并安裝pipelinedb,它是基于postgresql改進的流式資料處理資料庫。

配置環境變量腳本

初始化資料庫

配置參數

啟動資料庫

建立流(從表裡消費資料)

應用場景例子,

.1. 假設傳感器會上傳3個資料,分别是傳感器id,時間,以及采樣值。

gid, crt_time, val

應用需要實時統計每分鐘,每小時,每天,每個傳感器上傳的值的最大,最小,平均值,以及 count。

建立三個流視圖,每個代表一個統計次元。

如下:

激活流

插入資料測試

壓力測試:

假設有10萬個傳感器,傳感器上傳的取值範圍1到100。

每秒約處理10萬記錄,統計次元見上面的流sql。

多輪測試後

.2. 假設車輛運作過程中,每隔一段時間會上傳位置資訊,

gid, crt_time, poi

應用需要按天,繪制車輛的路徑資訊(把多個point聚合成路徑類型,或者數組類型,或者字元串,。。。)。

假設有1000萬量車,每輛車每次上傳一個坐标和時間資訊,(或者是一批資訊)。

應用需求,

.2.1. 按天繪制車輛的路徑資訊

.2.2. 按小時統計每個區域有多少量車經過

建立流 (這裡假設點資訊已經經過了二進制編碼,用一個int8來表示,友善壓力測試)

壓力測試

.3. 按交警探頭為機關,統計每個探頭采集的車輛資訊。

例如

.3.1 以車輛為機關,統計車輛的探頭位置資訊,串成軌迹資料。

.3.2 以探頭為機關,統計每個路口的車流資訊。(假設一個探頭對應一個路口)

第一個需求和前面的繪制車輛軌迹例子一樣,統計路口流量資訊則是以探頭id為機關進行統計。

用法都差不多,不再舉例

.4. 實時股價預測。

可以結合madlib或者plr進行多元回歸,選擇最好的r2,根據對應的截距和斜率推測下一組股價。

需要用到udf,具體的用法參考我以前寫的文章。

這裡不再舉例。

.5. 商場wifi傳感器的資訊實時統計。

根據wifi提供的位置資訊,實時統計每個店鋪的人流量。店鋪的人均駐留時間,總計駐留時間。

.6. 假設你的資料處理場景,pg現有的函數無法處理怎麼辦?沒問題,pg提供了自定義udf,資料類型,操作符,索引方法等一系列api。你可以根據業務的需求,在此基礎上實作。

用法還有很多,無法窮舉。

下面再結合一個當下非常流行的消息隊列,pipelinedb可以實時的從消息隊列取資料并進行實時計算。

例子:

在本地起一個nginx服務端,并且使用siege模拟http請求,nginx将記錄這些行為,存儲為json格式到檔案中。

在本地起kafka服務端,使用kafkacat将nginx的通路日志不斷的push到kafka。

在pipelinedb中訂閱kafka的消息,并實時處理為想要的統計資訊,(web頁面的通路人數,延遲,等資訊)

安裝kafka

安裝siege和nginx

建立一個nginx配置檔案,記錄通路日志到/tmp/access.log,格式為json

啟動nginx

配置主機名

啟動kafka

産生一個随機url檔案

使用siege模拟通路這些url,nginx會産生通路日志到/tmp/access.log

将通路日志輸出到kafkacat,推送到kafka消息系統,對應的topic為logs_topic。

原始的消費方式如下:

接下來我們使用pipelinedb來實時消費這些消息,并轉化為需要的統計結果。

查詢流視圖,可以獲得目前nginx的通路統計。

接下來做一個更深入的實時分析,分析每個url的通路次數,使用者數,99%使用者的通路延遲低于多少。

使用pipelinedb + kafka,應用場景又更豐富了。

如何建構更大的實時消息處理叢集?

規劃好資料的分片規則(避免跨節點的統計),如果有跨節點通路需求,可以在每個節點使用次元表,來實作。

例如每天要處理 萬億 條消息,怎麼辦?

根據以上壓力測試,平均每台機器每秒處理10萬記錄(每天處理86億),計算需要用到116台postgresql。是不是很省心呢?

一個圖例:

每一層都可以擴充

從lvs到 haproxy到 kafka到 postgresql到 離線分析hawq。

PostgreSQL "物聯網"應用 - 1 實時流式資料處理案例(萬億每天)