Example: Exporting MSSQL Data to HDFS with MapReduce

Today I wrote a MapReduce program that pulls data out of an MSSQL Server 2008 database for analysis. When the job was deployed to the Hadoop cluster, it failed with a SQLException.

Strange: my SQL statement contains no LIMIT, so where did the LIMIT clause come from? I went through the source of DBInputFormat:

protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
    Configuration conf) throws IOException {

  @SuppressWarnings("unchecked")
  Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
  try {
    // use database product name to determine appropriate record reader.
    if (dbProductName.startsWith("ORACLE")) {
      // use Oracle-specific db reader.
      return new OracleDBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    } else if (dbProductName.startsWith("MYSQL")) {
      // use MySQL-specific db reader.
      return new MySQLDBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    } else {
      // Generic reader.
      return new DBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    }
  } catch (SQLException ex) {
    throw new IOException(ex.getMessage());
  }
}

And the relevant part of the DBRecordReader source, getSelectQuery():

protected String getSelectQuery() {
  StringBuilder query = new StringBuilder();

  // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
  if(dbConf.getInputQuery() == null) {
    query.append("SELECT ");

    for (int i = 0; i < fieldNames.length; i++) {
      query.append(fieldNames[i]);
      if (i != fieldNames.length -1) {
        query.append(", ");
      }
    }

    query.append(" FROM ").append(tableName);
    query.append(" AS ").append(tableName); //in hsqldb this is necessary
    if (conditions != null && conditions.length() > 0) {
      query.append(" WHERE (").append(conditions).append(")");
    }

    String orderBy = dbConf.getInputOrderBy();
    if (orderBy != null && orderBy.length() > 0) {
      query.append(" ORDER BY ").append(orderBy);
    }
  } else {
    //PREBUILT QUERY
    query.append(dbConf.getInputQuery());
  }

  try {
    query.append(" LIMIT ").append(split.getLength());   // here is the problem
    query.append(" OFFSET ").append(split.getStart());
  } catch (IOException ex) {
    // Ignore, will not throw.
  }

  return query.toString();
}

At last, the cause was found.

It turns out Hadoop only ships database-specific record readers for MySQL (MySQLDBRecordReader) and Oracle (OracleDBRecordReader); every other database falls through to the generic DBRecordReader, whose getSelectQuery() appends LIMIT/OFFSET, syntax that SQL Server does not accept.
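
For illustration only (the split numbers are made up), with the input query used later in this post the generic reader would hand SQL Server something like:

select sid,source from LawDB.dbo.case_source where sid<1000 LIMIT 100 OFFSET 0

LIMIT/OFFSET is MySQL/HSQLDB paging syntax; T-SQL has no LIMIT keyword, so SQL Server rejects the statement.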

With the cause identified, I implemented a DBRecordReader for MSSQL Server modeled on OracleDBRecordReader. The code is below.

The code for MSSQLDBInputFormat:

/**
 *
 */
package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;

/**
 * @author summer
 *  MICROSOFT SQL SERVER
 */
public class MSSQLDBInputFormat<T extends DBWritable> extends DBInputFormat<T> {

    public static void setInput(Job job,
            Class<? extends DBWritable> inputClass,
            String inputQuery, String inputCountQuery, String rowId) {
        job.setInputFormatClass(MSSQLDBInputFormat.class);
        DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
        dbConf.setInputClass(inputClass);
        dbConf.setInputQuery(inputQuery);
        dbConf.setInputCountQuery(inputCountQuery);
        dbConf.setInputFieldNames(new String[]{rowId});
    }

    @Override
    protected RecordReader<LongWritable, T> createDBRecordReader(
            org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split,
            Configuration conf) throws IOException {

        @SuppressWarnings("unchecked")
        Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
        try {
            // Hand back the SQL Server-specific record reader for every split.
            return new MSSQLDBRecordReader<T>(split, inputClass,
                conf, createConnection(), getDBConf(), conditions, fieldNames,
                tableName);
        } catch (SQLException ex) {
            throw new IOException(ex.getMessage());
        }
    }

}

The code for MSSQLDBRecordReader:

/**
 *
 */
package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;

/**
 * @author summer
 *
 */
public class MSSQLDBRecordReader <T extends DBWritable> extends DBRecordReader<T>{

    public MSSQLDBRecordReader(DBInputFormat.DBInputSplit split,
              Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
              String cond, String [] fields, String table) throws SQLException {
        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
        
    }

    @Override
    protected String getSelectQuery() {
        StringBuilder query = new StringBuilder();
        DBConfiguration dbConf = getDBConf();
        String conditions = getConditions();
        String tableName = getTableName();
        String [] fieldNames = getFieldNames();

        // SQL Server codepath: emulate LIMIT/OFFSET paging with TOP ... NOT IN (TOP ...).
        if (dbConf.getInputQuery() == null) {
            query.append("SELECT ");

            for (int i = 0; i < fieldNames.length; i++) {
                query.append(fieldNames[i]);
                if (i != fieldNames.length - 1) {
                    query.append(", ");
                }
            }

            query.append(" FROM ").append(tableName);
            if (conditions != null && conditions.length() > 0)
                query.append(" WHERE ").append(conditions);
            String orderBy = dbConf.getInputOrderBy();
            if (orderBy != null && orderBy.length() > 0) {
                query.append(" ORDER BY ").append(orderBy);
            }
        } else {
            // PREBUILT QUERY
            query.append(dbConf.getInputQuery());
        }

        try {
            DBInputFormat.DBInputSplit split = getSplit();
            if (split.getLength() > 0) {
                String querystring = query.toString();
                String id = fieldNames[0];
                query = new StringBuilder();
                // Skip the first split.getStart() rows, then take split.getLength() rows.
                // Note: this paging is only deterministic if the inner query has a stable
                // ordering on the row id (for example an ORDER BY in the input query).
                query.append("SELECT TOP ").append(split.getLength()).append(" * FROM ( ");
                query.append(querystring);
                query.append(" ) a WHERE " + id + " NOT IN (SELECT TOP ").append(split.getStart());
                query.append(" " + id + " FROM (");
                query.append(querystring);
                query.append(" ) b");
                query.append(" )");
                System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
                System.out.println(query.toString());
                System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
            }
        } catch (IOException ex) {
            // ignore, will not throw.
        }

        return query.toString();
    }
    
    

}
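
To see what this produces: with the input query used later in this post and, say, a split of 100 rows starting at row 100 (numbers invented for illustration), getSelectQuery() builds roughly:

SELECT TOP 100 * FROM ( select sid,source from LawDB.dbo.case_source where sid<1000 ) a WHERE sid NOT IN (SELECT TOP 100 sid FROM ( select sid,source from LawDB.dbo.case_source where sid<1000 ) b )

TOP combined with NOT IN on the row id is the usual way to page on SQL Server 2008; from SQL Server 2012 onward the same thing could also be written with OFFSET ... FETCH NEXT.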

The MapReduce job:

/**
 *
 */
package com.nltk.sns.mapreduce;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.MSSQLDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.nltk.sns.ETLUtils;

/**
 * @author summer
 *
 */
public class LawDataEtl {

    public static class CaseETLMapper extends
            Mapper<LongWritable, LawCaseRecord, LongWritable, Text> {

        static final int step = 6;

        private final LongWritable outKey = new LongWritable(1);
        private final Text outValue = new Text();

        @Override
        protected void map(
                LongWritable key,
                LawCaseRecord lawCaseRecord,
                Mapper<LongWritable, LawCaseRecord, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {

            System.out.println("-----------------------------" + lawCaseRecord + "------------------------------");

            // Emit one (record id, n-gram) pair per substring produced by ETLUtils.split.
            outKey.set(lawCaseRecord.id);
            String source = ETLUtils.format(lawCaseRecord.source);
            List<String> words = ETLUtils.split(source, step);
            for (String w : words) {
                outValue.set(w);
                context.write(outKey, outValue);
            }
        }
    }
                
    static final String driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
    static final String dbUrl = "jdbc:sqlserver://192.168.0.100:1433;DatabaseName=lawdb";
    static final String uid = "sa";
    static final String pwd = "cistjava";
    static final String inputQuery = "select sid,source from LawDB.dbo.case_source where sid<1000";
    static final String inputCountQuery = "select count(1) from LawDB.dbo.case_source where sid<1000";
    static final String jarClassPath = "/user/lib/sqljdbc4.jar";
    static final String outputPath = "hdfs://ubuntu:9000/user/lawdata";
    static final String rowId = "sid";
    
    public static Job configureJob(Configuration conf) throws Exception{
        
        String jobName = "etlcase";
        Job job =  Job.getInstance(conf, jobName);

        job.addFileToClassPath(new Path(jarClassPath));
        MSSQLDBInputFormat.setInput(job, LawCaseRecord.class, inputQuery, inputCountQuery,rowId);
        job.setJarByClass(LawDataEtl.class);
        
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(CaseETLMapper.class);
        
        return job;
    }
    
    public static void main(String[] args) throws Exception{
        
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(outputPath), true);
        
        DBConfiguration.configureDB(conf, driverClass, dbUrl, uid, pwd);
        conf.set(MRJobConfig.NUM_MAPS, String.valueOf(10));
        Job job = configureJob(conf);
        System.out.println("------------------------------------------------");
        System.out.println(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
        System.out.println(conf.get(DBConfiguration.URL_PROPERTY));
        System.out.println(conf.get(DBConfiguration.USERNAME_PROPERTY));
        System.out.println(conf.get(DBConfiguration.PASSWORD_PROPERTY));
        System.out.println("------------------------------------------------");
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
        
    }
}
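
The post never shows the LawCaseRecord class the job reads into. A minimal sketch of what it might look like, assuming it simply maps the sid and source columns of the input query (the field names are inferred from the mapper above; this class is my reconstruction, not the original):

/**
 *
 */
package com.nltk.sns.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class LawCaseRecord implements Writable, DBWritable {

    long id;        // the sid column
    String source;  // the source column

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        // Column order follows the input query: select sid,source from ...
        id = resultSet.getLong(1);
        source = resultSet.getString(2);
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, id);
        statement.setString(2, source);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        source = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(source);
    }

    @Override
    public String toString() {
        return id + "\t" + source;
    }
}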

The helper class:

/**
 *
 */
package com.nltk.sns;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;

/**
 * @author summer
 *
 */
public class ETLUtils {

    public final static String NULL_CHAR = "";
    public final static String PUNCTUATION_REGEX = "[(\\pP)&&[^\\|\\{\\}\\#]]+";
    public final static String WHITESPACE_REGEX = "[\\p{Space}]+";
    
    /** Strips punctuation and whitespace from s. */
    public static String format(String s){
        
        return s.replaceAll(PUNCTUATION_REGEX, NULL_CHAR).replaceAll(WHITESPACE_REGEX, NULL_CHAR);
    }
    
    /** Returns every substring (n-gram) of s with length 1 to stepN. */
    public static List<String> split(String s, int stepN){
        
        List<String> splits = new ArrayList<String>();
        if(StringUtils.isEmpty(s) || stepN < 1)
            return splits;
        int len = s.length();
        if(len <= stepN)
            splits.add(s);
        else{
            for(int j = 1; j <= stepN; j++)
                for(int i = 0; i <= len - j; i++){
                    String key = StringUtils.mid(s, i, j);
                    if(StringUtils.isEmpty(key))
                        continue;
                    splits.add(key);
                }
        }
        return splits;
    }
    
    public static void main(String[] args){
        
        String s="谢婷婷等与姜波等";
        int stepN = 2;
        List<String> splits = split(s,stepN);
        System.out.println(splits);
    }
}

It ran successfully.

This is a rough implementation, written mainly to satisfy my own needs; feel free to adapt it to yours.

In fact the stock DBRecordReader is not implemented very well. Reading the DBRecordReader, MySQLDBRecordReader and OracleDBRecordReader sources, DBRecordReader and MySQLDBRecordReader are far too tightly coupled. Ideally, for a database without a dedicated implementation, DBRecordReader should still run without throwing; it could simply fall back to a single split and a single map task, as the sketch below illustrates.
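
Here is a rough sketch of that fallback (my illustration, not code from Hadoop): a record reader that never appends pagination, which only gives correct results when the job runs with a single split, that is, a single map task.

package org.apache.hadoop.mapreduce.lib.db;

import java.sql.Connection;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;

/**
 * Illustration only: a reader for databases without a dedicated implementation.
 * It sends the configured query as-is, with no LIMIT/OFFSET appended, and is
 * therefore only correct when the job is configured with a single map task.
 */
public class GenericSingleSplitDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

    public GenericSingleSplitDBRecordReader(DBInputFormat.DBInputSplit split,
            Class<T> inputClass, Configuration conf, Connection conn,
            DBConfiguration dbConfig, String cond, String[] fields, String table)
            throws SQLException {
        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
    }

    @Override
    protected String getSelectQuery() {
        DBConfiguration dbConf = getDBConf();
        if (dbConf.getInputQuery() != null) {
            // Prebuilt query: run it untouched, no database-specific paging.
            return dbConf.getInputQuery();
        }
        // Otherwise build a plain SELECT field1, field2 FROM table [WHERE ...].
        StringBuilder query = new StringBuilder("SELECT ");
        String[] fieldNames = getFieldNames();
        for (int i = 0; i < fieldNames.length; i++) {
            query.append(fieldNames[i]);
            if (i != fieldNames.length - 1) {
                query.append(", ");
            }
        }
        query.append(" FROM ").append(getTableName());
        String conditions = getConditions();
        if (conditions != null && conditions.length() > 0) {
            query.append(" WHERE ").append(conditions);
        }
        return query.toString();
    }
}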

Importing HDFS Data into HBase with MapReduce

package com.bank.service;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Bulk-import data into HBase with MapReduce (a map-only job, no reduce function)
 * @author mengyao
 *
 */
public class DataImportToHbase extends Configured implements Tool {

    static class DataImportToHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private static String familyName = "info";
        private static String[] qualifiers = {"gzh", "currency", "version", "valuta", "qfTime", "flag", "machineID"};
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] values = line.split("\t");
            if (values.length == 7 && values.length == qualifiers.length) {
                String row = values[0]+"_"+values[1]+"_"+values[2]+"_"+values[3];
                long timestamp = System.currentTimeMillis();
                ImmutableBytesWritable immutable = new ImmutableBytesWritable(Bytes.toBytes(row));
                Put put = new Put(Bytes.toBytes(row));
                for (int i = 0; i < values.length; i++) {
                    String qualifier = qualifiers[i];
                    String val = values[i];
                    put.add(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), timestamp, Bytes.toBytes(val));
                }
                context.write(immutable, put);
            } else {
                System.err.println(" ERROR: the number of values must equal the number of qualifiers ");
            }
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), DataImportToHbase.class.getSimpleName());
        job.setJarByClass(DataImportToHbase.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        
        job.setMapperClass(DataImportToHbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        
        TableMapReduceUtil.initTableReducerJob(arg0[1], null, job);        
        job.setNumReduceTasks(0);
        TableMapReduceUtil.addDependencyJars(job);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("dfs.socket.timeout", "3600000");
        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" ERROR: <dataInputDir> <tableName>");
            System.exit(2);
        }
        int status = ToolRunner.run(conf, new DataImportToHbase(), otherArgs);
        System.exit(status);
    }
}
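
One thing this job assumes is that the target table already exists with an info column family. A quick way to create it up front (my sketch, using the older HBaseAdmin client API that matches the code above, not part of the original post):

package com.bank.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/** Creates the target table with the "info" column family if it does not exist yet. */
public class CreateTargetTable {

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println(" Usage: <tableName>");
            System.exit(2);
        }
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");

        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists(args[0])) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(args[0]));
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}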
