前言
本文将介紹在Blink實時計算平台如何使用InfluxDB作為流計算的結果表,以及建立相應的作業流程。
InfluxDB Writer
InfluxDB Sink 是 Blink 的一個插件,實作了将資料點寫入到阿裡雲時序資料庫Influxdb版中。
Quick Start
DDL定義
流計算支援使用InfluxDB作為結果輸出。示例代碼如下:
create table stream_test_influxdb(
`metric` varchar,
`timestamp` BIGINT,
`tag_value1` varchar,
`field_fieldValue1` Double
)with(
type = 'influxdb',
endpoint = 'https://localhost:3242',
database = 'blink',
batchPutsize = '1',
username = 'test',
password = 'Test1111'
);
建表預設格式:
第1列:metric,varchar
第2列:timestamp,BIGINT 機關:毫秒
第3列:tag_value1,varchar
第4列:field_fieldValue1,Double
注意:
1、 metric和timestamp必須存在。
2、支援多個tag和field,但至少添加1個tag_和1個field_。
3、結果表中隻支援metric、timestamp、tag_和field_,不能出現其他的字段。
4、endpoint支援http和https。
WITH參數
參數 | 參數說明 | 是否必填 | 備注 |
---|---|---|---|
type | 必填 | 是 | 固定為influxdb |
endpoint | 協定://host:port,支援http和https | 例如: https://localhost:3242 ,或者 http://localhost:8086 | |
database | 寫入InfluxDB的資料庫名 | 例如:db-blink或者blink | |
username | InfluxDB的使用者名 | 需要對寫入的資料庫有寫權限 | |
password | InfluxDB的密碼 | 預設為0。 | |
retentionPolicy | 保留政策 | 否 | 不先寫的話,預設寫入每個database的預設保留政策 |
batchPutSize | 批量送出的記錄條數 | 預設每次送出500個資料點。 |
FAQ:
Question:field_fieldValue支援多少個?
答:預設和InfluxDB支援的一緻。
Question:多個field_fieldValue如何寫入?
答:field_fieldValue1 類型,
field_fieldValue2 類型,
...
field_fieldValueN 類型
例如:
field_fieldValue1 Double,
field_fieldValue2 INTERGER,
field_fieldValueN INTERGER
使用步驟
建立新的任務:
在Blink實時計算平台資料開發子產品建立新任務,并填寫節點類型、Blink版本、節點名稱以及目标檔案夾等相關内容
定義任務DDL
建立任務之後,進入該任務,點選切換為SQL模式按鈕。按照之前介紹的DDL定義開發自己的任務。如下圖所示:

所用的DDL語句參看DDL定義一節。
釋出和啟動:
作業完成之後,點選釋出,選擇運作環境及配置可用CU,此次建立的流式作業就正式啟動了,可通過運維界面管理作業以及檢視作業運作相關資訊。