天天看点

logstash-jdbc的一次坑-sql数据库索引数据到elasticsearch时间字段格式化

      这次业务是需要从sqlserver中通过logstash-jdbc把数据索引到ES中,记录一下遇到的一个坑,百度谷歌都找不到答案。

首先设置数据库的时间字段,因为项目字段太多这里用一个字段就行

logstash-jdbc的一次坑-sql数据库索引数据到elasticsearch时间字段格式化

字段的类型为datatime类型

然后是logstash的一开始的配置

input {
    jdbc {
        jdbc_driver_library => "/opt/elk/logstash-6.1.0/jdbc/sqljdbc4.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://192.169.2.204:1433;database=pancm_db;"
        jdbc_user => "sa"
        jdbc_password => "123zxcvbnm,./"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        schedule => "* * * * *"
        record_last_run => "true"
       #lowercase_column_names => "false"
        last_run_metadata_path => "/opt/elk/logstash-6.1.0/config/templates/logstash_jdbc_last_run"
        clean_run => "false"
        statement_filepath => "/opt/elk/logstash-6.1.0/config/templates/pb_valid_phone.sql"
    }
}
filter {
      mutate{
        remove_field => [ "message","@timestamp"]
    }
 }
output {
   elasticsearch {
     hosts => ["http://192.169.2.188:9200","http://192.169.2.156:9200"]
     index => "pb_valid_phone"
     document_type => "pb_valid_phone"
     document_id => "%{phone}"
     pool_max => 1000
     manage_template => true
     template_name => "pb_valid_phone"
     template_overwrite => true
     template => "/opt/elk/logstash-6.1.0/config/templates/pb_valid_phone.json"
 }
}
           

我的pb_valid_phone.sql的sql内容为

SELECT * FROM PB_VALID_PHONE  WHERE LASTUPDATETM > :sql_last_value OR LASTDLTM > :sql_last_value
           

好,万事俱备,启动

得一启动报了如下的错。因为我的数据模版里面期望的时间格式是yyyy-MM-dd HH:mm:ss.SSS,所以才会报如下的错误

[2018-03-27T14:26:00,191][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"123456789102", :_index=>"pb_valid_phone", :_type=>"pb_valid_phone", :_routing=>nil}, #<LogStash::Event:0xbf2fb0>], :response=>{"index"=>{"_index"=>"pb_valid_phone", "_type"=>"pb_valid_phone", "_id"=>"123456789102", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [lastdltm]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: \"2018-03-20T01:49:47.000Z\" is malformed at \"T01:49:47.000Z\""}}}}
           
logstash-jdbc的一次坑-sql数据库索引数据到elasticsearch时间字段格式化

各种找原因,还是不行,一开始也想在过滤器那里做手脚还是不行

后面发现先把数据库的时间字符串,然后在格式化成时间索引进ES就可以

修改一下SQL语句就行

SELECT
	CONVERT (VARCHAR (30),LASTUPDATETM,25) AS LASTUPDATETM,
FROM
	PB_VALID_PHONE 
WHERE LASTUPDATETM > :sql_last_value OR LASTDLTM > :sql_last_value
           

这样的做的好处还有一个就是不会导致时区的问题,你数据库是什么时间,存进es就是什么时间。

记录下来。方便踩坑。