
Logstash: exporting data from Elasticsearch to HDFS in CSV format

I won't go into what Logstash is here; the configuration below tells the whole story.

1. input: read from Elasticsearch

input {
  elasticsearch {
    hosts => "192.168.1.16:9200"   # IP address and port of your Elasticsearch node
    index => "position"            # index to read from
    size => 10000                  # number of hits fetched per scroll request
    query => '{"query":{"bool":{"disable_coord":false,"adjust_pure_negative":true,"boost":1}},"_source":{"includes":["_id","ent_status","formatted_address","dom","city","adcode","level","ent_type","city_code","data_date","update_date","pripid","province","entname","district","location"]}}'   # the query to run
    scroll => "5m"                 # keep the scroll context alive for 5 minutes
    docinfo => true                # expose the hit's _index/_type/_id under @metadata
  }
}
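With docinfo => true the elasticsearch input keeps each hit's metadata (_index, _type, _id) under the event's @metadata field, which is what lets the output codec further down reference %{[@metadata][_id]}. If you ever need the id as an ordinary field instead, a minimal sketch would look like this (the field name es_doc_id is just an example, not part of the original setup):

filter {
  mutate {
    # copy the Elasticsearch document id out of @metadata into a regular field
    add_field => { "es_doc_id" => "%{[@metadata][_id]}" }
  }
}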
           

2. filter: reshape the incoming events

The documents coming back from Elasticsearch are shaped roughly as shown below (the original post had a screenshot of the data here).
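A minimal sketch of one such document, with made-up values; only the shape matters, and the real field list is the one in the _source includes above:

{
  "entname": "Example Co., Ltd.",
  "province": "Beijing",
  "city": "Beijing",
  "data_date": "2019-06-01",
  "location": {
    "lat": 39.9042,
    "lon": 116.4074
  }
}

lat and lon sit one level down inside location, so the filter below copies location into a temporary string field and re-parses it, which promotes lat and lon to the top level of the event.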
filter {
  # lat and lon live one level down inside location; copy location into a
  # temporary field and run it through the json filter so the parsed keys
  # land at the top level of the event, which makes them easy to reference later
  mutate {
    add_field => {
      "local_value" => "%{location}"
    }
  }
  json {
    source => "local_value"
    remove_field => ["location", "local_value"]
  }
  # some documents have no data_date; referencing it later would emit the
  # literal text %{data_date}, so add an empty field when it is missing
  if ![data_date] {
    mutate {
      add_field => {
        "data_date" => ""
      }
    }
  }
}
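After this filter stage the sample document from above would come out roughly like this (Logstash's own @timestamp and @version fields omitted): location and the temporary local_value field are gone, and lat/lon are top-level fields that the output codec can reference directly.

{
  "entname": "Example Co., Ltd.",
  "province": "Beijing",
  "city": "Beijing",
  "data_date": "2019-06-01",
  "lat": 39.9042,
  "lon": 116.4074
}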
           

3. output: write to HDFS as CSV

output {
  webhdfs {
    host => "192.168.100.11"
    port => 2222
    user => "hadoop"
    flush_size => 5000
    idle_flush_time => 10
    path => "/tmp/position/test-%{+YYYY}-%{+MM}-%{+dd}/position-%{+HH}.csv"
    # the fields to write, separated by \u0001; the separator is a non-printing
    # control character, so it does not show up between the %{...} references in this snippet
    codec => plain {
      format => "%{[@metadata][_id]}%{entname}%{pripid}%{ent_type}%{ent_status}%{adcode}%{city_code}%{formatted_address}%{district}%{province}%{city}%{lon}%{lat}%{level}%{dom}%{data_date}%{update_date}"
    }
  }
}
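One practical note in case the pipeline fails to start: webhdfs is a separate output plugin in some Logstash distributions. If yours does not bundle it, it can usually be installed with Logstash's plugin tool:

bin/logstash-plugin install logstash-output-webhdfs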
           
