Problem description
I recently wrote a Spark Streaming application that reads data from Kafka, processes it, and writes the results to HBase via saveAsNewAPIHadoopDataset. On a fresh run it works fine: no errors, and the writes succeed. But when I stop the application manually and launch it again (recovering from the checkpoint), it fails. Any help would be much appreciated!! The error output is:

15/12/22 16:26:52 WARN VerifiableProperties: Property serializer.class is not valid
15/12/22 16:26:57 WARN FileOutputCommitter: Output Path is null in setupJob()
15/12/22 16:26:58 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 10.4.120.183): java.lang.NullPointerException
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getDefaultWorkFile(FileOutputFormat.java:286)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:129)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1030)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
15/12/22 16:26:59 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
15/12/22 16:26:59 ERROR JobScheduler: Error running job streaming job 1450772680000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.4.120.183): java.lang.NullPointerException
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getDefaultWorkFile(FileOutputFormat.java:286)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:129)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1030)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1055)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:998)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:998)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:998)
    at com.neusoft.cn.TEST02$$anonfun$createContext$2.apply(TEST02.scala:142)
    at com.neusoft.cn.TEST02$$anonfun$createContext$2.apply(TEST02.scala:139)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getDefaultWorkFile(FileOutputFormat.java:286)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:129)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1030)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
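For context, here is a minimal sketch of the setup the trace implies (it points at a foreachRDD inside a createContext method); this is not the poster's actual code, and the broker address, topic, table name, and checkpoint path are placeholder assumptions. The detail worth noticing: the job writes through TextOutputFormat with a null output path even though TableOutputFormat was presumably configured, which is consistent with the Hadoop Job being created once in createContext, captured by the foreachRDD closure, and not coming back intact when the context is recovered from the checkpoint.

import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Kafka2HBaseSketch {
  val checkpointDir = "hdfs:///user/spark/checkpoint" // placeholder path

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka2hbase"), Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Hadoop Job created ONCE here and captured by the foreachRDD closure below.
    // After a restart via StreamingContext.getOrCreate, the recovered closure
    // appears not to carry this configuration any more, so the write falls back
    // to the default TextOutputFormat with no output path -- hence the NPE.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "myTable") // placeholder table
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("myTopic")) // placeholder topic

    stream.foreachRDD { rdd =>
      rdd.map { case (_, value) =>
        // Row key derivation is application-specific; the message value is used
        // here only to keep the sketch self-contained (HBase 1.x client API).
        val put = new Put(Bytes.toBytes(value))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
        (new ImmutableBytesWritable, put)
      }.saveAsNewAPIHadoopDataset(job.getConfiguration)
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}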
Solutions
Solution 2:
OP, did you ever solve this? I'm hitting the same exception and haven't found a fix yet.
Solution 3:
Bumping this; I'd like to know as well, since I've been learning this stack recently.
Solution 4:
OP, could you post your code? I'm trying to do the write inside foreachRDD but it doesn't seem to take effect. Thanks.
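For reference, a workaround commonly suggested for this kind of checkpoint-recovery NPE is to build the HBase output configuration inside foreachRDD, so a fresh Job is created on the driver for every batch rather than restored from the checkpointed closure. A minimal sketch under that assumption (the table name "myTable" and column family "cf" are placeholders):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream

// Write each micro-batch of (rowKey, value) pairs to HBase, rebuilding the
// Hadoop Job per batch so nothing Hadoop-specific has to survive recovery.
def writeToHBase(stream: DStream[(String, String)]): Unit = {
  stream.foreachRDD { rdd =>
    val hbaseConf = HBaseConfiguration.create() // fresh configuration every batch
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "myTable")
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    rdd.map { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

Rebuilding the configuration each batch is cheap compared to a streaming interval, and it sidesteps the question of whether Hadoop's Configuration object (which is not Java-serializable) survives the checkpoint at all.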