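Cloudera describes its mission as
"Bringing Big Data to the Enterprise with Hadoop".
To standardize Hadoop configuration, Cloudera helps enterprises install, configure, and run Hadoop so that they can process and analyze enterprise data at scale.
Because it targets enterprises, Cloudera's distribution is not built on the latest Hadoop 0.20. Instead it packages Hadoop 0.18.3-12.cloudera.CH0_3 and bundles higher-level query interfaces built on Hadoop, such as Hive (contributed by Facebook, with a SQL-like language) and Pig (contributed by Yahoo, with a dataflow language). This lowers the cost of installing, configuring, and using these tools, and standardizes them. Beyond integrating and packaging these mature tools, Cloudera ships one particularly interesting tool of its own: Sqoop. At the time of writing Sqoop is not distributed separately, which is one of the reasons we set out to try the whole Cloudera stack: we wanted to see how convenient Sqoop really is.
Sqoop ("SQL-to-Hadoop") is a tool designed to easily import information from SQL databases into your Hadoop cluster. With Sqoop, importing data from a traditional RDBMS such as MySQL or Oracle into a Hadoop cluster takes a single command that covers both the export and the import, and you can choose which tables to import. Compared with the more established approach of staging the data through text files or pipes, its distinguishing features are faster development and simpler configuration.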
Sqoop can do the following:
- Imports individual tables or entire databases to files in HDFS
- Generates Java classes to allow you to interact with your imported data
- Provides the ability to import from SQL databases straight into your Hive data warehouse
After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.
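We will first try Sqoop hands-on with an example, and then walk through the configuration of the complete cloud computing environment.
The example shows how to bring customer data onto the Hadoop cluster for analysis: export the USERS table, import it automatically into Hive, and then run ad-hoc SQL queries through Hive. This shows off Hadoop's data-processing power, and because the analysis runs on the cluster it does not affect the production database.
First, create the test USERS table: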
mysql> CREATE TABLE USERS (
-> user_id INTEGER NOT NULL PRIMARY KEY,
-> first_name VARCHAR(32) NOT NULL,
-> last_name VARCHAR(32) NOT NULL,
-> join_date DATE NOT NULL,
-> zip INTEGER,
-> state CHAR(2),
-> email VARCHAR(128),
-> password_hash CHAR(64));
Query OK, 0 rows affected (0.00 sec)
Insert one row of test data:
mysql> insert into USERS (user_id,first_name,last_name,join_date,zip,state,email,password_hash) values (1,'a','b','20080808',330440,'ha','test@test.com','xxxx');
Query OK, 1 row affected, 1 warning (0.00 sec)
mysql> select * from USERS;
+---------+------------+-----------+------------+--------+-------+---------------+---------------+
| user_id | first_name | last_name | join_date | zip | state | email | password_hash |
+---------+------------+-----------+------------+--------+-------+---------------+---------------+
| 1 | a | b | 2008-08-08 | 330440 | ha | test@test.com | xxxx |
+---------+------------+-----------+------------+--------+-------+---------------+---------------+
1 row in set (0.00 sec)
Next, use Sqoop to import the USERS table from the MySQL database into Hive:
sqoop --connect jdbc:mysql://localhost/test --username root --password xxx --local --table USERS --hive-import
09/06/20 18:43:50 INFO sqoop.Sqoop: Beginning code generation
09/06/20 18:43:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:50 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop
09/06/20 18:43:50 INFO orm.CompilationManager: Found hadoop core jar at: /usr/lib/hadoop/hadoop-0.18.3-12.cloudera.CH0_3-core.jar
09/06/20 18:43:50 INFO orm.CompilationManager: Invoking javac with args: -sourcepath ./ -d /tmp/sqoop/compile/ -classpath /etc/hadoop/conf:/home/hadoop/jdk1.6/lib/tools.jar:/usr/lib/hadoop:/usr/lib/hadoop/hadoop-0.18.3-12.cloudera.CH0_3-core.jar:/usr/lib/hadoop/lib/commons-cli-2.0-SNAPSHOT.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/hadoop-0.18.3-12.cloudera.CH0_3-fairscheduler.jar:/usr/lib/hadoop/lib/hadoop-0.18.3-12.cloudera.CH0_3-scribe-log4j.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-5.1.4.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.1.3.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/usr/lib/hadoop/lib/jetty-ext/commons-el.jar:/usr/lib/hadoop/lib/jetty-ext/jasper-compiler.jar:/usr/lib/hadoop/lib/jetty-ext/jasper-runtime.jar:/usr/lib/hadoop/lib/jetty-ext/jsp-api.jar:/usr/lib/hadoop/hadoop-0.18.3-12.cloudera.CH0_3-core.jar:/usr/lib/hadoop/contrib/sqoop/hadoop-0.18.3-12.cloudera.CH0_3-sqoop.jar ./USERS.java
09/06/20 18:43:51 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop/compile/USERS.jar
09/06/20 18:43:51 INFO manager.LocalMySQLManager: Beginning mysqldump fast path import
09/06/20 18:43:51 INFO manager.LocalMySQLManager: Performing import of table USERS from database test
09/06/20 18:43:52 INFO manager.LocalMySQLManager: Transfer loop complete.
09/06/20 18:43:52 INFO hive.HiveImport: Loading uploaded data into Hive
09/06/20 18:43:52 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:52 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM USERS AS t WHERE 1 = 1
09/06/20 18:43:52 WARN hive.TableDefWriter: Column join_date had to be cast to a less precise type in Hive
09/06/20 18:43:53 INFO hive.HiveImport: Hive history file=/tmp/root/hive_job_log_root_200906201843_1606494848.txt
09/06/20 18:44:00 INFO hive.HiveImport: OK
09/06/20 18:44:00 INFO hive.HiveImport: Time taken: 5.916 seconds
09/06/20 18:44:00 INFO hive.HiveImport: Loading data to table users
09/06/20 18:44:00 INFO hive.HiveImport: OK
09/06/20 18:44:00 INFO hive.HiveImport: Time taken: 0.344 seconds
09/06/20 18:44:01 INFO hive.HiveImport: Hive import complete.
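One line in the log above deserves a note: the WARN message says that join_date "had to be cast to a less precise type in Hive". A plausible reading, assuming the Hive version used here has no native date type, is that the DATE column is stored as a string on the Hive side. The sketch below illustrates that kind of mapping. The class name HiveTypeSketch and both methods are hypothetical names made up for this illustration; this is not Sqoop's actual code.

```java
import java.sql.Types;

public class HiveTypeSketch {
    // Hypothetical mapping from a java.sql.Types code to a Hive column type.
    // Illustration only; not taken from the Sqoop source.
    static String toHiveType(int sqlType) {
        switch (sqlType) {
            case Types.INTEGER:
                return "INT";
            case Types.CHAR:
            case Types.VARCHAR:
                return "STRING";
            case Types.DATE:
            case Types.TIME:
            case Types.TIMESTAMP:
                // Assumption: no native date type on the Hive side,
                // so the value is kept as text.
                return "STRING";
            default:
                return null; // not covered by this sketch
        }
    }

    // True when the mapping drops type information (a date becomes plain text).
    static boolean isLossy(int sqlType) {
        return sqlType == Types.DATE
            || sqlType == Types.TIME
            || sqlType == Types.TIMESTAMP;
    }

    public static void main(String[] args) {
        System.out.println("join_date -> " + toHiveType(Types.DATE)
            + ", lossy=" + isLossy(Types.DATE));
        System.out.println("zip -> " + toHiveType(Types.INTEGER)
            + ", lossy=" + isLossy(Types.INTEGER));
    }
}
```

Under this assumption only join_date would trigger a warning for the USERS table, since every other column is an integer or a character type.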
The import succeeded. Let's verify the result in Hive:
hive
Hive history file=/tmp/root/hive_job_log_root_200906201844_376630602.txt
hive> select * from USERS;
OK
1 'a' 'b' '2008-08-08' 330440 'ha' 'test@test.com' 'xxxx'
Time taken: 5.019 seconds
hive>
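The row matches the data in MySQL, although in this output the string and date values still carry single quotes.
This completes the import from the MySQL database into HDFS.
Sqoop also leaves behind an auto-generated USERS.java class that can be used for MapReduce analysis: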
more USERS.java
// ORM class for USERS
// WARNING: This class is AUTO-GENERATED. Modify at your own risk.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
public class USERS implements DBWritable, Writable {
    public static final int PROTOCOL_VERSION = 1;
    private Integer user_id;
    public Integer get_user_id() {
        return user_id;
    }
    private String first_name;
    public String get_first_name() {
        return first_name;
    }
    private String last_name;
    public String get_last_name() {
        return last_name;
    }
    private java.sql.Date join_date;
    public java.sql.Date get_join_date() {
        return join_date;
    }
    private Integer zip;
    public Integer get_zip() {
        return zip;
    }
    private String state;
    public String get_state() {
        return state;
    }
    private String email;
    public String get_email() {
        return email;
    }
    private String password_hash;
    public String get_password_hash() {
        return password_hash;
    }
    public void readFields(ResultSet __dbResults) throws SQLException {
        this.user_id = JdbcWritableBridge.readInteger(1, __dbResults);
        this.first_name = JdbcWritableBridge.readString(2, __dbResults);
        this.last_name = JdbcWritableBridge.readString(3, __dbResults);
        this.join_date = JdbcWritableBridge.readDate(4, __dbResults);
        this.zip = JdbcWritableBridge.readInteger(5, __dbResults);
        this.state = JdbcWritableBridge.readString(6, __dbResults);
        this.email = JdbcWritableBridge.readString(7, __dbResults);
        this.password_hash = JdbcWritableBridge.readString(8, __dbResults);
    }
    public void write(PreparedStatement __dbStmt) throws SQLException {
        JdbcWritableBridge.writeInteger(user_id, 1, 4, __dbStmt);
        JdbcWritableBridge.writeString(first_name, 2, 12, __dbStmt);
        JdbcWritableBridge.writeString(last_name, 3, 12, __dbStmt);
        JdbcWritableBridge.writeDate(join_date, 4, 91, __dbStmt);
        JdbcWritableBridge.writeInteger(zip, 5, 4, __dbStmt);
        JdbcWritableBridge.writeString(state, 6, 1, __dbStmt);
        JdbcWritableBridge.writeString(email, 7, 12, __dbStmt);
        JdbcWritableBridge.writeString(password_hash, 8, 1, __dbStmt);
    }
    public void readFields(DataInput __dataIn) throws IOException {
        if (__dataIn.readBoolean()) {
            this.user_id = null;
        } else {
            this.user_id = Integer.valueOf(__dataIn.readInt());
        }
        if (__dataIn.readBoolean()) {
            this.first_name = null;
        } else {
            this.first_name = Text.readString(__dataIn);
        }
        if (__dataIn.readBoolean()) {
            this.last_name = null;
        } else {
            this.last_name = Text.readString(__dataIn);
        }
        if (__dataIn.readBoolean()) {
            this.join_date = null;
        } else {
            this.join_date = new Date(__dataIn.readLong());
        }
        if (__dataIn.readBoolean()) {
            this.zip = null;
        } else {
            this.zip = Integer.valueOf(__dataIn.readInt());
        }
        if (__dataIn.readBoolean()) {
            this.state = null;
        } else {
            this.state = Text.readString(__dataIn);
        }
        if (__dataIn.readBoolean()) {
            this.email = null;
        } else {
            this.email = Text.readString(__dataIn);
        }
        if (__dataIn.readBoolean()) {
            this.password_hash = null;
        } else {
            this.password_hash = Text.readString(__dataIn);
        }
    }
    public void write(DataOutput __dataOut) throws IOException {
        if (null == this.user_id) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            __dataOut.writeInt(this.user_id);
        }
        if (null == this.first_name) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            Text.writeString(__dataOut, first_name);
        }
        if (null == this.last_name) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            Text.writeString(__dataOut, last_name);
        }
        if (null == this.join_date) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            __dataOut.writeLong(this.join_date.getTime());
        }
        if (null == this.zip) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            __dataOut.writeInt(this.zip);
        }
        if (null == this.state) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            Text.writeString(__dataOut, state);
        }
        if (null == this.email) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            Text.writeString(__dataOut, email);
        }
        if (null == this.password_hash) {
            __dataOut.writeBoolean(true);
        } else {
            __dataOut.writeBoolean(false);
            Text.writeString(__dataOut, password_hash);
        }
    }
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("" + user_id);
        sb.append(",");
        sb.append(first_name);
        sb.append(",");
        sb.append(last_name);
        sb.append(",");
        sb.append("" + join_date);
        sb.append(",");
        sb.append("" + zip);
        sb.append(",");
        sb.append(state);
        sb.append(",");
        sb.append(email);
        sb.append(",");
        sb.append(password_hash);
        return sb.toString();
    }
}
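A note on the bare integers passed in write(PreparedStatement) above (4, 12, 91 and 1): they match the values of the standard JDBC type constants in java.sql.Types. The small program below, which is not part of the generated output, prints the constant name behind each code. The class name JdbcTypeCodes and the helper name() are made up for this illustration.

```java
import java.sql.Types;

public class JdbcTypeCodes {
    // Returns the java.sql.Types constant name for the codes seen in USERS.java.
    static String name(int code) {
        switch (code) {
            case Types.CHAR:
                return "CHAR";
            case Types.INTEGER:
                return "INTEGER";
            case Types.VARCHAR:
                return "VARCHAR";
            case Types.DATE:
                return "DATE";
            default:
                return "OTHER(" + code + ")";
        }
    }

    public static void main(String[] args) {
        // Columns and codes in the order they appear in write(PreparedStatement).
        String[] columns = {"user_id", "first_name", "last_name", "join_date",
                            "zip", "state", "email", "password_hash"};
        int[] codes = {4, 12, 12, 91, 4, 1, 12, 1};
        for (int i = 0; i < columns.length; i++) {
            System.out.println(columns[i] + ": " + codes[i]
                + " = Types." + name(codes[i]));
        }
    }
}
```

The codes line up with the table definition: the two CHAR columns (state and password_hash) use 1, the VARCHAR columns use 12, the INTEGER columns use 4, and the DATE column uses 91.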
As you can see, the auto-generated class is very readable and can serve as a starting point for custom development.
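The part of the generated class most worth understanding before extending it is how write(DataOutput) and readFields(DataInput) handle NULL columns: each field is preceded by a boolean flag, where true means "null, no value follows". The standalone sketch below reproduces that pattern using only java.io, so it runs without Hadoop on the classpath. It is a simplification: writeUTF/readUTF stand in for Hadoop's Text.writeString/Text.readString, and the class and method names are made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class NullFlagRoundTrip {
    // Flag first (true means null), then the value only if it is present.
    static void writeNullableInt(DataOutputStream out, Integer v) throws IOException {
        if (v == null) {
            out.writeBoolean(true);
        } else {
            out.writeBoolean(false);
            out.writeInt(v);
        }
    }

    static Integer readNullableInt(DataInputStream in) throws IOException {
        if (in.readBoolean()) {
            return null;
        }
        return Integer.valueOf(in.readInt());
    }

    // writeUTF is a stand-in for Text.writeString in the generated class.
    static void writeNullableString(DataOutputStream out, String s) throws IOException {
        if (s == null) {
            out.writeBoolean(true);
        } else {
            out.writeBoolean(false);
            out.writeUTF(s);
        }
    }

    static String readNullableString(DataInputStream in) throws IOException {
        if (in.readBoolean()) {
            return null;
        }
        return in.readUTF();
    }

    // Serializes a (zip, state) pair to bytes and reads it back.
    static Object[] roundTrip(Integer zip, String state) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            writeNullableInt(out, zip);
            writeNullableString(out, state);
            out.flush();
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            return new Object[] {readNullableInt(in), readNullableString(in)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object[] full = roundTrip(330440, "ha");
        Object[] empty = roundTrip(null, null);
        System.out.println(full[0] + "," + full[1]);
        System.out.println(empty[0] + "," + empty[1]);
    }
}
```

Fields must be read back in exactly the order they were written, which is why the generated readFields and write methods list the columns in the same sequence.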