Flink - RocksDBStateBackend

If you care about both usability and efficiency, replacing the plain in-memory k/v store with RocksDB is worthwhile.

With RocksDB you get range queries, column family support, and various compression options.

But RocksDB itself is just a library, running embedded inside the RocksDBStateBackend (and thus inside the TaskManager process).

So when a TaskManager dies, its local data is still gone,

which is why the RocksDBStateBackend still needs a distributed store such as HDFS to hold its snapshots.
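
For context, a minimal sketch of how this backend is wired into a job; the HDFS URI and checkpoint interval are placeholders, not values from this post:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// snapshots must land on a distributed file system such as HDFS,
// since the RocksDB files themselves live only on the TaskManager's local disk
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints"));

// checkpointing must be enabled for snapshots to be taken at all
env.enableCheckpointing(5000);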

 

The k/v state is managed by RocksDB; this is the biggest difference from the memory and file backends.
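
As a reminder of what sits on top of this, user code reaches such state through the normal state interface; a hedged sketch inside a RichFunction, where the descriptor name and types are purely illustrative:

ValueStateDescriptor<Long> descriptor =
        new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
ValueState<Long> count = getRuntimeContext().getState(descriptor);

// with this backend, value() is a RocksDB get and update() is a RocksDB put
count.update(count.value() + 1);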

AbstractRocksDBState

/**
 * Base class for {@link State} implementations that store state in a RocksDB database.
 *
 * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
 * the {@link RocksDBStateBackend} manages and checkpoints.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State}.
 * @param <SD> The type of {@link StateDescriptor}.
 */
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
        implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
    /** Serializer for the namespace */
    private final TypeSerializer<N> namespaceSerializer;

    /** The current namespace, which the next value methods will refer to */
    private N currentNamespace;

    /** Backend that holds the actual RocksDB instance where we store state */
    protected RocksDBStateBackend backend;

    /** The column family of this particular instance of state */
    protected ColumnFamilyHandle columnFamily;

    /**
     * We disable writes to the write-ahead-log here.
     */
    private final WriteOptions writeOptions;

    /**
     * Creates a new RocksDB backed state.
     *
     * @param namespaceSerializer The serializer for the namespace.
     */
    protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            RocksDBStateBackend backend) {

        this.namespaceSerializer = namespaceSerializer;
        this.backend = backend;

        this.columnFamily = columnFamily;

        writeOptions = new WriteOptions();
        writeOptions.setDisableWAL(true);
    }

    @Override
    public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
            long timestamp) throws Exception {
        throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
    }
}

 

RocksDBValueState

/**
 * {@link ValueState} implementation that stores state in RocksDB.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of value that the state stores.
 */
public class RocksDBValueState<K, N, V>
    extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
    implements ValueState<V> {

    @Override
    public V value() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            byte[] valueBytes = backend.db.get(columnFamily, key); // read the serialized value from RocksDB
            if (valueBytes == null) {
                return stateDesc.getDefaultValue();
            }
            return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
        } catch (IOException|RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB.", e);
        }
    }

    @Override
    public void update(V value) throws IOException {
        if (value == null) {
            clear();
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            baos.reset();
            valueSerializer.serialize(value, out);
            backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); // write the k/v pair into RocksDB
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }
}

For k/v state, the key is simply the key of the record currently being processed, so it is read directly from backend.currentKey(); see Flink - Working with State.
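
writeKeyAndNamespace is not shown above; roughly, it serializes the current key and the current namespace back to back into the composite RocksDB key, separated by a magic byte. A sketch based on the Flink source of this era; the exact separator value is from memory:

protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
    // the composite RocksDB key is (currentKey, separator byte, currentNamespace)
    backend.keySerializer().serialize(backend.currentKey(), out);
    out.writeByte(42);
    namespaceSerializer.serialize(currentNamespace, out);
}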

 

RocksDBStateBackend

The initialization process:

/**
 * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
 * store very large state that exceeds memory and spills to disk.
 *
 * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
 * For persistence against loss of machines, checkpoints take a snapshot of the
 * RocksDB database, and persist that snapshot in a file system (by default) or
 * another configurable state backend.
 *
 * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
 * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
 * {@link #setOptions(OptionsFactory)}.
 */
public class RocksDBStateBackend extends AbstractStateBackend {

    // ------------------------------------------------------------------------
    //  Static configuration values
    // ------------------------------------------------------------------------

    /** The checkpoint directory that we copy the RocksDB backups to. */
    private final Path checkpointDirectory;

    /** The state backend that stores the non-partitioned state */
    private final AbstractStateBackend nonPartitionedStateBackend;

    /**
     * Our RocksDB database; it is used by the actual subclasses of {@link AbstractRocksDBState}
     * to store state. The different k/v states that we have don't each have their own RocksDB
     * instance. They all write to this instance but to their own column family.
     */
    protected volatile transient RocksDB db; // the single RocksDB instance shared by all k/v states

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        // creating the FsStateBackend automatically sanity checks the URI
        FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); // snapshots are still stored via an FsStateBackend

        this.nonPartitionedStateBackend = fsStateBackend;
        this.checkpointDirectory = fsStateBackend.getBasePath();
    }

    // ------------------------------------------------------------------------
    //  State backend methods
    // ------------------------------------------------------------------------

    @Override
    public void initializeForJob(
            Environment env,
            String operatorIdentifier,
            TypeSerializer<?> keySerializer) throws Exception {

        super.initializeForJob(env, operatorIdentifier, keySerializer);

        this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);

        RocksDB.loadLibrary(); // load the native RocksDB library

        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); // column families work like in HBase: each one's data lives in its own set of files
        // RocksDB seems to need this...
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
        List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
        try {
            db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); // actually open RocksDB
        } catch (RocksDBException e) {
            throw new RuntimeException("Error while opening RocksDB instance.", e);
        }
    }
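
Each k/v state later gets its own column family inside this shared instance, created lazily on first access. A simplified sketch of that helper; the getColumnOptions call and exact bookkeeping are assumptions from memory, not verbatim source:

protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
    Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo =
            kvStateInformation.get(descriptor.getName());
    if (stateInfo != null) {
        return stateInfo.f0; // a column family already exists for this state name
    }

    // one column family per state name; its data goes into its own set of files
    ColumnFamilyDescriptor columnDescriptor =
            new ColumnFamilyDescriptor(descriptor.getName().getBytes(), getColumnOptions());
    try {
        ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
        kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
        return columnFamily;
    } catch (RocksDBException e) {
        throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
    }
}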

 

snapshotPartitionedState

@Override
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
        return new HashMap<>();
    }

    if (fullyAsyncBackup) {
        return performFullyAsyncSnapshot(checkpointId, timestamp);
    } else {
        return performSemiAsyncSnapshot(checkpointId, timestamp);
    }
}

 

Snapshots come in two flavors, fully asynchronous and semi-asynchronous.
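
The fullyAsyncBackup flag seen above defaults to false, i.e. semi-asynchronous; in this version it could be switched on with a setter on the backend (method name as I recall it from the Flink 1.0-era API):

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

// opt in to the fully asynchronous snapshot mode; the default is semi-asynchronous
backend.enableFullyAsyncSnapshots();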

 

Semi-asynchronous:

/**
 * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
 * This backup is then asynchronously copied to the final checkpoint location.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // We don't snapshot individual k/v states since everything is stored in a central
    // RocksDB database. Create a dummy KvStateSnapshot that holds the information about
    // that checkpoint. We use this in injectKeyValueStateSnapshots to restore.

    final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
    // we disabled the WAL
    backupOptions.setBackupLogFiles(false);
    // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
    backupOptions.setSync(false); // i.e., asynchronous, no fsync

    try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
        // wait before flush with "true"
        backupEngine.createNewBackup(db, true); // use RocksDB's own BackupEngine to create a new backup on local disk
    }

    long endTime = System.currentTimeMillis(); // this part runs synchronously, so we time it to see the latency
    LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
    for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
        kvStateInformationCopy.add(state.f1);
    }
    SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
            backupUri,
            kvStateInformationCopy,
            checkpointId);

    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}

 

SemiAsyncSnapshot.materialize

@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();
        HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);  // copy from local disk to HDFS
        long endTime = System.currentTimeMillis();
        LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
    } catch (Exception e) {
        FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
        fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
        throw e;
    } finally {
        FileUtils.deleteQuietly(localBackupPath);
    }
}

 

Fully asynchronous:

/**
 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
 * location. The only synchronous part is the drawing of the {@code Snapshot} which
 * is essentially free.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // we draw a snapshot from RocksDB then iterate over all keys at that point
    // and store them in the backup location

    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    org.rocksdb.Snapshot snapshot = db.getSnapshot(); // take a snapshot; nothing needs to hit disk

    long endTime = System.currentTimeMillis();
    LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
    columnFamiliesCopy.putAll(kvStateInformation);
    FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, // the Snapshot object is passed in directly
            this,
            backupUri,
            columnFamiliesCopy,
            checkpointId);

    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}

 

FullyAsyncSnapshot.materialize

Note that here we have to serialize the DB contents to the checkpoint file ourselves:

@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();

        CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);

        outputView.writeInt(columnFamilies.size());

        // we don't know how many key/value pairs there are in each column family.
        // We prefix every written element with a byte that signifies to which
        // column family it belongs, this way we can restore the column families
        byte count = 0;
        Map<String, Byte> columnFamilyMapping = new HashMap<>();
        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
            columnFamilyMapping.put(column.getKey(), count);

            outputView.writeByte(count);

            ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
            ooOut.writeObject(column.getValue().f1);
            ooOut.flush();

            count++;
        }

        ReadOptions readOptions = new ReadOptions();
        readOptions.setSnapshot(snapshot);

        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
            byte columnByte = columnFamilyMapping.get(column.getKey());

            synchronized (dbCleanupLock) {
                if (db == null) {
                    throw new RuntimeException("RocksDB instance was disposed. This happens " +
                            "when we are in the middle of a checkpoint and the job fails.");
                }
                RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
                iterator.seekToFirst();
                while (iterator.isValid()) {
                    outputView.writeByte(columnByte);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
                            outputView);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
                            outputView);
                    iterator.next();
                }
            }
        }

        StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();

        long endTime = System.currentTimeMillis();
        LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
    } finally {
        synchronized (dbCleanupLock) {
            if (db != null) {
                db.releaseSnapshot(snapshot);
            }
        }
        snapshot = null;
    }
}

 

CheckpointStateOutputView

backend.createCheckpointStateOutputView

public CheckpointStateOutputView createCheckpointStateOutputView(
        long checkpointID, long timestamp) throws Exception {
    return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
}

The key call is createCheckpointStateOutputStream.

 

RocksDBStateBackend

@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
        long checkpointID, long timestamp) throws Exception {

    return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
}

 

So what is nonPartitionedStateBackend?

public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
    // creating the FsStateBackend automatically sanity checks the URI
    FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);

    this.nonPartitionedStateBackend = fsStateBackend;
    this.checkpointDirectory = fsStateBackend.getBasePath();
}

It is simply the FsStateBackend; in the end RocksDB still relies on the FsStateBackend to store its snapshots.

 

restoreState

@Override
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots.size() == 0) {
        return;
    }

    KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
    if (dummyState instanceof FinalSemiAsyncSnapshot) {
        restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
    } else if (dummyState instanceof FinalFullyAsyncSnapshot) {
        restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
    } else {
        throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
    }
}

Restore likewise has the two flavors, semi-asynchronous and fully asynchronous; the process is essentially the snapshot process run in reverse.
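
As an illustration, the fully asynchronous restore reads back the stream written in FullyAsyncSnapshot.materialize in the same order: first the column family count and descriptors, then (column byte, key, value) triples until EOF. A simplified sketch, not the exact Flink source; getColumnFamily is the helper sketched earlier, and the stateHandle/userCodeClassLoader plumbing is assumed:

private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception {
    DataInputView inputView = snapshot.stateHandle.getState(userCodeClassLoader);

    // first, the column family mapping that the snapshot wrote out
    int numColumns = inputView.readInt();
    Map<Byte, ColumnFamilyHandle> handles = new HashMap<>(numColumns);
    for (int i = 0; i < numColumns; i++) {
        byte mapping = inputView.readByte();
        ObjectInputStream ooIn = new ObjectInputStream(new DataInputViewStream(inputView));
        StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
        // re-create the column family for this state and remember its byte prefix
        handles.put(mapping, getColumnFamily(stateDescriptor));
    }

    // then (column byte, key, value) triples until the stream is exhausted
    while (true) {
        byte mapping;
        try {
            mapping = inputView.readByte();
        } catch (EOFException e) {
            break;
        }
        byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
        byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
        db.put(handles.get(mapping), key, value);
    }
}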
