《Spark官方文档》Spark操作指南

原文链接   译者:小村长

Spark–Quick Start

本项目是 Apache Spark官方文档的中文翻译版,致力于打造一个全新的大数据处理平台来满足大数据处理和分析的各个使用场景,本次翻译主要针对对Spark感兴趣和致力于从事大数据方法开发的人员提供有价值的中文资料,希望能够对大家的工作和学习有所帮助。

Spark最近几年在国内外都比较火,在淘宝、百度、腾讯、高伟达等一些公司有比较成熟的应用,做大数据方面的开发人员或多或少都与其有接触。Spark的中文资料相对前几年相对较多,但是我认为官方文档才是最好最完美的学习资料,今天让小村长为你揭开Spark的神秘面纱,一同走进Spark的精神世界。

本向导提供了Spark的简单介绍和快速入门,我第一次通过Spark shell来介绍Python 和 Scala API的使用,然后向你们展示怎么通过Java、Python和Scala开发一个Spark应用程序,你可以通过查看开发向导了解更多开发细节。

为了更快地学习这篇使用指南,首先你需要下载一个Spark安装包,由于我们并没有使用HDFS,所以你可以下载任何兼容Hadoop版本的安装包。

Spark Shell的使用

基础部分

Spark’s shell 提供了一种学习API的简单方法, 也是一个强大的数据分析工具.它可以通过Scala(它是一种运行在Java虚拟机上并且能够使用Java库的编程语言 )或者Python来操作. 在Spark 目录下运行以下脚本:
./bin/spark-shell (Scala)
./bin/pyspark (Python)
Spark主要的抽象是一个分布式数据集(RDD). RDDs 能够通过Hadoop的InputFormats (例如HDFS文件)或者通过其他的RDD转换生成.现在让我们用Spark源码目录中README 文件的文本内容生成一个新的RDD:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs 有很多actions(我翻译成操作因子), 通过转换操作能生成一个新的RDD,让我们来进行一些actions操作:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

现在让我们使用transformation(翻译成转换因子).我们将使用 filter 操作来返回一个新的RDD数据集.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我们能够通过链式编程把transformations 操作和 actions操作连接一起:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

RDD更多操作

RDD的 actions 和transformations 能够使用更多复杂的操作. 我们能够获取更多操作通过下面链接:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一个map函数统计一行单词个数,创建一个先的RDD. reduce函数是寻找单词最多的某一行所含单词个数 .map和reduce函数里面的参数称为函数自变量(闭包),它能够使用Scala或者Java的任何特性. 例如,我们很轻松的在任何地方调用和定义函数 . 我们可以使用 Math.max()函数是代码看起来更简洁更加容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一个普通的数据处理流程是MapReduce,这也是Hadoop中很普通的数据处理方式. Spark 继承MapReduce数据处理流程并是它更加容易:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

现在,我们通过 flatMap map函数来进行合并操作 并且通过reduceByKey 转换函数来统计每个单词的个数并生成一个RDD键值对. 统计单词的数量可以通过 collect 函数来实行:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark也支持把一个数据集放到一个集群的缓存里面.当数据多次访问它很有用,例如当你查询一个常用的数据集或者运行一个 PageRank算法的时候. 举一个简单的例子, 让我们把 linesWithSpark 数据集缓存起来:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

它可能是不明智的用Spark浏览和缓存一个100行的数据. 这个部分通常运用在大的数据集上, 当集群有十个甚至成千上百个节点的时候. 你也能够通过 bin/spark-shell来连接集群, 更多的描述请看 programming guide.

独立应用开发

假如我希望开发一个独立的应用通过Spark API. 我们可以通过 Scala (with sbt), Java (with Maven), and Python来调用开发Spark API.

现在我们创建一个简单的Spark应用,实际上, 它的名字是 SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意一个应用应该定义在一个 main()函数里面或者通过继承scala.Appscala.App的子类可能不能正确的工作.

这个程序仅仅是统计README文件中包含‘a’和‘b’的数量.注意你需要替换你的本地的YOUR_SPARK_HOME 环境变量. 不同于前面的Spark shell例子, 它需要初始化SparkContext, 我们需要初始化 SparkContext作为程序运行的一部分.

我们需要创建一个 SparkConf 对象,它包含了我们应用所包含的信息.

我们的应用依赖Spark API, 所以我们也包含sbt 配置文件, simple.sbt, 它包含了Spark的一些依赖. 这个文件增加一个仓库,而这些仓库是Spark运行不可缺少的一部分:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"

为了使 sbt能够正确的工作, 我们需要布局SimpleApp.scalasimple.sbt按计划执行这种类型结构.一旦配置正确, 我们能够创建一个JAR包包含这个应用的代码, 然后使用 spark-submit 来运行我们的程序.

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

下一章

配置运行你的第一个Spark程序!

  • 想深入的了解SparkAPI, 可以打开 Spark programming guide, 或者看“Programming Guides” 菜单了解其他组件.
  • 如果想在集群上面运行引用, 可以点击 deployment overview了解.
  • 最后, Spark包含几个简单的实例 (Scala, Java, Python, R). 你可以按照下面的实例运行他们:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
时间: 2024-08-04 07:42:22

《Spark官方文档》Spark操作指南的相关文章

《Spark 官方文档》

Spark是一个高效的分布式计算系统,本文是Spark官方文档的翻译. 编程指南: 快速入门 编程指南 在Spark里构建模块 Spark Streaming编程 Spark SQL, DataFrames 以及 Datasets 编程指南 机器学习库MLlib GraphX: Spark's new API for graph processing API文档: Spark Scala API (Scaladoc) Spark Java API (Javadoc) Spark Python A

《Spark官方文档》Spark Streaming编程指南(一)

Spark Streaming编程指南 概览   Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性.Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Twitter.ZeroMQ.Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map.reduce.join和window等.最后,Spark Streaming支持将处理完的数据推送到文

《Spark 官方文档》Spark编程指南

Spark编程指南 概述 总体上来说,每个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各种并行操作. Spark最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD),RDD是一个可分区的元素集合,其包含的元素可以分布在集群各个节点上,并且可以执行一些分布式并行操作.RDD通常是通过,HDFS(或者其他Hadoop支持的文件系统)上的文件,或者驱动器中的Scala集合对象,来创建或转换

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins

《Spark 官方文档》在Mesos上运行Spark

在Mesos上运行Spark Spark可以在由Apache Mesos 管理的硬件集群中运行. 在Mesos集群中使用Spark的主要优势有: 可以在Spark以及其他框架(frameworks)之间动态划分资源. 可以同时部署多个Spark实例,且各个实例间的资源分配可以调整. 工作原理 在独立部署的Spark集群中,下图里的Cluster Manager代表Spark master.然而,在Mesos集群中,Mesos master将取代Spark master在下图中的地位. 如果一个S

《Spark 官方文档》在YARN上运行Spark

在YARN上运行Spark 对 YARN (Hadoop NextGen) 的支持是从Spark-0.6.0开始的,后续的版本也一直持续在改进. 在YARN上启动 首先确保 HADOOP_CONF_DIR 或者 YARN_CONF_DIR 变量指向一个包含Hadoop集群客户端配置文件的目录.这些配置用于读写HDFS和连接YARN资源管理器(ResourceManager).这些配置应该发布到YARN集群上所有的节点,这样所有的YARN容器才能使用同样的配置.如果这些配置引用了Java系统属性或

《Spark 官方文档》Spark安全性

Spark安全性 Spark目前已经支持以共享秘钥的方式进行身份认证.开启身份认证配置参数为 spark.authenticate .这个配置参数决定了Spark通讯协议是否使用共享秘钥做身份验证.验证过程就是一个基本的握手过程,确保通讯双方都有相同的秘钥并且可以互相通信.如果共享秘钥不同,双方是不允许通信的.共享秘钥可用以下方式创建: 对于以YARN 方式部署的Spark,将 spark.authenticate 设为true可以自动生成并分发共享秘钥.每个Spark应用会使用唯一的共享秘钥.

《Spark官方文档》集群模式概览

Spark 1.6.0  译者:dlbrant 集群模式概览 本文简要描述了Spark在集群中各个组件如何运行.想了解如何在集群中启动Spark应用,请参考application submission guide . 组件 Spark应用在集群上运行时,包括了多个独立的进程,这些进程之间通过你的主程序(也叫作驱动器,即:driver)中的SparkContext对象来进行协调. 特别要指出的是,SparkContext能与多种集群管理器通信(包括:Spark独立部署时自带的集群管理器,Mesos

《Spark 官方文档》Spark配置(一)

Spark配置 Spark有以下三种方式修改配置: Spark properties (Spark属性)可以控制绝大多数应用程序参数,而且既可以通过 SparkConf 对象来设置,也可以通过Java系统属性来设置. Environment variables (环境变量)可以指定一些各个机器相关的设置,如IP地址,其设置方法是写在每台机器上的conf/spark-env.sh中. Logging (日志)可以通过log4j.properties配置日志. Spark属性 Spark属性可以控制