千家信息网

RabbitMQ怎么用

发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,这篇文章将为大家详细讲解有关RabbitMQ怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1. RabbitMQ实战应用技巧1.1. 前言由于项目原因,之后会
千家信息网最后更新 2024年11月26日RabbitMQ怎么用

这篇文章将为大家详细讲解有关RabbitMQ怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

1. RabbitMQ实战应用技巧

1.1. 前言

由于项目原因,之后会和RabbitMQ比较多的打交道,所以让我们来好好整理下RabbitMQ的应用实战技巧,尽量避免日后的采坑

1.2. 概述

RabbitMQ有几个重要的概念:虚拟主机,交换机,队列和绑定

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定,我们可以从虚拟主机层面的颗粒度进行权限控制

  • 交换机:Exchange用于转发消息,它并不存储消息,如果没有Queue队列绑定到Exchange,它会直接丢弃掉生产者发来的数据。
    交换机还有个关联的重要概念:路由键,消息转发到哪个队列根据路由键决定

  • 绑定:就是绑定交换机和队列,它是多对多的关系,也就是说多个交换机可以绑同一个队列,也可以一个交换机绑多个队列

1.3. 交换机

交换机有四种类型的模式Direct, topic, Headers and Fanout

1.3.1. Direct Exchage

Direct模式使用的是RabbitMQ的默认交换机,也是最简单的模式,适合比较简单的场景

如下图所示,使用Direct模式,我们需要创建不同的队列,而默认交换机则通过Routing key路由键的值来决定转发到哪个队列,可以看到,路由键绑定队列是可以指定多个的

1.3.2. Topic Exchange

Topic模式主要是根据通配符匹配,也就类似于模糊匹配,当这种匹配模式和路由键匹配后交换机就能转发消息到指定队列

  • 路由键为一串字符串,由句号(.)隔开,比如a.b.c

  • *)代表指定位置一个单词,(#)代表零个或者多个单词,比如a.*.b.#,表示a和b中间随意填个单词,b后面可以跟n个单词,比如a.x.b.c.d.e

Topic模式和Direct模式的区别在于交换机需要自己指定,路由键支持模糊匹配,例如:

rabbitTemplate.convertAndSend("topicExchange","a.x.b.d", " hello world!");

1.3.3. Headers Exchage

Headers也是根据规则匹配,但它不是根据路由键了,headers有个自定义匹配规则,它将匹配键值设在了消息的headers属性上,当这些键值对有一对或者全部匹配时,消息才会被投递到对应队列,这种模式效率相对较低,一般不推荐使用

1.3.4. Fanout Exchange

Fanout即为大名鼎鼎的广播模式了,它不需要管路由键,会把消息发给绑定它的全部队列,就算配置了路由键也会被忽略

1.4. 复杂情况

  1. 首先我们Direct模式,一个生产者一个消费者的情况,也就对应了一个发送者和一个队列A接收,这是没有疑问的,发送什么接收什么

  2. 当Direct模式,一个生产者发消息,开启多个消费者也就是多个相同queue,此时消息由多个消费者均匀分摊,不会重复消费(前提ack正常)

  3. 当Topic模式,一个交换机绑定两个队列,路由键有重叠关系,如下代码,此时指定路由键topic.message发送消息,队列queueMessagequeueMessages都能接收到相同消息,也就是说,topic模式可以实现类似于广播模式的形式,甚至更加灵活,它能否转发到消息由路由键决定。

  4. 相比于Fanout模式,我们如果要对消费者队列分组发送,我们需要指定不同的路由键;而Fanout模式则需要指定不同的交换机和队列绑定,实际使用结合实际情况

@Configurationpublic class TopicRabbitConfig {    final static String message = "topic.message";    final static String messages = "topic.messages";    @Bean    public Queue queueMessage() {        return new Queue(TopicRabbitConfig.message);    }    @Bean    public Queue queueMessages() {        return new Queue(TopicRabbitConfig.messages);    }    @Bean    TopicExchange exchange() {        return new TopicExchange("exchange");    }    @Bean    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");    }    @Bean    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");    }}

1.5. springboot配置

我们的常用配置如下

spring.rabbitmq.addresses=localhost:5672spring.rabbitmq.username=userspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=1000##设置监听限制:最大10,默认5spring.rabbitmq.listener.simple.concurrency=5spring.rabbitmq.listener.simple.max-concurrency=10spring.rabbitmq.publisher-confirms=truespring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=truespring.rabbitmq.listener.simple.acknowledge-mode=manual

其中最后四条配置需要着重解释:

  • spring.rabbitmq.publisher-confirms为true,表示生产者消息发出后,MQ的broker接收到了消息,发送回执表示确认接收,不设置则可能导致消息丢失

  • spring.rabbitmq.publisher-returns为true,表示当消息不能到达MQ的Broker端,,则使用监听器对不可达的消息做后续处理,这种一般是路由键没配好,或MQ宕机才可能发生

  • spring.rabbitmq.template.mandatory当上面两个为true时,这个一定要配true,否则上面两个不起作用

  • spring.rabbitmq.listener.simple.acknowledge-mode这个为manual表示手工确认,实际生产应该设为手工,才能保证你的业务是处理完成的,注意业务的幂等性,可重复调用,手工确认代码如下例子

@Componentpublic class RabbitReceiver {                @RabbitListener(bindings = @QueueBinding(                        value = @Queue(value = "queue-1",                         durable="true"),                        exchange = @Exchange(value = "exchange-1",                         durable="true",                         type= "topic",                         ignoreDeclarationExceptions = "true"),                        key = "springboot.*"                        )        )        @RabbitHandler        public void onMessage(Message message, Channel channel) throws Exception {                System.err.println("--------------------------------------");                System.err.println("消费端Payload: " + message.getPayload());                Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);                //手工ACK,获取deliveryTag                channel.basicAck(deliveryTag, false);        }}

1.6. 队列属性

  1. queue : 队列的名字

  2. durable : 为true表示队列中数据持久化到磁盘,可以防止mq宕机重启数据丢失

  3. exclusive : 为true表示排他性,只允许一个当前连接访问该队列,当前已连接就不允许新的连接进入否则报错,当连接断开当前队列会销毁

  4. autoDelete : 为true表示自动删除,当没有Connection连接到队列的时候,会自动删除

  5. arguments : 这个参数用来添加一些额外参数的,如下图片

    • 比如添加x-message-ttl为5000,则表示消息超过5秒没被处理就会超时过期;

    • x-expires设置120000表示队列在2分钟内没被消费则被删除;

    • x-max-length,x-max-length-bytes表示传送数据的最大长度和字节数

    • x-dead-letter-exchangex-dead-letter-routing-key表示死信交换机和死信路由,放在需要过期或处理失败的队列属性中,这些数据会转发到死信队列存储起来,创建普通的交换机和队列绑定,把交换机名填到x-dead-letter-exchange的值,填写路由键要符合死信队列的路由键

    • x-max-priority,表示设置优先级,范围为0~255,只有当消息堆积的时候,这个优先级才有意义,数字越大优先级越高

    • x-queue-mode当为lazy,表示惰性队列,3.6.0之后才被引入的概念,相比默认的模式,惰性队列模式会将生产者产生的消息直接存到磁盘中,这当然会增加IO开销,但适合应对大量消息堆积的情况;因为当大量消息堆积时,内存也不够存放,会将消息转存到磁盘,这个过程也是比较耗时且过程中不能接收新的消息。如果需要将普通队列转换成惰性队列需要将原来的队列删除,重新创建个惰性队列绑定。

1.7. 交换机属性

  1. exchange : 交换机名称

  2. type : 交换机类型

  3. durable : 持久化,同队列

  4. autoDelete : 是否自动删除,同队列

  5. internal : 若为true,表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

  6. arguments : 额外参数,目前只有个alternate-exchange,表示当生产者发送消息到这个交换机,路由不到该交换机的队列,则会尝试这个参数指定的交换机进行路由,若路由键匹配,则路由到alternate-exchange指定的队列,相当于转发了,刚好和上一个参数internal配合,若不想本交换机起到路由队列的作用,可以设置internal为true,把消息都转发到alternate-exchange指定的交换机,由该交换机来路由指定队列,

    • 如下图:exchange0设置了alternate-exchange交换机为exchange1,生产者发送数据到exchange0路由键为test1,在exchange0路由不到,则转发到exchange1判断路由符合,发送到队列queue1

1.8. 磁盘和内存

在RabbitMQ的管理界面,当我们集群部署时可以看到Nodes节点中Info字段可能为**disc也可能ram**,表示了磁盘存储或内存储存。事实上,在集群部署的时候,我们至少要一个磁盘储存,它代表了将交换机,队列,绑定,用户等元数据持久化保存到磁盘,一遍重启RabbitMQ也能恢复到原先的状态,当只有一个节点时,必定是磁盘存储;而内存储存也有它的优势,就是效率更高速度更快

1.9. 报错案例

  • 当报下列错误,表示你一定存在排他性队列,也就是设置了exclusive属性的队列,由于同一个连接创建的不同通道可以访问同一个队列,此时由于这个排他属性会得到资源被锁定错误,也就是下列的错误。

  • 由此我们可以知道,若你把队列设置成了exclusive属性的,那么就别创建新的连接去访问同一个队列

ESOURCE_LOCKED - cannot obtain exclusive access to locked queue xxxxxx

关于"RabbitMQ怎么用"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

队列 交换机 消息 路由 模式 磁盘 生产 多个 属性 数据 生产者 消费 也就是 参数 不同 主机 内存 单词 情况 惰性 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库中平凡依赖是什么原因 广东国内网络技术开发展示 女生学软件开发可以吗 pkpm软件开发应届生薪资 胶片放映机服务器 里乐网络技术有限公司 网络安全板块拉升任子行涨近8% 数据库如何选下拉列表 数据库sql循环语句 了解网络安全知识内容 mongodb登陆数据库 网络安全硕士留学 远程数据库连接慢 临沂股票软件开发 济南有实力的浪潮服务器供应商 安全设备日志推送给审计服务器 数据库技术应用现状 数据安全级别与标记数据库字段 北京杜革网络技术有限公司 网络安全建设整体思路 中秋节网络安全手抄报 计算机数据库素材 格斗哈拉连接服务器需要多久 数据存储在数据库是否安全 php后台密码数据库 通辽软件开发公司 关于旅游软件开发内容及目标 软件开发工具是什么样的软件 一般纳税人软件开发所得税 广州哪里有软件开发公司电话
0