Problem Description
Per the Spark documentation, multiple receivers (each backed by its own input DStream) can be run in parallel with code like the following:

    int numStreams = 5;
    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(numStreams);
    for (int i = 0; i < numStreams; i++) {
        kafkaStreams.add(KafkaUtils.createStream(...));
    }
    JavaPairDStream<String, String> unifiedStream =
        streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
    unifiedStream.print();
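The same documentation section also cautions that each receiver permanently occupies one core, so the application must be allocated more cores than it has receivers; otherwise the receivers consume every core and no batch ever gets processed. A minimal sketch of what that means for a local run (the master setting and app name here are illustrative, not from the original post):

    // Two receivers occupy two cores; at least one more core is needed to
    // process batches, so "local[2]" would receive data but never compute anything.
    SparkConf conf = new SparkConf()
        .setAppName("MultiReceiverExample")  // hypothetical app name
        .setMaster("local[4]");              // 2 cores for receivers + 2 for processing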
In my program, which reads from a Kafka source, a single input DStream works fine. When I switch to multiple DStreams, my tests show that the data from both input DStreams is received, but the problem is that the program only runs once; in other words, it receives data once and then never receives anything again. My code is as follows:

    String groupId = args[0];
    String zookeepers = args[1];
    String topics = "tpsN5a";
    Integer numPartitions = Integer.parseInt(args[3]);
    Map<String, Integer> topicsMap = new HashMap<String, Integer>();
    for (String topic : topics.split(",")) {
        topicsMap.put(topic, numPartitions);
    }
    // how often to compute the statistics
    Duration batchInterval = Durations.seconds(2);
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaConsumerWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
        ssc, zookeepers, groupId, topicsMap, StorageLevel.MEMORY_AND_DISK_SER());

    String topics2 = "tpsN5b";
    Map<String, Integer> topicsMap2 = new HashMap<String, Integer>();
    topicsMap2.put(topics2, numPartitions);
    JavaPairReceiverInputDStream<String, String> kafkaStream2 = KafkaUtils.createStream(
        ssc, zookeepers, groupId, topicsMap2, StorageLevel.MEMORY_AND_DISK_SER());

    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(2);
    kafkaStreams.add(kafkaStream);
    kafkaStreams.add(kafkaStream2);
    ssc.checkpoint("/spark/stream/checkpoint/d1");
    JavaPairDStream<String, String> unifiedStream =
        ssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
    JavaDStream<String> lines = unifiedStream // kafkaStream
        .map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> arg0) throws Exception {
                logger.warn(Thread.currentThread().getName() + "msg1:" + arg0._1 + "|msg2:" + arg0._2);
                return arg0._2();
            }
        });
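The excerpt stops at the map, so presumably the full program goes on to register an output operation and start the context; without that, the DStream is never materialized at all. A minimal sketch of the missing tail (the print() call is an assumption, any output operation would do):

    lines.print();           // an output operation is required to trigger execution
    ssc.start();             // start the receivers and the batch scheduler
    ssc.awaitTermination();  // keep the driver alive so batches keep running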
How can the problem above be solved, so that when multiple input DStreams receive data in parallel, the streaming program keeps receiving data continuously rather than receiving it only once?
Solutions
Solution 2:
I am using Spark 1.3.1 with the write-ahead log enabled. The write-ahead log's received-data files do contain the messages sent by Kafka, but why does my program not receive them?
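For reference, in Spark 1.3.x the write-ahead log is turned on with a configuration flag plus a checkpoint directory, roughly as follows (a minimal sketch; the app name, batch interval, and checkpoint path are copied from the question's code, not from this reply):

    SparkConf conf = new SparkConf()
        .setAppName("JavaKafkaConsumerWordCount")
        // persist every received block to the log before acknowledging it
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
    // WAL files go under this directory, which should be fault-tolerant storage such as HDFS
    ssc.checkpoint("/spark/stream/checkpoint/d1");

With the WAL enabled, the documentation recommends receiving with StorageLevel.MEMORY_AND_DISK_SER (as the question's code already does), since the log itself provides the durability that in-memory replication would otherwise give.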
Solution 3:
Has the problem been solved? I ran into a similar issue as well.