Java干货丨限流常规设计和实例( 六 )

然后看下获取令牌:
public double acquire() { return acquire(1); }public double acquire(int permits) { long microsToWait = reserve(permits); // 等待microsToWait时间 控制速率 stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); }final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } // 获得需要等待的时间 final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); }// 子类SmoothRateLimiter实现 abstract long reserveEarliestAvailable(int permits, long nowMicros);reserveEarliestAvailable方法 , 刷新下一个请求能被授予的时间
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 每次acquire都会触发resync resync(nowMicros); // 返回值就是下一个请求能被授予的时间 long returnValue = https://www.isolves.com/it/cxkf/yy/JAVA/2019-11-29/nextFreeTicketMicros; // 选出请求令牌数和存储令牌数较小的一个 也就是从存储的令牌中需要消耗的数量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 计算本次请求中需要行创建的令牌 double freshPermits = requiredPermits - storedPermitsToSpend; // 计算需要等待的时间 就是存储令牌消耗的时间+新令牌产生需要的时间 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 刷新下一个请求能被授予的时间 是将这次等待的时间加上原先的值 就是把这次请求需要产生的等待时间延迟给下一次请求 这就是一个大请求会马上授予 但后续的请求会被等待长时间 所以这里的思路核心就是再每次请求时都是在预测下一次请求到来的时间this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 刷新存储令牌 this.storedPermits -= storedPermitsToSpend; return returnValue; }// 这个是用来计算存储令牌在消耗时的节流时间 也就是通过这个方法子类可以控存储令牌的速率 我们看到的SmoothBursty的实现是始终返回0 表示消耗存储的令牌不需要额外的等待时间 我们在预热的实现中可以看到不一样的实现 abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);再来看一下请求令牌方法带超时时间的方法:
public boolean tryAcquire(long timeout, TimeUnit unit) { return tryAcquire(1, timeout, unit); } public boolean tryAcquire(int permits) { return tryAcquire(permits, 0, MICROSECONDS); } public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); // 判定设置的超时时间是否足够等待下一个令牌的给予 , 等不了 , 就直接失败 if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { // 获得需要等待的时间 microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } // 等待 stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; }private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; }以上可以基本理解一个普通的限流器的实现方式 , 可以看到实现中可以通过doSetRate , storedPermitsToWaitTime , coolDownIntervalMicros方法进行定制自己的限流策略 。
那么这里的SmoothBursty的策略是:

  • 桶大小通过固定的maxBurstSeconds控制 maxPermits = maxBurstSeconds * permitsPerSecond;
  • 消耗累积令牌不计入时间到等待时间中
  • 累积令牌时的速率和令牌消耗速率保持一致
我们继续看稍微复杂点的SmoothWarmingUp , 毕竟为了说明它人家作者都用注释画了示意图 。
static final class SmoothWarmingUp extends SmoothRateLimiter { // 预热累计消耗时间 private final long warmupPeriodMicros; /** * The slope of the line from the stable interval (when permits == 0), to the cold interval * (when permits == maxPermits) */ private double slope; private double thresholdPermits; // 冷却因子 固定是3 意思是通过这个因子可以计算在令牌桶满的时候 , 消耗令牌需要的最大时间 private double coldFactor; SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this.coldFactor = coldFactor; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; // 通过coldFactor就可以算出coldInterval的最高点 即stableIntervalMicros的3倍 也就是说图中的梯形最高点是固定的了double coldIntervalMicros = stableIntervalMicros * coldFactor; // 根据warmupPeriodMicros*2=thresholdPermits*stableIntervalMicros换算thresholdPermits 因为我们看到梯形最高点是固定的 那么通过设置warmupPeriod是可以控制thresholdPermits , 从而控制maxPermits的值 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; // 也是根据上面提到的公式计算maxPermits maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); // 倾斜角度 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { storedPermits = (oldMaxPermits == 0.0) ? maxPermits // 这里的初始值是maxPermits : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { // 超过thresholdPermits的存储令牌 double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // measuring the integral on the right part of the function (the climbing line) if (availablePermitsAboveThreshold > 0.0) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); // 这里就开始算这个梯形的面积了 梯形面积=(上底+下底)*高/2 double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // measuring the integral on the left part of the function (the horizontal line) micros += (long) (stableIntervalMicros * permitsToTake); return micros; } private double permitsToTime(double permits) { return stableIntervalMicros + permits * slope; } // 为了确保从0到maxpermit所花费的时间等于预热时间 可以看下resync方法中对coolDownIntervalMicros方法的使用 @Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } }


推荐阅读