千家信息网

Spring batch入门示例

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,1 场景说明读取CVS文件,经过处理后,保存到数据库。2 项目结构应用程序启动主程序DemoApplication.java读取文件(输入文件)UserItemReader.java处理数据UserI
千家信息网最后更新 2025年02月01日Spring batch入门示例

1 场景说明

读取CVS文件,经过处理后,保存到数据库。

2 项目结构

应用程序

启动主程序

DemoApplication.java

读取文件(输入文件)

UserItemReader.java

处理数据

UserItemProcess.java

输出文件

UserItemWriter.java

调度批作业

定时处理配置

QuartzConfiguration.java

定时调度

QuartzJobLauncher.java

辅助文件

数据文件

User.txt

对象实体(传递对象)

User.java

Meaven配置文件

Pom.xml

2.1 Pom.xml

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.zy

SpringBatchDemo1

0.0.1-SNAPSHOT

jar

SpringBatchDemo1

Demo project for Spring Boot

org.springframework.boot

spring-boot-starter-parent

1.5.10.RELEASE

UTF-8

UTF-8

1.8

org.springframework

spring-context-support

org.springframework.boot

spring-boot-starter-batch

org.springframework

spring-oxm

org.projectlombok

lombok

mysql

mysql-connector-java

runtime

org.springframework.boot

spring-boot-starter-test

test

org.springframework.batch

spring-batch-test

test

org.projectlombok

lombok

org.quartz-scheduler

quartz

2.3.0

com.h3database

h3

runtime

org.springframework.boot

spring-boot-maven-plugin

2.2 User.java

package com.zy.model;

public class User {

private String id;

private String name;

private String age;

public User(String id, String name, String age) {

this.id = id;

this.name = name;

this.age = age;

}

public String getId() {

return id;

}

public void setId(String id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public String getAge() {

return age;

}

public void setAge(String age) {

this.age = age;

}

@Override

public String toString() {

return "User [id=" + id + ", name=" + name + ", age=" + age + "]";

}

}

2.3 UserItemReader.java

package com.zy.reader;

import org.springframework.batch.item.file.FlatFileItemReader;

import org.springframework.batch.item.file.LineMapper;

import org.springframework.batch.item.file.mapping.DefaultLineMapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

import org.springframework.batch.item.file.transform.FieldSet;

import org.springframework.batch.item.file.transform.LineTokenizer;

import org.springframework.core.io.ClassPathResource;

import org.springframework.validation.BindException;

import com.zy.model.User;

//从user.txt文件中读取信息到User

public class UserItemReader extends FlatFileItemReader {

public UserItemReader(){

createReader();

}

private void createReader(){

this.setResource(new ClassPathResource("data/User.txt"));

this.setLinesToSkip(1);

this.setLineMapper(userLineMapper());

}

private LineMapper userLineMapper(){

DefaultLineMapper lineMapper = new DefaultLineMapper<>();

lineMapper.setLineTokenizer(userLineTokenizer());

lineMapper.setFieldSetMapper(new UserFieldStepMapper());

lineMapper.afterPropertiesSet();

return lineMapper;

}

private LineTokenizer userLineTokenizer(){

DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();

tokenizer.setNames(new String[]{"ID", "NAME", "AGE"});

return tokenizer;

}

private static class UserFieldStepMapper implements FieldSetMapper{

@Override

public User mapFieldSet(FieldSet fieldSet) throws BindException {

return new User(fieldSet.readString("ID"),

fieldSet.readString("NAME"),

fieldSet.readString("AGE"));

}

}

}

2.4 User.txt

ID,NAME,AGE

1,zy,28

2,tom,20

3,terry,30

4,lerry,18

5,bob,25

6,linda,27

7,marry,39

8,long,22

9,kin,33

10,王五,40

2.5 UserItemProcessor.java

package com.zy.processor;

import org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

public class UserItemProcessor implements ItemProcessor {

@Override

public User process(User item) throws Exception {

if (Integer.parseInt(item.getAge()) > 20) {

return item;

}

return null;

}

}

2.6 UserItemWriter.java

package com.zy.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

public class UserItemWriter implements ItemWriter {

@Override

public void write(List items) throws Exception {

for(User user : items){

System.out.println(user);

}

}

}

2.7 QuartzJobLauncher

package com.zy.QuartzConfiguration;

import java.text.SimpleDateFormat;

import java.util.Date;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.quartz.JobKey;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import org.springframework.batch.core.JobParameters;

import org.springframework.batch.core.configuration.JobLocator;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.scheduling.quartz.QuartzJobBean;

public class QuartzJobLauncher extends QuartzJobBean {

@Override

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

JobDetail jobDetail = context.getJobDetail();

JobDataMap jobDataMap = jobDetail.getJobDataMap();

String jobName = jobDataMap.getString("jobName");

JobLauncher jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");

JobLocator jobLocator = (JobLocator) jobDataMap.get("jobLocator");

System.out.println("jobName : " + jobName);

System.out.println("jobLauncher : " + jobLauncher);

System.out.println("jobLocator : " + jobLocator);

JobKey key = context.getJobDetail().getKey();

System.out.println(key.getName() + " : " + key.getGroup());

SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("Current time : " + sf.format(new Date()));

try {

Job job = jobLocator.getJob(jobName);

JobExecution jobExecution = jobLauncher.run(job, new JobParameters());

} catch (Exception e) {

e.printStackTrace();

}

}

}

2.8 QuartzConfiguration

package com.zy.QuartzConfiguration;

import java.util.HashMap;

import java.util.Map;

import org.springframework.batch.core.configuration.JobLocator;

import org.springframework.batch.core.configuration.JobRegistry;

import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.quartz.CronTriggerFactoryBean;

import org.springframework.scheduling.quartz.JobDetailFactoryBean;

import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration

public class QuartzConfiguration {

//自动注入进来的是SimpleJobLauncher

@Autowired

private JobLauncher jobLauncher;

@Autowired

private JobLocator jobLocator;

/*用来注册job*/

/*JobRegistry会自动注入进来*/

@Bean

public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry){

JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();

jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);

return jobRegistryBeanPostProcessor;

}

@Bean

public JobDetailFactoryBean jobDetailFactoryBean(){

JobDetailFactoryBean jobFactory = new JobDetailFactoryBean();

jobFactory.setJobClass(QuartzJobLauncher.class);

jobFactory.setGroup("my_group");

jobFactory.setName("my_job");

Map map = new HashMap<>();

map.put("jobName", "zyJob");

map.put("jobLauncher", jobLauncher);

map.put("jobLocator", jobLocator);

jobFactory.setJobDataAsMap(map);

return jobFactory;

}

@Bean

public CronTriggerFactoryBean cronTriggerFactoryBean(){

CronTriggerFactoryBean cTrigger = new CronTriggerFactoryBean();

System.out.println("------- : " + jobDetailFactoryBean().getObject());

cTrigger.setJobDetail(jobDetailFactoryBean().getObject());

cTrigger.setStartDelay(3000);

cTrigger.setName("my_trigger");

cTrigger.setGroup("trigger_group");

cTrigger.setCron_Expression("0/3 * * * * ? "); //每间隔3s触发一次Job任务

return cTrigger;

}

@Bean

public SchedulerFactoryBean schedulerFactoryBean(){

SchedulerFactoryBean schedulerFactor = new SchedulerFactoryBean();

schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());

return schedulerFactor;

}

}

2.9 BatchConfiguration

package com.zy.config;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;

import com.zy.model.User;

import com.zy.processor.UserItemProcessor;

import com.zy.reader.UserItemReader;

import com.zy.writer.UserItemWriter;

@Configuration

@EnableBatchProcessing

//@Import({QuartzConfiguration.class})

public class BatchConfiguration {

@Autowired

public JobBuilderFactory jobBuilderFactory;

@Autowired

public StepBuilderFactory stepBuilderFactory;

/*创建job*/

@Bean

public Job jobMethod(){

return jobBuilderFactory.get("zyJob")

.start(stepMethod())

.build();

}

/*创建step*/

@Bean

public Step stepMethod(){

return stepBuilderFactory.get("myStep1")

.chunk(3)

.reader(new UserItemReader())

.processor(new UserItemProcessor())

.writer(new UserItemWriter())

.allowStartIfComplete(true)

.build();

}

}

3 执行Job输出结果

2019-04-30 21:31:48.049 INFO 9344 --- [ryBean_Worker-5] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=zyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

jobName : zyJob

jobLauncher : org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d

jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5

my_job : my_group

Current time : 2019-04-30 21:31:51

2019-04-30 21:31:51.012 INFO 9344 --- [ryBean_Worker-6] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=zyJob]] launched with the following parameters: [{}]

2019-04-30 21:31:51.028 INFO 9344 --- [ryBean_Worker-6] o.s.batch.core.job.SimpleStepHandler : Executing step: [myStep1]

User [id=1, name=zy, age=28]

User [id=3, name=terry, age=30]

User [id=5, name=bob, age=25]

User [id=6, name=linda, age=27]

User [id=7, name=marry, age=39]

User [id=8, name=long, age=22]

User [id=9, name=kin, age=33]

User [id=10, name=ww, age=40]

4 概念总结


Job Repository

作业仓库,负责Job,Step执行过程中的状态保存。


Job Launcher

作业调度器,提供执行Job的入口


Job

作业,多个Step组成,封装整个批处理操作。


Step

作业步,Job的一个执行环节,由多个或者一个Step组装成Job


Tasklet

Step中具体执行的逻辑的操作,可以重复执行,可以具体的设置同步,异步操作。


Chunk

给定数量的Item集合,可以定义对Chunk的读操作,处理操作,写操作,提交间隔。


Item

一条数据记录。


ItemReader

从数据源(文件系统,数据库,队列等)读取Item


ItemProcessor

在写入数据源之前,对数据进行处理(如:数据清洗,转换,过滤,数据校验等)。


ItemWriter

将Item批量写入数据源(文件系统,数据库,队列等)。

5 Spring Batch 结构

Spring Batch的一个基本层级结构。

首先,Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。

一个Job包含很多Step,step就是每个job要执行的单个步骤。

如下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。

然后是Chunk,chunk就是数据块,你需要定义多大的数据量是一个chunk。

Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。



数据 文件 处理 作业 就是 数据库 数据源 结构 调度 不断 任务 多个 对象 流程 系统 队列 循环 输出 配置 东西 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 吃鸡土耳其是哪个服务器 大学生网络安全情景短剧剧本 辽宁通信管理局官网络安全处 软件开发行情如何 安卓开发软件开发方案报价 判断数据库是否存在漏洞 衡阳青鸟软件开发学校怎么样 网络安全app广告 图书馆数据库类别 数据库无法启动了但是无法读取 小学生网络安全知识黑板报 手机打游戏无法连接服务器 软件开发维护实习周报 人寿软件开发中心待遇 哥网交互网络技术有限公司 星韵全能抽奖软件数据库密码 网络技术应用统计学笔记 誉辉互联网科技有限公司 区块链和数据库的相同点 软件开发过程测试完整最新版 我的世界网易学校服务器 排行榜小说软件开发 华为暂停网络安全 重庆qt软件开发 如何复制sql列中数据库 网络技术高中会考试题 西安软件开发培训需要多长时间 网络安全常用问题 服务器的保留时间多久 数据库外部关键字怎么搞
0