限流概述

系统存在服务上限,流量超过服务上限会导致系统卡死、崩溃。
限流:为了在高并发时系统稳定可用,牺牲或延迟部分请求流量以保证系统整体服务可用。

限流算法

  • 固定窗口计数
    • 将时间划分为多个窗口;
    • 在每个窗口内每有一次请求就将计数器加一;
    • 如果计数器超过了限制数量,则本窗口内所有的请求都被丢弃当时间到达下一个窗口时,计数器重置。
  • 滑动窗口计数
    • 将时间划分为多个区间;
    • 在每个区间内每有一次请求就将计数器加一维持一个时间窗口,占据多个区间;
    • 每经过一个区间的时间,则抛弃最老的一个区间,并纳入最新的一个区间;
    • 如果当前窗口内区间的请求计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。
  • 漏桶
    • 将每个请求视作"水滴"放入"漏桶"进行存储;
    • "漏桶"以固定速率向外"漏"出请求来执行如果"漏桶"空了则停止"漏水";
    • 如果"漏桶"满了则多余的"水滴"会被直接丢弃。
  • 令牌桶
    • 令牌以固定速率生成;
    • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
    • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

漏桶和令牌桶对比

  • 两者实际上是相同的
    • 在实现上是相同的基本算法,描述不同。
    • 给定等效参数的情况下,这两种算法会将完全相同的数据包视为符合或不符合。
    • 两者实现的属性和性能差异完全是由于实现的差异造成的,即它们不是源于底层算法的差异。
  • 漏桶算法在用作计量时,可以允许具有抖动或突发性的一致输出数据包流,可用于流量管制和整形,并且可以用于可变长度数据包。
  • 参考:
    • 漏桶 - wikipedia
    • 令牌桶 - wikipedia

相关阅读:

  • 分布式服务限流实战,已经为你排好坑了
  • 接口限流算法总结 - 穿林度水

限流注解组件实现

  1. 利用 Spring 拦截器实现
  2. 使用方式:Controller 方法或类加上限流注解,请求到达拦截器时进行拦截处理
  3. 使用 Redis 记录数据,Lua 保证多个命令原子性执行。
  • 使用示例

    @RestController
    @RequestMapping("/ratelimit/custom")
    @RateLimit(threshold = 10, rateLimiter = RateLimiterEnum.FIXED_WINDOW, time = 10, timeUnit = TimeUnit.SECONDS)
    public class RateLimitController {
        @GetMapping("/fixed/window")
        @RateLimit(threshold = 10, rateLimiter = RateLimiterEnum.FIXED_WINDOW, time = 10, timeUnit = TimeUnit.SECONDS)
        public ResponseResult<String> fixedWindow(Long id) {
            id += RandomUtil.randomLong();
            log.info("custom:fixedWindow:{}", id);
            return ResponseResult.success("custom:fixedWindow:" + id);
        }
    }
    
  • 限流注解 RateLimit.java

    • 支持不同类型缓存 key: key() + keyType()
    • 支持使用不同限流算法: rateLimiter()
    • 限流流量阈值设置: threshold()
    • 限流时长设置: time() + timeUnit()
  • 限流拦截器处理 RateLimitInterceptor.java
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        HandlerMethod handlerMethod = ((HandlerMethod) handler);
        // 从方法和类上获取注解
        RateLimit annotation = AspectUtil.findMethodOrClassAnnotation(handlerMethod.getMethod(),
                RateLimit.class);
        if (annotation == null) {
            return true;
        }
        AspectKeyTypeEnum.KeyTypeData data = AspectKeyTypeEnum.KeyTypeData.builder()
                .prefix("rate:limit").key(annotation.key()).build();
        String limitKey = annotation.keyType()
                .obtainTypeKey(handlerMethod.getMethod(), handlerMethod.getMethodParameters(), data);
        RateLimiterEnum limiterEnum = annotation.rateLimiter();
        // 执行限流脚本
        Long isLimit = redisUtil.execute(limiterEnum.obtainScript(),
                Lists.newArrayList(limitKey), limiterEnum.obtainArgvs(annotation).toArray());
        if (isLimit != null && isLimit != 0L) {
            return true;
        }
        throw new ResponseException(ResponseEnum.RATE_LIMITED);
    }
    
  • 限流算法 lua 脚本

    固定窗口: fixed_window_rate_limiter.lua
      ```lua
      -- 限流key ,string 保存调用限流的次数
      local key = KEYS[1];
      -- 最大访问量
      local capacity = tonumber(ARGV[1]);
      -- 限流时长(毫秒)
      local ttl = tonumber(ARGV[2]);
      local count = redis.call('INCR', key);
      if (count == 1) then
          -- 首次访问设置过期时间
          redis.call('PEXPIRE', key, ttl);
      end
      local res = 0;
      if (count <= capacity) then
          res = 1;
      end
      -- 被限流返回0,未被限流返回1
      return res;
      ```
    
    滑动窗口: sliding_window_rate_limiter.lua
      ```lua
      -- 限流 key , zset 保存未被限流的 id 与时间戳
      local key = KEYS[1];
      -- 最大访问量
      local capacity = tonumber(ARGV[1]);
      -- 限流时长(毫秒)
      local ttl = tonumber(ARGV[2]);
      -- 当前时间戳(毫秒)
      local now = tonumber(ARGV[3]);
      -- 唯一ID
      local ukid = ARGV[4];
      -- 清除过期的数据
      redis.call('ZREMRANGEBYSCORE', key, 0, now - ttl);
      local count = redis.call('ZCARD', key);
      local res = 0;
      if (count < capacity) then
          -- 往 zset 中添加一个值、得分均为当前时间戳的元素,[value,score]
          redis.call("ZADD", key, now, ukid);
          -- 重置 zset 的过期时间,单位毫秒
          redis.call("PEXPIRE", key, ttl);
          res = 1;
      end
      -- 被限流返回0,未被限流返回1
      return res;
      ```
    
    漏桶: leaky_bucket_rate_limiter.lua
      ```lua
      -- 限流 key , hash 保存限流相关信息
      local key = KEYS[1];
      -- 最大访问量
      local capacity = tonumber(ARGV[1]);
      -- 限流时长(毫秒)
      local ttl = tonumber(ARGV[2]);
      -- 当前时间戳(毫秒)
      local now = tonumber(ARGV[3]);
      -- 水流出速率(每毫秒)
      local rate = tonumber(ARGV[4]);
      -- 限流信息
      local info = redis.call("HMGET", key, "last_time", "stored_water");
      -- 上次处理时间
      local last_time = tonumber(info[1]);
      -- 当前存储的水量,默认为0,存在保存值使用保存值
      local stored_water = tonumber(info[2]);
      if (stored_water == nil) then
          stored_water = 0;
      end
      if (last_time ~= nil) then
          -- 根据上次处理时间和当前时间差,计算流出后的水量
          local leaked_water = math.floor((now - last_time) * rate);
          stored_water = math.max(stored_water - leaked_water, 0);
          if (leaked_water > 0) then
              last_time = nil;
          end
      end
      -- 首次访问、泄露了水 设置上次处理时间
      if (last_time == nil) then
          redis.call("HSET", key, "last_time", now);
      end
      -- 被限流返回0,未被限流返回1
      local res = 0;
      if (capacity > stored_water) then
          redis.call("HSET", key, "stored_water", stored_water + 1);
          res = 1;
      end
      redis.call("PEXPIRE", key, ttl);
      return res;
      ```
    
    令牌桶: token_bucket_rate_limiter.lua
      ```lua
      -- 限流 key , hash 保存限流相关信息
      local key = KEYS[1];
      -- 最大访问量
      local capacity = tonumber(ARGV[1]);
      -- 限流时长(毫秒)
      local ttl = tonumber(ARGV[2]);
      -- 当前时间戳(毫秒)
      local now = tonumber(ARGV[3]);
      -- 生成令牌速率(每毫秒)
      local rate = tonumber(ARGV[4]);
      -- 限流信息
      local info = redis.call("HMGET", key, "last_time", "stored_tokens");
      -- 上次处理时间
      local last_time = tonumber(info[1]);
      -- 令牌数量,默认为最大访问量,存在保存值使用保存值
      local stored_tokens = tonumber(info[2]);
      if (stored_tokens == nil) then
          stored_tokens = capacity;
      end
      if (last_time ~= nil) then
          -- 根据上次处理时间和当前时间差,触发式往桶里添加令牌
          local add_tokens = math.floor((now - last_time) * rate);
          stored_tokens = math.min(add_tokens + stored_tokens, capacity);
          if (add_tokens > 0) then
              last_time = nil;
          end
      end
      -- 首次访问、添加了令牌 设置上次处理时间
      if (last_time == nil) then
          redis.call("HSET", key, "last_time", now);
      end
      -- 被限流返回0,未被限流返回1
      local res = 0;
      if (stored_tokens > 0) then
          redis.call("HSET", key, "stored_tokens", stored_tokens - 1);
          res = 1;
      end
      redis.call("PEXPIRE", key, ttl);
      return res;
      ```
    

其他

demo 地址:https://github.com/EastX/java-practice-demos/tree/main/demo-ratelimit

推荐阅读:

  • 限流 - JavaGuide
  • 架构之高并发:限流 - pdai
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。