日志服務(Log Service),簡稱LOG,原SLS。是針對實時資料的一站式服務,在阿裡巴巴集團經曆大量大資料場景錘煉而成。無需開發就能快捷完成資料采集、消費、投遞以及查詢分析等功能,幫助提升運維、營運效率,建立DT時代海量日志處理能力。
日志服務本身是流資料存儲,實時計算 Flink能将其作為流式資料輸入。對于日志服務而言,資料格式類似JSON,示例如下。
-
{
-
"a": 1000,
-
"b": 1234,
-
"c": "li"
-
}
對于實時計算而言,我們需要定義的DDL如下(sls即Log Service)。
-
create table sls_stream(
-
a int,
-
b int,
-
c varchar
-
) with (
-
type ='sls',
-
endPoint ='http://cXXXXXXXXyuncs.com',
-
accessId ='0iXXXXXXXAs',
-
accessKey ='yF60EXXXXXXXPJ2zhCfHU',
-
startTime = '2017-07-05 00:00:00',
-
project ='ali-XXXXX-streamtest',
-
logStore ='strXXXtest',
-
consumerGroup ='consXXXXroupTest1'
-
);
目前預設支援三個屬性字段的擷取,也支援其他自定義寫入的字段。
字段名 | 注釋說明 |
---|---|
| 消息源 |
| 消息主題 |
| 日志時間 |
舉例
通過
HEADER
關鍵字擷取屬性字段。
測試資料
-
__topic__: ens_altar_flow
-
result: {"MsgID":"ems0a","Version":"0.0.1"}
測試代碼
-
CREATE TABLE sls_log (
-
__topic__ varchar HEADER,
-
result varchar
-
)
-
WITH
-
(
-
type ='sls'
-
);
-
CREATE TABLE sls_out (
-
name varchar,
-
MsgID varchar,
-
Version varchar
-
)
-
WITH
-
(
-
type ='RDS'
-
);
-
INSERT INTO sls_out
-
SELECT
-
__topic__,
-
JSON_VALUE(result,'$.MsgID'),
-
JSON_VALUE(result,'$.Version')
-
FROM
-
sls_log
測試結果
name(VARCHAT) | MsgID(VARCHAT) | Version(VARCHAT) |
---|---|---|
ens_altar_flow | ems0a | 0.0.1 |
參數 | 備注 | |
---|---|---|
endPoint | 消費端點資訊 | 日志服務的ENDPOINT位址 |
accessId | sls讀取的accessKey | N/A |
accessKey | sls讀取的密鑰 | |
project | 讀取的sls項目 | |
logStore | project下的具體的logStore | |
consumerGroup | 消費組名 | 使用者可以自定義消費組名(沒有固定格式) |
startTime | 消費日志開始的時間點 | |
heartBeatIntervalMills | 可選,消費用戶端心跳間隔時間 | 預設為10s |
maxRetryTimes | 讀取最大嘗試次數 | 可選,預設為5 |
batchGetSize | 單次讀取logGroup條數 | 可選,預設為10 |
lengthCheck | 單行字段條數檢查政策 | 可選,預設為SKIP,其它可選值為EXCEPTION、PAD。SKIP:字段數目不符合時跳過 。EXCEPTION:字段數目不符合時抛出異常。PAD:按順序填充,不存在的置為null。 |
columnErrorDebug | 是否打開調試開關,如果打開,會把解析異常的log列印出來 | 可選,預設為false |
注意:
- SLS暫不支援MAP類型的資料。
- 字段順序支援無序(建議字段順序和表中定義一緻)。
- 輸入資料源為Json形式時,注意定義分隔符,并且需要采用内置函數分析JSON_VALUE,否則就會解析失敗。報錯如下:
2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
- batchGetSize設定不能超過1000,否則會報錯
- batchGetSize指明的是logGroup擷取的數量,如果單條logItem的大小和 batchGetSize都很大,很有可能會導緻頻繁的GC,這種情況下該參數應注意調小。
日志服務和實時計算字段類型對應關系,建議您使用該對應關系進行DDL聲明:
日志服務字段類型 | 流計算字段類型 |
---|---|
STRING | VARCHAR |
本文轉自實時計算——
建立日志服務(Log Service)源表