Spark连接JDBC数据源

在实际的项目环境中,成熟的技术体系对关系型数据库的依赖远远超过hdfs,而且对大数据运算的结果,实践中也倾向于保存到数据库中,以便进行多种方式的可视化。所以本次实践主要完成spark从mysql中读取和写入数据。一般这个操作有两种方式,一种是自己建立jdbc连接,像一般数据库操作一样的写法,一种就是利用spark自带的jdbc操作函数。

首先要把mysql jdbc connector的jar包上传到集群中每台机器的spark/jars目录,这是一个讨巧的办法,因为spark运行之前一定把这里面所有的jar都加到CALSS_PATH里面去了。

通过spark.read.jdbc读取出来的返回值是DataFrame,如下代码所示。`

val rfidCardMap = spark.read.jdbc(mysqlHelper.DB_URL_R,"t_rfid_card",Array("org_id="+ ORG_ID), mysqlHelper.PROPERTIES).map(row => {
  (row.getAs[String]("card_id"), row.getAs[String]("card_label"))
}).rdd.collect() toMap`

此函数需要传入参数依次为:数据库连接url,表名,过滤条件表达式列表,带有用户名密码信息的属性对象。读取了数据之后,形成一个(String,String)对象返回。这里有两个要注意的:

  1. getAs的类型必须和数据库中列的类型严格匹配
  2. 返回元组类型的对象比返回自定义类的对象写法要轻松一些。如果是返回自定义类的对象,编译会出错,一般说法是语句之前加入import spark.implicits._会有效,但未必见得。尚待进一步探索。

如下是一个比较复杂的解析处理代码示例。`

val teamWeightMapRDD = dfMedicalWaste.map(row => {
  (rfidCardMap.get(row.getAs[String]("team_id")) toString,
  sdf.format(new Date(row.getAs[Timestamp]("rec_ts").getTime)) toInt,
  row.getAs[Double]("mw_weight"))
}).rdd.cache()`

这里sdf就是java里面常用的SimpleDateFormat,它把一个时间戳字段转化成了6个长度的整型。

处理完成后,将结果回写数据库时采用的是本地jdbc连接写法,这块内容很普通了。

这次实践有个特别清晰的理解就是scala的类型推断,由于要统计某个地点一段时间之内的产量总和、平均产量、最大和最小单位时间产量,使用到了DoubleRDDFunctions,代码如下:`

val weightArrayRDD = teamWeightMapRDD.filter(teamWeight => {
  teamWeight._1 == teamName && teamWeight._2 >= week._1 && teamWeight._2 < week._2
}).map(teamWeight => {
  (teamWeight._2, teamWeight._3)
}).reduceByKey((a, b) =>
  a + b
).map(item => {
  item._2
}).cache()`

使用的时候如下:`

line.append(weightArrayRDD sum).append("\t")
line.append(weightArrayRDD mean).append("\t")
line.append(weightArrayRDD max).append("\t")
line.append(weightArrayRDD min).append("\t")`

scala会根据返回值类型进行类型推断,从而匹配可以使用的函数,同样是RDD或者DataFram,包含的类型不同,可以使用的函数也不同,这一切都是透明的。

时间: 2025-01-29 23:09:30

Spark连接JDBC数据源的相关文章

编码错误-连接池数据源的配置文件错误

问题描述 连接池数据源的配置文件错误 <?xml version="1.0" encoding="UTF-8"?> type="javax.sql.DataSource" maxActive="100" maxIdle="30" maxWait="10000" username="HUANGZHIHAO" password="123456&quo

SQL Server 2008 连接JDBC图文

  SQL Server 2008是目前windows上使用最多的sql数据库,2008的安装机制是基于framework重写的,特点是非常耗时间(我的小本本配置还是可以的.^_^).但不需要原ISO或隐藏起来的MSI文件(如果你不小心手工删除这些安装文件的话,也不必担心.) 这也是为什么大多数时候,SQL Server 2008的安装日志文件setup.log会有1G大小的原因.因为安装工序实在是太庞大了. SQL Server 2008是一个重大的产品版本,它推出了许多新的特性和关键的改进,

win7 64位下 excel 连接odbc 数据源报错,32位下可以正常

问题描述 win7 64位下 excel 连接odbc 数据源报错,32位下可以正常 odbc数据源32位 excel 32位 配置的odbc数据源如图所示 在excel中 连接到odbc数据源 这种错误怎么解决 解决方案 你选错驱动了,这是Oracle的驱动,不是Access的. 解决方案二: WIN7 64位 ODBC连接Oracle 32位报错问题

Drill官网文档翻译五:连接到数据源

存储插件是Drill中,连接到数据源的模块.一个存储插件通常会优化Drill查询的执行,提供数据的定位,命名空间下的配置和读数据要用到的格式.Drill已经内置了一些存储插件,你只需要根据你的环境配置一下就可以使用了.借助存储插件,你可以连接到各种数据源,像数据库,本地或是分布式的文件,或是Hive数据库. 你可以修改一个存储插件的默认配置X,并给一个新的唯一的名字"Y".这个新的文档就会把Y当成一个完全不同的插件,虽然它本身只是原有插件重新配置了一下.当你执行一个drill查询的时候

php连接odbc数据源并保存与查询数据的方法_php技巧

本文实例讲述了php连接odbc数据源并保存与查询数据的方法.分享给大家供大家参考. 具体实现代码如下: 复制代码 代码如下: $connstr = "driver=microsoft access driver (*.mdb);dbq=".realpath("db.mdb");     $connid = odbc_connect($connstr,"","",sql_cur_use_odbc); $odbc_exec =

JDBC数据源连接池配置及应用_java

使用JDBC建立数据库连接的两种方式: 1.在代码中使用DriverManager获得数据库连接.这种方式效率低,并且其性能.可靠性和稳定性随着用户访问量得增加逐渐下降. 2.使用配置数据源的方式连接数据库,该方式其实质就是在上述方法的基础上增加了数据库连接池,这种方式效率高. 数据源连接池的方式连接数据库与在代码中使用DriverManager获得数据库连接存在如下差别: 1)数据源连接池的方式连接数据库是在程序中,通过向一个JNDI(Java Naming and  Directory In

JDBC数据源(DataSource)的简单实现

数据源技术是Java操作数据库的一个很关键技术,流行的持久化框架都离不开数据源的应用. 数据源提供了一种简单获取数据库连接的方式,并能在内部通过一个池的机制来复用数据库连接,这 样就大大减少创建数据库连接的次数,提高了系统性能. 对于数据源的应用,一般都选择实用开源的数据源或数据库连接池来使用,比如,常见的有DBCP. C3P0.Proxool等等.但用起来有些笨重和麻烦.下面自己手动实现个精简的数据源,代码如下: package com.lavasoft.simpledatesource; i

从实例出发:如何删除JDBC数据源

数据|数据源 当我们确定一个数据源不会再被使用的时候,可以将它删除,如示例13-7所示.[程序源代码] 1 // ==================== Program Description ==========================2 // 程序名称:示例13-7 : DeleteDataSource.java3 // 程序目的:删除数据源4 // ==============================================================5 im

在JBuilder中连接JDBC方法

JBuilder的初学者总是为配置JDBC发愁..下面介绍就在msSQL中的JDBC配置: 一,你确定你安装的jdbc是否完全版,因为有个单机版只能连接本机上的sql,单机版约2m,完全版约6.5m 二.安装好jdbc后,假如目录在C:ProgramFilesMicrosoftSQLServer2000JDBC,打开Jbuilder, 选择Tools-->ConfigureLibraries,然后在左边的列表框下选择New,填入:Name:sql,Location:UserHome, 然后点击A