日志系统之扩展Flume-LineDeserializer

继续闲聊日志系统,在之前的博文里已提到我们在日志收集上的选择是flume-ng。应用程序将日志打到各自的日志文件或指定的文件夹(日志文件按天滚动),然后利用flume的agent去日志文件中收集。

Deserializer简介

flume将一条日志抽象成一个event。这里我们从日志文件中收集日志采用的是定制版的SpoolDirectorySource(我们对当日日志文件追加写入收集提供了支持)。从日志源中将每条日志转换成event需要Deserializer(反序列化器)。flume的每一个source对应的deserializer必须实现接口EventDeserializer,该接口定义了readEvent/readEvents方法从各种日志源读取Event。

flume主要支持两种反序列化器:

(1)AvroEventDeserializer:解析Avro容器文件的反序列化器。对Avro文件的每条记录生成一个flume
Event,并将基于avro编码的二进制记录存入event body中。

(2)LineDeserializer:它是基于日志文件的反序列化器,以“\n”行结束符将每行区分为一条日志记录。

LineDeserializer的缺陷

大部分情况下SpoolDictionarySource配合LineDeserializer工作起来都没问题。但当日志记录本身被分割成多行时,比如异常日志的堆栈或日志中包含“\n”换行符时,问题就来了:原先的按行界定日志记录的方式不能满足这种要求。形如这样的格式:

[2015-06-22 13:14:28,780] [ERROR] [sysName] [subSys or component] [Thread-9] [com.messagebus.client.handler.common.CommonLoopHandler] -*- stacktrace -*- : com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
	at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:203)
	at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:220)
	at com.messagebus.client.handler.common.CommonLoopHandler.handle(CommonLoopHandler.java:34)
	at com.messagebus.client.handler.consume.ConsumerDispatchHandler.handle(ConsumerDispatchHandler.java:17)
	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
	at com.messagebus.client.handler.consume.RealConsumer.handle(RealConsumer.java:44)
	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
	at com.messagebus.client.handler.consume.ConsumerTagGenerator.handle(ConsumerTagGenerator.java:22)
	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
	at com.messagebus.client.handler.consume.ConsumePermission.handle(ConsumePermission.java:37)
	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
	at com.messagebus.client.handler.consume.ConsumeParamValidator.handle(ConsumeParamValidator.java:17)
	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
	at com.messagebus.client.carry.GenericConsumer.run(GenericConsumer.java:50)
	at java.lang.Thread.run(Thread.java:744)
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

当然你也可以对日志内容进行特殊处理,让一条日志的所有内容以一行输出,但这样需要对日志框架进行定制,有时这并不受你控制。因此这里最好的选择是定制日志收集器。

源码问题定位

我们先来了解一下Flume源码中LineDeserializer的核心实现:

  private String readLine() throws IOException {
    StringBuilder sb = new StringBuilder();
    int c;
    int readChars = 0;
    while ((c = in.readChar()) != -1) {
      readChars++;

      // FIXME: support \r\n
      if (c == '\n') {
        break;
      }

      sb.append((char)c);

      if (readChars >= maxLineLength) {
        logger.warn("Line length exceeds max ({}), truncating line!",
            maxLineLength);
        break;
      }
    }

    if (readChars > 0) {
      return sb.toString();
    } else {
      return null;
    }
  }

首先,构建一个StringBuilder,然后以字符为单位挨个读取,如果读取到换行符“\n”,则表示读取本条日志结束,跳出循环;否则将该字符串追加到StringBuilder中。与此同时会给读取的字符个数计数:如果读取的字符个数大于预先配置的一行日志的最大字符串长度,也会跳出循环。

这里的主要问题出在以换行符“\n”作为日志结尾的分隔符逻辑上。当我们记录异常日志时,我们需要重新找到一种界定日志记录结尾的方式。

解决思路

考虑到我们采用[]作为日志的tag界定符,每条日志几乎都是以“[”打头。因此,我们采取的做法是:判断读取到换行符“\n”后再预读下一位,如果下一位是“[”,则认为这是一条普通不换行的日志,此时再回退一个字符(因为刚刚预读了一个字符,需要让指针后退回原来的位置),然后跳出循环;而如果下一位不是“[”,则认为它是一个异常日志或者多行日志。则继续往后读取字符,当遇到换行符时,再次重复以上判断。当然如果你的日志格式是以某个固定的格式打头,首字母固定的话,才可以用这种方式,否则你很可能要配置日志的apender,使其以某个特定的符号作为日志的结尾来判断了。另外,有时也可以基于正则来匹配。

定制实现

为了提升扩展性,我们提供对预读的下一个字符进行配置,并将其命名为:newLineStartPrefix。我们新建一个反序列化类:MultiLineDeserializer。该类的大部分逻辑都跟LineDeserializer相同,主要需要重新实现上面的readLine方法,实现如下:

    private String readLine() throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        int readChars = 0;
        while ((c = in.readChar()) != -1) {
            readChars++;

            // FIXME: support \r\n
            if (c == '\n') {

                //walk more one step
                c = in.readChar();
                if (c == -1)
                    break;
                else if (c == this.newLineStartPrefix) {    //retreat one step
                    long currentPosition = in.tell();
                    in.seek(currentPosition - 1);
                    break;
                }
            }

            sb.append((char)c);

            if (readChars >= maxLineLength) {
                logger.warn("Line length exceeds max ({}), truncating line!",
                            maxLineLength);
                break;
            }
        }

        if (readChars > 0) {
            return sb.toString();
        } else {
            return null;
        }
    }

这里有个小插曲,由于之前已定制了source/sink的缘故。原以为deserializer也可以用同样的方式进行定制。并在agent的deserializer配置中指定定制过的deserializer的完全限定名。但经过验证后发现,这条路走不通,会报错(貌似从flume官网上也找不到对deserializer定制的介绍)。因此,只能在源码上进行扩展,然后编译源码,重新生成jar。

从源码里你会发现为什么在第三方包内扩展deserializer是行不通的。从github上clone下源码,进入flume-ng-core
module的如下类:org.apache.flume.serialization.EventDeserializerType,你就会一目了然:

public enum EventDeserializerType {
  LINE(LineDeserializer.Builder.class),
  MULTILINE(MultiLineDeserializer.Builder.class),
  AVRO(AvroEventDeserializer.Builder.class),
  OTHER(null);

  private final Class<? extends EventDeserializer.Builder> builderClass;

  EventDeserializerType(Class<? extends EventDeserializer.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends EventDeserializer.Builder> getBuilderClass() {
    return builderClass;
  }

}

你必须显式在这里定义deserializer的枚举,然后指定其builder的Class实例,并在agent里的deserializer配置项中填写你这里的枚举名称才行。我们只需在子package:serialization中新建MultiLineDeserializer类,然后重新实现逻辑、编译、打包flume-ng-core
Module生成新的jar即可。flume将其源码中的每个Module生成的jar都放在其二进制包的lib文件夹下。你只需将重新打包好的flume-ng-core
jar替换原来的,重启agent即可看到效果。

这里还有个需要注意的地方:LineDeserializer有一个参数(maxLineLength)用于定义一个日志行的最长字符数。如果某条日志超过这个长度,将不再读取。而一条日志占据多行情况下,该值需要适当增大,因为像异常日志的堆栈长度明显比普通日志长不少,这里你可以设置为8192。

原文发布时间为:2015-06-22

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-10-29 19:23:12

日志系统之扩展Flume-LineDeserializer的相关文章

日志系统之Flume采集加morphline解析

概述 这段时间花了部分时间在处理消息总线跟日志的对接上.这里分享一下在日志采集和日志解析中遇到的一些问题和处理方案. 日志采集-flume logstash VS flume 首先谈谈我们在日志采集器上的选型.由于我们选择采用ElasticSearch作为日志的存储与搜索引擎.而基于ELK(ElasticSearch,Logstash,Kibana)的技术栈在日志系统方向又是如此流行,所以把Logstash列入考察对象也是顺理成章,Logstash在几大主流的日志收集器里算是后起之秀,被Elas

日志系统之Flume日志收集

最近接手维护一个日志系统,它用于对应用服务器上的日志进行收集然后提供实时分析.处理并最后将日志存储到目标存储引擎.针对这三个环节,业界已经有一套组件来应对各自的需求需求,它们是flume+kafka+hdfs/hbase.我们在实时分析.存储这两个环节,选择跟业界的实践相同,但agent是团队自己写的,出于对多种数据源的扩展需求以及原来收集日志的方式存在的一些不足,于是调研了一下flume的agent.结果是flume非常契合我们的实际需求,并且拥有良好的扩展性与稳定性.于是打算采用flume的

教你一步搭建Flume分布式日志系统

在前篇几十条业务线日志系统如何收集处理?中已经介绍了Flume的众多应用场景,那此篇中先介绍如何搭建单机版日志系统. 环境 CentOS7.0       Java1.8 下载 官网下载 http://flume.apache.org/download.html 当前最新版  apache-flume-1.7.0-bin.tar.gz 下载后上传到CentOS中的/usr/local/ 文件夹中,并解压到当前文件中重命名为flume170    /usr/local/flume170 tar -

ELK统一日志系统的应用

收集和分析日志是应用开发中至关重要的一环,互联网大规模.分布式的特性决定了日志的源头越来越分散, 产生的速度越来越快,传统的手段和工具显得日益力不从心.在规模化场景下,grep.awk 无法快速发挥作用,我们需要一种高效.灵活的日志分析方式,可以给故障处理,问题定位提供更好的支持.基于全文搜索引擎 Lucene 构建的 ELKstack 平台,是目前比较流行的日志收集方解决方案. ELK系统的部署按照官方文档操作即可,相关资料也很多,这篇文章更多的关注三个组件的设计和实现,帮助大家了解这个流行的

开源日志系统比较

原文地址:http://www.cnblogs.com/ibook360/p/3159544.html 1. 背景介绍   许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦: (2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统: (3) 具有高可扩展性.即:当数据量增加时,可以通过增加节点进行水平扩展. 本文从设计

几十条业务线日志系统如何收集处理?

在互联网迅猛发展的今天 各大厂发挥十八般武艺的收集用户的各种信息,甚至包括点击的位置,我们也经常发现自己刚搜完一个东西,再打开网页时每个小广告都会出现与之相关联的商品或信息,在感叹智能的同时不惊想 什么时候泄露的行踪. 许多公司的业务平台每天都会产生大量的日志数据.收集业务日志数据,供离线和在线的分析系统使用,正是日志收集系统的要做的事情. 用户的数据除了这种后台默默的收集外,还有各种运行的日志数据和后台操作日志,因此每个业务可以算是一种类型的日志,那稍大点的公司就会有几十种日志类型要收集,而且

开发基于ASP.NET的自定义日志系统

asp.net 摘 要 介绍了利用ASP.NET和VB.NET技术开发的用户日志管理系统,实现了对自定义格式数据库系统的动态管理,使得对日志信息的管理更加及时.高效,提高了工作效率. 关键词 ASP.NET:VB.NET:自定义:日志:数据库 自定义日志管理的数据库设计 自定义日志系统是校园网一卡通系统中机房刷卡子系统的一个基于B/S开发的功能模块,该系统的后台数据库为自定义格式的数据库系统.数据库中主要的用户表和日志表的结构如下: Structure FixUse '用户表Public ID

日志系统之基于Zookeeper的分布式协同设计

最近这段时间在设计和实现日志系统,在整个日志系统系统中Zookeeper的作用非常重要--它用于协调各个分布式组件并提供必要的配置信息和元数据.这篇文章主要分享一下Zookeeper的使用场景.这里主要涉及到Zookeeper在日志系统中的使用,但其实它在我们的消息总线和搜索模块中也同样非常重要. 日志元数据 日志的类型和日志的字段这里我们统称为日志的元数据.我们构建日志系统的目的最终主要是为了:日志搜索,日志分析.这两大块我们很大程度上依赖于--ElasticSearch(关于什么是Elast

日志系统重构之多源聚合的采集器

最近对日志系统的采集机制进行了重构,增强了对单一主机上多个日志源采集的便捷性. 重构之前 重构之前的设计以日志类型为中心,一个日志类型对应一个独立的flume的配置模板,一个日志类型的一个日志源(具体到某个节点上特定的日志文件)对应一个flume配置文件(也即一个flume agent).flume配置模板主要针对这个日志类型的日志文件名称.是否存在多行日志.日志首字符.目标接收方有关的配置等固定属性.光靠这些配置还不够,因为这些日志类型所对应的日志会存在于各个具体的主机节点以及可能不同的文件系