Flink关系型API的公共部分

关系型程序的公共部分

下面的代码段展示了Table&SQL API所编写流式程序的程序模式:

val env = StreamExecutionEnvironment.getExecutionEnvironment

//创建TableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)

//注册表
tableEnv.registerTable("table1", ...)           //或者
tableEnv.registerTableSource("table2", ...)     //或者
tableEnv.registerExternalCatalog("extCat", ...) 

//基于Table API的查询创建Table对象
val tapiResult = tableEnv.scan("table1").select(...)
//从SQL查询创建Table
val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")

//将Table API的查询到的结果表输出到TableSink,SQL查询到的结果表同样如此
tapiResult.writeToSink(...)

//触发执行
env.execute()

通过分步解读以上代码段,我们可以发现一个关系型的Flink程序大致分为如下几步:

  • 构建环境对象
  • 注册表、catalog相关的信息(source部分)
  • 调用Table&SQL API创建表、对表进行查询
  • 得到结果表的数据并输出(sink部分)
  • 调用环境对象的execute方法触发程序执行

这几步中,跟关系型API有关的2 ~ 4步,我们发现在一个关系型的程序中,用户既可以混合使用Table&SQL API,并且除了后端环境对象的不同,TableEnvironment相关的部分在API层面上具有相同的抽象,也就是说,一套程序主体既可以适用于batch模式也可以适用于Streaming模式,这对用户而言也许更具吸引力。接下来,我们将对关系型API相关的几步进行解读。

TableEnvironment

跟streaming和batch程序一样,关系型程序也会要求先构建一个环境对象。因为Flink致力于为streaming和batch提供统一的关系型API,因此关系型程序只有唯一的环境对象TableEnvironment。

但具体到一些内部实现上,streaming跟batch还是有着较大的差异。所以,TableEnvironment针对两者又扩展了StreamTableEnvironment和BatchTableEnvironment这两个抽象类。这两个类主要提供streaming和batch的特定语义,比如提供DataSet、DataStream跟Table之间的转换。

最终的关系型程序中,原先streaming跟batch的环境对象和TableEnvironment对象都是必须的,它们承担着不同的职责:

  • streaming/batch 环境对象:辅助构建Table环境对象、触发程序执行调用、构建DataStream、DataSet;
  • TTableEnvironment对象:构建关系型程序的主体逻辑;

catalog

在Calcite中存在多个概念,其中一个概念就是“catalog”。从关系型的观点上来看,catalog处于所有的schema(外部的、概念上的、内部的)以及mapping(外部与概念以及概念与内部之间)之上的[1]。从SQL标准的角度来看,catalog在一个SQL环境中被称之为schame的集合,一个SQL环境包含零个或多个catalog,而一个catalog包含一个或多个schema(总是会包含一个名为“INFORMATION_SCHEMA”的schema)[2]。

在Calcite中,catalog定义了可在SQL查询中被访问的元数据跟命名空间。其中包含了如下几个概念:

  • Schema:一个定义了模式与表的集合,可被任意地嵌套
  • Table:表示一个单独的数据集,字段通过RelDataType来定义
  • RelDataType:表示在数据集中的字段,支持所有的SQL数据类型,包括结构体与数组
  • Statistic:提供用于优化的表统计信息

以一个SQL查询为例,来认识一下catalog中包含的那些概念:

在接下来讲source的这一小节,我们将看到被注册进Table&SQL API中当作“表”使用的对象,最终都会被转换为Calcite所识别的Table对象并加入其Schema中。

另外,Flink允许用户注册外部的catalog以提供如何访问外部数据库的相关信息,通过TableEnvironment对象的registerExternalCatalog方法即可注入。外部的catalog必须继承ExternalCatalog这一trait,它相当于外部数据库跟Table&SQL API的一个连接器。而Table&SQL API某种程度上又充当了外部catalog跟Calcite的连接器,整个桥接模式如下图所示:

对应到代码实现上来,Flink会通过一个ExternalCatalogSchema类来完成跟Calcite的catalog API的对接,包括注册跟获取catalog以及内部的子schema等。示例代码如下:

//获得TableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)

//创建一个外部的catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

//注册
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦外部的catalog被注册到环境对象,Table&SQL API都可以以类似于“catalog.database.table”这样的全限定名来访问表等信息。当前Flink提供了一个基于内存的ExternalCatalogSchema的实现:InMemoryExternalCatalog,它内部维护了两个映射:

  • 数据库映射:数据库名对应ExternalCatalog实例;
  • 表映射:表名对应ExternalCatalogTable实例;

source

source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

我们先来看Table source,它直接以表对象作为source。这里的表对象可细分为:

  • Flink以Table类定义的关系表对象,通过TableEnvironment的registerTable方法注册;
  • 外部source经过桥接而成的表对象,基础抽象为TableSource,通过具体环境对象的registerTableSource;

下图展示了,Table source被注册时,对应的内部转化图(虚线表示对应关系):

由上图可见,不管是直接注册Table对象还是注册外部source,在内部都直接对应了特定的XXXTable对象。

TableSource trait针对Streaming和Batch分部扩展有两个trait,它们是StreamTableSource和BatchTableSource,它们各自都提供了从数据源转换为核心对象(DataStream跟DataSource)的方法。

除了这三个基本的trait之外,还有一些特定对source的需求以独立的trait提供以方便实现者自行组合,比如ProjectableTableSource这一trait,它支持将Projection下推(push-down)到TableSource。Flink内置实现的CsvTableSource就继承了这一trait。

当前Flink所支持的TableSource大致上分为两类:

  • CsvTableSouce:同时可用于Batch跟Streaming模式;
  • kafka系列TableSource:包含Kafka的各个版本(0.8,0.9,0.10)以及各种不同的格式(Json、Avro),基本上它们只支持Streaming模式,它们都依赖于各种kafka的connector;

使用方式如下:

// specify JSON field names and types
val typeInfo = Types.ROW(
  Array("id", "name", "score"),
  Array(Types.INT, Types.STRING, Types.DOUBLE)
)

val kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    typeInfo)
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的构建方式如下:

val csvTableSource = CsvTableSource
    .builder
    .path("/path/to/your/file.csv")
    .field("name", Types.STRING)
    .field("id", Types.INT)
    .field("score", Types.DOUBLE)
    .field("comments", Types.STRING)
    .fieldDelimiter("#")
    .lineDelimiter("$")
    .ignoreFirstLine
    .ignoreParseErrors
    .commentPrefix("%")
    .build

除了以TableSource作为Table&SQL的source,还支持通过特定的环境对象直接注册DataStream、DataSet。注册DataStream的示例如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)

// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注册DataSet的示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

以上,通过调用环境对象的register[DataStream/DataSet]方法是一种显式注册的方式,除此之外,还有隐式注册方式。隐式注册方式,通过对DataStream跟DataSet对象增加的toTable方法来实现,使用方式示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)

val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sql(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

我们知道DataStream跟DataSet原先是没有toTable API的,如何为它们增加该API的呢?答案是利用了Scala的包对象(package object),该特性主要用于兼容旧版本的库或对某些类型的API进行增强。具体而言,toTable API其实是实现在DataSetConversions和DataStreamConversions两个类中,然后在包对象中对他们进行实例化。而定位到toTable的实现时,会看到它们其实是间接调用了特定环境对象的fromDataStream/fromDataSet方法并将当前的DataStream跟DataSet传递给这两个方法并通过方法返回得到Table对象。fromDataStream/fromDataSet方法对在实现时会调用跟registerDataStream/registerDataSet方法对相同的内部注册方法。

fromDataStream/fromDataSet方法通常主要的场景在于为DataStream/DataSet转换为Table对象提供便利,它本身也进行了隐式注册。然而,你也可以对通过这对方法得到的Table对象,再次调用registerTable来进行显式注册,不过通常没有必要。

因此,综合而言,注册DataStream跟DataSet的对应关系如下:

以上我们已经分析了所有的Table source的注册方式,有多种register系列方法并最终对应了内部各种XXXTable对象。稍显混乱,其实这些XXXTable对象是有联系的,并且所有的register系列方法最终都调用了TableEnvironment的registerTableInternal方法。因此其实注册Table source的内部原理是一致的,我们来分析一下。

TableEnvironment内部会以一个SchemaPlus类型的数据结构,它是Calcite中的数据结构,用来存储被注册的表、函数等在内的一系列对象(这些对象统称为Calcite中的Schema)。由此可见它无法直接接受Flink自定义的类似于TableSouce这样的对象,那么这里存在一个问题就是两个框架的衔接问题。这就是Flink定义那么多内部XXXTable类型的原因之一,我们来梳理一下它们之间的关系如下:

上图中的XXXTable对象同时以括号标明了在注册时它是由什么对象转化而来。

sink

sink其实跟source是反向的,一个是将数据源接入进来,另一个是将数据写到外部。因此,我们对比着source来看sink,当你实现一个Table&SQL程序并希望将处理之后的结果输出到外部。通常有以下几种方式:

  • 在Table对象上调用writeToSink API,它接收一个TableSink的实例;
  • 将Table再次转换为DataSet/DataStream,然后像输出DataSet/DataStream一样的方式来处理;

TableSink根据后端模式的差别,提供了两种实现:针对batch的BatchTableSink以及针对streaming的多种sink,它们拥有不同的特征,列举如下:

  • AppendStreamTableSink:它只支持插入变更,如果Table对象同时有更新和删除的变更,那么将会抛出TableException;
  • RetractStreamTableSink:它支持输出一个streaming模式的表,该表上可以有插入、更新和删除变更;
  • UpsertStreamTableSink:它支持输出一个streaming模式的表,该表上可以有插入、更新和删除变更,且还要求表要么有唯一的键字段要么是append-only模式的,如果两者都不满足,将抛出TableException;

跟source一样,内置的CsvTableSink同时兼具streaming跟batch的语义。

TableSink主要通过Table 的writeToSink API对外提供能力,然而最终的实现主要还是在特定的环境对象上。对BatchTableSink而言,BatchTableEnvironment会将具体的Table对象转换为DataSet,然后输出:

//将Table翻译为DataSet
val result: DataSet[T] = translate(table)(outputType)
//将DataSet给TableSink以使其输出
batchSink.emitDataSet(result)

针对streaming的各种sink则会在StreamTableEnvironment中挨个枚举不同的sink类型进行处理。但步骤跟BatchTableSink类似:先翻译为DataStream然后输出。

在source中可以直接从DataSet/DataStream转换为Table对象一样,同样从Table对象也可以转换为DataSet/DataStream对象。它们的实现手段都是类似的,通过package object对Table API进行增强,以使得Table 具备toDataSet/toXXXStream的API,最终由特定环境对象的toDataSet/toXXXStream方法完整具体的任务。

我们以CsvTableSink来分析一下,具体的emit是如何实现的,概况来讲有两步:

  1. 对数据集或数据流应用map运算符以CsvFormatter格式化器进行格式化;
  2. 再调用DataSet、DataStream的writeAsText sink到文件系统;

真正复杂的是各个Table环境对象中的translate方法,它们用于将Table翻译为DataSet/DataStream,这其中包含将相关的Table API调用以及SQL查询所对应的关系型的表达式树转换成DataSet/DataStream特定的运算符。这并不是本节的重点,我们将在后续对此进行介绍。

原文发布时间为:2017-07-13

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-08-01 20:11:48

Flink关系型API的公共部分的相关文章

Flink关系型API简介

在接触关系型API之前,用户通常会采用DataStream.DataSet API来编写Flink程序,它们都提供了丰富的处理能力,以DataStream为例,它有如下这些优点: 富有表现力的流处理,包括但不限于:转换数据,更新状态,定义窗口.聚合,事件时间语义,有状态且保证正确性等: 高度自定义的窗口逻辑:分配器.触发器.逐出器以及允许延迟等: 提升与外部系统连接能力的异步I/O接口: ProcessFunction给予用户访问时间戳和定时器等低层级的操作能力: 但它同时也存在一些使用壁垒导致

阿里云API调用公共参数有哪些

公共参数 公共请求参数 公共请求参数是指每个接口都需要使用到的请求参数. 名称类型是否必须描述 FormatString否返回值的类型,支持 JSON 与 XML.默认为 XML. VersionString是API 版本号,为日期形式:YYYY-MM-DD,本版本对应为 2014-05-26. AccessKeyIdString是阿里云颁发给用户的访问服务所用的密钥 ID. SignatureString是签名结果串,关于签名的计算方法,请参见<签名机制>. SignatureMethodS

Flink DataSet API Programming Guide

Example Program 编程的风格和spark很类似, ExecutionEnvironment  -- SparkContext DataSet – RDD Transformations 这里用Java的接口,所以传入function需要用FlatMapFunction类对象   public class WordCountExample { public static void main(String[] args) throws Exception { final Executi

Stream Processing for Everyone with SQL and Apache Flink

Where did we come from? With the 0.9.0-milestone1 release, Apache Flink added an API to process relational data with SQL-like expressions called the Table API. The central concept of this API is a Table, a structured data set or stream on which relat

Apache Flink源码解析之stream-window

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析.本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下. Window 一个Window代表有限对象的集合.一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点--所有应该进入这个窗口的元素都已经到达. Flink的根窗口对象是一个抽象类,只提供了一个抽象方法: public abstract long maxTimes

如何更好的设计RESTful API

当您的数据模型已开始稳定,您可以为您的网络应用程序创建公共API. 你意识到,很难对你的API进行重大更改,一旦它发布,并希望尽可能得到尽可能多的前面. 现在,互联网对API设计的意见有很多. 但是,因为没有一个广泛采用的标准在所有情况下都有效,所以你前面有一堆选择:你应该接受什么格式? 你应该如何认证? 你的API是否应该版本化?构建API是您可以做的最重要的事情之一,以提高您的服务的价值. 通过使用API,您的服务/核心应用程序有可能成为其他服务增长的平台. 看看当前巨大的科技公司:Face

Apache Flink改进及其在阿里巴巴搜索中的应用

本文整理自阿里搜索基础设施团队研究员蒋晓伟在Flink Forward 2016大会上的演讲,原始演讲视频可以在这里查看. 以下为演讲整理 阿里是世界上最大的电子商务零售商,其2015年的年销售额就超过了eBay和Amazon的总和,达3940亿.Alibaba Search,个性化搜索和推荐平台,既是顾客的关键入口,也承担了大部分的在线收益.因此,阿里搜索基础设施团队一直在努力改进产品. 对于电子商务网站上的搜索引擎,到底什么最重要?必然是实时地为每一位用户提供最相关和准确的搜索结果.以阿里巴

Javascript操作DOM常用API总结

转载自:http://www.cnblogs.com/lrzw32/p/5008913.html Javascript操作DOM常用API总结 文本整理了javascript操作DOM的一些常用的api,根据其作用整理成为创建,修改,查询等多种类型的api,主要用于复习基础知识,加深对原生js的认识. 基本概念 在讲解操作DOM的api之前,首先我们来复习一下一些基本概念,这些概念是掌握api的关键,必须理解它们. Node类型 DOM1级定义了一个Node接口,该接口由DOM中所有节点类型实现

Spark Streaming和Flink的Word Count对比

准备: nccat for windows/linux 都可以 通过 TCP 套接字连接,从流数据中创建了一个 Spark DStream/ Flink DataSream, 然后进行处理, 时间窗口大小为10s 因为 示例需要, 所以 需要下载一个netcat, 来构造流的输入. 代码: spark streaming package cn.kee.spark; public final class JavaNetworkWordCount { private static final Pat