1.DisallowConcurrentExceution
從字面意思來看也就是不允許并發執行
簡單的示範一下
[DisallowConcurrentExecution]
public class TestDisallowConcurrentExectionJob : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await Task.Run(() =>
{
var nextTime = context.NextFireTimeUtc?.ToLocalTime();
var currentTime = DateTime.Now;
Console.WriteLine($"CurrentTime={currentTime}, FireTime={context.ScheduledFireTimeUtc?.ToLocalTime()}, NextTime={nextTime}");
});
Thread.Sleep(10000);
}
}
public class TestDisallowConcurrentExectionScheduler
{
public async static Task Test()
{
var scheduler = await StdSchedulerFactory.GetDefaultScheduler();
await scheduler.Start();
var job = JobBuilder.CreateForAsync<TestDisallowConcurrentExectionJob>()
.WithIdentity("TestDisallowConcurrentExectionJob")
.UsingJobData("myKey", "myValue")
.Build();
var trigger = TriggerBuilder.Create()
.WithSimpleSchedule(s =>
s.WithIntervalInSeconds(1)
.RepeatForever())
.Build();
await scheduler.ScheduleJob(job, trigger);
}
}
未添加特性的結果
添加特性的結果
Quartz預設是十個線程工作線程加一個排程線程,當線程在等待時,其他工作線程也會進來,而加上DiallowConcurrentExection特性時則都會處于等待狀态
原理圖
源碼解析
通過QuartzSchedulerThread中的Run方法調用AcquireNextTriggers擷取下一次的觸發器
public virtual Task<IReadOnlyCollection<IOperableTrigger>> AcquireNextTriggers(
DateTimeOffset noLaterThan,
int maxCount,
TimeSpan timeWindow,
CancellationToken cancellationToken = default)
{
lock (lockObject)
{
var result = new List<IOperableTrigger>();
// return empty list if store has no triggers.
if (timeTriggers.Count == 0)
{
return Task.FromResult<IReadOnlyCollection<IOperableTrigger>>(result);
}
var acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
var excludedTriggers = new HashSet<TriggerWrapper>();
DateTimeOffset batchEnd = noLaterThan;
while (true)
{
var tw = timeTriggers.FirstOrDefault();
if (tw == null)
{
break;
}
if (!timeTriggers.Remove(tw))
{
break;
}
if (tw.Trigger.GetNextFireTimeUtc() == null)
{
continue;
}
if (ApplyMisfire(tw))
{
if (tw.Trigger.GetNextFireTimeUtc() != null)
{
timeTriggers.Add(tw);
}
continue;
}
if (tw.Trigger.GetNextFireTimeUtc() > batchEnd)
{
timeTriggers.Add(tw);
break;
}
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = tw.Trigger.JobKey;
IJobDetail job = jobsByKey[jobKey].JobDetail;
if (job.ConcurrentExecutionDisallowed)
{
if (!acquiredJobKeysForNoConcurrentExec.Add(jobKey))
{
excludedTriggers.Add(tw);
continue; // go to next trigger in store.
}
}
tw.state = InternalTriggerState.Acquired;
tw.Trigger.FireInstanceId = GetFiredTriggerRecordId();
IOperableTrigger trig = (IOperableTrigger) tw.Trigger.Clone();
if (result.Count == 0)
{
var now = SystemTime.UtcNow();
var nextFireTime = tw.Trigger.GetNextFireTimeUtc().GetValueOrDefault(DateTimeOffset.MinValue);
var max = now > nextFireTime ? now : nextFireTime;
batchEnd = max + timeWindow;
}
result.Add(trig);
if (result.Count == maxCount)
{
break;
}
}
// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.
if (excludedTriggers.Count > 0)
{
foreach (var excludedTrigger in excludedTriggers)
{
timeTriggers.Add(excludedTrigger);
}
}
return Task.FromResult<IReadOnlyCollection<IOperableTrigger>>(result);
}
}
RAMJobStore中的TriggersFired方法
當添加了DisallowConcurrentExecution特性後
先從TimeTriggers中移除Trigger
再把Job添加BlockedJobs中
if (job.ConcurrentExecutionDisallowed)
{
IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(job.Key);
foreach (TriggerWrapper ttw in trigs)
{
if (ttw.state == InternalTriggerState.Waiting)
{
ttw.state = InternalTriggerState.Blocked;
}
if (ttw.state == InternalTriggerState.Paused)
{
ttw.state = InternalTriggerState.PausedAndBlocked;
}
timeTriggers.Remove(ttw);
}
blockedJobs.Add(job.Key);
}
RAMJobStore中的TriggeredJobComplete方法
當Job執行完後
如果添加了DisallowConcurrentExecution特性
先移除BlockedJobs中Job 再把Trigger添加到TimeTriggers中
if (jd.ConcurrentExecutionDisallowed)
{
blockedJobs.Remove(jd.Key);
IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);
foreach (TriggerWrapper ttw in trigs)
{
if (ttw.state == InternalTriggerState.Blocked)
{
ttw.state = InternalTriggerState.Waiting;
timeTriggers.Add(ttw);
}
if (ttw.state == InternalTriggerState.PausedAndBlocked)
{
ttw.state = InternalTriggerState.Paused;
}
}
signaler.SignalSchedulingChange(null, cancellationToken);
}
2.PersistJobDataAfterExecution
從字面意思來看也就是執行後保留作業資料
簡單示範一下
[PersistJobDataAfterExecution]
public class TestPersistJobDataAfterExecutionJob : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await Task.Run(() =>
{
var myVaule = context.JobDetail.JobDataMap["myKey"];
Console.WriteLine(myVaule);
context.JobDetail.JobDataMap["myKey"] = myVaule + new Random().Next(1,10).ToString();
});
}
}
public class TestPersistJobDataAfterExcutionScheduler
{
public async static Task Test()
{
var scheduler = await StdSchedulerFactory.GetDefaultScheduler();
await scheduler.Start();
var job = JobBuilder.CreateForAsync<TestPersistJobDataAfterExecutionJob>()
.WithIdentity("TestPersistJobDataAfterExcutionJob")
.UsingJobData("myKey", "myValue")
.Build();
var trigger = TriggerBuilder.Create()
.WithSimpleSchedule(s =>
s.WithIntervalInSeconds(1)
.RepeatForever())
.Build();
await scheduler.ScheduleJob(job, trigger);
}
}
未加特性的結果
加特性的結果
原理圖
源碼解析
QuartzSchedulerThread中的Run方法
JobRunShell shell;
try
{
shell = qsRsrcs.JobRunShellFactory.CreateJobRunShell(bndle);
await shell.Initialize(qs, CancellationToken.None).ConfigureAwait(false);
}
catch (SchedulerException)
{
await qsRsrcs.JobStore.TriggeredJobComplete(trigger, bndle.JobDetail, SchedulerInstruction.SetAllJobTriggersError, CancellationToken.None).ConfigureAwait(false);
continue;
}
var threadPoolRunResult = qsRsrcs.ThreadPool.RunInThread(() => shell.Run(CancellationToken.None));
if (threadPoolRunResult == false)
{
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
Log.Error("ThreadPool.RunInThread() returned false!");
await qsRsrcs.JobStore.TriggeredJobComplete(trigger, bndle.JobDetail, SchedulerInstruction.SetAllJobTriggersError, CancellationToken.None).ConfigureAwait(false);
}
JobRunShell初始化方法
public virtual async Task Initialize(
QuartzScheduler sched,
CancellationToken cancellationToken = default)
{
qs = sched;
IJob job;
IJobDetail jobDetail = firedTriggerBundle.JobDetail;
try
{
job = sched.JobFactory.NewJob(firedTriggerBundle, scheduler);
}
catch (SchedulerException se)
{
await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);
throw;
}
catch (Exception e)
{
SchedulerException se = new SchedulerException($"Problem instantiating type '{jobDetail.JobType.FullName}'", e);
await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);
throw se;
}
jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}
SimpleJobFactory中的NewJob函數可以看出Job是無狀态的直接通過反射建立的
public virtual IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
{
IJobDetail jobDetail = bundle.JobDetail;
Type jobType = jobDetail.JobType;
try
{
if (log.IsDebugEnabled())
{
log.Debug($"Producing instance of Job '{jobDetail.Key}', class={jobType.FullName}");
}
return ObjectUtils.InstantiateType<IJob>(jobType);
}
catch (Exception e)
{
SchedulerException se = new SchedulerException($"Problem instantiating class '{jobDetail.JobType.FullName}'", e);
throw se;
}
}
JobRunShell中Run方法将JobExecutionContextImpl塞給了Execute方法
private JobExecutionContextImpl jec;
// Execute the job
try
{
if (log.IsDebugEnabled())
{
log.Debug("Calling Execute on job " + jobDetail.Key);
}
await job.Execute(jec).ConfigureAwait(false);
endTime = SystemTime.UtcNow();
}
JobRunShell中Run方法調用的NotifyJobStoreJobComplete函數
await qs.NotifyJobStoreJobComplete(trigger, jobDetail, instCode, cancellationToken).ConfigureAwait(false);
JobRunShell中的NotifyJobStoreJobComplete 可以看出調用了JobStore.TriggeredJobComplete
public virtual Task NotifyJobStoreJobComplete(
IOperableTrigger trigger,
IJobDetail detail,
SchedulerInstruction instCode,
CancellationToken cancellationToken = default)
{
return resources.JobStore.TriggeredJobComplete(trigger, detail, instCode, cancellationToken);
}
Quartz.NET中StdScheduleFactory如果未指定JobStore則預設RAMJobStore
進而可以看出RAMJobStore中通過TriggerJobComplete方法來檢查是否有PersistJobDataAfterExecution特性
如果有通過MemberwiseClone函數克隆出資料來再通過JobBuilder來建構一個JobDetail
if (jobDetail.PersistJobDataAfterExecution)
{
JobDataMap newData = jobDetail.JobDataMap;
if (newData != null)
{
newData = (JobDataMap) newData.Clone();
newData.ClearDirtyFlag();
}
jd = jd.GetJobBuilder().SetJobData(newData).Build();
jw.JobDetail = jd;
}
if (jd.ConcurrentExecutionDisallowed)
{
blockedJobs.Remove(jd.Key);
IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);
foreach (TriggerWrapper ttw in trigs)
{
if (ttw.state == InternalTriggerState.Blocked)
{
ttw.state = InternalTriggerState.Waiting;
timeTriggers.Add(ttw);
}
if (ttw.state == InternalTriggerState.PausedAndBlocked)
{
ttw.state = InternalTriggerState.Paused;
}
}
signaler.SignalSchedulingChange(null, cancellationToken);
}
最終會通過JobRunShell中的Run方法中的ReturnJob方法 傳回Job
qs.RemoveInternalSchedulerListener(this);
if (jec != null)
{
if (jec.JobInstance != null)
{
qs.JobFactory.ReturnJob(jec.JobInstance);
}
jec.Dispose();
}
public virtual void ReturnJob(IJob job)
{
var disposable = job as IDisposable;
disposable?.Dispose();
}