RabbitMQ用多路由,多队列来破除流控
发表于:2025-01-30 作者:千家信息网编辑
千家信息网最后更新 2025年01月30日,本篇内容主要讲解"RabbitMQ用多路由,多队列来破除流控",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RabbitMQ用多路由,多队列来破除流控"吧!
千家信息网最后更新 2025年01月30日RabbitMQ用多路由,多队列来破除流控
本篇内容主要讲解"RabbitMQ用多路由,多队列来破除流控",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RabbitMQ用多路由,多队列来破除流控"吧!
流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。
现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。
我们先给Order接口添加一个发送消息的方法。
public interface Order {public void makeOrder(Order order); public OrderSuccessResult getResult(Order order); public void postOrder(Order order);}
实现类实现该方法
@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate; @Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); }@Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); }@Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); }private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; }}
其中我们定义了这么一组队列名,交换机,和路由
public interface OwnerCarCenterMq {/** * 队列名 */ String ORDER_QUEUE = "order"; /** * 服务系统exchange名 */ String MQ_EXCHANGE_ORDER = "order.topic.exchange"; /** * 服务添加routing key */ String ROUTING_KEY_ORDER = "post.order";}
为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。
@Configurationpublic class RabbitmqConfig { @Bean public ListorderQueues() { List queues = new ArrayList<>(); for (int i = 1;i < 11;i++) { Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i); queues.add(queue); } return queues; } @Bean public TopicExchange orderExchange() { return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER); } @Bean public List bindingOrders() { List bindings = new ArrayList<>(); for (int i = 1;i < 11;i++) { Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange()) .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i); bindings.add(binding); } return bindings; }}
重新封装消息提供者,每次发送都随机选取一个路由来进行发送。
@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); ThreadLocalRandom random = ThreadLocalRandom.current(); this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content)); }/** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 对消息对象进行二进制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); }}
我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。
Controller如下
@Slf4j@RestControllerpublic class OrderController {private ThreadLocalorderFactory = new ThreadLocal<>(); private ThreadLocal orderService = new ThreadLocal<>(); @Autowired private OrderBean orderBean; @Transactional @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr); Order order = setOrderFactory(orderStr,type); orderService.get().makeOrder(order); orderService.get().postOrder(order); return Result.success(orderService.get().getResult(order)); }/** * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂 * @param orderStr * @return */ private Order setOrderFactory(String orderStr,String type) { Class> classType = orderBean.getOrderMap().get(type); Object order = JSONObject.parseObject(orderStr, classType);// if (orderStr.contains("service")) {// order = JSON.parseObject(orderStr, ServiceOrder.class);// }else if (orderStr.contains("product")) {// order = JSON.parseObject(orderStr, ProductOrder.class);// } Class> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory"); this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));// if (order instanceof ServiceOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));// }else if (order instanceof ProductOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));// } orderService.set(orderFactory.get().getOrder()); return (Order) order; }}
最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控
@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7, OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter private QueueserviceOrders = new ConcurrentLinkedDeque<>(); @RabbitHandler public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); ServiceOrder order = unSerialize(data); this.serviceOrders.add(order); log.info(String.valueOf(order)); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private ServiceOrder unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,ServiceOrder.class); }finally { input.close(); } }}
项目启动后,我们可以看到rabbitmq的情况如下
现我们来对其进行压测,启动Jmeter,我们使用1000线程来进行压测测试。各配置如下
保存文件上传服务器,因为本人是华为云的服务器,故在服务器上进行压测,不进行远程压测
在服务器的jmeter的bin目录下输入
./jmeter -n -t model/rabbit.jmx -l log.jtl
这里-n为不启动图形界面,-t使用我们上传的配置文件,-l记录日志
压测结果如下
我们在压测过程中来看一下rabbitmq的UI界面
消费基本上是实时的,没有出现流控积压现象。
到此,相信大家对"RabbitMQ用多路由,多队列来破除流控"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
消息
服务
队列
服务器
路由
消费
方法
订单
内容
序列
文件
界面
类型
学习
配置
头疼
实用
更深
二进制
交换机
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据可视化的数据库
用er方法建立数据库的概念模型
服务器编号手机版
疾控网络安全培训
在数据库中的select语句
重庆服务器报废公司有哪些
甘肃政法网络安全与执法专业
河北教育网络安全生态答案
网络安全取什么题目
城市宿客互联网科技公司
pubgm如何换服务器
金蝶连接云服务器超时怎么办
高新技术企业计算机网络安全
万师傅软件开发成本
电梯服务器改梯速
pc艾尔登法环无法登录游戏服务器
网络安全和信息化职业有哪些
打开sybasedb数据库
数据库性别属于什么数据类型
为什么辐射4无法登录到服务器
软件开发公司的部门设置
软件开发管理学题库
通讯软件开发流程
刑警网络安全
bdai代表的网络安全事件
速成网络技术
网络安全部队雇佣兵主要来源
如何导入网站数据库
软件开发岗位责任制
区块链部署一般用多少服务器