大数据计算服务(MaxCompute,原名 ODPS)是一种快速、完全托管的 GB/TB/PB 级数据仓库解决方案。
https://help.aliyun.com/document_detail/27800.html?spm=5176.7840267.6.539.po3IvS
主要有三种操作数据的方式SQL,UDF,MapReduce,了解hadoop的同学就比较熟悉这些东西了。
那么Maxcompute的SQL和标准SQL最大的区别就是在Maxcompute中SQL会被解析成MapReduce去执行,当然也可以直接去写MapReduce去计算数据,UDF就是当自带的一些sql引用的函数不能满足业务计算的时候,自己通过代码编写一个函数,sql执行的时候引用。
由此可见实际上底层的计算都是依靠MapReduce这个计算引擎去执行。首先了解下什么是MapReduce。一份数据很大的时候在MaxCompute上是分布式存储的,也就是会分开存放到很多服务器,当一个任务执行的时候会从这些数据所在的服务器上启动一个进程读取这些数据,进行计算等操作,还会启动一个进程把这些数据进行汇总分析并输出。那前者进程叫做Map,后者进程叫做Reduce,合起来叫MapReduce任务。
使用sql操作数据的时候,会经常用到join。比如select * from A a join B b on a.id=b.id,这句sql在转换成MapReduce任务执行的时候:
1,map任务读数据,并对两个表的数据打上不同的tag用来区分
2,reduce端接收打标记的数据,将不同标记的表数据相同关联字段的数据
假设有两个表,我们暂且叫做Big表和Small表,其中Big表数据量比较大,分布式存在n台实例服务器上,Small表存在于一台服务器就放下了。
首选MaxCompute会启动一些Map的进程(Map任务)去读取这些数据分别打上标记,Map的个数是由一个参数控制的这里暂时不解释了。注意对于读取Big表的每个Map任务有可能在其他服务器上,那么这时候就需要到数据所在的服务器上把数据拉过来,Small表也会启动一个或者几个map任务读取文件系统中的数据,读取完成后会到Reduce端接收数据进行关联,判断关联字段相等的就放在一起输出,达到关联效果。
我们可以看一个例子,我准备了一个相对大的表train_user_lt,5G大小,数据大概7亿条。
准备了一个比较小的表map_join_test,只有3条数据。
select a.* from train_user_lt a left outer join map_join_test b on a.user_id = b.user_id;
执行了这句sql,如图
这个执行的过程图是Maxcompute特有的可以帮助用户来查看任务执行的过程等叫做logview,是一个在ODPS Job提交后查看和Debug任务的工具https://help.aliyun.com/document_detail/27987.html
从图中可以看出分为三部分
1,大的表train_user_lt启动了39个map任务去读取数据707025259条
2,小的表启动一个map任务读取3条数据。
3,reduce阶段接收了3+707025259=707025262条数据,输出了707025259条数据,left outer join按照左边的大表输出。
但是看下消耗的时间是40分钟,这样来说算是很长的时间的。那么怎么优化提高速度呢,有没有一种比较方便,比较直接暴力的方式进行优化呢
那么本文的重点就来了--Mapjoin:
MAPJION会把小表全部读入内存中,把小表拷贝多份分发到大表数据所在实例上的内存里,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率会高很多。
使用的条件就是当一个大表和一个或多个小表做join时。SQL会将用户指定的小表全部加载到执行join操作的程序的内存中,从而加快join的执行速度。需要注意,在Maxcompute使用mapjoin时:
left outer join的左表必须是大表;
right outer join的右表必须是大表;
inner join左表或右表均可以作为大表;
full outer join不能使用mapjoin;
mapjoin支持小表为子查询;
使用mapjoin时需要引用小表或是子查询时,需要引用别名;
在mapjoin中,可以使用不等值连接或者使用or连接多个条件;
目前MaxCompute 在mapjoin中最多支持指定8张小表,否则报语法错误;
如果使用mapjoin,则所有小表占用的内存总和不得超过512MB。请注意由于MaxCompute 是压缩存储,因此小表在被加载到内存后,数据大小会急剧膨胀。此处的512MB限制是加载到内存后的空间大小;
多个表join时,最左边的两个表不能同时是mapjoin的表。
那么为什么说left outer join的左表必须是大表呢,
因为左表是大表的时候,会拿小表的全部数据和大表所在的实例服务器中的数据匹配一遍,刚好小表就在内存里。如果是左表是小表,那么需要把大表所有的数据拉过来跟小表匹配一遍,试想一下性能会如何。
来看下写法
select / + mapjoin(b) / a.* from train_user_lt a left outer join map_join_test b on a.user_id = b.user_id;
//就是在sql语句前加一个标记说这是mapjoin,把小表别名写在括号里
看下优化后的效果
任务变成了两个部分,map端直接读取数据和内存里的小表进行关联,然后输出,少了一步reduce。也就是说关联从reduce转到map端进行join,省去了reduce这一步,所以叫做:mapjoin。
看下执行时间1分钟20多秒。之前是40分钟。当然我这边测试是把两个比较极端的数据进行比较,所以效果比较明显。由此看来大表关联小表的时候可以使用mapjoin进行优化查询。
那么mapjoin除了优化性能,还可以干什么呢。
MaxCompute SQL不支持支持在普通join的on条件中使用不等值表达式、or ,like等逻辑等复杂的join条件,但是在mapjoin中可以进行如上操作。例如
select /+ mapjoin(a) /
a.total_price,
b.total_price
from shop a join sale_detail b
on a.total_price < b.total_price or a.total_price + b.total_price < 500;
总结:mapjoin看似很小的操作变化,实际上可以带来很大效率提升,另外还可以解决一些不等关联的业务场景。
正如马云经常说的一句话:
small is beautiful,small is powerful !