Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示:


  1. // Let the user specify short names for shuffle managers 
  2.     val shortShuffleMgrNames = Map( 
  3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 
  4.       "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 
  5.     val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") 
  6.     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 
  7.     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 

如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指定自己实现的ShuffleManager类。

因为Shuffle过程中需要将Map结果数据输出到文件,所以需要通过注册一个ShuffleHandle来获取到一个ShuffleWriter对象,通过它来控制Map阶段记录数据输出的行为。其中,ShuffleHandle包含了如下基本信息:

  • shuffleId:标识Shuffle过程的唯一ID
  • numMaps:RDD对应的Partitioner指定的Partition的个数,也就是ShuffleMapTask输出的Partition个数
  • dependency:RDD对应的依赖ShuffleDependency

下面我们看下,在SortShuffleManager中是如何注册Shuffle的,代码如下所示:


  1. override def registerShuffle[K, V, C]( 
  2.       shuffleId: Int, 
  3.       numMaps: Int, 
  4.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 
  5.     if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 
  6.       new BypassMergeSortShuffleHandle[K, V]( 
  7.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  8.     } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 
  9.       new SerializedShuffleHandle[K, V]( 
  10.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  11.     } else { 
  12.       new BaseShuffleHandle(shuffleId, numMaps, dependency) 
  13.     } 
  14.   } 

上面代码中,对应如下3种ShuffleHandle可以选择,说明如下:

  • BypassMergeSortShuffleHandle

如果dependency不需要进行Map Side Combine,并且RDD对应的ShuffleDependency中的Partitioner设置的Partition的数量(这个不要和parent RDD的Partition个数混淆,Partitioner指定了map处理结果的Partition个数,每个Partition数据会在Shuffle过程中全部被拉取而拷贝到下游的某个Executor端)小于等于配置参数spark.shuffle.sort.bypassMergeThreshold的值,则会注册BypassMergeSortShuffleHandle。默认情况下,spark.shuffle.sort.bypassMergeThreshold的取值是200,这种情况下会直接将对RDD的 map处理结果的各个Partition数据写入文件,并最后做一个合并处理。

  • SerializedShuffleHandle

如果ShuffleDependency中的Serializer,允许对将要输出数据对象进行排序后,再执行序列化写入到文件,则会选择创建一个SerializedShuffleHandle。

  • BaseShuffleHandle

除了上面两种ShuffleHandle以后,其他情况都会创建一个BaseShuffleHandle对象,它会以反序列化的格式处理Shuffle输出数据。

Map阶段处理流程分析

Map阶段RDD的计算,对应ShuffleMapTask这个实现类,它最终会在每个Executor上启动运行,每个ShuffleMapTask处理RDD的一个Partition的数据。这个过程的核心处理逻辑,代码如下所示:


  1. val manager = SparkEnv.get.shuffleManager 
  2.       writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 
  3.       writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) 

上面代码中,在调用rdd的iterator()方法时,会根据RDD实现类的compute方法指定的处理逻辑对数据进行处理,当然,如果该Partition对应的数据已经处理过并存储在MemoryStore或DiskStore,直接通过BlockManager获取到对应的Block数据,而无需每次需要时重新计算。然后,write()方法会将已经处理过的Partition数据输出到磁盘文件。

在Spark Shuffle过程中,每个ShuffleMapTask会通过配置的ShuffleManager实现类对应的ShuffleManager对象(实际上是在SparkEnv中创建),根据已经注册的ShuffleHandle,获取到对应的ShuffleWriter对象,然后通过ShuffleWriter对象将Partition数据写入内存或文件。所以,接下来我们可能关心每一种ShuffleHandle对应的ShuffleWriter的行为,可以看到SortShuffleManager中获取到ShuffleWriter的实现代码,如下所示:


  1. /** Get a writer for a given partition. Called on executors by map tasks. */ 
  2.   override def getWriter[K, V]( 
  3.       handle: ShuffleHandle, 
  4.       mapId: Int, 
  5.       context: TaskContext): ShuffleWriter[K, V] = { 
  6.     numMapsForShuffle.putIfAbsent( 
  7.       handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 
  8.     val env = SparkEnv.get 
  9.     handle match { 
  10.       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 
  11.         new UnsafeShuffleWriter( 
  12.           env.blockManager, 
  13.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  14.           context.taskMemoryManager(), 
  15.           unsafeShuffleHandle, 
  16.           mapId, 
  17.           context, 
  18.           env.conf) 
  19.       case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 
  20.         new BypassMergeSortShuffleWriter( 
  21.           env.blockManager, 
  22.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  23.           bypassMergeSortHandle, 
  24.           mapId, 
  25.           context, 
  26.           env.conf) 
  27.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 
  28.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 
  29.     } 
  30.   } 

我们以最简单的SortShuffleWriter为例进行分析,在SortShuffleManager可以通过getWriter()方法创建一个SortShuffleWriter对象,然后在ShuffleMapTask中调用SortShuffleWriter对象的write()方法处理Map输出的记录数据,write()方法的处理代码,如下所示:


  1. /** Write a bunch of records to this task's output */ 
  2.   override def write(records: Iterator[Product2[K, V]]): Unit = { 
  3.     sorter = if (dep.mapSideCombine) { 
  4.       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") 
  5.       new ExternalSorter[K, V, C]( 
  6.         context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 
  7.     } else { 
  8.       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  9.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  10.       // if the operation being run is sortByKey. 
  11.       new ExternalSorter[K, V, V]( 
  12.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 
  13.     } 
  14.     sorter.insertAll(records) 
  15.  
  16.     // Don't bother including the time to open the merged output file in the shuffle write time, 
  17.     // because it just opens a single file, so is typically too fast to measure accurately 
  18.     // (see SPARK-3570). 
  19.     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 
  20.     val tmp = Utils.tempFileWith(output) 
  21.     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 
  22.     val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 
  23.     shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 
  24.     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 
  25.   } 

从SortShuffleWriter类中的write()方法可以看到,最终调用了ExeternalSorter的insertAll()方法,实现了Map端RDD某个Partition数据处理并输出到内存或磁盘文件,这也是处理Map阶段输出记录数据最核心、最复杂的过程。我们将其分为两个阶段进行分析:第一阶段是,ExeternalSorter的insertAll()方法处理过程,将记录数据Spill到磁盘文件;第二阶段是,执行完insertAll()方法之后的处理逻辑,创建Shuffle Block数据文件及其索引文件。

内存缓冲写记录数据并Spill到磁盘文件

查看SortShuffleWriter类的write()方法可以看到,在内存中缓存记录数据的数据结构有两种:一种是Buffer,对应的实现类PartitionedPairBuffer,设置mapSideCombine=false时会使用该结构;另一种是Map,对应的实现类是PartitionedAppendOnlyMap,设置mapSideCombine=false时会使用该结构。根据是否指定mapSideCombine选项,分别对应不同的处理流程,我们分别说明如下:

设置mapSideCombine=false时

这种情况在Map阶段不进行Combine操作,在内存中缓存记录数据会使用PartitionedPairBuffer这种数据结构来缓存、排序记录数据,它是一个Append-only Buffer,仅支持向Buffer中追加数据键值对记录,PartitionedPairBuffer的结构如下图所示:

默认情况下,PartitionedPairBuffer初始分配的存储容量为capacity = initialCapacity = 64,实际上这个容量是针对key的容量,因为要存储的是键值对记录数据,所以实际存储键值对的容量为2*initialCapacity = 128。PartitionedPairBuffer是一个能够动态扩充容量的Buffer,内部使用一个一维数组来存储键值对,每次扩容结果为当前Buffer容量的2倍,即2*capacity,最大支持存储2^31-1个键值对记录(1073741823个)。

通过上图可以看到,PartitionedPairBuffer存储的键值对记录数据,键是(partition, key)这样一个Tuple,值是对应的数据value,而且curSize是用来跟踪写入Buffer中的记录的,key在Buffer中的索引位置为2*curSize,value的索引位置为2*curSize+1,可见一个键值对的key和value的存储在PartitionedPairBuffer内部的数组中是相邻的。

使用PartitionedPairBuffer缓存键值对记录数据,通过跟踪实际写入到Buffer内的记录数据的字节数来判断,是否需要将Buffer中的数据Spill到磁盘文件,如下代码所示:


  1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 
  2.     var shouldSpill = false 
  3.     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 
  4.       // Claim up to double our current memory from the shuffle memory pool 
  5.       val amountToRequest = 2 * currentMemory - myMemoryThreshold 
  6.       val granted = acquireMemory(amountToRequest) 
  7.       myMemoryThreshold += granted 
  8.       // If we were granted too little memory to grow further (either tryToAcquire returned 0, 
  9.       // or we already had more memory than myMemoryThreshold), spill the current collection 
  10.       shouldSpill = currentMemory >= myMemoryThreshold 
  11.     } 
  12.     shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 
  13.     // Actually spill 
  14.     if (shouldSpill) { 
  15.       _spillCount += 1 
  16.       logSpillage(currentMemory) 
  17.       spill(collection) 
  18.       _elementsRead = 0 
  19.       _memoryBytesSpilled += currentMemory 
  20.       releaseMemory() 
  21.     } 
  22.     shouldSpill 
  23.   } 

上面elementsRead表示存储到PartitionedPairBuffer中的记录数,currentMemory是对Buffer中的总记录数据大小(字节数)的估算,myMemoryThreshold通过配置项spark.shuffle.spill.initialMemoryThreshold来进行设置的,默认值为5 * 1024 * 1024 = 5M。当满足条件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold时,会先尝试向MemoryManager申请2 * currentMemory – myMemoryThreshold大小的内存,如果能够申请到,则不进行Spill操作,而是继续向Buffer中存储数据,否则就会调用spill()方法将Buffer中数据输出到磁盘文件。

向PartitionedPairBuffer中写入记录数据,以及满足条件Spill记录数据到磁盘文件,具体处理流程,如下图所示:

为了查看按照怎样的规则进行排序,我们看一下,当不进行Map Side Combine时,创建ExternalSorter对象的代码如下所示:


  1. // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  2.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  3.       // if the operation being run is sortByKey. 
  4.       new ExternalSorter[K, V, V]( 
  5.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 

上面aggregator = None,ordering = None,在对PartitionedPairBuffer中的记录数据Spill到磁盘之前,要使用默认的排序规则进行排序,排序的规则是只对PartitionedPairBuffer中的记录按Partition ID进行升序排序,可以查看WritablePartitionedPairCollection伴生对象类的代码(其中PartitionedPairBuffer类实现了特质WritablePartitionedPairCollection),如下所示:


  1. /** 
  2.    * A comparator for (Int, K) pairs that orders them by only their partition ID. 
  3.    */ 
  4.   def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] { 
  5.     override def compare(a: (Int, K), b: (Int, K)): Int = { 
  6.       a._1 - b._1 
  7.     } 
  8.   } 

上面图中,引用了SortShuffleWriter.writeBlockFiles这个子序列图,用来生成Block数据文件和索引文件,后面我们会单独说明。通过对RDD进行计算生成一个记录迭代器对象,通过该迭代器迭代出的记录会存储到PartitionedPairBuffer中,当满足Spill条件时,先对PartitionedPairBuffer中记录进行排序,最后Spill到磁盘文件,这个过程中PartitionedPairBuffer中的记录数据的变化情况,如下图所示:

上图中,对内存中PartitionedPairBuffer中的记录按照Partition ID进行排序,并且属于同一个Partition的数据记录在PartitionedPairBuffer内部的data数组中是连续的。排序结束后,在Spill到磁盘文件时,将对应的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中连续存储键值对数据,但同时在另一个内存数组结构中会保存文件中每个Partition拥有的记录数,这样就能根据Partition的记录数来顺序读取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中属于同一个Partition的全部记录数据。

ExternalSorter类内部维护了一个SpillFile的ArrayBuffer数组,最终可能会生成多个SpillFile,SpillFile的定义如下所示:


  1. private[this] case class SpilledFile( 
  2.     file: File, 
  3.     blockId: BlockId, 
  4.     serializerBatchSizes: Array[Long], 
  5.     elementsPerPartition: Array[Long]) 

每个SpillFile包含一个blockId,标识Map输出的该临时文件;serializerBatchSizes表示每次批量写入到文件的Object的数量,默认为10000,由配置项spark.shuffle.spill.batchSize来控制;elementsPerPartition表示每个Partition中的Object的数量。调用ExternalSorter的insertAll()方法,最终可能有如下3种情况:

  • Map阶段输出记录数较少,没有生成SpillFile,那么所有数据都在Buffer中,直接对Buffer中记录排序并输出到文件
  • Map阶段输出记录数较多,生成多个SpillFile,同时Buffer中也有部分记录数据
  • Map阶段输出记录数较多,只生成多个SpillFile
  • 有关后续如何对上面3种情况进行处理,可以想见后面对子序列图SortShuffleWriter.writeBlockFiles的说明。
  • 设置mapSideCombine=true时

这种情况在Map阶段会执行Combine操作,在Map阶段进行Combine操作能够降低Map阶段数据记录的总数,从而降低Shuffle过程中数据的跨网络拷贝传输。这时,RDD对应的ShuffleDependency需要设置一个Aggregator用来执行Combine操作,可以看下Aggregator类声明,代码如下所示:


  1. /** 
  2.  * :: DeveloperApi :: 
  3.  * A set of functions used to aggregate data. 
  4.  * 
  5.  * @param createCombiner function to create the initial value of the aggregation. 
  6.  * @param mergeValue function to merge a new value into the aggregation result. 
  7.  * @param mergeCombiners function to merge outputs from multiple mergeValue function. 
  8.  */ 
  9. @DeveloperApi 
  10. case class Aggregator[K, V, C] ( 
  11.     createCombiner: V => C, 
  12.     mergeValue: (C, V) => C, 
  13.     mergeCombiners: (C, C) => C) { 
  14.   ... ... 

由于在Map阶段只用到了构造Aggregator的几个函数参数createCombiner、mergeValue、mergeCombiners,我们对这几个函数详细说明如下:

  • createCombiner:进行Aggregation开始时,需要设置初始值。因为在Aggregation过程中使用了类似Map的内存数据结构来管理键值对,每次加入前会先查看Map内存结构中是否存在Key对应的Value,第一次肯定不存在,所以首次将某个Key的Value加入到Map内存结构中时,Key在Map内存结构中第一次有了Value。
  • mergeValue:某个Key已经在Map结构中存在Value,后续某次又遇到相同的Key和一个新的Value,这时需要通过该函数,将旧Value和新Value进行合并,根据Key检索能够得到合并后的新Value。
  • mergeCombiners:一个Map内存结构中Key和Value是由mergeValue生成的,那么在向Map中插入数据,肯定会遇到Map使用容量达到上限,这时需要将记录数据Spill到磁盘文件,那么多个Spill输出的磁盘文件中可能存在同一个Key,这时需要对多个Spill输出的磁盘文件中的Key的多个Value进行合并,这时需要使用mergeCombiners函数进行处理。

该类中定义了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于这些函数是在Reduce阶段使用的,所以在这里先不说明,后续文章我们会单独详细来分析。

我们通过下面的序列图来描述,需要进行Map Side Combine时的处理流程,如下所示:

对照上图,我们看一下,当需要进行Map Side Combine时,对应的ExternalSorter类insertAll()方法中的处理逻辑,代码如下所示:


  1. val shouldCombine = aggregator.isDefined 
  2.  
  3.     if (shouldCombine) { 
  4.       // Combine values in-memory first using our AppendOnlyMap 
  5.       val mergeValue = aggregator.get.mergeValue 
  6.       val createCombiner = aggregator.get.createCombiner 
  7.       var kv: Product2[K, V] = null 
  8.       val update = (hadValue: Boolean, oldValue: C) => { 
  9.         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) 
  10.       } 
  11.       while (records.hasNext) { 
  12.         addElementsRead() 
  13.         kv = records.next() 
  14.         map.changeValue((getPartition(kv._1), kv._1), update) 
  15.         maybeSpillCollection(usingMap = true) 
  16.       } 
  17.     } 

上面代码中,map是内存数据结构,最重要的是update函数和map的changeValue方法(这里的map对应的实现类是PartitionedAppendOnlyMap)。update函数所做的工作,其实就是对createCombiner和mergeValue这两个函数的使用,第一次遇到一个Key调用createCombiner函数处理,非首次遇到同一个Key对应新的Value调用mergeValue函数进行合并处理。map的changeValue方法主要是将Key和Value在map中存储或者进行修改(对出现的同一个Key的多个Value进行合并,并将合并后的新Value替换旧Value)。

PartitionedAppendOnlyMap是一个经过优化的哈希表,它支持向map中追加数据,以及修改Key对应的Value,但是不支持删除某个Key及其对应的Value。它能够支持的存储容量是0.7 * 2 ^ 29 = 375809638。当达到指定存储容量或者指定限制,就会将map中记录数据Spill到磁盘文件,这个过程和前面的类似,不再累述。

创建Shuffle Block数据文件及其索引文件

无论是使用PartitionedPairBuffer,还是使用PartitionedAppendOnlyMap,当需要容量满足Spill条件时,都会将该内存结构(buffer/map)中记录数据Spill到磁盘文件,所以Spill到磁盘文件的格式是相同的。对于后续Block数据文件和索引文件的生成逻辑也是相同,如下图所示:

假设,我们生成的Shuffle Block文件对应各个参数为:shuffleId=2901,mapId=11825,reduceId=0,这里reduceId是一个NOOP_REDUCE_ID,表示与DiskStore进行磁盘I/O交互操作,而DiskStore期望对应一个(map, reduce)对,但是对于排序的Shuffle输出,通常Reducer拉取数据后只生成一个文件(Reduce文件),所以这里默认reduceId为0。经过上图的处理流程,可以生成一个.data文件,也就是Block数据文件;一个.index文件,也就是包含了各个Partition在数据文件中的偏移位置的索引文件。这个过程生成的文件,示例如下所示:


  1. shuffle_2901_11825_0.data  
  2. shuffle_2901_11825_0.index 

这样,对于每个RDD的多个Partition进行处理后,都会生成对应的数据文件和索引文件,后续在Reduce端就可以读取这些Block文件,这些记录数据在文件中都是经过分区(Partitioned)的。

本文作者:时延军

来源:51CTO

时间: 2024-11-08 20:26:07

Spark Shuffle过程分析:Map阶段处理流程的相关文章

Spark Shuffle Write阶段磁盘文件分析

前言 上篇写了 Spark Shuffle 内存分析后,有不少人提出了疑问,大家也对如何落文件挺感兴趣的,所以这篇文章会详细介绍,Sort Based Shuffle Write 阶段是如何进行落磁盘的 流程分析. 入口处: org.apache.spark.scheduler.ShuffleMapTask.runTask runTask对应的代码为: val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any

Spark Shuffle模块——Suffle Read过程分析

在阅读本文之前,请先阅读Spark Sort Based Shuffle内存分析 Spark Shuffle Read调用栈如下: 1. org.apache.spark.rdd.ShuffledRDD#compute() 2. org.apache.spark.shuffle.ShuffleManager#getReader() 3. org.apache.spark.shuffle.hash.HashShuffleReader#read() 4. org.apache.spark.stora

Spark计算过程分析

基本概念 Spark是一个分布式的内存计算框架,其特点是能处理大规模数据,计算速度快.Spark延续了Hadoop的MapReduce计算模型,相比之下Spark的计算过程保持在内存中,减少了硬盘读写,能够将多个操作进行合并后计算,因此提升了计算速度.同时Spark也提供了更丰富的计算API. MapReduce是Hadoop和Spark的计算模型,其特点是Map和Reduce过程高度可并行化:过程间耦合度低,单个过程的失败后可以重新计算,而不会导致整体失败:最重要的是数据处理中的计算逻辑可以很

Hive map阶段缓慢,优化过程详细分析

背景 同事写了这样一段HQL(涉及公司数据,表名由假名替换,语句与真实场景略有不同,但不影响分析): CREATE TABLE tmp AS  SELECT         t1.exk,         t1.exv,         M.makename AS m_makename,         S.makename AS s_makename,  FROM    (SELECT            exk,            exv     FROM xx.xxx_log    

Spark sc.textFile(...).map(...).count() 执行完整流程

引子 今天正好有人在群里问到相关的问题,不过他的原始问题是: 我在RDD里面看到很多  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)),但是我找不到context是从哪里来的 另外还有pid,iter都是哪来的呢? 如果你照着源码点进去你会很困惑.为莫名其妙怎么就有了这些iterator呢? Transform 和Action的来源 一般刚接触Spark 的同学,都会被告知这两个概念.Trans

Spark Streaming + Spark SQL 实现配置化ETL流程

项目地址 前言 传统的Spark Streaming程序需要: 构建StreamingContext 设置checkpoint 链接数据源 各种transform foreachRDD 输出 通常而言,你可能会因为要走完上面的流程而构建了一个很大的程序,比如一个main方法里上百行代码,虽然在开发小功能上足够便利,但是复用度更方面是不够的,而且不利于协作,所以需要一个更高层的开发包提供支持. 如何开发一个Spark Streaming程序 我只要在配置文件添加如下一个job配置,就可以作为标准的

开源大数据周刊-第56期

阿里云E-Mapreduce实践: 使用hadoop restful api实现对集群信息的统计本文根据hadoop/spark的RESTful API,实现了对集群基本信息的统计功能,包括HDFS文件系统.job情况.资源队列情况的统计.这些API只提供了基础的数据,具体的统计与分析,还需要基于这些基础数据做一些简单的开发. 资讯 全球因Hadoop服务器配置不当导致的数据泄露或达5120TB 网络犯罪分子近期开始针对配置不当的 Hadoop Clusters 与 CouchDB 服务器展开攻

Spark技术内幕:Shuffle Read的整体流程

回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出:而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一个Stage,需要输出结果.这里的Stage,在运行时的时候就是可以以pipeline的方式运行的一组Task,除了最后一个Stage对应的是ResultTask,其余的Stage对应的都是ShuffleMap Task. 而除了需要从外部存储读取数据和RDD已经做过cache或者checkpoin

Apache Spark源码走读(十)ShuffleMapTask计算结果的保存与读取 &amp;WEB UI和Metrics初始化及数据更新过程分析

<一>ShuffleMapTask计算结果的保存与读取 概要 ShuffleMapTask的计算结果保存在哪,随后Stage中的task又是如何知道从哪里去读取的呢,这个过程一直让我困惑不已. 用比较通俗一点的说法来解释一下Shuffle数据的写入和读取过程 每一个task负责处理一个特定的data partition task在初始化的时候就已经明确处理结果可能会产生多少个不同的data partition 利用partitioner函数,task将处理结果存入到不同的partition,这