本节书摘来自华章出版社《elk stack权威指南(第2版)》一书中的第2章,第2.1节,作者 饶琛琳 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
第2章
插 件 配 置
插件是logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍stdin、tcp、file等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的json和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。logstash虽然经常跟elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如graphite、hdfs、nagios等等。本章会介绍这些常用的输出插件用法。
2.1 输入插件
在“hello world”示例中,我们已经见到并介绍了logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。
限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“hello world”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。
以上请读者自明。
2.1.1 标准输入
我们已经见过好几个示例使用stdin了。这也应该是logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。
配置示例如下:
input {
stdin {
add_field => {"key"
=>"value"}
codec =>"plain"
tags => ["add"]
type =>"std"
}
}
用上面的新stdin设置重新运行一次最开始的hello
world示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“hello world”并回车后,你会在终端看到如下输出:
{
"message"
=>"hello world",
"@version"
=>"1",
"@timestamp"
=>"2014-08-08t06:48:47.789z",
"type"
=>"std",
"tags"
=> [
[0] "add"
],
"key"
=>"value",
"host"
=>"raochenlindemacbook-air.local"
type和tags是logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。
最常见的用法是像下面这样:
type =>"web"
filter {
if [type] == "web" {
grok {
match => ["message",
%{combinedapachelog}]
}
output {
if "_grokparsefailure" in [tags]
nagios_nsca {
nagios_status =>"1"
} else {
elasticsearch {
看起来蛮复杂的,对吧?
继续学习,你也可以写出来的。
2.1.2 文件输入
分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用logstash来处理日志文件。
logstash使用一个名叫filewatch的ruby gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心logstash会漏过你的数据。
sincedb文件中记录了每个被监听的文件的inode, major number, minor
number和pos。
file {
path => ["/var/log/*.log",
"/var/log/message"]
type =>"system"
start_position
=>"beginning"
有一些比较有用的配置项,可以用来指定filewatch库的行为:
discover_interval:logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。
exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。
sincedb_path:如果你不想用默认的$home/.sincedb(windows平台上为%userprofile%\.sincedb,该变量默认值是:c:\windows\system32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。
sincedb_write_interval:logstash每隔多久写一次sincedb文件,默认是15秒。
stat_interval:logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。
start_position:logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,logstash进程会以类似tail-f的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",logstash进程就从头开始读取,有点类似于less+f命令的形式运行。
close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3
600秒,即一小时。
ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86
400秒,即一天。
注意事项如下:
1)通常你要导入原有数据进elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。
2)filewatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
3)logstash::inputs::file只是在进程运行的注册阶段初始化一个filewatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/mm/dd/hh}.log"写法。达到相同目的,你只能写成path
=>"/path/to/*/*/*/*.log"。filewatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。
4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。
5)因为windows平台上没有inode的概念,logstash某些版本在windows平台上监听文件不是很靠谱。windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。
2.1.3 tcp输入
未来你可能会用redis服务器或者其他的消息队列系统来作为logstash
broker的角色。不过logstash其实也有自己的tcp/udp插件,在临时任务的时候,也算能用,尤其是测试环境。
虽然logstash::inputs::tcp用ruby的socket和openssl库实现了高级的ssl功能,但logstash本身只能在sizedqueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。
tcp {
port => 8888
mode =>"server"
ssl_enable => false
目前来看,logstash::inputs::tcp最常见的用法就是配合nc命令导入旧数据。在启动logstash进程后,在另一个终端运行如下命令即可导入数据:
# nc 127.0.0.1
8888 < olddata
这种做法比用logstash::inputs::file好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。
2.1.4 syslog输入
syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。
我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把logstash配置成一个syslog服务器来接收数据。
有关rsyslog的用法,后面的5.4节会有更详细的介绍。
syslog {
port =>"514"
作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到logstash里了。我们可以用自带的logger命令行工具发送一条“hello world”信息到syslog里(即logstash里)。看到的logstash输出像下面这样:
=>"2014-08-08t09:01:15.911z",
=>"127.0.0.1",
"priority"
=>31,
"timestamp"
=>"aug 8 17:01:15",
"logsource"
=>"raochenlindemacbook-air.local",
"program"
=>"com.apple.metadata.mdflagwriter",
"pid"
=>"381",
"severity"
=>7,
"facility"
=>3,
"facility_label"
=>"system",
"severity_label"
=>"debug"
logstash是用udpsocket、tcpserver和logstash::filters::grok来实现logstash::inputs::syslog的。所以你其实可以直接用logstash配置实现一样的效果,如下所示:
port =>"8514"
grok {
match => ["message",
"%{syslogline}" ]
syslog_pri { }
建议在使用logstash::inputs::syslog的时候走tcp协议来传输数据。
因为具体实现中,udp监听器只用了一个线程,而tcp监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。
如果你已经在使用udp监听器收集日志,用下行命令检查你的udp接收队列大小:
# netstat -plnu
| awk 'nr==1 || $4~/:514$/{print $2}'
recv-q
228096
228096是udp接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!
强烈建议使用logstash::inputs::tcp和logstash::filters::grok配合实现同样的syslog功能!
虽然logstash::inputs::syslog在使用tcpserver的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,tcpserver每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!
才将这两步拆分到filters阶段后,logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!
测试采用logstash作者提供的命令:
yes
"<44>may 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000
出处见:https://
github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/makefile
如果你实在没法切换到tcp协议,可以自己写程序,或者使用其他基于异步i/o框架(比如libev)的项目。下面是一个简单的异步i/o实现udp监听数据输入elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。
2.1.5 http_poller抓取
logstash作为数据采集系统,也支持自己作为一个http客户端去抓取网页数据或者接口数据。这方面有一个很明显的it运维应用场景:很多业务系统软件本身提供了restful的内部运行状态接口,可以直接通过接口采集这些监控信息。
更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。
比如nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:
http {
vhost_traffic_status_zone;
map $uri $filter_uri {
default 'non-core';
/2/api/timeline core;
~^/2/api/unread core;
server {
vhost_traffic_status_filter_by_set_key
$filter_uri;
location /status {
auth_basic "restricted";
auth_basic_user_file pass_file;
vhost_traffic_status_display;
vhost_traffic_status_display_format
json;
则对应的logstash-input-http_poller配置如下:
http_poller {
urls => {
0 => {
method => get
url =>
"http://localhost:80/status/format/json"
headers => {
accept =>
"application/json"
}
auth => {
user =>
"youknowiknow"
password =>
"iknowyoudonotknow"
}
1 => {
url => "http://localhost:80/status/con
... up%3d*"
}
request_timeout => 60
interval => 60
codec => "json"
这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。
注意,url是一个hash值,所以它的执行顺序是根据hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为hash的key,这样顺序就没问题了。