Flink - state

public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

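Flink distinguishes three kinds of state.

As the fields above show, StreamTaskState wraps all three: operator state, function state, and key/value state.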

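1. KvState

This is the most basic kind of state.

It is modeled as a pair of abstractions, KvState and KvStateSnapshot.
The two interfaces convert into each other: KvState.snapshot() produces a KvStateSnapshot, and KvStateSnapshot.restoreState() produces a KvState.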

/**
 * Key/Value state implementation for user-defined state. The state is backed by a state
 * backend, which typically follows one of the following patterns: Either the state is stored
 * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
 * state backend into some store (during checkpoints), or the key/value state is in fact backed
 * by an external key/value store as the state backend, and checkpoints merely record the
 * metadata of what is considered part of the checkpoint.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State} this {@code KvState} holds.
 * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
 * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
 */
public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {

    /**
     * Sets the current key, which will be used when using the state access methods.
     *
     * @param key The key.
     */
    void setCurrentKey(K key);

    /**
     * Sets the current namespace, which will be used when using the state access methods.
     *
     * @param namespace The namespace.
     */
    void setCurrentNamespace(N namespace);

    /**
     * Creates a snapshot of this state.
     *
     * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
     * @param timestamp The timestamp of the checkpoint.
     * @return A snapshot handle for this key/value state.
     *
     * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
     *                   can react to failed snapshots.
     */
    KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;

    /**
     * Disposes the key/value state, releasing all occupied resources.
     */
    void dispose();
}

The definition is simple. The key method is snapshot(), which produces a KvStateSnapshot.

public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        extends StateObject {

    /**
     * Loads the key/value state back from this snapshot.
     *
     * @param stateBackend The state backend that created this snapshot and can restore the key/value state
     *                     from this snapshot.
     * @param keySerializer The serializer for the keys.
     * @param classLoader The class loader for user-defined types.
     *
     * @return An instance of the key/value state loaded from this snapshot.
     *
     * @throws Exception Exceptions can occur during the state loading and are forwarded.
     */
    KvState<K, N, S, SD, Backend> restoreState(
        Backend stateBackend,
        TypeSerializer<K> keySerializer,
        ClassLoader classLoader) throws Exception;
}

KvStateSnapshot is the counterpart of KvState; its key method is restoreState().
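The round trip between the two interfaces can be illustrated with a minimal, self-contained sketch. This is not Flink code: MiniKvState and MiniSnapshot are hypothetical names, the state is a plain HashMap, and the snapshot is an in-memory copy rather than a file or an external store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for KvStateSnapshot (not a Flink class).
class MiniSnapshot {
    private final Map<String, Long> frozen;

    MiniSnapshot(Map<String, Long> data) {
        // Copy the data so later updates to the live state do not leak into the snapshot.
        this.frozen = new HashMap<>(data);
    }

    // Counterpart of restoreState(): rebuilds a live state object from the snapshot.
    MiniKvState restoreState() {
        return new MiniKvState(new HashMap<>(frozen));
    }
}

// Hypothetical, simplified stand-in for KvState (not a Flink class).
class MiniKvState {
    private final Map<String, Long> values;
    private String currentKey;

    MiniKvState(Map<String, Long> values) {
        this.values = values;
    }

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    Long value() {
        return values.get(currentKey);
    }

    void update(Long value) {
        values.put(currentKey, value);
    }

    // Counterpart of snapshot(): freezes the current contents.
    MiniSnapshot snapshot() {
        return new MiniSnapshot(values);
    }
}

class KvRoundTripDemo {
    public static void main(String[] args) {
        MiniKvState state = new MiniKvState(new HashMap<>());
        state.setCurrentKey("user-1");
        state.update(3L);

        MiniSnapshot snap = state.snapshot();
        state.update(99L); // change made after the checkpoint

        MiniKvState restored = snap.restoreState();
        restored.setCurrentKey("user-1");
        System.out.println(restored.value()); // the value as of the snapshot
    }
}
```

The restored object sees the value as of the snapshot, while the live state keeps the later update.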

Take the concrete FsState implementation as an example:

public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

AbstractFsState extends AbstractHeapState because FsState also keeps its state cached on the heap; it only writes to a file when a snapshot is taken.

So look at AbstractHeapState first:

/**
 * Base class for partitioned {@link ListState} implementations that are backed by a regular
 * heap hash map. The concrete implementations define how the state is checkpointed.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <SV> The type of the values in the state.
 * @param <S> The type of State
 * @param <SD> The type of StateDescriptor for the State S
 * @param <Backend> The type of the backend that snapshots this key/value state.
 */
public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        implements KvState<K, N, S, SD, Backend>, State {

    /** Map containing the actual key/value pairs */
    protected final HashMap<N, Map<K, SV>> state; // note the extra namespace level, which keeps keys from colliding

    /** Serializer for the state value. The state value could be a List<V>, for example. */
    protected final TypeSerializer<SV> stateSerializer;

    /** The serializer for the keys */
    protected final TypeSerializer<K> keySerializer;

    /** The serializer for the namespace */
    protected final TypeSerializer<N> namespaceSerializer;

    /** This holds the name of the state and can create an initial default value for the state. */
    protected final SD stateDesc; // StateDescriptor: holds metadata about the state, such as the default value

    /** The current key, which the next value methods will refer to */
    protected K currentKey;

    /** The current namespace, which the access methods will refer to. */
    protected N currentNamespace = null;

    /** Cache the state map for the current key. */
    protected Map<K, SV> currentNSState;

    /**
     * Creates a new empty key/value state.
     *
     * @param keySerializer The serializer for the keys.
     * @param namespaceSerializer The serializer for the namespace.
     * @param stateDesc The state identifier for the state. This contains name
     *                           and can create a default state value.
     */
    protected AbstractHeapState(TypeSerializer<K> keySerializer,
        TypeSerializer<N> namespaceSerializer,
        TypeSerializer<SV> stateSerializer,
        SD stateDesc) {
        this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>());
    }

 
AbstractFsState

public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

    /** The file system state backend backing snapshots of this state */
    private final FsStateBackend backend;

    public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath); // implemented by the concrete state, e.g. FsValueState

    @Override
    public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {

        try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { // output stream for this checkpoint, obtained from the backend

            // serialize the state to the output stream
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
            outView.writeInt(state.size());
            for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
                N namespace = namespaceState.getKey();
                namespaceSerializer.serialize(namespace, outView);
                outView.writeInt(namespaceState.getValue().size());
                for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                    keySerializer.serialize(entry.getKey(), outView);
                    stateSerializer.serialize(entry.getValue(), outView);
                }
            }
            outView.flush(); // the actual contents are flushed to the file

            // create a handle to the state
            return createHeapSnapshot(out.closeAndGetPath()); // the snapshot itself only needs the file path
        }
    }
}

 

Key/value state comes in several flavors: ValueState, ListState, ReducingState, and FoldingState.

For simplicity, look at ValueState first.

public class FsValueState<K, N, V>
    extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
    implements ValueState<V> {

    @Override
    public V value() {
        if (currentNSState == null) {
            currentNSState = state.get(currentNamespace); // first look up the key/value map of the current namespace
        }
        if (currentNSState != null) {
            V value = currentNSState.get(currentKey);
            return value != null ? value : stateDesc.getDefaultValue(); // return the value; if it is null, fall back to the default from stateDesc
        }
        return stateDesc.getDefaultValue();
    }

    @Override
    public void update(V value) {
        if (currentKey == null) {
            throw new RuntimeException("No key available.");
        }

        if (value == null) {
            clear();
            return;
        }

        if (currentNSState == null) {
            currentNSState = new HashMap<>();
            state.put(currentNamespace, currentNSState);
        }

        currentNSState.put(currentKey, value); // update the value
    }

    @Override
    public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
        return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); // create the snapshot from the file path
    }
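The lookup rules of value() and update() above (namespace first, then key, with a default fallback and null meaning "clear") can be reproduced in a small standalone sketch. NamespacedValueState is a hypothetical class, not part of Flink. Unlike FsValueState it does not cache the map of the current namespace, and its clear() is an assumption about what a reasonable implementation does, since the original excerpt does not show it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the lookup logic; not the Flink class itself.
class NamespacedValueState<N, K, V> {
    private final Map<N, Map<K, V>> state = new HashMap<>();
    private final V defaultValue;
    private N currentNamespace;
    private K currentKey;

    NamespacedValueState(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrent(N namespace, K key) {
        this.currentNamespace = namespace;
        this.currentKey = key;
    }

    V value() {
        Map<K, V> nsState = state.get(currentNamespace);
        if (nsState == null) {
            return defaultValue; // nothing stored for this namespace yet
        }
        V v = nsState.get(currentKey);
        return v != null ? v : defaultValue;
    }

    void update(V value) {
        if (currentKey == null) {
            throw new IllegalStateException("No key available.");
        }
        if (value == null) {
            clear(); // as in the excerpt, updating with null removes the entry
            return;
        }
        state.computeIfAbsent(currentNamespace, ns -> new HashMap<>()).put(currentKey, value);
    }

    // Assumed behavior: drop the entry, and the namespace map once it is empty.
    void clear() {
        Map<K, V> nsState = state.get(currentNamespace);
        if (nsState != null) {
            nsState.remove(currentKey);
            if (nsState.isEmpty()) {
                state.remove(currentNamespace);
            }
        }
    }
}

class NamespacedValueStateDemo {
    public static void main(String[] args) {
        NamespacedValueState<String, String, Integer> counts = new NamespacedValueState<>(0);
        counts.setCurrent("window-1", "a");
        counts.update(5);
        counts.setCurrent("window-2", "a"); // same key, different namespace
        System.out.println(counts.value()); // falls back to the default
        counts.setCurrent("window-1", "a");
        System.out.println(counts.value()); // the value stored for window-1
    }
}
```

The same key can hold independent values in different namespaces, which is what the extra map level in AbstractHeapState provides.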

 

Next, the snapshot side, AbstractFsStateSnapshot:

public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {

    public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap); // implemented by the concrete snapshot to rebuild the state object

    @Override
    public KvState<K, N, S, SD, FsStateBackend> restoreState(
        FsStateBackend stateBackend,
        final TypeSerializer<K> keySerializer,
        ClassLoader classLoader) throws Exception {

        // state restore
        ensureNotClosed();

        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
            // make sure the in-progress restore from the handle can be closed
            registerCloseable(inStream);

            DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

            final int numKeys = inView.readInt();
            HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

            for (int i = 0; i < numKeys; i++) {
                N namespace = namespaceSerializer.deserialize(inView);
                final int numValues = inView.readInt();
                Map<K, SV> namespaceMap = new HashMap<>(numValues);
                stateMap.put(namespace, namespaceMap);
                for (int j = 0; j < numValues; j++) {
                    K key = keySerializer.deserialize(inView);
                    SV value = stateSerializer.deserialize(inView);
                    namespaceMap.put(key, value);
                }
            }

            return createFsState(stateBackend, stateMap); // wrap the rebuilt map in a state object
        }
        catch (Exception e) {
            throw new Exception("Failed to restore state from file system", e);
        }
    }
}
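restoreState() reads back exactly the layout that AbstractFsState.snapshot() wrote: a namespace count, then for each namespace its serialized form, an entry count, and the key/value pairs. Note that the variable numKeys in the excerpt therefore counts namespaces, not keys. The sketch below reproduces that layout with plain java.io streams. NestedMapCodec is a hypothetical helper, the types are fixed to String and Long, and writeUTF/writeLong stand in for Flink's TypeSerializer instances.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: same nesting as the file format above, simplified types.
class NestedMapCodec {

    static byte[] write(Map<String, Map<String, Long>> state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size()); // number of namespaces
            for (Map.Entry<String, Map<String, Long>> ns : state.entrySet()) {
                out.writeUTF(ns.getKey()); // namespace
                out.writeInt(ns.getValue().size()); // entries in this namespace
                for (Map.Entry<String, Long> e : ns.getValue().entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Map<String, Long>> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Long>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numEntries = in.readInt();
                Map<String, Long> nsMap = new HashMap<>(numEntries);
                state.put(namespace, nsMap);
                for (int j = 0; j < numEntries; j++) {
                    String key = in.readUTF();
                    long value = in.readLong();
                    nsMap.put(key, value);
                }
            }
            return state;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class NestedMapCodecDemo {
    public static void main(String[] args) {
        Map<String, Map<String, Long>> state = new HashMap<>();
        state.computeIfAbsent("window-1", ns -> new HashMap<>()).put("a", 1L);
        state.computeIfAbsent("window-2", ns -> new HashMap<>()).put("a", 2L);

        Map<String, Map<String, Long>> restored = NestedMapCodec.read(NestedMapCodec.write(state));
        System.out.println(restored.equals(state));
    }
}
```

Because the reader consumes the counts in the same order the writer emitted them, no delimiters are needed between namespaces or entries.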

 

The snapshot class nested inside FsValueState:

public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
    private static final long serialVersionUID = 1L;

    public Snapshot(TypeSerializer<K> keySerializer,
        TypeSerializer<N> namespaceSerializer,
        TypeSerializer<V> stateSerializer,
        ValueStateDescriptor<V> stateDescs,
        Path filePath) {
        super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
    }

    @Override
    public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
        return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
    }
}

 

2. FunctionState

Function state is held in a StateHandle, which is a more general abstraction than KvState.

/**
 * StateHandle is a general handle interface meant to abstract operator state fetching.
 * A StateHandle implementation can for example include the state itself in cases where the state
 * is lightweight or fetching it lazily from some external storage when the state is too large.
 */
public interface StateHandle<T> extends StateObject {

    /**
     * This retrieves and return the state represented by the handle.
     *
     * @param userCodeClassLoader Class loader for deserializing user code specific classes
     *
     * @return The state represented by the handle.
     * @throws java.lang.Exception Thrown, if the state cannot be fetched.
     */
    T getState(ClassLoader userCodeClassLoader) throws Exception;
}
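The Javadoc above names two strategies: keep the state inside the handle, or fetch it lazily from external storage. A minimal sketch of both follows. MiniStateHandle, InMemoryHandle, and FileHandle are hypothetical names, a temporary local file stands in for external storage, and the class loader argument is ignored here, whereas a real implementation would presumably use it to resolve user classes.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical stand-in for StateHandle<T>.
interface MiniStateHandle<T> {
    T getState(ClassLoader userCodeClassLoader) throws Exception;
}

// Strategy 1: the handle carries the (lightweight) state itself.
class InMemoryHandle<T> implements MiniStateHandle<T> {
    private final T state;

    InMemoryHandle(T state) {
        this.state = state;
    }

    @Override
    public T getState(ClassLoader userCodeClassLoader) {
        return state;
    }
}

// Strategy 2: the handle only remembers where the state lives and loads it on demand.
class FileHandle<T extends Serializable> implements MiniStateHandle<T> {
    private final Path file;

    FileHandle(T state) throws IOException {
        this.file = Files.createTempFile("mini-state-", ".bin");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(state);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public T getState(ClassLoader userCodeClassLoader) throws Exception {
        // The class loader is ignored in this sketch; default Java deserialization is used.
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (T) in.readObject();
        }
    }
}

class StateHandleDemo {
    public static void main(String[] args) throws Exception {
        ClassLoader cl = StateHandleDemo.class.getClassLoader();
        ArrayList<String> buffered = new ArrayList<>(Arrays.asList("a", "b"));

        MiniStateHandle<ArrayList<String>> small = new InMemoryHandle<>(buffered);
        MiniStateHandle<ArrayList<String>> large = new FileHandle<>(buffered);

        System.out.println(small.getState(cl));
        System.out.println(large.getState(cl));
    }
}
```

Callers see the same getState() call in both cases, which is why the same handle type can serve function state and operator state alike.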

 

3. OperatorState; a typical example is the state of WindowOperator

OperatorState also uses StateHandle as its snapshot abstraction.

 

Now look at how these three kinds of state are snapshotted.

Start with AbstractStreamOperator and its checkpoint-related methods. It only snapshots KvState.

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    if (stateBackend != null) {
        stateBackend.notifyOfCompletedCheckpoint(checkpointId);
    }
}

 

AbstractUdfStreamOperator
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT>

This class extends AbstractStreamOperator. Its checkpoint-related methods:

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp); // run super.snapshotOperatorState first, i.e. snapshot the key/value state

    if (userFunction instanceof Checkpointed) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

        Serializable udfState;
        try {
            udfState = chkFunction.snapshotState(checkpointId, timestamp); // snapshot the function's state
        }
        catch (Exception e) {
            throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
        }

        if (udfState != null) {
            try {
                AbstractStateBackend stateBackend = getStateBackend();
                StateHandle<Serializable> handle =
                        stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp); // store the state through the state backend and get back a handle to the snapshot
                state.setFunctionState(handle);
            }
            catch (Exception e) {
                throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                        + e.getMessage(), e);
            }
        }
    }

    return state;
}

@Override
public void restoreState(StreamTaskState state) throws Exception {
    super.restoreState(state);

    StateHandle<Serializable> stateHandle =  state.getFunctionState();

    if (userFunction instanceof Checkpointed && stateHandle != null) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

        Serializable functionState = stateHandle.getState(getUserCodeClassloader());
        if (functionState != null) {
            try {
                chkFunction.restoreState(functionState);
            }
            catch (Exception e) {
                throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
            }
        }
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    super.notifyOfCompletedCheckpoint(checkpointId);

    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}

So this operator snapshots both the key/value state and the state of the user-defined function.

 

WindowOperator, a typical case of operator state
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable

public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {

    if (mergingWindowsByKey != null) {
        TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
        ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
        for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
            setKeyContext(key.getKey());
            ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
            mergeState.clear();
            key.getValue().persist(mergeState);
        }
    }

    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);

    AbstractStateBackend.CheckpointStateOutputView out =
        getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

    snapshotTimers(out);

    taskState.setOperatorState(out.closeAndGetHandle());

    return taskState;
}

@Override
public void restoreState(StreamTaskState taskState) throws Exception {
    super.restoreState(taskState);

    final ClassLoader userClassloader = getUserCodeClassloader();

    @SuppressWarnings("unchecked")
    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
    DataInputView in = inputState.getState(userClassloader);

    restoreTimers(in);
}
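Taken together, the three operators form a chain in which each level calls super first and then fills in its own slot of StreamTaskState. The sketch below mirrors only that layering. All class names are hypothetical, the state types are simplified to a map, a counter, and a list of timer timestamps, and restore is eager here, whereas the key/value restore in AbstractStreamOperator happens lazily.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StreamTaskState: one slot per kind of state.
class MiniTaskState {
    Map<String, Long> kvStates;
    Long functionState;
    List<Long> operatorState;
}

// Level 1: only key/value state (the role of AbstractStreamOperator).
class MiniBaseOperator {
    protected final Map<String, Long> keyedState = new HashMap<>();

    MiniTaskState snapshot() {
        MiniTaskState s = new MiniTaskState();
        s.kvStates = new HashMap<>(keyedState);
        return s;
    }

    void restore(MiniTaskState s) {
        keyedState.clear();
        if (s.kvStates != null) {
            keyedState.putAll(s.kvStates);
        }
    }
}

// Level 2: adds the user function's state (the role of AbstractUdfStreamOperator).
class MiniUdfOperator extends MiniBaseOperator {
    protected long functionCounter;

    @Override
    MiniTaskState snapshot() {
        MiniTaskState s = super.snapshot(); // key/value state first
        s.functionState = functionCounter;
        return s;
    }

    @Override
    void restore(MiniTaskState s) {
        super.restore(s);
        if (s.functionState != null) {
            functionCounter = s.functionState;
        }
    }
}

// Level 3: adds operator state, here pending timers (the role of WindowOperator).
class MiniWindowOperator extends MiniUdfOperator {
    protected final List<Long> timers = new ArrayList<>();

    @Override
    MiniTaskState snapshot() {
        MiniTaskState s = super.snapshot(); // key/value and function state first
        s.operatorState = new ArrayList<>(timers);
        return s;
    }

    @Override
    void restore(MiniTaskState s) {
        super.restore(s);
        timers.clear();
        if (s.operatorState != null) {
            timers.addAll(s.operatorState);
        }
    }
}

class LayeredSnapshotDemo {
    public static void main(String[] args) {
        MiniWindowOperator op = new MiniWindowOperator();
        op.keyedState.put("user-1", 3L);
        op.functionCounter = 42L;
        op.timers.add(1000L);

        MiniTaskState snap = op.snapshot();

        MiniWindowOperator recovered = new MiniWindowOperator();
        recovered.restore(snap);
        System.out.println(recovered.keyedState + " " + recovered.functionCounter + " " + recovered.timers);
    }
}
```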
Time: 2024-10-25 21:48:06
