千家信息网

.net中rabbitmq如何使用

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,小编给大家分享一下.net中rabbitmq如何使用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!什么是rabbitMQR
千家信息网最后更新 2025年02月02日.net中rabbitmq如何使用

小编给大家分享一下.net中rabbitmq如何使用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

    什么是rabbitMQ

    RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,
    能够实现异步消息处理

    RabbitMQ是一个消息代理:它接受和转发消息。
    你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
    RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块

    优点:异步消息处理
    业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),
    将下单操作主流程:扣减库存、生成订单
    然后通过MQ消息队列完成通知,发红包、发短信
    错峰流控 (通知量 消息量 订单量大的情况实现MQ消息队列机制,淡季情况下访问量会少)

    灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

    RabbitMQ网站端口号:15672
    程序里面实现的端口为:5672

    Rabbitmq的关键术语

      1、绑定器(Binding):根据路由规则绑定Queue和Exchange。

      2、路由键(Routing Key):Exchange根据关键字进行消息投递。

      3、交换机(Exchange):指定消息按照路由规则进入指定队列

      4、消息队列(Queue):消息的存储载体

      5、生产者(Producer):消息发布者。

      6、消费者(Consumer):消息接收者。

    Rabbitmq的运作

      从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。

      那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。

      推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)

      拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)

      使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。

      可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。

      这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。

    Publish(发布)的封装

      步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。

    ///           /// 交换器声明          ///           ///           /// 交换器          /// 交换器类型:          /// 1、Direct Exchange - 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全          /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 "dog",则只有被标记为"dog"的          /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog         /// 2、Fanout Exchange - 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout         /// 交换机转发消息是最快的。         /// 3、Topic Exchange - 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号"#"匹配一个或多         /// 个词,符号"*"匹配不多不少一个词。因此"audit.#"能够匹配到"audit.irs.corporate",但是"audit.*"         /// 只会匹配到"audit.irs"。         /// 持久化         /// 自动删除         /// 参数         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,             bool durable = true,             bool autoDelete = false, IDictionary arguments = null)         {             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);         }          ///          /// 队列声明         ///          ///          /// 队列         /// 持久化         /// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可         /// 以同时访问同一个连接创建的排他队列的。其二,"首次",如果一个连接已经声明了一个排他队列,其他连         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。         /// 自动删除         /// 参数         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,            bool autoDelete = false, IDictionary arguments = null)         {             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);        }         ///         /// 获取Model        ///        /// 交换机名称        /// 队列名称         ///          /// 是否持久化        ///          private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)        {          return ModelDic.GetOrAdd(queue, key =>            {                 var model = _conn.CreateModel();                ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);                 QueueDeclare(model, queue, isProperties);                model.QueueBind(queue, exchange, routingKey);                ModelDic[queue] = model;                 return model;            });        }        ///        /// 发布消息         ///        /// 路由键         /// 队列信息        /// 交换机名称        /// 队列名       /// 是否持久化        ///          public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)         {           var channel = GetModel(exchange, queue, routingKey, isProperties);             try             {               channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());             }            catch (Exception ex)            {               throw ex.GetInnestException();           }       }

      下次是本机测试的发布速度截图:

      4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。

    Subscribe(订阅)的封装

      发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的"死信队列"里,由另外的JOB进行定时重发,因此,finally是应答成功的。

    ///         /// 获取Model        ///         /// 队列名称        ///         ///         private static IModel GetModel(string queue, bool isProperties = false)        {            return ModelDic.GetOrAdd(queue, value =>             {                 var model = _conn.CreateModel();                 QueueDeclare(model, queue, isProperties);                 //每次消费的消息数                 model.BasicQos(0, 1, false);                 ModelDic[queue] = model;                 return model;             });        }            ///         /// 接收消息        ///         ///         /// 队列名称        ///         /// 消费处理        ///         public void Subscribe(string queue, bool isProperties, Action handler, bool isDeadLetter) where T : class        {            //队列声明            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);            consumer.Received += (model, ea) =>            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson();                try                {                    handler(msg);                }                catch (Exception ex)                {                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");                    if (!isDeadLetter)                        PublishToDead(queue, msgStr, ex);                }                finally                {                    channel.BasicAck(ea.DeliveryTag, false);                }            };            channel.BasicConsume(queue, false, consumer);        }

      下次是本机测试的发布速度截图:

      快的时候有1.9K/S,慢的时候也有1.7K/S

    Pull(拉)的封装

      直接上代码:

     ///         /// 获取消息        ///         ///         ///         ///         ///         /// 消费处理        private void Poll(string exchange, string queue, string routingKey, Action handler) where T : class        {            var channel = GetModel(exchange, queue, routingKey);            var result = channel.BasicGet(queue, false);            if (result.IsNull())                return;            var msg = result.Body.DeserializeUtf8().FromJson();            try            {                handler(msg);            }            catch (Exception ex)            {                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");            }            finally            {                channel.BasicAck(result.DeliveryTag, false);            }        }

      快的时候有1.8K/s,稳定是1.5K/S

    Rpc(远程调用)的封装

      首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

      1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

      2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

     ///         /// RPC客户端        ///         ///         ///         ///         ///         ///         ///         public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);            channel.BasicConsume(queue, true, consumer);            try            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();                basicProperties.ReplyTo = queue;                basicProperties.CorrelationId = correlationId;                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)                    {                        return ea.Body.DeserializeUtf8();                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");                }            }            catch (Exception ex)            {                throw ex.GetInnestException();            }        }            ///         /// RPC服务端        ///         ///         ///         ///         ///         ///         ///         public void RpcService(string exchange, string queue, bool isProperties, Func handler, bool isDeadLetter)        {            //队列声明            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);            consumer.Received += (model, ea) =>            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();                replyProps.CorrelationId = props.CorrelationId;                try                {                    msg = handler(msg);                }                catch (Exception ex)                {                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");                }                finally                {                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());                    channel.BasicAck(ea.DeliveryTag, false);                }            };            channel.BasicConsume(queue, false, consumer);        }

      可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

    以上是".net中rabbitmq如何使用"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

    队列 消息 路由 消费 处理 交换器 时候 交换机 消费者 订单 名称 系统 订阅 主动 封装 功能 客户 客户端 机制 模式 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 镇江软件开发售后服务 不同数据库如何同步数据 安仁软件开发招生 福州鑫诺软件开发有限公司 软件开发部门 年度规划 违反网络安全法第三十五条 红海服务器扫码 网络安全威胁新趋势调查报告 华为云弹性云服务器高可用性 linux端口转发多个服务器 大学生网络安全法律法规心得体会 华为服务器管理口网关 芯片设计公司服务器配置 广西区网络安全总队长 邱县java软件开发 注册说服务器错误什么意思 动态服务器页面是啥意思 存储服务器30T有多大功率 怎么用筛选取数据库 网络技术处理员 软件开发人员属于什么分类 网络安全节是几月几日 购买软件开发怎么做分录 软件维护与软件开发 软件开发的技术手段是 苏州蓝宇软件用的什么数据库 自行申请固定ip架设服务器好吗 保护未成年人网络安全的主题题目 事业单位网络技术工程考什么 基础网络安全是什么意思
    0