阿里云大数据利器之-RDS迁移到Maxcompute实现动态分区

当前,很多用户的业务数据存放在传统关系型数据库上,例如阿里云的RDS,做业务读写操作。当数据量非常大的时候,此时传统关系型数据库会显得有些吃力,那么会经常有将mysql数据库的数据迁移到[大数据处理平台-大数据计算服务(Maxcompute,原ODPS)(https://www.aliyun.com/product/odps?spm=5176.doc27800.765261.309.dcjpg2),利用其强大的存储和计算能力进行各种查询计算,结果再回流到RDS。

         一般情况下,业务数据是按日期来区分的,有的静态数据可能是按照区域或者地域来区分,在Maxcompute中数据可以按照分区来存放,可以简单理解为一份数据放在不同的子目录下,子目录的名称以日期来命名。那么在RDS数据迁移到Maxcompute上的过程中,很多用户希望可以自动的创建分区,动态的将RDS中的数据,比如按日期区分的数据存放到Maxcompute中,这个流程自动化创建。同步的工具是使用Maxcompute的配套产品-[大数据开发套件](https://data.aliyun.com/product/ide?spm=5176.7741945.765261.313.TQqfkK)。下面就举例说明RDS-Maxcompute自动分区几种方法的使用。

一,将RDS中的数据定时每天同步到Maxcompute中,自动创建按天日期的分区。

这里就要用到大数据开发套件-数据集成的功能,我们采用界面化的配置。

如图地方,设置Maxcompute的分区格式

一般配置到这个地方的时候,默认是系统自带时间参数:${bdp.system.bizdate} 格式是yyyymmdd。也就是说在调度执行这个任务的时候,这个分区会被自动替换为 **任务执行日期的前一天**,相对用户比较方便的,因为一般用户业务数据是当前跑前一天的业务数据,这个日期也叫业务日期。

如图

如果用户想使用当天任务运行的日期作为分区值,需要自定义这个参数,方法如图,也可以参考文档

https://help.aliyun.com/document_detail/30281.html?spm=5176.product30254.6.604.SDunjF

自定义的参数,格式非常灵活,日期是当天日期,用户可以自由选择哪一天,以及格式。

可供参考的变量参数配置方式如下:

后N年:$[add_months(yyyymmdd,12*N)]

前N年:$[add_months(yyyymmdd,-12*N)]

后N月:$[add_months(yyyymmdd,N)]

前N月:$[add_months(yyyymmdd,-N)]

后N周:$[yyyymmdd+7*N]

前N周:$[yyyymmdd-7*N]

后N天:$[yyyymmdd+N]

前N天:$[yyyymmdd-N]

后N小时:$[hh24miss+N/24]

前N小时:$[hh24miss-N/24]

后N分钟:$[hh24miss+N/24/60]

前N分钟:$[hh24miss-N/24/60]

注意:

请以中括号 [] 编辑自定义变量参数的取值计算公式,例如 key1=$[yyyy-mm-dd]。

默认情况下,自定义变量参数的计算单位为天。例如 $[hh24miss-N/24/60] 表示 (yyyymmddhh24miss-(N/24/60 * 1天)) 的计算结果,然后按 hh24miss 的格式取时分秒。

使用 add_months 的计算单位为月。例如 $[add_months(yyyymmdd,12 N)-M/24/60] 表示 (yyyymmddhh24miss-(12 N 1月))-(M/24/60 1天) 的结果,然后按 yyyymmdd 的格式取年月日。

如图,配置完成后,我们来测试运行看下,直接查看日志

可以,看到日志中,Maxcompute(日志中打印原名ODPS)的信息中

partition分区,date_test=20170829,自动替换成功。

再看下实际的数据过去了没呢

我们看到数据是过来了,成功自动创建了一个分区值。那么这个任务定时调度的时候,就会自动生成一个分区,每天自动的将RDS中的数据同步到Maxcompute中的按照日期创建的分区中。

二,如果用户的数据有很多运行日期之前的历史数据,怎么自动同步,自动分区呢。大数据开发套件-运维中心-有个补数据的功能。

首先,我们需要在RDS端把历史数据按照日期筛选出来,比如历史数据2017-08-25这天的数据,我要让他自动同步到Maxcompute的20170825的分区中。

在RDS阶段可以设置where过滤条件,如图

在Maxcompute页面,还是按照之前一样配置

然后一定要 保存-提交。

提交后到运维中心-任务管理-图形模式-补数据

选择日期区间

提交运行,这个时候就会同时生成多个同步的任务实例按顺序执行

看下运行的日志,可以看到运行过程对RDS数据的抽取,在Maxcompute自动创建的分区

看下运行结果,数据写入的情况,自动创建了分区,数据同步过来了。

三,如果用户数据量比较巨大,第一次全量的数据,或者并不是按照日期分区,是按照省份等分区。那么此时数据集成就不能做到自动分区了。也就是说,想按照RDS中某个字段进行hash,相同的字段值自动放到Maxcompute中以这个字段对应值的分区中。

同步本身是做不了的,是在Maxcompute中通过SQL完成,是Maxcompute的特有功能,实际上也是真正的动态分区,大家可以参考文章

?spm=5176.8091938.0.0.JMsroJ

。那么就需要我们先把数据全量同步到Maxcompute的一个临时表。

流程如下

1,先创建一个SQL脚本节点-用来创建临时表

drop table if exists emp_test_new_temp;
CREATE TABLE emp_test_new_temp
(date_time STRING,
	name STRING,
	age BIGINT,
	sal DOUBLE);	

2,创建同步任务的节点,就是简单的同步任务,将RDS数据全量同步到Maxcompute,不需要设置分区。

3,使用sql进行动态分区到目的表

drop table if exists emp_test_new;
--创建一个ODPS分区表(最终目的表)
	CREATE TABLE emp_test_new (
	date_time STRING,
	name STRING,
	age BIGINT,
	sal DOUBLE
)
PARTITIONED BY (
	date_test STRING
);
--执行动态分区sql,按照临时表的字段date_time自动分区,date_time字段中相同的数据值,会按照这个数据值自动创建一个分区值
--例如date_time中有些数据是2017-08-25,会自动在ODPS分区表中创建一个分区,date=2017-08-25
--动态分区sql如下
--可以注意到sql中select的字段多写了一个date_time,就是指定按照这个字段自动创建分区
insert overwrite table emp_test_new partition(date_test)select date_time,name,age,sal,date_time from emp_test_new_temp
--导入完成后,可以把临时表删除,节约存储成本
drop table if exists emp_test_new_temp;

最后将三个节点配置成一个工作流,按顺序执行

执行过程,我们重点观察,最后一个节点的动态分区过程

最后,看下数据

完成动态分区,自动化分区。是不是很神奇,相同的日期数据,到了同一个分区里。如果是以省份命名,也是如此,我自己都怕了。

大数据开发套件实际上可以完成绝大部分的自动化作业,尤其是数据同步迁移,调度等,界面化操作使得数据集成变得简单,不用苦逼的加班搞ETL了,你懂的。

时间: 2024-09-08 10:00:04

阿里云大数据利器之-RDS迁移到Maxcompute实现动态分区的相关文章

阿里云大数据利器之-RDS迁移到Maxcompute实现自动分区

当前,很多用户的业务数据存放在传统关系型数据库上,例如阿里云的RDS,做业务读写操作.当数据量非常大的时候,此时传统关系型数据库会显得有些吃力,那么会经常有将mysql数据库的数据迁移到[大数据处理平台-大数据计算服务(Maxcompute,原ODPS)(https://www.aliyun.com/product/odps?spm=5176.doc27800.765261.309.dcjpg2),利用其强大的存储和计算能力进行各种查询计算,结果再回流到RDS.          一般情况下,业

阿里云大数据利器之-使用sql实现流计算做实时展现业务( flume故障转移版 )

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以.那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现.本文就用阿里云产品简单实现了一个实时处理的方案. 一,总体架构 按照数据流向 数据采集:flume(配置故障转移) 缓存队列:datahubhttps://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v 数据计算:阿里流计算(StreamCompute)http

阿里云大数据利器之-使用flume+sql实现流计算做实时展现业务(归档Maxcompute)

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以.那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现.本文就用阿里云产品简单实现了一个实时处理的方案. 一,总体架构 按照数据流向 数据采集:flume(配置故障转移) 缓存队列:datahubhttps://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v 数据计算:阿里流计算(StreamCompute)http

阿里云大数据利器Maxcompute学习之-假如你使用过hive

如果您是一个大数据开发工程师并且使用过hadoop的hive框架,那么恭喜您,阿里云的大数据计算服务-Maxcompute,您已经会了90%.这篇文章就来简单对比下Maxcompute和hive的异同,来方便刚开始使用Maxcompute的用户,从hive秒速迁移到Maxcompute的使用上. 首先,回顾下hive的概念. 1.hive是基于hadoop的,以表的形式来存储数据,实际上数据是存储在hdfs上,数据库和表其实是hdfs上的两层目录,数据是放在表名称目录下的,计算还是转换成mapr

阿里云大数据利器Maxcompute学习之--数据同步任务常见日志报错总结

在使用大数据开发套件时最常用的就是数据同步模块,工单里最常见的问题就是其中数据同步的问题,这里总结一些常见一些从Maxcompute到其他数据源的同步任务报错案例,主要是日志中出现数据回滚写入的问题.   那首先看下日志中数据回滚的原因,当数据写入rds或者hybridDB等一些支持事务的数据库中,数据批量写入,一旦由于各种原因没有写入成功,这个批次的数据会回滚重新写入,如果再次写入失败,就会报脏数据的错误导致任务失败.数据写入失败可能是以下原因导致回滚.1,脏数据(数据值超过数据类型最大范围,

阿里云大数据利器Maxcompute-使用mapjoin优化查询

大数据计算服务(MaxCompute,原名 ODPS)是一种快速.完全托管的 GB/TB/PB 级数据仓库解决方案.https://help.aliyun.com/document_detail/27800.html?spm=5176.7840267.6.539.po3IvS 主要有三种操作数据的方式SQL,UDF,MapReduce,了解hadoop的同学就比较熟悉这些东西了. 那么Maxcompute的SQL和标准SQL最大的区别就是在Maxcompute中SQL会被解析成MapReduce

阿里云大数据利器Maxcompute学习之--分区表的使用

初学大数据Maxcompute时部分用户不是很熟悉Maxcompute分区表的概念和使用,那这篇文章来简单介绍下分区表的概念及使用场景.  实际上,分区在很多框架中都有这个概念,比如开源框架中的hive等.打个比喻,某城市粮仓里存放麦子,粮仓里按照县城分为很多区域,每个县城都有自己的一块地方,每个县城的麦子放在自己对应的区域上.如果上级领导来检查,想看某县城的麦子情况,那直接可以根据区域来迅速找到该县城的麦子.对应到Maxcompute分区表,粮仓相当于其中一张表,每个区域相当于以这个县城命名的

阿里云大数据利器Maxcompute学习之--窗口函数实现分组TopN

看到很多用户经常会问如何对分组内进行排序. 官方文档:https://help.aliyun.com/document_detail/34994.html?spm=5176.doc27891.6.611.Q1bk3j 例如需求: 1. odps 里面能否做排名操作,比如一个表里面有 用户ID 和 金额 两个字段,用金额大小排序的话,我如何计算用户的排名(金额最大的是 第一名 ,以此类推) 2. 计算每个金融产品的最大投资者,或者前几名 类似这一类的需求,我们总结为实现分组内的排序,取TopN,那

双11来临,阿里云大数据(数加)会出哪些绝招?

双11来临,阿里云大数据(数加)会出哪些绝招? 双11电商       一年一度的"双11狂欢节"就要到了,眼看参加商家们都已经忙得不可开交:备货.营销.广告.预售......以往作战一般会历经"预热"."蓄势"."爆发"."返场"四个阶段,前两个阶段尤其重要,而眼看11月临近,很多商家再次为流量问题而伤神,阿里云的大数据团队继"数据魔方"."全景洞察"之后,新推出一