I won't explain what Logstash is here; let's look at the code directly.
1. input: read from Elasticsearch
input {
  elasticsearch {
    hosts => "192.168.1.16:9200"  # IP address and port of your ES node
    index => "position"           # index name
    size => 10000                 # number of documents fetched per scroll page
    query => '{"query":{"bool":{"disable_coord":false,"adjust_pure_negative":true,"boost":1}},"_source":{"includes":["_id","ent_status","formatted_address","dom","city","adcode","level","ent_type","city_code","data_date","update_date","pripid","province","entname","district","location"]}}'  # the query to run
    scroll => "5m"                # keep each scroll context alive for 5 minutes
    docinfo => true               # copy document metadata (_index, _type, _id) into @metadata
  }
}
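Before wiring up the rest of the pipeline, it can help to check that the scroll query returns what you expect and that docinfo really places _id under @metadata. A minimal sketch, assuming you temporarily swap in a stdout output (the rubydebug codec's metadata option makes the normally hidden @metadata fields visible):

# Temporary debug output: prints every event including @metadata,
# so you can verify that [@metadata][_id] is populated by docinfo.
output {
  stdout { codec => rubydebug { metadata => true } }
}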
2. filter: operate on the data coming in from the input
(The source document format was shown in a screenshot here; each document nests lat and lon inside a location object.)
filter {
  # In the data above, lat and lon are nested inside location. Copy location
  # into a new field and parse that field as JSON, so lat and lon end up at
  # the top level alongside the other fields and are easy to reference later.
  mutate {
    add_field => {
      "local_value" => "%{location}"
    }
  }
  json {
    source => "local_value"
    remove_field => ["location","local_value"]
  }
  # Some documents do not carry data_date at all; for those, the output
  # format would emit the literal text %{data_date}. To avoid that, add an
  # empty data_date field whenever it is missing.
  if ![data_date] {
    mutate {
      add_field => {
        "data_date" => ""
      }
    }
  }
}
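The filter section can be tested without touching Elasticsearch by feeding it a single hand-written event through the generator input. A minimal sketch; the document below is a made-up sample shaped like the real data, not actual index content:

# Stand-alone test input for the filter above: emits one JSON document
# with a nested location object (field values here are hypothetical),
# so you can watch lat/lon being lifted to the top level.
input {
  generator {
    count => 1
    lines => ['{"entname":"demo","location":{"lat":39.91,"lon":116.40}}']
    codec => json
  }
}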
3. output: write the data to HDFS in CSV format
output {
  webhdfs {
    host => "192.168.100.11"   # WebHDFS (NameNode) host
    port => 2222               # WebHDFS port
    user => "hadoop"           # HDFS user to write as
    flush_size => 5000         # flush after this many events
    idle_flush_time => 10      # or after this many seconds of inactivity
    path => "/tmp/position/test-%{+YYYY}-%{+MM}-%{+dd}/position-%{+HH}.csv"
    # The fields below are written in this order, separated by the literal
    # control character \u0001 (SOH); the separators are non-printable and
    # therefore invisible in the format string.
    codec => plain {
      format => "%{[@metadata][_id]}%{entname}%{pripid}%{ent_type}%{ent_status}%{adcode}%{city_code}%{formatted_address}%{district}%{province}%{city}%{lon}%{lat}%{level}%{dom}%{data_date}%{update_date}"
    }
  }
}
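Before pointing the pipeline at HDFS, one option is to write the same formatted lines to a local file first and inspect the field order and the invisible \u0001 separators (for example with cat -v, which renders them as ^A). A sketch using the file output plugin; the path and the shortened field list here are illustrative only:

# Local dry run: same plain codec as above, written to local disk
# instead of HDFS; the format string is abbreviated for the example.
output {
  file {
    path => "/tmp/position-test-%{+YYYY-MM-dd}.csv"
    codec => plain {
      format => "%{[@metadata][_id]}%{entname}%{pripid}"
    }
  }
}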