flume学习(四):Flume Interceptors的使用

对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

官方上提供的已有的拦截器有:

Timestamp Interceptor

Host Interceptor

Static Interceptor

Regex Filtering Interceptor

Regex Extractor Interceptor

像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。

Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到

Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。

Static Interceptor:可以在event的header中添加自定义的key和value。

Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。

Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:

[java] view plaincopy

  1. public class WriteLog {  
  2.     protected static final Log logger = LogFactory.getLog(WriteLog.class);  
  3.   
  4.     /** 
  5.      * @param args 
  6.      * @throws InterruptedException 
  7.      */  
  8.     public static void main(String[] args) throws InterruptedException {  
  9.         // TODO Auto-generated method stub  
  10.         while (true) {  
  11.             logger.info(new Date().getTime());  
  12.             logger.info(“{\”requestTime\”:”  
  13.                     + System.currentTimeMillis()  
  14.                     + “,\”requestParams\”:{\”timestamp\”:1405499314238,\”phone\”:\”02038824941\”,\”cardName\”:\”测试商家名称\”,\”provinceCode\”:\”440000\”,\”cityCode\”:\”440106\”},\”requestUrl\”:\”/reporter-api/reporter/reporter12/init.do\”}”);  
  15.             Thread.sleep(2000);  
  16.   
  17.         }  
  18.     }  
  19. }  

又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。

接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:

1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息

2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。

修改后的flume.conf如下:

[plain] view plaincopy

  1. tier1.sources=source1  
  2. tier1.channels=channel1  
  3. tier1.sinks=sink1  
  4.   
  5. tier1.sources.source1.type=avro  
  6. tier1.sources.source1.bind=0.0.0.0  
  7. tier1.sources.source1.port=44444  
  8. tier1.sources.source1.channels=channel1  
  9.   
  10. tier1.sources.source1.interceptors=i1 i2  
  11. tier1.sources.source1.interceptors.i1.type=regex_filter  
  12. tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  
  13. tier1.sources.source1.interceptors.i2.type=timestamp  
  14.   
  15. tier1.channels.channel1.type=memory  
  16. tier1.channels.channel1.capacity=10000  
  17. tier1.channels.channel1.transactionCapacity=1000  
  18. tier1.channels.channel1.keep-alive=30  
  19.   
  20. tier1.sinks.sink1.type=hdfs  
  21. tier1.sinks.sink1.channel=channel1  
  22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  
  23. tier1.sinks.sink1.hdfs.fileType=DataStream  
  24. tier1.sinks.sink1.hdfs.writeFormat=Text  
  25. tier1.sinks.sink1.hdfs.rollInterval=0  
  26. tier1.sinks.sink1.hdfs.rollSize=10240  
  27. tier1.sinks.sink1.hdfs.rollCount=0  
  28. tier1.sinks.sink1.hdfs.idleTimeout=60  

我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。

i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。

然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。

时间: 2024-09-07 12:56:50

flume学习(四):Flume Interceptors的使用的相关文章

flume学习(五):Flume Channel Selectors使用

前几篇文章只有一个项目的日志,现在我们考虑多个项目的日志的收集,我拷贝了一份flumedemo项目,重命名为flumedemo2,添加了一个WriteLog2.java类,稍微改动了一下JSON字符串的输出,将以前requestUrl中的"reporter-api"改为了"image-api",以便和WriteLog类的输出稍微区分开来,如下: [java] view plaincopy package com.besttone.flume;      import

flume学习(一):log4j直接输出日志到flume

log4j.properties配置: log4j.rootLogger=INFOlog4j.category.com.besttone=INFO,flumelog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppenderlog4j.appender.flume.Hostname = localhostlog4j.appender.flume.Port = 44444 log4j.appender.flume.

阐述Flume OG到 Flume NG发生的革命性变化

但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,这点可以在 BigInsights http://www.aliyun.com/zixun/aggregation/11790.html">产品文档的 troubleshooting 板块发现.为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-

[20120509]IOT索引组织表相关信息的学习(四).txt

[20120509]IOT索引组织表相关信息的学习(四).txt 今天看了一个有关IOT的介绍:http://richardfoote.wordpress.com/2012/04/11/iot-secondary-indexes-primary-key-considerations-beauty-and-the-beast/     If we create a secondary index on a column that forms part of the PK, Oracle can b

艾伟:C#多线程学习(四) 多线程的自动管理(线程池)

本系列文章导航 C#多线程学习(一) 多线程的相关概念 C#多线程学习(二) 如何操纵一个线程 C#多线程学习(三) 生产者和消费者 C#多线程学习(四) 多线程的自动管理(线程池) C#多线程学习(五) 多线程的自动管理(定时器) C#多线程学习(六) 互斥对象 在多线程的程序中,经常会出现两种情况: 一种情况: 应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应 这一般使用ThreadPool(线程池)来解决: 另一种情况:线程平时都处于休眠状态,只是周期性地被

flume学习(六):使用hive来分析flume收集的日志数据

前面已经讲过如何将log4j的日志输出到指定的hdfs目录,我们前面的指定目录为/flume/events. 如果想用hive来分析采集来的日志,我们可以将/flume/events下面的日志数据都load到hive中的表当中去. 如果了解hive的load data原理的话,还有一种更简便的方式,可以省去load data这一步,就是直接将sink1.hdfs.path指定为hive表的目录. 下面我将详细描述具体的操作步骤. 我们还是从需求驱动来讲解,前面我们采集的数据,都是接口的访问日志数

flume学习(七):如何使用event header中的key值

前面我们已经说到我们在header中添加了一个key为:flume.client.log4j.logger.source  ,然后有两个应用程序,一个设置为app1,一个设置为app2. 现在有这么一个需求,要将app1的日志输出到hdfs://master68:8020/flume/events/app1目录下面,app2的日志输出到hdfs://master68:8020/flume/events/app2目录下面,未来也可能有更多的应用程序的日志输出,也即每个程序的日志输出到各自自己的目录

flume学习(三):flume将log4j日志数据写入到hdfs

在第一篇文章中我们是将log4j的日志输出到了agent的日志文件当中.配置文件如下: [plain] view plaincopy tier1.sources=source1   tier1.channels=channel1   tier1.sinks=sink1      tier1.sources.source1.type=avro   tier1.sources.source1.bind=0.0.0.0   tier1.sources.source1.port=44444   tier

flume kafka-关于flume和kafka结合效率的问题

问题描述 关于flume和kafka结合效率的问题 最近做了个测试.是flume+kafka的.是读取文件夹的.31M的文件读了很长时间.大概20分钟.不知道什么原因.哪位大神知道啊.指导下. 下面是flume的配置 #agent section producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq #producer.sources