Spring-batch (ItemProcessor) 数据处理过程
发表于:2024-09-30 作者:千家信息网编辑
千家信息网最后更新 2024年09月30日,Spring-batch学习总结(五)学习目标:掌握ItemProcessor1.ItemProcessor:spring-batch中数据处理的过程2.ItemProcessor主要用于实现业务逻辑
千家信息网最后更新 2024年09月30日Spring-batch (ItemProcessor) 数据处理过程
Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:
代码:
Person
package com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person { private Integer id; private String name; private String perDesc; private Date createTime; private Date updateTime; private String sex; private Float score; private Double price; public Person() { super(); } public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score, Double price) { super(); this.id = id; this.name = name; this.perDesc = perDesc; this.createTime = createTime; this.updateTime = updateTime; this.sex = sex; this.score = score; this.price = price; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getCreateTime() { return createTime; } public String getPerDesc() { return perDesc; } public void setPerDesc(String perDesc) { this.perDesc = perDesc; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Float getScore() { return score; } public void setScore(Float score) { this.score = score; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]"; }}
PersonLineAggregator
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator { //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
PersonRowMapper
package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper { /** * rs一条结果集,rowNum代表当前行 */ @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException { return new Person(rs.getInt("id") ,rs.getString("name") ,rs.getString("per_desc") ,rs.getDate("create_time") ,rs.getDate("update_time") ,rs.getString("sex") ,rs.getFloat("score") ,rs.getDouble("price")); }}
ProcessorFileApplication
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication { public static void main(String[] args) { SpringApplication.run(ProcessorFileApplication.class, args); }}
ProcessorFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired private ItemProcessor fristNameUpperCaseProcessor; @Autowired private ItemProcessor idFilterProcessor; @Bean public Job ProcessorFileOutputFromDBJob() { return jobBuilderFactory.get("ProcessorFileOutputFromDBJob") .start(ProcessorFileOutputFromDBStep()) .build(); } @Bean public Step ProcessorFileOutputFromDBStep() { return stepBuilderFactory.get("ProcessorFileOutputFromDBStep") .chunk(100) .reader(ProcessorFileOutputFromItemWriter()) .processor(personDataProcessor()) .writer(ProcessorFileOutputFromItemReader()) .build(); } @Bean @StepScope public JdbcPagingItemReader ProcessorFileOutputFromItemWriter() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列 queryProvider.setFromClause("from person_buf"); // 设置要查询的表 Map sortKeys = new HashMap();// 定义一个集合用于存放排序列 sortKeys.put("id", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; } @Bean public CompositeItemProcessor personDataProcessor(){ CompositeItemProcessor processor=new CompositeItemProcessor<>(); List> listProcessor=new ArrayList<>(); listProcessor.add(fristNameUpperCaseProcessor); listProcessor.add(idFilterProcessor); processor.setDelegates(listProcessor); return processor; } @Bean @StepScope public FlatFileItemWriter ProcessorFileOutputFromItemReader() { FlatFileItemWriter writer = new FlatFileItemWriter(); try { File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile(); System.out.println("file is create in :" + path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new PersonLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
FristNameUpperCaseProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor { @Override public Person process(Person item) throws Exception { return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(), item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice()); }}
IdFilterProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor { @Override public Person process(Person item) throws Exception { if (item.getId() % 2 == 0) { return item; } else { return null; } }}
运行结果:
观察写入完成后的文件:
可以看出我们已经完成了我们的目标
数据
数据库
排序
代码
目标
结果
学习
查询
观察
数据处理
过程
处理
最大
业务
代表
升序
大写
奇数
字母
对象
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器轴流风机怎么样
网络安全维护的术语
泰格系统服务器密码如何查看
网络安全大赛专业用语
昆山im即时通讯软件开发
java软件开发样
网络安全和国家安全的关系
如何文件服务器
bgp.net服务器怎么租
天津金山云网络技术有限公司
软件开发人力资源外包合同
根服务器为什么只有13个
云服务器安全脚本
5g网络技术细分产业
台州音频视频系统服务器
蓝牙标签打印软件开发
软件开发 赚钱
河北企业软件开发服务放心可靠
数据库分析师招生简章
云服务器会被病毒攻击吗
天天有彩互联网科技有限公司
自主可控网络安全行业
本地数据库组装
本地应用如何对接云数据库
平利天气预报软件开发
长城R450服务器
spring 操作数据库框架
软件开发对硬盘需求大吗
数据库实体和实体属性
软件开发日志接口