千家信息网

Springboot中怎么实现多线程并发定时任务

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,小编给大家分享一下Springboot中怎么实现多线程并发定时任务,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!一、实现1
千家信息网最后更新 2025年01月20日Springboot中怎么实现多线程并发定时任务

小编给大家分享一下Springboot中怎么实现多线程并发定时任务,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

一、实现

1、启动类

在启动类添加注解@EnableScheduling开启,不然不起用做。

2、新建任务类

添加注解@Component注册到spring的容器中。

package com.example.demo.task;import com.example.demo.entity.MyTask;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.annotation.SchedulingConfigurer;import org.springframework.scheduling.config.CronTask;import org.springframework.scheduling.config.ScheduledTaskRegistrar;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;import javax.annotation.PreDestroy;import java.util.Arrays;import java.util.List;import java.util.Objects;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledFuture;/** * @path:com.example.demo.task.ScheduledTask.java * @className:ScheduledTask.java * @description:定时任务 * @author:tanyp * @dateTime:2020/7/23 21:37  * @editNote: */@Slf4j@Componentpublic class ScheduledTask implements SchedulingConfigurer {    private volatile ScheduledTaskRegistrar registrar;    private final ConcurrentHashMap> scheduledFutures = new ConcurrentHashMap<>();    private final ConcurrentHashMap cronTasks = new ConcurrentHashMap<>();    /**     * 默认启动10个线程     */    private static final Integer DEFAULT_THREAD_POOL = 10;    @Override    public void configureTasks(ScheduledTaskRegistrar registrar) {        registrar.setScheduler(Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL));        this.registrar = registrar;    }    @PreDestroy    public void destroy() {        this.registrar.destroy();    }    /**     * @methodName:refreshTask     * @description:初始化任务     * 1、从数据库获取执行任务的集合【TxTask】     * 2、通过调用 【refresh】 方法刷新任务列表     * 3、每次数据库中的任务发生变化后重新执行【1、2】     * @author:tanyp     * @dateTime:2020/7/23 21:37     * @Params: [tasks]     * @Return: void     * @editNote:     */    public void refreshTask(List tasks) {        // 删除已经取消任务        scheduledFutures.keySet().forEach(key -> {            if (Objects.isNull(tasks) || tasks.size() == 0) {                scheduledFutures.get(key).cancel(false);                scheduledFutures.remove(key);                cronTasks.remove(key);                return;            }            tasks.forEach(task -> {                if (!Objects.equals(key, task.getTaskId())) {                    scheduledFutures.get(key).cancel(false);                    scheduledFutures.remove(key);                    cronTasks.remove(key);                    return;                }            });        });        // 添加新任务、更改执行规则任务        tasks.forEach(txTask -> {            String expression = txTask.get_Expression();            // 任务表达式为空则跳过            if (StringUtils.isEmpty(expression)) {                return;            }            // 任务已存在并且表达式未发生变化则跳过            if (scheduledFutures.containsKey(txTask.getTaskId()) && cronTasks.get(txTask.getTaskId()).get_Expression().equals(expression)) {                return;            }            // 任务执行时间发生了变化,则删除该任务            if (scheduledFutures.containsKey(txTask.getTaskId())) {                scheduledFutures.get(txTask.getTaskId()).cancel(false);                scheduledFutures.remove(txTask.getTaskId());                cronTasks.remove(txTask.getTaskId());            }            CronTask task = new CronTask(new Runnable() {                @Override                public void run() {                    // 执行业务逻辑                    try {                        log.info("执行单个任务,任务ID【{}】执行规则【{}】", txTask.getTaskId(), txTask.get_Expression());                        System.out.println("==========================执行任务=============================");                    } catch (Exception e) {                        log.error("执行发送消息任务异常,异常信息:{}", e);                    }                }            }, expression);            ScheduledFuture future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());            cronTasks.put(txTask.getTaskId(), task);            scheduledFutures.put(txTask.getTaskId(), future);        });    }}

3、创建自启动任务类

package com.example.demo.task;import com.example.demo.task.ScheduledTask;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.stereotype.Component;import java.util.Arrays;import java.util.List;/** * @path:com.example.demo.task.MyApplicationRunner.java * @className:ScheduledTask.java * @description:自启动 * @author:tanyp * @dateTime:2020/7/23 21:37  * @editNote: */@Slf4j@Componentpublic class MyApplicationRunner implements ApplicationRunner {    @Autowired    private ScheduledTask scheduledTask;    @Override    public void run(ApplicationArguments args) throws Exception {        log.info("================项目启动初始化定时任务====开始===========");        /**         * 初始化三个任务:         * 1、10秒执行一次         * 2、15秒执行一次         * 3、20秒执行一次         */        List tasks = Arrays.asList(                MyTask.builder().taskId("10001")._expression("*/10 * * * * ?").build(),                MyTask.builder().taskId("10002")._expression("*/15 * * * * ?").build(),                MyTask.builder().taskId("10003")._expression("*/20 * * * * ?").build()        );        scheduledTask.refreshTask(tasks);        log.info("================项目启动初始化定时任务====完成==========");    }}

4、实体

package com.example.demo.entity;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;/** * @path:com.example.demo.entity.MyTask.java * @className:MyTask.java * @description:任务实体 * @author:tanyp * @dateTime:2020/7/23 21:41  * @editNote: */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class MyTask {    /**     * 任务id     */    private String taskId;    /**     * 任务执行规则时间     */    private String expression;}

二、测试

初始化三个任务,分别为:

10秒执行一次(*/10 * * * * ?)
15秒执行一次(*/15 * * * * ?)
20秒执行一次(*/20 * * * * ?)

三、动态使用方式

1、启动方式有两种:

  • 启动项目后,手动调用ScheduledTask.refreshTask(List tasks),并初始化任务列表;

  • 使用我测试中的方式,配置项目启动完成后自动调用初始任务的方法,并初始化任务列表。

2、数据初始化

只需要给 List集合赋值并调用refreshTask()方法即可:

  • 根据业务需求修改MyTask实体类;

  • 这里的初始化数据可以从数据库读取数据赋值给集合;

例如:从mysql读取任务配置表的数据,调用refreshTask()方法。

3、如何动态?

  • 修改:修改某一项正在执行的任务规则;

  • 添加:添加一项新的任务;

  • 删除:停止某一项正在执行的任务。

例如:我们有一张任务配置表,此时进行分别新增一条或多条数据、删除一条或多条数据、改一条数据,只需要完成以上任何一项操作后,重新调用一下refreshTask()方法即可。

怎么重新调用 refreshTask()方法:可以另外启一个任务实时监控任务表的数据变化。

以上是"Springboot中怎么实现多线程并发定时任务"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0