golang.org/x/time/rateパッケージのトークンバケットのトークンの補充方法

goの標準ライブラリである golang.org/x/time/rate をサンプル的に動かして、トークンバケットのトークンの補充方法を理解したのでメモ。

golang.org/x/time/rate

この記事のまとめ

トークンバケットアルゴリズムとは

トークンバケットアルゴリズムは、一定時間ごとの処理やリクエストの量を制限するために使用できます。主に以下の要素で構成される。

バケット: トークンを入れておく箱。同時に入るトークンの数が決まっている。

トークン: 処理やリクエストの単位。 一定時間ごとにバケットに補充される。

補充レート: トークンが補充される速度。

任意の処理やリクエストが発生した際に、バケットからトークンを取り出す。トークンが不足している場合はリクエストを拒否または待機し、それ以外の場合はトークンを消費して、リクエストや処理を実行する。

参考

https://ja.wikipedia.org/wiki/トークンバケット

使ってみた

簡易なHTTPサーバ。`rate.NewLimiter(rate.Every(2*time.Second), 1)` により、2秒に1回のリクエストを許可するリミッターを作成した。

package main

import (
	"log"
	"net/http"
	"time"

	"golang.org/x/time/rate"
)

var rateLimit *rate.Limiter

func main() {
	rateLimit = rate.NewLimiter(rate.Every(2*time.Second), 1)

	http.HandleFunc("/wait", WaitHandler)
	http.HandleFunc("/allow", AllowHandler)

	http.ListenAndServe(":5000", nil)
}

func WaitHandler(w http.ResponseWriter, r *http.Request) {
	rateLimit.Wait(r.Context())
	log.Println("GET /wait called")
}

func AllowHandler(w http.ResponseWriter, r *http.Request) {
	if !rateLimit.Allow() {
		log.Println("Too many requests")
		http.Error(w, "Too many requests", http.StatusTooManyRequests)
		return
	}
	log.Println("GET /allow called")
}

このサーバに対して、約1秒に1回のリクエストを実行。

/wait はすべてのリクエストを実行できたが、/allow は2回1回エラーとなる。

`for i in {1..5}; do curl localhost:5000/wait ; sleep 1; done`

2025/01/22 22:19:30 GET /wait called
2025/01/22 22:19:32 GET /wait called
2025/01/22 22:19:34 GET /wait called
2025/01/22 22:19:36 GET /wait called
2025/01/22 22:19:38 GET /wait called

`for i in {1..5}; do curl localhost:5000/allow ; sleep 1; done`

2025/01/22 22:19:49 GET /allow called
2025/01/22 22:19:50 Too many requests
2025/01/22 22:19:51 GET /allow called
2025/01/22 22:19:52 Too many requests
2025/01/22 22:19:53 GET /allow called

Waitは処理を待機させ、Allowは即座にエラーを返すことを確認した。

トークンの補充方法

3流エンジニアの私がパッと思いついたのは、goroutineにより定期的にバケットにトークンを補充する方法。常に一定量のトークンがバケットに入っており、定期的に補充されるアニメーションを想像すると、この方法が自然な気がするが、 golang.org/x/time/rate では遅延評価(「使うときに実態を計算すればいいよね」評価)を採用している。

WaitやAllowメソッドの内部では、以下の流れで処理が行われる

  1. reserveメソッドが呼ばれ、使用する分のトークンがあるか確認

  2. advanceメソッドが呼ばれ、前回のトークンの使用時刻(または使用しようとして失敗した時刻)から指定した時刻(現在時刻)までの経過時間を計算

  3. その経過時間に基づいて補充されるべきトークン量を計算

reserveN メソッド内のトークンの補充処理の抜粋

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()

...

	t, tokens := lim.advance(t)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {

...
	if ok {
...
		// Update state
		lim.last = t
		lim.tokens = tokens
	}
...
}

変数okはリクエストを処理できるかどうかのチェック結果で、trueの場合に最終更新時刻とトークンが更新される。

補充されるトークンの量は遅延評価により決定される。つまり、定期的(1秒ごと)にトークンが補充されるのではなく、「前回のトークン更新時刻からの経過時間×トークン補充率」により計算されたトークンがバケットに補充される。

advance メソッドにより補充されるトークンが計算される

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
	last := lim.last
	if t.Before(last) {
		last = t
	}

	// Calculate the new number of tokens, due to time that passed.
	elapsed := t.Sub(last)
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return t, tokens
}

コメントにも書かれているとおり、このadvanceメソッドは呼び出し元での構造体のロックを前提としている。毎回どこでロックをかければよいか迷子になるので、とても参考になる。