问题描述
由于处理数据的需求,我写了一个简单的spark应用处理数据,本想着处理速度应该有很大的提高,然而结果令我难以接受!

1. spark集群运行条件:3个work节点,Hdfs文件管理系统,数据输入2.5G左右,运行时间大约8分钟。spark应用程序如下:

package examples;

import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
// FIX: the original imported com.sun.jersey...XMLJAXBElementProvider.Text, which is a
// JAX-RS provider class, not a Writable. saveAsHadoopFile needs org.apache.hadoop.io.Text.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple6;

/**
 * Compresses a time series into per-segment summaries.
 *
 * Input: text lines of the form "<order> <value>" (whitespace-separated).
 * Each run of {@code interval} consecutive orders forms one segment; for every
 * segment the job emits (start, end, max, min, avg, stddev).
 *
 * Usage: Compression &lt;file&gt; &lt;interval&gt; &lt;slice&gt; &lt;type&gt;
 */
public class Compression {

    // NOTE(review): the forum paste lost all whitespace; the delimiter was presumably a
    // single space ("<order> <value>") — confirm against the actual input format.
    // Pattern.compile("") would split every character, which cannot be intended.
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: Compression <file> <interval> <slice> <type>");
            System.exit(1);
        }
        final Integer interval = Integer.valueOf(args[1]);
        SparkConf conf = new SparkConf().setAppName("Compression" + args[3]);
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> lines = ctx.textFile(args[0]);

        // key = segment index (order / interval); value = (offset within segment, sample).
        JavaPairRDD<Integer, Tuple2<Integer, Double>> key_v =
                lines.mapToPair(new PairFunction<String, Integer, Tuple2<Integer, Double>>() {
                    @Override
                    public Tuple2<Integer, Tuple2<Integer, Double>> call(String s) {
                        String[] x = SPACE.split(s);
                        Integer order = Integer.valueOf(x[0]);
                        Integer k = order / interval;
                        Tuple2<Integer, Double> v =
                                new Tuple2<Integer, Double>(order % interval, Double.valueOf(x[1]));
                        return new Tuple2<Integer, Tuple2<Integer, Double>>(k, v);
                    }
                });

        // NOTE(review): groupByKey shuffles every raw sample across the network; since the
        // summary is associative, aggregateByKey/combineByKey would compute partial
        // aggregates map-side and shrink the shuffle drastically — this is very likely a
        // large part of the 8-minute runtime being asked about.
        JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = key_v.groupByKey();

        // Per segment: (start, end, max, min, avg, stddev).
        JavaPairRDD<Integer, Tuple6<Double, Double, Double, Double, Double, Double>> compressdata =
                segments.mapValues(
                        new Function<Iterable<Tuple2<Integer, Double>>,
                                     Tuple6<Double, Double, Double, Double, Double, Double>>() {
                            @Override
                            public Tuple6<Double, Double, Double, Double, Double, Double> call(
                                    Iterable<Tuple2<Integer, Double>> list) throws Exception {
                                // FIX: Double.MIN_VALUE is the smallest POSITIVE double, so the
                                // original max-tracking failed on all-negative data. Use the
                                // infinities as neutral elements instead.
                                Double max = Double.NEGATIVE_INFINITY;
                                Double min = Double.POSITIVE_INFINITY;
                                Double total = 0.0;
                                Double start = 0.0;
                                Double end = 0.0;
                                Double avg = 0.0;
                                Double s = 0.0;
                                Integer len = 0;
                                for (Tuple2<Integer, Double> v : list) {
                                    len++;
                                    if (v._1.equals(0)) start = v._2;
                                    total += v._2;
                                    if (v._2 > max) max = v._2;
                                    if (v._2 < min) min = v._2;
                                }
                                avg = total / len;
                                Double temp = 0.0;
                                // Index of the last element — valid because offsets within a
                                // segment are contiguous 0..len-1, even for a partial tail segment.
                                len -= 1;
                                for (Tuple2<Integer, Double> v : list) {
                                    if (v._1.equals(len)) end = v._2;
                                    temp += (v._2 - avg) * (v._2 - avg);
                                }
                                // NOTE(review): divides by n-1 (sample stddev) while the
                                // single-machine version below divides by n — confirm which
                                // definition is intended; the two programs disagree.
                                s = Math.sqrt(temp / len);
                                return new Tuple6<Double, Double, Double, Double, Double, Double>(
                                        start, end, max, min, avg, s);
                            }
                        });

        // NOTE(review): the RDD key/value types (Integer, Tuple6) do not match the declared
        // Writable classes (Text, IntWritable); saveAsTextFile would be the simpler fit here.
        compressdata.saveAsHadoopFile("/SparkTest/Compression/result" + args[3],
                Text.class, IntWritable.class, TextOutputFormat.class);
        System.exit(0);
    }
}
2. 单机运行,时间大约在2分钟左右:

// package deal;  // original package declaration, commented out so the sample compiles stand-alone

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

/**
 * Single-machine version of the segment-compression job.
 *
 * Reads "<order> <value>" lines from args[0], groups every args[2] consecutive
 * lines into one segment, writes each segment's (start, end, max, min, avg, stddev)
 * to args[1], and appends a timing record to records.txt.
 */
public class Compression {
    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            // FIX: restored the stripped escape — original paste showed "intervaln".
            System.out.println("In,out,interval\n");
            System.exit(0);
        }
        File record = new File("records.txt");
        BufferedWriter rw = new BufferedWriter(new FileWriter(record, true));
        Date dates = new Date();
        // FIX: restored the stripped space in the pattern ("yyyy-MM-ddHH:mm:ss" is invalid).
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestart = format.format(dates);
        File filer = new File(args[0]);
        File filew = new File(args[1]);
        Integer interval = Integer.valueOf(args[2]);
        BufferedReader reader = null;
        BufferedWriter writer = null;
        try {
            reader = new BufferedReader(new FileReader(filer));
            writer = new BufferedWriter(new FileWriter(filew));
            String tempString = null;
            int line = 0;
            onedata one = null;
            while ((tempString = reader.readLine()) != null) {
                // FIX: split once per line (the original split twice, discarding the
                // first result) and restore the stripped space delimiter.
                String[] fields = tempString.split(" ");
                if (line % interval == 0) {
                    // A new segment starts: flush the previous one first.
                    if (one != null) {
                        one.calculate();
                        writer.write(one.ToString());
                    }
                    one = new onedata(line / interval);
                }
                one.list.add(Double.valueOf(fields[1]));
                line++;
            }
            // FIX: the original never flushed the final (possibly partial) segment —
            // its data was silently dropped.
            if (one != null && !one.list.isEmpty()) {
                one.calculate();
                writer.write(one.ToString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // FIX: close BOTH streams; the original only closed the reader here, so the
            // writer leaked (and lost buffered output) on any exception.
            if (reader != null) {
                try { reader.close(); } catch (IOException ignored) { /* best-effort close */ }
            }
            if (writer != null) {
                try { writer.close(); } catch (IOException ignored) { /* best-effort close */ }
            }
        }
        Date datee = new Date();
        String timeend = format.format(datee);
        long cha = datee.getTime() - dates.getTime();
        // FIX: restored the stripped "\t"/"\n" escapes in the timing record.
        rw.write(args[0] + "\t" + timestart + "\t" + timeend + "\t" + cha + "\n");
        rw.close();
    }
}

/** One segment of samples plus its computed summary statistics. */
class onedata {
    public ArrayList<Double> list = new ArrayList<Double>();
    // FIX: Double.MIN_VALUE is the smallest POSITIVE double; with all-negative input the
    // original max never updated. The infinities are the correct neutral elements.
    Double max = Double.NEGATIVE_INFINITY;
    Double min = Double.POSITIVE_INFINITY;
    Double start = 0.0;
    Double end = 0.0;
    Double avg = 0.0;
    Double s = 0.0;
    Integer order = 0;

    public onedata(Integer o) {
        order = o;
    }

    /** Computes start/end/max/min/avg and population stddev over {@code list}. */
    public void calculate() {
        Double sum = 0.0;
        start = list.get(0);
        end = list.get(list.size() - 1);
        for (Double double1 : list) {
            sum += double1;
            if (max < double1) max = double1;
            if (min > double1) min = double1;
        }
        avg = sum / list.size();
        Double temp = 0.0;
        for (Double double1 : list) {
            temp += (double1 - avg) * (double1 - avg);
        }
        // NOTE(review): divides by n (population stddev); the Spark version divides by
        // n-1 — the two programs' outputs will differ. Confirm which is intended.
        s = Math.sqrt(temp / list.size());
    }

    /** Formats one output line: "order\t(start,end,max,min,avg,s)\n". */
    public String ToString() {
        // FIX: restored the stripped "\t"/"\n" escapes.
        return order + "\t(" + start + "," + end + "," + max + "," + min + "," + avg + "," + s + ")\n";
    }
}
求各位大牛解释为啥spark集群运行时消耗的时间比单机的时候多那么多?谢谢!
解决方案
解决方案二:
是不是同样的代码?如果不是同样的代码,不好比较.
解决方案三:
处理逻辑基本都是一样的,单机的代码是正常的Java写的处理数据的代码。集群运行的代码是Spark应用的代码。
解决方案四:
集群是怎么配置的?处理的数据量有多大?
解决方案五:
集群只有在处理大数据量时才有优势,因为集群启动、调度和分配任务本身是需要时间的;数据量小时这部分固定开销会盖过并行带来的收益。