為了實作各業務平台日志資訊采集到大資料平台hdfs上。之前的定技術棧是 flume->kafka->storm->hdfs. 其中通過storm需要寫代碼,穩定性,可擴充性,維護性不好。
從kafka到hdfs,有專門的日志工具logstash可以解決這個問題. 目前已經在我們的開發環境穩定的運作了一個禮拜(http://192.168.23.31:50070/explorer.html#/data/logstash)。
現請部署到生産環境。
logstash安裝配置如下:
1. logstash下載下傳安裝, 下載下傳比較慢的話,可以傳給你。
wget -c https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.3.4-1.noarch.rpm
rpm -ivh logstash-2.3.4-1.noarch.rpm
2. logstash hdfs插件下載下傳安裝
git clone https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git
cd logstash-output-webhdfs-discontinued
/opt/logstash/bin/plugin install logstash-output-webhdfs
3.logstash配置
vim /etc/logstash/conf.d/logstash.conf
input {
kafka {
zk_connect =>"192.168.1.50:2181,192.168.1.51:2181,192.168.1.52:2181" #kafka的zk叢集位址,請改為生産環境
group_id => "hdfs" #消費者組,不要和ELK上的消費者一樣
topic_id => "flume_kafka_channel_topic" #kafka topic,改為生産環境
consumer_id => "logstash-consumer-192.168.23.31" #消費者id,自定義
consumer_threads => 1
queue_size => 200
codec => plain{ charset => "UTF-8" }
auto_offset_reset => "smallest"
}
}
filter {
grok {
match => { "message" =>
#"%{TIMESTAMP_ISO8601:date} (?<thread_name>.+?\bhost\b.+?)(?<thread>.+?\bu001Cbi\b)(?<action>.+?\baction\b)(?<type>.+?\btype\b)(?<content>.*)"
"(?<thread>.+?\bu001Cbi\b)(?<action>.+?\baction\b)(?<type>.+?\btype\b)(?<content>.*)"
}
}
}
output {
#如果你一個topic中會有好幾種日志,可以提取出來分開存儲在hdfs上。
if [action] == "-action" and [type] == "-type" {
webhdfs {
workers => 2
host => "192.168.23.31" #hdfs的namenode位址,改為生産環境
port => 50070 #webhdfs端口
user => "root" #hdfs運作的使用者啊,以這個使用者的權限去寫hdfs。
path => "/data/logstash/log-%{+YYYY}-%{+MM}/apiLog-%{+YYYY}-%{+MM}-%{+dd}.log" #按月建目錄,按天建log檔案。
flush_size => 500
#compression => "snappy" #壓縮格式,可以不壓縮
idle_flush_time => 10
retry_interval => 0.5
codec => plain{ charset => "UTF-8" }
}
}
}
4. logstash 配置檢查,啟動,停止
/etc/init.d/logstash configtest start stop