Flume 1.5.0 installation, deployment, and simple applications (including pseudo-distributed mode and examples with Hadoop 2.2.0 and HBase 0.96)

—————————————

Author: 迦壹 (Jiayi)

Blog URL: http://idoall.org/home.php?mod=space&uid=1&do=blog&id=550

Reprint notice: reprinting is allowed, but the original source, author information, and this copyright notice must be indicated with a hyperlink. Thanks for your cooperation.

—————————————

Contents

I. What is Flume?
  1) Features of Flume
  2) Reliability of Flume
  3) Recoverability of Flume
  4) Some core Flume concepts
II. Where is Flume's official website?
III. Where to download it
IV. How to install it
V. Flume examples
  1) Case 1: Avro
  2) Case 2: Spool
  3) Case 3: Exec
  4) Case 4: Syslogtcp
  5) Case 5: JSONHandler
  6) Case 6: Hadoop sink
  7) Case 7: File Roll Sink
  8) Case 8: Replicating Channel Selector
  9) Case 9: Multiplexing Channel Selector
  10) Case 10: Flume Sink Processors
  11) Case 11: Load balancing Sink Processor
  12) Case 12: HBase sink

I. What is Flume?

Flume, a real-time log collection system developed by Cloudera, has been widely recognized and adopted in industry. The initial releases of Flume are now collectively called Flume OG (original generation) and belonged to Cloudera. As Flume's feature set grew, the drawbacks of Flume OG became apparent: a bloated code base, poorly designed core components, and non-standard core configuration. In particular, Flume OG's last release, 0.94.0, suffered from very unstable log delivery. To fix these problems, on October 22, 2011 Cloudera completed Flume-728, a milestone change that reworked the core components, the core configuration, and the code architecture; the refactored releases are collectively called Flume NG (next generation). Another reason for the change was moving Flume under the Apache umbrella, and Cloudera Flume was renamed Apache Flume.

1) Features of Flume

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting massive volumes of log data. It supports custom data senders in the logging pipeline for collecting data, and it can also perform simple processing on the data and write it to a variety of data receivers (such as text files, HDFS, or HBase).

Flume's data flow is carried end to end by events. An Event is Flume's basic unit of data: it carries the log payload (as a byte array) along with header information. Events are generated by the Source from data outside the Agent; when the Source captures an event it applies its specific formatting and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds events until a Sink has finished processing them. The Sink persists the log or forwards the event to another Source.

2) Reliability of Flume

When a node fails, logs can still be delivered to other nodes without being lost. Flume provides three levels of reliability guarantee, from strongest to weakest: end-to-end (when the agent receives data it first writes the event to disk and deletes it only after the data has been delivered successfully; if delivery fails, the data can be resent); store on failure (the strategy Scribe also uses: when the receiver crashes, data is written locally and sent again after recovery); and best effort (data is sent to the receiver without any acknowledgement).

3) Recoverability of Flume

Recoverability also depends on the Channel. FileChannel is recommended: events are persisted to the local file system (at the cost of lower performance).
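
A minimal sketch of what such a durable channel could look like in an agent's properties file (the checkpoint and data directories below are illustrative assumptions, not paths taken from this article):

a1.channels = c1
# the file channel persists events to local disk, so they survive an agent restart
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/flume-1.5.0-bin/filechannel/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/flume-1.5.0-bin/filechannel/data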

4) Some core Flume concepts

  1. Agent: a JVM process that runs Flume. Each machine runs one agent, but a single agent can contain multiple sources and sinks.
  2. Client: produces data; runs in its own thread.
  3. Source: collects data from the Client and passes it to the Channel.
  4. Sink: collects data from the Channel; runs in its own thread.
  5. Channel: connects sources and sinks; it behaves somewhat like a queue.
  6. Events: can be log records, Avro objects, and so on.

Flume's smallest independent unit of operation is the agent; one agent is one JVM. A single agent is made up of three major components: Source, Sink, and Channel.

Note that Flume ships with a large number of built-in Source, Channel, and Sink types, and different types of Sources, Channels, and Sinks can be combined freely based on a user-defined configuration file, which makes it very flexible. For example, a Channel can buffer events in memory or persist them to local disk, and a Sink can write logs to HDFS or HBase, or even forward them to another Source. Flume also lets users build multi-level flows, that is, multiple agents working together, with support for fan-in, fan-out, contextual routing, and backup routes; this is where Flume really shines. (The original post includes a diagram illustrating this.)

II. Where is Flume's official website?
http://flume.apache.org/


III. Where to download it

http://www.apache.org/dyn/closer.cgi/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz


IV. How to install it
1) Extract the downloaded Flume package into the /home/hadoop directory, and you are already 50% done. Simple, right?

2) Edit the flume-env.sh configuration file; the main change is setting the JAVA_HOME variable.

root@m1:/home/hadoop/flume-1.5.0-bin# cp conf/flume-env.sh.template conf/flume-env.sh

root@m1:/home/hadoop/flume-1.5.0-bin# vi conf/flume-env.sh

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.  See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership.  The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.  You may obtain a copy of the License at

#

#     http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

 

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced

# during Flume startup.

 

# Enviroment variables can be set here.

 

JAVA_HOME=/usr/lib/jvm/java-7-oracle

 

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX

#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"

 

# Note that the Flume conf directory is always included in the classpath.

#FLUME_CLASSPATH=""

3) Verify that the installation succeeded

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng version

Flume 1.5.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: 8633220df808c4cd0c13d1cf0320454a94f1ea97

Compiled by hshreedharan on Wed May  7 14:49:18 PDT 2014

From source with checksum a01fe726e4380ba0c9f7a7d222db961f

root@m1:/home/hadoop#

If you see the output above, the installation succeeded.

V. Flume examples

1) Case 1: Avro

Avro can send a given file to Flume; the Avro source uses the Avro RPC mechanism.

a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

c) Create the test file

root@m1:/home/hadoop# echo "hello world" > /home/hadoop/flume-1.5.0-bin/log.00

d) Send the file with avro-client

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.5.0-bin/log.00

f) On m1's console you should see the following output (note the last line):

root@m1:/home/hadoop/flume-1.5.0-bin/conf# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

Info: Sourcing environment configuration script /home/hadoop/flume-1.5.0-bin/conf/flume-env.sh

Info: Including Hadoop libraries found via (/home/hadoop/hadoop-2.2.0/bin/hadoop) for HDFS access

Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath

2014-08-10 10:43:25,112 (New I/O worker#1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] UNBOUND

2014-08-10 10:43:25,112 (New I/O worker#1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] CLOSED

2014-08-10 10:43:25,112 (New I/O worker#1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.1.50:59850 disconnected.

2014-08-10 10:43:26,718 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }


2) Case 2: Spool

The spooldir source watches the configured directory for new files and reads the data out of them. Two things to note:

1) Files copied into the spool directory must not be opened or edited afterwards.

2) The spool directory must not contain subdirectories.

a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= spooldir

a1.sources.r1.channels = c1

a1.sources.r1.spoolDir =/home/hadoop/flume-1.5.0-bin/logs

a1.sources.r1.fileHeader =true

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

c) Add a file to the /home/hadoop/flume-1.5.0-bin/logs directory

root@m1:/home/hadoop# echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log

d) On m1's console you should see output like the following:

14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:14 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/flume-1.5.0-bin/logs/spool_text.log to /home/hadoop/flume-1.5.0-bin/logs/spool_text.log.COMPLETED
14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:14 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31                spool test1 }
14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
14/08/10 11:37:17 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.


3) Case 3: Exec

The Exec source runs a given command and uses its output as the data source. Note that if you use the tail command, the file must already contain enough content before you will see any output.
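
A likely reason is that the exec source hands lines to the channel in batches (its batchSize property defaults to 20), so a very short file may never fill a batch. A hedged tweak for quick experiments only:

a1.sources.r1.batchSize = 1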


a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type=exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /home/hadoop/flume-1.5.0-bin/log_exec_tail

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate enough content in the file

root@m1:/home/hadoop# for i in {1..100};do echo "exec tail$i" >> /home/hadoop/flume-1.5.0-bin/log_exec_tail;echo $i;sleep 0.1;done

e) On m1's console you should see output like the following:

2014-08-10 10:59:25,513 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74      exectailtest}

2014-08-10 10:59:34,535 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74      exectailtest}

2014-08-10 11:01:40,557 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31                  exectail1 }

2014-08-10 11:01:41,180 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 32                  exectail2 }

2014-08-10 11:01:41,180 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 33                  exectail3 }

2014-08-10 11:01:41,181 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 34                  exectail4 }

2014-08-10 11:01:41,181 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 35                  exectail5 }

2014-08-10 11:01:41,181 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 36                  exectail6 }

….

….

….

2014-08-10 11:01:51,550 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 36               exectail96 }

2014-08-10 11:01:51,550 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 37               exectail97 }

2014-08-10 11:01:51,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38               exectail98 }

2014-08-10 11:01:51,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39               exectail99 }

2014-08-10 11:01:51,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30            exectail100 }


4) Case 4: Syslogtcp

The Syslogtcp source listens on a TCP port and uses what it receives as the data source.

a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate a test syslog message

root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140

d) On m1's console you should see output like the following:

14/08/10 11:41:45 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Processing:k1
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Processing:k1
14/08/10 11:41:45 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
14/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Creating channels
14/08/10 11:41:45 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
14/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Created channel c1
14/08/10 11:41:45 INFO source.DefaultSourceFactory: Creating instance of source r1, type syslogtcp
14/08/10 11:41:45 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
14/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/10 11:41:45 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: {source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6538b14 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/10 11:41:45 INFO node.Application: Starting Channel c1
14/08/10 11:41:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/10 11:41:45 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/10 11:41:45 INFO node.Application: Starting Sink k1
14/08/10 11:41:45 INFO node.Application: Starting Source r1
14/08/10 11:41:45 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/10 11:42:15 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/10 11:42:15 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
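
The WARN line above appears because the plain echo string carries no syslog priority header, so the source treats it as invalid syslog data and handles it as raw text. A hedged example of sending a message with an RFC 3164 style <priority> prefix instead (the priority value 13 is just an illustrative choice):

echo "<13>Aug 10 12:00:00 m1 test: hello idoall.org syslog" | nc localhost 5140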


5) Case 5: JSONHandler

a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 8888

a1.sources.r1.channels = c1

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate a JSON-format POST request

root@m1:/home/hadoop# curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888

d) On m1's console you should see output like the following:

14/08/10 11:49:59 INFO node.Application: Starting Channel c1
14/08/10 11:49:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/10 11:49:59 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/10 11:49:59 INFO node.Application: Starting Sink k1
14/08/10 11:49:59 INFO node.Application: Starting Source r1
14/08/10 11:49:59 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
14/08/10 11:49:59 INFO mortbay.log: jetty-6.1.26
14/08/10 11:50:00 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:8888
14/08/10 11:50:00 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 11:50:00 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 12:14:32 INFO sink.LoggerSink: Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79    idoall.org_body }


6) Case 6: Hadoop sink

For installing and deploying Hadoop 2.2.0, please refer to the article "ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 distributed environment deployment".

a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

 

# Describe the sink

a1.sinks.k1.type= hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp

a1.sinks.k1.hdfs.filePrefix = Syslog

a1.sinks.k1.hdfs.round =true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
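
Note that the hdfs.round* settings above only take effect when hdfs.path contains time escape sequences; with the fixed path used here they have no visible effect. A hedged variant that buckets files into 10-minute directories (the path layout is an illustrative assumption):

a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true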

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate a test syslog message

root@m1:/home/hadoop# echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

d) On m1's console you should see output like the following:

14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/10 12:20:39 INFO node.Application: Starting Sink k1
14/08/10 12:20:39 INFO node.Application: Starting Source r1
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
14/08/10 12:20:39 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/10 12:21:46 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/10 12:21:49 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/08/10 12:21:49 INFO hdfs.BucketWriter: Creating hdfs://m1:9000/user/flume/syslogtcp//Syslog.1407644509504.tmp
14/08/10 12:22:20 INFO hdfs.BucketWriter: Closing hdfs://m1:9000/user/flume/syslogtcp//Syslog.1407644509504.tmp
14/08/10 12:22:20 INFO hdfs.BucketWriter: Close tries incremented
14/08/10 12:22:20 INFO hdfs.BucketWriter: Renaming hdfs://m1:9000/user/flume/syslogtcp/Syslog.1407644509504.tmp to hdfs://m1:9000/user/flume/syslogtcp/Syslog.1407644509504
14/08/10 12:22:20 INFO hdfs.HDFSEventSink: Writer callback called.

e) Open another window on m1 and check whether the file was created in Hadoop

root@m1:/home/hadoop# /home/hadoop/hadoop-2.2.0/bin/hadoop fs -ls /user/flume/syslogtcp

Found 1 items

-rw-r--r--   3 root supergroup        155 2014-08-10 12:22 /user/flume/syslogtcp/Syslog.1407644509504

root@m1:/home/hadoop# /home/hadoop/hadoop-2.2.0/bin/hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504

SEQ!org.apache.hadoop.io.LongWritable”org.apache.hadoop.io.BytesWritable^;>Gv$hello idoall flume -> hadoop testing one
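
The leading SEQ bytes show that the HDFS sink wrote a Hadoop SequenceFile, which is its default file type. If plain text output is preferred, a hedged tweak to the sink configuration would be:

a1.sinks.k1.hdfs.fileType = DataStream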

7) Case 7: File Roll Sink

a) Create the agent configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5555

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

 

# Describe the sink

a1.sinks.k1.type= file_roll

a1.sinks.k1.sink.directory =/home/hadoop/flume-1.5.0-bin/logs

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b) Start Flume agent a1

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate test log messages

root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5555

root@m1:/home/hadoop# echo "hello idoall.org syslog 2" | nc localhost 5555

d) Check whether files are being created under /home/hadoop/flume-1.5.0-bin/logs; by default a new file is rolled every 30 seconds (see the note after the listing below for changing this).

root@m1:/home/hadoop# ll /home/hadoop/flume-1.5.0-bin/logs

total 272
drwxr-xr-x 3 root root   4096 Aug 10 12:50 ./
drwxr-xr-x 9 root root   4096 Aug 10 10:59 ../
-rw-r--r-- 1 root root     50 Aug 10 12:49 1407646164782-1
-rw-r--r-- 1 root root      0 Aug 10 12:49 1407646164782-2
-rw-r--r-- 1 root root      0 Aug 10 12:50 1407646164782-3

root@m1:/home/hadoop# cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-1 /home/hadoop/flume-1.5.0-bin/logs/1407646164782-2

hello idoall.org syslog

hello idoall.org syslog 2
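
By default the file_roll sink starts a new file every 30 seconds even when no events arrive, which is why empty files show up in the listing above. A hedged tweak to roll less frequently (the value is in seconds; 0 disables time-based rolling):

a1.sinks.k1.sink.rollInterval = 300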


8)案例8Replicating Channel Selector

Flume支持Fan out流从一个源到多个通道。有两种模式的Fan out分别是复制和复用。在复制的情况下流的事件被发送到所有的配置通道。在复用的情况下事件被发送到可用的渠道中的一个子集。Fan out流需要指定源和Fan out通道的规则。


这次我们需要用到m1,m2两台机器


a)在m1创建replicating_Channel_Selector配置文件


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= replicating

 

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

 

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

b) On m1, create the replicating_Channel_Selector_avro configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

c) From m1, copy both configuration files to m2

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

d) Open four windows and start the two Flume agents on both m1 and m2

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate a test syslog message

root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140

f) In the sink windows on both m1 and m2 you should see the following output, which shows the message was replicated to both:

14/08/10 14:08:18 INFO ipc.NettyServer: Connection to /192.168.1.51:46844 disconnected.
14/08/10 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] OPEN
14/08/10 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35873
14/08/10 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] OPEN
14/08/10 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46858
14/08/10 14:09:20 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }


9) Case 9: Multiplexing Channel Selector

a) On m1, create the Multiplexing_Channel_Selector configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

 

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= multiplexing

 

a1.sources.r1.selector.header =type

# The mappings allow a value's channels to overlap; the default can contain any number of channels.

a1.sources.r1.selector.mapping.baidu = c1

a1.sources.r1.selector.mapping.ali = c2

a1.sources.r1.selector.default = c1

 

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

 

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

b) On m1, create the Multiplexing_Channel_Selector_avro configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

c) Copy both configuration files to m2

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf  root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

d) Open four windows and start the two Flume agents on both m1 and m2

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate test messages

root@m1:/home/hadoop# curl -X POST -d ‘[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]‘ http://localhost:5140 && curl -X POST -d ‘[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]‘ http://localhost:5140 && curl -X POST -d ‘[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]‘ http://localhost:5140

f) In m1's sink window you should see:

14/08/10 14:32:21 INFO node.Application: Starting Sink k1
14/08/10 14:32:21 INFO node.Application: Starting Source r1
14/08/10 14:32:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
14/08/10 14:32:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 14:32:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 14:32:21 INFO source.AvroSource: Avro source r1 started.
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] OPEN
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35916
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] OPEN
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46945
14/08/10 14:34:11 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31             idoall_TEST1 }
14/08/10 14:34:57 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33             idoall_TEST3 }

g) In m2's sink window you should see:

14/08/10 14:32:27 INFO node.Application: Starting Sink k1
14/08/10 14:32:27 INFO node.Application: Starting Source r1
14/08/10 14:32:27 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
14/08/10 14:32:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 14:32:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 14:32:27 INFO source.AvroSource: Avro source r1 started.
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] OPEN
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38104
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] OPEN
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48599
14/08/10 14:34:33 INFO sink.LoggerSink: Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32             idoall_TEST2 }

You can see that, based on the different header values, events are routed to different channels.


10) Case 10: Flume Sink Processors

With a failover sink group, events always go to one sink; only when that sink becomes unavailable are they automatically sent to the next sink.

a) On m1, create the Flume_Sink_Processors configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

 

# The key to configuring failover is to define a sink group

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

# The processor type is failover

a1.sinkgroups.g1.processor.type = failover

# Priorities: the higher the number, the higher the priority; each sink must have a different priority

a1.sinkgroups.g1.processor.priority.k1 = 5

a1.sinkgroups.g1.processor.priority.k2 = 10

# maxpenalty is set to 10 seconds here; adjust it to suit your situation

a1.sinkgroups.g1.processor.maxpenalty = 10000

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= replicating

 

 

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

 

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

b) On m1, create the Flume_Sink_Processors_avro configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

c) Copy both configuration files to m2

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf  root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

d) Open four windows and start the two Flume agents on both m1 and m2

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate a test log message

root@m1:/home/hadoop# echo "idoall.org test1 failover" | nc localhost 5140

f) Because m2 has the higher priority, you should see the following in m2's sink window, while m1 shows nothing:

14/08/10 15:02:46 INFO ipc.NettyServer: Connection to /192.168.1.51:48692 disconnected.
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] OPEN
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48704
14/08/10 15:03:26 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

g) Now stop the sink on m2 (Ctrl+C) and send test data again

root@m1:/home/hadoop# echo "idoall.org test2 failover" | nc localhost 5140

h) In m1's sink window you can see that both test messages sent so far have been received:

14/08/10 15:02:46 INFO ipc.NettyServer: Connection to /192.168.1.51:47036 disconnected.
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] OPEN
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:47048
14/08/10 15:07:56 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
14/08/10 15:07:56 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }

i) Restart the sink in m2's sink window

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

j) Send two more batches of test data

root@m1:/home/hadoop# echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140

k) In m2's sink window you should see the following; because of the priorities, log messages flow to m2 again:

14/08/10 15:09:47 INFO node.Application: Starting Sink k1
14/08/10 15:09:47 INFO node.Application: Starting Source r1
14/08/10 15:09:47 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/08/10 15:09:47 INFO source.AvroSource: Avro source r1 started.
14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] OPEN
14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48741
14/08/10 15:09:57 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] OPEN
14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38166
14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }

11) Case 11: Load balancing Sink Processor

The load_balance type differs from failover in that load balancing offers two selection policies: round robin and random. In both cases, if the selected sink is unavailable, the processor automatically tries the next available sink.
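
The configuration below uses round_robin; switching to random selection should only require changing the selector line, roughly:

a1.sinkgroups.g1.processor.selector = random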

a) On m1, create the Load_balancing_Sink_Processors configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1

 

# The key to configuring load balancing is to define a sink group

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type= load_balance

a1.sinkgroups.g1.processor.backoff =true

a1.sinkgroups.g1.processor.selector = round_robin

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

 

 

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

 

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

b) On m1, create the Load_balancing_Sink_Processors_avro configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

 

# Describe the sink

a1.sinks.k1.type= logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

c) Copy both configuration files to m2

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf  root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf

root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

d) Open four windows and start the two Flume agents on both m1 and m2

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, on either m1 or m2, generate test log messages one line at a time; if you send them too quickly, they tend to all land on one machine

root@m1:/home/hadoop# echo "idoall.org test1" | nc localhost 5140

root@m1:/home/hadoop# echo "idoall.org test2" | nc localhost 5140

root@m1:/home/hadoop# echo "idoall.org test3" | nc localhost 5140

root@m1:/home/hadoop# echo "idoall.org test4" | nc localhost 5140

f) In m1's sink window you should see:

14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
14/08/10 15:35:33 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }

g) In m2's sink window you should see:

14/08/10 15:35:27 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }

This shows that round-robin load balancing is working.

12) Case 12: HBase sink

a) Before testing, start HBase by following the article "ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 distributed environment deployment".

b) Then copy the following files into Flume's lib directory:

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/protobuf-java-2.5.0.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-client-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-common-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-protocol-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-server-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop2-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib
cp /home/hadoop/hbase-0.96.2-hadoop2/lib/htrace-core-2.04.jar /home/hadoop/flume-1.5.0-bin/lib

c) Make sure the test_idoall_org table already exists in HBase; for its layout and columns, refer to the HBase table-creation code in the article "ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 distributed environment deployment".
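
If the table does not exist yet, a minimal version matching the sink configuration below (a single column family named name) can be created in the HBase shell roughly like this; treat it as a sketch and follow the referenced article for the authoritative layout:

create 'test_idoall_org', 'name'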

d) On m1, create the hbase_simple configuration file

root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

 

# Describe the sink

a1.sinks.k1.type = hbase

a1.sinks.k1.table = test_idoall_org

a1.sinks.k1.columnFamily = name

a1.sinks.k1.column = idoall

a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

 

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

e) Start the Flume agent

/home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console

f) Generate a test syslog message

root@m1:/home/hadoop# echo "hello idoall.org from flume" | nc localhost 5140

g) Log in to HBase and you can see that the new data has been inserted

root@m1:/home/hadoop# /home/hadoop/hbase-0.96.2-hadoop2/bin/hbase shell

2014-08-10 16:09:48,984 INFO  [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available

HBase Shell; enter 'help<RETURN>' for list of supported commands.

Type "exit<RETURN>" to leave the HBase Shell

Version 0.96.2-hadoop2, r1581096, Mon Mar 24 16:03:18 PDT 2014

 

hbase(main):001:0> list

TABLE                                                                                                                                                                                                                 

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/hadoop/hbase-0.96.2-hadoop2/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

hbase2hive_idoall                                                                                                                                                                                                     

hive2hbase_idoall                                                                                                                                                                                                     

test_idoall_org                                                                                                                                                                                                       

3 row(s) in 2.6880 seconds

 

=> ["hbase2hive_idoall","hive2hbase_idoall","test_idoall_org"]

hbase(main):002:0> scan "test_idoall_org"

ROW                                                    COLUMN+CELL                                                                                                                                                    

 10086                                                 column=name:idoall, timestamp=1406424831473, value=idoallvalue                                                                                                 

1 row(s) in 0.0550 seconds

 

hbase(main):003:0> scan "test_idoall_org"

ROW                                                    COLUMN+CELL                                                                                                                                                    

 10086                                                 column=name:idoall, timestamp=1406424831473, value=idoallvalue                                                                                                 

 1407658495588-XbQCOZrKK8-0                            column=name:payload, timestamp=1407658498203, value=hello idoall.org from flume                                                                                

2 row(s) in 0.0200 seconds

 

hbase(main):004:0> quit

After working through all of these Flume examples, you will find that Flume really is powerful: its components can be combined in many ways to accomplish whatever you need. As the saying goes, the teacher opens the door, but you must walk through it yourself; think about how to apply Flume to your own products and business, and go try it hands-on.

This article is meant as a set of notes, and I hope it helps newcomers get started.
