Quartz.NET 任务调度的核心元素是 scheduler, trigger 和 job,其中 trigger(用于定义调度时间的元素,即按照什么时间规则去执行任务) 和 job 是任务调度的元数据,scheduler 是实际执行调度的控制器。在Quartz.NET中主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。对于同一个 trigger 来说,有状态的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。无状态任务一般指可以并发的任务,即任务之间是独立的,不会互相干扰。一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。某些任务需要对数据库中的数据进行增删改处理 , 这些任务不能并发执行,就需要用到无状态的任务 , 否则会造成数据混乱。
另外有些情况下,我们需要将任务保存到数据库中,特别是有些任务中包含参数,例如累加的任务,如果可以保存到数据库中,即便中间断电或者程序异常重启,中间计算的结果也不会丢失,可以从断点的结果进行运算(首先恢复任务),下面介绍一下如何用AdoJobStore将任务保存到SQL Server数据库中.
事先要在数据库上新建一个QRTZ_数据库,并执行SQL建表脚本:
1 RecoveryJob
是一个无状态的任务,代码如下:
using System;
using System.Collections.Specialized;
using System.Threading;
using Common.Logging;
using Quartz;
using Quartz.Impl;
using Quartz.Job;
using System.Windows.Forms;
namespace QuartzDemo
{
/// <summary>
/// 无状态的可恢复的任务
/// </summary>
public class RecoveryJob : IJob
{
private const string Count = "count";
public virtual void Execute(IJobExecutionContext context)
{
JobKey jobKey = context.JobDetail.Key;
if (isOpen("FrmConsole"))
{
try
{
//获取当前Form1实例
__instance = (FrmConsole)Application.OpenForms["FrmConsole"];
// 如果任务是恢复的任务的话
if (context.Recovering)
{
__instance.SetInfo(string.Format("{0} RECOVERING at {1}", jobKey, DateTime.Now.ToString("r")));
}
else
{
__instance.SetInfo(string.Format("{0} starting at {1}", jobKey, DateTime.Now.ToString("r")));
}
JobDataMap data = context.JobDetail.JobDataMap;
int count;
if (data.ContainsKey(Count))
{
//是否能从数据库中恢复,如果保存Job等信息的话,程序运行突然终端(可用调试时中断运行,而不是关闭窗体来模拟)
count = data.GetInt(Count);
}
else
{
count = 0;
}
count++;
data.Put(Count, count);
__instance.SetInfo(string.Format(" {0} Count #{1}", jobKey, count));
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
private static FrmConsole __instance = null;
/// <summary>
/// 判断窗体是否打开
/// </summary>
/// <param name="appName"></param>
/// <returns></returns>
private bool isOpen(string appName)
{
FormCollection collection = Application.OpenForms;
foreach (Form form in collection)
{
if (form.Name == appName)
{
return true;
}
}
return false;
}
}
}
2 RecoveryStatefulJob
是一个有状态的任务,和无状态的区别就是在任务类的上面用[PersistJobDataAfterExecution]标注任务是有状态的 , 有状态的任务不允许并发执行,也需要标注 [DisallowConcurrentExecution],代码如下:
using System;
using System.Collections.Specialized;
using System.Threading;
using Common.Logging;
using Quartz;
using Quartz.Impl;
using Quartz.Job;
using System.Windows.Forms;
namespace QuartzDemo
{
/// <summary>
/// 用这个[PersistJobDataAfterExecution]标注任务是有状态的,
/// 有状态的任务不允许并发执行 [DisallowConcurrentExecution]
/// </summary>
[PersistJobDataAfterExecution]
[DisallowConcurrentExecution]
public class RecoveryStatefulJob : RecoveryJob
{
}
}
3 AdoJobStoreExample
用 properties["quartz.dataSource.default.connectionString"] = "Server=(local);Database=QRTZ_;Trusted_Connection=True;";定义了数据库的连接信息,程序运行时会自动将任务保存到数据库中:
using System;
using System.Collections.Specialized;
using System.Threading;
using Common.Logging;
using Quartz;
using Quartz.Impl;
using Quartz.Job;
using System.Windows.Forms;
namespace QuartzDemo
{
/// <summary>
/// AdoJobStore的用法示例
/// </summary>
public class AdoJobStoreExample
{
public virtual void Run(bool inClearJobs, bool inScheduleJobs)
{
NameValueCollection properties = new NameValueCollection();
properties["quartz.scheduler.instanceName"] = "TestScheduler";
properties["quartz.scheduler.instanceId"] = "instance_one";
properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
properties["quartz.threadPool.threadCount"] = "5";
properties["quartz.threadPool.threadPriority"] = "Normal";
properties["quartz.jobStore.misfireThreshold"] = "60000";
properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
properties["quartz.jobStore.useProperties"] = "false";
properties["quartz.jobStore.dataSource"] = "default";
properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
properties["quartz.jobStore.clustered"] = "true";
// SQLite
// properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";
properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";
// 数据库连接字符串
properties["quartz.dataSource.default.connectionString"] = "Server=(local);Database=QRTZ_;Trusted_Connection=True;";
properties["quartz.dataSource.default.provider"] = "SqlServer-20";
// First we must get a reference to a scheduler
ISchedulerFactory sf = new StdSchedulerFactory(properties);
IScheduler sched = sf.GetScheduler();
bool b是否恢复 = false;
if (inClearJobs)
{
Console.WriteLine("***** Deleting existing jobs/triggers *****");
// sched.Clear();
}
if (inScheduleJobs)
{
string schedId = sched.SchedulerInstanceId;
int count = 1;
//定义一个无状态的任务
IJobDetail job = JobBuilder.Create<RecoveryJob>()
.WithIdentity("recoveryjob_" + count, schedId)
.RequestRecovery() //recovery
.Build();
ISimpleTrigger trigger = (ISimpleTrigger)TriggerBuilder.Create()
.WithIdentity("triger_" + count, schedId)
.StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
.WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(3)))
.Build();
//可用此来查看定义的触发器触发规则
//log.InfoFormat("{0} will run at: {1} and repeat: {2} times, every {3} seconds",
//job.Key, trigger.GetNextFireTimeUtc(),
//trigger.RepeatCount,
//trigger.RepeatInterval.TotalSeconds);
try
{
//如果数据库已经存在同名job和trigger,则绑定失败
sched.ScheduleJob(job, trigger);
}
catch
{
b是否恢复 = true;
}
count++;
//定义一个有状态的任务***********************************************************
job = JobBuilder.Create<RecoveryStatefulJob>()
.WithIdentity("Statefuljob_" + count, schedId)
.RequestRecovery() // recovery
.Build();
trigger = (ISimpleTrigger)TriggerBuilder.Create()
.WithIdentity("triger_" + count, schedId)
.StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
.WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(3)))
.Build();
try
{
sched.ScheduleJob(job, trigger);
}
catch
{
b是否恢复 = true;
}
}
//启动
sched.Start();
//sched.Shutdown();
}
public string Name
{
get { return GetType().Name; }
}
public void Run()
{
bool clearJobs = true;
//clearJobs = false;
bool scheduleJobs = true;
AdoJobStoreExample example = new AdoJobStoreExample();
example.Run(clearJobs, scheduleJobs);
}
}
}
可以看到有状态的计数每次累加1,而无状态的每次执行时都会丢失累加数(新的实例),中断程序,查看数据库的QRTZ_JOB_DETAILS表,可以看见还有一个持久化的任务:
中断程序后(调试状态时不关闭窗体,而是中断调试,模拟异常关闭) ,再重新运行可以看到如下界面:
Quartz.NET - AdoJobStore作业存储
Quartz提供两种基本作业存储类型。第一种类型叫做RAMJobStore,它利用通常的内存来持久化调度程序信息。这种作业存储类型最容易配置、构造和运行。Quartz.net缺省使用的就是RAMJobStore。对许多应用来说,这种作业存储已经足够了。
然而,因为调度程序信息是存储在被分配在内存里面,所以,当应用程序停止运行时,所有调度信息将被丢失。如果你需要在重新启动之间持久化调度信息,则将需要第二种类型的作业存储。为了修正这个问题,Quartz.NET 提供了 AdoJobStore。顾名思义,作业仓库通过 ADO.NET把所有数据放在数据库中。数据持久性的代价就是性能降低和复杂性的提高。它将所有的数据通过ADO.NET保存到数据库可中。它的配置要比RAMJobStore稍微复杂,同时速度也没有那么快。但是性能的缺陷不是非常差,尤其是如果你在数据库表的主键上建立索引。
AdoJobStore几乎可以在任何数据库上工作,它广泛地使用Oracle, MySQL, MS SQLServer2000, HSQLDB, PostreSQL以及 DB2。要使用AdoJobStore,首先必须创建一套Quartz使用的数据库表,可以在Quartz的database\tables找到创建库表的SQL脚本。如果没有找到你的数据库类型的脚本,那么找到一个已有的,修改成为你数据库所需要的。需要注意的一件事情就是所有Quartz库表名都以QRTZ_作为前缀(例如:表"QRTZ_TRIGGERS",及"QRTZ_JOB_DETAIL")。实际上,可以你可以将前缀设置为任何你想要的前缀,只要你告诉AdoJobStore那个前缀是什么即可(在你的Quartz属性文件中配置)。对于一个数据库中使用多个scheduler实例,那么配置不同的前缀可以创建多套库表,十分有用。
SQL Server代码连接如下所示:
//AdoJobStoreRunner
NameValueCollection properties = new NameValueCollection();
properties["quartz.scheduler.instanceName"] = "TestScheduler";
properties["quartz.scheduler.instanceId"] = "instance_one";
properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
properties["quartz.threadPool.threadCount"] = "5";
properties["quartz.threadPool.threadPriority"] = "Normal";
properties["quartz.jobStore.misfireThreshold"] = "60000";
properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.StdAdoDelegate, Quartz";
properties["quartz.jobStore.useProperties"] = "false";
properties["quartz.jobStore.dataSource"] = "default";
properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
properties["quartz.jobStore.clustered"] = "true";
// if running MS SQL Server we need this
properties["quartz.jobStore.selectWithLockSQL"] = "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = @lockName";
properties["quartz.dataSource.default.connectionString"] = "server=local;database=quartz;user id=<span style="font-family: Arial, Helvetica, sans-serif;">sa</span><span style="font-family: Arial, Helvetica, sans-serif;">;pwd=sa;pooling=false;";</span>
properties["quartz.dataSource.default.provider"] = "SqlServer-20";
ISchedulerFactory sf = new StdSchedulerFactory(properties);
IScheduler sched = sf.GetScheduler();
string schedId = sched.SchedulerInstanceId;
int count = 1;
for (int i = 0; i < 10; i++)
{
//作业
IJobDetail job = JobBuilder
.Create<Job1>()
.WithIdentity("计算作业" + i.ToString(), "组1")
.RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
.Build();
DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTimeOffset.UtcNow);
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity("trigger" + i.ToString(), "group1")
.StartAt(runTime)
.Build();
//关联任务和触发器
sched.ScheduleJob(job, trigger);
}
//开始任务
sched.Start();
<configuration>
<configSections>
<section name="quartz" type="System.Configuration.NameValueSectionHandler, System, Version=1.0.5000.0,Culture=neutral, PublicKeyToken=b77a5c561934e089" />
</configSections>
<quartz>
<add key="quartz.scheduler.instanceName" value="ExampleDefaultQuartzScheduler" />
<add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz" />
<add key="quartz.threadPool.threadCount" value="10" />
<add key="quartz.threadPool.threadPriority" value="2" />
<add key="quartz.jobStore.misfireThreshold" value="60000" />
<add key="quartz.jobStore.type" value="Quartz.Simpl.RAMJobStore, Quartz" />
</quartz>
</configuration>