本節書摘來自華章出版社《elk stack權威指南(第2版)》一書中的第2章,第2.1節,作者 饒琛琳 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。
第2章
插 件 配 置
插件是logstash最大的特色。各種不同的插件源源不斷地被創造出來,釋出到社群中供大家使用。本章會按照插件的類别,對一般場景下的一些常用插件做詳細的配置和用例介紹。本章介紹的插件包括:1)輸入插件。基于shipper端場景,主要介紹stdin、tcp、file等插件。2)編解碼插件。編解碼通常是會被遺忘的環節,但是運用好了,會大大提高工作效率,本節介紹最常用的json和multiline插件。3)過濾器插件。名為過濾器,其實各種資料裁剪和計算都可以在這類插件裡完成,是logstash最強大的一環。本節會詳細介紹grok、date、mutate、ruby、metrics等插件的妙用。4)輸出插件。logstash雖然經常跟elasticsearch并稱,但是作為一個日志傳輸架構,它其實可以輸出資料到各種不同的地方。比如graphite、hdfs、nagios等等。本章會介紹這些常用的輸出插件用法。
2.1 輸入插件
在“hello world”示例中,我們已經見到并介紹了logstash的運作流程和配置的基礎文法。從這章開始,我們就要逐一介紹logstash流程中比較常用的一些插件,并在介紹中針對其主要适用的場景、推薦的配置,作一些說明。
限于篇幅,接下來内容中,配置示例不一定能貼完整。請記住一個原則:logstash配置一定要有一個input和一個output。在示範過程中,如果沒有寫明input,預設就會使用“hello world”裡我們已經示範過的logstash-input-stdin,同理,沒有寫明的output就是logstash-output-stdout。
以上請讀者自明。
2.1.1 标準輸入
我們已經見過好幾個示例使用stdin了。這也應該是logstash裡最簡單和基礎的插件了。是以,在這段中,我們先介紹一些未來每個插件都會有的一些方法。
配置示例如下:
input {
stdin {
add_field => {"key"
=>"value"}
codec =>"plain"
tags => ["add"]
type =>"std"
}
}
用上面的新stdin設定重新運作一次最開始的hello
world示例。我建議大家把整段配置都寫入一個文本檔案,然後運作指令:bin/logstash -f stdin.conf。輸入“hello world”并回車後,你會在終端看到如下輸出:
{
"message"
=>"hello world",
"@version"
=>"1",
"@timestamp"
=>"2014-08-08t06:48:47.789z",
"type"
=>"std",
"tags"
=> [
[0] "add"
],
"key"
=>"value",
"host"
=>"raochenlindemacbook-air.local"
type和tags是logstash事件中兩個特殊的字段。通常來說,我們會在“輸入區段”中通過type來标記事件類型—我們肯定是提前能知道這個事件屬于什麼類型的。而tags則是在資料處理過程中,由具體的插件來添加或者删除的。
最常見的用法是像下面這樣:
type =>"web"
filter {
if [type] == "web" {
grok {
match => ["message",
%{combinedapachelog}]
}
output {
if "_grokparsefailure" in [tags]
nagios_nsca {
nagios_status =>"1"
} else {
elasticsearch {
看起來蠻複雜的,對吧?
繼續學習,你也可以寫出來的。
2.1.2 檔案輸入
分析網站通路日志應該是一個運維工程師最常見的工作了。是以我們先學習一下怎麼用logstash來處理日志檔案。
logstash使用一個名叫filewatch的ruby gem庫來監聽檔案變化。這個庫支援glob展開檔案路徑,而且會記錄一個叫.sincedb的資料庫檔案來跟蹤被監聽日志檔案的目前讀取位置。是以,不要擔心logstash會漏過你的資料。
sincedb檔案中記錄了每個被監聽的檔案的inode, major number, minor
number和pos。
file {
path => ["/var/log/*.log",
"/var/log/message"]
type =>"system"
start_position
=>"beginning"
有一些比較有用的配置項,可以用來指定filewatch庫的行為:
discover_interval:logstash每隔多久去檢查一次被監聽的path下是否有新檔案,預設值是15秒。
exclude:不想被監聽的檔案可以排除出去,這裡跟path一樣支援glob展開。
sincedb_path:如果你不想用預設的$home/.sincedb(windows平台上為%userprofile%\.sincedb,該變量預設值是:c:\windows\system32\config\systemprofile),可以通過這個配置定義sincedb檔案到其他位置。
sincedb_write_interval:logstash每隔多久寫一次sincedb檔案,預設是15秒。
stat_interval:logstash每隔多久檢查一次被監聽檔案狀态(是否有更新),預設是1秒。
start_position:logstash從什麼位置開始讀取檔案資料,預設是結束位置,也就是說,logstash程序會以類似tail-f的形式運作。如果你是要導入原有資料,把這個設定改成"beginning",logstash程序就從頭開始讀取,有點類似于less+f指令的形式運作。
close_older:一個已經監聽中的檔案,如果超過這個值的時間内沒有更新内容,就關閉監聽它的檔案句柄。預設是3
600秒,即一小時。
ignore_older:在每次檢查檔案清單的時候,如果一個檔案的最後修改時間超過這個值,就忽略這個檔案。預設是86
400秒,即一天。
注意事項如下:
1)通常你要導入原有資料進elasticsearch的話,你還需要filter/date插件來修改預設的"@timestamp"字段值。稍後會學習這方面的知識。
2)filewatch隻支援檔案的絕對路徑,而且會不自動遞歸目錄。是以有需要的話,請用數組方式都寫明具體哪些檔案。
3)logstash::inputs::file隻是在程序運作的注冊階段初始化一個filewatch對象。是以它不能支援類似fluentd那樣的path =>"/path/to/%{+yyyy/mm/dd/hh}.log"寫法。達到相同目的,你隻能寫成path
=>"/path/to/*/*/*/*.log"。filewatch子產品提供了一個稍微簡單一點的寫法:/path/to/**/*.log,用**來縮寫表示遞歸全部子目錄。
4)start_position僅在該檔案從未被監聽過的時候起作用。如果sincedb檔案中已經有這個檔案的inode記錄了,那麼logstash依然會從記錄過的pos開始讀取資料。是以重複測試的時候每回需要删除sincedb檔案。此外,官方部落格上提供了另一個巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定義為/dev/null,則每次重新開機自動從頭開始讀。
5)因為windows平台上沒有inode的概念,logstash某些版本在windows平台上監聽檔案不是很靠譜。windows平台上,推薦考慮使用nxlog作為收集端,參見後面5.5節。
2.1.3 tcp輸入
未來你可能會用redis伺服器或者其他的消息隊列系統來作為logstash
broker的角色。不過logstash其實也有自己的tcp/udp插件,在臨時任務的時候,也算能用,尤其是測試環境。
雖然logstash::inputs::tcp用ruby的socket和openssl庫實作了進階的ssl功能,但logstash本身隻能在sizedqueue中緩存20個事件。這就是我們建議在生産環境中換用其他消息隊列的原因。
tcp {
port => 8888
mode =>"server"
ssl_enable => false
目前來看,logstash::inputs::tcp最常見的用法就是配合nc指令導入舊資料。在啟動logstash程序後,在另一個終端運作如下指令即可導入資料:
# nc 127.0.0.1
8888 < olddata
這種做法比用logstash::inputs::file好,因為當nc指令結束,我們就知道資料導入完畢了。而用input/file方式,logstash程序還會一直等待新資料輸入被監聽的檔案,不能直接看出是否任務完成了。
2.1.4 syslog輸入
syslog可能是運維領域最流行的資料傳輸協定了。當你想從裝置上收集系統日志的時候,syslog應該會是你的第一選擇。尤其是網絡裝置,比如思科中syslog幾乎是唯一可行的辦法。
我們這裡不解釋如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf來發送資料,而隻講如何把logstash配置成一個syslog伺服器來接收資料。
有關rsyslog的用法,後面的5.4節會有更詳細的介紹。
syslog {
port =>"514"
作為最簡單的測試,我們先暫停一下本機的syslogd(或rsyslogd)程序,然後啟動logstash程序(這樣就不會有端口沖突問題)。現在,本機的syslog就會預設發送到logstash裡了。我們可以用自帶的logger指令行工具發送一條“hello world”資訊到syslog裡(即logstash裡)。看到的logstash輸出像下面這樣:
=>"2014-08-08t09:01:15.911z",
=>"127.0.0.1",
"priority"
=>31,
"timestamp"
=>"aug 8 17:01:15",
"logsource"
=>"raochenlindemacbook-air.local",
"program"
=>"com.apple.metadata.mdflagwriter",
"pid"
=>"381",
"severity"
=>7,
"facility"
=>3,
"facility_label"
=>"system",
"severity_label"
=>"debug"
logstash是用udpsocket、tcpserver和logstash::filters::grok來實作logstash::inputs::syslog的。是以你其實可以直接用logstash配置實作一樣的效果,如下所示:
port =>"8514"
grok {
match => ["message",
"%{syslogline}" ]
syslog_pri { }
建議在使用logstash::inputs::syslog的時候走tcp協定來傳輸資料。
因為具體實作中,udp監聽器隻用了一個線程,而tcp監聽器會在接收每個連接配接的時候都啟動新的線程來處理後續步驟。
如果你已經在使用udp監聽器收集日志,用下行指令檢查你的udp接收隊列大小:
# netstat -plnu
| awk 'nr==1 || $4~/:514$/{print $2}'
recv-q
228096
228096是udp接收隊列的預設最大大小,這時候linux核心開始丢棄資料包了!
強烈建議使用logstash::inputs::tcp和logstash::filters::grok配合實作同樣的syslog功能!
雖然logstash::inputs::syslog在使用tcpserver的時候可以采用多線程處理資料的接收,但是在同一個用戶端資料的進行中,其grok和date是一直在該線程中完成的,這會導緻總體上的處理性能幾何級的下降—經過測試,tcpserver每秒可以接收50 000條資料,而在同一線程中啟用grok後每秒隻能處理5 000條,再加上date隻能達到500條!
才将這兩步拆分到filters階段後,logstash支援對該階段插件單獨設定多線程運作,大大提高了總體處理性能。在相同環境下,logstash -f tcp.conf -w 20的測試中,總體處理性能可以達到每秒30 000條資料!
測試采用logstash作者提供的指令:
yes
"<44>may 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000
出處見:https://
github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/makefile
如果你實在沒法切換到tcp協定,可以自己寫程式,或者使用其他基于異步i/o架構(比如libev)的項目。下面是一個簡單的異步i/o實作udp監聽資料輸入elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。
2.1.5 http_poller抓取
logstash作為資料采集系統,也支援自己作為一個http用戶端去抓取網頁資料或者接口資料。這方面有一個很明顯的it運維應用場景:很多業務系統軟體本身提供了restful的内部運作狀态接口,可以直接通過接口采集這些監控資訊。
更長期的方案應該是編寫對應的metricbeat子產品,但是直接采用logstash-input-http_poller顯然更快捷。
比如nginx的性能狀态,社群有一個非常全面的性能狀态監控子產品:nginx-module-vts。在新浪微網誌,後端池分為核心接口、非核心接口兩塊,我們要分别監控的話,nginx-module-vts的配置如下:
http {
vhost_traffic_status_zone;
map $uri $filter_uri {
default 'non-core';
/2/api/timeline core;
~^/2/api/unread core;
server {
vhost_traffic_status_filter_by_set_key
$filter_uri;
location /status {
auth_basic "restricted";
auth_basic_user_file pass_file;
vhost_traffic_status_display;
vhost_traffic_status_display_format
json;
則對應的logstash-input-http_poller配置如下:
http_poller {
urls => {
0 => {
method => get
url =>
"http://localhost:80/status/format/json"
headers => {
accept =>
"application/json"
}
auth => {
user =>
"youknowiknow"
password =>
"iknowyoudonotknow"
}
1 => {
url => "http://localhost:80/status/con
... up%3d*"
}
request_timeout => 60
interval => 60
codec => "json"
這段配置就可以就可以每60秒獲得一次 vts 資料,并重置計數了。
注意,url是一個hash值,是以它的執行順序是根據hash.map來的,為了確定我們是先擷取資料再重置,這裡幹脆用0, 1來作為hash的key,這樣順序就沒問題了。