Problem description
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LoadDataToHBase {

    public static class LoadDataToHBaseMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

        public static int y, m, d, h, n, s, mm;
        Calendar cal = Calendar.getInstance();

        // The map key is a meaningless ImmutableBytesWritable; the map value is the
        // original input line, emitted unchanged.
        // After map, shuffle and sort order the key-value pairs by key; without that
        // order the HFile cannot be written, because an HFile requires that a later
        // key is never smaller than an earlier one.
        private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            immutableBytesWritable.set(Bytes.toBytes(key.get()));
            context.write(immutableBytesWritable, value);
        }
    }

    // Each value the reducer receives is one line of the input file.
    // The reduce key is also meaningless; each value becomes one HFile-format
    // record: rowkey, family, qualifier, timestamp, value.
    public static class LoadDataToHBaseReducer extends
            Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

        public static int y, m, d, h, n, s, mm;
        Calendar cal = Calendar.getInstance();

        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            String value = "";
            while (values.iterator().hasNext()) {
                value = values.iterator().next().toString();
                if (value != null && !"".equals(value)) {
                    List<KeyValue> list = new ArrayList<KeyValue>();
                    list = createKeyValue(value.toString());
                    Iterator<KeyValue> it = list.iterator();
                    while (it.hasNext()) {
                        KeyValue kv = new KeyValue();
                        kv = it.next();
                        if (kv != null) {
                            context.write(key, kv);
                        }
                    }
                }
            }
        }

        private List<KeyValue> createKeyValue(String str) {
            List<KeyValue> list = new ArrayList<KeyValue>();
            // NB: split takes a regex; a bare "|" splits between every
            // character (see the check after this class).
            String[] values = str.toString().split("|");
            String[] qualifiersName = CONSTANT.qualifiersName;
            for (int i = 1; i < qualifiersName.length; i++) {
                long timeStamp = System.currentTimeMillis();
                String rownum = values[0];
                String family = CONSTANT.familyName;
                String qualifier = qualifiersName[i];
                String value_str = values[i];
                int y = cal.get(Calendar.YEAR);
                int m = cal.get(Calendar.MONTH) + 1;
                int d = cal.get(Calendar.DATE);
                int h = cal.get(Calendar.HOUR);
                int n = cal.get(Calendar.MINUTE);
                int s = cal.get(Calendar.SECOND);
                int mm = cal.get(Calendar.MILLISECOND);
                String rowkey_str = timeStamp + "-" + Integer.toString(y)
                        + Integer.toString(m) + "/" + Integer.toString(d)
                        + Integer.toString(h) + Integer.toString(n)
                        + Integer.toString(s) + "/" + Integer.toString(mm)
                        + rownum + "-" + values[4] + "-" + values[5] + "-" + values[6];
                KeyValue kv = new KeyValue(Bytes.toBytes(rowkey_str),
                        Bytes.toBytes(family), Bytes.toBytes(qualifier),
                        System.currentTimeMillis(), Bytes.toBytes(value_str));
                // NB: with ||, this condition is always true; && was presumably
                // intended to skip columns 4-6.
                if (i != 4 || i != 5 || i != 6) {
                    list.add(kv);
                }
            }
            return list;
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, CONSTANT.jobName);
        job.setJarByClass(LoadDataToHBase.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        // Note: Text.class here must match the value type emitted by map.
        job.setOutputValueClass(Text.class);
        job.setMapperClass(LoadDataToHBaseMapper.class);
        job.setReducerClass(LoadDataToHBaseReducer.class);
        // job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // job.setNumReduceTasks(4);
        // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

        Configuration fs_conf = new Configuration();
        FileSystem fs = FileSystem.get(fs_conf);
        String str_inPath = CONSTANT.str_inPath;
        String str_outPath = CONSTANT.str_outPath;
        // Delete the output path if it already exists, since the job does not
        // allow a pre-existing output path.
        Path outPath = new Path(str_outPath);
        if (fs.exists(outPath))
            fs.delete(outPath, true);

        FileInputFormat.addInputPath(job, new Path(str_inPath));
        FileOutputFormat.setOutputPath(job, new Path(str_outPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
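A side note on the parsing above: String.split takes a regular expression, and a bare "|" is the regex alternation operator, which matches the empty string at every position, so each character of the line becomes its own field. A standalone check (the sample line is made up):

    import java.util.Arrays;

    // Check: "|" as a regex splits between every character,
    // while "\\|" splits on the literal pipe delimiter.
    public class SplitCheck {
        public static void main(String[] args) {
            String line = "1|foo|bar"; // hypothetical sample record
            System.out.println(Arrays.toString(line.split("|")));
            // -> [1, |, f, o, o, |, b, a, r]
            System.out.println(Arrays.toString(line.split("\\|")));
            // -> [1, foo, bar]
        }
    }

That would make values[0], values[4], values[5], values[6] arbitrary single characters, which matches the one-character fields like - " | visible in the failing keys in the error below.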
public class CONSTANT {
    public static final String jobName = "LoadDataToHBase";
    public static final String[] qualifiersName = { "", "01_home", "04_name",
            "05_phone", "07_price", "08_room", "09_large", "10_floor", "11_n",
            "12_site", "14_compay" };
    // public static final String[] qualifiersName = { "", "00_url", "01_home",
    //         "02_what", "03_compay2", "04_name", "05_phone", "06_title",
    //         "07_price", "08_room", "09_large", "10_floor", "11_n", "12_site",
    //         "13_compay" };
    public static final String familyName = "info";
    public static final String tableName = "hbase";
    public static final String str_inPath = "/user/hadoop/loadDataToHBase/input";
    public static final String str_outPath = "/user/hadoop/loadDataToHBase/output";
    public static final long timeStamp = System.currentTimeMillis();
}
Error output:

16/03/28 18:35:08 INFO mapreduce.Job:  map 100% reduce 67%
16/03/28 18:35:08 INFO mapreduce.Job: Task Id: attempt_1458611567937_0066_r_000000_2, Status: FAILED
Error: java.io.IOException: Added a key not lexically larger than previous key=\x00)1459161330442-20163/2863529/403-"-|-"-"-|\x04info00_url\x00\x00\x01S\xBC\xCA\xF3\x0B\x04, lastkey=\x00)1459161330442-20163/2863529/403-"-|-"-|-"\x04info13_compay\x00\x00\x01S\xBC\xCA\xF3\x0A\x04
    at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:202)
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:288)
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:253)
    at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:935)
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:196)
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:149)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at test1.LoadDataToHBase$LoadDataToHBaseReducer.reduce(LoadDataToHBase.java:69)
    at test1.LoadDataToHBase$LoadDataToHBaseReducer.reduce(LoadDataToHBase.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
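What the exception means: the HFile writer requires every KeyValue to compare lexically no smaller than the one before it, in exactly the order the reducer emits them. The rowkeys above are built inside reduce, while the shuffle sorted only on the meaningless file-offset key, so the emission order is unrelated to rowkey order. Prefixing System.currentTimeMillis() does not fix this: within one reducer the clock never goes backwards, but many records share the same millisecond (both colliding keys in the trace start with the identical 1459161330442 prefix), and the tie is then broken by the values[4..6] suffix, which after the split problem above is arbitrary characters. A minimal sketch of one common arrangement, where the mapper emits the real rowkey as the map output key so the framework's sort produces the order the HFile writer expects; buildRowkey is a hypothetical stand-in for the rowkey logic in createKeyValue above:

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: use the actual rowkey as the map output key, so the
    // shuffle/sort hands records to the reducer in rowkey order.
    public class RowkeyMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

        private final ImmutableBytesWritable outKey = new ImmutableBytesWritable();

        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Escape the pipe so it is treated as a literal delimiter;
            // assumes at least 7 fields, as the original rowkey logic does.
            String[] fields = line.toString().split("\\|");
            outKey.set(Bytes.toBytes(buildRowkey(fields)));
            context.write(outKey, line);
        }

        // Hypothetical stand-in: put whatever rowkey createKeyValue builds here,
        // computed once in the mapper instead of in the reducer.
        private String buildRowkey(String[] fields) {
            return fields[0] + "-" + fields[4] + "-" + fields[5] + "-" + fields[6];
        }
    }

The reducer would then build each KeyValue from the incoming key's bytes rather than recomputing a fresh rowkey, emitting qualifiers in their already-sorted array order. With more than one reducer you would also need a total-order partitioner; that is what the commented-out SimpleTotalOrderPartitioner line is for, and HFileOutputFormat.configureIncrementalLoad sets it up from the target table's region boundaries.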
Please help, experts, and thanks! What should I change? I already added a TimeStamp to the ROWKEY and it still fails: a small amount of data imports fine, but once the data volume grows nothing gets in at all. Sorry for troubling five of you; I just put up my 50 points. Please help!