StreamingPro添加Scala script 模块支持

SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。

我们当然可以通过SQL的 UDF函数等来完成字符串解析,在streamingpro中也很简单,只要注册下你的UDF函数库即可:

"udf_register": {
    "desc": "测试",
    "strategy": "....SparkStreamingRefStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "...SQLUDFCompositor",
        "params": [
          {
            "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
          }
        ]
      }
    ]
  }

这样你就可以在SQL中使用MLfunctions里面所有的udf函数了。然而为此专门提供一个jar包也是略显麻烦。

这个时候如果能直接写脚本解析就好了,最好是能支持各种脚本,比如groovy,javascript,python,scala,java等。任何一个会编程的人都可以实现一个比较复杂的解析逻辑。

核心是ScriptCompositor模块:

{
        "name": "...ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": [
              "val Array(a,b)=rawLine.split(\"\t\");",
              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      }

如果我想在代码里直接处理所有的列,则如下:

{
        "name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
        "params": [
          {
            "inputTableName": "test2",
            "outputTableName": "test3",
            "useDocMap": true
          },
          {
            "anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)"
          }
        ]
}

通过添加useDocMap为true,则你在代码里可以通过doc(doc是个Map[String,Any]) 来获取你想要的任何字段,然后形成一个新的Map。

如果你只要新生成Map里的字段,忽略掉旧的,则设置ignoreOldColumns=true 即可。

你可以把代码放到一个文件里,如下:

{
        "name": "....ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }

通过inputTableName指定输入的表,outputTableName作为输出结果表。 raw代表inputTableName中你需要解析的字段,然后通过你的scala脚本进行解析。在脚本中 rawLine 是固定的,对应raw字段(其他字段也是一样)的值。脚本只有一个要求,最后的返回结果暂时需要是个Map[String,Any]。

这里,你只是提供了一个map作为返回值,作为一行,然后以outputTableName指定的名字输出,作为下一条SQL的输入,所以StreamingPro需要推测出你的Schema。 数据量大到一定程度,推测Schema的效率就得不到保证,这个时候,你可以通过配置schema来提升性能:

{
        "name": "....ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3",
            "schema": "file:///tmp/schema.scala",
            "useDocMap": true
          },
          {
            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }

schema.scala的内容大致如下:

Some(
StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
)

后续roadmap是:

  1. 支持外部脚本,比如放在hdfs或者http服务器上。
  2. 支持java 脚本
  3. 支持javascript脚本
  4. 支持 python 脚本
  5. 支持 ruby脚本
  6. 支持 groovy 脚本

举个案例,从HDFS读取一个文件,并且映射为只有一个raw字段的表,接着通过ScriptCompositor配置的scala代码解析raw字段,展开成a,b两个字段,然后继续用SQL继续处理,最后输出。

{
  "convert_data_parquet": {
    "desc": "测试",
    "strategy": "...SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "...SQLSourceCompositor",
        "params": [
          {
            "path": "file:///tmp/hdfsfile",
            "format": "org.apache.spark.sql.execution.datasources.hdfs",
            "fieldName": "raw"
          }
        ]
      },
      {
        "name": "...JSONTableCompositor",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "...ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": [
              "val Array(a,b)=rawLine.split(\"\t\");",
              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      },
      {
        "name": "...transformation.SQLCompositor",
        "params": [
          {
            "sql": "select a,b  from test3 "
          }
        ]
      },
      {
        "name": "...streaming.core.compositor.spark.output.SQLUnitTestCompositor",
        "params": [
          {
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

体验地址: https://github.com/allwefantasy/streamingpro/blob/master/README.md#downloads

时间: 2024-08-08 03:31:53

StreamingPro添加Scala script 模块支持的相关文章

PaaS供应商Engine Yard添加对Node.js支持

&http://www.aliyun.com/zixun/aggregation/37954.html">nbsp;Engine Yard Labs项目的开发是用以加快对新特性和服务的实验. 平台即服务(PaaS)供应商Engine Yard在其新项目里添加了对Node.js的支持,用以加快对新特性和服务的实验. Engine Yard Labs项目添加了一系列处于实验中的服务."它让我们在市场中更快地实现创新,"Engine Yard产品管理和营销副总裁Mik

NGINX 加载动态模块(NGINX 1.9.11开始增加加载动态模块支持)

NGINX 1.9.11开始增加加载动态模块支持,从此不再需要替换nginx文件即可增加第三方扩展.目前官方只有几个模块支持动态加载,第三方模块需要升级支持才可编译成模块. tinywan@tinywan:~/nginx-1.12.0$ ./configure --help | grep dynamic --with-http_xslt_module=dynamic enable dynamic ngx_http_xslt_module --with-http_image_filter_modu

为excel vba中添加、删除模块并插入全过程图文详解

  为excel vba中添加.删除模块并插入全过程图文详解         方法/步骤 1.点按快速启动栏excel 程序图标 进入excel 界面 点击选中任意单元格 然后按alt+f11 进入vbe界面 2.点击菜单栏 插入命令 在弹出的活动菜单中点按模块命令 3.另一种方式插入模块的方法可以在工程资管管理器中鼠标点击空白处 右键单击鼠标 在弹出的快捷菜单中选择插入命令 二级菜单中选择模块命令 4.如图所示模块1.模块2分别是通过菜单栏插入命令 和工程资源管理器点击右键创建的模块 5.如果

iOS 新特性分列式 之 iOS 7.x - 主要内容:扁平 UI、64位支持、多任务加强、Objective-C 模块支持

iOS 新特性分列式 之 iOS 7.x - 主要内容:扁平 UI.64位支持.多任务加强.Objective-C 模块支持 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的美丽人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. iOS7.1 支持外

linux下不重新编译apache添加安装mod_ssl模块和错误的处理方法

安装步骤 1.进入apache源码目录. 2.进入module文件夹下的ssl目录. 3.找到oepnssl 的include路径,ubuntu系统是在/usr/include/openssl目录. 4.运行apxs root@v238:~/httpd-2.2.26/modules/ssl# /usr/local/apache2/bin/apxs -i -c -a -D HAVE_OPENSSL=1 -I /usr/include/openssl -lcrypto -lssl -ldl *.c

如何添加iptables/netfilter模块到安卓内核

问题描述 如何添加iptables/netfilter模块到安卓内核 如何添加iptables/netfilter模块到安卓内核,,谁能帮我一下啊,把内核添加netfilter模块

ssh做的一个web项目,添加xfire的webservice支持后报错,小弟急求

问题描述 由于我们提供的接口有点多,我就想在原来的工程上添加xfire的webservice支持,谁知道,添加了报错,看了不少帖子说是spring的jar包和spring-.1.2.6.jar冲突,然后把后面的jar包删除,不包那个错,[dtss]2015-01-1517:35:39,346ERROR[main]org.springframework.ws.transport.http.MessageDispatcherServlet.initServletBean-290|Contextini

VBasic 6.0 怎么给控件添加它所不支持的功能

问题描述 VBasic 6.0 怎么给控件添加它所不支持的功能 VBasic 想改变下XP工具条控件的功能,让它允许用户鼠标拖动和释放,求具体的做法 解决方案 你可以自己封装一个控件,把原来的控件嵌入其中,添加你的功能

tipask问题添加热门问答模块

有个网站是用tipask来搭建的问答系统,今天客户要求添加热门问题模块,之前都是用tipask的原生模块,比如说推荐问答.未解决问答等,看来只能自己二次开发了,上网找了下关于tipask的二次开发还真是少--于是简单翻了下tipask的代码,用我的方式增加了热门问题模块. tipask的主要函数都在model文件夹下,找到系统核心文件base.class.php,里面有这样一个函数fromcache,用于生成函数下各种模块,例如悬赏问题.精彩推荐等,我们要做的就是在这里按照前后代码加上一条我们需