天天看點

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

1,介紹

  Doris 的審計日志插件是一個可選插件。使用者可以在運作時安裝或解除安裝這個插件,該插件可以将 FE 的審計日志定期的導入到指定 Doris 叢集中,以友善使用者通過 SQL 對審計日志進行檢視和分析,這裡的資料其實是Doris FE log目錄下的 

fe.audit.log

 檔案中的資料。

  我們要做的是安裝這個插件,然後我們可以通過doris sql去對應的表查詢sql語句,也可以通過doris 目錄下的fe.audit.log檔案将日志接出之後寫入外部操作。

  通過filebeat資料接出到kafka,通過flink程式消費解析日志然後分析出血緣關系,這是我們最終的目的。

2,安裝Doris 的審計日志插件

  1)參考官網方式,編譯得到 auditloader.zip 檔案

   審計日志插件 | Apache Doris

  2)修改 fe conf下的配置檔案,添加參數,啟用plugin架構

       plugin_enable = true

3)auditloader.zip檔案上傳到fe,解壓到/plugins目錄

     在fe/plugins 目錄下插件一個檔案夾,然後auditloader.zip解壓進這個檔案夾,比如我這裡建立檔案夾auditdemo ,解壓之後的效果:

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

  4) plugin.conf 配置檔案修改

   plugin.conf 配置資訊:

   # 批量的最大大小,預設為 50MB

   max_batch_size=52428800

   # 批量加載的最大間隔,預設為 60 秒

   max_batch_interval_sec=60

   # 加載審計的 Doris FE 主機,預設為 127.0.0.1:8030。

   # 這應該是Stream load 的主機端口

   frontend_host_port=127.0.0.1:8030

   # 審計表的資料庫

   database=doris_audit_db__

   # 審計表名,儲存審計資料

   table=doris_audit_tbl__

   # 用來連接配接doris的使用者. 此使用者必須對審計表具有 LOAD_PRIV 權限.

   user=root

   # 用來連接配接doris的使用者密碼

   password=root

  5)建立插件表

   庫為配置檔案裡面的配置 database=doris_audit_db__

 create table doris_audit_tbl__
 (
    query_id varchar(48) comment "查詢唯一ID",
     time datetime not null comment "查詢開始時間",
    client_ip varchar(32) comment "查詢用戶端IP",
     user varchar(64) comment "查詢使用者名",
    db varchar(96) comment "查詢的資料庫",
    state varchar(8) comment "查詢狀态:EOF, ERR, OK",
    query_time bigint comment "查詢執行時間(毫秒)",
    scan_bytes bigint comment "查詢掃描的位元組數",
    scan_rows bigint comment "查詢掃描的記錄行數",
    return_rows bigint comment "查詢傳回的結果記錄數",
    stmt_id int comment "SQL語句的增量ID",
    is_query tinyint comment "這個是否是查詢:1 or 0",
    frontend_ip varchar(32) comment "執行這個語句的FE IP",
    stmt varchar(5000) comment "原始語句,如果超過 5000 位元組,則進行修剪"
 ) engine=OLAP
 duplicate key(query_id, time, client_ip)
 partition by range(time) ()
 distributed by hash(query_id) buckets 1
 properties(
     "dynamic_partition.time_unit" = "DAY",
     "dynamic_partition.start" = "-30",
     "dynamic_partition.end" = "3",
     "dynamic_partition.prefix" = "p",
     "dynamic_partition.buckets" = "1",
     "dynamic_partition.enable" = "true",
     "replication_num" = "3"
 );      

  6)啟動插件,在mysql 用戶端

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析
  1. 安裝一個本地 zip 檔案插件:
     INSTALL PLUGIN FROM "/wyyt/software/doris-0.14.7/fe/plugins/auditdemo.zip";
    
          
  2. 安裝一個本地目錄中的插件:
     INSTALL PLUGIN FROM "/wyyt/software/doris-0.14.7/fe/plugins/auditdemo/";
          

3,測試效果

  1)檢視對應庫的表對應表資訊:

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

  2)檢視fe目錄下log目錄下 fe.audit.log

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

4,将fe.audit.log檔案日志輸出

  1)filebeat-7.13.3-linux-x86_64.tar.gz 下載下傳安裝 

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

  2)配置檔案,直接監控fe log目錄下的fe.audit.log檔案 将日志輸出到kafka

5,通過程式解析log日志

  通過flink代碼消費kafka的資料然後血緣解析,結果輸出到mysql或者redis存儲供前端頁面查詢。

  1)指定消費檔案

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

 2)指定輸出

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

3)啟動指令

   ./filebeat -e -c filebeat.yml

4)運作效果

Doris SQL日志審計部署,以及sql收集輸出kafka,後續血緣分析

5)JSON格式資料寫入kafka

{

"@timestamp": "2021-10-09T03:43:35.716Z",

"@metadata": {

"beat": "filebeat",

"type": "_doc",

"version": "7.13.3"

},

"ecs": {

"version": "1.8.0"

},

"host": {

"architecture": "x86_64",

"name": "bi-524",

"os": {

"codename": "Core",

"type": "linux",

"platform": "centos",

"version": "7 (Core)",

"family": "redhat",

"name": "CentOS Linux",

"kernel": "3.10.0-1127.19.1.el7.x86_64"

},

"id": "b521378110f947d896c25d753d4d472f",

"containerized": false,

"ip": [

"192.168.5.24",

"fe80::250:56ff:fe90:8b62",

"169.254.123.1",

"fe80::42:52ff:fe91:14a4",

"fe80::b061:35ff:fe7e:a765"

],

"mac": [

"00:50:56:90:8b:62",

"02:42:52:91:14:a4",

"b2:61:35:7e:a7:65"

],

"hostname": "bi-524"

},

"log": {

"offset": 46640,

"file": {

"path": "/wyyt/software/doris-0.14.7/fe/log/fe.audit.log"

}

},

"message": "2021-10-09 11:43:26,236 [query] |Client\u003d127.0.0.1:47609|User\u003droot|Db\u003ddefault_cluster:example_db|State\u003dEOF|Time\u003d12|ScanBytes\u003d0|ScanRows\u003d0|ReturnRows\u003d1|StmtId\u003d350|QueryId\u003de5f03872e37245e4-982ea3ae68173079|IsQuery\u003dtrue|feIp\u003d192.168.5.24|Stmt\u003dSELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@language AS language, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@tx_isolation AS tx_isolation, @@wait_timeout AS wait_timeout|CpuTimeMS\u003d0",

"input": {

"type": "log"

},

"agent": {

"type": "filebeat",

"version": "7.13.3",

"hostname": "bi-524",

"ephemeral_id": "fc3ce98f-0ace-445b-878b-ad39f7ece367",

"id": "b37c7155-80e0-4f95-a3d9-cd92023d1e14",

"name": "bi-524"

}

}

6,暫無 --後續程式消費kafka

7,接入血緣分析 --暫時還沒完成

未完待續..............

參考峰哥文章:

Apache Doris SQL 日志審計

繼續閱讀