基于Apache Flume Datahub插件将日志数据同步上云

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


简介

Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub。

环境要求

  • JDK (1.7及以上,推荐1.7)
  • Flume-NG 1.x
  • Apache Maven 3.x

插件部署

下载插件压缩包

$ wget http://repo.aliyun.com/download/flume-datahub-sink-1.1.0.tar.gz

解压插件压缩包

$ tar zxvf flume-datahub-sink-1.1.0.tar.gz
$ ls flume-datahub-sink
lib    libext

部署Datahub Sink插件

将解压后的插件文件夹flume-datahub-sink移动到Apache Flume安装目录下

$ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d
$ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d/

移动后,核验Datahub Sink插件是否已经在相应目录:

$ ls { YOUR_APACHE_FLUME_DIR }/plugins.d
flume-datahub-sink

配置示例

Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。

需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

# test_basic.log
some,log,line1
some,log,line2
...

下面将创建Datahub Topic,并把每行日志的第一列和第二列作为一条记录写入Topic中。

创建Datahub Topic

使用Datahub WebConsole创建好Topic,schema为(string c1, string c2),下面假设建好的Topic名为test_topic。

Flume配置文件

在Flume安装目录的conf/文件夹下创建名为datahub_basic.conf的文件,并输入内容如下:

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_END_POINT}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。

启动Flume

配置完成后,启动Flume并指定agent的名称和配置文件路径,添加-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。

$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

写入成功,显示日志如下:

...
Write success. Event count: 2
...

数据使用

日志数据通过Flume上传到Datahub后,可以使用StreamCompute流计算来进行实时分析,例如对于一些Web网站的日志,可以实时统计各个页面的PV/UV等。另外,导入Datahub的数据也可以配置Connector将数据归档至MaxCompute中,方便后续的离线分析。

对于数据归档MaxCompute的场景,一般来说需要将数据进行分区。Datahub到MaxCompute的归档可以根据MaxCompute表的分区字段自动创建分区,前提是要求MaxCompute和Datahub的字段名以及类型可以完全对应上。如果需要根据日志的传输时间自动设置分区,则在上面的例子中需要指定MaxCompute的分区相应字段和时间格式,例如按小时自动创建分区,添加的配置如下:

a1.sinks.k1.maxcompute.partition.columns = pt
a1.sinks.k1.maxcompute.partition.values = %Y%m%d%H

注意:pt这个字段需要在Datahub Topic以及MaxCompute表中都存在,且是表的分区字段。

时间: 2024-07-31 08:33:39

基于Apache Flume Datahub插件将日志数据同步上云的相关文章

基于OGG Datahub插件将Oracle数据同步上云

本文用到的 阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps 一.背景介绍 随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中,并利用大数据工具对数据进行分析. OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具,一般用于Oracle数据库之间的主从备份以及Oracle数据库到其他数据库(DB2, MySQL

Docker时代——如何实现日志数据一键上云

一. 准备工作 1.1 开通MaxCompute服务 参考使用MaxCompute的准备工作 1.2 开通Datahub服务 进入Datahub Web控制台,创建project(注意:首次使用的用户需要申请开通) 1.3 安装Docker环境 Docker官方说明了在不同操作系统下安装Docker的方法,您可以点击此处查看. 在阿里云ECS上,以CentOS 7.2为例,安装方式如下: sudo yum install docker sudo systemctl enable docker s

callback-将数据同步上传到微信端

问题描述 将数据同步上传到微信端 这是我第一次接触这方面,那些类有什么用和有什么方法都不知道,有知道的人可以详细说下吗...谢谢 //这是我上传数据那部分的代码 UploadResultJson result = MediaApi.Upload(token, Senparc.Weixin.QY.UploadMediaFileType.file, @"..batch_party_sample.csv"); AsynchronousJobId asynchronousJobId = Asy

一个助Hadoop集群数据快速上云工具

背景 越来越多的公司和企业希望将业务迁移到云上,同时业务数据也希望能更顺畅的迁移到云上. 当前业界有很多公司是以Hadoop技术构建数据中心,所以本文将探讨如何快速的将Hadoop文件系统(HDFS)上的数据迁移到云上. 在阿里云上使用最广泛的存储服务是OSS对象存储.OSS的数据迁移工具ossimport2可以将您本地或第三方云存储服务上的文件同步到OSS上,但这个工具无法读取Hadoop文件系统的数据,无法发挥Hadoop分布式的特点.并且因为工具只支持本地文件,所以需要将HDFS上的文件先

数据迁移上云优化之 - 带宽压缩加速

现在云已经是一个趋势毋庸置疑,但是怎样在有限的时间内把数据弄上云端是个需要考虑的问题. 如果能增量复制,整个时间窗口可以不需要太关注,因为切换的时间窗口比较短. 但是如果是全量迁移,或者增量迁移前的全量迁移,则需要考虑时间窗口. 一般来说,从线下到线上,网络带宽都比较低,这个我在以前把香港机房的数据迁移到国内机房时深有同感,速度之慢难以忍受. 下面是一套加速方案: 下面是一个例子: 首先在本地服务器建立远程数据库服务器隧道 : ssh -C -L 6666:127.0.0.1:1921 post

iOS5五大期待功能:无线数据同步上榜

导语:美国IT网站PCWorld今天撰文,列举了iOS 5最受期待的五大功能,无线数据同步和更好的地图服务榜上有名. 以下为文章摘要: 苹果全球开发者大会(WWDC)将于6月6日开幕.与往年一样,人们今年也在猜测苹果届时将会发布何种新品.可以预料的是,新版iOS 5将附带一系列重大升级.以下就是iOS 5五大令人期待的功能: 1.无线数据同步 这显然是Android最令人青睐的功能之一.尽管通过USB数据线备份和同步数据没有什么大不了,但如果能够实现无线同步,就将带来更好的体验. 苹果已经证实,

【大数据新手上路】“零基础”系列课程--Flume收集网站日志数据到MaxCompute

免费开通大数据服务:https://www.aliyun.com/product/odps 概述:大数据时代,谁掌握了足够的数据,谁就有可能掌握未来,而其中的数据采集就是将来的流动资产积累. 任何规模的企业,每时每刻都在产生大量的数据,但这些数据如何归集.提炼始终是一个困扰.而大数据技术的意义确实不在于掌握规模庞大的数据信息,而在于对这些数据进行智能处理,从中分析和挖掘出有价值的信息,但前提是如何获取大量有价值的数据. 相信很多做过网站管理的人对网站访问日志(Access Log)应该不会陌生,

MaxCompute(原ODPS)开发入门指南——数据上云篇

MaxCompute(原ODPS)开发入门指南--数据上云篇 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 根据<MaxCompute(原ODPS)开发入门指南--计量计费篇>的了解,大家清楚了MaxCompute可以做什么,计费模式如何,想必大家也开通了MaxCompute想进行一次POC,但是大家遇到第一个问题一定是我的数据如何上云? 可通过多种方式数据流入MaxCompute MaxCompute(原ODPS)提

flume学习(六):使用hive来分析flume收集的日志数据

前面已经讲过如何将log4j的日志输出到指定的hdfs目录,我们前面的指定目录为/flume/events. 如果想用hive来分析采集来的日志,我们可以将/flume/events下面的日志数据都load到hive中的表当中去. 如果了解hive的load data原理的话,还有一种更简便的方式,可以省去load data这一步,就是直接将sink1.hdfs.path指定为hive表的目录. 下面我将详细描述具体的操作步骤. 我们还是从需求驱动来讲解,前面我们采集的数据,都是接口的访问日志数