Spark-Caching /Checkpointing

功能:

cacheing和checkpointing这2种操作是都是用来防止rdd(弹性分布式数据集)每次被引用时被重复计算带来的时间和空间上不必要的损失。

区别:

Caching
cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应用)可以运行的更快。有多种级别的持久化策略让开发者选择,使开发者能够对空间和计算成本进行权衡,同时能指定out of memory时对rdd的操作(缓存在内存或者磁盘,并且可以指定在内存不够的情况下按照FIFO的策略选取一部分block交换到磁盘来产生空余空间)。因此Spark不但可以对rdd重复计算还能在节点发生故障时重新计算丢失的分区。最后,被缓存的rdd存在于一个running的应用的生命周期内,如果这个应用终止了,那么缓存的rdd也会同时被删除。
Checkpointing
checkpointing把rdd存储到一个可靠的存储系统(例如HDFS,S3)。checkpoint一个rdd有点类似于Hadoop中把中间计算结果存储到磁盘,损失部分执行性能来获得更好的从运行过程中出现failures时recover的能力。因为rdd是checkpoint在外部的存储系统(磁盘,HDFS,S3等),所以checkpoint过的rdd能够被其他的应用重用。

caching和checkpointing的联系

由rdd的计算路径来了解caching和checkpointing的相互作用。 Spark engine的核心是DAGScheduler。它把一个spark job分解成由若干个stages组成的DAG。每一个shuffle或者result stage再分解成一个个在RDD的分区中独立运行的task。一个RDD的iterator方法是一个task访问基础数据分区的入口:

/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        getOrCompute(split, context)
    } else {
        computeOrReadCheckpoint(split, context)
    }
}

如果设置了存储级别,表明rdd可能被缓存,它首先尝试调用getOrCompute方法从block manager中得到分区。

/**
* Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
*/
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        val blockId = RDDBlockId(id, partition.index)
        var readCachedBlock = true
        // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
        readCachedBlock = false
        computeOrReadCheckpoint(partition, context)
    }) match {
        case Left(blockResult) =>
        if (readCachedBlock) {
            val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
                existingMetrics.incBytesReadInternal(blockResult.bytes)
            new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                override def next(): T = {
                            existingMetrics.incRecordsReadInternal(1)
                            delegate.next()
                }
            }
        } else {
            new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
        case Right(iter) =>
            new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
        }
}
如果block manager中没有这个rdd的分区,那么它就去computeOrReadCheckpoint:
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
    if (isCheckpointedAndMaterialized) {
        firstParent[T].iterator(split, context)
    } else {
        compute(split, context)
    }
}

=computeOrReadCheckpoint这个方法会从checkpoint中寻找对应的数据,如果rdd没有被checkpoint,那么就从当前计算的分区开始计算。
cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但是checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。也就是说需要 checkpoint 的 RDD 会被计算两次。
因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(),这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。其实 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以做到 rdd 第一次被计算得到时就存储到磁盘上,但这个 persist 和 checkpoint 有很多不同。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。
一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。
而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

总结

使用checkpoint*会消耗更多的时间在rdd的读写*上(因为要使用外部存储系统HDFS,S3,或者磁盘),但是Spark worker的一些failures不一定导致重新计算。
另一方面,caching的rdd 不会永久占用存储空间,但是重新计算在Spark worker出现一些failures的时候是必要的。
综上,这2个都是取决于开发者自己的角度结合业务场景来使用,一般情况下,综合计算任务的性能来进行2者的选择(大部分情况用cache就够了,如果感觉 job 可能会出错可以手动去 checkpoint 一些 critical 的 RDD)。

时间: 2024-08-19 23:49:21

Spark-Caching /Checkpointing的相关文章

Spark SQL性能优化

性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import org.a

Spark RDDs(弹性分布式数据集):为内存中的集群计算设计的容错抽象

本文是阅读<Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing>过程中,抽了局部一些关注点翻译出来的文章,没有将全文都翻译.希望这些碎片化甚至不通顺的记录,可以帮助读者取代阅读原论文. 论文地址http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 第一节 主要介绍了现有的集群计算框架存在的问题,

Spark Streaming原理简析

执行流程 数据的接收 StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如 val lines = ssc.socketTextStream("localhost", 9999) 这样从socket接收文本数据.这个步骤返回的是一个ReceiverInputDStream的实现,内含Receive

Apache Spark源码走读(一)Spark论文阅读笔记&amp;Job提交与运行

<一>Spark论文阅读笔记 楔子 源码阅读是一件非常容易的事,也是一件非常难的事.容易的是代码就在那里,一打开就可以看到.难的是要通过代码明白作者当初为什么要这样设计,设计之初要解决的主要问题是什么. 在对Spark的源码进行具体的走读之前,如果想要快速对Spark的有一个整体性的认识,阅读Matei Zaharia做的Spark论文是一个非常不错的选择. 在阅读该论文的基础之上,再结合Spark作者在2012 Developer Meetup上做的演讲Introduction to Spa

《Spark官方文档》Spark Streaming编程指南(一)

Spark Streaming编程指南 概览   Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性.Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Twitter.ZeroMQ.Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map.reduce.join和window等.最后,Spark Streaming支持将处理完的数据推送到文

《循序渐进学Spark 》Spark 编程模型

本节书摘来自华章出版社<循序渐进学Spark >一书中的第1章,第3节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. Spark机制原理 本书前面几章分别介绍了Spark的生态系统.Spark运行模式及Spark的核心概念RDD和基本算子操作等重要基础知识.本章重点讲解Spark的主要机制原理,因为这是Spark程序得以高效执行的核心.本章先从Application.job.stage和task等层次阐述Spark的调度逻辑,并且介绍FIFO.FA

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins

Spark Streaming Programming Guide

参考,http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html  Overview SparkStreaming支持多种流输入,like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets,并且可以在上面进行transform操作,最终数据存入HDFS,数据库或dashboard 另外可以把Spark's in-built machine

Spark Streaming 数据清理机制

前言 为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常. 这个内容我记得自己刚接触Spark Streaming的时候,老板也问过我,运行期间会保留多少个RDD? 当时没回答出来.后面在群里也有人问到了,所以就整理了下.文中如有谬误之处,还望指出. DStream 和 RDD 我们知道Spark Streaming 计算还是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Str