天天看点

logstash实战之apacheLogs和csv

实战apache日志

时间处理:

"@timestamp" => YYYY-MM-DDT03:mm:ss.SSSZ        Logstash写入日志的时间(示例,ISO8601 格式)
"timestamp" => "17/May/2015:10:05:47 +0000"     apache日志时间
           

需求:将 timestamp 赋值给 @timestamp 字段,但保留日志写入时间

解决办法:

(1)使用mutate插件:

# Copy the current @timestamp (the time Logstash read the line) into
# @read_timestamp so the value is still available once @timestamp is overwritten.
mutate {
  copy => {
    "@timestamp" => "@read_timestamp"
  }
}
           

(2)使用ruby插件:

# Alternative using the ruby filter: read @timestamp from the event and
# store it under @read_timestamp.
ruby {
  code => "event.set('@read_timestamp',event.get('@timestamp'))"
}
           
# Parse Apache access-log lines and make the request time the event time,
# while keeping the original Logstash read time in @read_timestamp.
filter {
        # Split the raw line in "message" into fields using the stock
        # COMBINEDAPACHELOG pattern (this yields the "timestamp" field used below).
        grok {
                match => { "message" => "%{COMBINEDAPACHELOG}"}
        }
        # Save the read time before the date filter replaces @timestamp.
        ruby {
                code => "event.set('@read_timestamp',event.get('@timestamp'))"
        }
        # Parse the Apache time (e.g. "17/May/2015:10:05:47 +0000") into @timestamp.
        date {
                match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ]
        }
        # Drop the now-redundant source fields only when grok succeeded.
        # On "_grokparsefailure" the raw "message" is the only record of the line,
        # so it is kept for troubleshooting (same policy as the complete config).
        if "_grokparsefailure" not in [tags] {
                mutate {
                        remove_field => ["message","timestamp"]
                }
        }
}
           

(3)使用 @metadata,其内容不会输出到output中

  • 适合用来存放仅用于条件判断或临时使用的字段
  • 相比 remove_field 有一定的性能提升
# Variant that captures the Apache time into [@metadata][timestamp] instead of a
# regular field, so no remove_field is needed for it afterwards.
filter {
        grok {
                # Stock pattern, kept for reference:
                #match => { "message" => "%{COMBINEDAPACHELOG}"}
                # Hand-expanded pattern; the only difference visible here is that
                # HTTPDATE is captured into [@metadata][timestamp].
                match => {"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:[@metadata][timestamp]}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}'}
        }
        # Save the read time before the date filter replaces @timestamp.
        ruby {
                code => "event.set('@read_timestamp',event.get('@timestamp'))"
        }
        date {
                # Previous form, reading the regular "timestamp" field:
                #match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ]
                match => ["[@metadata][timestamp]","dd/MMM/yyyy:HH:mm:ss Z"]
        }
}
           
logstash实战之apacheLogs和csv

完整配置:

# Source: the Apache access log on local disk.
input {
  file {
    # Start at the beginning of the file rather than only tailing new lines.
    start_position => "beginning"
    path => "/opt/apache.log"
  }
}
# Parse each Apache log line, enrich it, and choose the target index.
# Plugin order matters: the read time is saved before `date` rewrites @timestamp,
# and the index name is built after @timestamp has been set.
filter {
  # Uncomment to route events to the rubydebug stdout branch of the output section.
  #mutate{add_field => {"[@metadata][debug]"=>true}}

  grok {
    # Stock pattern, kept for reference:
    #match => { "message" => "%{COMBINEDAPACHELOG}"}
    # Expanded pattern that captures the log time into [@metadata][timestamp].
    match => {"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:[@metadata][timestamp]}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}'}
  }

  # Keep the Logstash read time under @read_timestamp.
  ruby {
    code => "event.set('@read_timestamp',event.get('@timestamp'))"
  }

  # Replace @timestamp with the time parsed from the log line.
  date {
    # Previous form, reading the regular "timestamp" field:
    #match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ]
    match => ["[@metadata][timestamp]","dd/MMM/yyyy:HH:mm:ss Z"]
  }

  # Store the response size as a number instead of a string.
  mutate {
    convert => {"bytes" => "integer"}
  }

  # Look up geographic data for the client address, keeping only the listed fields.
  geoip {
    source => "clientip"
    fields => [
      "location",
      "country_name",
      "city_name",
      "region_name"
    ]
  }

  # Parse the User-Agent string from "agent" into the "useragent" field.
  useragent {
    source => "agent"
    target => "useragent"
  }

  #mutate { remove_field => ["timestamp"]}

  # Default monthly index name, kept in @metadata for use by the output section.
  mutate {
    add_field => { "[@metadata][index]" => "apache_logs-%{+YYYY.MM}"}
  }

  if "_grokparsefailure" not in [tags] {
    # Parsed successfully: the raw line is no longer needed.
    mutate {
      remove_field => ["message"]
    }
  } else {
    # Unparsed lines keep "message" and go to a separate failure index.
    mutate {
      replace => { "[@metadata][index]" => "apache_logs_failure-%{+YYYY.MM}"}
    }
  }
}
# Debug events are printed in full (including @metadata); all others are
# indexed into Elasticsearch with a progress dot on stdout.
output {
  if [@metadata][debug] {
    stdout {
      codec => rubydebug {
        metadata => true
      }
    }
  } else {
    stdout {
      codec => dots
    }
    elasticsearch {
      hosts => [ "192.168.20.101:9200","192.168.20.102:9200" ]
      # Index name chosen in the filter section (normal vs. failure index).
      index => "%{[@metadata][index]}"
      # NOTE(review): credentials are inline in plain text; consider moving
      # them out of the config file (e.g. Logstash keystore) outside of a demo.
      user => "elastic"
      password => "123456"
    }
  }
}
           

收集csv类型的数据

# Source: the earthquakes CSV file on local disk.
input {
  file {
    path => "/opt/earthquakes/earthquakes.csv"
    # sincedb is pointed at /dev/null, so no read position is persisted;
    # presumably so the whole file is re-read on every run (confirm intent).
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}
# Turn each CSV row into typed fields, build a "lat,lon" location string,
# and use the row's own time as @timestamp.
filter {
  csv {
    # Column names, in file order.
    columns => [
      "timestamp", "latitude", "longitude", "depth", "mag", "magType",
      "nst", "gap", "dmin", "rms", "source", "event_id"
    ]
    # Numeric columns are converted from strings to floats
    # (a single hash instead of one `convert` entry per column).
    convert => {
      "latitude"  => "float"
      "longitude" => "float"
      "depth"     => "float"
      "mag"       => "float"
      "gap"       => "float"
      "dmin"      => "float"
      "rms"       => "float"
    }
  }

  # Combine the coordinates into one "location" field, then drop the two
  # source fields. NOTE(review): relies on add_field being applied before
  # remove_field within the same mutate — confirm against the plugin docs.
  mutate {
    add_field => {"location"=>"%{latitude},%{longitude}"}
    remove_field => [ "latitude","longitude"]
  }

  # Sample value: 2016/01/01 00:30:04.91
  date {
    match => ["timestamp","yyyy/MM/dd HH:mm:ss.SS"]
    remove_field => ["timestamp"]
  }
}
# Index every event into the "earthquake" index and echo it to stdout for inspection.
output {
  elasticsearch {
    hosts => [ "192.168.20.101:9200","192.168.20.102:9200" ]
    index => "earthquake"
    # NOTE(review): credentials are inline in plain text; consider moving
    # them out of the config file (e.g. Logstash keystore) outside of a demo.
    user => "elastic"
    password => "123456"
  }
  stdout {
    codec => rubydebug
  }
}
           

继续阅读