Hadoop数据迁到MaxCompute

通过最佳实践帮助您实现上述案例效果

Step1:数据准备

接下来,我们需要准备好一张表及数据集;

  • Hive表名:hive_dplus_good_sale;
  • 是否分区表:分区表,分区名为pt;
  • hdfs文件数据列分隔符:英文逗号;
  • 表数据量:100条。

源hive表建表语句

CREATE TABLE IF NOT EXISTS hive_dplus_good_sale(
    create_time timestamp,
    good_cate STRING,
    brand STRING,
    buyer_id STRING,
    trans_num BIGINT,
    trans_amount DOUBLE,
    click_cnt BIGINT,
    addcart_cnt BIGINT,
    collect_cnt BIGINT
    )
    PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n' ;

数据预览如下:


Hdfs&datax类型对照表

**在Hive****表中的数据类型 **

**DataX ****内部类型 **

TINYINT,SMALLINT,INT,BIGINT

Long

FLOAT,DOUBLE,DECIMAL

Double

String,CHAR,VARCHAR

String

BOOLEAN

Boolean

Date,TIMESTAMP

Date

Binary

Binary

Step2:MaxCompute目标表准备

1.1 开通MaxCompute
阿里云实名认证账号访问https://www.aliyun.com/product/odps ,开通MaxCompute,选择按量付费进行购买。


1.2 数加上创建MaxCompute project

操作步骤:

步骤1:进入数加管理控制台,前面开通MaxCompute成功页面,点击管理控制台,或者导航产品->大数据(数加)->MaxCompoute(https://www.aliyun.com/product/odps) 点击管理控制台。

步骤2:创建项目。付费模式选择I/O后付费,输入项目名称:

步骤3:创建MaxCompute表。进入大数据开发套件的数据开发页面:

新建表

对应建表语句:

CREATE TABLE IF NOT EXISTS mc_dplus_good_sale (
    create_time DATETIME COMMENT '时间',
    good_cate STRING COMMENT '商品种类',
    brand STRING COMMENT '品牌',
    buyer_id STRING COMMENT '用户id',
    trans_num BIGINT COMMENT '交易量',
    trans_amount DOUBLE COMMENT '金额',
    click_cnt BIGINT COMMENT '点击次数',
    addcart_cnt BIGINT COMMENT '加入购物车次数',
    collect_cnt BIGINT COMMENT '加入收藏夹次数'
)
PARTITIONED BY (pt STRING);

Step3:通过Datax进行数据迁移

MaxCompute中的结构化数据是以表的形式存在,我们可以通过工具或者API将数据直接写入表中。

网络传输

  • datax: https://github.com/alibaba/DataX 本case我们选用datax方式进行数据迁移。
    datax是阿里开源的一款工具,可以适配常见的数据源,包括关系数据库、文本文件等,这是一款单机的软件,适用于中小数据量的传输,传输效率按照单机网卡 50%(约50MB/s)计算,一天可以传输的数据量为
    50MB/s * 60 * 60 * 24 / 1024/1024 ≈ 4T
    具体的传输效率会取决于网络环境(机房出口带宽、本地网络),机器性能,文件格式与数量等多个因素。
    使用datax传输数据到ODPS的部署结构如下,在ECS机器或一台物理机器上部署好dataX,此机器要能够同时连通数据源与MaxCompute(原ODPS)服务。

磁盘导入

当数据量比较大(PB级别以上)时,可以考虑通过磁盘将数据导入ODPS,需要磁盘导入请联系相应的大数据业务架构师。

API: Tunnel SDK

当数据源较特殊,无法使用以上现有的工具的时候,可以用SDK编写适配的工具,从数据源读出并写入ODPS中,Tunnel是数据上传的底层接口,可以同时接受大量客户端并行的写入。
https://help.aliyun.com/document_detail/27837.html?spm=5176.doc34614.6.144.RyeirI

ECS机器部署

前提准备:获取ECS的主机名和IP:

操作步骤:

步骤1:增加调度资源。大数据开发套件->项目管理->调度资源管理->增加调度资源,输入名称和标识hive2mc:

步骤2:添加配置ECS服务器:

输入前提准备中获取到的ECS服务器名称和IP,点击添加。

步骤3:初始化ECS服务器:

按照弹框提示,登录ECS服务器(登录外网IP)在root用户下执行两个命令,即部署相关datax等服务。

命令执行成功后,回到“管理服务器”页面,点击刷新按钮可以看到服务器状态为“正常”状态:

此时该项目的调度资源组就有两个:默认资源组、刚添加成功的资源组,后面执行datax任务即使用刚添加成功的资源组进行调度运行。

任务配置

前提准备:查看hive表对应数据文件地址

操作步骤:

步骤1:前面ECS机器部署介绍的资源组配置好后,导航上点击进入“数据开发”页面进行任务开发。

步骤2:创建工作流,命名为hive2MC_demo,选择一次性调度(本case是做一次性迁移,若需要每日生产调度则选择周期调度):

步骤3:在工作流中创建shell节点:

步骤4:代码配置。Shell节点创建后,双击节点图,进入代码编辑器进行Shell代码编辑。

代码中进行datax配置,reader为hdfsreader,writer为odpswriter,具体代码如下,只需要修改reader和writer的配置:

shell_datax_home='/home/admin/shell_datax'
mkdir -p ${shell_datax_home}
shell_datax_config=${shell_datax_home}/${ALISA_TASK_ID}
echo '''
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/hive_table_name/*",
                        "defaultFS": "hdfs://xxx:port",
                        "column": [
                               {
                                "index": 0,
                                "type": "date"
                               },
                               {
                                "index": 1,
                                "type": "STRING"
                               },
                               {
                                "index":2,
                                "type": "STRING"
                               },
                               {
                                "index":3,
                                "type": "STRING"
                               },
                               {
                                "index": 4,
                                "type": "long"
                               }
                               ,
                               {
                                "index": 5,
                                "type": "DOUBLE"
                               },
                               {
                                "index": 6,
                                "type": "long"
                               },
                               {
                                "index": 7,
                                "type": "long"
                               },
                               {
                                "index": 8,
                                "type": "long"
                               }
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "odpswriter",
                        "parameter": {
                          "project": "base_case",
                          "table": "mc_dplus_good_sale",
                          "partition":"pt=20160618",
                          "column": ["create_time","good_cate","brand","buyer_id","trans_num","trans_amount","click_cnt","addcart_cnt","collect_cnt"],
                          "accessId": "xxxxxxxxxx",
                          "accessKey": "xxxxxxxxxxx",
                          "truncate": true,
                          "odpsServer": "http://service.odps.aliyun.com/api",
                          "tunnelServer": "http://dt.odps.aliyun.com",
                          "accountType": "aliyun"
                       }
                }
            }
        ]
    }
}''' > ${shell_datax_config}

echo "`date '+%Y-%m-%d %T'` shell datax config: ${shell_datax_config}"

/home/admin/datax3/bin/datax.py ${shell_datax_config}
shell_datax_run_result=$?

rm ${shell_datax_config}

if [ ${shell_datax_run_result} -ne 0 ]
then
    echo "`date '+%Y-%m-%d %T'` shell datax ended failed :("
    exit -1
fi
echo "`date '+%Y-%m-%d %T'` shell datax ended success~"

hdfsreader

  • path:hive表hdfs文件路径(注意格式);
  • defaultFS: hdfs文件系统namenode节点地址(注意格式);
  • index:hdfs数据文本第几列,以0开始。
  • Type:hdfs数据对应datax的类型,请看hdsf&datax类型对照表;
  • fileType: hdfs数据文件类型;
  • encoding: hdfs数据文件的编码;
  • fieldDelimiter: hdfs数据文件字段分隔符。
    Odpswriter
  • project: MaxCompute目标表所属项目名称;
  • table: MaxCompute目标表名称;
  • partition:表分区值,此case直接用固定值;
  • column:目标表列,注意顺序与reader的列顺序一一对应;
  • accessId:写入目标表所用的云账号access id;
  • accessKey:写入目标表所用的云长access key;
  • truncate:写入前是否清空当前表/分区数据,需要清空就true,需要保留就false。

步骤5:保存shell节点,提交工作流。

步骤6:指派执行资源组。这里将给shell任务指定执行的机器即我们前面部署的ECS。

hive2mc资源组即为《ECS机器部署》章节中配置的资源组。资源组配置好后,后面就可以执行shell任务进行数据迁移。

Step4:执行数据迁移任务

接前面的步骤,点击shell节点的操作->流程:

进入该节点的任务管理视图界面,右击节点图,选择测试节点,即可把该阶段调度执行起来:

示例名称和业务日期采用默认,直接点击生成并运行,然后点击“前往查看运行结果”。


进入测试实例视图,可以看到节点示例执行状态,如下图变成绿色打勾的符号说明执行成功,右击节点图选择 查看运行日志 查看具体执行日志。


查看日志中读出100条,写入失败0条,说明成功写入100条数据;

Step5:校验MaxCompute表数据

可以到数据开发中创建一个脚本文件


运行代码:

select create_time,good_cate,brand,buyer_id,trans_num,trans_amount,click_cnt,addcart_cnt,collect_cnt from mc_dplus_good_sale limit 10;

查看数据是否正常。


也可以执行

select count(*) from mc_dplus_good_sale where pt=20160618;

查看是否导入的数据是否是100条。

时间: 2024-09-14 05:45:37

Hadoop数据迁到MaxCompute的相关文章

[大数据新手上路]“零基础”系列课程--如何将ECS上的Hadoop数据迁移到阿里云数加·MaxCompute

免费开通大数据服务:https://www.aliyun.com/product/odps  想用阿里云数加·大数据计算服务(MaxCompute),但是现在数据还在hadoop上,怎么办? 大数据计算服务(MaxCompute) 快速.完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全. 了解更多   别烦恼,跟着我们走,来一次MaxCompute零基础数据迁移之旅-Let'

阿里巴巴大数据计算平台MaxCompute(原名ODPS)全套攻略(持续更新20171122)

  概况介绍 大数据计算服务(MaxCompute,原名ODPS,产品地址:https://www.aliyun.com/product/odps)是一种快速.完全托管的TB/PB级数据仓库解决方案.MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全.本文收录了大量的MaxCompute产品介绍.技术介绍,帮助您快速了解MaxCompute/ODPS. MaxCompute 2.0:阿里巴巴的大数

Apache Sqoop 1.99.4 发布,Hadoop 数据迁移

Apache Sqoop 1.99.4 发布,这是 Sqoop2 的第四个里程碑版本,是非常重要的一个里程碑. Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导入到关系型数据库中. 该版本改进内容和新特性: Improvement [SQOOP-773] – Sqoop2: Batch execution support fo

关于mysql和hadoop数据交互的问题,和hadoop的文件夹设计

问题描述 关于mysql和hadoop数据交互的问题,和hadoop的文件夹设计 关于mysql和hadoop数据交互的问题,和hadoop的文件夹设计 关于mysql和hadoop数据交互的问题,和hadoop的文件夹设计 目前mysql按地区,商业区区分,假设读取mysql数据库的地区,根据地区划分 我昨天和领导沟通了,领导说点击率不是必要条件,地区划分才是重点,后面就是各方面劝导,只好以地区区分,关键是这个镇区区分数据和产品的话,全国有6k多个地区, 这样的hdfs文件夹数量,岂不是很崩溃

【大数据干货】数据进入阿里云数加-大数据计算服务MaxCompute(原ODPS)的N种方式

免费开通大数据服务:https://www.aliyun.com/product/odps 想用阿里云大数据计算服务(MaxCompute),对于大多数人首先碰到的问题就是数据如何迁移到MaxCompute中.按照数据迁移场景,大致可以分为批量数据.实时数据.本地文件.日志文件等的迁移,下面我们针对每种场景分别介绍几种常用方案. 大数据计算服务(MaxCompute) 快速.完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海

阿里云数加大数据计算服务MaxCompute学习路线图 (持续更新中)

免费开通大数据服务:https://www.aliyun.com/product/odps 最近很多客户私信来咨询如何学习阿里云数加大数据计算服务MaxCompute 技术.为此,我们列了一个路线图供大家学习大数据计算服务MaxCompute.这个列表包含了一些社区的优秀资料和我们的原创文章.我们会随着大数据计算服务MaxCompute技术的发展持续更新本文,也会在继续贡献内容来帮助同学们快速入门或持续提高. 大数据计算服务(MaxCompute) 快速.完全托管的TB/PB级数据仓库解决方案,

一分钟了解阿里云产品:大数据计算服务MaxCompute概述

  阿里云发布了许多产品,今天让我们来了解下大数据计算服务MaxCompute这款产品吧.     什么是MaxCompute呢?   MaxCompute是由阿里云自主研发,是阿里巴巴自主研发的海量数据处理平台.提供针对TB/PB级数据.实时性要求不高的分布式处理能力,应用于数据分析.挖掘.商业智能等领域.主要服务于批量结构化数据的存储和计算,可以提供海量数据仓库的解决方案以及针对大数据的分析建模服务.阿里巴巴的离线数据业务都运行在MaxCompute上.   MaxCompute有什么优势和

阿里云携大数据计算平台MaxCompute欧洲开服

参考消息网6月19日报道 英媒称,阿里巴巴旗下的云计算部门阿里云将于2017年下半年将其"MaxCompute"大数据服务带入欧洲. 据英国科技经济类网站硅谷网6月15日报道,当地时间6月15日,阿里云在巴黎VivaTech国际科技创新大会上宣布大数据计算产品"MaxCompute"将于年内在欧洲市场开服,该技术涵盖处理分析.机器学习等一系列完善的数据智能服务.这也是中国类似技术的首次出海,以满足当地众多企业日益增长的数字化转型需求. 报道称,该平台允许客户存储和处

你想知道的关于Hadoop数据资源池的一切

随着Hadoop数据资源池的概念进入主流IT,越来越多的企业开始试水Hadoop.但很多只是将一部分数据池化,还没有开发出成熟的Hadoop环境. 数据资源池使用基于开源Hadoop框架和商业硬件,以池化资源的形式处理.存储和管理大数据,尤其是支持分析应用.支持者认为数据池架构提供了一个更便宜的替代传统数据仓库的选项,能够处理结构化.半结构化以及非结构化数据.不过,数据资源池的概念还相对较新,带来利益的同时也隐藏着陷阱.关注BI和大数据的咨询公司Eckerson 集团首席咨询师Wayne Eck