千家信息网

Guava中RateLimiter的实现原理是什么

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这期内容当中小编将会给大家带来有关Guava中RateLimiter的实现原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1、RateLimiter基本属性
千家信息网最后更新 2025年01月24日Guava中RateLimiter的实现原理是什么

这期内容当中小编将会给大家带来有关Guava中RateLimiter的实现原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

1、RateLimiter基本属性

// RateLimiter属性/**  * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate  * object to facilitate testing.  * 用来计时, RateLimiter将实例化的时间设置为0值, 后续都是相对时间  */private final SleepingStopwatch stopwatch;// Can't be initialized in the constructor because mocks don't call the constructor.// 锁, RateLimiter依赖于synchronized来控制并发, 限流器里面属性都没有加volatile修饰private volatile @Nullable Object mutexDoNotUseDirectly;
// SmoothRateLimiter属性/**  * The currently stored permits.  * 当前还有多少permits没有被使用, 被存下来的permits数量  */double storedPermits;/**  * The maximum number of stored permits.  * 最大允许缓存的permits数量, 也就是storedPermits能达到的最大值  */double maxPermits;/**  * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits  * per second has a stable interval of 200ms.  * 每隔多少时间产生一个permit  */double stableIntervalMicros;/**  * 下一次可以获取permits的时间, 这个时间也是相对于RateLimiter的构造时间的  *   * nextFreeTicketMicros是一个很关键的属性.我们每次获取permits的时候,先拿storedPermits的值,  * 如果够,storedPermits减去相应的值就可以了,如果不够,那么还需要将nextFreeTicketMicros往前推,  * 表示我预占了接下来多少时间的量了.那么下一个请求来的时候,如果还没到nextFreeTicketMicros这个时间点,  * 需要sleep到这个点再返回,当然也要将这个值再往前推  */private long nextFreeTicketMicros = 0L; // could be either in the past or future

2、SmoothBursty实现

2.1、构造方法

public static RateLimiter create(double permitsPerSecond) {    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());}@VisibleForTestingstatic RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {    // SmoothBursty默认缓存最多1s的permits, 不可修改, 也就是说最多会缓存1 * permitsPerSecond这么多个permits到池中    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);    rateLimiter.setRate(permitsPerSecond);    return rateLimiter;}SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {    super(stopwatch);    this.maxBurstSeconds = maxBurstSeconds;}public final void setRate(double permitsPerSecond) {    // 加锁    synchronized (mutex()) {       doSetRate(permitsPerSecond, stopwatch.readMicros());    }}@Overridefinal void doSetRate(double permitsPerSecond, long nowMicros) {    // 在关键节点, 会先更新下storedPermits到正确的值    resync(nowMicros);    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;    // 每隔多少时间产生一个permit    this.stableIntervalMicros = stableIntervalMicros;    doSetRate(permitsPerSecond, stableIntervalMicros);}void resync(long nowMicros) {    // if nextFreeTicket is in the past, resync to now    // 如果nextFreeTicketMicros已经过掉了, 想象一下很长时间没有再次调用limiter.acquire()的场景    // 需要将nextFreeTicketMicros设置为当前时间, 重新计算下storedPermits    if (nowMicros > nextFreeTicketMicros) {      // 新生成的permits, 构造函数中进来时生成的newPermits为无限大      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();      // 构造函数进来时maxPermits为0, 所以这里的storedPermits也是0      storedPermits = min(maxPermits, storedPermits + newPermits);      nextFreeTicketMicros = nowMicros;    }}@Overridedouble coolDownIntervalMicros() {    // 构造函数进来时, 此值为0.0    return stableIntervalMicros;}
@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) {    double oldMaxPermits = this.maxPermits;    // maxPermits为1秒产生的permits    maxPermits = maxBurstSeconds * permitsPerSecond;    if (oldMaxPermits == Double.POSITIVE_INFINITY) {      // if we don't special-case this, we would get storedPermits == NaN, below      storedPermits = maxPermits;    } else {      // 等比例缩放storedPermits      storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state                                             : storedPermits * maxPermits / oldMaxPermits;      }}

2.2、acquire

// 常用api@CanIgnoreReturnValuepublic double acquire() {    return acquire(1);}@CanIgnoreReturnValuepublic double acquire(int permits) {    // 预约, 如果当前不能直接获取到permits, 需要等待, 返回值表示需要sleep多久    long microsToWait = reserve(permits);    // sleep    stopwatch.sleepMicrosUninterruptibly(microsToWait);    // 返回sleep的时长    return 1.0 * microsToWait / SECONDS.toMicros(1L);}final long reserve(int permits) {    checkPermits(permits);    synchronized (mutex()) {      return reserveAndGetWaitLength(permits, stopwatch.readMicros());    }}final long reserveAndGetWaitLength(int permits, long nowMicros) {    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);    // 返回当前线程为了拿这些许可需要睡眠多久, 如果立即可以拿到就不需要睡眠, 否则需要睡到nextFreeTicketMicros    return max(momentAvailable - nowMicros, 0);}/**   * 我们可以看到,获取permits的时候,其实是获取了两部分,一部分来自于存量storedPermits,存量不够的话,   * 另一部分来自于预占未来的freshPermits.这里提一个关键点吧,我们看到,返回值是nextFreeTicketMicros   * 的旧值,因为只要到这个时间点,就说明当次acquire可以成功返回了,而不管storedPermits够不够.   * 如果storedPermits不够,会将nextFreeTicketMicros往前推一定的时间,预占了一定的量.   */@Overridefinal long reserveEarliestAvailable(int requiredPermits, long nowMicros) {    // 这里做一次同步, 更新storePermits和nextFreeTicketMicros(如果需要)    resync(nowMicros);    // 刚刚已经更新过了    long returnValue = nextFreeTicketMicros;    // storedPermits可以使用多少个    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);    // storePermits不够的部分    double freshPermits = requiredPermits - storedPermitsToSpend;    // 为了这不够的部分, 需要等待多久.    // SmoothBursty中storedPermitsToWaitTime返回0, 直接就可以取    // SmoothWarmingUp的实现中,由于需要预热,所以从storedPermits中取permits需要花费一定的时间    long waitMicros =        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)            + (long) (freshPermits * stableIntervalMicros);    // 将nextFreeTicketMicros往前推    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);    // 减去被拿走的部分    this.storedPermits -= storedPermitsToSpend;    return returnValue;}@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {    // SmoothBursty中对于已经存储下来的storedPermits可以直接获取到, 不需要等待    return 0L;}

2.3、tryAcquire

public boolean tryAcquire(Duration timeout) {    return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);}public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {    long timeoutMicros = max(unit.toMicros(timeout), 0);    checkPermits(permits);    long microsToWait;    synchronized (mutex()) {        long nowMicros = stopwatch.readMicros();        // 判断一下超时时间内能不能获得到锁, 不能获得直接返回false        if (!canAcquire(nowMicros, timeoutMicros)) {            return false;        } else {            // 可以获得的话再算下为了获得这个许可需要等待多长时间            // 这边上面已经分析过了            microsToWait = reserveAndGetWaitLength(permits, nowMicros);        }    }    stopwatch.sleepMicrosUninterruptibly(microsToWait);    return true;}private boolean canAcquire(long nowMicros, long timeoutMicros) {    // 就是看下nowMicros + timeoutMicros >= nextFreeTicketMicros    // 意思就是看下超时时间内有没有达到可以获取令牌的时间    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;}@Overridefinal long queryEarliestAvailable(long nowMicros) {    return nextFreeTicketMicros;}

2.4、setRate

public final void setRate(double permitsPerSecond) {    synchronized (mutex()) {       doSetRate(permitsPerSecond, stopwatch.readMicros());    }}@Overridefinal void doSetRate(double permitsPerSecond, long nowMicros) {    resync(nowMicros);    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;    this.stableIntervalMicros = stableIntervalMicros;    doSetRate(permitsPerSecond, stableIntervalMicros);}

3、SmoothWarmingUp实现

3.1、构造方法

// SmoothWamingUp实现public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {    return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);}public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {    return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());}@VisibleForTestingstatic RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit,                          double coldFactor, SleepingStopwatch stopwatch) {    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);    rateLimiter.setRate(permitsPerSecond);    return rateLimiter;}SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {    super(stopwatch);    // 为了达到从0到maxPermits花费warmupPeriodMicros的时间    this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);    this.coldFactor = coldFactor;}

3.2、setRate

public final void setRate(double permitsPerSecond) {    synchronized (mutex()) {       doSetRate(permitsPerSecond, stopwatch.readMicros());    }}@Overridefinal void doSetRate(double permitsPerSecond, long nowMicros) {    // 实现和SmoothBursty一样    resync(nowMicros);    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;    this.stableIntervalMicros = stableIntervalMicros;    doSetRate(permitsPerSecond, stableIntervalMicros);}/**  * 含义是storedPermits中每个permit的增长速度  * 为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间  */@Overridedouble coolDownIntervalMicros() {    return warmupPeriodMicros / maxPermits;}@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) {    double oldMaxPermits = maxPermits;    // coldFactor是固定的3    // 当达到maxPermits时, 此时系统处于最冷却的时候, 获取一个permit需要coldIntervalMicros    // 而如果storedPermits < thresholPermits的时候, 只需要stableIntervalMicros    double coldIntervalMicros = stableIntervalMicros * coldFactor;    // https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html    // https://blog.csdn.net/forezp/article/details/100060686    // 梯形的面积等于2倍的长方形的面积    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;    maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);    // 计算斜线的斜率    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);    if (oldMaxPermits == Double.POSITIVE_INFINITY) {      // if we don't special-case this, we would get storedPermits == NaN, below      storedPermits = 0.0;    } else {      // 等比例缩放      storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold                                             : storedPermits * maxPermits / oldMaxPermits;    }}

// storedPermits是上面两个图中最右侧那条绿色线, 表示RateLimiter已经存储了多少许可证, 那么获取storedPermits许可证// 时应当也是从最右侧开始拿, 从右往左减许可证. 因此就会出现3种情况// 1) storedPermits已经大于thresholdPermits, 而且所需的许可证permitsToTake右侧部分已经足够提供, 对应上图2// 2) storedPermits已经大于thresholdPermits, 而且所需的许可证右侧不够, 还需要从左侧拿, 对应上图1// 3) storedPermits小于thresholdPermits, 此时获取每个许可证所需的时间是固定的, 对应下面if逻辑返回false的情况@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {    double availablePermitsAboveThreshold = storedPermits - thresholdPermits;    long micros = 0;    // 计算梯形部分的面积    if (availablePermitsAboveThreshold > 0.0) {      double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);      double length = permitsToTime(availablePermitsAboveThreshold)                      + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);      micros = (long) (permitsAboveThresholdToTake * length / 2.0);      permitsToTake -= permitsAboveThresholdToTake;    }    // 计算长方形部分的面积    micros += (long) (stableIntervalMicros * permitsToTake);    return micros;}

上述就是小编为大家分享的Guava中RateLimiter的实现原理是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

0