问题描述
Spark监视UI页面上,出现好多SQLXXX,这个正常吗?请各位帮我看看,谢谢了!下面是我的代码,就是从Kafka里拉取数据,然后转换成DateFrame后存储到elasticsearch中,似乎只要是进入foreachRDD里面一回,就会在SparkUI页面上产生一个SQL监视对象,是不是我代码写法有问题vallogs=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet).map(_._2)/**schema初期化*/valhost=StructField("host",StringType,true)valhandle=StructField("handle",StringType,true)valhost_handle=StructField("host_handle",StringType,true)valtimestemp=StructField("timestemp",StringType,true)valtime=StructField("time",IntegerType,true)valfromip=StructField("fromip",StringType,true)valschema=StructType(Array(host,handle,host_handle,timestemp,time,fromip))logs.foreachRDD{rdd=>/**SQLContext初期化*/valsqlContext=newSQLContext(rdd.sparkContext)///*//*DataFrame作成//*/if(!rdd.partitions.isEmpty){valrowRDD=rdd.map(_.split("")).map(p=>Row(p(1),p(8).substring(1,p(8).length()),p(1)+"_"+p(8).substring(1,p(8).length()),stringTodate(p(5).substring(1,p(5).length()))+p(6).substring(0,p(6).length()-1),p(20).toInt,p(2).substring(1,p(2).length())))sqlContext.createDataFrame(rowRDD,schema).saveToEs(esResource)}sqlContext.clearCache()}ssc.start()ssc.awaitTermination()
解决方案
本帖最后由 lucky8251 于 2016-04-14 21:47:01 编辑
解决方案二:
有人知道吗?
解决方案三:
logs.foreachRDD{rdd=>/**SQLContext初期化*/valsqlContext=newSQLContext(rdd.sparkContext)///*//*DataFrame作成//*/if(!rdd.partitions.isEmpty){valrowRDD=rdd.map(_.split("")).map(p=>Row(p(1),p(8).substring(1,p(8).length()),p(1)+"_"+p(8).substring(1,p(8).length()),stringTodate(p(5).substring(1,p(5).length()))+p(6).substring(0,p(6).length()-1),p(20).toInt,p(2).substring(1,p(2).length())))sqlContext.createDataFrame(rowRDD,schema).saveToEs(esResource)}你每个rdd都给创建一个SQLContext当然机webUI上面会有很多的sqltab了。你用一个单例的SQLContex就可以了。