Flink - Working with State

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators. 
You can make every transformation (map, filter, etc.) stateful by using Flink’s state interface or by checkpointing instance fields of your function. 
You can register any instance field as managed state by implementing an interface. 
In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

This post looks at how to use Flink's state interface to manage state; Flink automatically takes periodic snapshots of that state, and automatically restores it after a failure.

 

State comes in two main flavors: Keyed State and Operator State.

 

Keyed State

Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.

You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.

Keyed state can only be used on a KeyedStream.

Keyed state is partitioned, or sharded, by key; each keyed state is bound to one parallel operator instance.

The difficulty with keyed state is that when the parallelism increases, the keyed state has to be split up and redistributed.

To make migrating and managing keyed state feasible, Flink implements Key Groups, the smallest unit by which Flink redistributes keyed state.
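
To make the mapping concrete, here is a rough sketch of how a key ends up on a subtask. It mirrors the logic of Flink's KeyGroupRangeAssignment but is not the literal internal code (Flink additionally applies a murmur hash where this sketch only uses floorMod):

public class KeyGroupSketch {

    // a key is hashed into one of maxParallelism key groups
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism); // Flink murmur-hashes the hashCode first
    }

    // key groups are range-partitioned over the current parallel instances
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups, fixed for the job
        int parallelism = 4;      // can change across restarts
        int keyGroup = assignToKeyGroup("user-42", maxParallelism);
        System.out.println("key group " + keyGroup + " -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
    }
}

Rescaling only moves whole key groups between subtasks, which is why the maximum parallelism bounds how finely keyed state can ever be split.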

 

Operator State

With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka source connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of this Kafka consumer maintains a map of topic partitions and offsets as its Operator State.

The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined:

  • List-style redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements.

Operator state is non-keyed state; the Kafka source connector is the typical example. Redistributing this kind of operator state is comparatively simple.

 

Raw and Managed State

Keyed State and Operator State exist in two forms: managed and raw.

Managed State is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink’s runtime encodes the states and writes them into the checkpoints.

Raw State is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s data structures and sees only the raw bytes.

All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.

Raw state is state that is not managed by Flink.

 

Using Managed Keyed State

The Key/Value state interface provides access to different types of state that are all scoped to the key of the current input element. 
This means that this type of state can only be used on a KeyedStream, which can be created via stream.keyBy(…).

Key/Value state can only be used on a KeyedStream.

Now, we will first look at the different types of state available and then we will see how they can be used in a program. The available state primitives are:

  • ValueState<T>: This keeps a value that can be updated and retrieved (scoped to key of the input element, mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().
  • ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T), the Iterable can be retrieved using Iterable<T> get().
  • ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is the same as for ListState but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.

All types of state also have a method clear() that clears the state for the currently active key (i.e. the key of the input element).

The three available types of state:

ValueState: a single-value state, manipulated via update(T) and T value().

ListState<T>: a multi-value state, modified and read via add(T) and Iterable<T> get().

ReducingState<T>: also multi-valued, but only the reduced aggregate is kept.

In addition, every kind of state has clear() to wipe the state data for the current key.
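
As a quick illustration of the reducing-style primitive, here is a minimal sketch (the class and state names are invented for this example) of a rich function that keeps a running per-key sum in a ReducingState:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ReducingState<Long> sum; // running sum, scoped to the current key

    @Override
    public void open(Configuration config) {
        ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                "running-sum",                    // unique state name
                new ReduceFunction<Long>() {      // applied each time add(T) is called
                    @Override
                    public Long reduce(Long a, Long b) {
                        return a + b;
                    }
                },
                Long.class);                      // type of the state
        sum = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Tuple2<String, Long>> out) throws Exception {
        sum.add(input.f1);                           // reduced into the aggregate immediately
        out.collect(Tuple2.of(input.f0, sum.get())); // emit the current aggregate for this key
    }
}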

It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. 
The second thing to keep in mind is that the value you get from the state depends on the key of the input element. 
So the value you get in one invocation of your user function can be different from the one you get in another invocation if the key of the element is different.

These state objects are only handles for interfacing with state, 
and the value you read depends on the key of the current input element; different invocations of your user function can therefore see different state values, because the element keys can differ.

 

To get a state handle you have to create a StateDescriptor. This holds the name of the state (as we will later see you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a ReduceFunction. Depending on what type of state you want to retrieve, you create one of ValueStateDescriptor, ListStateDescriptor, or ReducingStateDescriptor.

Every state needs a StateDescriptor, which carries the name used to reference the state; if you define several states, their names must be unique. 
Different types of state have different types of StateDescriptor.

 

State is accessed using the RuntimeContext, so it is only possible in rich functions. 
Please see the rich functions documentation for details, but we will also see an example shortly. 
The RuntimeContext that is available in a RichFunction has these methods for accessing state:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)

 

State objects are obtained through the RuntimeContext; each type of state has its own accessor method. 
The key point: to use state you must use a rich function, since an ordinary function cannot get at the RuntimeContext.

This is an example FlatMapFunction that shows how all of the parts fit together:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();
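
For this input, the function emits an average every second element per key: (1,4) for the pair (3, 5) and (1,5) for (7, 4) (integer division), while the last element (1,2) stays buffered in state until another element for key 1 arrives.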

 

Using Managed Operator State

A stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface.

In both cases, the non-keyed state is expected to be a List of serializable objects, independent from each other, thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the BufferingSink contains elements (test1, 2) and (test2, 2), when increasing the parallelism to 2, (test1, 2) may end up in task 0, while (test2, 2) will go to task 1.

Operator state, i.e. non-keyed state, is represented as a list of serializable objects that are independent of each other, so when the parallelism changes it can simply be repartitioned.

Operator state is managed by implementing either the ListCheckpointed<T extends Serializable> or the CheckpointedFunction interface.

 

ListCheckpointed

The ListCheckpointed interface requires the implementation of two methods:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

On snapshotState() the operator should return a list of objects to checkpoint and restoreState has to handle such a list upon recovery. 
If the state is not re-partitionable, you can always return a Collections.singletonList(MY_STATE) in the snapshotState().

Collections.singletonList returns an immutable one-element list.
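
A minimal sketch of the interface in use (the class is hypothetical; the CounterSource further below is another complete example): a map function that counts the elements it has seen and checkpoints the count as a one-element list:

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

public class CountingMap<T> implements MapFunction<T, T>, ListCheckpointed<Long> {

    private long count; // operator state, local to this parallel instance

    @Override
    public T map(T value) {
        count++;
        return value;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(count); // one element: never split on rescaling
    }

    @Override
    public void restoreState(List<Long> state) {
        count = 0;
        for (Long c : state) {
            count += c; // several sublists may be merged into this instance
        }
    }
}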

 

CheckpointedFunction

The CheckpointedFunction interface also requires the implementation of two methods:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

Whenever a checkpoint has to be performed snapshotState() is called. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when actually recovering from an earlier checkpoint. Given this, initializeState() is not only the place where different types of state are initialized, but also where state recovery logic is included.

This is an example of a function that uses CheckpointedFunction, a stateful SinkFunction that uses state to buffer elements before sending them to the outside world:

For example: a stateful SinkFunction that buffers elements before sending them downstream.

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction,
                   CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().
            getSerializableListState("buffered-elements"); // initialize the state handle from the context

        if (context.isRestored()) { // true when recovering from an earlier checkpoint
            for (Tuple2<String, Integer> element : checkpointedState.get()) { // restore the buffered elements
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear(); // drop the contents of the previous snapshot
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element); // snapshot the current buffer
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { // restores state written by the legacy Checkpointed interface
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

 

 

Stateful Source Functions

Stateful sources require a bit more care as opposed to other operators. 
In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.

What is different for stateful sources: when updating state and emitting output, you must take the checkpoint lock so that the two are atomic, as exactly-once semantics require; for example, this prevents a snapshot from seeing a half-updated offset.

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) { // lock to make output and state update atomic
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset); // singleton list: the state is not re-partitionable
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

 

 

https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html

 

State Backends

Programs written in the Data Stream API often hold state in various forms:

  • Windows gather elements or aggregates until they are triggered
  • Transformation functions may use the key/value state interface to store values
  • Transformation functions may implement the Checkpointed interface to make their local variables fault tolerant

The main kinds of state:

elements gathered in windows 
state created in transformation functions via the key/value state interface 
local variables of transformation functions made fault tolerant via the Checkpointed interface

 

When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently. 
How the state is represented internally, and how and where it is persisted upon checkpoints depends on the chosen State Backend.

The key point: how state is represented internally, and how and where it is persisted, depends on the chosen State Backend.
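
Note that none of this persistence happens unless checkpointing is activated; a minimal sketch (the interval is an arbitrary choice):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // take a consistent snapshot of all state every 10 seconds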

 

Available State Backends

Out of the box, Flink bundles these state backends:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

If nothing else is configured, the system will use the MemoryStateBackend.

There are currently three state backends; by default the MemoryStateBackend is used.

 

The MemoryStateBackend

The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.

Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.

As its name suggests, the MemoryStateBackend stores state on the Java heap. On checkpoints, the state backend snapshots the state and ships it inside the checkpoint acknowledgement messages to the JobManager, which keeps it on its heap as well.
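
Selecting it explicitly might look as follows; a sketch, where the optional size argument caps how large the snapshotted state may grow:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(5 * 1024 * 1024)); // cap snapshotted state at 5 MB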

 

The FsStateBackend

The FsStateBackend is configured with a file system URL (type, address, path), such as for example “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.

The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. 
Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).

State snapshots are stored in the file system; the JobManager's memory holds only minimal metadata.

 

The RocksDBStateBackend

The RocksDBStateBackend keeps in-flight state in an embedded RocksDB instance instead of on the heap; on checkpoints, the RocksDB database is still snapshotted into the configured file system.

NOTE: To use the RocksDBStateBackend you also have to add the correct maven dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
  <version>1.0.3</version>
</dependency>

The backend is currently not part of the binary distribution. See here for an explanation of how to include it for cluster execution.
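
Once the dependency is on the classpath, the backend is set like any other; a sketch, reusing the HDFS checkpoint directory from the FsStateBackend example above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));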

 

Configuring a State Backend

State backends can be configured per job. In addition, you can define a default state backend to be used when the job does not explicitly define a state backend.

Setting the Per-job State Backend

The per-job state backend is set on the StreamExecutionEnvironment of the job, as shown in the example below:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

 

Setting Default State Backend

A default state backend can be configured in the flink-conf.yaml, using the configuration key state.backend.

Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), or the fully qualified class name of the class that implements the state backend factory FsStateBackendFactory.

In the case where the default state backend is set to filesystem, the entry state.backend.fs.checkpointdir defines the directory where the checkpoint data will be stored.

A sample section in the configuration file could look as follows:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem

# Directory for storing checkpoints

state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints