千家信息网

Spring Boot + RabbitMQ如何实现分布式事务

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,小编给大家分享一下Spring Boot + RabbitMQ如何实现分布式事务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!一:分布式事务解决方案1.两阶段提交(2PC)第一阶段:
千家信息网最后更新 2025年02月01日Spring Boot + RabbitMQ如何实现分布式事务

小编给大家分享一下Spring Boot + RabbitMQ如何实现分布式事务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

一:分布式事务解决方案

1.两阶段提交(2PC)

第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.

第二阶段:事务协调器要求每个数据库提交数据。

案例可参照http://blog.itpub.net/28624388/viewspace-2137095/

2.补偿事务(TCC)

TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

Try 阶段主要是对业务系统做检测及资源预留

Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。

Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

3.本地消息表(异步确保)

本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理。

基本思路:

a.消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

b.消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

c.生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

二:Spring Boot + RabbitMQ分布式事务实现

1.pom.xml依赖配置

                        org.springframework.boot                        spring-boot-starter-amqp                

2.application.yaml rabbitmq配置

# RabbitMQ          rabbitmq:    host: 112.74.105.178    port: 5672    username: admin    password: admin    virtual-host: /    publisher-confirms: true    publisher-returns: true    listener:      simple:        acknowledge-mode: manual

3.RabbitMQConfig.java

@Configurationpublic class RabbitMQConfig {// 下单并且派单存队列        public static final String ORDER_DIC_QUEUE = "order_dis_queue";        // 补单队列,判断订单是否已经被创建        public static final String ORDER_CREATE_QUEUE = "order_create_queue";        // 下单并且派单交换机        private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";        @Bean        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {                RabbitTemplate template = new RabbitTemplate(connectionFactory);                template.setMessageConverter(new Jackson2JsonMessageConverter());                return template;        }        @Bean        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {                SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();                factory.setConnectionFactory(connectionFactory);                factory.setMessageConverter(new Jackson2JsonMessageConverter());                return factory;        }        @Bean        public Queue OrderDicQueue() {                return new Queue(ORDER_DIC_QUEUE);        }        @Bean        public Queue OrderCreateQueue() {                return new Queue(ORDER_CREATE_QUEUE);        }        @Bean        DirectExchange directOrderExchange() {                return new DirectExchange(ORDER_EXCHANGE_NAME);        }        @Bean        Binding bindingExchangeOrderDicQueue() {                return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");        }        @Bean        Binding bindingExchangeOrderCreateQueue() {                return BindingBuilder.bind(OrderCreateQueue()).to(directOrderExchange()).with("orderRoutingKey");        }}

4. 消息生产者

public class MsgPushInfoServiceImpl extends ServiceImpl                implements MsgPushInfoService, RabbitTemplate.ConfirmCallback {        @Autowired        private RabbitTemplate rabbitTemplate;        public void orderAndDsipatch() {                try {                        String orderId = "123456";                        JSONObject jsonObect = new JSONObject();                        jsonObect.put("orderId", orderId);                        String msg = jsonObect.toString();                        System.out.println("msg:" + msg);                        MessageProperties messageProperties = new MessageProperties();                messageProperties.setContentType("application/json");                messageProperties.setMessageId(orderId);                Message message = new Message(msg.getBytes(),messageProperties);                                        CorrelationData correlationData = new CorrelationData(orderId);                        rabbitTemplate.setMandatory(true);                        rabbitTemplate.setConfirmCallback(this);                        rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);                } catch (Exception e) {                        e.printStackTrace();                }        }        @Override        public void confirm(CorrelationData correlationData, boolean ack, String cause) {                String orderId = correlationData.getId();                System.out.println("消息id:" + orderId);                if (ack) { // 消息发送成功                        System.out.println("消息发送确认成功");                } else {                        // 重试机制                        System.out.println("消息发送确认失败:" + cause);                }        }}

5.消息消费者

@Componentpublic class DispatchReceiver {        @RabbitHandler        @RabbitListener(queues = "order_dis_queue", containerFactory = "rabbitListenerContainerFactory")        public void process(Message message, Channel channel) {                System.out.println("rev : " + message.getMessageProperties().getMessageId());                try {                        System.out.println("======basicNack====="+message.getMessageProperties().getDeliveryTag());                        //业务处理成功,则删除消息                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);                        //业务处理失败,则发送补偿消息                } catch (Exception e) {                        e.printStackTrace();                }        }}

看完了这篇文章,相信你对"Spring Boot + RabbitMQ如何实现分布式事务"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

消息 事务 业务 阶段 成功 处理 分布式 数据 生产 补偿 消费 数据库 思想 方案 机制 核心 状态 篇文章 系统 资源 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网信办宣布进行网络安全审查 做我们能为网络安全做什么 郎溪新能源软件开发服务生产过程 节点控制 软件开发 海云安网络安全技术 香港凤凰网络技术有限公司 什么服务器不用担心隐私泄露 按键精灵如何读取服务器空间 西数服务器 应届生去大连找软件开发工作 软件开发构架图 服务器主机重启自动进入bios 招商引资项目数据库 地区网络安全的产业链的布局 准服务器 南京市安卓软件开发工程师招聘 网络安全法对企业的好处 智能互联网高科技数学 网络安全老旧漏洞 数字化转型与数据库的关系论文 金蝶专业版服务器安装不了 做网络技术维护工作有前途吗 软件开发议价记录 杨浦区参考数据库销售厂家报价 二叉树数据存放数据库 瓦罗兰特可以玩哪些服务器 戴尔服务器安装滑轨 龙芯2服务器 学习网络安全与管理的意义 重庆视频监控平台软件开发
0