天天看點

logstash解析日志-multiline多行日志解析示例前言multiline示例

前言

Codec 是 logstash 從 1.3.0 版開始新引入的概念(Codec 來自 Coder/decoder 兩個單詞的首字母縮寫)。

在此之前,logstash 隻支援純文字形式輸入,然後以過濾器處理它。但現在,我們可以在輸入 期處理不同類型的資料,這全是因為有了 codec 設定。

是以,這裡需要糾正之前的一個概念。Logstash 不隻是一個

input | filter | output

 的資料流,而是一個 

input | decode | filter | encode | output

 的資料流!codec 就是用來 decode、encode 事件的。

codec 的引入,使得 logstash 可以更好更友善的與其他有自定義資料格式的運維産品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用資料格式的其他産品等。

事實上,我們在第一個 “hello world” 用例中就已經用過 codec 了 —— rubydebug 就是一種 codec!雖然它一般隻會用在 stdout 插件中,作為配置測試或者調試的工具。

multiline示例

下面記錄一個工作中實際用到的多行日志解析:

Logging Time:2018-01-05 09:29:06,303 Thread Id:[131] Logging Level:ERROR Logging Type: Plugin
Operation User:admin
Operation Type:
Plugin Type:first detail Plugin Version:1.0.0.0
Logging Description:ce5-1 process ce5-1_603096 fai
Exception Information:System.Exception: calculate error

Logging Time:2018-01-05 09:29:06,303 Thread Id:[131] Logging Level:ERROR Logging Type: Plugin
Operation User:admin
Operation Type:
Plugin Type:first detail Plugin Version:1.0.0.0
Logging Description:ce5-1 process ce5-1_603096 fai
Exception Information:System.Exception: calculate error
           

 需要解析出來的格式為:

時間:2018-01-05 09:29:06,303
子產品:Plugin+1.0.0.0
詳情:Logging Description + Exception Information
日志級别:ERROR
使用者名:admin
           

配置檔案logstash-shipper-Test.conf為:

input{
	file {
		path => "D:/TestLog/*.log"
		codec => multiline {
			#charset => "GB2312"
          	 pattern => "^Logging Time"
              negate => true
            	what => "previous"
        	}
		type=> "TAS"
		sincedb_path => "D:/TestLog/Test_sincedb"
	}
}
filter {	 
 	if [type] == "Test" {
  		grok {
			patterns_dir => ["./patterns"]
        		match => { "message" => "(?m)Logging%{SPACE}Time:%{TIMEPATTERN:time}%{SPACE}Thread%{SPACE}Id:\[%{NUMBER:pid}\]%{SPACE}Logging%{SPACE}Level:%{LOGLEVEL:level}%{SPACE}Logging%{SPACE}Type:%{GREEDYDATA:ossType}?%{SPACE}Operation%{SPACE}User:%{GREEDYDATA:user}?%{SPACE}Operation%{SPACE}Type:%{USERNAME:operate_type}?%{SPACE}Plugin%{SPACE}Type:%{GREEDYDATA:operateName}%{SPACE}Plugin%{SPACE}Version:%{GREEDYDATA:logVersion}?%{SPACE}Logging%{SPACE}Description:%{GREEDYDATA:detail}?%{SPACE}Exception%{SPACE}Information:%{GREEDYDATA:excp_detail}" }
		}
		mutate{
			add_field => {"system" => "%{type}"}
			add_field => {"module" => "system"}
		}	
		mutate {
			update=>{"module"=>"%{operateName}"}    	
  		}
  		 mutate {
			merge => {"module" => "logVersion"} 
			merge => {"detail" => "excp_detail"}   	
  		}
		mutate {
			join => {"module" => "+"}
			join => {"detail" => "+"}
   		}
		
		mutate {
		     gsub => ["module","\r\n",""] 	
   		}
		
		date{
			match => ["time","yyyy-MM-dd HH:mm:ss,SSS"]
			locale => "en"
			#timezone => "Etc/GMT-8"
			target => "@timestamp"
			add_tag => ["tmatch"]
		}
		mutate{
			remove_field => ["excp_detail"]
		}
  	}
}
output {
  redis{
	host => "127.0.0.1"
	port => 6379
	key => "logstash-Test"
    data_type => "list"
  }	
   stdout {
        codec => rubydebug
    }
}
           

繼續閱讀