Spark SQL数据加载和保存实例讲解

一、前置知识详解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。

二、Spark SQL读写数据代码实战

import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; import java.util.List; public class SparkSQLLoadSaveOps { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext = new SQLContext(sc); /** * read()是DataFrameReader类型,load可以将数据读取出来 */ DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json"); /** * 直接对DataFrame进行操作 * Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式? * 通过扫描整个Json。扫描之后才会知道元数据 */ //通过mode来指定输出文件的是append。创建新文件来追加文件 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames"); } }

读取过程源码分析如下:
1. read方法返回DataFrameReader,用于读取数据。

/** * :: Experimental :: * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]]. * {{{ * sqlContext.read.parquet("/path/to/file.parquet") * sqlContext.read.schema(schema).json("/path/to/file.json") * }}} * * @group genericdata * @since 1.4.0 */ @Experimental //创建DataFrameReader实例,获得了DataFrameReader引用 def read: DataFrameReader = new DataFrameReader(this)

2.  然后再调用DataFrameReader类中的format,指出读取文件的格式。

/** * Specifies the input data source format. * * @since 1.4.0 */ def format(source: String): DataFrameReader = { this.source = source this }

3.  通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。

/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */ // TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = { option("path", path).load() }

至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!

1. 调用DataFrame中select函数进行对列筛选

/** * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. cannot construct expressions). * * {{{ * // The following two are equivalent: * df.select("colA", "colB") * df.select($"colA", $"colB") * }}} * @group dfops * @since 1.3.0 */ @scala.annotation.varargs def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2.  然后通过write将结果写入到外部存储系统中。

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */ @Experimental def write: DataFrameWriter = new DataFrameWriter(this)

3.   在保持文件的时候mode指定追加文件的方式

/** * Specifies the behavior when data or table already exists. Options include: // Overwrite是覆盖 * - `SaveMode.Overwrite`: overwrite the existing data. //创建新的文件,然后追加 * - `SaveMode.Append`: append the data. * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */ def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this }

4.   最后,save()方法触发action,将文件输出到指定文件中。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */ def save(path: String): Unit = { this.extraOptions += ("path" -> path) save() }

三、Spark SQL读写整个流程图如下

四、对于流程中部分函数源码详解

DataFrameReader.Load()

1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。

/** * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @group genericdata * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0. */ @deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0") def load(path: String): DataFrame = { //此时的read就是DataFrameReader read.load(path) }

2.  追踪load源码进去,源码如下:
在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。

/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */ // TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = { option("path", path).load() }

3.  追踪load源码如下:

/** * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external * key-value stores). * * @since 1.4.0 */ def load(): DataFrame = { //对传入的Source进行解析 val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = Array.empty[String], provider = source, options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation)) }

DataFrameReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.

/** * Specifies the input data source format.Built-in options include “parquet”,”json”,etc. * * @since 1.4.0 */ def format(source: String): DataFrameReader = { this.source = source //FileType this }

DataFrame.write()

1. 创建DataFrameWriter实例

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */ @Experimental def write: DataFrameWriter = new DataFrameWriter(this) 1

2.  追踪DataFrameWriter源码如下:
以DataFrame的方式向外部存储系统中写入数据。

/** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @since 1.4.0 */ @Experimental final class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。

/** * Specifies the behavior when data or table already exists. Options include: * - `SaveMode.Overwrite`: overwrite the existing data. * - `SaveMode.Append`: append the data. * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). //默认操作 * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */ def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this }

2.  通过模式匹配接收外部参数

/** * Specifies the behavior when data or table already exists. Options include: * - `overwrite`: overwrite the existing data. * - `append`: append the data. * - `ignore`: ignore the operation (i.e. no-op). * - `error`: default option, throw an exception at runtime. * * @since 1.4.0 */ def mode(saveMode: String): DataFrameWriter = { this.mode = saveMode.toLowerCase match { case "overwrite" => SaveMode.Overwrite case "append" => SaveMode.Append case "ignore" => SaveMode.Ignore case "error" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") } this }

DataFrameWriter.save()

1. save将结果保存传入的路径。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */ def save(path: String): Unit = { this.extraOptions += ("path" -> path) save() }

2.  追踪save方法。

/** * Saves the content of the [[DataFrame]] as the specified table. * * @since 1.4.0 */ def save(): Unit = { ResolvedDataSource( df.sqlContext, source, partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), mode, extraOptions.toMap, df) }

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。

// This is used to set the default data source val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default", defaultValue = Some("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")

DataFrame.scala中部分函数详解:

1. toDF函数是将RDD转换成DataFrame

/** * Returns the object itself. * @group basic * @since 1.3.0 */ // This is declared with parentheses to prevent the Scala compiler from treating // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. def toDF(): DataFrame = this

2.  show()方法:将结果显示出来

/** * Displays the [[DataFrame]] in a tabular form. For example: * {{{ * year month AVG('Adj Close) MAX('Adj Close) * 1980 12 0.503218 0.595103 * 1981 01 0.523289 0.570307 * 1982 02 0.436504 0.475256 * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right * * @group action * @since 1.5.0 */ // scalastyle:off println def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate)) // scalastyle:on println

追踪showString源码如下:showString中触发action收集数据。

/** * Compose the string representing rows for output * @param _numRows Number of rows to show * @param truncate Whether truncate long strings and align cells right */ private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

时间: 2024-10-24 16:31:50

Spark SQL数据加载和保存实例讲解的相关文章

数据加载后保存数据到另外数据表中急急急急急急急急急急急急急急

问题描述 数据加载后保存数据到另外数据表中急急急急急急急急急急急急急急 datagrid容器 数据加载后保存到另一个数据库表 sqlite数据库中加载一组数据到datagrid中. 现在把数据保存到sql server 数据库中. 解决方案 Dim Con As New ADODB.Connection '定义ADO对象 Dim rs As New ADODB.Recordset Con.ConnectionString = "driver=SQL Server;server=192.168.1

iframe异步加载实现点击左边菜单加载右边内容实例讲解_jquery

关于iframe异步加载,我们常用的大都是左边菜单栏右边是内容页面,要求我们不能左边菜单不能刷新的情况下,异步加载右边的内容页面. 话不多说,做了一个实例大致是这样的: 1.首先在你的项目中建立三个文件如: 2.在Default页面引入jquery文件并在body中加入也下代码: 复制代码 代码如下: <div style="width: 20%; float: left"> <div id="butten" style="cursor:

Knockout应用开发指南 第六章:加载或保存JSON数据

原文:Knockout应用开发指南 第六章:加载或保存JSON数据 加载或保存JSON数据 Knockout可以实现很复杂的客户端交互,但是几乎所有的web应用程序都要和服务器端交换数据(至少为了本地存储需要序列化数据),交换数据最方便的就是使用JSON格式 – 大多数的Ajax应用程序也是使用这种格式.   加载或保存数据 Knockout不限制你用任何技术加载和保存数据.你可以使用任何技术和服务器来交互.用的最多的是使用jQuery的Ajax帮助,例如:getJSON,post和ajax.你

oracle的sql loader数据加载工具

SQL*LOADER是ORACLE的数据加载工具,通常用来将操作系统文件迁移到ORACLE数据库中.SQL*LOADER是大型数据 仓库选择使用的加载方法. 在NT下,SQL*LOADER的命令为SQLLDR,在UNIX下一般为sqlldr/sqlload. 如执行:d:/oracle>sqlldr SQL*Loader: Release 8.1.6.0.0 - Production on 星期二 1月 8 11:06:42 2002 (c) Copyright 1999 Oracle Corp

Swift教程_CoreData实例(三)_构建控制层(列表数据加载、删除数据)

Swift教程_CoreData实例(一)_构建storyboard Swift教程_CoreData实例(二)_构建数据层 Swift教程_CoreData实例(三)_构建控制层(列表数据加载.删除数据) Swift教程_CoreData实例(四)_构建控制层(查询.更新数据) Swift教程_CoreData实例(五)_构建控制层(添加数据) 四.构建控制层 控制层总体结构包括列表的数据加载.数据的新增.删除.更新.这里我们先来搞定列表controller的功能(数据加载.删除),即PKOB

Oracle 高速批量数据加载工具sql*loader使用说明_oracle

SQL*Loader(SQLLDR)是Oracle的高速批量数据加载工具.这是一个非常有用的工具,可用于多种平面文件格式向Oralce数据库中加载数据.SQLLDR可以在极短的时间内加载数量庞大的数据.它有两种操作模式. 传统路径:(conventional path):SQLLDR会利用SQL插入为我们加载数据. 直接路径(direct path):采用这种模式,SQLLDR不使用SQL:而是直接格式化数据库块. 利用直接路径加载,你能从一个平面文件读数据,并将其直接写至格式化的数据库块,而绕

php+jquery+html实现点击不刷新加载更多的实例代码_php实例

基本原理:页面载入时,jQuery向后台请求数据,PHP通过查询数据库将最新的几条记录显示在列表页,在列表页的底部有个"更多"链接,通过触发该链接,向服务端发送Ajax请求,后台PHP程序得到请求参数,并作出相应,获取数据库相应的记录并以JSON的形式返回给前台页面,前台页面jQuery解析JSON数据,并将数据追加到列表页.其实就是Ajax分页效果. HTML 首先要引入jquery库和jquery.more.js插件,jquery.more.js已经将许多功能都封装好了,并提供了参

将在线数据加载到阿里云Greenplum

本文说明如何设计一个 ETL 作业以便将在线关系数据库里的数据,加载到阿里云的Greenplum 数据库中,如何调度和监控该ETL 作业的日常运行. 本文使用的软件是开源ETL 工具软件 Kettle 5.x,以及基于Kettle的傲飞数据整合平台,该平台可以用来Kettle作业的调度.监控等功能.并可以完成数据源管理等功能. 软件下载安装 1.   Java 1.6 或 1.7 2.   傲飞数据整合平台的下载地址(包含了 Kettle和服务端):http://pan.baidu.com/s/

php+jquery+html实现点击不刷新加载更多的实例代码

基本原理:页面载入时,jQuery向后台请求数据,PHP通过查询数据库将最新的几条记录显示在列表页,在列表页的底部有个"更多"链接,通过触发该链接,向服务端发送Ajax请求,后台PHP程序得到请求参数,并作出相应,获取数据库相应的记录并以JSON的形式返回给前台页面,前台页面jQuery解析JSON数据,并将数据追加到列表页.其实就是Ajax分页效果. HTML 首先要引入jquery库和jquery.more.js插件,jquery.more.js已经将许多功能都封装好了,并提供了参