需求背景
StarRocks 的審計日誌記錄了業務對 SR 服務端的請求,日誌內容預設分為 'slow_query' 和 'query' 2 個模組(標籤)。除了直接對日誌檔進行分析以外,還可以將審計日誌從日誌檔流入到 SR 的自定義表中,方便開發人員透過 SQL 查詢的方式對業務請求進行分析優化和性能問題的定位。
實作架構
資料流實作架構
軟體環境需求
Centos 7.x
StarRocks 2.4.x(這裡比較重要,不同版本的日誌內容有差異)
Kafka
filebeat
實作過程
建立kafka topic,用來存儲filebeat收集的audit log。
# Create the Kafka topic that stores the audit-log events collected by Filebeat.
# The four placeholders are: broker list, replication factor, partition count, topic name.
./bin/kafka-topics.sh --create --bootstrap-server broker清單 --replication-factor 副本數 --partitions 分區數 --topic topic名稱
部署filebeat,推薦使用rpm包安裝。
filebeat rpm download
編寫 filebeat.yml 配置檔案,這裡對版本有要求,這份配置只適用於 SR 2.4.0~2.4.2。
# Filebeat pipeline: tail the StarRocks FE audit log, split every record into
# named fields with the script processor, and publish the result to Kafka.
filebeat.inputs:
  - type: log
    enabled: true
    # Ignore files that have not been modified during the last 5 minutes.
    ignore_older: 5m
    # Only ship records that contain one of these strings.
    include_lines: ['slow_query', 'query']
    # A record starts with a timestamp; lines that do not start with one are
    # appended to the previous record (multi-line SQL statements).
    multiline:
      pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}'
      negate: true
      match: after
    paths:
      - 這裡寫審計日志AuditLog的實際存儲全路徑
    fields:
      log_topics: 這裡寫KafkaTopic名稱
processors:
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      # Literal block scalar ("|"): line breaks are kept exactly as written, so
      # the "//" comments inside the script cannot swallow the following line.
      source: |
        // Keys written into each audit record, in the order they are emitted
        // to the event. Records that lack some of them (for example the
        // 18-field layout without CpuCostNs/MemCostBytes/PlanCpuCost/PlanMemCost)
        // simply do not produce those fields.
        var AUDIT_KEYS = [
          "Client", "User", "AuthorizedUser", "ResourceGroup", "Catalog", "Db",
          "State", "ErrorCode", "Time", "ScanBytes", "ScanRows", "ReturnRows",
          "CpuCostNs", "MemCostBytes", "StmtId", "QueryId", "IsQuery", "feIp",
          "Stmt", "Digest", "PlanCpuCost", "PlanMemCost"
        ];

        // True when name is one of the keys listed in AUDIT_KEYS.
        function isAuditKey(name) {
          return AUDIT_KEYS.indexOf(name) !== -1;
        }

        // Split "Key=value|Key=value|..." into an object keyed by field name.
        // Fields are recognised by name, not by position, so the result does
        // not depend on how many fields the record carries. The first
        // occurrence of a key wins. A segment that is not a new known key is
        // treated as part of the SQL text when it follows Stmt (the SQL itself
        // may contain "|"); anywhere else it is ignored.
        function parseAuditFields(detail) {
          var fields = {};
          var current = null;
          var parts = detail.split("|");
          for (var i = 0; i < parts.length; i++) {
            var part = parts[i];
            var eq = part.indexOf("=");
            var key = eq > 0 ? part.substring(0, eq) : "";
            if (isAuditKey(key) && !fields.hasOwnProperty(key)) {
              fields[key] = part.substring(eq + 1);
              current = key;
            } else if (current === "Stmt") {
              fields.Stmt += "|" + part;
            }
          }
          return fields;
        }

        // Entry point called by Filebeat for every event.
        // Reads the raw record from "message" and adds query_type,
        // execution_time, one field per audit key found, and igid.
        // Events whose message carries no "Client=" section are tagged
        // "audit_parse_failure" and left otherwise untouched.
        function process(event) {
          var message = event.Get("message");
          if (typeof message !== "string") {
            event.Tag("audit_parse_failure");
            return;
          }

          var start = message.indexOf("Client=");
          if (start < 0) {
            event.Tag("audit_parse_failure");
            return;
          }

          var fields = parseAuditFields(message.substring(start));

          // Client is logged as "ip:port"; keep the ip only.
          if (fields.hasOwnProperty("Client")) {
            fields.Client = fields.Client.split(":")[0];
          }
          // Cap the SQL text so it fits the target column.
          if (fields.hasOwnProperty("Stmt")) {
            fields.Stmt = fields.Stmt.substring(0, 65530);
          }

          // Five characters following the "[" of the record label:
          // "query" for [query] records, "slow_" for [slow_query] records.
          event.Put("query_type", message.substr(25, 5));
          // Leading "yyyy-MM-dd HH:mm:ss" of the record (milliseconds dropped).
          event.Put("execution_time", message.substr(0, 19));

          for (var i = 0; i < AUDIT_KEYS.length; i++) {
            var key = AUDIT_KEYS[i];
            if (fields.hasOwnProperty(key)) {
              event.Put(key, fields[key]);
            }
          }

          event.Put("igid", '這裡寫fe節點的自定義辨別');
        }
  # Remove Filebeat metadata and the raw record once it has been parsed.
  - drop_fields:
      fields: ["ecs","agent","message","log","host"]
output.kafka:
  enabled: true
  hosts: ["這裡寫KafkaBrokerList"]
  # Topic name comes from the input's fields.log_topics setting.
  topic: '%{[fields][log_topics]}'
  worker: 1
  timeout: 30s
  broker_timeout: 10s
  keep_alive: 0
  compression: gzip
  required_acks: 1
  client_id: 這裡寫個自定義的client_id辨別
啟動filebeat程序,配置檔案名: filebeat.yml
# Start Filebeat in the background using filebeat.yml; -e sends its log to stderr,
# and both stdout and stderr are appended to filebeat.log.
filebeat -c filebeat.yml -e >> filebeat.log 2>&1 &
檢查 Kafka 內的日誌資料是否正常,特別要注意 kv 的切分是否匹配
kafka message
在StarRocks中建日志存儲表,sr可以是收集的來源sr也可以是另外的sr
-- Audit-log storage table: one row per audit record shipped through Kafka.
-- Range-partitioned by day on execution_time; dynamic partitioning keeps the
-- last 7 days and pre-creates the next 3 days with the "P" name prefix.
CREATE TABLE 庫名.`表名` (
    `execution_time`  DATETIME       COMMENT "slow time",
    `igid`            VARCHAR(40)    COMMENT "",
    `db_name`         VARCHAR(300)   COMMENT "db name",
    `fe_ip`           VARCHAR(30)    COMMENT "fe ip",
    `query_id`        VARCHAR(200)   COMMENT "QueryId",
    `query_type`      VARCHAR(10)    COMMENT "query_type",
    `time`            BIGINT(20)     COMMENT "SQL run time",
    `client`          VARCHAR(30)    COMMENT "client ip",
    `user`            VARCHAR(200)   COMMENT "user name",
    `authorizedUser`  VARCHAR(200)   COMMENT "AuthorizedUser",
    `resourceGroup`   VARCHAR(50)    COMMENT "ResourceGroup",
    `catalog`         VARCHAR(60)    COMMENT "Catalog",
    `errorCode`       VARCHAR(60)    COMMENT "ErrorCode",
    `state`           VARCHAR(200)   COMMENT "State",
    `scan_bytes`      BIGINT(20)     COMMENT "ScanBytes",
    `scan_rows`       BIGINT(20)     COMMENT "ScanRows",
    `return_rows`     BIGINT(20)     COMMENT "ReturnRows",
    `cpu_cost_ns`     BIGINT(20)     COMMENT "CpuCostNs",
    `mem_cost_bytes`  BIGINT(20)     COMMENT "MemCostBytes",
    `PlanCpuCost`     DECIMAL(20,6)  COMMENT "PlanCpuCost",
    `PlanMemCost`     DECIMAL(20,6)  COMMENT "PlanMemCost",
    `stmt_id`         BIGINT(20)     COMMENT "StmtId",
    `is_query`        VARCHAR(50)    COMMENT "IsQuery",
    `stmt`            VARCHAR(65533) COMMENT "Stmt,Query Detail",
    `digest`          VARCHAR(50)    DEFAULT NULL COMMENT 'SQL指紋 Digest'
)
ENGINE = OLAP
DUPLICATE KEY (`execution_time`, `igid`, `db_name`, `fe_ip`, `query_id`)
COMMENT "Starrocks audit表"
PARTITION BY RANGE (`execution_time`) (
    PARTITION P20230209 VALUES [('2023-02-09 00:00:00'), ('2023-02-10 00:00:00')),
    PARTITION P20230210 VALUES [('2023-02-10 00:00:00'), ('2023-02-11 00:00:00'))
)
-- NOTE(review): igid identifies the FE node, so all rows from one FE hash to
-- the same bucket -- confirm this distribution is intended.
DISTRIBUTED BY HASH (`igid`) BUCKETS 24
PROPERTIES (
    "replication_num"             = "3",
    "in_memory"                   = "false",
    "storage_format"              = "DEFAULT",
    "dynamic_partition.enable"    = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start"     = "-7",
    "dynamic_partition.end"       = "3",
    "dynamic_partition.prefix"    = "P"
);
建立routine load,實時消費Kafka的日志資料
-- Routine Load job that continuously consumes the JSON audit events from Kafka.
-- The n-th entry of "jsonpaths" is loaded into the n-th name of the COLUMNS list.
CREATE ROUTINE LOAD 上文建立的庫名.routineLoad名 ON 上文建立的表名
COLUMNS (
    execution_time, igid, db_name, fe_ip, query_id,
    time, client, user, authorizedUser, resourceGroup,
    catalog, errorCode, state, scan_bytes, scan_rows,
    return_rows, cpu_cost_ns, mem_cost_bytes, PlanCpuCost, PlanMemCost,
    stmt_id, is_query, stmt, query_type, digest
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.execution_time\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.AuthorizedUser\",\"$.ResourceGroup\",\"$.Catalog\",\"$.ErrorCode\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.CpuCostNs\",\"$.MemCostBytes\",\"$.PlanCpuCost\",\"$.PlanMemCost\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\",\"$.query_type\",\"$.Digest\"]",
    "desired_concurrent_number" = "8",
    "max_error_number" = "9999999999",
    "max_batch_rows" = "200000",
    "max_batch_size" = "209715200",
    "max_batch_interval" = "10",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "上文配置的Kafka BrokerList",
    "kafka_topic" = "上文定義的Kafka Topic名稱",
    -- Start from the end of the topic: only events produced after job creation are loaded.
    "property.kafka_default_offsets" = "OFFSET_END",
    "property.client.id" = "上文自定義的client_id辨別",
    "property.group.id" = "自定義個groupid辨別"
);
查看表內的資料,如果不符合預期,需要逐一排查問題出在哪個環節(filebeat、Kafka 或 routine load)
在其他fe node進行同樣的filebeat收集
結語
日志流入表後,可以在此基礎上開發生成監控分析圖表,進行一些聚合統計分析。