Apache Flume之正则过滤器

在当今的大数据世界中,应用程序产生大量的电子数据 – 这些巨大的电子数据存储库包含了有价值的、宝贵的信息。 对于人类分析师或领域专家,很难做出有趣的发现或寻找可以帮助决策过程的模式。 我们需要自动化的流程来有效地利用庞大的,信息丰富的数据进行规划和投资决策。 在处理数据之前,收集数据,聚合和转换数据是绝对必要的,并最终将数据移动到那些使用不同分析和数据挖掘工具的存储库中。

执行所有这些步骤的流行工具之一是Apache Flume。 这些数据通常是以事件或日志的形式存储。 Apache Flume有三个主要组件:

  • Source:数据源可以是企业服务器,文件系统,云端,数据存储库等。
  • Sink:Sink是可以存储数据的目标存储库。 它可以是一个集中的地方,如HDFS,像Apache Spark这样的处理引擎,或像ElasticSearch这样的数据存储库/搜索引擎。
  • Channel:在事件被sink消耗前由Channel 存储。 Channel 是被动存储。 Channel 支持故障恢复和高可靠性; Channel 示例是由本地文件系统和基于内存的Channel 支持的文件通道。

Flume是高度可配置的,并且支持许多源,channel,serializer和sink。它还支持数据流。 Flume的强大功能是拦截器,支持在运行中修改/删除事件的功能。支持的拦截器之一是regex_filter。

regex_filter将事件体解释为文本,并将其与提供的正则表达式进行对比,并基于匹配的模式和表达式,包括或排除事件。我们将详细看看regex_filter。

要求

从数据源中,我们以街道号,名称,城市和角色的形式获取数据。现在,数据源可能是实时流数据,也可能是任何其他来源。在本示例中,我已经使用Netcat服务作为侦听给定端口的源,并将每行文本转换为事件。要求以文本格式将数据保存到HDFS中。在将数据保存到HDFS之前,必须根据角色对数据进行过滤。只有经理的记录需要存储在HDFS中;其他角色的数据必须被忽略。例如,允许以下数据:


  1. 1,alok,mumbai,manager 
  2.  
  3. 2,jatin,chennai,manager  

下列的数据是不被允许的:


  1. 3,yogesh,kolkata,developer 
  2.  
  3. 5,jyotsana,pune,developer  

如何达到这个要求

可以通过使用 regex_filter 拦截器来实现。这个拦截器将根据规则基础来进行事件过滤,只有感兴趣的事件才会发送到对应的槽中,同时忽略其他的事件。


  1. ## Describe regex_filter interceptor and configure exclude events attribute 
  2.  
  3. a1.sources.r1.interceptors = i1 
  4.  
  5. a1.sources.r1.interceptors.i1.type = regex_filter 
  6.  
  7. a1.sources.r1.interceptors.i1.regex = developer 
  8.  
  9. a1.sources.r1.interceptors.i1.excludeEvents = true  

HDFS 槽允许数据存储在 HDFS 中,使用文本/序列格式。也可以使用压缩格式存储。


  1. a1.channels = c1 
  2.  
  3. a1.sinks = k1 
  4.  
  5. a1.sinks.k1.type = hdfs 
  6.  
  7. a1.sinks.k1.channel = c1 
  8.  
  9. ## assumption is that Hadoop is CDH 
  10.  
  11. a1.sinks.k1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers 
  12.  
  13. a1.sinks.k1.hdfs.fileType= DataStream 
  14.  
  15. a1.sinks.k1.hdfs.writeFormat = Text  

如何运行示例

首先,你需要 Hadoop 来让示例作为 HDFS 的槽来运行。如果你没有一个 Hadoop 集群,可以将槽改为日志,然后只需要启动 Flume。 在某个目录下存储 regex_filter_flume_conf.conf 文件然后使用如下命令运行代理。


  1. flume-ng agent --conf conf --conf-file regex_filter_flume_conf.conf --name a1 -Dflume.root.logger=INFO,console 

注意代理名称是 a1。我用了 Netcat 这个源。


  1. a1.sources.r1.type = netcat 
  2.  
  3. a1.sources.r1.bind = localhost 
  4.  
  5. a1.sources.r1.port = 44444  

一旦 Flume 代理启动,运行下面命令用来发送事件给 Flume。


  1. telnet localhost 40000 

现在我们只需要提供如下输入文本:


  1. 1,alok,mumbai,manager 
  2.  
  3. 2,jatin,chennai,manager 
  4.  
  5. 3,yogesh,kolkata,developer 
  6.  
  7. 4,ragini,delhi,manager 
  8.  
  9. 5,jyotsana,pune,developer 
  10.  
  11. 6,valmiki,banglore,manager  

访问 HDFS 你会观察到 HDFS 在 hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers 下创建了一个文件,文件只包含经理的数据。

完整的 flume 配置 — regex_filter_flume_conf.conf — 如下:


  1. # Name the components on this agent 
  2.  
  3. a1.sources = r1 
  4.  
  5. a1.sinks = k1 
  6.  
  7. a1.channels = c1 
  8.  
  9. # Describe/configure the source - netcat 
  10.  
  11. a1.sources.r1.type = netcat 
  12.  
  13. a1.sources.r1.bind = localhost 
  14.  
  15. a1.sources.r1.port = 44444 
  16.  
  17. # Describe the HDFS sink 
  18.  
  19. a1.channels = c1 
  20.  
  21. a1.sinks = k1 
  22.  
  23. a1.sinks.k1.type = hdfs 
  24.  
  25. a1.sinks.k1.channel = c1 
  26.  
  27. a1.sinks.k1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers 
  28.  
  29. a1.sinks.k1.hdfs.fileType= DataStream 
  30.  
  31. a1.sinks.k1.hdfs.writeFormat = Text 
  32.  
  33. ## Describe regex_filter interceptor and configure exclude events attribute 
  34.  
  35. a1.sources.r1.interceptors = i1 
  36.  
  37. a1.sources.r1.interceptors.i1.type = regex_filter 
  38.  
  39. a1.sources.r1.interceptors.i1.regex = developer 
  40.  
  41. a1.sources.r1.interceptors.i1.excludeEvents = true 
  42.  
  43. # Use a channel which buffers events in memory 
  44.  
  45. a1.channels.c1.type = memory 
  46.  
  47. a1.channels.c1.capacity = 1000 
  48.  
  49. a1.channels.c1.transactionCapacity = 100 
  50.  
  51. # Bind the source and sink to the channel 
  52.  
  53. a1.sources.r1.channels = c1 
  54.  
  55. a1.sinks.k1.channel = c1  

本文作者:佚名

来源:51CTO

时间: 2024-11-05 14:51:49

Apache Flume之正则过滤器的相关文章

基于Apache Flume Datahub插件将日志数据同步上云

本文用到的 阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps 简介 Apache Flume是一个分布式的.可靠的.可用的系统,可用于从不同的数据源中高效地收集.聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件.本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub. 环境要求 JDK (1.7及以上,推荐1.7) Flume-NG 1.

Apache Flume - File通道设计

原文链接:https://blogs.apache.org/flume/entry/apache_flume_filechannel 说明:翻译在尽量符合原文表达的基础上,尽量保证行文流畅.水平有限,请多指正! 这篇文章是关于Flume FileChannel的.Flume是为高效收集聚合大量日志数据设计的可靠的.可用的分布式系统.它有一个基于流式数据流的简单灵活的体系.它提供了可控的可靠机制和许多故障转移与恢复机制.它使用了一个用于在线分析应用的简单可扩展的数据模型. FileChannel是

使用Apache Flume抓取数据(1)

使用Apache Flume抓取数据,怎么来抓取呢?不过,在了解这个问题之前,我们必须明确ApacheFlume是什么? 一.什么是Apache Flume Apache Flume是用于数据采集的高性能系统 ,名字来源于原始的近乎实时的日志数据采集工具,现在广泛用于任何流事件数据的采集,支持从很多数据源聚合数据到HDFS. 最初由Cloudera开发 ,在2011年贡献给了Apache基金会 ,在2012年变成了Apache的顶级项目,Flume OG升级换代成了Flume NG. Flume

99.12. Apache Flume

http://flume.apache.org/ Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and

分布式日志收集系统Apache Flume的设计介绍

概述 Flume是Cloudera公司的一款高性能.高可能的分布式日志收集系统.现在已经是Apache Top项目.Github地址.同Flume相似的日志收集系统还有Facebook Scribe,Apache Chuwka,Apache Kafka(也是LinkedIn的).Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件.可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析.写博文来记录这个工具主要是觉得与最近开发的一个流式的

Apache Shiro内置过滤器

shiro内置过滤器研究   anon org.apache.shiro.web.filter.authc.AnonymousFilter authc org.apache.shiro.web.filter.authc.FormAuthenticationFilter authcBasic org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter perms org.apache.shiro.web.filter.authz.

Kafka实战-Flume到Kafka

1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面开始今天的分享内容. 2.数据来源 Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理).关于Flume集群的Agent部署,这里就不多做赘述了,不清楚的同学可以参

Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元,带有一个可选的消息头 Flow:Even

IBM BigInsights Flume 轻松部署可扩展的实时日志收集系统

IBM BigInsights Flume 简介 Flume 是开源的海量日志收集系统,支持对日志的实时性收集.初始的 flume 版本是 flume OG(Flume original generation) 由 Cloudera 公司开发,叫做 Cloudera Flume:后来,cloudera 把 flume 贡献给 Apache,版本改为 FLUME NG(Flume next generation)现在称为 Apache Flume.最初始的 BigInsights 使用 flume