背景
阿裡雲日志服務提供可托管、可擴充、高可用的資料加工服務。資料加工服務可用于資料的規整、富化、流轉、脫敏和過濾。本文為讀者帶來了資料加工動态解析與分發的最佳實踐。
場景
現有多個不同的APP,所有APP的程式運作日志都輸入到同一個中心Logstore中。每個APP的日志都是以分隔符分隔的文本日志,但是日志字段schema各不相同。日志樣例如下:
APP_1的日志樣例
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
APP_2的日志樣例
content: schema_app2|183.93.165.82|db-01|MySQL|5.5|0|cn-shanghai|1072|user-2
APP_3的日志樣例
content: schema_app3|root|container4|image3|www.jd.mock.com|221.176.106.202|200|01/Apr/2021:06:27:56
上述APP的日志格式為:"schema_id|字段值1|字段值2|字段值3..."
- schema_id為該日志的字段schema的ID
- "字段值X"是日志的各個字段值,每個字段值的字段名由schema_id對應schema定義。
所有schema的定義存儲在OSS的一個檔案中,并與schema_id一一映射。Schema定義檔案的内容格式如下:
{
"schema_app1": {
"fields": ["client_ip", "host", "http_method", "resquest_length", "status_code", "request_time", "user_agent"],
"logstore": "logstore_app1"
},
"schema_app2": {
"fields": ["client_ip", "db_name", "db_type", "db_version", "fail", "region", "check_rows", "user_name"],
"logstore": "logstore_app2"
},
"schema_app3": {
"fields": ["user", "container_name", "image_name", "referer", "container_ip", "status_code", "datetime"],
"logstore": "logstore_app3"
},
}
其中schema_app1等是schema_id。每個schema的定義包含兩個字段,fields和logstore,fields定義了該schema對應的字段名清單,logstore定義了該schema的日志要分發的目标logstore名。
需求
- 對中心Logstore中不同Schema的日志進行動态解析(Schema在動态變化),将分隔符分隔的各個字段值映射到對應的字段名上,形成結構化的日志。
- 不同Schema的日志分發到不同的Logstore中。
例子:
- 中心Logstore的原始日志
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
content: schema_app2|183.93.165.82|db-01|MySQL|5.5|0|cn-shanghai|1072|user-2
- 加工後的日志
輸出到logstore_app1:
{"client_ip": "113.17.4.39", "host": "www.zsc.mock.com", "http_method": "PUT", "resquest_length": 1082, "status_code": 404, "request_time": 28.3, "user_aent": "Mozilla/5.0"}
輸出到logstore_app2:
{"client_ip": "183.93.165.82", "db_name": "db-01", "db_type": "MySQL", "db_version": "5.5", "fail": 0, "region": "cn-shanghai", "check_rows": 1072, "user_name": "user-2"}
資料加工文法
資料加工的建立流程參考
建立資料加工任務# 1.原始日志切分出schema_id和日志内容raw_content
e_set("split_array", str_partition(v("content"), "|"))
e_set("schema_id", lst_get(v("split_array"), 0))
e_set("raw_content", lst_get(v("split_array"), 2))
# 2.根據schema_id從OSS讀取對應的schema内容
e_set(
"schema",
dct_get(
res_oss_file(
endpoint="http://oss-cn-hangzhou.aliyuncs.com",
ak_id=res_local("AK_ID"),
ak_key=res_local("AK_KEY"),
bucket="ali-licheng-demo",
file="schema_lib/schema.json",
change_detect_interval=20,
),
v("schema_id"),
),
)
# 3.從schema中讀取字段名清單fields和分發的目标Logstore
e_set("fields", dct_get(v("schema"), "fields"))
e_set("target_logstore", dct_get(v("schema"), "logstore"))
# 丢棄多餘字段
e_keep_fields("raw_content", "fields", "target_logstore", F_TIME, F_META)
# 4.解析分隔符日志,并映射到fields中的字段上
e_psv("raw_content", json_parse(v("fields")))
# 丢棄多餘字段
e_drop_fields("fields", "raw_content")
# 5.根據schema中定義的logstore名,動态分發
e_output(project="licheng-simulator-test", logstore=v("target_logstore"))
上述加工文法的加工總體流程如下:
1.将原始日志切分出schema_id和日志内容raw_content,即:
原始日志:
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
切分為:
schema_id: schema_app1
raw_content: 113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
2.根據schema_id從OSS讀取對應的schema内容
- res_oss_file函數用于讀取OSS中的檔案,具體用法參考 res_oss_file使用說明 。
3. 從schema中讀取字段名清單fields和分發的目标Logstore
- 每個schema中都定義了該schema日志的字段名清單以及分發的目标Logstore名
4.解析分隔符日志,并映射到fields中的字段上
- 使用e_psv函數解析豎線分隔符日志(|),并映射到schema中定義的字段名清單上。
- 參考 e_csv, e_psv, e_tsv用法指南
5.根據schema中定義的分發logstore名,實作日志的動态分發。
- e_output用法參考 e_output,e_coutput用法指南
加工後的結果示例

後續維護
後續維護過程中,如果APP日志的Schema發生變化,或者有新的APP日志進來,隻需在OSS中的schema庫檔案中修改和增加對應APP的schema定義即可,無需對加工任務做任何修改。