SparkStreaming算子reduceByKeyAndWindow的使用

reduceByKeyAndWindow这个算子也是lazy的,它用来计算一个区间里面的数据,如下图:

截图自官网,例如每个方块代表5秒钟,上面的虚线框住的是3个窗口就是15秒钟,这里的15秒钟就是窗口的长度,其中虚线到实线移动了2个方块表示10秒钟,这里的10秒钟就表示每隔10秒计算一次窗口长度的数据

举个例子: 如下图

我是这样理解的:如果这里是使用窗口函数计算wordcount 在第一个窗口(虚线窗口)计算出来(aa, 1)(bb,3)(cc,1)当到达时间10秒后窗口移动到实线窗口,就会计算这个实线窗口中的单词,这里就为(bb,1)(cc,2)(aa,1)

附上程序:

注意:窗口滑动长度和窗口长度一定要是SparkStreaming微批处理时间的整数倍,不然会报错.

package cn.lijie.kafka import cn.lijie.MyLog import org.apache.log4j.Level import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} /** * User: lijie * Date: 2017/8/8 * Time: 14:04 */ object SparkWindowDemo { val myfunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => { it.map(x => { (x._1, x._2.sum + x._3.getOrElse(0)) }) } def main(args: Array[String]): Unit = { MyLog.setLogLeavel(Level.WARN) val conf = new SparkConf().setMaster("local[2]").setAppName("window") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(2)) sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\myck01") val ds = ssc.socketTextStream("192.168.80.123", 9999) //Seconds(5)表示窗口的宽度 Seconds(3)表示多久滑动一次(滑动的时间长度) val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10)) // 窗口长度和滑动的长度一致,那么类似于每次计算自己批量的数据,用updateStateByKey也可以累计计算单词的wordcount 这里只是做个是实验 // val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(4)).updateStateByKey(myfunc, new HashPartitioner(sc.defaultParallelism), true) re.print() ssc.start() ssc.awaitTermination() } }

时间: 2025-01-31 03:34:26

SparkStreaming算子reduceByKeyAndWindow的使用的相关文章

SparkStreaming基本概念

一.关联 与Spark类似,Spark Streaming也可以利用maven仓库.编写你自己的Spark Streaming程序,你需要引入下面的依赖到你的SBT或者Maven项目中 org.apache.spark spark-streaming_2.10 1.2 为了从Kafka, Flume和Kinesis这些不在Spark核心API中提供的源获取数据,我们需要添加相关的模块spark-streaming-xyz_2.10 到依赖中 以下为一些常用组件 kafka:spark-strea

link中使用动态算子实现排序的机制是什么,怎么样能优化?

问题描述 link中使用动态算子实现排序的机制是什么,怎么样能优化? link中使用动态算子实现排序的机制是什么,怎么样能优化? 解决方案 使用dynamic其实是运行时反射,要想效率高,用查询表达式,google MakeMemberAccess LINQ

用C语言编写函数计算子字符串substr在主字符串mainstr中的索引值

在大小写敏感的前提下,用C语言编写函数计算子字符串substr在主字符串mainstr中的索引值. 如果substr完全包含在mainstr中,请计算出索引值.否则,返回-1. 具体代码如下: findstr.c /** Author: snowdream <yanghui1986527@gmail.com> Data: 2012.03.05 Description: 假设一个主要字符串"Hello World!",和一个子字符串"World". 在大小

SparkStreaming高级

一.缓存或持久化 和RDD相似,DStreams也允许开发者持久化流数据到内存中.在DStream上使用 persist() 方法可以自动地持久化DStream中的RDD到内存中.如果DStream中的数据需要计算多次,这是非常有用的.像reduceByWindow 和 reduceByKeyAndWindow 这种窗口操作. updateStateByKey 这种基于状态的操作,持久化是默认的,不需要开发者调用 persist() 方法. 例如通过网络(如kafka,flume等)获取的输入数

SparkStreaming实例

SparkStreaming实例 import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf() //创建了一个具有两个执行线程以及1秒批间隔时间(即以秒为单位分割数据流)的本地StreamingContext val ssc

Flume直接到SparkStreaming的两种方式

Flume直接到SparkStreaming的两种方式,一般是flume->kafka->SparkStreaming,如果非要从Flume直接将数据输送到SparkStreaming里面有两种方式,如下: 第一种:Push推送的方式 程序如下: package cn.lijie import org.apache.log4j.Level import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.str

遗传算法中几种不同选择算子及Python实现

前言 本文对遗传算法中的几种选择策略进行了总结, 其中包括: Proportionate Roulette Wheel Selection Linear Ranking Selection Exponential Ranking Selection Tournament Selection 对于每种选择策略我都使用Python进行了相应的实现并以内置插件的形式整合进了本人所写的遗传算法框架GAFT中.对需要使用遗传算法优化问题以及学习遗传算法的童鞋可以作为参考. 项目链接: GitHub: ht

matlab源代码-我需要图像角点定位算子MATLAB代码或圆点定位算子MATLAB代码

问题描述 我需要图像角点定位算子MATLAB代码或圆点定位算子MATLAB代码 定位算子可以提高图像角点或圆点的精度,比如:forstnder定位算子,Wong-Trinder算子,Trinder的改进算子,高精度角点与直线定位算子等等 解决方案 harrir算子 这种不是百度一搜一大堆的么 解决方案二: Wong-Trinder圆点定位算子就这几个公式,求编程MATLAB 解决方案三: matlab图像处理类书籍上都有代码的

c++-opencv怎么定义一个模板作用于每一个像素值,比如log算子的模板?

问题描述 opencv怎么定义一个模板作用于每一个像素值,比如log算子的模板? 想定义一个模板,opencv中应该有专门的函数吧?可是我没找到,新人真心求教! 解决方案 http://blog.sina.com.cn/s/blog_7155fb1a0100wzkz.html 解决方案二: http://blog.csdn.net/xiaowei_cqu/article/details/7718831 Filter2D 对图像做卷积 void cvFilter2D( const CvArr* s