DataX配置及使用

一. DataX3.0概览

​ DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

  • 设计理念

    为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

  • 当前使用现状

    DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。

此前已经开源DataX1.0版本,此次介绍为阿里巴巴开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX。

二、DataX3.0框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

三. DataX3.0插件体系

​ 经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:

类型 数据源 Reader(读) Writer(写)
RDBMS 关系型数据库 Mysql
Oracle
SqlServer
Postgresql
达梦
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
MongoDB
无结构化数据存储 TxtFile
FTP
HDFS

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。详情请看:DataX数据源指南

四、DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

核心模块介绍:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

五、DataX安装:

(1)、下载DataX源码:

[root@iZ2zeh44pi6rlahxj7s9azZ ~]# git clone git@github.com:alibaba/DataX.git

(2)、通过maven打包:

[root@iZ2zeh44pi6rlahxj7s9azZ ~]# cd  {DataX_source_code_home}

[root@iZ2zeh44pi6rlahxj7s9azZ ~]# yum install -y maven

[root@iZ2zeh44pi6rlahxj7s9azZ ~]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true

打包成功,日志显示如下:

[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------

打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:

[root@iZ2zeh44pi6rlahxj7s9azZ ~]#  cd  {DataX_source_code_home}

[root@iZ2zeh44pi6rlahxj7s9azZ ~]#  ls ./target/datax/datax/

bin             conf            job             lib             log             log_perf


六、mysql数据传输到oracle

(1)、生成mysql到oracle的配置文件:

[root@iZ2zeh44pi6rlahxj7s9azZ bin]# python datax.py -r mysqlreader -w oraclewriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !

Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.

Please refer to the mysqlreader document:

     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the oraclewriter document:

     https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md

Please save the following configuration as a json file and  use

     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json

to run the job.

{

    "job": {

        "content": [

            {

                "reader": {

                    "name": "mysqlreader",

                    "parameter": {

                        "column": [],

                        "connection": [

                            {

                                "jdbcUrl": [],

                                "table": []

                            }

                        ],

                        "password": "",

                        "username": "",

                        "where": ""

                    }

                },

                "writer": {

                    "name": "oraclewriter",

                    "parameter": {

                        "column": [],

                        "connection": [

                            {

                                "jdbcUrl": "",

                                "table": []

                            }

                        ],

                        "password": "",

                        "preSql": [],

                        "username": ""

                    }

                }

            }

        ],

        "setting": {

            "speed": {

                "channel": ""

            }

        }

    }

}

(2)、传输文件配置

[root@iZ2zeh44pi6rlahxj7s9azZ bin]# vim ../job/myor.json

{

    "job": {

        "content": [

            {

                "reader": {

                    "name": "mysqlreader",

                    "parameter": {

                        "column": ["id","qiye","diqu"],

                        "connection": [

                            {

                                "jdbcUrl": ["jdbc:mysql://[HOST_NAME]:PORT/[DATABASE_NAME]"],

                                "table": ["***"]

                            }

                        ],

                        "password": "***",

                        "username": "***",

                        "where": ""

                    }

                },

                "writer": {

                    "name": "oraclewriter",

                    "parameter": {

                        "column": ["id","qiye","diqu"],

                        "connection": [

                            {

                                "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",

                                "table": ["***"]

                            }

                        ],

                        "password": "***",

                        "preSql": ["delete from ceshi"],

                        "username": "***"

                    }

                }

            }

        ],

        "setting": {

            "speed": {

                "channel": "1"

            }

        }

    }

}

(3)、执行传输过程

[root@iZ2zeh44pi6rlahxj7s9azZ bin]# python datax.py ../job/myor.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !

Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.

2017-09-25 20:09:01.200 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl

2017-09-25 20:09:01.215 [main] INFO  Engine - the machine info  =>

    osInfo:    Oracle Corporation 1.8 25.144-b01

    jvmInfo:    Linux amd64 3.10.0-514.6.2.el7.x86_64

    cpu num:    1

    totalPhysicalMemory:    -0.00G

    freePhysicalMemory:    -0.00G

    maxFileDescriptorCount:    -1

    currentOpenFileDescriptorCount:    -1

.

.

.

2017-09-25 20:19:12.557 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1419776 records, 31164761 bytes | Speed 54.10KB/s, 2457 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 597.972s |  All Task WaitReaderTime 1.983s | Percentage 0.00%

2017-09-25 20:19:32.558 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1464832 records, 32195809 bytes | Speed 50.34KB/s, 2252 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 617.847s |  All Task WaitReaderTime 2.031s | Percentage 0.00%

2017-09-25 20:19:42.559 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1491456 records, 32744862 bytes | Speed 53.62KB/s, 2662 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 628.044s |  All Task WaitReaderTime 2.054s | Percentage 0.00%

2017-09-25 20:19:52.561 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1516032 records, 33271617 bytes | Speed 51.44KB/s, 2457 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 637.787s |  All Task WaitReaderTime 2.076s | Percentage 0.00%

(4)、登录oracle验证传输是否成功

未传输时:

[oracle@iz2zec57gfl6i9vbtdksl1z ~]$ sqlplus ***/***

SQL*Plus: Release 11.2.0.4.0 Production on Mon Sep 25 20:11:44 2017

Copyright (c) 1982, 2013, Oracle.  All rights reserved.

Connected to:

Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production

With the Partitioning, Oracle Label Security, OLAP, Data Mining,

Oracle Database Vault and Real Application Testing options

SQL> select * from ceshi;

no rows selected

传输后:

[oracle@iz2zec57gfl6i9vbtdksl1z ~]$ sqlplus ***/***

SQL*Plus: Release 11.2.0.4.0 Production on Mon Sep 25 20:15:28 2017

Copyright (c) 1982, 2013, Oracle.  All rights reserved.

Connected to:

Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production

With the Partitioning, Oracle Label Security, OLAP, Data Mining,

Oracle Database Vault and Real Application Testing options

SQL> select count(*) from ceshi;

  COUNT(*)

----------

    917504

DataX参考https://github.com/alibaba/DataX

时间: 2024-11-03 03:44:47

DataX配置及使用的相关文章

使用阿里巴巴开源数据库同步工具DATAX实现跨数据库同步

使用阿里巴巴开源数据库同步工具DATAX实现跨数据库同步 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.HDFS.Hive.OceanBase.HBase.OTS.ODPS 等各种异构数据源之间高效的数据同步功能. 点击进入 先请配置DataX 环境变量 Linux.Windows JDK(1.8) Python(推荐Python2.6.X) Apache Maven 3.x (Compile DataX) 下面演示dataX 配置示例:从M

MaxCompute(原ODPS)开发入门指南——数据上云篇

MaxCompute(原ODPS)开发入门指南--数据上云篇 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 根据<MaxCompute(原ODPS)开发入门指南--计量计费篇>的了解,大家清楚了MaxCompute可以做什么,计费模式如何,想必大家也开通了MaxCompute想进行一次POC,但是大家遇到第一个问题一定是我的数据如何上云? 可通过多种方式数据流入MaxCompute MaxCompute(原ODPS)提

Hadoop数据迁到MaxCompute

通过最佳实践帮助您实现上述案例效果 Step1:数据准备 接下来,我们需要准备好一张表及数据集: Hive表名:hive_dplus_good_sale: 是否分区表:分区表,分区名为pt: hdfs文件数据列分隔符:英文逗号: 表数据量:100条. 源hive表建表语句 CREATE TABLE IF NOT EXISTS hive_dplus_good_sale( create_time timestamp, good_cate STRING, brand STRING, buyer_id

MySQL超时参数以及相关数据集成、DataX数据同步案例分享

一.背景 MySQL系统变量提供关于服务器的一些配置和能力信息,大部分变量可在mysqld服务进程启动时设置,部分变量可在mysqld服务进程运行时设置.合理的系统变量设值范围,是保障MySQL稳定提供服务的重要因素.本文主要描述MySQL数据库的超时timeout相关的一些系统变量,部分参数同程序应用中常见到的CommunicationsException: Communications link failure异常息息相关. 本文也结合数据同步的场景,对使用DataX3进行MySQL数据同步

DataX实现oracle到oracle之间数据传递

文章讲的是DataX实现oracle到oracle之间数据传递,首先需要注意的是DATAX是通过JDBC的方式读取ORACLE数据,然后通过OCI的方式写数据,DX也可以通过JDBC写的方式进行,但是OCI比JDBC速度更快. 进入DataX安装目录的bin目录,执行命令 ./datax.py -e 输入交换数据数据库对应的代码,它会自动生成相应的xml配置文件 编辑配置文件参数,有"?"的是必须配置的,默认的可以保持不变 执行代码: vi /home/taobao/datax/job

TableStore: 使用Datax将实例A的数据迁移到实例B中

背景 现在我们需要将数据从一个老的实例A迁移到实例B上面,做一下备份,我们打算使用Datax作为我们的数据传输工具,其中用到了otsreader和otswriter. 将数据从A实例备份到B实例 第一步,TableStore环境准备,当前Datax不支持自动建表的功能,所以我们需要在B中创建迁移对应的表.创建表的时候有两个选择,第一是使用 ots cli,第二是使用table store的SDK.我们建议使用SDK. ots cli : https://market.aliyun.com/pro

使用DataX同步MaxCompute数据到TableStore(原OTS)优化指南

概述 现在越来越多的技术架构下会组合使用MaxCompute和TableStore,用MaxCompute作大数据分析,计算的结果会导出到TableStore提供在线访问.MaxCompute提供海量数据计算的能力,而TableStore提供海量数据高并发低延迟读写的能力. 将MaxCompute内数据导出至TableStore,目前可选的几种主要途径包括: 自己编写工具:使用MaxCompute SDK通过Tunnel读取表数据,再通过TableStore SDK再写入数据. DataX:自己

使用Datax将MySQL中的数据导入到TableStore中

背景 由于我们的数据在MySQL中的数据已经快接近亿级别,在访问MySQL并发读写的时候遇到了很大的瓶颈,严重的Block了我们的业务发展,主要从白天十点到晚上十点之前,并发访问的用户比较多,我们在写的前面加上了队列,系统后台自动同步.但是读上没有很好的办法解决,所以我们急需一个有较高吞吐量的实时存储系统. 本来准备自己搭建Hbase集群,但是考虑到运维代价和成本,最终放弃了这个方案.后面给阿里云发工单,了解到阿里云有一个类似于Hbase的产品,叫做TableStore,简单看了一下,总结一下优

datax转换mongodb的数据到mysql遇到一个问题

问题描述 datax转换mongodb的数据到mysql遇到一个问题 公司新项目需要从旧的mongodb库了把数据导入到新的mysql库.在用淘宝的datax工具导入数据的时候遇到以下的问题 mongodb的是数据很多都是一个document里不是所有字段都的值都有. 例如:user = [{"name":"张三","id":"0000001″}, {"age":12,"id":"000