實(shí)戰(zhàn)?Spring?Cloud?Gateway?之限流篇
來源:https://www.aneasystone.com/archives/2020/08/spring-cloud-gateway-current-limiting.html
話說在 Spring Cloud Gateway 問世之前,Spring Cloud 的微服務(wù)世界里,網(wǎng)關(guān)一定非 Netflix Zuul 莫屬。但是由于 Zuul 1.x 存在的一些問題,比如阻塞式的 API,不支持 WebSocket 等,一直被人所詬病,而且 Zuul 升級新版本依賴于 Netflix 公司,經(jīng)過幾次跳票之后,Spring 開源社區(qū)決定推出自己的網(wǎng)關(guān)組件,替代 Netflix Zuul。從 18 年 6 月 Spring Cloud 發(fā)布的 Finchley 版本開始,Spring Cloud Gateway 逐漸嶄露頭角,它基于 Spring 5.0、Spring Boot 2.0 和 Project Reactor 等技術(shù)開發(fā),不僅支持響應(yīng)式和無阻塞式的 API,而且支持 WebSocket,和 Spring 框架緊密集成。盡管 Zuul 后來也推出了 2.x 版本,在底層使用了異步無阻塞式的 API,大大改善了其性能,但是目前看來 Spring 并沒有打算繼續(xù)集成它的計(jì)劃。根據(jù)官網(wǎng)的描述,Spring Cloud Gateway 的主要特性如下:
- Built on Spring Framework 5, Project Reactor and Spring Boot 2.0
- Able to match routes on any request attribute
- Predicates and filters are specific to routes
- Hystrix Circuit Breaker integration
- Spring Cloud DiscoveryClient integration
- Easy to write Predicates and Filters
- Request Rate Limiting
- Path Rewriting
一、常見的限流場景
緩存、降級?和?限流?被稱為高并發(fā)、分布式系統(tǒng)的三駕馬車,網(wǎng)關(guān)作為整個(gè)分布式系統(tǒng)中的第一道關(guān)卡,限流功能自然必不可少。通過限流,可以控制服務(wù)請求的速率,從而提高系統(tǒng)應(yīng)對突發(fā)大流量的能力,讓系統(tǒng)更具彈性。限流有著很多實(shí)際的應(yīng)用場景,比如雙十一的秒殺活動, 12306 的搶票等。1.1 限流的對象
通過上面的介紹,我們對限流的概念可能感覺還是比較模糊,到底限流限的是什么?顧名思義,限流就是限制流量,但這里的流量是一個(gè)比較籠統(tǒng)的概念。如果考慮各種不同的場景,限流是非常復(fù)雜的,而且和具體的業(yè)務(wù)規(guī)則密切相關(guān),可以考慮如下幾種常見的場景:- 限制某個(gè)接口一分鐘內(nèi)最多請求 100 次
- 限制某個(gè)用戶的下載速度最多 100KB/S
- 限制某個(gè)用戶同時(shí)只能對某個(gè)接口發(fā)起 5 路請求
- 限制某個(gè) IP 來源禁止訪問任何請求
1.2 限流的處理方式
在系統(tǒng)中設(shè)計(jì)限流方案時(shí),有一個(gè)問題值得設(shè)計(jì)者去仔細(xì)考慮,當(dāng)請求者被限流規(guī)則攔截之后,我們該如何返回結(jié)果。一般我們有下面三種限流的處理方式:- 拒絕服務(wù)
- 排隊(duì)等待
- 服務(wù)降級
1.3 限流的架構(gòu)
針對不同的系統(tǒng)架構(gòu),需要使用不同的限流方案。如下圖所示,服務(wù)部署的方式一般可以分為單機(jī)模式和集群模式:


二、常見的限流算法
通過上面的學(xué)習(xí),我們知道限流可以分為請求頻率限流和并發(fā)量限流,根據(jù)系統(tǒng)架構(gòu)的不同,又可以分為網(wǎng)關(guān)層限流和分布式限流。在不同的應(yīng)用場景下,我們需要采用不同的限流算法。這一節(jié)將介紹一些主流的限流算法。有一點(diǎn)要注意的是,利用池化技術(shù)也可以達(dá)到限流的目的,比如線程池或連接池,但這不是本文的重點(diǎn)。2.1 固定窗口算法(Fixed Window)
固定窗口算法是一種最簡單的限流算法,它根據(jù)限流的條件,將請求時(shí)間映射到一個(gè)時(shí)間窗口,再使用計(jì)數(shù)器累加訪問次數(shù)。譬如限流條件為每分鐘 5 次,那么就按照分鐘為單位映射時(shí)間窗口,假設(shè)一個(gè)請求時(shí)間為 11:00:45,時(shí)間窗口就是 11:00:00 ~ 11:00:59,在這個(gè)時(shí)間窗口內(nèi)設(shè)定一個(gè)計(jì)數(shù)器,每來一個(gè)請求計(jì)數(shù)器加一,當(dāng)這個(gè)時(shí)間窗口的計(jì)數(shù)器超過 5 時(shí),就觸發(fā)限流條件。當(dāng)請求時(shí)間落在下一個(gè)時(shí)間窗口內(nèi)時(shí)(11:01:00 ~ 11:01:59),上一個(gè)窗口的計(jì)數(shù)器失效,當(dāng)前的計(jì)數(shù)器清零,重新開始計(jì)數(shù)。計(jì)數(shù)器算法非常容易實(shí)現(xiàn),在單機(jī)場景下可以使用 AtomicLong、LongAdder 或 Semaphore 來實(shí)現(xiàn)計(jì)數(shù),而在分布式場景下可以通過 Redis 的 INCR 和 EXPIRE 等命令并結(jié)合 EVAL 或 lua 腳本來實(shí)現(xiàn),Redis 官網(wǎng)提供了幾種簡單的實(shí)現(xiàn)方式。無論是請求頻率限流還是并發(fā)量限流都可以使用這個(gè)算法。不過這個(gè)算法的缺陷也比較明顯,那就是存在嚴(yán)重的臨界問題。由于每過一個(gè)時(shí)間窗口,計(jì)數(shù)器就會清零,這使得限流效果不夠平滑,惡意用戶可以利用這個(gè)特點(diǎn)繞過我們的限流規(guī)則。如下圖所示,我們的限流條件本來是每分鐘 5 次,但是惡意用戶在 11:00:00 ~ 11:00:59 這個(gè)時(shí)間窗口的后半分鐘發(fā)起 5 次請求,接下來又在 11:01:00 ~ 11:01:59 這個(gè)時(shí)間窗口的前半分鐘發(fā)起 5 次請求,這樣我們的系統(tǒng)就在 1 分鐘內(nèi)承受了 10 次請求。
2.2 滑動窗口算法(Rolling Window 或 Sliding Window)
為了解決固定窗口算法的臨界問題,可以將時(shí)間窗口劃分成更小的時(shí)間窗口,然后隨著時(shí)間的滑動刪除相應(yīng)的小窗口,而不是直接滑過一個(gè)大窗口,這就是滑動窗口算法。我們?yōu)槊總€(gè)小時(shí)間窗口都設(shè)置一個(gè)計(jì)數(shù)器,大時(shí)間窗口的總請求次數(shù)就是每個(gè)小時(shí)間窗口的計(jì)數(shù)器的和。如下圖所示,我們的時(shí)間窗口是 5 秒,可以按秒進(jìn)行劃分,將其劃分成 5 個(gè)小窗口,時(shí)間每過一秒,時(shí)間窗口就滑過一秒:
2.3 漏桶算法(Leaky Bucket)
除了計(jì)數(shù)器算法,另一個(gè)很自然的限流思路是將所有的請求緩存到一個(gè)隊(duì)列中,然后按某個(gè)固定的速度慢慢處理,這其實(shí)就是漏桶算法(Leaky Bucket)。漏桶算法假設(shè)將請求裝到一個(gè)桶中,桶的容量為 M,當(dāng)桶滿時(shí),請求被丟棄。在桶的底部有一個(gè)洞,桶中的請求像水一樣按固定的速度(每秒 r 個(gè))漏出來。我們用下面這個(gè)形象的圖來表示漏桶算法:

2.4 令牌桶算法(Token Bucket)
令牌桶算法(Token Bucket)是目前應(yīng)用最廣泛的一種限流算法,它的基本思想由兩部分組成:生成令牌?和?消費(fèi)令牌。- 生成令牌:假設(shè)有一個(gè)裝令牌的桶,最多能裝 M 個(gè),然后按某個(gè)固定的速度(每秒 r 個(gè))往桶中放入令牌,桶滿時(shí)不再放入;
- 消費(fèi)令牌:我們的每次請求都需要從桶中拿一個(gè)令牌才能放行,當(dāng)桶中沒有令牌時(shí)即觸發(fā)限流,這時(shí)可以將請求放入一個(gè)緩沖隊(duì)列中排隊(duì)等待,或者直接拒絕;

2
3????private?final?long?capacity;
4????private?final?double?refillTokensPerOneMillis;
5????private?double?availableTokens;
6????private?long?lastRefillTimestamp;
7
8????public?TokenBucket(long?capacity,?long?refillTokens,?long?refillPeriodMillis)?{
9????????this.capacity?=?capacity;
10????????this.refillTokensPerOneMillis?=?(double)?refillTokens?/?(double)?refillPeriodMillis;
11????????this.availableTokens?=?capacity;
12????????this.lastRefillTimestamp?=?System.currentTimeMillis();
13????}
14
15????synchronized?public?boolean?tryConsume(int?numberTokens)?{
16????????refill();
17????????if?(availableTokens?18????????????return?false;
19????????}?else?{
20????????????availableTokens?-=?numberTokens;
21????????????return?true;
22????????}
23????}
24
25????private?void?refill()?{
26????????long?currentTimeMillis?=?System.currentTimeMillis();
27????????if?(currentTimeMillis?>?lastRefillTimestamp)?{
28????????????long?millisSinceLastRefill?=?currentTimeMillis?-?lastRefillTimestamp;
29????????????double?refill?=?millisSinceLastRefill?*?refillTokensPerOneMillis;
30????????????this.availableTokens?=?Math.min(capacity,?availableTokens? ?refill);
31????????????this.lastRefillTimestamp?=?currentTimeMillis;
32????????}
33????}
34}
可以像下面這樣創(chuàng)建一個(gè)令牌桶(桶大小為 100,且每秒生成 100 個(gè)令牌):1TokenBucket?limiter?=?new?TokenBucket(100,?100,?1000);
從上面的代碼片段可以看出,令牌桶算法的實(shí)現(xiàn)非常簡單也非常高效,僅僅通過幾個(gè)變量的運(yùn)算就實(shí)現(xiàn)了完整的限流功能。核心邏輯在于 refill()?這個(gè)方法,在每次消費(fèi)令牌時(shí),計(jì)算當(dāng)前時(shí)間和上一次填充的時(shí)間差,并根據(jù)填充速度計(jì)算出應(yīng)該填充多少令牌。在重新填充令牌后,再判斷請求的令牌數(shù)是否足夠,如果不夠,返回 false,如果足夠,則減去令牌數(shù),并返回 true。在實(shí)際的應(yīng)用中,往往不會直接使用這種原始的令牌桶算法,一般會在它的基礎(chǔ)上作一些改進(jìn),比如,填充速率支持動態(tài)調(diào)整,令牌總數(shù)支持透支,基于 Redis 支持分布式限流等,不過總體來說還是符合令牌桶算法的整體框架,我們在后面學(xué)習(xí)一些開源項(xiàng)目時(shí)對此會有更深的體會。
三、一些開源項(xiàng)目
有很多開源項(xiàng)目中都實(shí)現(xiàn)了限流的功能,這一節(jié)通過一些開源項(xiàng)目的學(xué)習(xí),了解限流是如何實(shí)現(xiàn)的。3.1 Guava 的 RateLimiter
Google Guava 是一個(gè)強(qiáng)大的核心庫,包含了很多有用的工具類,例如:集合、緩存、并發(fā)庫、字符串處理、I/O 等等。其中在并發(fā)庫中,Guava 提供了兩個(gè)和限流相關(guān)的類:RateLimiter 和 SmoothRateLimiter。Guava 的 RateLimiter 基于令牌桶算法實(shí)現(xiàn),不過在傳統(tǒng)的令牌桶算法基礎(chǔ)上做了點(diǎn)改進(jìn),支持兩種不同的限流方式:平滑突發(fā)限流(SmoothBursty)?和?平滑預(yù)熱限流(SmoothWarmingUp)。下面的方法可以創(chuàng)建一個(gè)平滑突發(fā)限流器(SmoothBursty):1RateLimiter?limiter?=?RateLimiter.create(5);
RateLimiter.create(5)?表示這個(gè)限流器容量為 5,并且每秒生成 5 個(gè)令牌,也就是每隔 200 毫秒生成一個(gè)。我們可以使用 limiter.acquire()?消費(fèi)令牌,如果桶中令牌足夠,返回 0,如果令牌不足,則阻塞等待,并返回等待的時(shí)間。我們連續(xù)請求幾次:1System.out.println(limiter.acquire());
2System.out.println(limiter.acquire());
3System.out.println(limiter.acquire());
4System.out.println(limiter.acquire());
輸出結(jié)果如下:10.0
20.198239
30.196083
40.200609
可以看出限流器創(chuàng)建之后,初始會有一個(gè)令牌,然后每隔 200 毫秒生成一個(gè)令牌,所以第一次請求直接返回 0,后面的請求都會阻塞大約 200 毫秒。另外,SmoothBursty 還具有應(yīng)對突發(fā)的能力,而且?還允許消費(fèi)未來的令牌,比如下面的例子:1RateLimiter?limiter?=?RateLimiter.create(5);
2System.out.println(limiter.acquire(10));
3System.out.println(limiter.acquire(1));
4System.out.println(limiter.acquire(1));
會得到類似下面的輸出:10.0
21.997428
30.192273
40.200616
限流器創(chuàng)建之后,初始令牌只有一個(gè),但是我們請求 10 個(gè)令牌竟然也通過了,只不過看后面請求發(fā)現(xiàn),第二次請求花了 2 秒左右的時(shí)間把前面的透支的令牌給補(bǔ)上了。Guava 支持的另一種限流方式是平滑預(yù)熱限流器(SmoothWarmingUp),可以通過下面的方法創(chuàng)建:
1RateLimiter?limiter?=?RateLimiter.create(2,?3,?TimeUnit.SECONDS);
2System.out.println(limiter.acquire(1));
3System.out.println(limiter.acquire(1));
4System.out.println(limiter.acquire(1));
5System.out.println(limiter.acquire(1));
6System.out.println(limiter.acquire(1));
第一個(gè)參數(shù)還是每秒創(chuàng)建的令牌數(shù)量,這里是每秒 2 個(gè),也就是每 500 毫秒生成一個(gè),后面的參數(shù)表示從冷啟動速率過渡到平均速率的時(shí)間間隔,也就是所謂的熱身時(shí)間間隔(warm up period)。我們看下輸出結(jié)果:10.0
21.329289
30.994375
40.662888
50.501287
第一個(gè)請求還是立即得到令牌,但是后面的請求和上面平滑突發(fā)限流就完全不一樣了,按理來說 500 毫秒就會生成一個(gè)令牌,但是我們發(fā)現(xiàn)第二個(gè)請求卻等了 1.3s,而不是 0.5s,后面第三個(gè)和第四個(gè)請求也等了一段時(shí)間。不過可以看出,等待時(shí)間在慢慢的接近 0.5s,直到第五個(gè)請求等待時(shí)間才開始變得正常。從第一個(gè)請求到第五個(gè)請求,這中間的時(shí)間間隔就是熱身階段,可以算出熱身的時(shí)間就是我們設(shè)置的 3 秒。3.2 Bucket4j
Bucket4j是一個(gè)基于令牌桶算法實(shí)現(xiàn)的強(qiáng)大的限流庫,它不僅支持單機(jī)限流,還支持通過諸如 Hazelcast、Ignite、Coherence、Infinispan 或其他兼容 JCache API (JSR 107)?規(guī)范的分布式緩存實(shí)現(xiàn)分布式限流。在使用 Bucket4j 之前,我們有必要先了解 Bucket4j 中的幾個(gè)核心概念:- Bucket
- Bandwidth
- Refill
1Bucket?bucket?=?Bucket4j.builder().addLimit(limit).build();
2if(bucket.tryConsume(1))?{
3????System.out.println("ok");
4}?else?{
5????System.out.println("error");
6}
Bandwidth 的意思是帶寬, 可以理解為限流的規(guī)則。Bucket4j 提供了兩種方法來創(chuàng)建 Bandwidth:simple 和 classic。下面是 simple 方式創(chuàng)建的 Bandwidth,表示桶大小為 10,填充速度為每分鐘 10 個(gè)令牌:1Bandwidth?limit?=?Bandwidth.simple(10,?Duration.ofMinutes(1));
simple方式桶大小和填充速度是一樣的,classic 方式更靈活一點(diǎn),可以自定義填充速度,下面的例子表示桶大小為 10,填充速度為每分鐘 5 個(gè)令牌:1Refill?filler?=?Refill.greedy(5,?Duration.ofMinutes(1));
2Bandwidth?limit?=?Bandwidth.classic(10,?filler);
其中,Refill 用于填充令牌桶,可以通過它定義填充速度,Bucket4j 有兩種填充令牌的策略:間隔策略(intervally)?和?貪婪策略(greedy)。在上面的例子中我們使用的是貪婪策略,如果使用間隔策略可以像下面這樣創(chuàng)建 Refill:1Refill?filler?=?Refill.intervally(5,?Duration.ofMinutes(1));
所謂間隔策略指的是每隔一段時(shí)間,一次性的填充所有令牌,比如上面的例子,會每隔一分鐘,填充 5 個(gè)令牌,如下所示:

- 基于令牌桶算法
- 高性能,無鎖實(shí)現(xiàn)
- 不存在精度問題,所有計(jì)算都是基于整型的
- 支持通過符合 JCache API 規(guī)范的分布式緩存系統(tǒng)實(shí)現(xiàn)分布式限流
- 支持為每個(gè) Bucket 設(shè)置多個(gè) Bandwidth
- 支持同步和異步 API
- 支持可插拔的監(jiān)聽 API,用于集成監(jiān)控和日志
- 不僅可以用于限流,還可以用于簡單的調(diào)度
3.3 Resilience4j
Resilience4j 是一款輕量級、易使用的高可用框架。用過 Spring Cloud 早期版本的同學(xué)肯定都聽過 Netflix Hystrix,Resilience4j 的設(shè)計(jì)靈感就來自于它。自從 Hystrix 停止維護(hù)之后,官方也推薦大家使用 Resilience4j 來代替 Hystrix。
1//?創(chuàng)建一個(gè)?Bulkhead,最大并發(fā)量為?150
2BulkheadConfig?bulkheadConfig?=?BulkheadConfig.custom()
3????.maxConcurrentCalls(150)
4????.maxWaitTime(100)
5????.build();
6Bulkhead?bulkhead?=?Bulkhead.of("backendName",?bulkheadConfig);
7
8//?創(chuàng)建一個(gè)?RateLimiter,每秒允許一次請求
9RateLimiterConfig?rateLimiterConfig?=?RateLimiterConfig.custom()
10????.timeoutDuration(Duration.ofMillis(100))
11????.limitRefreshPeriod(Duration.ofSeconds(1))
12????.limitForPeriod(1)
13????.build();
14RateLimiter?rateLimiter?=?RateLimiter.of("backendName",?rateLimiterConfig);
15
16//?使用?Bulkhead?和?RateLimiter?裝飾業(yè)務(wù)邏輯
17Supplier?supplier?=?()?->?backendService.doSomething();
18Supplier?decoratedSupplier?=?Decorators.ofSupplier(supplier)
19??.withBulkhead(bulkhead)
20??.withRateLimiter(rateLimiter)
21??.decorate();
22
23//?調(diào)用業(yè)務(wù)邏輯
24Try?try?=?Try.ofSupplier(decoratedSupplier);
25assertThat(try.isSuccess()).isTrue();
Resilience4j 在功能特性上比 Bucket4j 強(qiáng)大不少,而且還支持并發(fā)量限流。不過最大的遺憾是,Resilience4j 不支持分布式限流。3.4 其他
網(wǎng)上還有很多限流相關(guān)的開源項(xiàng)目,不可能一一介紹,這里列出來的只是冰山之一角:- https://github.com/mokies/ratelimitj
- https://github.com/wangzheng0822/ratelimiter4j
- https://github.com/wukq/rate-limiter
- https://github.com/marcosbarbero/spring-cloud-zuul-ratelimit
- https://github.com/onblog/SnowJena
- https://gitee.com/zhanghaiyang/spring-boot-starter-current-limiting
- https://github.com/Netflix/concurrency-limits
四、在網(wǎng)關(guān)中實(shí)現(xiàn)限流
在文章一開始介紹 Spring Cloud Gateway 的特性時(shí),我們注意到其中有一條 Request Rate Limiting,說明網(wǎng)關(guān)自帶了限流的功能,但是 Spring Cloud Gateway 自帶的限流有很多限制,譬如不支持單機(jī)限流,不支持并發(fā)量限流,而且它的請求頻率限流也是不盡人意,這些都需要我們自己動手來解決。4.1 實(shí)現(xiàn)單機(jī)請求頻率限流
Spring Cloud Gateway 中定義了關(guān)于限流的一個(gè)接口 RateLimiter,如下:1public?interface?RateLimiter<C>?extends?StatefulConfigurable<C>?{
2????Mono?isAllowed(String?routeId,?String?id);
3}
這個(gè)接口就一個(gè)方法 isAllowed,第一個(gè)參數(shù) routeId 表示請求路由的 ID,根據(jù) routeId 可以獲取限流相關(guān)的配置,第二個(gè)參數(shù) id 表示要限流的對象的唯一標(biāo)識,可以是用戶名,也可以是 IP,或者其他的可以從 ServerWebExchange 中得到的信息。我們看下 RequestRateLimiterGatewayFilterFactory 中對 isAllowed 的調(diào)用邏輯: 1@Override
2public?GatewayFilter?apply(Config?config)?{
3????//?從配置中得到?KeyResolver
4????KeyResolver?resolver?=?getOrDefault(config.keyResolver,?defaultKeyResolver);
5????//?從配置中得到?RateLimiter
6????RateLimiter
從上面的的邏輯可以看出,通過實(shí)現(xiàn) KeyResolver 接口的 resolve 方法就可以自定義要限流的對象了。1public?interface?KeyResolver?{
2????Mono?resolve(ServerWebExchange?exchange) ;
3}
比如下面的?HostAddrKeyResolver?可以根據(jù) IP 來限流: 1public?interface?KeyResolver?{
2????Mono?resolve(ServerWebExchange?exchange) ;
3}
4比如下面的 HostAddrKeyResolver 可以根據(jù) IP 來限流:
5public?class?HostAddrKeyResolver?implements?KeyResolver?{
6????@Override
7????public?Mono?resolve(ServerWebExchange?exchange)? {
8????????return?Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
9????}
10}
我們繼續(xù)看 Spring Cloud Gateway 的代碼發(fā)現(xiàn),RateLimiter 接口只提供了一個(gè)實(shí)現(xiàn)類 RedisRateLimiter:
1public?Mono?isAllowed(String?routeId,?String?id)? {
2????Config?routeConfig?=?loadConfiguration(routeId);
3
4????//?How?many?requests?per?second?do?you?want?a?user?to?be?allowed?to?do?
5????int?replenishRate?=?routeConfig.getReplenishRate();
6
7????//?How?many?seconds?for?a?token?refresh?
8????int?refreshPeriod?=?routeConfig.getRefreshPeriod();
9
10????//?How?many?tokens?are?requested?per?request?
11????int?requestedTokens?=?routeConfig.getRequestedTokens();
12
13????final?io.github.resilience4j.ratelimiter.RateLimiter?rateLimiter?=?RateLimiterRegistry
14????????????.ofDefaults()
15????????????.rateLimiter(id,?createRateLimiterConfig(refreshPeriod,?replenishRate));
16
17????final?boolean?allowed?=?rateLimiter.acquirePermission(requestedTokens);
18????final?Long?tokensLeft?=?(long)?rateLimiter.getMetrics().getAvailablePermissions();
19
20????Response?response?=?new?Response(allowed,?getHeaders(routeConfig,?tokensLeft));
21????return?Mono.just(response);
22}
有意思的是,這個(gè)類?還有一個(gè)早期版本,是基于 Bucket4j 實(shí)現(xiàn)的: 1public?Mono?isAllowed(String?routeId,?String?id)? {
2
3????Config?routeConfig?=?loadConfiguration(routeId);
4
5????//?How?many?requests?per?second?do?you?want?a?user?to?be?allowed?to?do?
6????int?replenishRate?=?routeConfig.getReplenishRate();
7
8????//?How?much?bursting?do?you?want?to?allow?
9????int?burstCapacity?=?routeConfig.getBurstCapacity();
10
11????//?How?many?tokens?are?requested?per?request?
12????int?requestedTokens?=?routeConfig.getRequestedTokens();
13
14????final?Bucket?bucket?=?bucketMap.computeIfAbsent(id,
15????????????(key)?->?createBucket(replenishRate,?burstCapacity));
16
17????final?boolean?allowed?=?bucket.tryConsume(requestedTokens);
18
19????Response?response?=?new?Response(allowed,
20????????????getHeaders(routeConfig,?bucket.getAvailableTokens()));
21????return?Mono.just(response);
22}
實(shí)現(xiàn)方式都是類似的,在上面對 Bucket4j 和 Resilience4j 已經(jīng)作了比較詳細(xì)的介紹,這里不再贅述。不過從這里也可以看出 Spring 生態(tài)圈對 Resilience4j 是比較看好的,我們也可以將其引入到我們的項(xiàng)目中。4.2 實(shí)現(xiàn)分布式請求頻率限流
上面介紹了如何實(shí)現(xiàn)單機(jī)請求頻率限流,接下來再看下分布式請求頻率限流。這個(gè)就比較簡單了,因?yàn)樯厦嬲f了,Spring Cloud Gateway 自帶了一個(gè)限流實(shí)現(xiàn),就是 RedisRateLimiter,可以用于分布式限流。它的實(shí)現(xiàn)原理依然是基于令牌桶算法的,不過實(shí)現(xiàn)邏輯是放在一段 lua 腳本中的,我們可以在 src/main/resources/META-INF/scripts 目錄下找到該腳本文件 request_rate_limiter.lua: 1local?tokens_key?=?KEYS[1]
2local?timestamp_key?=?KEYS[2]
3
4local?rate?=?tonumber(ARGV[1])
5local?capacity?=?tonumber(ARGV[2])
6local?now?=?tonumber(ARGV[3])
7local?requested?=?tonumber(ARGV[4])
8
9local?fill_time?=?capacity/rate
10local?ttl?=?math.floor(fill_time*2)
11
12local?last_tokens?=?tonumber(redis.call("get",?tokens_key))
13if?last_tokens?==?nil?then
14??last_tokens?=?capacity
15end
16
17local?last_refreshed?=?tonumber(redis.call("get",?timestamp_key))
18if?last_refreshed?==?nil?then
19??last_refreshed?=?0
20end
21
22local?delta?=?math.max(0,?now-last_refreshed)
23local?filled_tokens?=?math.min(capacity,?last_tokens (delta*rate))
24local?allowed?=?filled_tokens?>=?requested
25local?new_tokens?=?filled_tokens
26local?allowed_num?=?0
27if?allowed?then
28??new_tokens?=?filled_tokens?-?requested
29??allowed_num?=?1
30end
31
32if?ttl?>?0?then
33??redis.call("setex",?tokens_key,?ttl,?new_tokens)
34??redis.call("setex",?timestamp_key,?ttl,?now)
35end
36
37return?{?allowed_num,?new_tokens?}
這段代碼和上面介紹令牌桶算法時(shí)用 Java 實(shí)現(xiàn)的那段經(jīng)典代碼幾乎是一樣的。這里使用 lua 腳本,主要是利用了 Redis 的單線程特性,以及執(zhí)行 lua 腳本的原子性,避免了并發(fā)訪問時(shí)可能出現(xiàn)請求量超出上限的現(xiàn)象。想象目前令牌桶中還剩 1 個(gè)令牌,此時(shí)有兩個(gè)請求同時(shí)到來,判斷令牌是否足夠也是同時(shí)的,兩個(gè)請求都認(rèn)為還剩 1 個(gè)令牌,于是兩個(gè)請求都被允許了。有兩種方式來配置 Spring Cloud Gateway 自帶的限流。第一種方式是通過配置文件,比如下面所示的代碼,可以對某個(gè) route 進(jìn)行限流: 1spring:
2??cloud:
3????gateway:
4??????routes:
5??????-?id:?test
6????????uri:?http://httpbin.org:80/get
7????????filters:
8????????-?name:?RequestRateLimiter
9??????????args:
10????????????key-resolver:?'#{@hostAddrKeyResolver}'
11????????????redis-rate-limiter.replenishRate:?1
12????????????redis-rate-limiter.burstCapacity:?3
其中,key-resolver 使用 SpEL 表達(dá)式?#{@beanName}?從 Spring 容器中獲取 hostAddrKeyResolver 對象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少個(gè)令牌,也就是填充速度。第二種方式是通過下面的代碼來配置: 1@Bean
2public?RouteLocator?myRoutes(RouteLocatorBuilder?builder)?{
3??return?builder.routes()
4????.route(p?->?p
5??????.path("/get")
6??????.filters(filter?->?filter.requestRateLimiter()
7????????.rateLimiter(RedisRateLimiter.class,?rl?->?rl.setBurstCapacity(3).setReplenishRate(1)).and())
8??????.uri("http://httpbin.org:80"))
9????.build();
10}
這樣就可以對某個(gè) route 進(jìn)行限流了。但是這里有一點(diǎn)要注意,Spring Cloud Gateway 自帶的限流器有一個(gè)很大的坑,replenishRate 不支持設(shè)置小數(shù),也就是說往桶中填充的 token 的速度最少為每秒 1 個(gè),所以,如果我的限流規(guī)則是每分鐘 10 個(gè)請求(按理說應(yīng)該每 6 秒填充一次,或每秒填充 1/6 個(gè) token),這種情況 Spring Cloud Gateway 就沒法正確的限流。網(wǎng)上也有人提了 issue,support greater than a second resolution for the rate limiter,但還沒有得到解決。4.3 實(shí)現(xiàn)單機(jī)并發(fā)量限流
上面學(xué)習(xí) Resilience4j 的時(shí)候,我們提到了 Resilience4j 的一個(gè)功能特性,叫?隔離(Bulkhead)。Bulkhead 這個(gè)單詞的意思是船的艙壁,利用艙壁可以將不同的船艙隔離起來,這樣如果一個(gè)船艙破損進(jìn)水,那么只損失這一個(gè)船艙,其它船艙可以不受影響。借鑒造船行業(yè)的經(jīng)驗(yàn),這種模式也被引入到軟件行業(yè),我們把它叫做?艙壁模式(Bulkhead pattern)。艙壁模式一般用于服務(wù)隔離,對于一些比較重要的系統(tǒng)資源,如 CPU、內(nèi)存、連接數(shù)等,可以為每個(gè)服務(wù)設(shè)置各自的資源限制,防止某個(gè)異常的服務(wù)把系統(tǒng)的所有資源都消耗掉。這種服務(wù)隔離的思想同樣可以用來做并發(fā)量限流。正如前文所述,Resilience4j 提供了兩種 Bulkhead 的實(shí)現(xiàn):SemaphoreBulkhead 和 ThreadPoolBulkhead,這也正是艙壁模式常見的兩種實(shí)現(xiàn)方案:一種是帶計(jì)數(shù)的信號量,一種是固定大小的線程池??紤]到多線程場景下的線程切換成本,默認(rèn)推薦使用信號量。在操作系統(tǒng)基礎(chǔ)課程中,我們學(xué)習(xí)過兩個(gè)名詞:互斥量(Mutex)?和?信號量(Semaphores)?;コ饬坑糜诰€程的互斥,它和臨界區(qū)有點(diǎn)相似,只有擁有互斥對象的線程才有訪問資源的權(quán)限,由于互斥對象只有一個(gè),因此任何情況下只會有一個(gè)線程在訪問此共享資源,從而保證了多線程可以安全的訪問和操作共享資源。而信號量是用于線程的同步,這是由荷蘭科學(xué)家 E.W.Dijkstra 提出的概念,它和互斥量不同,信號允許多個(gè)線程同時(shí)使用共享資源,但是它同時(shí)設(shè)定了訪問共享資源的線程最大數(shù)目,從而可以進(jìn)行并發(fā)量控制。下面是使用信號量限制并發(fā)訪問的一個(gè)簡單例子: 1public?class?SemaphoreTest?{
2
3????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(100);
4????private?static?Semaphore?semaphore?=?new?Semaphore(10);
5
6????public?static?void?main(String[]?args)?{
7????????for?(int?i?=?0;?i?100;?i )?{
8????????????threadPool.execute(new?Runnable()?{
9????????????????@Override
10????????????????public?void?run()?{
11????????????????????try?{
12????????????????????????semaphore.acquire();
13????????????????????????System.out.println("Request?processing?...");
14????????????????????????semaphore.release();
15????????????????????}?catch?(InterruptedException?e)?{
16????????????????????????e.printStack();
17????????????????????}
18????????????????}
19????????????});
20????????}
21????????threadPool.shutdown();
22????}
23}
這里我們創(chuàng)建了 100 個(gè)線程同時(shí)執(zhí)行,但是由于信號量計(jì)數(shù)為 10,所以同時(shí)只能有 10 個(gè)線程在處理請求。說到計(jì)數(shù),實(shí)際上,在 Java 里除了 Semaphore 還有很多類也可以用作計(jì)數(shù),比如 AtomicLong 或 LongAdder,這在并發(fā)量限流中非常常見,只是無法提供像信號量那樣的阻塞能力: 1public?class?AtomicLongTest?{
2
3????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(100);
4????private?static?AtomicLong?atomic?=?new?AtomicLong();
5
6????public?static?void?main(String[]?args)?{
7????????for?(int?i?=?0;?i?100;?i )?{
8????????????threadPool.execute(new?Runnable()?{
9????????????????@Override
10????????????????public?void?run()?{
11????????????????????try?{
12????????????????????????if(atomic.incrementAndGet()?>?10)?{
13????????????????????????????System.out.println("Request?rejected?...");
14????????????????????????????return;
15????????????????????????}
16????????????????????????System.out.println("Request?processing?...");
17????????????????????????atomic.decrementAndGet();
18????????????????????}?catch?(InterruptedException?e)?{
19????????????????????????e.printStack();
20????????????????????}
21????????????????}
22????????????});
23????????}
24????????threadPool.shutdown();
25????}
26}
4.4 實(shí)現(xiàn)分布式并發(fā)量限流通過在單機(jī)實(shí)現(xiàn)并發(fā)量限流,我們掌握了幾種常用的手段:信號量、線程池、計(jì)數(shù)器,這些都是單機(jī)上的概念。那么稍微拓展下,如果能實(shí)現(xiàn)分布式信號量、分布式線程池、分布式計(jì)數(shù)器,那么實(shí)現(xiàn)分布式并發(fā)量限流不就易如反掌了嗎?關(guān)于分布式線程池,是我自己杜撰的詞,在網(wǎng)上并沒有找到類似的概念,比較接近的概念是資源調(diào)度和分發(fā),但是又感覺不像,這里直接忽略吧。關(guān)于分布式信號量,還真有這樣的東西,比如 Apache Ignite 就提供了 IgniteSemaphore 用于創(chuàng)建分布式信號量,它的使用方式和 Semaphore 非常類似。使用 Redis 的 ZSet 也可以實(shí)現(xiàn)分布式信號量,比如?這篇博客介紹的方法,還有《Redis in Action》這本電子書中也提到了這樣的例子,教你如何實(shí)現(xiàn) Counting semaphores。另外,Redisson 也實(shí)現(xiàn)了基于 Redis 的分布式信號量 RSemaphore,用法也和 Semaphore 類似。使用分布式信號量可以很容易實(shí)現(xiàn)分布式并發(fā)量限流,實(shí)現(xiàn)方式和上面的單機(jī)并發(fā)量限流幾乎是一樣的。最后,關(guān)于分布式計(jì)數(shù)器,實(shí)現(xiàn)方案也是多種多樣。比如使用 Redis 的 INCR 就很容易實(shí)現(xiàn),更有甚者,使用 MySQL 數(shù)據(jù)庫也可以實(shí)現(xiàn)。只不過使用計(jì)數(shù)器要注意操作的原子性,每次請求時(shí)都要經(jīng)過這三步操作:取計(jì)數(shù)器當(dāng)前的值、判斷是否超過閾值,超過則拒絕、將計(jì)數(shù)器的值自增。這其實(shí)和信號量的 P 操作是一樣的,而釋放就對應(yīng) V 操作。所以,利用分布式信號量和計(jì)數(shù)器就可以實(shí)現(xiàn)并發(fā)量限流了嗎?問題當(dāng)然沒有這么簡單。實(shí)際上,上面通過信號量和計(jì)數(shù)器實(shí)現(xiàn)單機(jī)并發(fā)量限流的代碼片段有一個(gè)嚴(yán)重 BUG:1semaphore.acquire();
2System.out.println("Request?processing?...");
3semaphore.release();
想象一下如果在處理請求時(shí)出現(xiàn)異常了會怎么樣?很顯然,信號量被該線程獲取了,但是卻永遠(yuǎn)不會釋放,如果請求異常多了,這將導(dǎo)致信號量被占滿,最后一個(gè)請求也進(jìn)不來。在單機(jī)場景下,這個(gè)問題可以很容易解決,加一個(gè) finally 就行了:1try?{
2????semaphore.acquire();
3????System.out.println("Request?processing?...");
4}?catch?(InterruptedException?e)?{
5????e.printStack();
6}?finally?{
7????semaphore.release();
8}
由于無論出現(xiàn)何種異常,finally 中的代碼一定會執(zhí)行,這樣就保證了信號量一定會被釋放。但是在分布式系統(tǒng)中,就不是加一個(gè) finally 這么簡單了。這是因?yàn)樵诜植际较到y(tǒng)中可能存在的異常不一定是可被捕獲的代碼異常,還有可能是服務(wù)崩潰或者不可預(yù)知的系統(tǒng)宕機(jī),就算是正常的服務(wù)重啟也可能導(dǎo)致分布式信號量無法釋放。對于這個(gè)問題,我和幾個(gè)同事連續(xù)討論了幾個(gè)晚上,想出了兩種解決方法:第一種方法是使用帶 TTL 的計(jì)數(shù)器,第二種方法是基于雙窗口滑動的一種比較 tricky 的算法。第一種方法比較容易理解,我們?yōu)槊總€(gè)請求賦予一個(gè)唯一 ID,并在 Redis 里寫入一個(gè)鍵值對,key 為 requests_xxx(xxx 為請求 ID),value 為 1,并給這個(gè) key 設(shè)置一個(gè) TTL(如果你的應(yīng)用中存在耗時(shí)非常長的請求,譬如對于一些 WebSockket 請求可能會持續(xù)幾個(gè)小時(shí),還需要開一個(gè)線程定期去刷新這個(gè) key 的 TTL)。然后在判斷并發(fā)量時(shí),使用 KEYS 命令查詢 requests_*?開頭的 key 的個(gè)數(shù),就可以知道當(dāng)前一共有多少個(gè)請求,如果超過并發(fā)量上限則拒絕請求。這種方法可以很好的應(yīng)對服務(wù)崩潰或重啟的問題,由于每個(gè) key 都設(shè)置了 TTL,所以經(jīng)過一段時(shí)間后,這些 key 就會自動消失,就不會出現(xiàn)信號量占滿不釋放的情況了。但是這里使用 KEYS 命令查詢請求個(gè)數(shù)是一個(gè)非常低效的做法,在請求量比較多的情況下,網(wǎng)關(guān)的性能會受到嚴(yán)重影響。我們可以把 KEYS 命令換成 SCAN,性能會得到些許提升,但總體來說效果還是很不理想的。針對第一種方法,我們可以進(jìn)一步優(yōu)化,不用為每個(gè)請求寫一個(gè)鍵值對,而是為每個(gè)分布式系統(tǒng)中的每個(gè)實(shí)例賦予一個(gè)唯一 ID,并在 Redis 里寫一個(gè)鍵值對,key 為 instances_xxx(xxx 為實(shí)例 ID),value 為這個(gè)實(shí)例當(dāng)前的并發(fā)量。同樣的,我們?yōu)檫@個(gè) key 設(shè)置一個(gè) TTL,并且開啟一個(gè)線程定期去刷新這個(gè) TTL。每接受一個(gè)請求后,計(jì)數(shù)器加一,請求結(jié)束,計(jì)數(shù)器減一,這和單機(jī)場景下的處理方式一樣,只不過在判斷并發(fā)量時(shí),還是需要使用 KEYS 或 SCAN 獲取所有的實(shí)例,并計(jì)算出并發(fā)量的總和。不過由于實(shí)例個(gè)數(shù)是有限的,性能比之前的做法有了明顯的提升。第二種方法我稱之為?雙窗口滑動算法,結(jié)合了 TTL 計(jì)數(shù)器和滑動窗口算法。我們按分鐘來設(shè)置一個(gè)時(shí)間窗口,在 Redis 里對應(yīng) 202009051130?這樣的一個(gè) key,value 為計(jì)數(shù)器,表示請求的數(shù)量。當(dāng)接受一個(gè)請求后,在當(dāng)前的時(shí)間窗口中加一,當(dāng)請求結(jié)束,在當(dāng)前的時(shí)間窗口中減一,注意,接受請求和請求結(jié)束的時(shí)間窗口可能不是同一個(gè)。另外,我們還需要一個(gè)本地列表來記錄當(dāng)前實(shí)例正在處理的所有請求和請求對應(yīng)的時(shí)間窗口,并通過一個(gè)小于時(shí)間窗口的定時(shí)線程(如 30 秒)來遷移過期的請求,所謂過期,指的是請求的時(shí)間窗口和當(dāng)前時(shí)間窗口不一致。那么具體如何遷移呢?我們首先需要統(tǒng)計(jì)列表中一共有多少請求過期了,然后將列表中的過期請求時(shí)間更新為當(dāng)前時(shí)間窗口,并從 Redis 中上一個(gè)時(shí)間窗口移動相應(yīng)數(shù)量到當(dāng)前時(shí)間窗口,也就是上一個(gè)時(shí)間窗口減 X,當(dāng)前時(shí)間窗口加 X。由于遷移線程定期執(zhí)行,所以過期的請求總是會被移動到當(dāng)前窗口,最終 Redis 中只有當(dāng)前時(shí)間窗口和上個(gè)時(shí)間窗口這兩個(gè)時(shí)間窗口中有數(shù)據(jù),再早一點(diǎn)的窗口時(shí)間中的數(shù)據(jù)會被往后遷移,所以可以給這個(gè) key 設(shè)置一個(gè) 3 分鐘或 5 分鐘的 TTL。判斷并發(fā)量時(shí),由于只有兩個(gè) key,只需要使用 MGET 獲取兩個(gè)值相加即可。下面的流程圖詳細(xì)描述了算法的運(yùn)行過程:
- 請求結(jié)束時(shí),直接在 Redis 中當(dāng)前時(shí)間窗口減一即可,就算是負(fù)數(shù)也沒關(guān)系。請求列表中的該請求不用急著刪除,可以打上結(jié)束標(biāo)記,在遷移線程中統(tǒng)一刪除(當(dāng)然,如果請求的開始時(shí)間和結(jié)束時(shí)間在同一個(gè)窗口,可以直接刪除);
- 遷移的時(shí)間間隔要小于時(shí)間窗口,一般設(shè)置為 30s;
- Redis 中的 key 一定要設(shè)置 TTL,時(shí)間至少為 2 個(gè)時(shí)間窗口,一般設(shè)置為 3 分鐘;
- 遷移過程涉及到“從上一個(gè)時(shí)間窗口減”和“在當(dāng)前時(shí)間窗口加”兩個(gè)操作,要注意操作的原子性;
- 獲取當(dāng)前并發(fā)量可以通過 MGET 一次性讀取兩個(gè)時(shí)間窗口的值,不用 GET 兩次;
- 獲取并發(fā)量和判斷并發(fā)量是否超限,這個(gè)過程也要注意操作的原子性。
總結(jié)
網(wǎng)關(guān)作為微服務(wù)架構(gòu)中的重要一環(huán),充當(dāng)著一夫當(dāng)關(guān)萬夫莫開的角色,所以對網(wǎng)關(guān)服務(wù)的穩(wěn)定性要求和性能要求都非常高。為保證網(wǎng)關(guān)服務(wù)的穩(wěn)定性,一代又一代的程序員們前仆后繼,想出了十八般武藝:限流、熔斷、隔離、緩存、降級、等等等等。這篇文章從限流入手,詳細(xì)介紹了限流的場景和算法,以及源碼實(shí)現(xiàn)和可能踩到的坑。盡管限流只是網(wǎng)關(guān)的一個(gè)非常小的功能,但卻影響到網(wǎng)關(guān)的方方面面,在系統(tǒng)架構(gòu)的設(shè)計(jì)中至關(guān)重要。雖然我試著從不同的角度希望把限流介紹的更完全,但終究是管中窺豹,只見一斑,還有很多的內(nèi)容沒有介紹到,比如阿里開源的 Sentinel 組件也可以用于限流,因?yàn)槠邢尬茨苷归_。另外前文提到的 Netflix 不再維護(hù) Hystrix 項(xiàng)目,這是因?yàn)樗麄儼丫Ψ诺搅硪粋€(gè)限流項(xiàng)目 concurrency-limits 上了,這個(gè)項(xiàng)目的目標(biāo)是打造一款自適應(yīng)的,極具彈性的限流組件,它借鑒了 TCP 擁塞控制的算法(TCP congestion control algorithm),實(shí)現(xiàn)系統(tǒng)的自動限流,感興趣的同學(xué)可以去它的項(xiàng)目主頁了解更多內(nèi)容。本文篇幅較長,難免疏漏,如有問題,還望不吝賜教。參考
- 微服務(wù)網(wǎng)關(guān)實(shí)戰(zhàn)——Spring Cloud Gateway
- 《億級流量網(wǎng)站架構(gòu)核心技術(shù)》張開濤
- 聊聊高并發(fā)系統(tǒng)之限流特技
- 架構(gòu)師成長之路之限流
- 微服務(wù)接口限流的設(shè)計(jì)與思考
- 常用4種限流算法介紹及比較
- 來談?wù)勏蘖?從概念到實(shí)現(xiàn)
- 高并發(fā)下的限流分析
- 計(jì)數(shù)器算法
- 基于Redis的限流系統(tǒng)的設(shè)計(jì)
- API 調(diào)用次數(shù)限制實(shí)現(xiàn)
- Techniques to Improve QoS
- An alternative approach to rate limiting
- Scaling your API with rate limiters
- Brief overview of token-bucket algorithm
- Rate limiting Spring MVC endpoints with bucket4j
- Rate Limiter Internals in Resilience4j
- 高可用框架Resilience4j使用指南
- 阿里巴巴開源限流系統(tǒng) Sentinel 全解析
- spring cloud gateway 之限流篇
- 服務(wù)容錯(cuò)模式
- 你的API會自適應(yīng)「彈性」限流嗎???