Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark Streaming(一)

本节主要内容

本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations

  1. Spark流式计算简介
  2. Spark Streaming相关核心类
  3. 入门案例

1. Spark流式计算简介

Hadoop的MapReduce及Spark SQL等只能进行离线计算,无法满足实时性要求较高的业务需求,例如实时推荐、实时网站性能分析等,流式计算可以解决这些问题。目前有三种比较常用的流式计算框架,它们分别是Storm,Spark Streaming和Samza,各个框架的比较及使用情况,可以参见:http://www.csdn.net/article/2015-03-09/2824135。本节对Spark Streaming进行重点介绍,Spark Streaming作为Spark的五大核心组件之一,其原生地支持多种数据源的接入,而且可以与Spark MLLib、Graphx结合起来使用,轻松完成分布式环境下在线机器学习算法的设计。Spark支持的输入数据源及输出文件如下图所示:

在后面的案例实战当中,会涉及到这部分内容。中间的”Spark Streaming“会对输入的数据源进行处理,然后将结果输出,其内部工作原理如下图所示:


Spark Streaming接受实时传入的数据流,然后将数据按批次(batch)进行划分,然后再将这部分数据交由Spark引擎进行处理,处理完成后将结果输出到外部文件。

先看下面一段基于Spark Streaming的word count代码,它可以很好地帮助初步理解流式计算

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: StreamingWordCount <directory>")
      System.exit(1)
    }

    //创建SparkConf对象
    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
    // Create the context
    //创建StreamingContext对象,与集群进行交互
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    //如果目录中有新创建的文件,则读取
    val lines = ssc.textFileStream(args(0))
    //分割为单词
    val words = lines.flatMap(_.split(" "))
    //统计单词出现次数
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    //打印结果
    wordCounts.print()
    //启动Spark Streaming
    ssc.start()
    //一直运行,除非人为干预再停止
    ssc.awaitTermination()
  }
}

运行上面的程序后,再通过命令行界面,将文件拷贝到相应的文件目录,具体如下:

程序在运行时,根据文件创建时间对文件进行处理,在上一次运行时间后创建的文件都会被处理,输出结果如下:

2. Spark Streaming相关核心类

1. DStream(discretized stream)

Spark Streaming提供了对数据流的抽象,它就是DStream,它可以通过前述的 Kafka, Flume等数据源创建,DStream本质上是由一系列的RDD构成。各个RDD中的数据为对应时间间隔( interval)中流入的数据,如下图所示:

对DStream的所有操作最终都要转换为对RDD的操作,例如前面的StreamingWordCount程序,flatMap操作将作用于DStream中的所有RDD,如下图所示:

2.StreamingContext
在Spark Streaming当中,StreamingContext是整个程序的入口,其创建方式有多种,最常用的是通过SparkConf来创建:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

创建StreamingContext对象时会根据SparkConf创建SparkContext

  /**
   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }

也就是说StreamingContext是对SparkContext的封装,StreamingContext还有其它几个构造方法,感兴趣的可以了解,后期在源码解析时会对它进行详细的讲解,创建StreamingContext时会指定batchDuration,它用于设定批处理时间间隔,需要根据应用程序和集群资源情况去设定。当创建完成StreamingContext之后,再按下列步骤进行:

  1. 通过输入源创建InputDStreaim
  2. 对DStreaming进行transformation和output操作,这样操作构成了后期流式计算的逻辑
  3. 通过StreamingContext.start()方法启动接收和处理数据的流程
  4. 使用streamingContext.awaitTermination()方法等待程序处理结束(手动停止或出错停止)
  5. 也可以调用streamingContext.stop()方法结束程序的运行

关于StreamingContext有几个值得注意的地方:

1.StreamingContext启动后,增加新的操作将不起作用。也就是说在StreamingContext启动之前,要定义好所有的计算逻辑
2.StreamingContext停止后,不能重新启动。也就是说要重新计算的话,需要重新运行整个程序。
3.在单个JVM中,一段时间内不能出现两个active状态的StreamingContext
4.调用StreamingContext的stop方法时,SparkContext也将被stop掉,如果希望StreamingContext关闭时,保留SparkContext,则需要在stop方法中传入参数stopSparkContext=false
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed). By default, if stopSparkContext is not specified, the underlying
* SparkContext will also be stopped. This implicit behavior can be configured using the
* SparkConf configuration spark.streaming.stopSparkContextByDefault.
*
* @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
def stop(
stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true)
): Unit = synchronized {
stop(stopSparkContext, false)
}
5.SparkContext对象可以被多个StreamingContexts重复使用,但需要前一个StreamingContexts停止后再创建下一个StreamingContext对象。

3. InputDStreams及Receivers
InputDStream指的是从数据流的源头接受的输入数据流,在前面的StreamingWordCount程序当中,val lines = ssc.textFileStream(args(0)) 就是一种InputDStream。除文件流外,每个input DStream都关联一个Receiver对象,该Receiver对象接收数据源传来的数据并将其保存在内存中以便后期Spark处理。

Spark Streaimg提供两种原生支持的流数据源:

  1. Basic sources(基础流数据源)。直接通过StreamingContext API创建,例如文件系统(本地文件系统及分布式文件系统)、Socket连接及Akka的Actor。
    文件流(File Streams)的创建方式:
    a. streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
    b. streamingContext.textFileStream(dataDirectory)
    实时上textFileStream方法最终调用的也是fileStream方法
    def textFileStream(directory: String): DStream[String] = withNamedScope(“text file stream”) {
    fileStreamLongWritable, Text, TextInputFormat.map(_._2.toString)
    }

    基于Akka Actor流数据的创建方式:
    streamingContext.actorStream(actorProps, actor-name)

    基于Socket流数据的创建方式:
    ssc.socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)

    基于RDD队列的流数据创建方式:
    streamingContext.queueStream(queueOfRDDs)

  2. Advanced sources(高级流数据源)。如Kafka, Flume, Kinesis, Twitter等,需要借助外部工具类,在运行时需要外部依赖(下一节内容中介绍)

Spark Streaming还支持用户
3. Custom Sources(自定义流数据源),它需要用户定义receiver,该部分内容也放在下一节介绍

最后有两个需要注意的地方:

  1. 在本地运行Spark Streaming时,master URL不能使用“local” 或 “local[1]”,因为当input DStream与receiver(如sockets, Kafka, Flume等)关联时,receiver自身就需要一个线程来运行,此时便没有线程去处理接收到的数据。因此,在本地运行SparkStreaming程序时,要使用“local[n]”作为master URL,n要大于receiver的数量。
  2. 在集群上运行Spark Streaming时,分配给Spark Streaming程序的CPU核数也必须大于receiver的数量,否则系统将只接受数据,无法处理数据。

3. 入门案例

为方便后期查看运行结果,修改日志级别为Level.WARN

import org.apache.spark.Logging

import org.apache.log4j.{Level, Logger}

/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {

  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
  1. NetworkWordCount
    基于Socket流数据
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    //修改日志层次为Level.WARN
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    //创建SocketInputDStream,接收来自ip:port发送来的流数据
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

配置运行时参数

使用

//启动netcat server
root@sparkmaster:~/streaming# nc -lk 9999

运行NetworkWordCount 程序,然后在netcat server运行的控制台输入任意字符串

root@sparkmaster:~/streaming# nc -lk 9999
Hello WORLD
HELLO WORLD WORLD
TEWST
NIMA

  1. QueueStream
    基于RDD队列的流数据
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueueStream {

  def main(args: Array[String]) {

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
    //创建RDD队列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

    // Create the QueueInputDStream and use it do some processing
    // 创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue)

    //处理队列中的RDD数据
    val mappedStream = inputStream.map(x => (x % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //打印结果
    reducedStream.print()

    //启动计算
    ssc.start()

    // Create and push some RDDs into
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 3000, 10)
      Thread.sleep(1000)

    //通过程序停止StreamingContext的运行
    ssc.stop()
  }
}

时间: 2024-09-20 18:25:04

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark Streaming(一)的相关文章

Spark修炼之道(进阶篇)——Spark入门到精通:第一节 Spark 1.5.0集群搭建

作者:周志湖 网名:摇摆少年梦 微信号:zhouzhihubeyond 本节主要内容 操作系统环境准备 Hadoop 2.4.1集群搭建 Spark 1.5.0 集群部署 注:在利用CentOS 6.5操作系统安装spark 1.5集群过程中,本人发现Hadoop 2.4.1集群可以顺利搭建,但在Spark 1.5.0集群启动时出现了问题(可能原因是64位操作系统原因,源码需要重新编译,但本人没经过测试),经本人测试在ubuntu 10.04 操作系统上可以顺利成功搭建.大家可以利用CentOS

Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits

Scala入门到精通——第二十节 类型参数(二)

本节主要内容 Ordering与Ordered特质 上下文界定(Context Bound) 多重界定 类型约束 1. Ordering与Ordered特质 在介绍上下文界定之前,我们对scala中的Ordering与Ordered之间的关联与区别进行讲解,先看Ordering.Ordered的类继承层次体系: 通过上面两个图可以看到,Ordering混入了java中的Comparator接口,而Ordered混入了java的Comparable接口,我们知道java中的Comparator是一

Scala入门到精通——第十节 Scala类层次结构、Traits初步

本节主要内容 Scala类层次结构总览 Scala中原生类型的实现方式解析 Nothing.Null类型解析 Traits简介 Traits几种不同使用方式 1 Scala类层次结构 Scala中的类层次结构图如下: 来源:Programming in Scala 从上面的类层次结构图中可以看到,处于继承层次最顶层的是Any类,它是scala继承的根类,scala中所有的类都是它的子类 Any类中定义了下面几个方法: //==与!=被声明为final,它们不能被子类重写 final def ==

Spark修炼之道系列教程预告

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 部分内容会在实际编写时动态调整,或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据开

Spark修炼之道——Spark学习路线、课程大纲

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 部分内容会在实际编写时动态调整,或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据开

Spark修炼之道(进阶篇)——Spark入门到精通:第十二节 Spark Streaming—— DStream Window操作

作者:周志湖 微信号:zhouzhihubeyond 本节主要内容 Window Operation 入门案例 1. Window Operation Spark Streaming提供窗口操作(Window Operation),如下图所示: 上图中,红色实线表示窗口当前的滑动位置,虚线表示前一次窗口位置,窗口每滑动一次,落在该窗口中的RDD被一起同时处理,生成一个窗口DStream(windowed DStream),窗口操作需要设置两个参数: (1)窗口长度(window length),

Spark修炼之道(进阶篇)——Spark入门到精通:第三节 Spark Intellij IDEA开发环境搭建

作者:周志湖 网名:摇摆少年梦 微信号:zhouzhihubeyond 本节主要内容 Intellij IDEA 14.1.4开发环境配置 Spark应用程序开发 1. Intellij IDEA 14.1.4开发环境配置 Intellij IDEA 功能十分强大,能够开发JAVA.Scala等相关应用程序,在依赖管理 智能提示等方面做到了极致,大家可以到:http://www.jetbrains.com/idea/download/下载,目前有两种:Ultimate Edition Free

Spark修炼之道(基础篇)——Linux大数据开发基础:第六节:vi、vim编辑器(二)

本节主要内容 缓冲区的使用 文件的存盘与读盘 文本查找 文本替换 作者:周志湖 微信号:zhouzhihubeyond 网名:摇摆少年梦 1. 缓冲区的使用 在利用vim进行文本编辑时,编辑修改后的文本不会立即保存到硬盘上,而是保存在缓冲区中,如果没有把缓冲区里的文件存盘,原始文件不会被更改.vim在打开文件时将文本内容读到缓冲区中,在进行文本编辑时,修改的文本保存在缓冲区,此时硬盘上的原文件不变.下面让我们来演示一下缓冲区的使用. 假设采用vim 同时打开两个文本文件: root@ubuntu

Spark修炼之道(高级篇)——Spark源码阅读:第二节 SparkContext的创建

博文推荐:http://blog.csdn.net/anzhsoft/article/details/39268963,由大神张安站写的Spark架构原理,使用Spark版本为1.2,本文以Spark 1.5.0为蓝本,介绍Spark应用程序的执行流程. 本文及后面的源码分析都以下列代码为样板 import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount{ def main(args: Array[String])