ELK+kafka分布式日志采集分析
- 1 kafka的介紹
-
- 1.1 ELK+kafka的優點
- 1.2 部署架構
- 2 Elasticsearch的部署
- 3 kafka的部署(server3,server4,server5)
-
- 3.1 JDK環境
- 3.2 kafka的部署
- 3.3 zookeeper的部署
- 3.4 kafka叢集的使用
- 4 Filebeat的部署(server6)
- 5 Logstash的部署
- 6 Kibana的部署(server7)
- 7 搭建Cerebro Elasticsearch監控
1 kafka的介紹
kafka是一個釋出訂閱消息系統,由topic區分消息種類,每個topic中可以有多個partition,每個kafka叢集有一個多個broker伺服器組成,producer可以釋出消息到kafka中,consumer可以消費kafka中的資料。kafka就是生産者和消費者中間的一個暫存區,可以儲存一段時間的資料保證使用
zookeeper作為解決分布式一緻性問題的工具而被kafka依賴。而分布式模式,即去中心化的叢集模式,需要讓消費者知道現在有哪些生産者(對于消費者而言,kafka就是生産者)是可用的。如果沒了zk每次消費者在消費之前都去嘗試連接配接生産者測試下是否連接配接成功,效率就會變得很低
Kafka使用zk的分布式協調服務,将生産者,消費者,消息儲存(broker,用于存儲資訊,消息讀寫等)結合在一起。同時借助zk,kafka能夠将生産者,消費者和broker在内的所有元件在無狀态的條件下建立起生産者和消費者的訂閱關系,實作生産者的負載均衡
1.1 ELK+kafka的優點
單純使用EIK實作分布式日志收集缺點:
- 當産生日志的服務節點越來越多,Logstash也需要部署越來越多,擴充不好。 讀取IO檔案,可能會産生日志丢失
- 讀取檔案不是實時性,中間需要引入到Kafka,日志實時釋出到Kafka,Logstash訂閱并實時擷取消息
1.2 部署架構
實驗環境:關閉防火牆和selinux
軟體 | 主機 |
---|---|
es1 | server1 |
es2 | server2 |
kafka1 | server3 |
kafka2 | server4 |
kafka3,logstash | server5 |
nginx,filebeat | server6 |
kibana,Cerebro | server7 |
Filebeat->Logstash->Kafka->Logstash->Elasticsearch->Kibana

2 Elasticsearch的部署
- ES叢集前面已經部署過,參考部落格:Elasticsearch的部署
3 kafka的部署(server3,server4,server5)
kafka是一個消息隊列伺服器,kafka服務稱為broker(中間人),消息發送者稱為生産者,消息接受者稱為消費者,通過部署多個broker提供高可用的消息服務叢集,典型的是三個broker;消息以topic的方式發送到broker,消費者訂閱topic,實作按需取用的消費模式;建立topic需要指定replication-factor(複制數目,通常=broker數目);每個topic可能有多個分區,每個分區的消息内容不會重複。
3.1 JDK環境
(1) 安裝java環境:
rpm -ivh jdk-8u121-linux-x64.rpm
3.2 kafka的部署
(1)下載下傳kafka
wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
(2)解壓并做軟連結
tar zxf kafka_2.12-2.8.0.tgz -C /usr/local/
cd /usr/local/
ln -s /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka
(3)編輯kafka的
server.properties
配置檔案
broker.id=3
listeners=PLAINTEXT://172.25.12.3:9092
host.name=172.25.12.3
message.max.bytes=12695150
compression.codec=snappy
max.partition.fetch.bytes=4194304
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.25.12.3:2181,172.25.12.4:2181,172.25.12.5:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
(4)編輯kafka服務的啟動腳本
vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
#Type=simple
#Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/java/jdk-11.0.1/bin"
#User=root
#Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
3.3 zookeeper的部署
使用kafka自帶的zookeeper
(1)編輯kafka的
zookeeper.properties
配置檔案
dataDir=/usr/local/kafka/zookeeper/data
datalogDir=/usr/local/kafka/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.2=172.25.12.3:2888:3888
server.3=172.25.12.4:2888:3888
server.4=172.25.12.5:2888:3888
(2)對每個zookeeper叢集設定一個myid,必須與上面的zookeeper.properties配置檔案下面的server.xxx的ID對應,有幾個叢集就需要設定幾個ID
echo 2 > /usr/local/kafka/zookeeper/data/myid
(3)編輯zookeeper服務的啟動腳本
vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
#Type=simple
#Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/java/jdk-11.0.1/bin"
#User=root
#Group=root
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
3.4 kafka叢集的使用
(1)啟動kafka叢集
注意要先啟動zookeeper,再啟動kafka
systemctl enable --now zookeeper kafka
(2)檢視其群的角色:
netstat -nlpt | grep -E "2181|2888|3888"
有2888端口的是的是,目前server4節點為leader
(3)建立并檢視topic
[[email protected] ~]# cd /usr/local/kafka/bin/
## zookeeper指定其中一個節點即可,叢集之間會自動同
[[email protected] bin]# ./kafka-topics.sh --create --zookeeper 172.25.12.4:2181 --replication-factor 1 --partitions 1 --topic alia
Created topic alia.
[[email protected] bin]# ./kafka-topics.sh --list --zookeeper 172.25.12.4:2181
111http_log
__consumer_offsets
alia
[[email protected] bin]# ./kafka-topics.sh --describe --zookeeper 172.25.12.4:2181 --topic alia
Topic: alia TopicId: Wn2VYU5oS2G_tep4VQ0jTg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: alia Partition: 0 Leader: 3 Replicas: 3 Isr: 3
(4)模拟生産者和消費者
- 啟動Producer并發送消息,執行如下腳本
[[email protected] bin]# ./kafka-console-producer.sh --broker-list 172.25.12.4:9092 --topic alia
>
- 另一個終端,啟動Consumer,并訂閱我們上面建立的名稱為alia的Topic中生産的消息,執行如下腳本
[[email protected] bin]# cd /usr/local/kafka/bin/
[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server 172.25.12.4:9092 -from-beginning --topic alia
>
- 生産者寫入資料
- 從開始位置消費資料
此時kafka叢集已經搭建完成了
(5)删除toplic:
./kafka-topics.sh --delete --zookeeper 172.25.12.3:2181 --topic alia
4 Filebeat的部署(server6)
(1) Filebeat的部署參考部落格:Filebeat
(2)安裝nginx
(3)修改 Filebeat的配置檔案(實作資料采集後從filebeat——>Logstash)
- filebeat的配置檔案:
cat /etc/filebeat/filebeat.yml | grep -v "#"
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log ## 采集nginx的日志路徑
codec: 'plain'
fields:
topic: nginx-log
fields_under_root: true
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.kibana:
output.logstash:
hosts: ["172.25.12.5:5044"] ## logstash接收資料
codec: json
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
(4)啟動filebeat:systemctl start filebeat.service
5 Logstash的部署
(1)Logstash的部署
(2)資料從filebeat——>kafka
vim /etc/logstash/conf.d/o_kafka.conf
input {
beats {
port => 5044
codec => json
}
}
output {
kafka {
bootstrap_servers => "172.25.12.3:9092,172.25.12.4:9092,172.25.12.5:9092"
topic_id => "nginx-log"
codec => json
}
}
(3)資料從 kafka——>ES
tvim /etc/logstash/conf.d/o_es.conf
input {
kafka {
bootstrap_servers => "172.25.12.3:9092,172.25.12.4:9092,172.25.12.5:9092"
topics => ["http-log"]
}
}
output {
elasticsearch {
hosts => ["http://172.25.12.1:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
(4) logstash -f /etc/logstash/conf.d/ &
6 Kibana的部署(server7)
(1)參考部落格Kibana的部署
在server1上通路server6,日志資料同步更新
7 搭建Cerebro Elasticsearch監控
wget https://github.com/lmenezes/cerebro/releases/download/v0.8.1/cerebro-0.8.1.tgz
tar xzf cerebro-0.8.1.tgz
vim /etc/cerebro/application.conf
hosts = [
{
host = "http://172.25.12.1:9200"
name = "ELK Cluster"
},
# Example of host with authentication
#{
# host = "http://some-authenticated-host:9200"
# name = "Secured Cluster"
# auth = {
# username = "username"
# password = "secret-password"
# }
#}
]
啟動服務:
[[email protected] ~]# cerebro-0.8.1/bin/cerebro
[info] play.api.Play - Application started (Prod)
[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000