天天看點

微服務架構師成長之路(三)ELK+Kafka

分布式日志收集ELK+Kafka

随着技術的發展,越來越多的項目從傳統的單體架構逐漸發展為微服務架構,雖然系統在一步一步的更新,但是面臨的挑戰也是越來越多,今天我們就來談談日志采集的問題。很明顯,在微服務架構中,服務往往都是叢集化部署,假設現在我們A項目有8070,8080,8090三個端口,如果用戶端調用A項目調不通該怎麼排查?答案肯定是派三個人,三個人分别去不同的端口找本地日志,這樣才能最快的解決問題,但是成本也是相當的高,而且做法也相當的Low,撇開這些不談,系統一直在運作那麼日志也一直在滾動,搜個日志頭差不多就秃一半了。

ELK+Kafka的安裝

E:elasticsearch:這是一個快速的搜尋引擎架構 https://www.elastic.co/cn/

L:logstash:相當于搬運工的角色,将日志從Kafka伺服器搬到elasticsearch https://www.elastic.co/cn/logstash

K:kibana:https://www.elastic.co/cn/kibana操作elastic資料的可視化界面

Kafka:一種高吞吐量的分布式釋出訂閱消息系統 http://kafka.apache.org/

ElasticSearch+Kibana安裝

1.去鏡像倉庫拉取指定版本的es和kibana

docker pull elasticsearch:6.7.2
docker pull kibana:6.7.2
           

2.運作es鏡像

docker run -it --name elasticsearch -d -p 9200:9200 -p 9300:9300 -p 5601:5601 elasticsearch:6.7.2

3.運作kibana鏡像,共用es的網絡

docker run -it -d -e ELASTICSEARCH_URL=http://127.0.0.1:9200 --name kibana --network=container:elasticsearch kibana:6.7.2
           

4.分别通路es和kibana位址

http://192.168.2.200:9200/ ES界面位址端口号為9200

http://192.168.2.200:5601/ Kibana界面位址端口号為5601

logstash的安裝

1.拉取鏡像,注意,版本一定要與上面的一緻

docker pull daocloud.io/library/logstash:6.7.2
           

2.啟動logstash鏡像,同時映射檔案夾

docker run -d --name logstash1 -v /Users/work/docker/logstash/:/etc/logstash/pipeline/  daocloud.io/library/logstash:6.7.2
           

3.docker exec -it logstash1 /bin/bash 指令進入容器内,然後下載下傳插件

這是jdbc插件
bin/logstash-plugin install logstash-input-jdbc
這是es插件
bin/logstash-plugin install logstash-output-elasticsearch
           

4.下載下傳mysql-connector-java的jar包,放在剛剛配置的映射/Users/work/docker/logstash/檔案夾中

5.編寫logstash的配置檔案logstash.config,也放在剛剛配置的映射/Users/work/docker/logstash/檔案夾中

input {
  kafka {
  	//輸入到kafka的位址和主題
    bootstrap_servers => "192.168.55.200:9092"
    topics => ["mylog"]
  }
}
output {
    stdout { codec => json_lines }
    //輸出到es的位址和索引
    elasticsearch {
       hosts => ["192.168.55.200:9200"]
       index => "mylog"
    }
}

           

6.在容器中修改logstash conf目錄下的logstash.yml,改為自己的es位址

xpack.monitoring.elasticsearch.url: http://192.168.55.200:9200
           

7.vi pipelines,修改完成後儲存重新開機logstash鏡像即可

- pipeline.id: log1
  path.config: "/etc/logstash/pipeline/logstash.conf"
           

SpringBoot整合ELK+Kafka

1.引入依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <!-- springBoot內建kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
           

2.修改配置檔案

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
    username: root
    password: 1234
    driver-class-name: com.mysql.jdbc.Driver
#ES連接配接位址,注意:9200是外部通路位址,9300是内部通訊位址,叢集名稱需要與es頁面的名稱相同
  data:
    elasticsearch:
      cluster-name: docker-cluster
      cluster-nodes: 192.168.55.200:9300
  kafka:
    # kafka伺服器位址(可以多個)
    bootstrap-servers: 192.168.55.200:9092
    consumer:
      # 指定一個預設的組名
      group-id: kafka1
      # earliest:當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,從頭開始消費
      # latest:當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,消費新産生的該分區下的資料
      # none:topic各分區都存在已送出的offset時,從offset後開始消費;隻要有一個分區不存在已送出的offset,則抛出異常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 緩存容量
      buffer-memory: 524288
      # 伺服器位址
      bootstrap-servers: 192.168.55.200:9092
           

3.自定義測試接口,注意:發送主題需要與logstash監聽的主題一緻,接下來大家可以去kibana界面查詢資料啦

@GetMapping("/sendLogs")
    public String sendLogs()
    {
        log.info("調用發送日志接口");
        kafkaTemplate.send("mylog","logs-success:"+System.currentTimeMillis());
        return "success";
    }
           

ELK+Kafka總結

通過上面的學習,相信大家對這套分布式日志收集系統有了一定的了解。說白了,就是logstash是一個中間件,它訂閱到mq的主題拉取資料并再把資料推送給ES,通過Kibana可視化界面展示我們的ES資料。這樣就實作了我們日志高效的統一化管理,查日志統一到這個系統,既友善又快速。可能又有小夥伴會問了,為什麼要加上Kafka呢,我們以前都是用mysql的啊。首先,如果是傳統的logstash訂閱到mysql,那麼日志的過濾收集全部集中在伺服器上,消耗伺服器的資源或非常的大,調試和跟蹤也非常的困難;其次,如果是日志流量大的情況下,可能會存在資料丢失的問題;再者,日志量龐大的情況下,肯定要根據服務名稱劃分不同的log表,那我現在有50個服務,難不成你還弄50張表去存放每個服務對應的日志嗎,那就對應50個lostash,成本又高,還low。是以,ELK+kafka既可以實作日志的統一管理,高效查詢,又可以做到流量削峰,日志不丢失,在微服務系統中還是非常的有必要。