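Integrating Quartz with Spring Boot 2.0 using a database job store: clustered deployment, job CRUD, and making sure each scheduled job runs only once per fire time across multiple instances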

    Technology · 2022-09-04

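    This example uses Quartz with a database-backed (JDBC) job store together with JPA. A few simple web endpoints demonstrate adding, deleting, and listing scheduled jobs in a cluster, and verify that a given job fires only once per scheduled time, so that a multi-instance deployment does not run the same job several times.

    The Spring Boot version is shown below. It pulls in Quartz 2.3.0 automatically. The database is MySQL.

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>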

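    1. Download the SQL file

    1.1 Download the SQL file for your Quartz version

    Official site: http://www.quartz-scheduler.org/downloads/

    1.2 Import the SQL file into the database

    After downloading, unpack the archive, locate the SQL script for your database under the path below, and import it into the database:

    quartz-2.3.0-distribution\quartz-2.3.0-SNAPSHOT\src\org\quartz\impl\jdbcjobstore

    After a successful import, the new Quartz tables appear in the database.

    1.3 What each table means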

    2. pom dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.tomcat</groupId>
                    <artifactId>tomcat-jdbc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Spring Boot 2.x already uses HikariCP as its default pool, so this explicit dependency is probably redundant -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP-java7</artifactId>
            <version>2.4.13</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    3. Add the configuration files

    3.1 application.yml

    server:
      port: 8081
    logging:
      # Point Spring Boot at the logback configuration (logging.file would set the output file name instead)
      config: classpath:logback.xml
    spring:
      datasource:
        url: jdbc:mysql://localhost:3306/quartz?characterEncoding=utf-8&autoReconnect=true&allowMultiQueries=true&useSSL=true&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC
        username: root
        password: ******
        driver-class-name: com.mysql.cj.jdbc.Driver
        type: com.zaxxer.hikari.HikariDataSource
        hikari:
          minimum-idle: 5
          maximum-pool-size: 15
          idle-timeout: 30000
          pool-name: DatebookHikariCP
          max-lifetime: 1800000
          connection-timeout: 30000
          connection-test-query: 'SELECT 1'
      jpa:
        properties:
          hibernate:
            hbm2ddl:
              # validate only checks the schema, so the table for ScheduleInfo must already exist
              auto: validate
            dialect: org.hibernate.dialect.MySQL5InnoDBDialect
            format_sql: true
        show-sql: true

    3.2 quartz.properties

    # Quartz cluster configuration
    # Scheduler name: every instance in the cluster must use the same name
    org.quartz.scheduler.instanceName=DefaultQuartzScheduler
    # Instance ID is generated automatically; it must be different on every instance
    org.quartz.scheduler.instanceId=AUTO
    org.quartz.scheduler.makeSchedulerThreadDaemon=true
    # Thread pool implementation (SimpleThreadPool is usually sufficient)
    org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    # Whether the threads created by the pool are daemon threads
    org.quartz.threadPool.makeThreadsDaemons=true
    # Number of threads, at least 1 (no default)
    org.quartz.threadPool.threadCount=20
    # Thread priority (max java.lang.Thread.MAX_PRIORITY = 10, min Thread.MIN_PRIORITY = 1, default 5)
    org.quartz.threadPool.threadPriority=5
    # Persist scheduling data in the database
    org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
    # JDBC delegate; org.quartz.impl.jdbcjobstore.StdJDBCDelegate works for most databases
    org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    # Table prefix, default QRTZ_
    org.quartz.jobStore.tablePrefix=QRTZ_
    # Whether this instance joins a cluster
    org.quartz.jobStore.isClustered=true
    # Misfire threshold in milliseconds: how late a trigger may fire before it counts as misfired (default 60000)
    org.quartz.jobStore.misfireThreshold=25000
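    The comments above name the settings that matter most for clustering: a shared instanceName, a unique instanceId on every node, a JDBC job store, and isClustered=true. As a minimal sketch, assuming you want to sanity-check a properties file before deploying, the hypothetical helper below (ClusterConfigCheck is not part of the original project) loads the text with java.util.Properties and reports settings that would stop the nodes from forming a cluster. It only accepts AUTO for the instance ID, as in the configuration above, although a manually assigned unique ID per node would also work.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper, not part of the original project: sanity-checks Quartz cluster settings. */
class ClusterConfigCheck {

    /** Parses properties-file text; both "key=value" and "key:value" are accepted. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns a list of problems; an empty list means every check passed. */
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();
        String clustered = props.getProperty("org.quartz.jobStore.isClustered", "false").trim();
        if (!"true".equalsIgnoreCase(clustered)) {
            problems.add("org.quartz.jobStore.isClustered must be true");
        }
        // Assumption of this sketch: only AUTO is accepted, matching the configuration above.
        String instanceId = props.getProperty("org.quartz.scheduler.instanceId", "").trim();
        if (!"AUTO".equals(instanceId)) {
            problems.add("org.quartz.scheduler.instanceId should be AUTO");
        }
        String instanceName = props.getProperty("org.quartz.scheduler.instanceName", "").trim();
        if (instanceName.isEmpty()) {
            problems.add("org.quartz.scheduler.instanceName should be set to the same value on every node");
        }
        String jobStore = props.getProperty("org.quartz.jobStore.class", "").trim();
        if (!jobStore.startsWith("org.quartz.impl.jdbcjobstore.")) {
            problems.add("org.quartz.jobStore.class should be a JDBC job store");
        }
        return problems;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "org.quartz.scheduler.instanceName=DefaultQuartzScheduler",
                "org.quartz.scheduler.instanceId=AUTO",
                "org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX",
                "org.quartz.jobStore.isClustered=true");
        System.out.println("Problems found: " + findProblems(parse(text)));
    }
}
```

    Running the check at startup or in a unit test is one way to catch a node that was accidentally configured with isClustered=false.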

    3.3 Logging configuration file logback.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration scan="true" scanPeriod="30 seconds">
        <property name="LOG_HOME" value="/data/button/springboot" />

        <!-- Console output -->
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <!-- Format: %d date; %thread thread name; %-5level level, left-aligned to 5 characters; %msg log message; %n newline -->
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} -%msg%n</pattern>
            </encoder>
        </appender>

        <!-- Roll the log file daily -->
        <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOG_HOME}/springboot_button.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!-- File name pattern for rolled log files -->
                <FileNamePattern>${LOG_HOME}/springboot_button.log.%d{yyyy-MM-dd}.%i.log</FileNamePattern>
                <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                    <!-- Maximum size of a single log file -->
                    <maxFileSize>100MB</maxFileSize>
                </timeBasedFileNamingAndTriggeringPolicy>
            </rollingPolicy>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <!-- Format: %d date; %thread thread name; %-5level level, left-aligned to 5 characters; %msg log message; %n newline -->
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} -%msg%n</pattern>
            </encoder>
        </appender>

        <!-- Log level -->
        <root level="ERROR">
            <appender-ref ref="STDOUT" />
            <appender-ref ref="FILE" />
        </root>
    </configuration>

    4. Code

    4.1 Add the Quartz configuration classes

    Class QuartzConfigure

    import org.quartz.Trigger;
    import org.quartz.spi.JobFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.config.PropertiesFactoryBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;

    import javax.sql.DataSource;
    import java.io.IOException;
    import java.util.Properties;

    /**
     * @author tom
     * @version V1.0
     * @date 2020/10/5 14:51
     */
    @Configuration
    public class QuartzConfigure {

        private static final String QUARTZ_CONFIG = "/quartz.properties";

        @Autowired
        @Qualifier(value = "dataSource")
        private DataSource dataSource;

        /**
         * Loads the Quartz settings from quartz.properties on the classpath.
         */
        @Bean
        public Properties quartzProperties() throws IOException {
            PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
            propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_CONFIG));
            propertiesFactoryBean.afterPropertiesSet();
            return propertiesFactoryBean.getObject();
        }

        /**
         * This JobFactory is the one injected into schedulerFactoryBean below, so mind the bean name.
         * The Spring application context is handed to the JobFactory here.
         */
        @Bean
        public JobFactory buttonJobFactory(ApplicationContext applicationContext) {
            AutoWiredSpringBeanToJobFactory jobFactory = new AutoWiredSpringBeanToJobFactory();
            jobFactory.setApplicationContext(applicationContext);
            return jobFactory;
        }

        /**
         * @param jobFactory     the JobFactory to configure on the SchedulerFactoryBean
         * @param cronJobTrigger the Trigger beans defined in the application context
         */
        @Bean
        public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, Trigger... cronJobTrigger) throws IOException {
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
            factory.setJobFactory(jobFactory);
            factory.setOverwriteExistingJobs(true);
            factory.setAutoStartup(true); // start the scheduler automatically
            factory.setQuartzProperties(quartzProperties());
            factory.setTriggers(cronJobTrigger);
            factory.setDataSource(dataSource); // use the application's DataSource instead of one managed by Quartz
            return factory;
        }
    }

    Class AutoWiredSpringBeanToJobFactory

    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.scheduling.quartz.SpringBeanJobFactory;

    /**
     * @author tom
     * @version V1.0
     * @date 2020/10/5 14:54
     */
    public class AutoWiredSpringBeanToJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

        private transient AutowireCapableBeanFactory beanFactory;

        @Override
        public void setApplicationContext(final ApplicationContext context) {
            beanFactory = context.getAutowireCapableBeanFactory();
        }

        @Override
        protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
            // Let Quartz create the job instance, then have Spring inject its @Autowired fields
            final Object job = super.createJobInstance(bundle);
            beanFactory.autowireBean(job);
            return job;
        }
    }

    4.2 Model classes

    Class ScheduleInfo

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.Id;

    /**
     * @author tom
     * @version V1.0
     * @date 2020/10/5 18:32
     */
    @Entity
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ScheduleInfo {

        @Id
        @GeneratedValue
        private Long id;

        private String jobDetailName;
        private String jobDetailGroup;
        private String triggerName;
        private String triggerGroup;
        private Integer executeCount;

        public ScheduleInfo(String jobDetailName, String jobDetailGroup, String triggerName,
                            String triggerGroup, Integer executeCount) {
            this.jobDetailName = jobDetailName;
            this.jobDetailGroup = jobDetailGroup;
            this.triggerName = triggerName;
            this.triggerGroup = triggerGroup;
            this.executeCount = executeCount;
        }
    }

    Class Response

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    /**
     * @author tom
     * @version V1.0
     * @date 2020/10/4 15:18
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Response<T> {

        Integer code;
        String message;
        T result;

        public Response(String message, T result) {
            this.code = 200;
            this.message = message;
            this.result = result;
        }
    }

    4.3 Repository class

    import com.example.quartzdemo.model.ScheduleInfo;
    import org.springframework.data.jpa.repository.JpaRepository;

    /**
     * @author tom
     * @version V1.0
     * @date 2020/10/5 18:38
     */
    public interface QuartzInfoRepository extends JpaRepository<ScheduleInfo, Long> {

        ScheduleInfo findByJobDetailName(String jobDetailName);

        void deleteByJobDetailName(String jobDetailName);
    }

    4.4 Quartz job management class (add, delete, list)

    import com.example.quartzdemo.dao.QuartzInfoRepository;
    import com.example.quartzdemo.model.ScheduleInfo;
    import org.quartz.CronScheduleBuilder;
    import org.quartz.CronTrigger;
    import org.quartz.Job;
    import org.quartz.JobBuilder;
    import org.quartz.JobDataMap;
    import org.quartz.JobDetail;
    import org.quartz.JobKey;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerException;
    import org.quartz.TriggerBuilder;
    import org.quartz.TriggerKey;
    import org.quartz.impl.matchers.GroupMatcher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;
    import org.springframework.stereotype.Component;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Set;

    /**
     * Adds, deletes, and lists scheduled jobs.
     *
     * @author tom
     * @version V1.0
     * @date 2020/10/4 14:59
     */
    @Component
    public class QuartzManager {

        // Job group name
        public static final String JOB_GROUP_NAME = "cron_group";
        // Trigger group name
        public static final String TRIGGER_GROUP_NAME = "cron_trigger";
        // Default group name
        public static final String DEFAULT_GROUP_NAME = "DEFAULT";

        // Scheduler factory
        @Autowired
        private SchedulerFactoryBean schedulerFactory;

        @Autowired
        QuartzInfoRepository quartzInfoRepository;

        public Scheduler getScheduled() {
            return schedulerFactory.getScheduler();
        }

        /**
         * Adds a scheduled job.
         *
         * @param jobName  job name
         * @param jobClazz job class
         * @param cron     cron expression that defines the fire times
         * @param params   job parameters
         */
        public void addJob(String jobName, Class<? extends Job> jobClazz, String cron, HashMap<String, String> params) {
            try {
                Scheduler scheduler = getScheduled();
                // Job name, job group, job class
                JobDetail jobDetail = JobBuilder.newJob(jobClazz).withIdentity(jobName, JOB_GROUP_NAME).build();
                // Pass parameters (optional)
                JobDataMap jobDataMap = jobDetail.getJobDataMap();
                if (params != null) {
                    for (String key : params.keySet()) {
                        jobDataMap.put(key, params.get(key));
                    }
                }
                // Trigger: name, group, start time, and cron schedule
                CronTrigger trigger = TriggerBuilder.newTrigger()
                        .withIdentity(jobName, TRIGGER_GROUP_NAME)
                        .startNow()
                        .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                        .build();
                // Register the JobDetail and Trigger with the scheduler
                scheduler.scheduleJob(jobDetail, trigger);
                // Start the scheduler
                if (!scheduler.isShutdown()) {
                    scheduler.start();
                }
                quartzInfoRepository.save(new ScheduleInfo(jobName, JOB_GROUP_NAME, jobName, TRIGGER_GROUP_NAME, 0));
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }

        /**
         * Deletes a scheduled job.
         *
         * @param jobName job name
         */
        public void removeJob(String jobName) {
            try {
                Scheduler scheduler = getScheduled();
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TRIGGER_GROUP_NAME);
                scheduler.pauseTrigger(triggerKey);  // pause the trigger
                scheduler.unscheduleJob(triggerKey); // remove the trigger
                scheduler.deleteJob(JobKey.jobKey(jobName, JOB_GROUP_NAME)); // delete the job
                System.out.println("Deleted scheduled job " + jobName);
                ScheduleInfo quartzInfo = quartzInfoRepository.findByJobDetailName(jobName);
                if (quartzInfo != null) {
                    quartzInfoRepository.deleteById(quartzInfo.getId());
                }
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }

        /**
         * Deletes a scheduled job in the DEFAULT group.
         *
         * @param jobName job name
         */
        public void removeDefaultJob(String jobName) {
            try {
                Scheduler scheduler = getScheduled();
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, DEFAULT_GROUP_NAME);
                scheduler.pauseTrigger(triggerKey);  // pause the trigger
                scheduler.unscheduleJob(triggerKey); // remove the trigger
                scheduler.deleteJob(JobKey.jobKey(jobName, DEFAULT_GROUP_NAME)); // delete the job
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }

        /**
         * Starts the scheduler.
         */
        public void startJobs() {
            try {
                Scheduler scheduler = getScheduled();
                scheduler.start();
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }

        /**
         * Shuts down the scheduler.
         */
        public void shutdownJobs() {
            try {
                Scheduler scheduler = getScheduled();
                if (!scheduler.isShutdown()) {
                    scheduler.shutdown();
                }
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns the names of all jobs.
         */
        public List<String> getAllJob() {
            List<String> jobs = new ArrayList<>();
            try {
                Set<JobKey> jobKeys = getScheduled().getJobKeys(GroupMatcher.anyGroup());
                for (JobKey jobKey : jobKeys) {
                    jobs.add(jobKey.getName());
                }
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
            return jobs;
        }
    }

    4.5 Define the Job

    import com.example.quartzdemo.dao.QuartzInfoRepository;
    import com.example.quartzdemo.model.ScheduleInfo;
    import org.quartz.Job;
    import org.quartz.JobDataMap;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;

    /**
     * Job created through the web endpoints.
     *
     * @author tom
     * @version V1.0
     * @date 2020/10/4 15:30
     */
    @Component
    public class WebJob implements Job {

        @Autowired
        QuartzInfoRepository quartzInfoRepository;

        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            // Copy the job parameters into a plain map for printing
            JobDataMap dataMap = context.getMergedJobDataMap();
            HashMap<String, String> map = new HashMap<>();
            for (String key : dataMap.keySet()) {
                map.put(key, dataMap.getString(key));
            }
            String jobName = context.getJobDetail().getKey().getName();
            ScheduleInfo quartzInfo = quartzInfoRepository.findByJobDetailName(jobName);
            if (quartzInfo != null) {
                // Increment the execution counter so duplicate runs would show up in the output
                quartzInfo.setExecuteCount(quartzInfo.getExecuteCount() + 1);
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String nowDate = df.format(new Date());
                System.out.println(nowDate + " my web job params: " + map + " " + quartzInfo.getExecuteCount());
                quartzInfoRepository.save(quartzInfo);
            }
        }
    }

    4.6 Controller class

    import com.example.quartzdemo.job.WebJob;
    import com.example.quartzdemo.model.Response;
    import com.example.quartzdemo.quartzManager.QuartzManager;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import java.util.HashMap;
    import java.util.List;

    /**
     * Endpoints for adding, deleting, and listing scheduled jobs.
     *
     * @author tom
     * @version V1.0
     * @date 2020/10/4 14:50
     */
    @RestController
    @RequestMapping("/quartz")
    public class QuartzController {

        @Autowired
        QuartzManager quartzManager;

        /**
         * Adds a scheduled job that fires every 3 seconds.
         *
         * @param jobName job name
         */
        @GetMapping("/addJob")
        public Response<String> addJob(String jobName) {
            String cron = "0/3 * * * * ?";
            HashMap<String, String> map = new HashMap<>();
            map.put("jobName", jobName);
            quartzManager.addJob(jobName, WebJob.class, cron, map);
            return new Response<>("Job added", "");
        }

        /**
         * Deletes a scheduled job.
         *
         * @param jobName job name
         */
        @GetMapping("/deleteJob")
        public Response<String> deleteJob(String jobName) {
            quartzManager.removeJob(jobName);
            return new Response<>("Job deleted", "");
        }

        /**
         * Deletes a scheduled job in the DEFAULT group.
         *
         * @param jobName job name
         */
        @GetMapping("/removeDefaultJob")
        public Response<String> removeDefaultJob(String jobName) {
            quartzManager.removeDefaultJob(jobName);
            return new Response<>("Job deleted", "");
        }

        /**
         * Lists all scheduled jobs.
         */
        @GetMapping("/getAllJob")
        public Response<List<String>> getAllJob() {
            return new Response<>("Fetched all jobs", quartzManager.getAllJob());
        }
    }
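    The addJob endpoint hard-codes the cron expression "0/3 * * * * ?". In a Quartz cron expression the first field is seconds, and "0/3" means "start at second 0 and fire every 3 seconds". The sketch below is a stdlib-only illustration of that one field, not Quartz's own parser: the hypothetical CronSecondsSketch.expand method handles only the "start/step" form and lists the seconds within a minute at which such a trigger fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration, not Quartz code: expands a "start/step" seconds field. */
class CronSecondsSketch {

    /** Expands a field such as "0/3" into the matching seconds in the range 0..59. */
    static List<Integer> expand(String field) {
        String[] parts = field.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected start/step, got: " + field);
        }
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        if (start < 0 || start > 59 || step <= 0) {
            throw new IllegalArgumentException("start must be 0..59 and step must be positive");
        }
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s <= 59; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        // Seconds field of the cron expression used by the addJob endpoint
        System.out.println(expand("0/3"));
    }
}
```

    For "0/3" this gives 20 fire times per minute, which is why each job's counter in the demo increases once every 3 seconds.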

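    5. Start the project and check the result

    Make two copies of the project, set the ports of the three copies to 8081, 8082, and 8083, and start them one after another.

    First list all scheduled jobs. The result is empty:
    http://localhost:8081/quartz/getAllJob

    Add a scheduled job:
    http://localhost:8081/quartz/addJob?jobName=JsonTom888

    The job runs normally and prints its output on one instance, while the other two instances print nothing. At this point we cannot yet tell whether the cluster behaves the way we need.

    Add another scheduled job:
    http://localhost:8081/quartz/addJob?jobName=Json888

    The job starts normally. The counter at the end of each line confirms that every firing ran exactly once on one instance, but as luck would have it, everything still runs on the same instance.

    Now delete a scheduled job:
    http://localhost:8081/quartz/deleteJob?jobName=Json888

    This time output appears on all three instances, which shows that Quartz spreads the firings across the instances on its own. From the timestamp on each line and the counter after it, each firing of a job ran on exactly one instance, which is what we expect from a cluster.

    Summary: you can add more scheduled jobs and watch the result. The instance that prints a given firing may differ from run to run, but a given job at a given fire time runs only once, on a single instance, which is what we expect from a cluster.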
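    Why does each firing run only once? In clustered mode the instances coordinate through the shared database, so only one of them can claim a given firing. The simulation below is a rough sketch of that idea under loud assumptions: it is not Quartz code, it replaces the database with an in-memory ConcurrentHashMap, and the names SingleFireSimulation, tryFire, and simulate are hypothetical. Three threads stand in for the three instances, and each tries to claim every fire time of one job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation, not Quartz code: instances compete to claim each firing. */
class SingleFireSimulation {

    /** Stand-in for the shared database: maps "job@fireTime" to the instance that claimed it. */
    private final ConcurrentMap<String, String> claimedBy = new ConcurrentHashMap<>();

    /** Returns true only for the first instance that claims this firing of the job. */
    boolean tryFire(String instance, String jobName, int fireTime) {
        return claimedBy.putIfAbsent(jobName + "@" + fireTime, instance) == null;
    }

    /** Runs competing instances over the given number of fire times; returns total executions. */
    static int simulate(int instances, int fireTimes) {
        SingleFireSimulation store = new SingleFireSimulation();
        AtomicInteger executions = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            String instance = "instance-" + i;
            Thread thread = new Thread(() -> {
                for (int fireTime = 0; fireTime < fireTimes; fireTime++) {
                    // Every instance sees every fire time, but only the winner executes the job
                    if (store.tryFire(instance, "demoJob", fireTime)) {
                        executions.incrementAndGet();
                    }
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return executions.get();
    }

    public static void main(String[] args) {
        System.out.println("Executions for 50 fire times on 3 instances: " + simulate(3, 50));
    }
}
```

    Which thread wins a given fire time varies between runs, much like the instance that prints a given line varies in the demo, but the total number of executions always equals the number of fire times.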

    Project source code: https://github.com/JsonTom888/scheduledAndQuartz/tree/master/quartzcluster
