千家信息网

怎么进行spring boot rabbitMQ RPC实现

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,今天就跟大家聊聊有关怎么进行spring boot rabbitMQ RPC实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。环境配置pack
千家信息网最后更新 2025年02月08日怎么进行spring boot rabbitMQ RPC实现

今天就跟大家聊聊有关怎么进行spring boot rabbitMQ RPC实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

环境配置

package com.example.demo;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;@Configuration@EnableRabbitpublic class RabbitMQConfigurer {    @Autowired    private ConnectionFactory connectionFactory;    @Bean    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)    public RabbitTemplate rabbitTemplate() {        //必须是prototype类型        //Reply received after timeout        RabbitTemplate rabbitTemplate =  new RabbitTemplate(this.connectionFactory);        rabbitTemplate.setReceiveTimeout(9000);        return rabbitTemplate;    }       @Bean    @Qualifier("rpcTestExchange")    public DirectExchange rpcTestExchange() {        return new DirectExchange("rpcTest");    }    @Bean    public Queue rpcTestQueue() {        return new Queue("rpcTestQueue");    }    @Bean    public Binding rpcTestBind() {        return BindingBuilder.bind(rpcTestQueue()).to(rpcTestExchange()).with("addUser");    }}

server 端

package com.example.demo;import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Component@RabbitListener(queues = "rpcTestQueue")public class UserServer {    private static final Logger LOGGER = LoggerFactory.getLogger(UserServer.class);    private final RabbitTemplate rabbitTemplate;    @Autowired    public UserServer(RabbitTemplate rabbitTemplate) {        this.rabbitTemplate = rabbitTemplate;    }    @RabbitHandler    public void process(@Payload String payload, Channel channel, @Header(AmqpHeaders.REPLY_TO) String replyTo,                        @Header(AmqpHeaders.CORRELATION_ID) String correlationId) throws Exception {        LOGGER.info("====== server receive data 【{}】  ====== ", payload);        this.rabbitTemplate.convertAndSend(replyTo, "then " + payload + " is create", message -> {            message.getMessageProperties().setCorrelationId(correlationId);            return message;        });        LOGGER.info("====== server response queue 【{}】 ======", replyTo);    }}

client 端

package com.example.demo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Arrays;@Componentpublic class Client {    private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);    private final RabbitTemplate rabbitTemplate;    @Autowired    public Client(RabbitTemplate rabbitTemplate) {        this.rabbitTemplate = rabbitTemplate;    }    public void doRequest() {        for (String name : Arrays.asList("张三", "李四", "王五")) {            LOGGER.info("---- client send user name is 【{}】", name);            Object response = this.rabbitTemplate.convertSendAndReceive("rpcTest", "addUser", name);            LOGGER.info("---- and response is : {} -------", response);        }    }}

客户端:

在请求发送消息之前,创建一个【匿名队列】绑定至默认的交换机(即 /)。将队【匿名队列】名称放在 reply_to 中与消息一起发送。

服务端:

处理理消息后,将应答消息发送至默认交换机即(/)。

看完上述内容,你们对怎么进行spring boot rabbitMQ RPC实现有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0