前言
本文将介紹在Blink實時計算平台建立以Tablestore作為流計算的源表以及結果表作業的流程。
表格存儲通道服務
表格存儲
通道服務是基于表格存儲(Tablestore)資料接口之上的全增量一體化服務,它通過一組Tunnel Service API和SDK為使用者提供了增量、全量和增量加全量三種類型的分布式資料實時消費通道。通過為資料表建立Tunnel Service資料通道,使用者可以通過流式計算的方式對表中曆史存量和新增的資料進行消費處理。
流計算能将Tunnel Service資料通道作為流式資料的輸入,每條資料類似一個JSON格式,如下所示:
{
"OtsRecordType": "PUT", // 資料操作類型,包括PUT、UPDATE、DELETE
"OtsRecordTimestamp": 1506416585740836, //資料寫入時間(微秒),全量資料時為0
"PrimaryKey": [
{
"ColumnName": "pk_1", //第一主鍵列
"Value": 1506416585881590900
},
{
"ColumnName": "pk_2", //第二主鍵列
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "Put", // 列操作類型,包括PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION", // DELETE操作沒有Value字段
"ColumnName": "attr_1"
}
]
}
其中,資料的各個主鍵和屬性列值均可以在BLINK DDL以列名以及相應的類型映射讀取,例如上述執行個體,我們需要定義的DDL如下所示:
create table tablestore_stream(
pk_1 BIGINT,
pk_2 VARCHAR,
attr_0 VARCHAR,
attr_1 DOUBLE,
primary key(pk_1, pk_2)
) with (
type ='ots',
endPoint ='http://blink-demo.cn-hangzhou.vpc.tablestore.aliyuncs.com',
instanceName = "blink-demo",
tableName ='demo_table',
tunnelName = 'blink-demo-stream',
accessId ='xxxxxxxxxxx',
accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
ignoreDelete = 'false' //是否忽略delete操作的資料
);
如果字段名稱有字首,需要使用反撇,例:OTS字段名稱為TEST.test,BLINK DDL定義為
TEST.test
。而OtsRecordType、OtsRecordTimestamp字段以及每個Column的OtsColumnType字段都會支援通過屬性字段的方式讀取:
字段名 | 說明 |
---|---|
OtsRecordType | 資料操作類型 |
OtsRecordTimestamp | 資料操作時間(全量資料時為0) |
列名_OtsColumnType | 以具體列名和_"_____OtsColumnType__"_拼接,某列的操作類型 |
需要OtsRecordType和某些Column的OtsColumnType字段時,Blink提供了
HEADER
關鍵字用于擷取源表中的屬性字段,具體DDL:
create table tablestore_stream(
OtsRecordType VARCHAR HEADER,
OtsRecordTimestamp BIGINT HEADER,
pk_1 BIGINT,
pk_2 VARCHAR,
attr_0 VARCHAR,
attr_1 DOUBLE,
attr_1_OtsColumnType VARCHAR HEADER,
primary key(pk_1, pk_2)
) with (
...
);
WITH參數
參數 | 注釋說明 | 備注 |
---|---|---|
endPoint | 表格存儲的執行個體通路位址 | |
instanceName | 表格存儲的執行個體名稱 | |
tableName | 表格存儲的資料表名 | |
tunnelName | 表格存儲資料表的資料通道名 | |
accessId | 表格存儲讀取的accessKey | |
accessKey | 表格存儲讀取的秘鑰 | |
ignoreDelete | 是否忽略DELETE操作類型的實時資料 | 可選,預設為false |
SQL示例
資料同步,ots sink會以update的方式寫入結果表:
create table otsSource (
pkstr VARCHAR,
pklong BIGINT,
col0 VARCHAR,
primary key(pkstr, pklong)
) WITH (
type ='ots',
endPoint ='http://blink-demo.cn-hangzhou.ots.aliyuncs.com',
instanceName = "blink-demo",
tableName ='demo_table',
tunnelName = 'blink-demo-stream',
accessId ='xxxxxxxxxxx',
accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
ignoreDelete = 'true'
);
CREATE TABLE otsSink (
pkstr VARCHAR,
pklong BIGINT,
col0 VARCHAR,
primary key(pkstr, pklong)
) WITH (
type='ots',
instanceName='blink-target',
tableName='demo_table',
accessId ='xxxxxxxxxxx',
accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
endPoint='https://blink-target.cn-hangzhou.ots.aliyuncs.com',
valueColumns='col0'
);
INSERT INTO otsSink
SELECT t.pkstr, t.pklong, t.col0
FROM otsSource AS t
流計算作業建立流程
在Blink實時計算平台資料開發子產品建立新任務,并填寫節點類型、Blink版本、節點名稱以及目标檔案夾等相關内容,如下圖所示:

建立任務之後,進入該任務,點選切換為SQL模式按鈕。按照之前介紹的DDL定義開發自己的任務。如下圖所示:
作業完成之後,點選釋出,選擇運作環境及配置可用CU,此次建立的流式作業就正式啟動了,可通過運維界面管理作業以及檢視作業運作相關資訊。如下圖所示: