Spark - A Fault-Tolerant Abstraction for In-Memory Cluster Computing

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

 

Why do we need Spark?

There are already quite a few compute frameworks.
For example, Hadoop is used for batch analysis over full datasets,
and Storm is used for streaming analysis.
In those scenarios, however, the data only needs to be used once and is never reused. For workloads where the same data is used repeatedly, existing frameworks do not cope well: Hadoop incurs heavy disk I/O, and Storm incurs heavy network I/O.

The core problem Spark solves: how to reuse data efficiently.
It mainly targets two scenarios:
iterative algorithms, since many machine learning and graph algorithms are iterative, e.g. K-means and PageRank;
interactive data mining and analysis.
Data reuse is common in many iterative machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. 
Another compelling use case is interactive data mining, where a user runs multiple ad-hoc queries on the same subset of the data.

Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times.

 

Earlier research recognized this, but each effort targets only a specific domain or problem; there is no general-purpose solution.
Recognizing this problem, researchers have developed specialized frameworks for some applications that require data reuse 
For example, Pregel [22] is a system for iterative graph computations that keeps intermediate data in memory 
HaLoop [7] offers an iterative MapReduce interface.

Spark therefore defines RDDs to provide a general, efficient mechanism for data reuse.
In this paper, we propose a new abstraction called resilient distributed datasets (RDDs) that enables efficient data reuse in a broad range of applications. 
RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.

 

Resilient Distributed Datasets (RDDs)

RDD Characteristics

An RDD represents a read-only collection of objects partitioned across a set of machines that can be rebuilt if a partition is lost.

1. An RDD is a distributed, read-only dataset. It can be created in only two ways: from a file in distributed storage (HDFS, S3), or by transforming another RDD.

2. RDDs do not support fine-grained random writes; they only support coarse-grained transformations, e.g. map, filter, join.
    This looks like a very strict limitation, since RDDs support only a handful of operations, but RDDs only need to support the MapReduce-style computation model, so in practice it is sufficient.
    This restriction is also RDDs' biggest strength: it makes failover easy and enables partial recovery.

Advantages: 
    a. More efficient fault tolerance: the overhead of checkpointing is avoided, because lost data can be recovered using lineage.
    b. Lost partitions can be recomputed in parallel, without having to roll the whole program back to a checkpoint.

3. RDDs do not need to be materialized.
   Because RDDs only support coarse-grained transformations and not fine-grained random writes, they do not need the replication or checkpoint-style materialization that a general distributed-memory system requires.
   It is enough to record the history of transformations from the raw data to the RDD, called its lineage; if a failure occurs, the RDD can simply be replayed from its lineage, so materialization is unnecessary.

4. RDDs are lazy and ephemeral.
   Defining an RDD does not actually produce any data.
   The data is only produced when an action actually needs it,
   and by default it is discarded from memory after use.

5. RDD persistence, a key feature: caching is supported.
   Cache: keep an RDD in memory after its first use so it can be reused later; when memory runs short, some RDD partitions are spilled to disk, trading efficiency for availability.
   Disk persistence: by setting a persist flag, users can choose to persist an RDD to disk.
   User-defined spill priority: users can set a persistence priority on each RDD to specify which in-memory data should spill to disk first.

The design goal of the persistence feature is to 
   let users trade off between the cost of storing an RDD, the speed of accessing it, the probability of losing part of it, and the cost of recomputing it.

6. RDDs allow users to control how data is partitioned.
   They can also ask that an RDD’s elements be partitioned across machines based on a key in each record. 
   This is useful for placement optimizations, such as ensuring that two datasets that will be joined together are hash-partitioned in the same way.

   For example, in PageRank, to optimize communication one can specify a partitioning for links (e.g., hash-partition the link lists by URL across nodes):

  links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist() // partitioning only pays off if followed by persist; if the persisted data is lost, recovery is harder and cannot be done locally
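
   For context, the rest of the paper's PageRank loop can be sketched in the same style, assuming links is a pair RDD of (URL, list of out-links); initializing ranks with mapValues is an illustrative assumption, not the paper's exact code. Because ranks is derived from links, both sides of the join share the same partitioner, so the join never shuffles links.

var ranks = links.mapValues(_ => 1.0)              // assumed initial rank per URL; keeps links' partitioning
for (i <- 1 to ITERATIONS) {
  ranks = links.join(ranks)                        // co-partitioned inputs: links is not shuffled
    .flatMap { case (url, (outLinks, rank)) =>
      outLinks.map(dest => (dest, rank / outLinks.size))
    }
    .reduceByKey((x, y) => x + y)                  // combine contributions per destination URL
}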

7. RDDs have data locality; the most convenient case is reusing an HDFS cluster through Mesos.
   The runtime can schedule tasks based on data locality to improve performance.

8. RDDs mitigate the slow-node problem by running backup tasks, the same idea as Hadoop's speculative execution.
   Because RDDs are immutable, a system can mitigate slow nodes (stragglers) by running backup copies of slow tasks, as in MapReduce.

9. RDD operations are divided into transformations and actions.
   RDDs are lazy, so defining an RDD through transformations does not actually produce data. Actions are where data is actually produced: an RDD is only computed when an action really uses it.

Actions are operations that return a value to the application or export data to a storage system.

 

RDD VS. DSM

To understand the benefits of RDDs as a distributed memory abstraction, we compare them against distributed shared memory (DSM).

In DSM systems, applications read and write to arbitrary locations in a global address space. 
DSM is a very general abstraction, but this generality makes it harder to implement in an efficient and fault-tolerant manner on commodity clusters. 
In other words: generality is inherently at odds with efficiency, so it is hard for DSM to be both efficient and easy to make fault-tolerant.

To be fair, this comparison is a bit of a stretch: the two merely both use distributed memory and otherwise have little in common. 
DSM follows a database-style model, whereas RDDs follow the MapReduce model. 
It is like pointing out what MySQL and Hadoop have in common (they both use disks): nobody complains that MySQL lacks data locality or straggler mitigation.

 

 

Applications Not Suitable for RDDs

RDDs are best suited for batch applications that apply the same operation to all elements of a dataset. 
RDDs can efficiently remember each transformation as one step in a lineage graph and can recover lost partitions without having to log large amounts of data.

RDDs would be less suitable for applications that make asynchronous fine-grained updates to shared state, such as a storage system for a web application or an incremental web crawler.

Put simply: if the problem fits the MapReduce-style batch-analysis model and needs efficient data reuse, use RDDs. 
If the workload involves random reads and writes, what you need is an in-memory database. 
If it is real-time streaming analysis, look at Storm.

 

 

 

Spark Programming Interface

Spark provides the RDD abstraction through a language-integrated API similar to DryadLINQ [31] in Scala.

Developers write a driver program that connects to a cluster of workers. 
The driver defines one or more RDDs, invokes actions on them, and tracks the RDDs' lineage. 
The workers are long-lived processes that can store RDD partitions in RAM across operations.
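
As a minimal sketch of such a driver program, here is what it might look like with today's Spark API (SparkConf, SparkContext, and the application name are assumptions, not code from the paper):

import org.apache.spark.{SparkConf, SparkContext}

object DriverExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("rdd-demo") // cluster settings come from the config
    val sc = new SparkContext(conf)                   // the driver connects to the workers
    val lines = sc.textFile("hdfs://...")             // defines an RDD (lazy; lineage is tracked)
    println(lines.count())                            // action: workers compute the partitions
    sc.stop()
  }
}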

 

 

RDD Operations in Spark

Table 2 of the paper lists the RDD transformations and actions available in Spark.

Transformations are lazy operations that define a new RDD. 
Actions launch a computation to return a value to the program or write data to external storage.
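
A minimal sketch of this lazy/eager split, assuming an existing SparkContext named sc:

val nums = sc.parallelize(1 to 100)      // create an RDD from a local collection
val squares = nums.map(x => x * x)       // transformation: lazy, nothing runs yet
val evens = squares.filter(_ % 2 == 0)   // transformation: still lazy
val total = evens.reduce(_ + _)          // action: triggers the computation
val sample = evens.take(5)               // action: returns values to the driver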

 

Example: Console Log Mining

Suppose that a web service is experiencing errors and an operator wants to search terabytes of logs in the Hadoop filesystem (HDFS) to find the cause.

A console-log-mining example. The paper's figure shows the chain of RDD transformations: each box represents an RDD, and each arrow represents a transformation.

Look at the code example below; it is a typical interactive-analysis case. 
As mentioned above, RDDs are lazy and ephemeral, 
so before an action runs, only the RDD definitions exist, 
and the RDD data is temporary: it is not saved and is discarded after use. 
For this code that would be inefficient, because the actions query and operate on the errors data several times (that is the "interactive" part), 
so a cache is added: after the first use, the errors RDD stays buffered in memory.

// Define the RDDs (lazy: nothing is computed yet)
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))

// Cache: keep errors in memory after it is first computed
errors.persist()

// Actions: each one triggers a computation; later ones reuse the cached errors RDD
errors.count()
errors.filter(_.contains("MySQL")).count()
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()

Example: Logistic Regression

Many machine learning algorithms are iterative in nature because they run iterative optimization procedures, such as gradient descent, to maximize a function. 
They can thus run much faster by keeping their data in memory.

A typical iterative-analysis example, where repeated iterations converge toward an optimum (as in k-means). 
Logistic regression is a common classification algorithm that searches for a hyperplane w that best separates two sets of points (e.g., spam and non-spam emails). 
The algorithm uses gradient descent: it starts w at a random value, and on each iteration, it sums a function of w over the data to move w in a direction that improves it.

// Paper-style sketch: p.x is the feature vector, p.y the label, and w supports
// vector "dot", "*", and "-=" operations.
val points = spark.textFile(...).map(parsePoint).persist()  // read once, keep in memory
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
    val gradient = points.map { p =>
        p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y  // per-point gradient term
    }.reduce((a, b) => a + b)
    w -= gradient
}

 

Representing RDDs

One of the challenges in providing RDDs as an abstraction is choosing a representation for them that can track lineage across a wide range of transformations.

How should an RDD be defined and represented, and what does it consist of?

In a nutshell, we propose representing each RDD through a common interface that exposes five pieces of information:

1. a set of partitions, which are atomic pieces of the dataset; 
2. the data placement of each partition (its preferred locations); 
3. a set of dependencies on parent RDDs; 
4. a function for computing the dataset based on its parents (the transformation logic from the parent RDDs to this one); 
5. metadata about its partitioning scheme (hash- or range-based). 

Corresponding to the components above, a common interface is provided. 
These interfaces encapsulate and abstract RDDs well; RDD transformations can be implemented very simply on top of them. 
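
The paper's Table 3 can be sketched as a Scala trait; the placeholder types (Partition, Dependency, Partitioner) and the exact signatures below are assumptions for illustration, not the prototype's actual code:

trait Partition
trait Dependency
trait Partitioner

trait RDD[T] {
  def partitions: Seq[Partition]                        // atomic pieces of the dataset
  def preferredLocations(p: Partition): Seq[String]     // nodes where p can be accessed faster
  def dependencies: Seq[Dependency]                     // dependencies on parent RDDs
  def iterator(p: Partition, parentIters: Seq[Iterator[_]]): Iterator[T] // compute p from its parents
  def partitioner: Option[Partitioner]                  // whether the RDD is hash/range partitioned
}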
 

Represent dependencies between RDDs

Narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD

Wide dependencies, where multiple child partitions may depend on a single partition of the parent RDD

Dependencies matter for two reasons.

First, narrow dependencies allow for pipelined execution on one cluster node, which can compute all the parent partitions. For example, one can apply a map followed by a filter on an element-by-element basis. In contrast, wide dependencies require data from all parent partitions to be available and to be shuffled across the nodes using a MapReduce-like operation.

Second, recovery after a node failure is more efficient with a narrow dependency, as only the lost parent partitions need to be recomputed, and they can be recomputed in parallel on different nodes. In contrast, in a lineage graph with wide dependencies, a single failed node might cause the loss of some partition from all the ancestors of an RDD, requiring a complete re-execution.

Narrow dependencies, e.g. map, can be executed on a single node and are very efficient; wide dependencies need data from multiple nodes and require data movement (a shuffle) with a reduce-like operation. 
Narrow dependencies also make failover easy: after a failure, only the lost partition needs to be replayed. Wide dependencies are more complicated: one failed partition may require recovering all of its parent RDDs. 
So for workloads dominated by wide dependencies, the advantages of RDDs are significantly diminished.

In the paper's figure, each large hollow box represents an RDD and the small solid boxes inside represent partitions: 
map and filter are typical narrow dependencies; 
union merges several RDDs into one, and each partition still has a narrow relationship; 
join is usually wide, but if the two RDDs were built with the same partitioner on the join key, so that records with the same key are guaranteed to be on the same node, the join becomes narrow; 
groupByKey is a wide dependency. (A code sketch of these cases follows.)
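
A hedged sketch of these cases using today's Spark API (sc, HashPartitioner, and the parsing below are assumptions; the paper-era API may differ):

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(8)

// Two key-value RDDs (narrow: textFile followed by map)
val a = sc.textFile("hdfs://.../a").map(line => (line.split("\t")(0), line))
val b = sc.textFile("hdfs://.../b").map(line => (line.split("\t")(0), line))

val grouped = a.groupByKey()              // wide dependency: requires a shuffle

// Building both RDDs with the same partitioner makes the later join narrow:
val aPart = a.partitionBy(part).persist()
val bPart = b.partitionBy(part).persist()
val joined = aPart.join(bPart)            // co-partitioned inputs: no shuffle at join time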

 

 

Implementation

We have implemented Spark in about 14,000 lines of Scala. 
The system runs over the Mesos cluster manager, allowing it to share resources with Hadoop, MPI and other applications.

Job Scheduling

Spark's scheduler uses our representation of RDDs, described above.

Overall, our scheduler is similar to Dryad’s [19], but it additionally takes into account which partitions of persistent RDDs are available in memory.

Whenever a user runs an action (e.g., count or save) on an RDD, the scheduler examines that RDD’s lineage graph to build a DAG of stages to execute.

Each stage contains as many pipelined transformations with narrow dependencies as possible. The boundaries of the stages are the shuffle operations required for wide dependencies, or any already computed partitions that can short-circuit the computation of a parent RDD.

The scheduler then launches tasks to compute missing partitions from each stage until it has computed the target RDD.

The scheduler first turns a job's lineage into stages. 
A stage is a maximal set of pipelined transformations with narrow dependencies. Why split into stages? The tasks within a stage can each run independently on a single node, in parallel across the cluster, 
much like map/reduce: a stage plays the role of the map phase, and all of its tasks must finish before the next, reduce-like stage can start. 
In addition, thanks to Spark's persistence, data already cached in memory does not need to be recomputed.
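
A small sketch of where stage boundaries fall, in word-count shape (names and input paths are assumptions):

val counts = sc.textFile("hdfs://...")    // Stage 1 starts at the input
  .flatMap(_.split(" "))                  // narrow: pipelined inside Stage 1
  .map(word => (word, 1))                 // narrow: still Stage 1
  .reduceByKey(_ + _)                     // wide: shuffle, so Stage 2 begins here

counts.collect()                          // action: the scheduler builds the DAG of stages and runs it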

The scheduler also takes data locality into account: the node where the data is cached, or its preferred locations (the nodes holding the HDFS blocks).

 

Interpreter Integration

Scala includes an interactive shell similar to those of Ruby and Python. 
Given the low latencies attained with in-memory data, we wanted to let users run Spark interactively from the interpreter to query big datasets.

We found the Spark interpreter to be useful in processing large traces obtained as part of our research and exploring datasets stored in HDFS. 
We also plan to use it to run higher-level query languages interactively, e.g., SQL.

 

Memory Management

Spark provides three options for storage of persistent RDDs:

in-memory storage as deserialized Java objects, in-memory storage as serialized data, and on-disk storage.
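
These three options map roughly onto today's StorageLevel API as sketched below (an assumption about the modern API, with an existing SparkContext sc; each RDD can be assigned only one level):

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("hdfs://...")

logs.persist(StorageLevel.MEMORY_ONLY)        // in-memory, deserialized Java objects (fastest to access)
// logs.persist(StorageLevel.MEMORY_ONLY_SER) // in-memory, serialized data (more compact, slower to read)
// logs.persist(StorageLevel.DISK_ONLY)       // on-disk storage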

 

Support for Checkpointing

Although lineage can always be used to recover RDDs after a failure, such recovery may be time-consuming for RDDs with long lineage chains. 
In general, checkpointing is useful for RDDs with long lineage graphs containing wide dependencies, such as the rank datasets in our PageRank example.
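
A brief sketch using today's checkpoint API (an assumption about the modern API; the paper itself exposes checkpointing through a REPLICATE flag to persist and leaves the decision of what to checkpoint to the user):

sc.setCheckpointDir("hdfs://.../checkpoints")

val ranks = sc.textFile("hdfs://.../links")
  .map(line => (line.split("\t")(0), 1.0))    // stand-in for an RDD with a long, wide-dependency lineage
ranks.checkpoint()                             // ask Spark to save this RDD to stable storage
ranks.count()                                  // the checkpoint is written when an action materializes it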
