
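Big Data: Collecting StarRocks Audit Logs into a Table in Real Time for SQL Analysis

Author: 明少三年

Background

The StarRocks audit log records the requests that clients send to the StarRocks (SR) server. By default each entry carries one of two tags, 'slow_query' or 'query'. Besides analyzing the log file directly, you can stream the audit records from the log into a user-defined StarRocks table, so that developers can use SQL to analyze and optimize workload requests and to locate performance problems.

Architecture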

Figure: data flow architecture (StarRocks FE audit log → filebeat → Kafka → routine load → StarRocks table)

Software requirements

CentOS 7.x

StarRocks 2.4.x (this matters: the audit log content differs between versions)

Kafka

filebeat

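Implementation

Create a Kafka topic to store the audit log collected by filebeat.

./bin/kafka-topics.sh --create --bootstrap-server <broker list> --replication-factor <replica count> --partitions <partition count> --topic <topic name>

Deploy filebeat; installing from the rpm package is recommended.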

Figure: filebeat rpm download page

Write the filebeat.yml configuration file. The configuration is version-specific: the one below only applies to StarRocks 2.4.0 through 2.4.2.

filebeat.inputs:
- type: log
  enabled: true
  ignore_older: 5m
  include_lines: ['slow_query', 'query']
  multiline:
     pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}'
     negate: true
     match: after  
  paths:
    - <full path of the StarRocks audit log file>
  fields:
    log_topics: <Kafka topic name>
processors:
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      source: >
        function process(event) {
            var str= event.Get("message");
            var slow_time = str.substr(0, 19);
            var query_type = str.substr(25,5);
            var detail_query = str.substr(38);
            var js_arr = detail_query.split("|");
            var len = js_arr.length;


            if(len == 18 && query_type =='query'){
                  var Client_tmp = js_arr[0];
                  var Client_tmp2 = Client_tmp.replace('Client=','');
                  var Client_tmp3 = Client_tmp2.replace('t=','');
                  var Client_arr = Client_tmp3.split(":");
                  var Client = Client_arr[0];
                  var User_tmp = js_arr[1];
                  var User = User_tmp.replace('User=','');
                  var AuthorizedUser_tmp = js_arr[2];
                  var AuthorizedUser = AuthorizedUser_tmp.replace('AuthorizedUser=','');
                  var ResourceGroup_tmp = js_arr[3];
                  var ResourceGroup = ResourceGroup_tmp.replace('ResourceGroup=','');
                  var Catalog_tmp = js_arr[4];
                  var Catalog = Catalog_tmp.replace('Catalog=','');
                  var Db_tmp = js_arr[5];
                  var Db = Db_tmp.replace('Db=','');
                  var State_tmp = js_arr[6];
                  var State = State_tmp.replace('State=','');
                  var ErrorCode_tmp = js_arr[7];
                  var ErrorCode = ErrorCode_tmp.replace('ErrorCode=','');
                  var Time_tmp = js_arr[8];
                  var Time = Time_tmp.replace('Time=','');
                  var ScanBytes_tmp = js_arr[9];
                  var ScanBytes = ScanBytes_tmp.replace('ScanBytes=','');
                  var ScanRows_tmp = js_arr[10];
                  var ScanRows = ScanRows_tmp.replace('ScanRows=','');
                  var ReturnRows_tmp = js_arr[11];
                  var ReturnRows = ReturnRows_tmp.replace('ReturnRows=','');
                  var StmtId_tmp = js_arr[len-6];
                  var StmtId = StmtId_tmp.replace('StmtId=','');
                  var QueryId_tmp = js_arr[len-5];
                  var QueryId = QueryId_tmp.replace('QueryId=','');
                  var IsQuery_tmp = js_arr[len-4];
                  var IsQuery = IsQuery_tmp.replace('IsQuery=','');
                  var feIp_tmp = js_arr[len-3];
                  var feIp = feIp_tmp.replace('feIp=','');
                  var Stmt_tmp = js_arr[len-2];
                  var Stmt = Stmt_tmp.replace('Stmt=','');
                  var Stmt = Stmt.substring(0,65530);
                  var Digest_tmp = js_arr[len-1];
                  var Digest = Digest_tmp.replace('Digest=','');

            } 
            else if(len == 22 && query_type == 'query'){
                  var Client_tmp = js_arr[0];
                  var Client_tmp2 = Client_tmp.replace('Client=','');
                  var Client_tmp3 = Client_tmp2.replace('t=','');
                  var Client_arr = Client_tmp3.split(":");
                  var Client = Client_arr[0];
                  var User_tmp = js_arr[1];
                  var User = User_tmp.replace('User=','');
                  var AuthorizedUser_tmp = js_arr[2];
                  var AuthorizedUser = AuthorizedUser_tmp.replace('AuthorizedUser=','');
                  var ResourceGroup_tmp = js_arr[3];
                  var ResourceGroup = ResourceGroup_tmp.replace('ResourceGroup=','');
                  var Catalog_tmp = js_arr[4];
                  var Catalog = Catalog_tmp.replace('Catalog=','');
                  var Db_tmp = js_arr[5];
                  var Db = Db_tmp.replace('Db=','');
                  var State_tmp = js_arr[6];
                  var State = State_tmp.replace('State=','');
                  var ErrorCode_tmp = js_arr[7];
                  var ErrorCode = ErrorCode_tmp.replace('ErrorCode=','');
                  var Time_tmp = js_arr[8];
                  var Time = Time_tmp.replace('Time=','');
                  var ScanBytes_tmp = js_arr[9];
                  var ScanBytes = ScanBytes_tmp.replace('ScanBytes=','');
                  var ScanRows_tmp = js_arr[10];
                  var ScanRows = ScanRows_tmp.replace('ScanRows=','');
                  var ReturnRows_tmp = js_arr[11];
                  var ReturnRows = ReturnRows_tmp.replace('ReturnRows=','');
                  var CpuCostNs_tmp = js_arr[12];
                  var CpuCostNs = CpuCostNs_tmp.replace('CpuCostNs=','');
                  var MemCostBytes_tmp = js_arr[13];
                  var MemCostBytes = MemCostBytes_tmp.replace('MemCostBytes=','');
                  var StmtId_tmp = js_arr[len-8];
                  var StmtId = StmtId_tmp.replace('StmtId=','');
                  var QueryId_tmp = js_arr[len-7];
                  var QueryId = QueryId_tmp.replace('QueryId=','');
                  var IsQuery_tmp = js_arr[len-6];
                  var IsQuery = IsQuery_tmp.replace('IsQuery=','');
                  var feIp_tmp = js_arr[len-5];
                  var feIp = feIp_tmp.replace('feIp=','');
                  var Stmt_tmp = js_arr[len-4];
                  var Stmt = Stmt_tmp.replace('Stmt=','');
                  var Stmt = Stmt.substring(0,65530);
                  var Digest_tmp = js_arr[len-3];
                  var Digest = Digest_tmp.replace('Digest=','');
                  var PlanCpuCost_tmp = js_arr[len-2];
                  var PlanCpuCost = PlanCpuCost_tmp.replace('PlanCpuCost=','');
                  var PlanMemCost_tmp = js_arr[len-1];
                  var PlanMemCost = PlanMemCost_tmp.replace('PlanMemCost=','');

            }
            else if(len == 18 && query_type == 'slow_'){
                  var Client_tmp = js_arr[0];
                  var Client_tmp2 = Client_tmp.replace('Client=','');
                  var Client_tmp3 = Client_tmp2.replace('t=','');
                  var Client_arr = Client_tmp3.split(":");
                  var Client = Client_arr[0];
                  var User_tmp = js_arr[1];
                  var User = User_tmp.replace('User=','');
                  var AuthorizedUser_tmp = js_arr[2];
                  var AuthorizedUser = AuthorizedUser_tmp.replace('AuthorizedUser=','');
                  var ResourceGroup_tmp = js_arr[3];
                  var ResourceGroup = ResourceGroup_tmp.replace('ResourceGroup=','');
                  var Catalog_tmp = js_arr[4];
                  var Catalog = Catalog_tmp.replace('Catalog=','');
                  var Db_tmp = js_arr[5];
                  var Db = Db_tmp.replace('Db=','');
                  var State_tmp = js_arr[6];
                  var State = State_tmp.replace('State=','');
                  var ErrorCode_tmp = js_arr[7];
                  var ErrorCode = ErrorCode_tmp.replace('ErrorCode=','');
                  var Time_tmp = js_arr[8];
                  var Time = Time_tmp.replace('Time=','');
                  var ScanBytes_tmp = js_arr[9];
                  var ScanBytes = ScanBytes_tmp.replace('ScanBytes=','');
                  var ScanRows_tmp = js_arr[10];
                  var ScanRows = ScanRows_tmp.replace('ScanRows=','');
                  var ReturnRows_tmp = js_arr[11];
                  var ReturnRows = ReturnRows_tmp.replace('ReturnRows=','');
                  var CpuCostNs_tmp = js_arr[12];
                  var CpuCostNs = CpuCostNs_tmp.replace('CpuCostNs=','');
                  var MemCostBytes_tmp = js_arr[13];
                  var MemCostBytes = MemCostBytes_tmp.replace('MemCostBytes=','');
                  var StmtId_tmp = js_arr[len-6];
                  var StmtId = StmtId_tmp.replace('StmtId=','');
                  var QueryId_tmp = js_arr[len-5];
                  var QueryId = QueryId_tmp.replace('QueryId=','');
                  var IsQuery_tmp = js_arr[len-4];
                  var IsQuery = IsQuery_tmp.replace('IsQuery=','');
                  var feIp_tmp = js_arr[len-3];
                  var feIp = feIp_tmp.replace('feIp=','');
                  var Stmt_tmp = js_arr[len-2];
                  var Stmt = Stmt_tmp.replace('Stmt=','');
                  var Stmt = Stmt.substring(0,65530);
                  var Digest_tmp = js_arr[len-1];
                  var Digest = Digest_tmp.replace('Digest=','');
            }
            else{
                  var Client_tmp = js_arr[0];
                  var Client_tmp2 = Client_tmp.replace('Client=','');
                  var Client_tmp3 = Client_tmp2.replace('t=','');
                  var Client_arr = Client_tmp3.split(":");
                  var Client = Client_arr[0];
                  var User_tmp = js_arr[1];
                  var User = User_tmp.replace('User=','');
                  var AuthorizedUser_tmp = js_arr[2];
                  var AuthorizedUser = AuthorizedUser_tmp.replace('AuthorizedUser=','');
                  var ResourceGroup_tmp = js_arr[3];
                  var ResourceGroup = ResourceGroup_tmp.replace('ResourceGroup=','');
                  var Catalog_tmp = js_arr[4];
                  var Catalog = Catalog_tmp.replace('Catalog=','');
                  var Db_tmp = js_arr[5];
                  var Db = Db_tmp.replace('Db=','');
                  var State_tmp = js_arr[6];
                  var State = State_tmp.replace('State=','');
                  var ErrorCode_tmp = js_arr[7];
                  var ErrorCode = ErrorCode_tmp.replace('ErrorCode=','');
                  var Time_tmp = js_arr[8];
                  var Time = Time_tmp.replace('Time=','');
                  var ScanBytes_tmp = js_arr[9];
                  var ScanBytes = ScanBytes_tmp.replace('ScanBytes=','');
                  var ScanRows_tmp = js_arr[10];
                  var ScanRows = ScanRows_tmp.replace('ScanRows=','');
                  var ReturnRows_tmp = js_arr[11];
                  var ReturnRows = ReturnRows_tmp.replace('ReturnRows=','');
                  var StmtId_tmp = js_arr[len-6];
                  var StmtId = StmtId_tmp.replace('StmtId=','');
                  var QueryId_tmp = js_arr[len-5];
                  var QueryId = QueryId_tmp.replace('QueryId=','');
                  var IsQuery_tmp = js_arr[len-4];
                  var IsQuery = IsQuery_tmp.replace('IsQuery=','');
                  var feIp_tmp = js_arr[len-3];
                  var feIp = feIp_tmp.replace('feIp=','');
                  var Stmt_tmp = js_arr[len-2];
                  var Stmt = Stmt_tmp.replace('Stmt=','');
                  var Stmt = Stmt.substring(0,65530);
                  var Digest_tmp = js_arr[len-1];
                  var Digest = Digest_tmp.replace('Digest=','');              
            }
            event.Put("query_type",query_type);
            event.Put("execution_time",slow_time);
            event.Put("Client",Client);
            event.Put("User",User);
            event.Put("AuthorizedUser",AuthorizedUser);
            event.Put("ResourceGroup",ResourceGroup);
            event.Put("Catalog",Catalog);
            event.Put("Db",Db);
            event.Put("State",State);
            event.Put("ErrorCode",ErrorCode);
            event.Put("Time",Time);
            event.Put("ScanBytes",ScanBytes);
            event.Put("ScanRows",ScanRows);
            event.Put("ReturnRows",ReturnRows);
            event.Put("CpuCostNs",CpuCostNs);
            event.Put("MemCostBytes",MemCostBytes);
            event.Put("StmtId",StmtId);
            event.Put("QueryId",QueryId);
            event.Put("IsQuery",IsQuery);
            event.Put("feIp",feIp);
            event.Put("Stmt",Stmt);
            event.Put("Digest",Digest);
            event.Put("PlanCpuCost",PlanCpuCost);
            event.Put("PlanMemCost",PlanMemCost);
            event.Put("igid",'<custom identifier for this FE node>');
        }
  - drop_fields:
      fields: ["ecs","agent","message","log","host"]


output.kafka:
  enabled: true
  hosts: ["<Kafka broker list>"]
  topic: '%{[fields][log_topics]}'
  worker: 1
  timeout: 30s
  broker_timeout: 10s
  keep_alive: 0
  compression: gzip
  required_acks: 1
  client_id: <custom client_id>
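
The script above reads fields by fixed offsets and array positions, which is why it is tied to one StarRocks version. As a hedged alternative, the following minimal sketch reads fields by name instead. It assumes the entry layout "<timestamp>,<millis> [<tag>] |Key=value|...", which is inferred from the offsets 0, 25 and 38 used above, and it assumes that a segment not starting with a known key belongs to the previous value (for example a "|" inside Stmt). The names KNOWN_KEYS, parseAuditFields and parseAuditLine are hypothetical and not part of the original configuration.

```javascript
// Known audit-log keys, taken from the field names used in the script above.
var KNOWN_KEYS = ["Client", "User", "AuthorizedUser", "ResourceGroup", "Catalog",
    "Db", "State", "ErrorCode", "Time", "ScanBytes", "ScanRows", "ReturnRows",
    "CpuCostNs", "MemCostBytes", "StmtId", "QueryId", "IsQuery", "feIp",
    "Stmt", "Digest", "PlanCpuCost", "PlanMemCost"];

// Split "|Key=value|Key=value..." into an object keyed by field name.
// A segment that does not start with a known, not-yet-seen "Key=" is
// appended to the previous value, restoring the "|" that split() removed.
function parseAuditFields(detail) {
    var parts = detail.split("|");
    var fields = {};
    var lastKey = null;
    for (var i = 0; i < parts.length; i++) {
        var part = parts[i];
        var eq = part.indexOf("=");
        var key = eq > 0 ? part.substring(0, eq) : "";
        if (KNOWN_KEYS.indexOf(key) >= 0 && !(key in fields)) {
            fields[key] = part.substring(eq + 1);
            lastKey = key;
        } else if (lastKey !== null) {
            fields[lastKey] += "|" + part;
        }
    }
    return fields;
}

// Parse one (possibly multi-line) audit entry.
// Returns null when the line does not match the assumed layout.
function parseAuditLine(line) {
    var m = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+ \[(\w+)\] ([\s\S]*)$/.exec(line);
    if (m === null) {
        return null;
    }
    var fields = parseAuditFields(m[3]);
    fields.execution_time = m[1];
    fields.query_type = m[2];
    return fields;
}
```

Inside the filebeat processor, the result could be copied into the event with event.Put for each key, in the same way the script above does. Note that this sketch yields the full tag (for example slow_query) as query_type, where the script above stores the five-character prefix slow_. Fields missing from a given log version are simply absent from the result.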
           

Start the filebeat process (configuration file: filebeat.yml).

filebeat -c filebeat.yml -e >> filebeat.log 2>&1 &           

Check that the log data in Kafka looks right, paying particular attention to whether the key-value splitting matches the fields.

Figure: sample Kafka message
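
One way to make this check less manual is to test a message body against the keys that the filebeat script writes. The following is a minimal sketch, assuming the message has already been obtained as a JSON string (for example copied from a console consumer). The names REQUIRED_KEYS and checkAuditMessage are hypothetical. The rule that a value starting with "Word=" indicates a mis-split field is also an assumption, and Stmt is excluded from it because SQL text may legitimately look like that.

```javascript
// Keys that every branch of the filebeat script above writes into the event.
var REQUIRED_KEYS = ["execution_time", "query_type", "igid", "Client", "User",
    "AuthorizedUser", "ResourceGroup", "Catalog", "Db", "State", "ErrorCode",
    "Time", "ScanBytes", "ScanRows", "ReturnRows", "StmtId", "QueryId",
    "IsQuery", "feIp", "Stmt", "Digest"];

// Return a list of problems found in one Kafka message (a JSON string):
// required keys that are absent, and string values that still carry a
// "Key=" prefix, which suggests the positional split picked the wrong field.
function checkAuditMessage(json) {
    var doc = JSON.parse(json);
    var problems = [];
    for (var i = 0; i < REQUIRED_KEYS.length; i++) {
        if (!(REQUIRED_KEYS[i] in doc)) {
            problems.push("missing " + REQUIRED_KEYS[i]);
        }
    }
    for (var key in doc) {
        var value = doc[key];
        if (key !== "Stmt" && typeof value === "string" && /^[A-Za-z]+=/.test(value)) {
            problems.push("unsplit value in " + key + ": " + value);
        }
    }
    return problems;
}
```

An empty result means the message passed both checks. A non-empty result usually points at a field count that the script's branches do not cover, which is the typical symptom of a StarRocks version mismatch.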

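Create the log table in StarRocks. It can live in the same StarRocks cluster that the logs are collected from, or in a different one.

CREATE TABLE <db_name>.`<table_name>` (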
  `execution_time` datetime  COMMENT "slow time",
  `igid` varchar(40)  COMMENT "",
  `db_name` varchar(300) COMMENT "db name",
  `fe_ip` varchar(30)  COMMENT "fe ip",
  `query_id` varchar(200)  COMMENT "QueryId",
  `query_type` varchar(10) COMMENT "query_type",
  `time` bigint(20)  COMMENT "SQL run time",
  `client` varchar(30)  COMMENT "client ip",
  `user` varchar(200) COMMENT "user name",
  `authorizedUser` varchar(200) COMMENT "AuthorizedUser",
  `resourceGroup` varchar(50) COMMENT "ResourceGroup",
  `catalog` varchar(60) COMMENT "Catalog",
  `errorCode` varchar(60) COMMENT "ErrorCode",
  `state` varchar(200) COMMENT "State",
  `scan_bytes` bigint(20)  COMMENT "ScanBytes",
  `scan_rows` bigint(20) COMMENT "ScanRows",
  `return_rows` bigint(20) COMMENT "ReturnRows",
  `cpu_cost_ns` bigint(20) COMMENT "CpuCostNs",
  `mem_cost_bytes` bigint(20) COMMENT "MemCostBytes",
  `PlanCpuCost` decimal(20,6) COMMENT "PlanCpuCost",
  `PlanMemCost` decimal(20,6) COMMENT "PlanMemCost",
  `stmt_id` bigint(20)  COMMENT "StmtId",
  `is_query` varchar(50)  COMMENT "IsQuery",
  `stmt` varchar(65533)  COMMENT "Stmt,Query Detail",
  `digest` varchar(50) DEFAULT NULL COMMENT "SQL fingerprint (Digest)"
) ENGINE=OLAP
DUPLICATE KEY(`execution_time`, `igid`, `db_name`, `fe_ip`, `query_id`)
COMMENT "StarRocks audit table"
PARTITION BY RANGE(`execution_time`)
(
PARTITION P20230209 VALUES [('2023-02-09 00:00:00'), ('2023-02-10 00:00:00')),
PARTITION P20230210 VALUES [('2023-02-10 00:00:00'), ('2023-02-11 00:00:00')))
DISTRIBUTED BY HASH(`igid`) BUCKETS 24
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "P"
);           

Create a routine load job to consume the log data from Kafka in real time.

CREATE ROUTINE LOAD <db_name created above>.<routine_load_name> ON
 <table_name created above> columns(execution_time,igid,db_name,fe_ip,query_id,time,client,user,authorizedUser,resourceGroup,catalog,errorCode,state,scan_bytes,scan_rows,return_rows,cpu_cost_ns,mem_cost_bytes,PlanCpuCost,PlanMemCost,stmt_id,is_query,stmt,query_type,digest)
PROPERTIES ("format"="json","jsonpaths"="[\"$.execution_time\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.AuthorizedUser\",\"$.ResourceGroup\",\"$.Catalog\",\"$.ErrorCode\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.CpuCostNs\",\"$.MemCostBytes\",\"$.PlanCpuCost\",\"$.PlanMemCost\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\",\"$.query_type\",\"$.Digest\"]","desired_concurrent_number"="8","max_error_number" = "9999999999","max_batch_rows"="200000","max_batch_size" = "209715200","max_batch_interval" = "10","strict_mode" = "false" )
FROM KAFKA("kafka_broker_list"= "<Kafka broker list configured above>", "kafka_topic" = "<Kafka topic name defined above>", "property.kafka_default_offsets" = "OFFSET_END", "property.client.id" = "<client_id defined above>", "property.group.id" = "<custom group id>");
           

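Query the table to check the data. If it does not look right, work back through the pipeline (filebeat, Kafka, routine load) to find where the problem is.

Repeat the same filebeat collection on the other FE nodes.

Conclusion

Once the logs flow into the table, you can build monitoring and analysis dashboards on top of it and run aggregate statistics over the requests.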
