Rate limiter 是後端領域中常見的一項技術,用於限制特定時間內的請求數量,或者限制使用者特定時間內的請求數量。對於提供 API 服務的公司或服務來說,後端伺服器通常都會加上 rate limiter,以防止系統被過於頻繁地使用,從而提升系統的穩定性。
也因此,如果在面試有對外提供 API 服務的公司的後端工程師職缺時,有不小的機率會被問到如何保護 API 被過於頻繁的呼叫。
本文將介紹 4 種常見的 rate limiter 方法,並且實際以 Python 實作一遍,藉此加深我們對 rate limiter 的理解。
本文環境
- Python 3
限流機制 Rate limiter 簡介
Rate limiter 是 1 種控制系統資源使用頻率的技術,可以限制 1 段時間內的 request 數量。舉例來說,我們可以在 API endpoint 加上 rate limiter 限制使用者在 10 秒內只能呼叫 2 次 API,藉此避免使用者使用自動化程式瘋狂呼叫特定 API。
搜尋引擎、虛擬貨幣交易所、貨幣匯率等應用的 API 通常都會設定 rate limiter,例如 Coinbase 就規定每秒最多 5 個 requests,每小時最多 10,000 個 requests,如果達到上限就會收到 429 Too Many Requests 的 HTTP 回應狀態碼:
Rate limiter 可以運用在伺服器端與客戶端:
對伺服器端(或應用層)而言,可以避免系統過載,增加系統穩定性。當使用者頻繁對系統進行大量 requests,小則會對系統帶來一定的壓力,大則可能導致系統過載(例如資料庫過於忙碌導致無法正常服務),使用 rate limiter 可以限制使用者不合理的行為,從而增加系統穩定性。Rate limiter 通常會應用在 API 上,特別是控制使用者使用量的部分,前文所述 Coinbase 案例即屬此種應用。
如果你使用的 API 是按次計費,那麼你就屬於客戶端,在客戶端加上 rate limiter 將可以限制你呼叫 API 的次數,藉此控制你的支出成本。
目前實作 rate limiter 有 4 種常見的方法/策略:
- Fixed Window / 固定窗口
- Sliding Window / 滑動窗口
- Token Bucket
- Leaky Bucket
我們將分項探討此 4 種方法,並以 Python 實作 4 種 rate limiter,藉此了解各種方法。
Fixed Window / 固定窗口
Fixed Window 是 4 種常見的 rate limiter 方法中最直覺、最容易理解的 1 種。
簡單來說,fixed window 將時間切分成 1 個個連續單位,稱為 window,例如每小時僅允許 10,000 個 requests 的話,是將 24 小時切為 24 個 windows,我們只要紀錄當前 window 的 request 數量是否超過限制即可。
以下是使用 Python 實作的 fixed window rate limiter:
fixed_window_rate_limiter.py
import threading
import time
class FixedWindowRateLimiter:
def __init__(self, max_requests, window_size):
"""
Initialize the rate limiter.
:param max_requests: The maximum number of requests allowed per window.
:param window_size: The size of each window in seconds.
"""
self.max_requests = max_requests
self.window_size = window_size
self.current_count = 0
self.current_window_id = self.get_current_window_id()
self.lock = threading.Lock()
def get_current_window_id(self):
"""
Get the current time window.
:return: The start time of the current window.
"""
return int(time.time() // self.window_size)
def is_request_allowed(self):
"""
Check if a request is allowed based on the current window.
:return: True if the request is allowed, False otherwise.
"""
with self.lock:
current_window = self.get_current_window_id()
if current_window != self.current_window_id:
self.current_window_id = current_window
self.current_count = 0
if self.current_count < self.max_requests:
self.current_count += 1
return True
return False
if __name__ == '__main__':
rate_limiter = FixedWindowRateLimiter(5, 2)
for i in range(1, 11):
current_window_id = rate_limiter.get_current_window_id()
if rate_limiter.is_request_allowed():
print(f"{current_window_id=}, request {i:0>2} allowed.")
else:
print(f"{current_window_id=}, request {i:0>2} blocked.")
time.sleep(0.2)
上述程式執行指令如下:
$ python fixed_window_rate_limiter.py
我們在 __main__
區塊做了 1 個每 2 秒只允許 5 個 requests 的 rate limiter, 其執行結果如下:
current_window_id=860807646, request 01 allowed.
current_window_id=860807646, request 02 allowed.
current_window_id=860807646, request 03 allowed.
current_window_id=860807646, request 04 allowed.
current_window_id=860807646, request 05 allowed.
current_window_id=860807646, request 06 blocked.
current_window_id=860807646, request 07 blocked.
current_window_id=860807646, request 08 blocked.
current_window_id=860807646, request 09 blocked.
current_window_id=860807647, request 10 allowed.
從結果可以看到 window 860807646 有 5 個 requests 被允許,4 個 requests 被阻擋,到了新的 window 860807647 時,又重新允許 request 執行。
上述程式碼的重點在於 is_request_allowed()
方法:
def is_request_allowed(self):
with self.lock:
current_window = self.get_current_window_id()
if current_window != self.current_window_id:
self.current_window_id = current_window
self.current_count = 0
if self.current_count < self.max_requests:
self.current_count += 1
return True
return False
由於時間是一直向前的,所以每次收到新的 request,我們都需要計算目前處於哪個 window,如果時間的 window 變了,就重置 current_window_id
與 current_count
,接著查看 current_count
是否超過 max_requests
,如果沒有超過,就代表仍在容許範圍,如果超過就代表達到上限。
Fixed window 有個明顯缺點 —— 「對於突峰(spike)的處理能力較差」。
當發送 requests 的時間處於 window 交界處時,就可以超出容許值,這是因為一進入新的 window,舊 window 的 count 就不再納入考量,系統仍有短時間內收到大量 requests 的風險。
滑動窗口 / Sliding Window
Sliding window 的設計就是為了能夠解決 fixed window 的問題,所以它的設計上就必須保存舊 window 的 request 數,sliding window 的 window size 也跟 fixed window 同樣是固定大小,但與 fixed window 不同的是,sliding window 的 window 會隨著時間而移動。
以下是使用 Python 實作的 sliding window rate limiter:
sliding_window_rate_limiter.py
import threading
import time
from collections import deque
class SlidingWindowRateLimiter:
def __init__(self, max_requests, window_size):
"""
Initialize the rate limiter.
:param max_requests: The maximum number of requests allowed per window.
:param window_size: The size of the sliding window in seconds.
"""
self.max_requests = max_requests
self.window_size = window_size
self.request_timestamps = deque()
self.lock = threading.Lock()
def _remove_old_requests(self):
"""
Remove requests that are outside the current window.
"""
current_time = time.time()
clean_threshold = current_time - self.window_size
while self.request_timestamps and self.request_timestamps[0] <= clean_threshold:
self.request_timestamps.popleft()
def is_request_allowed(self):
"""
Check if a request is allowed based on the sliding window.
:return: True if the request is allowed, False otherwise.
"""
with self.lock:
self._remove_old_requests()
if len(self.request_timestamps) < self.max_requests:
self.request_timestamps.append(time.time())
return True
return False
if __name__ == "__main__":
rate_limiter = SlidingWindowRateLimiter(max_requests=2, window_size=1)
for i in range(1, 11):
ts = time.time()
if rate_limiter.is_request_allowed():
print(f"{ts=}\trequest {i:0>2} allowed")
else:
print(f"{ts=}\trequest {i:0>2} blocked")
time.sleep(0.2)
上述程式執行指令如下:
$ python sliding_window_rate_limiter.py
我們設在 __main__
區塊做了 1 個每 1 秒只允許 2 個 requests 的 rate limiter,其執行結果如下:
ts=1721618917.485729 request 01 allowed
ts=1721618917.688738 request 02 allowed
ts=1721618917.893614 request 03 blocked
ts=1721618918.0975401 request 04 blocked
ts=1721618918.301672 request 05 blocked
ts=1721618918.5055192 request 06 allowed
ts=1721618918.706221 request 07 allowed
ts=1721618918.911444 request 08 blocked
ts=1721618919.11663 request 09 blocked
ts=1721618919.3200068 request 10 blocked
從上述執行結果可以看到,因為 window 是隨時間移動的,所以就算從 1721618917 切換到 1721618918 也沒有導致像 fixed window 般切換 window 導致的 count 重置。
上述程式碼的重點在於使用 request_timestamps
屬性儲存每 1 個 request 的 timestamp,因此每次在執行 is_request_allowed()
方法之前,需要把超出 window size 的 timestamp 清掉:
def _remove_old_requests(self):
current_time = time.time()
clean_threshold = current_time - self.window_size
while (
self.request_timestamps and
self.request_timestamps[0] <= clean_threshold
):
self.request_timestamps.popleft()
於是 is_request_allowed()
變得相對單純,只需判斷 request_timestamps
屬性長度是否超出限制即可:
def is_request_allowed(self):
with self.lock:
self._remove_old_requests()
if len(self.request_timestamps) < self.max_requests:
self.request_timestamps.append(time.time())
return True
return False
從 sliding window rate limiter 的程式碼可以發現 1 件事,當容許 requests 數相當多時(例如 1 小時 10,000 個 requests),會使得 request_timestamps
屬性變得龐大(也需消耗記憶體),進而導致系統每次收到 request 都需要花一些時間對 request_timestamps
屬性進行清理,這將拖慢一些系統的回應速度。
這時可以考慮試試 Token Bucket 的做法。
Token Bucket
Token Bucket 相當有趣,它將 rate limiter 視為放了許多 tokens 的桶子,當收到 request 時,如果桶子裡有 token,就代表可以提供服務,然後從桶子取走 1 個 token,當 tokens 都被取光了,那就代表達到 rate limiter 使用上限,因此拒絕提供服務;而 rate limiter 也會定期補充 token,不會在 token 被拿光之後就永久不再提供服務。
以下是以 Python 實作的 token bucket rate limiter:
token_bucket_rate_limiter.py
import threading
import time
class TokenBucketRateLimiter:
def __init__(self, capacity, refill_rate):
"""
Initialize the rate limiter.
:param capacity: The maximum number of tokens in the bucket.
:param refill_rate: The rate at which tokens are added to the bucket (tokens per second).
"""
self.capacity = capacity
self.tokens = capacity
self.refill_rate = refill_rate
self.last_refill_timestamp = time.time()
self.lock = threading.Lock()
def _refill(self):
"""
Add tokens to the bucket based on the time elapsed since the last refill.
"""
current_time = time.time()
elapsed = current_time - self.last_refill_timestamp
added_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + added_tokens)
self.last_refill_timestamp = current_time
def is_request_allowed(self):
"""
Check if the request can be processed by consuming the required tokens.
:return: True if the request is allowed, False otherwise.
"""
with self.lock:
self._refill()
if self.tokens >= 1:
self.tokens -= 1
return True
return False
if __name__ == "__main__":
rate_limiter = TokenBucketRateLimiter(capacity=5, refill_rate=1)
# Simulate making requests
for i in range(1, 16):
ts = time.time()
if rate_limiter.is_request_allowed():
print(f"{ts=} request {i:0>2} allowed.")
else:
print(f"{ts=} request {i:0>2} blocked.")
time.sleep(0.5)
上述程式執行指令如下:
$ python token_bucket_rate_limiter.py
我們設在 __main__
區塊做了 1 個初始擁有 5 個 tokens 的 rate limiter,該 rate limiter 每 1 秒會補充 1 個 token,實驗每 0.5 秒收到 1 個 request 的情況,其執行結果如下:
ts=1721629573.7187788 request 01 allowed.
ts=1721629574.221472 request 02 allowed.
ts=1721629574.7257988 request 03 allowed.
ts=1721629575.2276852 request 04 allowed.
ts=1721629575.732173 request 05 allowed.
ts=1721629576.237281 request 06 allowed.
ts=1721629576.738861 request 07 allowed.
ts=1721629577.241088 request 08 allowed.
ts=1721629577.744705 request 09 allowed.
ts=1721629578.249012 request 10 blocked.
ts=1721629578.7537541 request 11 allowed.
ts=1721629579.258592 request 12 blocked.
ts=1721629579.761495 request 13 allowed.
ts=1721629580.264918 request 14 blocked.
ts=1721629580.770061 request 15 allowed.
可以看到一開始 bucket 內的 tokens 還夠用,所以 9 個 requests 後,就用光 token 了。接著,每 1 秒才補充 1 次 token,所以第 9 次之後,開始每秒只接受 1 個 request。
上述程式碼的重點在於 _refill()
方法:
def _refill(self):
current_time = time.time()
elapsed = current_time - self.last_refill_timestamp
added_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + added_tokens)
self.last_refill_timestamp = current_time
該方法會以現在時間減上次執行 _refill()
方法的時間,藉此判斷應該補充幾個 tokens 到 bucket 之中。同時,水桶中的 tokens 數不得超過設定的上限,因此以 min(self.capacity, self.tokens + added_tokens)
限制水桶內的 token 數量。最後更新執行 _refill()
方法的時間到 last_refill_timestamp
屬性之中。
如此一來,is_request_allowed()
方法也變得很簡單:
def is_request_allowed(self):
with self.lock:
self._refill()
if self.tokens >= 1:
self.tokens -= 1
return True
return False
只要補充完 token 後,再判斷 bucket 內 token 數是否大於等於 1 即可。
Token Bucket 藉由簡單的機制就做到與 sliding window 類似作用,同時又不需要儲存過往收到的 request 紀錄,也不用每次對 request 的時間紀錄進行清理,是 1 個相當聰明的作法。
誰說 1 次只能拿 1 個 token
Token bucket 其實還可以導入權重的概念,也就是將某些比較耗資源的運算,改成一次拿走多個 tokens,藉此控制資源使用量:
def is_request_allowed(self, tokens=1):
with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
Leaky Bucket
Leaky Bucket 其實是 token bucket 的相反。
Leaky Bucket 與 token bucket 同樣是 1 個存放 token 的桶子,不過 leaky bucket 的桶子是空的,每收到 1 個 request 就會往桶子放 1 個 token,如果桶子滿了,就代表系統滿載,無法提供服務;而 rate limiter 也會定期取走 token,不會在 bucket 滿了之後就永久不再提供服務。
以下是使用 Python 實作的 leaky bucket rate limiter:
leaky_bucket_rate_limiter.py
import threading
import time
class LeakyBucketRateLimiter:
def __init__(self, capacity, leak_rate):
"""
Initialize the rate limiter.
:param capacity: The maximum number of tokens in the bucket.
:param leak_rate: The rate at which tokens are processed (tokens per second).
"""
self.capacity = capacity
self.leak_rate = leak_rate
self.tokens = 0
self.last_leak_timestamp = time.time()
self.lock = threading.Lock()
def _leak(self):
"""
Process (leak) tokens from the bucket based on the time elapsed since the last check.
"""
current_time = time.time()
elapsed = current_time - self.last_leak_timestamp
leaked_tokens = elapsed * self.leak_rate
self.tokens = max(0, self.tokens - leaked_tokens)
self.last_leak_timestamp = current_time
def is_request_allowed(self):
"""
Check if the request can be processed by adding the required tokens to the bucket.
:return: True if the request is allowed, False otherwise.
"""
with self.lock:
self._leak()
if self.tokens + 1 <= self.capacity:
self.tokens += 1
return True
return False
if __name__ == "__main__":
rate_limiter = LeakyBucketRateLimiter(capacity=5, leak_rate=1)
# Simulate making requests
for i in range(1, 16):
ts = time.time()
if rate_limiter.is_request_allowed():
print(f"{ts=} request {i:0>2} allowed.")
else:
print(f"{ts=} request {i:0>2} blocked.")
time.sleep(0.5)
由於 Leaky Bucket 與 token bucket 概念相同,只是表達方式相反,所以就不多加贅述了。
補充知識
Rate limiter 容易被誤認為可以防禦阻斷式攻擊(DoS,Deny-of-Service)和分散式阻斷式攻擊(DDoS,Distributed Denial-of-Service)。實際上,只要大量 requests 進入後端伺服器(或應用層 / application layer),造成網路頻寬耗盡或後端伺服器運算資源滿載,即使後端伺服器的大多數請求都被 rate limiter 判定應阻擋,仍無法真正保護伺服器。
這是因為即使 rate limiter 有效地阻止了大量請求,這些請求仍會占用伺服器的網路頻寬和運算資源,最終導致伺服器無法正常回應,這就達到 DoS 或 DDoS 攻擊的目的。
因為 DoS, DDoS 攻擊成功的要件並不需要耗盡資料庫伺服器等重要資源,只需讓伺服器無法正常運作即可。
總結
Rate limiter 是後端領域相當常見的技術,在保護系統避免受過度使用或濫用的情況下非常有用。
以上!
Enjoy!
References
The Fundamentals of Rate Limiting: How it Works and Why You Need it