天天看點

filebeat+kafka+logstash部署及配置

filebeat+kafka+logstash部署及配置

工作上有個搭建ELK平台的需求,設計方案的時候,日志采集部分計劃使用filebeat+kafka+logstash的架構。終端使用filebeat進行日志的簡單采集,再通過kafka叢集送給logstash過濾和加工,再最後輸出給ElasticSearch。

一、搭建環境及軟體版本

伺服器

IP位址 作業系統
192.168.241.11 CentOs7-1804
192.168.241.12 CentOs7-1611
192.168.241.13 CentOs7-1804

JAVA JDK版本:1.8.0_151

Filebeat版本:6.5.2

Kafka版本:2.12-2.1.0

Logstash版本:6.5.2

二、安裝Kafka叢集

請參閱《Kafka叢集及Kafka-Manager部署》

三、Filebeat安裝及配置

1、Filebeat下載下傳及安裝

官網下載下傳位址為:https://www.elastic.co/downloads/beats/filebeat

本文采用的是rpm安裝方式,下載下傳到的安裝包為:filebeat-6.5.2-x86_64.rpm。

将安裝包拷貝到伺服器的/tmp目錄,執行如下指令完成安裝。

rpm -ivh /tmp/filebeat-6.5.2-x86_64.rpm
           

2、Filebeat配置

安裝完成後,filebeat主程式目錄為/usr/share/filebeat,配置檔案目錄為/etc/filebeat,日志目錄為/var/log/filebeat。這些目錄通過檢視/usr/lib/systemd/system/filebeat.service檔案就可以知道。我們要關注的是/etc/filebeat/filebeat.yml檔案,我們需要修改其中的input和output配置。本文隻根據項目的實際需要簡單講解幾個配置項,所有的配置示例在/etc/filebeat/filebeat.reference.yml檔案中,需要了解的話可結合這個檔案和百度。

input配置(輸入配置):

filebeat的輸入配置在filebeat.inputs節點下。type為log,表明這是從檔案擷取輸入。我們需要将log類型下的enabled的值置為true,在paths中編輯需要擷取輸入的檔案,檔案可以是多個,也可以使用通配符。如下圖:

filebeat+kafka+logstash部署及配置

output配置(輸出配置):

我們需要将讀取到的日志輸出到Kafka叢集中,而配制檔案中預設沒有Kafka叢集的輸出配置,是以需要在其中增加(關于Kafka叢集的輸出配置示例,可以在/etc/filebeat/filebeat.reference.yml檔案中檢視)。我們增加的Kafka叢集配置如下圖所示:

filebeat+kafka+logstash部署及配置

從上圖可以看到,首先要增加output.kafka節點,在此節點下配置enabled、hosts、topic這三個配置項。

更多配置詳情,請參考百度和官方文檔https://www.elastic.co/guide/en/beats/filebeat/current/index.html

四、Logstash安裝及配置

1、Logstash下載下傳及安裝

官方下載下傳位址為:https://www.elastic.co/downloads/logstash

本文采用的是rpm安裝方式,下載下傳到的安裝包為:logstash-6.5.2.rpm。

将安裝包拷貝到伺服器的/tmp目錄,執行如下指令完成安裝。

rpm -ivh /tmp/logstash-6.5.2.rpm
           

2、Logstash配置

安裝完成後,logstash主程式目錄為/usr/share/logstash,配置檔案目錄為/etc/logstash,日志目錄為/var/log/logstash。

Logstash的基礎配置檔案是/etc/logstash/logstash.yml,但是這個檔案我們不用動,我們隻需要将自己的輸入輸出配置寫到以“.conf”結尾的檔案裡,把這檔案放入/etc/logstash/conf.d目錄下,logstash就會讀取我們的配置。我編寫了一個檔案kafka.conf,放到/etc/logstash/conf.d,内容如下:

#輸入配置,一個input{}裡可以配置多個輸入源
input {
  #kafka輸入源配置
  kafka {
    #kafka叢集位址
    bootstrap_servers => ["192.168.241.11:9092,192.168.241.12:9092,192.168.241.13:9092"]
    #從kafka中哪個topic讀取資料,這裡的topic名要與filebeat中使用的topic保持一緻
    topics => ["filebeat-logstash"]

    #這是kafka中的消費組者ID,預設值是“logstash”。kafka将消息發到每個消費者組中,同一個組中的消費者收到的資料不重複。例如有兩個消費者組G1、G2,G1中有成員A、B,G2中有成員C、D。kafka從輸入中收到了10條消息,會将這10條消息同時發送給G1和G2,A和B各會收到這10條消息中的一部分,他們收到消息的并集就是這10條消息,C和D同理。
    group_id => "filebeat-logstash"
    #kafka消費者組中每個消費者的ID,預設值是“logstash”
    client_id => "logstashnode1"

    #logstash的消費線程,一般一個線程對應kafka中的一個partition(分區),同一組logstash的consumer_threads之和應該不大于一個topic的partition,超過了就是資源的浪費,一般的建議是相等。
    consumer_threads => 1
    #由于beat傳輸資料給kafka叢集的時候,會附加很多tag,預設情況下,logstash就會将這串tag也認為是message的一部分。這樣不利于後期的資料處理。所有需要添加codec處理。得到原本的message資料。
    codec => json
  }
}

#輸出配置,這裡表示輸出到檔案
output {
  file {
    path => "/tmp/logstash.output"
  }
}
           

配置的詳細解釋請參考注釋,更多配置詳情,請參考百度和官方文檔https://www.elastic.co/guide/en/logstash/current/index.html

五、啟動服務及驗證

根據上文完成部署後,分别啟動kafka叢集,filebeat,logstash,在filebeat所在伺服器上,往/tmp/input.test檔案中輸入内容,就會在logstash所在伺服器的/tmp/logstash.output檔案裡産生輸出。詳細驗證過程這裡就不贅述了。

繼續閱讀