天天看點

建立日志服務(Log Service)源表

本頁目錄

日志服務(Log Service),簡稱LOG,原SLS。是針對實時資料的一站式服務,在阿裡巴巴集團經曆大量大資料場景錘煉而成。無需開發就能快捷完成資料采集、消費、投遞以及查詢分析等功能,幫助提升運維、營運效率,建立DT時代海量日志處理能力。

日志服務本身是流資料存儲,實時計算 Flink能将其作為流式資料輸入。對于日志服務而言,資料格式類似JSON,示例如下。

  1. {

  2. "a": 1000,

  3. "b": 1234,

  4. "c": "li"

  5. }

對于實時計算而言,我們需要定義的DDL如下(sls即Log Service)。

  1. create table sls_stream(

  2. a int,

  3. b int,

  4. c varchar

  5. ) with (

  6. type ='sls',

  7. endPoint ='http://cXXXXXXXXyuncs.com',

  8. accessId ='0iXXXXXXXAs',

  9. accessKey ='yF60EXXXXXXXPJ2zhCfHU',

  10. startTime = '2017-07-05 00:00:00',

  11. project ='ali-XXXXX-streamtest',

  12. logStore ='strXXXtest',

  13. consumerGroup ='consXXXXroupTest1'

  14. );

目前預設支援三個屬性字段的擷取,也支援其他自定義寫入的字段。

字段名 注釋說明

__source__

消息源

__topic__

消息主題

__timestamp__

日志時間

舉例

通過 

HEADER

 關鍵字擷取屬性字段。

測試資料

  1. __topic__: ens_altar_flow

  2. result: {"MsgID":"ems0a","Version":"0.0.1"}

測試代碼

  1. CREATE TABLE sls_log (

  2. __topic__ varchar HEADER,

  3. result varchar

  4. )

  5. WITH

  6. (

  7. type ='sls'

  8. );

  9. CREATE TABLE sls_out (

  10. name varchar,

  11. MsgID varchar,

  12. Version varchar

  13. )

  14. WITH

  15. (

  16. type ='RDS'

  17. );

  18. INSERT INTO sls_out

  19. SELECT

  20. __topic__,

  21. JSON_VALUE(result,'$.MsgID'),

  22. JSON_VALUE(result,'$.Version')

  23. FROM

  24. sls_log

測試結果

name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
ens_altar_flow ems0a 0.0.1

參數 備注
endPoint 消費端點資訊 日志服務的ENDPOINT位址
accessId sls讀取的accessKey N/A
accessKey sls讀取的密鑰
project 讀取的sls項目
logStore project下的具體的logStore
consumerGroup 消費組名 使用者可以自定義消費組名(沒有固定格式)
startTime 消費日志開始的時間點
heartBeatIntervalMills 可選,消費用戶端心跳間隔時間 預設為10s
maxRetryTimes 讀取最大嘗試次數 可選,預設為5
batchGetSize 單次讀取logGroup條數 可選,預設為10
lengthCheck 單行字段條數檢查政策 可選,預設為SKIP,其它可選值為EXCEPTION、PAD。SKIP:字段數目不符合時跳過 。EXCEPTION:字段數目不符合時抛出異常。PAD:按順序填充,不存在的置為null。
columnErrorDebug 是否打開調試開關,如果打開,會把解析異常的log列印出來 可選,預設為false
注意:
  • SLS暫不支援MAP類型的資料。
  • 字段順序支援無序(建議字段順序和表中定義一緻)。
  • 輸入資料源為Json形式時,注意定義分隔符,并且需要采用内置函數分析JSON_VALUE,否則就會解析失敗。報錯如下:
    1. 2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]

  • batchGetSize設定不能超過1000,否則會報錯
  • batchGetSize指明的是logGroup擷取的數量,如果單條logItem的大小和 batchGetSize都很大,很有可能會導緻頻繁的GC,這種情況下該參數應注意調小。

日志服務和實時計算字段類型對應關系,建議您使用該對應關系進行DDL聲明:

日志服務字段類型 流計算字段類型
STRING VARCHAR

本文轉自實時計算——

建立日志服務(Log Service)源表