千家信息网

springboot+rabbitmq如何实现指定消费者才能消费

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章将为大家详细讲解有关springboot+rabbitmq如何实现指定消费者才能消费,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。如何保证mq队列里的消息只
千家信息网最后更新 2025年01月23日springboot+rabbitmq如何实现指定消费者才能消费

这篇文章将为大家详细讲解有关springboot+rabbitmq如何实现指定消费者才能消费,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费?

程序里有一个应用场景使用到了rabbitmq--当财务确认收到企业的打款金额后,系统会把企业订单生成用户付款单。由于订单记录数据量大,改为通过mq来异步实现。即财务确认收款操作后,将企业订单数据放入mq,另一端监听mq消息队列,将收到的企业订单加工转换成用户付款单,并做持久化。

本地开发环境与测试环境共用一套rabbitmq。当项目部署到测试环境后,QA测试过程中,总是"莫名其妙"的发现所保存的用户付款单数据有问题。

当然,首先要排查程序,检查Consumer的数据处理的逻辑是否有bug。单元测试后,发现并不存在测试环境的bug。

原来,消息队列被"非正常"消费了!

Q: 什么情况?

A: 几个伙伴一起参与的项目,大家总是要调试自己的程序的。而如果碰巧本地程序监听到消息队列里有消息,那么,消息就被本地程序消费掉了。问题正是出现在这里!----团队开发,大家并不会及时检出git上最新的程序版本。如果本地的程序版本不是最新的正确的版本,势必会出现bug。

那么,怎么办?

每次你改了逻辑,告诉大家获取最新?

不现实,约定的东西往往不奏效的。

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费? 或者说,如何实现消息的定向消费呢?

只要肯琢磨,办法总比困难多!百思可得解!

我们知道,rabbitmq手动ack模式。这还不够,因为我们怎么让consumer来决定是否消费呢? 所以,我们需要一个标识----producer设定一个标识,consumer如果匹配这个标识,则消费,否则予以reject放回消息队列。

通过查看spring-rabbit/spring-amqp的代码,发现可以在spring-amqp里的MessageProperties上做文章。生产者与消费者每次消息传输都会携带一个MessageProperties,通常我们是不指定的,走MessageProperties的默认设置值。

我的策略:MessageProperties有一个属性叫AppId。我们程序所部署的测试机器就一台,即消息Producer和消息Consumer在一台机器上。那么,我就可以利用机器的IP来识别消息。只有Producer与Consumer的IP匹配,才消费消息。程序员本机IP与测试服务器IP不一样,就会拒绝接收消息,会把消息重新放回消息队列,等待测试服务器的Consumer消费。

话不多说,上代码吧,

生产者代码:

package com.sboot.mq;import org.junit.Test;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.amqp.support.converter.SimpleMessageConverter;import org.springframework.beans.factory.annotation.Autowired;import java.net.InetAddress;import java.util.UUID;public class MQProducerTest extends BaseTest {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    public void test() throws Exception {        for (int i = 1; i <= 5; i++) {            MessageProperties messageProperties = new MessageProperties();            String ip = InetAddress.getLocalHost().getHostAddress();            messageProperties.setAppId(ip);//            messageProperties.setUserId(String.valueOf(i));            MessageConverter messageConverter = new SimpleMessageConverter();            String msg = UUID.randomUUID().toString();//            System.out.println(msg);            Message message1 = messageConverter.toMessage(msg, messageProperties);            rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);            System.out.println("入队完成");            Thread.sleep(500L);        }    }}

消费者手动ACK,要实现ChannelAwareMessageListener接口,感知rabbitmq.client.Channel实例,调用channel的basicAck、basicReject等方法:

package com.sboot.mq;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.amqp.support.converter.SimpleMessageConverter;import org.springframework.context.annotation.Profile;import org.springframework.stereotype.Component;import java.net.InetAddress;@Component@Profile(value = "dev")@Slf4jpublic class UserSettlementDevConsumer implements ChannelAwareMessageListener {    @RabbitHandler    @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")    @Override    public void onMessage(Message message, Channel channel) throws Exception {        Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());        long tag = message.getMessageProperties().getDeliveryTag();        String appId = message.getMessageProperties().getAppId();        log.info("{}-{}, 消息出队", tag, appId);        String receiveMsg = "";        try {            //核对标识,决定是否消费消息            String ip = InetAddress.getLocalHost().getHostAddress();            if (!ip.equals(appId)) {                log.info("这不是我需要的消息。放回队列。{}", receiveMsg);//                channel.basicNack(tag, false, true);                channel.basicReject(tag, true);//                channel.basicRecover(true);                return;            }            MessageConverter messageConverter = new SimpleMessageConverter();            receiveMsg = String.valueOf(messageConverter.fromMessage(message));            。。。。在这里消费消息            log.info("success " + receiveMsg);            channel.basicAck(tag, false);        } catch (Exception e) {            log.error("receive message has an error, ", e);            channel.basicNack(tag, false, true);        }    }}

说明一下依赖的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解没有ackMode。

解决本案问题过程中的花絮:

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

@RabbitListener的ackMode的值见枚举org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自动消费 autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手动消费,Consumer端必须显式调用ack或nack)AUTO --

设置了手动消费,上文消费端的deliveryTag会是不同的long值。自动消费的deliveryTag是重复的1和2这样的。并且,自动消费时,如果要使用channel的ack/nack,会报异常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

关于"springboot+rabbitmq如何实现指定消费者才能消费"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

消费 消息 测试 程序 队列 环境 版本 消费者 企业 手动 数据 服务器 标识 订单 服务 机器 用户 篇文章 问题 代码 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发公司的职务分布 杭州恩诚网络技术有限公司 连接另一个数据库 北京小型软件开发服务介绍 域名解析和服务器 郑州培训教育软件开发 网络技术安全的书 绍兴共赢网络技术有限公司好吗 db2数据库视频百度云 视频监控网络安全模块 枣庄苹果软件开发哪家靠谱 龙族幻想服务器中断 数据库系统概论第十四章ppt 数据库访问技术之间的关系 框架链接数据库 软件开发硬件建设 apex数据库互通 360网络安全大学基地 巴中市网络安全教育专题讲座 淮安进口刀片服务器厂家 长春工程学院计算机网络技术 怎么选择云服务器 职业技术学院软件开发教材 青少年网络安全政协提案 软件开发英语和数学哪个好 计算机网络技术证书有用处吗 网管服务器数据下发超时电信 市疾控中心网络安全等级保护 直播服务器登录失败怎么回事 重生之互联网科技帝国txt
0