为什么spark streaming的输出只有中断程序后才输出?

问题描述

我运行spark自带的Wordcount例子,为什么只有在CTRL+C后,才会在命令行输出结果?即使把print()改成saveastextfile,也是在中断程序后才写文件。请教各位这是什么原因?代码如下:objectKafkaWordCount{defmain(args:Array[String]){if(args.length<4){System.err.println("Usage:KafkaWordCount<zkQuorum><group><topics><numThreads>")System.exit(1)}StreamingExamples.setStreamingLogLevels()valArray(zkQuorum,group,topics,numThreads)=argsvalsparkConf=newSparkConf().setAppName("KafkaWordCount")valssc=newStreamingContext(sparkConf,Seconds(2))ssc.checkpoint("checkpoint")valtopicMap=topics.split(",").map((_,numThreads.toInt)).toMapvallines=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap).map(_._2)valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1L)).reduceByKeyAndWindow(_+_,_-_,Minutes(10),Seconds(2),2)wordCounts.print()ssc.start()ssc.awaitTermination()}}

时间: 2024-10-28 04:24:15

为什么spark streaming的输出只有中断程序后才输出?的相关文章

调试程序时在不中断程序的情况下输出函数调用信息(Mac OS,Linux &amp;amp; Windows)

转载请注明出处:http://blog.csdn.net/horkychen 有时在查找问题时,不想中断程序运行就输出一下某个条件下的函数执行顺序可以帮助定位问题. 在Xcode下可以编辑断点设置中的Action设为Debugger Command, 如果你使用GDB作为调试器(项目设置),然后输入backtrace如下: 记得勾选"Automatically continue after evaluating", 这样程序就不会停在这个断点,而是继续执行下去. 运行结果: #0  a

Spark入门:Spark Streaming 概览

概览 Spark Streaming是Spark API的一个可横向扩容,高吞吐量,容错的实时数据流处理引擎,Spark能够从Kafka.Flume.Kinesis或者TCP等等输入获取数据,然后能够使用复杂的计算表达式如map,reduce,join和window对数据进行计算.计算完后的数据能够被推送到文件系统,数据库,和实时的仪表盘.另外,你也可以使用Spark ML和图计算处理实时数据流. Spark Streaming接受到了实时数据后,把它们分批进行切割,然后再交给Spark进行数据

spark streaming问题-六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount

问题描述 六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount 请大神帮忙解决一下:六台机器,SparkStreaming的例子程序,运行在yarn上四个计算节点(nodemanager),每台8G内存,i7处理器,想测测性能. 自己写了socket一直向一个端口发送数据,spark 接收并处理 运行十几分钟汇报错:WARN scheduler TaskSetManagerost task 0.1 in stage 265.0 :java.lang.Exc

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming.Spark SQL.MLlib.GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑.这也得益于Scala编程语言的简洁性.这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算. 我们的应用场景是分析用户使用手机App的行为,描述如下所示: 手机客户端会收集用户的行为事件(我们以点击

《Spark官方文档》Spark Streaming编程指南(一)

Spark Streaming编程指南 概览   Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性.Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Twitter.ZeroMQ.Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map.reduce.join和window等.最后,Spark Streaming支持将处理完的数据推送到文

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins

在spark streaming中实时更新mllib的ALS算法的模型遇到的问题!

问题描述 在spark streaming中实时更新mllib的ALS算法的模型遇到的问题! 在spark streaming中使用ALS算法,实现模型的实时更新有人了解吗? 总是出ERROR [dag-scheduler-event-loop] scheduler.DAGSchedulerEventProcessLoop (Logging.scala:logError(96)) - DAGSchedulerEventProcessLoop failed; shutting down Spark

Spark Streaming + Spark SQL 实现配置化ETL流程

项目地址 前言 传统的Spark Streaming程序需要: 构建StreamingContext 设置checkpoint 链接数据源 各种transform foreachRDD 输出 通常而言,你可能会因为要走完上面的流程而构建了一个很大的程序,比如一个main方法里上百行代码,虽然在开发小功能上足够便利,但是复用度更方面是不够的,而且不利于协作,所以需要一个更高层的开发包提供支持. 如何开发一个Spark Streaming程序 我只要在配置文件添加如下一个job配置,就可以作为标准的

利用Spark Streaming实现分布式采集系统

前言 前两天我刚在自己的一篇文章中鼓吹数据天生就是流式的,并且指出: 批量计算已经在慢慢退化,未来必然是属于流式计算的,数据的流动必定是由数据自己驱动流转的. 而Spark Streaming 在上层概念上,完美融合了批量计算和流式计算,让他们你中有我,我中有你,这种设计使得Spark Streaming 作为流式计算的一个载体,同时也能作为其他一些需要分布式架构的问题提供解决方案. Spark Streaming 作为一些分布式任务系统基础的优势 天然就是分布式的,不用再为实现分布式协调而蛋疼