千家信息网

如何理解Java RabbitMQ的TTL和DLX

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,这篇文章给大家介绍如何理解Java RabbitMQ的TTL和DLX,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。RabbitMQ的TTL1、TTL概述RabbitMQ的TTL全
千家信息网最后更新 2025年01月22日如何理解Java RabbitMQ的TTL和DLX

这篇文章给大家介绍如何理解Java RabbitMQ的TTL和DLX,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

RabbitMQ的TTL

1、TTL概述

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。设置TTL有两种方式:

  1. 第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;

  2. 第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL。

如果两种方式都设置了,则以设置的较小的为准。两者的区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。至于为啥要这样处理很容易想清楚:第一种方式队列的消息有效期都一样,先入队的在队列头部,头部也是最早要过期的消息,RabbitMQ起一个定时任务从队列的头部开始扫描是否有过期消息即可;第二种方式每条消息的过期时间不同,所以只有遍历整个队列才可以筛选出来过期的消息,这样效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投递给消费者时再判断删除,虽然删除的不够及时但是不影响功能,其实就是用空间换时间。

如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

2、设置消息有效期

2.1、通过队列设置有效期

还记得我们之前声明队列的方法吗,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments),该方法的最后一个参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。如果不清楚队列属性有哪些,可以查看web控制台的添加队列的地方。

具体代码如下:

//设置队列上所有的消息的有效期,单位为毫秒Map argss = new HashMap();arguments.put("x-message-ttl " , 5000);//5秒钟channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

查看控制台的队列列表如下:D表示持久化,TTL表示设置了消息的有效期。

过了几秒钟后发现消息已经不存在了。

也可以用RabbitMQ的命令行模式来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

还可以通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' http://ip:15672/api/queues/{vhost}/{queuename}
2.2、通过发送消息时设置有效期

发送消息时basicPublish方法可以设置属性参数,里面通过expiration属性设置消息有效期,单位为毫秒,代码如下所示

Builder bd = new AMQP.BasicProperties().builder();bd.deliveryMode(2);//持久化bd.expiration("100000");//设置消息有效期100秒钟BasicProperties pros = bd.build();String message = "测试ttl消息";channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

另外也可以通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}'  http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

完整的通过队列设置消息有效期、发布消息时通过属性设置有效期的代码如下:可以运行后,观察下控制台,可以发现同时设置时,消息的有效期是以较小的为准的。项目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git。

package cn.wkp.rabbitmq.newest.ttl; import java.util.HashMap;import java.util.Map; import com.rabbitmq.client.AMQP;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.AMQP.BasicProperties.Builder;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection; import cn.wkp.rabbitmq.util.ConnectionUtil; /** *  * @ClassName: Send * @Description: 消息有效期  * @author wkg * @date: 2021年9月1日 下午11:28:22 */public class Send {         private final static String EXCHANGE_NAME = "ttl_exchange";        private final static String QUEUE_NAME = "ttl_queue";         public static void main(String[] argv) throws Exception {                // 获取到连接以及mq通道                Connection connection = ConnectionUtil.getConnection();                // 从连接中创建通道                Channel channel = connection.createChannel();                 // 声明交换机                channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);                                //*****1:通过队列设置有效期 2:通过消息属性设置有效期,如果都设置了以较小的为准*****                //声明队列                Map arguments=new HashMap();                //设置队列上所有的消息的有效期,单位为毫秒                arguments.put("x-message-ttl", 5000);//5秒钟                channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);                //绑定                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");                                Builder bd = new AMQP.BasicProperties().builder();                bd.deliveryMode(2);//持久化                bd.expiration("100000");//设置消息有效期100秒钟                BasicProperties pros = bd.build();                String message = "测试ttl消息";                channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());                System.out.println("Sent message:" + message);//               关闭通道和连接                channel.close();                connection.close();        }}

3、设置队列有效期(不常用,仅作了解)

上面在web管控台添加队列的时候,我们看到有一个x-expires参数,可以让队列在指定时间内 "未被使用" 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。

Map args = new HashMap();  args.put("x-expires", 18000);  //队列有效期18秒channel.queueDeclare("myqueue", false, false, false, args);

RabbitMQ的DLX

1、DLX是什么

DLX是Dead-Letter-Exchange的简写,意思是死信交换机。

它的作用其实是用来接收死信消息(dead message)的。那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

  • 消息过期

  • 队列达到最大长度

当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。

2、DLX有什么用

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。DLX还有一个非常重要的作用,就是结合TTL实现延迟队列(延迟队列的使用范围还是挺广的:比如下单超过多长时间自动关闭;比如我们接入过第三方支付系统的同学一定知道,我们的订单中会传一个notify_url用于接收支付结果知,如果我们给第三方支付响应的不是成功的消息,其会隔一段时间继续调用通知我们的notify_url,超过几次后不再进行通知,一般通知频率都是 0秒-5秒-30秒-5分钟-30分钟-1小时-6小时-12小时;比如我们的家用电器定时关机。。。。。。这些场景都是可以用延迟队列实现的)。

3、DLX使用方式

下面在web管控台添加队列的时候,我们看到有两个DLX相关的参数:x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是设置队列的DLX的;x-dead-letter-routing-key是设置死信消息进入DLX时的routing key的,这个是可以不设置的,如果不设置,则默认使用原队列的routing key。

客户端可以通过channel.queueDeclare方法声明队列时设置x-dead-letter-exchange参数,具体代码如下所示

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchangeMap args = new HashMap();args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置DLXargs.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)//为队列myqueue 添加DLXchannel.queueDeclare("myqueue" , false , false , false , args);

上面说的可能比较抽象,下面我们通过一个具体的例子,来演示一下DLX的具体使用:

package cn.wkp.rabbitmq.newest.dlx; import java.util.Date;import java.util.HashMap;import java.util.Map; import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.MessageProperties; import cn.wkp.rabbitmq.util.ConnectionUtil; public class SendDLX {         public static void main(String[] args) throws Exception {                Connection connection = ConnectionUtil.getConnection();                Channel channel = connection.createChannel();                //声明一个交换机,做死信交换机用                channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);                //声明一个队列,做死信队列用                channel.queueDeclare("dlx_queue", true, false, false, null);                //队列绑定到交换机上                channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");                                channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);                Map arguments=new HashMap();                arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX                arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX                arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键(可以不设置)                //为队列normal_queue 添加DLX                channel.queueDeclare("normal_queue", true, false, false, arguments);                channel.queueBind("normal_queue", "normal_exchange", "");                                channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());                System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));                                channel.close();                connection.close();        }}

上面是发送者的代码,运行后观察控制台可以看到如下所示:

死信队列dlx_queue的绑定如下,其已与死信交换机dlx_exchange(topic类型)进行了绑定,routing key为"dlx.*"

队列normal_queue的绑定如下,其已与交换机normal_exchange(fanout类型)进行了绑定

queues视图如下:DLX和DLK表示设置给normal_queue设置了死信交换机和死信消息的routing key,我们看到消息已经被路由到了死信队列上面。整个流程为:

  • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上

  • 因为队列normal_queue没有消费者,消息过期后成为死信消息

  • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage

  • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

然后我们给死信队列添加消费者如下:我们测试一下死信消息进入DLX的时间,先将之前的那个死信消息删除

package cn.wkp.rabbitmq.newest.dlx; import java.io.IOException;import java.util.Date; import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope; import cn.wkp.rabbitmq.util.ConnectionUtil; public class RecvDLX {         public static void main(String[] argv) throws Exception {                Connection connection = ConnectionUtil.getConnection();                final Channel channel = connection.createChannel();                 channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);                channel.queueDeclare("dlx_queue", true, false, false, null);                channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");                 // 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。                channel.basicQos(1);                 Consumer consumer = new DefaultConsumer(channel) {                        @Override                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)                                        throws IOException {                                System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ConnectionUtil.formatDate(new Date()));                                // 消费者手动发送ack应答                                channel.basicAck(envelope.getDeliveryTag(), false);                        }                };                System.out.println("消费死信队列中的消息======================");                // 监听队列                channel.basicConsume("dlx_queue", false, consumer);        }}

运行结果如下(先运行的死信队列消费者,然后运行生产者):我们看到消息过期后10毫秒就被死信队列的消费者消费到了,显然,消息成为死信后是立即被发送到了DLX中。

消费死信队列中的消息======================
消费者收到消息:测试死信消息,当前时间:2021-09-24 16:30:05:740

发送消息时间:2021-09-24 17:57:00:730

关于如何理解Java RabbitMQ的TTL和DLX就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0