logstash的關鍵配置.這個配置檔案的目的是這樣的:
1.使用udp端口9998 接受來自資料源的資訊;
2.通過filter插件,先把message轉成json格式,這一步很重要,很多文章裡都不提這個;
3.判斷接受到的資料裡,是否包含app_name這個key,如果包含,就把app_name裡的值提出來,和目前日期組合到一起,作為傳入elasticsearch的index名字。如果不包含,就生成一個字元串non_index_log作為elasticsearch的index名字;
4.把日志轉發到elasticsearch裡,同時列印在控制台裡
input {
udp {
port => 9998
}
}
filter {
json {
source => "message"
}
if [app_name] {
mutate {
add_field => {
"index_name" => "%{app_name}-%{+YYYY.MM.dd}"
}
}
} else {
mutate {
add_field => {
"index_name" => "non_index_log"
}
}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch-master:9200"]
index => "logstash-roc-%{index_name}"
}
stdout { codec => rubydebug }
}
為了測試這樣配置之後的logstash效果如何,我們可以使用一個python代碼
# All rights reserved.
#
# filename : example2.py
# description :
#
# created by zhenwei.Li at 2020/9/2 15:52
import logging
import logstash
import sys
host = '192.168.0.12'
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 9998, version=1))
test_logger.error('python-logstash: test logstash error message.')
# add extra field to logstash message
extra = {
'app_name': 'temp-lizhenwei-python2',
'test_string': 'python version: ' + repr(sys.version_info),
'test_boolean': True,
'test_dict': {'a': 1, 'b': 'c'},
'test_float': 1.23,
'test_integer': 123,
'type': 'temp-lishuai-python3'
}
test_logger.info('python-logstash: test extra fields', extra=extra)
通過運作上述代碼,我們可以去檢查logstash的控制台是否産生了index_name,理論上來說,會産生一個
"index_name" => "temp-lizhenwei-python2-2020.09.03"和"index_name" => "non_index_log"
如下圖所示

最後,我們去kibana裡面檢查一下elasticsearch的index,确實産生了我們需要的兩個index