Spark Streaming + Spark SQL 实现配置化ETL流程

项目地址

前言

传统的Spark Streaming程序需要:

  • 构建StreamingContext
  • 设置checkpoint
  • 链接数据源
  • 各种transform
  • foreachRDD 输出

通常而言,你可能会因为要走完上面的流程而构建了一个很大的程序,比如一个main方法里上百行代码,虽然在开发小功能上足够便利,但是复用度更方面是不够的,而且不利于协作,所以需要一个更高层的开发包提供支持。

如何开发一个Spark Streaming程序

我只要在配置文件添加如下一个job配置,就可以作为标准的的Spark Streaming 程序提交运行:

{

  "test": {
    "desc": "测试",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",
        "params": [
          {
            "metadata.broker.list":"xxx",
            "auto.offset.reset":"largest",
            "topics":"xxx"
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.JSONTableCompositor",
        "params": [{"tableName":"test"}
        ]
      },
      {
        "name": "streaming.core.compositor.spark.SQLCompositor",
        "params": [{"sql":"select a from test"}
        ]
      },
      {
        "name": "streaming.core.compositor.RDDPrintOutputCompositor",
        "params": [
          {
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

上面的配置相当于完成了如下的一个流程:

  1. 从Kafka消费数据
  2. 将Kafka数据转化为表
  3. 通过SQL进行处理
  4. 打印输出

是不是很简单,而且还可以支持热加载,动态添加job等

特性

该实现的特性有:

  1. 配置化
  2. 支持多Job配置
  3. 支持各种数据源模块
  4. 支持通过SQL完成数据处理
  5. 支持多种输出模块

未来可扩展的支持包含:

  1. 动态添加或者删除job更新,而不用重启Spark Streaming
  2. 支持Storm等其他流式引擎
  3. 更好的多job互操作

配置格式说明

该实现完全基于ServiceframeworkDispatcher 完成,核心功能大概只花了三个小时。

这里我们先理出几个概念:

  1. Spark Streaming 定义为一个App
  2. 每个Action定义为一个Job.一个App可以包含多个Job

配置文件结构设计如下:

{

  "job1": {
    "desc": "测试",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",
        "params": [
          {
            "metadata.broker.list":"xxx",
            "auto.offset.reset":"largest",
            "topics":"xxx"
          }
        ]
      } ,
    ],
    "configParams": {
    }
  },
  "job2":{
   ........
 }
}

一个完整的App 对应一个配置文件。每个顶层配置选项,如job1,job2分别对应一个工作流。他们最终都会运行在一个App上(Spark Streaming实例上)。

  • strategy 用来定义如何组织 compositor,algorithm, ref 的调用关系
  • algorithm作为数据来源
  • compositor 数据处理链路模块。大部分情况我们都是针对该接口进行开发
  • ref 是对其他job的引用。通过配合合适的strategy,我们将多个job组织成一个新的job
  • 每个组件( compositor,algorithm, strategy) 都支持参数配置

上面主要是解析了配置文件的形态,并且ServiceframeworkDispatcher 已经给出了一套接口规范,只要照着实现就行。

模块实现

那对应的模块是如何实现的?本质是将上面的配置文件,通过已经实现的模块,转化为Spark Streaming程序。

以SQLCompositor 的具体实现为例:

class SQLCompositor[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)

//策略引擎ServiceFrameStrategy 会调用该方法将配置传入进来
  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

// 获取配置的sql语句
  def sql = {
    _configParams(0).get("sql").toString
  }

  def outputTable = {
    _configParams(0).get("outputTable").toString
  }

//执行的主方法,大体是从上一个模块获取SQLContext(已经注册了对应的table),
//然后根据该模块的配置,设置查询语句,最后得到一个新的dataFrame.
// middleResult里的T其实是DStream,我们会传递到下一个模块,Output模块
//params参数则是方便各个模块共享信息,这里我们将对应处理好的函数传递给下一个模块
  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    var dataFrame: DataFrame = null
    val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]
    params.put("sql",(rdd:RDD[String])=>{
      val sqlContext = func(rdd)
      dataFrame = sqlContext.sql(sql)
      dataFrame
    })
    middleResult
  }
}

上面的代码就完成了一个SQL模块。那如果我们要完成一个自定义的.map函数呢?可类似下面的实现:

abstract class MapCompositor[T,U] extends Compositor[T]{
  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    val dstream = middleResult(0).asInstanceOf[DStream[String]]
    val newDstream = dstream.map(f=>parseLog(f))
    List(newDstream.asInstanceOf[T])
  }
  def parseLog(line:String): U
}

class YourCompositor[T,U] extends MapCompositor[T,U]{

 override def parseLog(line:String):U={
     ....your logical
  }
}

同理你可以实现filter,repartition等其他函数。

总结

该方式提供了一套更为高层的API抽象,用户只要关注具体实现而无需关注Spark的使用。同时也提供了一套配置化系统,方便构建数据处理流程,并且复用原有的模块,支持使用SQL进行数据处理。

广告

这个只是我们大系统的一小部分,愿意和我们一起进一步完善该系统么?欢迎加入我们(请私信我)。

时间: 2025-01-26 12:49:22

Spark Streaming + Spark SQL 实现配置化ETL流程的相关文章

Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

主要内容 Spark SQL.DataFrame与Spark Streaming 1. Spark SQL.DataFrame与Spark Streaming 源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala import org.apache.spark.SparkConf

《Spark大数据分析实战》——3.2节Spark Streaming

3.2 Spark StreamingSpark Streaming是一个批处理的流式计算框架.它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性.下面将对Spark Streaming进行详细的介绍.3.2.1 Spark Streaming简介Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力.Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就

Spark Streaming Direct Approach (No Receivers) 分析

前言 这个算是Spark Streaming 接收数据相关的第三篇文章了. 前面两篇是: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming 接受数据的方式有两种: Receiver-based Approach Direct Approach (No Receivers) 上面提到的两篇文章讲的是 Receiver-based Approach . 而这篇文章则重点会分析Direct Approach (N

Spark Streaming Dynamic Resource Allocation

Problem Statement DRA has already been implemented since Spark 1.2 . However the existing Spark DRA on Yarn implementation does not embody the specific property of Spark Streaming.   Spark DRA works when there are some executors being idle for  remov

《循序渐进学Spark 》Spark架构与集群环境

Spark架构与集群环境 本章首先介绍Spark大数据处理框架的基本概念,然后介绍Spark生态系统的主要组成部分,包括Spark SQL.Spark Streaming.MLlib和GraphX,接着简要描述了Spark的架构,便于读者认识和把握,最后描述了Spark集群环境搭建及Spark开发环境的构建方法. 1.1 Spark概述与架构 随着互联网规模的爆发式增长,不断增加的数据量要求应用程序能够延伸到更大的集群中去计算.与单台机器计算不同,集群计算引发了几个关键问题,如集群计算资源的共享

为什么越来越多的公司在使用Spark Streaming

Databricks最近对1400多家Spark用户进行了一次调查,结果显示这些用户对Spark Streaming的使用率与2014年相比增长了56%,另外,有48%的受访者将Spark Streaming标记为最常用的Spark组件.在Spark Streaming不断增长的用户群中,Uber.Netflix和Pinterest等家喻户晓的公司赫然在列,那么为什么使用Spark Streaming加速业务发展的公司越来越多呢?最近Spark Streaming的主要开发人员Tathagata

如何基于Spark Streaming构建实时计算平台

1.前言 随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台搭建以来,经过两年多不断的技术演进,目前实时集群规模已达上百台,平台涵盖各个SBU与公共部门数百个实时应用,全年JStorm集群稳定性达到100%.目前实时平台主要基于JStorm与Spark Streaming构建而成,相信关注携程实时平台的朋友在去年已经看到一篇关于携程实时平台的分享:

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming.Spark SQL.MLlib.GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑.这也得益于Scala编程语言的简洁性.这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算. 我们的应用场景是分析用户使用手机App的行为,描述如下所示: 手机客户端会收集用户的行为事件(我们以点击

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins