Storm-源码分析-LocalState (backtype.storm.utils)

LocalState

A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.

基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景.

    public synchronized Map<Object, Object> snapshot() throws IOException {
        int attempts = 0;
        while(true) {
            String latestPath = _vs.mostRecentVersionPath();
            if(latestPath==null) return new HashMap<Object, Object>();
            try {
                return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
            } catch(IOException e) {
                attempts++;
                if(attempts >= 10) {
                    throw e;
                }
            }
        }

读写操作都是基于map的操作, get和put, 但是put需要做persist操作. 
这里使用synchronized来做对象的线程间同步, 对于一个LocalState对象, 所有synchronized标有的函数只能被串行操作.

    public Object get(Object key) throws IOException {
        return snapshot().get(key);
    }
    public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
        Map<Object, Object> curr = snapshot();
        curr.put(key, val);
        persist(curr, cleanup);
    }

当然不止这么简单, 为了达到atomic, 还使用了VersionedStore, 参考下一章 
persist不会去update现有的文件, 而是不断的产生递增version的文件, 故每一批更新都会产生一个新的文件

把需要写入的数据序列化 
创建新的versionfile的path 
把数据写入versionfile 
调用succeedVersion, 创建tokenfile以标志versionfile的写入完成 
清除旧版本, 只保留4个版本

    private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
        byte[] toWrite = Utils.serialize(val);
        String newPath = _vs.createVersion();
        FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
        _vs.succeedVersion(newPath);
        if(cleanup) _vs.cleanup(4);
    }

 

VersionedStore

    public VersionedStore(String path) throws IOException {
      _root = path;
      mkdirs(_root);
    }

这个store, 其实就是_root目录下的一堆文件 
文件分两种, 
VersionFile, _root + version, 真正的数据存储文件 
TokenFile, _root + version + “.version”, 标志位文件, 标志version文件是否完成写操作, 以避免读到正在更新的文件 

getAllVersions就是读出所有_root目录下的所有完成写操作的文件, 读出version, 并做从大到小的排序

    public List<Long> getAllVersions() throws IOException {
        List<Long> ret = new ArrayList<Long>();
        for(String s: listDir(_root)) {
            if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
                ret.add(validateAndGetVersion(s));
            }
        }
        Collections.sort(ret);
        Collections.reverse(ret);
        return ret;
    }

 

找到最新的版本文件

    public Long mostRecentVersion() throws IOException {
        List<Long> all = getAllVersions();
        if(all.size()==0) return null;
        return all.get(0);

 

创建新版本号, 用当前时间作为version

    public String createVersion() throws IOException {
        Long mostRecent = mostRecentVersion();
        long version = Time.currentTimeMillis();
        if(mostRecent!=null && version <= mostRecent) {
            version = mostRecent + 1;
        }
        return createVersion(version);
    }

    public String createVersion(long version) throws IOException {
        String ret = versionPath(version);
        if(getAllVersions().contains(version))
            throw new RuntimeException("Version already exists or data already exists");
        else
            return ret;
    }
 

创建tokenfile, 以标记versionfile写完成

    public void succeedVersion(String path) throws IOException {
        long version = validateAndGetVersion(path);
        // should rewrite this to do a file move
        createNewFile(tokenPath(version));
    }
 

清除旧的版本, 只保留versionsToKeep个, 清除操作就是删除versionfile和tokenfile

    public void cleanup(int versionsToKeep) throws IOException {
        List<Long> versions = getAllVersions();
        if(versionsToKeep >= 0) {
            versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
        }
        HashSet<Long> keepers = new HashSet<Long>(versions);

        for(String p: listDir(_root)) {
            Long v = parseVersion(p);
            if(v!=null && !keepers.contains(v)) {
                deleteVersion(v);
            }
        }
    }

本文章摘自博客园,原文发布日期:2013-07-11
时间: 2024-10-21 23:48:56

Storm-源码分析-LocalState (backtype.storm.utils)的相关文章

Storm-源码分析- Scheduler (backtype.storm.scheduler)

首先看看IScheduler接口的定义, 主要实现两个接口, prepare和schedule 对于schedule的参数注释写的非常清楚,  topologies包含所有topology的静态信息, 而cluster中包含了topology的运行态信息  根据他们就可以来判断如何assignment package backtype.storm.scheduler; import java.util.Map; public interface IScheduler { void prepare

Storm-源码分析- spout (backtype.storm.spout)

1. ISpout接口 ISpout作为实现spout的核心interface, spout负责feeding message, 并且track这些message.  如果需要Spout track发出的message, 必须给出message-id, 这个message-id可以是任意类型, 但是如果不指定或将message-id置空, storm就不会track这个message 必须要注意的是, spout线程会在一个线程中调用ack, fail, nextTuple, 所以不用考虑互斥,

Storm-源码分析-EventManager (backtype.storm.event)

大体结构, 定义protocol EventManager, 其实就是定义interface 函数event-manager, 主要做2件事  1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行  2. 返回实现EventManager的匿名record(reify部分, 实现protocol) 这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner (ns backtype.storm

Storm-源码分析-acker (backtype.storm.daemon.acker)

backtype.storm.daemon.acker  设计的巧妙在于, 不用分别记录和track, stream过程中所有的tuple, 而只需要track root tuple, 而所有中间过程都通过异或更新track entry acker-init, 在spout发送一个tuple时触发, 初始化这个root tuple的track entry   acker-ack, 在blot ack一个tuple的时候触发, 会对该tuple的anchors-to-ids中记录的每个(root,

Storm-源码分析- hook (backtype.storm.hooks)

task hook 在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook 当前定义如下事件, emit, cleanup, spoutAck-- 用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS) 系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions   public interface ITaskHook { void prepare(Map conf, T

Storm-源码分析- Messaging (backtype.storm.messaging)

先定义两个接口和一个类  TaskMessage类本身比较好理解, 抽象storm的message格式  对于IContext, 注释也说了, 定义messaging plugin, 通过什么渠道去发送message, storm这里设计成可替换的  默认定义storm实现了local和ZMQ两种plugin, 当然你可以实现更多的  local应该是用于local mode, 而ZMQ用于distributed mode IContext接口主要是用于创建IConnection, 体现对soc

Storm-源码分析- bolt (backtype.storm.task)

Bolt关键的接口为execute,  Tuple的真正处理逻辑, 通过OutputCollector.emit发出新的tuples, 调用ack或fail处理的tuple /** * An IBolt represents a component that takes tuples as input and produces tuples * as output. An IBolt can do everything from filtering to joining to functions

Storm-源码分析-Stats (backtype.storm.stats)

会发现, 现在storm里面有两套metrics系统, metrics framework和stats framework 并且在所有地方都是同时注册两套, 貌似准备用metrics来替代stats, 但当前版本UI仍然使用stats   这个模块统计的数据怎么被使用, 1. 在worker中, 会定期调用do-executor-heartbeats去往zk同步hb  可以看到, stats也会作为hb的一部分被同步到zk上 (defnk do-executor-heartbeats [work

Storm-源码分析- timer (backtype.storm.timer)

mk-timer timer是基于PriorityQueue实现的(和PriorityBlockingQueue区别, 在于没有阻塞机制, 不是线程安全的), 优先级队列是堆数据结构的典型应用  默认情况下, 按照自然顺序(其实就是默认comparator的定义), 最小的元素排在堆头  当然也可以自己重新实现comparator接口, 比如timer就用reify重新实现了comparator接口 整个过程其实比较简单, 开个timer-thread, 不断check PriorityQueu