spark Sql

package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkContext}

object SLA_parquetSQL {

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SLA Filter"))
    val sqlContext = new SQLContext(sc)
    val suffix = args(0)
    sqlContext.parquetFile("/user/hive/warehouse/sla_parquet.db/e60001_shipment_exported_" + suffix).registerTempTable("e60001_shipment_exported")
    sqlContext.parquetFile("/user/hive/warehouse/sla_parquet.db/e62005_shipment_shipped_and_closed_" + suffix).registerTempTable("e62005_shipment_shipped_and_closed")
    sqlContext.parquetFile("/user/hive/warehouse/sla_parquet.db/e62006_shipment_canceled_and_closed_" + suffix).registerTempTable("e62006_shipment_canceled_and_closed")

    val e60001_shipment_exported = sqlContext.sql("select ordernumber, type_id, event_time from e60001_shipment_exported").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
    val e62005_shipment_shipped_and_closed = sqlContext.sql("select ordernumber, type_id, event_time from e62005_shipment_shipped_and_closed").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
    val e62006_shipment_canceled_and_closed = sqlContext.sql("select ordernumber, type_id, event_time from e62006_shipment_canceled_and_closed").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))

    val un = e60001_shipment_exported.union(e62005_shipment_shipped_and_closed).union(e62006_shipment_canceled_and_closed)

    un.groupByKey.filter(kv => FilterSLA.filterSLA(kv._2.toSeq)).map(kv => kv._1 + "\t" + Utils.flatValues(kv._2.toSeq)).saveAsTextFile(args(1))
  }
}

本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1718636

时间: 2024-09-20 16:32:01

spark Sql的相关文章

spark sql简单示例

运行环境 集群环境:CDH5.3.0 具体JAR版本如下: spark版本:1.2.0-cdh5.3.0 hive版本:0.13.1-cdh5.3.0 hadoop版本:2.5.0-cdh5.3.0 spark sql的JAVA版简单示例 spark sql直接查询JSON格式的数据 spark sql的自定义函数 spark sql查询hive上面的表 import java.util.ArrayList; import java.util.List; import org.apache.sp

Spark SQL组件源码分析

功能 Spark新发布的Spark SQL组件让Spark对SQL有了别样于Shark基于Hive的支持.参考官方手册,具体分三部分: 其一,能在Scala代码里写SQL,支持简单的SQL语法检查,能把RDD指定为Table存储起来.此外支持部分SQL语法的DSL. 其二,支持Parquet文件的读写,且保留Schema. 其三,能在Scala代码里访问Hive元数据,能执行Hive语句,并且把结果取回作为RDD使用. 第一点对SQL的支持主要依赖了Catalyst这个新的查询优化框架(下面会给

整理对Spark SQL的理解

Catalyst Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架. 目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail.   以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程. Catalyst定位 其他系统如果想基于Spark做一些类sql.标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器.执行计划树结构.逻辑执行计划的处理规则体系等类体系来实现执行计划的解析.生成.

Spark SQL性能优化

性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import org.a

关于CarbonData+Spark SQL的一些应用实践和调优经验分享

大数据时代,中大型企业数据的爆发式增长,几乎每天都能产生约 100GB 到 10TB 的数据.而企业数据分系统构建与扩张,导致不同应用场景下大数据冗余严重.行业亟需一个高效.统一的融合数仓,从海量数据中快速获取有效信息,从而洞察机遇.规避风险. 在这样的现状下,CarbonData 诞生了,作为首个由中国贡献给Apache社区的顶级开源项目,CarbonData 提供了一种新的融合数据存储方案,以一份数据同时支持多种大数据应用场景,并通过丰富的索引技术.字典编码.列存等特性提升了 IO 扫描和计

Spark SQL 物理执行计划各操作实现

SparkStrategy: logical to physical Catalyst作为一个实现无关的查询优化框架,在优化后的逻辑执行计划到真正的物理执行计划这部分只提供了接口,没有提供像Analyzer和Optimizer那样的实现. 本文介绍的是Spark SQL组件各个物理执行计划的操作实现.把优化后的逻辑执行计划映射到物理执行操作类这部分由SparkStrategies类实现,内部基于Catalyst提供的Strategy接口,实现了一些策略,用于分辨logicalPlan子类并替换为

Spark SQL中的DataFrame

在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是对 Hive 的改造,替换了 Hive 的物理执行引擎,因此会有一个很快的速度.然而,不容忽视的是,Shark 继承了大量的 Hive 代码,因此给优化和维护带来了大量的麻烦.随着性能优化和先进分析整合的进一步加深,基于 MapReduce 设计的部分无疑成为了整个项目的瓶颈. 详细内容请参看 Sh

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

Spark SQL, DataFrames 以及 Datasets 编程指南 概要 Spark SQL是Spark中处理结构化数据的模块.与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息.在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些.Spark SQL如今有了三种不同的API:SQL语句.DataFrame API和最新的Dataset API.不过真正运行计算的时候,无论你使用哪种API或语

使用Spark SQL构建交互式查询引擎

前言 StreamingPro 原来仅仅是用来作为Spark Streaming的一个配置化+SQL封装,然而不经意间,已经涵盖了批处理,交互式查询等多个方面.今天就讲讲如何使用StreamingPro构建一个交互式查询引擎. 准备工作 下载StreamingPro README中有下载地址 如果你使用了 Spark 2.0 版本,则要下载对应页面上的Spark 安装包.因为目前Spark 2.0 默认支持Scala 2.11.我提供了一个机遇Scala 2.10版本的.  我们假设您将文件放在

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)

使用Spark SQL命令行工具 Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询.注意Spark SQL CLI目前还不支持和Thrift JDBC server通信. 用如下命令,在spark目录下启动一个Spark SQL CLI ./bin/spark-sql Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置.你可以用