千家信息网

RocketMQ如何解决分布式事务

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,本篇内容主要讲解"RocketMQ如何解决分布式事务",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RocketMQ如何解决分布式事务"吧!一致性如何保证:
千家信息网最后更新 2025年02月01日RocketMQ如何解决分布式事务

本篇内容主要讲解"RocketMQ如何解决分布式事务",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RocketMQ如何解决分布式事务"吧!

一致性如何保证:

RocketMQ解决分布式事务(可靠消息最终一致性方案)

1、A系统发送一个prepared消息到MQ,如果这个prepared消息发送失败那么就直接取消操作别执行了。

2、如果这个消息发送成功了、就接着执行本地事务(executeLocalTransaction),如果成功就告诉MQ发送确认消息,如果失败,就告诉MQ发送回滚消息。

3、如果发送了确认消息、那么B系统会接收到确认消息,然后执行本地事务。

4、上面的第2步, 由于网络原因发送确认or回滚消息失败,但是broker有轮询机制,根据唯一id查询本地事务状态,MQ会自动定时轮询所有prepared消息回调你的接口(checkLocalTransaction),问你,这个消息是不是本地事务处理失败了,所有没有发送确认的消息,是继续重试还是回滚?一版来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。

PS:此方案是不支持事务发起服务进行回滚的,但是大部分互联网应用都不会要求事务发起方进行回滚,如果一定要事务发起方进行回滚应该采用2PC、3PC、TCC等强一致性方案来实现分布式事务,比如LCN。

订单-库存-分布式事务

这里通过一个实例来讲一下RocketMQ实现分布式事务具体编码。

场景: 下单场景,订单服务生成订单,当订单支付成功之后,修改订单状态已支付,并且要通知库存服务进行库存的扣减。

数据库设计:

CREATE TABLE `yzy_order` (  `id` int(11) NOT NULL,  `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '订单id',  `buy_num` int(11) DEFAULT NULL COMMENT '购买数量',  `good_id` int(11) DEFAULT NULL COMMENT '商品ID',  `user_id` int(11) DEFAULT NULL COMMENT '用户ID',  `pay_status` int(11) DEFAULT NULL COMMENT '支付状态,0:没有支付,1:已经支付',  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ciCREATE TABLE `yzy_repo` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称',  `num` int(11) NOT NULL DEFAULT '0' COMMENT '库存数量',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试,库存表表'


开始实战

订单服务service的主要方法

package com.transaction.order;import com.alibaba.dubbo.config.annotation.Reference;import com.transaction.repository.IRepositoryService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Service;import org.springframework.web.client.RestTemplate;import java.util.List;@Servicepublic class OrderService {    @Autowired    OrderDao orderDao;    public final int PAY_DONE = 1;    /**     *  检查订单是否存在并且状态是支付完成    **/    public boolean checkOrderPaySuccess(String orderId){        List allOrders = orderDao.findAll();        return  allOrders.stream()                .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE);    } /**     *  更新订单是为支付完成    **/    public void updatePayStatusByOrderId(String orderId){        orderDao.updatePayStatusByOrderId(orderId, PAY_DONE);    } /**     *  生成订单,状态默认是未支付    **/    public void save(String orderId, int num, int goodId,int userId) {        YzyOrder yzyOrder = new YzyOrder();        yzyOrder.setOrderId(orderId);        yzyOrder.setBuyNum(num);        yzyOrder.setGoodId(goodId);        yzyOrder.setUserId(userId);        orderDao.save(yzyOrder);    }}

业务流程

1.在订单表创建一个状态是未支付的订单

在终端或者浏览器 执行 curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001'

 /**     * 生成订单接口     * @param num     * @param goodId     * @param userId     * @return     */    @GetMapping("save")    public String makeOrder(            @RequestParam("num") int num,            @RequestParam("good_id") int goodId,            @RequestParam("user_id") int userId) {        orderService.save(UUID.randomUUID().toString(), num, goodId,userId);        return "success";    }

2.用户支付完成,通过MQ通知库存服务扣减库存

OrderController:pay 发送订单支付成功的MQ事务消息,这里注意体会,并不是直接调用OrderService::updatePayStatusByOrderId 然后发送普通的MQ消息。而是先发送事务消息到MQ,然后MQ回调订单服务的TransactionListener::executeLocalTransaction,在这里完成订单状态的更新,保证发送事务消息和更新订单状态的一致性.

  @GetMapping("pay")    public String pay(@RequestParam("order_id") String orderId)            throws UnsupportedEncodingException, MQClientException, JsonProcessingException {        transactionProducer.sendOrderPaySucessEvent(orderId);        return "success";    }

3.订单服务端的事务消息监听器

@Componentpublic class TransactionProducer implements InitializingBean {    private TransactionMQProducer producer;    @Autowired    private OrderService orderService;    @Autowired    private OrderDao orderDao;    @Override    public void afterPropertiesSet() throws Exception {        producer = new TransactionMQProducer("order-pay-group");        producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876");        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);        producer.setExecutorService(executor);        //设置发送消息的回调        producer.setTransactionListener(new TransactionListener() {            /**             * 根据消息发送的结果 判断是否执行本地事务             *             * 回调该方法的时候说明 消息已经成功发送到了MQ,可以把订单状态更新为 "支付成功"             */            @Override            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {                // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback                ObjectMapper objectMapper = new ObjectMapper();                LocalTransactionState state = LocalTransactionState.UNKNOW;                try {                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);                    //MQ已经收到了TransactionProducer send方法发送的事务消息,下面执行本地的事务                    //本地记录订单信息                    orderService.updatePayStatusByOrderId(record.getOrderId());                    state = LocalTransactionState.COMMIT_MESSAGE;                } catch (UnsupportedEncodingException e) {                    e.printStackTrace();                    state = LocalTransactionState.ROLLBACK_MESSAGE;                } catch (IOException e) {                    e.printStackTrace();                    state = LocalTransactionState.ROLLBACK_MESSAGE;                }                return state;            }            /**             * RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功             * @return             */            @Override            public LocalTransactionState checkLocalTransaction(MessageExt msg) {                ObjectMapper objectMapper = new ObjectMapper();                LocalTransactionState state = LocalTransactionState.UNKNOW;                OrderRecord record = null;                try {                    record = objectMapper.readValue(msg.getBody(), OrderRecord.class);                } catch (IOException e) {                    e.printStackTrace();                }                try {                    //根据是否有transaction_id对应转账记录 来判断事务是否执行成功                    boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());                    if (isLocalSuccess) {                        state = LocalTransactionState.COMMIT_MESSAGE;                    } else {                        state = LocalTransactionState.ROLLBACK_MESSAGE;                    }                } catch (Exception e) {                    e.printStackTrace();                }                return state;            }        });        producer.start();    }    public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException {        ObjectMapper objectMapper = new ObjectMapper();        YzyOrder order = orderDao.findAll().stream()                .filter(item->item.getOrderId().equals(orderId))                .collect(Collectors.toList()).get(0);        if(order == null){            System.out.println("not found order " + orderId);        }        // 构造发送的事务 消息        OrderRecord record = new OrderRecord();        record.setUserId(order.getUserId());        record.setOrderId(orderId);        record.setBuyNum(order.getBuyNum());        record.setPayStatus(order.getPayStatus());        record.setGoodId(order.getGoodId());        Message message = new Message("Order-Success", "", record.getOrderId(),                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));        TransactionSendResult result = producer.sendMessageInTransaction(message, null);        System.out.println("发送事务消息 ,orderId = " + record.getOrderId() + " " + result.toString());    }}

4.库存服务扣减库存

需要注意的问题:

1. 扣减库存要防止在并发的情况下被扣成负数

2. 先select后update的方式更新库存要加分布式锁或者数据库乐观锁,update语句需要是幂等的

UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;

3. 注意通过msgId或者orderId来进行消费幂等处理

 @Override    public int reduce(Integer buyNum, Integer goodId) {        //并发的情况下,为了防止库存被扣成负数,有三种解决方案        //1. select for update (必须放到事务中)        //2. 这段逻辑加上分布式锁        //3. 数据库加上一个version字段,乐观锁        while (true){            Optional repoOption = repositoryDao.findById(goodId);            if (!repoOption.isPresent()) {                return 0;            }            YzyRepo repo = repoOption.get();            //避免数据库库存扣减小于零            if (repo.getNum() - buyNum < 0) {                return -1;            }            repo.setNum(repo.getNum() - buyNum);            int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId);            if(affect > 0){                return affect;            }        }    }

到此,相信大家对"RocketMQ如何解决分布式事务"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0