
What a RabbitMQ-based remote partitioning Step looks like in Spring Batch

Published: 2024-10-18. Author: 千家信息网 editorial staff
Last updated: 2024-10-18

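Remote partitioning of a Spring Batch Step over RabbitMQ tends to stump people who have not set it up before. This article explains how the mechanism works and walks through a complete configuration, one step at a time.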

Preface

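The example built here can run as a master service, as a slave service, or as a mixed master/slave service. Spreading the work across several nodes in this way can substantially shorten processing time compared with running Spring Batch on a single machine.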

Project source: https://gitee.com/kailing/partitionjob

How a remote partitioning Step works in Spring Batch

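The master node splits the data into segments according to some rule (for example an ID range or a hash) and publishes a description of each segment to a message broker (ActiveMQ, RabbitMQ). Slave nodes listen for these messages, pick one up, process the data set it describes, and send back the result. The original post illustrates this flow with a diagram.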
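The flow described above can be modelled without any middleware at all. The following is a minimal sketch in plain Java, assuming an in-memory BlockingQueue as a stand-in for the broker and a thread pool as a stand-in for the slave nodes; the class RemotePartitionSketch and all of its members are hypothetical names used only for illustration, not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class RemotePartitionSketch {

    // One unit of work: an inclusive range of IDs.
    static final class Range {
        final int min;
        final int max;

        Range(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    // Master side: cut [min, max] into ranges holding at most `size` IDs each.
    static List<Range> split(int min, int max, int size) {
        List<Range> ranges = new ArrayList<>();
        for (int start = min; start <= max; start += size) {
            ranges.add(new Range(start, Math.min(start + size - 1, max)));
        }
        return ranges;
    }

    // Publishes every range to the "broker", lets the workers drain it,
    // and returns the total number of IDs the workers reported back.
    static int run(int min, int max, int size, int workers) {
        BlockingQueue<Range> requests = new LinkedBlockingQueue<>(split(min, max, size));
        BlockingQueue<Integer> replies = new LinkedBlockingQueue<>();
        int expectedReplies = requests.size();

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                Range range;
                // Slave side: take a range, "process" it, send back a result.
                while ((range = requests.poll()) != null) {
                    replies.add(range.max - range.min + 1);
                }
            });
        }

        try {
            int total = 0;
            for (int i = 0; i < expectedReplies; i++) {
                total += replies.take(); // master waits for one reply per range
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1, 100, 25, 3)); // prints 100
    }
}
```

In the real setup the queue is a RabbitMQ queue and the workers are separate JVMs, but the division of labour is the same: the master only describes the work, and the slaves do it.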

The steps below follow this principle to build a complete Spring Batch remote partitioning example.

Step 1: add the required dependencies

See: https://gitee.com/kailing/partitionjob/blob/master/pom.xml

The key dependency for a partitioned job is spring-batch-integration, which provides the remote communication capability.

Step 2: distributing the data from the master node

    @Profile({"master", "mixed"})
    @Bean
    public Job job(@Qualifier("masterStep") Step masterStep) {
        return jobBuilderFactory.get("endOfDayjob")
                .start(masterStep)
                .incrementer(new BatchIncrementer())
                .listener(new JobListener())
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
                           PartitionHandler partitionHandler,
                           DataSource dataSource) {
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
                .step(slaveStep)
                .partitionHandler(partitionHandler)
                .build();
    }

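The key point on the master node is that its Step must be given the name of the slave Step and a data partitioner. The partitioner implements the Partitioner interface and returns a Map<String, ExecutionContext> that fully describes the partition each slave node has to process. Each ExecutionContext holds the boundaries of the data a slave should handle. Which parameters go into the ExecutionContext depends on your business logic; here each partition is bounded by a range of data IDs. The Partitioner implementation is as follows: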

    /**
     * Created by kl on 2018/3/1.
     * Content: partitions the data by ID range
     */
    public class ColumnRangePartitioner implements Partitioner {

        private JdbcOperations jdbcTemplate;

        ColumnRangePartitioner(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from  kl_article", Integer.class);
            int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from  kl_article", Integer.class);
            // Number of consecutive IDs covered by each partition
            int targetSize = (max - min) / gridSize + 1;
            Map<String, ExecutionContext> result = new HashMap<>();
            int number = 0;
            int start = min;
            int end = start + targetSize - 1;
            while (start <= max) {
                ExecutionContext value = new ExecutionContext();
                result.put("partition" + number, value);
                // Clamp the last partition to the largest existing ID
                if (end >= max) {
                    end = max;
                }
                value.putInt("minValue", start);
                value.putInt("maxValue", end);
                start += targetSize;
                end += targetSize;
                number++;
            }
            return result;
        }
    }
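To see what this partitioner produces, it helps to run its range arithmetic on concrete numbers. The sketch below repeats the same calculation in plain Java with the database lookups replaced by parameters; RangeArithmeticSketch and its ranges method are hypothetical names introduced for illustration, and the ID bounds are made-up sample values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RangeArithmeticSketch {

    // Same arithmetic as the partitioner above, with min and max passed in
    // instead of queried. Each value is {minValue, maxValue}, both inclusive.
    static Map<String, int[]> ranges(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1;
        Map<String, int[]> result = new LinkedHashMap<>();
        int number = 0;
        for (int start = min; start <= max; start += targetSize) {
            int end = Math.min(start + targetSize - 1, max);
            result.put("partition" + number++, new int[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        // IDs 1..100 with gridSize 4: targetSize = 99 / 4 + 1 = 25
        ranges(1, 100, 4).forEach((name, r) ->
                System.out.println(name + " = [" + r[0] + ", " + r[1] + "]"));
        // partition0 = [1, 25]
        // partition1 = [26, 50]
        // partition2 = [51, 75]
        // partition3 = [76, 100]

        // IDs 1..5 with gridSize 4: targetSize = 4 / 4 + 1 = 2
        System.out.println(ranges(1, 5, 4).size()); // prints 3
    }
}
```

Two things follow from the arithmetic. gridSize is an upper bound rather than an exact partition count, as the second call shows. And because the split is over the ID range, not over the row count, tables with large gaps in arcid will yield partitions of uneven size.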

Step 3: Integration configuration

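Spring Batch Integration provides the communication layer for remote partitioning. Spring Integration ships with a rich set of channel adapters (for example JMS and AMQP), so remote partitioning can be built on brokers such as ActiveMQ or RabbitMQ. This article uses RabbitMQ as the messaging middleware. Installing RabbitMQ is outside the scope of this article. The code below shows how to configure the MQ connection as well as the queue and message adapters used for Spring Batch partitioning.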

    /**
     * Created by kl on 2018/3/1.
     * Content: remote partitioning communication
     */
    @Configuration
    @ConfigurationProperties(prefix = "spring.rabbit")
    public class IntegrationConfiguration {

        private String host;
        private Integer port = 5672;
        private String username;
        private String password;
        private String virtualHost;
        private int connRecvThreads = 5;
        private int channelCacheSize = 10;

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(connRecvThreads);
            executor.initialize();
            connectionFactory.setExecutor(executor);
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setChannelCacheSize(channelCacheSize);
            return connectionFactory;
        }

        @Bean
        public MessagingTemplate messageTemplate() {
            MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
            messagingTemplate.setReceiveTimeout(60000000L);
            return messagingTemplate;
        }

        @Bean
        public DirectChannel outboundRequests() {
            return new DirectChannel();
        }

        // Master side: forwards partition requests to the "partition.requests" queue
        @Bean
        @ServiceActivator(inputChannel = "outboundRequests")
        public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
            AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
            endpoint.setExpectReply(true);
            endpoint.setOutputChannel(inboundRequests());
            endpoint.setRoutingKey("partition.requests");
            return endpoint;
        }

        @Bean
        public Queue requestQueue() {
            return new Queue("partition.requests", false);
        }

        // Slave side: receives partition requests from the queue
        @Bean
        @Profile({"slave", "mixed"})
        public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
            AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inboundRequests());
            adapter.afterPropertiesSet();
            return adapter;
        }

        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames("partition.requests");
            container.setAutoStartup(false);
            return container;
        }

        @Bean
        public PollableChannel outboundStaging() {
            return new NullChannel();
        }

        @Bean
        public QueueChannel inboundRequests() {
            return new QueueChannel();
        }

        // The setters that @ConfigurationProperties binding needs for the fields above are not shown in the article.
    }
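The masterStep bean from Step 2 injects a PartitionHandler that the article does not show. The following is a minimal sketch of what such a bean could look like, assuming spring-batch-integration's MessageChannelPartitionHandler and the messageTemplate bean defined above. The class name PartitionHandlerSketch, the grid size of 4 and the 5-second poll interval are illustrative assumptions, not values taken from the project.

```java
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.integration.partition.MessageChannelPartitionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.core.MessagingTemplate;

// Hypothetical configuration class; only needed on nodes that act as master.
@Configuration
@Profile({"master", "mixed"})
public class PartitionHandlerSketch {

    @Bean
    public PartitionHandler partitionHandler(MessagingTemplate messageTemplate,
                                             JobExplorer jobExplorer) {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        // Name of the Step bean that the slaves execute for each partition
        handler.setStepName("slaveStep");
        // Passed to Partitioner.partition(gridSize); 4 is an assumed value
        handler.setGridSize(4);
        // Sends one request per partition through the outboundRequests channel
        handler.setMessagingOperations(messageTemplate);
        // With a JobExplorer set, the handler polls the job repository for
        // partition completion; the 5000 ms interval is an assumed value
        handler.setJobExplorer(jobExplorer);
        handler.setPollInterval(5000L);
        return handler;
    }
}
```

Polling the job repository means the master learns about finished partitions from the shared database, so master and slaves need to point at the same job repository.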

Step 4: the slave node receives the partition information and processes it

    @Bean
    @Profile({"slave", "mixed"})
    @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
    public StepExecutionRequestHandler stepExecutionRequestHandler() {
        StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
        BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
        stepLocator.setBeanFactory(this.applicationContext);
        stepExecutionRequestHandler.setStepLocator(stepLocator);
        stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
        return stepExecutionRequestHandler;
    }

    @Bean("slaveStep")
    public Step slaveStep(MyProcessorItem processorItem,
                          JpaPagingItemReader reader) {
        CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
        List processorList = new ArrayList<>();
        processorList.add(processorItem);
        itemProcessor.setDelegates(processorList);
        return stepBuilderFactory.get("slaveStep")
                .chunk(1000) // number of items per transaction commit
                .reader(reader)
                .processor(itemProcessor)
                .writer(new PrintWriterItem())
                .build();
    }

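The most important piece on the slave node is StepExecutionRequestHandler. It receives the messages delivered through the MQ broker and obtains from the partition information the boundaries of the data to process, which are then injected into the ItemReader below: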

    @Bean(destroyMethod = "")
    @StepScope
    public JpaPagingItemReader<Article> jpaPagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.err.println("Received partition parameters [" + minValue + "->" + maxValue + "]");
        JpaPagingItemReader<Article> reader = new JpaPagingItemReader<>();
        JpaNativeQueryProvider<Article> queryProvider = new JpaNativeQueryProvider<>();
        String sql = "select * from kl_article where arcid >= :minValue and arcid <= :maxValue";
        queryProvider.setSqlQuery(sql);
        queryProvider.setEntityClass(Article.class);
        reader.setQueryProvider(queryProvider);
        Map<String, Object> queryParames = new HashMap<>();
        queryParames.put("minValue", minValue);
        queryParames.put("maxValue", maxValue);
        reader.setParameterValues(queryParames);
        reader.setEntityManagerFactory(entityManagerFactory);
        return reader;
    }

The minValue and maxValue used here are exactly the values that the master node's partitioner stored in each ExecutionContext earlier.
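How those values reach the slave is worth spelling out. In Spring Batch the message sent over the broker (a StepExecutionRequest) carries identifiers such as the step name and the step execution ID rather than the boundaries themselves; the handler on the slave reloads the step execution, including its ExecutionContext, through the JobExplorer. The sketch below is a simplified plain-Java model of that lookup, not Spring Batch's actual code: RequestHandlingSketch, Request, publish and handle are hypothetical names, and a HashMap stands in for the shared job repository.

```java
import java.util.HashMap;
import java.util.Map;

class RequestHandlingSketch {

    // Stand-in for the broker message: identifiers only, no data range.
    static final class Request {
        final String stepName;
        final long stepExecutionId;

        Request(String stepName, long stepExecutionId) {
            this.stepName = stepName;
            this.stepExecutionId = stepExecutionId;
        }
    }

    // Stand-in for the shared job repository: stepExecutionId -> execution context.
    static final Map<Long, Map<String, Integer>> repository = new HashMap<>();

    // Master side: persist the partition's boundaries, then build the message.
    static Request publish(long stepExecutionId, int minValue, int maxValue) {
        Map<String, Integer> context = new HashMap<>();
        context.put("minValue", minValue);
        context.put("maxValue", maxValue);
        repository.put(stepExecutionId, context);
        return new Request("slaveStep", stepExecutionId);
    }

    // Slave side: look the context up by ID and "run" the named step against it.
    static String handle(Request request) {
        Map<String, Integer> context = repository.get(request.stepExecutionId);
        return request.stepName
                + "[" + context.get("minValue") + "->" + context.get("maxValue") + "]";
    }

    public static void main(String[] args) {
        Request request = publish(7L, 26, 50);
        System.out.println(handle(request)); // prints slaveStep[26->50]
    }
}
```

This is why the master and every slave have to be configured against the same job repository database: a slave that cannot find the step execution has no boundaries to bind into the reader.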

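That completes the Spring Batch remote partitioning example. Note that a single application can act as master, as slave, or as both, and that this is controlled by Spring profiles; attentive readers will have noticed annotations such as @Profile({"master", "mixed"}). So when testing, remember to set the active profile in your Spring Boot configuration, for example spring.profiles.active=slave.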
