一、缓存或持久化
和RDD相似,DStreams也允许开发者持久化流数据到内存中。在DStream上使用 persist() 方法可以自动地持久化DStream中的RDD到内存中。如果DStream中的数据需要计算多次,这是非常有用的。像reduceByWindow 和 reduceByKeyAndWindow 这种窗口操作、 updateStateByKey 这种基于状态的操作,持久化是默认的,不需要开发者调用 persist() 方法。 例如通过网络(如kafka,flume等)获取的输入数据流,默认的持久化策略是复制数据到两个不同的节点以容错。 注意,与RDD不同的是,DStreams默认持久化级别是存储序列化数据到内存中。
二、Checkpointing
一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中,以使系统从故障中恢复。 Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括 1.Configuration :创建Spark Streaming应用程序的配置信息 2.DStream operations :定义Streaming应用程序的操作集合 3.Incomplete batches:操作存在队列中的未完成的批 4.Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于之前批的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依链。 元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。 何时checkpoint: 应用程序在下面两种情况下必须开启checkpoint: 1.使用有状态的transformation。如果在应用程序中用到了 updateStateByKey 或者reduceByKeyAndWindow ,checkpoint目录必需提供用以定期checkpoint RDD。 2.从运行应用程序的driver的故障中恢复过来。使用元数据checkpoint恢复处理信息。 注意,没有前述的有状态的transformation的简单流应用程序在运行时可以不开启checkpoint。在这种情况下,从driver故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。这通常是可以接受的,许多运行的Spark Streaming应用程序都是这种方式。 怎样配置Checkpointing: 在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。这可以通过streamingContext.checkpoint(checkpointDirectory) 方法来做。这运行你用之前介绍的有状态transformation。另外,如果你想从driver故障中恢复,你应该以下面的方式重写你的Streaming应用程序。 1.当应用程序是第一次启动,新建一个StreamingContext,启动所有Stream,然后调用 start() 方法 2.当应用程序因为故障重新启动,它将会从checkpoint目录checkpoint数据重新创建StreamingContext // Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, context. ... // Start the context context.start() context.awaitTermination() 如果 checkpointDirectory 存在,上下文将会利用checkpoint数据重新创建。如果这个目录不存在,将会调用 functionToCreateContext 函数创建一个新的上下文,建立DStreams。 除了使用 getOrCreate ,开发者必须保证在故障发生时,driver处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。 注意,RDD的checkpointing有存储成本。这会导致批数据(包含的RDD被checkpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含1秒的数据)情况下,checkpoint每批数据会显著的减少操作的吞吐量。相反,checkpointing太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的transformation需要RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过 dstream.checkpoint 来设置。典型的情况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个好的尝试。
三、部署应用程序和升级应用程序
**部署应用程序**: 运行一个Spark Streaming应用程序,有下面一些步骤: 1.有管理器的集群-这是任何Spark应用程序都需要的需求,详见部署指南 2.将应用程序打为jar包-你必须编译你的应用程序为jar包。如果你用spark-submit启动应用程序,你不需要将Spark和Spark Streaming打包进这个jar包。如果你的应用程序用到了高级源(如kafka,flume),你需要将它们关联的外部artifact以及它们的依赖打包进需要部署的应用程序jar包中。例如,一个应用程序用到了 TwitterUtils ,那么就需要将 spark-streaming-twitter_2.10 以及它的所有依赖打包到应用程序jar中。 3.为executors配置足够的内存-因为接收的数据必须存储在内存中,executors必须配置足够的内存用来保存接收的数据。注意,如果你正在做10分钟的窗口操作,系统的内存要至少能保存10分钟的数据。所以,应用程序的内存需求依赖于使用它的操作。 4.配置checkpointing-如果stream应用程序需要checkpointing,然后一个与Hadoop API兼容的容错存储目录必须配置为检查点的目录,流应用程序将checkpoint信息写入该目录用于错误恢复。 5.配置应用程序driver的自动重启-为了自动从driver故障中恢复,运行流应用程序的部署设施必须能监控driver进程,如果失败了能够重启它。不同的集群管理器,有不同的工具得到该功能 6.Spark Standalone:一个Spark应用程序driver可以提交到Spark独立集群运行,也就是说driver运行在一个worker节点上。进一步来看,独立的集群管理器能够被指示用来监控driver,并且在driver失败(或者是由于非零的退出代码如exit(1),或者由于运行driver的节点的故障)的情况下重启driver。 7.YARN:YARN为自动重启应用程序提供了类似的机制。 8.Mesos: Mesos可以用Marathon提供该功能 9.配置write ahead logs-在Spark 1.2中,为了获得极强的容错保证,我们引入了一个新的实验性的特性-预写日志(write ahead logs)。如果该特性开启,从receiver获取的所有数据会将预写日志写入配置的checkpoint目录。这可以防止driver故障丢失数据,从而保证零数据丢失。这个功能可以通过设置配置参数 spark.streaming.receiver.writeAheadLogs.enable 为true来开启。然而,这些较强的语义可能以receiver的接收吞吐量为代价。这可以通过并行运行多个receiver增加吞吐量来解决。另外,当预写日志开启时,Spark中的复制数据的功能推荐不用,因为该日志已经存储在了一个副本在存储系统中。可以通过设置输入DStream的存储级别为 StorageLevel.MEMORY_AND_DISK_SER 获得该功能。 **升级应用程序代码**: 如果运行的Spark Streaming应用程序需要升级,有两种可能的方法: 1.启动升级的应用程序,使其与未升级的应用程序并行运行。一旦新的程序(与就程序接收相同的数据)已经准备就绪,旧的应用程序就可以关闭。这种方法支持将数据发送到两个不同的目的地(新程序一个,旧程序一个) 2.首先,平滑的关闭( StreamingContext.stop(...) 或JavaStreamingContext.stop(...) )现有的应用程序。在关闭之前,要保证已经接收的数据完全处理完。然后,就可以启动升级的应用程序,升级的应用程序会接着旧应用程序的点开始处理。这种方法仅支持具有源端缓存功能的输入源(如flume,kafka),这是因为当旧的应用程序已经关闭,升级的应用程序还没有启动的时候,数据需要被缓存。
四、监控应用程序
除了Spark的监控功能,Spark Streaming增加了一些专有的功能。应用StreamingContext的时候,Spark web UI显示添加的 Streaming 菜单,用以显示运行的receivers(receivers是否是存活状态、接收的记录数、receiver错误等)和完成的批的统计信息(批处理时间、队列等待等待)。这可以用来监控流应用程序的处理过程。 在WEB UI中的 Processing Time 和 Scheduling Delay 两个度量指标是非常重要的。第一个指标表示批数据处理的时间,第二个指标表示前面的批处理完毕之后,当前批在队列中的等待时间。如果批处理时间比批间隔时间持续更长或者队列等待时间持续增加,这就预示系统无法以批数据产生的速度处理这些数据,整个处理过程滞后了。在这种情况下,考虑减少批处理时间。 Spark Streaming程序的处理过程也可以通过StreamingListener接口来监控,这个接口允许你获得receiver状态和处理时间。注意,这个接口是开发者API,它有可能在未来提供更多的信息。
五、性能调优
集群中的Spark Streaming应用程序获得最好的性能需要一些调整。这章将介绍几个参数和配置,提高Spark Streaming应用程序的性能。你需要考虑两件事情: 1.高效地利用集群资源减少批数据的处理时间 2.设置正确的批容量(size),使数据的处理速度能够赶上数据的接收速度 减少批数据的执行时间 设置正确的批容量 内存调优
六、减少批数据的执行时间
数据接收的并行水平: 通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个 receiver (运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个 receiver ,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作以运用在合并的DStream上。 val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print() 另外一个需要考虑的参数是 receiver 的阻塞时间。对于大部分的 receiver ,在存入Spark内存之前,接收的数据都被合并成了一个大数据块。每批数据中块的个数决定了任务的个数。这任务是用类似map的transformation操作接收的数据。阻塞间隔由配置参数 spark.streaming.blockInterval 决定,默认的值是200毫秒。 多输入流或者多 receiver 的可选的方法是明确地重新分配输入数据流(利用inputStream.repartition() ),在进一步操作之前,通过集群的机器数分配接收的批数据。 数据处理的并行水平: 如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。例如,对于分布式reduce操作如 reduceByKey 和 reduceByKeyAndWindow ,默认的并发任务数通过配置属性来确定(configuration.html#spark-properties) spark.default.parallelism 。你可以通过参数(PairDStreamFunctions(api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))传递并行度,或者设置参数 spark.default.parallelism 修改默认值。 数据序列化: 数据序列化的总开销是平常大的,特别是当sub-second级的批数据被接收时。下面有两个相关点: Spark中RDD数据的序列化。关于数据序列化请参照Spark优化指南。注意,与Spark不同的是,默认的RDD会被持久化为序列化的字节数组,以减少与垃圾回收相关的暂停。 输入数据的序列化。从外部获取数据存到Spark中,获取的byte数据需要从byte反序列化,然后再按照Spark的序列化格式重新序列化到Spark中。因此,输入数据的反序列化花费可能是一个瓶颈。 任务的启动开支: 每秒钟启动的任务数是非常大的(50或者更多)。发送任务到slave的花费明显,这使请求很难获得亚秒(sub-second)级别的反应。通过下面的改变可以减小开支 1.任务序列化。运行kyro序列化任何可以减小任务的大小,从而减小任务发送到slave的时间。 2.执行模式。在Standalone模式下或者粗粒度的Mesos模式下运行Spark可以在比细粒度Mesos模式下运行Spark获得更短的任务启动时间。可以在在Mesos下运行Spark中获取更多信息。
七、设置正确的批容量
为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。 根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑 WordCountNetwork 这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据Spark 处理速率,就应该设置合适的批间隔时间(即批数据的容量)。 找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能满足数据处理速率,你可以通过检查端到端的延迟值来判断(可以在Spark驱动程序的log4j日志中查看"Total delay"或者利用StreamingListener接口)。如果延迟维持稳定,那么系统是稳定的。如果延迟持续增长,那么系统无法跟上数据处理速率,是不稳定的。你能够尝试着增加数据处理速率或者减少批容量来作进一步的测试。注意,因为瞬间的数据处理速度增加导致延迟瞬间的增长可能是正常的,只要延迟能重新回到了低值(小于批容量)。
八、内存调优
调整内存的使用以及Spark应用程序的垃圾回收行为已经在Spark优化指南中详细介绍。在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。 1.Default persistence level of DStreams:和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是 StorageLevel.MEMORY_ONLY_SER ,RDD是 StorageLevel.MEMORY_ONLY )。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。 2.Clearing persistent RDDs:默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项 spark.streaming.unpersist 为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。 3.Concurrent garbage collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。
九、容错语义
RDD的基本容错语义: 1.一个RDD是不可变的、确定可重复计算的、分布式数据集。每个RDD记住一个确定性操作的谱系(lineage),这个谱系用在容错的输入数据集上来创建该RDD。 2.如果任何一个RDD的分区因为节点故障而丢失,这个分区可以通过操作谱系从源容错的数据集中重新计算得到。 3.假定所有的RDD transformations是确定的,那么最终转换的数据是一样的,不论Spark机器中发生何种错误。 4.Data received and replicated :在单个worker节点的故障中,这个数据会幸存下来,因为有另外一个节点保存有这个数据的副本。 5.Data received but buffered for replication:因为没有重复保存,所以为了恢复数据,唯一的办法是从源中重新读取数据。 重点关心的两种: 1.worker节点故障:任何运行executor的worker节点都有可能出故障,那样在这个节点中的所有内存数据都会丢失。如果有任何receiver运行在错误节点,它们的缓存数据将会丢失 2.Driver节点故障:如果运行Spark Streaming应用程序的Driver节点出现故障,很明显SparkContext将会丢失,所有执行在其上的executors也会丢失。 作为输入源的文件语义: 如果所有的输入数据都存在于一个容错的文件系统如HDFS,Spark Streaming总可以从任何错误中恢复并且执行所有数据。这给出了一个恰好一次(exactly-once)语义,即无论发生什么故障,所有的数据都将会恰好处理一次。 基于receiver的输入源语义: 对于基于receiver的输入源,容错的语义既依赖于故障的情形也依赖于receiver的类型。正如之前讨论的,有两种类型的receiver: 1.Reliable Receiver:这些receivers只有在确保数据复制之后才会告知可靠源。如果这样一个receiver失败了,缓冲(非复制)数据不会被源所承认。如果receiver重启,源会重发数据,因此不会丢失数据。 2.Unreliable Receiver:当worker或者driver节点故障,这种receiver会丢失数据 输出操作的语义: 根据其确定操作的谱系,所有数据都被建模成了RDD,所有的重新计算都会产生同样的结果。所有的DStream transformation都有exactly-once语义。那就是说,即使某个worker节点出现故障,最终的转换结果都是一样。然而,输出操作(如 foreachRDD )具有 at-least once 语义,那就是说,在有worker事件故障的情况下,变换后的数据可能被写入到一个外部实体不止一次。利用 saveAs***Files 将数据保存到HDFS中的情况下,以上写多次是能够被接受的(因为文件会被相同的数据覆盖)。