Spring-batch (ItemProcessor) 数据处理过程
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,Spring-batch学习总结(五)学习目标:掌握ItemProcessor1.ItemProcessor:spring-batch中数据处理的过程2.ItemProcessor主要用于实现业务逻辑
千家信息网最后更新 2025年02月04日Spring-batch (ItemProcessor) 数据处理过程
Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:
代码:
Person
package com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person { private Integer id; private String name; private String perDesc; private Date createTime; private Date updateTime; private String sex; private Float score; private Double price; public Person() { super(); } public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score, Double price) { super(); this.id = id; this.name = name; this.perDesc = perDesc; this.createTime = createTime; this.updateTime = updateTime; this.sex = sex; this.score = score; this.price = price; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getCreateTime() { return createTime; } public String getPerDesc() { return perDesc; } public void setPerDesc(String perDesc) { this.perDesc = perDesc; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Float getScore() { return score; } public void setScore(Float score) { this.score = score; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]"; }}
PersonLineAggregator
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator { //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
PersonRowMapper
package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper { /** * rs一条结果集,rowNum代表当前行 */ @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException { return new Person(rs.getInt("id") ,rs.getString("name") ,rs.getString("per_desc") ,rs.getDate("create_time") ,rs.getDate("update_time") ,rs.getString("sex") ,rs.getFloat("score") ,rs.getDouble("price")); }}
ProcessorFileApplication
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication { public static void main(String[] args) { SpringApplication.run(ProcessorFileApplication.class, args); }}
ProcessorFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired private ItemProcessor fristNameUpperCaseProcessor; @Autowired private ItemProcessor idFilterProcessor; @Bean public Job ProcessorFileOutputFromDBJob() { return jobBuilderFactory.get("ProcessorFileOutputFromDBJob") .start(ProcessorFileOutputFromDBStep()) .build(); } @Bean public Step ProcessorFileOutputFromDBStep() { return stepBuilderFactory.get("ProcessorFileOutputFromDBStep") .chunk(100) .reader(ProcessorFileOutputFromItemWriter()) .processor(personDataProcessor()) .writer(ProcessorFileOutputFromItemReader()) .build(); } @Bean @StepScope public JdbcPagingItemReader ProcessorFileOutputFromItemWriter() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列 queryProvider.setFromClause("from person_buf"); // 设置要查询的表 Map sortKeys = new HashMap();// 定义一个集合用于存放排序列 sortKeys.put("id", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; } @Bean public CompositeItemProcessor personDataProcessor(){ CompositeItemProcessor processor=new CompositeItemProcessor<>(); List> listProcessor=new ArrayList<>(); listProcessor.add(fristNameUpperCaseProcessor); listProcessor.add(idFilterProcessor); processor.setDelegates(listProcessor); return processor; } @Bean @StepScope public FlatFileItemWriter ProcessorFileOutputFromItemReader() { FlatFileItemWriter writer = new FlatFileItemWriter(); try { File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile(); System.out.println("file is create in :" + path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new PersonLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
FristNameUpperCaseProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor { @Override public Person process(Person item) throws Exception { return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(), item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice()); }}
IdFilterProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor { @Override public Person process(Person item) throws Exception { if (item.getId() % 2 == 0) { return item; } else { return null; } }}
运行结果:
观察写入完成后的文件:
可以看出我们已经完成了我们的目标
数据
数据库
排序
代码
目标
结果
学习
查询
观察
数据处理
过程
处理
最大
业务
代表
升序
大写
奇数
字母
对象
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
区块链通信网络技术
中国网络安全审查监督管理局
国产服务器上游企业
单片机通信协议 服务器
中国软件开发的不足
coc采集器升级数据库
c 读excel验证数据库
两所开设网络安全学校
数据库的命名规则必须遵循
网络安全执法检查重点
年龄30还可以学软件开发吗
网络安全的特点黑盒性
公安信息网络安全年度总结
河北软件开发招标报价
安徽二七一起网络技术有限
第八届网络安全宣传主题班会
为什么进入游戏看不到服务器
加载服务器中是什么意思
云服务器开发需要哪些技能
手机游戏服务器处理请求信息失败
数据库系统组成不包括
有人说找软件开发者可以追回
数据库物理删除数据
山西工业软件开发服务价格
cs1.6 僵尸服务器
花都区质量网络技术开发咨询报价
全校大学生网络安全知识竞赛
计算机网络安全中s是什么意思
系统找不到数据库文件夹
什么笔记本软件开发