文章目錄
- 一、ElasticSearch 安裝
-
- 1.elasticsearch 單節點安裝
- 2.elasticsearch 分布式叢集安裝
- 3.elasticsearch 配置身份認證
- 二、Elasticsearch cerebro 展示工具安裝
- 三、Kibana 安裝
- 四、Filebeat 安裝(EFK 架構)
-
- 1.Filebeat 的基礎使用:
- 2.filebeat 收集 nginx
-
- 2.1.安裝 nginx
- 2.2.安裝 filebeat
-
- 2.2.1.收集 nginx 原生日志,推送給 es
-
- 2.2.2.1.filebeat 收集單個檔案
- 2.2.2.2.filebeat 收集多個檔案
-
- 注意事項:
- 2.2.2.3.kibana 展示 nginx 日志
- 2.2.2.收集 nginx json 日志,推送給 es
- 2.2.3.使用内置子產品收集 nginx 日志 -- 不好用,生産不建議使用...
- 2.2.4.收集 nginx 指定字段資訊,忽略其他
- 3.filebeat 收集 tomcat
-
-
- 3.1.使用内置子產品收集 tomcat 日志 -- 不好用,生産不建議使用...
- 3.2.收集 tomcat 原生日志
- 3.3.收集 tomcat json 日志
- 3.4.收集 tomcat 多行比對
-
- 4.filebeat 收集 nginx 日志儲存到本地
- 五、Logstash 安裝(ELFK 架構)
-
- 1.單節點/分布式叢集安裝logstash
- 2.修改 logstash 的配置檔案
- 3.logstash filter grok插件根據正則取出想要的字段
- 4.logstash filter date插件修改寫入時間
- 5.filebeat 收集 nginx,tomcat日志推送給logstash,logstash發送es
- 六、Kibana 自定義 dashboard
-
- 1.統計PV(名額)
- 2.統計用戶端IP(名額)
- 3.統計web下載下傳帶寬(名額)
- 4.通路頁面統計(水準條形圖)
- 5.IP的Top 5統計(餅圖)
- 6.統計後端IP服務通路高的Top 5(圓環圖)
- 7.最終效果圖
- 七、Kafka部署(ELFK架構配合Kafka)
-
- 1.kafka 單節點部署
-
- 1.1.zookeeper 單節點
- 1.2.kafka 單節點
- 1.3.filebeat 收集 nginx 日志發送給kafka,logstash消費kafka消息發送給es,Kibana最終展示
- 2.kafka 分布式叢集部署
-
- 2.1.zookeeper 叢集部署
- 2.2.kafka 叢集部署
- 2.3.filebeat 收集 tomcat 日志發送給kafka,logstash消費kafka消息發送給es,Kibana最終展示
- 2.4.filebeat收集 nginx,tomcat,mysql-slow日志發送 kafka,logstash grok 分析 nginx,發送給es,kibana展示
環境 | IP |
---|---|
ElasticSearch、Logstash、Kafka、Zookeeper | 172.16.3.226/21 |
ElasticSearch、Logstash、Kafka、Zookeeper | 172.16.3.227/21 |
ElasticSearch、Logstash、Kafka、Zookeeper | 172.16.3.228/21 |
Kibana、FileBeat、Nginx、Tomcat | 172.16.4.184/21 |
- 軟體包下載下傳位址
- ELFK版本:點選這裡可以搜尋自己想要下載下傳的版本
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.5-x86_64.rpm
https://artifacts.elastic.co/downloads/kibana/kibana-7.17.5-x86_64.rpm
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.5-x86_64.rpm
https://artifacts.elastic.co/downloads/logstash/logstash-7.17.5-x86_64.rpm
https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
https://downloads.apache.org/kafka/3.2.1/kafka_2.12-3.2.1.tgz
https://114-233-226-9.d.123pan.cn:30443/123-676/913c4533/1811652132-0/913c45332b22860b096217d9952c2ea4?v=3&t=1662523894&s=ac456641406e505eab6019bc617d3e28&i=d3a74ca9&filename=jdk-8u333-linux-x64.tar.gz&d=c1e7e2f9
一、ElasticSearch 安裝
1.elasticsearch 單節點安裝
- 3.226 機器上操作:
1、yum localinstall elasticsearch-7.17.5-x86_64.rpm -y
2、cd /etc/elasticsearch
3、備份一下 elasticsearch 預設配置檔案
cp elasticsearch.yml{,.bak}
# systemctl cat elasticsearch 有興趣可以查一下elasticsearch的啟動資訊
4、修改 elasticsearch 配置檔案
egrep -v "^#|^$" elasticsearch.yml
cluster.name: chinaedu-elk
node.name: chinaedu-elk226
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 172.16.3.226
http.port: 9200
discovery.seed_hosts: ["172.16.3.226"]
相關參數說明:
cluster.name:
叢集名稱,若不指定,則預設是"elasticsearch",⽇志⽂件的字首也是叢集名稱。
node.name:
指定節點的名稱,可以⾃定義,推薦使⽤目前的主機名,要求叢集唯⼀。
path.data:
資料路徑。
path.logs:
⽇志路徑
network.host:
ES服務監聽的IP位址
http.port:
ES服務對外暴露的端口
discovery.seed_hosts:
服務發現的主機清單,對于單點部署⽽⾔,主機清單和"network.host"字段配置相同
即可。
5、啟動 elasticsearch
systemctl daemon-reload
systemctl start elasticsearch
2.elasticsearch 分布式叢集安裝
- 3.226 操作
- 在這需要注意所有主機都安裝一下
elasticsearch
yum localinstall elasticsearch-7.17.5-x86_64.rpm -y
1、修改 elasticsearch 配置
cp /etc/elasticsearch/elasticsearch.yml{,.bak}
egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: chinaedu-elk
node.name: elk226
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["172.16.3.226","172.16.3.227","172.16.3.228"]
cluster.initial_master_nodes: ["172.16.3.226","172.16.3.227","172.16.3.228"]
溫馨提示:
"node.name"各個節點配置要區厘清楚,建議寫對應的主機名稱。
2、将 3.226 上的elasticsearch配置檔案同步到其他主機
scp /etc/elasticsearch/elasticsearch.yml [email protected]:/etc/elasticsearch/
scp /etc/elasticsearch/elasticsearch.yml [email protected]:/etc/elasticsearch/
3、3.227 配置
...
node.name: elk227
4、3.228 配置
...
node.name: elk228
5、所有節點啟用 elasticsearch
# 在啟動之前先删除 elasticsearch 生成的資料
rm -rf /var/{log,lib}/elasticsearch/* /tmp/*
systemctl daemon-reload
systemctl start elasticsearch
6、啟動完成後可以驗證一下elasticsearch 是否正常
curl 127.0.0.1:9200
curl 127.0.0.1:9200/_cat/nodes?v
3.elasticsearch 配置身份認證
elasticsearch7 中開始免費了賬号密碼認證功能,下面是xpack方式開啟叢集密碼認證
1、在es的任一節點下生成p12檔案,在es目錄下執行指令
/usr/share/elasticsearch/bin/elasticsearch-certutil ca -out /etc/elasticsearch/cert/elastic-certificates.p12 -pass ""
2、生成p12檔案後,将p12檔案複制到其他節點的機器中,盡量保持p12的目錄路徑一緻
scp -r /etc/elasticsearch/cert/ [email protected]:/etc/elasticsearch/cert/
scp -r /etc/elasticsearch/cert/ [email protected]:/etc/elasticsearch/cert/
3、所有主機修改 elastic-certificates.p12 權限以及屬組
chown root.elasticsearch /etc/elasticsearch/cert/ -R && chmod 660 /etc/elasticsearch/cert/*
4、在所有節點的es config 目錄下的elasticsearch.yml 檔案新增如下配置(注意p12檔案的目錄路徑):
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: p12檔案的絕對目錄路徑
xpack.security.transport.ssl.truststore.path: p12檔案的絕對目錄路徑
5、重新開機所有es
systemctl daemon-reload
systemctl restart elasticsearch
6、配置/自動生成密碼,es中預設有5個使用者
- 随機生成 -
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto
Changed password for user apm_system
PASSWORD apm_system = BKZDPuXJI2LCLkhueRLr
Changed password for user kibana_system
PASSWORD kibana_system = 8dOH6NAG6We7gtSMatgG
Changed password for user kibana
PASSWORD kibana = 8dOH6NAG6We7gtSMatgG
Changed password for user logstash_system
PASSWORD logstash_system = XrRbfLgxFYS8tvHPgaGh
Changed password for user beats_system
PASSWORD beats_system = DyOfdQ7XQWLcAtuZ99yV
Changed password for user remote_monitoring_user
PASSWORD remote_monitoring_user = i50tI88A8JS82i89n72A
Changed password for user elastic
PASSWORD elastic = wk9KI8qgCo5IDm2BLino
- 手動配置 -
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
會提示每個密碼都要輸入兩遍
二、Elasticsearch cerebro 展示工具安裝
其他安裝方式可點選這裡
- 這裡示範 Kubernetes 安裝
kind: Deployment
apiVersion: apps/v1
metadata:
name: cerebro
labels:
k8s.kuboard.cn/name: cerebro
spec:
replicas: 1
selector:
matchLabels:
k8s.kuboard.cn/name: cerebro
template:
metadata:
labels:
k8s.kuboard.cn/name: cerebro
spec:
containers:
- name: cerebro
image: lmenezes/cerebro:latest
imagePullPolicy: IfNotPresent
restartPolicy: Always
revisionHistoryLimit: 10
---
kind: Service
apiVersion: v1
metadata:
name: cerebro-nginx
spec:
ports:
- name: yerc7y
protocol: TCP
port: 9000
targetPort: 9000
selector:
k8s.kuboard.cn/name: cerebro
type: NodePort
三、Kibana 安裝
1、yum localinstall kibana-7.17.5-x86_64.rpm -y
2、cp /etc/kibana/kibana.yml{,.bak}
3、修改 Kibana 配置
egrep -v "^*#|^$" kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "8dOH6NAG6We7gtSMatgG"
i18n.locale: "zh-CN"
4、啟動 Kibana
systemctl daemon-reload
systemctl start kibana.service
- 浏覽器通路Kibana: http://172.16.4.184:5601/
四、Filebeat 安裝(EFK 架構)
1.Filebeat 的基礎使用:
- https://blog.csdn.net/qq_43164571/article/details/126538709
2.filebeat 收集 nginx
2.1.安裝 nginx
1、配置 Nginx Yum源
yum install yum-utils -y
cat > /etc/yum.repos.d/nginx.repo << 'EOF'
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=1
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
[nginx-mainline]
name=nginx mainline repo
baseurl=http://nginx.org/packages/mainline/centos/$releasever/$basearch/
gpgcheck=1
enabled=0
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
EOF
2、安裝 Nginx
yum-config-manager --enable nginx-mainline
yum install nginx -y
3、啟動 Nginx
systemctl start nginx
2.2.安裝 filebeat
yum -y localinstall filebeat-7.17.5-x86_64.rpm
cp /etc/filebeat/filebeat.yml{,.bak}
2.2.1.收集 nginx 原生日志,推送給 es
2.2.2.1.filebeat 收集單個檔案
cat /etc/filebeat/filebeat.yml # 收集單個日志
filebeat.inputs:
- type: filestream
enabled: true # 是否啟用目前輸入類型,預設為true
id: my-filestream-id
paths:
- /var/log/nginx/*.log
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-%{+yyyy.MM.dd}"
setup.ilm.enabled: false # 關閉索引生命周期
setup.template.enabled: false # 允許自動生成index模闆
setup.template.overwrite: true # 如果存在子產品則覆寫
systemctl start filebeat.service
- 通路幾次 Nginx服務産生一些日志;;;
curl 127.0.0.1
檢視 elasticsearch 是否有nginx-access索引;
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access"
2.2.2.2.filebeat 收集多個檔案
cat /etc/filebeat/filebeat.yml # 收集多個日志
filebeat.inputs:
- type: filestream
enabled: true # 是否啟用目前輸入類型,預設為true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
tags: ["access"] # 建立 tags 字段可以用于判斷
- type: filestream
enabled: true # 是否啟用目前輸入類型,預設為true
id: error-nginx-id
paths:
- /var/log/nginx/error.log
tags: ["error"] # 建立 tags 字段可以用于判斷
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
indices:
- index: "nginx-access-%{+yyyy.MM.dd}"
when.contains:
tags: "access"
- index: "nginx-error-%{+yyyy.MM.dd}"
when.contains:
tags: "error"
setup.ilm.enabled: false # 關閉索引生命周期
setup.template.enabled: false # 允許自動生成index模闆
setup.template.overwrite: true # 如果存在子產品則覆寫
systemctl start filebeat.service
- 通路幾次 Nginx服務産生一些日志;;;
curl 127.0.0.1
檢視 elasticsearch 是否有nginx-access、nginx-error索引;
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | egrep "nginx-access|nginx-error"
注意事項:
7.17.5版本可能遇到的問題:
(1)input源配置⼀旦超過4個,寫⼊ES時,就可能會複現出部分資料⽆法寫⼊的問題;
有兩種解決⽅案:
⽅案⼀: 拆成多個filebeat執行個體。運⾏多個filebeat執行個體時需要指定資料路徑"--path.data"。
filebeat -e -c ~/config/23-systemLog-to-es.yml --path.data /tmp/filebeat
⽅案⼆: ⽇志聚合思路解決問題。
1)部署服務
yum -y install rsyslog
2)修改配置⽂件
vim /etc/rsyslog.conf
...
$ModLoad imtcp
$InputTCPServerRun 514
...
*.* /var/log/oldboyedu.log
3)重新開機服務并測試
systemctl restart rsyslog
logger "1111"
2.2.2.3.kibana 展示 nginx 日志

2.2.2.收集 nginx json 日志,推送給 es
1、修改 nginx 輸出格式
vim /etc/nginx/nginx.conf
log_format oldboyedu_nginx_json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"SendBytes":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"uri":"$uri",'
'"domain":"$host",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"tcp_xff":"$proxy_protocol_addr",'
'"http_user_agent":"$http_user_agent",'
'"status":"$status"}';
access_log /var/log/nginx/access.log oldboyedu_nginx_json;
2、定義Filebeat配置檔案識别json格式
cat /etc/filebeat/filebeat.yaml
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-json-id
paths:
- /var/log/nginx/access.log
tags: ["access"]
# 以JSON格式解析message字段的内容
parsers:
- ndjson:
keys_under_root: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-json-%{+yyyy.MM.dd}"
3、啟動 Filebeat
4、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access-json"
2.2.3.使用内置子產品收集 nginx 日志 – 不好用,生産不建議使用…
1、還原 Nginx 日志預設配置;;;
2、cat /etc/filebeat/filebeat.yml
filebeat.config.modules:
# 指定子產品配置檔案路徑,${path.config} 代表 /etc/filebeat
path: ${path.config}/modules.d/nginx.yml
# 是否開啟熱加載功能
reload.enabled: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-modlues-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
3、filebeat -c filebeat.yml modules list # 檢視支援的子產品
4、filebeat -c filebeat.yml modules enable nginx # 啟用 Nginx 子產品
# 5、filebeat -c filebeat.yml modules disable nginx # 禁用 Nginx 子產品
6、修改 nginx 子產品配置
egrep -v "^*#|^$" /etc/filebeat/modules.d/nginx.yml
- module: nginx
access:
enabled: true
var.paths: ["/var/log/nginx/access.log"]
error:
enabled: false
var.paths: ["/var/log/nginx/error.log"]
ingress_controller:
enabled: false
7、啟動Filebeat
8、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access-modlues"
2.2.4.收集 nginx 指定字段資訊,忽略其他
1、cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true # 是否啟用目前輸入類型,預設為true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
tags: ["access"] # 建立 tags 字段可以用于判斷
- type: filestream
enabled: true # 是否啟用目前輸入類型,預設為true
id: error-nginx-id
paths:
- /var/log/nginx/error.log
tags: ["error"] # 建立 tags 字段可以用于判斷
include_lines: ['\[error\]'] # 收集包含[error]字段的資訊
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
indices:
- index: "nginx-access-%{+yyyy.MM.dd}"
when.contains:
tags: "access"
- index: "nginx-error-%{+yyyy.MM.dd}"
when.contains:
tags: "error"
setup.ilm.enabled: false # 關閉索引生命周期
setup.template.enabled: false # 允許自動生成index模闆
setup.template.overwrite: true # 如果存在子產品則覆寫
2、啟動filebeat
3.filebeat 收集 tomcat
3.1.使用内置子產品收集 tomcat 日志 – 不好用,生産不建議使用…
1、這裡安裝 tomcat 步驟忽略
2、配置 Tomcat beat檔案
egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.config.modules:
path: ${path.config}/modules.d/tomcat.yml
reload.enabled: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "tomcat-modlues-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
3、filebeat -c filebeat.yml modules list # 檢視支援的子產品
4、filebeat -c filebeat.yml modules enable tomcat # 啟用 tomcat 子產品
# 5、filebeat -c filebeat.yml modules disable tomcat # 禁用 tomcat 子產品
6、修改 Tomcat 子產品配置
egrep -v "^*#|^$" /etc/filebeat/modules.d/tomcat.yml
- module: tomcat
log:
enabled: true
var.input: file
var.paths:
- /data/logs/tomcat/catalina.out
7、啟動 Filebea
8、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "tomcat-modlues"
3.2.收集 tomcat 原生日志
1、egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
tags: ["catalina"]
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "catalina.out-tomcat-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
2、啟動 Filebeat
systemctl enable filebeat
3、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "catalina.out-tomcat"
3.3.收集 tomcat json 日志
- 這裡就不做測試了,原理跟Nginx json是一樣的。
3.4.收集 tomcat 多行比對
1、修改 server.xml 模拟 tomcat 報錯
164 <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
165 prefix="localhost_access_log" suffix=".txt"
166 pattern="%h %l %u %t "%r" %s %b" />
167
168 </Host111111> # 在/Host後面新增一些内容
2、多啟動幾次 tomcat 生成一些報錯日志,然後将 server.xml 配置還原,再起啟動
3、egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
tags: ["catalina"]
parsers:
- multiline:
# 指定多行比對的類型,可選值為"pattern","count"
type: pattern
# 指定比對模式,比對以2個數字開頭的
pattern: '^\d{2}'
# 下面兩個參數,參考官方架構圖即可;
# https://www.elastic.co/guide/en/beats/filebeat/7.17/multiline-examples.html
negate: true
match: after
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "catalina.out-error-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
4.filebeat 收集 nginx 日志儲存到本地
1、cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["firewalld"]
output.file:
# 檔案儲存的路徑
path: "/tmp/filebeat"
# 本地儲存的檔案名字
filename: filebeat-nginx-access.log
# 指定檔案的滾動大小,預設為20M
rotate_every_kb: 102400
# 指定儲存檔案個數,預設是7個,有效值默為2-1024個
number_of_files: 7
# 指定檔案的權限
permissions: 0600
2、啟動Filebeat
ll /tmp/filebeat/
總用量 8
-rw------- 1 root root 5209 8月 26 15:25 filebeat-nginx-access.log
五、Logstash 安裝(ELFK 架構)
1.單節點/分布式叢集安裝logstash
1、安裝logstash
yum localinstall logstash-7.17.5-x86_64.rpm -y
2、建立軟連接配接,在全局下可以執行logstash指令
ln -sv /usr/share/logstash/bin/logstash /usr/local/bin
2.修改 logstash 的配置檔案
(1)編寫配置⽂件
cat > conf.d/01-stdin-to-stdout.conf <<'EOF'
input {
stdin {}
}
output {
stdout {}
}
EOF
(2)檢查配置⽂件文法
logstash -tf conf.d/01-stdin-to-stdout.conf
(3)啟動logstash執行個體
logstash -f conf.d/01-stdin-to-stdout.conf
3.logstash filter grok插件根據正則取出想要的字段
注釋: Nginx 輸出日志的格式:
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'"$status" "$body_bytes_sent" "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'"$request_length" "$request_time" '
'"$host" "$upstream_addr" "$upstream_status" '
'"$upstream_response_length" "$upstream_response_time"';
access_log /var/log/nginx/access.log main;
(1)filebeat配置:
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true
output.logstash:
hosts: ["172.16.3.226:5044","172.16.3.227:5044","172.16.3.228:5044"]
(2)logstash配置:
input {
beats {
port => 5044
}
}
filter {
grok {
# 參考文檔: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
# 正則模式可參考: https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns
match => {
"message" => '%{IP:client} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:status}" "%{NUMBER:bytes}" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "(%{NUMBER:upstream_status}|-)" "(%{NUMBER:upstream_response_length}|-)" "?(%{BASE10NUM:upstream_response_time}|-)"'
}
}
mutate {
# 參考文檔:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
# 将指定字段轉換成相應對資料類型.
convert => [
"bytes", "integer", # 轉換成int類型,這樣就可以對字段進行算術運算,如果不轉換則預設是字元串類型。
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
# 删除不要的字段
remove_field => [ "message","@version","agent","ecs","tags","input" ]
}
}
output {
#stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
4.logstash filter date插件修改寫入時間
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => '%{IP:client} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:status}" "%{NUMBER:bytes}" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "%{NUMBER:upstream_status}" "%{NUMBER:upstream_response_length}" "?(%{BASE10NUM:upstream_response_time}|-)"'
}
}
mutate {
# 參考文檔:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
# 将指定字段轉換成相應對資料類型.
convert => [
"bytes", "integer",
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
# 删除不要的字段
remove_field => [ "message","@version","agent","ecs","tags","input" ]
}
#參考文檔: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html
date {
# 比對時間字段并解析"timestamp"
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
#stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
5.filebeat 收集 nginx,tomcat日志推送給logstash,logstash發送es
(1)Filebeat配置:
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access # 定義索引名稱
fields_under_root: true # 把fields設定為頂級字段,否則elasticsearch無法識别。
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-access # 定義索引名稱
fields_under_root: true # 把fields設定為頂級字段,否則elasticsearch無法識别。
output.logstash:
hosts: ["172.16.3.226:5044","172.16.3.227:5044","172.16.3.228:5044"]
(2)logstash配置:
input {
beats {
port => 5044
}
}
output {
# stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}" # [type_index] 擷取Filebeat設定的名稱
user => "elastic"
password => "chinaedu"
}
}
六、Kibana 自定義 dashboard
1.統計PV(名額)
Page View(簡稱:"PV")
⻚⾯通路或點選量。
kibana界⾯⿏标依次點選如下:
(1)菜單欄;
(2)dashboards
(3)建立新的儀表闆
(4)建立可視化
(5)選擇名額
(6)選擇索引模式(例如"nginx-access-*")
(7)名額欄中選擇:
選擇函數:計數
顯示名稱: 空
(8)儲存到庫
标題:lms-saas 總通路量
2.統計用戶端IP(名額)
用戶端IP:
通常指的是通路Web伺服器的用戶端IP位址,但要注意,用戶端IP數量并不難代表UV。
kibana界⾯⿏标依次點選如下:
(1)菜單欄;
(2)dashboards
(3)建立新的儀表闆
(4)建立可視化
(5)名額
(6)選擇索引模式(例如"nginx-access-*")
(7)名額欄中選擇:
選擇函數: 唯⼀計數
選擇字段: clientip.keyword
顯示名稱: 空
(8)儲存到庫
标題:lms-saas IP
3.統計web下載下傳帶寬(名額)
帶寬:
統計nginx傳回給用戶端⽂件⼤⼩的字段進⾏累計求和。
kibana界⾯⿏标依次點選如下:
(1)菜單欄;
(2)dashboards
(3)建立新的儀表闆
(4)建立可視化
(5)名額
(6)選擇索引模式(例如"nginx-access-*")
(7)名額欄中選擇:
選擇函數: 求和
選擇字段: bytes
顯示名稱: 空
值格式:位元組(1024)
(8)儲存到庫
标題:lms-saas 總流量
4.通路頁面統計(水準條形圖)
通路資源統計:
對URI的通路次數統計。
kibana界⾯⿏标依次點選如下:
(1)菜單欄;
(2)dashboards
(3)建立新的儀表闆
(4)建立可視化
(5)⽔平條形圖
(6)選擇索引模式(例如"nginx-access-*")
(7)"垂直軸"
選擇函數:排名最前值
字段: request.keyword
值數目:5
排名依據:通路量
排名方向:降序
進階:取消"将其他值分為其他"
顯示名稱: 空
(8)"水準軸"
聚合: 計數
顯示名稱: 空
5.IP的Top 5統計(餅圖)
IP的TopN統計:
統計通路量的用戶端IP最⼤的是誰。
kibana界⾯⿏标依次點選如下:
(1)菜單欄;
(2)dashboards
(3)建立新的儀表闆
(4)建立可視化
(5)餅圖
(6)切片依據:
選擇函數:排名最前
選擇字段:client.keyword
進階:取消"将其他值分為其他"
顯示名稱: 空
(7)大小調整依據:
選擇函數:計數
(8)儲存到庫:
标題:lms-saas 用戶端IP top5
6.統計後端IP服務通路高的Top 5(圓環圖)
IP的TopN統計:
統計通路量的用戶端IP最⼤的是誰。
kibana界⾯⿏标依次點選如下:
(1)菜單欄;
(2)dashboards
(3)建立新的儀表闆
(4)建立可視化
(5)圓環圖
(6)切片依據:
選擇函數:排名最前值
選擇字段:upstream_addr.keyword
進階:取消"将其他值分為其他"
顯示名稱: 空
(7)大小調整依據:
選擇函數:計數
(8)儲存到庫:
标題:lms-saas upstream Top5
7.最終效果圖
七、Kafka部署(ELFK架構配合Kafka)
- 注意:這裡我是單獨部署的zookeeper,沒有用kafka内置zookeeper。
- 如果想用 kafka 内置的zookeeper則可以參考這篇文章
1.kafka 單節點部署
1.1.zookeeper 單節點
(1)解壓 zookeeper 軟體包
tar -xf jdk-8u333-linux-x64.tar.gz -C /usr/local/
tar -xf apache-zookeeper-3.8.0-bin.tar.gz -C /usr/local/
(2)建立環境變量
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_333
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/usr/local/apache-zookeeper-3.8.0-bin/
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile
(3)建立zookeeper配置檔案
cp /usr/local/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
egrep -v "^#|^$" /usr/local/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
(4)啟動zookeeper節點
zkServer.sh start
zkServer.sh status # 檢視zk服務的狀态資訊
zkServer.sh stop
zkServer.sh restart
zookeeper配置檔案解釋:
dataDir ZK資料存放目錄。.
dataLogDir ZK日志存放目錄。
clientPort 用戶端連接配接ZK服務的端口。
tickTime ZK伺服器之間或用戶端與伺服器之間維持心跳的時間間隔。
initLimit 允許follower(相對于Leaderer言的“用戶端”)連接配接并同步到Leader的初始化連接配接時間,以tickTime為機關。當初始化連接配接時間超過該值,則表示連接配接失敗。
syncLimit Leader與Follower之間發送消息時,請求和應答時間⻓度。如果follower在設定時間内不能與leader通信,那麼此follower将會被丢棄。
1.2.kafka 單節點
(1)解壓 kafka 軟體包
tar zxf kafka_2.12-3.2.1.tgz -C /usr/local/
(2)配置環境變量
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
(3)修改kafka配置檔案
cp /usr/local/kafka_2.12-3.2.1/config/server.properties{,.bak}
egrep -v "^#|^$" /usr/local/kafka_2.12-3.2.1/config/server.properties
broker.id=226
listeners=PLAINTEXT://172.16.3.226:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.3.226:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
(4)啟動kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 關閉Kafka服務
(5)驗證kafka節點,是否正常工作
1.啟動生産者
kafka-console-producer.sh --topic chinaedu-linux80 --bootstrap-server 172.16.3.226:9092
>AAAAAAAA
>BBBBBBB
>CCCCCCCC
2、啟動消費者
kafka-console-consumer.sh --topic chinaedu-linux80 --bootstrap-server 172.16.3.226:9092 --from-beginning
AAAAAAAA
BBBBBBB
CCCCCCCC
溫馨提示:
"--topic":要生成消息的主題id。
"--bootstrap-server":指定kafka節點的位址跟端口
"--from-beginning":代表從該topic的最開始位置讀取資料,若不加該參數,則預設從topic的末尾讀取。
kafka配置檔案解釋:
broker.id 每個server需要單獨配置broker id,如果不配置系統會自動配置。需要和上一步ID一緻
listeners 監聽位址,格式PLAINTEXT://IP:端口。
num.network.threads 接收和發送網絡資訊的線程數。
num.io.threads 伺服器用于處理請求的線程數,其中可能包括磁盤I/O。
socket.send.buffer.bytes 套接字伺服器使用的發送緩沖區(SO_SNDBUF)
socket.receive.buffer.bytes 套接字伺服器使用的接收緩沖區(SO_RCVBUF)
socket.request.max.bytes 套接字伺服器将接受的請求的最大大小(防止OOM)
log.dirs 日志檔案目錄。
num.partitions partition數量。
num.recovery.threads.per.data.dir 在啟動時恢複日志、關閉時刷盤日志每個資料目錄的線程的數量,預設1。
offsets.topic.replication.factor 偏移量話題的複制因子(設定更高保證可用),為了保證有效的複制,偏移話題的複制因子是可配置的,在偏移話題的第一次請求的時候可用的broker的數量至少為複制因子的大小,否則要麼話題建立失敗,要麼複制因子取可用broker的數量和配置複制因子的最小值。
log.retention.hours 日志檔案删除之前保留的時間(機關小時),預設168
log.segment.bytes 單個日志檔案的大小,預設1073741824
log.retention.check.interval.ms 檢查日志段以檢視是否可以根據保留政策删除它們的時間間隔。
zookeeper.connect ZK主機位址,如果zookeeper是叢集則以逗号隔開。
zookeeper.connection.timeout.ms 連接配接到Zookeeper的逾時時間。
1.3.filebeat 收集 nginx 日志發送給kafka,logstash消費kafka消息發送給es,Kibana最終展示
(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true
output.kafka:
hosts: ["172.16.3.226:9092"]
topic: "log"
(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092"
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
(3)Kibana展示,參考下圖:
2.kafka 分布式叢集部署
2.1.zookeeper 叢集部署
(1)解壓 zookeeper 軟體包
tar -xf jdk-8u333-linux-x64.tar.gz -C /usr/local/
tar -xf apache-zookeeper-3.8.0-bin.tar.gz -C /usr/local/
(2)建立環境變量
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_333
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/usr/local/apache-zookeeper-3.8.0-bin/
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile
(3)建立zookeeper配置檔案
cp /usr/local/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
egrep -v "^#|^$" /usr/local/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.16.3.226:2888:3888
server.2=172.16.3.227:2888:3888
server.3=172.16.3.228:2888:3888
(3)建立data、log目錄
mkdir -p /tmp/zookeeper /var/log/zookeeper
echo 1 > /tmp/zookeeper/myid # 每台 kafka 機器都要做成唯一的ID,3.226機器
echo 2 > /tmp/zookeeper/myid # 每台 kafka 機器都要做成唯一的ID,3.227機器
echo 3 > /tmp/zookeeper/myid # 每台 kafka 機器都要做成唯一的ID,3.228機器
(5)啟動zookeeper節點
zkServer.sh start
zkServer.sh status # 檢視zk服務的狀态資訊
zkServer.sh stop
zkServer.sh restart
2.2.kafka 叢集部署
(1)解壓 kafka 軟體包
tar zxf kafka_2.12-3.2.1.tgz -C /usr/local/
(2)配置環境變量
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
(3)修改kafka配置檔案
cp /usr/local/kafka_2.12-3.2.1/config/server.properties{,.bak}
egrep -v "^#|^$" /usr/local/kafka_2.12-3.2.1/config/server.properties
broker.id=226
listeners=PLAINTEXT://172.16.3.226:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.3.226:2181,172.16.3.227:2181,172.16.3.228:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
(4)227 配置
...
broker.id=227
listeners=PLAINTEXT://172.16.3.227:9092
(5)228 配置
...
broker.id=228
listeners=PLAINTEXT://172.16.3.228:9092
(6)啟動kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 關閉Kafka服務
(7)驗證kafka是否是叢集模式:
zkCli.sh ls /brokers/ids | grep "^\["
[226, 227, 228]
2.3.filebeat 收集 tomcat 日志發送給kafka,logstash消費kafka消息發送給es,Kibana最終展示
(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true
output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"
(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
(3)Kibana展示,參考下圖:
2.4.filebeat收集 nginx,tomcat,mysql-slow日志發送 kafka,logstash grok 分析 nginx,發送給es,kibana展示
(1)filebeat配置檔案:
filebeat.inputs:
- type: filestream
enabled: true
id: nginx-access-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true
- type: filestream
enabled: true
id: mysql-slowlog-id
paths:
- /data/mysql/logs/slowquery.log
fields:
type_index: mysql-slowlog
fields_under_root: true
parsers:
- multiline:
type: pattern
pattern: '^# Time: '
negate: true
match: after
output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"
(2)logstash配置檔案:
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}
filter {
if "nginx" in [type_index] {
grok {
match => ['message', '%{IPV4:remote_addr} - ?(%{DATA:remote_user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:http_method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:response}" "(?:%{NUMBER:bytes}|-)" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "%{NUMBER:upstream_status}" "%{NUMBER:upstream_response_length}" "?(%{BASE10NUM:upstream_response_time}|-)"']
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
convert => [
"bytes", "integer",
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}
mutate {
remove_field => [ "msg" , "message" ]
}
}
}
output {
# stdout {}
if "nginx" in [type_index] {
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "logstash-%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
else {
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
}
(3)通過Kibana檢視es是否收集到日志