说明
版本:5.6.1
见官网
对比flume
- 不重复消费,数据不丢失
- 目前flume支持hdfs比较好(个人理解)
离线安装
先配置JAVA_HOME
下载解压即可
标准输入输出
文件到标准输出
#启动
bin/logstash -f conf/file-stdout.conf
#说明:conf是自创目录,file-stdout.conf内容:
input {
file {
path => "/home/bingo/data/test.log"
start_position => "beginning"
ignore_older =>
}
}
output {
stdout{}
}
#多文件
path => "/home/bingo/data/*.log"
#多目录
path => "/home/bingo/data/*/*.log"
#参数说明
start_position:默认end,是从文件末尾开始解析
ignore_older:默认超过小时的日志不解析,表示不忽略任何过期日志
执行命令后会看到控制台输出log文件的内容
退出
ctrl+c
再次启动,可以看到刚才打印的内容不再打印,即不重复消费
在另一个session连接终端往test.log追加内容
echo "updating" > test.log
可以看到追加的内容输出到控制台
消息前面带有时间和主机名,如: 2017-09-23T00:56:36.670Z node02 updating
上游到elasticsearch
bin/logstash -f conf/flow-es.conf
#flow-es.conf内容
input {
file {
type => "flow"
path => "/home/bingo/data/logstash/logs/*/*.txt"
discover_interval =>
start_position => "beginning"
}
}
output {
if [type] == "flow" {
elasticsearch {
index => "flow-%{+YYYY.MM.dd}"
hosts => ["master01:9200", "worker01:9200", "worker02:9200"]
}
}
}
上游到kafka
#控制台源
bin/logstash -e 'input { stdin {} } output { kafka { topic_id => "test" bootstrap_servers => "node01:9092,node02:9092,node03:9092"} }'
#输入
spark <enter>
#文件源
bin/logstash -f conf/flow-kafka.conf
#flow-kafka.conf内容
input {
file {
path => "/home/bingo/data/logstash/logs/*/*.txt"
discover_interval =>
start_position => "beginning"
}
}
output {
kafka {
topic_id => "testlog"
codec => plain {
format => "%{message}"
charset => "UTF-8"
}
bootstrap_servers => "node01:9092,node02:9092,node03:9092"
}
}
kafka端消费数据
bin/kafka-console-consumer.sh --zookeeper localhost: --topic test --from-beginning
#输出
{"message":"spark","@version":"1","@timestamp":"2017-03-25T18:55:47.814Z","host":"worker01"}
#如果启动多个消费端,每个消费端都会有相同的输出
kafka到elasticsearch
bin/logstash -f conf/kafka-es.conf
#kafka-es.conf内容
input {
kafka {
type => "level-one"
auto_offset_reset => "smallest"
codec => plain {
charset => "UTF-8"
}
group_id => "es"
topic_id => "testlog"
zk_connect => "node01:2181,node02:2181,node03:2181"
}
}
filter {
mutate {
split => { "message" => " " }
add_field => {
"event_type" => "%{message[3]}"
"current_map" => "%{message[4]}"
"current_X" => "%{message[5]}"
"current_y" => "%{message[6]}"
"user" => "%{message[7]}"
"item" => "%{message[8]}"
"item_id" => "%{message[9]}"
"current_time" => "%{message[12]}"
}
remove_field => [ "message" ]
}
}
output {
elasticsearch {
index => "testlog-%{+YYYY.MM.dd}"
codec => plain {
charset => "UTF-8"
}
hosts => ["node01:9200", "node02:9200", "node03:9200"]
}
}
。。。更新中。。。