1.算法
在高并發的應用中,限流是一個繞不開的話題。限流可以保障我們的 API 服務對所有用戶的可用性,也可以防止網絡攻擊。
一般開發高并發系統常見的限流有:限制總并發數(比如數據庫連接池、線程池)、限制瞬時并發數(如 nginx 的 limit_conn 模塊,用來限制瞬時并發連接數)、限制時間窗口內的平均速率(如 Guava 的 RateLimiter、nginx 的 limit_req 模塊,限制每秒的平均速率);其他還有如限制遠程接口調用速率、限制 MQ 的消費速率。另外還可以根據網絡連接數、網絡流量、CPU 或內存負載等來限流。
限流算法
做限流 (Rate Limiting/Throttling) 的時候,除了簡單的控制并發,如果要準確的控制 TPS,簡單的做法是維護一個單位時間內的 Counter,如判斷單位時間已經過去,則將 Counter 重置零。此做法被認為沒有很好的處理單位時間的邊界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都觸發了最大的請求數,也就是在兩毫秒內發生了兩倍的 TPS。
微服務架構和分布式架構的區別,常用的更平滑的限流算法有兩種:漏桶算法和令牌桶算法。很多傳統的服務提供商如華為中興都有類似的專利,參考采用令牌漏桶進行報文限流的方法。
漏桶算法
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然后就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率。
可見這里有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率。因此,漏桶算法對于存在突發特性的流量來說缺乏效率。
令牌桶算法
微服務gateway的作用?令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解。隨著時間流逝,系統會按恒定 1/QPS 時間間隔(如果 QPS=100,則間隔是 10ms)往桶里加入 Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了。新請求來臨時,會各自拿走一個 Token,如果沒有 Token 可拿了就阻塞或者拒絕服務。
令牌桶的另外一個好處是可以方便的改變速度。一旦需要提高速率,則按需提高放入桶中的令牌的速率。一般會定時(比如 100 毫秒)往桶中增加一定數量的令牌,有些變種算法則實時的計算應該增加的令牌的數量。Guava 中的 RateLimiter 采用了令牌桶的算法,設計思路參見?How is the RateLimiter designed, and why?,詳細的算法實現參見源碼。
?
feign調用經過網關嗎,?
?
本文討論在gateway集成的實現
2.創建gateway工程
詳情見:spring cloud網關gateway
微服務的五大組件?在此基礎上pom中加入?
<!--RequestRateLimiter限流--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId></dependency>
3.配置類
package com.common.config;import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import reactor.core.publisher.Mono;/*** @Title:* @Auther: * @Date: 2019/8/28 17:13* @Version: 1.0* @Description:*/ @Configuration public class RequestRateLimiterConfig {@Bean@PrimaryKeyResolver apiKeyResolver() {//按URL限流,即以每秒內請求數按URL分組統計,超出限流的url請求都將返回429狀態return exchange -> Mono.just(exchange.getRequest().getPath().toString());}@BeanKeyResolver userKeyResolver() {//按用戶限流return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));}@BeanKeyResolver ipKeyResolver() {//按IP來限流return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());}}
4.yml配置
application.yml
spring:application:name: gateway8710cloud:gateway:default-filter:routes:- id: user-serverpredicates:- Path=/java/**filters:- StripPrefix=1# 限流過濾器,使用gateway內置令牌算法- name: RequestRateLimiterargs:# 令牌桶每秒填充平均速率,即行等價于允許用戶每秒處理多少個請求平均數redis-rate-limiter.replenishRate: 10# 令牌桶的容量,允許在一秒鐘內完成的最大請求數redis-rate-limiter.burstCapacity: 20# 用于限流的鍵的解析器的 Bean 對象的名字。它使用 SpEL 表達式根據#{@beanName}從 Spring 容器中獲取 Bean 對象。key-resolver: "#{@apiKeyResolver}"uri: lb://service-helloword# uri: "http://192.168.111.133:8708/project/hello"redis:#Redis數據庫索引(默認為0)database: 0#連接超時時間(毫秒) springboot2.0 中該參數的類型為Duration,這里在配置的時候需要指明單位timeout: 20s#密碼password: testcluster:# 獲取失敗 最大重定向次數max-redirects: 3#測試環境redisnodes:- 10.0.0.1:6380- 10.0.0.2:6380- 10.0.0.3:6380- 10.0.0.1:6381- 10.0.0.2:6381- 10.0.0.3:6381lettuce:pool:#連接池最大連接數(使用負值表示沒有限制)max-active: 300#連接池最大阻塞等待時間(使用負值表示沒有限制)max-wait: -1s#連接池中的最大空閑連接max-idle: 100#連接池中的最小空閑連接min-idle: 20 server:port: 8710 eureka:client:serviceUrl:#指向注冊中心defaultZone: http://192.168.111.133:8888/eureka/instance:# 每間隔1s,向服務端發送一次心跳,證明自己依然”存活“lease-renewal-interval-in-seconds: 1# 告訴服務端,如果我2s之內沒有給你發心跳,就代表我“死”了,將我踢出掉。lease-expiration-duration-in-seconds: 2
目錄結構如下
net微服務架構有哪些?
?
5.啟動測試
需要用jmeter來做并發測試,一秒內啟30個進程,重復發請求10000次。詳情見并發測試JMeter及發送Json請求
springcloud常用組件?
?
測試結果,沒有搶到令牌的請求就返回429,這邊的限流相當于平均request:10/s
spring boot簡介,
?redis中存儲項
?
?多個請求,如兩個(url分別為/project/getToken,/project/login)不同的并發請求
?
?
6.原理
基于redis+lua
lua腳本路徑
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4])local fill_time = capacity/rate local ttl = math.floor(fill_time*2) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil thenlast_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil thenlast_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed thennew_tokens = filled_tokens - requestedallowed_num = 1 end redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now)return { allowed_num, new_tokens }
引入腳本的地方
相關源碼:
限流源碼RedisRateLimiter
/** Copyright 2017-2019 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.springframework.cloud.gateway.filter.ratelimit;import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean;import javax.validation.constraints.Min;import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;import org.springframework.beans.BeansException; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.validation.Validator; import org.springframework.validation.annotation.Validated;/*** See https://stripe.com/blog/rate-limiters and* https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34.** @author Spencer Gibb* @author Ronny Br?unlich*/ @ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter") public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config>implements ApplicationContextAware {/*** @deprecated use {@link Config#replenishRate}*/@Deprecatedpublic static final String REPLENISH_RATE_KEY = "replenishRate";/*** @deprecated use {@link Config#burstCapacity}*/@Deprecatedpublic static final String BURST_CAPACITY_KEY = "burstCapacity";/*** Redis Rate Limiter property name.*/public static final String CONFIGURATION_PROPERTY_NAME = "redis-rate-limiter";/*** Redis Script name.*/public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";/*** Remaining Rate Limit header name.*/public static final String REMAINING_HEADER = "X-RateLimit-Remaining";/*** Replenish Rate Limit header name.*/public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";/*** Burst Capacity Header name.*/public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";private Log log = LogFactory.getLog(getClass());private ReactiveRedisTemplate<String, String> redisTemplate;private RedisScript<List<Long>> script;private AtomicBoolean initialized = new AtomicBoolean(false);private Config defaultConfig;// configuration properties/*** Whether or not to include headers containing rate limiter information, defaults to* true.*/private boolean includeHeaders = true;/*** The name of the header that returns number of remaining requests during the current* second.*/private String remainingHeader = REMAINING_HEADER;/** The name of the header that returns the replenish rate configuration. */private String replenishRateHeader = REPLENISH_RATE_HEADER;/** The name of the header that returns the burst capacity configuration. */private String burstCapacityHeader = BURST_CAPACITY_HEADER;public RedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,RedisScript<List<Long>> script, Validator validator) {super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);this.redisTemplate = redisTemplate;this.script = script;initialized.compareAndSet(false, true);}public RedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {super(Config.class, CONFIGURATION_PROPERTY_NAME, null);this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);}static List<String> getKeys(String id) {// use `{}` around keys to use Redis Key hash tags// this allows for using redis cluster// Make a unique key per user.String prefix = "request_rate_limiter.{" + id;// You need two Redis keys for Token Bucket.String tokenKey = prefix + "}.tokens";String timestampKey = prefix + "}.timestamp";return Arrays.asList(tokenKey, timestampKey);}public boolean isIncludeHeaders() {return includeHeaders;}public void setIncludeHeaders(boolean includeHeaders) {this.includeHeaders = includeHeaders;}public String getRemainingHeader() {return remainingHeader;}public void setRemainingHeader(String remainingHeader) {this.remainingHeader = remainingHeader;}public String getReplenishRateHeader() {return replenishRateHeader;}public void setReplenishRateHeader(String replenishRateHeader) {this.replenishRateHeader = replenishRateHeader;}public String getBurstCapacityHeader() {return burstCapacityHeader;}public void setBurstCapacityHeader(String burstCapacityHeader) {this.burstCapacityHeader = burstCapacityHeader;}@Override@SuppressWarnings("unchecked")public void setApplicationContext(ApplicationContext context) throws BeansException {if (initialized.compareAndSet(false, true)) {this.redisTemplate = context.getBean("stringReactiveRedisTemplate",ReactiveRedisTemplate.class);this.script = context.getBean(REDIS_SCRIPT_NAME, RedisScript.class);if (context.getBeanNamesForType(Validator.class).length > 0) {this.setValidator(context.getBean(Validator.class));}}}/* for testing */ Config getDefaultConfig() {return defaultConfig;}/*** This uses a basic token bucket algorithm and relies on the fact that Redis scripts* execute atomically. No other operations can run between fetching the count and* writing the new count.*/@Override@SuppressWarnings("unchecked")
// routeId也就是我們的fsh-house,id就是限流的URL,也就是/project/hello。public Mono<Response> isAllowed(String routeId, String id) {
// 會判斷RedisRateLimiter是否初始化了if (!this.initialized.get()) {throw new IllegalStateException("RedisRateLimiter is not initialized");}
// 獲取routeId對應的限流配置Config routeConfig = loadConfiguration(routeId);
// 允許用戶每秒做多少次請求// How many requests per second do you want a user to be allowed to do?int replenishRate = routeConfig.getReplenishRate();
// 令牌桶的容量,允許在一秒鐘內完成的最大請求數// How much bursting do you want to allow?int burstCapacity = routeConfig.getBurstCapacity();try {List<String> keys = getKeys(id);
// 限流key的名稱(request_rate_limiter.{/login}.timestamp,request_rate_limiter.{/login}.tokens)// The arguments to the LUA script. time() returns unixtime in seconds.List<String> scriptArgs = Arrays.asList(replenishRate + "",burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
// 執行LUA腳本// allowed, tokens_left = redis.eval(SCRIPT, keys, args)Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,scriptArgs);// .log("redisratelimiter", Level.FINER);return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))).reduce(new ArrayList<Long>(), (longs, l) -> {longs.addAll(l);return longs;}).map(results -> {boolean allowed = results.get(0) == 1L;Long tokensLeft = results.get(1);Response response = new Response(allowed,getHeaders(routeConfig, tokensLeft));if (log.isDebugEnabled()) {log.debug("response: " + response);}return response;});}catch (Exception e) {/** We don't want a hard dependency on Redis to allow traffic. Make sure to set* an alert so you know if this is happening too much. Stripe's observed* failure rate is 0.01%.*/log.error("Error determining if user allowed from redis", e);}return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));}/* for testing */ Config loadConfiguration(String routeId) {Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);if (routeConfig == null) {routeConfig = getConfig().get(RouteDefinitionRouteLocator.DEFAULT_FILTERS);}if (routeConfig == null) {throw new IllegalArgumentException("No Configuration found for route " + routeId + " or defaultFilters");}return routeConfig;}@NotNullpublic Map<String, String> getHeaders(Config config, Long tokensLeft) {Map<String, String> headers = new HashMap<>();if (isIncludeHeaders()) {headers.put(this.remainingHeader, tokensLeft.toString());headers.put(this.replenishRateHeader,String.valueOf(config.getReplenishRate()));headers.put(this.burstCapacityHeader,String.valueOf(config.getBurstCapacity()));}return headers;}@Validatedpublic static class Config {@Min(1)private int replenishRate;@Min(1)private int burstCapacity = 1;public int getReplenishRate() {return replenishRate;}public Config setReplenishRate(int replenishRate) {this.replenishRate = replenishRate;return this;}public int getBurstCapacity() {return burstCapacity;}public Config setBurstCapacity(int burstCapacity) {this.burstCapacity = burstCapacity;return this;}@Overridepublic String toString() {return "Config{" + "replenishRate=" + replenishRate + ", burstCapacity="+ burstCapacity + '}';}}}
?