如何进行RabbitMq的简单使用
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这期内容当中小编将会给大家带来有关如何进行RabbitMq的简单使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.pom文件中加入依赖org.springfra
千家信息网最后更新 2025年02月03日如何进行RabbitMq的简单使用
这期内容当中小编将会给大家带来有关如何进行RabbitMq的简单使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1.pom文件中加入依赖
org.springframework.boot spring-boot-starter-amqp 2.3.3.RELEASE
2.配置文件,配置mq
自动配置信息 这里我开启ACK消息确认server.port=8088#服务器配置spring.application.name=rabbitmq-test-sending#rabbitmq连接参数spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest# 开启发送确认spring.rabbitmq.publisher-confirms=true# 开启发送失败退回spring.rabbitmq.publisher-returns=true# 开启ACKspring.rabbitmq.listener.direct.acknowledge-mode=manual
3.Rabbit配置类,使用topic交换器,使用通配符,一个交换器对应多个queue
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class RabbitmqConfig {//队列 @Bean public Queue queueTest1(){return new Queue("queueTest1",true); }/* * 设置消息队列的TTL(过期时间) * */ @Bean public Queue queueTest2(){/** * 队列的名称,是否持久化,是否独享、排外的,是否自动删除,队列的其他属性参数 * (1)x-message-ttl:消息的过期时间,单位:毫秒; * (2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒; * (3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息; * (4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息; * (5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。 * (6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中; * (7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值 * (8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false) * (9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级; * (10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息; * (11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。 */ Maparguments = new HashMap<>(); arguments.put("x-message-ttl", 5000);return new Queue("queueTest2", true, false, false, arguments); }//交换机 @Bean public TopicExchange exchangeTest(){//可以传exchange名字,是否支持持久化,是否可以自动删除 return new TopicExchange("exchangeTest",true,false); }@Bean public Binding bindQueueTest1AndExchange(){return BindingBuilder.bind(queueTest1()).to(exchangeTest()).with("phone.routing.*"); }@Bean public Binding bindQueueTest2AndExchange(){return BindingBuilder.bind(queueTest2()).to(exchangeTest()).with("web.routing.*"); }}
4.生产者
import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.Date;import java.util.UUID;/** 生产者,带消息确认* */@Servicepublic class PruSender implements RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate;//routing_key,把消息发送到相应的队列中 public void sendMessage(String routing_key){//发送内容 String context = "你好现在是 " + new Date();this.rabbitTemplate.setReturnCallback(this);//发送失败退回 this.rabbitTemplate.setConfirmCallback((correlationData,ack,message)->{//手动发送消息确认 if(!ack){ System.out.println("消息发送失败" + message + correlationData.toString()); }else{ System.out.println("消息发送成功" + correlationData.toString()); } }); CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString());//交换机名称、routingKey、内容、消息Id this.rabbitTemplate.convertAndSend("exchangeTest",routing_key, context,correlationData); }@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("sender return success" + message.toString() + "===" + i + "===" + s1 + "===" + s2); }}
5.消费者
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;/** 消费者,带消息确认** */@Service@RabbitListener(queues = "queueTest")public class Receiver {//消息内容,通道,消息的属性信息 @RabbitHandler public void immediateProcess(String text,Channel channel,Message message) throws IOException {try { System.out.println("Receiver" + text);/** * 手动确认,通知mq已经成功消费改条信息,可以删除了 * //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace();/* *消费消息失败 * 第二个参数是否应用于多消息,第三个参数是否从新计入队列 * */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); } }}
交换机类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
上述就是小编为大家分享的如何进行RabbitMq的简单使用了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
消息
队列
消费
内容
最大
交换器
消费者
配置
交换机
信息
参数
时间
路由
名称
模式
死信
长度
支持
成功
内存
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
流水施工和网络技术的作用
朗驰网络视频服务器登录密码
山东电站网络安全安装
大学数据库理论
上海市博升拓网络技术有限公司
0元代付代码软件开发
美国usda数据库orac值
网络技术相关的软件
dayz怎么分是中国服务器
韶音科技软件开发待遇怎么样
网络安全42项风险
介绍一下软件开发
数据库insert的作用
软件开发生命周期的相关内容
南宁创现网络技术公司
戴尔服务器设置显卡启动
企业软件开发交税
计算机网络技术局域网
ftp服务器支持多少客户端连接
大理精益管理软件开发
天行vpn服务器
家庭网络安全咨询
网络安全的名言手抄报图片
数据库的内置函数大全
咸宁一流的计算机软件开发
linux查询服务器内存
网络安全攻防技战法
建立干部工作数据库
科技创新互联网行业
01022数据库技术及应用