DEV Community

Flavio Curella
Flavio Curella

Posted on

Refining exceptions with Context Decorators

Very often, 3rd party API clients are designed to return a generic "API Exception", with details contained inside some property. One notorious example of this is boto3: most exceptions come as a ClientError with a response.

Usually this is fine, but occasionally I need finer-grained exceptions than what the client is giving me.

One common scenario is configuring a celery task to retry a while after the rate limiting: I want to get errors for all exceptions, but for rate-limiting, I just want to retry the task later. Celery offers a convenient autoretry_for argument for that, but we can't use it just for throttling because boto3 does not return an exception specific enough.

I could wrap the login in a try ... except clause and inspect the exception, but that get repetitive pretty quickly, especially as I add more and more tasks.

For these kind of situations, I create a context decorator. The decorator inspects the exception for me and, if it's the one I'm looking, raises a custom exception that then I can catch:

# myproject/exceptions.py

import contexlib

from botocore.exceptions import ClientError


class ThrottleException(ClientError):
    pass


class raise_throttle(contextlib.ContextDecorator):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type == ClientError:
            if exc_value.response.get("Error", {}).get("Code") == "Throttling":
                raise ThrottleException(
                    exc_value.response, exc_value.operation_name
                ) from exc_value
        # returning `False` makes the decorator
        # raise the original exception, if any.
        return False

This decorator can be used on any function or method that calls the API:

# myproject/myapp/aws.py
import boto3


@raise_throttle()
def upload(content):
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    bucket.put_object(
        Body=b"lorem ipsum",
        Key="Hamlet.txt",
    )

And the exception can be caught by celery's autoretry_for:

# myproject/myapp/tasks.py

from myproject.celery import app
from myproject.exceptions import ThrottleException
from myproject.myapp.aws import upload


@app.task(autoretry_for=(ThrottleException,))
def my_task(content):
    upload(content)

But I don't want to retry the task immediately, since the throttling takes a while to be lifted. Therefore I create a Task base class that retries with exponential backoff:

# myproject/tasks.py

import random

from celery import Task


def jitter(jitter_max=1.4):
    return random.uniform(1, jitter_max)


def exponential(retries, factor=3):
    return (factor ** (retries + 1))


def exp_jitter(retries, exp_factor=3, jitter_max=1.4):
    return exponential(retries, exp_factor) * jitter(jitter_max)


class ExpBackoffTask(Task):
    abstract = True
    max_retries = 10

    def retry(self, *args, **kwargs):
        countdown = kwargs.get('countdown', None)
        if countdown is None:
            # if no explicit countdown is given, use an exponential backoff
            # with some random jitter, giving us a top-end under 24 hours at
            # which the last retry will be attempted.
            kwargs['countdown'] = int(
                exp_jitter(self.request.retries)
            )
        return super().retry(*args, **kwargs)

I can then use the custom class as base:

# myproject/myapp/tasks.py

from myproject.celery import app
from myproject.exceptions import ThrottleException
from myproject.tasks import ExpBackoffTask
from myproject.myapp.aws import upload


@app.task(base=ExpBackoffTask, autoretry_for=(ThrottleException,))
def my_task(content):
    upload(content)

Top comments (0)