Spring Batch (ItemWriter): Writing Data to a Database, Flat Files, XML Files, and Classified Multi-File Output


Spring Batch Study Notes (Part 4)
I. Introduction to ItemWriter
1. Reading is item-oriented: the reader is called in a loop and returns one item at a time. Writing is chunk-oriented: the writer receives a whole chunk of items and writes them out block by block.
2. Example (a small example to illustrate how the writer works). Before the full code, the sketch below shows the chunk-oriented loop in simplified form.
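A minimal sketch of what happens inside a chunk-oriented step (an illustration only, not actual Spring Batch source; itemReader, itemWriter and chunkSize are hypothetical names standing in for the step's configured components):

List<String> chunk = new ArrayList<>();
for (int i = 0; i < chunkSize; i++) {    // chunkSize is the commit interval, e.g. 10
    String item = itemReader.read();     // the reader is invoked once per item
    if (item == null) {                  // null signals the end of the input
        break;
    }
    chunk.add(item);
}
itemWriter.write(chunk);                 // the writer receives the whole chunk in one call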
The full example code:
OutOverViewApplication

package com.dhcc.batch.batchDemo.output.outview;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutOverViewApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutOverViewApplication.class, args);
    }
}

OutputViewItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.outview;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OutputViewItemWriterConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("OutputViewItemWriter")
    private ItemWriter<String> outputViewItemWriter;

    @Bean
    public Job OutputViewItemWriterJob3() {
        return jobBuilderFactory.get("OutputViewItemWriterJob3")
                .start(OutputViewItemWriterStep3())
                .build();
    }

    @Bean
    public Step OutputViewItemWriterStep3() {
        return stepBuilderFactory.get("OutputViewItemWriterStep3")
                .<String, String>chunk(10) // write in chunks of 10 items
                .reader(listViewItemRead())
                .writer(outputViewItemWriter)
                .build();
    }

    @Bean
    @StepScope
    public ListItemViewReader listViewItemRead() {
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            dataList.add("my name is zhongqiujie" + i);
        }
        return new ListItemViewReader(dataList);
    }
}

ListItemViewReader

package com.dhcc.batch.batchDemo.output.outview;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class ListItemViewReader implements ItemReader<String> {

    private final Iterator<String> iterator;

    public ListItemViewReader(List<String> data) {
        this.iterator = data.iterator();
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (iterator.hasNext()) {
            return this.iterator.next();
        } else {
            return null; // null tells Spring Batch there is no more input
        }
    }
}

OutputViewItemWriter

package com.dhcc.batch.batchDemo.output.outview;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("OutputViewItemWriter")
public class OutputViewItemWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        System.out.println("writer chunk size is :" + items.size());
        for (String item : items) {
            System.out.println("writer data is:" + item);
        }
    }
}

Run result:
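The console screenshot is not reproduced here, but the shape of the output follows directly from the code: 100 items with a chunk size of 10 produce 10 chunks, each logged like this:

writer chunk size is :10
writer data is:my name is zhongqiujie0
writer data is:my name is zhongqiujie1
...
writer data is:my name is zhongqiujie9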

II. Writing Data to a Database
1. Spring Batch ships with many writers for persisting data to a database, for example:
(1) Neo4jItemWriter;
(2) MongoItemWriter;
..........
2. Here we focus on JdbcBatchItemWriter.
Example: first create the table alipaytrando in the database, with the following structure:
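The structure screenshot is not available here; based on the entity class and the INSERT statement used later, a plausible DDL looks like this (column types, lengths and the key choice are assumptions, the original table may differ):

-- assumed structure; all fields are read and written as strings in the code
CREATE TABLE alipaytrando (
    tranId        VARCHAR(64) PRIMARY KEY,
    channel       VARCHAR(32),
    tranType      VARCHAR(32),
    counterparty  VARCHAR(64),
    goods         VARCHAR(128),
    amount        VARCHAR(32),
    isDebitCredit VARCHAR(16),
    state         VARCHAR(16)
);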

Next we read the springbatchtest2 file from the project and write it into the table alipaytrando.
The springbatchtest2 file is structured as follows:
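The file screenshot is likewise unavailable; from the reader configuration below (comma-delimited, eight columns, five header lines skipped) it presumably looks something like this, with made-up values for illustration:

(5 header/summary lines, skipped via setLinesToSkip(5))
20181015220001,alipay,payment,storeA,book,25.00,debit,success
20181015220002,alipay,refund,storeB,pen,3.50,credit,success
...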

Now let's write the code:
AlipayTranDo

package com.dhcc.batch.batchDemo.output.db.entity;

public class AlipayTranDo {

    private String tranId;
    private String channel;
    private String tranType;
    private String counterparty;
    private String goods;
    private String amount;
    private String isDebitCredit;
    private String state;

    public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods,
            String amount, String isDebitCredit, String state) {
        super();
        this.tranId = tranId;
        this.channel = channel;
        this.tranType = tranType;
        this.counterparty = counterparty;
        this.goods = goods;
        this.amount = amount;
        this.isDebitCredit = isDebitCredit;
        this.state = state;
    }

    public String getTranId() { return tranId; }
    public void setTranId(String tranId) { this.tranId = tranId; }
    public String getChannel() { return channel; }
    public void setChannel(String channel) { this.channel = channel; }
    public String getTranType() { return tranType; }
    public void setTranType(String tranType) { this.tranType = tranType; }
    public String getCounterparty() { return counterparty; }
    public void setCounterparty(String counterparty) { this.counterparty = counterparty; }
    public String getGoods() { return goods; }
    public void setGoods(String goods) { this.goods = goods; }
    public String getAmount() { return amount; }
    public void setAmount(String amount) { this.amount = amount; }
    public String getIsDebitCredit() { return isDebitCredit; }
    public void setIsDebitCredit(String isDebitCredit) { this.isDebitCredit = isDebitCredit; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }

    @Override
    public String toString() {
        return "AlipayTranDO{" +
                "tranId='" + tranId + '\'' +
                ", channel='" + channel + '\'' +
                ", tranType='" + tranType + '\'' +
                ", counterparty='" + counterparty + '\'' +
                ", goods='" + goods + '\'' +
                ", amount='" + amount + '\'' +
                ", isDebitCredit='" + isDebitCredit + '\'' +
                ", state='" + state + '\'' +
                '}';
    }
}

AlipayTranDoFileMapper

package com.dhcc.batch.batchDemo.output.db.util;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

public class AlipayTranDoFileMapper implements FieldSetMapper<AlipayTranDo> {

    @Override
    public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException {
        // build an AlipayTranDo from the named fields of one tokenized line
        return new AlipayTranDo(fieldSet.readString("tranId"),
                fieldSet.readString("channel"),
                fieldSet.readString("tranType"),
                fieldSet.readString("counterparty"),
                fieldSet.readString("goods"),
                fieldSet.readString("amount"),
                fieldSet.readString("isDebitCredit"),
                fieldSet.readString("state"));
    }
}

OutputItemWriterDBApplication

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterDBApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterDBApplication.class, args);
    }
}

OutputItemWriterDBConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

@Configuration
public class OutputItemWriterDBConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("outputDBItemReader")
    private ItemReader<AlipayTranDo> outputDBItemReader;

    @Autowired
    @Qualifier("outputDBItemWriter")
    private ItemWriter<AlipayTranDo> outputDBItemWriter;

    @Autowired
    private MyProcess myProcess;

    @Bean
    public Job OutputItemWriterDBJob2() {
        return jobBuilderFactory.get("OutputItemWriterDBJob2")
                .start(OutputItemWriterDBStep2())
                .build();
    }

    @Bean
    public Step OutputItemWriterDBStep2() {
        return stepBuilderFactory.get("OutputItemWriterDBStep2")
                .<AlipayTranDo, AlipayTranDo>chunk(50) // commit every 50 items
                .reader(outputDBItemReader)
                .processor(myProcess)
                .writer(outputDBItemWriter)
                .build();
    }
}

OutputItemWriterDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
import com.dhcc.batch.batchDemo.output.db.util.AlipayTranDoFileMapper;

@Configuration
public class OutputItemWriterDBItemReaderConfiguration {

    @Bean
    public FlatFileItemReader<AlipayTranDo> outputDBItemReader() {
        FlatFileItemReader<AlipayTranDo> reader = new FlatFileItemReader<>();
        reader.setEncoding("UTF-8");
        reader.setResource(new ClassPathResource("/data/init/springbatchtest2.csv"));
        reader.setLinesToSkip(5); // skip the 5 header lines

        // comma-delimited tokenizer; the field names must match the FieldSetMapper
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {
                "tranId", "channel", "tranType", "counterparty", "goods", "amount", "isDebitCredit", "state" });

        DefaultLineMapper<AlipayTranDo> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new AlipayTranDoFileMapper());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}

MyProcess

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

@Component
public class MyProcess implements ItemProcessor<AlipayTranDo, AlipayTranDo> {

    @Override
    public AlipayTranDo process(AlipayTranDo item) throws Exception {
        System.out.println(item); // log each item as it passes through
        return item;
    }
}

OutputItemWriterDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import javax.sql.DataSource;

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

@Configuration
public class OutputItemWriterDBItemWriterConfiguration {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcBatchItemWriter<AlipayTranDo> outputDBItemWriter() {
        JdbcBatchItemWriter<AlipayTranDo> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        // named parameters (:tranId, ...) are bound from the item's bean properties
        writer.setSql("insert into alipaytrando"
                + "(tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state) values"
                + "(:tranId,:channel,:tranType,:counterparty,:goods,:amount,:isDebitCredit,:state)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }
}

Run result:

The console shows the job ran successfully. Next, check the database to see whether the rows were inserted.
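A quick sanity check in the MySQL client, for example:

SELECT COUNT(*) FROM alipaytrando;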

The rows have been inserted successfully.

III. Writing Data to a Flat File
1. FlatFileItemWriter can write objects of any type T to a flat file.
2. Example: read the data from the alipaytrando table and write it to a flat file. Let's start coding.
The AlipayTranDo entity is the same as in the previous example, so it is not shown again.
AlipayTranDoFileMapper

package com.dhcc.batch.batchDemo.output.flatfile;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class AlipayTranDoFileMapper implements RowMapper<AlipayTranDo> {

    @Override
    public AlipayTranDo mapRow(ResultSet rs, int rowNum) throws SQLException {
        // map one JDBC result-set row to an AlipayTranDo
        return new AlipayTranDo(rs.getString("tranId"), rs.getString("channel"), rs.getString("tranType"),
                rs.getString("counterparty"), rs.getString("goods"), rs.getString("amount"),
                rs.getString("isDebitCredit"), rs.getString("state"));
    }
}

AlipayTranDoLineAggregator

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AlipayTranDoLineAggregator implements LineAggregator<AlipayTranDo> {

    // Jackson mapper: each item becomes one JSON line in the output file
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public String aggregate(AlipayTranDo alipayTranDo) {
        try {
            return mapper.writeValueAsString(alipayTranDo);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("unable to write...", e);
        }
    }
}

FlatFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlatFileOutputFromDBConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileOutputFromDBItemReader")
    private ItemReader<AlipayTranDo> flatFileOutputFromDBItemReader;

    @Autowired
    @Qualifier("flatFileOutputFromDBItemWriter")
    private ItemWriter<AlipayTranDo> flatFileOutputFromDBItemWriter;

    @Bean
    public Job FlatFileOutputFromDBJob() {
        return jobBuilderFactory.get("FlatFileOutputFromDBJob")
                .start(FlatFileOutputFromDBStep())
                .build();
    }

    @Bean
    public Step FlatFileOutputFromDBStep() {
        return stepBuilderFactory.get("FlatFileOutputFromDBStep")
                .<AlipayTranDo, AlipayTranDo>chunk(100)
                .reader(flatFileOutputFromDBItemReader)
                .writer(flatFileOutputFromDBItemWriter)
                .build();
    }
}

FlatFileOutputFromDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlatFileOutputFromDBItemReaderConfiguration {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<AlipayTranDo> flatFileOutputFromDBItemReader() {
        JdbcPagingItemReader<AlipayTranDo> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);              // data source to read from
        reader.setFetchSize(100);                           // max rows fetched per round trip
        reader.setRowMapper(new AlipayTranDoFileMapper());  // map each row to an AlipayTranDo

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state"); // columns to select
        queryProvider.setFromClause("from alipaytrando");   // table to read

        Map<String, Order> sortKeys = new HashMap<>();      // sort keys for stable paging
        sortKeys.put("tranId", Order.ASCENDING);            // ascending by tranId
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);             // set the paging query provider
        return reader;
    }
}

FlatFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import java.io.File;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class FlatFileOutputFromDBItemWriterConfiguration {

    @Bean
    public FlatFileItemWriter<AlipayTranDo> flatFileOutputFromDBItemWriter() {
        FlatFileItemWriter<AlipayTranDo> writer = new FlatFileItemWriter<>();
        try {
            File path = new File("D:" + File.separator + "alipayTranDo.data").getAbsoluteFile();
            // String path = File.createTempFile("alipayTranDo", ".data").getAbsolutePath();
            System.out.println("file is create in :" + path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new AlipayTranDoLineAggregator()); // one JSON string per line
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;
    }
}

OutputItemWriterFlatFileApplication

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterFlatFileApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterFlatFileApplication.class, args);
    }
}

Run result:

The console shows the data was read and written successfully; open the file at the path printed in the log to inspect the output.
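The output screenshot is omitted; given the JSON line aggregator above, each line of alipayTranDo.data should be one serialized object, roughly like this (field values are placeholders):

{"tranId":"20181015220001","channel":"alipay","tranType":"payment","counterparty":"storeA","goods":"book","amount":"25.00","isDebitCredit":"debit","state":"success"}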

IV. Writing Data to an XML File
1. To write data to an XML file we must use StaxEventItemWriter.
2. We also use XStreamMarshaller to serialize the objects.
Example: write the data from the alipaytrando table to an XML file on the local disk.
Code (only the writer class is shown; everything else is the same as the previous example):

XMLFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.xmlfile;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XMLFileOutputFromDBItemWriterConfiguration {

    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileOutputFromDBItemWriter() throws Exception {
        // alias: each AlipayTranDo is written as an <alipayTranDo> element
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<AlipayTranDo>> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos"); // root element of the XML document
        writer.setMarshaller(marshaller);

        File path = new File("D:" + File.separator + "alipayTranDo.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }
}

Run result:

Open the XML file at the printed path to inspect it:
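The screenshot is omitted; with the root tag alipaytrandos and the alias alipayTranDo configured above, the file should look roughly like this (field values are placeholders):

<?xml version="1.0" encoding="UTF-8"?>
<alipaytrandos>
  <alipayTranDo>
    <tranId>20181015220001</tranId>
    <channel>alipay</channel>
    ...
  </alipayTranDo>
  ...
</alipaytrandos>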

V. Writing Data to Multiple Files
1. To write data to multiple files we use CompositeItemWriter or ClassifierCompositeItemWriter.
2. Example (1): write the data from the alipaytrando table to both an XML file and a JSON file.
Only the writer is shown here (the rest is the same as the previous example):
mutipleFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.composit;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    @Bean
    public FlatFileItemWriter<AlipayTranDo> jsonFileItemWriter() {
        FlatFileItemWriter<AlipayTranDo> writer = new FlatFileItemWriter<>();
        try {
            File path = new File("D:" + File.separator + "alipayTranDo1.json").getAbsoluteFile();
            // String path = File.createTempFile("alipayTranDo", ".json").getAbsolutePath();
            System.out.println("file is create in :" + path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new AlipayTranDoLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;
    }

    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileItemWriter() throws Exception {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<AlipayTranDo>> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);

        File path = new File("D:" + File.separator + "alipayTranDo1.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public CompositeItemWriter<AlipayTranDo> alipayTranDoFileOutputFromDBItemWriter() throws Exception {
        // every chunk is forwarded to every delegate, so both files receive all records
        CompositeItemWriter<AlipayTranDo> itemWriter = new CompositeItemWriter<>();
        itemWriter.setDelegates(Arrays.asList(xmlFileItemWriter(), jsonFileItemWriter()));
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }
}

Run result:

Inspect the files:
JSON:

XML:
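The file screenshots are omitted. Since CompositeItemWriter forwards every chunk to every delegate, both files contain the same records: alipayTranDo1.json holds one JSON object per line, and alipayTranDo1.xml holds the same rows under the <alipaytrandos> root tag.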

3. Example (2): classify the records of a single source and write them to different files.
First, look at the structure of the table person_buf (it contains 10001 rows in total):
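The structure screenshot is not available; from the PersonRowMapper below, a plausible DDL is (column types and lengths are assumptions):

-- assumed structure inferred from PersonRowMapper; the original types may differ
CREATE TABLE person_buf (
    id          INT PRIMARY KEY,
    name        VARCHAR(64),
    per_desc    VARCHAR(255),
    create_time DATETIME,
    update_time DATETIME,
    sex         VARCHAR(8),
    score       FLOAT,
    price       DOUBLE
);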

Our goal is to read the rows from the database and, depending on whether the id is even or odd, write them to files of different types.
On to the code:
Person

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.Date;

public class Person {

    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    public Person() {
        super();
    }

    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        super();
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getPerDesc() { return perDesc; }
    public void setPerDesc(String perDesc) { this.perDesc = perDesc; }
    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }
    public Date getUpdateTime() { return updateTime; }
    public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public Float getScore() { return score; }
    public void setScore(Float score) { this.score = score; }
    public Double getPrice() { return price; }
    public void setPrice(Double price) { this.price = price; }

    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime
                + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";
    }
}

PersonLineAggregator

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PersonLineAggregator implements LineAggregator<Person> {

    // Jackson mapper: each Person becomes one JSON line
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public String aggregate(Person person) {
        try {
            return mapper.writeValueAsString(person);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("unable to write...", e);
        }
    }
}

PersonRowMapper

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps each database row to a Person object.
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * rs is the result set positioned at the current row; rowNum is the current row number.
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new Person(rs.getInt("id"),
                rs.getString("name"),
                rs.getString("per_desc"),
                rs.getDate("create_time"),
                rs.getDate("update_time"),
                rs.getString("sex"),
                rs.getFloat("score"),
                rs.getDouble("price"));
    }
}

OutputItemWriterMutipleClassFileApplication

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterMutipleClassFileApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterMutipleClassFileApplication.class, args);
    }
}

ClassifierMutipleFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ClassifierMutipleFileOutputFromDBConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("mutipleFileOutputFromDBItemReader")
    private ItemReader<Person> mutipleFileOutputFromDBItemReader;

    @Autowired
    @Qualifier("alipayTranDoFileOutputFromDBItemWriter")
    private ItemWriter<Person> alipayTranDoFileOutputFromDBItemWriter;

    @Autowired
    @Qualifier("jsonFileItemWriter")
    private ItemStream jsonFileItemWriter;

    @Autowired
    @Qualifier("xmlFileItemWriter")
    private ItemStream xmlFileItemWriter;

    @Bean
    public Job mutipleFileOutputFromDBJob1() {
        return jobBuilderFactory.get("mutipleFileOutputFromDBJob1")
                .start(mutipleFileOutputFromDBStep1())
                .build();
    }

    @Bean
    public Step mutipleFileOutputFromDBStep1() {
        return stepBuilderFactory.get("mutipleFileOutputFromDBStep1")
                .<Person, Person>chunk(100)
                .reader(mutipleFileOutputFromDBItemReader)
                .writer(alipayTranDoFileOutputFromDBItemWriter)
                // ClassifierCompositeItemWriter does not open/close its delegates,
                // so each delegate must be registered with the step as a stream
                .stream(jsonFileItemWriter)
                .stream(xmlFileItemWriter)
                .build();
    }
}

mutipleFileOutputFromDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class mutipleFileOutputFromDBItemReaderConfiguration {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<Person> mutipleFileOutputFromDBItemReader() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);          // data source to read from
        reader.setFetchSize(100);                       // max rows fetched per round trip
        reader.setRowMapper(new PersonRowMapper());     // map each row to a Person

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // columns to select
        queryProvider.setFromClause("from person_buf"); // table to read

        Map<String, Order> sortKeys = new HashMap<>();  // sort keys for stable paging
        sortKeys.put("id", Order.ASCENDING);            // ascending by id
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);         // set the paging query provider
        return reader;
    }
}

mutipleFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    @Bean
    public FlatFileItemWriter<Person> jsonFileItemWriter() {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        try {
            File path = new File("D:" + File.separator + "person.json").getAbsoluteFile();
            System.out.println("file is create in :" + path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new PersonLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;
    }

    @Bean
    public StaxEventItemWriter<Person> xmlFileItemWriter() throws Exception {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<Person>> aliases = new HashMap<>();
        aliases.put("person", Person.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<Person> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("persons");
        writer.setMarshaller(marshaller);

        File path = new File("D:" + File.separator + "person.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public ClassifierCompositeItemWriter<Person> alipayTranDoFileOutputFromDBItemWriter() throws Exception {
        // routes each item to exactly one delegate, chosen by MyWriterClassifier
        ClassifierCompositeItemWriter<Person> itemWriter = new ClassifierCompositeItemWriter<>();
        itemWriter.setClassifier(new MyWriterClassifier(jsonFileItemWriter(), xmlFileItemWriter()));
        return itemWriter;
    }
}

MyWriterClassifier

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

public class MyWriterClassifier implements Classifier<Person, ItemWriter<? super Person>> {

    private static final long serialVersionUID = -2911015707834323846L;

    private ItemWriter<Person> jsonWriter;
    private ItemWriter<Person> xmlWriter;

    public MyWriterClassifier(ItemWriter<Person> jsonWriter, ItemWriter<Person> xmlWriter) {
        this.jsonWriter = jsonWriter;
        this.xmlWriter = xmlWriter;
    }

    @Override
    public ItemWriter<? super Person> classify(Person classifiable) {
        if (classifiable.getId() % 2 == 0) {
            return jsonWriter; // even ids go to the JSON file
        } else {
            return xmlWriter;  // odd ids go to the XML file
        }
    }
}

Run result:

Inspect the files:
person.json (the records with even ids were written to the JSON file):

person.xml (the records with odd ids were written to the XML file):
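The screenshots are omitted; with the 10001 rows split by id parity, person.json should hold roughly half the records (even ids) as JSON lines, and person.xml the remaining odd-id records under the <persons> root tag, e.g. (values are placeholders):

{"id":2,"name":"...","perDesc":"...","sex":"...","score":0.0,"price":0.0}

<persons>
  <person>
    <id>1</id>
    ...
  </person>
  ...
</persons>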
