千家信息网

如何使用Spring中的重试机制

发表于:2024-11-18 作者:千家信息网编辑
千家信息网最后更新 2024年11月18日,这篇文章主要介绍"如何使用Spring中的重试机制",在日常操作中,相信很多人在如何使用Spring中的重试机制问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"如何使用S
千家信息网最后更新 2024年11月18日如何使用Spring中的重试机制

这篇文章主要介绍"如何使用Spring中的重试机制",在日常操作中,相信很多人在如何使用Spring中的重试机制问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"如何使用Spring中的重试机制"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

背景

重试,其实我们其实很多时候都需要的,为了保证容错性,可用性,一致性等。一般用来应对外部系统的一些不可预料的返回、异常等,特别是网络延迟,中断等情况。还有在现在流行的微服务治理框架中,通常都有自己的重试与超时配置,比如dubbo可以设置retries=1,timeout=500调用失败只重试1次,超过500ms调用仍未返回则调用失败。

如果我们要做重试,要为特定的某个操作做重试功能,则要硬编码,大概逻辑基本都是写个循环,根据返回或异常,计数失败次数,然后设定退出条件。这样做,且不说每个操作都要写这种类似的代码,而且重试逻辑和业务逻辑混在一起,给维护和扩展带来了麻烦。

从面向对象的角度来看,我们应该把重试的代码独立出来。

使用介绍

基本使用

先举个例子:

@Configuration  @EnableRetry  public class Application {      @Bean      public RetryService retryService(){          return new RetryService();      }      public static void main(String[] args) throws Exception{          ApplicationContext applicationContext = new AnnotationConfigApplicationContext("springretry");          RetryService service1 = applicationContext.getBean("service", RetryService.class);          service1.service();      }  }  @Service("service")  public class RetryService {      @Retryable(value = IllegalAccessException.class, maxAttempts = 5,              backoff= @Backoff(value = 1500, maxDelay = 100000, multiplier = 1.2))      public void service() throws IllegalAccessException {          System.out.println("service method...");          throw new IllegalAccessException("manual exception");      }      @Recover      public void recover(IllegalAccessException e){          System.out.println("service retry after Recover => " + e.getMessage());      }  }

@EnableRetry - 表示开启重试机制

@Retryable - 表示这个方法需要重试,它有很丰富的参数,可以满足你对重试的需求

@Backoff - 表示重试中的退避策略

@Recover - 兜底方法,即多次重试后还是失败就会执行这个方法

Spring-Retry 的功能丰富在于其重试策略和退避策略,还有兜底,监听器等操作。

然后每个注解里面的参数,都是很简单的,大家看一下就知道是什么意思,怎么用了,我就不多讲了。关注公众号Java技术栈,在后台回复:spring,可以获取我整理的 Spring 系列教程,非常齐全。

重试策略

看一下Spring Retry自带的一些重试策略,主要是用来判断当方法调用异常时是否需要重试。(下文原理部分会深入分析实现)

  • SimpleRetryPolicy 默认最多重试3次

  • TimeoutRetryPolicy 默认在1秒内失败都会重试

  • ExpressionRetryPolicy 符合表达式就会重试

  • CircuitBreakerRetryPolicy 增加了熔断的机制,如果不在熔断状态,则允许重试

  • CompositeRetryPolicy 可以组合多个重试策略

  • NeverRetryPolicy 从不重试(也是一种重试策略哈)

  • AlwaysRetryPolicy 总是重试

….等等

退避策略

看一下退避策略,退避是指怎么去做下一次的重试,在这里其实就是等待多长时间。(下文原理部分会深入分析实现)

  • FixedBackOffPolicy 默认固定延迟1秒后执行下一次重试

  • ExponentialBackOffPolicy 指数递增延迟执行重试,默认初始0.1秒,系数是2,那么下次延迟0.2秒,再下次就是延迟0.4秒,如此类推,最大30秒。

  • ExponentialRandomBackOffPolicy 在上面那个策略上增加随机性

  • UniformRandomBackOffPolicy 这个跟上面的区别就是,上面的延迟会不停递增,这个只会在固定的区间随机

  • StatelessBackOffPolicy 这个说明是无状态的,所谓无状态就是对上次的退避无感知,从它下面的子类也能看出来

原理

原理部分我想分开两部分来讲,一是重试机制的切入点,即它是如何使得你的代码实现重试功能的;二是重试机制的详细,包括重试的逻辑以及重试策略和退避策略的实现。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Spring 系列面试题和答案,非常齐全。

切入点

@EnableRetry

@Target(ElementType.TYPE)  @Retention(RetentionPolicy.RUNTIME)  @EnableAspectJAutoProxy(proxyTargetClass = false)  @Import(RetryConfiguration.class)  @Documented  public @interface EnableRetry {   /**    * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed    * to standard Java interface-based proxies. The default is {@code false}.    *    * @return whether to proxy or not to proxy the class    */   boolean proxyTargetClass() default false;  }

我们可以看到@EnableAspectJAutoProxy(proxyTargetClass = false)这个并不陌生,就是打开Spring AOP功能。

重点看看@Import(RetryConfiguration.class)@Import相当于注册这个Bean

我们看看这个RetryConfiguration是个什么东西:

它是一个AbstractPointcutAdvisor,它有一个pointcut和一个advice。我们知道,在IOC过程中会根据PointcutAdvisor类来对Bean进行Pointcut的过滤,然后生成对应的AOP代理类,用advice来加强处理。

看看RetryConfiguration的初始化:

@PostConstruct  public void init() {      Set> retryableAnnotationTypes = new LinkedHashSet>(1);      retryableAnnotationTypes.add(Retryable.class);      //创建pointcut      this.pointcut = buildPointcut(retryableAnnotationTypes);      //创建advice      this.advice = buildAdvice();      if (this.advice instanceof BeanFactoryAware) {          ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);      }  }
protected Pointcut buildPointcut(Set> retryAnnotationTypes) {      ComposablePointcut result = null;      for (Class retryAnnotationType : retryAnnotationTypes) {          Pointcut filter = new AnnotationClassOrMethodPointcut(retryAnnotationType);          if (result == null) {              result = new ComposablePointcut(filter);          }          else {              result.union(filter);          }      }      return result;  }

上面代码用到了AnnotationClassOrMethodPointcut,其实它最终还是用到了AnnotationMethodMatcher来根据注解进行切入点的过滤。这里就是@Retryable注解了。

//创建advice对象,即拦截器  protected Advice buildAdvice() {      //下面关注这个对象   AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();   if (retryContextCache != null) {    interceptor.setRetryContextCache(retryContextCache);   }   if (retryListeners != null) {    interceptor.setListeners(retryListeners);   }   if (methodArgumentsKeyGenerator != null) {    interceptor.setKeyGenerator(methodArgumentsKeyGenerator);   }   if (newMethodArgumentsIdentifier != null) {    interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);   }  if (sleeper != null) {    interceptor.setSleeper(sleeper);  }   return interceptor;  }

AnnotationAwareRetryOperationsInterceptor

继承关系

可以看出AnnotationAwareRetryOperationsInterceptor是一个MethodInterceptor,在创建AOP代理过程中如果目标方法符合pointcut的规则,它就会加到interceptor列表中,然后做增强,我们看看invoke方法做了什么增强。

@Override  public Object invoke(MethodInvocation invocation) throws Throwable {      MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());      if (delegate != null) {          return delegate.invoke(invocation);      }      else {          return invocation.proceed();      }  }

这里用到了委托,主要是需要根据配置委托给具体"有状态"的interceptor还是"无状态"的interceptor。

private MethodInterceptor getDelegate(Object target, Method method) {      if (!this.delegates.containsKey(target) || !this.delegates.get(target).containsKey(method)) {          synchronized (this.delegates) {              if (!this.delegates.containsKey(target)) {                 this.delegates.put(target, new HashMap());              }              Map delegatesForTarget = this.delegates.get(target);              if (!delegatesForTarget.containsKey(method)) {                  Retryable retryable = AnnotationUtils.findAnnotation(method, Retryable.class);                  if (retryable == null) {                      retryable = AnnotationUtils.findAnnotation(method.getDeclaringClass(), Retryable.class);                  }                  if (retryable == null) {                      retryable = findAnnotationOnTarget(target, method);                  }                  if (retryable == null) {                      return delegatesForTarget.put(method, null);                  }                  MethodInterceptor delegate;                  //支持自定义MethodInterceptor,而且优先级最高                  if (StringUtils.hasText(retryable.interceptor())) {                      delegate = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);                  }                  else if (retryable.stateful()) {                      //得到"有状态"的interceptor                      delegate = getStatefulInterceptor(target, method, retryable);                  }                  else {                      //得到"无状态"的interceptor                      delegate = getStatelessInterceptor(target, method, retryable);                  }                  delegatesForTarget.put(method, delegate);              }          }      }      return this.delegates.get(target).get(method);  }

getStatefulInterceptor和getStatelessInterceptor都是差不多,我们先看看比较简单的getStatelessInterceptor。

private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {      //生成一个RetryTemplate      RetryTemplate template = createTemplate(retryable.listeners());      //生成retryPolicy      template.setRetryPolicy(getRetryPolicy(retryable));      //生成backoffPolicy      template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));      return RetryInterceptorBuilder.stateless()              .retryOperations(template)              .label(retryable.label())              .recoverer(getRecoverer(target, method))              .build();  }

具体生成retryPolicy和backoffPolicy的规则,我们等下再回头来看。

RetryInterceptorBuilder其实就是为了生成RetryOperationsInterceptor。RetryOperationsInterceptor也是一个MethodInterceptor,我们来看看它的invoke方法。

分享资料:Spring Boot 学习笔记太全了!

public Object invoke(final MethodInvocation invocation) throws Throwable {      String name;      if (StringUtils.hasText(label)) {          name = label;      } else {          name = invocation.getMethod().toGenericString();      }      final String label = name;      //定义了一个RetryCallback,其实看它的doWithRetry方法,调用了invocation的proceed()方法,是不是有点眼熟,这就是AOP的拦截链调用,如果没有拦截链,那就是对原来方法的调用。      RetryCallback retryCallback = new RetryCallback() {          public Object doWithRetry(RetryContext context) throws Exception {              context.setAttribute(RetryContext.NAME, label);               /*               * If we don't copy the invocation carefully it won't keep a reference to               * the other interceptors in the chain. We don't have a choice here but to               * specialise to ReflectiveMethodInvocation (but how often would another               * implementation come along?).               */              if (invocation instanceof ProxyMethodInvocation) {                  try {                      return ((ProxyMethodInvocation) invocation).invocableClone().proceed();                  }                  catch (Exception e) {                      throw e;                  }                  catch (Error e) {                      throw e;                  }                  catch (Throwable e) {                      throw new IllegalStateException(e);                  }              }              else {                  throw new IllegalStateException(                          "MethodInvocation of the wrong type detected - this should not happen with Spring AOP, " +                                  "so please raise an issue if you see this exception");              }          }       };      if (recoverer != null) {          ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(                  invocation.getArguments(), recoverer);          return this.retryOperations.execute(retryCallback, recoveryCallback);      }      //最终还是进入到retryOperations的execute方法,这个retryOperations就是在之前的builder set进来的RetryTemplate。      return this.retryOperations.execute(retryCallback);  }

无论是RetryOperationsInterceptor还是StatefulRetryOperationsInterceptor,最终的拦截处理逻辑还是调用到RetryTemplate的execute方法,从名字也看出来,RetryTemplate作为一个模板类,里面包含了重试统一逻辑。

不过,我看这个RetryTemplate并不是很"模板",因为它没有很多可以扩展的地方。推荐阅读:最新 Spring 系列教程。

重试逻辑及策略实现

上面介绍了Spring Retry利用了AOP代理使重试机制对业务代码进行"入侵"。下面我们继续看看重试的逻辑做了什么。RetryTemplate的doExecute方法。

protected  T doExecute(RetryCallback retryCallback,     RecoveryCallback recoveryCallback, RetryState state)     throws E, ExhaustedRetryException {      RetryPolicy retryPolicy = this.retryPolicy;      BackOffPolicy backOffPolicy = this.backOffPolicy;      //新建一个RetryContext来保存本轮重试的上下文      RetryContext context = open(retryPolicy, state);      if (this.logger.isTraceEnabled()) {          this.logger.trace("RetryContext retrieved: " + context);      }      // Make sure the context is available globally for clients who need     // it...      RetrySynchronizationManager.register(context);      Throwable lastException = null;      boolean exhausted = false;      try {          //如果有注册RetryListener,则会调用它的open方法,给调用者一个通知。          boolean running = doOpenInterceptors(retryCallback, context);          if (!running) {              throw new TerminatedRetryException(                      "Retry terminated abnormally by interceptor before first attempt");          }          // Get or Start the backoff context...          BackOffContext backOffContext = null;          Object resource = context.getAttribute("backOffContext");          if (resource instanceof BackOffContext) {              backOffContext = (BackOffContext) resource;          }          if (backOffContext == null) {              backOffContext = backOffPolicy.start(context);              if (backOffContext != null) {                  context.setAttribute("backOffContext", backOffContext);              }          }          //判断能否重试,就是调用RetryPolicy的canRetry方法来判断。          //这个循环会直到原方法不抛出异常,或不需要再重试          while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {              try {                  if (this.logger.isDebugEnabled()) {                      this.logger.debug("Retry: count=" + context.getRetryCount());                  }                  //清除上次记录的异常                  lastException = null;                  //doWithRetry方法,一般来说就是原方法                  return retryCallback.doWithRetry(context);              }              catch (Throwable e) {                  //原方法抛出了异常                  lastException = e;                  try {                      //记录异常信息                      registerThrowable(retryPolicy, state, context, e);                  }                  catch (Exception ex) {                      throw new TerminatedRetryException("Could not register throwable",                              ex);                  }                  finally {                      //调用RetryListener的onError方法                      doOnErrorInterceptors(retryCallback, context, e);                  }                  //再次判断能否重试                  if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {                     try {                          //如果可以重试则走退避策略                          backOffPolicy.backOff(backOffContext);                      }                      catch (BackOffInterruptedException ex) {                          lastException = e;                          // back off was prevented by another thread - fail the retry                          if (this.logger.isDebugEnabled()) {                              this.logger                                      .debug("Abort retry because interrupted: count="                                              + context.getRetryCount());                          }                          throw ex;                      }                  }                  if (this.logger.isDebugEnabled()) {                      this.logger.debug(                              "Checking for rethrow: count=" + context.getRetryCount()                 }                 if (shouldRethrow(retryPolicy, context, state)) {                      if (this.logger.isDebugEnabled()) {                          this.logger.debug("Rethrow in retry for policy: count="                                  + context.getRetryCount());                      }                      throw RetryTemplate.wrapIfNecessary(e);                  }              }              /*               * A stateful attempt that can retry may rethrow the exception before now,               * but if we get this far in a stateful retry there's a reason for it,               * like a circuit breaker or a rollback classifier.               */              if (state != null && context.hasAttribute(GLOBAL_STATE)) {                  break;              }          }          if (state == null && this.logger.isDebugEnabled()) {              this.logger.debug(                      "Retry failed last attempt: count=" + context.getRetryCount());          }          exhausted = true;          //重试结束后如果有兜底Recovery方法则执行,否则抛异常          return handleRetryExhausted(recoveryCallback, context, state);      }      catch (Throwable e) {          throw RetryTemplate.wrapIfNecessary(e);      }     finally {          //处理一些关闭逻辑          close(retryPolicy, context, state, lastException == null || exhausted);          //调用RetryListener的close方法          doCloseInterceptors(retryCallback, context, lastException);          RetrySynchronizationManager.clear();      }  }

主要核心重试逻辑就是上面的代码了,看上去还是挺简单的。

在上面,我们漏掉了RetryPolicy的canRetry方法和BackOffPolicy的backOff方法,以及这两个Policy是怎么来的。我们回头看看getStatelessInterceptor方法中的getRetryPolicy和getRetryPolicy方法。

private RetryPolicy getRetryPolicy(Annotation retryable) {      Map attrs = AnnotationUtils.getAnnotationAttributes(retryable);      @SuppressWarnings("unchecked")     Class[] includes = (Class[]) attrs.get("value");      String exceptionExpression = (String) attrs.get("exceptionExpression");      boolean hasExpression = StringUtils.hasText(exceptionExpression);      if (includes.length == 0) {          @SuppressWarnings("unchecked")          Class[] value = (Class[]) attrs.get("include");          includes = value;      }      @SuppressWarnings("unchecked")      Class[] excludes = (Class[]) attrs.get("exclude");      Integer maxAttempts = (Integer) attrs.get("maxAttempts");      String maxAttemptsExpression = (String) attrs.get("maxAttemptsExpression");      if (StringUtils.hasText(maxAttemptsExpression)) {          maxAttempts = PARSER.parse_Expression(resolve(maxAttemptsExpression), PARSER_CONTEXT)                  .getValue(this.evaluationContext, Integer.class);      }      if (includes.length == 0 && excludes.length == 0) {          SimpleRetryPolicy simple = hasExpression ? new ExpressionRetryPolicy(resolve(exceptionExpression))                                                          .withBeanFactory(this.beanFactory)                                                   : new SimpleRetryPolicy();          simple.setMaxAttempts(maxAttempts);          return simple;      }      Map, Boolean> policyMap = new HashMap, Boolean>();      for (Class type : includes) {          policyMap.put(type, true);      }      for (Class type : excludes) {          policyMap.put(type, false);      }      boolean retryNotExcluded = includes.length == 0;      if (hasExpression) {          return new ExpressionRetryPolicy(maxAttempts, policyMap, true, exceptionExpression, retryNotExcluded)                  .withBeanFactory(this.beanFactory);      }      else {          return new SimpleRetryPolicy(maxAttempts, policyMap, true, retryNotExcluded);      }  }  嗯~,代码不难,这里简单做一下总结好了。就是通过@Retryable注解中的参数,来判断具体使用文章开头说到的哪个重试策略,是SimpleRetryPolicy还是ExpressionRetryPolicy等。  private BackOffPolicy getBackoffPolicy(Backoff backoff) {      long min = backoff.delay() == 0 ? backoff.value() : backoff.delay();      if (StringUtils.hasText(backoff.delay_Expression())) {          min = PARSER.parse_Expression(resolve(backoff.delay_Expression()), PARSER_CONTEXT)                  .getValue(this.evaluationContext, Long.class);      }      long max = backoff.maxDelay();      if (StringUtils.hasText(backoff.maxDelay_Expression())) {          max = PARSER.parse_Expression(resolve(backoff.maxDelay_Expression()), PARSER_CONTEXT)                  .getValue(this.evaluationContext, Long.class);      }      double multiplier = backoff.multiplier();      if (StringUtils.hasText(backoff.multiplier_Expression())) {          multiplier = PARSER.parse_Expression(resolve(backoff.multiplier_Expression()), PARSER_CONTEXT)                  .getValue(this.evaluationContext, Double.class);      }      if (multiplier > 0) {          ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();          if (backoff.random()) {              policy = new ExponentialRandomBackOffPolicy();          }          policy.setInitialInterval(min);          policy.setMultiplier(multiplier);          policy.setMaxInterval(max > min ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);          if (this.sleeper != null) {              policy.setSleeper(this.sleeper);          }          return policy;      }      if (max > min) {          UniformRandomBackOffPolicy policy = new UniformRandomBackOffPolicy();          policy.setMinBackOffPeriod(min);          policy.setMaxBackOffPeriod(max);          if (this.sleeper != null) {              policy.setSleeper(this.sleeper);          }          return policy;      }      FixedBackOffPolicy policy = new FixedBackOffPolicy();      policy.setBackOffPeriod(min);      if (this.sleeper != null) {          policy.setSleeper(this.sleeper);      }      return policy;  }

嗯~,一样的味道。就是通过@Backoff注解中的参数,来判断具体使用文章开头说到的哪个退避策略,是FixedBackOffPolicy还是UniformRandomBackOffPolicy等。

那么每个RetryPolicy都会重写canRetry方法,然后在RetryTemplate判断是否需要重试。我们看看SimpleRetryPolicy的

@Override  public boolean canRetry(RetryContext context) {      Throwable t = context.getLastThrowable();      //判断抛出的异常是否符合重试的异常      //还有,是否超过了重试的次数      return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;  }

同样,我们看看FixedBackOffPolicy的退避方法。

protected void doBackOff() throws BackOffInterruptedException {      try {          //就是sleep固定的时间          sleeper.sleep(backOffPeriod);      }      catch (InterruptedException e) {          throw new BackOffInterruptedException("Thread interrupted while sleeping", e);     }  }

至此,重试的主要原理以及逻辑大概就是这样了。

RetryContext

我觉得有必要说说RetryContext,先看看它的继承关系。

可以看出对每一个策略都有对应的Context。

在Spring Retry里,其实每一个策略都是单例来的。我刚开始直觉是对每一个需要重试的方法都会new一个策略,这样重试策略之间才不会产生冲突,但是一想就知道这样就可能多出了很多策略对象出来,增加了使用者的负担,这不是一个好的设计。

Spring Retry采用了一个更加轻量级的做法,就是针对每一个需要重试的方法只new一个上下文Context对象,然后在重试时,把这个Context传到策略里,策略再根据这个Context做重试,而且Spring Retry还对这个Context做了cache。这样就相当于对重试的上下文做了优化。

到此,关于"如何使用Spring中的重试机制"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

方法 策略 就是 逻辑 机制 还是 代码 状态 延迟 生成 原理 对象 注解 学习 功能 参数 部分 面的 上下 上下文 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发的工作量 香港服务器可以用来干啥 大家知道的网络安全接入控制 预付会计软件开发款会计分录 企鹅网络技术有限公司 数据库技术与程序设计学堂云 软件开发成都薪资水平 提示不是有效的服务器上传地址 企业网络安全电影 数据服务器操作系统 找亲家网络技术有限公司 mumble 服务器 重装系统为什么安装不了数据库 软件开发工程师蛋糕男生 数据库代数运算习题完整版 我的世界手机版创服务器视频 视频会议软件开发全网优惠 维护网络安全增强网络安全意识1 mfc数据库实验报告 通州区网络技术咨询收费 浙江高密度服务器云服务器 秋孟网络技术是什么意思 真实的较量 网络安全 我国网络安全法体系的特点 后台架构服务器 非安全逻辑服务器 西昌学院网络技术 海澜软件开发工资多少 对网络安全的看法200字 轻钢别墅龙骨设备加工软件开发
0