SparkStreaming基本概念

一、关联

与Spark类似,Spark Streaming也可以利用maven仓库。编写你自己的Spark Streaming程序,你需要引入下面的依赖到你的SBT或者Maven项目中 org.apache.spark spark-streaming_2.10 1.2 为了从Kafka, Flume和Kinesis这些不在Spark核心API中提供的源获取数据,我们需要添加相关的模块spark-streaming-xyz_2.10 到依赖中 以下为一些常用组件 kafka:spark-streaming-kafka_2.10 flume:spark-streaming-flume_2.10 Kinesis:spark-streaming-kinesis-asl_2.10 Twitter:spark-streaming-twitter_2.10 ZeroMQ:spark-streaming-zeromq_2.10 MQTT:spark-streaming-mqtt_2.10

二、初始化StreamingContext

为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext对象可以用SparkConf对象创建。 import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) appName 表示你的应用程序显示在集群UI上的名字, master 是一个Spark、Mesos、YARN集群URL或者一个特殊字符串“local[*]”,它表示程序用本地模式运行。当程序运行在集群中时,你并不希望在程序中硬编码 master ,而是希望用 spark-submit启动应用程序,并从 spark-submit中得到 master 的值。对于本地测试或者单元测试,你可以传递“local”字符串在同一个进程内运行Spark Streaming。需要注意的是,它在内部创建了一个SparkContext对象,你可以通过 ssc.sparkContext 访问这个SparkContext对象。 当一个上下文(context)定义之后,你必须按照以下几步进行操作: 1.定义输入源 2.准备好流计算指令 3.利用 streamingContext.start() 方法接收和处理数据 4.处理过程将一直持续,直到 streamingContext.stop() 方法被调用 需要注意的地方: 1.一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中。 2.一旦一个context已经停止,它就不能再重新启动 3.在JVM中,同一时间只能有一个StreamingContext处于活跃状态 4.在StreamingContext上调用 stop() 方法,也会关闭SparkContext对象。如果只想仅关闭 5.StreamingContext对象,设置 stop() 的可选参数为false 6.一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的 7.StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。

三、离散流(DStreams)

离散流或者DStreams是Spark Streaming提供的基本的抽象,它代表一个连续的数据流。它要么是从源中获取的输入流,要么是输入流通过转换算子生成的处理后的数据流。在内部,DStreams由一系列连续的RDD组成。DStreams中的每个RDD都包含确定时间间隔内的数据。 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作。

四、输入DStreams和receivers

输入DStreams表示从数据源获取输入数据流的DStreams。在SparkStreaming快速例子中, lines 表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个 Receiver 对象相关联,这个 Receiver从源中获取数据,并将数据存入内存中用于处理。 输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源: 基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等。 高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。它们需要通过额外的类来使用。 需要注意的是,如果你想在一个流应用中并行地创建多个输入DStream来接收多个数据流,你能够创建多个输入流。它将创建多个Receiver同时接收多个数据流。但是, receiver作为一个长期运行的任务运行在Spark worker或executor中。因此,它占有一个核,这个核是分配给Spark Streaming应用程序的所有核中的一个(it occupies one of the cores allocated to the SparkStreaming application)。所以,为Spark Streaming应用程序分配足够的核(如果是本地运行,那么是线程)用以处理接收的数据并且运行 receiver 是非常重要的。 几点需要注意的地方: 如果分配给应用程序的核的数量少于或者等于输入DStreams或者receivers的数量,系统只能够接收数据而不能处理它们。 当运行在本地,如果你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来说是不足的,因为作为 receiver 的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。 基本源: 文件流(File Streams):从任何与HDFS API兼容的文件系统中读取数据,一个DStream可以通过如下方式创建。 需要注意的地方: 1.所有文件必须具有相同的数据格式 2.所有文件必须在`dataDirectory`目录下创建,文件是自动的移动和重命名到数据目录下 3.一旦移动,文件必须被修改。所以如果文件被持续的附加数据,新的数据不会被读取。 基于自定义actor的流: DStream可以调streamingContext.actorStream(actorProps, actor-name) 方法从Akka actors获取的数据流来创建。 RDD队列作为数据流: 为了用测试数据测试Spark Streaming应用程序,人们也可以调用streamingContext.queueStream(queueOfRDDs) 方法基于RDD队列创建DStreams。每个push到队列的RDD都被当做DStream的批数据,像流一样处理。 高级源: 这类源需要非Spark库接口,并且它们中的部分还需要复杂的依赖(例如kafka和flume)。 自定义源: 在Spark 1.2中,这些源不被Python API支持。输入DStream也可以通过自定义源创建,你需要做的是实现用户自定义的 receiver ,这个 receiver 可以从自定义源接收数据以及将数据推到Spark中。 Receiver可靠性 基于可靠性有两类数据源。源(如kafka、flume)允许。如果从这些可靠的源获取数据的系统能够正确的应答所接收的数据,它就能够确保在任何情况下不丢失数据。这样,就有两种类型的receiver: Reliable Receiver:一个可靠的receiver正确的应答一个可靠的源,数据已经收到并且被正确地复制到了Spark中。 Unreliable Receiver :这些receivers不支持应答。即使对于一个可靠的源,开发者可能实现一个非可靠的receiver,这个receiver不会正确应答。

五、DStream中的转换(transformation)

和RDD类似,transformation允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示: map(func): 利用函数 func 处理原DStream的每个元素,返回一个新的DStream filter(func): 返回一个新的DStream,它仅仅包含源DStream中满足函数func的项 repartition(numPartitions):通过创建更多或者更少的partition改变这个DStream的并行级别(level of parallelism) union(otherStream): 返回一个新的DStream,它包含源DStream和otherStream的联合元素 count():通过计算源DStream中每个RDD的元素数量,返回一个包含单元素(single-element)RDDs的新DStream reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素(single-element)RDDs的新DStream。函数应该是相关联的,以使计算可以并行化 countByValue():这个算子应用于元素类型为K的DStream上,返回一个(K,long)对的新DStream,每个键的值是在原DStream的每个RDD中的频率。 join(otherStream,[numTasks]):当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, (V, W))对的新DStream transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。这个可以在DStream中的任何RDD操作中使用 updateStateByKey(func):利用给定的函数更新DStream的状态,返回一个新"state"的DStream。 重点介绍下面两个算子: updateStateByKey:操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它 1.定义状态-状态可以是任何的数据类型 2.定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态 示例如下: val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Initial state RDD for mapWithState operation val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using mapWithState // This will give a DStream made of state (which is the cumulative count of the words) val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output } val stateDstream = wordDstream.mapWithState( StateSpec.function(mappingFunc).initialState(initialRDD)) stateDstream.print() ssc.start() ssc.awaitTermination() } Transform操作: transform 操作(以及它的变化形式如 transformWith )允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没在DStream API中提供的RDD操作(It can be used to apply any RDDoperation that is not exposed in the DStream API)。例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用 transform 方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流来清理实时数据,然后过了它们,你可以按如下方法来做: val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informa tion val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })

六、DStreams上的输出操作

输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作: print(): 在DStream的每个批数据中打印前10条元素,这个操作在开发和调试中都非常有用。在Python API中调用 pprint() 。 saveAsObjectFiles(prefix,[suffix]): saveAsTextFiles(prefix,[suffix]): 保存DStream的内容为一个文本文件。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]" saveAsHadoopFiles(prefix,[suffix]): 保存DStream的内容为一个hadoop文件。每一个批间隔的文件的文件名基于 prefix 和 suffix 生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。 foreachRDD(func): 在从流中生成的每个RDD上应用函数 func 的最通用的输出操作。这个函数应该推送每个RDD的数据到外部系统,例如保存RDD到文件或者通过网络写到数据库中。需要注意的是, func 函数在驱动程序中执行,并且通常都有RDD action在里面推动RDD流的计算。 利用foreachRDD的设计模式 dstream.foreachRDD是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Sparkworker中尝试调用这个连接对象保存记录到RDD中,如下: dstream.foreachRDD(rdd => { val connection = createNewConnection() // executed at the driver rdd.foreach(record => { connection.send(record) // executed at the worker }) }) 这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如: dstream.foreachRDD(rdd => { rdd.foreach(record => { val connection = createNewConnection() connection.send(record) connection.close() }) }) 通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用 rdd.foreachPartition 方法。为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。 dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }) }) 这就将连接对象的创建开销分摊到了partition的所有记录上了。最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。 dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse }) }) 需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。 其它需要注意的地方: 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者用于输出操作 dstream.foreachRDD() ,但是没有任何RDD action操作在dstream.foreachRDD() 里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。

时间: 2024-11-18 01:42:55

SparkStreaming基本概念的相关文章

Java新手入门教程:新手必须掌握的30条Java基本概念

  Java新手必看教程是什么?当然是绿茶小编带来的Java入门需掌握的30个基本概念啦,掌握了这些概念对于学习Java大大有利,正在学习Java编程的同学们快来看看吧. 1.OOP中唯一关系的是对象的接口是什么,就像计算机的销售商她不管电源内部结构 是怎样的,他只关系能否给你提供电就行了,也就是只要知道can or not而不是how and why.所有的程序是由一定的属性和行为对象组成的,不同的对象的访问通过函数调用来完成,对象间所有的交流都是通过方法调用,通过对封装对象数据,很大 限度上

.NET中的六个重要概念:栈、堆、值类型、引用类型、装箱和拆箱

内容导读 •概述 •当你声明一个变量背后发生了什么? •堆和栈 •值类型和引用类型 •哪些是值类型,哪些是引用类型? •装箱和拆箱 •装箱和拆箱的性能问题一.概述 本文会阐述六个重要的概念:堆.栈.值类型.引用类型.装箱和拆箱.本文首先会通过阐述当你定义一个变量之后系统内部发生的改变开始讲解,然后将关注点转移到存储双雄:堆和栈.之后,我们会探讨一下值类型和引用类型,并对有关于这两种类型的重要基础内容做一个讲解. 本文会通过一个简单的代码来展示在装箱和拆箱过程中所带来的性能上的影响,请各位仔细阅读

一个例子与InnoDB索引的几个概念

1.一个简单的sql语句问题     假设当前我们有一个表记录用户信息,结构如下:     a)      表结构 CREATE TABLE `u` (   `id` int(11) NOT NULL DEFAULT '0′,   `regdate` int(1) unsigned,   -..   PRIMARY KEY (`id`),   KEY `regdate` (`regdate`) ) ENGINE=InnoDB DEFAULT CHARSET=gbk 说明:1) 由于需要按照注册时

iphone绘图的几个基本概念CGPoint、CGSize、CGRect、CGRectMake、window(窗口)、视图(view)

我一般情况下不会使用interface builder去画界面,而是用纯代码去创建界面,不是装B,而是刚从vi转到xcode不久,不太习惯interface builder而已.当然如果需要我也会使用它.一个东西的存在没有绝对的好与坏,只是存在时间与空间决定了它的价值. (忘了讲了,我的环境是xcode4.2) 首先要弄懂几个基本的概念.   一)三个结构体:CGPoint.CGSize.CGRect 1.  CGPoint   [plain] view plaincopy   /* Point

Cgroup基础概念

What is Cgroup? Cgroups 是 control groups 的缩写,是 Linux 内核提供的一种可以限制.记录.隔离进程组(process groups)所使用的物理资源(如:cpu,memory,IO等等)的机制.最初由 google 的工程师提出,后来被整合进 Linux 内核.Cgroups 也是 LXC 为实现虚拟化所使用的资源管理手段,可以说没有 cgroups 就没有 LXC. 摘自Linux Cgroups详解(王喆锋) What Cgroup can do

mysql实例-mysql里实例的概念是什么意思?

问题描述 mysql里实例的概念是什么意思? 我是一个mysql初学者,在mysql中我们通常会遇到"实例"这个概念,我想知道"实例"的定义是什么?在mysql的英语文档中,""实例""的英语单词是什么? 解决方案 应该对应于instance.就是一个操作的数据库的代称. 解决方案二: 指多个mysql进程

JSON Schema 那些事儿:基本概念

引子 在早期的淘宝 TMS 页面搭建系统中,为了解决页面模板和数据的分离问题,机智的先知们扩充了一系列灵活的 PHP 标签函数,得以将数据的定义从模板视图中解耦出来.以其中一个最为常用的函数为例: _tms_custom('{"name":"TextLinks","title":"文字链接","group":"文字链接","row":"10",&q

线框原型的概念和本质:将网站架构形象化

文章描述:线框原型(线框图)的本质及实践应用概述. 家里小猫生病,从周二开始一直折腾到现在,仍在治疗与观察中.几年来经历过几次这样的状况,虽然每次都会恢复健康平安,但一旦再次置身这样的过程里,怎样也无法停止焦虑与担心.除了尽心尽力以外,能做的只有不断祈祷,期盼着一切安好平安的状态早日回来. 篇幅不长的一篇文章,也因为这样的状况而拖沓了多日:一方面没时间,一方面没有任何心力的感觉.不多说了,我们来看今次的译文. 如今的设计圈子里,线框原型(线框图)这个词正在越来越多的被提起.过去几年中,在软件和W

基于Delphi的组件设计之概念

作为组件制作的开始,应该了解一些概念,我以为这些概念是非常重要的,将可以作为以后实践的理论基础. 一.组件的简要层次结构 一般情况下,VCL的组件可以从Tcomponent为开始.其最明显的特征就是它的属性可以在设计时通过对象察看器来操纵,另外,他还能拥有其他组件. 从Tcomponent下,分出非可视组件和可视组件. 非可视组件如TOPenDialog,TTimer,TTable等,这些组件因为继承自Tomponent,所以也就继承了在设计时可以被操纵的特性. 可视化组件始自TControl,