Spring-batch(ItemWriter)数据写入数据库,普通文件,xml文件,多文件分类写入
Spring-batch学习总结(四)
一.ItemWriter简介
1.reader读取数据时是以item为单位逐条循环读取的,而writer写入数据时则是以chunk为单位,一块一块地进行写入
2.例(我们举一个小例子来认识其writer原理):
代码:
OutOverViewApplication
package com.dhcc.batch.batchDemo.output.outview;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "writer overview" demo.
 *
 * <p>{@code @EnableBatchProcessing} turns on the Spring Batch infrastructure used by the
 * configuration classes in this package.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutOverViewApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(OutOverViewApplication.class, args);
    }
}
OutputViewItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.outview;import java.util.ArrayList;import java.util.List;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class OutputViewItemWriterConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("OutputViewItemWriter") private ItemWriter super String> outputViewItemWriter; @Bean public Job OutputViewItemWriterJob3() { return jobBuilderFactory.get("OutputViewItemWriterJob3") .start(OutputViewItemWriterStep3()) .build(); } @Bean public Step OutputViewItemWriterStep3() { return stepBuilderFactory.get("OutputViewItemWriterStep3") .chunk(10) .reader(listViewItemRead()) .writer(outputViewItemWriter) .build(); } @Bean @StepScope public ListItemViewReader listViewItemRead() { List dataList=new ArrayList<>(); for(int i=0;i<100;i++) { dataList.add("my name is zhongqiujie"+i); } return new ListItemViewReader(dataList); }}
ListItemViewReader
package com.dhcc.batch.batchDemo.output.outview;import java.util.Iterator;import java.util.List;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.NonTransientResourceException;import org.springframework.batch.item.ParseException;import org.springframework.batch.item.UnexpectedInputException;@SuppressWarnings("hiding")public class ListItemViewReader implements ItemReader{ private final Iterator iterator; public ListItemViewReader(List data) { this.iterator = data.iterator(); } @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if (iterator.hasNext()) { return this.iterator.next(); } else { return null; } }}
OutputViewItemWriter
package com.dhcc.batch.batchDemo.output.outview;import java.util.List;import org.springframework.batch.item.ItemWriter;import org.springframework.stereotype.Component;@Component("OutputViewItemWriter")public class OutputViewItemWriter implements ItemWriter { @Override public void write(List extends String> items) throws Exception { System.out.println("writer chunk size is :" + items.size()); for (String item : items) { System.out.println("writer data is:" + item); } }}
运行结果:
二.将数据写入到数据库
1.在spring batch中为我们提供了许多将数据写入到数据库中的writer
(1)Neo4jItemWriter;
(2)MongoItemWriter;
..........
2.此处我们只学习JdbcBatchItemWriter
例:我们先在数据库中建立数据表alipaytrando,结构如下:
接下来我们将项目中的springbatchtest2文件读出并写入到数据库表alipaytrando中
Springbatchtest2文件结构如下:
开始写代码:
AlipayTranDo
package com.dhcc.batch.batchDemo.output.db.entity;public class AlipayTranDo { private String tranId; private String channel; private String tranType; private String counterparty; private String goods; private String amount; private String isDebitCredit; private String state; public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods, String amount, String isDebitCredit, String state) { super(); this.tranId = tranId; this.channel = channel; this.tranType = tranType; this.counterparty = counterparty; this.goods = goods; this.amount = amount; this.isDebitCredit = isDebitCredit; this.state = state; } public String getTranId() { return tranId; } public void setTranId(String tranId) { this.tranId = tranId; } public String getChannel() { return channel; } public void setChannel(String channel) { this.channel = channel; } public String getTranType() { return tranType; } public void setTranType(String tranType) { this.tranType = tranType; } public String getCounterparty() { return counterparty; } public void setCounterparty(String counterparty) { this.counterparty = counterparty; } public String getGoods() { return goods; } public void setGoods(String goods) { this.goods = goods; } public String getAmount() { return amount; } public void setAmount(String amount) { this.amount = amount; } public String getIsDebitCredit() { return isDebitCredit; } public void setIsDebitCredit(String isDebitCredit) { this.isDebitCredit = isDebitCredit; } public String getState() { return state; } public void setState(String state) { this.state = state; } @Override public String toString() { return "AlipayTranDO{" + "tranId='" + tranId + '\'' + ", channel='" + channel + '\'' + ", tranType='" + tranType + '\'' + ", counterparty='" + counterparty + '\'' + ", goods='" + goods + '\'' + ", amount='" + amount + '\'' + ", isDebitCredit='" + isDebitCredit + '\'' + ", state='" + state + '\'' + '}'; } }
AlipayTranDoFileMapper
package com.dhcc.batch.batchDemo.output.db.util;import org.springframework.batch.item.file.mapping.FieldSetMapper;import org.springframework.batch.item.file.transform.FieldSet;import org.springframework.validation.BindException;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;public class AlipayTranDoFileMapper implements FieldSetMapper { @Override public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException { return new AlipayTranDo(fieldSet.readString("tranId") , fieldSet.readString("channel") ,fieldSet.readString("tranType") , fieldSet.readString("counterparty") , fieldSet.readString("goods") ,fieldSet.readString("amount") , fieldSet.readString("isDebitCredit") , fieldSet.readString("state") ); }}
OutputItemWriterDBApplication
package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "write to database" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterDBApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterDBApplication.class, args);
    }
}
OutputItemWriterDBConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;@Configurationpublic class OutputItemWriterDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("outputDBItemReader") private ItemReader extends AlipayTranDo> outputDBItemReader; @Autowired @Qualifier("outputDBItemWriter") private ItemWriter super AlipayTranDo> outputDBItemWriter; @Autowired private MyProcess myProcess; @Bean public Job OutputItemWriterDBJob2() { return jobBuilderFactory.get("OutputItemWriterDBJob2").start(OutputItemWriterDBStep2()).build(); } @Bean public Step OutputItemWriterDBStep2() { return stepBuilderFactory.get("OutputItemWriterDBStep2").chunk(50) .reader(outputDBItemReader) .processor(myProcess) .writer(outputDBItemWriter) .build(); }}
OutputItemWriterDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;import com.dhcc.batch.batchDemo.output.db.util.AlipayTranDoFileMapper;@Configurationpublic class OutputItemWriterDBItemReaderConfiguration { @Bean public FlatFileItemReader outputDBItemReader(){ FlatFileItemReader reader=new FlatFileItemReader(); reader.setEncoding("UTF-8"); reader.setResource(new ClassPathResource("/data/init/springbatchtest2.csv")); reader.setLinesToSkip(5); DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer(); tokenizer.setNames(new String[] {"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"} ); DefaultLineMapper lineMapper=new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new AlipayTranDoFileMapper()); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; }}
MyProcess
package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Pass-through processor that prints every item to standard output before it is written.
 */
@Component
public class MyProcess implements ItemProcessor<AlipayTranDo, AlipayTranDo> {

    /**
     * @param item the record read by the reader
     * @return the same instance, unchanged
     */
    @Override
    public AlipayTranDo process(AlipayTranDo item) throws Exception {
        System.out.println(item);
        return item;
    }
}
OutputItemWriterDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;import javax.sql.DataSource;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;@Configurationpublic class OutputItemWriterDBItemWriterConfiguration { @Autowired private DataSource dataSource; @Bean public JdbcBatchItemWriter outputDBItemWriter() { System.out.println(); JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); writer.setDataSource(dataSource); writer.setSql( "insert into alipaytrando" + "(tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state) values" + "(:tranId,:channel,:tranType,:counterparty,:goods,:amount,:isDebitCredit,:state) "); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider()); return writer; }}
运行结果:
观察控制台可得我们的项目运行成功,接下来我们再到数据库中观察数据是否成功插入
发现表中数据已经插入成功
三.将数据写入到普通文件中
1.FlatFileItemWriter可以将任何一个类型为T的对象数据写入到普通文件中
2.例:我们将数据库中的alipaytrando中的数据读出并且写入到普通文件中。接下来我们开始编写代码:
实体类AlipayTranDo与上一个例子一样,我们不再重复展示
AlipayTranDoFileMapper
package com.dhcc.batch.batchDemo.output.flatfile;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps one row of the {@code alipaytrando} query result to an {@link AlipayTranDo}.
 *
 * <p>All eight columns are read as strings by column label.
 */
public class AlipayTranDoFileMapper implements RowMapper<AlipayTranDo> {

    /**
     * @param rs     the result set positioned on the current row
     * @param rowNum the index of the current row (unused)
     * @return a new record populated from the current row
     */
    @Override
    public AlipayTranDo mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new AlipayTranDo(
                rs.getString("tranId"),
                rs.getString("channel"),
                rs.getString("tranType"),
                rs.getString("counterparty"),
                rs.getString("goods"),
                rs.getString("amount"),
                rs.getString("isDebitCredit"),
                rs.getString("state"));
    }
}
AlipayTranDoLineAggregator
package com.dhcc.batch.batchDemo.output.flatfile;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class AlipayTranDoLineAggregator implements LineAggregator { //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(AlipayTranDo alipayTranDo) { try { return mapper.writeValueAsString(alipayTranDo); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
FlatFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FlatFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("flatFileOutputFromDBItemReader") private ItemReader extends AlipayTranDo> flatFileOutputFromDBItemReader; @Autowired @Qualifier("flatFileOutputFromDBItemWriter") private ItemWriter super AlipayTranDo> flatFileOutputFromDBItemWriter; @Bean public Job FlatFileOutputFromDBJob() { return jobBuilderFactory.get("FlatFileOutputFromDBJob").start(FlatFileOutputFromDBStep()).build(); } @Bean public Step FlatFileOutputFromDBStep() { return stepBuilderFactory.get("FlatFileOutputFromDBStep").chunk(100) .reader(flatFileOutputFromDBItemReader).writer(flatFileOutputFromDBItemWriter).build(); }}
FlatFileOutputFromDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;import java.util.HashMap;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FlatFileOutputFromDBItemReaderConfiguration { @Autowired private DataSource dataSource; @Bean public JdbcPagingItemReader flatFileOutputFromDBItemReader() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new AlipayTranDoFileMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state"); // 设置查询的列 queryProvider.setFromClause("from alipaytrando"); // 设置要查询的表 Map sortKeys = new HashMap();// 定义一个集合用于存放排序列 sortKeys.put("tranId", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; }}
FlatFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;import java.io.File;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class FlatFileOutputFromDBItemWriterConfiguration { @Bean public FlatFileItemWriter flatFileOutputFromDBItemWriter(){ FlatFileItemWriter writer=new FlatFileItemWriter(); try { File path=new File("D:"+File.separator+"alipayTranDo.data").getAbsoluteFile();// String path=File.createTempFile("alipayTranDo", ".data").getAbsolutePath(); System.out.println("file is create in :"+path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new AlipayTranDoLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
OutputItemWriterFlatFileApplication
package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "write to flat file" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterFlatFileApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterFlatFileApplication.class, args);
    }
}
运行结果:
控制台显示文件读取写入成功,我们根据文件地址,观察写入后的普通文件
四.将数据写入到xml文件中
1.将数据写入到xml文件中,我们必须用到StaxEventItemWriter;
2.我们也会用到XStreamMarshaller来将对象序列化为xml
例:我们将数据库表alipaytrando中的数据写入到本地磁盘中
代码(此处我们只展示writer,用来写入的类,其他的均与上一个例子相同):
XMLFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.xmlfile;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Declares the StAX writer that stores {@link AlipayTranDo} records as XML.
 */
@Configuration
public class XMLFileOutputFromDBItemWriterConfiguration {

    /**
     * Builds a writer targeting {@code D:\alipayTranDo.xml}.
     *
     * <p>Records are marshalled with XStream under the alias {@code alipayTranDo} and
     * wrapped in the root tag {@code alipaytrandos}.
     *
     * @return the configured writer
     * @throws Exception if marshaller or writer configuration fails
     */
    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileOutputFromDBItemWriter() throws Exception {
        // Alias -> class, so elements are named "alipayTranDo" rather than by the full class name.
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);

        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);

        File path = new File("D:" + File.separator + "alipayTranDo.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }
}
运行结果:
根据地址观察写入后的xml文件
五.将数据写入到多文件
1.将数据写入多个文件,我们使用CompositeItemWriter
2.例(1):我们将数据表alipaytrando中的数据分别写入到xml文件和json文件中
此处我们只展示writer(其余代码与上例相同):
mutipleFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.composit;import java.io.File;import java.util.Arrays;import java.util.HashMap;import java.util.Map;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemWriter;import org.springframework.batch.item.xml.StaxEventItemWriter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;import org.springframework.oxm.xstream.XStreamMarshaller;@Configurationpublic class mutipleFileOutputFromDBItemWriterConfiguration { @Bean public FlatFileItemWriter jsonFileItemWriter(){ FlatFileItemWriter writer=new FlatFileItemWriter(); try { File path=new File("D:"+File.separator+"alipayTranDo1.json").getAbsoluteFile();// String path=File.createTempFile("alipayTranDo", ".json").getAbsolutePath(); System.out.println("file is create in :"+path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new AlipayTranDoLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; } @Bean public StaxEventItemWriter xmlFileItemWriter() throws Exception{ XStreamMarshaller marshaller=new XStreamMarshaller(); @SuppressWarnings("rawtypes") Map aliases=new HashMap<>(); aliases.put("alipayTranDo", AlipayTranDo.class); marshaller.setAliases(aliases); StaxEventItemWriter writer=new StaxEventItemWriter<>(); writer.setRootTagName("alipaytrandos"); writer.setMarshaller(marshaller); File path=new File("D:"+File.separator+"alipayTranDo1.xml").getAbsoluteFile(); System.out.println("file is create in :"+path); writer.setResource(new FileSystemResource(path)); writer.afterPropertiesSet(); return writer; } @Bean public CompositeItemWriter alipayTranDoFileOutputFromDBItemWriter() throws Exception{ CompositeItemWriter itemWriter=new CompositeItemWriter<>(); 
itemWriter.setDelegates(Arrays.asList(xmlFileItemWriter(),jsonFileItemWriter())); itemWriter.afterPropertiesSet(); return itemWriter; }}
运行结果:
观察文件:
Json:
Xml:
3.例(2):我们将同一数据源中的数据分类写入到不同的文件:
首先我们观察数据库表person_buf的数据结构(数据总数是10001):
我们的目标是将数据从数据库读出按照id的奇偶分别写入不同类型的文件中
接下来上代码:
Person
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import java.util.Date;public class Person { private Integer id; private String name; private String perDesc; private Date createTime; private Date updateTime; private String sex; private Float score; private Double price; public Person() { super(); } public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score, Double price) { super(); this.id = id; this.name = name; this.perDesc = perDesc; this.createTime = createTime; this.updateTime = updateTime; this.sex = sex; this.score = score; this.price = price; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getCreateTime() { return createTime; } public String getPerDesc() { return perDesc; } public void setPerDesc(String perDesc) { this.perDesc = perDesc; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Float getScore() { return score; } public void setScore(Float score) { this.score = score; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]"; }}
PersonLineAggregator
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator { //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
PersonRowMapper
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps each row of the query result to a {@link Person}.
 *
 * @author Administrator
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * Converts the current row into a {@link Person}.
     *
     * @param rs     the result set positioned on the current row
     * @param rowNum the index of the current row (unused)
     * @return a new person populated from the current row
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        // NOTE(review): getDate yields a java.sql.Date; if create_time/update_time are
        // DATETIME/TIMESTAMP columns, getTimestamp may be the intended accessor - confirm
        // against the table definition.
        return new Person(
                rs.getInt("id"),
                rs.getString("name"),
                rs.getString("per_desc"),
                rs.getDate("create_time"),
                rs.getDate("update_time"),
                rs.getString("sex"),
                rs.getFloat("score"),
                rs.getDouble("price"));
    }
}
OutputItemWriterMutipleClassFileApplication
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "classified multi-file output" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterMutipleClassFileApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterMutipleClassFileApplication.class, args);
    }
}
ClassifierMutipleFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemStream;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ClassifierMutipleFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("mutipleFileOutputFromDBItemReader") private ItemReader extends Person> mutipleFileOutputFromDBItemReader; @Autowired @Qualifier("alipayTranDoFileOutputFromDBItemWriter") private ItemWriter super Person> alipayTranDoFileOutputFromDBItemWriter; @Autowired @Qualifier("jsonFileItemWriter") private ItemStream jsonFileItemWriter; @Autowired @Qualifier("xmlFileItemWriter") private ItemStream xmlFileItemWriter; @Bean public Job mutipleFileOutputFromDBJob1() { return jobBuilderFactory.get("mutipleFileOutputFromDBJob1") .start(mutipleFileOutputFromDBStep1()) .build(); } @Bean public Step mutipleFileOutputFromDBStep1() { return stepBuilderFactory.get("mutipleFileOutputFromDBStep1").chunk(100) .reader(mutipleFileOutputFromDBItemReader) .writer(alipayTranDoFileOutputFromDBItemWriter) .stream(jsonFileItemWriter) .stream(xmlFileItemWriter) .build(); }}
mutipleFileOutputFromDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import java.util.HashMap;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class mutipleFileOutputFromDBItemReaderConfiguration { @Autowired private DataSource dataSource; @Bean public JdbcPagingItemReader mutipleFileOutputFromDBItemReader() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列 queryProvider.setFromClause("from person_buf"); // 设置要查询的表 Map sortKeys = new HashMap();// 定义一个集合用于存放排序列 sortKeys.put("id", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; }}
mutipleFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import java.io.File;import java.util.HashMap;import java.util.Map;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.ClassifierCompositeItemWriter;import org.springframework.batch.item.xml.StaxEventItemWriter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;import org.springframework.oxm.xstream.XStreamMarshaller;@Configurationpublic class mutipleFileOutputFromDBItemWriterConfiguration { @Bean public FlatFileItemWriter jsonFileItemWriter(){ FlatFileItemWriter writer=new FlatFileItemWriter(); try { File path=new File("D:"+File.separator+"person.json").getAbsoluteFile(); System.out.println("file is create in :"+path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new PersonLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; } @Bean public StaxEventItemWriter xmlFileItemWriter() throws Exception{ XStreamMarshaller marshaller=new XStreamMarshaller(); @SuppressWarnings("rawtypes") Map aliases=new HashMap<>(); aliases.put("person", Person.class); marshaller.setAliases(aliases); StaxEventItemWriter writer=new StaxEventItemWriter<>(); writer.setRootTagName("persons"); writer.setMarshaller(marshaller); File path=new File("D:"+File.separator+"person.xml").getAbsoluteFile(); System.out.println("file is create in :"+path); writer.setResource(new FileSystemResource(path)); writer.afterPropertiesSet(); return writer; } @Bean public ClassifierCompositeItemWriter alipayTranDoFileOutputFromDBItemWriter() throws Exception{ ClassifierCompositeItemWriter itemWriter=new ClassifierCompositeItemWriter(); itemWriter.setClassifier(new MyWriterClassifier(jsonFileItemWriter(),xmlFileItemWriter())); return itemWriter; }}
MyWriterClassifier
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import org.springframework.batch.item.ItemWriter;import org.springframework.classify.Classifier;public class MyWriterClassifier implements Classifier> { private ItemWriter jsonWriter; private ItemWriter xmlWriter; /** * */ private static final long serialVersionUID = -2911015707834323846L; public MyWriterClassifier(ItemWriter jsonWriter, ItemWriter xmlWriter) { this.jsonWriter = jsonWriter; this.xmlWriter = xmlWriter; } @Override public ItemWriter super Person> classify(Person classifiable) { if (classifiable.getId()%2==0) { return jsonWriter; }else { return xmlWriter; } }}
运行结果:
观察文件:
Person.json:(我们可以看出id为偶数的都写在了json文件中)
Person.xml:(我们可以看出id为奇数的都写在了xml文件中)