SpringCloud中怎么实现gateway限流
SpringCloud中怎么实现gateway限流,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
路由过滤器允许以某种方式修改传入的HTTP请求或传出的HTTP响应,路径过滤器的范围限定为特定路径,Spring Cloud Gateway包含许多内置的GatewayFilter工厂。
Spring Cloud Gateway限流就是通过内置的RequestRateLimiterGateWayFilterFactory工厂来实现的。
当然,官方的肯定不能满足我们部分业务需求,因此可以自定义限流过滤器。
## yml如下配置,就可以为该路由添加此拦截器:
spring: cloud: gateway: routes: - id: test_route uri: localhost predicates: - Path=/host/address filters: - name: RequestRateLimiter args: ## 允许用户每秒执行多少请求,而不会丢弃任何请求。这是令牌桶填充的速率。 redis-rate-limiter.replenishRate: 1 ## 是一秒钟内允许用户执行的最大请求数。这是令牌桶可以容纳的令牌数。将此值设置为零将阻止所有请求。 redis-rate-limiter.burstCapacity: 3 ## KeyResolver是一个简单的获取用户请求参数 我这里以主机地址为key来作限流 key-resolver: "#{@hostAddrKeyResolver}"
## RequestRateLimiterGateWayFilterFactory代码:
//AbstractGatewayFilterFactory实现GatewayFilterFactory接口,自定义的过滤工厂可以继承//AbstractGatewayFilterFactory并编写apply方法public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory{ public static final String KEY_RESOLVER_KEY = "keyResolver"; private final RateLimiter defaultRateLimiter; private final KeyResolver defaultKeyResolver; public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter,KeyResolver defaultKeyResolver) { super(Config.class); this.defaultRateLimiter = defaultRateLimiter; this.defaultKeyResolver = defaultKeyResolver; } public KeyResolver getDefaultKeyResolver() { return defaultKeyResolver; } public RateLimiter getDefaultRateLimiter() { return defaultRateLimiter; } @SuppressWarnings("unchecked") @Override public GatewayFilter apply(Config config) { //yml中我们配置的hostAddrKeyResolver KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver; //这个就是限流的具体实现,默认使用RedisRateLimiter RateLimiter
分析:
1.加载KeyResolver,从配置文件中加载,此处我配置了hostAddrKeyResolver,即根据host地址来进行限流。如果为空,使用默认的PrincipalNameKeyResolver
2.加载RateLimiter,默认使用RedisRateLimiter。
3.执行RedisRateLimiter的isAllowed方法,得到response,如果isAllowed为true则通过拦截,否则返回429(isAllowed方法具体实现下文描述)。
## HostAddrKeyResolver:
@Slf4jpublic class HostAddrKeyResolver implements KeyResolver { @Override public Monoresolve(ServerWebExchange exchange) { log.info("HostAddrKeyResolver 限流"); return Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); }}
在启动类中注入bean
@Beanpublic HostAddrKeyResolver hostAddrKeyResolver() { return new HostAddrKeyResolver();}
## RedisRateLimiter:
@Override @SuppressWarnings("unchecked") public MonoisAllowed(String routeId, String id) { //判断是否初始化 if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } //获取配置 Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig); if (routeConfig == null) { throw new IllegalArgumentException("No Configuration found for route " + routeId); } //令牌桶填充速率 int replenishRate = routeConfig.getReplenishRate(); //令牌桶可容纳令牌数 int burstCapacity = routeConfig.getBurstCapacity(); try { //获取redis的key,执行lua脚本时传入 List keys = getKeys(id); //获取参数,执行lua脚本时传入 List scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); Flux > flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // .log("redisratelimiter", Level.FINER); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList
(), (longs, l) -> { longs.addAll(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { /* * We don't want a hard dependency on Redis to allow traffic. Make sure to set * an alert so you know if this is happening too much. Stripe's observed * failure rate is 0.01%. */ log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); } @NotNull public HashMap getHeaders(Config config, Long tokensLeft) { HashMap headers = new HashMap<>(); headers.put(this.remainingHeader, tokensLeft.toString()); headers.put(this.replenishRateHeader, String.valueOf(config.getReplenishRate())); headers.put(this.burstCapacityHeader, String.valueOf(config.getBurstCapacity())); return headers; } static List getKeys(String id) { // use {} around keys to use Redis Key hash tags // this allows for using redis cluster // Make a unique key per user. String prefix = "request_rate_limiter.{" + id; //令牌桶剩余令牌数 String tokenKey = prefix + "}.tokens"; //令牌桶最后填充令牌时间 String timestampKey = prefix + "}.timestamp"; return Arrays.asList(tokenKey, timestampKey); }
分析:
1.判断是否初始化,加载配置,获取令牌填充速率和令牌桶大小
2.根据路由id组合成两个redis中的key值,传入lua脚本
request_rate_limiter.{id}.tokens 令牌桶剩余令牌数
request_rate_limiter.{id}.timestamp 令牌桶最后填充令牌时间
3.把令牌填充速率,令牌桶大小,当前时间(单位:秒),消耗令牌数(默认为1)组合传入lua脚本
4.执行lua脚本
5.flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) 这个是对执行lua脚本过程中发生异常的处理,它会忽略异常,返回令牌。这样就能跟redis解耦,不对它强依赖。
该实现核心主要体现在lua脚本上,它使用的是令牌桶算法
详见spring-cloud-gateway-core下的request_rate_limiter.lua
## 获取剩余令牌数的redis keylocal tokens_key = KEYS[1]## 获取最后一次填充令牌的时间local timestamp_key = KEYS[2]--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) ## 令牌填充速率local rate = tonumber(ARGV[1])## 令牌桶大小local capacity = tonumber(ARGV[2])## 当前秒数local now = tonumber(ARGV[3])## 消耗令牌数,默认1local requested = tonumber(ARGV[4]) ## 计算令牌桶需要填充的时间local fill_time = capacity/rate## 计算key的存活时间local ttl = math.floor(fill_time2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)--redis.log(redis.LOG_WARNING, "ttl " .. ttl) ## 获取剩余的令牌数local last_tokens = tonumber(redis.call("get", tokens_key))if last_tokens == nil then last_tokens = capacityend--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) ## 获取令牌最后填充时间local last_refreshed = tonumber(redis.call("get", timestamp_key))if last_refreshed == nil then last_refreshed = 0end--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed)## 计算得到剩余的令牌数local filled_tokens = math.min(capacity, last_tokens+(deltarate))## 大于请求消耗令牌 allowed 设为truelocal allowed = filled_tokens >= requestedlocal new_tokens = filled_tokenslocal allowed_num = 0if allowed then new_tokens = filled_tokens - requested allowed_num = 1end --redis.log(redis.LOG_WARNING, "delta " .. delta)--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) redis.call("setex", tokens_key, ttl, new_tokens)redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
看完上述内容,你们掌握SpringCloud中怎么实现gateway限流的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!