千家信息网

Spring-batch (ItemProcessor) 数据处理过程

发表于:2024-09-30 作者:千家信息网编辑
千家信息网最后更新 2024年09月30日,Spring-batch学习总结(五)学习目标:掌握ItemProcessor1.ItemProcessor:spring-batch中数据处理的过程2.ItemProcessor主要用于实现业务逻辑
千家信息网最后更新 2024年09月30日Spring-batch (ItemProcessor) 数据处理过程

Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:

代码:
Person

package com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person {    private Integer id;    private String name;    private String perDesc;    private Date createTime;    private Date updateTime;    private String sex;    private Float score;    private Double price;    public Person() {        super();    }    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,            Double price) {        super();        this.id = id;        this.name = name;        this.perDesc = perDesc;        this.createTime = createTime;        this.updateTime = updateTime;        this.sex = sex;        this.score = score;        this.price = price;    }    public Integer getId() {        return id;    }    public void setId(Integer id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public Date getCreateTime() {        return createTime;    }    public String getPerDesc() {        return perDesc;    }    public void setPerDesc(String perDesc) {        this.perDesc = perDesc;    }    public void setCreateTime(Date createTime) {        this.createTime = createTime;    }    public Date getUpdateTime() {        return updateTime;    }    public void setUpdateTime(Date updateTime) {        this.updateTime = updateTime;    }    public String getSex() {        return sex;    }    public void setSex(String sex) {        this.sex = sex;    }    public Float getScore() {        return score;    }    public void setScore(Float score) {        this.score = score;    }    public Double getPrice() {        return price;    }    public void setPrice(Double price) {        this.price = price;    }    @Override    public String toString() {        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="                + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";    }}

PersonLineAggregator

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator {    //JSON    private ObjectMapper mapper=new ObjectMapper();    @Override    public String aggregate(Person person) {        try {            return mapper.writeValueAsString(person);        } catch (JsonProcessingException e) {            throw new RuntimeException("unable to writer...",e);        }    }}

PersonRowMapper

package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper {    /**     * rs一条结果集,rowNum代表当前行     */    @Override    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {        return new Person(rs.getInt("id")                ,rs.getString("name")                ,rs.getString("per_desc")                ,rs.getDate("create_time")                ,rs.getDate("update_time")                ,rs.getString("sex")                ,rs.getFloat("score")                ,rs.getDouble("price"));    }}

ProcessorFileApplication

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication {    public static void main(String[] args) {        SpringApplication.run(ProcessorFileApplication.class, args);    }}

ProcessorFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration {    @Autowired    private JobBuilderFactory jobBuilderFactory;    @Autowired    private StepBuilderFactory stepBuilderFactory;    @Autowired    private DataSource dataSource;    @Autowired    private ItemProcessor fristNameUpperCaseProcessor;    @Autowired    private ItemProcessor idFilterProcessor;    @Bean    public Job ProcessorFileOutputFromDBJob() {        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")                .start(ProcessorFileOutputFromDBStep())                .build();    }    @Bean    public Step ProcessorFileOutputFromDBStep() {        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")                .chunk(100)                .reader(ProcessorFileOutputFromItemWriter())                .processor(personDataProcessor())                .writer(ProcessorFileOutputFromItemReader())                .build();    }    @Bean    @StepScope    public JdbcPagingItemReader ProcessorFileOutputFromItemWriter() {        JdbcPagingItemReader reader = new JdbcPagingItemReader<>();        reader.setDataSource(this.dataSource); // 设置数据源        reader.setFetchSize(100); // 设置一次最大读取条数        reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列        queryProvider.setFromClause("from person_buf"); // 设置要查询的表        Map sortKeys = new HashMap();// 定义一个集合用于存放排序列        sortKeys.put("id", Order.ASCENDING);// 按照升序排序        queryProvider.setSortKeys(sortKeys);        reader.setQueryProvider(queryProvider);// 设置排序列        return reader;    }    @Bean    public CompositeItemProcessor personDataProcessor(){        CompositeItemProcessor processor=new CompositeItemProcessor<>();        List> listProcessor=new ArrayList<>();        listProcessor.add(fristNameUpperCaseProcessor);        listProcessor.add(idFilterProcessor);        processor.setDelegates(listProcessor);        return processor;    }    @Bean    @StepScope    public FlatFileItemWriter ProcessorFileOutputFromItemReader() {        FlatFileItemWriter writer = new FlatFileItemWriter();        try {            File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();            System.out.println("file is create in :" + path);            writer.setResource(new FileSystemResource(path));            writer.setLineAggregator(new PersonLineAggregator());            writer.afterPropertiesSet();        } catch (Exception e) {            e.printStackTrace();        }        return writer;    }}

FristNameUpperCaseProcessor

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor {    @Override    public Person process(Person item) throws Exception {        return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(),                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());    }}

IdFilterProcessor

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor {    @Override    public Person process(Person item) throws Exception {        if (item.getId() % 2 == 0) {            return item;        } else {            return null;        }    }}

运行结果:

观察写入完成后的文件:

可以看出我们已经完成了我们的目标

0