日志系统之Flume采集加morphline解析

概述

这段时间花了部分时间在处理消息总线跟日志的对接上。这里分享一下在日志采集和日志解析中遇到的一些问题和处理方案。

日志采集-flume

logstash VS flume

首先谈谈我们在日志采集器上的选型。由于我们选择采用ElasticSearch作为日志的存储与搜索引擎。而基于ELK(ElasticSearch,Logstash,Kibana)的技术栈在日志系统方向又是如此流行,所以把Logstash列入考察对象也是顺理成章,Logstash在几大主流的日志收集器里算是后起之秀,被Elastic收购之后更加成熟,社区也比较活跃。

Logstash的设计:inputfilter,output。flume的设计sourcechannelsink,当然flume也有interceptor。具体的设计就不多废话,大致上都是拆分解耦pipeline(管道)的思想。同时,它们都支持分布式扩展,比如Logstash既可以作为shipper也可作为indexer,flume可以多个agent组成分布式事件流。

我对flume的接触早于Logstash。最近调研Logstash的时候,对它强大的filter印象深刻,特别是grok。而之前flume阵营强调最多的是它的source,sink,channel对各种开源组件的扩展支持非常强大。

Logstash固然是一个不错的,但它采用JRuby语言(一种形似Ruby语法的JVM平台的语言)实现使得它的定制性不够灵活,这是我放弃Logstash的主要原因。因为生态的原因,我确实需要Java技术栈提供的扩展性(这里主要目标是将消息总线作为日志采集的缓存队列),而这正是flume的强项。但flume里很少有提及对日志的解析支持,即便有支持正则的interceptor,也只是很有限的查找、替换之类的。经过一番调研发现其实flume提供了这样一个interceptor——morphline。它可以完成对日志的解析。

日志解析-morphline

morphline简介

morphline是由flume的母公司cloudera开源的一个ETL框架。它用于构建、改变基于Hadoop进行ETL(extract、transfer、load)的流式处理程序。(值得一提的是flume是由cloudera捐献给Apache的,后来经过重构成了flume-ng)。morphline使得你在构建ETL
Job不需要编码并且不需要大量的MapReduce技巧。

morphline是一个富配置文件可以很简单得定义一个转化链,用于从任何数据源消费任何类型的数据,处理数据然后加载结果到Hadoop组件中。它用简单的配置步骤代替了Java编程。

morphline是一个类库,可以嵌入任何java程序中。morphline是一个内存容器可以存储转化命令。这些命令以插件的形式被加载到morphline中以执行任务,比如加载、解析、转化或者处理单条记录。一个记录是在内存中的名称-值对的数据结构。而且morphline是可扩展的,可以集成已存在的功能和第三方系统。

这篇文章不是morphline的软文,所以更多介绍请移步cloudera的CDK官方文档

这里有副图,形象地展示了morphline大致的处理模型:

这里还有一幅图,展示了在大数据生态系统中,morphline的架构模型:

后来morphline的开发主要由Kite主导,它是构建于Hadoop上的一套抽象的数据模型层的API接口。这里有kiteSDK关于morphline的文档说明

强大的正则提取器——grok

其实我找morphline就是为了找grok,或者找到一种提供grok的切入口。grok利用正则的解析能力从非结构化的日志数据中提取结构化的字段。因为Logstash已经提供了一大堆的经过验证的grok规则,这是Logstash的优势,如果能够将这些规则直接在flume里使用,那么将能够直接集成Logstash的能力(其实,只要有文本是规则的,正则都能提取出来,但已经有成熟的东西就没必要自己再花费巨大的功夫去验证)。这里有grok的说明文档,就不再过多介绍了。

服务端使用morphline

flume在agent里利用morphline。在client端对日志进行ETL的优势可以利用客户端PC分散的计算能力以省去服务端解析的麻烦,但agent的数量非常之多,而且散布在各个生产服务器上,日志的格式也是五花八门。也就是说,在agent做太多的事情将使得我们在应对改变的时候缺乏灵活性。所以,我们在客户端只收集不解析。而在服务端利用morphline对日志进行解析。相当于启动一个解析服务,从日志采集队列中提取日志,用morphline进行解析转换,然后再将解析过的更结构化的日志发送到索引队列,等到索引服务将其存入ElasticSearch。整个过程大致如下图:

这种异步的基于队列的pipeline其实跟Storm这样的流处理器的同步pipeline本质上殊途同归,都是在利用廉价的PC来平摊计算量。

程序示例

为了在你的程序中使用morphline,首先需要添加对morphline的maven依赖:

        <dependency>
            <groupId>org.kitesdk</groupId>
            <artifactId>kite-morphlines-all</artifactId>
            <version>${kite.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
            </exclusions>
            <type>pom</type>
            <optional>true</optional>
        </dependency>

版本是1.0.0。需要注意的是,这里面有些依赖,需要从twitter的仓库里去下载,所以你懂的:请自备梯子。

示例程序:

    private void process(Message message) {
        msgBuffer.add(message);

        if (msgBuffer.size() < MESSAGE_BUFFER_SIZE) return;

        try {
            Notifications.notifyBeginTransaction(morphline);

            for (Message msg : msgBuffer) {
                Event logEvent = GSON.fromJson(new String(msg.getContent()), Event.class);

                String originalLog = new String(logEvent.getBody());
                logEvent.getHeaders().put(MORPHLINE_GROK_FIELD_NAME, originalLog);
                logEvent.setBody(null);

                Record record = new Record();
                for (Map.Entry<String, String> entry : logEvent.getHeaders().entrySet()) {
                    record.put(entry.getKey(), entry.getValue());
                }
                byte[] bytes = logEvent.getBody();
                if (bytes != null && bytes.length > 0) {
                    logger.info("original : " + new String(bytes));
                    record.put(Fields.ATTACHMENT_BODY, bytes);
                }

                Notifications.notifyStartSession(morphline);
                boolean success = morphline.process(record);
                if (!success) {
                    logger.error("failed to process record! from : " + morphlineFileAndId);
                    logger.error("record body : " + new String(logEvent.getBody()));
                }
            }

            //do some ETL jobs
            List<Record> records = this.extract();

            List<Event> events = this.transfer(records);

            this.load(events);

        } catch (JsonSyntaxException e) {
            logger.error(e);
            Notifications.notifyRollbackTransaction(morphline);
        } finally {
            //clear buffer and extractor
            this.extracter.getRecords().clear();
            this.msgBuffer.clear();
            Notifications.notifyCommitTransaction(morphline);
            Notifications.notifyShutdown(morphline);
        }
    }

这里只是部分代码,展示morphline的大致用法。主要的逻辑在配置文件中:

morphlines : [
    {
        id : morphline1
        importCommands : ["org.kitesdk.**"]

        commands : [
            {
                grok {
                    dictionaryString : """

                                       """
                    expressions : {
                                original : """"""
                    }
                    extract : true
                    numRequiredMatches : atLeastOnce # default is atLeastOnce
                    findSubstrings : false
                    addEmptyStrings : false
                }
            }

            { logInfo { format : "output record: {}", args : ["@{}"] } }
        ]
    }
]

如上所述,我们最主要的是想利用grok来解析日志,而logstash已经提供了大量的grok patterns供你开箱即用,但对于自定义的日志格式类型,你通常都需要自行解析。这里有个grok
在线debug工具

综述

其实,业界使用flume都是规模较大的互联网公司,比如美团。它们通常会使用flume+kafka+storm+hadoop生态系统。利用storm
stream做实时解析,利用mapreduce做离线分析,这种高度定制化的使用场景,几乎不需要flume的agent在客户端进行解析的能力,因此flume的morphline也就很少被提及。

但morphline还是不可多得的文本ETL利器,无论你是在采集的时候直接用morphline
做ETL还是在服务端做,flume+morphline加起来带来的灵活性也不输Logstash。

原文发布时间为:2015-11-20

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2025-01-30 10:55:05

日志系统之Flume采集加morphline解析的相关文章

日志系统之Flume日志收集

最近接手维护一个日志系统,它用于对应用服务器上的日志进行收集然后提供实时分析.处理并最后将日志存储到目标存储引擎.针对这三个环节,业界已经有一套组件来应对各自的需求需求,它们是flume+kafka+hdfs/hbase.我们在实时分析.存储这两个环节,选择跟业界的实践相同,但agent是团队自己写的,出于对多种数据源的扩展需求以及原来收集日志的方式存在的一些不足,于是调研了一下flume的agent.结果是flume非常契合我们的实际需求,并且拥有良好的扩展性与稳定性.于是打算采用flume的

【大数据技巧】Flume采集网站日志到MaxCompute常见问题汇总

免费开通大数据服务:https://www.aliyun.com/product/odps 本文列举了Flume采集网站日志到MaxCompute的一些常见问题,欢迎大家补充: Q:找不到指定路径的文件 A:本实验要在Linux系统下运行,路径也要写在Linux下的路径 Q:找不到指定sink type的类 A:插件错误,需要用新版本的 flume插件 写新版本的 datahub Q:不能执行SinkRunner A:插件错误,需要用新版本的 flume插件 写新版本的 datahub Q:不识

日志系统重构之多源聚合的采集器

最近对日志系统的采集机制进行了重构,增强了对单一主机上多个日志源采集的便捷性. 重构之前 重构之前的设计以日志类型为中心,一个日志类型对应一个独立的flume的配置模板,一个日志类型的一个日志源(具体到某个节点上特定的日志文件)对应一个flume配置文件(也即一个flume agent).flume配置模板主要针对这个日志类型的日志文件名称.是否存在多行日志.日志首字符.目标接收方有关的配置等固定属性.光靠这些配置还不够,因为这些日志类型所对应的日志会存在于各个具体的主机节点以及可能不同的文件系

教你一步搭建Flume分布式日志系统

在前篇几十条业务线日志系统如何收集处理?中已经介绍了Flume的众多应用场景,那此篇中先介绍如何搭建单机版日志系统. 环境 CentOS7.0       Java1.8 下载 官网下载 http://flume.apache.org/download.html 当前最新版  apache-flume-1.7.0-bin.tar.gz 下载后上传到CentOS中的/usr/local/ 文件夹中,并解压到当前文件中重命名为flume170    /usr/local/flume170 tar -

日志系统之基于Zookeeper的分布式协同设计

最近这段时间在设计和实现日志系统,在整个日志系统系统中Zookeeper的作用非常重要--它用于协调各个分布式组件并提供必要的配置信息和元数据.这篇文章主要分享一下Zookeeper的使用场景.这里主要涉及到Zookeeper在日志系统中的使用,但其实它在我们的消息总线和搜索模块中也同样非常重要. 日志元数据 日志的类型和日志的字段这里我们统称为日志的元数据.我们构建日志系统的目的最终主要是为了:日志搜索,日志分析.这两大块我们很大程度上依赖于--ElasticSearch(关于什么是Elast

日志系统之扩展Flume-LineDeserializer

继续闲聊日志系统,在之前的博文里已提到我们在日志收集上的选择是flume-ng.应用程序将日志打到各自的日志文件或指定的文件夹(日志文件按天滚动),然后利用flume的agent去日志文件中收集. Deserializer简介 flume将一条日志抽象成一个event.这里我们从日志文件中收集日志采用的是定制版的SpoolDirectorySource(我们对当日日志文件追加写入收集提供了支持).从日志源中将每条日志转换成event需要Deserializer(反序列化器).flume的每一个s

ELK统一日志系统的应用

收集和分析日志是应用开发中至关重要的一环,互联网大规模.分布式的特性决定了日志的源头越来越分散, 产生的速度越来越快,传统的手段和工具显得日益力不从心.在规模化场景下,grep.awk 无法快速发挥作用,我们需要一种高效.灵活的日志分析方式,可以给故障处理,问题定位提供更好的支持.基于全文搜索引擎 Lucene 构建的 ELKstack 平台,是目前比较流行的日志收集方解决方案. ELK系统的部署按照官方文档操作即可,相关资料也很多,这篇文章更多的关注三个组件的设计和实现,帮助大家了解这个流行的

日志系统之HBase日志存储设计优化

本人博客文章如未特别注明皆为原创!如有转载请注明出处:http://blog.csdn.net/yanghua_kobe/article/details/46482319 继续谈论最近接手的日志系统,上篇关于日志收集相关的内容,这篇我们谈谈日志存储相关的话题. 简介 我们首先来总结一下日志这种数据的业务特点:它几乎没有更新的需求,一个组件或一个系统通常有一个固定的日志格式,但就多个组件或系统而言它会存在各种五花八门的自定义的tag,这些tag建立的目的通常是为了后期查询/排查线上问题的需要,因此

开源日志系统比较

原文地址:http://www.cnblogs.com/ibook360/p/3159544.html 1. 背景介绍   许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦: (2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统: (3) 具有高可扩展性.即:当数据量增加时,可以通过增加节点进行水平扩展. 本文从设计