第2章
插 件 配 置
插件是Logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍STDIN、TCP、File等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的JSON和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是Logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。Logstash虽然经常跟Elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如Graphite、HDFS、Nagios等等。本章会介绍这些常用的输出插件用法。
2.1 输入插件
在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。
限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。
以上请读者自明。
2.1.1 标准输入
我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。
配置示例如下:
input {
stdin {
add_field => {"key"
=>"value"}
codec =>"plain"
tags => ["add"]
type =>"std"
}
}
用上面的新stdin设置重新运行一次最开始的Hello
World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:
{
"message"
=>"hello world",
"@version"
=>"1",
"@timestamp"
=>"2014-08-08T06:48:47.789Z",
"type"
=>"std",
"tags"
=> [
[0] "add"
],
"key"
=>"value",
"host"
=>"raochenlindeMacBook-Air.local"
}
type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。
最常见的用法是像下面这样:
input {
stdin {
type =>"web"
}
}
filter {
if [type] == "web" {
grok {
match => ["message",
%{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags]
{
nagios_nsca {
nagios_status =>"1"
}
} else {
elasticsearch {
}
}
}
看起来蛮复杂的,对吧?
继续学习,你也可以写出来的。
2.1.2 文件输入
分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。
Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。
sincedb文件中记录了每个被监听的文件的inode, major number, minor
number和pos。
配置示例如下:
input {
file {
path => ["/var/log/*.log",
"/var/log/message"]
type =>"system"
start_position
=>"beginning"
}
}
有一些比较有用的配置项,可以用来指定FileWatch库的行为:
discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。
exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。
sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。
sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。
stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。
start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于less+F命令的形式运行。
close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3
600秒,即一小时。
ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86
400秒,即一天。
注意事项如下:
1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。
2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path
=>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。
4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。
5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。
2.1.3 TCP输入
未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash
Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。
虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。
配置示例如下:
input {
tcp {
port => 8888
mode =>"server"
ssl_enable => false
}
}
目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:
# nc 127.0.0.1
8888 < olddata
这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。
2.1.4 syslog输入
syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。
我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。
有关rsyslog的用法,后面的5.4节会有更详细的介绍。
配置示例如下:
input {
syslog {
port =>"514"
}
}
作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:
{
"message"
=>"Hello World",
"@version"
=>"1",
"@timestamp"
=>"2014-08-08T09:01:15.911Z",
"host"
=>"127.0.0.1",
"priority"
=>31,
"timestamp"
=>"Aug 8 17:01:15",
"logsource"
=>"raochenlindeMacBook-Air.local",
"program"
=>"com.apple.metadata.mdflagwriter",
"pid"
=>"381",
"severity"
=>7,
"facility"
=>3,
"facility_label"
=>"system",
"severity_label"
=>"Debug"
}
Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:
input {
tcp {
port =>"8514"
}
}
filter {
grok {
match => ["message",
"%{SYSLOGLINE}" ]
}
syslog_pri { }
}
建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。
因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。
如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:
# netstat -plnu
| awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096
228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!
强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!
虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,TCPServer每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!
才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!
测试采用Logstash作者提供的命令:
yes
"<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000
出处见:https://
github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile
如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。
2.1.5 http_poller抓取
Logstash作为数据采集系统,也支持自己作为一个HTTP客户端去抓取网页数据或者接口数据。这方面有一个很明显的IT运维应用场景:很多业务系统软件本身提供了RESTful的内部运行状态接口,可以直接通过接口采集这些监控信息。
更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。
比如Nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:
http {
vhost_traffic_status_zone;
map $uri $filter_uri {
default 'non-core';
/2/api/timeline core;
~^/2/api/unread core;
}
server {
vhost_traffic_status_filter_by_set_key
$filter_uri;
location /status {
auth_basic "Restricted";
auth_basic_user_file pass_file;
vhost_traffic_status_display;
vhost_traffic_status_display_format
json;
}
}
}
则对应的logstash-input-http_poller配置如下:
input {
http_poller {
urls => {
0 => {
method => get
url =>
"http://localhost:80/status/format/json"
headers => {
Accept =>
"application/json"
}
auth => {
user =>
"YouKnowIKnow"
password =>
"IKnowYouDonotKnow"
}
}
1 => {
method => get
url => "http://localhost:80/status/con
... up%3D*"
headers => {
Accept =>
"application/json"
}
auth => {
user =>
"YouKnowIKnow"
password =>
"IKnowYouDonotKnow"
}
}
}
request_timeout => 60
interval => 60
codec => "json"
}
}
这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。
注意,url是一个Hash值,所以它的执行顺序是根据Hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为Hash的key,这样顺序就没问题了。