千家信息网

如何整合RocketMQ事务消息

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,今天就跟大家聊聊有关如何整合RocketMQ事务消息,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、 选择RocketMQ原因ActiveM
千家信息网最后更新 2025年02月03日如何整合RocketMQ事务消息

今天就跟大家聊聊有关如何整合RocketMQ事务消息,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

一、 选择RocketMQ原因

ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ选型

二、 整合思路

RocketMQ提供了事务消息回查,查看官方Demo

@SpringBootApplicationpublic class ProducerApplication implements CommandLineRunner {    private static final String TX_PGROUP_NAME = "myTxProducerGroup";    @Resource    private RocketMQTemplate rocketMQTemplate;    @Value("${demo.rocketmq.transTopic}")    private String springTransTopic;        public static void main(String[] args) {        SpringApplication.run(ProducerApplication.class, args);    }    @Override    public void run(String... args) throws Exception {        // Send transactional messages        testTransaction();    }    private void testTransaction() throws MessagingException {        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};        for (int i = 0; i < 10; i++) {            try {                Message msg = MessageBuilder                                        .withPayload("Hello RocketMQ " + i)                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)                                        .build();                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,                                                                        springTransTopic + ":" + tags[i % tags.length],                                                                        msg,                                                                        null);                System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",                                    msg.getPayload(),                                    sendResult.getSendStatus());                Thread.sleep(10);            } catch (Exception e) {                e.printStackTrace();            }        }    }    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)    class TransactionListenerImpl implements RocketMQLocalTransactionListener {        private AtomicInteger transactionIndex = new AtomicInteger(0);        private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();        @Override        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);            int value = transactionIndex.getAndIncrement();            int status = value % 3;            localTrans.put(transId, status);            if (status == 0) {                // Return local transaction with success(commit), in this case,                // this message will not be checked in checkLocalTransaction()                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());                return RocketMQLocalTransactionState.COMMIT;            }            if (status == 1) {                // Return local transaction with failure(rollback) , in this case,                // this message will not be checked in checkLocalTransaction()                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());                return RocketMQLocalTransactionState.ROLLBACK;            }            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");            return RocketMQLocalTransactionState.UNKNOWN;        }        @Override        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;            Integer status = localTrans.get(transId);            if (null != status) {                switch (status) {                    case 0:                        retState = RocketMQLocalTransactionState.UNKNOWN;                        break;                    case 1:                        retState = RocketMQLocalTransactionState.COMMIT;                        break;                    case 2:                        retState = RocketMQLocalTransactionState.ROLLBACK;                        break;                }            }            System.out.printf("------ !!! checkLocalTransaction is executed once," +                    " msgTransactionId=%s, TransactionState=%s status=%s %n",                transId, retState, status);            return retState;        }    }}

需要在testTransaction()中发送消息,然后在TransactionListenerImpl类中实现executeLocalTransaction()方法才能执行整个本地事务,然后在checkLocalTransaction()中实现事务消息回查。

查看源代码可以知道testTransaction()方法和executeLocalTransaction()是在同一个线程当中,只不过包装RocketMQTemplate中。

三、问题和解决方法

3.1事务消息面临的几个问题:

  1. 消息发送的事务消息回调查询和本地事务没严格的先后顺序,怎么保证,回查时,事务操作肯定已经完成。

  2. 事务消息回调使用transaction_id查询,那么transaction_id存放在哪里,同时保证transaction_id关联的业务操作执行成功。

  3. 怎么把事务回调查询操作隔离出业务,保证不侵入代码中。

  4. 下游消费者怎么保证接口幂等性。

  5. 下游消费者怎么提高幂等性查询性能。

  6. 怎么把幂等性操作隔离出业务,保证不侵入代码中。

3.2 解决方法

  1. 因为数据库或者其他业务操作可能会存在延时,那么不能保证回查时业务操作已完成,那么可以多次回查,并设置最大回查次数,同时不能丢弃MQ消息持久化,方便手动恢复。

  2. 可以使用本地消息表落地的发送消息,同时可以采用切面、继承等等方式将落地消息隔离出业务代码之外,保证本地消息落库不侵入,注意必须要保证本地消息落库和本地业务落库在同一个事务之内!

  3. 事务消息回查可以使用第2点的本地消息表,根据transaction_id查询,判断本地事务的执行结果,也和第2点一样,可以使用一些方式将事务消息回查代码隔离出业务代码,保证不侵入。

  4. 幂等性的方法:

    • 数据库唯一约束

    • 状态机CAS单向流转

    • 消息去重表

  5. ,在执行本地业务前,先对redis判断是业务id是否存在,存在则直接返回消费成功,在执行本地业务之后,可以将消费信息异步落地到redis当中。注意:需要保证本地业务和消息幂等性操作在同一个事务当中,同时redis落地操作在事务之外。

  6. 比较好的方案应该是数据库唯一约束 + 消息去重表,在消息去重表中对业务id设置唯一约束,同时将消息落地操作隔离出本地业务之外,保证不侵入。

  7. 定时清理历史的本地消息表(消息去重表)。

看完上述内容,你们对如何整合RocketMQ事务消息有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0