StreamingPro 支持多输入,多输出配置

前言
最近正好有个需求,就是从不同的数据库以及表里拉出数据,经过一定的处理放到ES里供查询,最好还能放个到parquet里,这样可以支持更复杂的SQL。之前StreamingPro是只能配置一个数据源的,所以做了些改造,方便配置多个数据源,以及多个写出。

最新的下载地址: https://pan.baidu.com/s/1eRO5Wga依然的,比较大,因为现在他还能支持Thrift JDBC /Rest SQL: 使用StreamingPro 快速构建Spark SQL on CarbonData

输入配置

{
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },

以前用的是 batch.source, 如果你有多个输入源,则需要使用batch.sources 组件。每个源需要配置一个outputTable,也就是说这个源取个名字,方便后面使用。

如果是数据库,则可以这么写:

{
        "name": "batch.sources",
        "params": [
          {
             url:"jdbc:mysql://localhost/test?user=fred&password=secret",
            "dbtable":"table1",
            "driver":"com.mysql...",
            "path": "-",
            "format": "jdbc",
            "outputTable": "test",

          },
          {
            "path": "-",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },

输出

{
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }

我这里同时输出为json以及parquet格式。

一个简单但是涉及点比较多的例子

{
  "convert-multi-csv-to-json": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select city as tp  from test limit 100",
            "outputTableName": "sqlTable"
          }
        ]
      },
      {
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "sqlTable",
            "outputTableName": "scriptTable",
            "useDocMap": true
          },
          {
            "-": "val count = doc(\"tp\").toString.length;Map(\"count\"->count)"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select scriptTable.tp,scriptTable.count,test2.city,test2.name  from scriptTable,test2 limit 100",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

在 batch.sql 里你可以引用任何一个源的表,或者之前已经在batch.sql里申明的outputTable, 同理batch.script。 而在batch.outputs里,你则可以将任何一张表写入到MySQL,ES,HDFS等文件存储系统中。

将配置文件保存一下,然后就可以启动了:

SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
时间: 2025-01-21 06:06:39

StreamingPro 支持多输入,多输出配置的相关文章

《Adobe Audition CS6中文版经典教程》——1.4 用Audition(Mac或Windows)测试输入和输出

1.4 用Audition(Mac或Windows)测试输入和输出 因为在前面的课程中已经指定了默认输入和输出,所以Audition默认将使用这些设备进行录音和播放.现在测试这些连接,以确保输入和输出配置正确. 1 选择File(文件)> New(新建)> Audio File(音频文件),在Waveform Editor内创建新的文件,打开一个对话框. 2 命名该文件,"取样速率"默认设置为Preferences内选择的值. 3 选择通道数."通道数"

Python入门(二)——IDE选择PyCharm,输入和输出,基础规范,数据类型和变量,常量,字符串和编码,格式化

Python入门(二)--IDE选择PyCharm,输入和输出,基础规范,数据类型和变量,常量,字符串和编码,格式化 我们从今天就开始正式的学习PY交易了,PY交易还行,我们有基础之后学习起来倒不是说那么的被动,我学习的是Python2.7,虽然现在随着版本的迁移至Python3,但是这个并不会对我们造成什么困扰,毕竟只是一个适应阶段 我们的学习资料:廖雪峰官方网站Python2.7教程 一.IDE选择PyCharm 我们虽然在前面一章已经配置过,但是我们还是有很多的IDE可以开发Py,比如su

Merlin的魔力: Merlin的新I/O缓冲区的输入和输出

Java 2 平台标准版(Java 2 Platform Standard Edition,J2SE)1.4 对 Java 平台的 I/O 处理能力做了大量更改.它不仅用流到流的链接方式继续支持以前 J2SE 发行版的基于流的 I/O 操作,而且 Merlin 还添加了新的功能 - 称之为新 I/O 类(NIO),现在这些类位于 java.nio 包中. I/O 执行输入和输出操作,将数据从文件或系统控制台等传送至或传送出应用程序.(有关 Java I/O 的其它信息,请参阅 参考资料). 缓冲

如何在Kettle4.2上面实现cassandra的输入与输出

这是在QQ群里有人问到的一个问题. 如何在pdi-ce-4.2.X-stable上面实现cassandra的输入与输出,或是实现hadoop,hbase,mapreduce,mongondb的输入输出? 在kettle中实现cassandra的输入与输出有以下两种方式: 第一种方式:自己编写cassandra输入输出组件 第二种方式:使用别人编写好的插件,将其集成进来 当然还有第三种方法,直接使用4.3版本的pdi. 第一种方法需要对cassandra很熟悉编写插件才可以做到,第二种方法可以通过

《Arduino实战》——第3章 简单项目:输入和输出 3.1 认识模拟电路

第3章 简单项目:输入和输出 本章涵盖的内容 着眼于模拟世界 读取一个模拟输入 使用扬声器发声 搭建一架五声音阶电子琴 在前一章中,我们了解了Arduino数字化的一面,循序渐进地搭建了一系列项目,展示了Arduino的输入.输出和中断等特性.在本章,我们将着眼于Arduino的另一面,看看它如何与我们周围的世界交流. 基本上,我们周边的世界可以分成两部分--模拟和数字--在这一章我们将研究与模拟世界的交互.让我们再从一个简单的元件开始,电位器,它可以为Arduino提供模拟量的输入.接下来,我

《Python参考手册(第4版•修订版)》——1.4 文件输入和输出

1.4 文件输入和输出 以下程序可打开一个文件并逐行读取该文件的内容: f = open("foo.txt") # 返回一个文件对象 line = f.readline() # 调用文件的readline()方法 while line: print line, # 后面跟','将忽略换行符 # print(line,end='') # 在Python 3中使用 line = f.readline() f.close() open()函数返回一个新的文件对象.调用该对象的方法可以执行各种

利用Python中的输入和输出功能进行读取和写入的教程_python

读取.写入和 Python 编写程序的最后一个基本步骤就是从文件读取数据和把数据写入文件.阅读完这篇文章之后,可以在自己的 to-do 列表中加上检验这个技能学习效果的任务.简单输出 贯穿整个系列,一直用 print 语句写入(输出)数据,它默认把表达式作为 string 写到屏幕上(或控制台窗口上).清单 1 演示了这一点.清单 1 重复了第一个 Python 程序 "Hello, World!",但是做了一些小的调整. 清单 1. 简单输出 >>> print &

java数组-这是我的代码,如何才能让数组实现连续的输入和输出

问题描述 这是我的代码,如何才能让数组实现连续的输入和输出 public static void main(String[] args) { // TODO Auto-generated method stub Scanner sc = new Scanner(System.in); System.out.println(""请输入数字个数""); int n = sc.nextInt(); System.out.println(""请输入数字&

无线网络最新技术:多重输入多重输出

由于多重输入多重输出(multiple-input, multiple-output:MIMO)的技术提供了一个扩展无线区域网络(WLAN)范围的极佳方式,因而最近成为了焦点.MIMO技术始于1985年,但直到现在才应用于晶片层级的装置,以大幅改善传输范围与容量. 由于MIMO并不是单一概念,而是由多种无线射频技术所组成,因此我们必须充份了解MIMO的运作和效能.当应用于WLAN时,有些MIMO技术能与现时的WLAN标准(如802.11a.802.11b与802.11g)相容,因而能扩充其传输范