Apache Flink fault tolerance源码剖析(三)

上一篇文章我们探讨了基于定时任务的周期性检查点触发机制以及基于Akka的actor模型的消息驱动协同机制。这篇文章我们将探讨Zookeeper在Flink的Fault
Tolerance
所起到的作用。

其实,Flink引入Zookeeper的目的主要是让JobManager实现高可用(leader选举)。

因为Zookeeper在Flink里存在多种应用场景,本篇我们还是将重心放在Fault
Tolerance
上,即讲解Zookeeper在检查点的恢复机制上发挥的作用。

如果用一幅图表示快照机制(检查点)大致的流程可见下图:

跟本文相关的主要有4,5,6三步

两种恢复模式

因为跟本文切实相关,所以先介绍一下JobManagerRecoveryMode(恢复模式)。RecoveryMode作为一个枚举类型,它有两个枚举值:

  • STANDALONE
  • ZOOKEEPER

STANDALONE表示不对JobManager的失败进行恢复。而ZOOKEEPER表示JobManager将基于Zookeeper实现HA(高可用)。

两种类型的检查点

在前面的文章中已经提及过Flink里的检查点分为两种:PendingCheckpoint(正在处理的检查点)和CompletedCheckpoint(完成了的检查点)。

PendingCheckpoint表示一个检查点已经被创建,但还没有得到所有该应答的task的应答。一旦所有的task都给予应答,那么它将会被转化为一个CompletedCheckpointPendingCheckpoint通过toCompletedCheckpoint实例方法来将其转化为已完成了的检查点。其核心实现如下:

if (notYetAcknowledgedTasks.isEmpty()) {
    CompletedCheckpoint completed =  new CompletedCheckpoint(jobId, checkpointId,checkpointTimestamp, System.currentTimeMillis(), new ArrayList<StateForTask>(collectedStates));
    dispose(null, false);
    return completed;
}

它会检查还没有ack该检查点的task集合,如果集合为空(即所有task都已应答),则基于当前实例的属性构建一个CompletedCheckpoint的实例,并最终返回新创建的实例。但在返回之前,调用了dispose进行资源释放。

这个dispose方法是一个私有方法,其内部实现依赖于releaseState这个flag,上面的dispose调用将其置为false,意为不释放task状态:

if (releaseState) {
    for (StateForTask state : collectedStates) {
        state.discard(userClassLoader);
    }
}

但最终,collectedStates这个集合总是会被清空:

collectedStates.clear();
notYetAcknowledgedTasks.clear();

toCompletedCheckpoint方法为什么不释放task的状态呢,因为它的语义只是提供转化操作,其实collectedStates这个集合已经在构造CompletedCheckpoint时被深拷贝给CompletedCheckpoint的实例了。而这些task的状态其最终的释放,将会由CompletedCheckpointdiscard方法完成。

PendingCheckpoint的公共的discard方法的实现就会直接释放收集的状态集合:

public void discard(ClassLoader userClassLoader) {
    dispose(userClassLoader, true);
}

公共的discard方法常用于检查点超时回收以及当最新的检查点已经完成时,距离当前时间更久的未完成的检查点的自动失效

CompletedCheckpoint表示一个已经成功完成了得检查点,当一个检查点在得到所有要求的task的应答之后被认为是一个已完成的检查点。

已完成的检查点的存储

根据JobManager的恢复模式,Flink提供了两种已完成的检查点的存储机制的实现:

  • StandaloneCompletedCheckpointStore
  • ZooKeeperCompletedCheckpointStore

他们都实现了接口CompletedCheckpointStore,这个接口提供了思个值得关注的方法:

  • recover :用于恢复可访问的检查点CompletedCheckpoint的实例
  • addCheckpoint :将已完成的检查点加入到检查点集合
  • getLatestCheckpoint :获得最新的检查点
  • discardAllCheckpoints : 回收所有的已完成的检查点

针对RecoveryModeSTANDALONE提供了StandaloneCompletedCheckpointStore。它提供了一个基于JVM堆内存的ArrayDeque来存放检查点。

而针对RecoveryModeZOOKEEPER提供的ZooKeeperCompletedCheckpointStore要复杂得多。这也是我们关注的重点。它的实现依赖于两个存储机制:

在Zookeeper中的分布式存储:

private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;

本地JVM内存中的存储:

private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;

我们先来看恢复方法recover,恢复的过程首先是从Zookeeper获取所有的检查点,这里为了规避并发修改带来的失败,采用了循环重试的机制:

        while (true) {
            try {
                initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
                break;
            }
            catch (ConcurrentModificationException e) {
                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
            }
        }

在恢复时,将从Zookeeper中读取最新的检查点,如果检查点超过一个,仅仅最新的那个检查点有效,旧的都会被丢弃。如果存在着网络分区,多个JobManager的实例并发对相同的程序实行检查点,那么选择任意一个验证通过的已完成的检查点都是没有问题的。

        if (numberOfInitialCheckpoints > 0) {
            // Take the last one. This is the latest checkpoints, because path names are strictly
            // increasing (checkpoint ID).
            Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
                    .get(numberOfInitialCheckpoints - 1);

            CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);

            checkpointStateHandles.add(latest);

            LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);

            for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
                try {
                    removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
                }
                catch (Exception e) {
                    LOG.error("Failed to discard checkpoint", e);
                }
            }
        }

discardAllCheckpoints方法会做四件事:

  • 迭代每个检查点,将其从Zookeeper中移除
  • discard每个已完成的检查点
  • discard每个存储的状态
  • 将本地集合清空掉

检查点编号计数器

每个检查点都有各自的编号,为Long类型。根据JobManager的恢复模式分别提供了两种计数器:

  • StandaloneCheckpointIDCounter
  • ZooKeeperCheckpointIDCounter

计数器在这里被认为是一种服务,它具备startstop方法

StandaloneCheckpointIDCounter 只是简单得对
AtomicLong进行了包装,因为在这种模式下,JobManager几乎是不可恢复的,所以这么做就足够了。

ZooKeeperCheckpointIDCounter是基于Zookeeper实现的一种分布式原子累加器。具体的做法是每一个计数器,在Zookeeper上新建一个ZNode,形如:

/flink/checkpoint-counter/<job-id> 1 [persistent]
....
/flink/checkpoint-counter/<job-id> N [persistent]

在Zookeeper中的检查点编号被要求是升序的,这可以使得我们在JobManager失效的情况下,可以拥有一个共享的跨JobManager实例的计数器。

值得一提的是,这里使用的Zookeeper的客户端是CuratorFramework,同时还利用了它附带的SharedCount这一recipes来作为分布式共享的计数器。

而在累加接口方法getAndIncrement的实现上,使用了循环尝试的机制:

    public long getAndIncrement() throws Exception {
        while (true) {
            ConnectionState connState = connStateListener.getLastState();

            if (connState != null) {
                throw new IllegalStateException("Connection state: " + connState);
            }

            VersionedValue<Integer> current = sharedCount.getVersionedValue();

            Integer newCount = current.getValue() + 1;

            if (sharedCount.trySetCount(current, newCount)) {
                return current.getValue();
            }
        }
    }

另外从stop方法的实现来看,如果一个计数器停止,则会再Zookeeper中删除其对应的ZNode

检查点恢复服务

所谓的检查点恢复服务,其实就是聚合了上面的已完成的检查点存储以及检查点编号计数器这两个功能。因为Flink提供了STANDALONE以及ZOOKEEPER这两个恢复模式,所以这里存在一个基于不同模式创建服务的工厂接口CheckpointRecoveryFactory。并针对这两种恢复模式分别提供了两个工厂:StandaloneCheckpointRecoveryFactory以及ZooKeeperCheckpointRecoveryFactory

具体的功能聚合体现在这两个方法上:

    /**
     * Creates a {@link CompletedCheckpointStore} instance for a job.
     *
     * @param jobId           Job ID to recover checkpoints for
     * @param userClassLoader User code class loader of the job
     * @return {@link CompletedCheckpointStore} instance for the job
     */
    CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
            throws Exception;

    /**
     * Creates a {@link CheckpointIDCounter} instance for a job.
     *
     * @param jobId Job ID to recover checkpoints for
     * @return {@link CheckpointIDCounter} instance for the job
     */
    CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;

两个工厂的具体实现并没有什么特别的地方。检查点恢复服务会被JobManager使用到。

小结

本篇文章我们主要分析了,Zookeeper在Flink的Fault
Tolerance
机制中发挥的作用。但因为Zookeeper在Flink中得主要用途是实现JobManager的高可用,所以里面的部分内容多少还是跟这一主题有所联系。

原文发布时间为:2016-06-02

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-09-13 19:27:09

Apache Flink fault tolerance源码剖析(三)的相关文章

Apache Flink fault tolerance源码剖析(二)

继续Flink Fault Tolerance机制剖析.上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制.这篇涉及到一个非常关键的类--CheckpointCoordinator. org.apache.flink.runtime.checkpoint.CheckpointCoordinator 该类可以理解为检查点的协调器,用来协调operator和state的分布

Apache Flink fault tolerance源码剖析完结篇

这篇文章是对Flinkfault tolerance的一个总结.虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及. 回顾这个系列,每篇文章都至少涉及一个知识点.我们来挨个总结一下. 恢复机制实现 Flink中通常需要进行状态恢复的对象是operator以及function.它们通过不同的方式来达到状态快照以及状态恢复的能力.其中function通过实现Checkpointed的接口,而operator通过实现StreamOpeator接口.这两个接口的行为是类似的. 当然对于数据

Apache Flink fault tolerance源码剖析(五)

上一篇文章我们谈论了保存点的相关内容,其中就谈到了保存点状态的存储.这篇文章我们来探讨用户程序状态的存储,也是在之前的文章中多次提及的state backend(中文暂译为状态终端). 基于数据流API而编写的程序经常以各种各样的形式保存着状态: 窗口收集/聚合元素(这里的元素可以看作是窗口的状态)直到它们被触发 转换函数可能会使用key/value状态接口来存储数据 转换函数可能实现Checkpointed接口来让它们的本地变量受益于fault tolerant机制 当检查点机制工作时,上面谈

Apache Flink fault tolerance源码剖析(六)

上篇文章我们分析了基于检查点的用户状态的保存机制--状态终端.这篇文章我们来分析barrier(中文常译为栅栏或者屏障,为了避免引入名称争议,此处仍用英文表示).检查点的barrier是提供exactly once一致性保证的主要保证机制.这篇文章我们会就此展开分析. 这篇文章我们侧重于核心代码分析,原理我们在这个系列的第一篇文章<Flink数据流的Fault Tolerance机制> 一致性保证 Flink的一致性保证也依赖于检查点机制.在利用检查点进行恢复时,数据流会进行重放(replay

Apache Flink fault tolerance源码剖析(四)

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器). 这篇文章会谈论一种特殊的检查点,Flink将之命名为--Savepoint(保存点). 因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现.但作为一个特性,值得花费一个篇幅来介绍. 检查点VS保存点 使用数据流API编写的程序可以从保存点来恢复执行.保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态. 保存点是人工触发

cocos2dx骨骼动画Armature源码剖析(三)_javascript技巧

cocos2dx里骨骼动画代码在cocos -> editor-support -> cocostudio文件夹中,win下通过筛选器,文件结构如下.(mac下没有分,是整个一坨) armature(目录): animation(目录):动画控制相关. CCProcessBase(文件): ProcessBase(类):CCTween和ArmatureAnimation的基类. CCTWeen(文件): Tween(类):控制flash里一个layer的动画. CCArmatureAnimat

MySQL · 引擎介绍 · Sphinx源码剖析(三)

在本节中我会介绍Sphinx在构建索引之前做的一些事情,主要是从mysql拉取数据保存,然后分词排序保存到内存等等一系列的操作.下面是几个相关指令 sql_query = \ SELECT id, group_id, UNIX_TIMESTAMP(date_added) AS date_added, \ title, content \ FROM documents sql_query_range = SELECT MIN(id),MAX(id) FROM documents sql_range

Mongoose源码剖析:外篇之web服务器

引言 在深入Mongoose源码剖析之前,我们应该清楚web服务器是什么?它提供什么服务?怎样提供服务?使用什么协议?客户端如何唯一标识web服务器的资源?下面我们抛开Mongoose,来介绍一个web服务的这些通性. web服务器:通常是指一个计算机程序(web服务器是什么?),在World Wide Web上提供诸如web页面的服务(提供什么服务?),使用HyperText Transfer Protocol(HTTP)(使用什么协议?).当然web服务器也可以指运行这个程序的计算机或虚拟机

日志那点事儿——slf4j源码剖析

前言: 说到日志,大多人都没空去研究,顶多知道用logger.info或者warn打打消息.那么commons-logging,slf4j,logback,log4j,logging又是什么关系呢?其中一二,且听我娓娓道来. 手码不易,转载请注明_xingoo! 涉及到的内容:日志系统的关系.Slf4j下载.源文件jar包的使用.Slf4j源码分析.JVM类加载机制浅谈 首先八卦一下这个日志家族的成员,下面这张图虽然没有包含全部的内容,但是基本也涵盖了日志系统的基本内容,不管怎么说,先记住下面这