RocketMQ的事务消息是什么意思
发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,这篇文章主要介绍"RocketMQ的事务消息是什么意思",在日常操作中,相信很多人在RocketMQ的事务消息是什么意思问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"R
千家信息网最后更新 2025年02月06日RocketMQ的事务消息是什么意思
这篇文章主要介绍"RocketMQ的事务消息是什么意思",在日常操作中,相信很多人在RocketMQ的事务消息是什么意思问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"RocketMQ的事务消息是什么意思"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
一、 背景
阿里的RocketMQ以前版本阉割的消息回查,在新版又重新加入了,解决小公司没能力做可靠消息中间件产品。同时RocketMQ也参考了Kafka实现,性能上也很不错。
二、 版本
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.3
三、源码解读
官方demo
@SpringBootApplicationpublic class ProducerApplication implements CommandLineRunner { private static final String TX_PGROUP_NAME = "myTxProducerGroup"; @Resource private RocketMQTemplate rocketMQTemplate; @Value("${demo.rocketmq.transTopic}") private String springTransTopic; public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Override public void run(String... args) throws Exception { // Send transactional messages testTransaction(); } private void testTransaction() throws MessagingException { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = MessageBuilder .withPayload("Hello RocketMQ " + i) .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i) .build(); SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, springTransTopic + ":" + tags[i % tags.length], msg, null); System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n", msg.getPayload(), sendResult.getSendStatus()); Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } } } @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME) class TransactionListenerImpl implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMaplocalTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId); int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(transId, status); if (status == 0) { // Return local transaction with success(commit), in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload()); return RocketMQLocalTransactionState.COMMIT; } if (status == 1) { // Return local transaction with failure(rollback) , in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload()); return RocketMQLocalTransactionState.ROLLBACK; } System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break; case 2: retState = RocketMQLocalTransactionState.ROLLBACK; break; } } System.out.printf("------ !!! checkLocalTransaction is executed once," + " msgTransactionId=%s, TransactionState=%s status=%s %n", transId, retState, status); return retState; } }}
事务消息调用的是RocketMQTemplate.sendMessageInTransaction()
,那么就从这里开始
//RocketMQTemplatepublic TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message> message, final Object arg) throws MessagingException { try { //从本地缓存中获取生产者组名的生产者 TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); return txProducer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); }}
进入txProducer.sendMessageInTransaction(rocketMsg, arg)
//TransactionMQProducerpublic TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { //是否已经设置事务监听器(本地事务、回调查询) if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);}
进入defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg)
//DefaultMQProducerImplpublic TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; //设置为预消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { //发送消息 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { //发送成功,当消息对象中的isWaitStoreMsgOK=true(默认true),如果 isWaitStoreMsgOK=false,当没有捕获到异常,那么将返回SEND_OK case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } //执行传入的本地分支事务 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); //执行注解或者生产者构造传入的事务监听器 } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; //刷盘超时 case FLUSH_DISK_TIMEOUT: //数据同步到Slave服务器器超时 case FLUSH_SLAVE_TIMEOUT: //无Slave服务器器可用 case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { //发送二次确认消息 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } //封装执行结果 TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { //提交事务 case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; //提交事务 case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; //提交事务 case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; //单向发送二次确认消息,不需要服务端相应,由消息回查监听补偿 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());}
刚开始看RocketMQ的事务消息Example时,用的监听器执行本地事务,还以为是通过向服务端发送预消息,异步监听服务端响应再处理本地事务,那客户端根本没法实时响应。
到此,关于"RocketMQ的事务消息是什么意思"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
事务
消息
服务
监听
意思
学习
生产者
监听器
生产
更多
服务器
版本
帮助
不错
实用
成功
接下来
中间件
分支
单向
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
佛山软件开发课程
中普数据库过多取数
网络安全有哪四个原则
pos机服务器出错
数据库9925
綦江区网络软件开发流程要求
中国移动软件开发平台
数据库系统工程师过关率
禹战互联网科技有限公司
数据库连接池推荐
10月网络安全对话
郑州糖小果网络技术有限公司
数据库查询重复5次以上数据
服务器安全实验总结
计算机网络安全专业毕业论文
北京华新网络技术
sql如何批量删除数据库
上海市夺畅网络技术有限公司
勘察测绘研究院软件开发
招聘程序员的要求软件开发
小学网络安全学生要了解的
打开程序老是提示连接数据库失败
忠诚智慧科技互联网有限公司
数据库增加字段不为空
乙酸乙酯红外光谱图物竟数据库
深度社神经网络技术的应用
创建数据库表对象
非线编服务器
国家基层信息化网络安全
服务器显卡不能识别