有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。
“帮我查下这些用户的消耗呢”。
开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。
“有点麻烦啊”,开发同学轻轻抱怨。
“我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。
“……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。
这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。
但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。
为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。
可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。
首先,我们读出本地Excel文件。
In [14]: from odps.df import read_excel
In [15]: users = read_excel('/Users/chine/userids.xlsx')
In [16]: users.head(10)
|==========================================| 1 / 1 (100.00%) 0s
Out[16]:
id age
0 46 27
1 917 22
2 217 22
3 889 24
4 792 40
5 267 23
6 626 23
7 433 27
8 751 24
9 932 58
In [40]: users.count()
|==========================================| 1 / 1 (100.00%) 0s
100
然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。
In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
In [18]: ratings.head(10)
|==========================================| 1 / 1 (100.00%) 2s
Out[18]:
user_id movie_id rating unix_timestamp
0 196 242 3 881250949
1 186 302 3 891717742
2 22 377 1 878887116
3 244 51 2 880606923
4 166 346 1 886397596
5 298 474 4 884182806
6 115 265 2 881171488
7 253 465 5 891628467
8 305 451 3 886324817
9 6 86 3 883603013
In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age]
# 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个Collection
In [26]: filter_ratings.head(10)
|==========================================| 1 / 1 (100.00%) 44s
Out[26]:
user_id movie_id rating unix_timestamp age
0 3 350 3 889237076 23
1 3 332 1 889237224 23
2 3 327 4 889237455 23
3 3 341 1 889237055 23
4 3 317 2 889237482 23
5 3 336 1 889237198 23
6 3 322 3 889237269 23
7 3 323 2 889237269 23
8 3 339 3 889237141 23
9 3 268 3 889236961 23
然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。
In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean())
In [29]: age_ratings.head(10)
|==========================================| 1 / 1 (100.00%) 30s
Out[29]:
age rating_mean
0 20 4.002309
1 21 4.051643
2 22 3.227513
3 23 3.519174
4 24 3.481013
5 25 3.774744
6 26 3.391509
7 27 3.355130
8 28 3.382883
9 29 3.705660
In [30]: age_ratings.plot(kind='bar', rot=45)
|==========================================| 1 / 1 (100.00%) 29s
Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>
超级简单,有木有!
这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。
而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。
DataFrame API使用pandas计算
我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。
所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。
In [41]: import numpy as np
In [42]: import pandas as pd
In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc'))
In [45]: pandas_df
Out[45]:
a b c
0 0.583845 0.301504 0.764223
1 0.153269 0.335511 0.455193
2 0.725460 0.460367 0.294741
3 0.315234 0.907264 0.849361
4 0.678395 0.642199 0.746051
5 0.977872 0.841084 0.931561
6 0.903927 0.846036 0.982424
7 0.347098 0.373247 0.193810
8 0.672611 0.242942 0.381713
9 0.461411 0.687164 0.514689
In [46]: df = DataFrame(pandas_df)
In [49]: type(df)
Out[49]: odps.df.core.DataFrame
In [47]: df.head(3)
|==========================================| 1 / 1 (100.00%) 0s
Out[47]:
a b c
0 0.583845 0.301504 0.764223
1 0.153269 0.335511 0.455193
2 0.725460 0.460367 0.294741
In [48]: df[df.a < 0.5].a.sum()
|==========================================| 1 / 1 (100.00%) 0s
1.2770121422535428
这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。
这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以用写出以下代码:
DEBUG = True
if DEBUG:
# 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。
df = ratings[:100].to_pandas(wrap=True)
else:
df = ratings
在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制 。
目前pandas计算后端尚不支持窗口函数。
apply和MapReduce API
使用apply对单行数据调用自定义函数
以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。
In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10)
|==========================================| 1 / 1 (100.00%) 1m44s
Out[13]:
rda
0 0.166667
1 0.166667
2 0.208333
3 0.208333
4 0.125000
5 0.208333
6 0.166667
7 0.208333
8 0.208333
9 0.125000
reduce为True的时候,会返回一个sequence,详细参考文档。
MapReduce API
PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。
现在假设我们需要求出每部电影前两名的评分,直接上代码。
from odps.df import output
@output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int'])
def mapper(row):
yield row.movie_id, row.title, row.rating
@output(['title', 'top_rating'], ['string', 'int'])
def reducer(keys):
i = [0]
def h(row, done):
if i[0] < 2:
yield row.movie_title, row.movie_rating
i[0] += 1
return h
In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False)
In [10]: top_ratings.head(10)
|==========================================| 1 / 1 (100.00%) 3m48s
Out[10]:
title top_rating
0 Toy Story (1995) 5
1 Toy Story (1995) 5
2 GoldenEye (1995) 5
3 GoldenEye (1995) 5
4 Four Rooms (1995) 5
5 Four Rooms (1995) 5
6 Get Shorty (1995) 5
7 Get Shorty (1995) 5
8 Copycat (1995) 5
9 Copycat (1995) 5
利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短!
In [22]: local_ratings = ratings[:100].to_pandas(wrap=True)
|==========================================| 1 / 1 (100.00%) 2s
In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10)
|==========================================| 1 / 1 (100.00%) 0s
Out[23]:
title top_rating
0 Shanghai Triad (Yao a yao yao dao waipo qiao) ... 5
1 Twelve Monkeys (1995) 4
2 Seven (Se7en) (1995) 4
3 Usual Suspects, The (1995) 5
4 Postino, Il (1994) 3
5 Mr. Holland's Opus (1995) 4
6 Taxi Driver (1976) 5
7 Crumb (1994) 5
8 Star Wars (1977) 5
9 Star Wars (1977) 5
cache机制
在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。
In [25]: tmpdf = ratings[ratings.title.len() > 10].cache()
In [26]: tmpdf['title', 'movie_id'].head(3)
|==========================================| 1 / 1 (100.00%) 35s
Out[26]:
title movie_id
0 Seven (Se7en) (1995) 11
1 Event Horizon (1997) 260
2 Star Wars (1977) 50
In [27]: tmpdf.count() # tmpdf已经被cache,所以我们能立刻计算出数量
|==========================================| 1 / 1 (100.00%) 0s
99823
记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。
而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。
其他特性
PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。
PyOdps下一步计划
对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。
PyOdps非常年轻,期待大家来使用、提feature、贡献代码。
- 安装方法:pip install pyodps
- Github:https://github.com/aliyun/aliyun-odps-python-sdk
- 文档:http://pyodps.readthedocs.org/
- bug report:https://github.com/aliyun/aliyun-odps-python-sdk/issues