DEV Community

Yevhen Tienkaiev
Yevhen Tienkaiev

Posted on

Spring Boot + Quartz Scheduler in Cluster mode

Quartz Scheduler in Cluster mode

I want to share my experience in setupping and configuring Quartz Scheduler in Cluster mode with Spring Boot.


Modules description

So we have two application in the project: supervisor and worker.

Supervisor  -  is the main application there you can manage jobs(schedule/remove/check statuses). Besides, I didn't find how to disable execution of jobs on master, so it also have an a "worker" capabilities.

Worker  -  simply executes scheduled jobs, nothing interesting.

I use MySQL as database for Quartz cluster. For simplicity I launch it in docker by using docker compose.

The config files…

Spring config file for supervisor contains connection string to the database and credentials and port that will be used for REST API.



endpoints:
    jmx:
        unique-names: true
server:
    port: 8080
spring:
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        password: 12345
        url: jdbc:mysql://localhost/test?createDatabaseIfNotExist=true&useSSL=false&allowPublicKeyRetrieval=true
        username: root
    liquibase:
        change-log: classpath:db/changelog/db.changelog-master.xml


Enter fullscreen mode Exit fullscreen mode

application.yml

Also it contains path to the file with rules for initial creation of tables in MySQL. The SQL tables creation scripts are from quartz-2.2.3-distribution.tar.gz/quartz-2.2.3-distribution.tar

Spring config file for worker are almost same — it not contains info for initial table creation and different REST API port.

Quartz config files for supervisor and worker are also almost identical:



#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName=spring-boot-quartz-cluster-example
org.quartz.scheduler.instanceId=AUTO

#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.threadCount=1

#============================================================================
# Configure JobStore
#============================================================================
#org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# In new spring use LocalDataSourceJobStore https://github.com/spring-projects/spring-framework/issues/27709
org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.tablePrefix=QRTZ_

org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000


Enter fullscreen mode Exit fullscreen mode

the difference is only in how many thread pool are — org.quartz.threadPool.threadCount

The code…

So for first we need autowiring support for jobs, and spring still not have support for this: SPR-14471, SPR-9698. Really, vote for this issues!

Fortunately the workaround is available and I say many thanks to this guy and still don’t understand why it’s not in spring yet…

TestJob1



package com.github.hronom.spring.boot.quartz.cluster.example.common.job;

import com.github.hronom.spring.boot.quartz.cluster.example.common.service.TestService;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;

@DisallowConcurrentExecution
public class TestJob1 implements Job {

    @Autowired
    private TestService testService;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            String id = jobExecutionContext.getJobDetail().getKey().getName();
            testService.run(id);
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

It just call method of the autowired local class that currently runs inside instance where job executed. Because service can throw exception, I add capturing of all exception and wrap it in JobExecutionException.

TestServiceImpl

The service are also simple - it emulates long running process and randomly throws exceptions:



package com.github.hronom.spring.boot.quartz.cluster.example.supervisor.services;

import com.github.hronom.spring.boot.quartz.cluster.example.common.service.TestService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Service;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Service
public class TestServiceImpl implements TestService {
    private final Log logger = LogFactory.getLog(getClass());

    private final Random random = new Random();

    public void run(String id) throws Exception {
        logger.info("Running job on supervisor, job id " + id);
        if (random.nextInt(3) == 1) {
            throw new Exception("Randomly generated test exception on supervisor");
        }
        try {
            Thread.sleep(TimeUnit.MINUTES.toMillis(1));
        } catch (InterruptedException e) {
            logger.error("Error", e);
        }
        logger.info("Completed job on supervisor, job id " + id);
    }
}


Enter fullscreen mode Exit fullscreen mode

Throwing of exception added to check mechanic how Quartz handle exceptions in jobs.

JobsListenerService

Each instance of supervisor/worker has a jobs listener to listen events raised by local execution of jobs.



package com.github.hronom.spring.boot.quartz.cluster.example.common.service;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
import org.springframework.stereotype.Service;

@Service
public class JobsListenerService implements JobListener {
    private final Log logger = LogFactory.getLog(getClass());

    @Override
    public String getName() {
        return "Main Listener";
    }

    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        logger.info("Job to be executed " + context.getJobDetail().getKey().getName());
    }

    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        logger.info("Job execution vetoed " + context.getJobDetail().getKey().getName());
    }

    @Override
    public void jobWasExecuted(
        JobExecutionContext context, JobExecutionException jobException
    ) {
        logger.info(
            "Job was executed " +
            context.getJobDetail().getKey().getName() +
            (jobException != null ? ", with error" : "")
        );
    }
}


Enter fullscreen mode Exit fullscreen mode

SchedulerConfig



package com.github.hronom.spring.boot.quartz.cluster.example.supervisor.configs;

import com.github.hronom.spring.boot.quartz.cluster.example.common.spring.AutowiringSpringBeanJobFactory;
import com.github.hronom.spring.boot.quartz.cluster.example.common.service.JobsListenerService;

import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import java.io.IOException;
import java.util.Properties;

import javax.sql.DataSource;

import liquibase.integration.spring.SpringLiquibase;

@Configuration
@EnableAsync
@EnableScheduling
public class SchedulerConfig {

    @Bean
    public JobFactory jobFactory(
        ApplicationContext applicationContext,
        // Injecting SpringLiquibase to ensure liquibase is already initialized and created the Quartz tables
        SpringLiquibase springLiquibase
    ) {
        AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        return jobFactory;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory, JobsListenerService jobsListenerService)
        throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);
        factory.setJobFactory(jobFactory);
        factory.setQuartzProperties(quartzProperties());
        factory.setGlobalJobListeners(jobsListenerService);
        // https://medium.com/@rudra.ramesh/use-following-code-in-supervisor-app-while-creating-schedulerfactorybean-object-now-supervisor-fd2f95365350
        // If you need to disable launching of jobs on supervisor use this:
        //factory.setAutoStartup(false);
        return factory;
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }
}


Enter fullscreen mode Exit fullscreen mode

As you can see we create custom JobFactoryAutowiringSpringBeanJobFactory for autowiring support in jobs.

Also we create SchedulerFactoryBean that sets:

  • data source(MySQL)
  • custom job factory for autowiring
  • Quartz properties from config file
  • jobs global listener(listener this listen for events of the local scheduler)

JobsService

This service contains methods to manage jobs on the supervisor, worker doesn’t has such service.



package com.github.hronom.spring.boot.quartz.cluster.example.supervisor.services;

import com.github.hronom.spring.boot.quartz.cluster.example.common.job.TestJob1;
import com.github.hronom.spring.boot.quartz.cluster.example.supervisor.controllers.JobStatus;

import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.utils.Key;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;

import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

@Service
public class JobsService {
    private final String groupName = "normal-group";

    private final Scheduler scheduler;

    @Autowired
    public JobsService(SchedulerFactoryBean schedulerFactory) {
        this.scheduler = schedulerFactory.getScheduler();
    }

    public List<String> addNewJobs(int jobs) throws SchedulerException {
        LinkedList<String> list = new LinkedList<>();
        for (int i = 0; i < jobs; i++) {
            list.add(addNewJob());
        }
        return list.stream().sorted(Comparator.naturalOrder()).collect(Collectors.toList());
    }

    public String addNewJob() throws SchedulerException {
        String id = UUID.randomUUID().toString();

        JobDetail job =
            newJob(TestJob1.class)
                .withIdentity(id, groupName)
                // http://www.quartz-scheduler.org/documentation/quartz-2.2.x/configuration/ConfigJDBCJobStoreClustering.html
                // https://stackoverflow.com/a/19270566/285571
                .requestRecovery(true)
                .build();

        Trigger trigger =
            newTrigger()
                .withIdentity(id + "-trigger", groupName)
                .startNow()
                .withSchedule(
                    simpleSchedule().withIntervalInSeconds(30)
                )
                .build();

        scheduler.scheduleJob(job, trigger);

        return id;
    }

    public boolean deleteJob(String id) throws SchedulerException {
        JobKey jobKey = new JobKey(id, groupName);
        return scheduler.deleteJob(jobKey);
    }

    public List<String> getJobs() throws SchedulerException {
        return scheduler
            .getJobKeys(GroupMatcher.jobGroupEquals(groupName))
            .stream()
            .map(Key::getName)
            .sorted(Comparator.naturalOrder())
            .collect(Collectors.toList());
    }

    /**
     * Check realization was inspired by https://stackoverflow.com/a/31479434/285571
     */
    public List<JobStatus> getJobsStatuses() throws SchedulerException {
        LinkedList<JobStatus> list = new LinkedList<>();
        for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) {
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobDetail.getKey());
            for (Trigger trigger : triggers) {
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                if (Trigger.TriggerState.COMPLETE.equals(triggerState)) {
                    list.add(new JobStatus(jobKey.getName(), true));
                } else {
                    list.add(new JobStatus(jobKey.getName(), false));
                }
            }
        }
        list.sort(Comparator.comparing(o -> o.id));
        return list;
    }
}


Enter fullscreen mode Exit fullscreen mode

For test purposes we use method addNewJobs to add new jobs in batch mode(by default we add 10 jobs)

The method addNewJob adds new jobs to scheduler by doing next things:

  • Create job detail there we acquire job id and group name. Also we set that job must be recoverable. The recoverability of job means that if one node in cluster fails/crashes while processing job, the job will be processed on other alive node. More description about this here and here.
  • Creating trigger with interval 30 sec. We want to fire trigger only once.

Also we have other interesting method getJobsStatuses. It helps check status of jobs across all cluster. For doing that we retrieve all available jobs details and check the state of trigger for this job details. So if trigger state is COMPLETE this means that trigger has already fired and job now executing.


This realization was inspired by: https://stackoverflow.com/a/31479434/285571

That’s all, full source code available at GitHub: https://github.com/Hronom/spring-boot-quartz-cluster-example

Also I want to say thanks to post that gives me a good starting point: http://www.opencodez.com/java/quartz-scheduler-with-spring-boot.htm

Top comments (1)

Collapse
 
hendisantika profile image
Hendi Santika

Nice jobs