Spring-batch (ItemProcessor) 数据处理过程
发表于:2024-09-30 作者:千家信息网编辑
千家信息网最后更新 2024年09月30日,Spring-batch学习总结(五)学习目标:掌握ItemProcessor1.ItemProcessor:spring-batch中数据处理的过程2.ItemProcessor主要用于实现业务逻辑
千家信息网最后更新 2024年09月30日Spring-batch (ItemProcessor) 数据处理过程
Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:
代码:
Person
package com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person { private Integer id; private String name; private String perDesc; private Date createTime; private Date updateTime; private String sex; private Float score; private Double price; public Person() { super(); } public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score, Double price) { super(); this.id = id; this.name = name; this.perDesc = perDesc; this.createTime = createTime; this.updateTime = updateTime; this.sex = sex; this.score = score; this.price = price; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getCreateTime() { return createTime; } public String getPerDesc() { return perDesc; } public void setPerDesc(String perDesc) { this.perDesc = perDesc; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Float getScore() { return score; } public void setScore(Float score) { this.score = score; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]"; }}
PersonLineAggregator
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator { //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
PersonRowMapper
package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper { /** * rs一条结果集,rowNum代表当前行 */ @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException { return new Person(rs.getInt("id") ,rs.getString("name") ,rs.getString("per_desc") ,rs.getDate("create_time") ,rs.getDate("update_time") ,rs.getString("sex") ,rs.getFloat("score") ,rs.getDouble("price")); }}
ProcessorFileApplication
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication { public static void main(String[] args) { SpringApplication.run(ProcessorFileApplication.class, args); }}
ProcessorFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired private ItemProcessor fristNameUpperCaseProcessor; @Autowired private ItemProcessor idFilterProcessor; @Bean public Job ProcessorFileOutputFromDBJob() { return jobBuilderFactory.get("ProcessorFileOutputFromDBJob") .start(ProcessorFileOutputFromDBStep()) .build(); } @Bean public Step ProcessorFileOutputFromDBStep() { return stepBuilderFactory.get("ProcessorFileOutputFromDBStep") .chunk(100) .reader(ProcessorFileOutputFromItemWriter()) .processor(personDataProcessor()) .writer(ProcessorFileOutputFromItemReader()) .build(); } @Bean @StepScope public JdbcPagingItemReader ProcessorFileOutputFromItemWriter() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列 queryProvider.setFromClause("from person_buf"); // 设置要查询的表 Map sortKeys = new HashMap();// 定义一个集合用于存放排序列 sortKeys.put("id", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; } @Bean public CompositeItemProcessor personDataProcessor(){ CompositeItemProcessor processor=new CompositeItemProcessor<>(); List> listProcessor=new ArrayList<>(); listProcessor.add(fristNameUpperCaseProcessor); listProcessor.add(idFilterProcessor); processor.setDelegates(listProcessor); return processor; } @Bean @StepScope public FlatFileItemWriter ProcessorFileOutputFromItemReader() { FlatFileItemWriter writer = new FlatFileItemWriter(); try { File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile(); System.out.println("file is create in :" + path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new PersonLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
FristNameUpperCaseProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor { @Override public Person process(Person item) throws Exception { return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(), item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice()); }}
IdFilterProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor { @Override public Person process(Person item) throws Exception { if (item.getId() % 2 == 0) { return item; } else { return null; } }}
运行结果:
观察写入完成后的文件:
可以看出我们已经完成了我们的目标
数据
数据库
排序
代码
目标
结果
学习
查询
观察
数据处理
过程
处理
最大
业务
代表
升序
大写
奇数
字母
对象
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
是开源数据库
上海计算机网络技术考题
和平精英吃鸡服务器大厅在哪里
京东网络安全负责人
衡水一中学生看网络安全宣传视频
数据库网上书店实训项目
数据库业务面试
csgo测试版怎么进服务器
数学数据库
网络技术有什么推荐的书吗
张家港智能网络技术推荐咨询
上海卫宁互联网科技有限公司
应用软件开发还是软件应用开发
华为手机的服务器怎么使用
软件开发里程碑计划模板
mysql数据库 g
青岛大学网络安全学科评估
网络安全法的成效如何
人工智能软件开发费用是多少
昆明app软件开发方向
福州网络技术培训班
天津市哪有买华3服务器的
双路服务器内存只有一半能用
软件开发如何快速寻找客户
征途2手游怎么进入服务器
戴尔服务器r620
未来数据网络技术有限公司
有数据库的考研方向吗
国外论文用什么数据库
网络安全广播稿高中