Mehedee Siddique

Introduction to Celery

Celery is an asynchronous task queue for Python. We mostly use it to run tasks outside the cycle of our regular application, e.g. the HTTP request-response cycle. We can also use Celery to schedule tasks for a specific time or to run periodic tasks, like cron jobs.

A classic example is sending an email on user signup. You want to send users an onboarding email when they sign up, but that means you'd need to hold the request, post the email, wait for a success response from the email server and only then return a response to the user. Since sending an email is a network call, this can take a while, during which your application and the request sit idle, and so does the user, wondering why the hell am I signing up for this shitty application!

Here comes Celery to the rescue. With Celery you can just publish an email sending task to a Celery worker (a Celery application containing your email sending task's code, more on this later), finish the signup process and return a response to the user, all while the Celery worker does the work of sending the email. This way your request-response lifecycle is cut short and everyone is happy.

[Diagram: Async Job Execution]

There are two main components of Celery, plus an optional one:

Broker - as the name suggests, an intermediary between the client application(s) and worker(s).

Worker - where the task actually executes.

Result Backend, a.k.a. Backend (optional) - where the results of executed tasks are stored for retrieval.

Broker

The broker is responsible for receiving task messages from the client application(s), queuing them, and delivering them to available worker(s). It works as an intermediary. The broker is also responsible for maintaining multiple queues according to your needs and priorities. It also does the job of redelivering failed tasks to workers for retrying (if configured), and more. The most popular broker used with Celery is RabbitMQ. Redis can also be used as a broker.

Worker

The worker is the executor of your tasks. There can be multiple workers running at once. Workers constantly monitor the broker for new tasks. Whenever a worker has a free slot, it checks with the broker for new tasks and picks them up if available. The worker then executes the tasks and checks with the broker for new tasks again.

Result Backend

Celery gives you the option to store the results of your tasks. This is achieved with a result backend. The backend stores the task's execution state along with the return value of your task. A backend is also necessary for some of the canvas workflows (more on this later). One of the most popular choices for a result backend is Redis, because its super fast key-value storage makes fetching a task's result very efficient. RabbitMQ can also be used as a backend.
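
To make the result backend concrete, here is a minimal sketch of storing and fetching a result. The broker/backend URLs and the add task here are hypothetical placeholders, not part of this post's app:

from celery import Celery

# Hypothetical local broker/backend URLs, for illustration only
app = Celery(__name__, broker="pyamqp://localhost//", backend="redis://localhost:6379")


@app.task
def add(x, y):
    return x + y


# .delay() publishes the task and immediately returns an AsyncResult handle
result = add.delay(2, 3)
print(result.status)           # task state, e.g. PENDING or SUCCESS
print(result.get(timeout=10))  # blocks until the backend has the value: 5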

Let's jump into some actual code now. We will use RabbitMQ as the broker and Redis as the backend. Let's create an app directory inside our project root and create a Python file, worker.py.

First we create connection strings for the RabbitMQ broker and the Redis backend. Then we pass them to create a Celery app instance.

from celery import Celery


# Broker (RabbitMQ) connection string
CELERY_BROKER: str = "pyamqp://user:Pass1234@rabbitmq:5672//"

# Result backend (Redis) connection string
CELERY_BACKEND: str = "redis://redis:6379"


# Celery app instance
celery_app = Celery(
    __name__, broker=CELERY_BROKER, backend=CELERY_BACKEND
)

Our Celery application is ready. Now let's write a task for it to execute.

First, we will define a function send_newsletter_welcome_email_task in a new file, tasks.py, that accepts an email address string and sends a welcome email to that address. Let's import and use smtplib and loguru for email sending and logging. Then we will import the celery_app instance from worker.py and decorate our email sending function with @celery_app.task().

from smtplib import SMTP

from loguru import logger

from .worker import celery_app


@celery_app.task()
def send_newsletter_welcome_email_task(email: str):
    logger.info("Send welcome email task received")
    with SMTP(
            host="smtp.freesmtpservers.com",
            port=25,
            timeout=60,
    ) as smtp:
        from_addr = "newsletter@mehedees.dev"
        smtp.sendmail(
            from_addr=from_addr,
            to_addrs=email,
            msg=f"To: {email}\r\nFrom: {from_addr}\r\nSubject: Welcome\r\n\r\nWelcome to the newsletter!",
        )
        logger.info("Email successfully sent")
    logger.info("Send welcome email task finished")
    return email
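
As an aside: since sending email over the network can fail transiently, the task decorator also accepts retry options. A hedged sketch of how retries could be configured on this task; the values here are arbitrary examples, not from this post's repo:

@celery_app.task(
    autoretry_for=(Exception,),  # retry automatically on any exception
    retry_backoff=True,          # exponential backoff between attempts
    max_retries=3,               # give up after 3 retries
)
def send_newsletter_welcome_email_task(email: str):
    ...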

Our task is ready. Now we will define a mock web app to trigger it.

from fastapi import FastAPI, Body
from loguru import logger

from .tasks import send_newsletter_welcome_email_task


app = FastAPI(
    debug=True,
    title="Test Python Celery",
    description="Test basics of Python Celery",
    openapi_url=f"/openapi.json",
)


@app.post(path='/newsletter/signup')
async def newsletter_signup(email: str = Body(embed=True)):
    logger.info(f"Received newsletter signup request from {email}")
    # Doing some processing bla bla bla
    logger.info("Initiating welcome email sending task")
    send_newsletter_welcome_email_task.delay(email)
    # Return response now, celery will take care of sending the welcome mail
    return {
        'success': True,
        'code': 200,
    }

We have created a quick FastAPI web application with a newsletter signup route and imported our task into it. The view function does some dummy processing and then calls the email sending task.
To trigger the task we call .delay() on it: send_newsletter_welcome_email_task.delay(email). The task message is sent to the broker (RabbitMQ), the broker delivers it to an available worker, and the worker executes the task, resulting in the welcome email being sent.

We could also trigger the task with .apply_async([email]). .delay() is actually a shortcut for .apply_async().
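
Unlike .delay(), .apply_async() also accepts execution options. A small sketch; the queue name and timings below are made-up examples:

# Equivalent to .delay(email)
send_newsletter_welcome_email_task.apply_async(args=[email])

# With execution options (values are illustrative)
send_newsletter_welcome_email_task.apply_async(
    args=[email],
    countdown=10,    # start no earlier than 10 seconds from now
    expires=300,     # discard the task if it hasn't started within 5 minutes
    queue="emails",  # route to a named queue instead of the default "celery"
)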

Our code is now almost ready. We have defined the Celery app instance, defined a task, and created a web app to trigger the task. We have one modification left for the Celery app instance: we need to introduce our tasks to the application beforehand. Otherwise our worker won't recognize the task delivered to it for execution. Let's do it.

from celery import Celery


# Broker (RabbitMQ) connection string
CELERY_BROKER: str = "pyamqp://user:Pass1234@rabbitmq:5672//"

# Result backend (Redis) connection string
CELERY_BACKEND: str = "redis://redis:6379"


# Celery app instance
celery_app = Celery(
    __name__, broker=CELERY_BROKER, backend=CELERY_BACKEND
)

# Autodiscovery of defined tasks
celery_app.autodiscover_tasks(packages=['app'])

The last line discovers modules named tasks.py inside the package paths provided to the autodiscover_tasks method.
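
If you'd rather be explicit than rely on autodiscovery, the Celery constructor also accepts an include argument listing the task modules to import. A sketch of the equivalent registration:

celery_app = Celery(
    __name__,
    broker=CELERY_BROKER,
    backend=CELERY_BACKEND,
    include=["app.tasks"],  # task modules imported when the worker starts
)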

Our code is ready now. Let's run our Celery worker with the following command (-A points Celery at our app instance):

celery -A app.worker.celery_app worker --loglevel=INFO

We will see something like:

/usr/local/lib/python3.11/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
2023-09-13T18:28:38.979250505Z absolutely not recommended!
2023-09-13T18:28:38.979365023Z 
2023-09-13T18:28:38.979406969Z Please specify a different user using the --uid option.
2023-09-13T18:28:38.979557797Z 
2023-09-13T18:28:38.979587460Z User information: uid=0 euid=0 gid=0 egid=0
2023-09-13T18:28:38.979641167Z 
2023-09-13T18:28:38.979666256Z   warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
2023-09-13T18:28:39.124459733Z  
2023-09-13T18:28:39.124513560Z  -------------- celery@e96b71c99d22 v5.3.1 (emerald-rush)
2023-09-13T18:28:39.124523657Z --- ***** ----- 
2023-09-13T18:28:39.124527360Z -- ******* ---- Linux-5.15.49-linuxkit-x86_64-with 2023-09-13 18:28:39
2023-09-13T18:28:39.124530601Z - *** --- * --- 
2023-09-13T18:28:39.124533566Z - ** ---------- [config]
2023-09-13T18:28:39.124536551Z - ** ---------- .> app:         app.worker:0x7fcc3b5f9cd0
2023-09-13T18:28:39.124539840Z - ** ---------- .> transport:   amqp://user:**@rabbitmq:5672//
2023-09-13T18:28:39.124542931Z - ** ---------- .> results:     redis://redis:6379/
2023-09-13T18:28:39.124546424Z - *** --- * --- .> concurrency: 6 (prefork)
2023-09-13T18:28:39.124549579Z -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
2023-09-13T18:28:39.124552584Z --- ***** ----- 
2023-09-13T18:28:39.124555464Z  -------------- [queues]
2023-09-13T18:28:39.124558372Z                 .> celery           exchange=celery(direct) key=celery
2023-09-13T18:28:39.124561364Z                 
2023-09-13T18:28:39.124564209Z 
2023-09-13T18:28:39.124567139Z [tasks]
2023-09-13T18:28:39.124570067Z   . app.tasks.send_newsletter_welcome_email_task

[2023-09-13 18:29:10,192: INFO/MainProcess] mingle: searching for neighbors
2023-09-13T18:29:11.223127806Z [2023-09-13 18:29:11,222: INFO/MainProcess] mingle: all alone
2023-09-13T18:29:11.268981135Z [2023-09-13 18:29:11,268: INFO/MainProcess] celery@e96b71c99d22 ready.
2023-09-13T18:29:14.031251535Z [2023-09-13 18:29:14,030: INFO/MainProcess] Events of group {task} enabled by remote.

Our worker is now running. Let's now run our FastAPI app on port 12345:

uvicorn app.app:app --reload --workers 1 --host 0.0.0.0 --port 12345

We will see something like:

INFO:     Will watch for changes in these directories: ['/app']
2023-09-13T18:28:36.833486177Z INFO:     Uvicorn running on http://0.0.0.0:12345 (Press CTRL+C to quit)
2023-09-13T18:28:36.833500622Z INFO:     Started reloader process [1] using WatchFiles
2023-09-13T18:28:39.603132930Z INFO:     Started server process [8]
2023-09-13T18:28:39.603200518Z INFO:     Waiting for application startup.
2023-09-13T18:28:39.604262220Z INFO:     Application startup complete.

All is ready now. Let's finally test whether Celery can execute our task asynchronously. We will now post an email address to our newsletter signup route:

curl -d '{"email":"test.celery@mehedees.dev"}' -H "Content-Type: application/json" -X POST http://0.0.0.0:12345/newsletter/signup

Our mock app logs:

2023-09-13 18:31:28.498 | INFO     | app.app:newsletter_signup:17 - Received newsletter signup request from test.celery@mehedees.dev
2023-09-13T18:31:28.499976407Z 2023-09-13 18:31:28.499 | INFO     | app.app:newsletter_signup:19 - Initiating welcome email sending task
2023-09-13T18:31:29.099523900Z INFO:     172.18.0.1:61574 - "POST /newsletter/signup HTTP/1.1" 200 OK

The mock app has triggered the email sending task (sent it to the broker) and returned a response without waiting for the blocking email sending work to finish. Meanwhile, the Celery worker logs:

[2023-09-13 18:31:29,103: INFO/MainProcess] Task app.tasks.send_newsletter_welcome_email_task[cb2f6812-16d6-48ec-9e04-fa20b57bfead] received
2023-09-13T18:31:29.141559704Z 2023-09-13 18:31:29.129 | INFO     | app.tasks:send_newsletter_welcome_email_task:9 - Send welcome email task received
2023-09-13T18:31:31.923187151Z 2023-09-13 18:31:31.922 | INFO     | app.tasks:send_newsletter_welcome_email_task:21 - Email successfully sent
2023-09-13T18:31:32.237598103Z 2023-09-13 18:31:32.237 | INFO     | app.tasks:send_newsletter_welcome_email_task:22 - Send welcome email task finished
2023-09-13T18:31:32.319138219Z [2023-09-13 18:31:32,313: INFO/ForkPoolWorker-4] Task app.tasks.send_newsletter_welcome_email_task[cb2f6812-16d6-48ec-9e04-fa20b57bfead] succeeded in 3.189990901999977s: 'test.celery@mehedees.dev'

The worker has successfully received the task from the broker and executed it asynchronously.
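
Since we configured a Redis result backend, the task's return value ('test.celery@mehedees.dev') is also stored and can be fetched later by task id. A minimal sketch, using the id from the worker log above, e.g. from a Python shell inside the project:

from celery.result import AsyncResult

from app.worker import celery_app


# Task id copied from the worker log above
res = AsyncResult("cb2f6812-16d6-48ec-9e04-fa20b57bfead", app=celery_app)
print(res.status)  # SUCCESS
print(res.result)  # 'test.celery@mehedees.dev'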

You can find the fully functional and dockerized codebase here.

I originally started this article to share my experience with Celery canvas workflows and concurrency options, but then realized an introductory article might be helpful. So I'll be writing at least two more articles: one focusing on production-ready Celery app development and another on advanced features and issues.

Constructive criticisms and feedback are welcome!

Keep learning, never settle (sorry, OnePlus), and till I see you again!

Edit: The 2nd post on Configuring Celery is now available.
