DEV Community

Cover image for Controlling outgoing rate limit
rafaquelhodev
rafaquelhodev

Posted on

Controlling outgoing rate limit

Let's imagine a scenario that one has a distributed application that interacts with a third party API. Usually, third party APIs have a rate-limit control mechanism in order to avoid their clients from bursting requests and causing down-time on their services. In such a scenario, how can the caller control the rate of outgoing requests to the third party API in a distributed environment? This post discuss a possible strategy for this problem.

There a multiple algorithms to control the rate of requests, but here we'll focus on the token bucket algorithm, because it is relatively easy to understand and to implement. This algorithm states that: a bucket can hold a maximum of T tokens, and when an application wants to make a request to the third party API, it has to take 1 token from the bucket. If the bucket is empty, it has to wait until there is a least 1 token in the bucket. Also, the bucket is refilled with 1 token at a fixed rate of R tokens/milliseconds.

The token bucket algorithm is very straightforward to understand, but how can someone use it in a distributed environment to control the outgoing request to third party APIs?

If one wants to control the outgoing rate limit in a distributed environment, a centralized source of truth for the current rate limit is necessary. There are multiple ways to implement the source of truth and I've idealized the following diagram with a possible implementation:

Each pod connects to a TCP server that controls the outgoing rate limit

In the figure above, we have a distributed application in multiple pods, and each pod can make requests to a third party API. In the application infrastructure, there is a TCP server that controls the rate limit by using the token bucket algorithm. Before making a request to the third party API, the pod asks the TCP server for a new token, and the pod waits for a response from the TCP server until there is at least one available token. After a token is available, the pod makes the request to the third party API.

The TCP server implementation can be found in this repository https://github.com/rafaquelhodev/rlimit/ and in the next section I'll discuss briefly the token bucket implementation in golang.

Token bucket implementation

Below, I'm showing the main ideas behind the token bucket implementation. Please, take a look at the https://github.com/rafaquelhodev/rlimit/ repository to understand the detailed implementation.

The rate limit control is centralized in the TokenBucket struct:

type TokenBucket struct {
    id           string
    mu           sync.Mutex
    tokens       int64
    maxTokens    int64
    refillPeriod int64
    cron         chan bool
    subs         []chan bool
}
Enter fullscreen mode Exit fullscreen mode

You can notice that there is a subs property in the TokenBucket struct. Basically, this is an array of subscribers for a specific token bucket: every time a token is requested from a client, the client is added to the subs array and the client is notified when a new token is added to the bucket.

When starting the bucket, we need to provide a maximum number of tokens the bucket can support (maxTokens) and the amount of time a token is added to the bucket (refillPeriod):

func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket {
    bucket := &TokenBucket{
        id:           id,
        tokens:       0,
        maxTokens:    maxTokens,
        refillPeriod: refillPeriod,
        cron:         make(chan bool),
        subs:         make([]chan bool, 0),
    }
    fmt.Printf("refill period  = %d\n", refillPeriod)
    bucket.startCron()
    return bucket
}
Enter fullscreen mode Exit fullscreen mode

Now, you might wonder, "how a token is added to bucket?". For that, when a bucket is created, a cron job is started, and at every refillPeriod milliseconds, a new token is added to the bucket:

func (tb *TokenBucket) startCron() {
    ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond)

    go func() {
        for {
            select {
            case <-tb.cron:
                ticker.Stop()
                return
            case <-ticker.C:
                if tb.tokens < tb.maxTokens {
                    tb.tokens += 1
                    fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens)

                    if len(tb.subs) > 0 {
                        sub := tb.subs[0]
                        tb.subs = tb.subs[1:]
                        sub <- true
                    }
                }
            }
        }
    }()
}
Enter fullscreen mode Exit fullscreen mode

Finally, when a client wants a token from the bucket, the waitAvailable function must be called:

func (tb *TokenBucket) waitAvailable() bool {
    tb.mu.Lock()

    if tb.tokens > 0 {
        fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id)
        tb.tokens -= 1
        tb.mu.Unlock()
        return true
    }

    fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id)

    ch := tb.tokenSubscribe()

    tb.mu.Unlock()

    <-ch

    fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id)

    tb.tokens -= 1

    return true
}
Enter fullscreen mode Exit fullscreen mode

Inspired by https://github.com/Mohamed-khattab/Token-bucket-rate-limiter

Top comments (0)