MaxCompute访问TableStore(OTS) 数据(20170601更新)

免费开通大数据服务:https://www.aliyun.com/product/odps

0. 前言

MaxCompute作为阿里云大数据平台的核心计算组件,承担了集团内外大部分的分布式计算需求。而MaxCompute SQL作为分布式数据处理的主要入口,为快速方便处理/存储EB级别的离线数据提供了强有力的支持。 随着大数据业务的不断扩展,新的数据使用场景在不断产生,在这样的背景下,MaxCompute计算框架也在不断的演化,原来主要面对内部特殊格式数据的强大计算能力,也正在一步步的开放给不同的外部数据。 之前我们介绍了怎样在MaxCompute上处理存储在OSS上面的非结构化数据,在这里我们将进一步介绍如何将来自TableStore(OTS)的数据纳入MaxCompute上的计算生态,实现多种数据源之间的无缝连接。

对于在线服务等应用场景,NoSQL KV Store(e.g., BigTable, HBase)相比传统数据库,具有schema灵活,易扩展,实时性强等优点。阿里云TableStore(OTS) 是在阿里飞天分布式系统之上实现的大规模的NoSQL数据存储服务,提供海量KV数据的存储和实时访问。 在集团内部各BU以及外部阿里云生态圈中具有广泛的应用。 尤其是TableStore在行级别上的实时更新和可覆盖性写入等特性,相对于MaxCompute内置表append-only批量操作,提供了一个很好的补充。 但是TableStore作为一个偏存储的服务,对于存储在其上的海量数据,在大规模批量并行处理的计算能力有所欠缺。 在这样的背景下,打通MaxCompute的计算和TableStore存储之间的数据链路就显得尤其重要。

1. MaxCompute对TableStore数据进行读取和计算

1.0 使用前提和假设

1.0.1 MaxCompute 2.0计算框架非结构化功能的打开

首先要说明的是MaxCompute新一代的2.0计算框架还在灰度上线的过程中,默认设置下许多功能没有打开,所以要使用新引进的非结构化数据处理框架,需要申请MaxCompute 2.0试用,具体开通使用方法请参见 如何申请试用MaxCompute 2.0, 简单来说就是在开通2.0非结构化功能的前提下,在每个SQL query执行时必须带上如下setting:

set odps.task.major.version=2dot0_demo_flighting;

set odps.sql.planner.mode=lot;

set odps.sql.ddl.odps2=true;

set odps.sql.preparse.odps2=lot;

下面的范例中就不再重复了,但是本文介绍的所有功能均基于以上假设,当然这些特殊设置在2.0计算框架完全上线后就可以省略了。

1.0.2 TabelStore基本概念以及网络连通性

如果对于TableStore不熟悉或者对于整个KV table的概念比较陌生同学,可以通过TableStore的文档来先了解一些基本概念(比如主键,分区键,属性列等),这里的讨论不对这些基本概念做深入的解释和探讨。 同时对于MaxCompute非结构化框架的整体介绍可以参见之前在MaxCompute上处理存储在OSS上面的非结构化数据的介绍文章,包括External Table, StorageHandler的概念以及SERDEPROPERTIES的使用等。

MaxCompute与TableStore是两个独立的大数据计算以及大数据存储服务,所以两者之间的网络必须保证连通性。 对于MaxCompute公共云服务访问TableStore存储,推荐使用TableStore私网地址,也就是host名以ots-internal.aliyuncs.com作为结尾的地址,例如tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com

1.0.3 TabelStore类型与MaxCompute类型的对应

TableStore与MaxCompute都有其自身的类型系统。 在MaxCompute处理TableStore数据的时候,两者之间的类型对应关系如下:

MaxCompute Type TableStore Type
STRING STRING
BIGINT INT
DOUBLE DOUBLE
BINARY* BLOB

*其中要特别说明的是, 对于MaxCompute而言,BINARY类型是在新的2.0类型系统中引进的,所以如果有需要使用到BINARY类型的时候,除了上文1.0.1 里面提到的设置之外,还需要额外添加一个设置才能生效:

set odps.sql.type.system.odps2=true;

1.1 使用STS/RAM的方式访问TableStore数据

需要首先指出的是,MaxCompute计算服务要访问TableStore数据需要有一个安全的授权通道。 在这个问题上,MaxCompute结合了阿里云的访问控制服务(RAM)和令牌服务(STS)来实现对数据的安全反问:

首先需要在RAM中授权MaxCompute访问OSS的权限。登录RAM控制台,创建角色AliyunODPSDefaultRole,并将策略内容设置为:

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "odps.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

然后编辑该角色的授权策略,将权限AliyunODPSRolePolicy授权给该角色。

如果觉得这些步骤太麻烦,还可以登录阿里云账号点击此处完成一键授权。

1.2 创建External Table 关联MaxCompute与TableStore

与处理OSS数据的使用方法类似,MaxCompute通过EXTERNAL TABLE的方式来对接TableStore的数据:用户通过一个CREATE EXTERNAL TABLE的DDL语句,把对TableStore表数据的描述引入到MaxCompute的meta系统内部后,即可如同使用一个普通TABLE一样来实现对TableStore数据的处理。 这里先提供一个最简单的使用范例,并且通过该范例来讨论ODPS对接TableStore的一些概念和实现。


DROP TABLE IF EXISTS ots_table_external;

CREATE EXTERNAL TABLE IF NOT EXISTS ots_table_external
(
odps_orderkey bigint,
odps_orderdate string,
odps_custkey bigint,
odps_orderstatus string,
odps_totalprice double
)
STORED BY 'com.aliyun.odps.TableStoreStorageHandler' -- (1)
WITH SERDEPROPERTIES ( -- (2)
'tablestore.columns.mapping'=':o_orderkey, :o_orderdate, o_custkey, o_orderstatus,o_totalprice', -- (3)
'tablestore.table.name'='ots_tpch_orders' -- (4)
)
LOCATION 'tablestore://odps-ots-dev.cn-shanghai.ots-internal.aliyuncs.com'; -- (5)

这个DDL语句将一个TableStore表映射到了MaxCompute的一个External Table上。 在这个基础上,后继对TableStore数据的操作可以直接通过External Table进行。 这里先对上面这个CREATE EXTERNAL TABLE的DDL语句做一些解释:

  1. com.aliyun.odps.TableStoreStorageHandler 是MaxCompute内置的处理TableStore数据的StorageHandler, 定义了MaxCompute和TableStore的交互,相关逻辑由MaxCompute实现。
  2. SERDEPROPERITES可以理解成提供参数选项的接口,在使用TableStoreStorageHandler时,有两个必须指定的选项,分别是下面介绍的tablestore.columns.mappingtablestore.table.name。 更多的可选选项将在后面其他例子中提及。
  3. tablestore.columns.mapping选项:必需选项,用来描述对需要MaxCompute将访问的TableStore表的列,包括主键和属性列。 这其中以:打头的用来表示TableStore主键,例如这个例子中的:o_orderkey:o_orderdate。 其他的均为属性列。 TableStore支持最少1个,最多4个主键,主键类型为bigintstring,其中第一个主键为分区键。 在指定映射的时候,用户必须提供指定TableStore表的**所有**主键,对于属性列则没有必要全部提供,可以只提供需要通过MaxCompute来访问的属性列。
  4. tablestore.table.name:需要访问的TableStore表名。 如果指定的TableStore表名错误(不存在),则会报错,MaxCompute不会主动去创建TableStore表。
  5. LOCATION clause用来指定TableStore的具体信息,包括instance名字,endpoint等。 值得注意的是这里对TableStore数据的安全访问是建立在前文介绍的RAM/STS授权的前提上的。

1.3 通过External Table访问TableStore数据进行计算

在用上述DDL创建出External Table之后,TableStore的数据就引入到了MaxCompute的生态中,我们现在就可以通过正常的MaxCompute SQL语法来访问TableStore数据了,比如:


SELECT odps_orderkey, odps_orderdate, SUM(odps_totalprice) AS sum_total
FROM ots_table_external
WHERE odps_orderkey > 5000 AND odps_orderkey < 7000 AND odps_orderdate >= '1996-05-03' AND odps_orderdate < '1997-05-01'
GROUP BY odps_orderkey, odps_orderdate
HAVING sum_total> 400000.0;

可以看到,在上面的这个例子,直接使用的就是我们所熟悉的MaxCompute SQL语法,访问TableStore的所有细节由MaxCompute内部处理。 这包括在列名的选择上:可以看到,在这个SQL里面,使用的列名是odps_orderkeyodps_totalprice等,而不是原始TableStore里面的主键名o_orderkey或属性列名o_totalprice了,因为我们在创建External Table的DDL语句里,已经做了对应的mapping。 当然因为具体的mapping是每个用户可以自己控制的,所以如果在创建External Table的时候保留原始的TableStore主键/列名也是可以的。

在底层的实现上,MaxCompute框架针对TableStore数据的存储特点做了各种优化,包括并发读取,SQL语句的filtering操作转义等。 举个例子,有filtering操作(比如上面的WHERE语句)时,MaxCompute会判断filtering key是否为TableStore表格的主键,来决定如何用TableStore的GetRange API来读取最小量数据,而不是无条件读取全量数据读到MaxCompute计算节点再做过滤操作。 当然这些实现的优化用户均无需感知,由MaxCompute计算框架负责来提供高效的实现。

另外如果需要对一份数据做多次计算,相较每次从TableStore去远程读数据,一个更高效的办法是先一次性把需要的数据导入到MaxCompute内部成为一个MaxCompute(内部)表,比如:


CREATE TABLE internal_orders AS
SELECT odps_orderkey, odps_orderdate, odps_custkey, odps_totalprice
FROM ots_table_external
WHERE odps_orderkey > 5000 ;

现在internal_orders就是一个我们熟悉的MaxCompute表了,也拥有所有MaxCompute内部表的特性:包括高效的压缩列存储数据格式,完整的内部宏数据以及统计信息等。 同时因为存储在MaxCompute内部,访问速度会比访问外部的TableStore更快,尤其适用于需要进行多次计算的热点数据。

2. 数据从MaxCompute写出到TableStore

打通MaxCompute和TableStore的数据生态,除了将TableStore作为批量数据处理的数据来源以外,一个另外的重要场景是将MaxCompute的数据处理结果输出到TableStore,利用TableStore可实时更新和可单行覆盖等特点,迅速的将离线计算的结果反馈给在线应用。 这种对TableStore的数据输出可以使用MaxCompute SQL的INSERT OVERWRITE来实现。

需要再次强调的是,MaxCompute不会主动创建外部的TableStore表,所以在对TableStore表进行数据输出之前,必须保证该表已经在TableStore上创建过(否则将报错)。 这样的设计是因为TableStore建表的过程可能涉及到CU的设置,计费,数据生命周期等一系列选项,这些必须由数据的所有者来决定: MaxCompute不拥有这些外部数据,也无法做出这些选择。

紧接上面的例子,假设我们已经使用上面的DDL语句创建了ots_table_external这个外部表来打通MaxCompute与TableStoreb数据表ots_tpch_orders的链路, 而且我们还有一份存储在在MaxCompute内部有一个名为internal_orders的数据,现在希望对internal_orders中的数据进行一定处理后再写回TableStore上,那么可以直接通过对外部表做INSERT OVERWITE TABLE的操作来实现:


INSERT OVERWRITE TABLE ots_table_external
SELECT odps_orderkey, odps_orderdate, odps_custkey, CONCAT(odps_custkey, 'SHIPPED'), CEIL(odps_totalprice)
FROM internal_orders;

在这里对表中的数据做了一些处理,并且把经过处理过的数据重新写回到了TableStore里。 对于TableStore这种KV数据的NoSQL存储介质,从MaxCompute的输出将只影响相对应主键所在的行:在上面这个例子中也就是只影响所有 odps_orderkey + odps_orderdate 这两个主键值能对应行上的数据。 而且在这些TabeleStore行上面,也只会去更新在创建External Table (ots_table_external)时指定的属性列,而不会去修改未在External Table里面出现的数据列。 关于MaxCompute外部表和TableStore表的对应关系,下文会更系统的介绍。

3. 技术细节以及高级用法

从本质上来看MaxCompute表是严格结构化的数据表,要求所有的行均遵从严格一致的schema,而TableStore的表中存储的是NoSQL的“半结构化”K-V数据。这些基本数据格式上的区别决定了在两个系统的交互过程中,行为会有一些区别。 同时MaxCompute作为一个分布式计算系统,读写TableStore通过并发执行来实现,这就需要对TableStore的数据有一个切割的机制。 在默认情况下,MaxCompute会提供一个系统认为最合适的处理方法,来实现对TableStore数据的访问和计算,而这些实现也能满足绝大部分用户的需求。 但是与此同时,为了满足一些对系统比较熟悉的高级用户的特殊需求,MaxCompute也提供了更多的可配置选项,这里做一些介绍。

3.1 MaxCompute外表与TableStore数据表的对应关系

MaxCompute外表与TableStore数据表是多对一(N:1)的关系。 也就是说可以有多个MaxCompute外表(External Table)来描述一张TableStore表。 这里的N:1是两个维度上的:

  • 不同的MaxCompute外表可以描述一张TableStore表的不同属性列子集,比如如果在TableStore的一个表有3个主键列,(up to)20个属性列,那么通过MaxComptue的外表,主键必须提供完备,但是属性列则不必,比如可以只提供一个属性列,那么通过MaxCompute外表进行的所有操作,都只会基于主键和所提供的属性列上的数据。
  • 不同的MaxCompute外表可以描述一张TableStore表的不同range,在本文的例子里都是一个外表对应一个TableStore表的全range,但实际使用的时候是可以通过额外选项来指定外表对应的range start和range end的, 这可以做到一个外表只映射一个TableStore表的子range。 这个功能这里不展开介绍,有需求的话可以联系MaxCompute技术团队。

3.2 MaxCompute读取TableStore数据时的并发度

TableStore是一个分布式KV数据存储系统,每个数据表都可能存储在多个后端server上,并且根据分区键进行分区,具体存储上的分区策略由TableStore决定。 目前通过MaxCompute读取TableStore数据,默认的并发度将与TableStore后端的分区数目相同。 唯一的例外是,在采用INTEGER64作为分区键,且TableStore后端的分区数目大于1时,MaxCompute会自动对并发度再做调整,在更高的并发度上读取数据。 此外TableStore自身的系统也在不断发展,以后将提供更强大的API接口给MaxCompute来使用,到时候将可以根据后端数据的大小,来准确的做出数据切割。 更准确的控制每个并发MaxCompute worker处理的数据量和计算时间。 这方面将在这些功能实现后再更新来做具体说明。

最后,如果用户对自己存储在TableStore数据有着非常好的了解,比如对于不同key range中的数据量都能做出很好的预估,MaxCompute还提供让用户自己指定并发度的选型:用户的控制甚至可以细化到指定每个worker应该处理哪个range的数据。 有这个需求的用户可以联系MaxCompute技术团队。

3.3 MaxCompute写出TableStore数据时的并发度

在将MaxCompute内部数据写出到TableStore时,并发度由MaxCompute根据数据量自动进行控制。 当然用户也可以手动调节SQL执行过程中的mapper/reducer数目来调整并发度。 但是绝大部分情况下, MaxCompute本身适配的并发度都是比较合理的,一般不建议自己手工设置mapper/reducer数目。 另外,在一些场景上,MaxCompute计算服务与TableStore存储服务之间会存在着需要适配的情况。一般来说MaxCompute可以调度起计算节点都比较充裕,而大量计算节点同时往TableStore写出数据时可能会打满网络。 这种情况下单纯调高MaxCompute计算节点数目来并发写出数据,并不会带来额外的提速。 所以在特别大的规模上,用户最好能提前和TableStore服务沟通,以保证TableStore能提供足够的吞吐量满足需求。

3.4 MaxCompute访问TableStore的网络连通性

因为MaxCompute与TableStore是两个分开的云服务,所以在不同的部署集群上的网络连通性有可能影响MaxCompute访问TableStore的数据的可达性。 关于TableStore的节点,实例,服务地址等概念,可以参见TableStore相关介绍。 如同上文介绍的,在MaxCompute公共云服务访问TableStore存储,推荐使用TableStore私网地址(即以ots-internal.aliyuncs.com结尾的host地址)。

4. 结语:构造大数据生态

随着MaxCompute非结构化数据处理框架的上线,MaxCompute开放了处理外部数据的接口,包括之前介绍的访问OSS数据,以及本文描述的访问TableStore数据。 我们希望借此来实现整个阿里云计算与数据的生态融合: 在不同的项目上,我们已经看到了在MaxCompute上处理OSS上的海量视频,图像等非结构化数据的巨大潜力。 随着TableStore数据支持的加入,期待计算和更多数据的碰撞能打通更多的应用场景,让OSS数据,TableStore数据以及MaxCompute内部存储的数据,能在MaxCompute的核心计算引擎上产生更大的价值。

时间: 2024-09-30 18:48:10

MaxCompute访问TableStore(OTS) 数据(20170601更新)的相关文章

使用MaxCompute访问TableStore(OTS) 简明手册

关系数据库已经存在半个世纪,有非常广泛的使用场景,但是在快速迭代的互联网领域其扩展性和 schema 灵活性被诟病颇多,因此类似 TableStore/BigTable/HBase 等强调扩展性和灵活性的NoSQL数据库逐步流行起来,这些 NoSQL 数据库只提供 API 接口,不提供 SQL 访问,这就导致很多熟悉 SQL 但是不喜欢写代码的用户没法很舒服的使用此类NoSQL数据库.基于此,表格存储开发团队联合 MaxCompute(下文中 ODPS 与 MaxCompute 同义)团队打通了

MaxCompute百问集锦(持续更新)

大数据计算服务(MaxCompute,原名 ODPS,https://www.aliyun.com/product/odps)是一种快速.完全托管的 GB/TB/PB 级数据仓库解决方案.MaxCompute 向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全.同时,大数据开发套件和MaxCompute关系紧密,大数据开发套件为 MaxCompute 提供了一站式的数据同步,任务开发,数据工作流开发,数据管理和数据运

MaxCompute( 原名ODPS)大数据容灾方案与实现(及项目落地实例)专有云

一,背景与概述     复杂系统的灾难恢复是个难题,具有海量数据及复杂业务场景的大数据容灾是个大难题.     MaxCompute是集团内重要数据平台,是自主研发的大数据解决方案,其规模和稳定性在业界都是领先的.在周边系统众多,业务场景复杂,海量数据存储和计算调度都是一个难题的情况下,需要保证大数据系统在灾难发生时能够尽快切换到备用系统服务,最小限度影响客户使用.     容灾系统及方案的建设有很多种方式,如同城双活,异地多活,冷备容灾等.MaxCompute大数据的容灾方案是在多年集团内部断

SQL应用与开发:(七)数据操作 &amp;#183; 查 &amp;#183; (三)使用子查询访问和修改数据

3.使用子查询访问和修改数据 子查询和连接查询一样提供了使用单个查询访问多个表中的数据的方法.子查询在其他结果的基础上提供一种有效地方式来表示WHERE子句的条件.子查询是一个SELECT语句,它定义在SELECT.INSERT.UPDATE或DELECT语句或者另一个子查询中.子查询的SELECT语句可与外部查询指向不同的表. 嵌套的子查询或嵌套的SELECT语句是指包含一个或多个子查询的SELECT语句.子查询可嵌套在外部的SELECT.INSERT.UPDATE或DELECT语句的WHER

DataSet.AcceptChanges()后 SqlDataAdapter.Update(DataSet)时数据无法更新数据库 希望路过高人指点 谢谢了

问题描述 DataSet.AcceptChanges()后SqlDataAdapter.Update(DataSet)时数据无法更新数据库希望路过高人指点谢谢了 解决方案 解决方案二:自己顶解决方案三:有没异常出现?解决方案四:或是DataSet.HasChanges=false?解决方案五:没有任何异常解决方案六:我是你下面的并发冲突的贴,还没解决,头痛.DataSet.AcceptChanges()好像放在update的后面.解决方案七:放在update后面就起不到作用了我是GridView

利用数据库复制技术 实现数据同步更新

数据|数据库|数据同步 利用数据库复制技术 实现数据同步更新复制的概念复制是将一组数据从一个数据源拷贝到多个数据源的技术,是将一份数据发布到多个存储站点上的有效方式.使用复制技术,用户可以将一份数据发布到多台服务器上,从而使不同的服务器用户都可以在权限的许可的范围内共享这份数据.复制技术可以确保分布在不同地点的数据自动同步更新,从而保证数据的一致性.SQL复制的基本元素包括出版服务器.订阅服务器.分发服务器.出版物.文章SQL复制的工作原理SQL SERVER 主要采用出版物.订阅的方式来处理复

保持并关闭Excel数据链接更新提示

在Excel工作簿中链接其他工作簿中的数据是相当常见的.但每次打开这样的工作簿时,Excel都会自动弹出一个"是否要进行数据链接更新"的提示对话框,很是烦人.下面我们就让它"消失"并且自动更新数据链接. 单击"编辑→链接"命令打开"编辑链接"对话框.点击"启动提示"按钮,在弹出的对话框中选择"不显示该警告,但是更新链接"选项.返回Excel主界面,依次单击"工具-选项"

java中接口没有构造方法那子类怎么访问他的数据

问题描述 java中接口没有构造方法那子类怎么访问他的数据 感觉跟以往学过的知识冲突了?哪位大神能伸出援手解答下?这个问题困扰我好久了,希望走过路过的不要错过哈! 解决方案 接口有什么数据?接口就是一个定义,不存任何数据. 解决方案二: 接口中没有任何函数,接口不是类,不能构造一个接口.只能构造一个实现了这个接口的函数. 解决方案三: 接口就是公共方法的集合,只有公有常量和抽象方法,数据是类的属性,只有类才可以定义私有成员变量的. 解决方案四: 接口,不能实例化,没有构造方法 解决方案五: 接口

mfc vc6 0 sql 数据库-请问当SQL Server数据库中数据有更新的时候,如何通知到MFC上?

问题描述 请问当SQL Server数据库中数据有更新的时候,如何通知到MFC上? 编程环境VC6.0,在MFC对话框中添加了一些控件,并且能够显示数据库中内容, 使用的是CRecordSet类,读取完数据库后就调用Close关闭数据库了. 我想请问的是,如何实时的显示数据库内容? 或者当数据库数据有变更的时候才显示到 MFC的控件上? 解决方案 参考这个试试 SQL Server 2008 表数据改变后发送消息 http://blog.csdn.net/sliphades/article/de