sqoop client java api将mysql的数据导到hdfs

问题描述

sqoop client java api将mysql的数据导到hdfs
 package com.hadoop.recommend;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class MysqlToHDFS {
    public static void main(String[] args) {
        sqoopTransfer();
    }
    public static void sqoopTransfer() {
        //初始化
        String url = "http://master:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);
        //创建一个源链接 JDBC
        long fromConnectorId = 2;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("JDBC connector");
        fromLink.setCreationUser("hadoop");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://master:3306/hive");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("");
        Status fromStatus = client.saveLink(fromLink);
        if(fromStatus.canProceed()) {
         System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId());
        } else {
         System.out.println("创建JDBC Link失败");
        }
        //创建一个目的地链接HDFS
        long toConnectorId = 1;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("HDFS connector");
        toLink.setCreationUser("hadoop");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://master:9000/");
        Status toStatus = client.saveLink(toLink);
        if(toStatus.canProceed()) {
         System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId());
        } else {
         System.out.println("创建HDFS Link失败");
        }

        //创建一个任务
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("MySQL to HDFS job");
        job.setCreationUser("hadoop");
        //设置源链接任务配置信息
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/hdfs/recommend");
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

        Status status = client.saveJob(job);
        if(status.canProceed()) {
         System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId());
        } else {
         System.out.println("JOB创建失败。");
        }

        //启动任务
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
          System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
          //三秒报告一次进度
          try {
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
          System.out.println("计数器:");
          for(CounterGroup group : counters) {
            System.out.print("	");
            System.out.println(group.getName());
            for(Counter counter : group) {
              System.out.print("		");
              System.out.print(counter.getName());
              System.out.print(": ");
              System.out.println(counter.getValue());
            }
          }
        }
        if(submission.getExceptionInfo() != null) {
          System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo());
        }
        System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕");
    }
}

报了这个错失咋回事??

解决方案

sqoop--mysql与hdfs数据互导
sqoop导oracle.mysql数据到hdfs hive

解决方案二:

http://master:12000/sqoop/

这个master在hosts文件中做了映射?

把master改为IP行不行,是不是防火墙的原因

解决方案三:

是不是sqoop的server不是master机器?

时间: 2024-10-31 11:07:05

sqoop client java api将mysql的数据导到hdfs的相关文章

Java编程 数据库MySQL 添加数据

问题描述 Java编程 数据库MySQL 添加数据 at java.awt.EventDispatchThread.pumpEventsForHierarchy(Unknown Source) at java.awt.EventDispatchThread.pumpEvents(Unknown Source) at java.awt.EventDispatchThread.pumpEvents(Unknown Source) at java.awt.EventDispatchThread.run

使用Java API压缩和解压缩数据

许多资料来源中都含有多余数据或对存储信息无用的数据.这常常造成客户机 和服务器应用程序间或电脑间浩如烟海的数据传输.很明显,数据存储和信息传 输问题解决办法是,安装辅助存储装置并扩展现有的通信设备.然而,要做到这 一点,就需要增加组织的运行费用.减轻部分数据存储和信息传输的方法之一是, 以更有效的代码表示数据.本文简要介绍数据压缩和解压缩,以及如何有效地.方便地从JavaTM应用程序内部使用 java.util.zip包压缩和解压缩数据. 虽然 WinZip.gzip和Java ARchive(

使用mapreduce将MSSQL数据导到HDFS实例

今天写了一下MapReduce程序从MSSQL SERVER2008数据库里取数据分析.程序发布到hadoop机器上运行报SQLEXCEPTION错误 奇怪了,我的SQL语句中没有LIMIT,这LIMIT哪来的.我翻看了DBInputFormat类的源码, protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,       Configuration conf) throws IOExce

sqoop从MySQL导入数据到hive报错 class not found

问题描述 sqoop从MySQL导入数据到hive报错 class not found 解决方案 sqoop 导入oracle 报错 Imported Failed : Attempted to generate class with no columns问题解决办法mysql启动报错 MySQL manager or server PID file could not be found! [FAILED]启动HIVE 服务报错 HWI WAR file not found 解决方案二: 您好,

编码-java向mySql插入数据乱码

问题描述 java向mySql插入数据乱码 为什么我执行以下命令行后暂时成功改成了utf8,但是退出mysql后重新进入又变成gbk编码了呢 mysql> SET character_set_client = utf8; mysql> SET character_set_results = utf8; mysql> SET character_set_connection = utf8; 解决方案 你要在my.ini需要修改两处 default-character-set=utf8 ch

java怎么调用mysql数据库里面的数据

问题描述 java怎么调用mysql数据库里面的数据 比如用户注册,要怎么调用数据库里面的数据以此来判断用户名有没有被注册 解决方案 java提供了操作数据库的工具jdbc,用jdbc连接数据库,可以查询表记录,然后与用户输入进行比对.这里有一个简单的登陆例子,参考一下:http://www.2cto.com/kf/201401/270812.html 解决方案二: java调用MySQL数据库 解决方案三: 用户注册,一般来说有一个唯一的标志,例如:用户名.如果用户注册时输入的用户名已经在数据

java实现连接mysql数据库单元测试查询数据的实例代码_java

1.按照javaweb项目的要求逐步建立搭建起机构,具体的类包有:model .db.dao.test; 具体的架构详见下图: 2.根据搭建的项目架构新建数据库test和数据库表t_userinfo并且添加对应的测试数据; (这里我使用的是绿色版的数据库,具体的下载地址:http://pan.baidu.com/s/1mg88YAc) 具体的建立数据库操作详见下图: 开发实例"> 3.编写包中的各种类代码,具体参考代码如下: UserInfo.java /** * FileName: Us

sqoop2:从mysql导出数据到hdfs

sqoop2:从mysql导出数据到hdfs中 sqoop-shell 启动sqoopp-shell jjzhu:bin didi$ sqoop2-shell Setting conf dir: /opt/sqoop-1.99.7/bin/../conf Sqoop home directory: /opt/sqoop-1.99.7 Sqoop Shell: Type 'help' or '\h' for help. sqoop:000> set server --host localhost

Apache Sqoop 1.99.4 发布,Hadoop 数据迁移

Apache Sqoop 1.99.4 发布,这是 Sqoop2 的第四个里程碑版本,是非常重要的一个里程碑. Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导入到关系型数据库中. 该版本改进内容和新特性: Improvement [SQOOP-773] – Sqoop2: Batch execution support fo