天天看點

當Elasticsearch logstash kibana (ELK) 遇到symantec

背景:

Symantec 的裝置管理來管理者工裝置的通路,如禁用攝像頭、藍牙裝置、儲存設備等。因為經常有人申請開通裝置權限,是以要找到被禁用裝置的devID ,說白了就是USB的位址及路徑等資訊。但是symantec 本身的裝置通路報表過濾和檢視起來太麻煩了,是以有想法用ELK分析下,因為最近也在研究如何使用JDBC 來從資料庫索引資料到kibana,是以正好研究了下。

目前遇到的坑:

JDBC的資料源一定要有一個KEY字段,導入elasticsearch時,以這個KEY字段作為ID,這樣才不會重複導入資料,否則沒有KEY字段,資料會反複導入。

JDBC的日期字段導入到elasticsearch時會有轉換錯誤(object java.string 之類的)好像和Logstash-input-jdbc的版本有關系,可惜自己手動更新jdbc插件總是報另外一個錯,也沒有更新成功,是以後面又把資料庫裡的日期字段轉成字元串的形式才成功。

ELK on windows

下載下傳安裝JRE 或者JDK

設定JAVA_HOME 指向JRE或者JDK安裝目錄。

安裝elasticsearch 服務,進入D:\ELK\elasticsearch-2.1.0\bin

輸入service install Servicename

<a href="http://s3.51cto.com/wyfs02/M02/78/EA/wKiom1aEvYmBHiseAAAwH6-0ioI558.png"></a>

安裝logstash 服務,需要下載下傳NSSM ,進入NSSM的安裝目錄:執行nssm install logstash,配置如下。

<a href="http://s3.51cto.com/wyfs02/M00/78/E8/wKioL1aEvaixK02RAAAbUvAFnZc616.png"></a>

LOGstash 如果和elasticsearch 一起使用,可以在logstash的dependencies 裡面配置依賴服務elasticsearch

我們看看run.bat 的内容

logstash.bat agent -f logstash.conf

安裝kibana的windows 服務。進入NSSM的安裝目錄:執行nssm install kibana,配置如下。

<a href="http://s3.51cto.com/wyfs02/M01/78/E8/wKioL1aEvamxC6IBAAAcj06OVYc612.png"></a>

<a href="http://s3.51cto.com/wyfs02/M02/78/EA/wKiom1aEvYvTRL-6AAAW0_5m_jw066.png"></a>

elasticsearch的初始配置,一般不需要配置,但是為了提升安全性,我把監聽位址改為隻有本機能通路,修改elasticsearch.yml如下圖。

<a href="http://s3.51cto.com/wyfs02/M00/78/EA/wKiom1aEvYyiXIwCAAAu8NtdREM773.png"></a>

kibana的初始配置,同樣,Kibana也修改成了隻監聽本地端口,修改config\kibana.yml 如下圖:

<a href="http://s3.51cto.com/wyfs02/M01/78/EA/wKiom1aEvYyyke1UAABATs4IRAs253.png"></a>

Logstash的目錄結構:

<a href="http://s3.51cto.com/wyfs02/M02/78/E8/wKioL1aEvauwCwKCAABAwKEqKn8626.png"></a>

下面主要看logstash的配置:

input {

jdbc {

jdbc_driver_library =&gt; "sqljdbc42.jar"

jdbc_driver_class =&gt; "com.microsoft.sqlserver.jdbc.SQLServerDriver"

jdbc_connection_string =&gt; "jdbc:sqlserver://sep-bg:1433;databaseName=sem5"

jdbc_user =&gt; "logstash"

jdbc_password =&gt; "yourpassword"

schedule =&gt; "* * * * *"

statement_filepath =&gt; "semSqlQuery.sql"

type =&gt; "SEPDeviceLog"

}

filter {

if [type]=='SEPDeviceLog' {

grok {

patterns_dir =&gt; "../patterns"

match =&gt; { "event_desc" =&gt; "%{KEY:key1}:%{VALUE:deviceName} %{KEY:key2}:%{VALUE:devCategory} %{KEY:key3}:%{VALUE:devGUID} %{KEY:key4}:%{VALUE:devID}" }

remove_field =&gt; ["key1","key2","key3","key4"]

date {

match =&gt; ["datetime",

"yyyy-MM-dd HH:mm:ss.SSS",

"yyyy-MM-dd HH:mm:ss,SSS",

"yyyy-MM-dd HH:mm:ss",

"yyyy/MM/dd HH:mm:ss",

"MMM d HH:mm:ss",

"MMM dd HH:mm:ss",

"dd/MMM/yyyy:HH:mm:ss Z",

"yyyy-MM-dd HH:mm:ss.SSSZ",

"yyyy-MM-dd'T'HH:mm:ss.SSSZ",

"yyyy-MM-dd'T'HH:mm:ssZ",

"E MMM dd HH:mm:ss yyyy Z"

]

remove_field =&gt; ["datetime"]

output {

# stdout {

# codec =&gt; rubydebug

# }

elasticsearch {

hosts =&gt; ["localhost:9200"]

document_id =&gt; "%{uid}"

SemSQLQuery.sql 檔案内容(注意我的SQL裡面把agent_security_log_idx 作為了index 裡面每個文檔的ID,參考上面logstash的output 到elasticsearch時的document_id 設定,這樣才不會重複往elasticsearch中導入資料,而且SQL中如果有更新(插入、Update等)Elasticsearch中也會對應更改(但是Delete操作不會同步)。

此處2016-02-04年有所更新,之前的bigint 表示的milliseconds 轉換成datetime 後,發現時間不對,查了下資料,資料庫中記錄的是從1970-01-01年之後的milliseconds 秒數,

是以正确的semsqlquery.sql 内容應該如下(轉換後的時間是UTC的格式,是以使用最新版本的logstash 可以自動轉到對應的時區):

select 

agent_security_log_idx as uid, 

EVENT_TIME, 

dateadd(MILLISECOND,  EVENT_TIME % 1000 ,DATEADD(SECOND, EVENT_TIME / 1000, '19700101')) as datetime, 

EVENT_ID, 

EVENT_DESC, 

SEVERITY, 

HOST_NAME, 

LOCAL_HOST_MAC, 

user_name, 

domain_name, 

local_host_Ip_text 

from  V_AGENT_SECURITY_LOG

修正sql查詢後,目前日志裡面的時間顯示正常,最新的日志時間和我計算機的時間基本一緻。

<a href="http://s3.51cto.com/wyfs02/M02/7A/BF/wKiom1ayxEbyZKzmAAC4qTCRQ3g211.png"></a>

Grok fitler的配置檔案放在了Symantec_dev_log 檔案裡面,就定義了個KEY,VALUE的比對規則。

<a href="http://s3.51cto.com/wyfs02/M00/78/EA/wKiom1aEvZCxa8sQAAAzsv6qz1s972.png"></a>

說說JDBC的設定:

# 使用微軟網站download 的SQL Server JDBC的壓縮包。我放在了bin目錄下。

# SQL Server JDBC的Driver Class,可以從sqljdbc42.jar 檔案中的services下的java.sql.Driver檔案裡面找到。

<a href="http://s3.51cto.com/wyfs02/M01/78/E8/wKioL1aEva_Q2SuFAAB2yr1MkeI506.png"></a>

# 下面這個可以參考MSDN,基本都一樣。

# 使用者名,我的SQL配置了一個專用的SQL賬号(非內建認證),僅給了限定的讀取權限。

# SQL 查詢的檔案路徑。

# 加了一個Type,好寫過濾規則。

然後可以開啟logstash直接導入資料到elasticsearch,測試時可以開啟output 插件(我的配置檔案裡面已經注釋掉了 )/

最後就是kibana的最終使用階段了。

通路http://127.0.0.1:5601/

<a href="http://s3.51cto.com/wyfs02/M02/78/EA/wKiom1aEvZPhl33WAALbatRmT_4507.png"></a>

選擇Severity 為7的事件(應該是被禁用的裝置事件)。

<a href="http://s3.51cto.com/wyfs02/M01/78/EA/wKiom1aEvZbydlC5AAG0YwkYFnA663.png"></a>

<a href="http://s3.51cto.com/wyfs02/M02/78/E8/wKioL1aEvbeB84ZqAAF4XV8uOZg478.png"></a>

其實可以點選上面Pie圖裡面的任意塊來進行過濾和篩選。比如按機器名、按照裝置Name,裝置Class。

版權聲明:原創作品,如需轉載,請注明出處。否則将追究法律責任

本文轉自 yoke88 51CTO部落格,原文連結:http://blog.51cto.com/yoke88/1730347