Hadoop-2.6.0NodeManager Restart Recover实现分析(二)

      继上篇《Hadoop-2.6.0NodeManager Restart Recover实现分析(二)》。

      4、NMLeveldbStateStoreService实现分析

      在

      1)、initStorage()

      initStorage()方法中,完成了存储相关的初始化,如下:

  @Override
  protected void initStorage(Configuration conf)
      throws IOException {
    Path storeRoot = createStorageDir(conf);
    Options options = new Options();
    options.createIfMissing(false);
    options.logger(new LeveldbLogger());
    LOG.info("Using state database at " + storeRoot + " for recovery");
    File dbfile = new File(storeRoot.toString());
    try {
      db = JniDBFactory.factory.open(dbfile, options);
    } catch (NativeDB.DBException e) {
      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
        LOG.info("Creating state database at " + dbfile);
        isNewlyCreated = true;
        options.createIfMissing(true);
        try {
          db = JniDBFactory.factory.open(dbfile, options);
          // store version
          storeVersion();
        } catch (DBException dbErr) {
          throw new IOException(dbErr.getMessage(), dbErr);
        }
      } else {
        throw e;
      }
    }
    checkVersion();
  }

      最主要的是通过JniDBFactory.factory.open(dbfile, options)打开了一个db实例。

      2)startStorage()

      为空。

      3)closeStorage()

      关闭db,如下:

  @Override
  protected void closeStorage() throws IOException {
    if (db != null) {
      db.close();
    }
  }

      下面,以Applications为例,罗列下实现方式:

      1)storeApplication()

      存储Application,如下:

  @Override
  public void storeApplication(ApplicationId appId,
      ContainerManagerApplicationProto p) throws IOException {
    String key = APPLICATIONS_KEY_PREFIX + appId;
    try {
      db.put(bytes(key), p.toByteArray());
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

      key为ContainerManager/applications/再加appId,value为ContainerManagerApplicationProto,消息内容如下:

message ContainerManagerApplicationProto {
  optional ApplicationIdProto id = 1;
  optional string user = 2;
  optional bytes credentials = 3;
  repeated ApplicationACLMapProto acls = 4;
  optional LogAggregationContextProto log_aggregation_context = 5;
}

      2)storeFinishedApplication()

      存储已完成Application,如下:

  @Override
  public void storeFinishedApplication(ApplicationId appId)
      throws IOException {
    String key = FINISHED_APPS_KEY_PREFIX + appId;
    try {
      db.put(bytes(key), new byte[0]);
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

      其中,key为ContainerManager/finishedApps/再加appId,value为空,即new byte[0]。
      3)removeApplication()

      删除Application,如下:

  @Override
  public void removeApplication(ApplicationId appId)
      throws IOException {
    try {
      WriteBatch batch = db.createWriteBatch();
      try {
        String key = APPLICATIONS_KEY_PREFIX + appId;
        batch.delete(bytes(key));
        key = FINISHED_APPS_KEY_PREFIX + appId;
        batch.delete(bytes(key));
        db.write(batch);
      } finally {
        batch.close();
      }
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

      根据各自的key分别删除storeApplication()和storeFinishedApplication()存储的Application信息,

      4)loadApplicationsState()
      加载所有的Application状态,以便进行recovery。如下:

  @Override
  public RecoveredApplicationsState loadApplicationsState()
      throws IOException {
    RecoveredApplicationsState state = new RecoveredApplicationsState();
    state.applications = new ArrayList<ContainerManagerApplicationProto>();
    String keyPrefix = APPLICATIONS_KEY_PREFIX;
    LeveldbIterator iter = null;
    try {
      iter = new LeveldbIterator(db);
      iter.seek(bytes(keyPrefix));
      while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
        String key = asString(entry.getKey());
        if (!key.startsWith(keyPrefix)) {
          break;
        }
        state.applications.add(
            ContainerManagerApplicationProto.parseFrom(entry.getValue()));
      }

      state.finishedApplications = new ArrayList<ApplicationId>();
      keyPrefix = FINISHED_APPS_KEY_PREFIX;
      iter.seek(bytes(keyPrefix));
      while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
        String key = asString(entry.getKey());
        if (!key.startsWith(keyPrefix)) {
          break;
        }
        ApplicationId appId =
            ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
        state.finishedApplications.add(appId);
      }
    } catch (DBException e) {
      throw new IOException(e);
    } finally {
      if (iter != null) {
        iter.close();
      }
    }

    return state;
  }
      通过LeveldbIterator和key的前缀ContainerManager/applications/进行load。
      后续会分析哪些地方调用了上述方法,未完待续!

      

时间: 2024-10-26 09:38:10

Hadoop-2.6.0NodeManager Restart Recover实现分析(二)的相关文章

Hadoop-2.6.0NodeManager Restart Recover实现分析(一)

一.简介       This document gives an overview of NodeManager (NM) restart, a feature that enables the NodeManager to be restarted without losing the active containers running on the node. At a high level, the NM stores any necessary state to a local sta

一些有用的javascript实例分析(二)

原文:一些有用的javascript实例分析(二) 1 5 求出数组中所有数字的和 2 window.onload = function () 3 { 4 var oBtn = document.getElementsByTagName("button")[0]; 5 var oInput = document.getElementsByTagName("input")[0] 6 var oStrong = document.getElementsByTagName

Oracle数据库安全策略分析(二)_oracle

正在看的ORACLE教程是:Oracle数据库安全策略分析(二). SQL*DBA命令的安全性: 如果您没有SQL*PLUS应用程序,您也可以使用SQL*DBA作SQL查权限相关的命令只能分配给Oracle软件拥有者和DBA组的用户,因为这些命令被授予了特殊的系统权限. (1) startup (2) shutdown (3) connect internal 数据库文件的安全性: Oracle软件的拥有者应该这些数据库文件($ORACLE_HOME/dbs/*.dbf)设置这些文件的使用权限为

Hadoop RPC通信Client客户端的流程分析

Hadoop的RPC的通信与其他系统的RPC通信不太一样,作者针对Hadoop的使用特点,专门的设计了一套RPC框架,这套框架个人感觉还是有点小复杂的.所以我打算分成Client客户端和Server服务端2个模块做分析.如果你对RPC的整套流程已经非常了解的前提下,对于Hadoop的RPC,你也一定可以非常迅速的了解的.OK,下面切入正题. Hadoop的RPC的相关代码都在org.apache.hadoop.ipc的包下,首先RPC的通信必须遵守许多的协议,其中最最基本的协议即使如下: /**

Hadoop连载系列之六:数据收集分析系统Chukwa

系列几篇文章中介绍了分布式存储和计算系统Hadoop以及Hadoop集群的搭建.Zookeeper集群搭建.HBase分布式部署等.当Hadoop集群的数量达到1000+时,集群自身的信息将会大量增加.Apache开发出一个开源的数据收集和分析系统-Chukwa来处理Hadoop集群的数据.Chukwa有几个非常吸引人的特点:它架构清晰,部署简单;收集的数据类型广泛,具有很强的扩展性;与 Hadoop 无缝集成,能完成海量数据的收集与整理. 1 Chukwa简介 ----------------

Hadoop周边生态软件和简要工作原理(二)

Sqoop: sqoop在hadoop生态系统中也是应用率比较高的软件,主要是用来做ETL工具,由yadoo研发并提交给http://www.aliyun.com/zixun/aggregation/14417.html">Apache.Hadoop整个生态圈里面,大部分的应用都是Yadoo研发的,贡献非常大.Yahoo里面出来两拨人,分别组建了Cloudera和Hortonworks. 所谓ETL,就是数据的抽取(extract)加载(load)转换(transform).将一种格式或表

PostgreSQL 空间独立事件相关性分析 二 - 人车拟合

标签 PostgreSQL , PostGIS , 人车拟合 背景 独立事件相关性分析是一件很有意思的事情,例如 探探软件的擦肩而过功能点,在不同时空与你擦肩而过的人. 舆情分析. 商品最佳销售组合. 安全系统中的人车拟合,对时空数据进行处理,用于司机.乘客.车辆的拟合. 人车拟合 1.建立表结构 人 create table u_pos ( id int8, uid int8, crt_time timestamp, pos geometry ); 车 create table c_pos (

驳“麦包包的简单SEO分析”二种观点

A5首页搜索专业里有一篇文章"管中窥豹:麦包包的简单SEO分析",其实昨天就看到了这篇文章,只是仅仅是一扫而过,无细读不成方圆啊,今天好好看了下这篇文章,发现除了第一点品牌和第四点流量之外,其他二点笔者都不是挺赞成的,所以才有了这篇文章"驳麦包包的简单SEO分析三种观点",多多交流才是一切吧. 首先呢,关于收录 原图:   现图:   从两张图中看到收录的差异性很大,可惜原作者并未截取百度权重流量估值,不然可以更好的对比一下了,其实昨天14号时候,百度的收录和反链一

SEO成功案例分析二:买购网

前一篇文章给大家分析了博宝商城的优化案例,有兴趣的可以去看一下,今天给大家带来的案例是一个导购网站:爱购网.首先声明一下,这里的分析不涉及网站权重,因为一个网站的权重不是短时间内可以提高的,需要时间的积累.而其他的方面基本上我们还是可以短时间内做到的,比如说内容.结构等.首先给大家把网站的网址贴出来,下面是通过爱站网得到的一些基本数据   然后我们以"化妆品排行版"为例给大家分析下爱购网的用兵之道.先把要分析的网址贴出来http://www.maigoo.com/maigoo/017h