This post continues from the previous one, "Hadoop-2.6.0 NodeManager Restart Recover Implementation Analysis (Part 2)".
4. Analysis of the NMLeveldbStateStoreService implementation
NMLeveldbStateStoreService first implements the three storage lifecycle methods:
1) initStorage()
The initStorage() method performs the storage-related initialization, as follows:
@Override
protected void initStorage(Configuration conf) throws IOException {
  Path storeRoot = createStorageDir(conf);
  Options options = new Options();
  options.createIfMissing(false);
  options.logger(new LeveldbLogger());
  LOG.info("Using state database at " + storeRoot + " for recovery");
  File dbfile = new File(storeRoot.toString());
  try {
    db = JniDBFactory.factory.open(dbfile, options);
  } catch (NativeDB.DBException e) {
    if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
      LOG.info("Creating state database at " + dbfile);
      isNewlyCreated = true;
      options.createIfMissing(true);
      try {
        db = JniDBFactory.factory.open(dbfile, options);
        // store version
        storeVersion();
      } catch (DBException dbErr) {
        throw new IOException(dbErr.getMessage(), dbErr);
      }
    } else {
      throw e;
    }
  }
  checkVersion();
}
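The key step is opening a db instance via JniDBFactory.factory.open(dbfile, options). The first attempt uses createIfMissing(false). If the database is not found, the method sets isNewlyCreated to true, reopens with createIfMissing(true), and stores the version. In both cases checkVersion() runs at the end.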
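The open-then-create fallback can be illustrated without LevelDB. The following is a minimal sketch, assuming an in-memory map as a stand-in for the on-disk databases. FakeStore, its DISK map, the "version" key, and the "1.0" value are hypothetical names chosen for illustration; they are not Hadoop or leveldbjni APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a LevelDB-style store; not a Hadoop or leveldbjni class.
final class FakeStore {
  // Pretend "disk": database path -> sorted key/value contents.
  static final Map<String, TreeMap<String, String>> DISK = new HashMap<>();

  final TreeMap<String, String> db;
  final boolean isNewlyCreated;

  private FakeStore(TreeMap<String, String> db, boolean isNewlyCreated) {
    this.db = db;
    this.isNewlyCreated = isNewlyCreated;
  }

  // Plays the role of open(dbfile, options): fails when the database is
  // missing and createIfMissing is false.
  private static TreeMap<String, String> open(String path, boolean createIfMissing) {
    TreeMap<String, String> existing = DISK.get(path);
    if (existing != null) {
      return existing;
    }
    if (!createIfMissing) {
      throw new IllegalStateException(path + " does not exist");
    }
    TreeMap<String, String> created = new TreeMap<>();
    DISK.put(path, created);
    return created;
  }

  // Same control flow as initStorage(): try to open an existing database first,
  // create it only when it is missing, and write the version on creation.
  static FakeStore init(String path) {
    try {
      return new FakeStore(open(path, false), false);
    } catch (IllegalStateException notFound) {
      TreeMap<String, String> db = open(path, true);
      db.put("version", "1.0"); // illustrative version marker
      return new FakeStore(db, true);
    }
  }
}
```

Calling FakeStore.init() twice with the same path creates the store on the first call and reuses it on the second, so isNewlyCreated is true only the first time.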
2) startStorage()
This method has an empty body.
3) closeStorage()
Closes the db, as follows:
@Override
protected void closeStorage() throws IOException {
  if (db != null) {
    db.close();
  }
}
Next, taking Applications as an example, here is how the store, remove, and load methods are implemented:
1) storeApplication()
Stores an Application, as follows:
@Override
public void storeApplication(ApplicationId appId,
    ContainerManagerApplicationProto p) throws IOException {
  String key = APPLICATIONS_KEY_PREFIX + appId;
  try {
    db.put(bytes(key), p.toByteArray());
  } catch (DBException e) {
    throw new IOException(e);
  }
}
The key is ContainerManager/applications/ followed by the appId, and the value is the serialized ContainerManagerApplicationProto, whose message definition is:
message ContainerManagerApplicationProto {
  optional ApplicationIdProto id = 1;
  optional string user = 2;
  optional bytes credentials = 3;
  repeated ApplicationACLMapProto acls = 4;
  optional LogAggregationContextProto log_aggregation_context = 5;
}
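The key layout used by storeApplication() can be sketched in isolation. This is a minimal illustration, assuming keys are encoded as UTF-8; the AppKeys class, its method names, and the sample application ID are hypothetical and not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper; the class and method names are hypothetical, not Hadoop APIs.
final class AppKeys {
  static final String APPLICATIONS_KEY_PREFIX = "ContainerManager/applications/";

  // Build the store key for an application from the string form of its ID.
  static String applicationKey(String appId) {
    return APPLICATIONS_KEY_PREFIX + appId;
  }

  // Encode a key as bytes, assuming UTF-8.
  static byte[] toBytes(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }

  // Decode key bytes back into a string, assuming UTF-8.
  static String fromBytes(byte[] raw) {
    return new String(raw, StandardCharsets.UTF_8);
  }
}
```

Because every application key shares the same prefix, all application entries sit next to each other in a store that keeps its keys sorted, which is what makes a prefix scan possible at load time.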
2) storeFinishedApplication()
Stores a finished Application, as follows:
@Override
public void storeFinishedApplication(ApplicationId appId)
    throws IOException {
  String key = FINISHED_APPS_KEY_PREFIX + appId;
  try {
    db.put(bytes(key), new byte[0]);
  } catch (DBException e) {
    throw new IOException(e);
  }
}
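Here the key is ContainerManager/finishedApps/ followed by the appId, and the value is empty, i.e. new byte[0]. The presence of the key alone marks the application as finished.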
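The "key as marker, empty value" idea can be shown with a sorted in-memory map. This is a minimal sketch, assuming a TreeMap as a stand-in for the db; the FinishedAppMarkers class and its methods are hypothetical names used only for illustration.

```java
import java.util.TreeMap;

// In-memory stand-in for the key/value store; all names here are hypothetical.
final class FinishedAppMarkers {
  static final String FINISHED_APPS_KEY_PREFIX = "ContainerManager/finishedApps/";

  private final TreeMap<String, byte[]> db = new TreeMap<>();

  // Record that an application finished: only the key carries information.
  void storeFinished(String appId) {
    db.put(FINISHED_APPS_KEY_PREFIX + appId, new byte[0]);
  }

  // An application counts as finished exactly when its marker key exists.
  boolean isFinished(String appId) {
    return db.containsKey(FINISHED_APPS_KEY_PREFIX + appId);
  }

  // Length of the stored value, or -1 when there is no marker for the application.
  int valueLength(String appId) {
    byte[] value = db.get(FINISHED_APPS_KEY_PREFIX + appId);
    return value == null ? -1 : value.length;
  }
}
```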
3) removeApplication()
Removes an Application, as follows:
@Override
public void removeApplication(ApplicationId appId) throws IOException {
  try {
    WriteBatch batch = db.createWriteBatch();
    try {
      String key = APPLICATIONS_KEY_PREFIX + appId;
      batch.delete(bytes(key));
      key = FINISHED_APPS_KEY_PREFIX + appId;
      batch.delete(bytes(key));
      db.write(batch);
    } finally {
      batch.close();
    }
  } catch (DBException e) {
    throw new IOException(e);
  }
}
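This deletes, by their respective keys, the Application information stored by storeApplication() and storeFinishedApplication(). Both deletions go into one WriteBatch and are applied with a single db.write(batch) call.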
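The grouping of both deletions into one write can be imitated in memory. The sketch below is a simplified illustration, assuming a TreeMap as the store and a plain list as the batch; BatchDeleteSketch and its members are hypothetical names. It models only the "queue, then apply together" shape and does not reproduce the atomicity that a LevelDB WriteBatch provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory imitation of a write batch; all names here are hypothetical.
final class BatchDeleteSketch {
  static final String APPLICATIONS_KEY_PREFIX = "ContainerManager/applications/";
  static final String FINISHED_APPS_KEY_PREFIX = "ContainerManager/finishedApps/";

  final TreeMap<String, byte[]> db = new TreeMap<>();

  // Queue both deletions first, then apply them in one step,
  // mirroring batch.delete(...) followed by db.write(batch).
  void removeApplication(String appId) {
    List<String> batch = new ArrayList<>();
    batch.add(APPLICATIONS_KEY_PREFIX + appId);
    batch.add(FINISHED_APPS_KEY_PREFIX + appId);
    write(batch);
  }

  // Apply every queued deletion; removing a key that is absent is a no-op.
  private void write(List<String> keysToDelete) {
    for (String key : keysToDelete) {
      db.remove(key);
    }
  }
}
```

Deleting a key that was never written is harmless in this sketch, so an application that never reached the finished state is removed the same way as one that did.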
4) loadApplicationsState()
Loads the state of all Applications so that recovery can proceed, as follows:
@Override
public RecoveredApplicationsState loadApplicationsState()
    throws IOException {
  RecoveredApplicationsState state = new RecoveredApplicationsState();
  state.applications = new ArrayList<ContainerManagerApplicationProto>();
  String keyPrefix = APPLICATIONS_KEY_PREFIX;
  LeveldbIterator iter = null;
  try {
    iter = new LeveldbIterator(db);
    iter.seek(bytes(keyPrefix));
    while (iter.hasNext()) {
      Entry<byte[], byte[]> entry = iter.next();
      String key = asString(entry.getKey());
      if (!key.startsWith(keyPrefix)) {
        break;
      }
      state.applications.add(
          ContainerManagerApplicationProto.parseFrom(entry.getValue()));
    }

    state.finishedApplications = new ArrayList<ApplicationId>();
    keyPrefix = FINISHED_APPS_KEY_PREFIX;
    iter.seek(bytes(keyPrefix));
    while (iter.hasNext()) {
      Entry<byte[], byte[]> entry = iter.next();
      String key = asString(entry.getKey());
      if (!key.startsWith(keyPrefix)) {
        break;
      }
      ApplicationId appId =
          ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
      state.finishedApplications.add(appId);
    }
  } catch (DBException e) {
    throw new IOException(e);
  } finally {
    if (iter != null) {
      iter.close();
    }
  }
  return state;
}
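Loading is done with a LeveldbIterator. It seeks to the key prefix ContainerManager/applications/ and reads entries until a key no longer starts with that prefix. It then does the same for ContainerManager/finishedApps/, parsing the ApplicationId from the part of each key after the prefix.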
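The seek-and-scan loop can be reproduced with a sorted in-memory map. This is a minimal sketch, assuming a TreeMap as a stand-in for the db and ASCII-only keys (so that string order matches byte order); PrefixScanSketch and scan() are hypothetical names, not Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Prefix scan over a sorted map; class and method names are hypothetical.
final class PrefixScanSketch {
  // Return the key suffixes (the part after the prefix) of all entries under a prefix.
  static List<String> scan(TreeMap<String, byte[]> db, String prefix) {
    List<String> suffixes = new ArrayList<>();
    // tailMap(prefix, true) plays the role of iter.seek(bytes(prefix)):
    // iteration starts at the first key greater than or equal to the prefix.
    for (Map.Entry<String, byte[]> entry : db.tailMap(prefix, true).entrySet()) {
      String key = entry.getKey();
      if (!key.startsWith(prefix)) {
        break; // keys are sorted, so the prefix range has ended
      }
      suffixes.add(key.substring(prefix.length()));
    }
    return suffixes;
  }
}
```

The early break is what keeps the scan cheap: because keys are sorted, the first key that does not match the prefix guarantees that no later key will match either.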
A later post will analyze where the methods above are called. To be continued!