千家信息网

如何设置RabbitMQ延迟队列

发表于:2024-10-22 作者:千家信息网编辑
千家信息网最后更新 2024年10月22日,小编给大家分享一下如何设置RabbitMQ延迟队列,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!延迟消费。比如:用户生成订
千家信息网最后更新 2024年10月22日如何设置RabbitMQ延迟队列

小编给大家分享一下如何设置RabbitMQ延迟队列,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

rabbitmq的消息TTL和死信Exchange结合

1.消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。

2.Dead Letter Exchanges

Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

①.一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

②. 上面的消息的TTL到了,消息过期了。

③. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

3.实现延迟队列

我们先设置好各个配置的字符串

public interface TestMq {/**     * 队列名     */    String TEST_QUEUE = "test";;    /**     * 服务添加routing key     */    String ROUTING_KEY_TEST = "post.test";    /**     * 死信队列     */    String DEAD_QUEUE = "dead";    String ROURING_KEY_DEAD = "dead.routing.key";    String MQ_EXCHANGE_DEAD = "dead.exchange";}

配置信息

/** * rabbitmq配置 * */@Configurationpublic class RabbitmqConfig {   /**    * 死信队列    * @return    */   @Bean   public Queue deadQueue() {      Map arguments = new HashMap<>();      //此处填入死信交换机      arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD);      //此处填入消息队列的路由,而非死信队列自己的路由      arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST);      return new Queue(TestMq.DEAD_QUEUE,true,false,false,arguments);   }   /**    * 死信交换机    * @return    */   @Bean   public DirectExchange deadExchange() {      return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD);   }   /**    * 绑定死信队列到死信交换机    * @return    */   @Bean   public Binding bindingDeadExchange() {      return BindingBuilder.bind(deadQueue()).to(deadExchange())            .with(TestMq.ROURING_KEY_DEAD);   }   /**    * 被消费者侦听的获取消息的队列    * @return    */   @Bean   public Queue testQueue() {      return new Queue(TestMq.TEST_QUEUE,true,false,false);   }   /**    * 将消息队列绑定到死信交换机,跟死信队列的路由不同    * @return    */   @Bean   public Binding bindingTest() {      return BindingBuilder.bind(testQueue()).to(deadExchange())            .with(TestMq.ROUTING_KEY_TEST);   }}

消息生产者

@Slf4j@Componentpublic class TestSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired    private RabbitTemplate rabbitTemplate;    public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content);        this.rabbitTemplate.setMandatory(true);        this.rabbitTemplate.setConfirmCallback(this);        this.rabbitTemplate.setReturnCallback(this);        MessagePostProcessor processor = message -> {//给消息设置的过期时间,我们这里为10秒            message.getMessageProperties().setExpiration(10000 + "");            return message;        };        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor);    }/**     * 确认后回调:     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {if (!ack) {log.info("send ack fail, cause = " + cause);        } else {log.info("send ack success");        }    }/**     * 失败后return回调:     *     * @param message     * @param replyCode     * @param replyText     * @param exchange     * @param routingKey     */    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);    }/**     * 对消息对象进行二进制序列化     * @param o     * @return     */    private byte[] serialize(Object o) {        Kryo kryo = new Kryo();        ByteArrayOutputStream stream = new ByteArrayOutputStream();        Output output = new Output(stream);        kryo.writeObject(output, o);        output.close();        return stream.toByteArray();    }}

消费者

@Slf4j@Component@RabbitListener(queues = TestMq.TEST_QUEUE)public class TestConsumer {@RabbitHandler    public void receice(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            Integer orderNo = unSerialize(data);            log.info(orderNo + "为收到的消息");        } catch (IOException e) {            e.printStackTrace();            //丢弃这条消息            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);            log.info("receiver fail");        }    }/**     * 反序列化     * @param data     * @return     */    private Integer unSerialize(byte[] data) {        Input input = null;        try {            Kryo kryo = new Kryo();            input = new Input(new ByteArrayInputStream(data));            return kryo.readObject(input,Integer.class);        }finally {            input.close();        }    }}

我们随便写个测试

@Servicepublic class TestService {@Autowired    private TestSender sender;    @PostConstruct    public void test() {//此处顺序为死信交换机,死信队列路由,消息        sender.send(TestMq.MQ_EXCHANGE_DEAD,TestMq.ROURING_KEY_DEAD,1);    }}

经测试

2019-10-11 17:26:18.079 INFO 879 --- [ main] c.g.rabbitdelay.config.TestSender : send content=1
2019-10-11 17:26:18.098 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort= 52345]
2019-10-11 17:26:18.337 INFO 879 --- [39.9.225.2:5672] c.g.rabbitdelay.config.TestSender : send ack success
2019-10-11 17:26:18.446 INFO 879 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751 INFO 879 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959 INFO 879 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962 INFO 879 --- [ main] c.g.rabbitdelay.RabbitdelayApplication : Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342 INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer : 1为收到的消息

通过日志可以看到,发送消息是18秒,收到消息消费为28秒,中间隔了10秒钟。

以上是"如何设置RabbitMQ延迟队列"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

消息 队列 死信 路由 时间 消费 延迟 交换机 用户 消费者 订单 不同 就是 篇文章 服务 配置 内容 序列 服务器 支付 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 怎样抓取app的数据库 河北 大学 网络安全 手抄报网络安全的建议 一些外文数据库进不去怎么办 计算机网络技术文档心得 欧孚网络技术有限公司母公司 网络安全大队大队长的职位 山东合联互联网科技有限公司 深圳电力应急网络技术开发展示 信息化网络安全产品 网络安全与学生学业 计算机网络技术教代码吗 采购网络安全设备 学校信息安全网络安全整改表 旧手机做服务器有什么用 武汉博睿实训软件开发 如何在orcal建数据库 电商平台app软件开发定制 网络安全知识宣传黑板报 超图系统如何重建工程数据库 美国和以色列网络安全现状 服务器怎么做网页访问限制 欧孚网络技术有限公司母公司 软件开发投标报价单 常用软件开发方法比较 培训直播软件开发 倒排索引数据库 湛江回收服务器免费上门评估 信息网络安全职业证书 动森联网跟服务器有关么
0