Spark入门:实现WordCount的3种方式

WordCount作为Spark的入门任务,可以很简单,也可以做到比较复杂。 本文从实现功能的角度提出了3种实现方式,至于性能影响,会在后文继续讨论。

注意: 本文使用的Spark版本还是1.6.1.如果读者您已经切换到2.0+版本,请参考GitHub spark的官方例子进行学习。 因为2.0版本的API与1.X 并不能完全兼容,特别是2.0开始使用了SparkSession的概念,而不是SparkContext!

第一种方式:mapToPair + reduceByKey

这是官方提供的实现方式,应该也是网上能找到的最多的例子。

官网地址: http://spark.apache.org/examples.html

核心代码:


  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 
  10.  
  11. public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } 
  12.  
  13. }); 
  14.  
  15. JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { 
  16.  
  17. public Integer call(Integer a, Integer b) { return a + b; } 
  18.  
  19. }); 
  20.  
  21. counts.saveAsTextFile("hdfs://..."); 

总结上面的步骤:

  1. flatmap : 将一整段文字映射成一个字符串数组
  2. mapToPair: 将word 映射成 (word, 1)
  3. reduceByKey: 按照key进行group and plus的操作, 得到最终结果
  4. collect: 这是Action,上面3个都是Transformation

第二种方式:使用countByValue代替mapToPair + reduceByKey

核心代码:


  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. Map<String, Long> counts = words.countByValue(); 

读文件、flatmap这两步都是完全一样的,但是后面直接一个countByValue就搞定了,并且还直接collect到本地了,是不是感觉这一种实现方式更简洁了呢?

至于性能,一般来说这种方式还不错,但是这种方式有一些缺点,参考StackOverFlow的描述:

网址: http://stackoverflow.com/questions/25318153/spark-rdd-aggregate-vs-rdd-reducebykey

countByValue would be the fastest way to do this, however its implementation uses hash maps and merges them so if you have a large amount of data this approach may not scale well (especially when you consider how many issues spark already has with memory). You may want to use the standard way of counting in map reduce which would be to map the line and 1 as pairs then reduceBykey like this:

简单的说,这种方式是使用hash的方式进行merge。 如果处理的数据量比较大的时候,效果可能不怎么好。

注意: 这种方式的性能笔者确实还没有亲自实践过!

第三种方式:AggregateByKey

AggregateByKey 这个方法,可以看做是reduceByKey的增强版,因为reduceByKey的输出类型与输入类型要求是完全一致的。比如wordcount 之中的输入是Tuple2<String, Integer> 输出也同样要求是Tuple2<String,Integer>. 但是AggregateByKey的输出类型可以是不一样的数据类型。 参考下面的代码:


  1. val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D") 
  2.  
  3. val data = sc.parallelize(keysWithValuesList) 
  4.  
  5. //Create key value pairs 
  6.  
  7. val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache() 
  8.  
  9. val initialCount = 0; 
  10.  
  11. val addToCounts = (n: Int, v: String) => n + 1 
  12.  
  13. val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2 
  14.  
  15. val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts) 

输出:


  1. Aggregate By Key sum Results 
  2.  
  3. bar -> 3 
  4.  
  5. foo -> 5 

可以看到,输入是<String, String> 而输出变成了<String, Integer>

注意: 这种方法,并不是处理WordCount的最好的选择,只是说明我们可以使用AggregateByKey这种方式来实现相同的功能

其实还有另外一种实现方式: 使用DataFrame。 但是这种方式需要前期的准备比较多,即如何将数据处理并喂给DataFrame。

一般来说,DataFrame的效率相比其他的RDD的实现方式要高不少,如果在前期准备工作上面难度不是太大的话,非常推荐使用DataFrame的方式。

本文作者:rangerwolf

来源:51CTO

时间: 2024-08-17 06:52:32

Spark入门:实现WordCount的3种方式的相关文章

React.js入门实例教程之创建hello world 的5种方式_javascript技巧

一.ReactJS简介 React 是近期非常热门的一个前端开发框架.React 起源于 Facebook 的内部项目,因为该公司对市场上所有 JavaScript MVC 框架,都不满意,就决定自己写一套,用来架设 Instagram 的网站.做出来以后,发现这套东西很好用,就在2013年5月开源了.由于 React 的设计思想极其独特,属于革命性创新,性能出众,代码逻辑却非常简单.所以,越来越多的人开始关注和使用,认为它可能是将来 Web 开发的主流工具. ReactJS官网地址:http:

thinkphp 3.2.3 入门示例2(URL传参数的几种方式)

原文:thinkphp中URL传参数的几种方式 在thinkphp中,url传参合asp.net中原理类似,下面就单个参数和多个参数传递方式进行一个简单讲解 1.传单个参数  单个参数这种比较简单,例如 想像edit操作里面传递一个id值,如下写法__URL__/edit/id/1 http://localhost/index.php/user/edit/id/1 id和其值1要分别位于/后面 后台获取id通过    $id=$_GET['id']   即可获取其具体值. 2.传多个参数 传多个

Flume直接到SparkStreaming的两种方式

Flume直接到SparkStreaming的两种方式,一般是flume->kafka->SparkStreaming,如果非要从Flume直接将数据输送到SparkStreaming里面有两种方式,如下: 第一种:Push推送的方式 程序如下: package cn.lijie import org.apache.log4j.Level import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.str

Spark修炼之道(进阶篇)——Spark入门到精通:第八节 Spark SQL与DataFrame(一)

本节主要内宾 Spark SQL简介 DataFrame 1. Spark SQL简介 Spark SQL是Spark的五大核心模块之一,用于在Spark平台之上处理结构化数据,利用Spark SQL可以构建大数据平台上的数据仓库,它具有如下特点: (1)能够无缝地将SQL语句集成到Spark应用程序当中 (2)统一的数据访问方式 DataFrames and SQL provide a common way to access a variety of data sources, includ

Spark入门

http://spark.incubator.apache.org/ http://spark.incubator.apache.org/documentation.html http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html, 非常好的hand-on exercises 源码分析 http://jerryshao.me/archive.html http://www.cnblogs.com/jerr

Spark源码分析:多种部署方式之间的区别与联系(1)

<http://www.aliyun.com/zixun/aggregation/13383.html">Spark源码分析:多种部署方式之间的区别与联系(1)> <Spark源码分析:多种部署方式之间的区别与联系(2)> 从官方的文档我们可以知道,Spark的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多. 从代码中,我们可以得知其实Spark的部署

用Jsp来实现文件下载功能的几种方式

1.最直接最简单的,方式是把文件地址直接放到html页面的一个链接中.这样做的缺点是把文件在服务器上的路径暴露了,并且还无法对文件下载进行其它的控制(如权限).这个就不写示例了. 2.在服务器端把文件转换成输出流,写入到response,以response把文件带到浏览器,由浏览器来提示用户是否愿意保存文件到本地.(示例如下) <%response.setContentType(fileminitype);response.setHeader("Location",filenam

Mysql 查看端口号的几种方式

链接: http://blog.itpub.net/blog/post/id/1592460/ 标题: MySQL 查看端口的几种方式 作者:lōττéry版权所有[文章允许转载,但必须以链接方式注明源地址,否则追究法律责任.] 注释:    今天通过"Navicat for MySQL"工具链接生产环境数据库时,需要输入 mysql"端口"号,所以找到了几种 查看mysql端口的方法,特此整理下提供参考.   默认端口 3306:    OS层 ***** ps

FTP文件传输协议两种方式的工作原理

FTP是一种文件传输协议,它支持两种模式,一种方式叫做Standard (也就是 Active,主动方式),一种是 Passive (也就是PASV,被动方式). Standard模式 FTP的客户端发送 PORT 命令到FTP server.Passive模式FTP的客户端发送 PASV命令到 FTP Server. 下面介绍一个这两种方式的工作原理: Standard模式 FTP 客户端首先和FTP Server的TCP 21端口建立连接,通过这个通道发送命令,客户端需要接收数据的时候在这个