千家信息网

Spring Boot集成quartz如何实现定时任务并支持切换任务数据源

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容介绍了"Spring Boot集成quartz如何实现定时任务并支持切换任务数据源"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况
千家信息网最后更新 2025年01月24日Spring Boot集成quartz如何实现定时任务并支持切换任务数据源

本篇内容介绍了"Spring Boot集成quartz如何实现定时任务并支持切换任务数据源"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

org.quartz实现定时任务并自定义切换任务数据源

在工作中经常会需要使用到定时任务处理各种周期性的任务,org.quartz是处理此类定时任务的一个优秀框架。随着项目一点点推进,此时我们并不满足于任务仅仅是定时执行,我们还想要对任务进行更多的控制,随时能对任务进行人为干预,就需要对quartz有更深入的了解。而随着微服务的流行,项目中多数据源的情况也越来越常见,在定时任务中集成多数据源切换的功能也需要集成进来。

集成quartz实现定时任务

集成quartz实现定时任务

quartz中实现定时任务需要了解的基本概念

Job

通过实现Job类,在实现方法中写我们具体想要定时任务完成的工作,然后交给quartz管理。

JobDetail

Job只负责实现具体任务,所以还需要借助JobDetail来存储一些描述Job的基本信息。

Quartz JobBuilder

为构造JobDetail实体提供的builder-style API。你可以这样使用它来构建一个JobDetail

@Beanpublic JobDetail jobDetail() { return JobBuilder.newJob().ofType(SampleJob.class) .storeDurably() .withIdentity("Qrtz_Job_Detail") .withDescription("Invoke Sample Job service...") .build();}

Spring JobDetailFactoryBean

Spring中配置JobDetail的方式:

@Beanpublic JobDetailFactoryBean jobDetail() { JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean(); jobDetailFactory.setJobClass(SampleJob.class); jobDetailFactory.setDescription("Invoke Sample Job service..."); jobDetailFactory.setDurability(true); return jobDetailFactory;}

Trigger

触发器,代表一个调度参数的配置,什么时候去调度:

@Beanpublic Trigger trigger(JobDetail job) { return TriggerBuilder.newTrigger().forJob(job) .withIdentity("Qrtz_Trigger") .withDescription("Sample trigger") .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1)) .build();}

Scheduler

调度器,通过JobTrigger来注册一个调度器:

@Beanpublic Scheduler scheduler(Trigger trigger, JobDetail job) { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(new ClassPathResource("quartz.properties").getInputStream()); Scheduler scheduler = factory.getScheduler(); scheduler.setJobFactory(springBeanJobFactory()); scheduler.scheduleJob(job, trigger); scheduler.start(); return scheduler;}

给系统添加一个Job

quartzJob就是我们需要去执行的任务,由Scheduler调度器负责调度任务们依靠制定好的Trigger来定时执行任务。

因此首先我们需要结合以上基础给系统添加一个Job。

addJob

    public void addJob(BaseJob job) throws SchedulerException {        /** 创建JobDetail实例,绑定Job实现类        * JobDetail 表示一个具体的可执行的调度程序,job是这个可执行调度程序所要执行的内容        * 另外JobDetail还包含了这个任务调度的方案和策略**/        // 指明job的名称,所在组的名称,以及绑定job类        JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())                .withIdentity(job.getJobKey())                .withDescription(job.getDescription())                .usingJobData(job.getDataMap())                .build();        /**         * Trigger代表一个调度参数的配置,什么时候去调度         */        //定义调度触发规则, 使用cronTrigger规则        Trigger trigger = TriggerBuilder.newTrigger()                .withIdentity(job.getJobName(),job.getJobGroup())                .withSchedule(CronScheduleBuilder.cronSchedule(job.getCron_Expression()))                .startNow()                .build();        //将任务和触发器注册到任务调度中去        scheduler.scheduleJob(jobDetail,trigger);        //判断调度器是否启动        if(!scheduler.isStarted()){            scheduler.start();        }        log.info(String.format("定时任务:%s.%s-已添加到调度器!", job.getJobGroup(),job.getJobName()));    }

首先需要定义好我们的Job,之后通过Job初始化JobDetailTrigger,最后将JobDetailTrigger注册到调度器中。

BaseJob

Job的结构如下:

public abstract class BaseJob implements Job,Serializable {    private static final long serialVersionUID = 1L;    private static final String JOB_MAP_KEY = "self";    /**     * 任务名称     */    private String jobName;    /**     * 任务分组     */    private String jobGroup;    /**     * 任务状态 是否启动任务     */    private String jobStatus;    /**     * cron表达式     */    private String cronExpression;    /**     * 描述     */    private String description;    /**     * 任务执行时调用哪个类的方法 包名+类名     */    private Class beanClass = this.getClass();    /**     * 任务是否有状态     */    private String isConcurrent;    /**     * Spring bean     */    private String springBean;    /**     * 任务调用的方法名     */    private String methodName;     /**     * 该任务所使用的数据源     */    private String dataSource = DataSourceEnum.DB1.getName();    /**     * 为了将执行后的任务持久化到数据库中     */    @JsonIgnore    private JobDataMap dataMap = new JobDataMap();    public JobKey getJobKey(){        return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务key    }    ...}

可以看到Job中定义了任务的一些基本信息,重点关注其中的dataSourcedataMap属性。其中dataSource是任务所使用的数据源,并给了一个默认值;由于任务在添加后会持久化到数据库中,之后解析任务就会用到dataMap

SchedulerConfig

在添加Job的时候,JobDetailTrigger都是通过关键字new生成的,而调度器Scheduler则需要放在容器中维护。

@Configuration@Orderpublic class SchedulerConfig {    @Autowired    private MyJobFactory myJobFactory;    @Value("${spring.profiles.active}")    private String profile;    /*     * 通过SchedulerFactoryBean获取Scheduler的实例     */    @Bean(name = "scheduler")    public Scheduler scheduler() throws Exception {        return schedulerFactoryBean().getScheduler();    }        @Bean    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {        SchedulerFactoryBean factory = new SchedulerFactoryBean();        factory.setOverwriteExistingJobs(true);        // 延时启动        factory.setStartupDelay(20);        // 加载quartz数据源配置        factory.setQuartzProperties(quartzProperties());        // 自定义Job Factory,用于Spring注入        factory.setJobFactory(myJobFactory);        /*********全局监听器配置************/        JobListener myJobListener = new SchedulerListener();        factory.setGlobalJobListeners(myJobListener);//直接添加为全局监听器        return factory;    }    @Bean    public Properties quartzProperties() throws IOException {        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();        if (Util.PRODUCT.equals(profile)) {//正式环境            System.out.println("正式环境quartz配置");            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));        } else {            System.out.println("测试环境quartz配置");            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));        }        //在quartz.properties中的属性被读取并注入后再初始化对象        propertiesFactoryBean.afterPropertiesSet();        return propertiesFactoryBean.getObject();    }    /*     * quartz初始化监听器     */    @Bean    public QuartzInitializerListener executorListener() {        return new QuartzInitializerListener();    }}

上述代码中,将scheduler加入到Spring容器中。scheduler是由SchedulerFactoryBean进行维护的,在SchedulerFactoryBean中对调度器工厂做了一些基本设置并从配置文件中加载了quartz数据源配置(配置文件的读取会根据运行环境profile来进行自动切换),配置了一个全局监听器用以监听任务的执行过程。

MyJobFactory

使用Spring提供的JobFactory

@Componentpublic class MyJobFactory extends AdaptableJobFactory {    @Autowired    private AutowireCapableBeanFactory capableBeanFactory;    @Override    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {        // 调用父类的方法        Object jobInstance = super.createJobInstance(bundle);        // 进行注入        capableBeanFactory.autowireBean(jobInstance);        return jobInstance;    }}

quartz.properties

quartz.properties中是quartz连接数据库的一些配置信息。

# \u56FA\u5B9A\u524D\u7F00org.quartz# \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206##org.quartz.scheduler.instanceName = DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export = falseorg.quartz.scheduler.rmi.proxy = falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction = false# \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPoolorg.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool# threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B# \u5E76\u53D1\u4E2A\u6570org.quartz.threadPool.threadCount = 5# \u4F18\u5148\u7EA7org.quartz.threadPool.threadPriority = 5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = trueorg.quartz.jobStore.misfireThreshold = 5000# \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore#\u6301\u4E45\u5316org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX#org.quartz.jobStore.useProperties=falseorg.quartz.jobStore.tablePrefix = QRTZ_org.quartz.jobStore.dataSource = qzDSorg.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driverorg.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=trueorg.quartz.dataSource.qzDS.user=quartzorg.quartz.dataSource.qzDS.password=123456org.quartz.dataSource.qzDS.maxConnections = 30org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUALorg.quartz.dataSource.qzDS.validateOnCheckout = trueorg.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40#org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60

quartz会根据这个配置文件将Job持久化到数据库中,也因此quartz会需要初始化一些数据库表,表结构文件在文末。

SchedulerListener

调度器监听器用以监听任务的执行状态。

public class SchedulerListener implements JobListener {    private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);    public static final String LISTENER_NAME = "QuartSchedulerListener";    @Override    public String getName() {        return LISTENER_NAME; //must return a name    }    //任务被调度前    @Override    public void jobToBeExecuted(JobExecutionContext context) {        String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");        // 切换任务的数据源        DataSourceContextHolder.setDB(dataSource);        String jobName = context.getJobDetail().getKey().toString();        LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());    }    //任务调度被拒了    @Override    public void jobExecutionVetoed(JobExecutionContext context) {        String jobName = context.getJobDetail().getKey().toString();        LOG.error("job {} is jobExecutionVetoed", jobName);        //可以做一些日志记录原因    }    //任务被调度后    @Override    public void jobWasExecuted(JobExecutionContext context,                               JobExecutionException jobException) {        // 清空存储的数据源        String jobName = context.getJobDetail().getKey().toString();        DataSourceContextHolder.clearDB();        LOG.info("Job : {} is finished", jobName);        if (jobException != null && !jobException.getMessage().equals("")) {            LOG.error("Exception thrown by: " + jobName                    + " Exception: " + jobException.getMessage());        }    }}

SchedulerListener监听任务被调度前、调度后和调度被拒绝时的状态,在任务被调度之前和之后对任务所使用的数据源进行了处理。如果项目中不需要数据源切换的话,这个监听器是不需要的,到此已经完成了quartz的集成。

多数据源切换

多数据源切换

通过自定义DynamicDataSource来覆盖Spring Boot中原有的数据源。

DataSourceConfig

通过读取配置文件中不同的数据源,初始化项目中可能用到的数据源用以切换。

/** * 多数据源配置类 */@Configurationpublic class DataSourceConfig {    //数据源1    @Bean(name = "datasource1")    @ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中对应属性的前缀    public DataSource dataSource1() {        return DataSourceBuilder.create().build();    }    //数据源2    @Bean(name = "datasource2")    @ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中对应属性的前缀    public DataSource dataSource2() {        return DataSourceBuilder.create().build();    }    /**     * 动态数据源: 通过AOP在不同数据源之间动态切换     *     * @return     */    @Primary    @Bean(name = "dynamicDataSource")    public DataSource dynamicDataSource() {        DynamicDataSource dynamicDataSource = new DynamicDataSource();        // 默认数据源        dynamicDataSource.setDefaultTargetDataSource(dataSource1());        // 配置多数据源        Map dsMap = new HashMap();        dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());        dsMap.put(DataSourceEnum.DB2.getName(), dataSource2());        dynamicDataSource.setTargetDataSources(dsMap);        return dynamicDataSource;    }    @Bean    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();        //设置数据源        sqlSessionFactoryBean.setDataSource(dataSource);        return sqlSessionFactoryBean.getObject();    }    /**     * 配置@Transactional注解事物     *     * @return     */    @Bean    public PlatformTransactionManager transactionManager() {        return new DataSourceTransactionManager(dynamicDataSource());    }}

数据源配置

spring:  datasource:    db1:      driver-class-name: com.mysql.cj.jdbc.Driver      username: doctor      password: 123456      type: com.zaxxer.hikari.HikariDataSource      jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true    db2:      driver-class-name: com.mysql.cj.jdbc.Driver      username: quartz      password: 123456      type: com.zaxxer.hikari.HikariDataSource      jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true

DataSourceContextHolder

由于quartz在执行过程中是通过不同的线程来执行Job的,因此此处通过ThreadLocal来保存线程所使用的数据源情况。

/** * 保存本地数据源 */public class DataSourceContextHolder {    private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);    /**     * 默认数据源     */    public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();    /**     * ThreadLocal之后会进行讲解     */    private static final ThreadLocal contextHolder = new ThreadLocal<>();    // 设置数据源名    public static void setDB(String dbType) {        LOG.info("切换到{}数据源", dbType);        contextHolder.set(dbType);    }    // 获取数据源名    public static String getDB() {        return (contextHolder.get());    }    // 清除数据源名    public static void clearDB() {        contextHolder.remove();    }}

DynamicDataSource

获取执行中所使用的数据源。由于数据源被保存在了DataSourceContextHolder中的ThreadLocal中,所以直接获取就行了。

/** * 获取本地数据源 */public class DynamicDataSource extends AbstractRoutingDataSource {    private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class);    @Override    protected Object determineCurrentLookupKey() {        LOG.info("数据源为{}", DataSourceContextHolder.getDB());        return DataSourceContextHolder.getDB();    }}

至此就完成了集成quartz及数据源切换的功能。然后就是具体的任务了。

执行任务

具体的任务需要继承BaseJob并在execute方法中重写具体需要执行的任务。

execute

@Slf4j@Servicepublic class ReadNumJob extends BaseJob {    @Autowired    private RedisService redisService;    @Autowired    private JdbcTemplate jdbcTemplate;    private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class);    @Override    public void execute(JobExecutionContext context) {       doSomething();    }}

指定数据源

然后在添加任务时指定任务所使用的数据源

ReadNumJob job = new ReadNumJob();job.setJobName("test");job.setJobGroup("hys");job.setDescription("test");// 指定数据源job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());job.setCron_Expression("0 */1 * * * ?");try {jobAndTriggerService.addJob(job);} catch (SchedulerException e) {e.printStackTrace();}

"Spring Boot集成quartz如何实现定时任务并支持切换任务数据源"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0