spark streaming多个输入DStream并行运行的问题

问题描述

spark官网,多个receive(对应多个输入dstream)并行运行通过下面的代码解决:intnumStreams=5;List<JavaPairDStream<String,String>>kafkaStreams=newArrayList<JavaPairDStream<String,String>>(numStreams);for(inti=0;i<numStreams;i++){kafkaStreams.add(KafkaUtils.createStream(...));}JavaPairDStream<String,String>unifiedStream=streamingContext.union(kafkaStreams.get(0),kafkaStreams.subList(1,kafkaStreams.size()));unifiedStream.print();

我的程序中,使用kafka源,单个输入dstream是没有问题,当采用多个Dstream时,经过测试,两个输入DStream中的数据都接收到了,但问题是:程序只运行一次,或者说只接收一次数据,后面就不再接收了,我的代码如下:StringgroupId=args[0];Stringzookeepers=args[1];Stringtopics="tpsN5a";IntegernumPartitions=Integer.parseInt(args[3]);Map<String,Integer>topicsMap=newHashMap<String,Integer>();for(Stringtopic:topics.split(",")){topicsMap.put(topic,numPartitions);}//多长时间统计一次DurationbatchInterval=Durations.seconds(2);SparkConfsparkConf=newSparkConf().setAppName("JavaKafkaConsumerWordCount");JavaStreamingContextssc=newJavaStreamingContext(sparkConf,batchInterval);JavaPairReceiverInputDStream<String,String>kafkaStream=KafkaUtils.createStream(ssc,zookeepers,groupId,topicsMap,StorageLevel.MEMORY_AND_DISK_SER());Stringtopics2="tpsN5b";Map<String,Integer>topicsMap2=newHashMap<String,Integer>();topicsMap2.put(topics2,numPartitions);JavaPairReceiverInputDStream<String,String>kafkaStream2=KafkaUtils.createStream(ssc,zookeepers,groupId,topicsMap2,StorageLevel.MEMORY_AND_DISK_SER());List<JavaPairDStream<String,String>>kafkaStreams=newArrayList<JavaPairDStream<String,String>>(2);kafkaStreams.add(kafkaStream);kafkaStreams.add(kafkaStream2);ssc.checkpoint("/spark/stream/checkpoint/d1");JavaPairDStream<String,String>unifiedStream=ssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,kafkaStreams.size()));JavaDStream<String>lines=unifiedStream//kafkaStream.map(newFunction<Tuple2<String,String>,String>(){@OverridepublicStringcall(Tuple2<String,String>arg0)throwsException{logger.warn(Thread.currentThread().getName()+"msg1:"+arg0._1+"|msg2:"+arg0._2);returnarg0._2();}});

请教如何解决上面提到的问题,当采用多个输入DStream并行接收数据时,streaming程序能持续接收数据,而不是只接收一次?

解决方案

解决方案二:
我用的是spark1.3.1,使用了预写日志,我的预写日志的接收数据中,能接收到kafka发的消息,但在程序中怎么接收不到呢
解决方案三:
问题解决了吗?我也遇到了类似的问题。

时间: 2024-08-03 17:56:33

spark streaming多个输入DStream并行运行的问题的相关文章

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins

《Spark大数据分析实战》——3.2节Spark Streaming

3.2 Spark StreamingSpark Streaming是一个批处理的流式计算框架.它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性.下面将对Spark Streaming进行详细的介绍.3.2.1 Spark Streaming简介Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力.Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming.Spark SQL.MLlib.GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑.这也得益于Scala编程语言的简洁性.这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算. 我们的应用场景是分析用户使用手机App的行为,描述如下所示: 手机客户端会收集用户的行为事件(我们以点击

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark Streaming(一)

本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations Spark流式计算简介 Spark Streaming相关核心类 入门案例 1. Spark流式计算简介 Hadoop的MapReduce及Spark SQL等只能进行离线计算,无法满足实时性要求较高的业务需求,例如实时推荐.实时网站性能分析等,流式计算可以解决这些问题.目前有三种比较常

如何基于Spark Streaming构建实时计算平台

1.前言 随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台搭建以来,经过两年多不断的技术演进,目前实时集群规模已达上百台,平台涵盖各个SBU与公共部门数百个实时应用,全年JStorm集群稳定性达到100%.目前实时平台主要基于JStorm与Spark Streaming构建而成,相信关注携程实时平台的朋友在去年已经看到一篇关于携程实时平台的分享:

Spark修炼之道(进阶篇)——Spark入门到精通:第十一节 Spark Streaming—— DStream Transformation操作

本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Transformation操作 1. Transformation操作 Transformation Meaning map(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream. flatMap(func) 与map方法类似,只不过各个输入项可以被输出为零个或多

Spark修炼之道(进阶篇)——Spark入门到精通:第十二节 Spark Streaming—— DStream Window操作

作者:周志湖 微信号:zhouzhihubeyond 本节主要内容 Window Operation 入门案例 1. Window Operation Spark Streaming提供窗口操作(Window Operation),如下图所示: 上图中,红色实线表示窗口当前的滑动位置,虚线表示前一次窗口位置,窗口每滑动一次,落在该窗口中的RDD被一起同时处理,生成一个窗口DStream(windowed DStream),窗口操作需要设置两个参数: (1)窗口长度(window length),

spark streaming问题-六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount

问题描述 六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount 请大神帮忙解决一下:六台机器,SparkStreaming的例子程序,运行在yarn上四个计算节点(nodemanager),每台8G内存,i7处理器,想测测性能. 自己写了socket一直向一个端口发送数据,spark 接收并处理 运行十几分钟汇报错:WARN scheduler TaskSetManagerost task 0.1 in stage 265.0 :java.lang.Exc

大数据-spark streaming运行一段时间报以下异常,怎么解决

问题描述 spark streaming运行一段时间报以下异常,怎么解决 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1568735.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1568735.0 (TID 11808399, iZ9