spark 如何从foreachRDD 获取数据 ?

问题描述

如何从foreachRDD获取数据?stream.foreachRDD(rdd=>{}

解决方案

解决方案二:
什么叫获取数据?JavaStreamingContextjsc=newJavaStreamingContext(newJavaSparkContext(newSparkConf()),newDuration(1000));JavaDStream<String>dStream=jsc.socketTextStream("localhost",16888);dStream.foreachRDD(newVoidFunction2<JavaRDD<String>,Time>(){@Overridepublicvoidcall(JavaRDD<String>rdd,Timetime)throwsException{rdd.foreach(newVoidFunction<String>(){@Overridepublicvoidcall(Strings)throwsException{System.out.println(System.currentTimeMillis()+":"+s);}});}});jsc.awaitTermination();

解决方案三:
stream是从kafka消费消息,希望stream.foreachRDD(rdd=>{在一个节点获取所有数据,而不是在每个worker中获得各自的数据,比如,以下可以获得count值,stream.foreachRDD(rdd=>{rdd.collect()valcount=rdd.count()print("linecountis"+count)}问题是:如何获得每个消息的内容,而不仅是count值
解决方案四:
我用的是scala,java有点看不懂。
解决方案五:
引用2楼hghdown的回复:

stream是从kafka消费消息,希望stream.foreachRDD(rdd=>{在一个节点获取所有数据,而不是在每个worker中获得各自的数据,比如,以下可以获得count值,stream.foreachRDD(rdd=>{rdd.collect()valcount=rdd.count()print("linecountis"+count)}问题是:如何获得每个消息的内容,而不仅是count值

stream.foreachRDD(rdd=>{rdd.foreach(val=>{println(val)//你要的单个记录})}

解决方案六:
这样只能在worker端操作,无法获得并显示数据,如果是:rdd.collect().foreach(line=>{println(line)})则获得如下的结果:consumer.kafka.MessageAndMetadata@10a794c2
解决方案七:
引用5楼hghdown的回复:

这样只能在worker端操作,无法获得并显示数据,如果是:rdd.collect().foreach(line=>{println(line)})则获得如下的结果:consumer.kafka.MessageAndMetadata@10a794c2

你打开sparkwebui点击你的Application,在上边点击Executor,随便找个Executor的stdout,println的输出就在那里。获得数据?stream.foreachRDD(rdd=>{rdd.foreach(line=>{println(line.topic)println(line.key)println(line.message)//不都在这了么....})})

解决方案八:
很感谢@link0007热心回帖。我这里是采用了一个工具来从kafka消费消息:dibbhatt/kafka-spark-consumerhttps://github.com/dibbhatt/kafka-spark-consumer其用法代码中是这样的:valprops=newjava.util.Properties()kafkaPropertiesforeach{case(key,value)=>props.put(key,value)}valtmp_stream=ReceiverLauncher.launch(ssc,props,numberOfReceivers,StorageLevel.MEMORY_ONLY)tmp_stream.foreachRDD(rdd=>{rdd.collect()//可以得到消息的数目,但不知如何得到数据println("nnNumberofrecordsinthisbatch:"+rdd.count())//这里没有类似line.message的方法})ssc.start()ssc.awaitTermination()采用各种方法得到的数据都是:consumer.kafka.MessageAndMetadata@10a794c2目前我还是不知道如何能或得到数据。这就是我的主要问题。
解决方案九:
rdd.collect()是把消息都取回到driver了。返回的是一个Arrayvaldata=rdd.collect就行了。rdd.count是另外一个action,又触发了一个job,是分布式count

时间: 2024-10-24 15:58:33

spark 如何从foreachRDD 获取数据 ?的相关文章

如何设计基于Hadoop、Spark、Storm的大数据风控架构?

量化派是一家金融大数据公司,为金融机构提供数据服务和技术支持,也通过旗下产品"信用钱包"帮助个人用户展示经济财务等状况,撮合金融机构为用户提供最优质的贷款服务.金融的本质是风险和流动性,但是目前中国对于个人方面的征信行业发展落后于欧美国家,个人消费金融的需求没有得到很好的满足.按照央行最新数据,目前央行征信中心的数据覆盖人口达到8亿人[1],但其中有实际征信记录的只有3亿人左右,有5亿人在征信系统中只是一个身份证号码.此外,我国还有5亿人跟银行从来没有信贷交易关系,这5亿人对金融部门来

使用AJAX异步获取数据

ajax|数据|异步 [导读]本文给出一个例子(使用AJAX异步获取数据),介绍如何去使用AJAX. AJAX这个名字看起来很神奇,我第一次见到它也被它吸引了,它是Asynchronous JavaScript and XML的简写,异步的JAVASCRIPT和XML,关于AJAX的介绍在网上的介绍太多了,我就不多那么多了,我的口才不好,没他们说的那么精彩,可以去http://zh.wikipedia.org/wiki/AJAX 看看,在这里我只是给大家一个例子,了解如何去使用AJAX. AJA

通过编程方式在InfoPath 2010表单的下拉框修改事件中获取数据

一个简单的例子,包括一段简短的代码,在InfoPath 2010表单中下拉框的修 改事件里从一个数据源获取数据. 表单 样例表单本身很简单,只有两个控件,如下图所示: 在mydropdown下拉框属性中,添加几个值,为将要获取的SharePoint列表中 已有的几个列表项的ID.

Hibernate获取数据方式与缓存使用

Hibernate获取数据的方式有不同的几种,其与缓存结合使用的效果也不尽相同,而Hibernate中具体 怎么使用缓存其实是我们很关心的一个问题,直接涉及到性能方面. 缓存在Hibernate中主要有三个方面:一级缓存.二级缓存和查询缓存:一级缓存在Hibernate中对应 的即为session范围的缓存,也就是当session关闭时缓存即被清除,一级缓存在Hibernate中是不可配置 的部分:二级缓存在Hibernate中对应的即为SessionFactory范围的缓存,通常来讲Sessi

使用MS Enterprise Library的DAAB获取数据时抛出异常

开门见山,使用MS Enterprise Library的DAAB(Data Access Application Block)获取 数据时抛出异常.具体场景如下,通过Database对象的ExecuteReader执行两段Select语句, 前一句是不合法的,后一句是正确的.为了避免第一次执行出错导致程序的终止,特意将其 放到Try/Catch酷快中.两次数据库操作通过TrsanctionScope的形式纳入同一个Transaction 中,具体的代码如下所示. class Program {

Android从服务器端获取数据的几种方法

  在android中有时候我们不需要用到本机的SQLite数据库提供数据,更多的时候是从网络上获取数据,那么Android怎么从服务器端获取数据呢?有很多种,归纳起来有 一:基于Http协议获取数据方法.二:基于SAOP协议获取数据方法,三:忘了------- 那么我们的这篇文章主要是将关于使用Http协议获取服务器端数据,这里我们采取的服务器端技术为java,框架为Struts2,或者可以有Servlet,又或者可直接从JSP页面中获取数据. 那么,接下来我们便开始这一路程: 首先:编写服务

jQuery DataTables插件从服务器端获取数据的方法

sAjaxSource参数,值是url.table会发送ajax请求,从服务器端获取数据.服务器端返回的数据应该是一个可以被转换成JSON对象的JSON字符串.这个字符串必须严格符合JSON格式的要求.否则会出错.该数据对象该对象的key应该是"aaData",例如: Js代码: { "aaData": { "columnA":"valueA", "columnB":"valueB",

解决php使用异步调用获取数据时出现(错误c00ce56e导致此项操作无法完成)

本篇文章是对php中使用异步调用获取数据时出现(由于出现错误c00ce56e而导致此项操作无法完成)的解决方法进行了详细的分析介绍,需要的朋友参考下   [详细错误]:由于出现错误 c00ce56e 而导致此项操作无法完成 [造成原因]:未指定输出编码格式. [解决办法]:句首加入header("content-type:text/html; charset=UTF-8");  

ttp onnection问题-关于httpURLConnnection 是否可以阻塞式的获取数据

问题描述 关于httpURLConnnection 是否可以阻塞式的获取数据 用httpconnection做获取数据的操作,在建立了connection实体后,直接调用getResponseCode(),然后直接返回流读写异常.我设置了5秒超时,但是根本就没有执行5秒. 类似于httpClient的话,他的execute可以阻塞,我设置5秒超时,他会等5秒,没获取到数据然后抛异常.我想知道httpconnection能否类似的做到.貌似connect()方法是会阻塞,但是后面加上getResp