四种分布式限流算法实现!( 四 )


  • 代码实现
public class LeakyBucketRateLimiter {private RedissonClient redissonClient = RedissonConfig.getInstance();private static final String KEY_PREFIX = "LeakyBucket:";/*** 桶的大小*/private Long bucketSize;/*** 漏水速率 , 单位:个/秒*/private Long leakRate;public LeakyBucketRateLimiter(Long bucketSize, Long leakRate) {this.bucketSize = bucketSize;this.leakRate = leakRate;//这里启动一个定时任务 , 每s执行一次ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);executorService.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);}/*** 漏水*/public void leakWater() {RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");//遍历所有path,删除旧请求for(String path:pathSet){String redisKey = KEY_PREFIX + path;RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(KEY_PREFIX + path);// 获取当前时间long now = System.currentTimeMillis();// 删除旧的请求bucket.removeRangeByScore(0, true,now - 1000 * leakRate,true);}}/*** 限流*/public boolean triggerLimit(String path) {//加锁 , 防止并发初始化问题RLock rLock = redissonClient.getLock(KEY_PREFIX + "LOCK:" + path);try {rLock.lock(100,TimeUnit.MILLISECONDS);String redisKey = KEY_PREFIX + path;RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(redisKey);//这里用一个set , 来存储所有pathRSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");pathSet.add(path);// 获取当前时间long now = System.currentTimeMillis();// 检查桶是否已满if (bucket.size() < bucketSize) {// 桶未满 , 添加一个元素到桶中bucket.add(now,now);return false;}// 桶已满 , 触发限流System.out.println("[triggerLimit] path:"+path+" bucket size:"+bucket.size());return true;}finally {rLock.unlock();}}}在代码实现里 , 我们用了RSet来存储path , 这样一来 , 一个定时任务 , 就可以搞定所有path对应的桶的出水 , 而不用每个桶都创建一个一个定时任务 。
这里我直接用ScheduledExecutorService启动了一个定时任务 , 1s跑一次 , 当然集群环境下 , 每台机器都跑一个定时任务 , 对性能是极大的浪费 , 而且不好管理 , 我们可以用分布式定时任务 , 比如xxl-job去执行leakWater 。
  • 最后还是大家熟悉的测试
 class LeakyBucketRateLimiterTest {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));@Test@DisplayName("漏桶算法")void triggerLimit() throws InterruptedException {LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(10L, 1L);for (int i = 0; i < 8; i++) {CountDownLatch countDownLatch = new CountDownLatch(20);for (int j = 0; j < 20; j++) {threadPoolExecutor.execute(() -> {boolean isLimit = leakyBucketRateLimiter.triggerLimit("/test");System.out.println(isLimit);countDownLatch.countDown();});}countDownLatch.await();//休眠10sTimeUnit.SECONDS.sleep(10L);}}}漏桶算法能够有效防止网络拥塞 , 实现也比较简单 。
但是 , 因为漏桶的出水速率是固定的 , 假如突然来了大量的请求 , 那么只能丢弃超量的请求 , 即使下游能处理更大的流量 , 没法充分利用系统资源 。
令牌桶算法令牌桶算法来了!
算法原理令牌桶算法是对漏桶算法的一种改进 。
它的主要思想是:系统以一种固定的速率向桶中添加令牌 , 每个请求在发送前都需要从桶中取出一个令牌 , 只有取到令牌的请求才被通过 。因此 , 令牌桶算法允许请求以任意速率发送 , 只要桶中有足够的令牌 。
四种分布式限流算法实现!

文章插图
令牌桶算法
算法实现我们继续看怎么实现 , 首先是要发放令牌 , 要固定速率 , 那我们又得开个线程 , 定时往桶里投令牌 , 然后……
——然后Redission提供了令牌桶算法的实现 , 舒不舒服?
拿来吧你
拿来就用!
  • 代码实现
public class TokenBucketRateLimiter {public static final String KEY = "TokenBucketRateLimiter:";/*** 阈值*/private Long limit;/*** 添加令牌的速率 , 单位:个/秒*/private Long tokenRate;public TokenBucketRateLimiter(Long limit, Long tokenRate) {this.limit = limit;this.tokenRate = tokenRate;}/*** 限流算法*/public boolean triggerLimit(String path){RedissonClient redissnotallow=RedissonConfig.getInstance();RRateLimiter rateLimiter = redissonClient.getRateLimiter(KEY+path);// 初始化 , 设置速率模式 , 速率 , 间隔 , 间隔单位rateLimiter.trySetRate(RateType.OVERALL, limit, tokenRate, RateIntervalUnit.SECONDS);// 获取令牌return rateLimiter.tryAcquire();}}


推荐阅读