JobControl 的实现原理

引入实例:贝叶斯分类

贝叶斯分类是一种利用概率统计知识进行分类的统计学分类方法。该方法包括两个步骤:训练样本和分类。

其实现由多个MapReduce 作业完成,如图所示。其中,训练样本可由三个 MapReduce 作业实现:

第一个作业(ExtractJob)抽取文档特征,该作业只需要 Map 即可完成 ;

第二个作业(ClassPriorJob)计算类别的先验概率,即统计每个类别中文档的数目,并计算类别概率;

第三个作业(ConditionalProbilityJob)计算单词的条件概率,即统计<label, word> 在所有文档中出现的次数并计算单词的条件概率。

后两个作业的具体实现类似于WordCount。分类过程由一个作业(PredictJob)完成。该作业的
map()函数计算每个待分类文档属于每个类别的概率,reduce() 函数找出每个文档概率最高的类别,并输出 <docid,
label>( 编号为 docid 的文档属于类别 label)。

一个完整的贝叶斯分类算法可能需要 4 个有依赖关系的 MapReduce 作业完成,传统的做法是:为每个作业创建相应的 JobConf 对象,并按照依赖关系依次(串行)提交各个作业,如下所示:

// 为 4 个作业分别创建 JobConf 对象
JobConf extractJobConf = new JobConf(ExtractJob.class);
JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob. class) ;
JobConf predictJobConf = new JobConf(PredictJob.class);
...// 配置各个 JobConf
// 按照依赖关系依次提交作业
JobClient.runJob(extractJobConf);
JobClient.runJob(classPriorJobConf);
JobClient.runJob(conditionalProbilityJobConf);
JobClient.runJob(predictJobConf);

如果使用 JobControl,则用户只需使用 addDepending() 函数添加作业依赖关系接口,JobControl 会按照依赖关系调度各个作业,具体代码如下:

Configuration extractJobConf = new Configuration();
Configuration classPriorJobConf = new Configuration();
Configuration conditionalProbilityJobConf = new Configuration();
Configuration predictJobConf = new Configuration();
...// 设置各个Configuration
// 创建Job对象。注意,JobControl要求作业必须封装成Job对象
Job extractJob = new Job(extractJobConf);
Job classPriorJob = new Job(classPriorJobConf);
Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);
Job predictJob = new Job(predictJobConf);
//设置依赖关系,构造一个DAG作业
classPriorJob.addDepending(extractJob);
conditionalProbilityJob.addDepending(extractJob);
predictJob.addDepending(classPriorJob);
predictJob.addDepending(conditionalProbilityJob);
//创建JobControl对象,由它对作业进行监控和调度
JobControl JC = new JobControl("Native Bayes");
JC.addJob(extractJob);//把4个作业加入JobControl中
JC.addJob(classPriorJob);
JC.addJob(conditionalProbilityJob);
JC.addJob(predictJob);
JC.run(); //提交DAG作业

在实际运行过程中,不依赖于其他任何作业的 extractJob 会优先得到调度,一旦运行完成,classPriorJob 和
conditionalProbilityJob 两个作业同时被调度,待它们全部运行完成后,predictJob
被调度。对比以上两种方案,可以得到一个简单的结论:使用 JobControl 编写 DAG 作业更加简便,且能使多个无依赖关系的作业并行运行。

JobControl 设计原理分析

JobControl 由两个类组成:Job 和 JobControl。其中,Job 类封装了一个 MapReduce 作业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态,以此更新自己的状态,其
状态转移图如图所示。作业刚开始处于 WAITING 状态。如果没有依赖作业或者所有依赖作业均已运行完成,则进入READY 状态。一旦进入
READY 状态,则作业可被提交到 Hadoop 集群上运行,并进入 RUNNING 状态。在 RUNNING
状态下,根据作业运行情况,可能进入 SUCCESS 或者 FAILED 状态。需要注意的是,如果一个作业的依赖作业失败,则该作业也会失败,于是形成“多米诺骨牌效应”, 后续所有作业均会失败。

JobControl
封装了一系列 MapReduce 作业及其对应的依赖关系。
它将处于不同状态的作业放入不同的哈希表中,并按照图所示的状态转移作业,直到所有作业运行完成。在实现的时候,JobControl
包含一个线程用于周期性地监控和更新各个作业的运行状态,调度依赖作业运行完成的作业,提交处于 READY 状态的作业等。同时,它还提供了一些API
用于挂起、恢复和暂停该线程。

Job类深入剖析

在Job类的起始部分,定义了一些数据域,包括job所处的状态,以及其他相关的信息,具体代码如下:

import java.util.ArrayList;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.jobcontrol.Job;

  // 一个 job 将处于如下的一种状态
  final public static int SUCCESS = 0;    //成功
  final public static int WAITING = 1;     //警告
  final public static int RUNNING = 2;    //运行
  final public static int READY = 3;    //准备
  final public static int FAILED = 4;    //失败
  final public static int DEPENDENT_FAILED = 5;    //依赖的作业失败

  private JobConf theJobConf;
  private int state;
  private String jobID;         // 通过JobControl class分配和使用
  private JobID mapredJobID;    // 通过map/reduce分配的job ID
  private String jobName;        // 外部名字, 通过client app分配/使用
  private String message;        // 一些有用的信息例如用户消耗,
  // e.g. job失败的原因
  private ArrayList<Job> dependingJobs;    // 当前job所依赖的jobs列表
  private JobClient jc = null;        // map reduce job client

接着定义了两个构造函数:

  /**
   * Construct a job.
   * @param jobConf a mapred job configuration representing a job to be executed.
   * @param dependingJobs an array of jobs the current job depends on
   */
  public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
    this.theJobConf = jobConf;
    this.dependingJobs = dependingJobs;
    this.state = Job.WAITING;
    this.jobID = "unassigned";
    this.mapredJobID = null; //not yet assigned
    this.jobName = "unassigned";
    this.message = "just initialized";
    this.jc = new JobClient(jobConf);
  }

  /**
   * Construct a job.
   *
   * @param jobConf mapred job configuration representing a job to be executed.
   * @throws IOException
   */
  public Job(JobConf jobConf) throws IOException {
    this(jobConf, null);
  }

接着重写了String类中的toString方法,代码如下:

toString

接下来是一长串的get/set获取设置属性的代码: 

get/set

当Job处于writing状态下的时候,可以向依赖列表中添加所依赖的Job:

  /**
   * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job
   * is waiting to run, not during or afterwards.
   *
   * @param dependingJob Job that this Job depends on.
   * @return <tt>true</tt> if the Job was added.
   */
  public synchronized boolean addDependingJob(Job dependingJob) {
    if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
      if (this.dependingJobs == null) {
        this.dependingJobs = new ArrayList<Job>();
      }
      return this.dependingJobs.add(dependingJob);
    } else {
      return false;
    }
  }

还提供了是否处于完成状态和是否处于准备状态的判断方法:

  /**
   * @return true if this job is in a complete state
   */
  public boolean isCompleted() {
    return this.state == Job.FAILED ||
      this.state == Job.DEPENDENT_FAILED ||
      this.state == Job.SUCCESS;
  }

  /**
   * @return true if this job is in READY state
   */
  public boolean isReady() {
    return this.state == Job.READY;
  }
    

提供了检查正在运行的Job的状态,如果完成,判断是成功还是失败,代码如下:

/**
   * Check the state of this running job. The state may
   * remain the same, become SUCCESS or FAILED.
   */
  private void checkRunningState() {
    RunningJob running = null;
    try {
      running = jc.getJob(this.mapredJobID);
      if (running.isComplete()) {
        if (running.isSuccessful()) {
          this.state = Job.SUCCESS;
        } else {
          this.state = Job.FAILED;
          this.message = "Job failed!";
          try {
            running.killJob();
          } catch (IOException e1) {

          }
          try {
            this.jc.close();
          } catch (IOException e2) {

          }
        }
      }

    } catch (IOException ioe) {
      this.state = Job.FAILED;
      this.message = StringUtils.stringifyException(ioe);
      try {
        if (running != null)
          running.killJob();
      } catch (IOException e1) {

      }
      try {
        this.jc.close();
      } catch (IOException e1) {

      }
    }
  }

实现了检查并更新Job的状态的checkState()方法:

/**
   * Check and update the state of this job. The state changes
   * depending on its current state and the states of the depending jobs.
   */
   synchronized int checkState() {
    if (this.state == Job.RUNNING) {
      checkRunningState();
    }
    if (this.state != Job.WAITING) {
      return this.state;
    }
    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
      this.state = Job.READY;
      return this.state;
    }
    Job pred = null;
    int n = this.dependingJobs.size();
    for (int i = 0; i < n; i++) {
      pred = this.dependingJobs.get(i);
      int s = pred.checkState();
      if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
        break; // a pred is still not completed, continue in WAITING
        // state
      }
      if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
        this.state = Job.DEPENDENT_FAILED;
        this.message = "depending job " + i + " with jobID "
          + pred.getJobID() + " failed. " + pred.getMessage();
        break;
      }
      // pred must be in success state
      if (i == n - 1) {
        this.state = Job.READY;
      }
    }
    return this.state;
  }
    

最后包含提交Job的方法submit(),代码如下:

  /**
   * Submit this job to mapred. The state becomes RUNNING if submission
   * is successful, FAILED otherwise.
   */
  protected synchronized void submit() {
    try {
      if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
        FileSystem fs = FileSystem.get(theJobConf);
        Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
        for (int i = 0; i < inputPaths.length; i++) {
          if (!fs.exists(inputPaths[i])) {
            try {
              fs.mkdirs(inputPaths[i]);
            } catch (IOException e) {

            }
          }
        }
      }
      RunningJob running = jc.submitJob(theJobConf);
      this.mapredJobID = running.getID();
      this.state = Job.RUNNING;
    } catch (IOException ioe) {
      this.state = Job.FAILED;
      this.message = StringUtils.stringifyException(ioe);
    }
  }

}

完整的Job类源代码如下:

Job

JobControl类深入剖析

在JobControl类的起始部分,定义了一些数据域,包括线程所处的状态,以及其他相关的信息,具体代码如下:

  // The thread can be in one of the following state
  private static final int RUNNING = 0;
  private static final int SUSPENDED = 1;
  private static final int STOPPED = 2;
  private static final int STOPPING = 3;
  private static final int READY = 4;

  private int runnerState;            // the thread state

  private Map<String, Job> waitingJobs;
  private Map<String, Job> readyJobs;
  private Map<String, Job> runningJobs;
  private Map<String, Job> successfulJobs;
  private Map<String, Job> failedJobs;

  private long nextJobID;
  private String groupName;

接下来是对应的构造函数:

  /**
   * Construct a job control for a group of jobs.
   * @param groupName a name identifying this group
   */
  public JobControl(String groupName) {
    this.waitingJobs = new Hashtable<String, Job>();
    this.readyJobs = new Hashtable<String, Job>();
    this.runningJobs = new Hashtable<String, Job>();
    this.successfulJobs = new Hashtable<String, Job>();
    this.failedJobs = new Hashtable<String, Job>();
    this.nextJobID = -1;
    this.groupName = groupName;
    this.runnerState = JobControl.READY;
  }

接着是一个将Map的Jobs转换为ArrayList的转换方法(toArrayList),代码如下:

private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
    ArrayList<Job> retv = new ArrayList<Job>();
    synchronized (jobs) {
      for (Job job : jobs.values()) {
        retv.add(job);
      }
    }
    return retv;
}

类中当然少不了一些get方法:

  /**
   * @return the jobs in the success state
   */
  public ArrayList<Job> getSuccessfulJobs() {
    return JobControl.toArrayList(this.successfulJobs);
  }
  public ArrayList<Job> getFailedJobs() {
    return JobControl.toArrayList(this.failedJobs);
  }
  private String getNextJobID() {
    nextJobID += 1;
    return this.groupName + this.nextJobID;
  }

类中还有将Job插入Job队列的方法:

 private static void addToQueue(Job aJob, Map<String, Job> queue) {
    synchronized(queue) {
      queue.put(aJob.getJobID(), aJob);
    }
  }

  private void addToQueue(Job aJob) {
    Map<String, Job> queue = getQueue(aJob.getState());
    addToQueue(aJob, queue);
  }

既然有插入队列,就有从Job队列根据Job运行状态而取出的方法,代码如下:

  private Map<String, Job> getQueue(int state) {
    Map<String, Job> retv = null;
    if (state == Job.WAITING) {
      retv = this.waitingJobs;
    } else if (state == Job.READY) {
      retv = this.readyJobs;
    } else if (state == Job.RUNNING) {
      retv = this.runningJobs;
    } else if (state == Job.SUCCESS) {
      retv = this.successfulJobs;
    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
      retv = this.failedJobs;
    }
    return retv;
  }

添加一个新的Job的方法:

  /**
   * Add a new job.
   * @param aJob the new job
   */
  synchronized public String addJob(Job aJob) {
    String id = this.getNextJobID();
    aJob.setJobID(id);
    aJob.setState(Job.WAITING);
    this.addToQueue(aJob);
    return id;
  }

  /**
   * Add a collection of jobs
   *
   * @param jobs
   */
  public void addJobs(Collection<Job> jobs) {
    for (Job job : jobs) {
      addJob(job);
    }
  }

获取线程的状态,设置、停止线程的方法:

/**
   * @return the thread state
   */
  public int getState() {
    return this.runnerState;
  }

  /**
   * set the thread state to STOPPING so that the
   * thread will stop when it wakes up.
   */
  public void stop() {
    this.runnerState = JobControl.STOPPING;
  }

  /**
   * suspend the running thread
   */
  public void suspend () {
    if (this.runnerState == JobControl.RUNNING) {
      this.runnerState = JobControl.SUSPENDED;
    }
  }

  /**
   * resume the suspended thread
   */
  public void resume () {
    if (this.runnerState == JobControl.SUSPENDED) {
      this.runnerState = JobControl.RUNNING;
    }
  }
    

检查运行、等待的Jobs,将符合条件的添加至相应的队列: 

  synchronized private void checkRunningJobs() {
    Map<String, Job> oldJobs = null;
    oldJobs = this.runningJobs;
    this.runningJobs = new Hashtable<String, Job>();

    for (Job nextJob : oldJobs.values()) {
      int state = nextJob.checkState();
      /*
        if (state != Job.RUNNING) {
        System.out.println("The state of the running job " +
        nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
      */
      this.addToQueue(nextJob);
    }
  }

  synchronized private void checkWaitingJobs() {
    Map<String, Job> oldJobs = null;
    oldJobs = this.waitingJobs;
    this.waitingJobs = new Hashtable<String, Job>();

    for (Job nextJob : oldJobs.values()) {
      int state = nextJob.checkState();
      /*
        if (state != Job.WAITING) {
        System.out.println("The state of the waiting job " +
        nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
      */
      this.addToQueue(nextJob);
    }
  }

  synchronized private void startReadyJobs() {
    Map<String, Job> oldJobs = null;
    oldJobs = this.readyJobs;
    this.readyJobs = new Hashtable<String, Job>();

    for (Job nextJob : oldJobs.values()) {
      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
      nextJob.submit();
      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
      this.addToQueue(nextJob);
    }
  }

判断是否所有的JOb都结束的方法:

  synchronized public boolean allFinished() {
    return this.waitingJobs.size() == 0 &&
      this.readyJobs.size() == 0 &&
      this.runningJobs.size() == 0;
  }
    

检查运行Jobs的状态、更新等待Job状态、在准备状态下提交的Run方法:

/**
   *  The main loop for the thread.
   *  The loop does the following:
   *      Check the states of the running jobs
   *      Update the states of waiting jobs
   *      Submit the jobs in ready state
   */
  public void run() {
    this.runnerState = JobControl.RUNNING;
    while (true) {
      while (this.runnerState == JobControl.SUSPENDED) {
        try {
          Thread.sleep(5000);
        }
        catch (Exception e) {

        }
      }
      checkRunningJobs();
      checkWaitingJobs();
      startReadyJobs();
      if (this.runnerState != JobControl.RUNNING &&
          this.runnerState != JobControl.SUSPENDED) {
        break;
      }
      try {
        Thread.sleep(5000);
      }
      catch (Exception e) {

      }
      if (this.runnerState != JobControl.RUNNING &&
          this.runnerState != JobControl.SUSPENDED) {
        break;
      }
    }
    this.runnerState = JobControl.STOPPED;
  }

}

完整的JobControl类:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.jobcontrol;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Map;

/** This class encapsulates a set of MapReduce jobs and its dependency. It tracks
 *  the states of the jobs by placing them into different tables according to their
 *  states.
 *
 *  This class provides APIs for the client app to add a job to the group and to get
 *  the jobs in the group in different states. When a
 *  job is added, an ID unique to the group is assigned to the job.
 *
 *  This class has a thread that submits jobs when they become ready, monitors the
 *  states of the running jobs, and updates the states of jobs based on the state changes
 *  of their depending jobs states. The class provides APIs for suspending/resuming
 *  the thread,and for stopping the thread.
 *
 */
public class JobControl implements Runnable{

  // The thread can be in one of the following state
  private static final int RUNNING = 0;
  private static final int SUSPENDED = 1;
  private static final int STOPPED = 2;
  private static final int STOPPING = 3;
  private static final int READY = 4;

  private int runnerState;            // the thread state

  private Map<String, Job> waitingJobs;
  private Map<String, Job> readyJobs;
  private Map<String, Job> runningJobs;
  private Map<String, Job> successfulJobs;
  private Map<String, Job> failedJobs;

  private long nextJobID;
  private String groupName;

  /**
   * Construct a job control for a group of jobs.
   * @param groupName a name identifying this group
   */
  public JobControl(String groupName) {
    this.waitingJobs = new Hashtable<String, Job>();
    this.readyJobs = new Hashtable<String, Job>();
    this.runningJobs = new Hashtable<String, Job>();
    this.successfulJobs = new Hashtable<String, Job>();
    this.failedJobs = new Hashtable<String, Job>();
    this.nextJobID = -1;
    this.groupName = groupName;
    this.runnerState = JobControl.READY;
  }

  private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
    ArrayList<Job> retv = new ArrayList<Job>();
    synchronized (jobs) {
      for (Job job : jobs.values()) {
        retv.add(job);
      }
    }
    return retv;
  }

  /**
   * @return the jobs in the waiting state
   */
  public ArrayList<Job> getWaitingJobs() {
    return JobControl.toArrayList(this.waitingJobs);
  }

  /**
   * @return the jobs in the running state
   */
  public ArrayList<Job> getRunningJobs() {
    return JobControl.toArrayList(this.runningJobs);
  }

  /**
   * @return the jobs in the ready state
   */
  public ArrayList<Job> getReadyJobs() {
    return JobControl.toArrayList(this.readyJobs);
  }

  /**
   * @return the jobs in the success state
   */
  public ArrayList<Job> getSuccessfulJobs() {
    return JobControl.toArrayList(this.successfulJobs);
  }

  public ArrayList<Job> getFailedJobs() {
    return JobControl.toArrayList(this.failedJobs);
  }

  private String getNextJobID() {
    nextJobID += 1;
    return this.groupName + this.nextJobID;
  }

  private static void addToQueue(Job aJob, Map<String, Job> queue) {
    synchronized(queue) {
      queue.put(aJob.getJobID(), aJob);
    }
  }

  private void addToQueue(Job aJob) {
    Map<String, Job> queue = getQueue(aJob.getState());
    addToQueue(aJob, queue);
  }

  private Map<String, Job> getQueue(int state) {
    Map<String, Job> retv = null;
    if (state == Job.WAITING) {
      retv = this.waitingJobs;
    } else if (state == Job.READY) {
      retv = this.readyJobs;
    } else if (state == Job.RUNNING) {
      retv = this.runningJobs;
    } else if (state == Job.SUCCESS) {
      retv = this.successfulJobs;
    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
      retv = this.failedJobs;
    }
    return retv;
  }

  /**
   * Add a new job.
   * @param aJob the new job
   */
  synchronized public String addJob(Job aJob) {
    String id = this.getNextJobID();
    aJob.setJobID(id);
    aJob.setState(Job.WAITING);
    this.addToQueue(aJob);
    return id;
  }

  /**
   * Add a collection of jobs
   *
   * @param jobs
   */
  public void addJobs(Collection<Job> jobs) {
    for (Job job : jobs) {
      addJob(job);
    }
  }

  /**
   * @return the thread state
   */
  public int getState() {
    return this.runnerState;
  }

  /**
   * set the thread state to STOPPING so that the
   * thread will stop when it wakes up.
   */
  public void stop() {
    this.runnerState = JobControl.STOPPING;
  }

  /**
   * suspend the running thread
   */
  public void suspend () {
    if (this.runnerState == JobControl.RUNNING) {
      this.runnerState = JobControl.SUSPENDED;
    }
  }

  /**
   * resume the suspended thread
   */
  public void resume () {
    if (this.runnerState == JobControl.SUSPENDED) {
      this.runnerState = JobControl.RUNNING;
    }
  }

  synchronized private void checkRunningJobs() {

    Map<String, Job> oldJobs = null;
    oldJobs = this.runningJobs;
    this.runningJobs = new Hashtable<String, Job>();

    for (Job nextJob : oldJobs.values()) {
      int state = nextJob.checkState();
      /*
        if (state != Job.RUNNING) {
        System.out.println("The state of the running job " +
        nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
      */
      this.addToQueue(nextJob);
    }
  }

  synchronized private void checkWaitingJobs() {
    Map<String, Job> oldJobs = null;
    oldJobs = this.waitingJobs;
    this.waitingJobs = new Hashtable<String, Job>();

    for (Job nextJob : oldJobs.values()) {
      int state = nextJob.checkState();
      /*
        if (state != Job.WAITING) {
        System.out.println("The state of the waiting job " +
        nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
      */
      this.addToQueue(nextJob);
    }
  }

  synchronized private void startReadyJobs() {
    Map<String, Job> oldJobs = null;
    oldJobs = this.readyJobs;
    this.readyJobs = new Hashtable<String, Job>();

    for (Job nextJob : oldJobs.values()) {
      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
      nextJob.submit();
      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
      this.addToQueue(nextJob);
    }
  }

  synchronized public boolean allFinished() {
    return this.waitingJobs.size() == 0 &&
      this.readyJobs.size() == 0 &&
      this.runningJobs.size() == 0;
  }

  /**
   *  The main loop for the thread.
   *  The loop does the following:
   *      Check the states of the running jobs
   *      Update the states of waiting jobs
   *      Submit the jobs in ready state
   */
  public void run() {
    this.runnerState = JobControl.RUNNING;
    while (true) {
      while (this.runnerState == JobControl.SUSPENDED) {
        try {
          Thread.sleep(5000);
        }
        catch (Exception e) {

        }
      }
      checkRunningJobs();
      checkWaitingJobs();
      startReadyJobs();
      if (this.runnerState != JobControl.RUNNING &&
          this.runnerState != JobControl.SUSPENDED) {
        break;
      }
      try {
        Thread.sleep(5000);
      }
      catch (Exception e) {

      }
      if (this.runnerState != JobControl.RUNNING &&
          this.runnerState != JobControl.SUSPENDED) {
        break;
      }
    }
    this.runnerState = JobControl.STOPPED;
  }

}
时间: 2024-09-18 00:07:35

JobControl 的实现原理的相关文章

路由器QOS功能原理和工作方式

设置路由器时,大多会用到路由器的安全机制,也就常说的QOS功能,QOS功能可以保护整个网络的安全,本篇带你了解其具体的原理和工作的方式. 一.QOS用来解决带宽解决网络延迟和阻塞等问题的一种技术,一般里面包含优先级别.弹性带宽管理等等,主要用来解决各种网络的攻击和病毒,保护网络的正常运行,它主要有以下几个方面的功能: 1.端口优先:可针对源端口.目的端口进行设置优先的级别,一般来说如果是玩游戏为主.那么我可以针对一些主流游戏的端口.优先这些游戏的带宽. 2.IP/网段优先:可针对源IP.目的IP

PhotoShop中正片负片叠底的原理介绍

关于正片叠底,正片,负片,通道,色相,色相环等等的相关理论一堆,大家可以从网上查到,原理就不讲了. 感觉单通道正片叠底效果应该属于填充色一类,但却与填充色又有很大的差异,与照片滤镜功能也有所差异,运用得当,最大的优点是在叠底后仍能保持比较好的照片通透度,而且简单易用,特别适合不太熟悉PS操作的朋友,此类方法运用广泛,配合起来使用比较方便,慢慢介绍吧. photoshop教程注:以下介绍的为RGB模式下的叠底,与CMYK模式下有所区别 方法一,单通道正片叠底 例一,叠出阳光色.提示:图片应尽量少漏

JavaScript 预解析的原理及实现

事实上或某种现象证明并不是这样的,通过<JavaScript权威指南>及网上相关资料了解到,JavaScript有"预解析"行为.理解这一特性是很重要的,不然在实际开发中你可能会遇到很多无从解析的问题,甚至导致程序bug的存在.为了解析这一现象,也作为自己的一次学习总结,本文逐步引导你来认识JavaScript"预解析",如果我的见解有误,还望指正. (1) 如果JavaScript仅是运行时自上往下逐句解析的,下面的代码能正确运行是可以理解的,因为我们

控件-mscomm串口波形绘制范例,求大神解析这三个函数,急急急,绘制波形图的原理是什么,拜托了

问题描述 mscomm串口波形绘制范例,求大神解析这三个函数,急急急,绘制波形图的原理是什么,拜托了 //串口void CPort_testDlg::OnComm() { //if(stop)return; VARIANT m_input1; COleSafeArray m_input2; long lengthi; BYTE data[600]; CString str; int ai=0bi=0ci=0di=0; int sum=0; if(m_Comm.GetCommEvent()==2)

你应该知道的RPC原理

      在学校期间大家都写过不少程序,比如写个hello world服务类,然后本地调用下,如下所示.这些程序的特点是服务消费方和服务提供方是本地调用关系. 而一旦踏入公司尤其是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不同的机器上,由不同的团队负责.这时就会遇到两个问题:1)要搭建一个新服务,免不了需要依赖他人的服务,而现在他人的服务都在远端,怎么调用?2)其它团队要使用我们的新服务,我们的服务该怎么发布以便他人调用?下文将对这两个问题展开探讨. 1 p

计算机网络原理相关面试问题

1.简单介绍OSI的七层网络模型,画图描绘,描述主要几层的各自作用.OSI(Open System Interconnect,开放系统互连)七层网络模型. TCP/IP四层模型和OSI七层模型 表1-1是 TCP/IP四层模型和OSI七层模型对应表.我们把OSI七层网络模型和Linux TCP/IP四层概念模型对应,然后将各种网络协议归类. 表1-1  TCP/IP四层模型和OSI七层模型对应表 OSI七层网络模型 Linux TCP/IP四层概念模型 对应网络协议 应用层(Applicatio

图片原理与优化 如何在网站设计中发挥更好的效果

中介交易 SEO诊断 淘宝客 云主机 技术大厅 前言:该文收集了前辈们的一些关于图片优化的技巧,在此收拢到一起,对于各个方法的优化原理做了一些研究,希望能给大家对于图片优化这一块起到抛砖引玉的作用. 提到图片,我们不得不从位图开始说起,位图图像(bitmap),也称为点阵图像或绘制图像,是由称作像素(图片元素)的单个点组成的.这些点可以进行不同的排列和染色以构成一副图片.当放大位图时,可以看见赖以构成整个图像的无数单个方块. 常见的格式中JPG.PNG.GIF亦属于位图,所以它们的数据结构大致相

懒人促进社会进步 - 5种索引的原理和优化Case (btree,hash,gin,gist,brin)

标签 PostgreSQL , 多列聚合 , gin , btree , n_distinct , 选择性 , 如何选择索引方法(hash,btree,gin,gist,brin) , 如何优化索引 , 相关性 背景 在广告行业,精准营销是一个较热的话题,之前写过一个案例,如何使用PostgreSQL的array类型和GIN索引实时圈人的场景. <万亿级营销(圈人)迈入毫秒时代 - 万亿user_tags级实时推荐系统数据库设计> 使用以上方法,程序需要作出一些调整(当然,如果用户原本就是Po

HybridDB PostgreSQL &quot;Sort、Group、distinct 聚合、JOIN&quot; 不惧怕数据倾斜的黑科技和原理 - 多阶段聚合

标签 PostgreSQL , Greenplum , JOIN , group by , distinct , 聚合 , 非分布键 , 数据倾斜 , 多阶段聚合 背景 对于分布式系统,数据分布存储,例如随机.哈希分布. Greenplum数据库支持两种数据分布模式: 1.哈希(指定单个.或多个字段) 2.随机分布(无需指定任何字段) 数据分布存储后,面临一些挑战: JOIN,排序,group by,distinct. 1.JOIN涉及非分布键字段 2.排序,如何保证输出顺序全局有序 3.gro