第1章 初识Pig
Pig编程指南
1.1 Pig是什么?
Pig提供了一个基于Hadoop的并行地执行数据流处理的引擎。它包含了一种脚本语言,称为Pig Latin,用来描述这些数据流。Pig Latin本身提供了许多传统的数据操作(如join、sort、filter等),同时允许用户自己开发一些自定义函数用来读取、处理和写数据。
Pig是一个Apache开源项目。这意味着用户可以免费下载源码或者二进制包,自由使用它,对这个项目贡献自己的代码,同时也可以在Apache License的许可范围下将Pig用到自己的产品中或者在需要的时候修改代码来满足特定需求。
1.1.1 Pig是基于Hadoop的
Pig运行于Hadoop之上,它同时使用到Hadoop分布式文件系统HDFS和Hadoop处理系统MapReduce。
HDFS是一个分布式文件系统,它将文件存储到Hadoop集群的各个节点上。它负责将文件分割成许多数据块然后分发到不同的节点机器上,其中包括对每个数据块进行多份冗余备份,这样可以避免因为某台机器宕掉而造成数据丢失。HDFS提供了一种类似POSIX的用户交互形式给用户。默认情况下,Pig从HDFS中读取输入文件,使用HDFS来存放MapReduce任务所生成的中间数据,最终将输出写入HDFS中。在第11章,用户将看到Pig不只是可以从HDFS中读取输入文件或将输出文件写入HDFS的。
MapReduce是一个简单而强大的并行数据处理算法。MapReduce计算框架下的每个任务都由3个主要阶段组成:map阶段、shuffle阶段和reduce阶段。在map阶段,程序可以并行独立操作输入数据中的每一条记录。因为可以同时运行多个map任务,所以即使输入的数据量达到GB或者TB级别,只要有足够多的机器,map阶段通常在1分钟内就可以完成。
MapReduce任务的一个特别之处在于需要确定数据是根据哪个键进行收集的。例如,假设用户在处理一个网站的Web服务器日志,而且这个网站需要用户登录后才能操作,那么用户就可能会使用用户ID作为数据收集的键,因为通过这个用户ID就可以知道每个用户在这个网站上的对应的所有操作。map阶段后紧跟着就是shuffle阶段,在这个阶段数据已经根据用户指定的键收集起来并且分发到不同的机器上去了,这是为reduce阶段做准备。包含同一键的所有记录将会交由同一个reducer处理。
在reduce阶段,程序将提取每个键以及包含该键的所有记录。这个过程也是在多台机器上并行执行完成的。当处理完所有组时,reducer就可以写输出了。下面我们将通过一个简单的MapReduce程序进行演示。想更多地了解MapReduce是如何工作的,请看附录B“MapReduce介绍”。
MapReduce演示程序
假设现在有一个MapReduce程序对一个文本文件进行词频统计。该程序本身是MapReduce提供的演示程序。在这个例子中,map阶段会从文本文件中一次读取一行,然后分割出每个词作为一个字符串,之后对于分割出的每个单词,会输出单词本身以及数字1,数字1表示这个单词出现过1次。在shuffle阶段,将使用单词作为键,哈希分发对应的记录到不同的reducer中去。在reduce 阶段会将相同的单词对应的出现次数相加,并最终将求和后的数值和单词本身一起输出。以童谣“Mary Had a Little Lamb”为例,输入将是:
Mary had a little lamb
its fleece was white as snow
and everywhere that Mary went
the lamb was sure to go.
这里假设每一行都被发送到不同的map任务中去了。当然事实上,每个map任务处理的数据要远远大于这个数量,这里只是为了后面更好地去描述。MapReduce整个过程的数据流如图1-1所示。
map阶段一旦结束,shuffle阶段将会把包含相同单词的所有记录提交到同一个reducer中。对于这个例子我们假设有两个reducer:以A~L开头的单词提交到第一个reducer中,而以M~Z开头的单词提交到第二个reducer中。这两个reducer最终将会把每个单词的出现次数分别相加然后输出。
Pig的所有数据处理过程都是使用MapReduce来执行的。Pig将用户所写的Pig Latin脚本编译成一个或者多个MapReduce任务,然后在Hadoop上执行。例子1-1展示了如何使用Pig Latin脚本来对童谣“Mary Had a Little Lamb”进行词频统计。
例1-1 使用Pig对童谣“Mary和她的羔羊”进行词频统计
--加载文件名为Mary的文件,
--并将记录中的唯一字段命名为‘line’。
input = load 'mary' as (line);
--TOKENIZE将line按单词分割成列
--flatten 接受TOKENIZE 操作后产生的记录集合然后分开成独立的列,
-- 这个独立的列称为word
words = foreach input generate flatten(TOKENIZE(line)) as word;
-- 现在按照word进行分组
grpd = group words by word;
-- 计数
cntd = foreach grpd gengerate group, COUNT(words);
-- 打印结果
dump cntd;
在使用Pig时无须去过度关注map、shuffle和reduce阶段,因为Pig会将脚本中的操作解析成相应的MapReduce阶段。
1.1.2 Pig Latin,一种并行数据流语言
Pig Latin是一种数据流语言,这意味着它允许用户去描述如何从一个或多个数据源并行读取数据,然后并行地进行处理,最后将处理结果并行地输出到一个或多个结果集中。这些数据流可以像前面提到的那个词频统计例子一样是个简单的线性流。同时它们也可以是复杂的工作流,其中可以包含一些加入多个输入的节点,也可以包含一些将输入数据分割成多个流的节点,这些节点都是通过不同的操作符来处理的。用数学语言来描述的话,Pig Latin 描述的是一个有向无环图(DAG),在这个图中,节点代表处理数据的操作符,节点间的向量代表数据流。
这意味着Pig Latin和用户之前见过的许多编程语言会有所不同。在Pig Latin中没有if语句,也没有for循环操作。这是因为传统的过程语言和面向对象语言描述的是控制流,而数据流只处于一个从属地位。而Pig Latin更专注于数据流。想了解Pig Latin脚本中如何在处理数据流的同时加入控制流,请阅读第9章。
查询语言和数据流语言的比较
大体一瞥,人们会说Pig Latin不过是SQL的一个面向过程化的版本。尽管确实有一定的相似性,但是其实两者具有非常多的差异。SQL是一种查询语言,它关注于允许用户构造查询,它允许用户去描述他们想得到什么问题的答案,而不是如何给出问题的答案。然而在Pig Latin中,用户可以详细描述如何对输入的数据进行处理。
Pig Latin和SQL的另一个主要区别是SQL面向的是回答一个问题,因此当用户想同时进行多个数据操作时,他们要么使用多个查询语句,这时需要将一些查询的中间数据存放到临时表中;要么写一个大的包含子查询的查询语句,将一些初始的处理过程由子查询来完成。然而,很多用户发现子查询令人困惑而且也并非那么容易去构建。同时,子查询使用的是由内而外的设计,也就是说,在数据管道最里面的子查询会最先执行。
Pig被设计为实现知道将要进行的一系列的数据操作,因此不需要通过颠倒顺序的子查询的方式来写数据管道,也无需使用临时表来存放中间数据。这点将通过例子1-2和例子1-3来进行演示。
现在假设有个用户想先按某个键对表进行group分组操作,然后和第二张表进行join连接操作。在SQL查询中,因为join操作发生在group操作之后,所以要么使用子查询,要么写两个查询语句,同时将中间结果保存到临时表中。例子1-3用到了一个临时表,因为这样可读性要好些。
例1-2 SQL中先进行分组然后进行连接操作
CREATE TEMP TABLE t1 AS
SELECT customer, sum(purchase) AS total_purchases
FROM transactions
GROUP BY customer;
SELECT customer, total_purchases, zipcode
FROM t1, customer_profile
WHERE t1.customer = customer_profile.customer;
在Pig Latin中,是另外一种方式,如例1-3所示。
例1-3 Pig Latin中先进行分组然后进行连接操作
-- 加载汇报文件,按照customer字段进行分组,然后计算他们的总购物金额
txns = load 'transactions' as (customer, purchase);
grouped = group txnx by customer;
total = foreach grouped generate group, SUM(txns.purchase) as tp;
-- 加载 customer_profile 文件
profile = load 'customer_profile'as (customer, zipcode);
-- 对已经分好组并进行了累加计算的汇报文件数据和customer_profile文件进行连接
answer = join total by group, profile by customer;
-- 将结果输出到控制台
dump answer;
此外,SQL和Pig Latin各因不同的应用场景而生。SQL的应用场景是RDBMS,在这种场景下,数据是标准化的,并且加上了模式和其他一些特有的约束(例如,null值也是不可以脱离约束单独存在的等)。Pig是为Hadoop数据处理环境而设计的,在这种环境下,模式有时是未知的或不一致的,数据可能没有进行恰当的约束而且很少进行数据标准化。基于这些不同,Pig不需要将数据事先导入表中,当数据导入HDFS中后,它就可以直接操作这些存放在HDFS的数据。
如果语言和文化类似,那么融入一个新的环境可能会更加容易些。我和妻子一起去过法国几次。我会讲很少的法语,但是因为是商业语言(或许是因为美国人和大不列颠人喜欢到法国度假),对于我来说法语中已经包含了足够多的英语口语,使我足够应付得了。而我的妻子,她会讲法语。她在法国有朋友去拜访时,她可以和他们很好地交谈。她可以去那些不在通常的旅游线路上的其他景区探险。她的法国经历比我要多得多,因为她会讲当地本土语言——法语。
在数据处理范畴里,SQL就是英语。它有个非常好的特点就是无论是人还是工具都认识它,也就是说它的入门门槛很低。我们的目标是使Pig成为像Hadoop这样的并行数据处理系统范畴里的母语。尽管这可能要求用户需要进行一定的学习才能使用,但是它可以让用户更加充分地利用Hadoop提供的计算能力。
Pig和MapReduce的区别是什么
我刚刚声明Pig团队的一个目标是使Pig Latin成为像Hadoop这样的并行数据处理环境的母语。但是难道MapReduce提供的还不够吗?有必要使用Pig吗?
Pig比直接使用MapReduce有几个优点。Pig Latin 提供了所有标准的数据处理操作,例如join、filter、group by、order by、union等。MapReduce直接提供了group by操作(也就是shuffle和reduce两个阶段做的事情),同时通过实现分组操作间接地提供了order by 操作。过滤器操作和推测执行操作可以在map阶段进行简单实现。但是其他的操作,特别是join操作无法提供,所以必须由用户自己进行代码实现。
Pig提供了一些对这些标准的数据操作的复杂的、完备的实现。例如,因为每个键对应的记录的个数很少是均匀地分布在集群中的,所以提交给reducer的数据经常会产生数据倾斜。也就是说,有的reducer需要比别的reducer处理10倍或更多倍的数据。Pig具有join和order by 操作可以处理这种情况,而且(在一些情况下)可以重新均衡reducer负荷。这些需要Pig团队花费好几个月的时间编写MapReduce程序,然后再重构代码,这确实耗费时间。
在MapReduce中,在map阶段和reduce阶段的内部的数据处理对于系统来说是不透明的。这意味着MapReduce没有机会优化或者检查用户的代码。Pig另一方面,可以通过分析Pig Latin脚本来了解用户描述的数据流。这意味着Pig可以在早期进行错误检查(例如用户是否将一个string类型的字段放到一个integer类型的字段中?)和进行优化(例如这两个group操作是否可以合并?)
MapReduce没有一个类型系统,这是有意这么设计的,因为这样可以给用户更大的自由度去使用他们自己的数据类型和序列化框架。但这样就产生了一个不好的问题,就是限制了系统在运行前和运行时对用户代码进行检查的能力。
这几个方面都表明Pig Latin相对于MapReduce Java代码更容易编写和维护。我做了一个并非科学的实验,对于同一个操作我分别使用Pig Latin和MapReduce进行实现。假设有个文件存有用户数据,另一文件存放了对于某个网站的点击数据,例子1-4所示的Pig Latin脚本将找到年龄为18~25岁的用户访问最多的5个页面。
例1-4 查找访问次数最多的前5个URL
Users = load 'users' as (name, age);
Fltrd = filter Users by age >= 18 and age <=25;
Pages = load 'pages' as (user, url);
Jnd = join Fltrd by name, Pages by user;
Grpd = group Jnd by url;
Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
Srtd = order Smmd by clicks desc;
Top5 = limit Srtd 5;
store Top5 into 'top5sites';
这段脚本的第1行表示加载文件名为users的文件,同时声明这份数据有两个字段:name和age,而且为这个输入取别名为Users。第2行是个过滤器,将Users中age这个字段值大于等于18而且小于等于25的记录过滤出来,不满足条件的数据将被忽略。经过过滤器后,留下的数据就是在我们感兴趣的年龄范围内的了。我们将这个过滤器的结果取别名为Fltrd。
第3行是第2个load加载数据语句,这个语句加载了文件Pages,并取别名为Pages,它声明了两个字段:user和url。
“Jnd = join”这一行以Fltrd.name和Pages.user为键,对Fltrd和Pages进行join连接操作。通过这次join操作我们就可以得到每个用户访问过的所有URL链接了。
“Grpd = group”这一行按照URL进行分组。结果是每一个url,例如pignews.com/frontpage,都对应着一组url字段中包含了对应值的所有记录。紧跟着的下一行会统计每个URL对应的记录个数。这一行后我们就知道了,对于每个URL,被年龄为18~25岁的用户访问了多少次。
之后的一件事就是按访问次数从访问最多到访问最少进行排序。“Srtd = order”这一行就是根据前一行的统计结果进行desc(降序)排列。因此,最大值将在第1行。因为最终我们只需要最前面的5条记录,所以最后一行将统计结果限制在前5行。最后的结果重新存放到HDFS中一个叫做top5sites的文件中。
在Pig Latin中整个处理过程需要写9行代码,耗时在15分钟左右,其中包括写代码和对代码进行调试的时间。如果以MapReduce(这里省略了)来写的话,需要差不多170行的代码而且花费了我4个小时的时间才调试成功。Pig Latin同样利于维护,因为这段代码,对于后来的其他开发者同样是容易理解和方便修改的。
当然Pig所带来的这些便利同样是有代价的。通过MapReduce框架可以开发一些算法,在Pig中却很难实现。同时对于开发者,他们需要放弃一个层次的控制权。一名优秀的工程师,只有给予足够的时间,总是可以将一个普通的系统做得足够好。因此对于不常见的算法或者是对于性能要求很高的话,这种情况下使用MapReduce仍然是正确的选择。基本上这种情况也和选择Java编码而不选择使用像Python这样的脚本语言的情形是一样的。Java功能强大,但是因为它是高级程序语言,所以使用它开发需要比脚本语言花费更多的时间。开发者需要根据实际情况选择合适的工具。
1.1.3 Pig的用途
以我的经验,Pig Latin的使用场景可以分为独立的三大类:传统的抽取转换加载(ETL)数据流、原生数据研究和迭代处理。
最大的使用场景就是数据流了。一个通常的例子就是网络公司从他们的Web服务器上收集到日志,进行数据清洗,之后进行简单的聚合预计算,然后导入数据仓库中。在这种情况下,数据被加载到计算网格中,之后使用Pig从数据泥潭中清理出有价值的数据。同时还可以使用Pig将用户网页操作数据和用户数据库信息进行join连接,这样可以将用户cookie和已知的用户信息关联起来。
另外一个数据流应用的例子是使用Pig处理离线数据来建立用户行为预测模型。Pig被用来扫描所有的用户和网站的交互数据,最终将用户分为各种各样的群组。然后,对于每个群组会生成一个数学模型,根据该模型可以预知这个群组的用户对各种类型的广告或者新闻文章的反映是什么样子的。通过这种方式,网站可以知道展示什么样的广告可以更有可能获得更多的点击,或者发布什么样的新闻故事可以更有可能吸引用户和挽留用户再次访问。
传统上,使用像SQL这样的语言执行点对点的查询可以快速地为问题准备好相应的数据。然而,对于原始数据的研究,一些用户还是偏向使用Pig Latin脚本。因为Pig可以在无模式,模式信息不全,或者模式不一致的情况下进行操作,同时因为Pig可以很容易地控制封装的数据,因此对于那些期望在数据没有进行清洗也没有写入数据仓库的情况下,分析数据的研究人员经常更偏好于使用Pig。经常处理大规模数据集的研究人员经常会使用像Perl或者Python这样的脚本语言进行处理。具有这些使用背景的用户通常更喜欢使用Pig这样的数据流范式而非像SQL那样的声明式查询语言。
创建迭代处理模型的用户也开始使用Pig。假设有一个新闻门户网站,它保留了一个它跟踪的关于该网站的所有新闻故事的图。在这个图中每个新闻故事都是一个节点,节点间的连线表示的是相关故事间的关系。例如,所有关于即将来临的选举的故事都是联系到一起的。每5分钟都有一组新的故事进来,这时数据处理引擎需要将这组故事增加到图中。这些故事中有一些是新的,有一些是对之前的故事进行的更新,还有一些是替代之前已经存储的一些故事的。这时需要对整个故事图做一些数据处理步骤。例如,对于建立行为目的模型的处理过程就需要将用户数据和整个故事图进行连接。每5分钟重新运行整个图是不可行的,因为对于适当数量的硬件资源来说在5分钟内运行出结果是不可能的。但是模型创建者不想只是每天更新一次这些模型,因为那意味着会错过一整天的时间来提供机会。
为了应付这个问题,有必要定期地首先对整个图进行连接,例如可以按照天来进行连接。然后,每5分钟一旦有数据进来,就可以立即完成对新进来的数据进行连接操作,同时这个结果是可以和对整个图做连接的结果整合在一起的。这个组合步骤并不容易,因为需要在5分钟内完成对整个图进行插入、更新和删除操作。使用Pig Latin来表达这种组合关系是可以的并且是相当方便的。
目前所说的一切都隐含着一点:Pig(与MapReduce一样)是面向数据批处理的。如果需要处理的是GB或者TB数量级的数据,那么Pig是个不错的选择。但是因为它期望的是序列地读取一个文件中的所有记录然后序列地将输出写入存储中,因此对于那些需要写单条或者少量记录,或者查询随机序列下的多条不同记录这样的任务,Pig(与MapReduce一样)并非是个好选择。关于在这些情况下选用什么样的软件才是合理的更多讨论请查看第12.3节“NoSQL数据库”。
1.1.4 Pig的设计思想
在早期,作为潜在贡献者加入Pig项目的人们并非了解这个项目究竟是关于什么的。他们并不清楚怎样做才是最好的贡献或者哪些贡献会被接受以及哪些不会被接受。因此,Pig团队发布了一个项目设计思想声明,其内容总结为Pig渴望成为:
Pig什么都吃
不管数据是否有元数据,Pig都可以操作。不管数据是关系型的、嵌套型的,或者是非结构化的,Pig也同样可以操作。而且它还可以很容易地通过扩展,不单单可以操作文件,还可以操作key/value型的存储,以及数据库等。
Pig无处不在
Pig期望成为一种并行数据处理语言。它不会局限于是一种特殊的并行处理框架。它首先是基于Hadoop之上的实现,但是我们期望它并非只能在Hadoop平台上使用。
Pig是家畜
Pig被设计为可以让用户很容易地控制和修改的语言。
Pig允许用户随时整合加入他们的代码,因此目前它支持用户自定义字段类型转换函数、用户自定义聚合方法函数和用户定义条件式函数。这些函数可以使用Java来写也可以使用最终可以编译成Java代码的脚本语言(例如Jython)编写。Pig支持用户定义的加载和存储函数。Pig通过自己的stream 命令和需要MapReduce相关的JAR包的mapreduce命令可以执行外部的执行命令。Pig同样允许用户为自己的特定使用场景提供一个用户自定义的分区方法函数,使他们执行的任务在reduce阶段可以达到一个均衡的负荷。
Pig有一个优化器,它可以将Pig Latin脚本中的操作过程进行重新排列以达到更好的性能,例如将MapReduce任务进行合并等。但是,如果对于某种情形下这种优化是不必要的话,用户可以很容易地将最优控制器关闭,这样执行过程就不会发生改变。
Pig会飞
Pig处理数据很快。我们会持续地优化性能,同时不会增加一些使Pig显得较重而降低性能的新功能。