RabbitMQ用多路由,多队列来破除流控
发表于:2024-10-07 作者:千家信息网编辑
千家信息网最后更新 2024年10月07日,本篇内容主要讲解"RabbitMQ用多路由,多队列来破除流控",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RabbitMQ用多路由,多队列来破除流控"吧!
千家信息网最后更新 2024年10月07日RabbitMQ用多路由,多队列来破除流控
本篇内容主要讲解"RabbitMQ用多路由,多队列来破除流控",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RabbitMQ用多路由,多队列来破除流控"吧!
流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。
现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。
我们先给Order接口添加一个发送消息的方法。
public interface Order {public void makeOrder(Order order); public OrderSuccessResult getResult(Order order); public void postOrder(Order order);}
实现类实现该方法
@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate; @Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); }@Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); }@Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); }private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; }}
其中我们定义了这么一组队列名,交换机,和路由
public interface OwnerCarCenterMq {/** * 队列名 */ String ORDER_QUEUE = "order"; /** * 服务系统exchange名 */ String MQ_EXCHANGE_ORDER = "order.topic.exchange"; /** * 服务添加routing key */ String ROUTING_KEY_ORDER = "post.order";}
为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。
@Configurationpublic class RabbitmqConfig { @Bean public ListorderQueues() { List queues = new ArrayList<>(); for (int i = 1;i < 11;i++) { Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i); queues.add(queue); } return queues; } @Bean public TopicExchange orderExchange() { return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER); } @Bean public List bindingOrders() { List bindings = new ArrayList<>(); for (int i = 1;i < 11;i++) { Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange()) .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i); bindings.add(binding); } return bindings; }}
重新封装消息提供者,每次发送都随机选取一个路由来进行发送。
@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); ThreadLocalRandom random = ThreadLocalRandom.current(); this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content)); }/** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 对消息对象进行二进制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); }}
我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。
Controller如下
@Slf4j@RestControllerpublic class OrderController {private ThreadLocalorderFactory = new ThreadLocal<>(); private ThreadLocal orderService = new ThreadLocal<>(); @Autowired private OrderBean orderBean; @Transactional @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr); Order order = setOrderFactory(orderStr,type); orderService.get().makeOrder(order); orderService.get().postOrder(order); return Result.success(orderService.get().getResult(order)); }/** * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂 * @param orderStr * @return */ private Order setOrderFactory(String orderStr,String type) { Class> classType = orderBean.getOrderMap().get(type); Object order = JSONObject.parseObject(orderStr, classType);// if (orderStr.contains("service")) {// order = JSON.parseObject(orderStr, ServiceOrder.class);// }else if (orderStr.contains("product")) {// order = JSON.parseObject(orderStr, ProductOrder.class);// } Class> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory"); this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));// if (order instanceof ServiceOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));// }else if (order instanceof ProductOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));// } orderService.set(orderFactory.get().getOrder()); return (Order) order; }}
最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控
@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7, OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter private QueueserviceOrders = new ConcurrentLinkedDeque<>(); @RabbitHandler public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); ServiceOrder order = unSerialize(data); this.serviceOrders.add(order); log.info(String.valueOf(order)); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private ServiceOrder unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,ServiceOrder.class); }finally { input.close(); } }}
项目启动后,我们可以看到rabbitmq的情况如下
现我们来对其进行压测,启动Jmeter,我们使用1000线程来进行压测测试。各配置如下
保存文件上传服务器,因为本人是华为云的服务器,故在服务器上进行压测,不进行远程压测
在服务器的jmeter的bin目录下输入
./jmeter -n -t model/rabbit.jmx -l log.jtl
这里-n为不启动图形界面,-t使用我们上传的配置文件,-l记录日志
压测结果如下
我们在压测过程中来看一下rabbitmq的UI界面
消费基本上是实时的,没有出现流控积压现象。
到此,相信大家对"RabbitMQ用多路由,多队列来破除流控"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
消息
服务
队列
服务器
路由
消费
方法
订单
内容
序列
文件
界面
类型
学习
配置
头疼
实用
更深
二进制
交换机
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
单位网络安全责任制
数据库技术与管理
兴仁网络安全系统好不好
删除数据库会影响什么
三级数据库技术经验
多功能软件开发服务商
东华医为是什么数据库
中药图像材数据库
网络技术工程师培训教程
连接苹果手表网络安全
社保转移人员查询失败服务器异常
数据库中的模块对象基本类型
网络安全宣传周的意义
网络技术公司的公司理念
相亲软件开发女生
工业网络安全防控
手游 安卓 苹果服务器
数据库access简答题
网络安全设备访问权限
常采用的网络安全技术
什么免费服务器最好
软件开发语言有哪几种
全球预警数据库
串口服务器 作用
安卓应用用什么数据库
网络安全助力工程师
dell服务器从光驱启动
丛台区昌盛软件开发工作室
数据库应用2020
软件企业服务器排名