org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数。JobTracker是在网络环境中提交及运行MR任务的核心位置。
main方法主要代码有两句:
//创建jobTracker对象
JobTracker tracker = startTracker(new JobConf());
//启动各个服务,包括JT内部一些重要的服务或者线程
tracker.offerService();
一、startTracker(new JobConf())根据配置文件启动JobTracker,这个方法会调用startTracker(conf, generateNewIdentifier())方法进行启动操作,generateNewIdentifier()将会返回一个以节点当前时间格式化成“yyyyMMddHHmm”的字符串。
startTracker函数是一个静态函数,它调用JobTracker的构造函数生成一个JobTracker类的实例,构造函数主要工作是对一些重要的变量进行初始化,名为result。然后,进行了一系列初始化活动,包括启动RPC server,启动内置的jetty服务器,检查是否需要重启JobTracker等。
初始化的重要对象包括:
1、secretManager:DelegationTokenSecretManager的实例,MR安全管理相关类;
2、aclsManager:ACLsManager的实例,作业级别和队列级别的管理和访问权限控制;
3、taskScheduler:TaskScheduler的实例,调度器对象,hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler;
4、interTrackerServer:Server的实例,RPC Server;
5、infoServer:HttpServer的实例,将Job、Task、TaskTracker相关信息显示到WEB前端,封装的是jetty;
6、recoveryManager:RecoveryManager的实例,作业恢复管理,即JobTracker启动时,恢复上次停止时正在运行的作业,并恢复各个任务的运行状态;recoveryManager.checkAndAddJob(status)会检查出那些作业需要恢复并放入Set<JobID> jobsToRecover; // set of jobs to be recovered,为后面的recoveryManager.recover()做准备;
7、jobHistoryServer:JobHistoryServer的实例,用于查看作业历史信息的Server;
8、dnsToSwitchMapping:DNSToSwitchMapping的实例,用于构建集群的网络拓扑结构,它能将节点地址(IP或者host)映射成网络位置。
二、 tracker.offerService()
/** * Run forever */ public void offerService() throws InterruptedException, IOException { // Prepare for recovery. This is done irrespective of the status of restart // flag. while (true) { try { recoveryManager.updateRestartCount(); break; } catch (IOException ioe) { LOG.warn("Failed to initialize recovery manager. ", ioe); // wait for some time Thread.sleep(FS_ACCESS_RETRY_PERIOD); LOG.warn("Retrying..."); } } taskScheduler.start(); // Start the recovery after starting the scheduler try { recoveryManager.recover(); } catch (Throwable t) { LOG.warn("Recovery manager crashed! Ignoring.", t); } // refresh the node list as the recovery manager might have added // disallowed trackers refreshHosts(); //用于发现和清理死掉的TaskTracker this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers"); this.expireTrackersThread.start(); //用于清理长时间驻留在内存中的已经运行完成的作业信息 this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); this.retireJobsThread.start(); //用于发现已经被分配给某个TaskTracker但一直未汇报信息的任务 expireLaunchingTaskThread.start(); if (completedJobStatusStore.isActive()) { completedJobsStoreThread = new Thread(completedJobStatusStore, "completedjobsStore-housekeeper"); //将已经运行完成的作业运行信息保存到HDFS上,并提供了一套存取这些信息的API。 completedJobsStoreThread.start(); } // start the inter-tracker server once the jt is ready this.interTrackerServer.start(); synchronized (this) { state = State.RUNNING; } LOG.info("Starting RUNNING"); this.interTrackerServer.join(); LOG.info("Stopped interTrackerServer"); }
返回栏目页:http://www.bianceng.cnhttp://www.bianceng.cn/webkf/tools/
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索实例
, 函数
, 运行
, jetty
, this
, 作业
, jobtracker
, 控制实例源码
Disallowed
jobtracker没有启动、hadoop启动jobtracker、jobtracker 无法启动、jobtracker 启动、mapred.job.tracker,以便于您获取更多的相关知识。