ELK Cluster: Logstash (5)

How Logstash Works

Logstash event processing has three stages: inputs → filters → outputs. It is a tool for receiving, processing, and forwarding logs. It supports system logs, web server logs, error logs, application logs: in short, practically any type of log you can throw at it.

[Figure: the Logstash pipeline, Inputs → Filters → Outputs, with Codecs attached to the input and output stages]

As the figure above shows, Logstash's data processing consists of three main parts: Inputs, Filters, and Outputs. In addition, Codecs can be used within Inputs and Outputs to handle data formats. All four exist as plugins; users define a pipeline configuration file that selects the input, filter, output, and codec plugins needed to implement a particular flow of data collection, processing, and output.

Inputs: acquire data from a source; common plugins include file, syslog, redis, beats, etc.
Filters: process data, e.g. format conversion and field derivation; common plugins include grok, mutate, drop, clone, geoip, etc.
Outputs: emit data; common plugins include elasticsearch, file, graphite, statsd, etc.
Codecs: codec plugins are not a separate stage; they are modules used inside input and output plugins to encode and decode data. Common codec plugins include json and multiline. Logstash is not merely an input | filter | output data flow; it is an input | decode | filter | encode | output flow, and the codec is what decodes and encodes events.
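
A minimal sketch of that decode/encode flow, assuming a shipper that sends one JSON object per line:

input {
  stdin {
    codec => json        # decode: each incoming line is parsed from JSON into event fields
  }
}
output {
  stdout {
    codec => rubydebug   # encode: events are printed in a readable debug form
  }
}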

Input: feeds data into Logstash.

Some commonly used inputs:

file: reads from a file on the filesystem, similar to the tail -f command

syslog: listens on port 514 for syslog messages and parses them according to RFC 3164

redis: reads from a Redis server

beats: reads from Filebeat

Filters: intermediate processing; they operate on event data.

Some commonly used filters:

grok: parses arbitrary text. Grok is the most important Logstash plugin: together with regular expressions it turns plain text strings into structured data. More than 120 parsing patterns are built in.

Official grok patterns: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

Online grok debugger: https://grokdebug.herokuapp.com/

mutate: transforms fields, e.g. delete, replace, modify, rename.

drop: discards a subset of events without further processing (see the sketch after this list).

clone: copies an event; fields can be added or removed along the way.

geoip: adds geographic information (used by Kibana's map visualizations)
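
A small filter block combining two of the plugins above (a sketch; remote_addr and status are the field names the nginx extraction later in this post produces):

filter {
  if [status] == "404" {
    drop { }                  # discard 404 events without further processing
  }
  geoip {
    source => "remote_addr"   # derive geoip.* location fields from the client IP
  }
}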

Outputs: the final stage of the Logstash processing pipeline. An event can pass through multiple outputs, but once every output has run, the event's life cycle is complete.

Some common outputs:

elasticsearch: stores data efficiently and makes it easy and convenient to query.

file: saves event data to a file.

graphite: sends event data to Graphite, a popular open-source component for storing and graphing metrics.

Codecs: codecs are stream filters that can be configured as part of an input or output. They make it easy to split apart data that arrives already serialized.

Some common codecs:

json: encodes and decodes data as JSON.

multiline: merges multiple lines into a single event, e.g. a Java exception message together with its stack trace.

Installing Logstash

Logstash's role
  Logstash mainly filters and processes logs; it can also collect them, though Logstash is generally not the tool of choice for log collection
  Supported inputs: standard input, text log files, etc.
  Supported outputs: standard output, Elasticsearch, etc.
Installing Logstash
  yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
  yum localinstall logstash-7.6.2.rpm

Update the Logstash JVM configuration file jvm.options
  -Xms200M
  -Xmx200M

A minimal Logstash configuration, /etc/logstash/conf.d/logstash.conf
input{
  stdin{}
}
output{
  stdout{
    codec=>rubydebug
  }
}

Starting and testing Logstash
  /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
  Type some characters and inspect the output

Installation

Hands-on: reading a log file with Logstash
Install nginx to provide log input
  yum install nginx -y

Edit nginx's systemd unit file /usr/lib/systemd/system/nginx.service and delete the lines below
KillSignal=SIGQUIT
TimeoutStopSec=5
KillMode=process
PrivateTmp=true

Notes on collecting logs with Logstash
  By default Logstash runs as the logstash user, so the logs must be readable by that user
  New log lines must be produced; lines that existed before startup are not read by default
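
One way to satisfy the read-permission note, as a sketch (log rotation may reset the mode, so the logrotate configuration may need the same adjustment):

chmod o+r /var/log/nginx/access.log
ls -l /var/log/nginx/access.log    # verify the logstash user can now read it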

Input that reads the nginx log
input {
  file {
    path => "/var/log/nginx/access.log"
  }
}
output{
  stdout{
    codec=>rubydebug
  }
}

Starting Logstash
  systemctl enable logstash
  systemctl restart logstash
  Watch the logs: /var/log/messages

Logstash reading the nginx log

input {
  file {
    path => "/var/log/nginx/access.log"
  }
}
output {
  elasticsearch {
    hosts => ["http://xxx:9200", "http://xxx:9200"]
    user => "elastic"
    password => "sjgpwd"
    index => "sjgnginx-%{+YYYY.MM.dd}"
  }
}

Reloading the Logstash configuration (a full restart of Logstash is slow)
kill -1 pid
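
Alternatively, Logstash can watch its own configuration. A sketch using settings from /etc/logstash/logstash.yml:

config.reload.automatic: true   # re-read pipeline files when they change
config.reload.interval: 3s      # how often to check for changes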

logstash将日志輸出到es

Simulate nginx log generation
while true;do 
  curl 127.0.0.1/sjgsjg
  curl 192.168.238.90/sjg666
  sleep 5
done

Logstash regex extraction of nginx logs
Why extract? A whole log line cannot be analyzed as a single blob; individual fields must be extracted, for example to
  find which IPs generate the most traffic
  analyze the distribution of nginx response status codes

Nginx log format
  192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"

Nginx log format configuration
  log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
$remote_addr: client address of the request
$remote_user: user authenticated by nginx
$time_local: local time of the request
$request: request method, path, and protocol
$status: response status code
$body_bytes_sent: bytes sent in the response body
$http_referer: the Referer header, i.e. which page the request came from
$http_user_agent: the User-Agent header, i.e. the client software
$http_x_forwarded_for: the X-Forwarded-For header carrying the real client IP behind proxies

nginx log format explained

filter: grok regex

Nginx log format

192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"

Grok is the extraction workhorse and requires a grasp of regular expressions. Validate extractions with Kibana's Grok Debugger
  hand-written regex extraction (recommended)
  built-in pattern extraction (simpler): /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

Grok hand-written regex extraction syntax: (?<field_name>your_regex)
(?<remote_addr>\d+\.\d+\.\d+\.\d+)

Built-in pattern extraction syntax: %{BUILTIN_PATTERN:field_name}
%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}

Mixed syntax extraction
(?<remote_addr>\d+\.\d+\.\d+\.\d+) - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\]

Basic regular expression symbols
  . matches any single character; * means the preceding character appears 0 or more times
  [abc] matches any one character inside the brackets; [^abc] matches any character not in the brackets
  [0-9] digits; [a-z] lowercase letters; [A-Z] uppercase letters; [a-zA-Z] all letters; [a-zA-Z0-9] letters and digits
  [^0-9] any non-digit
  ^xx anchors a match beginning with xx; xx$ anchors a match ending with xx
  \s whitespace; \S non-whitespace; \d digit

Extended regular expressions build on the basics
  ? means the preceding character appears 0 or 1 times; + means 1 or more times
  {a} matches the preceding character exactly a times; {a,b} between a and b times
  {,b} between 0 and b times; {a,} a or more times
  string1|string2 matches string1 or string2
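
Besides Kibana's Grok Debugger, a quick way to iterate on a pattern is a throwaway stdin pipeline; a minimal sketch (paste a log line, inspect the parsed fields):

input {
  stdin {}
}
filter {
  grok {
    # the hand-written pattern from above; swap in whatever you are testing
    match => { "message" => '(?<remote_addr>\d+\.\d+\.\d+\.\d+)' }
  }
}
output {
  stdout { codec => rubydebug }
}

Run it with /usr/share/logstash/bin/logstash -f test.conf (test.conf being any scratch file), exactly like the minimal configuration earlier.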
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# URN, allowing use of RFC 2141 section 2.3 reserved characters
URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
IPORHOST (?:%{IP}|%{HOSTNAME})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (/([\w_%!$@:.,+~-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[APMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG [\x21-\x5a\x5c\x5e-\x7e]+
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)      

grok built-in patterns

input {
  file {
    path => "/var/log/nginx/access.log"
  }
}
filter {
  grok {
    match => {
      "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
    }
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
    user => "elastic"
    password => "sjgpwd"
    index => "sjgnginx-%{+YYYY.MM.dd}"
  }
}      

Regex extraction of the nginx log

Fixing the exclamation-mark indicator in Kibana
  refresh the Kibana index pattern
  index-pattern operations never touch the underlying data; deleting and recreating one is safe

Logstash special field handling: substitution and type conversion
http_user_agent contains double quotes, which need to be stripped
filter {
  grok {
    match => {
      "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
    }
    remove_field => ["message"]
  }
  mutate {
    gsub => [ "http_user_agent",'"',"" ]
  }
}

Converting strings to integers in Logstash
  mutate{
    gsub => [ "http_user_agent",'"',"" ]
    convert => { "status" => "integer" }
    convert => { "body_bytes_sent" => "integer" }
  }      

Field substitution and type conversion

Replacing the @timestamp value in Logstash
Simulate user traffic against nginx
while true;do 
  curl 192.168.238.90/sjg666
  curl 127.0.0.1
  sleep 2
done

Scenario
  suppose we want to analyze yesterday's user access logs

Make Logstash analyze the entire nginx log, which exposes the problem
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

Two kinds of time
  the time the log line was shipped, which is useless for analysis
  the user's access time recorded inside the log line; analysis is only accurate when this is the reference

Use the user's access time, in the format 01/Aug/2020:10:34:20 +0800
filter {
  grok {
    match => {
      "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
    }
    remove_field => ["message"]
  }
  date {
    match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
  }
}

If the logs contain different time formats, the overriding date match must use the corresponding pattern
20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss
2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS
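
A single date filter can list several candidate formats, so both examples above can be handled at once (a sketch; it assumes the raw value was captured into a field named timestamp):

date {
  match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss", "yyyy-MM-dd HH:mm:ss,SSS"]
  target => "@timestamp"
}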

Manually tally nginx requests and compare against what the web UI reports
cat /var/log/nginx/access.log |awk '{print $4}'|sed 's/:[0-9][0-9]$//g'|sort |uniq -c

Drop time_local once the timestamp has been overridden
  mutate {
    gsub => [ "http_user_agent",'"',"" ]
    convert => { "status" => "integer" }
    convert => { "body_bytes_sent" => "integer" }
    remove_field => ["time_local"]
  }      

Replacing the timestamp

Handling regex extraction failures in Logstash
Switch Logstash back to reading only the latest log lines
input {
  file {
    path => "/var/log/nginx/access.log"
  }
}

A line that breaks the regex extraction
echo "sjgmethods xxx xxx" >> /var/log/nginx/access.log
tags: _grokparsefailure

Route extraction failures to a separate index
output {
  if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgnginx-%{+YYYY.MM.dd}"
    }
  }
  else{
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgfail-%{+YYYY.MM.dd}"
    }
  }
}

Basic use of Kibana charts
Simulate data
  while true;do 
    curl 192.168.238.90/sjg666; 
    curl 127.0.0.1; 
    sleep 2; 
  done

The Kibana home (Discover) area
  view traffic over time: hits per minute
  query by any field
  view statistics for a single field

Creating Kibana visualizations; choose a terms aggregation to inspect the data
  pie chart: pie_remote_addr
  data table: table_remote_addr

Creating a Kibana dashboard, sjg_dash
  create the dashboard
  add visualizations to it

Grafana is recommended for dashboards

Regex extraction failure handling

Analyzing Linux system logs with Logstash
Default log format
  Aug  3 18:37:57 sjg1 sshd[1318]: Accepted password for root from xxx port 49205 ssh2
  note there is no year field

System log configuration in /etc/rsyslog.conf; restart rsyslog afterwards
$template sjgformat,"%$NOW% %TIMESTAMP:8:15% %hostname% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate sjgformat

Resulting log format
2020-08-03 18:47:34 sjg1 sshd[1522]: Accepted password for root from 58.101.14.103 port 49774 ssh2
%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)

Grant read permission
chmod +r secure

Extract the secure log; messages and other logs follow the same approach
input {
  file {
    path => "/var/log/secure"
  }
}
filter {
  grok {
    match => {
      "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
    }
    remove_field => ["message"]
  }
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
  }
  mutate {
    remove_field => ["timestamp"]
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
    user => "elastic"
    password => "sjgpwd"
    index => "sjgsecure-%{+YYYY.MM.dd}"
  }
}      

Extracting ssh logs for analysis

#input{
#    file {
#    path => "/var/log/nginx/access.log"
#    type => "nginx"
#  }
#    file {
#    path => "/var/log/secure"
#    type => "system"
#  }
#
#}
input {
  beats {
    host => '0.0.0.0'
    port => 5044
  }
}
filter {
  if [type] == "nginx" {
    grok {
      match => {
        "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
      }
      remove_field => ["message"]
    }
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    mutate {
      gsub => [ "http_user_agent",'"','' ]
      convert => { "status" => "integer" }
      convert => { "body_bytes_sent" => "integer" }
      remove_field => ["time_local"]
    }
  }
   if [type] == "system"{
     grok {
      match => {
        "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
      }
      remove_field => ["message"]
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["timestamp"]
    }
  }
}
output {
  if [type] == "nginx"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testnginx-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "system"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testsystem-%{+YYYY.MM.dd}"
    }
  }
}      

Logstash extracting multiple log types
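
The pipeline above branches on the [type] field, which the shipper has to set. A sketch of the Filebeat side (the Logstash hostname is a placeholder; fields plus fields_under_root place a top-level type field to match the conditionals above):

# /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log
  fields:
    type: nginx
  fields_under_root: true   # promote "type" to the top level of the event
- type: log
  paths:
    - /var/log/secure
  fields:
    type: system
  fields_under_root: true
output.logstash:
  hosts: ["logstash-host:5044"]   # placeholder hostname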

input {
  beats {
    host => '0.0.0.0'
    port => 5044
  }
}
filter {
  if [type] == "nginx" {
    grok {
      match => {
        "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
      }
      remove_field => ["message"]
    }
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    mutate {
      gsub => [ "http_user_agent",'"','' ]
      convert => { "status" => "integer" }
      convert => { "body_bytes_sent" => "integer" }
      remove_field => ["time_local"]
    }
  }
   if [type] == "system"{
     grok {
      match => {
        "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
      }
      remove_field => ["message"]
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["timestamp"]
    }
  }
   if [type] == "mysql"{
     grok {
      match => {
        # the mysql grok pattern was left blank in the original notes
        }
      remove_field => ["message"]

     }
     date {
       match => ["timestamp", "yyMMdd HH:mm:ss"]
       target => "@timestamp"
     }
     mutate {
       remove_field => ["timestamp"]
     }

   }

   if [type] == "tomcat" {
    grok {
      match => {
        "message" => '(?<timestamp>\d+-[a-zA-Z]+-\d+ \d+:\d+:\d+.\d+) %{NOTSPACE:loglevel} %{NOTSPACE:thread}(?<loginfo>(\s+.*)+)'
      }
      remove_field => ["message"]
    }
    date {
      match => ["time_local", "dd-MMM-yyyy HH:mm:ss.SSS"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["timestamp"]
    }
  }

}
output {
  if [type] == "mysql"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testmysql-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testtomcat-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testnginx-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "system"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testsystem-%{+YYYY.MM.dd}"
    }
  }
}      

Logstash receiving logs from Filebeat on multiple hosts

input {
  kafka {
    bootstrap_servers => "172.17.166.217:9092,172.17.166.218:9092,172.17.166.219:9092"
    topics => ["sjg"]
    group_id => "sjggroup"
    codec => "json"
  }
}
filter {
  if [type] == "nginx" {
    grok {
      match => {
        "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
      }
      remove_field => ["message"]
    }
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    mutate {
      gsub => [ "http_user_agent",'"','' ]
      convert => { "status" => "integer" }
      convert => { "body_bytes_sent" => "integer" }
      remove_field => ["time_local"]
    }
  }
   if [type] == "system"{
     grok {
      match => {
        "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
      }
      remove_field => ["message"]
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["timestamp"]
    }
  }
   if [type] == "mysql"{
     grok {
      match => {
        # the mysql grok pattern was left blank in the original notes
        }
      remove_field => ["message"]

     }
     date {
       match => ["timestamp", "yyMMdd HH:mm:ss"]
       target => "@timestamp"
     }
     mutate {
       remove_field => ["timestamp"]
     }

   }

   if [type] == "tomcat" {
    grok {
      match => {
        "message" => '(?<timestamp>\d+-[a-zA-Z]+-\d+ \d+:\d+:\d+.\d+) %{NOTSPACE:loglevel} %{NOTSPACE:thread}(?<loginfo>(\s+.*)+)'
      }
      remove_field => ["message"]
    }
    date {
      match => ["time_local", "dd-MMM-yyyy HH:mm:ss.SSS"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["timestamp"]
    }
  }

}
output {
  if [type] == "mysql"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testmysql-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testtomcat-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testnginx-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "system"{
    elasticsearch {
      hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200","http://172.17.166.219:9200"]
      user => "elastic"
      password => "1.Q1.Q1.Q"
      index => "testsystem-%{+YYYY.MM.dd}"
    }
  }
}      

Logstash consuming log messages from a Kafka queue
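
For completeness, a sketch of how a Filebeat shipper might publish into the topic this pipeline consumes (Filebeat's standard Kafka output; the topic name matches the config above):

# /etc/filebeat/filebeat.yml
output.kafka:
  hosts: ["172.17.166.217:9092", "172.17.166.218:9092", "172.17.166.219:9092"]
  topic: "sjg"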

input {
  kafka {
    bootstrap_servers => "172.17.166.217:9092,172.17.166.218:9092,172.17.166.219:9092"
    topics => ["test2"]
    group_id => "test2group"
    codec => "json"
  }
}
filter {
  mutate {
    remove_field => ["agent","ecs","log","input","[host][os]"]
  }
}
#output {
#  elasticsearch {
#    hosts => ["http://172.17.166.217:9200", "http://172.17.166.218:9200", "http://172.17.166.219:9200"]
#    user => "elastic"
#    password => "1.Q1.Q1.Q"
#    index => "testmetric-%{+YYYY.MM.dd}"
#  }
#}
output {
  if [service][type] == "mysql" {
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgmysql-%{+YYYY.MM.dd}"
    }
  }
  else if [service][type] == "redis" {
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgredis-%{+YYYY.MM.dd}"
    }
  }
  else {
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgother-%{+YYYY.MM.dd}"
    }
  }
}      

Logstash routing Metricbeat data to separate ES indexes per service
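
The [service][type] field used in the conditionals is set by the Metricbeat module that produced the event. A sketch of enabling the matching modules on the monitored host (module names assumed to line up with the branches above):

metricbeat modules enable mysql redis
systemctl restart metricbeat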
