千家信息网

Spring Boot中怎么利用RabbitMQ实现优先级队列

发表于:2024-10-02 作者:千家信息网编辑
千家信息网最后更新 2024年10月02日,这篇文章给大家介绍Spring Boot中怎么利用RabbitMQ实现优先级队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。本地运行 RabbitMQdocker run -d
千家信息网最后更新 2024年10月02日Spring Boot中怎么利用RabbitMQ实现优先级队列

这篇文章给大家介绍Spring Boot中怎么利用RabbitMQ实现优先级队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

本地运行 RabbitMQ

docker run -d \--name rabbitmq \--restart always \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=user \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3-management

访问可视化面板

地址:http://127.0.0.1:15672/

账号:user

密码:password

Spring Boot With RabbitMQ

Spring Boot 集成 RabbitMQ

                 org.springframework.boot            spring-boot-starter-amqp        

基本参数配置

# host & portspring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672

Queue / Exchange / Routing 配置

/** * RabbitMQ 配置 */@Configurationpublic class RabbitMQConfig {    private static final String EXCHANGE = "priority-exchange";    public static final String QUEUE = "priority-queue";    private static final String ROUTING_KEY = "priority.queue.#";    /**     * 定义优先级队列     */    @Bean    Queue queue() {        Map args= new HashMap<>();        args.put("x-max-priority", 100);        return new Queue(QUEUE, false, false, false, args);    }    /**     * 定义交换器     */    @Bean    TopicExchange exchange() {        return new TopicExchange(EXCHANGE);    }    @Bean    Binding binding(Queue queue, TopicExchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);    }}

priority queue 定义参考官方文档:https://www.rabbitmq.com/priority.html

Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。

RabbitMQ Publisher

Spring Boot 相关配置

# 是否开启消息发送到交换器(Exchange)后触发回调spring.rabbitmq.publisher-confirms=false# 是否开启消息发送到队列(Queue)后触发回调spring.rabbitmq.publisher-returns=false# 消息发送失败重试相关配置spring.rabbitmq.template.retry.enabled=truespring.rabbitmq.template.retry.initial-interval=3000msspring.rabbitmq.template.retry.max-attempts=3spring.rabbitmq.template.retry.max-interval=10000msspring.rabbitmq.template.retry.multiplier=1

发送消息

@Component@AllArgsConstructorpublic class FileMessageSender {    private static final String EXCHANGE = "priority-exchange";    private static final String ROUTING_KEY_PREFIX = "priority.queue.";    private final RabbitTemplate rabbitTemplate;    /**     * 发送设置有优先级的消息     *     * @param priority 优先级     */    public void sendPriorityMessage(String content, Integer priority) {        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,                message -> {                    message.getMessageProperties().setPriority(priority);                    return message;                });    }}

RabbitMQ Consumer

Spring Boot 相关配置

# 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)spring.rabbitmq.listener.simple.acknowledge-mode=AUTO# 最小线程数量spring.rabbitmq.listener.simple.concurrency=10# 最大线程数量spring.rabbitmq.listener.simple.max-concurrency=10# 每个消费者可能未完成的最大未确认消息数量spring.rabbitmq.listener.simple.prefetch=1

消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。

监听消息

@Slf4j@Componentpublic class MessageListener {    /**     * 处理消息     */    @RabbitListener(queues = "priority-queue")    public void listen(String message) {        log.info(message);    }}

番外补充

1、自定义消息发送确认的回调

配置如下:

# 开启消息发送到交换器(Exchange)后触发回调spring.rabbitmq.publisher-confirms=true# 开启消息发送到队列(Queue)后触发回调spring.rabbitmq.publisher-returns=true

自定义

RabbitTemplate.ConfirmCallback

实现类

@Slf4jpublic class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        log.info("消息唯一标识: {}", correlationData);        log.info("确认状态: {}", ack);        log.info("造成原因: {}", cause);    }}

自定义

RabbitTemplate.ConfirmCallback

实现类

@Slf4jpublic class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        log.info("消息主体: {}", message);        log.info("回复编码: {}", replyCode);        log.info("回复内容: {}", replyText);        log.info("交换器: {}", exchange);        log.info("路由键: {}", routingKey);    }}

配置 rabbitTemplate

@Component@AllArgsConstructorpublic class RabbitTemplateInitializingBean implements InitializingBean {    private final RabbitTemplate rabbitTemplate;    @Override    public void afterPropertiesSet() {        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());    }    }

关于Spring Boot中怎么利用RabbitMQ实现优先级队列就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0