问题描述
大家好,本人现在需要使用sparkstreaming来导入数据,清洗后写入oracle数据库,本人在streaming与数据库连接问题上遇到问题,希望大家帮我检查下错误,问题出现在下面的代码中,本人想从dstream得到的数据插入到oracle数据库中,但wordCounts.foreachRDD(rdd=>{出现编译错误,希望各位帮我检查错误并指正,多谢啦wordCounts.foreachRDD(rdd=>{ps=conn.prepareStatement("insertintotesttablesvalues(?,?)")ps.setString(1,rdd._1)ps.setInt(2,rdd._2)ps.executeUpdate()})所有的代码如下valsparkConf=newSparkConf().setAppName("TextStream")valsc=newSparkContext(sparkConf)Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()valconn=DriverManager.getConnection("jdbc:oracle:thin:@host/oracledb","user","password")varps:PreparedStatement=nullvalssc=newStreamingContext(sparkConf,Seconds(6))//CreatetheFileInputDStreamonthedirectoryandusethevallines=ssc.textFileStream(hdfsHost+"/user/input/")valwords=lines.flatMap(_.split(","))valwordCounts=words.map(x=>(x,1))wordCounts.print()wordCounts.foreachRDD(rdd=>{ps=conn.prepareStatement("insertintotesttablesvalues(?,?)") ps.setString(1,rdd._1) ps.setInt(2,rdd._2) ps.executeUpdate()})
解决方案
解决方案二:
你至少把什么编译错误贴出来吧
解决方案三:
引用1楼suicidedamsel的回复:
你至少把什么编译错误贴出来吧
解决方案四:
你至少把什么编译错误贴出来吧
解决方案五:
谢谢楼主分享辛苦了
解决方案六:
你试试,看这样可以不!objectTest{caseclassPerson(words:String,number:Int)defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("TextStream")valsc=newSparkContext(sparkConf)Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()valconn=DriverManager.getConnection("jdbc:oracle:thin:@host/oracledb","user","password")varps:PreparedStatement=nullvalssc=newStreamingContext(sparkConf,Seconds(6))//CreatetheFileInputDStreamonthedirectoryandusethevallines=ssc.textFileStream("/user/input/")valwords=lines.flatMap(_.split(","))valwordCounts=words.map(x=>(x,1))//wordCounts.print()wordCounts.foreachRDD((rdd:RDD[(String,Int)])=>{rdd.foreach(lines=>{ps=conn.prepareStatement("insertintotesttablesvalues(?,?)")ps.setString(1,lines._1)ps.setInt(2,lines._2)ps.executeUpdate()})})}}