Flink - RocksDBStateBackend

https://www.cnblogs.com/fxjwind/p/6114236.html

If usability and efficiency matter, it is worth replacing a plain in-memory k/v store with RocksDB.

With RocksDB you get range queries, column-family support, and a choice of compression schemes.

But RocksDB itself is just a library, and it runs embedded inside the RocksDBStateBackend,

so when the TaskManager dies the data is still gone;

RocksDBStateBackend therefore still needs a distributed store such as HDFS to hold its snapshots.

The k/v state is managed by RocksDB; that is the biggest difference from the memory and file backends.
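To see why an ordered k/v store gives you cheap range queries, here is a toy in-memory analogue: RocksDB keeps keys sorted, much like a NavigableMap. This is purely conceptual, not RocksDB's API; the `user:`/`user;` prefix-bound trick assumes ASCII key ordering.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeScanSketch {
    // RocksDB stores keys in sorted order, so prefix/range scans are cheap.
    // A NavigableMap gives the same semantics in memory.
    static NavigableMap<String, String> range(NavigableMap<String, String> db,
                                              String from, String to) {
        return db.subMap(from, true, to, false); // half-open range [from, to)
    }

    public static void main(String[] args) {
        NavigableMap<String, String> db = new TreeMap<>();
        db.put("user:1", "alice");
        db.put("user:2", "bob");
        db.put("video:1", "clip");
        // Scan all keys with the "user:" prefix: ';' is the character after ':' in ASCII,
        // so "user;" is the smallest key greater than every "user:"-prefixed key.
        System.out.println(range(db, "user:", "user;").keySet()); // prints [user:1, user:2]
    }
}
```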

AbstractRocksDBState

/**
 * Base class for {@link State} implementations that store state in a RocksDB database.
 *
 * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
 * the {@link RocksDBStateBackend} manages and checkpoints.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State}.
 * @param <SD> The type of {@link StateDescriptor}.
 */
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
        implements KvState<K, N, S, SD, RocksDBStateBackend>, State {

    /** Serializer for the namespace */
    private final TypeSerializer<N> namespaceSerializer;

    /** The current namespace, which the next value methods will refer to */
    private N currentNamespace;

    /** Backend that holds the actual RocksDB instance where we store state */
    protected RocksDBStateBackend backend;

    /** The column family of this particular instance of state */
    protected ColumnFamilyHandle columnFamily;

    /** We disable writes to the write-ahead-log here. */
    private final WriteOptions writeOptions;

    /**
     * Creates a new RocksDB backed state.
     *
     * @param namespaceSerializer The serializer for the namespace.
     */
    protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            RocksDBStateBackend backend) {
        this.namespaceSerializer = namespaceSerializer;
        this.backend = backend;
        this.columnFamily = columnFamily;
        writeOptions = new WriteOptions();
        writeOptions.setDisableWAL(true);
    }

    @Override
    public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
            long timestamp) throws Exception {
        throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
    }
}

RocksDBValueState

/**
 * {@link ValueState} implementation that stores state in RocksDB.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of value that the state stores.
 */
public class RocksDBValueState<K, N, V>
    extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
    implements ValueState<V> {

    @Override
    public V value() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            byte[] valueBytes = backend.db.get(columnFamily, key); // read the value from the db
            if (valueBytes == null) {
                return stateDesc.getDefaultValue();
            }
            return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
        } catch (IOException | RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB.", e);
        }
    }

    @Override
    public void update(V value) throws IOException {
        if (value == null) {
            clear();
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            baos.reset();
            valueSerializer.serialize(value, out);
            backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); // write the k/v pair into the db
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }
}

Because for k/v state the key is simply the key of the record currently being processed, it is read directly from backend.currentKey(); see Flink - Working with State.
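The RocksDB key written by writeKeyAndNamespace() is the serialized current key followed by the serialized namespace. A simplified sketch of that layout using plain DataOutputStream; the real backend uses Flink TypeSerializers, so the writeUTF encoding here is only an assumption for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class KeyNamespaceSketch {
    // Mimics writeKeyAndNamespace(): the RocksDB key is
    // serialize(currentKey) followed by serialize(namespace).
    static byte[] composeKey(String currentKey, String namespace) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeUTF(currentKey);   // key part
        out.writeUTF(namespace);    // namespace part (e.g. a window)
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] k1 = composeKey("user-42", "window-0");
        byte[] k2 = composeKey("user-42", "window-1");
        // Same record key, different namespaces -> different RocksDB entries.
        System.out.println(Arrays.equals(k1, k2)); // prints false
    }
}
```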

RocksDBStateBackend

The initialization process:

/**
 * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
 * store very large state that exceeds memory and spills to disk.
 *
 * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
 * For persistence against loss of machines, checkpoints take a snapshot of the
 * RocksDB database, and persist that snapshot in a file system (by default) or
 * another configurable state backend.
 *
 * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
 * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
 * {@link #setOptions(OptionsFactory)}.
 */
public class RocksDBStateBackend extends AbstractStateBackend {

    // ------------------------------------------------------------------------
    //  Static configuration values
    // ------------------------------------------------------------------------

    /** The checkpoint directory that we copy the RocksDB backups to. */
    private final Path checkpointDirectory;

    /** The state backend that stores the non-partitioned state */
    private final AbstractStateBackend nonPartitionedStateBackend;

    /**
     * Our RocksDB database, used by the actual subclasses of {@link AbstractRocksDBState}
     * to store state. The different k/v states that we have don't each have their own RocksDB
     * instance. They all write to this instance but to their own column family.
     */
    protected volatile transient RocksDB db; // the RocksDB instance

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        // creating the FsStateBackend automatically sanity checks the URI
        FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); // still uses an FsStateBackend to store the snapshot

        this.nonPartitionedStateBackend = fsStateBackend;
        this.checkpointDirectory = fsStateBackend.getBasePath();
    }

    // ------------------------------------------------------------------------
    //  State backend methods
    // ------------------------------------------------------------------------

    @Override
    public void initializeForJob(
            Environment env,
            String operatorIdentifier,
            TypeSerializer<?> keySerializer) throws Exception {

        super.initializeForJob(env, operatorIdentifier, keySerializer);
        this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);

        RocksDB.loadLibrary(); // load the native RocksDB library

        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); // column families work as in HBase: each is stored in its own files
        // RocksDB seems to need this...
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
        List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
        try {
            db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); // the actual open of RocksDB
        } catch (RocksDBException e) {
            throw new RuntimeException("Error while opening RocksDB instance.", e);
        }
    }

snapshotPartitionedState

@Override
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
        return new HashMap<>();
    }

    if (fullyAsyncBackup) {
        return performFullyAsyncSnapshot(checkpointId, timestamp);
    } else {
        return performSemiAsyncSnapshot(checkpointId, timestamp);
    }
}

Snapshots come in two flavors, fully asynchronous and semi-asynchronous.

Semi-asynchronous:

/**
 * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
 * This backup is then asynchronously copied to the final checkpoint location.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // We don't snapshot individual k/v states since everything is stored in a central
    // RocksDB database. Create a dummy KvStateSnapshot that holds the information about
    // that checkpoint. We use injectKeyValueStateSnapshots to restore.
    final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
    // we disabled the WAL
    backupOptions.setBackupLogFiles(false);
    // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
    backupOptions.setSync(false); // set to asynchronous

    try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
        // wait before flush with "true"
        backupEngine.createNewBackup(db, true); // use RocksDB's own BackupEngine to create a new backup on local disk
    }

    long endTime = System.currentTimeMillis(); // this part runs synchronously, so time it to see the latency
    LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
    for (Tuple2<ColumnFamilyHandle, StateDescriptor> state : kvStateInformation.values()) {
        kvStateInformationCopy.add(state.f1);
    }

    SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
            backupUri,
            kvStateInformationCopy,
            checkpointId);

    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}

SemiAsyncSnapshot.materialize

@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();
        HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); // copy the backup from local disk to HDFS
        long endTime = System.currentTimeMillis();
        LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
    } catch (Exception e) {
        FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
        fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
        throw e;
    } finally {
        FileUtils.deleteQuietly(localBackupPath);
    }
}
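The pattern above, a synchronous local backup on the task thread followed by an asynchronous materialize() that copies it to the checkpoint filesystem, can be sketched with a plain executor. All names and paths here are hypothetical stand-ins, not Flink's actual classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SemiAsyncSketch {
    // Phase 1 (synchronous): RocksDB's BackupEngine writes a backup to local disk.
    // Here we only model the directory name it would produce.
    static String localBackup(long checkpointId) {
        return "local-chk-" + checkpointId;
    }

    // Phase 2 (asynchronous): materialize() copies the local backup to a DFS
    // off the task thread; we model the resulting remote path.
    static Future<String> materialize(ExecutorService pool, String localPath) {
        return pool.submit(() -> "hdfs:///checkpoints/" + localPath);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String local = localBackup(7);                    // briefly blocks processing
        Future<String> remote = materialize(pool, local); // runs concurrently with processing
        System.out.println(remote.get()); // prints hdfs:///checkpoints/local-chk-7
        pool.shutdown();
    }
}
```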

Fully asynchronous:

/**
 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
 * location. The only synchronous part is the drawing of the {@code Snapshot} which
 * is essentially free.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // we draw a snapshot from RocksDB then iterate over all keys at that point
    // and store them in the backup location
    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    org.rocksdb.Snapshot snapshot = db.getSnapshot(); // draw the snapshot; nothing needs to hit disk

    long endTime = System.currentTimeMillis();
    LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
    columnFamiliesCopy.putAll(kvStateInformation);

    FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, // hand the snapshot over directly
            this,
            backupUri,
            columnFamiliesCopy,
            checkpointId);

    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}

FullyAsyncSnapshot.materialize

Note that here the serialization of the DB contents into a file has to be done by hand:

@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();

        CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);

        outputView.writeInt(columnFamilies.size());

        // we don't know how many key/value pairs there are in each column family.
        // We prefix every written element with a byte that signifies to which
        // column family it belongs, this way we can restore the column families
        byte count = 0;
        Map<String, Byte> columnFamilyMapping = new HashMap<>();
        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
            columnFamilyMapping.put(column.getKey(), count);
            outputView.writeByte(count);

            ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
            ooOut.writeObject(column.getValue().f1);
            ooOut.flush();

            count++;
        }

        ReadOptions readOptions = new ReadOptions();
        readOptions.setSnapshot(snapshot);

        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
            byte columnByte = columnFamilyMapping.get(column.getKey());

            synchronized (dbCleanupLock) {
                if (db == null) {
                    throw new RuntimeException("RocksDB instance was disposed. This happens " +
                            "when we are in the middle of a checkpoint and the job fails.");
                }
                RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
                iterator.seekToFirst();
                while (iterator.isValid()) {
                    outputView.writeByte(columnByte);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView);
                    iterator.next();
                }
            }
        }

        StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();

        long endTime = System.currentTimeMillis();
        LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
    } finally {
        synchronized (dbCleanupLock) {
            if (db != null) {
                db.releaseSnapshot(snapshot);
            }
        }
        snapshot = null;
    }
}
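The stream layout produced above — a column-family count, one tag byte plus descriptor per family, then every k/v entry prefixed with its family's tag — can be round-tripped with plain DataStreams. This is a sketch of the format only: writeUTF stands in for Flink's serialized StateDescriptors and byte-array serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CheckpointFormatSketch {
    // Write: count, then (tag, family name) per family, then tag-prefixed entries.
    static byte[] write(Map<String, Map<String, String>> families) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(families.size());
        byte tag = 0;
        Map<String, Byte> tags = new HashMap<>();
        for (String family : families.keySet()) {
            out.writeByte(tag);
            out.writeUTF(family); // stands in for the serialized StateDescriptor
            tags.put(family, tag++);
        }
        for (Map.Entry<String, Map<String, String>> f : families.entrySet()) {
            for (Map.Entry<String, String> kv : f.getValue().entrySet()) {
                out.writeByte(tags.get(f.getKey())); // column-family tag prefix
                out.writeUTF(kv.getKey());
                out.writeUTF(kv.getValue());
            }
        }
        return baos.toByteArray();
    }

    // Read: the inverse of write(), reassembling entries by their tag byte.
    static Map<String, Map<String, String>> read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int n = in.readInt();
        Map<Byte, String> names = new HashMap<>();
        Map<String, Map<String, String>> out = new HashMap<>();
        for (int i = 0; i < n; i++) {
            byte tag = in.readByte();
            String name = in.readUTF();
            names.put(tag, name);
            out.put(name, new HashMap<>());
        }
        while (in.available() > 0) {
            String family = names.get(in.readByte());
            out.get(family).put(in.readUTF(), in.readUTF());
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Map<String, String>> db = new HashMap<>();
        db.put("default", new HashMap<>(Map.of("k1", "v1")));
        byte[] blob = write(db);
        System.out.println(read(blob).equals(db)); // prints true
    }
}
```

The tag byte is what lets restore work without knowing entry counts up front: the reader just consumes tagged records until the stream ends.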

CheckpointStateOutputView

backend.createCheckpointStateOutputView

public CheckpointStateOutputView createCheckpointStateOutputView(
        long checkpointID, long timestamp) throws Exception {
    return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
}

The key call is createCheckpointStateOutputStream:

RocksDBStateBackend

@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
        long checkpointID, long timestamp) throws Exception {
    return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
}

So what is nonPartitionedStateBackend?

public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
    // creating the FsStateBackend automatically sanity checks the URI
    FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);

    this.nonPartitionedStateBackend = fsStateBackend;
    this.checkpointDirectory = fsStateBackend.getBasePath();
}

It is simply an FsStateBackend; in the end RocksDB still relies on FsStateBackend to store the snapshot.

restoreState

@Override
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots.size() == 0) {
        return;
    }

    KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
    if (dummyState instanceof FinalSemiAsyncSnapshot) {
        restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
    } else if (dummyState instanceof FinalFullyAsyncSnapshot) {
        restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
    } else {
        throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
    }
}

Restore likewise has the same two variants, semi-async and fully async; the process is essentially the snapshot procedure in reverse.
