
Logstash Study Notes 22: The Logstash multiline Plugin for Matching Multi-line Logs

In this article

  • Test data
  • Settings
  • Parsing the runtime log as multi-line events
  • Parsing multi-line logs into fields
  • References

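When processing logs, you have to handle runtime logs as well as access logs. Runtime logs are mostly written by application code, for example through log4j. The biggest difference from access logs is that a runtime log entry can span multiple lines: several consecutive lines together express a single event.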

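This article shows how to process runtime logs with multiline. In its filter form the plugin is declared as follows. The merge rule has to be configured, either here in filter or as a codec on the input as in the examples later on, otherwise the lines are not merged:

filter {
  multiline { }
}

Once the log can be handled as multi-line events, splitting those events into fields is easy.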

Test data

[16-04-12 03:40:01 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.      
[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category      
where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null      
order by product_category.ORDERS asc      
[16-04-12 03:40:03 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.      
[16-04-12 03:40:04 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.      
[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category      
where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null      
order by product_category.ORDERS desc      
[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category      
where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null      
order by product_category.ORDERS asc      
[16-04-12 03:40:07 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.      

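The test data covers 7 seconds (it is fake data, of course). The entries at seconds 2, 5, and 6 span multiple lines because each contains an SQL statement. The other entries are single-line.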

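Settings

For the multiline plugin, three settings matter most: negate, pattern, and what.

negate

  • Type: boolean
  • Default: false

Negates the regular expression, so that the rule applies to lines that do not match the pattern.

pattern

  • Required
  • Type: string
  • No default value

The regular expression to match.

what

  • Required
  • Value: previous or next
  • No default value

If the regular expression matches, does the line belong to the next event or to the previous one?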
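
How negate, pattern, and what interact can be modelled in a few lines of Python. This is only an illustrative sketch, not the plugin's source code: the function name multiline_merge and the shortened sample lines are made up for this example, and the buffering rule is an assumption chosen to be consistent with the three example results shown later in this article.

```python
import re


def multiline_merge(lines, pattern, negate=False, what="previous"):
    """Merge lines into events; return (finished_events, still_buffered_lines).

    Hypothetical model of the multiline settings, not Logstash code.
    """
    regex = re.compile(pattern)
    events, buffer = [], []

    def flush():
        # Emit the buffered lines as one event, joined by newlines.
        if buffer:
            events.append("\n".join(buffer))
            buffer.clear()

    for line in lines:
        # negate flips the outcome of the pattern match.
        matched = bool(regex.search(line)) != negate
        if what == "previous":
            # A matched line joins the lines collected so far;
            # an unmatched line closes them and starts a new event.
            if not matched:
                flush()
            buffer.append(line)
        elif what == "next":
            # A matched line waits for the lines after it;
            # an unmatched line completes the event.
            buffer.append(line)
            if not matched:
                flush()
        else:
            raise ValueError("what must be 'previous' or 'next'")
    return events, list(buffer)


sample = ["[01] a", "[02] select", "where x", "order by y", "[03] b"]
events, pending = multiline_merge(sample, r"^\[", negate=True, what="previous")
print(events)   # ['[01] a', '[02] select\nwhere x\norder by y']
print(pending)  # ['[03] b']
```

With negate set to true and what set to previous, the last entry stays in the buffer until another line starting with "[" arrives, which is why the model returns it separately.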

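Parsing the runtime log as multi-line events

The examples in this section read the log with the file input, which has the following settings:

  • sincedb_path

If you do not want to use the default $HOME/.sincedb (on Windows, C:\Windows\System32\config\systemprofile\.sincedb), this setting lets you put the sincedb file somewhere else.

  • sincedb_write_interval

How often logstash writes the sincedb file. The default is 15 seconds.

  • stat_interval

How often logstash checks the watched files for updates. The default is 1 second.

  • start_position

Where logstash starts reading the file. The default is the end of the file, so the logstash process behaves like tail -F. To import existing data, change this setting to "beginning". Logstash then reads from the start of the file, a bit like cat, except that it does not stop at the last line and instead carries on like tail -F.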
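
The idea behind start_position and the sincedb file can be sketched in Python as follows. This is a simplified model under stated assumptions, not how Logstash is implemented: the function read_new_lines, its parameters, and the single saved byte offset standing in for a sincedb record are all hypothetical.

```python
import os
import tempfile


def read_new_lines(path, saved_offset=None, start_position="end"):
    """Return (complete_lines, new_offset) for a watched file.

    saved_offset plays the role of a sincedb record. start_position is
    only consulted when no offset has been saved yet.
    """
    size = os.path.getsize(path)
    if saved_offset is None:
        offset = 0 if start_position == "beginning" else size
    else:
        offset = min(saved_offset, size)
    with open(path, "rb") as handle:
        handle.seek(offset)
        data = handle.read()
    # Only consume complete lines; a partial last line is left for later.
    complete = data.rfind(b"\n") + 1
    lines = data[:complete].decode("utf-8").splitlines()
    return lines, offset + complete


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "c.out")
    with open(log_path, "wb") as f:
        f.write(b"first\nsecond\n")
    # Default start position "end": existing content is skipped.
    old_lines, offset = read_new_lines(log_path)
    with open(log_path, "ab") as f:
        f.write(b"third\n")
    # With a saved offset, only the appended line is returned.
    new_lines, offset = read_new_lines(log_path, saved_offset=offset)
    # Without a saved offset, "beginning" re-reads the whole file.
    all_lines, _ = read_new_lines(log_path, start_position="beginning")
```

In this model, once an offset has been saved it takes precedence over start_position, so only data appended after that offset is read.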

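Example 1: with the following configuration file,

input {
  file {
    path => "/usr/local/elk/logstash/logs/c.out"
    type => "runtimelog"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
    start_position => "beginning"
    sincedb_path => "/usr/local/elk/logstash/sincedb-access"
    ignore_older => 0
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Note: the pattern matches lines that start with "[". A line that does not start with "[" must belong to the previous line.

The parsed result is shown below. It contains 6 events (JSON objects):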

{      
"@timestamp" => "2016-06-01T04:37:43.147Z",      
"message" => "[16-04-12 03:40:01 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:37:43.152Z",      
"message" => "[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:37:43.152Z",      
"message" => "[16-04-12 03:40:03 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:37:43.155Z",      
"message" => "[16-04-12 03:40:04 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:37:43.157Z",      
"message" => "[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS desc",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:37:43.159Z",      
"message" => "[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      

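Note that the last log entry is not emitted during this run. It is only emitted once another log entry is appended, because the codec cannot tell that an event is complete until the next line starting with "[" arrives.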
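
Why the last entry is held back can be seen in a small streaming model. The sketch below is hypothetical (the class PreviousBuffer and its methods are invented for illustration) and hard-codes the negate => true, what => "previous" rule from Example 1. Its flush() method stands in for an explicit flush at shutdown or after idle time. Recent versions of the multiline codec document an auto_flush_interval setting for that purpose, so check whether your version provides it.

```python
import re


class PreviousBuffer:
    """Streaming merge: a line matching start_pattern begins a new event."""

    def __init__(self, start_pattern):
        self.start = re.compile(start_pattern)
        self.pending = []

    def feed(self, line):
        """Add one line; return the event it completes, or None."""
        finished = None
        if self.start.search(line) and self.pending:
            # A new entry begins, so the buffered entry is now complete.
            finished = "\n".join(self.pending)
            self.pending = []
        self.pending.append(line)
        return finished

    def flush(self):
        """Emit whatever is still buffered, or None if the buffer is empty."""
        finished = "\n".join(self.pending) if self.pending else None
        self.pending = []
        return finished


buf = PreviousBuffer(r"^\[")
emitted = [buf.feed(line) for line in ["[06] select", "order by x", "[07] done"]]
print(emitted)      # [None, None, '[06] select\norder by x']
print(buf.flush())  # [07] done
```

The entry "[07] done" only appears after flush() is called, which mirrors the seventh log entry missing from the result of Example 1.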

Example 2: if the configuration file is changed to,

input {
  file {
    path => "/usr/local/elk/logstash/logs/c.out"
    type => "runtimelog"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "next"
    }
    start_position => "beginning"
    sincedb_path => "/usr/local/elk/logstash/sincedb-access"
    ignore_older => 0
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

The parsed result then contains 7 events (JSON objects):

{      
"@timestamp" => "2016-06-01T04:40:43.232Z",      
"message" => "[16-04-12 03:40:01 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:40:43.237Z",      
"message" => "[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:40:43.238Z",      
"message" => "where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc\n[16-04-12 03:40:03 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:40:43.239Z",      
"message" => "[16-04-12 03:40:04 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:40:43.244Z",      
"message" => "[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:40:43.245Z",      
"message" => "where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS desc\n[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T04:40:43.249Z",      
"message" => "where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc\n[16-04-12 03:40:07 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      

Example 3: if the codec in the configuration file is changed to,

codec => multiline {
  pattern => "^\["
  negate => false
  what => "previous"
}

The parsed result is then:

{      
"@timestamp" => "2016-06-01T05:38:50.853Z",      
"message" => "[16-04-12 03:40:01 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.\n[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T05:38:50.856Z",      
"message" => "where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T05:38:50.858Z",      
"message" => "order by product_category.ORDERS asc\n[16-04-12 03:40:03 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.\n[16-04-12 03:40:04 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.\n[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T05:38:50.860Z",      
"message" => "where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T05:38:50.861Z",      
"message" => "order by product_category.ORDERS desc\n[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      
{      
"@timestamp" => "2016-06-01T05:38:50.863Z",      
"message" => "where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog"      
}      

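Parsing multi-line logs into fields

The configuration file is as follows:

input {
  file {
    path => "/usr/local/elk/logstash/logs/c.out"
    type => "runtimelog"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
    start_position => "beginning"
    sincedb_path => "/usr/local/elk/logstash/sincedb-access"
    ignore_older => 0
  }
}
filter {
  multiline {
    pattern => "^\["
    negate => true
    what => "previous"
  }
  # Note: this date filter runs before grok has created the "timestamp" field,
  # and its format does not match values such as "16-04-12 03:40:01",
  # so @timestamp keeps the ingest time in the result below.
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }
  grok {
    match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}\] %{GREEDYDATA:msg}"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Result after parsing: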

{      
"@timestamp" => "2016-06-01T06:33:26.426Z",      
"message" => "[16-04-12 03:40:01 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog",      
"timestamp" => "16-04-12 03:40:01",      
"level" => "DEBUG",      
"msg" => "model.MappingNode:- ['/store/shopclass'] matched over."      
}      
{      
"@timestamp" => "2016-06-01T06:33:26.485Z",      
"message" => "[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog",      
"timestamp" => "16-04-12 03:40:02",      
"level" => "DEBUG",      
"msg" => "impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc"      
}      
{      
"@timestamp" => "2016-06-01T06:33:26.491Z",      
"message" => "[16-04-12 03:40:03 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog",      
"timestamp" => "16-04-12 03:40:03",      
"level" => "DEBUG",      
"msg" => "model.MappingNode:- ['/store/shopclass'] matched over."      
}      
{      
"@timestamp" => "2016-06-01T06:33:26.492Z",      
"message" => "[16-04-12 03:40:04 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.",      
"@version" => "1",      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog",      
"timestamp" => "16-04-12 03:40:04",      
"level" => "DEBUG",      
"msg" => "model.MappingNode:- ['/store/shopclass'] matched over."      
}      
{      
"@timestamp" => "2016-06-01T06:33:26.494Z",      
"message" => "[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS desc",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog",      
"timestamp" => "16-04-12 03:40:05",      
"level" => "DEBUG",      
"msg" => "impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS desc"      
}      
{      
"@timestamp" => "2016-06-01T06:33:26.495Z",      
"message" => "[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc",      
"@version" => "1",      
"tags" => [      
[0] "multiline"      
],      
"path" => "/usr/local/elk/logstash/logs/c.out",      
"host" => "vcyber",      
"type" => "runtimelog",      
"timestamp" => "16-04-12 03:40:06",      
"level" => "DEBUG",      
"msg" => "impl.JdbcEntityInserter:- from product_category product_category\nwhere product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null\norder by product_category.ORDERS asc"      
}      
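
The field split performed by the grok pattern above can be approximated with a plain regular expression. The sketch below is a hand-written stand-in, not grok's actual pattern expansion: the names LINE_RE, parse_event, and parsed_time are made up for this example, and re.DOTALL is used so that msg can span the merged lines. In grok itself a (?m) prefix may be needed for the same effect, depending on the version. The timestamp format string assumes the two-digit year layout of the test data.

```python
import re
from datetime import datetime

# Rough equivalent of: \[%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}\] %{GREEDYDATA:msg}
LINE_RE = re.compile(
    r"\[(?P<timestamp>\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<level>[A-Z]+)\] (?P<msg>.*)",
    re.DOTALL,  # let "." match newlines so msg keeps the continuation lines
)


def parse_event(message):
    """Split one merged event into fields; return None if it does not match."""
    match = LINE_RE.match(message)
    if match is None:
        return None
    fields = match.groupdict()
    # Convert the captured text into a datetime (two-digit year assumed).
    fields["parsed_time"] = datetime.strptime(fields["timestamp"], "%y-%m-%d %H:%M:%S")
    return fields


event = (
    "[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category\n"
    "where product_category.PARENT_ID is null\n"
    "order by product_category.ORDERS asc"
)
fields = parse_event(event)
print(fields["level"])        # DEBUG
print(fields["parsed_time"])  # 2016-04-12 03:40:02
```

Parsing the timestamp only after the fields have been extracted is what allows the event time to be taken from the log line rather than from the moment of ingestion.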

References

  • Logstash multiline

Reposted from: http://www.cnblogs.com/liuning8023/archive/2016/06/01/5549366.html