千家信息网

REST微服务中怎么利用消息中间件实现分布式事务

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,这期内容当中小编将会给大家带来有关REST微服务中怎么利用消息中间件实现分布式事务,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。我们还是使用之前的实例,一个订票系统
千家信息网最后更新 2025年02月06日REST微服务中怎么利用消息中间件实现分布式事务

这期内容当中小编将会给大家带来有关REST微服务中怎么利用消息中间件实现分布式事务,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

我们还是使用之前的实例,一个订票系统的购票逻辑:

这篇教程的源代码可以从github上获取。

使用消息中间件实现分布式事务,也就是使用事件驱动实现。在这种方式下,Order服务不会直接调用User服务,而是往MQ上发一个消息,说明有新订单需要扣费;User服务会响应这个消息,并处理,处理完成后再发一个消息,说明有新订单需要转移票;然后就会有Ticket服务来处理。而每个服务都是在一个事务里面处理读消息、处理业务、写消息的事情。大致流程如下:

在这种方式下,订单的处理是异步的,用户发起一个订单的时候,只是生成一个正在处理的订单,然后通过消息中间件一步步的进行扣费、交票、完成订单等逻辑。而每一个服务中相应的操作,基本都是:

  1. 从一个队列中读取消息

  2. 操作相应数据库操作

  3. 往下一个队列中发送消息

也就是说,需要在这个方法中需要操作数据库和MQ两个资源,这正好是上一篇文章中介绍Spring内部事务和外部事务时使用的实例中的场景。下面就是大致的代码:

12345678
@JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);}

它监听MQ的"order:new"队列,处理订单,往"order:need_to_pay"发送一个消息。然后用户服务就会接收这个消息,触发扣费流程。

在这个地方,我们可以使用JTA事务,来使用两阶段提交来实现两个资源的共同提交,但是这会影响系统的性能。而且,还需要使用的消息中间件实现了XA的规范,提供两阶段提交的功能。
这里也可以使用本地事务,这时,每个事物都会有一个JMS的Session,并使用事务。如此一来,就存在一个数据库的事物和一个JMS的事务,两个事务是相互独立并依次提交的。这样,就有可能在极少数情况下出错,但是也能采取一些错误来尽量解决。我们对上面的事务处理展开(伪代码,只是为了说明处理过程),来看看出错的情况以及该如何处理:

1234567891011
jmsTransaction.begin(); // get transactions from jms sessiondbTransaction.begin(); // get transactions from JDBC connectiontry {orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);dbTransaction.commit();jmsTransaction.commit();} catch(Exception e) {dbTransaction.rollback();jmsTransaction.rollback();}

在上面的方法中,只要发生了错误,MQ消息的消费就算失败,MQ的监听器就会重新触发一次这个方法。
这时,如果错误发生在:

  1. 数据提交时或之前。这时,整个数据库的操作都会被重置(也可能就根本还没更新),重试的时候不需要考虑重复提交的问题,因为之前的提交都已经被回滚。

  2. 数据库提交成功,但是JMS提交失败。这时就需要防止重复提交来避免数据库的重复操作。

我们可以采用之前说过的token方式,在调用这个方法前,生成一个唯一的token。这里使用Java的UUID生成一个ID作为token。(如果这里的重复调用只是在这个服务内部重新触发,就不需要考虑分布式系统的全局一致性ID的问题。这需要根据实际情况来判断用什么样的UUID生成方式)所以,Controller里面接受购票请求如下:

1234567
@PostMapping(value = "/")@Transactionalpublic void create(@RequestBody OrderDTO orderDTO) {String uid = UUID.randomUUID().toString();orderDTO.setToken(uid);jmsTemplate.convertAndSend("order:new", orderDTO);}

然后在Service里面监听这个队列,处理购票:

123456789101112131415
@JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {if (!this.processedUIDs.contains(orderDTO.getToken())) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);orderDTO.setStatus(order.getStatus());orderDTO.setId(order.getId());} else {LOG.info("Duplicate jms message:{}", orderDTO);}jmsTemplate.convertAndSend("order:need_to_pay", orderDTO);processedUIDs.add(orderDTO.getToken());}

简单来说,解决办法就是,如果是重复触发的,就略过数据库相关的处理,直接往MQ的目标队列发送需要的数据。使得整个流程能够往下走。

刚才说的是在一个服务内出错的情况,还有一种错误情况是,订单服务和用户服务已经处理完订单创建和扣费的操作,然后到了Ticket服务的时候,却发现没有票了。虽然我们可以通过合理的设计业务逻辑来避免这种问题,例如,在操作之前先检查用户余额,检查并锁票,然后进行操作数据的事情。但是,在有些情况下,很难通过业务流程的设计来完全避免这种问题。如果出现了这种的问题,我们也可以通过撤销的流程来实现,业务流程如下:

在上面的解决方案中,使用JDK的UUID类生成一个ID,实际上这个ID只是在当前的JVM内,才能够保证是唯一的。其次,在JMS的标准中,没有规定一个消息的Listener在读取一个消息失败后,重新读取的问题。在微服务环境中,如果一个应用部署了多个实例,那个这个消息有可能会被另一个实例读到。所以在上面的方案中,使用JVM内的唯一ID放在消息的内容中,它有可能被任意一个实例处理,处理失败后,又有可能被另一个实例处理。这就会出问题。所以我们需要一个分布式环境下的生成唯一ID的解决办法。例如,先获得JVM的唯一ID以后,再加上IP+端口等信息。而且,对已经处理过的ID的缓存,也需要存在分布式环境中。

所以,我们完全可以不使用两阶段提交,就实现微服务架构下的分布式事务。使用这种方式,它的优点是:

  1. 实现简单。结合Spring的事务,几乎不用写额外的事务相关的代码,就能够实现。我们只需要更好的服务的拆分和设计业务流程。

  2. 系统吞吐量高。因为数据库或MQ不会被长期的锁住,可以并发的处理更多的事务。

  3. 容错性好。各个服务之间通过MQ来触发协调,即使在处理一个任务的时候有一个服务停了,消息还会一直保持,直到服务起来开始监听,然后继续触发这个任务。

当然这种方式也有一些缺点,最大的问题就是异步处理的问题。用户发出一个请求后,处理该业务的服务只是简单处理,往MQ发送消息开始处理流程,然后就返回了。这时候这个任务还在处理。虽然有时候我们可以通过等的方式,等待最终处理完成的消息,然后在返回给用户。但是这样又得考虑响应时间、超时、各种错误等情况。

有些人会觉得这种方式使得开发和调试都变得复杂,在我看来,恰恰相反,这使得开发和调试都简单了。首先,根据微服务架构的设计原则,就是每个服务只负责一个功能模块;再者,根据面向对象的设计原则,一个方法只做一件事情。如果我们能够合理的拆分服务,和每一步的处理方法,这正是一个好的设计。在维护的时候,每个方法、每个步骤做什么事情,都很清楚。

说到调试,我的原则是,你应该通过单元测试来发现和解决问题,而不是调试。以上面的购票流程为例,每一个服务,通过MQ触发一个方法的时候,它的参数应该是什么、状态是什么都应该是明确的,这个方法执行完成后,会产生什么新的数据,状态会更新成什么,都应该是明确的。而这些都可以通过单元测试来很好的测试。如果你的复杂流程中的每一个都能通过单元测试进行完善的测试,那么这些方法串联到一起,不但能够很好的工作,也能应付各种异常的情况。

上述就是小编为大家分享的REST微服务中怎么利用消息中间件实现分布式事务了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

处理 服务 消息 事务 数据 方法 流程 问题 订单 情况 数据库 方式 分布式 业务 实例 时候 用户 生成 设计 中间件 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 浙江宇视科技软件开发笔试 建行深圳软件开发 如何查2个表里的重复数据库 数据库怎样新建查询 网络安全隐患突出 企业如何保持数据库安全 吉林省博宇网络技术有限公司 苏州戴尔服务器家用版 重庆软件开发有用吗 软件开发综合应用结课报告 软件开发技术实训总结 图书数据库采购存在的问题 网络安全成本 如何开展网络安全周活动总结 网络安全宣传观影的心得 干货网络安全等级保护备案指南 重庆采购管理软件开发公司 100台服务器管理 关于网络安全方面的专业 信息与网络安全答案 在三甲医院软件开发 武神坛一个服务器代表队怎么选 网络安全靠大家的策划案 大兴区特色软件开发调试 wps中会议论文数据库是哪个 数据库可视化版本管理 网络安全教育主题班会背景 win10云服务器安装教程 电脑连接网络安全类型选择什么 汽车自动空调软件开发流程
0