spark sql简单示例

运行环境

集群环境:CDH5.3.0

具体JAR版本如下:

spark版本:1.2.0-cdh5.3.0

hive版本:0.13.1-cdh5.3.0

hadoop版本:2.5.0-cdh5.3.0

spark sql的JAVA版简单示例

  1. spark sql直接查询JSON格式的数据
  2. spark sql的自定义函数
  3. spark sql查询hive上面的表
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;

/**
 * 注意:
 * 使用JavaHiveContext时
 * 1:需要在classpath下面增加三个配置文件:hive-site.xml,core-site.xml,hdfs-site.xml
 * 2:需要增加postgresql或mysql驱动包的依赖
 * 3:需要增加hive-jdbc,hive-exec的依赖
 *
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
//        testQueryJson(sqlCtx);
//        testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    //测试spark sql直接查询JSON格式的数据
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the input schema RDD
        rdd.registerTempTable("account");

        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email,id,name FROM account ORDER BY id LIMIT 10");
        List<Row> result = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getInt(2) + ","
                    + row.getString(3));
        }

        JavaRDD<String> names = accs.map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }

    //测试spark sql的自定义函数
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create a account and turn it into a Schema RDD
        ArrayList<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1, "lily", "lily@163.com", "gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);

        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);

        rdd.registerTempTable("acc");

        // 编写自定义函数UDF
        sqlCtx.registerFunction("strlength", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // 数据查询
        List<Row> result = sqlCtx.sql("SELECT strlength('name'),name,address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    //测试spark sql查询hive上面的表
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Row> result = hiveCtx.sql("SELECT foo,bar,name from pokes2 limit 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}
时间: 2024-11-03 04:23:29

spark sql简单示例的相关文章

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

Spark SQL, DataFrames 以及 Datasets 编程指南 概要 Spark SQL是Spark中处理结构化数据的模块.与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息.在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些.Spark SQL如今有了三种不同的API:SQL语句.DataFrame API和最新的Dataset API.不过真正运行计算的时候,无论你使用哪种API或语

Spark SQL中的数据源

Spark 支持通过 DataFrame 来操作大量的数据源,包括外部文件(如 json.avro.parquet.sequencefile 等等).hive.关系数据库.cassandra 等等. 本文测试环境为 Spark 1.3. 加载和保存文件 最简单的方式是调用 load 方法加载文件,默认的格式为 parquet,你可以修改 spark.sql.sources.default 指定默认的格式: scala> val df = sqlContext.load("people.pa

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(二)

编程方式定义Schema Scala Java Python 如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame: 从已有的RDD创建一个包含Row对象的RDD 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQ

Spark SQL组件源码分析

功能 Spark新发布的Spark SQL组件让Spark对SQL有了别样于Shark基于Hive的支持.参考官方手册,具体分三部分: 其一,能在Scala代码里写SQL,支持简单的SQL语法检查,能把RDD指定为Table存储起来.此外支持部分SQL语法的DSL. 其二,支持Parquet文件的读写,且保留Schema. 其三,能在Scala代码里访问Hive元数据,能执行Hive语句,并且把结果取回作为RDD使用. 第一点对SQL的支持主要依赖了Catalyst这个新的查询优化框架(下面会给

整理对Spark SQL的理解

Catalyst Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架. 目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail.   以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程. Catalyst定位 其他系统如果想基于Spark做一些类sql.标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器.执行计划树结构.逻辑执行计划的处理规则体系等类体系来实现执行计划的解析.生成.

Spark SQL性能优化

性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import org.a

关于CarbonData+Spark SQL的一些应用实践和调优经验分享

大数据时代,中大型企业数据的爆发式增长,几乎每天都能产生约 100GB 到 10TB 的数据.而企业数据分系统构建与扩张,导致不同应用场景下大数据冗余严重.行业亟需一个高效.统一的融合数仓,从海量数据中快速获取有效信息,从而洞察机遇.规避风险. 在这样的现状下,CarbonData 诞生了,作为首个由中国贡献给Apache社区的顶级开源项目,CarbonData 提供了一种新的融合数据存储方案,以一份数据同时支持多种大数据应用场景,并通过丰富的索引技术.字典编码.列存等特性提升了 IO 扫描和计

Spark SQL中的DataFrame

在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是对 Hive 的改造,替换了 Hive 的物理执行引擎,因此会有一个很快的速度.然而,不容忽视的是,Shark 继承了大量的 Hive 代码,因此给优化和维护带来了大量的麻烦.随着性能优化和先进分析整合的进一步加深,基于 MapReduce 设计的部分无疑成为了整个项目的瓶颈. 详细内容请参看 Sh

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)

使用Spark SQL命令行工具 Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询.注意Spark SQL CLI目前还不支持和Thrift JDBC server通信. 用如下命令,在spark目录下启动一个Spark SQL CLI ./bin/spark-sql Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置.你可以用