《Flink官方文档》Python 编程指南测试版(一)

Flink中的分析程序实现了对数据集的某些操作 (例如,数据过滤,映射,合并,分组)。这些数据最初来源于特定的数据源(例如来自于读文件或数据集合)。操作执行的结果通过数据池以写入数据到(分布式)文件系统或标准输出(例如命令行终端)的形式返回。Flink程序可以运行在不同的环境中,既能够独立运行,也可以嵌入到其他程序中运行。程序可以运行在本地的JVM上,也可以运行在服务器集群中。

为了创建你自己的Flink程序,我们鼓励你从program skeleton(程序框架)开始,并逐渐增加你自己的transformations(变化)。以下是更多的用法和高级特性的索引。

示例程序

以下程序是一段完整可运行的WordCount示例程序。你可以复制粘贴这些代码并在本地运行。

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
 "I think I hear them. Stand, ho! Who's there?")

data \
  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  .group_by(1) \
  .reduce_group(Adder(), combinable=True) \
  .output()

env.execute(local=True)

程序框架

从示例程序可以看出,Flink程序看起来就像普通的python程序一样。每个程序都包含相同的基本组成部分:

1.获取一个运行环境
2.加载/创建初始数据
3.指定对这些数据的操作
4.指定计算结果的存放位置
5.运行程序

接下来,我们将对每个步骤给出概述,更多细节可以参考与之对应的小节。
Environment(运行环境)是所有Flink程序的基础。你可以通过调用Environment类中的一些静态方法来建立一个环境:

get_environment()

运行环境可通过多种读文件的方式来指定数据源。如果是简单的按行读取文本文件,你可以采用:

env = get_environment()
text = env.read_text("file:///path/to/file")

这样,你就获得了可以进行操作(apply transformations)的数据集。关于数据源和输入格式的更多信息,请参考 Data Sources
一旦你获得了一个数据集DataSet,你就可以通过transformations来创建一个新的数据集,并把它写入到文件,再次transform,或者与其他数据集相结合。你可以通过对数据集调用自己个性化定制的函数来进行数据操作。例如,一个类似这样的数据映射操作:

data.map(lambda x: x*2)

这将会创建一个新的数据集,其中的每个数据都是原来数据集中的2倍。若要获取关于所有transformations的更多信息,及所有数据操作的列表,请参考Transformations

当你需要将所获得的数据集写入到磁盘时,调用下面三种函数的其中一个即可。

data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()

其中,最后一种方法仅适用于在本机上进行开发/调试,它会将数据集的内容输出到标准输出。(请注意,当函数在集群上运行时,结果将会输出到整个集群节点的标准输出流,即输出到workers的.out文件。)前两种方法,能够将数据集写入到对应的文件中。关于写入到文件的更多信息,请参考Data Sinks

当你设计好了程序之后,你需要在环境中执行execute命令来运行程序。可以选择在本机运行,也可以提交到集群运行,这取决于Flink的创建方式。你可以通过设置execute(local=True)强制程序在本机运行。

创建项目

除了搭建好Flink运行环境,就无需进行其他准备工作了。Python包可以从你的Flink版本对应的/resource文件夹找到。在执行工作任务时,Flink 包,plan包和optional包均可以通过HDFS自动分发。

Python API已经在安装了Python2.7或3.4的Linux/Windows系统上测试过。

默认情况下,Flink通过调用”python”或”python3″来启动python进程,这取决于使用了哪种启动脚本。通过在 flink-conf.yaml 中设置 “python.binary.python[2/3]”对应的值,来设定你所需要的启动方式。

延迟(惰性)求值

所有的Flink程序都是延迟执行的。当程序的主函数执行时,数据的载入和操作并没有在当时发生。与此相反,每一个被创建出来的操作都被加入到程序的计划中。当程序环境中的某个对象调用了execute()函数时,这些操作才会被真正的执行。不论该程序是在本地运行还是集群上运行。

延迟求值能够让你建立复杂的程序,并在Flink上以一个整体的计划单元来运行。

数据变换

数据变换(Data transformations)可以将一个或多个数据集映射为一个新的数据集。程序能够将多种变换结合到一起来进行复杂的整合变换。

该小节将概述各种可以实现的数据变换。transformations documentation数据变换文档中,有关于所有数据变换和示例的全面介绍。

Transformation  Description    变换描述
Map 输入一个元素,输出一个元素

data.map(lambda x: x * 2)
FlatMap 输入一个元素,输出0,1,或多个元素

data.flat_map(
  lambda x,c: [(1,word) for word in line.lower().split() for line
in x])
MapPartition 通过一次函数调用实现并行的分割操作。该函数将分割变换作为一个”迭代器”,并且能够产生任意数量的输出值。每次分割变换的元素数量取决于变换的并行性和之前的操作结果。

data.map_partition(lambda x,c: [value * 2 for value in x])
Filter 对每一个元素,计算一个布尔表达式的值,保留函数计算结果为true的元素。

data.filter(lambda x: x > 1000)
Reduce 通过不断的将两个元素组合为一个,来将一组元素结合为一个单一的元素。这种缩减变换可以应用于整个数据集,也可以应用于已分组的数据集。

data.reduce(lambda x,y : x + y)
ReduceGroup 将一组元素缩减为1个或多个元素。缩减分组变换可以被应用于一个完整的数据集,或者一个分组数据集。

lass Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator)
    collector.collect((count, word))

data.reduce_group(Adder())
Aggregate 对一个数据集包含所有元组的一个域,或者数据集的每个数据组,执行某项built-in操作(求和,求最小值,求最大值)。聚集变换可以被应用于一个完整的数据集,或者一个分组数据集。

# This code finds the sum of all of the values in the first field
 and the maximum of all of the values in the second field
data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)

# min(), max(), and sum() syntactic sugar functions are also available
data.sum(0).and_agg(Aggregation.Max, 1)
Join 对两个数据集进行联合变换,将得到一个新的数据集,其中包含在两个数据集中拥有相等关键字的所有元素对。也可通过JoinFunction来把成对的元素变为单独的元素。关于join keys的更多信息请查看 keys 。

# In this case tuple fields are used as keys.
# "0" is the join field on the first tuple
# "1" is the join field on the second tuple.
result = input1.join(input2).where(0).equal_to(1)
CoGroup 是Reduce变换在二维空间的一个变体。将来自一个或多个域的数据加入数据组。变换函数transformation function将被每一对数据组调用。关于定义coGroup keys的更多信息,请查看 keys 。

data1.co_group(data2).where(0).equal_to(1)
Cross 计算两个输入数据集的笛卡尔乘积(向量叉乘),得到所有元素对。也可通过CrossFunction实现将一对元素转变为一个单独的元素。

result = data1.cross(data2)
Union 将两个数据集进行合并。

data.union(data2)
ZipWithIndex 为数据组中的元素逐个分配连续的索引。了解更多信息,请参考 [Zip Elements Guide](zip_elements_guide.html#zip-with-a-dense-index).

data.zip_with_index()

指定Keys

一些变换(例如Join和CoGroup),需要在进行变换前,为作为输入参数的数据集指定一个关键字,而另一些变换(例如Reduce和GroupReduce),则允许在变换操作之前,对数据集根据某个关键字进行分组。

数据集可通过如下方式分组

reduced = data \
  .group_by(<define key here>) \
  .reduce_group(<do something>)

Flink中的数据模型并不是基于键-值对。你无需将数据集整理为keys和values的形式。键是”虚拟的”:它们被定义为在真实数据之上,引导分组操作的函数。

转载自 并发编程网 - ifeve.com

时间: 2024-12-29 08:39:50

《Flink官方文档》Python 编程指南测试版(一)的相关文章

《Apache Flink 官方文档》前言

本文档针对的是Apache Flink的 1.2.0版本. Apache Flink是一个分布式流式和批量数据处理程序的开源平台.Flink的核心是流式数据引擎,Flink通过数据流的分布式计算的方式提供数据的分发.通信和容错.Flink也构建了流引擎之上的批处理,覆盖本地迭代上的支持,内存管理和程序优化. 1.第一步 基本概念:先从Flink的数据流程序模型和分布式实时环境的基本概念开始.这会帮助你完全理解文档的其他部分,包括安装和编程指南.强烈推荐先阅读这部分内容. 快速开始:在你的本机上运

《Apache Flink官方文档》 Apache Flink介绍

下面是关于Apache Flink(以下简称Filnk)框架和流式计算的概述.为了更专业.更技术化的介绍,在Flink文档中推荐了一些"概念性"的文章. 1.无穷数据集的持续计算 在我们详细介绍Flink前,复习一下当我们计算数据选择运算模型时,很可能会遇到的一个更高级别的数据集类型.下面有两个观点经常容易混淆,很有必要去澄清它们. (1)两种数据集类型: ①无穷数据集:无穷的持续集成的数据集合. ②有界数据集:有限不会改变的数据集合. 很多现实中传统地认为有界或者批量的数据集合实际上

《Flink官方文档》Python 编程指南测试版(二)

为元组定义keys 最简单的情形是对一个数据集中的元组按照一个或多个域进行分组: reduced = data \ .group_by(0) \ .reduce_group(<do something>) 数据集中的元组被按照第一个域分组.对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值. grouped = data \ .group_by(0,1) \ .reduce(/*do something*/) 在上面的例子中,数据集的分组基于第一个和第二个

《Flink官方文档》监控Wikipedia 编辑流(二)

本示例应该能让你开始写自己的Flink程序.为了学习更多,你可以打开我们的基本概念和DataStream API的指南.如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习. 激励练习:在一个Flink集群上运行,并将结果写入Kafka 请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续. 第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖.将这个

《KAFKA官方文档》入门指南(一)

1.入门指南 1.1简介 Apache的Kafka是一个分布式流平台(a distributed streaming platform).这到底意味着什么? 我们认为,一个流处理平台应该具有三个关键能力: 它可以让你发布和订阅记录流.在这方面,它类似于一个消息队列或企业消息系统. 它可以让你持久化收到的记录流,从而具有容错能力. 它可以让你处理收到的记录流. Kafka擅长哪些方面? 它被用于两大类应用: 建立实时流数据管道从而能够可靠地在系统或应用程序之间的共享数据 构建实时流应用程序,能够变

《Flink官方文档》示例总览

示例 Java 的示例项目和Scala 的示例项目指导了构建Maven和SBT项目,并包含了一个单词计数程序的简单实现. 监控Wikipedia编辑是一个更复杂的流式分析应用 用 Apache Flink.Elasticsearch和 Kibana 构建实时面板应用是发布在elastic.co上的一个博客,展示了如何用 Apache Flink.Elasticsearch和 Kibana去构建实时面板来解决流数据分析. 捆绑示例 Flink 资源包含了很多流式(java/scala) 和批处理(

《KAFKA官方文档》入门指南(五)

新的协议版本 ListOffsetRequest V1支持精确的基于时间戳的偏移搜索. MetadataResponse V2引入了一个新的参数: "CLUSTER_ID". FetchRequest v3支持限制请求返回的大小(除了现有的每个分区的限制),它能够返回比限制更大的消息和在请求中加入分区的顺序具有重要意义. JoinGroup V1引入了一个新的字段: "rebalance_timeout". 升级0.8.4或0.9.x版本到0.10.0.0 0.10

《Flink官方文档》Batch Examples(二)

连通分支 连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接.类似PageRank,连通组件是一个迭代算法.在每个步骤中,每个顶点都将其当前组件ID传给所有邻居.如果小于自己的组件ID,一个顶点从邻居接受组件ID. 此实现使用增量迭代:组件ID未变化的顶点不参与下一步骤.因为后来的迭代通常只处理一些离群顶点,这将产生更好的性能. // read vertex and edge data DataSet<Long> vertices = getVertexDataSe

《Netty官方文档》开发者指南

在找教程? 访问这里. 有问题? 在StackOverflow.com提问. 请注意这个指南不是一个"用户指南".这是给想要开发Netty的贡献者("开发人员")看的而不是给想要用Netty构建一个应用的"用户"看的. 在开始前 设置你的开发环境. 除非你的贡献很微小例如单行改动或拼写校正,读一下并签署 Individual Contributor License Agreement (icla), 或者让你的雇员签署Corporate Cont