千家信息网

What Are Transactional Messages in RocketMQ?

Published: 2025-02-06 Author: 千家信息网 editor
Last updated: 2025-02-06

This article explains what transactional messages in RocketMQ are. Many people find them confusing in day-to-day work, so it walks through the official demo and the client source code to show how the feature actually behaves.

1. Background

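Earlier releases of Alibaba's RocketMQ shipped with the transaction check-back (message back-check) feature stripped out. Newer releases add it back, which helps smaller companies that lack the resources to build a reliable-messaging middleware of their own. RocketMQ's implementation also draws on Kafka, and its performance is good.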

2. Version

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>

3. Source Code Walkthrough

Official demo

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import javax.annotation.Resource;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessagingException;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    public class ProducerApplication implements CommandLineRunner {
        private static final String TX_PGROUP_NAME = "myTxProducerGroup";

        @Resource
        private RocketMQTemplate rocketMQTemplate;

        @Value("${demo.rocketmq.transTopic}")
        private String springTransTopic;

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }

        @Override
        public void run(String... args) throws Exception {
            // Send transactional messages
            testTransaction();
        }

        private void testTransaction() throws MessagingException {
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    // The TRANSACTION_ID header is read back by the listener below
                    Message msg = MessageBuilder
                            .withPayload("Hello RocketMQ " + i)
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                            .build();
                    // Destination format is "topic:tag"
                    SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                            TX_PGROUP_NAME,
                            springTransTopic + ":" + tags[i % tags.length],
                            msg,
                            null);
                    System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                            msg.getPayload(),
                            sendResult.getSendStatus());
                    Thread.sleep(10);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
        class TransactionListenerImpl implements RocketMQLocalTransactionListener {
            private AtomicInteger transactionIndex = new AtomicInteger(0);

            // transaction id -> simulated local transaction status (0, 1 or 2)
            private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

            @Override
            public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
                System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
                int value = transactionIndex.getAndIncrement();
                int status = value % 3;
                localTrans.put(transId, status);
                if (status == 0) {
                    // Return local transaction with success(commit), in this case,
                    // this message will not be checked in checkLocalTransaction()
                    System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                    return RocketMQLocalTransactionState.COMMIT;
                }

                if (status == 1) {
                    // Return local transaction with failure(rollback) , in this case,
                    // this message will not be checked in checkLocalTransaction()
                    System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                    return RocketMQLocalTransactionState.ROLLBACK;
                }

                // Fixed: the format string has a %s placeholder, so the payload must be passed in
                System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! %n", msg.getPayload());
                return RocketMQLocalTransactionState.UNKNOWN;
            }

            @Override
            public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
                RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
                Integer status = localTrans.get(transId);
                if (null != status) {
                    switch (status) {
                        case 0:
                            retState = RocketMQLocalTransactionState.UNKNOWN;
                            break;
                        case 1:
                            retState = RocketMQLocalTransactionState.COMMIT;
                            break;
                        case 2:
                            retState = RocketMQLocalTransactionState.ROLLBACK;
                            break;
                    }
                }
                System.out.printf("------ !!! checkLocalTransaction is executed once," +
                        " msgTransactionId=%s, TransactionState=%s status=%s %n",
                        transId, retState, status);
                return retState;
            }
        }
    }
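To make the demo's branching easier to follow, here is a minimal sketch that replays the listener logic without any RocketMQ dependency. The `State` enum and the `DemoOutcomeSketch` and `committedIds` names are hypothetical stand-ins, and the sketch assumes that every message left in the UNKNOWN state is checked back exactly once; real check-back timing and retry counts are decided by the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DemoOutcomeSketch {
    // Hypothetical stand-in for RocketMQLocalTransactionState
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final AtomicInteger transactionIndex = new AtomicInteger(0);
    private final Map<String, Integer> localTrans = new ConcurrentHashMap<>();

    // Same branching as the demo's executeLocalTransaction
    State executeLocalTransaction(String transId) {
        int status = transactionIndex.getAndIncrement() % 3;
        localTrans.put(transId, status);
        if (status == 0) {
            return State.COMMIT;
        }
        if (status == 1) {
            return State.ROLLBACK;
        }
        return State.UNKNOWN;
    }

    // Same switch as the demo's checkLocalTransaction
    State checkLocalTransaction(String transId) {
        Integer status = localTrans.get(transId);
        if (status == null) {
            return State.COMMIT;
        }
        switch (status) {
            case 0: return State.UNKNOWN;
            case 1: return State.COMMIT;
            case 2: return State.ROLLBACK;
            default: return State.COMMIT;
        }
    }

    // Ids that end up committed, assuming one check-back per UNKNOWN message
    static List<String> committedIds(int count) {
        DemoOutcomeSketch listener = new DemoOutcomeSketch();
        List<String> committed = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String id = "KEY_" + i;
            State state = listener.executeLocalTransaction(id);
            if (state == State.UNKNOWN) {
                state = listener.checkLocalTransaction(id);
            }
            if (state == State.COMMIT) {
                committed.add(id);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(committedIds(10)); // [KEY_0, KEY_3, KEY_6, KEY_9]
    }
}
```

Under that assumption only every third message reaches consumers: status 1 rolls back immediately, and status 2 returns UNKNOWN first and is rolled back when it is checked.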

Transactional messages are sent through RocketMQTemplate.sendMessageInTransaction(), so that is where we start.

    // RocketMQTemplate
    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
            final Message message, final Object arg) throws MessagingException {
        try {
            // Look up the producer registered for this producer group in the local cache
            TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
            // Convert the Spring message into a RocketMQ message
            org.apache.rocketmq.common.message.Message rocketMsg =
                    RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message);
            return txProducer.sendMessageInTransaction(rocketMsg, arg);
        } catch (MQClientException e) {
            throw RocketMQUtil.convert(e);
        }
    }
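The demo builds its destination as `springTransTopic + ":" + tag`, so the conversion step has to separate the topic from the tag. The following is only a sketch of that idea, assuming the string is split on the first colon; `DestinationSketch` and `splitDestination` are hypothetical names, not part of the rocketmq-spring API.

```java
public class DestinationSketch {
    /**
     * Splits "topic:tags" into {topic, tags}.
     * Assumption: only the first colon separates the two parts,
     * and the tags part is empty when no colon is present.
     */
    static String[] splitDestination(String destination) {
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tags = parts.length > 1 ? parts[1] : "";
        return new String[] {topic, tags};
    }

    public static void main(String[] args) {
        String[] parsed = splitDestination("spring-transaction-topic:TagA");
        System.out.println("topic=" + parsed[0] + ", tags=" + parsed[1]);
    }
}
```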

Step into txProducer.sendMessageInTransaction(rocketMsg, arg):

    // TransactionMQProducer
    public TransactionSendResult sendMessageInTransaction(final Message msg,
            final Object arg) throws MQClientException {
        // A transaction listener (local transaction + check-back query) must have been set
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }

Step into defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg):

    // DefaultMQProducerImpl
    public TransactionSendResult sendMessageInTransaction(final Message msg,
            final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // Mark the message as a prepared (half) message
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // Send the message synchronously
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            // Send succeeded. With isWaitStoreMsgOK=true on the message (the default) the store is confirmed;
            // with isWaitStoreMsgOK=false, SEND_OK is returned as long as no exception was caught.
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // Run the local transaction branch passed in by the caller
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    // Otherwise run the transaction listener supplied via annotation or producer setup
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // Flushing to disk timed out
            case FLUSH_DISK_TIMEOUT:
            // Syncing data to the slave timed out
            case FLUSH_SLAVE_TIMEOUT:
            // No slave available
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // Send the second-phase confirmation
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        // Wrap up the result
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

    public void endTransaction(
            final SendResult sendResult,
            final LocalTransactionState localTransactionState,
            final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            // Commit the transaction
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            // Roll back the transaction
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            // State unknown: neither commit nor rollback
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // One-way send of the second-phase confirmation: no broker response is awaited,
        // and a lost request is compensated for by the check-back listener
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                this.defaultMQProducer.getSendMsgTimeout());
    }
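The decision logic in sendMessageInTransaction can be condensed into a small, dependency-free sketch. The enums below are hypothetical stand-ins for the client's SendStatus and LocalTransactionState, and `resolve` is an illustrative name; the sketch only mirrors the branching shown in the source above and does not talk to a broker.

```java
import java.util.function.Supplier;

public class TwoPhaseSketch {
    // Hypothetical stand-ins for the RocketMQ client enums
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /**
     * Decides which state the second-phase request would carry.
     * The local transaction runs on the calling thread, and only after
     * the half message was sent with SEND_OK.
     */
    static LocalState resolve(SendStatus sendStatus, Supplier<LocalState> localTransaction) {
        if (sendStatus != SendStatus.SEND_OK) {
            // The half message was not stored safely: roll back without running local work
            return LocalState.ROLLBACK_MESSAGE;
        }
        try {
            LocalState state = localTransaction.get();
            // A null result is treated as unknown
            return state != null ? state : LocalState.UNKNOW;
        } catch (Throwable e) {
            // An exception leaves the state unknown, so the broker has to check back later
            return LocalState.UNKNOW;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(SendStatus.SEND_OK, () -> LocalState.COMMIT_MESSAGE));
        System.out.println(resolve(SendStatus.SEND_OK, () -> { throw new IllegalStateException("db down"); }));
        System.out.println(resolve(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalState.COMMIT_MESSAGE));
    }
}
```

The point of the sketch is the ordering: a failed half-message send never reaches the local transaction, and an exception inside it does not roll the message back but leaves it for the check-back.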

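When I first read the RocketMQ transactional message example and saw the local transaction being executed inside a listener, I assumed the client sent the prepared message to the broker and then waited asynchronously for the broker's response before running the local transaction. If that were the case, the client could never give its caller a real-time answer. The source shows otherwise: the prepared message is sent synchronously, executeLocalTransaction runs on the calling thread as soon as the send returns SEND_OK, and the result is reported to the broker with a one-way endTransaction request. checkLocalTransaction only comes into play later, as compensation when the broker checks back on a transaction.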
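The check-back compensation can be pictured with a deliberately simplified model of the broker side. This is a sketch under stated assumptions, not the broker's actual implementation: `CheckBackSketch`, `prepare`, `endTransaction` and `checkBack` are hypothetical names, half messages are kept in an in-memory set, and a single check-back pass is triggered by hand instead of by a timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class CheckBackSketch {
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Half messages: stored but not yet visible to consumers
    private final Set<String> pending = new LinkedHashSet<>();
    // Committed messages: visible to consumers
    private final List<String> visible = new ArrayList<>();

    void prepare(String id) {
        pending.add(id);
    }

    // Second phase: commit publishes, rollback discards, unknown keeps the message pending
    void endTransaction(String id, LocalState state) {
        if (!pending.contains(id) || state == LocalState.UNKNOW) {
            return;
        }
        pending.remove(id);
        if (state == LocalState.COMMIT_MESSAGE) {
            visible.add(id);
        }
    }

    // Ask the producer-side checker about every message that is still pending
    void checkBack(Function<String, LocalState> checker) {
        for (String id : new ArrayList<>(pending)) {
            endTransaction(id, checker.apply(id));
        }
    }

    List<String> visibleMessages() {
        return visible;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CheckBackSketch broker = new CheckBackSketch();
        broker.prepare("a");
        broker.prepare("b");
        broker.prepare("c");
        broker.endTransaction("a", LocalState.COMMIT_MESSAGE);
        broker.endTransaction("b", LocalState.UNKNOW);
        // "c": the one-way second-phase request is assumed to be lost
        broker.checkBack(id -> id.equals("b") ? LocalState.COMMIT_MESSAGE : LocalState.ROLLBACK_MESSAGE);
        System.out.println(broker.visibleMessages() + " pending=" + broker.pendingCount());
    }
}
```

In this model a lost one-way request ("c") and an UNKNOW result ("b") look the same to the broker: both stay pending until the checker supplies a final answer.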

That concludes this look at what transactional messages in RocketMQ are. Theory sticks best when paired with practice, so try running the demo yourself.
