[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看编程指南了解更多的内容。

为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。

Spark Shell 交互

基本操作

Spark Shell提供给用户一个简单的学习API的方式 以及 快速分析数据的工具。在shell中,既可以使用scala(运行在java虚拟机,因此可以使用java库)也可以使用python。可以在spark的bin目录下启动spark shell:

./bin/spark-shell.sh

spark操作对象是一种分布式的数据集合,叫做Resilient Distributed Dataset(RDD)。RDD可以通过hdfs文件创建,也可以通过RDD转换得来。

下面就实际操作下,看看效果。我的本地有个文件——test.txt,内容为:

hello world
haha nihao

可以通过这个文件创建一个新的RDD

val textFile = sc.textFile("test.txt")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

在Spark中,基于RDD可以作两种操作——Actions算子操作以及Transformations转换操作。

我们可以使用一些算子操作体验下:

scala> textFile.count() //RDD有用的数量
res1: Long = 2

scala> textFile.first() //RDD第一行
res3: String = hello world

再执行一些转换操作,比如使用filter转换,返回一个新的RDD集合:

scala> val lines = textFile.filter(line=>line.contains("hello"))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23

scala> lines.count()
res4: Long = 1

scala> val lines = textFile.filter(line=>line.contains("haha"))
lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23

scala> lines.count()
res5: Long = 1

scala> lines.first()
res6: String = haha nihao

更多RDD操作

RDD算子和转换可以组成很多复杂的计算,比如我们想找出最多一行中单词最多的单词数量:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

这个操作会把一行通过split切分计数,转变为一个整型的值,然后创建成新的RDD。reduce操作用来寻找单词最多的那一行。

用户可以在任何时候调用方法和库,可以使用Math.max()函数:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一个很常见的数据操作就是map reduce,这个操作在hadoop中很常见。Spark可以轻松的实现Mapreduce任务:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28

这里使用了flatMap,map以及reduceByKey等转换操作来计算每个单词在文件中的数量。为了在shell中显示,可以使用collect()触发计算:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark也支持在分布式的环境下基于内存的缓存,这样当数据需要重复使用的时候就很有帮助。比如当需要查找一个很小的hot数据集,或者运行一个类似PageRank的算法。

举个简单的例子,对linesWithSpark RDD数据集进行缓存,然后再调用count()会触发算子操作进行真正的计算,之后再次调用count()就不会再重复的计算,直接使用上一次计算的结果的RDD了:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

看起来缓存一个100行左右的文件很愚蠢,但是如果再非常大的数据集下就非常有用了,尤其是在成百上千的节点中传输RDD计算的结果。你也可以通过bin/spark-shell向集群提交任务,可以参考编程指南

独立应用

要使用spark api写一个自己的应用也很简单,可以基于scala、java、python去写一些简单的应用。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意应用需要定义main()方法。这个程序仅仅是统计文件中包含字符ab的分别都有多少行。你可以设置YOUR_SPARK_HOME替换自己的文件目录。不像之前在shell中的例子那样,我们需要自己初始化sparkContext。

通过SparkConf构造方法创建SparkContext。

应用依赖于spark api,因此需要在程序中配置sbt的配置文件——simple.sbt,它声明了spark的依赖关系。

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"

为了让sbt正确的工作,还需要创建SimpleApp.scala以及simple.sbt。然后就可以执行打包命令,通过spark-submit运行了:

# Your directory layout should look like this 你的工程目录应该向下面这样
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application 运行sbt命令进行打包
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application 通过spark-submit提交任务jar包
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

其他地址

通过上面的例子,就可以运行起来自己的Spark应用了。

那么可以参考下面的链接获得更多的内容:

  • 为了更深入的学习,可以阅读Spark编程指南
  • 如果想要运行Spark集群,可以参考部署指南
  • 最后,Spark在examples目录中内置了多种语言版本的例子,如scala,java,python,r等等。你可以通过下面的命令运行:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

本文转自博客园xingoo的博客,原文链接:[大数据之Spark]——快速入门,如需转载请自行联系原博主。

时间: 2024-10-24 01:07:15

[大数据之Spark]——快速入门的相关文章

[大数据之Spark]——Actions算子操作入门实例

Actions reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 这个方法会传入两个参数,计算这两个参数返回一个

[大数据之Spark]——Transformations转换入门经典实例

Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用:另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系. 本篇就着重描述下Spark提供的Transformations方法. 依赖关系 宽依赖和窄依赖 窄依赖(narrow dependencies) 窄依赖是指父RDD仅仅被一个子RDD所使用,子RDD的每个分区依赖于常数个父分区(O(1),与数据规模无关). 输入输出一对一的算子,且结果RDD的分区结构不变.主要是ma

【阿里云产品公测】大数据下精确快速搜索OpenSearch

相信做过一两个项目的人都会遇到上级要求做一个类似百度或者谷歌的站内搜索功能.传统的sql查询只能使用like 或者FIND_IN_SET来实现.后者性能稍微好点但是必须要逗号分隔才可以实现匹配.甚至多条件的话还可能用到OR这是极影响系统性能的.        最近公司项目需要.主要是系统查询缓慢.并且查询精度不敢恭维.一开始想到的是Lucene 毕竟是一个开放源代码的全文检索引擎工具包 并且官方还在持续更新中.当时闲暇时间大概搞了将近一个星期的时间.索引的增删查改以及中文分词IKAnalyzer

【Spark Summit East 2017】2017年大数据与Spark的发展趋势

本讲义出自Matei Zaharia在Spark Summit East 2017上的演讲,主要介绍了2016年以及2017年大数据与Spark技术的未来的汇合的发展趋势以及Databricks对于使Spark与像深度学习库这样的原生代码能够更好地进行交互所做的工作.

浪潮势不可挡:大数据有望迎来快速发展期

日前,<促进大数据发展行动纲要>正式发布,大数据有望迎来快速发展期.对此,信而富首席风险官吕宇良认为,这对信而富以及包括互联网金融在内的众多行业都绝对称得上是一个重大利好.纲要不仅高屋建瓴,明确了国家在大数据发展方面的总体目标.指导思想.总体战略,而且就如何推进数据资源共享开放.统筹规划大数据基础设施建设.公共服务大数据工程.各行业大数据工程.大数据关键技术及支撑研发.安全保障等方方面面都进行了非常完善和细致的规划. 众所周知,互联网金融的一个重要目标是为广大民众,尤其是目前还未被央行征信体系

政策频出 大数据产业步入快速发展期

文章讲的是政策频出 大数据产业步入快速发展期,随着互联网基础设施的完善和相关分析技术的成熟,大数据产业正步入快速发展期,并持续获得政策力挺. 22日,国务院新闻办公室就2015年上半年工业通信业发展情况举行发布会,工业和信息化部新闻发言人.总工程师张峰在新闻发布会上表示,工信部在下一步贯彻"互联网+"指导意见中,将重点推进组织实施智能制造重大工程,推进重要工业云.工业领域大数据中心建设.值得注意的是,工信部将出台互联网与工业融合.服务型制造.工业云和大数据等指导意见,加快法律法规制定修

《Spark 官方文档》Spark快速入门

快速入门 本教程是对Spark的一个快速简介.首先,我们通过Spark的交互式shell介绍一下API(主要是Python或Scala),然后展示一下如何用Java.Scala.Python写一个Spark应用.更完整参考看这里:programming guide 首先,请到Spark website下载一个Spark发布版本,以便后续方便学习.我们暂时还不会用到HDFS,所以你可以使用任何版本的Hadoop.   使用Spark shell交互式分析 基础 利用Spark shell 很容易学

我国云计算和大数据均进入快速发展阶段

在本届通信展期间举办的"2017中国云计算与大数据产业发展大会"上,工业和信息化部信息通信发展司处长黄业晶表示,云计算.大数据作为近年来信息通信领域发展最迅速的产业之一,对国民经济和社会发展的支撑推动作用正在日益显现.在政策和市场的双重驱动下,我国的云计算和大数据产业双双进入快速发展的阶段. 他表示,2015年国务院出台了<关于促进云计算创新发展 培育信息产业新业态的意见>和<促进大数据发展行动纲要>等重要的文件,对于我国云计算和大数据产业发展提出了相关的指导原

大数据竞赛平台——Kaggle入门篇

这篇文章适合那些刚接触Kaggle.想尽快熟悉Kaggle并且独立完成一个竞赛项目的网友,对于已经在Kaggle上参赛过的网友来说,大可不必耗费时间阅读本文.本文分为两部分介绍Kaggle,第一部分简单介绍Kaggle,第二部分将展示解决一个竞赛项目的全过程.如有错误,请指正! 1.Kaggle简介 Kaggle是一个数据分析的竞赛平台,网址:https://www.kaggle.com/ 企业或者研究者可以将数据.问题描述.期望的指标发布到Kaggle上,以竞赛的形式向广大的数据科学家征集解决