Flink运行时之统一的数据交换对象

统一的数据交换对象

在Flink的执行引擎中,流动的元素主要有两种:缓冲(Buffer)和事件(Event)。Buffer主要针对用户数据交换,而Event则用于一些特殊的控制标识。但在实现时,为了在通信层统一数据交换,Flink提供了数据交换对象——BufferOrEvent。它是一个既可以表示Buffer又可以表示Event的类。上层使用者只需调用isBuffer和isEvent方法即可判断当前收到的这条数据是Buffer还是Event。

缓冲

缓冲(Buffer)是数据交换的载体,几乎所有的数据(当然事件是特殊的)交换都需要经过Buffer。Buffer底层依赖于Flink自管理内存的内存段(MemorySegment)作为数据的容器。Buffer在内存段上做了一层封装,这一层封装是为了对基于引用计数的Buffer回收机制提供支持。

引用计数是计算机编程语言中的一种内存管理技术,是指将资源(可以是对象、内存或磁盘)的被引用次数保存起来,当被引用次数变为零时就将其释放的过程。使用引用计数技术可以实现自动资源管理的目的。具体做法可简述为:当创建一个对象的实例并在堆上申请内存时,对象的引用计数就为1,在其他对象中需要持有这个对象时,就需要把该对象的引用计数加1,需要释放一个对象时,就将该对象的引用计数减1,直至对象的引用计数为0,对象的内存会被释放。

引用计数还可以指使用引用计数技术回收未使用资源的垃圾回收算法,Objective-C就是使用这种方式进行内存管理的典型语言之一。

它在内部维护着一个计数器referenceCount,初始值为1。内存回收由缓冲回收器(BufferRecycler)来完成,回收的对象就是内存段(MemorySegment)。

实现引用计数的方法有两个。第一个为retain,用于将引用计数加一:

public Buffer retain() {
    synchronized (recycleLock) {
        //预防性检测,先确认内存段是否已被回收
        ensureNotRecycled();
        referenceCount++;
        return this;
    }
}

第二个为回收(或将引用计数减一)的方法recycle,当引用计数减为0时,BufferRecycler会对内存段进行回收:

public void recycle() {
    synchronized (recycleLock) {
        if (--referenceCount == 0) {
            recycler.recycle(memorySegment);
        }
    }
}

BufferRecycler接口有一个名为FreeingBufferRecycler的简单实现者,它的做法是直接释放内存段。当然通常为了分配和回收的效率,会对Buffer进行预先分配然后加入到Buffer池中。所以,BufferRecycler的常规实现是基于缓冲池的。除此之外,还有另一个接口BufferProvider(它约定了Buffer提供者如何以同步和异步的模式提供Buffer)共同作为缓冲池(BufferPool)的基接口。

整个的Buffer簇的类图如下:

缓冲池工厂(BufferPoolFactory)用于创建和销毁缓冲池,网络缓冲池(NetworkBufferPool)是其唯一的实现者。NetworkBufferPool缓存了固定数目的内存段,主要用于网络栈通信。

NetworkBufferPool在构造器的参数中要求指定其缓存的内存段数目,然后它会初始化固定大小的一个队列作为内存段池。与此同时,构造器参数还允许指定内存段大小以及Flink自主管理的内存类型。并根据这些参数初始化队列中的内存段:

if (memoryType == MemoryType.HEAP) {
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
        byte[] memory = new byte[segmentSize];
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, null));
    }
} else if (memoryType == MemoryType.OFF_HEAP) {
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
        ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
    }
}

上面的代码段中调用了我们在分析内存管理时分析的内存段工厂(MemorySegmentFactory),注意这里wrapPooledXXX方法其实没什么特殊的,只是新建了相关的内存段实例。不要被其方法名迷惑,所谓的池化的机制都是要在外部维护,比如这里的NetworkBufferPool定义了维护内存段池(也即availableMemorySegments)的一系列方法,比如requestMemorySegment、recycle、destroy等。

因为BufferPool当前只有LocalBufferPool这一个实现,所以NetworkBufferPool在实现BufferPoolFactory的createBufferPool方法时会直接实例化LocalBufferPool。NetworkBufferPool用一个Set维护了其所创建的所有LocalBufferPool的引用。createBufferPool方法要求在创建时指定需要创建的是固定大小的BufferPool还是非固定大小的BufferPool。如果是非固定大小的,NetworkBufferPool也专门提供了一个Set来维护它们,这主要是为了在创建或销毁BufferPool时对这些非固定大小的BufferPool里的Buffer进行“重分布”。

这里对非固定大小的BufferPool里的内存段进行重分布值得我们重点关注一下。其实,所有BufferPool所申请的内存段都归属于NetworkBufferPool所维护的内存段池。只有NetworkBufferPool了解内存段池的所有信息,包括剩余可用的内存段数目。当createBufferPool方法或者destroyBufferPool方法被调用时,对应的可用的内存段数目也会相应得产生变化。这时,为了让内存段被合理地分配并加以利用,所有非固定大小的BufferPool都需要根据最新的可用内存段数来重分布其所包含的内存段数目。具体的重分布的实现如下:

private void redistributeBuffers() throws IOException {
    //获得非固定大小的BufferPool个数
    int numManagedBufferPools = managedBufferPools.size();
    //如果没有,则直接返回,避免除零错误
    if (numManagedBufferPools == 0) {
        return;
    }   

    //当前总共可用的内存段数目(未实际分配)
    int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
    //每个BufferPool可额外附“赠”的内存段数目
    int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;
    //当然可用的内存段不一定正好能完全分摊给所有的非固定大小的BufferPool,所以剩下的余量以轮转的方式分摊
    int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;
    int bufferPoolIndex = 0;
    //遍历BufferPool挨个扩充
    for (LocalBufferPool bufferPool : managedBufferPools) {
        int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
        bufferPool.setNumBuffers(
            bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers
        );
    }
}

该方法的调用环境必须处于同步块中。

LocalBufferPool的setNumBuffers方法并不只是设置一下数目这么简单,具体的逻辑我们暂且按下不表。我们先来看一下LocalBufferPool的实现,它用于管理从NetworkBufferPool申请到的一组Buffer实例。LocalBufferPool中维护着的一些信息:

//当前缓冲区池最少需要的内存段的数目
private final int numberOfRequiredMemorySegments;

//当前可用的内存段,这些内存段已从网络缓冲池中请求到本地,但当前没有被当做缓冲区用于数据传输
private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

//注册过的获取Buffer可用性的侦听器,当无Buffer可用时,才可注册侦听器
private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();

//缓冲池的当前大小
private int currentPoolSize;

//从网络缓冲池请求的以及以某种形式关联着的所有的内存段数目
private int numberOfRequestedMemorySegments;

LocalBufferPool被实例化时,虽然指定了其所需要的内存段的最小数目,但是NetworkBufferPool并没有将这些内存段实例分配给它,也就是说不是预先静态分配的,而是调用方调用requestBuffer方法(来自BufferProvider接口),在内部触发对NetworkBufferPool的实例方法requestMemorySegment的调用进而获取到内存段。我们来看一下requestBuffer方法的实现:

private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException {
    synchronized (availableMemorySegments) {
        //在请求之前,可能需要先返还多余的,也就是超出currentPoolSize的内存段给NetworkBufferPool
        returnExcessMemorySegments();
        boolean askToRecycle = owner != null;

        //当可用内存段队列为空时,说明已没有空闲的内存段,则可能需要从NetworkBufferPool获取
        while (availableMemorySegments.isEmpty()) {
            if (isDestroyed) {
                throw new IllegalStateException("Buffer pool is destroyed.");
            }
            //获取的条件是:所请求的总内存段的数目小于当前池大小
            if (numberOfRequestedMemorySegments < currentPoolSize) {
                //请求一个内存段
                final MemorySegment segment = networkBufferPool.requestMemorySegment();
                if (segment != null) {
                    //所请求的内存段总数目加一,并将请求的内存段加入到可用内存段队列中,然后跳出本轮while循环
                    //注意这里是continue而不是break,这里还必须继续判断队列中是否有元素可用,
                    //因为当前对象可能处于分布式的场景下
                    numberOfRequestedMemorySegments++;
                    availableMemorySegments.add(segment);
                    continue;
                }
            }

            //如果总内存段的数目已大于等于本地缓冲池大小,判断是否需要释放,如果需要,让缓冲区池归属者释放一个内存段
            if (askToRecycle) {
                owner.releaseMemory(1);
            }
            //如果是阻塞式的请求模式,则对当前队列阻塞等待两秒钟,接着仍然继续while循环
            if (isBlocking) {
                availableMemorySegments.wait(2000);
            } else {
                //否则,直接返回空并退出循环
                return null;
            }
        }

        //当有可用内存段时,直接从队列中获取内存段并新建一个Buffer实例
        return new Buffer(availableMemorySegments.poll(), this);
    }
}

在上文我们介绍过,在NetworkBufferPool中创建或者销毁BufferPool时,所有非固定大小的BufferPool会被重分布。在分析其实现是,我们看到了它会调用LocalBufferPool的实例方法setNumBuffers,该方法会调整本地缓冲池的大小,并可能会对其所申请的内存段数目产生影响:

public void setNumBuffers(int numBuffers) throws IOException {
    synchronized (availableMemorySegments) {
        //重分布后新的Buffer数量不得小于最小要求的内存段数量
        checkArgument(numBuffers >= numberOfRequiredMemorySegments,
            "Buffer pool needs at least " + numberOfRequiredMemorySegments +
            " buffers, but tried to set to " + numBuffers + ".");
        //修改缓冲池容量
        currentPoolSize = numBuffers;
        //如果当前保有的内存段数目大于新的缓冲池容量,则将超出部分归还
        //注意这里归还并不是精确强制归还的,当本地缓冲池中没有多余的内存段时,归还动作将会终止
        returnExcessMemorySegments();
        //这是第二重保险,如果Buffer存在归属者且此时本地缓冲区池中保有的内存段仍然大于缓冲池容量
        //则会对多余的内存段进行释放
        if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
            owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
        }
    }
}

Buffer是数据交换的载体,在所有涉及到数据交换的地方都会用到它。因此理解其相关的实现对于,理解Flink的整个数据流交换体系非常有帮助。

事件

Flink的数据流中不仅仅只有用户的数据,还包含了一些特殊的事件,这些事件都是由算子注入到数据流中的。它们在每个流分区里伴随着其他的数据元素而被有序地分发。接收到这些事件的算子会对这些事件给出响应,典型的事件类型有:

  • 检查点屏障:用于隔离多个检查点之间的数据,保障快照数据的一致性;
  • 迭代屏障:标识流分区已到达了一个超级步的结尾;
  • 子分区数据结束标记:当消费任务获取到该事件时,表示其所消费的对应的分区中的数据已被全部消费完成;

事件假设一个流分区维持着元素顺序。鉴于此,在Flink中一元算子在消费单一流分区时,能够保证FIFO(先进先出)的元素顺序。而为了保证流处理的速率同时避免反压,算子有时会接收超过一个流分区的元素并将它们合并。综合各种场景,Flink中的数据流在任何形式的重分区或广播之后不提供顺序保证。而对无序元素的处理任务交给算子自行实现。

在Flink中所有事件的最终基类都是AbstractEvent。AbstractEvent这一抽象类又派生出另一个抽象类RuntimeEvent,几乎所有预先内置的事件都直接派生于此。除了预定义的事件外,Flink还支持自定义的扩展事件,所有自定义的事件都继承自派生于AbstractEvent的TaskEvent。总结一下,其类继承关系图如下:

上图中继承自RuntimeEvent的三个事件类就是上文列举的典型事件。其中只有CheckpointBarrier包含检查点编号和时间戳这两个属性,其他两个事件类主要起到标识作用。

原文发布时间为:2016-12-20

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-08-19 21:19:39

Flink运行时之统一的数据交换对象的相关文章

Flink运行时之结果分区消费端

结果分区消费端 在前一篇,我们讲解了生产者分区,生产者分区是生产者任务生产中间结果数据的过程.消费者任务在获得结果分区可用的通知之后,会发起对数据的请求.我们仍然以生产者分区的例子作为假设,其在消费端示意图如下: 可以看到在生产端和消费端存在对等的模型,具体ResultSubpartition中的数据如何被消费,我们将在本篇进行深入剖析. 输入网关 输入网关(InputGate)用于消费中间结果(IntermediateResult)在并行执行时由子任务生产的一个或多个结果分区(ResultPa

Flink运行时之生成作业图

生成作业图 在分析完了流处理程序生成的流图(StreamGraph)以及批处理程序生成的优化后的计划(OptimizedPlan)之后,下一步就是生成它们面向Flink运行时执行引擎的共同抽象--作业图(JobGraph). 什么是作业图 作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一. 相比流图(StreamGraph)以及批处理优化计划(OptimizedPlan),JobGraph发生了一些变化,已

Flink运行时之基于Netty的网络通信上

概述 本文以及接下来的几篇文章将介绍Flink运行时TaskManager间进行数据交换的核心部分--基于Netty通信框架远程请求ResultSubpartition.作为系列文章的第一篇,先列出一些需要了解的基础对象. NettyConnectionManager Netty连接管理器(NettyConnectionManager)是连接管理器接口(ConnectionManager)针对基于Netty的远程连接管理的实现者.它是TaskManager中负责网络通信的网络环境对象(Netwo

Flink运行时之合久必分的特定任务

合久必分的特定任务 前面我们谈到了TaskManager对每个Task实例会启动一个独立的线程来执行.在分析线程执行的核心代码时,我们看到最终执行的是AbstractInvokable这样执行体的invoke方法.所谓合久必分,鉴于流处理任务跟批处理任务执行模式上存在巨大的差异,在对AbstractInvokable的实现时,它们将会走向两个不同的分支. 流处理相关的任务 流处理所对应的任务的继承关系图如下: 从上面的继承关系图可见,StreamTask是流处理任务的抽象.因为在DataStre

Flink运行时之生产端结果分区

生产端结果分区 生产者结果分区是生产端任务所产生的结果.以一个简单的MapReduce程序为例,从静态的角度来看,生产端的算子(Map)跟消费端的算子(Reduce),两者之间交换数据通过中间结果集(IntermediateResult).形如下图: 而IntermediateResult只是在静态表述时的一种概念,在运行时,算子会被分布式部署.执行,我们假设两个算子的并行度都为2,那么对应的运行时模型如下图: 生产端的Map算子会产生两个子任务实例,它们各自都会产生结果分区(ResultPar

Flink运行时之客户端提交作业图-上

客户端提交作业图 作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一.本篇我们将分析客户端如何提交JobGraph给JobManager. 流处理程序提交作业图 在前面讲解Flink的核心概念的时候我们谈到了Flink利用了"惰性求值"的概念,只有当最终调用execute方法时,才会真正开始执行.因此,execute方法是我们的切

Flink运行时之网络通信NetworkEnvironment分析

网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换.每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建.NetworkEnvironment管理着多个协助通信的关键部件,它们是: NetworkBufferPool:网络缓冲池,负责申请一个TaskManager的所有的内存段用作缓冲池: ConnectionManager:连接管理器,用于管理本地(远程)通信连接: Res

Flink运行时之基于Netty的网络通信中

PartitionRequestClient 分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象. 对单一的TaskManager而言只存在一个NettyClient实例.但处于同一TaskManager中不同的任务实例可能会跟不同的远程TaskManager上的任务之间交换数据,不同的TaskManager实例会有不同的ConnectionID(用于标识不同的IP地

Flink运行时之基于Netty的网络通信(下)

客户端核心处理器 这一篇,我们分析一下客户端协议栈中的核心的处理器PartitionRequestClientHandler,该处理器用于处理服务端的响应消息. 我们以客户端获取到响应之后回调该处理器的channelRead方法为入口来进行分析: public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { //当没有待解析的原始消息时,直接解码消息,否则将消息加入到stagedMe