问题描述
如何从foreachRDD获取数据?stream.foreachRDD(rdd=>{}
解决方案
解决方案二:
什么叫获取数据?JavaStreamingContextjsc=newJavaStreamingContext(newJavaSparkContext(newSparkConf()),newDuration(1000));JavaDStream<String>dStream=jsc.socketTextStream("localhost",16888);dStream.foreachRDD(newVoidFunction2<JavaRDD<String>,Time>(){@Overridepublicvoidcall(JavaRDD<String>rdd,Timetime)throwsException{rdd.foreach(newVoidFunction<String>(){@Overridepublicvoidcall(Strings)throwsException{System.out.println(System.currentTimeMillis()+":"+s);}});}});jsc.awaitTermination();
解决方案三:
stream是从kafka消费消息,希望stream.foreachRDD(rdd=>{在一个节点获取所有数据,而不是在每个worker中获得各自的数据,比如,以下可以获得count值,stream.foreachRDD(rdd=>{rdd.collect()valcount=rdd.count()print("linecountis"+count)}问题是:如何获得每个消息的内容,而不仅是count值
解决方案四:
我用的是scala,java有点看不懂。
解决方案五:
引用2楼hghdown的回复:
stream是从kafka消费消息,希望stream.foreachRDD(rdd=>{在一个节点获取所有数据,而不是在每个worker中获得各自的数据,比如,以下可以获得count值,stream.foreachRDD(rdd=>{rdd.collect()valcount=rdd.count()print("linecountis"+count)}问题是:如何获得每个消息的内容,而不仅是count值
stream.foreachRDD(rdd=>{rdd.foreach(val=>{println(val)//你要的单个记录})}
解决方案六:
这样只能在worker端操作,无法获得并显示数据,如果是:rdd.collect().foreach(line=>{println(line)})则获得如下的结果:consumer.kafka.MessageAndMetadata@10a794c2
解决方案七:
引用5楼hghdown的回复:
这样只能在worker端操作,无法获得并显示数据,如果是:rdd.collect().foreach(line=>{println(line)})则获得如下的结果:consumer.kafka.MessageAndMetadata@10a794c2
你打开sparkwebui点击你的Application,在上边点击Executor,随便找个Executor的stdout,println的输出就在那里。获得数据?stream.foreachRDD(rdd=>{rdd.foreach(line=>{println(line.topic)println(line.key)println(line.message)//不都在这了么....})})
解决方案八:
很感谢@link0007热心回帖。我这里是采用了一个工具来从kafka消费消息:dibbhatt/kafka-spark-consumerhttps://github.com/dibbhatt/kafka-spark-consumer其用法代码中是这样的:valprops=newjava.util.Properties()kafkaPropertiesforeach{case(key,value)=>props.put(key,value)}valtmp_stream=ReceiverLauncher.launch(ssc,props,numberOfReceivers,StorageLevel.MEMORY_ONLY)tmp_stream.foreachRDD(rdd=>{rdd.collect()//可以得到消息的数目,但不知如何得到数据println("nnNumberofrecordsinthisbatch:"+rdd.count())//这里没有类似line.message的方法})ssc.start()ssc.awaitTermination()采用各种方法得到的数据都是:consumer.kafka.MessageAndMetadata@10a794c2目前我还是不知道如何能或得到数据。这就是我的主要问题。
解决方案九:
rdd.collect()是把消息都取回到driver了。返回的是一个Arrayvaldata=rdd.collect就行了。rdd.count是另外一个action,又触发了一个job,是分布式count