1. Since this project uses log4j2, the Kafka appender can be configured directly in log4j2:
<Kafka name="Kafka" topic="XX_log">
    <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>
    <Property name="bootstrap.servers">127.0.0.1:9092</Property>
    <Property name="timeout.ms">500</Property>
</Kafka>
The PatternLayout joins the fields with "||" so that Logstash can split the message later. The timeout.ms property is added so that, if the logging pipeline goes down, it does not noticeably affect the business system. Kafka can of course run as a cluster; in that case list multiple addresses in bootstrap.servers, separated by commas. XX_web identifies the current business platform.
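On the application side nothing special is needed once the Kafka appender is referenced from a Logger (or Root) element in log4j2.xml; a minimal sketch, where OrderService and the message text are made-up examples:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class OrderService {
    private static final Logger LOG = LogManager.getLogger(OrderService.class);

    public void createOrder(String orderId) {
        // With the PatternLayout above, this call reaches the XX_log topic roughly as:
        // 2016-12-01 12:00:00||INFO||OrderService||XX_web||create order 1001
        LOG.info("create order {}", orderId);
    }
}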
2. Building the Kafka cluster is not covered in detail here, as it is already well documented elsewhere. The brokers point to the ZooKeeper ensemble in config/server.properties:
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
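For reference, a minimal sketch of the rest of one broker's server.properties; broker.id, port and log.dirs must differ per broker when they share a host, and the values below are assumptions:

broker.id=0
port=9092
log.dirs=/data/kafka-logs-0
num.partitions=3
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183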
3. Create the Logstash dynamic template (saved as template_log.json and referenced from the Logstash output in step 4):
{
  "template": "*",
  "settings": {
    "index.refresh_interval": "5s",
    "number_of_replicas": "0",
    "number_of_shards": "3"
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed"
            }
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        }
      ],
      "properties": {
        "dateTime": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "@version": {
          "type": "integer",
          "index": "not_analyzed"
        },
        "context": {
          "type": "string",
          "index": "analyzed"
        },
        "level": {
          "type": "string",
          "index": "not_analyzed"
        },
        "class": {
          "type": "string",
          "index": "not_analyzed"
        },
        "server": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}
4. Configure Logstash:
input {
    kafka {
        zk_connect => "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
        group_id => "logstash"
        topic_id => "XX_log"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}
filter {
    mutate {
        split => ["message", "||"]
        add_field => { "dateTime" => "%{[message][0]}" }
        add_field => { "level"    => "%{[message][1]}" }
        add_field => { "class"    => "%{[message][2]}" }
        add_field => { "server"   => "%{[message][3]}" }
        add_field => { "context"  => "%{[message][4]}" }
        remove_field => ["message"]
    }
    date {
        match => ["dateTime", "yyyy-MM-dd HH:mm:ss"]
    }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "XX_log-%{+YYYY-MM}"
        codec => "json"
        manage_template => true
        template_overwrite => true
        flush_size => 50000
        idle_flush_time => 10
        workers => 2
        template => "E:\logstash\template\template_log.json"
    }
}
The index => "XX_log-%{+YYYY-MM}" setting stores the logs in ES indices split by year and month; Logstash reads the log messages from the Kafka cluster.
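To illustrate the filter, a small example with hypothetical values: a raw message consumed from the XX_log topic such as

2016-12-01 12:00:00||INFO||OrderService||XX_web||create order 1001

is split by the mutate filter into the fields defined in the template, and the date filter then sets @timestamp from dateTime:

dateTime => "2016-12-01 12:00:00"
level    => "INFO"
class    => "OrderService"
server   => "XX_web"
context  => "create order 1001"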
5. Building the ZooKeeper cluster is not covered in detail here either; there is plenty of material online.
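For a three-node ensemble on a single host (matching the 2181/2182/2183 addresses used above), each node needs its own dataDir (containing a myid file) and clientPort; a minimal sketch of one node's zoo.cfg, with paths assumed:

tickTime=2000
initLimit=10
syncLimit=5
# each node has its own dataDir containing a myid file (1, 2 or 3)
dataDir=/data/zookeeper/node1
# 2182 and 2183 for the other two nodes
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890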
6. Build the ES cluster. An ES cluster is fairly simple; it is usable after setting only a handful of parameters.
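A minimal elasticsearch.yml for one node of an ES 2.4 cluster might look like the sketch below; the cluster name, node name and peer hostnames are assumptions:

cluster.name: xx-log-es
node.name: es-node-1
network.host: 127.0.0.1
http.port: 9200
discovery.zen.ping.unicast.hosts: ["es-node-1", "es-node-2", "es-node-3"]
discovery.zen.minimum_master_nodes: 2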
7. Configure Kibana (kibana.yml):
server.port: 5601                            # service port
# The host to bind the server to.
server.host: "115.28.240.113"
elasticsearch.url: "http://127.0.0.1:9200"   # the ES (cluster) address
kibana.index: "kibana"
8. Versions: JDK 1.7, ES 2.4, Logstash 2.4, Kafka 2.10, Kibana 4.6.4.
9. Article source: here.