物聯網的一個特點是萬物聯網,會産生大量的資料。
例如 :
一盒藥,從生産,到運輸,到藥店,到售賣。每流經一個節點,都會記錄它的資訊。
又如 :
健康手環,兒童防丢手表,一些動物遷徙研究的傳感器(如中華鲟),水紋監測,電網監測,瓦斯管道監測,氣象監測等等這些資訊。
股價的實時預測。
車流實時資料統計,車輛軌迹實時合并。
商場人流實時統計。
資料監控實時處理,例如資料庫的監控,伺服器的監控,作業系統的監控等。
等等。。。。。。
傳感器種類繁多,采集的資料量已經達到了海量。
這些資料比電商雙十一的量有過之而不及,怎樣才能處理好這些資料呢?如何做到實時的流式資料處理?
postgresql提供了一個很好的基于流的資料處理産品,實時計算能力達到了單機10w記錄/s(普通x86伺服器)。
下面是應用case。
下載下傳并安裝pipelinedb,它是基于postgresql改進的流式資料處理資料庫。
配置環境變量腳本
初始化資料庫
配置參數
啟動資料庫
建立流(從表裡消費資料)
應用場景例子,
.1. 假設傳感器會上傳3個資料,分别是傳感器id,時間,以及采樣值。
gid, crt_time, val
應用需要實時統計每分鐘,每小時,每天,每個傳感器上傳的值的最大,最小,平均值,以及 count。
建立三個流視圖,每個代表一個統計次元。
如下:
激活流
插入資料測試
壓力測試:
假設有10萬個傳感器,傳感器上傳的取值範圍1到100。
每秒約處理10萬記錄,統計次元見上面的流sql。
多輪測試後
.2. 假設車輛運作過程中,每隔一段時間會上傳位置資訊,
gid, crt_time, poi
應用需要按天,繪制車輛的路徑資訊(把多個point聚合成路徑類型,或者數組類型,或者字元串,。。。)。
假設有1000萬量車,每輛車每次上傳一個坐标和時間資訊,(或者是一批資訊)。
應用需求,
.2.1. 按天繪制車輛的路徑資訊
.2.2. 按小時統計每個區域有多少量車經過
建立流 (這裡假設點資訊已經經過了二進制編碼,用一個int8來表示,友善壓力測試)
壓力測試
.3. 按交警探頭為機關,統計每個探頭采集的車輛資訊。
例如
.3.1 以車輛為機關,統計車輛的探頭位置資訊,串成軌迹資料。
.3.2 以探頭為機關,統計每個路口的車流資訊。(假設一個探頭對應一個路口)
第一個需求和前面的繪制車輛軌迹例子一樣,統計路口流量資訊則是以探頭id為機關進行統計。
用法都差不多,不再舉例
.4. 實時股價預測。
可以結合madlib或者plr進行多元回歸,選擇最好的r2,根據對應的截距和斜率推測下一組股價。
需要用到udf,具體的用法參考我以前寫的文章。
這裡不再舉例。
.5. 商場wifi傳感器的資訊實時統計。
根據wifi提供的位置資訊,實時統計每個店鋪的人流量。店鋪的人均駐留時間,總計駐留時間。
.6. 假設你的資料處理場景,pg現有的函數無法處理怎麼辦?沒問題,pg提供了自定義udf,資料類型,操作符,索引方法等一系列api。你可以根據業務的需求,在此基礎上實作。
用法還有很多,無法窮舉。
下面再結合一個當下非常流行的消息隊列,pipelinedb可以實時的從消息隊列取資料并進行實時計算。
例子:
在本地起一個nginx服務端,并且使用siege模拟http請求,nginx将記錄這些行為,存儲為json格式到檔案中。
在本地起kafka服務端,使用kafkacat将nginx的通路日志不斷的push到kafka。
在pipelinedb中訂閱kafka的消息,并實時處理為想要的統計資訊,(web頁面的通路人數,延遲,等資訊)
安裝kafka
安裝siege和nginx
建立一個nginx配置檔案,記錄通路日志到/tmp/access.log,格式為json
啟動nginx
配置主機名
啟動kafka
産生一個随機url檔案
使用siege模拟通路這些url,nginx會産生通路日志到/tmp/access.log
将通路日志輸出到kafkacat,推送到kafka消息系統,對應的topic為logs_topic。
原始的消費方式如下:
接下來我們使用pipelinedb來實時消費這些消息,并轉化為需要的統計結果。
查詢流視圖,可以獲得目前nginx的通路統計。
接下來做一個更深入的實時分析,分析每個url的通路次數,使用者數,99%使用者的通路延遲低于多少。
使用pipelinedb + kafka,應用場景又更豐富了。
如何建構更大的實時消息處理叢集?
規劃好資料的分片規則(避免跨節點的統計),如果有跨節點通路需求,可以在每個節點使用次元表,來實作。
例如每天要處理 萬億 條消息,怎麼辦?
根據以上壓力測試,平均每台機器每秒處理10萬記錄(每天處理86億),計算需要用到116台postgresql。是不是很省心呢?
一個圖例:
每一層都可以擴充
從lvs到 haproxy到 kafka到 postgresql到 離線分析hawq。