分布式日志收集ELK+Kafka
随着技術的發展,越來越多的項目從傳統的單體架構逐漸發展為微服務架構,雖然系統在一步一步的更新,但是面臨的挑戰也是越來越多,今天我們就來談談日志采集的問題。很明顯,在微服務架構中,服務往往都是叢集化部署,假設現在我們A項目有8070,8080,8090三個端口,如果用戶端調用A項目調不通該怎麼排查?答案肯定是派三個人,三個人分别去不同的端口找本地日志,這樣才能最快的解決問題,但是成本也是相當的高,而且做法也相當的Low,撇開這些不談,系統一直在運作那麼日志也一直在滾動,搜個日志頭差不多就秃一半了。
ELK+Kafka的安裝
E:elasticsearch:這是一個快速的搜尋引擎架構 https://www.elastic.co/cn/
L:logstash:相當于搬運工的角色,将日志從Kafka伺服器搬到elasticsearch https://www.elastic.co/cn/logstash
K:kibana:https://www.elastic.co/cn/kibana操作elastic資料的可視化界面
Kafka:一種高吞吐量的分布式釋出訂閱消息系統 http://kafka.apache.org/
ElasticSearch+Kibana安裝
1.去鏡像倉庫拉取指定版本的es和kibana
docker pull elasticsearch:6.7.2
docker pull kibana:6.7.2
2.運作es鏡像
docker run -it --name elasticsearch -d -p 9200:9200 -p 9300:9300 -p 5601:5601 elasticsearch:6.7.2
3.運作kibana鏡像,共用es的網絡
docker run -it -d -e ELASTICSEARCH_URL=http://127.0.0.1:9200 --name kibana --network=container:elasticsearch kibana:6.7.2
4.分别通路es和kibana位址
http://192.168.2.200:9200/ ES界面位址端口号為9200
http://192.168.2.200:5601/ Kibana界面位址端口号為5601
logstash的安裝
1.拉取鏡像,注意,版本一定要與上面的一緻
docker pull daocloud.io/library/logstash:6.7.2
2.啟動logstash鏡像,同時映射檔案夾
docker run -d --name logstash1 -v /Users/work/docker/logstash/:/etc/logstash/pipeline/ daocloud.io/library/logstash:6.7.2
3.docker exec -it logstash1 /bin/bash 指令進入容器内,然後下載下傳插件
這是jdbc插件
bin/logstash-plugin install logstash-input-jdbc
這是es插件
bin/logstash-plugin install logstash-output-elasticsearch
4.下載下傳mysql-connector-java的jar包,放在剛剛配置的映射/Users/work/docker/logstash/檔案夾中
5.編寫logstash的配置檔案logstash.config,也放在剛剛配置的映射/Users/work/docker/logstash/檔案夾中
input {
kafka {
//輸入到kafka的位址和主題
bootstrap_servers => "192.168.55.200:9092"
topics => ["mylog"]
}
}
output {
stdout { codec => json_lines }
//輸出到es的位址和索引
elasticsearch {
hosts => ["192.168.55.200:9200"]
index => "mylog"
}
}
6.在容器中修改logstash conf目錄下的logstash.yml,改為自己的es位址
xpack.monitoring.elasticsearch.url: http://192.168.55.200:9200
7.vi pipelines,修改完成後儲存重新開機logstash鏡像即可
- pipeline.id: log1
path.config: "/etc/logstash/pipeline/logstash.conf"
SpringBoot整合ELK+Kafka
1.引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- springBoot內建kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.修改配置檔案
spring:
datasource:
url: jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
username: root
password: 1234
driver-class-name: com.mysql.jdbc.Driver
#ES連接配接位址,注意:9200是外部通路位址,9300是内部通訊位址,叢集名稱需要與es頁面的名稱相同
data:
elasticsearch:
cluster-name: docker-cluster
cluster-nodes: 192.168.55.200:9300
kafka:
# kafka伺服器位址(可以多個)
bootstrap-servers: 192.168.55.200:9092
consumer:
# 指定一個預設的組名
group-id: kafka1
# earliest:當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,從頭開始消費
# latest:當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,消費新産生的該分區下的資料
# none:topic各分區都存在已送出的offset時,從offset後開始消費;隻要有一個分區不存在已送出的offset,則抛出異常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 緩存容量
buffer-memory: 524288
# 伺服器位址
bootstrap-servers: 192.168.55.200:9092
3.自定義測試接口,注意:發送主題需要與logstash監聽的主題一緻,接下來大家可以去kibana界面查詢資料啦
@GetMapping("/sendLogs")
public String sendLogs()
{
log.info("調用發送日志接口");
kafkaTemplate.send("mylog","logs-success:"+System.currentTimeMillis());
return "success";
}
ELK+Kafka總結
通過上面的學習,相信大家對這套分布式日志收集系統有了一定的了解。說白了,就是logstash是一個中間件,它訂閱到mq的主題拉取資料并再把資料推送給ES,通過Kibana可視化界面展示我們的ES資料。這樣就實作了我們日志高效的統一化管理,查日志統一到這個系統,既友善又快速。可能又有小夥伴會問了,為什麼要加上Kafka呢,我們以前都是用mysql的啊。首先,如果是傳統的logstash訂閱到mysql,那麼日志的過濾收集全部集中在伺服器上,消耗伺服器的資源或非常的大,調試和跟蹤也非常的困難;其次,如果是日志流量大的情況下,可能會存在資料丢失的問題;再者,日志量龐大的情況下,肯定要根據服務名稱劃分不同的log表,那我現在有50個服務,難不成你還弄50張表去存放每個服務對應的日志嗎,那就對應50個lostash,成本又高,還low。是以,ELK+kafka既可以實作日志的統一管理,高效查詢,又可以做到流量削峰,日志不丢失,在微服務系統中還是非常的有必要。