Flink中的分析程序实现了对数据集的某些操作 (例如,数据过滤,映射,合并,分组)。这些数据最初来源于特定的数据源(例如来自于读文件或数据集合)。操作执行的结果通过数据池以写入数据到(分布式)文件系统或标准输出(例如命令行终端)的形式返回。Flink程序可以运行在不同的环境中,既能够独立运行,也可以嵌入到其他程序中运行。程序可以运行在本地的JVM上,也可以运行在服务器集群中。
为了创建你自己的Flink程序,我们鼓励你从program skeleton(程序框架)开始,并逐渐增加你自己的transformations(变化)。以下是更多的用法和高级特性的索引。
- Example Program
- Program Skeleton
- Project setup
- Lazy Evaluation
- Transformations
- Specifying Keys
- Passing Functions to Flink
- Data Types
- Data Sources
- Data Sinks
- Broadcast Variables
- Parallel Execution
- Executing Plans
示例程序
以下程序是一段完整可运行的WordCount示例程序。你可以复制粘贴这些代码并在本地运行。
from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator])
collector.collect((count, word))
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute(local=True)
程序框架
从示例程序可以看出,Flink程序看起来就像普通的python程序一样。每个程序都包含相同的基本组成部分:
1.获取一个运行环境
2.加载/创建初始数据
3.指定对这些数据的操作
4.指定计算结果的存放位置
5.运行程序
接下来,我们将对每个步骤给出概述,更多细节可以参考与之对应的小节。
Environment(运行环境)是所有Flink程序的基础。你可以通过调用Environment类中的一些静态方法来建立一个环境:
get_environment()
运行环境可通过多种读文件的方式来指定数据源。如果是简单的按行读取文本文件,你可以采用:
env = get_environment()
text = env.read_text("file:///path/to/file")
这样,你就获得了可以进行操作(apply transformations)的数据集。关于数据源和输入格式的更多信息,请参考 Data Sources。
一旦你获得了一个数据集DataSet,你就可以通过transformations来创建一个新的数据集,并把它写入到文件,再次transform,或者与其他数据集相结合。你可以通过对数据集调用自己个性化定制的函数来进行数据操作。例如,一个类似这样的数据映射操作:
data.map(lambda x: x*2)
这将会创建一个新的数据集,其中的每个数据都是原来数据集中的2倍。若要获取关于所有transformations的更多信息,及所有数据操作的列表,请参考Transformations。
当你需要将所获得的数据集写入到磁盘时,调用下面三种函数的其中一个即可。
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()
其中,最后一种方法仅适用于在本机上进行开发/调试,它会将数据集的内容输出到标准输出。(请注意,当函数在集群上运行时,结果将会输出到整个集群节点的标准输出流,即输出到workers的.out文件。)前两种方法,能够将数据集写入到对应的文件中。关于写入到文件的更多信息,请参考Data Sinks。
当你设计好了程序之后,你需要在环境中执行execute命令来运行程序。可以选择在本机运行,也可以提交到集群运行,这取决于Flink的创建方式。你可以通过设置execute(local=True)强制程序在本机运行。
创建项目
除了搭建好Flink运行环境,就无需进行其他准备工作了。Python包可以从你的Flink版本对应的/resource文件夹找到。在执行工作任务时,Flink 包,plan包和optional包均可以通过HDFS自动分发。
Python API已经在安装了Python2.7或3.4的Linux/Windows系统上测试过。
默认情况下,Flink通过调用”python”或”python3″来启动python进程,这取决于使用了哪种启动脚本。通过在 flink-conf.yaml 中设置 “python.binary.python[2/3]”对应的值,来设定你所需要的启动方式。
延迟(惰性)求值
所有的Flink程序都是延迟执行的。当程序的主函数执行时,数据的载入和操作并没有在当时发生。与此相反,每一个被创建出来的操作都被加入到程序的计划中。当程序环境中的某个对象调用了execute()函数时,这些操作才会被真正的执行。不论该程序是在本地运行还是集群上运行。
延迟求值能够让你建立复杂的程序,并在Flink上以一个整体的计划单元来运行。
数据变换
数据变换(Data transformations)可以将一个或多个数据集映射为一个新的数据集。程序能够将多种变换结合到一起来进行复杂的整合变换。
该小节将概述各种可以实现的数据变换。transformations documentation数据变换文档中,有关于所有数据变换和示例的全面介绍。
Transformation | Description 变换描述 |
Map | 输入一个元素,输出一个元素
|
FlatMap | 输入一个元素,输出0,1,或多个元素
|
MapPartition | 通过一次函数调用实现并行的分割操作。该函数将分割变换作为一个”迭代器”,并且能够产生任意数量的输出值。每次分割变换的元素数量取决于变换的并行性和之前的操作结果。
|
Filter | 对每一个元素,计算一个布尔表达式的值,保留函数计算结果为true的元素。
|
Reduce | 通过不断的将两个元素组合为一个,来将一组元素结合为一个单一的元素。这种缩减变换可以应用于整个数据集,也可以应用于已分组的数据集。
|
ReduceGroup | 将一组元素缩减为1个或多个元素。缩减分组变换可以被应用于一个完整的数据集,或者一个分组数据集。
|
Aggregate | 对一个数据集包含所有元组的一个域,或者数据集的每个数据组,执行某项built-in操作(求和,求最小值,求最大值)。聚集变换可以被应用于一个完整的数据集,或者一个分组数据集。
|
Join | 对两个数据集进行联合变换,将得到一个新的数据集,其中包含在两个数据集中拥有相等关键字的所有元素对。也可通过JoinFunction来把成对的元素变为单独的元素。关于join keys的更多信息请查看 keys 。
|
CoGroup | 是Reduce变换在二维空间的一个变体。将来自一个或多个域的数据加入数据组。变换函数transformation function将被每一对数据组调用。关于定义coGroup keys的更多信息,请查看 keys 。
|
Cross | 计算两个输入数据集的笛卡尔乘积(向量叉乘),得到所有元素对。也可通过CrossFunction实现将一对元素转变为一个单独的元素。
|
Union | 将两个数据集进行合并。
|
ZipWithIndex | 为数据组中的元素逐个分配连续的索引。了解更多信息,请参考 [Zip Elements Guide](zip_elements_guide.html#zip-with-a-dense-index).
|
指定Keys
一些变换(例如Join和CoGroup),需要在进行变换前,为作为输入参数的数据集指定一个关键字,而另一些变换(例如Reduce和GroupReduce),则允许在变换操作之前,对数据集根据某个关键字进行分组。
数据集可通过如下方式分组
reduced = data \
.group_by(<define key here>) \
.reduce_group(<do something>)
Flink中的数据模型并不是基于键-值对。你无需将数据集整理为keys和values的形式。键是”虚拟的”:它们被定义为在真实数据之上,引导分组操作的函数。