Spark Streaming原理简析

执行流程

数据的接收

StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor。

实例化之后,首先,要指定一个接收数据的方式,如

val lines = ssc.socketTextStream("localhost", 9999)

这样从socket接收文本数据。这个步骤返回的是一个ReceiverInputDStream的实现,内含Receiver,可接收数据并转化为RDD放内存里。

ReceiverInputDStream有一个需要子类实现的方法

def getReceiver(): Receiver[T]

子类实现这个方法,worker节点调用后能得到Receiver,使得数据接收的工作能分布到worker上。

如果是local跑,由于Receiver接收数据在本地,所以在启动streaming application的时候,要注意分配的core数目要大于Receiver数目,才能腾出cpu做计算任务的调度。

Receiver需要子类实现

def onStart()
def onStop()

来定义一个数据接收器的初始化、接收到数据后如何存、如何在结束的时候释放资源。

Receiver提供了一系列store()接口,如store(ByteBuffer)store(Iterator)等等。这些store接口是实现好了的,会由worker节点上初始化的ReceiverSupervisor来完成这些存储功能。ReceiverSupervisor还会对Receiver做监控,如监控是否启动了、是否停止了、是否要重启、汇报error等等。

ReceiverSupervisor的存储接口的实现,借助的是BlockManager,数据会以RDD的形式被存放,根据StorageLevel选择不同存放策略。默认是序列化后存内存,放不下的话写磁盘(executor)。被计算出来的RDD中间结果,默认存放策略是序列化后只存内存。

ReceiverSupervisor在做putBlock操作的时候,会首先借助BlockManager存好数据,然后往ReceiverTracker发送一个AddBlock的消息。ReceiverTracker内部的ReceivedBlockTracker用于维护一个receiver接收到的所有block信息,即BlockInfo,所以AddBlock会把信息存放在ReceivedBlockTracker里。未来需要计算的时候,ReceiverTracker根据streamId,从ReceivedBlockTracker取出对应的block列表。

RateLimiter帮助控制Receiver速度,spark.streaming.receiver.maxRate参数。

数据源方面,普通的数据源为file, socket, akka, RDDs。高级数据源为Twitter, Kafka, Flume等。开发者也可以自己定制数据源。

任务调度

JobScheduler在context里初始化。当context start的时候,触发scheduler的start。

scheduler的start触发了ReceiverTrackerJobGenerator的start。这两个类是任务调度的重点。前者在worker上启动Receiver接收数据,并且暴露接口能够根据streamId获得对应的一批Block地址。后者基于数据和时间来生成任务描述。

JobScheduler内含一个线程池,用于调度任务执行。spark.streaming.concurrentJobs可以控制job并发度,默认是1,即它只能一个一个提job。

job来自JobGenerator生成的JobSetJobGenerator根据时间,生成job并且执行cp。

JobGenerator的生成job逻辑:
- 调用ReceiverTrackerallocateBlocksToBatch方法,为本批数据分配好block,即准备好数据
- 间接调用DStreamgenerateJob(time)方法,制造可执行的RDD

DStream切分RDD和生成可执行的RDD,即getOrCompute(time)
- 如果这个时间点的RDD已经生成好了,那么从内存hashmap里拿出来,否则下一步
- 如果时间是批次间隔的整数倍,则下一步,否则这个时间点不切
- 调用DStream的子类的compute方法,得到RDD。可能是一个RDD,也可以是个RDD列表
- 对每个RDD,调用persist方法,制定默认的存储策略。如果时间点合适,同时调用RDD的checkpoint方法,制定好cp策略
- 得到这些RDD后,调用SparkContext.runJob(rdd, emptyFunction)。把这整个变成一个function,生成Job类。未来会在executor上触发其runJob

JobGenerator成功生成job后,调用JobScheduler.submitJobSet(JobSet)JobScheduler会使用线程池提交JobSet中的所有job。该方法调用结束后,JobGenerator发送一个DoCheckpoint的消息,注意这里的cp是driver端元数据的cp,而不是RDD本身的cp。如果time合适,会触发cp操作,内部的CheckpointWriter类会完成write(streamingContext, time)

JobScheduler提交job的线程里,触发了job的run()方法,同时,job跑完后,JobScheduler处理JobCompleted(job)。如果job跑成功了,调用JobSethandleJobCompletion(Job),做些计时和数数工作,如果整个JobSet完成了,调用JobGeneratoronBatchCompletion(time)方法,JobGenerator接着会做clearMetadata的工作,然后JobScheduler打印输出;如果job跑失败了,JobScheduler汇报error,最后会在context里抛异常。

更多说明

特殊操作

  1. transform:可以与外部RDD交互,比如做维表的join
  2. updateStateByKey:生成StateDStream,比如做增量计算。WordCount例子
    • 每一批都需要与增量RDD进行一次cogroup之后,然后执行update function。两个RDD做cogroup过程有些开销:RDD[K, V]和RDD[K, U]合成RDD[K, List[V], List[U]],List[U]一般size是1,理解为oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function处理完,变成RDD[K, newValue]。
    • 批与批之间严格有序,即增量合并操作,是有序的,批之间没发并发
    • 增量RDD的分区数可以开大,即这步增量的计算可以调大并发
  3. window:batch size,window length, sliding interval三个参数组成的滑窗操作。把多个批次的RDD合并成一个UnionRDD进行计算。
  4. foreachRDD: 这个操作是一个输出操作,比较特殊。
  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

DStream.foreachRDD()操作使开发者可以直接控制RDD的计算逻辑,而不是通过DStream映射过去。所以借助这个方法,可以实现MLlib, Spark SQL与Streaming的集合,如:结合Spark SQL、DataFrame做Wordcount

Cache

如果是window操作,默认接收的数据都persist在内存里。

如果是flume, kafka源头,默认接收的数据replicate成两份存起来。

Checkpoint

与state有关的流计算,计算出来的结果RDD,会被cp到HDFS上,原文如下:

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

cp的时间间隔也可以设定,可以多批做一次cp。

cp的操作是同步的。

简单的不带state操作的流任务,可以不开启cp。

driver端的metadata也有cp策略。driver cp的时候是将整个StreamingContext对象写到了可靠存储里。

全文完 :)

时间: 2024-07-30 23:12:45

Spark Streaming原理简析的相关文章

PHP的错误报错级别设置原理简析

原理简析 摘录php.ini文件的默认配置(php5.4): ; Common Values: ; E_ALL (Show all errors, warnings and notices including coding standards.) ; E_ALL & ~E_NOTICE (Show all errors, except for notices) ; E_ALL & ~E_NOTICE & ~E_STRICT (Show all errors, except for

Java Annotation 及几个常用开源项目注解原理简析

文简单介绍下 Annotation 示例.概念及作用.分类.自定义.解析,并对几个 Android 开源库 Annotation 原理进行简析. PDF 版: Java Annotation.pdf, PPT 版:Java Annotation.pptx, Keynote 版:Java Annotation.key 一.Annotation 示例 Override Annotation Java 1 2 @Override public void onCreate(Bundle savedIns

腾讯Android自动化测试实战3.2 Robotium原理简析

3.2 Robotium原理简析 如前文所述,一个基本的自动化测试用例主要分为获取控件.控件操作.断言三个步骤,而在实际编写测试用例的过程中,我们常常会遇到各种各样的问题,比如: 在这样的UI结构下该如何获取控件? 为何报这样或那样的错? 明明滑动了为何没有效果? 因为不同的项目有其自身的独特性与复杂性,没有任何书籍可以解决实际过程中遇到的所有问题,甚至即使求助Google搜索也可能得不到自己想要的答案.因此,对于任何一门技术而言都很有必要知其然并知其所以然,只有了解了其原理实现,才能更高效地运

cpu工作原理简析

在了解CPU工作原理之前,我们先简单谈谈CPU是如何生产出来的.CPU是在特别纯净的硅材料上制造的.一个CPU芯片包含上百万个精巧的晶体管.人们在一块指甲盖大小的硅片上,用化学的方法蚀刻或光刻出晶体管.因此,从这个意义上说,CPU正是由晶体管组合而成的.简单而言,晶体管就是微型电子开关,它们是构建CPU的基石,你可以把一个晶体管当作一个电灯开关,它们有个操作位,分别代表两种状态:ON(开)和OFF(关).这一开一关就相当于晶体管的连通与断开,而这两种状态正好与二进制中的基础状态"0"和

Android热补丁技术—dexposed原理简析(手机淘宝采用方案)

上篇文章<Android无线开发的几种常用技术>我们介绍了几种android移动应用开发中的常用技术,其中的热补丁正在被越来越多的开发团队所使用,它涉及到dalvik虚拟机和android的一些核心技术,现在就来介绍下它的一些原理. 本篇先介绍dexposed方案:https://github.com/alibaba/dexposed,它是手机淘宝团队使用的热补丁方案,后来开源到github上,取的名字dexposed表明了自己是基于大名鼎鼎的xposed hook方案,有饮水思源.回馈开源项

ARP攻击原理简析及防御措施

0x1  简介 网络欺骗攻击作为一种非常专业化的攻击手段,给网络安全管理者,带来严峻的考验.网络安全的战场已经从互联网蔓延到用户内部的网络,特别是局域网.目前利用ARP欺骗的木马病毒在局域网中广泛传播,导致网络随机掉线甚至整体瘫痪,通讯被窃听,信息被篡改等严重后果. 0x2  ARP协议概述 ARP协议(address resolution protocol)地址解析协议 一台主机和另一台主机通信,要知道目标的IP地址,但是在局域网中传输数据的网卡却不能直接识别IP地址,所以用ARP解析协议将I

委托的使用与原理简析

一.委托声明与本质 1.声明委托 public delegate void SayHelloDelegate(string who); 2.使用ILSpy反编译后,看其本质 public class auto ansi sealed SayHelloDelegate: MulticastDelegate 编译器自动生成了一个委托类,继承自MulticastDelegate 委托被标识为class,说明委托是一种数据类型:类 委托类即可嵌套在一个类型中定义,也可以在全局范围中定义,就是说由于委托是

JavaScript mapreduce工作原理简析_基础知识

谷歌在2003到2006年间连续发表了三篇非常有影响力的文章,分别是2003年在SOSP上发布的GFS,2004年在OSDI上发布的MapReduce,以及2006年在OSDI上发布的BigTable.GFS是文件系统相关的,其对后来的分布式文件系统设计具有指导意义:MapReduce是一种并行计算的编程模型,用于作业调度:BigTable是一个用于管理结构化数据的分布式存储系统,构建在GFS.Chubby.SSTable等Google技术之上.相当多的Google应用使用了这三种技术,比如Go

搜索引擎原理简析 不懂搜索引擎原理的SEOer就是在裸奔

中介交易 http://www.aliyun.com/zixun/aggregation/6858.html">SEO诊断 淘宝客 云主机 技术大厅 不懂搜索引擎原理的SEOer就是在裸奔. 嗯,在结束废话之前,再插一句:中国第一个基于网页索引搜索的搜索引擎是北大的天网. 好,先上图来简单看下搜索引擎的"三板斧":数据搜集->预处理[索引]->排名. 数据搜集 即数据的搜集阶段,将网页从浩如瀚海的9201.html">互联网世界搜集到自己的数