Flink如何应对背压问题

经常有人会问Flink如何处理背压问题。其实,答案很简单:Flink没用使用任何通用方案来解决这个问题,因为那根本不需要那样的方案。它利用自身作为一个纯数据流引擎的优势来优雅地响应背压问题。这篇文章,我们将介绍背压问题,然后我们将深挖Flink的运行时如何在task之间传输数据缓冲区内的数据以及流数据如何自然地两端降速来应对背压,最终将以一个小示例来演示它。

什么是背压

像Flink这样的流处理系统需要能够优雅地应对背压问题。背压通常产生于这样一种场景:当一个系统接收数据的速率高于它在一个瞬时脉冲内能处理的数据。许多日常问题都会导致背压。例如,垃圾回收卡顿可能会导致流入的数据快速堆积,或者一个数据源可能生产数据的速度过快。背压如果不能得到正确地处理,可能会导致资源被耗尽或者甚至出现更糟的情况导致数据丢失。

让我们来看一个简单的例子。假设存在一个数据流pipeline作为source,一个流处理job,以及一个sink以每秒500万条记录的速度处理数据,整个流处理程序处于稳定的状态。如下图所示(一个黑色的条状代表1百万个记录,该图是系统中其中1秒的快照):

在同一时间点,不管是流处理job还是sink,如果有1秒的卡顿,那么将导致至少500万条记录的积压。换句话说,source可能会产生一个脉冲,显示在一秒内数据的生产速度突然翻倍。

我们如何来应对类似这样的场景呢?当然,其中一种方案是删除这些元素。但数据丢失对许多流处理程序而言是不可接受的!这些应用要求exactly once的一致性。另一种方案是数据放在某个缓冲区内。缓冲区也需要被持久化,因为在失败的情况下,这些数据需要被重放 以防止数据丢失。理想情况下,这些数据应该被缓冲到某个持久化的channel里(例如,如果source本身提供持久化保证的情况下,可以是该source本身 – Apache Kafka是一个很不错的选择)。而理想的应对措施是:背压从sink到source的整个pipeline,同时对source进行限流来适配整个pipeline中最慢组件的速度,从而获得稳定状态:

Flink中的背压

Flink运行时的构造部件是operators以及streams。每一个operator消费一个中间/过渡状态的流,对它们进行转换,然后生产一个新的流。描述这种机制最好的类比是:Flink使用有效的分布式阻塞队列来作为有界的缓冲区。如同Java里通用的阻塞队列跟处理线程进行连接一样,一旦队列达到容量上限,一个相对较慢的接受者将拖慢发送者。

以下面这个示例(两个task组成的一个简单的flow)来看Flink如何应对背压:

1、记录“A”进入Flink,然后被Task 1处理 
2、记录被序列化进缓冲区 
3、缓冲区内的数据被移动到Task 2,task 2会从缓冲区内读取记录

这里有一个重要的事实:为了记录能被Flink处理,缓冲区必须是可用的

在Flink中这些分布式的队列被认为是逻辑流,而它们的有界容量可以通过每一个生产、消费流管理的缓冲池获得。缓冲池是缓冲区的集合,它们都可以在被消费完之后循环利用。这个观点很好理解:你从池里获取一个缓冲区,填进数据,然后在数据被消费后,将该缓冲区返还回缓冲池,之后你还可以再次使用它。

这些缓冲池的大小在运行时能动态变化。在不同的发送者/接收者存在不同的处理速度的情况下,网络栈里的内存缓冲区的数量(等于队列的容量)决定了系统能够提供的缓冲区的数量。Flink保证总是有足够的缓冲区提供给应用程序,但处理的速度是由用户的程序以及可用内存的数量决定的。内存越多,意味着系统可以轻松应对一定的瞬时背压(short periods,short GC)。越少的内存意味着需要对背压进行更多的“即时”响应(意思是,如果内存少缓冲区就容易被填满,那么需要立即作出响应,消费走数据才能应对这个问题)。

回到上面那个简单的示例:Task 1在其输出端被分配了一个缓冲池,Task 2在其输入端也有一个。如果当前有一个缓冲区可供序列化的“A”使用,我们就序列化它然后分配该缓冲区。

我们来看两种场景:

  • 本地传输:如果task1和task2都运行在同一个工作节点(TaskManager),缓冲区可以被直接共享给下一个task,一旦task 2消费了数据它会被回收。如果task 2比task 1慢,buffer会以比task 1填充的速度更慢的速度进行回收从而迫使task 1降速。
  • 远程传输:如果task 1和task 2运行在不同的工作节点上。一旦缓冲区内的数据被发送出去(TCP Channel),它就会被回收。在接收端,数据被拷贝到输入缓冲池的缓冲区中,如果没有缓冲区可用,从TCP连接中的数据读取动作将会被中断。输出端通常以watermark机制来保证不会有太多的数据在传输途中。如果有足够的数据已经进入可发送状态,会等到情况稳定到阈值以下才会进行发送。这可以保证没有太多的数据在路上。如果新的数据在消费端没有被消费(因为没有可用的缓冲区),这种情况会降低发送者发送数据的速度。

这个在固定大小的缓冲池之间的流示例,保证了Flink健壮的背压机制,从而使得task生产数据的速度跟消费的速度对等。

我们描述的这个方案可以从两个task之间的数据传输自然地扩展到更复杂的pipeline中,并保证背压在整个pipeline上扩散。

让我们来看一个简单的实验,它展示了Flink遇到背压问题后的表现。我们运行一个简单的生产者-消费者流拓扑,主要的功能是在本地的task之间传输数据,我们在task生产记录时改变它的速度。就本次测试而言,我们使用比默认配置更少的内存来使得背压问题得到凸显。我们为每个task配备两个大小为4096B(byte)的缓冲区。在通常的Flink部署场景中,task的缓冲区数量会比这更多,容量也会更大。另外,这个测试运行在单一的JVM中,但使用了完整的Flink功能栈。

下面这张图显示了:随着时间的改变,生产者(黄色线)和消费者(绿色线)基于所达到的最大吞吐(在单一JVM中每秒达到8百万条记录)的平均吞吐百分比。我们通过衡量task每5秒钟处理的记录数来衡量平均吞吐。

首先,我们运行生产者task到它最大生产速度的60%(我们通过Thread.sleep()来模拟降速)。消费者以同样的速度处理数据。然后,我们将消费task的速度降至其最高速度的30%。你就会看到背压问题产生了,正如我们所见,生产者的速度也自然降至其最高速度的30%。接着,我们对消费者停止人为降速,之后生产者和消费者task都达到了其最大的吞吐。接下来,我们再次将消费者的速度降至30%,pipeline给出了立即响应:生产者的速度也被自动降至30%。最后,我们再次停止限速,两个task也再次恢复100%的速度。这所有的迹象表明:生产者和消费者在pipeline中的处理都在跟随彼此的吞吐而进行适当的调整,这就是我们在流pipeline中描述的行为。

总结

Flink与持久化的source(例如kafka),能够为你提供即时的背压处理,而无需担心数据丢失。Flink不需要一个特殊的机制来处理背压,因为Flink中的数据传输相当于已经提供了应对背压的机制。因此,Flink所获得的最大吞吐量由其pipeline中最慢的部件决定。

原文发布时间为:2016-04-21

时间: 2024-08-17 16:06:35

Flink如何应对背压问题的相关文章

开源大数据周刊-第2期

阿里云E-Mapreduce动态 E-Mapreduce团队计划下周发布VPC方案,敬请期待. Aliyun-emapreduce-demo发布在github上,敬请关注. 资讯播报 阿里云启动公益云计划 李连杰也喊话要拥抱大数据 云栖大会·深圳峰会4月20日在深圳举行.大会上,阿里云正式启动了公益云计划,壹基金创始李连杰也参与此次启动仪式.李连杰表示,我们希望通过壹基金,还有云计算.大数据,能为更多NGO组织提供榜样,更好地运用科技来解决社会的公益慈善问题 QCon北京:构建大数据生态需要哪些

Flink流处理迭代之化解反馈环

我们都知道Flink在可迭代的流处理中引入了反馈边来将本次迭代的结果反馈给迭代头以进行下一次迭代,这在执行拓扑中引入了环(反馈环).Flink主要应对的执行拓扑还是有向无环图(DAG),最终它选择了将反馈环进行化解使其能够适配有向无环图的结构,而如何对反馈环进行化解是我们这一篇主要探讨的话题. 任何提交给Flink执行的程序在提交之前都必须先生成作业图,对于用DataStream API编写的流处理程序在生成作业图之前,还会先生成流图.因此,如果想化解迭代产生的反馈环其时机只能是在部署执行之前的

Flink 原理与实现:如何处理反压问题

流处理系统需要能优雅地处理反压(backpressure)问题.反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率.许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增.反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃. 目前主流的流处理系统 Storm/JStorm/Spark Streaming/Flink 都已经提供了反压机制,不过其实现各不相同. Storm 是通过监控 Bolt 中的接收队

Flink内存管理源码解读之基础数据结构

在分布式实时计算领域,如何让框架/引擎足够高效地在内存中存取.处理海量数据是一个非常棘手的问题.在应对这一问题上Flink无疑是做得非常杰出的,Flink的自主内存管理设计也许比它自身的知名度更高一些.正好最近在研读Flink的源码,所以开两篇文章来谈谈Flink的内存管理设计. Flink的内存管理的亮点体现在作为以Java为主的(部分功能用Scala实现,也是一种遵循JVM规范并依赖JVM解释执行的函数式编程语言)的程序却自主实现内存的管理而不完全依赖于JVM的内存管理机制.它的优势在于灵活

Flink中task之间的数据交换机制

Flink中的数据交换构建在如下两条设计原则之上: 数据交换的控制流(例如,为实例化交换而进行的消息传输)是接收端初始化的,这非常像最初的MapReduce. 数据交换的数据流(例如,在网络上最终传输的数据)被抽象成一个叫做IntermediateResult的概念,它是可插拔的.这意味着系统基于相同的实现逻辑可以既支持流数据,又支持批处理数据的传输. 数据传输包含多个对象,它们是: JobManager master节点,用于响应任务调度.恢复.协作,以及通过ExecutionGraph数据结

Apache Flink fault tolerance源码剖析(六)

上篇文章我们分析了基于检查点的用户状态的保存机制--状态终端.这篇文章我们来分析barrier(中文常译为栅栏或者屏障,为了避免引入名称争议,此处仍用英文表示).检查点的barrier是提供exactly once一致性保证的主要保证机制.这篇文章我们会就此展开分析. 这篇文章我们侧重于核心代码分析,原理我们在这个系列的第一篇文章<Flink数据流的Fault Tolerance机制> 一致性保证 Flink的一致性保证也依赖于检查点机制.在利用检查点进行恢复时,数据流会进行重放(replay

Apache Flink fault tolerance源码剖析完结篇

这篇文章是对Flinkfault tolerance的一个总结.虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及. 回顾这个系列,每篇文章都至少涉及一个知识点.我们来挨个总结一下. 恢复机制实现 Flink中通常需要进行状态恢复的对象是operator以及function.它们通过不同的方式来达到状态快照以及状态恢复的能力.其中function通过实现Checkpointed的接口,而operator通过实现StreamOpeator接口.这两个接口的行为是类似的. 当然对于数据

Flink内存管理源码解读之内存管理器

回顾 上一篇文章我们谈了Flink自主内存管理的一些基础的数据结构.那篇中主要讲了数据结构的定义,这篇我们来看看那些数据结构的使用,以及内存的管理设计. 概述 这篇文章我们主要探讨Flink的内存管理类MemoryManager涉及到对内存的分配.回收,以及针对预分配内存而提供的memory segment pool.还有支持跨越多个memory segment数据访问的page view. 本文探讨的类主要位于pageckage : org.apache.flink.runtime.memor

大数据框架对比:Hadoop、Storm、Samza、Spark和Flink

简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才经历了大规模扩展. 在之前的文章中,我们曾经介绍过有关大数据系统的常规概念.处理过程,以及各种专门术语,本文将介绍大数据系统一个最基本的组件:处理框架.处理框架负责对系统中的数据进行计算,例如处理从非易失存储中读取的数据,或处理刚刚摄入到系统中的数据.数据的计算则是指从大量单一数据点中提取信息和见解