数据进入阿里云数加-分析型数据库AnalyticDB(原ADS)的N种方法

从  ?spm=0.0.0.0.HEVojb&do=login  转载。

数据进入AnalyticDB(原ADS)的N种方法

 

分析型数据库(AnalyticDB)是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,用户可以在毫秒级针对千亿级数据进行即时的多维分析透视和业务探索。

想使用阿里云分析型数据,对于大多数人首先碰到的问题就是数据如何进入到分析型数据库中。按照分析型数据库数据表的更新类型,大致可以分为批量导入和实时写入两种,下面我们针对两种写入方式分别介绍几种常用方案。

注意:在分析型数据库中建表等准备工作在此不详细说明,请参考分析型数据库的官方说明文档。

https://help.aliyun.com/document_detail/26403.html?spm=5176.doc26412.6.569.xfDdNf

 

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps

阿里云数加-分析型数据库AnalyticDB产品地址:https://www.aliyun.com/product/ads

一、数据批量导入到分析型数据库

批量导入是利用分析型数据库内置的导入接口,将数据从MaxCompute导入到分析型数据库,因此批量导入方式必须有MaxCompute资源的支持。如果源端为非MaxCompute数据源,那么都需要通过MaxCompute进行中转。批量导入方式适合一次导入比较大量的数据(TB级别)。 下边分别介绍如何将MaxCompute数据源和非MaxCompute数据源批量导入分析型数据库。

1、MaxCompute数据源批量导入分析型数据库

1)、通过DataIDE实现批量数据导入

i. 开通数加开发环境,在数据源管理中配置分析型数据库数据源,并保证连通性。

 

ii. 账号授权

源端为MaxCompute数据表,首次导入一个新的MaxCompute表时,需要在MaxCompute中将表Describe和Select权限授权给AnalyticDB的导入账号。公共云导入账号为garuda_build@aliyun.com以及garuda_data@aliyun.com(两个都需要授权)。

1.  


1.   USE projecta;--表所属ODPS project
2.   ADD USER ALIYUN$garuda_build@aliyun.com;--输入正确的云账号
3.   ADD USER ALIYUN$garuda_data@aliyun.com;
4.   GRANT Describe,Select ON TABLE table_name TO USER ALIYUN$garuda_build@aliyun.com;--输入需要赋权的表和正确的云账号
5.   GRANT Describe,Select ON TABLE table_name TO USER ALIYUN$garuda_data@aliyun.com;

另外为了保护用户的数据安全,导入操作的执行账号与MaxCompute数据表的创建者账号必须是同一个阿里云账号。

 

iii. 创建数据同步任务,配置数据映射

 

 

iv. 保存后提交运行,可以通过执行日志监控任务状态。

特别说明:此方法与DataIDE的工作流结合可以实现周期性自动数据导入。

 

2)、通过数据集成(Data Integration)实现批量数据导入

数据集成(Data Integration),是阿里集团对外提供的稳定高效、弹性伸缩的数据集成平台,为阿里云大数据计算引擎(包括MaxCompute、Analytic DB、OSS)提供离线(批量)数据进出通道。有别于传统的客户端点对点同步运行工具,数据集成本身以公有云服务为基本设计目标,集群化、服务化、多租户、水平扩展等功能都是其基本实现要求。采云间、御膳房、聚石塔、孔明灯的后台数据同步均是基于数据集成完成各自的数据传输需求。

 

使用示例

i. 开通数据集成,在数据源管理中配置MaxCompute数据源和分析型数据数据源,并保证连通性。

ii. 创建Pipeline

Pipeline是数据集成权限管理、资源隔离的基本单元,为权限管理、安全控制提供管理和控制,同时也是数据同步作业运行的容器。用户进入数据集成后,须首先创建一个Pipeline。

进入数据集成控制台创建普通Pipeline。

 

 

iii. 创建作业

系统默认使用界面视图进行创建作业,填写数据源端和目的端的信息。

 

也可以使用JSON视图,选择源端和目的端类型,系统会自动生成模板,填写相应的信息即可。

配置字段映射关系。

 

创建作业成功。

 

iv. 账号授权

操作与第一章第1节第1)部分的账号授权相同。

 

v. 运行作业

数据集成可以手动运行作业,也可以定会运行。

手动运行

 

定时运行

 

可以查看执行日志。

 

3)、通过DataX 实现批量数据导入

DataX 是阿里巴巴集团内被广泛使用的异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。DataX 本身作为离线数据同步框架,采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer 插件,纳入到整个同步框架中。

 

使用示例

i. 直接下载DataX 工具包,下载后解压至本地某个目录,修改权限为755。下载地址:

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

ii. 查看作业配置文件模板

python datax.py -r odpsreader -w adswriter

 

iii. 根据配置文件模板填写相关选项(源和目标数据库的用户名、密码、

URL、表名、列名等)。

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "odpsreader",
                    "parameter": {
                        "accessId": "your_access_id",
                        "accessKey": "your_access_key",
                        "column": [
                                   "id",
                                   "name"
                            ],
                        "odpsServer": "http://service.odps.aliyun.com/api",
                        "packageAuthorizedProject": "",
                        "partition": [],
                        "project": "your_project_name",
                        "splitMode": "record",
                        "table": "your_table_name"
                    }
                },
                "writer": {
                    "name": "adswriter",
                    "parameter": {
                        "lifeCycle": 2,
                        "overWrite": "true",
                        "partition": "",
                        "password": "your_access_key",
                        "schema": "your_database_name",
                        "table": "your_table_name",
                        "url": "host:port",
                        "username": "your_access_id"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

 

iv. 账号授权

操作与第一章第1节第1)部分的账号授权相同。

 

v. 启动任务

python {your_datax_dir}/bin/datax.py {your_jsonfile_path}

 

 

导入任务的所有相关信息及执行日志会打印到标准输出。

 

4)、通过分析型数据库LOAD命令实现批量数据导入

无论是使用DataIDE,还是数据传输、DataX,其实本质都是利用分析型数据库的LOAD命令将数据从MaxCompute批量写入分析型数据库,所以LOAD命令是最原始的方法。

 

操作示例

i. 通过ads.console.aliyun.com登录分析型数据库的web管理工具DMS。

 

ii. 账号授权

操作与第一章第1节第1)部分的账号授权相同。

 

iii. 执行导入

在DMS上执行导入有两种方式,导入向导和SQL。

导入向导是一个配置窗口,只需把源表和目标表设置好,即可提交导入任务。

 

配置好后点击确定,如果之前的步骤都操作无误,导入任务即可成功提交。此种方式比较适合不熟悉SQL编写的使用者。

 

SQL方式是直接执行LOAD数据的SQL来提交导入任务。

 

点击执行,如果之前的步骤都操作无误,导入任务即可成功提交。

 

iv. 查看导入任务状态

任务提交后可以通过DMS的导入状态页面查看任务状态。

 

 

2、非MaxCompute数据源批量导入分析型数据库

1)、通过DateIDE实现批量数据导入

i. 开通数加开发环境,数据源需要配置到数加DataIDE 中,并保证连通性。目前支持的数据源如下图:

注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

 

ii. 账户授权

在分析型数据库中给cloud-data-pipeline@aliyun-inner.com这个账号至少授予表的Load Data权限。

 

iii. 创建数据同步任务,配置数据映射

 

以RDS到分析型数据库为例。

 

iv. 保存后提交运行,可以通过执行日志监控执行成功与否。

 

2)、通过数据集成(Data Integration)实现批量数据导入

数据集成目前数据集成支持(和即将支持)的数据通道包括:

l   关系型数据库: RDS(MySQL、SQL Server、PostgreSQL)、DRDS

l   NoSQL数据存储: OTS、OCS

l   数据仓库: MaxCompute、Analytic DB

l   结构化存储: OSS

l   文本:TXT、FTP

 

同时也存在一些约束和限制:

支持且仅支持结构化(例如RDS、DRDS等)、半结构化(OTS等)、无结构化(OCS、OSS、TXT等, 要求具体同步数据必须抽象为结构化数据)的数据同步。换言之,Data Integration支持传输能够抽象为逻辑二维表的数据同步,其他完全非结构化数据,例如OSS中存放的一段MP3,Data Integration不支持将其同步到分析型数据库

使用示例(以RDS->分析型数据库为例)

i. 开通数据集成,在数据源管理中配置RDS数据源,并保证连通性。

 

ii. 创建Pipeline

Pipeline是数据集成权限管理、资源隔离的基本单元,为权限管理、安全控制提供管理和控制,同时也是数据同步作业运行的容器。用户进入数据集成后,须首先创建一个Pipeline。

进入数据集成控制台创建普通Pipeline。

 

 

iii. 创建作业

系统默认使用界面视图进行创建作业,填写数据源端和目的端的信息。

 

也可以使用JSON视图,选择源端和目的端类型,系统会自动生成模板,填写相应的信息即可。

配置字段映射关系。

 

创建作业成功。

 

iv. 账号授权

操作与第一章第2节第1)部分的账号授权相同。

 

v. 运行作业

数据集成可以手动运行作业,也可以定会运行。

手动运行

 

定时运行

 

可以查看执行日志。

3)、通过DataX 实现批量数据导入

DataX目前已经有了比较全面的插件体系,主流的RDBMS 数据库、NOSQL、大数据计算系统都已经接入。DataX 目前支持数据如下:


类型


数据源


Reader(读)


Writer(写)


RDBMS 关系型数据库


Mysql




Oracle




SqlServer




Postgresql




达梦




阿里云数仓数据存储


ODPS




ADS



OSS




OCS




NoSQL数据存储


OTS




Hbase0.94




Hbase1.1




MongoDB




无结构化数据存储


TxtFile




FTP




HDFS



 

与数据集成一样,当数据源是OCS、OSS、TXT、FTP或HDFS等非结构化数据时,必须抽象为结构化数据。

 

使用示例(以RDS->ADS为例)

注意:由于批量写数据到分析型数据库本质都是通过Load命令从MaxCompute将数据导入分析型数据库,其中涉及MaxCompute中转环境,DataX作为开源工具,并不提供MaxCompute环境,因此使用者需要自行准备MaxCompute环境。另外,由于涉及账号授权等安全操作,DataX也无法将这部分操作集成到工具中,因此整个数据迁移的流程实际由两部组成:RDS->MaxCompute和MaxCompute->分析型数据库。

 

i. 直接下载DataX 工具包,下载后解压至本地某个目录,修改权限为755。下载地址:

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

ii. 查看作业配置文件模板

python datax.py -r rdsreader -w odpswriter (RDS->MaxCompute)

python datax.py –r odpsreader –w adswriter (MaxCompute->分析型数据库)

 

iii. 根据配置文件模板填写相关选项(源和目标数据库的用户名、密码、

URL、表名、列名等)。

RDS->MaxCompute

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [“id”, “name”],
                        "connection": [
                            {
                                "jdbcUrl": [“jdbc:mysql://host:port/your_database_name”],
                                "table": [your_table_name]
                            }
                        ],
                        "password": "your_password",
                        "username": "your_username",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "odpswriter",
                    "parameter": {
                        "accessId": "your_access_id",
                        "accessKey": "your_access_key",
                        "column": [“id”, “name”],
                        "odpsServer": " http://service.odps.aliyun.com/api",
                        "partition": "",
                        "project": "your_project_name",
                        "table": "your_table_name",
                        "truncate": true,
                        "tunnelServer": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

 

MaxCompute->分析型数据库

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "odpsreader",
                    "parameter": {
                        "accessId": "your_access_id",
                        "accessKey": "your_access_key",
                        "column": [
                                   "id",
                                   "name"
                            ],
                        "odpsServer": "http://service.odps.aliyun.com/api",
                        "packageAuthorizedProject": "",
                        "partition": [],
                        "project": "your_project_name",
                        "splitMode": "record",
                        "table": "your_table_name"
                    }
                },
                "writer": {
                    "name": "adswriter",
                    "parameter": {
                        "lifeCycle": 2,
                        "overWrite": "true",
                        "partition": "",
                        "password": "your_access_key",
                        "schema": "your_database_name",
                        "table": "your_table_name",
                        "url": "host:port",
                        "username": "your_access_id"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

 

 

iv. 账号授权

操作与第一章第1节第1)部分的账号授权相同。

 

v. 执行任务

依次执行RDS->MaxCompute和MaxCompute->分析型数据库两个任务。

python {your_datax_dir}/bin/datax.py {your_jsonfile_path}

 

 

4)、通过分析型数据库LOAD命令实现批量数据导入

正如前面所说,外部数据批量进入分析型数据库,最终都是通过LOAD命令从MaxCompute将数据导入,因此理论上讲,只要数据能够进入MaxCompute,就可以继续进入分析型数据库。所以只要分别完成这两个步骤,数据也就最终进入了分析型数据库。

异构数据源如何进入MaxCompute,请参考另外一篇文章?spm=5176.100240.searchblog.131.wa3XNH。

通过LOAD命令从MaxCompute将数据批量导入分析型数据库请参考本章第一节的第4)部分内容,在此不再赘述。

二、数据实时写入分析型数据库

实时写入是为了满足使用者需要数据实时进入分析型数据库而开发的功能。实时写入的本质是利用insert语句将数据一条一条的插入目标表。

注意:实时写入目标表的更新方式必须是实时更新。

1、利用应用程序实时写入

分析型数据库支持大部分版本的MySQL JDBC驱动,支持的版本号:

l   5.0系列: 5.0.2,5.0.3,5.0.4,5.0.5,5.0.7,5.0.8

l   5.1系列: 5.1.1,5.1.2,5.1.3,5.1.4,5.1.5,5.1.6,5.1.7,5.1.8,5.1.11,5.1.12,5.1.13,5.1.14,5.1.15,5.1.16,5.1.17,5.1.18,5.1.19,5.1.20,5.1.21,5.1.22,5.1.23,5.1.24,5.1.25,5.1.26,5.1.27,5.1.28,5.1.29,5.1.31

l   5.4系列

l   5.5系列

 

目前已经验证可以使用Java、C++、Python、PHP、Scala、R等语言编写程序执行数据写入分析型数据库,下面以Java程序为例。

       

 Connection connection = null;
        Statement statement = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            String url = "jdbc:mysql://host:ip/{your_database_name}?useUnicode=true&characterEncoding=UTF-8";

            Properties connectionProps = new Properties();
            connectionProps.put("user", {your_access_id});
            connectionProps.put("password", {your_access_key});

            connection = DriverManager.getConnection(url, connectionProps);
            statement = connection.createStatement();

            String sql = "insert into table your_table values (1, ‘name1’);";
            int status = statement.executeUpdate(sql);

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

 

注意:在连接数据库时,用户名和密码是连接该数据库云账号的Access Id和Access Key。

 

注意:进行写入时,在以下几个地方进行优化,可以提升写入性能。

l   采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。

l   按hash分区列聚合写入。分析型数据库需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。用户可自行实现该聚合方法,对分区号的计算规则为partition_num = CRC32(hash_partition_column_value) mod m,其中hash_partition_column_value是分区列的值,m是分区总数。

l   如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。

l   保持主键相对有序。分析型数据库的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。

l   增加ignore关键字。执行不带ignore关键字的insert  sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。

 

2、通过DataIDE实时写入

i. 开通数加开发环境,数据源需要配置到数加DataIDE 中,并保证连通性。目前支持的数据源如下图:

注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

 

ii. 创建数据同步任务,配置数据映射

 

以RDS到分析型数据库为例,导入模式选择实时导入。

注意:分析型数据库中的目标表必须是实时更新表。

 

iv. 保存后提交运行,可以通过执行日志监控执行成功与否。

 

3、通过数据集成(Data Integration)实时写入

注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

使用示例(以RDS->分析型数据库为例)

i. 开通数据集成,在数据源管理中配置RDS数据源,并保证连通性。

 

ii. 创建Pipeline

Pipeline是数据集成权限管理、资源隔离的基本单元,为权限管理、安全控制提供管理和控制,同时也是数据同步作业运行的容器。用户进入数据集成后,须首先创建一个Pipeline。

进入数据集成控制台创建普通Pipeline。

 

 

iii. 创建作业

系统默认使用界面视图进行创建作业,填写数据源端和目的端的信息,导入模式选择实时导入。

 

也可以使用JSON视图,选择源端和目的端类型,系统会自动生成模板,填写相应的信息即可,writeMode填写insert。

配置字段映射关系。

 

创建作业成功。

 

iv. 运行作业

数据集成可以手动运行作业,也可以定会运行。

手动运行

 

定时运行

 

可以查看执行日志。

 

4、通过DataX实时写入

注意:由于分析型数据中的数据是二维表形式的结构化数据,如果源端是oss或者ftp,源数据文件必须具有明显的结构化schema,比如csv、tsv等。

 

使用示例(以RDS->ADS为例)

 

i. 直接下载DataX 工具包,下载后解压至本地某个目录,修改权限为755。下载地址:

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

ii. 查看作业配置文件模板

python datax.py -r rdsreader -w adswriter

 

iii. 根据配置文件模板填写相关选项(源和目标数据库的用户名、密码、

URL、表名、列名等)。

{
"job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": {your_username},
                            "password": {your_password},
                            "column": ["id","name"],
                            "connection": [
                                {
                                    "table": [
                                        {your_table_name}
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://host:port/{your_database_name}"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "adswriter",
                        "parameter": {
                                "writeMode": "insert",
                            "username": {your_access_id},
                            "password": {your_access_key},
                            "column": ["id","name"],
                            "url": "host:port",
                            "partition": "",
                            "schema": {your_database_name},
                            "table": {your_table_name}
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }

 

iv. 执行任务

python {your_datax_dir}/bin/datax.py {your_jsonfile_path}

 

 

说明:使用DataIDE、数据集成和DataX进行实时写入底层都是调用JDBC接口,但是这三个工具都已经进行了写入的优化,可以节省用户的开发量。另外DataIDE还可以提供定时调度功能,方便数据写入与数据处理进行集成。因此,在没有特殊要求的场景,建议使用DataIDE实现数据实时写入分析型数据库。

5、利用Kettle实时写入数据

Pentaho Data Integration(又称Kettle)是一款非常受欢迎的开源ETL工具软件。分析型数据库支持用户利用Kettle将外部数据源写入实时写入表中。Kettle的数据输出程序并未为分析型数据库进行过优化,因此写入分析型数据库的速度并不是很快(通常不超过700 rec/s),不是特别适合大批量数据的写入,但是对于本地文件上传、小数据表等的写入等场景是非常合适的。

 

使用示例

i. 下载Kettle,并解压。下载地址http://jaist.dl.sourceforge.net/project/pentaho/Data%20Integration/7.0/pdi-ce-7.0.0.0-25.zip

 

ii. 启动Kettle,创建转换

源端为表输入组件,目的端为表输出组件。

 

iii. 配置组件属性

表输入组件

 

 

表输出组件

 

 

iv. 执行转换

 

可以登录DMS查看写入分析型数据库的数据。

6、利用DataHub和流计算实时写入

DataHub服务是阿里云提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能。DataHub具体介绍请参考https://datahub.console.aliyun.com/intro/index.html

 

Alibaba Cloud StreamCompute(阿里云流计算)是运行在阿里云平台上的流式大数据分析平台,提供给用户在云上进行流式数据实时化分析工具。流计算具体介绍请参考https://stream.console.aliyun.com/help/index.html

 

DataHub和流计算结合使用可以实现数据实时加工处理并写入分析型数据库的需求。

 

使用示例

i. 开通DataHub服务和流计算服务,并创建项目。

目前这两个产品均处于公测阶段,可以申请使用。

DataHub:请联系阿里云业务接口同学进行开通

流计算:https://data.aliyun.com/product/sc

注意:由于流计算依赖于DataHub数据源,因此两个产品必须都开通才能实现此场景。

 

 

 

ii. 数据写入DataHub

DataHub已提供SDK,可以通过编写Java程序写入数据。同时与当前流行的部分开源数据收集工具互通,例如LogStash,Fluentd等,可以通过这些数据收集工具直接将数据流向DataHub。具体实现方法请参考https://datahub.console.aliyun.com/intro/guide/index.html

本例中采用java程序调用SDK写入数据。

 


              String projectName = {your_project_name};
              String topicName = {your_topic_name};
              String accessId = {your_access_id};
              String accessKey = {your_access_key};
              String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
              AliyunAccount account = new AliyunAccount(accessId, accessKey);
              DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);
              DatahubClient client = new DatahubClient(conf);

              RecordSchema schema = new RecordSchema();
              schema.addField(new Field("id", FieldType.BIGINT));
              schema.addField(new Field("name", FieldType.STRING));

              ListShardResult listShardResult = client.listShard(projectName, topicName);

              List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
              String shardId = listShardResult.getShards().get(0).getShardId();

              for (long i = 0; i <= 100; i++) {
                     RecordEntry entry = new RecordEntry(schema);
                     entry.setString(1, "name" + i);
                     entry.setBigint(0, i);
                     entry.setShardId(shardId);
                     recordEntries.add(entry);
              }

              PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);

 

 

iii. 创建流计算任务

--创建DataHub数据源

CREATE STREAM TABLE cdp_demo_rt_dh (
       id      BIGINT,
       name    STRING
) WITH (
       type='datahub',
       endpoint='http://dh-cn-hangzhou-internal.aliyuncs.com',
       roleArn='acs:ram::1811270634786818:role/aliyunstreamdefaultrole',
       projectName='bigdatatraining',
       topic='cdp_demo_rt'
);

 

--创建分析型数据库数据源

CREATE RESULT TABLE cdp_demo_rt_ads (
    id BIGINT,
    name STRING,
    PRIMARY KEY(id)
) WITH (
    type='ads',
    url='jdbc:mysql://trainning-db1-840e4b36.cn-hangzhou-1.ads.aliyuncs.com:10208/trainning_db1',
    username='gfCrBlzhaVzZeWDP',
    password='pWMAPt15gU8IBox9bj9rpjGUxjXcqI',
    tableName='cdp_demo_rt'
);

 

 

--计算逻辑并写入分析型数据库

REPLACE INTO TABLE cdp_demo_rt_ads
    SELECT id, name FROM cdp_demo_rt_dh;

 

iv. 执行流计算任务

 

通过DMS查看写入分析型数据库的数据。

 

当然,根据具体的场景,可以在流计算中定义更加复杂的处理逻辑。

7、通过数据传输(Data Transmission)的分析型数据库插件将RDS MySQL增量数据实时写入分析型数据库

通过阿里云数据传输(https://www.aliyun.com/product/dts/),并使用 dts-ads-writer 插件, 可以将您在阿里云RDS中的数据表的变更实时同步到分析型数据库中对应的实时写入表中。

注意:RDS端目前暂时仅支持MySQL引擎

 

使用示例

i. 开通数据传输服务并创建数据订阅通道,具体操作请参考https://help.aliyun.com/document_detail/dts/Getting-Started/data-subscription.html?spm=5176.doc26427.2.3.nJqFL7

 

 

ii. 下载dts-ads-writer插件,并编辑配置文件app.conf

下载地址https://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/26427/cn_zh/1470925197355/dts-ads-writer-0.17.zip?spm=5176.doc26427.2.2.nJqFL7&file=dts-ads-writer-0.17.zip

 

{

  "dtsAccessId": {your_access_id},

  "dtsAccessKey": {your_access_key},

  "dtsTunnelId": {your_tunnel_id},

  "adsUserName": {your_access_id},

  "adsPassword": {your_access_key},

  "adsJdbcUrl": "jdbc:mysql://host:port/{your_analyticdb_database_name}",

  "options": {

        "traceSql": false,

        "detailLog": false,

        "isReplaceInvalidInsertValue": true,

        "invalidInsertValueCharacters": "\\\\n,\\\\r,\\\\t,'"

  },

  "tables": [

    {

      "source": {

        "primaryKeys": [

          "id"

        ],

        "db": {your_rds_database_name},

        "table": {your_rds_table_name}

      },

      "target": {

        "db": {your_analyticdb_database_name},

        "table": {your_analyticdb_table_name}

},

      "columnMapping": {

        "id": "id",

        "name": "name"

      }

    }

  ]

}

 

 

iii. 启动插件消费dts订阅通道数据,并写入分析型数据库

启动后可以通过日志文件查看插件运行情况。

 

iv. 在RDS中写入一条数据

 

v. 查看分析型数据库中的数据

 

注意:使用数据传输的dts-ads-writer插件可以将数据变化实时同步到分析型数据库,数据变化不单单指insert,delete和update操作也可以同步到分析型数据库。

时间: 2024-09-19 09:54:20

数据进入阿里云数加-分析型数据库AnalyticDB(原ADS)的N种方法的相关文章

数加分析型数据库:让你的数据探索更灵活、准确、快速响应和高并发

在大数据时代,大家越来越注重数据探索的灵活性.准确性.快速响应和高并发.为此,阿里云数加团队在结合多年应用经验的基础上,推出了分析型数据库. 什么是分析型数据库? 分析型数据库(Analytic DB,原名ADS),是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,能让用户可以在毫秒级针对千亿级数据进行即时的多维分析透视和业务探索.分析型数据库对海量数据的自由计算和极速响应能力,能让用户在瞬息之间进行灵活的数据探索,快速发现数据价值,并可直接嵌入业务系统为终

一分钟了解阿里云产品:分析型数据库

一.             概述   阿里云产品种类繁多,今天让我们一起来了解下分析型数据库(Analytic DB)吧!   什么是分析型数据库呢?   分析型数据库是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,使得您可以在毫秒级针对千亿级数据进行即时的多维分析透视和业务探索.分析型数据库对海量数据的自由计算和极速响应能力,能让用户在瞬息之间进行灵活的数据探索,快速发现数据价值,并可直接嵌入业务系统为终端客户提供分析服务.     分析型数据库帮您实

【大数据干货】数据进入阿里云数加-大数据计算服务MaxCompute(原ODPS)的N种方式

免费开通大数据服务:https://www.aliyun.com/product/odps 想用阿里云大数据计算服务(MaxCompute),对于大多数人首先碰到的问题就是数据如何迁移到MaxCompute中.按照数据迁移场景,大致可以分为批量数据.实时数据.本地文件.日志文件等的迁移,下面我们针对每种场景分别介绍几种常用方案. 大数据计算服务(MaxCompute) 快速.完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海

基于阿里云数加平台的大数据Serverless实践

本文PPT来自班输于10月16日在2016年杭州云栖大会上发表的<基于阿里云数加平台的大数据Serverless实践>. 数加是阿里云大数据的品牌名,其旗下包含一系列的大数据产品及服务,可以为用户提供一站式的数据开发.分析.应用平台.数加提供的服务包括智能语音/图象/视频分析服务.企业级数据仓库服务,地理信息可视化服务,风险预警与管控服务等等.其在基础平台的大数据产品包括数据开发.机器学习.大数据计算.分析型数据库.流计算,在数据应用层的产品包括数据可视化DataV.推荐引擎.人脸识别等等.

【Best Practice】基于阿里云数加·MaxCompute及Quick BI构建网站用户画像分析

前文背景:[Best Practice]基于阿里云数加·StreamCompute快速构建网站日志实时分析大屏   开通阿里云数加产品 前提条件 为了保证整个实验的顺利开展,需要用户使用开通相关产品及服务,包括DataHub.MaxCompute.AnalyticDB.Data IDE.Quick BI.      业务场景 数据来源于网站上的HTTP访问日志数据,基于这份网站日志来实现如下分析需求: n   统计并展现网站的PV和UV,并能够按照用户的终端类型(如Android.iPad.iP

Serverless理念的弄潮儿—— 阿里云数加平台助力大数据普惠

免费开通大数据服务:https://www.aliyun.com/product/odps 阿里云坚持将计算能力变成像水电煤一样的公共服务,提供给大众,而非单单而不是卖服务器给客户,这跟今日流行的Serverless 架构理念是一致的.Serverless 理念在数加平台得到了很好的体现,数加平台今天已经可以提供很多业务场景化的计算服务,比如推荐引擎,规则引擎,以及各种人工智能的服务,助力企业在DT时代更敏捷.更智能.更具洞察力.在本文中,班输从数据平台简介.大数据应用特点.数加平台Server

阿里云数加(大数据)打造雄安智慧新区

       自从4月1日,中共中央.国务院印发通知,决定设立河北雄安新区之后,这个无名小城就一夜爆红.雄安新区规划范围涉及河北省雄县.容城.安新3县及周边部分区域,地处北京.天津.保定腹地,是继深圳经济特区和上海浦东新区之后又一具有全国意义的新区.一时之间,刚诞生不久的雄安新区,被纷纷涌入的买房"军团".暴涨的房价.发展趋势的猜测等等新闻包围的水泄不通.        雄安新区横空出世,引发各界高度关注."前有深圳.浦东,今天的雄安新区潜力有多大"?有媒体用这样的

王宝强离婚成了谁的狂欢?——通过阿里云·数加严肃解读数据背后的媒体传播路径

文/林济源 北京大学深研院  2016年8月14日凌晨,正当许多夜猫在为奥运会损耗青春的肉体,一条突如其来的微博推送想必让无数人倍感精神,似乎拥有"上帝视角"的每个他或她已成为暗夜里的裁判,义愤填膺地点评其中的"背叛"."阴谋"."道德"."财产". 第二天,宝宝的离婚门继续发酵,与"张继科内裤"为代表的奥运热点一起包揽了微博热搜的前十,而从搜索量就可知其不在一个量级. 无耻的是,朋友圈

阿里云·数加“公众趋势分析”怎么用?大厨带你做测评!(内含娱乐圈案例分析)

小组成员:林济源 吴开元 张涵 文/编辑: 林济源 公众趋势分析:https://data.aliyun.com/product/prophet  原价50400元/年 秒杀价69元/年 每天100笔订单 阿里云·数加的数据应用--公众趋势分析已悄然上线,其在网站上把自己描述为"基于全网公开发布数据.传播路径和受众群体画像,利用语义分析.情感算法和机器学习,分析公众对品牌形象.热点事件和公共政策的认知趋势." 不仅如此,公众趋势分析原来还是阿里小ai的一只"眼睛",