Spark jdbc postgresql数据库连接和写入操作源码解读

概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过java程序,在本地开发和运行。整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中。附带完整项目源码(完整项目源码github)。

1.首先在postgreSQL中创建一张测试表,并插入数据。(完整项目源码Github)

1.1. 在postgreSQL中的postgres用户下,创建 products

CREATE TABLE products (
    product_no integer,
    name text,
    price numeric
);

1.2. 在 products 插入数据

INSERT INTO products (product_no, name, price) VALUES
    (1, 'Cheese', 9.99),
    (2, 'Bread', 1.99),
    (3, 'Milk', 2.99);

查看数据库写入结果。

2.编写SPARK程序。(完整项目源码Github

2.1.读取Postgresql某一张表的数据为DataFrame(完整项目源码Github

SparkPostgresqlJdbc.java
Properties connectionProperties = new Properties();

//增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
connectionProperties.put("user","postgres");
connectionProperties.put("password","123456");
connectionProperties.put("driver","org.postgresql.Driver");

//SparkJdbc读取Postgresql的products表内容
Dataset<Row> jdbcDF = spark.read()
        .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");

//显示jdbcDF数据内容
jdbcDF.show();

2.2.写入Postgresql某张表中

//将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
jdbcDF.write().mode("append")
        .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

3.运行程序,并查看结果(如果在IDEA中开发不熟练,可以看我另一篇博文spark (java API) 在Intellij IDEA中开发并运行)。

3.1.直接在intellij IDEA(社区版)中运行。

a.在运行按钮的“Edit Configeration”中的VM option中添加“-Dspark.master=local”

3.2.在终端(Terminal)中运行。

/opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
  --class "SparkPostgresqlJdbc" \
  --master local[4] \
  --driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar \
  target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar

其中 --driver-class-path 指定下载的postgresql JDBC数据
库驱动路径,命令执行要在项目的根目录中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。

查看Spark写入数据库中的数据

4.以下为项目中主要源码(完整项目源码Github):

4.1.项目配置源码pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wangxiaolei</groupId>
    <artifactId>SparkPostgresqlJdbc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4.1212</version>
        </dependency>
    </dependencies>
</project>

4.2.java源码SparkPostgresqlJdbc.java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-9.
 * Project:SparkPostgresqlJdbc.
 */
public class SparkPostgresqlJdbc {
    public static void main (String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .appName("SparkPostgresqlJdbc")
                .config("spark.some.config.option","some-value")
                .getOrCreate();
    //启动runSparkPostgresqlJdbc程序
        runSparkPostgresqlJdbc(spark);

        spark.stop();

    }

    private static void runSparkPostgresqlJdbc(SparkSession spark){
        //new一个属性
        System.out.println("确保数据库已经开启,并创建了products表和插入了数据");
        Properties connectionProperties = new Properties();

        //增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
        System.out.println("增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)");
        connectionProperties.put("user","postgres");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","org.postgresql.Driver");

        //SparkJdbc读取Postgresql的products表内容
        System.out.println("SparkJdbc读取Postgresql的products表内容");
        Dataset<Row> jdbcDF = spark.read()
                .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
        //显示jdbcDF数据内容
        jdbcDF.show();

        //将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
        jdbcDF.write().mode("append")
                .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

    }
}

完整项目源码Github

时间: 2024-11-02 03:50:07

Spark jdbc postgresql数据库连接和写入操作源码解读的相关文章

Apache OFbiz entity engine源码解读

简介 最近一直在看Apache OFbiz entity engine的源码.为了能够更透彻得理解,也因为之前没有看人别人写过分析它的文章,所以决定自己来写一篇. 首先,我提出一个问题,如果你有兴趣可以想一下它的答案: JDBC真的给数据访问提供了足够的抽象,以至于你可以在多个支持jdbc访问的数据库之间任意切换而完全不需要担心你的数据访问代码吗? 我曾经在微博上有过关于该问题的思考: 其实这个感慨正是来自于我之前在看的一篇关于jdbc的文章,里面提到了jdbc中的一些设计模式(工厂方法),提供

jQuery源码解读之addClass()方法分析

 这篇文章主要介绍了jQuery源码解读之addClass()方法,注释形式较为详细的分析了addClass()方法的实现技巧与相关注意事项,具有一定参考借鉴价值,需要的朋友可以参考下     本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass

Apache Beam WordCount编程实战及源码解读

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上.完整项目Github源码 负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来. 1.Apache Beam编程实战–前言,Apache B

基于Docker的TensorFlow机器学习框架搭建和实例源码解读

概述:基于Docker的TensorFlow机器学习框架搭建和实例源码解读,TensorFlow作为最火热的机器学习框架之一,Docker是的容器,可以很好的结合起来,为机器学习或者科研人员提供便捷的机器学习开发环境,探索人工智能的奥秘,容器随开随用方便快捷.源码解析TensorFlow容器创建和示例程序运行,为热爱机器学者降低学习难度. 默认机器已经装好了Docker(Docker安装和使用可以看我另一篇博文:Ubuntu16.04安装Docker1.12+开发实例+hello world+w

jQuery 1.5 源码解读 面向中高阶JSER_jquery

几乎很难从jQuery分离其中的一部分功能.所以在这里我分享下应该读 jQuery 源码的一些成果,以及读源码的方法.啃代码是必须的. 1. 代码折叠是必须的. 因此必须在支持语法折叠的编辑器里打开源码. 根据折叠层次,我们可以很快知道: 所有 jQuery 的代码都在一个函数中: (function( window, undefined ) {// jQuery 代码 })(window); 这样可以避免内部对象污染全局.传入的参数1是 window, 参数2是 undefined , 加快j

CI框架源码解读之利用Hook.php文件完成功能扩展的方法_php实例

本文实例讲述了CI框架源码解读之利用Hook.php文件完成功能扩展的方法.分享给大家供大家参考,具体如下: 看了hook.php的源码,就知道CI使用hook来进行扩展的原理了. hook的基本知识 http://codeigniter.org.cn/user_guide/general/hooks.html CI中hook的使用经历了一个:开启hook,定义hook,调用hook,执行hook的过程. 手册中已经告知了开启.定义.调用的方法.那么hook的实现原理是啥呢. <?php if

jQuery源码解读之addClass()方法分析_jquery

本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 复制代码 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass的插件方法. */     addClass: function( value ) {         var classes, elem, cur, clazz, j, finalV

SpringMVC源码解读之HandlerMapping_java

概述 对于Web开发者,MVC模型是大家再熟悉不过的了,SpringMVC中,满足条件的请求进入到负责请求分发的DispatcherServlet,DispatcherServlet根据请求url到控制器的映射(HandlerMapping中保存),HandlerMapping最终返回HandlerExecutionChain,其中包含了具体的处理对象handler(也即我们编程时写的controller)以及一系列的拦截器interceptors,此时DispatcherServlet会根据返

PHP网页游戏学习之Xnova(ogame)源码解读(十六)_php实例

十九.攻击任务(MissionCaseAttack.php) 按照舰队任务的编号,排在第一个的就是攻击任务.这个代码很长,看的时候要有耐心. 好在引用的内容并不是很多,并且给出了详细的注释,读者不会晕头转向. function MissionCaseAttack ($FleetRow) { global $user, $phpEx, $xnova_root_path, $pricelist, $lang, $resource, $CombatCaps; // 在舰队的记录中,fleet_star