2.2 编解码配置
Codec是Logstash从1.3.0版开始新引入的概念(Codec来自Coder/decoder两个单词的首字母缩写)。
在此之前,Logstash只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了Codec设置。
所以,这里需要纠正之前的一个概念。Logstash不只是一个input | filter | output的数据流,而是一个input | decode | filter | encode | output的数据流!Codec就是用来decode、encode事件的。
Codec的引入,使得Logstash可以更好、更方便地与其他有自定义数据格式的运维产品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用数据格式的其他产品等。
事实上,我们在第一个“Hello World”用例中就已经用过Codec了—rubydebug就是一种Codec!虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。
这个五段式的流程说明源自Perl版的Logstash(后来改名叫Message::Passing模块)的设计。本书稍后5.8节会对该模块稍作介绍。
2.2.1 JSON编解码
在早期的版本中,有一种降低Logstash过滤器的CPU负载消耗的做法盛行于社区(在当时的Cookbook上有专门的一节介绍):直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置!
这个建议依然有效,不过在当前版本中需要稍微做一点配置变动,因为现在有专门的Codec设置。
1.配置示例
2.运行结果
下面访问一下用Nginx发布的Web页面,然后你会看到Logstash进程输出类似下面这样的内容:
{
"@timestamp" =>"2014-03-21T18:52:25.000+08:00",
"@version" =>"1",
"host" =>"raochenlindeMacBook-Air.local",
"client" =>"123.125.74.53",
"size" =>8096,
"responsetime" =>0.04,
"domain" =>"www.domain.com",
"url" =>"/path/to/file.suffix",
"status" =>"200"
}
3. Nginx代理服务的日志格式问题
对于一个Web服务器的访问日志,看起来已经可以很好的工作了。不过如果Nginx是作为一个代理服务器运行的话,访问日志里有些变量,比如说$upstream_response_time,可能不会一直是数字,它也可能是一个“-”字符串!这会直接导致Logstash对输入数据验证报异常。
有两个办法解决这个问题:
1)用sed在输入之前先替换-成0。运行Logstash进程时不再读取文件而是标准输入,这样命令就成了下面这个样子:
tail -F
/var/log/nginx/proxy_access.log_json \
| sed
's/upstreamtime":-/upstreamtime":0/' \
| /usr/local/logstash/bin/logstash -f
/usr/local/logstash/etc/proxylog.conf
2)日志格式中统一记录为字符串格式(即都带上双引号),然后再在Logstash中用filter/mutate插件来变更应该是数值类型的字符字段的值类型。
有关LogStash::Filters::Mutate的内容,本书稍后会有介绍。
2.2.2 多行事件编码
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。
而Logstash正为此准备好了codec/multiline插件!当然,multiline插件也可以用于其他类似的堆栈式信息,比如Linux的内核日志。
配置示例如下:
input {
stdin {
codec => multiline {
pattern =>"^\["
negate => true
what =>"previous"
}
}
}
运行Logstash进程,然后在等待输入的终端中输入如下几行数据:
[Aug/08/08
14:54:03] hello world
[Aug/08/09
14:54:04] hello logstash
hello best practice
hello raochenlin
[Aug/08/10
14:54:05] the end
你会发现Logstash输出下面这样的返回:
{
"@timestamp" =>"2014-08-09T13:32:03.368Z",
"message" =>"[Aug/08/08 14:54:03] hello world\n",
"@version" =>"1",
"host" =>"raochenlindeMacBook-Air.local"
}
{
"@timestamp" =>"2014-08-09T13:32:24.359Z",
"message" =>"[Aug/08/09 14:54:04] hello logstash\n\n hello best practice\n\n
hello raochenlin\n",
"@version" =>"1",
"tags" => [
[0] "multiline"
],
"host" =>"raochenlindeMacBook-Air.local"
}
你看,后面这个事件,在“message”字段里存储了三行数据!
输出的事件中没有最后一行的"the end"字符串,这是因为你最后输入的回车符\n并不匹配设定的^\[正则表达式,Logstash还得等下一行数据直到匹配成功后才会输出这个事件。
其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,直到新进的当前行匹配^\[正则为止。这个正则还可以用grok表达式,稍后你就会学习这方面的内容。具体的Java日志正则见:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java
说到应用程序日志,Log4j肯定是第一个被大家想到的,使用codec/multiline也确实是一个办法。
不过,如果你本身就是开发人员,或者可以推动程序修改变更的话,Logstash还提供了另一种处理Log4j的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理TCP端口接收的数据。后面3.6节会详细讲述Log4j的用法。
2.2.3 网络流编码
NetFlow是Cisco发明的一种数据交换方式。NetFlow提供网络流量的会话级视图,记录下每个TCP/IP事务的信息。它的目的不是像tcpdump那样提供网络流量的完整记录,而是汇集起来形成更易于管理和易读的流向和容量的分析监控。
Cisco上配置NetFlow的方法,请参照具体的设备说明,主要是设定采集服务器的地址和端口,为运行Logstash服务的主机地址和端口(示例中为9995)。
采集NetFlow数据的Logstash服务配置示例如下:
input {
udp {
port => 9995
codec => netflow {
definitions =>"/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml"
versions => [5]
}
}
}
output {
elasticsearch {
index =>"logstash_netflow5-%{+YYYY.MM.dd}"
host =>"localhost"
}
}
由于该插件生成的字段较多,所以建议对应的Elasticsesarch索引模板也需要单独提交:
# curl -XPUT localhost:9200/_template/logstash_netflow5 -d '{
"template" : "logstash_netflow5-*",
"settings": {
"index.refresh_interval": "5s"
},
"mappings" : {
"_default_" : {
"_all" : {"enabled" : false},
"properties" : {
"@version": { "index": "analyzed", "type": "integer" },
"@timestamp": { "index": "analyzed", "type": "date" },
"netflow": {
"dynamic": true,
"type": "object",
"properties": {
"version": { "index": "analyzed", "type": "integer" },
"flow_seq_num": { "index": "not_analyzed", "type": "long" },
"engine_type": { "index": "not_analyzed", "type": "integer" },
"engine_id": { "index": "not_analyzed", "type": "integer" },
"sampling_algorithm": { "index": "not_analyzed", "type": "integer" },
"sampling_interval": { "index": "not_analyzed", "type": "integer" },
"flow_records": { "index": "not_analyzed", "type": "integer" },
"ipv4_src_addr": { "index": "analyzed", "type": "ip" },
"ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
"ipv4_next_hop": { "index": "analyzed", "type": "ip" },
"input_snmp": { "index": "not_analyzed", "type": "long" },
"output_snmp": { "index": "not_analyzed", "type": "long" },
"in_pkts": { "index": "analyzed", "type": "long" },
"in_bytes": { "index": "analyzed", "type": "long" },
"first_switched": { "index": "not_analyzed", "type": "date" },
"last_switched": { "index": "not_analyzed", "type": "date" },
"l4_src_port": { "index": "analyzed", "type": "long" },
"l4_dst_port": { "index": "analyzed", "type": "long" },
"tcp_flags": { "index": "analyzed", "type": "integer" },
"protocol": { "index": "analyzed", "type": "integer" },
"src_tos": { "index": "analyzed", "type": "integer" },
"src_as": { "index": "analyzed", "type": "integer" },
"dst_as": { "index": "analyzed", "type": "integer" },
"src_mask": { "index": "analyzed", "type": "integer" },
"dst_mask": { "index": "analyzed", "type": "integer" }
}
}
}
}
}
}'
Elasticsearch索引模板的功能,本书稍后12.6节会有详细介绍。
2.2.4 collectd输入
collectd是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析performance analysis)和预测系统未来的load(如能力部署capacity
planning)等
下面简单介绍一下:collectd的部署以及与Logstash对接的相关配置实例。
1. collectd的安装
collectd的安装同样有两种方式,使用官方软件仓库安装或源代码安装。
使用官方软件仓库安装(推荐)
collectd官方有一个隐藏的软件仓库:https://pkg.ci.collectd.org,构建有RHEL/CentOS (rpm),Debian/Ubuntu(deb)的软件包,如果你使用的操作系统属于上述的,那么推荐使用软件仓库安装。
目前collectd官方维护3个版本:5.4、5.5、5.6。根据需要选择合适的版本仓库。下面示例安装5.5版本的方法。
Debian/Ubuntu仓库安装:
echo "deb
http://pkg.ci.collectd.org/deb $(lsb_release -sc) collectd-5.5" | sudo tee
/etc/apt/sources.list.d/collectd.list
curl -s
https://pkg.ci.collectd.org/pubkey.asc | sudo apt-key add -
sudo apt-get
update && sudo apt-get install -y collectd
注意,Debian/Ubuntu软件仓库自带有collectd软件包,如果软件仓库自带的版本足够你使用,那么可以不用添加仓库,直接通过apt-get install collectd即可。
RHEL/CentOS仓库安装:
cat >
/etc/yum.repos.d/collectd.repo <<EOF
[collectd-5.5]
name=collectd-5.5
baseurl=http://pkg.ci.collectd.org/rpm/collectd-5.5/epel-\$releasever-\$basearch/
gpgcheck=1
gpgkey=http://pkg.ci.collectd.org/pubkey.asc
EOF
yum install -y
collectd
你如果需要使用其他collectd插件,此时也可一并安装对应的collectd-xxxx软件包。
源码安装collectd
collectd目前维护3个版本:5.4、5.5、5.6。源代码编译安装时同样可以根据自己需要选择对应版本的源码下载:
wget
http://collectd.org/files/collectd-5.4.1.tar.gz
tar zxvf
collectd-5.4.1.tar.gz
cd
collectd-5.4.1
./configure
--prefix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib
--mandir=/usr/share/man --enable-all-plugins
make &&
make install
源代码编译安装需要预先解决好各种环境依赖,在RedHat平台上要提前安装如下软件包:
rpm -ivh "http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"
yum -y install
libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel
libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp
net-snmp-devel
安装启动脚本如下:
cp
contrib/redhat/init.d-collectd /etc/init.d/collectd
chmod +x
/etc/init.d/collectd
启动collectd如下:
service collectd
start
2. collectd的配置
以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘I/O以及磁盘空间占用情况的监控:
Hostname "host.example.com"
LoadPlugin interface
LoadPlugin cpu
LoadPlugin memory
LoadPlugin network
LoadPlugin df
LoadPlugin disk
<Plugin interface>
Interface "eth0"
IgnoreSelected false
</Plugin>
<Plugin network>
<Server "10.0.0.1""25826"> ## Logstash 的 IP 地址和 collectd 的数据接收端口号
</Server>
</Plugin>
3. Logstash的配置
以下配置实现通过Logstash监听25826端口,接收从collectd发送过来的各项检测数据。
注意,logstash-filter-collectd插件本身需要单独安装,logstash插件安装说明之前已经讲过:
bin/logstash-plugin
install logstash-filter-collectd
Logstash默认自带有collectd的codec编码插件。
推荐配置示例如下: