一、关联
与Spark类似,Spark Streaming也可以利用maven仓库。编写你自己的Spark Streaming程序,你需要引入下面的依赖到你的SBT或者Maven项目中 org.apache.spark spark-streaming_2.10 1.2 为了从Kafka, Flume和Kinesis这些不在Spark核心API中提供的源获取数据,我们需要添加相关的模块spark-streaming-xyz_2.10 到依赖中 以下为一些常用组件 kafka:spark-streaming-kafka_2.10 flume:spark-streaming-flume_2.10 Kinesis:spark-streaming-kinesis-asl_2.10 Twitter:spark-streaming-twitter_2.10 ZeroMQ:spark-streaming-zeromq_2.10 MQTT:spark-streaming-mqtt_2.10
二、初始化StreamingContext
为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext对象可以用SparkConf对象创建。 import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) appName 表示你的应用程序显示在集群UI上的名字, master 是一个Spark、Mesos、YARN集群URL或者一个特殊字符串“local[*]”,它表示程序用本地模式运行。当程序运行在集群中时,你并不希望在程序中硬编码 master ,而是希望用 spark-submit启动应用程序,并从 spark-submit中得到 master 的值。对于本地测试或者单元测试,你可以传递“local”字符串在同一个进程内运行Spark Streaming。需要注意的是,它在内部创建了一个SparkContext对象,你可以通过 ssc.sparkContext 访问这个SparkContext对象。 当一个上下文(context)定义之后,你必须按照以下几步进行操作: 1.定义输入源 2.准备好流计算指令 3.利用 streamingContext.start() 方法接收和处理数据 4.处理过程将一直持续,直到 streamingContext.stop() 方法被调用 需要注意的地方: 1.一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中。 2.一旦一个context已经停止,它就不能再重新启动 3.在JVM中,同一时间只能有一个StreamingContext处于活跃状态 4.在StreamingContext上调用 stop() 方法,也会关闭SparkContext对象。如果只想仅关闭 5.StreamingContext对象,设置 stop() 的可选参数为false 6.一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的 7.StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。
三、离散流(DStreams)
离散流或者DStreams是Spark Streaming提供的基本的抽象,它代表一个连续的数据流。它要么是从源中获取的输入流,要么是输入流通过转换算子生成的处理后的数据流。在内部,DStreams由一系列连续的RDD组成。DStreams中的每个RDD都包含确定时间间隔内的数据。 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作。
四、输入DStreams和receivers
输入DStreams表示从数据源获取输入数据流的DStreams。在SparkStreaming快速例子中, lines 表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个 Receiver 对象相关联,这个 Receiver从源中获取数据,并将数据存入内存中用于处理。 输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源: 基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等。 高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。它们需要通过额外的类来使用。 需要注意的是,如果你想在一个流应用中并行地创建多个输入DStream来接收多个数据流,你能够创建多个输入流。它将创建多个Receiver同时接收多个数据流。但是, receiver作为一个长期运行的任务运行在Spark worker或executor中。因此,它占有一个核,这个核是分配给Spark Streaming应用程序的所有核中的一个(it occupies one of the cores allocated to the SparkStreaming application)。所以,为Spark Streaming应用程序分配足够的核(如果是本地运行,那么是线程)用以处理接收的数据并且运行 receiver 是非常重要的。 几点需要注意的地方: 如果分配给应用程序的核的数量少于或者等于输入DStreams或者receivers的数量,系统只能够接收数据而不能处理它们。 当运行在本地,如果你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来说是不足的,因为作为 receiver 的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。 基本源: 文件流(File Streams):从任何与HDFS API兼容的文件系统中读取数据,一个DStream可以通过如下方式创建。 需要注意的地方: 1.所有文件必须具有相同的数据格式 2.所有文件必须在`dataDirectory`目录下创建,文件是自动的移动和重命名到数据目录下 3.一旦移动,文件必须被修改。所以如果文件被持续的附加数据,新的数据不会被读取。 基于自定义actor的流: DStream可以调streamingContext.actorStream(actorProps, actor-name) 方法从Akka actors获取的数据流来创建。 RDD队列作为数据流: 为了用测试数据测试Spark Streaming应用程序,人们也可以调用streamingContext.queueStream(queueOfRDDs) 方法基于RDD队列创建DStreams。每个push到队列的RDD都被当做DStream的批数据,像流一样处理。 高级源: 这类源需要非Spark库接口,并且它们中的部分还需要复杂的依赖(例如kafka和flume)。 自定义源: 在Spark 1.2中,这些源不被Python API支持。输入DStream也可以通过自定义源创建,你需要做的是实现用户自定义的 receiver ,这个 receiver 可以从自定义源接收数据以及将数据推到Spark中。 Receiver可靠性 基于可靠性有两类数据源。源(如kafka、flume)允许。如果从这些可靠的源获取数据的系统能够正确的应答所接收的数据,它就能够确保在任何情况下不丢失数据。这样,就有两种类型的receiver: Reliable Receiver:一个可靠的receiver正确的应答一个可靠的源,数据已经收到并且被正确地复制到了Spark中。 Unreliable Receiver :这些receivers不支持应答。即使对于一个可靠的源,开发者可能实现一个非可靠的receiver,这个receiver不会正确应答。
五、DStream中的转换(transformation)
和RDD类似,transformation允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示: map(func): 利用函数 func 处理原DStream的每个元素,返回一个新的DStream filter(func): 返回一个新的DStream,它仅仅包含源DStream中满足函数func的项 repartition(numPartitions):通过创建更多或者更少的partition改变这个DStream的并行级别(level of parallelism) union(otherStream): 返回一个新的DStream,它包含源DStream和otherStream的联合元素 count():通过计算源DStream中每个RDD的元素数量,返回一个包含单元素(single-element)RDDs的新DStream reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素(single-element)RDDs的新DStream。函数应该是相关联的,以使计算可以并行化 countByValue():这个算子应用于元素类型为K的DStream上,返回一个(K,long)对的新DStream,每个键的值是在原DStream的每个RDD中的频率。 join(otherStream,[numTasks]):当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, (V, W))对的新DStream transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。这个可以在DStream中的任何RDD操作中使用 updateStateByKey(func):利用给定的函数更新DStream的状态,返回一个新"state"的DStream。 重点介绍下面两个算子: updateStateByKey:操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它 1.定义状态-状态可以是任何的数据类型 2.定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态 示例如下: val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Initial state RDD for mapWithState operation val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using mapWithState // This will give a DStream made of state (which is the cumulative count of the words) val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output } val stateDstream = wordDstream.mapWithState( StateSpec.function(mappingFunc).initialState(initialRDD)) stateDstream.print() ssc.start() ssc.awaitTermination() } Transform操作: transform 操作(以及它的变化形式如 transformWith )允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没在DStream API中提供的RDD操作(It can be used to apply any RDDoperation that is not exposed in the DStream API)。例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用 transform 方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流来清理实时数据,然后过了它们,你可以按如下方法来做: val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informa tion val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
六、DStreams上的输出操作
输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作: print(): 在DStream的每个批数据中打印前10条元素,这个操作在开发和调试中都非常有用。在Python API中调用 pprint() 。 saveAsObjectFiles(prefix,[suffix]): saveAsTextFiles(prefix,[suffix]): 保存DStream的内容为一个文本文件。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]" saveAsHadoopFiles(prefix,[suffix]): 保存DStream的内容为一个hadoop文件。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。 foreachRDD(func): 在从流中生成的每个RDD上应用函数 func 的最通用的输出操作。这个函数应该推送每个RDD的数据到外部系统,例如保存RDD到文件或者通过网络写到数据库中。需要注意的是, func 函数在驱动程序中执行,并且通常都有RDD action在里面推动RDD流的计算。 利用foreachRDD的设计模式 dstream.foreachRDD是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Sparkworker中尝试调用这个连接对象保存记录到RDD中,如下: dstream.foreachRDD(rdd => { val connection = createNewConnection() // executed at the driver rdd.foreach(record => { connection.send(record) // executed at the worker }) }) 这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如: dstream.foreachRDD(rdd => { rdd.foreach(record => { val connection = createNewConnection() connection.send(record) connection.close() }) }) 通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用 rdd.foreachPartition 方法。为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。 dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }) }) 这就将连接对象的创建开销分摊到了partition的所有记录上了。最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。 dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse }) }) 需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。 其它需要注意的地方: 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者用于输出操作 dstream.foreachRDD() ,但是没有任何RDD action操作在dstream.foreachRDD() 里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。