後端工程師面試考什麼 — 限流機制 Rate Limiter

Posted on  Jul 22, 2024  in  後端面試準備  by  Amo Chen  ‐ 8 min read

Rate limiter 是後端領域中常見的一項技術,用於限制特定時間內的請求數量,或者限制使用者特定時間內的請求數量。對於提供 API 服務的公司或服務來說,後端伺服器通常都會加上 rate limiter,以防止系統被過於頻繁地使用,從而提升系統的穩定性。

也因此,如果在面試有對外提供 API 服務的公司的後端工程師職缺時,有不小的機率會被問到如何保護 API 被過於頻繁的呼叫。

本文將介紹 4 種常見的 rate limiter 方法,並且實際以 Python 實作一遍,藉此加深我們對 rate limiter 的理解。

本文環境

  • Python 3

限流機制 Rate limiter 簡介

Rate limiter 是 1 種控制系統資源使用頻率的技術,可以限制 1 段時間內的 request 數量。舉例來說,我們可以在 API endpoint 加上 rate limiter 限制使用者在 10 秒內只能呼叫 2 次 API,藉此避免使用者使用自動化程式瘋狂呼叫特定 API。

搜尋引擎、虛擬貨幣交易所、貨幣匯率等應用的 API 通常都會設定 rate limiter,例如 Coinbase 就規定每秒最多 5 個 requests,每小時最多 10,000 個 requests,如果達到上限就會收到 429 Too Many Requests 的 HTTP 回應狀態碼:

coinbase-rate-limit.png

Rate limiter 可以運用在伺服器端與客戶端:

  • 對伺服器端(或應用層)而言,可以避免系統過載,增加系統穩定性。當使用者頻繁對系統進行大量 requests,小則會對系統帶來一定的壓力,大則可能導致系統過載(例如資料庫過於忙碌導致無法正常服務),使用 rate limiter 可以限制使用者不合理的行為,從而增加系統穩定性。Rate limiter 通常會應用在 API 上,特別是控制使用者使用量的部分,前文所述 Coinbase 案例即屬此種應用。

  • 如果你使用的 API 是按次計費,那麼你就屬於客戶端,在客戶端加上 rate limiter 將可以限制你呼叫 API 的次數,藉此控制你的支出成本。

目前實作 rate limiter 有 4 種常見的方法/策略:

  • Fixed Window / 固定窗口
  • Sliding Window / 滑動窗口
  • Token Bucket
  • Leaky Bucket

我們將分項探討此 4 種方法,並以 Python 實作 4 種 rate limiter,藉此了解各種方法。

Fixed Window / 固定窗口

Fixed Window 是 4 種常見的 rate limiter 方法中最直覺、最容易理解的 1 種。

簡單來說,fixed window 將時間切分成 1 個個連續單位,稱為 window,例如每小時僅允許 10,000 個 requests 的話,是將 24 小時切為 24 個 windows,我們只要紀錄當前 window 的 request 數量是否超過限制即可。

以下是使用 Python 實作的 fixed window rate limiter:

fixed_window_rate_limiter.py

import threading
import time


class FixedWindowRateLimiter:
    def __init__(self, max_requests, window_size):
        """
        Initialize the rate limiter.
        :param max_requests: The maximum number of requests allowed per window.
        :param window_size: The size of each window in seconds.
        """
        self.max_requests = max_requests
        self.window_size = window_size
        self.current_count = 0
        self.current_window_id = self.get_current_window_id()
        self.lock = threading.Lock()

    def get_current_window_id(self):
        """
        Get the current time window.
        :return: The start time of the current window.
        """
        return int(time.time() // self.window_size)

    def is_request_allowed(self):
        """
        Check if a request is allowed based on the current window.
        :return: True if the request is allowed, False otherwise.
        """
        with self.lock:
            current_window = self.get_current_window_id()
            if current_window != self.current_window_id:
                self.current_window_id = current_window
                self.current_count = 0

            if self.current_count < self.max_requests:
                self.current_count += 1
                return True
            return False


if __name__ == '__main__':
    rate_limiter = FixedWindowRateLimiter(5, 2)
    for i in range(1, 11):
        current_window_id = rate_limiter.get_current_window_id()
        if rate_limiter.is_request_allowed():
            print(f"{current_window_id=}, request {i:0>2} allowed.")
        else:
            print(f"{current_window_id=}, request {i:0>2} blocked.")
        time.sleep(0.2)

上述程式執行指令如下:

$ python fixed_window_rate_limiter.py

我們在 __main__ 區塊做了 1 個每 2 秒只允許 5 個 requests 的 rate limiter, 其執行結果如下:

current_window_id=860807646, request 01 allowed.
current_window_id=860807646, request 02 allowed.
current_window_id=860807646, request 03 allowed.
current_window_id=860807646, request 04 allowed.
current_window_id=860807646, request 05 allowed.
current_window_id=860807646, request 06 blocked.
current_window_id=860807646, request 07 blocked.
current_window_id=860807646, request 08 blocked.
current_window_id=860807646, request 09 blocked.
current_window_id=860807647, request 10 allowed.

從結果可以看到 window 860807646 有 5 個 requests 被允許,4 個 requests 被阻擋,到了新的 window 860807647 時,又重新允許 request 執行。

上述程式碼的重點在於 is_request_allowed() 方法:

def is_request_allowed(self):
    with self.lock:
        current_window = self.get_current_window_id()
        if current_window != self.current_window_id:
            self.current_window_id = current_window
            self.current_count = 0

        if self.current_count < self.max_requests:
            self.current_count += 1
            return True
        return False

由於時間是一直向前的,所以每次收到新的 request,我們都需要計算目前處於哪個 window,如果時間的 window 變了,就重置 current_window_idcurrent_count,接著查看 current_count 是否超過 max_requests,如果沒有超過,就代表仍在容許範圍,如果超過就代表達到上限。

Fixed window 有個明顯缺點 —— 「對於突峰(spike)的處理能力較差」。

當發送 requests 的時間處於 window 交界處時,就可以超出容許值,這是因為一進入新的 window,舊 window 的 count 就不再納入考量,系統仍有短時間內收到大量 requests 的風險。

滑動窗口 / Sliding Window

Sliding window 的設計就是為了能夠解決 fixed window 的問題,所以它的設計上就必須保存舊 window 的 request 數,sliding window 的 window size 也跟 fixed window 同樣是固定大小,但與 fixed window 不同的是,sliding window 的 window 會隨著時間而移動。

sliding-window.png

以下是使用 Python 實作的 sliding window rate limiter:

sliding_window_rate_limiter.py

import threading
import time
from collections import deque


class SlidingWindowRateLimiter:
    def __init__(self, max_requests, window_size):
        """
        Initialize the rate limiter.
        :param max_requests: The maximum number of requests allowed per window.
        :param window_size: The size of the sliding window in seconds.
        """
        self.max_requests = max_requests
        self.window_size = window_size
        self.request_timestamps = deque()
        self.lock = threading.Lock()

    def _remove_old_requests(self):
        """
        Remove requests that are outside the current window.
        """
        current_time = time.time()
        clean_threshold = current_time - self.window_size
        while self.request_timestamps and self.request_timestamps[0] <= clean_threshold:
            self.request_timestamps.popleft()

    def is_request_allowed(self):
        """
        Check if a request is allowed based on the sliding window.
        :return: True if the request is allowed, False otherwise.
        """
        with self.lock:
            self._remove_old_requests()
            if len(self.request_timestamps) < self.max_requests:
                self.request_timestamps.append(time.time())
                return True
            return False


if __name__ == "__main__":
    rate_limiter = SlidingWindowRateLimiter(max_requests=2, window_size=1)

    for i in range(1, 11):
        ts = time.time()
        if rate_limiter.is_request_allowed():
            print(f"{ts=}\trequest {i:0>2} allowed")
        else:
            print(f"{ts=}\trequest {i:0>2} blocked")
        time.sleep(0.2)

上述程式執行指令如下:

$ python sliding_window_rate_limiter.py

我們設在 __main__ 區塊做了 1 個每 1 秒只允許 2 個 requests 的 rate limiter,其執行結果如下:

ts=1721618917.485729  request 01 allowed
ts=1721618917.688738  request 02 allowed
ts=1721618917.893614  request 03 blocked
ts=1721618918.0975401 request 04 blocked
ts=1721618918.301672  request 05 blocked
ts=1721618918.5055192 request 06 allowed
ts=1721618918.706221  request 07 allowed
ts=1721618918.911444  request 08 blocked
ts=1721618919.11663   request 09 blocked
ts=1721618919.3200068 request 10 blocked

從上述執行結果可以看到,因為 window 是隨時間移動的,所以就算從 1721618917 切換到 1721618918 也沒有導致像 fixed window 般切換 window 導致的 count 重置。

上述程式碼的重點在於使用 request_timestamps 屬性儲存每 1 個 request 的 timestamp,因此每次在執行 is_request_allowed() 方法之前,需要把超出 window size 的 timestamp 清掉:

def _remove_old_requests(self):
    current_time = time.time()
    clean_threshold = current_time - self.window_size
    while (
        self.request_timestamps and
        self.request_timestamps[0] <= clean_threshold
    ):
        self.request_timestamps.popleft()

於是 is_request_allowed() 變得相對單純,只需判斷 request_timestamps 屬性長度是否超出限制即可:

def is_request_allowed(self):
    with self.lock:
        self._remove_old_requests()
        if len(self.request_timestamps) < self.max_requests:
            self.request_timestamps.append(time.time())
            return True
        return False

從 sliding window rate limiter 的程式碼可以發現 1 件事,當容許 requests 數相當多時(例如 1 小時 10,000 個 requests),會使得 request_timestamps 屬性變得龐大(也需消耗記憶體),進而導致系統每次收到 request 都需要花一些時間對 request_timestamps 屬性進行清理,這將拖慢一些系統的回應速度。

這時可以考慮試試 Token Bucket 的做法。

Token Bucket

Token Bucket 相當有趣,它將 rate limiter 視為放了許多 tokens 的桶子,當收到 request 時,如果桶子裡有 token,就代表可以提供服務,然後從桶子取走 1 個 token,當 tokens 都被取光了,那就代表達到 rate limiter 使用上限,因此拒絕提供服務;而 rate limiter 也會定期補充 token,不會在 token 被拿光之後就永久不再提供服務。

token-bucket.png

以下是以 Python 實作的 token bucket rate limiter:

token_bucket_rate_limiter.py

import threading
import time


class TokenBucketRateLimiter:
    def __init__(self, capacity, refill_rate):
        """
        Initialize the rate limiter.
        :param capacity: The maximum number of tokens in the bucket.
        :param refill_rate: The rate at which tokens are added to the bucket (tokens per second).
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill_timestamp = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """
        Add tokens to the bucket based on the time elapsed since the last refill.
        """
        current_time = time.time()
        elapsed = current_time - self.last_refill_timestamp
        added_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + added_tokens)
        self.last_refill_timestamp = current_time

    def is_request_allowed(self):
        """
        Check if the request can be processed by consuming the required tokens.
        :return: True if the request is allowed, False otherwise.
        """
        with self.lock:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False


if __name__ == "__main__":
    rate_limiter = TokenBucketRateLimiter(capacity=5, refill_rate=1)

    # Simulate making requests
    for i in range(1, 16):
        ts = time.time()
        if rate_limiter.is_request_allowed():
            print(f"{ts=} request {i:0>2} allowed.")
        else:
            print(f"{ts=} request {i:0>2} blocked.")
        time.sleep(0.5)

上述程式執行指令如下:

$ python token_bucket_rate_limiter.py

我們設在 __main__ 區塊做了 1 個初始擁有 5 個 tokens 的 rate limiter,該 rate limiter 每 1 秒會補充 1 個 token,實驗每 0.5 秒收到 1 個 request 的情況,其執行結果如下:

ts=1721629573.7187788 request 01 allowed.
ts=1721629574.221472  request 02 allowed.
ts=1721629574.7257988 request 03 allowed.
ts=1721629575.2276852 request 04 allowed.
ts=1721629575.732173  request 05 allowed.
ts=1721629576.237281  request 06 allowed.
ts=1721629576.738861  request 07 allowed.
ts=1721629577.241088  request 08 allowed.
ts=1721629577.744705  request 09 allowed.
ts=1721629578.249012  request 10 blocked.
ts=1721629578.7537541 request 11 allowed.
ts=1721629579.258592  request 12 blocked.
ts=1721629579.761495  request 13 allowed.
ts=1721629580.264918  request 14 blocked.
ts=1721629580.770061  request 15 allowed.

可以看到一開始 bucket 內的 tokens 還夠用,所以 9 個 requests 後,就用光 token 了。接著,每 1 秒才補充 1 次 token,所以第 9 次之後,開始每秒只接受 1 個 request。

上述程式碼的重點在於 _refill() 方法:

def _refill(self):
    current_time = time.time()
    elapsed = current_time - self.last_refill_timestamp
    added_tokens = elapsed * self.refill_rate
    self.tokens = min(self.capacity, self.tokens + added_tokens)
    self.last_refill_timestamp = current_time

該方法會以現在時間減上次執行 _refill() 方法的時間,藉此判斷應該補充幾個 tokens 到 bucket 之中。同時,水桶中的 tokens 數不得超過設定的上限,因此以 min(self.capacity, self.tokens + added_tokens) 限制水桶內的 token 數量。最後更新執行 _refill() 方法的時間到 last_refill_timestamp 屬性之中。

如此一來,is_request_allowed() 方法也變得很簡單:

def is_request_allowed(self):
    with self.lock:
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

只要補充完 token 後,再判斷 bucket 內 token 數是否大於等於 1 即可。

Token Bucket 藉由簡單的機制就做到與 sliding window 類似作用,同時又不需要儲存過往收到的 request 紀錄,也不用每次對 request 的時間紀錄進行清理,是 1 個相當聰明的作法。

誰說 1 次只能拿 1 個 token

Token bucket 其實還可以導入權重的概念,也就是將某些比較耗資源的運算,改成一次拿走多個 tokens,藉此控制資源使用量:

def is_request_allowed(self, tokens=1):
    with self.lock:
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

Leaky Bucket

Leaky Bucket 其實是 token bucket 的相反。

Leaky Bucket 與 token bucket 同樣是 1 個存放 token 的桶子,不過 leaky bucket 的桶子是空的,每收到 1 個 request 就會往桶子放 1 個 token,如果桶子滿了,就代表系統滿載,無法提供服務;而 rate limiter 也會定期取走 token,不會在 bucket 滿了之後就永久不再提供服務。

leaky-bucket.png

以下是使用 Python 實作的 leaky bucket rate limiter:

leaky_bucket_rate_limiter.py

import threading
import time


class LeakyBucketRateLimiter:
    def __init__(self, capacity, leak_rate):
        """
        Initialize the rate limiter.
        :param capacity: The maximum number of tokens in the bucket.
        :param leak_rate: The rate at which tokens are processed (tokens per second).
        """
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.tokens = 0
        self.last_leak_timestamp = time.time()
        self.lock = threading.Lock()

    def _leak(self):
        """
        Process (leak) tokens from the bucket based on the time elapsed since the last check.
        """
        current_time = time.time()
        elapsed = current_time - self.last_leak_timestamp
        leaked_tokens = elapsed * self.leak_rate
        self.tokens = max(0, self.tokens - leaked_tokens)
        self.last_leak_timestamp = current_time

    def is_request_allowed(self):
        """
        Check if the request can be processed by adding the required tokens to the bucket.
        :return: True if the request is allowed, False otherwise.
        """
        with self.lock:
            self._leak()
            if self.tokens + 1 <= self.capacity:
                self.tokens += 1
                return True
            return False

if __name__ == "__main__":
    rate_limiter = LeakyBucketRateLimiter(capacity=5, leak_rate=1)

    # Simulate making requests
    for i in range(1, 16):
        ts = time.time()
        if rate_limiter.is_request_allowed():
            print(f"{ts=} request {i:0>2} allowed.")
        else:
            print(f"{ts=} request {i:0>2} blocked.")
        time.sleep(0.5)

由於 Leaky Bucket 與 token bucket 概念相同,只是表達方式相反,所以就不多加贅述了。

補充知識

Rate limiter 容易被誤認為可以防禦阻斷式攻擊(DoS,Deny-of-Service)和分散式阻斷式攻擊(DDoS,Distributed Denial-of-Service)。實際上,只要大量 requests 進入後端伺服器(或應用層 / application layer),造成網路頻寬耗盡或後端伺服器運算資源滿載,即使後端伺服器的大多數請求都被 rate limiter 判定應阻擋,仍無法真正保護伺服器。

這是因為即使 rate limiter 有效地阻止了大量請求,這些請求仍會占用伺服器的網路頻寬和運算資源,最終導致伺服器無法正常回應,這就達到 DoS 或 DDoS 攻擊的目的。

因為 DoS, DDoS 攻擊成功的要件並不需要耗盡資料庫伺服器等重要資源,只需讓伺服器無法正常運作即可。

總結

Rate limiter 是後端領域相當常見的技術,在保護系統避免受過度使用或濫用的情況下非常有用。

以上!

Enjoy!

References

The Fundamentals of Rate Limiting: How it Works and Why You Need it

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!