Spark SQL中的数据源

Spark 支持通过 DataFrame 来操作大量的数据源,包括外部文件(如 json、avro、parquet、sequencefile 等等)、hive、关系数据库、cassandra 等等。

本文测试环境为 Spark 1.3。

加载和保存文件

最简单的方式是调用 load 方法加载文件,默认的格式为 parquet,你可以修改 spark.sql.sources.default 指定默认的格式:

scala> val df = sqlContext.load("people.parquet")
scala> df.select("name", "age").save("namesAndAges.parquet")

你也可以收到指定数据源,使用全路径名称,如:org.apache.spark.sql.parquet,对于内置的数据源,你也可以使用简称,如:jsonparquetjdbc

scala> val df = sqlContext.load("people.json", "json")
scala> df.select("name", "age").save("namesAndAges.parquet", "parquet")

保存操作还可以指定保存模式,用于处理文件已经存在的情况下如何操作。

Scala/Java Python 含义
SaveMode.ErrorIfExists (default) “error” (default) 如果存在,则报错
SaveMode.Append “append” 追加模式
SaveMode.Overwrite “overwrite” 覆盖模式
SaveMode.Ignore “ignore” 忽略,类似 SQL 中的 CREATE TABLE IF NOT EXISTS

Parquet 数据源

加载数据

Spark SQL 支持读写 Parquet文件。

Scala:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.parquetFile("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

Java:

// sqlContext from the previous example is used in this example.

DataFrame schemaPeople = ... // The DataFrame from the previous example.

// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

Python:

# sqlContext from the previous example is used in this example.

schemaPeople # The DataFrame from the previous example.

# DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet")

# Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = sqlContext.parquetFile("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName

SQL:

CREATE TEMPORARY TABLE parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

自动发现分区

Parquet 数据源可以自动识别分区目录以及分区列的类型,目前支持数据类型和字符串类型。

例如,对于这样一个目录结构,有两个分区字段:gender、country。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

将 path/to/table 路径传递给 SQLContext.parquetFile 或 SQLContext.load 时,Spark SQL 将会字段获取分区信息,并返回 DataFrame 的 schema 如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

schema 自动扩展

Parquet 还支持 schema 自动扩展。

Scala:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sparkContext.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.saveAsParquetFile("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sparkContext.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.saveAsParquetFile("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partiioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Python:

# sqlContext from the previous example is used in this example.

# Create a simple DataFrame, stored into a partition directory
df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.save("data/test_table/key=1", "parquet")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))
                                   .map(lambda i: Row(single=i, triple=i * 3)))
df2.save("data/test_table/key=2", "parquet")

# Read the partitioned table
df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partiioning column appeared in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key : int (nullable = true)

配置参数

  • spark.sql.parquet.binaryAsString:默认为 false,是否将 binary 当做字符串处理
  • spark.sql.parquet.int96AsTimestamp:默认为 true
  • spark.sql.parquet.cacheMetadata :默认为 true,是否缓存元数据
  • spark.sql.parquet.compression.codec:默认为 gzip,支持的值:uncompressed, snappy, gzip, lzo
  • spark.sql.parquet.filterPushdown:默认为 false
  • spark.sql.hive.convertMetastoreParquet:默认为 false

JSON 数据源

Spark SQL 能够自动识别 JSON 数据的 schema ,SQLContext 中有两个方法处理 JSON:

  • jsonFile:从一个 JSON 目录中加载数据,JSON 文件中每一行为一个 JSON 对象。
  • jsonRDD:从一个 RDD 中加载数据,RDD 的每一个元素为一个 JSON 对象的字符串。

一个 Scala 的例子如下:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "people.json"
// Create a DataFrame from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Hive 数据源

Spark SQL 支持读和写 Hive 中的数据。Spark 源码本身不包括 Hive,故编译时候需要添加 -Phive 和 -Phive-thriftserver 开启对 Hive 的支持。另外,Hive assembly jar 需要存在于每一个 worker 节点上,因为他们需要 SerDes 去访问存在于 Hive 中的数据。

Scala:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Java:

// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

Python:

# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

JDBC 数据源

Spark SQL 支持通过 JDBC 访问关系数据库,这需要用到 JdbcRDD。为了访问某一个关系数据库,需要将其驱动添加到 classpath,例如:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

访问 jdbc 数据源需要提供以下参数:

  • url
  • dbtable
  • driver
  • partitionColumn, lowerBound, upperBound, numPartitions

Scala 示例:

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename"))

Java:

Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

DataFrame jdbcDF = sqlContext.load("jdbc", options)

Python:

df = sqlContext.load("jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")

SQL:

CREATE TEMPORARY TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename"
)

访问 Avro

这不是 Spark 内置的数据源,要想访问 Avro 数据源 ,需要做些处理。这部分内容可以参考 如何将Avro数据加载到Spark 和 Spark with Avro

访问 Cassandra

TODO

测试

Spark 和 Parquet

参考上面的例子,将 people.txt 文件加载到 Spark:

scala> import sqlContext.implicits._
scala> case class People(name: String, age: Int)
scala> val people = sc.textFile("people.txt").map(_.split(",")).map(p => People(p(0), p(1).trim.toInt)).toDF()
scala> people.registerTempTable("people")
scala> val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
scala> teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

然后,将 people 这个 DataFrame 转换为 parquet 格式:

scala> people.saveAsParquetFile("people.parquet")
scala> val parquetFile = sqlContext.parquetFile("people.parquet")

另外,也可以从 hive 中加载 parquet 格式的文件。

hive> create table people_parquet like people stored as parquet;
hive> insert overwrite table people_parquet select * from people;

使用 HiveContext 来从 hive 中加载 parquet 文件,这里不再需要定义一个 case class ,因为 parquet 中已经包含了文件的 schema。

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
scala> import hc.implicits._
scala>val peopleRDD = hc.parquetFile("people.parquet")
scala> peopleRDD.registerAsTempTable("pp")
scala>val teenagers = hc.sql("SELECT name FROM pp WHERE age >= 13 AND age <= 19")
scala>teenagers.collect.foreach(println)

注意到 impala 中处理 parquet 文件时,会将字符串保存为 Binary,为了修正这个问题,可以添加下面一行代码:

scala> sqlContext.setConf("spark.sql.parquet.binaryAsString","true")

SparkSql Join

下面是两个表左外连接的例子:

scala>import sqlContext.implicits._
scala>import org.apache.spark.sql.catalyst.plans._

scala> case class Dept(dept_id:String,dept_name:String)
scala> val dept = sc.parallelize(List( ("DEPT01","Information Technology"), ("DEPT02","WHITE HOUSE"),("DEPT03","EX-PRESIDENTS OFFICE"),("DEPT04","SALES"))).map( d => Dept(d._1,d._2)).toDF.as( "dept" )

scala> case class Emp(first_name:String,last_name:String,dept_id:String)
scala> val emp = sc.parallelize(List( ("Rishi","Yadav","DEPT01"),("Barack","Obama","DEPT02"),("Bill","Clinton","DEPT04"))).map( e => Emp(e._1,e._2,e._3)).toDF.as("emp")

scala> val alldepts = dept.join(emp,dept("dept_id") === emp("dept_id"), "left_outer").select("dept.dept_id","dept_name","first_name","last_name")

scala> alldepts.foreach(println)
[DEPT01,Information Technology,Rishi,Yadav]
[DEPT02,WHITE HOUSE,Barack,Obama]
[DEPT04,SALES,Bill,Clinton]
[DEPT03,EX-PRESIDENTS OFFICE,null,null]

支持的连接类型有:innerouterleft_outerright_outersemijoin

时间: 2024-10-03 20:24:12

Spark SQL中的数据源的相关文章

Spark SQL中的DataFrame

在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是对 Hive 的改造,替换了 Hive 的物理执行引擎,因此会有一个很快的速度.然而,不容忽视的是,Shark 继承了大量的 Hive 代码,因此给优化和维护带来了大量的麻烦.随着性能优化和先进分析整合的进一步加深,基于 MapReduce 设计的部分无疑成为了整个项目的瓶颈. 详细内容请参看 Sh

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)

使用Spark SQL命令行工具 Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询.注意Spark SQL CLI目前还不支持和Thrift JDBC server通信. 用如下命令,在spark目录下启动一个Spark SQL CLI ./bin/spark-sql Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置.你可以用

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(三)

JSON数据集 Scala Java Python R Sql Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame.用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换. 注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件.JSON格式文件必须每一行是一个独立.完整的的JSON对象.因此,一个常规的多行json文件经常会加载失败. // sc是已有的Sp

spark-在Spark SQL中,列名为敏感词汇时如何处理?

问题描述 在Spark SQL中,列名为敏感词汇时如何处理? 有一张表,其第二列的列名为first.在运行SQL语句select first from tablename时老是报错,原因是把first列名当作SQL中的first()函数了. 请问这种情况如何处理?难道只能去改表tablename中的列名了吗? 解决方案 没用过Spark SQL 不过你可以试试用双引号或者方括号分隔first 解决方案二: 你可以试试用双引号或者方括号分隔first

Spark生态系统中的图数据分析知识

图结构可有效表示稀疏矩阵,因而图数据分析可用于实现大数据分析.对于Spark生态系统中的图处理系统GraphX,<Spark GraphX in Action>一书给出了详细的教程和典型用例,将教会读者如何使用GraphX和GraphFrames进行图分析.本文是Info对该书作者的访谈,内容包括图数据及分析技术.GraphX高效程序开发.图数据分析的趋势等. 如何定义图数据? Michael Malak:就事论事,图结构看上去并非像股价图那样,而是边和点的集合.但这只是一种模糊的数学抽象.更

关于CarbonData+Spark SQL的一些应用实践和调优经验分享

大数据时代,中大型企业数据的爆发式增长,几乎每天都能产生约 100GB 到 10TB 的数据.而企业数据分系统构建与扩张,导致不同应用场景下大数据冗余严重.行业亟需一个高效.统一的融合数仓,从海量数据中快速获取有效信息,从而洞察机遇.规避风险. 在这样的现状下,CarbonData 诞生了,作为首个由中国贡献给Apache社区的顶级开源项目,CarbonData 提供了一种新的融合数据存储方案,以一份数据同时支持多种大数据应用场景,并通过丰富的索引技术.字典编码.列存等特性提升了 IO 扫描和计

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

Spark SQL, DataFrames 以及 Datasets 编程指南 概要 Spark SQL是Spark中处理结构化数据的模块.与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息.在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些.Spark SQL如今有了三种不同的API:SQL语句.DataFrame API和最新的Dataset API.不过真正运行计算的时候,无论你使用哪种API或语

Spark修炼之道(进阶篇)——Spark入门到精通:第八节 Spark SQL与DataFrame(一)

本节主要内宾 Spark SQL简介 DataFrame 1. Spark SQL简介 Spark SQL是Spark的五大核心模块之一,用于在Spark平台之上处理结构化数据,利用Spark SQL可以构建大数据平台上的数据仓库,它具有如下特点: (1)能够无缝地将SQL语句集成到Spark应用程序当中 (2)统一的数据访问方式 DataFrames and SQL provide a common way to access a variety of data sources, includ

为什么说Spark SQL远远超越了MPP SQL

这里说的并不是性能,因为我没尝试对比过(下文会有简单的说明),而是尝试从某种更高一层次的的角度去看,为什么Spark SQL 是远远超越MPP SQL的. Spark SQL 和 MPP SQL 其实不在一个维度上.简而言之, MPP SQL 是 Spark SQL 的一个子集 Spark SQL 成为了一种跨越领域的交互形态 MPP SQL 是 Spark SQL 的一个子集 MPP SQL 要解决的技术问题是海量数据的查询问题.这里根据实际场景,你还可以加上一些修饰词汇,譬如秒级,Ad-ho