DataX使用指南——ODPS to ODPS

1. DataX是什么

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

2. 何时用DataX

  1. DataX是离线数据同步工具,当需要迁移增量时,建议使用DTS,而不是DataX;
  2. 针对离线数据,当数据量很大或表非常多时,建议使用DataX。此时配置文件可编写脚本批量生成,详见ODPS数据迁移指南。同时可以增大DataX本身的并发,并提高运行DataX的任务机数量,来达到高并发,从而实现快速迁移;

3. DataX怎么用

3.1 DataX的配置文件

如下是DataX的配置文件示例:

{
    "job": {
           "content":[
                {
                    "reader":{
                        "name":"odpsreader",
                        "parameter":{
                            "accessId":"<accessID>",
                            "accessKey":"**",
                            "column":[
                                "col_1",
                                "col_2"
                            ],
                            "odpsServer":"http://service.odps.aliyun-inc.com/api",
                            "partition":[
                                "dt=20160524"
                            ],
                            "project":"src_project_name",
                            "splitMode":"record",
                            "table":"table_name_1"
                        }
                    },
                    "writer":{
                        "name":"odpswriter",
                        "parameter":{
                            "accessId":"<accessId>",
                            "accessKey":"**",
                            "accountType":"aliyun",
                            "column":[
                                "ci_name",
                                "geohash"
                            ],
                            "odpsServer":"http://service.odps.xxx.com/api",
                            "partition":"dt=20160524",
                            "project":"dst_project_name",
                            "table":"nb_tab_http"
                        }
                    }
                }
            ],
        "setting":{
            "speed":{
                "channel":20
            }
        }
    }
}
  1. 整个配置文件是一个job的描述;
  2. job下面有两个配置项,content和setting,其中content用来描述该任务的源和目的端的信息,setting用来描述任务本身的信息;
  3. content又分为两部分,reader和writer,分别用来描述源端和目的端的信息;
  4. 本例中由于源和目的都是ODPS,所以类型为odpsreader和odpswriter。均包含accessId,accessKey与odpsServer的api地址。
  5. 同时预迁移表的project,table以及列名和分区信息都要一一填写清楚。
  6. setting中的speed项表示同时起20个并发去跑该任务。

3.2 运行Datax任务

运行Datax任务很简单,只要执行python脚本即可。

python /home/admin/datax3/bin/datax.py ./json/table_1.json

运行后,可在终端查看运行信息。建议真正跑任务时,可按照ODPS迁移指南中给出的批量工具的方式运行。即将所有的命令整理到一个sh文件中,最后再用nohup运行该文件。

cat /root/datax_tools/run_datax.sh
python /home/admin/datax3/bin/datax.py ./json/table_1.json > ./log/table_1.log
#实际运行
nohup /root/datax_tools/run_datax.sh > result.txt 2>&1 &

4. DataX调优

DataX调优要分成几个部分,任务机指运行Datax任务所在的机器。

  1. 网络本身的带宽等硬件因素造成的影响;
  2. DataX本身的参数;
  3. 从源端到任务机;
  4. 从任务机到目的端;

即当觉得DataX传输速度慢时,需要从上述四个方面着手开始排查。

4.1 网络本身的带宽等硬件因素造成的影响

此部分主要需要了解网络本身的情况,即从源端到目的端的带宽是多少,平时使用量和繁忙程度的情况,从而分析是否是本部分造成的速度缓慢。一下提供几个思路。

  1. 可使用从源端到目的端scp的方式观察速度;
  2. 结合监控观察任务运行时间段时,网络整体的繁忙情况,来判断是否应将任务避开网络高峰运行;
  3. 观察任务机的负载情况,尤其是网络和磁盘IO,观察其是否成为瓶颈,影响了速度;

4.2 DataX本身的参数

  1. 可通过增加如下的core参数,去除掉DataX默认对速度的限制;

    {
        "core":{
            "transport":{
                "channel":{
                    "speed":{
                        "record":-1,
                        "byte":-1
                    }
                }
            }
        },
        "job":{
            ...
        }
    }
    
  2. 针对odpsreader有如下参数可以调节,注意并不是压缩速度就会提升,根据具体情况不同,速度还有可能下降,此项为试验项,需要具体情况具体分析。
    ...
    “parameter”:{
        "isCompress":"true",
        ...
    }
    
  3. 针对odpswrtier有如下参数可以调节,其中isCompress选项同reader,blockSizeInMB,为按块写入,后面的值为块的大小。该项值并不是越大越好,一般可以结合tunnel做综合考量。过分大的 blockSizeInMB 可能造成速度波动以及内存OOM。
    ...
    “parameter”:{
        "isCompress":"true",
        "blockSizeInMB":128,
        ...
    }
    
  4. 针对任务本身,有如下参数可以调节,注意如果调整了tunnel的数量,可能会造成JVM虚拟机崩溃,故需修改相应的参数;
    "job": {
    "setting": {
        "speed": {
            "channel": 32
        }
    }
    

    channel增大,为防止OOM,需要修改datax工具的datax.py文件。如下所示,可根据任务机的实际配置,提升-Xms与-Xmx,来防止OOM。由此可以看出,tunnel并不是越大越好,过分大反而会影响宿主机的性能。

    DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
    

4.3 从源端到任务机

  1. 可使用dataX从源端传输分区信息到本机,来观察速度。并和初始任务的速度进行比较,从而判断是哪一部分的原因造成的速度缓慢;具体配置文件如下:

    "writer": {
        "name": "txtfilewriter",
        "parameter": {
            "path": "/root/log",
            "fileName": "test_src_result.csv",
            "writeMode": "truncate",
            "dateFormat": "yyyy-MM-dd"
        }
    }
    

    试验时,注意观察任务机本身的IO负载。

  2. 如果判断结果是源端的速度慢,可将任务机迁移至源端所在的集群,并再次运行任务,观察结果;
  3. 可尝试用odpscmd,直接从源端odps下载分区到本地,和上述结果作比较。如果odpscmd速度快,可尝试调整datax的相关参数;
    odpscmd --config=odps_config.ini.src
    > tunnel download -thread 20 project_name.table_name/dt='20150101' log.txt;
    

    上述是通过tunnel使用20线程下载数据到本地。

  4. 如果是在专有云环境可尝试指定源端的tunnel server的ip进行测试,从而排除从域名到负载均衡部分的网络造成的影响。源端的tunnel server的ip需要咨询云端管理员。
     ...
    “parameter”:{
        "tunnelServer":"http://src_tunnel_server_ip",
        ...
    }
    

    注意此步骤可选择负载较低的tunnel_server。

  5. 观察源端tunnel server的负载情况,尤其是磁盘IO和网卡的负载,从而判断是否为tunnel sever负载过高造成了资源瓶颈。
  6. 观察源端的表结果,当有多个分区键或列过多时,都有可能造成传输的性能下降,此时可考虑换一张表进行测试,以排除表结构等问题造成的影响。

4.4 从任务机到目的端

  1. 可使用datax从任务机传输文件分区文件到目的端,来观察速度。并和初始任务的速度进行比较,从而判断是哪一部分的原因造成的速度缓慢;具体配置文件如下:

    "reader": {
        "name": "txtfilereader",
        "parameter": {
            "path": ["/home/haiwei.luo/case00/data"],
            "encoding": "UTF-8",
            "column": [
                {
                    "index": 0,
                    "type": "long"
                },
                {
                    "index": 1,
                    "type": "boolean"
                },
                {
                    "index": 2,
                    "type": "double"
                },
                {
                    "index": 3,
                    "type": "string"
                },
                {
                    "index": 4,
                    "type": "date",
                    "format": "yyyy.MM.dd"
                }
            ],
            "fieldDelimiter": ","
        }
    },
    
  2. 如果判断结果是源端的速度慢,可将任务机迁移至源端所在的集群,并再次运行任务,观察结果;
  3. 可尝试用odpscmd,直接从本地上传分区到目的端,和上述结果作比较。如果odpscmd速度快,可尝试调整datax的相关参数;
    odpscmd --config=odps_config.ini.src
    > tunnel upload ./log.txt mingxuantest/pt='20150101';
    

    上述log.txt可为上一步tunnel下载的文件,或自行编写。

  4. 如果是在专有云环境可尝试指定指定端的tunnel server的ip进行测试,从而排除从域名到负载均衡部分的网络造成的影响。源端的tunnel server的ip需要咨询云端管理员。
     ...
    “parameter”:{
        "tunnelServer":"http://dst_tunnel_server_ip",
        ...
    }
    

    注意此步骤可选择负载较低的tunnel_server。

  5. 观察源端tunnel server的负载情况,尤其是磁盘IO和网卡的负载,从而判断是否为tunnel sever负载过高造成了资源瓶颈。

4.5 综合

  1. 通过对DataX本身参数,源端到任务机、任务机到目的端的网络、负载等情况综合考量,进行针对各个部分的优化;
  2. 同时,可在多台机器上部署DataX,将任务平均分配到多台机器上并发运行,来提高速度;
时间: 2024-10-02 20:08:39

DataX使用指南——ODPS to ODPS的相关文章

ODPS到ODPS数据迁移指南

1.工具选择与方案确定 目前,有两种方式可用于专有云环境下的从MaxCompute到MaxCompute整体数据迁移. (1)使用DataX工具进行迁移,迁移所需的作业配置文件及运行脚本,可用DataX批量配置工具来生成: (2)通过大数据开发套件(DataIDE)- 数据开发进行迁移,这种方式可选择界面向导模式逐步配置,操作简单容易上手: 2.具体实施 2.1使用DataX工具  这种场景需要先从源MaxCompute中导出元数据DDL,在目标MaxCompute中初始化表,然后借助DataX

阿里巴巴大数据计算平台MaxCompute(原名ODPS)全套攻略(持续更新20171122)

  概况介绍 大数据计算服务(MaxCompute,原名ODPS,产品地址:https://www.aliyun.com/product/odps)是一种快速.完全托管的TB/PB级数据仓库解决方案.MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全.本文收录了大量的MaxCompute产品介绍.技术介绍,帮助您快速了解MaxCompute/ODPS. MaxCompute 2.0:阿里巴巴的大数

使用 MaxCompute(原ODPS) java sdk 运行安全相关命令

转自zhenhong 使用 MaxCompute console 的同学,可能都使用过 odps 安全相关的命令.官方文档上有详细的 odps 安全指南,并给出了安全相关命令列表. 简而言之,权限管理.项目空间安全配置以及用户及授权管理都属于 odps 安全命令相关的范畴. 再直白一点,以下列关键字开头的命令,都是 odps 安全相关操作命令: GRANT/REVOKE ... SHOW GRANTS/ACL/PACKAGE/LABEL/ROLE/PRINCIPALS SHOW PRIV/PRI

OSS文件上传及OSS与ODPS之间数据连通

场景描述        有这样一种场景,用户在自建服务器上存有一定数量级的CSV格式业务数据,某一天用户了解到阿里云的OSS服务存储性价比高(嘿嘿,颜值高),于是想将CSV数据迁移到云上OSS中,并且未来还想对这些数据做一些离线分析,挖掘其中存在价值,因此需要将OSS中文件再通过一种方式同步到ODPS数加平台上,面对这样需求,小编我经过参考文档,实践,调试并修复Bug,实现出以下一种解决方案. 实现目标     通过OSS的Java SDK以及批量数据通道tunnel SDK实现以下两个功能:

MaxCompute(原ODPS) 事件(Event)机制

免费开通大数据服务:https://www.aliyun.com/product/odps 转自habai 什么是 ODPS 事件机制 Odps event 用于监控表和实例等odps资源(目前只用于监控表).当表状态发生变化时,odps 会向预先注册(订阅)的 uri 发送信息.Event通知只有在订阅Event之后才能收到.每个project中可以订阅一个或多个Event.Event是用户的数据,同表数据一样,创建或修改时都需要有这个Project的操作权限.关于Event的Restful

使用阿里巴巴开源数据库同步工具DATAX实现跨数据库同步

使用阿里巴巴开源数据库同步工具DATAX实现跨数据库同步 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.HDFS.Hive.OceanBase.HBase.OTS.ODPS 等各种异构数据源之间高效的数据同步功能. 点击进入 先请配置DataX 环境变量 Linux.Windows JDK(1.8) Python(推荐Python2.6.X) Apache Maven 3.x (Compile DataX) 下面演示dataX 配置示例:从M

DataX配置及使用

一. DataX3.0概览 ​ DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL.Oracle等).HDFS.Hive.ODPS.HBase.FTP等各种异构数据源之间稳定高效的数据同步功能. 设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源.当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步. 当前使用现状 DataX在阿里巴巴

阿里巴巴开源离线同步工具 DataX3.0 介绍

一. DataX3.0概览 DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL.Oracle等).HDFS.Hive.ODPS.HBase.FTP等各种异构数据源之间稳定高效的数据同步功能. 设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源.当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步. 当前使用现状 DataX在阿里巴巴集团

MaxCompute实战之数据存储

MaxCompute(原名ODPS),是阿里巴巴自主研发的海量数据处理平台. 主要服务于批量结构化数据的存储和计算,可以提供海量数据仓库的解决方案以及针对大数据的分析建模服务.支持PB级别数据分析,常用场景如下:大型互联网企业的数据仓库和BI分析.网站的日志分析.电子商务网站的交易分析.用户特征和兴趣挖掘等. 我们接入MaxCompute主要做两个工作,一是网站的日志分析,二是用户特征和兴趣挖掘.其中网站日志分析项目组内(就是高德开放平台)已经玩的比较溜了,"用户特征和兴趣挖掘"还在探