PyOdps 0.4版本发布,从一个故事说起

有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。

“帮我查下这些用户的消耗呢”。

开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。

“有点麻烦啊”,开发同学轻轻抱怨。

“我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。

“……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。

这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。

但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。

为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。

可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。

首先,我们读出本地Excel文件。

In [14]: from odps.df import read_excel

In [15]: users = read_excel('/Users/chine/userids.xlsx')

In [16]: users.head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[16]:
    id  age
0   46   27
1  917   22
2  217   22
3  889   24
4  792   40
5  267   23
6  626   23
7  433   27
8  751   24
9  932   58

In [40]: users.count()
|==========================================|   1 /  1  (100.00%)         0s
100

然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。

In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))

In [18]: ratings.head(10)
|==========================================|   1 /  1  (100.00%)         2s
Out[18]:
   user_id  movie_id  rating  unix_timestamp
0      196       242       3       881250949
1      186       302       3       891717742
2       22       377       1       878887116
3      244        51       2       880606923
4      166       346       1       886397596
5      298       474       4       884182806
6      115       265       2       881171488
7      253       465       5       891628467
8      305       451       3       886324817
9        6        86       3       883603013

In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age]
# 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个Collection

In [26]: filter_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        44s
Out[26]:
   user_id  movie_id  rating  unix_timestamp  age
0        3       350       3       889237076   23
1        3       332       1       889237224   23
2        3       327       4       889237455   23
3        3       341       1       889237055   23
4        3       317       2       889237482   23
5        3       336       1       889237198   23
6        3       322       3       889237269   23
7        3       323       2       889237269   23
8        3       339       3       889237141   23
9        3       268       3       889236961   23

然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。

In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean())

In [29]: age_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        30s
Out[29]:
   age  rating_mean
0   20     4.002309
1   21     4.051643
2   22     3.227513
3   23     3.519174
4   24     3.481013
5   25     3.774744
6   26     3.391509
7   27     3.355130
8   28     3.382883
9   29     3.705660

In [30]: age_ratings.plot(kind='bar', rot=45)
|==========================================|   1 /  1  (100.00%)        29s
Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>

超级简单,有木有!

这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。

而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。

DataFrame API使用pandas计算

我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。

所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。

In [41]: import numpy as np

In [42]: import pandas as pd

In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc'))

In [45]: pandas_df
Out[45]:
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741
3  0.315234  0.907264  0.849361
4  0.678395  0.642199  0.746051
5  0.977872  0.841084  0.931561
6  0.903927  0.846036  0.982424
7  0.347098  0.373247  0.193810
8  0.672611  0.242942  0.381713
9  0.461411  0.687164  0.514689

In [46]: df = DataFrame(pandas_df)

In [49]: type(df)
Out[49]: odps.df.core.DataFrame

In [47]: df.head(3)
|==========================================|   1 /  1  (100.00%)         0s
Out[47]:
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741

In [48]: df[df.a < 0.5].a.sum()
|==========================================|   1 /  1  (100.00%)         0s
1.2770121422535428

这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。

这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以用写出以下代码:

DEBUG = True

if DEBUG:
    # 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。
    df = ratings[:100].to_pandas(wrap=True)
else:
    df = ratings

在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制

目前pandas计算后端尚不支持窗口函数。

apply和MapReduce API

使用apply对单行数据调用自定义函数

以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。

In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10)
|==========================================|   1 /  1  (100.00%)      1m44s
Out[13]:
        rda
0  0.166667
1  0.166667
2  0.208333
3  0.208333
4  0.125000
5  0.208333
6  0.166667
7  0.208333
8  0.208333
9  0.125000

reduce为True的时候,会返回一个sequence,详细参考文档

MapReduce API

PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。

现在假设我们需要求出每部电影前两名的评分,直接上代码。

from odps.df import output

@output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int'])
def mapper(row):
    yield row.movie_id, row.title, row.rating

@output(['title', 'top_rating'], ['string', 'int'])
def reducer(keys):
    i = [0]
    def h(row, done):
        if i[0] < 2:
            yield row.movie_title, row.movie_rating
        i[0] += 1
    return h

In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False)

In [10]: top_ratings.head(10)
|==========================================|   1 /  1  (100.00%)      3m48s
Out[10]:
               title  top_rating
0   Toy Story (1995)           5
1   Toy Story (1995)           5
2   GoldenEye (1995)           5
3   GoldenEye (1995)           5
4  Four Rooms (1995)           5
5  Four Rooms (1995)           5
6  Get Shorty (1995)           5
7  Get Shorty (1995)           5
8     Copycat (1995)           5
9     Copycat (1995)           5

利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短!

In [22]: local_ratings = ratings[:100].to_pandas(wrap=True)
|==========================================|   1 /  1  (100.00%)         2s

In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[23]:
                                               title  top_rating
0  Shanghai Triad (Yao a yao yao dao waipo qiao) ...           5
1                              Twelve Monkeys (1995)           4
2                               Seven (Se7en) (1995)           4
3                         Usual Suspects, The (1995)           5
4                                 Postino, Il (1994)           3
5                          Mr. Holland's Opus (1995)           4
6                                 Taxi Driver (1976)           5
7                                       Crumb (1994)           5
8                                   Star Wars (1977)           5
9                                   Star Wars (1977)           5

cache机制

在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。

In [25]: tmpdf = ratings[ratings.title.len() > 10].cache()

In [26]: tmpdf['title', 'movie_id'].head(3)
|==========================================|   1 /  1  (100.00%)        35s
Out[26]:
                  title  movie_id
0  Seven (Se7en) (1995)        11
1  Event Horizon (1997)       260
2      Star Wars (1977)        50

In [27]: tmpdf.count()  # tmpdf已经被cache,所以我们能立刻计算出数量
|==========================================|   1 /  1  (100.00%)         0s
99823

记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。

而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。

其他特性

PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。

PyOdps下一步计划

对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。

PyOdps非常年轻,期待大家来使用、提feature、贡献代码。

时间: 2024-09-19 09:08:25

PyOdps 0.4版本发布,从一个故事说起的相关文章

ThinkPHP 5.0.1版本发布,小版本大更新

喜迎国庆,ThinkPHP5.0.1版本发布,提前祝TPer国庆佳节快乐,事业顺利,用ThinkPHP开发更健康^_^ ThinkPHP V5.0--为API开发而设计的高性能框架hinkPHP5.0版本是一个颠覆和重构版本,官方团队历时十月,倾注了大量的时间和精力,采用全新的架构思想,引入了更多的PHP新特性,优化了核心,减少了依赖,实现了真正的惰性加载,支持composer,并针对API开发做了大量的优化,包括路由.日志.异常.模型.数据库.模板引擎和验证等模块都已经重构,不适合原有3.2项

OpenFaces 3.0 最终版本发布,支持JSF2.0

Dmitry Pikhulya在TeamDev Support社区上发帖称OpenFaces 3.0 最终版本发布了,并支持JSF2.0.全文翻译如下: OpenFaces 3.0 最终版本发布.本次发布主要关注与JSF2.0 规范的兼容性,JSF2.0 替代 JSF1.2 标准,使得开发更为简单. 现在使用JSF 2.0 开发的开发人员,可以使用很多成熟的 OpenFaces 组件,如 DataTable, TreeTable, DayTable, 及其它很多有用的组件.这里要特别感谢那些为早

CYQ.Data 数据框架 V4.0 开源版本发布(源码提供下载,秋色园V2.5版本标配框架)

说明的说明:   博客园团队两次移此文出首页,说 这篇文章不属于知识分享型文章,并且有广告嫌疑. 本文的确属于分享型文章,而且分享的知识点比其它文章都多很多,看看网友回复"谢谢分享"就知道是分享型文章了.   所谓广告嫌疑,这东西一被扣上,就很难说的清. 本框架从2007年就始发布在博客园,一直更新维护到现在,其中是有过渡到最新版本是收费,但是仍保留开放很多版本的开源的. 但目前发布的,都是开源的免费版本,再说,涉及到收费就是广告?ext也有收费版本,出现ext相关文章你咋不说是广告?

JRainbow 0.3版本发布

简单介绍       JRainbow是一款基于Spring+Hibernate+Struts2+Extjs4的企业级开发的Eclipse开源插件.主要用于快速开发,针对都是Java开发人员,只是为程序员节省部分时间.非傻瓜式.非一键建站,适用于二次开发.       JRainbow插件主要功能是支持数据库生成后台代码及简单的Extjs页面代码.支持多次生成代码.支持程序员二次开发.暂时只支持MySQL及Oracle数据库. 插件获取 百度网盘:http://pan.baidu.com/s/1

ThinkPHP V5.0.9 版本发布

###ThinkPHP V5.0--为API开发而设计的高性能框架 V5.0.9版本主要为BUG修正和改进,可以从5.0.8无缝升级,推荐更新,主要改进如下: 主要更新 修正模型一些已知问题优化数据库查询机制改进数据库断线重连判断修正社区反馈的一些BUG更多参考:更新日志 更新日志 [ 模型和数据库 ] 修正关联自动写入修正模型数据变化判断对为空数据的支持修正Query类的useSoftDelete方法返回值修正一对一嵌套关联数组定义的问题修正使用了手动参数绑定的时候的缓存BUG改进数据库类的一

jemalloc 5.0.0 全新版本发布,内存分配管理

jemalloc 发布了全新的 5.0.0 版本.与以前所有的版本不同,新版本不使用自然对齐的"chunks"进行虚拟内存管理,而是使用页面对齐的"extents". jemalloc 是一个通用的 malloc(3)实现,它强调了分段回避和可伸缩并发支持.jemalloc 在 2005 年首次作为 FreeBSD libc 分配器使用,2010年,jemalloc 的功能延伸到如堆分析和监控/调优等.现代的 jemalloc 版本依然集成在 FreeBSD 中.

傲游云浏览器Maxthon V1.0 Linux版本发布

傲游浏览器正式发布了针对桌面 Linux 平台的云浏览器 1.0 版本.作为针对桌面 Linux 的首次发布,傲游带来了一些 Win 现有用户喜闻乐见的功能:实现傲游帐户的表格自动填充.可以显示上一次打开的页面.在收藏夹可以通过悬停切换文件夹.以及全新的UI界面. 由于本人从未是傲游用户,更多功能还是有待系列粉丝发掘吧. 传送门: 官方 Deb, RPM 及 Tar 下载 PS:目测是使用 WebCore/WebKit 作为引擎的. 查看本栏目更多精彩内容:http://www.bianceng

微软发布TypeScript 2.0 RC版本

微软释放了TypeScript 2.0的发布候选版本,包含了tagged union功能以及对globs的支持. 在微软博客的声明中,TypeScript的项目主管Daniel Rosenwasser这样说到: "这个RC版本能够帮助我们了解完整的2.0版本会是什么样子,我们正在寻求广泛的反馈,使其更加稳定,从而让2.0成为一个可靠的发布版本.整体而言,对于通常的使用来说,这个RC版本足够稳定,从这个时间点开始,我们不会再添加重要的新特性了." 其实,从2.0 beta版本发布到现在,

Synfig Studio 0.63.01发布 2D矢量动画制作软件

Synfig是一款工业级.强大的2D矢量http://www.aliyun.com/zixun/aggregation/13470.html">动画制作软件,能够用最少的人力和资源制作出电影品质的动画.Synfig Studio是开源免费软件,可用于Windows,Linux和MacOS X操作系统. Synfig Studio 0.63.01版本发布了一个dash选项,用于创建高级的轮廓,新的图标和画布窗口的重排,均匀的链接到bline选项.网格是基于变焦,一些错误修正. 软件信息:ht