定時任務總會遇到任務重疊執行的情況,比如一個任務1分鐘執行一次,而任務的執行時間超過了1分鐘,這樣就會有兩個相同任務併發執行了。有時候我們是允許這種情況的發生的,比如任務執行的代碼是冪等的,而有時候我們可能考慮到一些情況是不允許這種事情發生的。 在實際場景中,我們定時任務調度使用quartz來實現觸 ...
定時任務總會遇到任務重疊執行的情況,比如一個任務1分鐘執行一次,而任務的執行時間超過了1分鐘,這樣就會有兩個相同任務併發執行了。有時候我們是允許這種情況的發生的,比如任務執行的代碼是冪等的,而有時候我們可能考慮到一些情況是不允許這種事情發生的。
在實際場景中,我們定時任務調度使用quartz來實現觸發的,定時任務的業務代碼分佈在各個應用,用soa調用。
對於quartz來說,官方文檔上明確對這種需求有指定的解決辦法,就是使用註解@DisallowConcurrentExecution;
意思是:禁止併發執行多個相同定義的JobDetail,就是我們想要的。
下麵一個實現的例子:可以對比兩個job:AllowConcurrentExecutionTestJob,DisallowConcurrentExecutionTestJob
public class AllowConcurrentExecutionTestJob implements Job { public AllowConcurrentExecutionTestJob() { } public void execute(JobExecutionContext context) throws JobExecutionException { try { List<JobExecutionContext> list = context.getScheduler().getCurrentlyExecutingJobs(); for(JobExecutionContext jobExecutionContext : list){ // job內部獲取容器內變數 System.out.println(jobExecutionContext.getJobDetail().getKey().getName()); } Thread.sleep(4000); } catch (SchedulerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Hello World! AllowConcurrentExecutionTestJob is executing."); } }
@DisallowConcurrentExecution public class DisallowConcurrentExecutionTestJob implements org.quartz.Job { public DisallowConcurrentExecutionTestJob() { } public void execute(JobExecutionContext context) throws JobExecutionException { try { List<JobExecutionContext> list = context.getScheduler().getCurrentlyExecutingJobs(); for(JobExecutionContext jobExecutionContext : list){ // job內部獲取容器內變數 System.out.println(jobExecutionContext.getJobDetail().getKey().getName()); } Thread.sleep(4000); } catch (SchedulerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Hello World! DisallowConcurrentExecutionTestJob is executing."); } }
測試代碼:
public class QuartzTest { public static void main(String[] args) throws InterruptedException { try { // Grab the Scheduler instance from the Factory Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); // and start it off scheduler.start(); // define the job and tie it to our HelloJob class JobDetail job = JobBuilder.newJob(DisallowConcurrentExecutionTestJob.class) .withIdentity("job1", "group1") .build(); // Trigger the job to run now, and then repeat every 40 seconds Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(1) .repeatForever()) .build(); // define the job and tie it to our HelloJob class JobDetail job2 = JobBuilder.newJob(AllowConcurrentExecutionTestJob.class) .withIdentity("job2", "group1") .build(); // Trigger the job to run now, and then repeat every 40 seconds Trigger trigger2 = TriggerBuilder.newTrigger() .withIdentity("trigger2", "group1") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(1) .repeatForever()) .build(); // Tell quartz to schedule the job using our trigger scheduler.scheduleJob(job2, trigger2); // scheduler.scheduleJob(job2, trigger2); // wait trigger Thread.sleep(20000); scheduler.shutdown(); } catch (SchedulerException se) { se.printStackTrace(); } } }
我們還發現在job的execute里傳參是JobExecutionContext,它可以讓我們拿到正在執行的job的信息。所以我想在job里直接判斷一下就可以知道有沒有已經在執行的相同job。
public class SelfDisAllowConExeTestJob implements org.quartz.Job{ public void execute(JobExecutionContext context) throws JobExecutionException { try { List<JobExecutionContext> list = context.getScheduler().getCurrentlyExecutingJobs(); Set<String> jobs = new HashSet<String>(); int i=0; for (JobExecutionContext jobExecutionContext : list){ if(context.getJobDetail().getKey().getName().equals(jobExecutionContext.getJobDetail().getKey().getName())){ i++; } } if(i>1){ System.out.printf("self disallow "); return; } Thread.sleep(4000); } catch (SchedulerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Hello World! SelfDisAllowConExeTestJob is executing."); } }
測試代碼:
public class OuartzSelfMapTest { public static void main(String[] args) throws InterruptedException { try { // Grab the Scheduler instance from the Factory Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); // and start it off scheduler.start(); // define the job and tie it to our HelloJob class JobDetail job = JobBuilder.newJob(SelfDisAllowConExeTestJob.class) .withIdentity("job1", "group1") .build(); // Trigger the job to run now, and then repeat every 40 seconds Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(1) .repeatForever()) .build(); // Tell quartz to schedule the job using our trigger scheduler.scheduleJob(job, trigger); // wait trigger Thread.sleep(20000); scheduler.shutdown(); } catch (SchedulerException se) { se.printStackTrace(); } } }
我們在實際代碼中經常會結合spring,特地去看了一下MethodInvokingJobDetailFactoryBean的concurrent屬性來控制是否限制併發執行的實現:
Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
/** * Extension of the MethodInvokingJob, implementing the StatefulJob interface. * Quartz checks whether or not jobs are stateful and if so, * won't let jobs interfere with each other. */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public static class StatefulMethodInvokingJob extends MethodInvokingJob { // No implementation, just an addition of the tag interface StatefulJob // in order to allow stateful method invoking jobs. }
當然,在quartz里有一個StatefulJob,方便直接繼承就實現了concurrent=false的事情了。
那麼啰嗦了這麼多,其實就是想表達,quartz里並沒有一個可以設置說是否併發的介面,而是需要自定義的job自行繼承,或使用註解來實現的。
另外,還有一個相關的註解:@PersistJobDataAfterExecution
意思是:放在JobDetail 里的JobDataMap是共用的,也就是相同任務之間執行時可以傳輸信息。很容易想到既然是共用的,那麼就會有併發的問題,就如開頭說的這個場景就會導致併發問題。所以官方文檔也特別解釋這個註解最好和@DisallowConcurrentExecution一起使用。
以下是例子:
@PersistJobDataAfterExecution public class PersistJob implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { JobDataMap data = context.getJobDetail().getJobDataMap(); int i = data.getInt("P"); System.out.printf("PersistJob=>"+i); i++; data.put("P", i); } }
測試代碼:
public class PersistJobDataQuartzTest { public static void main(String[] args) throws InterruptedException { try { // Grab the Scheduler instance from the Factory Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); // and start it off scheduler.start(); JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("P",1); // define the job and tie it to our HelloJob class JobDetail job = JobBuilder.newJob(PersistJob.class) .withIdentity("job1", "group1").usingJobData(jobDataMap) .build(); // Trigger the job to run now, and then repeat every 40 seconds Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(1) .repeatForever()) .build(); // Tell quartz to schedule the job using our trigger scheduler.scheduleJob(job, trigger); // wait trigger Thread.sleep(20000); scheduler.shutdown(); } catch (SchedulerException se) { se.printStackTrace(); } } }View Code
參考文檔:
https://jayvilalta.com/blog/2014/06/04/understanding-the-disallowconcurrentexecution-job-attribute/ http://www.quartz-scheduler.org/documentation/quartz-2.1.x/tutorials/tutorial-lesson-03 http://www.cnblogs.com/lnlvinso/p/4194725.html