Flume日志收集分层架构应用实践

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点:

  • 模块化设计:在其Flume Agent内部可以定义三种组件:Source、Channel、Sink
  • 组合式设计:可以在Flume Agent中根据业务需要组合Source、Channel、Sink三种组件,构建相对复杂的日志流管道
  • 插件式设计:可以通过配置文件来编排收集日志管道的流程,减少对Flume代码的侵入性
  • 可扩展性:我们可以根据自己业务的需要来定制实现某些组件(Source、Channel、Sink)
  • 支持集成各种主流系统和框架:像Hadoop、HBase、Hive、Kafka、ElasticSearch、Thrift、Avro等,都能够很好的和Flume集成
  • 高级特性:Failover、Load balancing、Interceptor等

有关Flume的相关内容,可以参考官网文档,或者通过阅读我之前写的文章《Flume(NG)架构设计要点及配置实践》来快速了解。

为什么要对Flume日志收集系统进行分层设计

基于Flume设计实现分层日志收集系统,到底有什么好处呢?我们可以先看一下,如果不分层,会带来哪些问题:

  • 如果需要通过Kafka去缓冲上游基于Flume收集而构建的日志流,对于数据平台内部服务器产生的数据还好,但是如果日志数据是跨业务组,甚至是跨部门,那么就需要将Kafka相关信息暴露给外部,这样对Kafka的访问便不是数据平台内部可控的
  • 如果是外部日志进入平台内部HDFS,这样如果需要对Hadoop系统进行升级或例行维护,这种直连的方式会影响到上游部署Flume的日志流的始端日志收集服务
  • 如果数据平台内部某些系统,如Kafka集群、HDFS集群所在节点的机房位置变更,数据迁移,会使得依赖日志数据的外部系统受到不同程度的影响,外部系统需要相关开发或运维人员参与进来
  • 由于收集日志的数据源端可能是外部一些服务器(多个单个的节点),一些业务集群(相互协作的多节点组),也可能是内部一些提供收集服务的服务节点,这些所有的服务器上部署的Flume Agent都处于一层中,比较难于分组管理
  • 由于所有数据源端Flume Agent收集的日志进入数据平台的时候,没有一个统一的类似总线的组件,很难因为某些业务扩展而独立地去升级数据平台内部的接收层服务节点,可能为了升级数据平台内部某个系统或服务而导致影响了其他的接收层服务节点

通过下图我们可以看出,这种单层日志收集系统设计,存在太多的问题,而且系统或服务越多导致整个日志收集系统越难以控制:


上图中,无论是外部还是内部,只要部署了Flume Agent的节点,都直接同内部的Kafka集群和Hadoop集群相连,所以在数据平台内部只能尽量保持Kafka和Hadoop集群正常稳定运行,也要为外部日志收集Flume Agent的数据流量的陡增和异常变化做好防控准备。再者,如需停机维护或者升级某一个集群,可能都需要通知外部所有Flume Agent所在节点的业务方,做好应对(停机)准备。
接着看,如果我们基于Flume使用分层的方式来设计日志收集系统,又有哪些优势,如下图所示:

上图中,Flume日志收集系统采用两层架构设计:第一层(L1)是日志收集层,第二层(L2)是数据平台缓冲层(汇聚层)。通过这种方式,使得日志收集系统有如下特点:

  • 针对数据平台外部的业务系统,根据需要分析的数据业务类型进行分组,属于同一种类型的业务日志,在数据平台前端增加了一个Flume汇聚层节点组,该组节点只影响到它对应的L1层的业务数据
  • 如果Hadoop集群、Kafka需要停机维护或升级,对外部L1层Flume Agent没有影响,只需要在L2层做好数据的接收与缓冲即可,待维护或升级结束,继续将L2层缓存的数据导入到数据存储系统
  • 如果外部某个类型的业务日志数据节点需要扩容,直接在L1层将数据流指向数据平台内部与之相对应的L2层Flume Agent节点组即可,能够对外部因业务变化发生的新增日志收集需求,进行快速地响应和部署
  • 对于数据平台内部,因为收集日志的节点非常可控,可以直接通过L1层Flume Agent使日志数据流入HDFS或Kafka,当然为了架构统一和管理,最好也是通过L2层Flume Agent节点组来汇聚/缓冲L1层Flume Agent收集的日志数据

通过上面分析可见,分层无非是为了使的日志数据源节点的Flume Agent服务与数据平台的存储系统(Kafka/HDFS)进行解耦,同时能够更好地对同类型业务多节点的日志流进行一个聚合操作,并分离开独立管理。另外,可以根据实际业务需要,适当增加Flume系统分层,满足日志流数据的汇聚需要。

应用整体架构

我们看一下,Flume日志收集系统,在我们这个示例应用中处于一个什么位置,我简单画了一下图,加了一些有关数据处理和分析的节点/组件,如下图所示:


这里,简单了解一下上图即可,由于日志收集在整个应用系统中是很重要的一个环节,所以必须保证日志收集系统设计的可靠、可用、灵活、稳定,通过上面在日志收集系统收集日志之后,数据平台所做的大量分析处理,来凸显日志收集系统的重要性,这里其他内容不做过多说明。

Flume分层架构实践

这里,我们主要以实时收集日志为例,说明如何构建一个相对复杂的Flume分层日志收集系统。首先,简要说明一下日志收集需求:

  • 手机客户端上报的用户行为事件(App User Event),通过数据平台内部定义好的接口格式,从Nginx日志里面实时流入数据平台,这对应于Flume日志收集系统L1层
  • 通过组织各种活动,来推广某些App的产品特性,会定向向用户推送通知,单独使用推送点击(Push Click)Agent来收集这些点击行为数据
  • App所依赖的一些基础内容,会以服务的形式开放给外部第三方调用,对于由第三方App带来的用户的行为点击事件(Thirdparty Click),单独使用L1层Flume Agent进行收集
  • 第三方会在App中根据不同的内容,投放广告(Ad),对于广告曝光/点击行为的数据,与上述提到的数据收集单独分离出来,因为该日志数据后期可能会大规模推广,会有爆发性增长,在L1层进行收集
  • 在L2层主要是汇聚或缓冲L1层流入的日志数据
  • 同时,为了防止L2层Flume Agent因为故障或例行停机维护等,所以使用了Flume的Failover特性,亦即L1层每一个Sink同时指向L2层的2个相同的Flume Agent
  • L1层的Flume Agent在收集日志的过程中应该不允许在Channel中累积过多数据(但是还要防止数据流速过慢导致内存Channel数据溢出),还要能够尽量降低读写磁盘的开销,所以使用内存类型的Channel
  • L2层为了保证数据能够可靠地缓冲(在允许的一段时间内累积保存数据),如Hadoop或Kafka故障停机或停机维护升级,采用文件类型的Channel,还要尽量调大容量,也不能因为多应用共享磁盘而造成数据处理延迟,所以对于不同的Channel分别使用独立的磁盘

详细分层设计如下图所示:


上图是从实际的整个数据平台中拿出来一部分,简单便于解释说明。有关上图中所涉及到的Flume Agent的配置详情,下面会根据Flume分层的结构(L1层、L2层)来详细配置说明。由于L1层的10.10.1.101和10.10.1.102节点上部署的Flume Agent是对称的,所以下面只拿出其中一个来说明配置,不同的是,这两个节点上Flume Agent的Sink使用Failover功能,分别交叉指向L2层Flume Agent,也能够起到一定的负载均衡的作用。

上游Flume日志收集层

下面,分别针对10.10.1.101节点上的3个Flume Agent的配置内容,分别进行说明如下:

  • L1层:App用户行为事件(App User Event)日志收集

Flume Agent名称为a1,使用Exec Source、Memory Channel、Avro Sink,这里我们的Nginx日志文件始终指向/data/nginx/logs/app_user_events.log,即使日切或小时切文件,使用tail -F就能保证日志内容都被收集。具体配置内容如下所示:

01 a1.sources = s1
02 a1.channels = mc1
03 a1.sinks = k1 k2
04
05 # Configure source
06 a1.sources.s1.channels = mc1
07 a1.sources.s1.type = exec
08 a1.sources.s1.command = tail -F /data/nginx/logs/app_user_events.log
09
10 # Configure channel
11 a1.channels.mc1.type = memory
12 a1.channels.mc1.transactionCapacity = 50000
13 a1.channels.mc1.capacity = 100000
14
15 # Configure sinks
16 a1.sinks.k1.channel = mc1
17 a1.sinks.k1.type = avro
18 a1.sinks.k1.hostname = 10.10.1.122
19 a1.sinks.k1.port = 44446
20
21 a1.sinks.k2.channel = mc1
22 a1.sinks.k2.type = avro
23 a1.sinks.k2.hostname = 10.10.1.121
24 a1.sinks.k2.port = 44446
25
26 # Configure failover
27 a1.sinkgroups = g1
28 a1.sinkgroups.g1.sinks = k1 k2
29 a1.sinkgroups.g1.processor.type = failover
30 a1.sinkgroups.g1.processor.priority.k1 = 9
31 a1.sinkgroups.g1.processor.priority.k2 = 7
32 a1.sinkgroups.g1.processor.maxpenalty = 10000
  • L1层:推送点击事件(Push Click Event)日志收集
01 a2.sources = s2
02 a2.channels = mc2
03 a2.sinks = k3 k4
04
05 # Configure source
06 a2.sources.s2.channels = mc2
07 a2.sources.s2.type = exec
08 a2.sources.s2.command = tail -F /data/nginx/logs/push_click_events.log
09
10 # Configure channel
11 a2.channels.mc2.type = memory
12 a2.channels.mc2.capacity = 50000
13 a2.channels.mc2.transactionCapacity = 100000
14
15 # Configure sinks
16 a2.sinks.k3.channel = mc2
17 a2.sinks.k3.type = avro
18 a2.sinks.k3.hostname = 10.10.1.121
19 a2.sinks.k3.port = 44447
20
21 a2.sinks.k4.channel = mc2
22 a2.sinks.k4.type = avro
23 a2.sinks.k4.hostname = 10.10.1.122
24 a2.sinks.k4.port = 44447
25
26 # Configure failover
27 a2.sinkgroups = g2
28 a2.sinkgroups.g2.sinks = k3 k4
29 a2.sinkgroups.g2.processor.type = failover
30 a2.sinkgroups.g2.processor.priority.k3 = 9
31 a2.sinkgroups.g2.processor.priority.k4 = 7
32 a2.sinkgroups.g2.processor.maxpenalty = 10000
  • L1层:第三方点击事件(Thirdparty Click Event)日志收集

第三方点击事件通过统一的接口上传数据,那么配置起来也比较容易,如下所示:

01 a3.sources = s3
02 a3.channels = mc3
03 a3.sinks = k5 k6
04
05 # Configure source
06 a3.sources.s3.channels = mc3
07 a3.sources.s3.type = exec
08 a3.sources.s3.command = tail -F /data/nginx/logs/thirdparty_click_events.log
09
10 # Configure channel
11 a3.channels.mc3.type = memory
12 a3.channels.mc3.transactionCapacity = 50000
13 a3.channels.mc3.capacity = 100000
14
15 # Configure sinks
16 a3.sinks.k5.channel = mc3
17 a3.sinks.k5.type = avro
18 a3.sinks.k5.hostname = 10.10.1.121
19 a3.sinks.k5.port = 44446
20
21 a3.sinks.k6.channel = mc3
22 a3.sinks.k6.type = avro
23 a3.sinks.k6.hostname = 10.10.1.122
24 a3.sinks.k6.port = 44446
25
26 # Configure failover
27 a3.sinkgroups = g3
28 a3.sinkgroups.g3.sinks = k5 k6
29 a3.sinkgroups.g3.processor.type = failover
30 a3.sinkgroups.g3.processor.priority.k5 = 9
31 a3.sinkgroups.g3.processor.priority.k6 = 7
32 a3.sinkgroups.g3.processor.maxpenalty = 10000
  • L1层:广告点击事件(Ad Click Event)日志收集

广告点击事件日志收集配置,如下所示:

01 a4.sources = s4
02 a4.channels = mc4
03 a4.sinks = k7 k8
04
05 # Configure source
06 a4.sources.s4.channels = mc4
07 a4.sources.s4.type = exec
08 a4.sources.s4.command = tail -F /data/nginx/logs/ad.log
09
10 # Configure channel
11 a4.channels.mc4.type = memory
12 a4.channels.mc4.transactionCapacity = 50000
13 a4.channels.mc4.capacity = 100000
14
15 # Configure sinks
16 a4.sinks.k7.channel = mc4
17 a4.sinks.k7.type = avro
18 a4.sinks.k7.hostname = 10.10.1.121
19 a4.sinks.k7.port = 44448
20
21 a4.sinks.k8.channel = mc4
22 a4.sinks.k8.type = avro
23 a4.sinks.k8.hostname = 10.10.1.122
24 a4.sinks.k8.port = 44448
25
26 # Configure failover
27 a4.sinkgroups = g4
28 a4.sinkgroups.g4.sinks = k7 k8
29 a4.sinkgroups.g4.processor.type = failover
30 a4.sinkgroups.g4.processor.priority.k7 = 10
31 a4.sinkgroups.g4.processor.priority.k8 = 8
32 a4.sinkgroups.g4.processor.maxpenalty = 10000

下游Flume日志收集汇聚层

  • L2层:App用户事件+推送点击事件日志合并收集

这种业务需求是:把App用户事件和推送点击事件合并写入文件,最后都会写入HDFS,从而进一步在Hive中进行离线分析;同时又要使这两种事件分别独立地走实时计算的流程,App用户事件实时计算流程需要实时统计用户使用App过程中行为特征,而推送点击事件实时计算需要针对某一次活动来实时分析和展示用户的参与情况。
具体配置内容,如下所示:

01 a1.sources = s1 s2
02 a1.channels = fc1 fc2 fc3
03 a1.sinks = kk1 fk2 kk3
04
05 # Configure source:
06 # Configure app user event source: s1 -> fc1+fc2
07 a1.sources.s1.channels = fc1 fc2
08 a1.sources.s1.type = avro
09 a1.sources.s1.bind = 10.10.1.121
10 a1.sources.s1.port = 44446
11 a1.sources.s1.threads = 8
12
13 # Configure source
14 # Configure push click event source: s2 -> fc2+fc3
15 a1.sources.s2.channels = fc2 fc3
16 a1.sources.s2.type = avro
17 a1.sources.s2.bind = 10.10.1.122
18 a1.sources.s2.port = 44447
19 a1.sources.s2.threads = 4
20
21 # Configure file channel(/data1)
22 # Configure app user event channel: fc1 ->kk1
23 a1.channels.fc1.type = file
24 a1.channels.fc1.checkpointDir = /data1/flume/channels/app_user_event/checkpoint
25 a1.channels.fc1.useDualCheckpoints = true
26 a1.channels.fc1.backupCheckpointDir = /data1/flume/channels/app_user_event/backup
27 a1.channels.fc1.dataDirs = /data1/flume/channels/app_user_event/data
28 a1.channels.fc1.transactionCapacity = 100000
29 a1.channels.fc1.capacity = 500000
30 a1.channels.fc1.checkpointInterval = 60000
31 a1.channels.fc1.keep-alive = 5
32 a1.channels.fc1.maxFileSize = 5368709120
33
34 # Configure file channel(/data2)
35 # Configure app user event + push click event: fc2 - > fk2
36 a1.channels.fc2.type = file
37 a1.channels.fc2.checkpointDir = /data2/flume/channels/offline_file_event/checkpoint
38 a1.channels.fc2.useDualCheckpoints = true
39 a1.channels.fc2.backupCheckpointDir = /data2/flume/channels/offline_file_event/backup
40 a1.channels.fc2.dataDirs = /data2/flume/channels/offline_file_event/data
41 a1.channels.fc2.transactionCapacity = 100000
42 a1.channels.fc2.capacity = 500000
43 a1.channels.fc2.checkpointInterval = 60000
44 a1.channels.fc2.keep-alive = 5
45 a1.channels.fc2.maxFileSize = 5368709120
46
47 # Configure file channel(/data3)
48 # Configure push click channel: fc3 ->kk3
49 a1.channels.fc3.type = file
50 a1.channels.fc3.checkpointDir = /data3/flume/channels/push_click_event/checkpoint
51 a1.channels.fc3.useDualCheckpoints = true
52 a1.channels.fc3.backupCheckpointDir = /data3/flume/channels/push_click_event/backup
53 a1.channels.fc3.dataDirs = /data3/flume/channels/push_click_event/data
54 a1.channels.fc3.transactionCapacity = 100000
55 a1.channels.fc3.capacity = 500000
56 a1.channels.fc3.checkpointInterval = 60000
57 a1.channels.fc3.keep-alive = 5
58 a1.channels.fc3.maxFileSize = 5368709120
59
60 # Configure sink: RealtimeMessageSink(app user event)
61 a1.sinks.kk1.type = org.shirdrn.flume.sink.RealtimeMessageSink
62 a1.sinks.kk1.channel = fc1
63 a1.sinks.kk1.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
64 a1.sinks.kk1.topic = json_user_event
65 a1.sinks.kk1.serializer.class = kafka.serializer.StringEncoder
66 a1.sinks.kk1.producer.type = async
67 a1.sinks.kk1.message.send.max.retries = 3
68 a1.sinks.kk1.client.id = flume_app_user_event_2_1
69 a1.sinks.kk1.event.decoder.count = 8
70 a1.sinks.kk1.output.stat.event.batch.size = 2000
71 a1.sinks.kk1.event.decoder.queue.size = 1000
72
73 # Configure sink: RichRollingFileSink
74 a1.sinks.fk2.type = org.shirdrn.flume.sink.RichRollingFileSink
75 a1.sinks.fk2.channel = fc2
76 a1.sinks.fk2.batchSize = 100
77 a1.sinks.fk2.serializer = TEXT
78 a1.sinks.fk2.sink.rollInterval = 60
79 a1.sinks.fk2.sink.directory = /data/flume/rolling_files
80 a1.sinks.fk2.sink.file.prefix = event
81 a1.sinks.fk2.sink.file.suffix = .log
82 a1.sinks.fk2.sink.file.pattern = yyyyMMddHHmmss
83
84 # Configure sink: RealtimeMessageSink(push click)
85 a1.sinks.kk3.type = org.shirdrn.flume.sink.RealtimeMessageSink
86 a1.sinks.kk3.channel = fc3
87 a1.sinks.kk3.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
88 a1.sinks.kk3.topic = json_push_click_event
89 a1.sinks.kk3.serializer.class = kafka.serializer.StringEncoder
90 a1.sinks.kk3.producer.type = async
91 a1.sinks.kk3.message.send.max.retries = 3
92 a1.sinks.kk3.client.id = flume_push_click_2_1
93 a1.sinks.kk3.event.decoder.count = 4
94 a1.sinks.kk3.output.stat.event.batch.size = 2000
95 a1.sinks.kk3.event.decoder.queue.size = 1000

上面,可以看到我们自己实现的org.shirdrn.flume.sink.RealtimeMessageSink,该Sink主要是使Flume收集的日志写入Kafka中,在Flume 1.5.0版本中还没有内置实现,所以我们自己实现了,并在其中加入了适合我们业务的处理逻辑,比如,将Nginx日志记录行解析,然后根据实时计算需要,过滤掉不需要进入Kafka(最终在Storm集群中处理)事件数据,最后转成JSON字符串的格式,写入到Kafka中的Topic里。通过上面的配置也可以看出,可以配置很多参数,例如解析线程数、队列大小等。
由于我们需要将写入本地文件系统的文件按照我们自己的方式来定义,所以基于Flume内置的file_roll实现进行修改,实现了自己的org.shirdrn.flume.sink.RichRollingFileSink,该Sink主要是对文件名字符串进行格式化,能够通过文件名来获取到文件生成的时间(人类可读格式)。

  • L2层:广告点击事件日志收集

上面的图中,L1层可以根据需要扩展到更多的服务器节点,在L2层根据需要进行汇聚/缓冲,具体配置内容如下所示:

01 a2.sources = s3
02 a2.channels = fc4
03 a2.sinks = kk4
04
05 # Configure source: s3 -> fc4
06 a2.sources.s3.channels = fc4
07 a2.sources.s3.type = avro
08 a2.sources.s3.bind = 10.10.1.121
09 a2.sources.s3.port = 44448
10 a2.sources.s3.threads = 2
11
12 # Configure channel(/data4)
13 # Configure Ad channel: fc4 ->kk4
14 a2.channels.fc4.type = file
15 a2.channels.fc4.checkpointDir = /data4/flume/channels/ad/checkpoint
16 a2.channels.fc4.useDualCheckpoints = true
17 a2.channels.fc4.backupCheckpointDir = /data4/flume/channels/ad/backup
18 a2.channels.fc4.dataDirs = /data4/flume/channels/ad/data
19 a2.channels.fc4.transactionCapacity = 100000
20 a2.channels.fc4.capacity = 500000
21 a2.channels.fc4.checkpointInterval = 60000
22 a2.channels.fc4.keep-alive = 5
23 a2.channels.fc1.maxFileSize = 5368709120
24
25 # Configure sinks: RealtimeAdKafkaSink
26 a2.sinks.kk4.type = org.shirdrn.flume.sink.RealtimeAdKafkaSink
27 a2.sinks.kk4.channel = fc4
28 a2.sinks.kk4.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092
29 a2.sinks.kk4.topic = json_ad_event
30 a2.sinks.kk4.serializer.class = kafka.serializer.StringEncoder
31 a2.sinks.kk4.producer.type = async
32 a2.sinks.kk4.message.send.max.retries = 3
33 a2.sinks.kk4.client.id = flume_ad_2_1
34 a2.sinks.kk4.event.decoder.count = 4
35 a2.sinks.kk4.output.stat.event.batch.size = 2500
36 a2.sinks.kk4.event.decoder.queue.size = 5000

实践总结

这里我们简单总结一些内容,如下所示:

  • Flume监控

简单一点的监控,直接在启动的时候,开启一个Web端口,通过端口来获取Flume Agent服务的一些相关数据,命令类似:

1 bin/flume-ng agent -n a1 -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

这样便可以在Flume Agent服务节点上,浏览Web端口34545来查看,数据以JSON格式表示,比较重要的一些元数据,如channel容量、当前使用量等等,通过这些数据可以了解当前Flume的工作状态,是否需要升级扩容等等。
另外,也可以通过Ganglia来收集并分析Flume Agent服务运行状态,能够更加详细地展示Flume Agent服务的状态,因为Ganglia配置相对复杂,这里就不做过多解释,感兴趣可以尝试一下。

  • Flume内存调优

因为Flume使用Java实现的,所以就会遇到有关JVM调优的问题,这个也比较容易。默认情况下,Flume Agent进程的堆内存设置比较小,在日志数据量比较大的情况下就需要修改并调试这些参数,以满足业务需要。设置JVM相关参数,可以修改conf/flume-env.sh文件(也可以直接在启动Flume Agent服务时指定JVM选项参数),例如修改JAVA_OPTS变量,示例如下所示:

1 JAVA_OPTS="-server -Xms1024m -Xmx4096m -Dcom.sun.management.jmxremote -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/data/flume/logs/gc-ad.log"

这样,可以方便地修改GC策略,一般由于Flume实时收集日志比较注重实时性,希望能够快速地响应,尽量减少GC导致暂停业务线程被挂起的时间,所以可以将GC设置为ParNew+CMS策略。将GC日志输出,在一定程度上能够更加方便地观察Flume Agent服务运行过程中JVM GC的详细情况,通过诊断来优化服务运行。

  • 下游L2层接收消息调优

通常,在开始部署Flume日志收集系统时,上游L1层服务节点比较少,在L2层汇聚时使用默认的配置可能效果也会不错,但是如果L1层Flume Agent越来越多,就能看到L2层处理速度慢下来。L2层的Flume Agent服务一般会远远小于L1层Flume Agent服务数,这种情况下,如果L2层Flume Agent服务使用Avro Source,可以调大Avro接收线程数,示例如下:

1 a1.sources.s1.type = avro
2 a1.sources.s1.bind = 10.10.1.121
3 a1.sources.s1.port = 44446
4 a1.sources.s1.threads = 8

上面默认情况下threads参数的值1,可以将该值调大,否则的话,L1层就会堆积日志记录,严重可能导致数据丢失。

  • Flume处理业务逻辑约束

Flume的易扩展性使得我们可以根据自己的业务特点来实现一些组件,那么我们在将实际业务逻辑掺杂进Flume中时,需要考虑是否非得必须这么做?如果这么做是否会影响Flume实时传输日志的速度和效率?
Flume作为一个轻量级的日志收集工具,个人认为最好将相对复杂的业务逻辑(尤其是需要与一些存储系统,如MySQL、Redis交互时)后移,放在Storm集群中去处理,或者自己实现的业务处理集群中,而Flume就让它去做其擅长的事情——路由消息。
当然,有些业务场景可能必须在Flume日志收集层去做,如根据原始非结构化的消息,无法控制不同类型的消息路由到不同的目的地,那么可能需要在收集层做一个简单的解析或格式化,实际上这是在Flume层做了一个简单的日志分发。无论如何,如果想在Flume层插入业务逻辑处理,尽量避免过于复杂的处理而影响整个日志传输速度,如果后端有实时推荐需求,日志中事件的实时性大大延迟,就会影响实施个性化推荐。

时间: 2024-10-29 14:31:15

Flume日志收集分层架构应用实践的相关文章

日志系统之Flume日志收集

最近接手维护一个日志系统,它用于对应用服务器上的日志进行收集然后提供实时分析.处理并最后将日志存储到目标存储引擎.针对这三个环节,业界已经有一套组件来应对各自的需求需求,它们是flume+kafka+hdfs/hbase.我们在实时分析.存储这两个环节,选择跟业界的实践相同,但agent是团队自己写的,出于对多种数据源的扩展需求以及原来收集日志的方式存在的一些不足,于是调研了一下flume的agent.结果是flume非常契合我们的实际需求,并且拥有良好的扩展性与稳定性.于是打算采用flume的

常见的几种Flume日志收集场景实战

这里主要介绍几种常见的日志的source来源,包括监控文件型,监控文件内容增量,TCP和HTTP. Spool类型 用于监控指定目录内数据变更,若有新文件,则将新文件内数据读取上传 在教你一步搭建Flume分布式日志系统最后有介绍此案例 Exec EXEC执行一个给定的命令获得输出的源,如果要使用tail命令,必选使得file足够大才能看到输出内容 创建agent配置文件  # vi /usr/local/flume170/conf/exec_tail.conf a1.sources = r1

《Flume日志收集与MapReduce模式》一第1章 概览与架构

第1章 概览与架构 如果在阅读本书,那就说明你正在数据的海洋中遨游.创建大量的数据是非常简单的事情,这要归功于Facebook.Twitter.Amazon.数码相机与相机照片.YouTube.Google,以及你能想得到的能够连接到互联网上的任何东西.作为网站的提供者,10年前的应用日志只是用来帮助你解决网站的问题.时至今日,如果你知道如何从大量的数据中浪里淘金,那么相同的数据就会提供关于业务与客户的有价值的信息. 此外,既然在阅读本书,那么你肯定知道创建Hadoop的目的在一定程度上就是为了

《Flume日志收集与MapReduce模式》一1.5 Flume事件

1.5 Flume事件 Flume传输的基本的数据负载叫作事件.事件由0个或多个头与体组成. 头是一些键值对,可用于路由判定或是承载其他的结构化信息(比如说事件的时间戳或是发出事件的服务器主机名).你可以将其看作是与HTTP头完成相同的功能--传递与体不同的额外信息的方式. 体是个字节数组,包含了实际的负载.如果输入由日志文件组成,那么该数组就非常类似于包含了单行文本的UTF-8编码的字符串. Flume可能会自动添加头(比如,源添加了数据来自的主机名或是创建了事件时间戳),不过体基本上是不受影

《Flume日志收集与MapReduce模式》一3.3 小结

3.3 小结 本章介绍了在数据处理管道中常用的两类通道.内存通道提供了更快的速度,这是以故障事件出现时数据丢失为代价的.此外,文件通道提供了更可靠的传输,因为它能容忍代理故障与重启,这是以牺牲性能为代价的.你需要确定哪种通道更适合于你的使用场景.在确定内存通道是否适合时,请问问自己丢失一些数据的经济上的代价如何.在考虑是否使用持久化通道时请衡量它与添加更多的硬件以弥补性能上的差异时的代价相比如何.另一个考虑就是数据问题了.写入到Hadoop中的数据不一定都来自于流式应用日志.如果接收的是每天的数

《Flume日志收集与MapReduce模式》一2.3 从“Hello World”开始

2.3 从"Hello World"开始 每一本技术图书都会有一个"Hello World"示例.下面是我们将会使用的配置文件: 这里定义了一个名为agent的代理,它有一个名为s1的源.一个名为c1的通道,以及一个名为k1的接收器.源s1的类型为netcat,它只是打开一个Socket监听事件(每个事件一行文本).它需要两个参数,分别是一个绑定IP与一个端口号.该示例使用0.0.0.0作为绑定地址(表示监听任何地址的Java约定)以及端口号12345.源配置还有一

《Flume日志收集与MapReduce模式》一3.2 文件通道

3.2 文件通道 文件通道指的是将事件存储到代理本地文件系统中的通道.虽然要比内存通道慢一些,不过它却提供了持久化的存储路径,可以应对大多数情况,它应该用在数据流中不允许出现缺口的场合.这种持久化能力是由Write Ahead Log(WAL)以及一个或多个文件存储目录联合提供的.WAL用于以一种原子且安全的方式追踪来自于通道的所有输入与输出.通过这种方式,如果代理重启,那么WAL可以重放,从而确保在清理本地文件系统的数据存储前进入到通道中的所有事件都会被写出.此外,如果数据处理策略要求磁盘上的

《Flume日志收集与MapReduce模式》一1.4 源、通道与接收器

1.4 源.通道与接收器 Flume代理的架构可以通过下面这张简单的图呈现出来.输入叫作源,输出叫作接收器.通道提供了源与接收器之间的胶水.它们都运行在叫作代理的守护进程中. 请记住如下概念:源将事件写到一个或多个通道中.通道作为事件从源到接收器传递的保留区.接收器只从一个通道接收事件.代理可能会有多个源.通道与接收器.

《Flume日志收集与MapReduce模式》一2.2 Flume配置文件概览

2.2 Flume配置文件概览 既然已经下载好了Flume,下面来花点时间看看如何配置代理.Flume代理的默认配置提供者使用了一个简单的键值对的Java属性文件,你需要在启动时向代理传递一个参数.由于可以在单个文件中配置多个代理,因此还需要额外传递一个代理标识符(叫作名字),这样它就知道该使用哪个代理了.在给出的示例中,我只指定了一个代理,使用agent这个名字.每个代理的配置都以下面这3个参数开始: 每个源.通道与接收器在该代理的上下文中也有一个唯一的名字.比如,如果不打算传递Apache访