千家信息网

SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要介绍了SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着
千家信息网最后更新 2025年01月23日SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析

这篇文章主要介绍了SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

一、引言

需求内容如下:

PC网页触发一条设备升级记录(下图),后台要定时批量设备更新。这里定时要用到Quartz,批量数据处理要用到SpringBatch,二者结合,可以完成该需求。

由于之前,没有用过SpringBatch,于是上网查了下资料,发现可参考的不是很多,于是只能去慢慢的翻看官方文档。

遇到不少问题,就记录一下吧。

二、代码具体实现

1、pom文件

            org.springframework.boot      spring-boot-starter-web              org.postgresql      postgresql              org.springframework.boot      spring-boot-starter-jdbc              org.springframework.boot      spring-boot-starter-batch              org.projectlombok      lombok              org.springframework.boot      spring-boot-starter-batch       

2、application.yaml文件

spring:  datasource:    username: thinklink    password: thinklink    url: jdbc:postgresql://172.16.205.54:5432/thinklink    driver-class-name: org.postgresql.Driver  batch:    job:      enabled: falseserver:  port: 8073#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/# 每次批量处理的数据量,默认为5000batch-size: 5000

3、Service实现类

触发批处理任务的入口,执行一个job

@Service("batchService")public class BatchServiceImpl implements BatchService {        // 框架自动注入    @Autowired    private JobLauncher jobLauncher;    @Autowired    private Job updateDeviceJob;    /**     * 根据 taskId 创建一个Job     * @param taskId     * @throws Exception     */    @Override    public void createBatchJob(String taskId) throws Exception {        JobParameters jobParameters = new JobParametersBuilder()                .addString("taskId", taskId)                .addString("uuid", UUID.randomUUID().toString().replace("-",""))                .toJobParameters();        // 传入一个Job任务和任务需要的参数        jobLauncher.run(updateDeviceJob, jobParameters);    }}

4、SpringBatch配置类

此部分最重要(☆☆☆☆☆)

@Configurationpublic class BatchConfiguration {    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);    @Value("${batch-size:5000}")    private int batchSize;        // 框架自动注入    @Autowired    public JobBuilderFactory jobBuilderFactory;        // 框架自动注入    @Autowired    public StepBuilderFactory stepBuilderFactory;        // 数据过滤器,对从数据库读出来的数据,注意进行操作    @Autowired    public TaskItemProcessor taskItemProcessor;    // 接收job参数    public Map parameters;    public Object taskId;    @Autowired    private JdbcTemplate jdbcTemplate;        // 读取数据库操作    @Bean    @StepScope    public JdbcCursorItemReader itemReader(DataSource dataSource) {        String querySql = " SELECT " +                " e. ID AS taskId, " +                " e.user_id AS userId, " +                " e.timing_startup AS startTime, " +                " u.device_id AS deviceId, " +                " d.app_name AS appName, " +                " d.compose_file AS composeFile, " +                " e.failure_retry AS failureRetry, " +                " e.tetry_times AS retryTimes, " +                " e.device_managered AS deviceManagered " +                " FROM " +                " eiot_upgrade_task e " +                " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +                " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +                " WHERE " +                " ( " +                " u.device_upgrade_status = 0 " +                " OR u.device_upgrade_status = 2" +                " )" +                " AND e.tetry_times > u.retry_times " +                " AND e. ID = ?";        return new JdbcCursorItemReaderBuilder()                .name("itemReader")                .sql(querySql)                .dataSource(dataSource)                .queryArguments(new Object[]{parameters.get("taskId").getValue()})                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())                .build();    }        // 将结果写回数据库    @Bean    @StepScope    public ItemWriter itemWriter() {        return new ItemWriter() {            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {                log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);                Integer retryTimes = jdbcTemplate.queryForObject(                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",                        new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class                );                retryTimes += 1;                int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +                        "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());                if (updateCount <= 0) {                    log.warn("no task updated");                } else {                    log.info("count of {} task updated", updateCount);                }                // 最后一次重试                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());                    return 1;                } else {                    return 0;                }            }            @Override            @Transactional            public void write(List list) throws Exception {                Map taskMap = jdbcTemplate.queryForMap(                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",                        list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的                        );                int deviceManagered = (int)taskMap.get("device_managered");                Integer deviceCount = (Integer) taskMap.get("device_count");                if (deviceCount == null) {                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());                }                int taskStatus = (int)taskMap.get("task_status");                for (ProcessResult result: list) {                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());                }                if (deviceCount != null && deviceManagered == deviceCount) {                    taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成                }                jdbcTemplate.update("update eiot_upgrade_task  set device_managered = ?, task_status = ? " +                        "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());            }        };    }    /**     * 定义一个下发更新的 job     * @return     */    @Bean    public Job updateDeviceJob(Step updateDeviceStep) {        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))                .listener(new JobListener()) // 设置Job的监听器                .flow(updateDeviceStep)// 执行下发更新的Step                .end()                .build();    }    /**     * 定义一个下发更新的 step     * @return     */    @Bean    public Step updateDeviceStep(JdbcCursorItemReader itemReader,ItemWriter itemWriter) {        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))                . chunk(batchSize)                .reader(itemReader) //根据taskId从数据库读取更新设备信息                .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口                .writer(itemWriter)                .build();    }    // job 监听器    public class JobListener implements JobExecutionListener {        @Override        public void beforeJob(JobExecution jobExecution) {            log.info(jobExecution.getJobInstance().getJobName() + " before... ");            parameters = jobExecution.getJobParameters().getParameters();            taskId = parameters.get("taskId").getValue();            log.info("job param taskId : " + parameters.get("taskId"));        }        @Override        public void afterJob(JobExecution jobExecution) {            log.info(jobExecution.getJobInstance().getJobName() + " after... ");            // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job            String sql = " SELECT " +                    " count(*) " +                    " FROM " +                    " eiot_upgrade_device d " +                    " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +                    " WHERE " +                    " u. ID = ? " +                    " AND d.retry_times < u.tetry_times " +                    " AND ( " +                    " d.device_upgrade_status = 0 " +                    " OR d.device_upgrade_status = 2 " +                    " ) ";            // 获取更新失败的设备个数            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);            log.info("update device failure count : " + count);            // 下面是使用Quartz触发定时任务            // 获取任务时间,单位秒//            String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);            // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒            Integer millSecond = 10;            if(count != null && count > 0){                String jobName = "UpgradeTask_" + taskId;                String reTaskId = taskId.toString();                Map params = new HashMap<>();                params.put("jobName",jobName);                params.put("taskId",reTaskId);                if (QuartzManager.checkNameNotExist(jobName))                {                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);                }            }        }    }}

5、Processor,处理每条数据

可以在此对数据进行过滤操作

@Component("taskItemProcessor")public class TaskItemProcessor implements ItemProcessor {    public static final int STATUS_DISPATCH_FAILED = 2;    public static final int STATUS_DISPATCH_SUCC = 1;    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")    private String dispatchUrl;    @Autowired    JdbcTemplate jdbcTemplate;    /**     * 在这里,执行 下发更新指令 的操作     * @param dispatchRequest     * @return     * @throws Exception     */    @Override    public ProcessResult process(final DispatchRequest dispatchRequest) {        // 调用接口,下发指令        String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();        log.info("request url:" + url);        RestTemplate restTemplate = new RestTemplate();        HttpHeaders headers = new HttpHeaders();        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);        MultiValueMap params = new LinkedMultiValueMap();        JSONObject jsonOuter = new JSONObject();        JSONObject jsonInner = new JSONObject();        try {            jsonInner.put("jobId",dispatchRequest.getTaskId());            jsonInner.put("name",dispatchRequest.getName());            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));            jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));            jsonInner.put("timestamp",dispatchRequest.getTimestamp());            jsonOuter.put("method","updateApp");            jsonOuter.put("params",jsonInner);        } catch (JSONException e) {            log.info("JSON convert Exception :" + e);        }catch (IOException e) {            log.info("Base64Util bytesToBase64Str :" + e);        }        log.info("request body json :" + jsonOuter);        HttpEntity requestEntity = new HttpEntity(jsonOuter.toString(),headers);        int status;        try {            ResponseEntity response = restTemplate.postForEntity(url,requestEntity,String.class);            log.info("response :" + response);            if (response.getStatusCode() == HttpStatus.OK) {                status = STATUS_DISPATCH_SUCC;            } else {                status = STATUS_DISPATCH_FAILED;            }        }catch (Exception e){            status = STATUS_DISPATCH_FAILED;        }        return new ProcessResult(dispatchRequest, status);    }}

6、封装数据库返回数据的实体Bean

注意静态内部类

public class DispatchRequest {    private String taskId;    private String deviceId;    private String userId;    private String name;    private byte[] composeFile;    private String policy;    private String timestamp;    private String md5;    private int failureRetry;    private int retryTimes;    private int deviceManagered;   // 省略构造函数,setter/getter/tostring方法   //......       public static class DispatchRequestRowMapper implements RowMapper {        @Override        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {            DispatchRequest dispatchRequest = new DispatchRequest();            dispatchRequest.setTaskId(resultSet.getString("taskId"));            dispatchRequest.setUserId(resultSet.getString("userId"));            dispatchRequest.setPolicy(resultSet.getString("startTime"));            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));            dispatchRequest.setName(resultSet.getString("appName"));            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));            return dispatchRequest;        }    }}

7、启动类上要加上注解

@SpringBootApplication@EnableBatchProcessingpublic class Application {    public static void main(String[] args) {        SpringApplication.run(Application.class, args);    }}

三、小结一下

其实SpringBatch并没有想象中那么好用,当从数据库中每次取5000条数据后,进入processor中是逐条处理的,这个时候不能不行操作,等5000条数据处理完之后,再一次性执行ItemWriter方法。

在使用的过程中,最坑的地方是ItemReader和ItemWriter这两个地方,如何执行自定义的Sql,参考文中代码就行。至于Quartz定时功能,很简单,只要定时创建SpringBatch里面的Job,让这个job启动就好了,此处就不在给出了,贴的代码太多了。由于公司一些原因,代码不能放到GitHub上。

spring-batch与quartz集成过程中遇到的问题

问题

启动时报Exception

Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB

原因

quartz的driverDelegateClass配置的是OracleDelegate,应用运行在weblogic上

解决

driverDelegateClass对应配置改为

org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate

感谢你能够认真阅读完这篇文章,希望小编分享的"SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

0