filebeat+kafka+logstash部署及配置
工作上有個搭建ELK平台的需求,設計方案的時候,日志采集部分計劃使用filebeat+kafka+logstash的架構。終端使用filebeat進行日志的簡單采集,再通過kafka叢集送給logstash過濾和加工,再最後輸出給ElasticSearch。
一、搭建環境及軟體版本
伺服器
IP位址 | 作業系統 |
---|---|
192.168.241.11 | CentOs7-1804 |
192.168.241.12 | CentOs7-1611 |
192.168.241.13 | CentOs7-1804 |
JAVA JDK版本:1.8.0_151
Filebeat版本:6.5.2
Kafka版本:2.12-2.1.0
Logstash版本:6.5.2
二、安裝Kafka叢集
請參閱《Kafka叢集及Kafka-Manager部署》
三、Filebeat安裝及配置
1、Filebeat下載下傳及安裝
官網下載下傳位址為:https://www.elastic.co/downloads/beats/filebeat
本文采用的是rpm安裝方式,下載下傳到的安裝包為:filebeat-6.5.2-x86_64.rpm。
将安裝包拷貝到伺服器的/tmp目錄,執行如下指令完成安裝。
rpm -ivh /tmp/filebeat-6.5.2-x86_64.rpm
2、Filebeat配置
安裝完成後,filebeat主程式目錄為/usr/share/filebeat,配置檔案目錄為/etc/filebeat,日志目錄為/var/log/filebeat。這些目錄通過檢視/usr/lib/systemd/system/filebeat.service檔案就可以知道。我們要關注的是/etc/filebeat/filebeat.yml檔案,我們需要修改其中的input和output配置。本文隻根據項目的實際需要簡單講解幾個配置項,所有的配置示例在/etc/filebeat/filebeat.reference.yml檔案中,需要了解的話可結合這個檔案和百度。
input配置(輸入配置):
filebeat的輸入配置在filebeat.inputs節點下。type為log,表明這是從檔案擷取輸入。我們需要将log類型下的enabled的值置為true,在paths中編輯需要擷取輸入的檔案,檔案可以是多個,也可以使用通配符。如下圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2csETQE10d0cVWxQmMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLxADO5EDOyQTM0EjMxgTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
output配置(輸出配置):
我們需要将讀取到的日志輸出到Kafka叢集中,而配制檔案中預設沒有Kafka叢集的輸出配置,是以需要在其中增加(關于Kafka叢集的輸出配置示例,可以在/etc/filebeat/filebeat.reference.yml檔案中檢視)。我們增加的Kafka叢集配置如下圖所示:
從上圖可以看到,首先要增加output.kafka節點,在此節點下配置enabled、hosts、topic這三個配置項。
更多配置詳情,請參考百度和官方文檔https://www.elastic.co/guide/en/beats/filebeat/current/index.html
四、Logstash安裝及配置
1、Logstash下載下傳及安裝
官方下載下傳位址為:https://www.elastic.co/downloads/logstash
本文采用的是rpm安裝方式,下載下傳到的安裝包為:logstash-6.5.2.rpm。
将安裝包拷貝到伺服器的/tmp目錄,執行如下指令完成安裝。
rpm -ivh /tmp/logstash-6.5.2.rpm
2、Logstash配置
安裝完成後,logstash主程式目錄為/usr/share/logstash,配置檔案目錄為/etc/logstash,日志目錄為/var/log/logstash。
Logstash的基礎配置檔案是/etc/logstash/logstash.yml,但是這個檔案我們不用動,我們隻需要将自己的輸入輸出配置寫到以“.conf”結尾的檔案裡,把這檔案放入/etc/logstash/conf.d目錄下,logstash就會讀取我們的配置。我編寫了一個檔案kafka.conf,放到/etc/logstash/conf.d,内容如下:
#輸入配置,一個input{}裡可以配置多個輸入源
input {
#kafka輸入源配置
kafka {
#kafka叢集位址
bootstrap_servers => ["192.168.241.11:9092,192.168.241.12:9092,192.168.241.13:9092"]
#從kafka中哪個topic讀取資料,這裡的topic名要與filebeat中使用的topic保持一緻
topics => ["filebeat-logstash"]
#這是kafka中的消費組者ID,預設值是“logstash”。kafka将消息發到每個消費者組中,同一個組中的消費者收到的資料不重複。例如有兩個消費者組G1、G2,G1中有成員A、B,G2中有成員C、D。kafka從輸入中收到了10條消息,會将這10條消息同時發送給G1和G2,A和B各會收到這10條消息中的一部分,他們收到消息的并集就是這10條消息,C和D同理。
group_id => "filebeat-logstash"
#kafka消費者組中每個消費者的ID,預設值是“logstash”
client_id => "logstashnode1"
#logstash的消費線程,一般一個線程對應kafka中的一個partition(分區),同一組logstash的consumer_threads之和應該不大于一個topic的partition,超過了就是資源的浪費,一般的建議是相等。
consumer_threads => 1
#由于beat傳輸資料給kafka叢集的時候,會附加很多tag,預設情況下,logstash就會将這串tag也認為是message的一部分。這樣不利于後期的資料處理。所有需要添加codec處理。得到原本的message資料。
codec => json
}
}
#輸出配置,這裡表示輸出到檔案
output {
file {
path => "/tmp/logstash.output"
}
}
配置的詳細解釋請參考注釋,更多配置詳情,請參考百度和官方文檔https://www.elastic.co/guide/en/logstash/current/index.html
五、啟動服務及驗證
根據上文完成部署後,分别啟動kafka叢集,filebeat,logstash,在filebeat所在伺服器上,往/tmp/input.test檔案中輸入内容,就會在logstash所在伺服器的/tmp/logstash.output檔案裡産生輸出。詳細驗證過程這裡就不贅述了。