
Big Data: Collecting StarRocks Audit Logs into a Table in Real Time for SQL Query Analysis

Author: 明少三年

Background

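The StarRocks audit log records the requests that applications send to the StarRocks (SR) server. By default its entries are split into two modules (tags), 'slow_query' and 'query'. Besides analyzing the log files directly, you can stream the audit records into a custom SR table, so that developers can use SQL to analyze application requests, optimize them, and pinpoint performance problems.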

Implementation architecture


(Figure: data flow architecture)

Software requirements

CentOS 7.x

StarRocks 2.4.x (this matters: the log content differs between versions)

Kafka

filebeat

Implementation steps

Create a Kafka topic to hold the audit log collected by Filebeat.

./bin/kafka-topics.sh --create --bootstrap-server <broker list> --replication-factor <replication factor> --partitions <number of partitions> --topic <topic name>

Deploy Filebeat; installing it from the RPM package is recommended.


(Figure: Filebeat RPM download)

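Write the filebeat.yml configuration file. It is version-specific: this one only works with SR 2.4.0 to 2.4.2, because the script parses each log line by fixed character offsets and field positions.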

filebeat.inputs:
- type: log
  enabled: true
  ignore_older: 5m
  include_lines: ['slow_query', 'query']
  multiline:
     pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}'
     negate: true
     match: after  
  paths:
    - <full path of the StarRocks audit log (AuditLog) file>
  fields:
    log_topics: <Kafka topic name>
processors:
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      source: |
        function process(event) {
            var str = event.Get("message");
            // Characters 0-18: entry timestamp, e.g. "2023-02-09 10:00:00"
            var slow_time = str.substr(0, 19);
            // Characters 25-29: "query" for [query] entries, "slow_" for [slow_query] entries
            var query_type = str.substr(25, 5);
            // From character 38 on: Key=Value fields separated by "|"
            var js_arr = str.substr(38).split("|");
            var len = js_arr.length;

            // Returns field idx with its "Key=" prefix removed
            function field(idx, key) {
                return js_arr[idx].replace(key + '=', '');
            }

            // [query] entries are cut inside the key ("t=ip:port"), [slow_query] entries keep "Client="
            var Client = field(0, 'Client').replace('t=', '').split(":")[0];
            var User = field(1, 'User');
            var AuthorizedUser = field(2, 'AuthorizedUser');
            var ResourceGroup = field(3, 'ResourceGroup');
            var Catalog = field(4, 'Catalog');
            var Db = field(5, 'Db');
            var State = field(6, 'State');
            var ErrorCode = field(7, 'ErrorCode');
            var Time = field(8, 'Time');
            var ScanBytes = field(9, 'ScanBytes');
            var ScanRows = field(10, 'ScanRows');
            var ReturnRows = field(11, 'ReturnRows');

            // The cost fields only exist in the 22-field layout (for both tags)
            var CpuCostNs, MemCostBytes, PlanCpuCost, PlanMemCost;
            // StmtId..Digest are the six fields right before `tail`
            var tail = len;
            if (len == 22) {
                CpuCostNs = field(12, 'CpuCostNs');
                MemCostBytes = field(13, 'MemCostBytes');
                PlanCpuCost = field(len - 2, 'PlanCpuCost');
                PlanMemCost = field(len - 1, 'PlanMemCost');
                tail = len - 2;
            }
            // Any other field count is read with the 18-field layout
            var StmtId = field(tail - 6, 'StmtId');
            var QueryId = field(tail - 5, 'QueryId');
            var IsQuery = field(tail - 4, 'IsQuery');
            var feIp = field(tail - 3, 'feIp');
            // Truncate long statements for the varchar(65533) column
            // (substring counts characters, while VARCHAR length is in bytes)
            var Stmt = field(tail - 2, 'Stmt').substring(0, 65530);
            var Digest = field(tail - 1, 'Digest');

            event.Put("query_type", query_type);
            event.Put("execution_time", slow_time);
            event.Put("Client", Client);
            event.Put("User", User);
            event.Put("AuthorizedUser", AuthorizedUser);
            event.Put("ResourceGroup", ResourceGroup);
            event.Put("Catalog", Catalog);
            event.Put("Db", Db);
            event.Put("State", State);
            event.Put("ErrorCode", ErrorCode);
            event.Put("Time", Time);
            event.Put("ScanBytes", ScanBytes);
            event.Put("ScanRows", ScanRows);
            event.Put("ReturnRows", ReturnRows);
            event.Put("CpuCostNs", CpuCostNs);
            event.Put("MemCostBytes", MemCostBytes);
            event.Put("StmtId", StmtId);
            event.Put("QueryId", QueryId);
            event.Put("IsQuery", IsQuery);
            event.Put("feIp", feIp);
            event.Put("Stmt", Stmt);
            event.Put("Digest", Digest);
            event.Put("PlanCpuCost", PlanCpuCost);
            event.Put("PlanMemCost", PlanMemCost);
            // Custom identifier for this FE node
            event.Put("igid", '<custom identifier of this FE node>');
        }
  - drop_fields:
      fields: ["ecs","agent","message","log","host"]


output.kafka:
  enabled: true
  hosts: ["<Kafka broker list>"]
  topic: '%{[fields][log_topics]}'
  worker: 1
  timeout: 30s
  broker_timeout: 10s
  keep_alive: 0
  compression: gzip
  required_acks: 1
  client_id: <custom client_id>
           
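The multiline settings in the configuration (a pattern matching a leading timestamp, negate: true, match: after) mean that any line not starting with a timestamp is appended to the event before it, so a SQL statement spanning several lines stays inside one audit record. The following Node.js sketch only imitates that rule on an array of lines to make it concrete; it is not Filebeat's implementation, groupMultiline is a hypothetical name, and the sample lines are made up.

```javascript
// Same regex as multiline.pattern in filebeat.yml: a line that starts with a timestamp
const START = /^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}/;

// Imitates negate: true + match: after: a line that does NOT match START is appended
// (joined with a newline) to the event before it. A leading non-matching line simply
// starts its own event in this sketch.
function groupMultiline(lines) {
  const events = [];
  for (const line of lines) {
    if (START.test(line) || events.length === 0) {
      events.push(line);
    } else {
      events[events.length - 1] += "\n" + line;
    }
  }
  return events;
}

// Hypothetical input: the second entry's SQL spans three physical lines
const sample = [
  "2023-02-09 10:00:00,001 [query] |Client=10.0.0.1:50000|...",
  "2023-02-09 10:00:01,002 [slow_query] |Client=10.0.0.2:50001|...|Stmt=select *",
  "from t",
  "where id = 1|Digest=",
];
const events = groupMultiline(sample);
console.log(events.length); // 2
```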

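Because the script depends on fixed character offsets (the tag at characters 25 to 29, the key/value part from character 38) and on field positions, it is worth replaying a few lines from your own audit log offline before deploying. The Node.js sketch below re-implements that parsing for the two field layouts the script handles (18 fields, or 22 with CpuCostNs, MemCostBytes, PlanCpuCost and PlanMemCost). The helper names are hypothetical, and the sample lines built by makeLine are shaped to match the offsets the script expects, not copied from a real log.

```javascript
// Field order of the 18-field layout
const FIELDS_18 = ["Client", "User", "AuthorizedUser", "ResourceGroup", "Catalog", "Db",
  "State", "ErrorCode", "Time", "ScanBytes", "ScanRows", "ReturnRows",
  "StmtId", "QueryId", "IsQuery", "feIp", "Stmt", "Digest"];
// 22-field layout: CpuCostNs/MemCostBytes after ReturnRows, plan costs at the end
const FIELDS_22 = [...FIELDS_18.slice(0, 12), "CpuCostNs", "MemCostBytes",
  ...FIELDS_18.slice(12), "PlanCpuCost", "PlanMemCost"];

function parseAuditLine(str) {
  const parts = str.slice(38).split("|");            // Key=Value fields
  const layout = parts.length === 22 ? FIELDS_22 : FIELDS_18;
  const front = layout === FIELDS_22 ? 14 : 12;       // fields read by index from the start
  const out = {
    execution_time: str.slice(0, 19),                  // "yyyy-MM-dd HH:mm:ss"
    query_type: str.slice(25, 30),                     // "query" or "slow_"
  };
  layout.forEach((key, i) => {
    // Later fields are counted back from the end of the array, like the filebeat script
    const idx = i < front ? i : parts.length - (layout.length - i);
    out[key] = (parts[idx] || "").replace(key + "=", "");
  });
  // [query] lines are cut inside the key ("t=ip:port"); keep only the IP
  out.Client = out.Client.replace("t=", "").split(":")[0];
  out.Stmt = out.Stmt.substring(0, 65530);
  return out;
}

// Hypothetical helper: builds a line shaped the way the script's offsets expect
function makeLine(tag, layout, values) {
  const kv = layout.map((k) => `${k}=${values[k] ?? ""}`).join("|");
  return `2023-02-09 10:00:00,123 [${tag}] |${kv}`;
}

const rec = parseAuditLine(makeLine("query", FIELDS_22, {
  Client: "10.0.0.1:50000", User: "root", Db: "demo", CpuCostNs: "1200", StmtId: "7", Stmt: "select 1",
}));
console.log(rec.query_type, rec.Client, rec.Db, rec.CpuCostNs, rec.StmtId, rec.Stmt);
// prints: query 10.0.0.1 demo 1200 7 select 1
```

Passing a statement that contains "|" (for example "select a|b") shows the limit of positional parsing: the field count changes, the fields read from the end shift, and Stmt comes back as just "b".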
Start the Filebeat process (configuration file: filebeat.yml):

filebeat -c filebeat.yml -e >> filebeat.log 2>&1 &           

Check that the log data in Kafka looks right; in particular, make sure the key/value pairs are split correctly.


(Figure: sample Kafka message)

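When inspecting the Kafka messages, the usual symptoms of a mismatched split are a value that still carries its "Key=" prefix or a field that is missing altogether. Below is a small Node.js sketch, assuming one message has been copied out of Kafka as JSON text; the REQUIRED list mirrors the event.Put calls in the script (the cost fields are left out because they are only filled for 22-field lines), and checkMessage is a hypothetical helper.

```javascript
// Top-level keys the filebeat script writes with event.Put
const REQUIRED = ["query_type", "execution_time", "igid", "Client", "User", "Db", "State",
  "Time", "StmtId", "QueryId", "IsQuery", "feIp", "Stmt", "Digest"];
// A value that still starts with "Key=" means the split did not line up
const KEY_PREFIX = /^[A-Za-z]+=/;

// Returns a list of problems found in one Kafka message (JSON text); empty means it looks sane
function checkMessage(json) {
  const msg = JSON.parse(json);
  const problems = [];
  for (const key of REQUIRED) {
    if (!(key in msg)) problems.push(`missing ${key}`);
  }
  for (const [key, value] of Object.entries(msg)) {
    // Stmt is skipped: SQL text can legitimately look like "a=1"
    if (key !== "Stmt" && typeof value === "string" && KEY_PREFIX.test(value)) {
      problems.push(`${key} still carries a key prefix: ${value.slice(0, 40)}`);
    }
  }
  // execution_time must load into a DATETIME column
  const ts = msg.execution_time;
  if (typeof ts === "string" && !/^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/.test(ts)) {
    problems.push(`execution_time is not yyyy-MM-dd HH:mm:ss: ${ts}`);
  }
  return problems;
}
```

Paste a message into checkMessage(...) and print the result; any entry in the returned array points at the field whose offset or position needs adjusting.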
Create the log table in StarRocks. The target can be the same SR cluster the logs are collected from or a different SR cluster.

CREATE TABLE <database>.`<table>` (
  `execution_time` datetime  COMMENT "slow time",
  `igid` varchar(40)  COMMENT "",
  `db_name` varchar(300) COMMENT "db name",
  `fe_ip` varchar(30)  COMMENT "fe ip",
  `query_id` varchar(200)  COMMENT "QueryId",
  `query_type` varchar(10) COMMENT "query_type",
  `time` bigint(20)  COMMENT "SQL run time",
  `client` varchar(30)  COMMENT "client ip",
  `user` varchar(200) COMMENT "user name",
  `authorizedUser` varchar(200) COMMENT "AuthorizedUser",
  `resourceGroup` varchar(50) COMMENT "ResourceGroup",
  `catalog` varchar(60) COMMENT "Catalog",
  `errorCode` varchar(60) COMMENT "ErrorCode",
  `state` varchar(200) COMMENT "State",
  `scan_bytes` bigint(20)  COMMENT "ScanBytes",
  `scan_rows` bigint(20) COMMENT "ScanRows",
  `return_rows` bigint(20) COMMENT "ReturnRows",
  `cpu_cost_ns` bigint(20) COMMENT "CpuCostNs",
  `mem_cost_bytes` bigint(20) COMMENT "MemCostBytes",
  `PlanCpuCost` decimal(20,6) COMMENT "PlanCpuCost",
  `PlanMemCost` decimal(20,6) COMMENT "PlanMemCost",
  `stmt_id` bigint(20)  COMMENT "StmtId",
  `is_query` varchar(50)  COMMENT "IsQuery",
  `stmt` varchar(65533)  COMMENT "Stmt,Query Detail",
  `digest` varchar(50) default null comment 'SQL fingerprint (Digest)'
) ENGINE=OLAP
DUPLICATE KEY(`execution_time`, `igid`, `db_name`, `fe_ip`, `query_id`)
COMMENT "StarRocks audit log table"
PARTITION BY RANGE(`execution_time`)
(
PARTITION P20230209 VALUES [('2023-02-09 00:00:00'), ('2023-02-10 00:00:00')),
PARTITION P20230210 VALUES [('2023-02-10 00:00:00'), ('2023-02-11 00:00:00')))
DISTRIBUTED BY HASH(`igid`) BUCKETS 24
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "P"
);           

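The dynamic partition properties define a rolling window: with time_unit DAY, start -7, end 3 and prefix P, StarRocks creates daily partitions up to three days ahead and drops partitions whose range lies before seven days ago, naming each one with the prefix plus the date as yyyyMMdd (hence P20230209 and P20230210 for the two hand-written partitions). The Node.js sketch below only lists the partition names that window covers for a given day, as an aid to reasoning about retention; it is an illustration, not how the FE scheduler works. It uses UTC for simplicity, whereas StarRocks evaluates "today" in the cluster's time zone.

```javascript
// Formats a date as prefix + yyyyMMdd (the DAY suffix format of dynamic partitions)
function dayName(prefix, d) {
  const y = d.getUTCFullYear();
  const m = String(d.getUTCMonth() + 1).padStart(2, "0");
  const day = String(d.getUTCDate()).padStart(2, "0");
  return `${prefix}${y}${m}${day}`;
}

// Partition names from `start` to `end` days around `today` (inclusive).
// Only today..end are created ahead of time; older days exist only if created earlier,
// and anything before `start` is dropped.
function partitionWindow(today, start, end, prefix) {
  const base = Date.UTC(today.getUTCFullYear(), today.getUTCMonth(), today.getUTCDate());
  const names = [];
  for (let offset = start; offset <= end; offset++) {
    names.push(dayName(prefix, new Date(base + offset * 86400000)));
  }
  return names;
}

const names = partitionWindow(new Date(Date.UTC(2023, 1, 10)), -7, 3, "P");
console.log(names[0], names[names.length - 1], names.length); // P20230203 P20230213 11
```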
Create a Routine Load job to consume the log data from Kafka in real time.

CREATE ROUTINE LOAD <database created above>.<routine_load_job_name> ON <table created above>
COLUMNS(execution_time,igid,db_name,fe_ip,query_id,time,client,user,authorizedUser,resourceGroup,catalog,errorCode,state,scan_bytes,scan_rows,return_rows,cpu_cost_ns,mem_cost_bytes,PlanCpuCost,PlanMemCost,stmt_id,is_query,stmt,query_type,digest)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.execution_time\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.AuthorizedUser\",\"$.ResourceGroup\",\"$.Catalog\",\"$.ErrorCode\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.CpuCostNs\",\"$.MemCostBytes\",\"$.PlanCpuCost\",\"$.PlanMemCost\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\",\"$.query_type\",\"$.Digest\"]",
"desired_concurrent_number" = "8",
"max_error_number" = "9999999999",
"max_batch_rows" = "200000",
"max_batch_size" = "209715200",
"max_batch_interval" = "10",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list" = "<Kafka broker list configured above>",
"kafka_topic" = "<Kafka topic defined above>",
"property.kafka_default_offsets" = "OFFSET_END",
"property.client.id" = "<client_id defined above>",
"property.group.id" = "<custom group.id>"
);
           
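In the Routine Load statement the COLUMNS list and the jsonpaths array are matched purely by position: the n-th JSON path feeds the n-th column, so both lists must have the same length and order. One way to keep them in sync is to generate both from a single mapping, as in this Node.js sketch. The mapping is copied from the statement above; buildLoadClauses is a hypothetical helper, and its jsonpaths output is the escaped value to paste between the double quotes.

```javascript
// Table column -> key in the filebeat JSON event (same order as the statement above)
const MAPPING = [
  ["execution_time", "execution_time"], ["igid", "igid"], ["db_name", "Db"],
  ["fe_ip", "feIp"], ["query_id", "QueryId"], ["time", "Time"], ["client", "Client"],
  ["user", "User"], ["authorizedUser", "AuthorizedUser"], ["resourceGroup", "ResourceGroup"],
  ["catalog", "Catalog"], ["errorCode", "ErrorCode"], ["state", "State"],
  ["scan_bytes", "ScanBytes"], ["scan_rows", "ScanRows"], ["return_rows", "ReturnRows"],
  ["cpu_cost_ns", "CpuCostNs"], ["mem_cost_bytes", "MemCostBytes"],
  ["PlanCpuCost", "PlanCpuCost"], ["PlanMemCost", "PlanMemCost"], ["stmt_id", "StmtId"],
  ["is_query", "IsQuery"], ["stmt", "Stmt"], ["query_type", "query_type"], ["digest", "Digest"],
];

// Builds the COLUMNS(...) clause and the jsonpaths value from one list, so they cannot drift apart
function buildLoadClauses(mapping) {
  const columns = `COLUMNS(${mapping.map(([col]) => col).join(",")})`;
  // JSON array of paths, with double quotes escaped for use inside a SQL string literal
  const jsonpaths = JSON.stringify(mapping.map(([, key]) => `$.${key}`)).replace(/"/g, '\\"');
  return { columns, jsonpaths };
}

const { columns, jsonpaths } = buildLoadClauses(MAPPING);
console.log(columns);
console.log(jsonpaths);
```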

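Query the table to verify the data. If it does not look right, work back through the pipeline (Filebeat output, Kafka messages, the Routine Load job) to find the failing step; SHOW ROUTINE LOAD reports the job's state and its error-row statistics.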

Set up the same Filebeat collection on every other FE node, giving each node its own igid value.

Conclusion

Once the logs are flowing into the table, you can build monitoring and analysis dashboards on top of it and run aggregate statistics.
