Spark计算过程分析

基本概念



Spark是一个分布式的内存计算框架,其特点是能处理大规模数据,计算速度快。Spark延续了Hadoop的MapReduce计算模型,相比之下Spark的计算过程保持在内存中,减少了硬盘读写,能够将多个操作进行合并后计算,因此提升了计算速度。同时Spark也提供了更丰富的计算API。

MapReduce是Hadoop和Spark的计算模型,其特点是Map和Reduce过程高度可并行化;过程间耦合度低,单个过程的失败后可以重新计算,而不会导致整体失败;最重要的是数据处理中的计算逻辑可以很好的转换为Map和Reduce操作。对于一个数据集来说,Map对每条数据做相同的转换操作,Reduce可以按条件对数据分组,然后在分组上做操作。除了Map和Reduce操作之外,Spark还延伸出了如filter,flatMap,count,distinct等更丰富的操作。

RDD的是Spark中最主要的数据结构,可以直观的认为RDD就是要处理的数据集。RDD是分布式的数据集,每个RDD都支持MapReduce类操作,经过MapReduce操作后会产生新的RDD,而不会修改原有RDD。RDD的数据集是分区的,因此可以把每个数据分区放到不同的分区上进行计算,而实际上大多数MapReduce操作都是在分区上进行计算的。Spark不会把每一个MapReduce操作都发起运算,而是尽量的把操作累计起来一起计算。Spark把操作划分为转换(transformation)和动作(action),对RDD进行的转换操作会叠加起来,直到对RDD进行动作操作时才会发起计算。这种特性也使Spark可以减少中间结果的吞吐,可以快速的进行多次迭代计算。


系统结构



Spark自身只对计算负责,其计算资源的管理和调度由第三方框架来实现。常用的框架有YARN和Mesos。本文以YARN为例进行介绍。先看一下Spark on YARN的系统结构图:

Spark on YARN系统结构图

图中共分为三大部分:Spark Driver, Worker, Cluster manager。其中Driver program负责将RDD转换为任务,并进行任务调度。Worker负责任务的执行。YARN负责计算资源的维护和分配。

Driver可以运行在用户程序中,或者运行在其中一个Worker上。Spark中的每一个应用(Application)对应着一个Driver。这个Driver可以接收RDD上的计算请求,每个动作(Action)类型的操作将被作为一个Job进行计算。Spark会根据RDD的依赖关系构建计算阶段(Stage)的有向无环图,每个阶段有与分区数相同的任务(Task)。这些任务将在每个分区(Partition)上进行计算,任务划分完成后Driver将任务提交到运行于Worker上的Executor中进行计算,并对任务的成功、失败进行记录和重启等处理。

Worker一般对应一台物理机,每个Worker上可以运行多个Executor,每个Executor都是独立的JVM进程,Driver提交的任务就是以线程的形式运行在Executor中的。如果使用YARN作为资源调度框架的话,其中一个Worker上还会有Executor launcher作为YARN的ApplicationMaster,用于向YARN申请计算资源,并启动、监测、重启Executor。


计算过程



这里我们从RDD到输出结果的整个计算过程为主线,探究Spark的计算过程。这个计算过程可以分为:

  1. RDD构建:构建RDD之间的依赖关系,将RDD转换为阶段的有向无环图。
  2. 任务调度:根据空闲计算资源情况进行任务提交,并对任务的运行状态进行监测和处理。
  3. 任务计算:搭建任务运行环境,执行任务并返回任务结果。
  4. Shuffle过程:两个阶段之间有宽依赖时,需要进行Shuffle操作。
  5. 计算结果收集:从每个任务收集并汇总结果。

在这里我们用一个简洁的CharCount程序为例,这个程序把含有a-z字符的列表转化为RDD,对此RDD进行了Map和Reduce操作计算每个字母的频数,最后将结果收集。其代码如下:

CharCount例子程序


RDD构建和转换



RDD按照其作用可以分为两种类型,一种是对数据源的封装,可以把数据源转换为RDD,这种类型的RDD包括NewHadoopRDD,ParallelCollectionRDD,JdbcRDD等。另一种是对RDD的转换,从而实现一种计算方法,这种类型的RDD包括MappedRDD,ShuffledRDD,FilteredRDD等。数据源类型的RDD不依赖于其他RDD,计算类的RDD拥有自己的RDD依赖。

RDD有三个要素:分区,依赖关系,计算逻辑。分区是保证RDD分布式的特性,分区可以对RDD的数据进行划分,划分后的分区可以分布到不同的Executor中,大部分对RDD的计算都是在分区上进行的。依赖关系维护着RDD的计算过程,每个计算类型的RDD在计算时,会将所依赖的RDD作为数据源进行计算。根据一个分区的输出是否被多分区使用,Spark还将依赖分为窄依赖和宽依赖。RDD的计算逻辑是其功能的体现,其计算过程是以所依赖的RDD为数据源进行的。

例子中共产生了三个RDD,除了第一个RDD之外,每个RDD与上级RDD有依赖关系。

  1. spark.parallelize(data, partitionSize)方法将产生一个数据源型的ParallelCollectionRDD,这个RDD的分区是对列表数据的切分,没有上级依赖,计算逻辑是直接返回分区数据。
  2. map函数将会创建一个MappedRDD,其分区与上级依赖相同,会有一个依赖于ParallelCollectionRDD的窄依赖,计算逻辑是对ParallelCollectionRDD的数据做map操作。
  3. reduceByKey函数将会产生一个ShuffledRDD,分区数量与上面的MappedRDD相同,会有一个依赖于MappedRDD的宽依赖,计算逻辑是Shuffle后在分区上的聚合操作。

RDD的依赖关系

Spark在遇到动作类操作时,就会发起计算Job,把RDD转换为任务,并发送任务到Executor上执行。从RDD到任务的转换过程是在DAGScheduler中进行的。其总体思路是根据RDD的依赖关系,把窄依赖合并到一个阶段中,遇到宽依赖则划分出新的阶段,最终形成一个阶段的有向无环图,并根据图的依赖关系先后提交阶段。每个阶段按照分区数量划分为多个任务,最终任务被序列化并提交到Executor上执行。

RDD到Task的构建过程

当RDD的动作类操作被调用时,RDD将调用SparkContext开始提交Job,SparkContext将调用DAGScheduler把RDD转化为阶段的有向无环图,然后首先将有向无环图中没有未完成的依赖的阶段进行提交。在阶段被提交时,每个阶段将产生与分区数量相同的任务,这些任务称之为一个TaskSet。任务的类型分为 ShuffleMapTask和ResultTask,如果阶段的输出将用于下个阶段的输入,也就是需要进行Shuffle操作,则任务类型为ShuffleMapTask。如果阶段的输入即为Job结果,则任务类型为ResultTask。任务创建完成后会交给TaskSchedulerImpl进行TaskSet级别的调度执行。


任务调度



在任务调度的分工上,DAGScheduler负责总体的任务调度,SchedulerBackend负责与Executors通信,维护计算资源信息,并负责将任务序列化并提交到Executor。TaskSetManager负责对一个阶段的任务进行管理,其中会根据任务的数据本地性选择优先提交的任务。TaskSchedulerImpl负责对TaskSet进行调度,通过调度策略确定TaskSet优先级。同时是一个中介者,其将DAGScheduler,SchedulerBackend和TaskSetManager联结起来,对Executor和Task的相关事件进行转发。

在任务提交流程上,DAGScheduler提交TaskSet到TaskSchedulerImpl,使TaskSet在此注册。TaskSchedulerImpl通知SchedulerBackend有新的任务进入,SchedulerBackend调用makeOffers根据注册到自己的Executors信息,确定是否有计算资源执行任务,如有资源则通知TaskSchedulerImpl去分配这些资源。 TaskSchedulerImpl根据TaskSet调度策略优先分配TaskSet接收此资源。TaskSetManager再根据任务的数据本地性,确定提交哪些任务。最终任务的闭包被SchedulerBackend序列化,并传输给Executor进行执行。

Spark的任务调度

根据以上过程,Spark中的任务调度实际上分了三个层次。第一层次是基于阶段的有向无环图进行Stage的调度,第二层次是根据调度策略(FIFO,FAIR)进行TaskSet调度,第三层次是根据数据本地性(Process,Node,Rack)在TaskSet内进行调度。


任务计算



任务的计算过程是在Executor上完成的,Executor监听来自SchedulerBackend的指令,接收到任务时会启动TaskRunner线程进行任务执行。在TaskRunner中首先将任务和相关信息反序列化,然后根据相关信息获取任务所依赖的Jar包和所需文件,完成准备工作后执行任务的run方法,实际上就是执行ShuffleMapTask或ResultTask的run方法。任务执行完毕后将结果发送给Driver进行处理。

在Task.run方法中可以看到ShuffleMapTask和ResultTask有着不同的计算逻辑。ShuffleMapTask是将所依赖RDD的输出写入到ShuffleWriter中,为后面的Shuffle过程做准备。ResultTask是在所依赖RDD上应用一个函数,并返回函数的计算结果。在这两个Task中只能看到数据的输出方式,而看不到应有的计算逻辑。实际上计算过程是包含在RDD中的,调用RDD. Iterator方法获取RDD的数据将触发这个RDD的计算动作(RDD. Iterator),由于此RDD的计算过程中也会使用所依赖RDD的数据。从而RDD的计算过程将递归向上直到一个数据源类型的RDD,再递归向下计算每个RDD的值。需要注意的是,以上的计算过程都是在分区上进行的,而不是整个数据集,计算完成得到的是此分区上的结果,而不是最终结果。

从RDD的计算过程可以看出,RDD的计算过程是包含在RDD的依赖关系中的,只要RDD之间是连续窄依赖,那么多个计算过程就可以在同一个Task中进行计算,中间结果可以立即被下个操作使用,而无需在进程间、节点间、磁盘上进行交换。

RDD计算过程


Shuffle过程



Shuffle是一个对数据进行分组聚合的操作过程,原数据将按照规则进行分组,然后使用一个聚合函数应用于分组上,从而产生新数据。Shuffle操作的目的是把同组数据分配到相同分区上,从而能够在分区上进行聚合计算。为了提高Shuffle性能,还可以先在原分区对数据进行聚合(mapSideCombine),然后再分配部分聚合的数据到新分区,第三步在新分区上再次进行聚合。

在划分阶段时,只有遇到宽依赖才会产生新阶段,才需要Shuffle操作。宽依赖与窄依赖取决于原分区被新分区的使用关系,只要一个原分区会被多个新分区使用,则为宽依赖,需要Shuffle。否则为窄依赖,不需要Shuffle。

以上也就是说只有阶段与阶段之间需要Shuffle,最后一个阶段会输出结果,因此不需要Shuffle。例子中的程序会产生两个阶段,第一个我们简称Map阶段,第二个我们简称Reduce阶段。Shuffle是通过Map阶段的ShuffleMapTask与Reduce阶段的ShuffledRDD配合完成的。其中ShuffleMapTask会把任务的计算结果写入ShuffleWriter,ShuffledRDD从ShuffleReader中读取数据,Shuffle过程会在写入和读取过程中完成。以HashShuffle为例,HashShuffleWriter在写入数据时,会决定是否在原分区做聚合,然后根据数据的Hash值写入相应新分区。HashShuffleReader再根据分区号取出相应数据,然后对数据进行聚合。

Spark的Shuffle过程


计算结果收集



ResultTask任务计算完成后可以得到每个分区的计算结果,此时需要在Driver上对结果进行汇总从而得到最终结果。

RDD在执行collect,count等动作时,会给出两个函数,一个函数在分区上执行,一个函数在分区结果集上执行。例如collect动作在分区上(Executor中)执行将Iterator转换为Array的函数,并将此函数结果返回到Driver。Driver 从多个分区上得到Array类型的分区结果集,然后在结果集上(Driver中)执行合并Array的操作,从而得到最终结果。


总结



Spark对于RDD的设计是其精髓所在。用RDD操作数据的感觉就一个字:爽!。想到RDD背后是几吨重的大数据集,而我们随手调用下map(), reduce()就可以把它转换来转换去,一种半两拨千斤的感觉就会油然而生。我想是以下特性给我们带来了这些:

  1. RDD把不同来源,不同类型的数据进行了统一,使我们面对RDD的时候就会产生一种信心,就会认为这是某种类型的RDD,从而可以进行RDD的所有操作。
  2. 对RDD的操作可以叠加到一起计算,我们不必担心中间结果吞吐对性能的影响。
  3. RDD提供了更丰富的数据集操作函数,这些函数大都是在MapReduce基础上扩充的,使用起来很方便。
  4. RDD为提供了一个简洁的编程界面,背后复杂的分布式计算过程对开发者是透明的。从而能够让我们把关注点更多的放在业务上。
时间: 2024-11-03 20:56:04

Spark计算过程分析的相关文章

spark计算hdfs上的文件时报错

问题描述 spark计算hdfs上的文件时报错 scala> val rdd = sc.textFile("hdfs://...") scala> rdd.count java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.

spark计算mongodb数据,不知是环境的问题还是代码的问题,还没入门大神们帮帮忙啊

问题描述 spark计算mongodb数据,不知是环境的问题还是代码的问题,还没入门大神们帮帮忙啊 spark计算mongodb中的数据,总是计算不出结果,这些错误信息也找不到是为什么, 有一两次能计算出结果 .第一次接触这个东西 大神们帮帮忙啊 主要代码如下: SparkConf sparkConf = new SparkConf(); sparkConf.setMaster(SPARK_PATH); sparkConf.setAppName("Logs_Collect"); Str

《Spark大数据处理:技术、应用与性能优化》——第3章 Spark计算模型3.1 Spark程序模型

第3章 Spark计算模型 创新都是站在巨人的肩膀上产生的,在大数据领域也不例外.微软的Dryad使用DAG执行模式.子任务自由组合的范型.该范型虽稍显复杂,但较为灵活.Pig也针对大关系表的处理提出了很多有创意的处理方式,如flatten.cogroup.经典虽难以突破,但作为后继者的Spark借鉴经典范式并进行创新.经过实践检验,Spark的编程范型在处理大数据时显得简单有效.的数据处理与传输模式也大获全胜.Spark站在巨人的肩膀上,依靠Scala强有力的函数式编程.Actor通信模式.闭

Apache Spark机器学习.1.2 在机器学习中应用Spark计算

1.2 在机器学习中应用Spark计算 基于RDD和内存处理的创新功能,Apache Spark真正使得分布式计算对于数据科学家和机器学习专业人员来说简便易用.Apache Spark团队表示:Apache Spark基于Mesos 集群管理器运行,使其可以与Hadoop以及其他应用共享资源.因此,Apache Spark可以从任何Hadoop输入源(如HDFS)中读取数据.   Apache Spark计算模型非常适合机器学习中的分布式计算.特别是在快速交互式机器学习.并行计算和大型复杂模型情

Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算.我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示: // Let the user specify short names for shuffle managers      val shortShuffleMgrNames = Map(        "sort&

spark计算密集型

问题描述 求各位大神帮忙,现在有个程序,大概思想是:我现在有一串数,每个数都有各自影响生成一串新数,然后对这些新数分别计算出一串数据,最后对第二次生成的所有数排序,根据这些排序把第三次生成的数写到文件中,我现在spark流程就是上面说的,请问有什么好的想法可以大大提升速度吗?示例:比如,一开始只有一个数:1(可以有多个数),然后这个数生成一串新数:5,2,6,对5,2,6分别进行计算,比如2,生成数组1.3,4.5:5生成数组5.5,3.7:6生成数组2.3,6.7:然后对5,2,6进行排序,把

spark 计算输出的 rowmatrix 矩阵的存储不正常,求大牛指导

问题描述 在集群上计算会遇到一个问题,进行矩阵的奇异值分解时"M.computeSVD(5000,true,1.0E-9d)"时,A=U*s*V分解的s向量存储,V矩阵存储正常,但是分解的左奇异矩阵U默认为rowmatrix矩阵,这个矩阵存储的(存储代码:U.rows.saveAsTextFile("hdfs://s2:9000/outsvd/big_UUT1"))时候,发现存储结果为为好几部分eg:part-00000.part-00001.part-00002.

大数据计算新贵Spark在腾讯雅虎优酷成功应用解析

Spark作为Apache顶级的开源项目,项目主页见http://spark.apache.org.在迭代计算,交互式查询计算以及批量流计算方面都有相关的子项目,如Shark.Spark Streaming.MLbase.GraphX.SparkR等.从13年起Spark开始举行了自已的Spark Summit会议,会议网址见http://spark-summit.org.Amplab实验室单独成立了独立公司Databricks来支持Spark的研发. 为了满足挖掘分析与交互式实时查询的计算需求

Apache Spark源码走读(六)Task运行期之函数调用关系分析 &存储子系统分析

<一>Task运行期之函数调用关系分析 概要 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回. 准备 spark已经安装完毕 spark运行在local mode或local-cluster mode local-cluster mode local-cluster模式也称为伪分布式,可以使用如下指令运行 MASTER=local[1,2,1024] bin/spark-shell