使用RocketMQ怎么对消息进行处理
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这期内容当中小编将会给大家带来有关使用RocketMQ怎么对消息进行处理,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息发送(生产者)以maven + Sprin
千家信息网最后更新 2025年02月02日使用RocketMQ怎么对消息进行处理
这期内容当中小编将会给大家带来有关使用RocketMQ怎么对消息进行处理,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
消息发送(生产者)
以maven + SpringBoot 工程为例,先在pom.xml
增加依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.1
由于,这个依赖是一个starter
,直接引入依赖就可以开始写投递消息的代码了。这个starter
注册了一个叫org.apache.rocketmq.spring.core.RocketMQTemplate
的bean
,用它就可以直接把消息投递出去。 具体的API是这样的
XXXEvent xxxDto = new XXXEvent(); Messagemessage = MessageBuilder.withPayload(xxxDto).build(); String dest = String.format("%s:%s",topic-name","tag-name"); //默认投递:同步发送 不会丢失消息。如果在投递成功后发生网络异常,客户端会认为投递失败而回滚本地事务 this.rocketMQTemplate.send(dest, xxxDto);
这种投递方式能保证投递成功的消息不会丢失,但是不能保证投递一定成功。假设一次调用的流程是这样的
如果在步骤3的时候发生错误,因为出错mqClient
会认为消息投递失败而把事务回滚。如果消息已经被消费,那就会导致业务错误。我们可以用事务消息解决这个问题。
以带事务方式投递的消息,正常情况下的处理流程是这样的
出错的时候是这样的
由于普通消息没有消息回查,普通消息用的producer
不支持回查操作,不同业务的回查处理也不一样,事务消息需要使用单独的producer
。消息发送代码大概是这样的
//调用这段代码之前别做会影响数据的操作XXXEvent xxxDto = new XXXEvent();Messagemessage = MessageBuilder.withPayload(xxxDto).build();String dest = String.format("%s:%s",topic-name","tag-name");TransactionSendResult transactionSendResult = this.rocketMQTemplate.sendMessageInTransaction("poducer-name","topic-name:tag-name",message,"xxxid");if (LocalTransactionState.ROLLBACK_MESSAGE.equals(transactionSendResult.getLocalTransactionState())){ throw new RuntimeException("事务消息投递失败");}//按照RocketMQ的写法,这个地方不应该有别的代码
@RocketMQTransactionListener(txProducerGroup = "producer") class TransactionListenerImpl implements RocketMQLocalTransactionListener { //消息投递成功后执行的逻辑(半消息) //原文:When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try{ // xxxService.doSomething(); return RocketMQLocalTransactionState.COMMIT; catch(IOException e){ //不确定最终是否成功 return RocketMQLocalTransactionState.UNKNOWN; }catch(Exception e){ return RocketMQLocalTransactionState.ROLLBACK; } } //回查事务执行状态 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { Boolean result = xxxService.isSuccess(msg,arg); if(result != null){ if(result){ return RocketMQLocalTransactionState.COMMIT; }else{ return RocketMQLocalTransactionState.ROLLBACK; } } return RocketMQLocalTransactionState.UNKNOWN; } }
处理消息(消费)
普通消息和事务消息的区别只在投递的时候才明显,对应的消费端代码比较简单
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Component;@Slf4j@Component@RocketMQMessageListener(consumerGroup = "xxx-consumer", topic = "topic-name",selectorExpression = "tag-name")public class XXXEventMQListener implements RocketMQListener{ private String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s"; @Autowired private StringRedisTemplate redisTemplate; @Override public void onMessage(XXXEvent message) { log.info("consumer message {}",message); //处理消息 try{ xxxService.doSomething(message); }catch(Exception ex){ log.warn(String.format("message [%s] 消费失败",message),ex); //抛出异常后,MQClient会返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这条消息会再次尝试消费 throw new RuntimException(ex); } }}
RocketMQ用ACK机制保证NameServer知道消息是否被消费在org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
里是这么处理的
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); rocketMQListener.onMessage(doConvertMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
上述就是小编为大家分享的使用RocketMQ怎么对消息进行处理了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
消息
事务
处理
消费
成功
代码
普通
时候
保证
业务
内容
方式
流程
错误
分析
不同
专业
中小
内容丰富
再次
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
应用程序软件开发工程师
网络安全产品市场招聘
世界互联网大会上的前沿新科技
潮流软件开发服务创意
网络安全手抄报小学生简单
枣庄互联网养老软件开发公司
服务器安全防护加固流程
渤海大学数据库题库
零基础做软件开发的书籍
服务器进不了安全系统吗
数据库的日志不可用
数据库隔离级别查看
如何查看数据库中作业
网络安全防护主要对象
数据库中提取日期的
重庆的软件开发小公司比较多
服务器 软件管理
沈阳聚格网络技术有限公司
戴尔服务器关机了散热器还在工作
r星服务器安装在哪
DNF数据库技术支持
福建oa管控软件开发平台
连接公司mysql数据库
ts150 服务器
公安部网络安全管理局公告
最容易兼职的网络技术
河南快道网络技术有限公司
我的世界服务器在哪
三级网络技术考哪一本书
县级网络安全宣传周活动方案