今天写了一下MapReduce程序从MSSQL SERVER2008数据库里取数据分析。程序发布到hadoop机器上运行报SQLEXCEPTION错误
奇怪了,我的SQL语句中没有LIMIT,这LIMIT哪来的。我翻看了DBInputFormat类的源码,
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
Configuration conf) throws IOException {
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// use database product name to determine appropriate record reader.
if (dbProductName.startsWith("ORACLE")) {
// use Oracle-specific db reader.
return new OracleDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader.
return new MySQLDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else {
// Generic reader.
return new DBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
}
} catch (SQLException ex) {
throw new IOException(ex.getMessage());
}
}
DBRecordReader的源码
protected String getSelectQuery() {
StringBuilder query = new StringBuilder();
// Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
if(dbConf.getInputQuery() == null) {
query.append("SELECT ");
for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length -1) {
query.append(", ");
}
}
query.append(" FROM ").append(tableName);
query.append(" AS ").append(tableName); //in hsqldb this is necessary
if (conditions != null && conditions.length() > 0) {
query.append(" WHERE (").append(conditions).append(")");
}
String orderBy = dbConf.getInputOrderBy();
if (orderBy != null && orderBy.length() > 0) {
query.append(" ORDER BY ").append(orderBy);
}
} else {
//PREBUILT QUERY
query.append(dbConf.getInputQuery());
}
try {
query.append(" LIMIT ").append(split.getLength()); //问题所在
query.append(" OFFSET ").append(split.getStart());
} catch (IOException ex) {
// Ignore, will not throw.
}
return query.toString();
}
终于找到原因了。
原来,hadoop只实现了Mysql的DBRecordReader(MySQLDBRecordReader)和ORACLE的DBRecordReader(OracleDBRecordReader)。
原因找到了,我参考着OracleDBRecordReader实现了MSSQL SERVER的DBRecordReader代码如下:
MSSQLDBInputFormat的代码:
/**
*
*/
package org.apache.hadoop.mapreduce.lib.db;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
/**
* @author summer
* MICROSOFT SQL SERVER
*/
public class MSSQLDBInputFormat<T extends DBWritable> extends DBInputFormat<T> {
public static void setInput(Job job,
Class<? extends DBWritable> inputClass,
String inputQuery, String inputCountQuery,String rowId) {
job.setInputFormatClass(MSSQLDBInputFormat.class);
DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
dbConf.setInputClass(inputClass);
dbConf.setInputQuery(inputQuery);
dbConf.setInputCountQuery(inputCountQuery);
dbConf.setInputFieldNames(new String[]{rowId});
}
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(
org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split,
Configuration conf) throws IOException {
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
return new MSSQLDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} catch (SQLException ex) {
throw new IOException(ex.getMessage());
}
}
}
MSSQLDBRecordReader的代码:
/**
*
*/
package org.apache.hadoop.mapreduce.lib.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
/**
* @author summer
*
*/
public class MSSQLDBRecordReader <T extends DBWritable> extends DBRecordReader<T>{
public MSSQLDBRecordReader(DBInputFormat.DBInputSplit split,
Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
String cond, String [] fields, String table) throws SQLException {
super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
}
@Override
protected String getSelectQuery() {
StringBuilder query = new StringBuilder();
DBConfiguration dbConf = getDBConf();
String conditions = getConditions();
String tableName = getTableName();
String [] fieldNames = getFieldNames();
// Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
if(dbConf.getInputQuery() == null) {
query.append("SELECT ");
for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length -1) {
query.append(", ");
}
}
query.append(" FROM ").append(tableName);
if (conditions != null && conditions.length() > 0)
query.append(" WHERE ").append(conditions);
String orderBy = dbConf.getInputOrderBy();
if (orderBy != null && orderBy.length() > 0) {
query.append(" ORDER BY ").append(orderBy);
}
} else {
//PREBUILT QUERY
query.append(dbConf.getInputQuery());
}
try {
DBInputFormat.DBInputSplit split = getSplit();
if (split.getLength() > 0){
String querystring = query.toString();
String id = fieldNames[0];
query = new StringBuilder();
query.append("SELECT TOP "+split.getLength()+"* FROM ( ");
query.append(querystring);
query.append(" ) a WHERE " + id +" NOT IN (SELECT TOP ").append(split.getEnd());
query.append(" "+id +" FROM (");
query.append(querystring);
query.append(" ) b");
query.append(" )");
System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
System.out.println(query.toString());
System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
}
} catch (IOException ex) {
// ignore, will not throw.
}
return query.toString();
}
}
mapreduce的代码
/**
*
*/
package com.nltk.sns.mapreduce;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.MSSQLDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.nltk.sns.ETLUtils;
/**
* @author summer
*
*/
public class LawDataEtl {
public static class CaseETLMapper extends
Mapper<LongWritable, LawCaseRecord, LongWritable, Text>{
static final int step = 6;
LongWritable key = new LongWritable(1);
Text value = new Text();
@Override
protected void map(
LongWritable key,
LawCaseRecord lawCaseRecord,
Mapper<LongWritable, LawCaseRecord, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
System.out.println("-----------------------------"+lawCaseRecord+"------------------------------");
key.set(lawCaseRecord.id);
String source = ETLUtils.format(lawCaseRecord.source);
List<String> words = ETLUtils.split(source, step);
for(String w:words){
value.set(w);
context.write(key, value);
}
}
}
static final String driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
static final String dbUrl = "jdbc:sqlserver://192.168.0.100:1433;DatabaseName=lawdb";
static final String uid = "sa";
static final String pwd = "cistjava";
static final String inputQuery = "select sid,source from LawDB.dbo.case_source where sid<1000";
static final String inputCountQuery = "select count(1) from LawDB.dbo.case_source where sid<1000";
static final String jarClassPath = "/user/lib/sqljdbc4.jar";
static final String outputPath = "hdfs://ubuntu:9000/user/lawdata";
static final String rowId = "sid";
public static Job configureJob(Configuration conf) throws Exception{
String jobName = "etlcase";
Job job = Job.getInstance(conf, jobName);
job.addFileToClassPath(new Path(jarClassPath));
MSSQLDBInputFormat.setInput(job, LawCaseRecord.class, inputQuery, inputCountQuery,rowId);
job.setJarByClass(LawDataEtl.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(CaseETLMapper.class);
return job;
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(outputPath), true);
DBConfiguration.configureDB(conf, driverClass, dbUrl, uid, pwd);
conf.set(MRJobConfig.NUM_MAPS, String.valueOf(10));
Job job = configureJob(conf);
System.out.println("------------------------------------------------");
System.out.println(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
System.out.println(conf.get(DBConfiguration.URL_PROPERTY));
System.out.println(conf.get(DBConfiguration.USERNAME_PROPERTY));
System.out.println(conf.get(DBConfiguration.PASSWORD_PROPERTY));
System.out.println("------------------------------------------------");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
辅助类的代码:
/**
*
*/
package com.nltk.sns;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
/**
* @author summer
*
*/
public class ETLUtils {
public final static String NULL_CHAR = "";
public final static String PUNCTUATION_REGEX = "[(\\pP)&&[^\\|\\{\\}\\#]]+";
public final static String WHITESPACE_REGEX = "[\\p{Space}]+";
public static String format(String s){
return s.replaceAll(PUNCTUATION_REGEX, NULL_CHAR).replaceAll(WHITESPACE_REGEX, NULL_CHAR);
}
public static List<String> split(String s,int stepN){
List<String> splits = new ArrayList<String>();
if(StringUtils.isEmpty(s) || stepN<1)
return splits;
int len = s.length();
if(len<=stepN)
splits.add(s);
else{
for(int j=1;j<=stepN;j++)
for(int i=0;i<=len-j;i++){
String key = StringUtils.mid(s, i,j);
if(StringUtils.isEmpty(key))
continue;
splits.add(key);
}
}
return splits;
}
public static void main(String[] args){
String s="谢婷婷等与姜波等";
int stepN = 2;
List<String> splits = split(s,stepN);
System.out.println(splits);
}
}
运行成功了
代码初略的实现,主要是为了满足我的需求,大家可以根据自己的需要进行修改。
实际上DBRecordReader作者实现的并不好,我们来看DBRecordReader、MySQLDBRecordReader和OracleDBRecordReader源码,DBRecordReader和MySQLDBRecordReader耦合度太高。一般而言,就是对于没有具体实现的数据库DBRecordReader也应该做到运行不报异常,无非就是采用单一的SPLIT和单一的MAP。
使用MapReduce将HDFS数据导入到HBase
package com.bank.service;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 使用MapReduce批量导入Hbase(没有Reduce函数的MapReduce)
* @author mengyao
*
*/
public class DataImportToHbase extends Configured implements Tool {
static class DataImportToHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private static String familyName = "info";
private static String[] qualifiers = {"gzh", "currency", "version", "valuta", "qfTime", "flag", "machineID"};
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] values = line.split("\t");
if (values.length == 7 && values.length == qualifiers.length) {
String row = values[0]+"_"+values[1]+"_"+values[2]+"_"+values[3];
long timestamp = System.currentTimeMillis();
ImmutableBytesWritable immutable = new ImmutableBytesWritable(Bytes.toBytes(row));
Put put = new Put(Bytes.toBytes(row));
for (int i = 0; i < values.length; i++) {
String qualifier = qualifiers[i];
String val = values[i];
put.add(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), timestamp, Bytes.toBytes(val));
}
context.write(immutable, put);
} else {
System.err.println(" ERROR: value length must equale qualifier length ");
}
}
}
@Override
public int run(String[] arg0) throws Exception {
Job job = Job.getInstance(getConf(), DataImportToHbase.class.getSimpleName());
job.setJarByClass(DataImportToHbase.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(arg0[0]));
job.setMapperClass(DataImportToHbaseMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
TableMapReduceUtil.initTableReducerJob(arg0[1], null, job);
job.setNumReduceTasks(0);
TableMapReduceUtil.addDependencyJars(job);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("dfs.socket.timeout", "3600000");
String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(" ERROR: <dataInputDir> <tableName>");
System.exit(2);
}
int status = ToolRunner.run(conf, new DataImportToHbase(), otherArgs);
System.exit(status);
}
}