flume(1)

根据flume的架构可以我们可以看出构造很简单
source-->channel-->sink



从一个agent练手
定义conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost#监听本机
a1.sources.r1.port = 44444#端口

# Describe the sink 表示以日志的形式输出
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory#管道类型为内存类型
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#这两句表示sources和sinks的连在同一channel c1上
flume-ng agent -c conf -f 上面写的配置文件 --name a1 -Dflume.root.logger=INFO,console

source 类型为netcat 启用netcat输入内容则相应的所启动的flume上也会输出内容

Source类型
Avro source
需要写的配置有
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Thrift Source
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Exec Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure

JMS Source
a1.sources.r1.type = jms
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

Spooling Directory Source
a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

....

Sink

HDFS Sink

1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

Hive Sink
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

Logger Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Avro Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

.....

Flume Channels

Memory Channel

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

JDBC Channel

a1.channels = c1
a1.channels.c1.type = jdbc

Kafka Channel
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

时间: 2024-09-01 07:29:50

flume(1)的相关文章

日志系统之Flume采集加morphline解析

概述 这段时间花了部分时间在处理消息总线跟日志的对接上.这里分享一下在日志采集和日志解析中遇到的一些问题和处理方案. 日志采集-flume logstash VS flume 首先谈谈我们在日志采集器上的选型.由于我们选择采用ElasticSearch作为日志的存储与搜索引擎.而基于ELK(ElasticSearch,Logstash,Kibana)的技术栈在日志系统方向又是如此流行,所以把Logstash列入考察对象也是顺理成章,Logstash在几大主流的日志收集器里算是后起之秀,被Elas

Kafka实战-Flume到Kafka

1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面开始今天的分享内容. 2.数据来源 Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理).关于Flume集群的Agent部署,这里就不多做赘述了,不清楚的同学可以参

大数据 flume ng-大数据:flume-ng启动报错

问题描述 大数据:flume-ng启动报错 flume-ng1.5.0启动报错java.lang.OutOfMemoryError: Direct buffer memory. flume-env.sh内存配置4G绝对足够了,请求解决方法 解决方案 同样的flume环境,部署到不同的linux机器上,有些机器正常,有些机器启动就报这个错误 解决方案二: 检查下jvm的配置,jvm有最大内存上限,尽管物理机器有更大的内存.

Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元,带有一个可选的消息头 Flow:Even

flume到hdfs写入问题-flume采集数据到hdfs性能问题

问题描述 flume采集数据到hdfs性能问题 本人目前遇到flume采集写入hdfs性能等各种问题,大致如下.在10上的xx/xx目录下的数据进行读取 sink到08上的flume 由08上的flume写到07的hdfs上 30多m的文件写了好久.有时候会内存溢出等问题 Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 Describe/configure the source a1

IBM BigInsights Flume 轻松部署可扩展的实时日志收集系统

IBM BigInsights Flume 简介 Flume 是开源的海量日志收集系统,支持对日志的实时性收集.初始的 flume 版本是 flume OG(Flume original generation) 由 Cloudera 公司开发,叫做 Cloudera Flume:后来,cloudera 把 flume 贡献给 Apache,版本改为 FLUME NG(Flume next generation)现在称为 Apache Flume.最初始的 BigInsights 使用 flume

Flume:Instagram 客户端怎么样

  Flume是一款Mc平台的强大的Instagram 客户端,Instagram 无疑是当今最火的图片社交应用.虽然它在国内偶尔会访问有些困难,但这也丝毫不会降低大家对它的喜爱. Flume 的窗口不但可以自由伸缩,满足每个用户浏览照片的习惯,也提供了 Single 和 3×3 Grid 两种浏览视觉,单击照片进入详情页,双指向右滑动回到主列表,双击图片是 Like. Flume 的频道导航栏是隐藏在窗口底部的,默认状态 Flume 看上去就像是一个放图片的悬浮框,和 LilyView 非常相

修改Flume Log4j Appender

自定义Log4j Appender 要修改Flume Log4j Appender的实现,我们先了解一下Log4j Appender是如何自定义的. 自定义log4j appender需要继承log4j公共的基类:AppenderSkeleton 打印日志核心方法:abstract protected void append(LoggingEvent event); 初始化加载资源:public void activateOptions(),默认实现为空 释放资源:public void clo

基于Apache Flume Datahub插件将日志数据同步上云

本文用到的 阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps 简介 Apache Flume是一个分布式的.可靠的.可用的系统,可用于从不同的数据源中高效地收集.聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件.本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub. 环境要求 JDK (1.7及以上,推荐1.7) Flume-NG 1.

log4j直接输出日志到flume

log4j直接输出日志到flume         此jar是由Cloudera的CDH发行版提供的一个工具类,通过配置,可以将log4j的日志直接输出到flume,方便日志的采集.         在CDH5.3.0版本中是:flume-ng-log4jappender-1.5.0-cdh5.3.0-jar-with-dependencies.jar         所在目录是:/opt/cloudera/parcels/CDH/lib/flume-ng/tools/ 具体使用示例 log4j