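The client submits an Application over the RPC protocol ClientRMProtocol; what it actually sends is a SubmitApplicationRequest. In the Hadoop 1.0 era, Writable served as the serialization and deserialization framework. Hadoop 2.0 abandons it for this purpose and uses Protocol Buffers (PB) instead. Besides supporting multiple languages, the biggest benefit of PB is backward compatibility, which lets different versions of the AM, Client, RM, and NM communicate with one another.
Take SubmitApplicationRequest as an example: the source code ships only a PB implementation, SubmitApplicationRequestPBImpl. Users are of course free to supply implementations backed by other serialization frameworks, including a Writable-based one.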
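The split between an abstract record and a pluggable implementation can be sketched in a few lines. This is a minimal illustration, not the YARN code: SubmitRequest, SubmitRequestUtf8Impl, and the bind method are hypothetical names, and the explicit binding step is an assumption, since the text does not say how the real record factory locates its implementation. Only the method name newRecordInstance is taken from the YARN excerpt in this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative only: none of these types are the real YARN record classes.
class RecordFactorySketch {

    // The abstract record that protocol code is written against.
    interface SubmitRequest {
        String getApplicationName();
        void setApplicationName(String name);
        byte[] serialize();
    }

    // One possible backing implementation (hypothetical stand-in for a *PBImpl class).
    static final class SubmitRequestUtf8Impl implements SubmitRequest {
        private String name = "";
        public String getApplicationName() { return name; }
        public void setApplicationName(String name) { this.name = name; }
        public byte[] serialize() { return name.getBytes(StandardCharsets.UTF_8); }
    }

    // Maps a record interface to whichever implementation has been bound to it.
    static final class RecordFactory {
        private final Map<Class<?>, Supplier<?>> impls = new HashMap<>();

        <T> void bind(Class<T> api, Supplier<? extends T> impl) {
            impls.put(api, impl);
        }

        <T> T newRecordInstance(Class<T> api) {
            Supplier<?> supplier = impls.get(api);
            if (supplier == null) {
                throw new IllegalArgumentException(
                    "No implementation bound for " + api.getName());
            }
            return api.cast(supplier.get());
        }
    }

    public static void main(String[] args) {
        RecordFactory factory = new RecordFactory();
        factory.bind(SubmitRequest.class, SubmitRequestUtf8Impl::new);

        // Callers only ever name the interface, never the implementation.
        SubmitRequest request = factory.newRecordInstance(SubmitRequest.class);
        request.setApplicationName("wordcount");
        System.out.println(request.getApplicationName()
            + " serializes to " + request.serialize().length + " bytes");
    }
}
```

Because callers depend only on the interface, swapping the serialization framework means binding a different implementation class; the protocol code itself stays unchanged.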
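SubmitApplicationRequest wraps the submission context, ApplicationSubmissionContext. On the RM side, ClientRMProtocol is implemented by ClientRMService. The RM initializes this service at startup, and the service listens for RPC requests on the address set by yarn.resourcemanager.address.
The server builds an RMAppManagerSubmitEvent from the submission context and hands it to rmAppManager. Within the RM, RMAppManager manages all Applications: submitting and finishing an application, recovering a set of applications, and so on.
The submitApplication method of ClientRMService: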
    public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnRemoteException {
      ApplicationSubmissionContext submissionContext =
          request.getApplicationSubmissionContext();
      ApplicationId applicationId = submissionContext.getApplicationId();
      String user = submissionContext.getUser();
      // The identity of the RPC caller overrides the user named in the context.
      user = UserGroupInformation.getCurrentUser().getShortUserName();
      // Excerpt: the exception handling that surrounds this check is not shown.
      if (rmContext.getRMApps().get(applicationId) != null) {
        throw new IOException("Application with id " + applicationId
            + " is already present! Cannot add a duplicate!");
      }
      submissionContext.setUser(user);

      // Hand the Application to rmAppManager, which starts the ApplicationMaster
      rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext,
          System.currentTimeMillis()));

      // If recovery is enabled then store the application information in a
      // blocking call so make sure that RM has stored the information needed
      // to restart the AM after RM restart without further client communication
      RMStateStore stateStore = rmContext.getStateStore();
      LOG.info("Storing Application with id " + applicationId);
      try {
        stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
      } catch (Exception e) {
        LOG.error("Failed to store application:" + applicationId, e);
        ExitUtil.terminate(1, e);
      }

      LOG.info("Application with id " + applicationId.getId()
          + " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);

      SubmitApplicationResponse response =
          recordFactory.newRecordInstance(SubmitApplicationResponse.class);
      return response;
    }
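YARN uses an event-driven programming model: a state change can trigger one or more events, which in turn can cause other states to change. The event passed to RMAppManager has one of two types, APP_SUBMIT or APP_COMPLETED. When RMAppManager handles the event, APP_COMPLETED leads to finishApplication, which adds the ApplicationId to the completedApps queue. APP_SUBMIT leads to submitApplication, which creates an RMApp, puts it into the RMContext map Map<ApplicationId, RMApp> applications, and then creates an RMAppEvent of type START and hands it to the central dispatcher, AsyncDispatcher.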
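The flow just described can be modeled in a small, self-contained sketch. Everything here is a simplified stand-in rather than the YARN implementation: Dispatcher, AppManager, AppEvent, and the two enums are hypothetical names, application ids are plain strings, and events are delivered by an explicit drain() call, whereas an asynchronous dispatcher would deliver them from its own thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative model only: none of these classes are the real YARN types.
class EventDispatchSketch {

    enum ManagerEventType { APP_SUBMIT, APP_COMPLETED }
    enum AppEventType { START }

    // Stand-in for RMAppEvent: an application id plus an event type.
    static final class AppEvent {
        final String appId;
        final AppEventType type;
        AppEvent(String appId, AppEventType type) {
            this.appId = appId;
            this.type = type;
        }
    }

    // Stand-in for the central dispatcher: queue events, then route each by type.
    static final class Dispatcher {
        private final Queue<AppEvent> queue = new ArrayDeque<>();
        private final Map<AppEventType, Consumer<AppEvent>> handlers =
            new EnumMap<>(AppEventType.class);

        void register(AppEventType type, Consumer<AppEvent> handler) {
            handlers.put(type, handler);
        }

        void enqueue(AppEvent event) {
            queue.add(event);
        }

        // Delivers every queued event that has a handler; returns how many were delivered.
        int drain() {
            int delivered = 0;
            AppEvent event;
            while ((event = queue.poll()) != null) {
                Consumer<AppEvent> handler = handlers.get(event.type);
                if (handler != null) {
                    handler.accept(event);
                    delivered++;
                }
            }
            return delivered;
        }
    }

    // Stand-in for RMAppManager: tracks applications and reacts to the two event types.
    static final class AppManager {
        final Map<String, String> applications = new HashMap<>(); // appId -> user
        final Deque<String> completedApps = new ArrayDeque<>();
        private final Dispatcher dispatcher;

        AppManager(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        void handle(ManagerEventType type, String appId, String user) {
            switch (type) {
                case APP_SUBMIT:
                    // Register the application, then ask the dispatcher to start it.
                    if (applications.putIfAbsent(appId, user) != null) {
                        throw new IllegalStateException(
                            "Application " + appId + " is already present");
                    }
                    dispatcher.enqueue(new AppEvent(appId, AppEventType.START));
                    break;
                case APP_COMPLETED:
                    completedApps.add(appId);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(AppEventType.START,
            e -> System.out.println("START received for " + e.appId));

        AppManager manager = new AppManager(dispatcher);
        manager.handle(ManagerEventType.APP_SUBMIT, "application_0001", "alice");
        dispatcher.drain();
        manager.handle(ManagerEventType.APP_COMPLETED, "application_0001", null);
        System.out.println("completed: " + manager.completedApps);
    }
}
```

The point of the indirection is that the manager never calls the START handler directly. It only enqueues an event, so whoever registered for START can change without touching the manager.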