千家信息网

怎么进行RabbitMQ Federation插件的分析

发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,怎么进行RabbitMQ Federation插件的分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。在文章开始之前,我
千家信息网最后更新 2025年01月27日怎么进行RabbitMQ Federation插件的分析

怎么进行RabbitMQ Federation插件的分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

在文章开始之前,我们先介绍一下联邦机制的基本概念。联邦机制的实现,依赖于RabbitMQ的Federation插件,该插件的主要目标是为了RabbitMQ可以在多个 Broker节点或者集群中进行消息的无缝传递。

Federation插件可以让多个交换器和多个队列进行联邦。一个联邦交换器或者一个联邦队列接受上游(位于其他Broker上的交换器和队列)消息。联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游队列(upstream queue)的消息。

联邦交换器

下面先假设一种场景,BrokerA服务部署在上海,BrokerB服务部署在北京。来自上海的ClientA向BrokerA的exchangeA发送消息网络延迟很小,但是北京的ClientB向BrokerA的exchangeA发送消息那么将会面临网络延迟的问题。Federation机制则可以帮助我们解决这个问题。

首先在BrokerA的exchangeA上与北京的BrokerB建立一条单向的Federation Link。此时Federation插件会在BrokerB上建立一个同名的交换器(可以配置,默认同名),并且还会建立一个内部交换器federation:exchangeA->Broker B(其中Broker为集群名称)通过相同的绑定建进行绑定,于此同时Federation插件会建立一个federation:exchangeA->Broker B(BrokerB为集群名称),并且将内部交换器federation:exchangeA->Broker B绑定到该队列。

Federation插件会在队列federation:exchangeA->Broker B与BrokerA中的交换器exchangeA之间建立一条AMQP连接来实时地消费队列federation:exchangeA->Broker B中的数据。这些操作都是内部的,对外部业务客户端来说这条Federation link建立在BrokerA的exchangeA和BrokerB的exchangeA之间。

此时ClientB可以以较小的网络延迟向BrokerB的exchangeA发送消息,并且该消息会被正确路由到BrokerA中的exchangeA中,通过Federation插件我们可以以较小的网络延迟向与客户端属于不同地域的Broker节点发送消息。

"max_hops=1"表示一条消息最多被转发的次数为1。

默认的交换器(每个vhost下都会默认创建一个名为""的交换器)和内部交换器,不能对其使用Federation的功能。

联邦队列

队列queue1和queue2原本在broker2中,由于某种需求将其配置为federated queue并将broker1作为upstream。Federation插件会在broker1上创建同名的队列queue1和queue2,与broker2中的队列queue1和queue2分别建立两条单向独立的Federation link。当有消费者ClientA连接broker2并通过Basic.Consume消费队列queue1(或queue2)中的消息时,如果队列queue1(或queue2)中本身有若干消息堆积,那么ClientA直接消费这些消息,此时broker2中的queue1(或queue2)并不会拉取broker1中的queue1(或queue2)的消息;如果队列queue1(或queue2)中没有消息堆积或者消息被消费完了,那么它会通过Federation link拉取在broker1中的上游队列queue1(或queue2)中的消息(如果有消息),然后存储到本地,之后再被消费者ClientA进行消费。

和federated exchange不同,一条消息可以在联邦队列间转发无限次。两个队列可以互为联邦队列。

如果两个队列互为联邦队列,队列中的消息除了被消费,还会转向有多余消费能力的一方,如果这种"多余的消费能力"在broker1和broker2中来回切换,那么消费也会在broker1和broker2中的队列queue中来回转发

federation queue只能使用Basic.Consume进行消费,并且不具备传递性。

Federation使用

Federation插件默认在RabbitMQ发布包中,执行rabbitmq-plugins enable rabbitmq_federation命令可以开启Federation功能。rabbitmq-plugins enable rabbitmq_federation命令会同时开启amqp_client插件。如果要开启Federation的管理插件,需要执行rabbitmq-plugins enable rabbitmqfederation_management命令。

当需要在集群中使用Federation功能的时候,集群中所有的节点都应该开启Federation插件。

在前面的章节中我们搭建了一个3个节点的rabbitmq集群(假设在上海),我们在搭建一个两个节点的集群(假设在北京),这个读者自行去搭建吧。在我们第一个集群中默认的vhost下有一个exchangeA和queueA,我们北京的一个客户端需要向exchangeA中发消息,为了减轻网络延迟,我们使用联邦机制,将北京的集群作为上行资源,下面我们讲一下配置。

首先在上海集群中配置Federation Upstreams,配置如下图:

  • name:定义这个upstream的名称,必填项

  • uri:定义upstream的AMQP连接,我们这里设置的是amqp://upstreamer:123456@rabbit111:5672(这里是一个北京集群一个节点地址),格式amqp://username:password@host:port

  • Prefetch count:定义Federation内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。

  • Reconnect delay:Federation link由于某种原因断开之后,需要等待多少秒开始重新建立连接。

  • Acknowledgement Mode:定义Federation link 的消息确认方式。其有3种:on-confirm、on-publish、no-ack。默认为on-confirm,表示在接收到下游的确认消息(等待下游的Basic.Ack)之后再向上游发送消息确认,这个选项可以确保网络失败或者Broker宕机时不会丢失消息,但也是处理速度最慢的选项。如果设置为on-publish,则表示消息发送到下游后(并需要等待下游的Basic.Ack)再向上游发送消息确认,这个选项可以确保在网络失败的情况下不会丢失消息,但不能确保Broker宕机时不会丢失消息。no-ack表示无须进行消息确认,这个选项处理速度最快,但也最容易丢失消息。

  • Trust User-ID:设定Federation是否使用"Validated User-ID" 这个功能。如果设置为false或者没有设置,那么Federation会忽略消息的user_id 这个属性;如果设置为true,则Federation只会转发user_id为上游任意有效的用户的消息。

  • Exchange:指定upstream exchange的名称,默认情况下和federated exchange同名,这里我们指定的是beijingExchange

  • Max hops:指定消息被丢弃前在Federation link中最大的跳转次数。默认为1。注意即使设置max-hops参数为大于1的值,同一条消息也不会在同一个Broker中出现2次,但是有可能会在多个节点中被复制。

  • Expires:指定Federation link断开之后,federated queue所对应的upstream queue的超时时间,默认为"none",表示为不删除,单位为ms。这个参数相当于设置普通队列的x-expires参数。设置这个值可以避免Federation link断开之后,生产者一直在向北京集群中的upstream exchange发送消息,这些消息又不能被转发到上海的集群中而被消费掉,进而造成upstream exchange中有大量的消息堆积。

  • Message TTL:为federated queue所对应的upstream queue设置,相当于普通队列的x-message-ttl参数。默认为"none",表示消息没有超时时间。

  • HA policy:为federated queue所对应的upstream queue设置,相当于普通队列的x-ha-policy参数,默认为 "none",表示队列没有任何HA。

  • Queue:执行upstream queue的名称,默认情况下和federated queue同名

在配置完了Federation Upstreams之后,再定义一个Policy用于匹配交换器exchangeA,并且使用刚才的Federation Upstreams配置,如下图:当策略配置完毕之后,Federation将会在北京的集群上建立一个beijingExchange,并建立Federation link,如下图:

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

0