Zach

Posted on • Originally published at circumeo.io

Working with Multiple Celery Queues in Django

Celery is an asynchronous task queue system based on distributed message passing. It allows developers to execute background jobs concurrently, separate from the primary application flow, thus ensuring tasks like sending emails, generating reports, or processing data do not block or delay a user's request.

By offloading these heavyweight tasks to Celery, web frameworks like Django can quickly respond to the user and allow the task to run in the background, ensuring that users are not kept waiting.

While employing a single queue might suffice for smaller applications, as your project grows and tasks become more varied, you may find the need to prioritize certain tasks or run them on specialized workers. In this article we'll dive into how to use multiple queues with Django, both during local development and when deploying to production.

Integrating Celery with Django

We first need to set up our Django project to work with Celery before adding multiple queues.

The first step to adding Celery to Django is to make sure the necessary dependencies are installed. We could install them on the command line, but instead we'll use a requirements.txt file to make the process repeatable.

Django==4.1.7
celery==5.2.7
redis==4.5.5
gunicorn==20.1.0

Now create a file named celery.py in the same directory containing your settings.py file. This file will act as the entry point for each Celery worker. Config settings are loaded here and the worker is told where to find task definitions. The celery.py file can also optionally contain scheduled tasks configuration.

import os
from datetime import timedelta

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "run-every-10-seconds": {
        "task": "myapp.tasks.foo",
        "schedule": timedelta(seconds=10),
    },
    "run-every-1800-seconds": {
        "task": "myapp.tasks.bar",
        "schedule": timedelta(seconds=1800),
    },
}
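
The schedule values are ordinary datetime.timedelta objects, so longer intervals can be written in whichever unit reads best. Note that myapp.tasks.foo and myapp.tasks.bar are not defined anywhere in this tutorial, so substitute your own task paths. A small sketch, using hypothetical constant names:

```python
from datetime import timedelta

# Hypothetical constants; the two intervals match those in beat_schedule above.
EVERY_TEN_SECONDS = timedelta(seconds=10)
EVERY_THIRTY_MINUTES = timedelta(minutes=30)  # same value as timedelta(seconds=1800)

# timedelta objects compare by duration, not by how they were constructed.
print(EVERY_THIRTY_MINUTES == timedelta(seconds=1800))
```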

Finally, we need to make a small update to settings.py to include the CELERY_BROKER_URL variable. Because celery.py loads the Django settings with namespace="CELERY", Celery reads this variable as its broker_url setting. The broker URL is a connection string that points to a Redis instance, which holds the queues of pending task data.

import os  # add near the top of settings.py if it is not already imported

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL")

Using an environment variable will allow the local and production deployments to work without code changes.
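
One caveat: os.environ.get returns None when the variable is unset, which tends to surface later as a confusing connection error rather than a clear message. A minimal sketch of a stricter lookup, assuming a hypothetical helper named get_broker_url (it is not part of Django or Celery):

```python
import os


def get_broker_url(env=None):
    """Return the broker URL from the environment, failing fast if it is unset.

    get_broker_url is a hypothetical helper, not a Django or Celery API.
    """
    env = os.environ if env is None else env
    url = env.get("CELERY_BROKER_URL")
    if not url:
        raise RuntimeError("CELERY_BROKER_URL is not set")
    return url
```

In settings.py this would be used as CELERY_BROKER_URL = get_broker_url(), so a missing variable stops the process at startup.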

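With that final tweak our Django project is almost ready to work with Celery. The one remaining piece, as recommended by the Celery documentation, is to import the app in the project package's __init__.py (from .celery import app as celery_app) so that it loads when Django starts and @shared_task binds to it. Next we'll see how to containerize this setup and run it locally using Docker.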

Local Development with Django and Celery

We can use Docker and Compose together to make local development with Django and Celery easier. This method also makes it easier to collaborate on the project with other developers.

The first step is to containerize the Django and Celery worker processes. To do this, we'll write a Dockerfile that contains the instructions to create an image with the necessary dependencies.

FROM python:3.9-slim-buster

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

RUN apt-get update && apt-get install -y \
    libpq-dev \
    && apt-get -y clean

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app

RUN pip install --upgrade pip && pip install -r requirements.txt

COPY . /usr/src/app/

If you're familiar with the Dockerfile format, you may notice there is no CMD instruction at the end. This is intentional, as we'll use docker-compose to specify the command later. This allows one Dockerfile to work for both the Django and Celery containers.

Using Docker Compose to Manage the Containers

This is a simple docker-compose.yml file that will configure a Django dev server, Celery worker, and Redis instance. We use the dev server here because we'd like to take advantage of the hot reload feature. The production deployment will use Gunicorn to serve the application.

version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: django_dev
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: celery_worker
    command: celery -A myapp worker --loglevel=info
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"

Starting this collection of containers requires just one command, which also builds the local image and pulls any remote images, such as the one for Redis.

docker-compose up -d

Implementing a Simple Celery Task

Now that we have all the containers up and running, we should take a moment to implement a proof-of-concept task.

We'll add a new file named tasks.py to the multiple_queues_app/ directory and implement a simple async task. The task will compute the Nth Fibonacci number, albeit in a very inefficient recursive fashion.

from celery import shared_task

@shared_task
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)
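
To get a feel for why the recursive version is slow (which is what makes it a useful stand-in for a heavyweight job), the sketch below counts how many calls the naive recursion makes and compares it with a linear-time loop. The function names are hypothetical and this is plain Python, with no Celery involved:

```python
def fib_recursive(n, counter):
    """Naive recursion, mirroring the task body; counter[0] tracks calls."""
    counter[0] += 1
    if n <= 1:
        return n
    return fib_recursive(n - 1, counter) + fib_recursive(n - 2, counter)


def fib_iterative(n):
    """Linear-time alternative that produces the same values."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


calls = [0]
assert fib_recursive(20, calls) == fib_iterative(20)
print(calls[0])  # number of recursive calls needed for n = 20
```

The call count grows exponentially with n, while the loop performs n additions.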

Now we need a view that we can call to invoke this task and process it in the background. Add the following code in the multiple_queues_app/views.py file.

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import fibonacci

@csrf_exempt
def calculate_fibonacci(request):
    if request.method == "POST":
        try:
            N = int(request.POST.get("N"))
            result = fibonacci.delay(N)
            return JsonResponse(
                {"task_id": result.task_id, "status": "Task submitted successfully!"},
                status=200,
            )
        except (TypeError, ValueError):  # a missing N raises TypeError, a non-numeric N raises ValueError
            return JsonResponse({"error": "Invalid N value provided."}, status=400)
    else:
        return JsonResponse({"error": "Only POST method is allowed."}, status=405)
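
Because the recursive task's running time grows exponentially with N, it may be worth bounding the input before enqueueing anything. A minimal sketch, assuming a hypothetical parse_n helper and an arbitrary cap of 35:

```python
MAX_N = 35  # hypothetical cap; tune to what your workers can tolerate


def parse_n(raw, max_n=MAX_N):
    """Convert a raw form value to a bounded non-negative int.

    Raises ValueError for missing, non-numeric, negative, or oversized input.
    """
    if raw is None:
        raise ValueError("N is required")
    n = int(raw)  # raises ValueError for non-numeric strings
    if n < 0 or n > max_n:
        raise ValueError(f"N must be between 0 and {max_n}")
    return n
```

In the view, N = parse_n(request.POST.get("N")) would take the place of the bare int(...) call, and a ValueError handler then covers every kind of rejection.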

We're almost there. Before the view will respond, we need to add a urls.py to the app and update the main project urls.py as well.

from django.urls import path
from . import views

urlpatterns = [
    path("fibonacci/", views.calculate_fibonacci, name="calculate_fibonacci"),
]

The project-level urls.py then includes the app's routes:

from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path("admin/", admin.site.urls),
    path("", include("multiple_queues_app.urls")),
]

With the view and task in place, we can now give this all a try. First, run docker-compose restart so that the Celery worker picks up the new task code. Any HTTP client will do for testing the view; I'll use curl.

curl -X POST -d "N=25" http://localhost:8000/fibonacci/
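
The same request can be made from Python's standard library, which is handy in scripts or tests. This is a sketch: build_fibonacci_request and submit are hypothetical helper names, and the default URL assumes the Compose setup above.

```python
from urllib.parse import urlencode
from urllib.request import Request, urlopen


def build_fibonacci_request(n, base_url="http://localhost:8000"):
    """Build the same form-encoded POST that the curl command sends."""
    body = urlencode({"N": n}).encode("ascii")
    return Request(f"{base_url}/fibonacci/", data=body, method="POST")


def submit(n):
    """Send the request and return the raw JSON response body as text."""
    with urlopen(build_fibonacci_request(n)) as response:
        return response.read().decode("utf-8")
```

Calling submit(25) with the containers running should return the JSON containing the task_id.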

Watching the logs of the Celery worker, we should see the task being received and then taking roughly 3 seconds to execute, depending on your hardware.

Using Watchdog for Celery Worker Hot Reload

You may have noticed that we had to restart the Celery worker manually before we could try out the new task. Manually restarting the worker is going to become cumbersome very quickly. Ideally, we could have functionality similar to hot reload in the Django dev server.

Luckily, the watchdog utility provides an easy way to restart a process when a directory changes. Let's modify the docker-compose.yml file to use the watchmedo script of the watchdog utility, in the command section for Celery.

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: celery_worker
    command: watchmedo auto-restart --directory=multiple_queues_app --pattern=*.py --recursive -- celery -A myapp worker --loglevel=info
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis
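
Conceptually, auto-restart boils down to noticing that a matching file changed and then restarting the process. The sketch below illustrates only the detection half, using a polling snapshot. It is a simplified stand-in, not how watchdog is implemented (watchdog typically relies on OS file-system events), and snapshot and has_changed are hypothetical names.

```python
import os


def snapshot(directory, suffix=".py"):
    """Map each matching file under directory to its last-modified time."""
    mtimes = {}
    for root, _dirs, files in os.walk(directory):
        for name in files:
            if name.endswith(suffix):
                path = os.path.join(root, name)
                mtimes[path] = os.stat(path).st_mtime_ns
    return mtimes


def has_changed(before, after):
    """True if any file was added, removed, or modified between snapshots."""
    return before != after
```

A supervising loop would take a snapshot, sleep briefly, take another, and restart the worker whenever has_changed returns True.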

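Before updating the containers, however, we first need to add a requirements-dev.txt file that lists the watchdog package, which provides the watchmedo command (depending on the watchdog version, the watchdog[watchmedo] extra may be required), and modify the Dockerfile to install from it.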

FROM python:3.9-slim-buster

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

RUN apt-get update && apt-get install -y \
    libpq-dev \
    && apt-get -y clean

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app
COPY ./requirements-dev.txt /usr/src/app

RUN pip install --upgrade pip && pip install -r requirements.txt && pip install -r requirements-dev.txt

COPY . /usr/src/app/

With the containers rebuilt and restarted, you should now be able to modify Python files and see the Celery worker restart automatically.

Adding Multiple Queues in Local Development

How can we update this local development setup to include multiple Celery queues?

The process is as simple as adding another service to the docker-compose.yml file. The problem, however, is that the Compose file can become long and full of duplication. Instead of copying the entire Celery service block over and over, we can use a YAML anchor on a Compose extension field (a top-level key prefixed with x-) to keep things DRY.

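Below is the updated docker-compose.yml with a dedicated Celery worker for each queue. The tools/start_celery.sh script it references is not shown here; presumably it wraps the watchmedo and celery worker command from the previous section and forwards the extra arguments, so check the example repository for its exact contents.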

version: '3.8'

x-worker-opts: &worker-opts
  build:
    context: .
    dockerfile: Dockerfile
  volumes:
    - ${PWD}:/usr/src/app
  environment:
    - CELERY_BROKER_URL=redis://redis:6379/0
  depends_on:
    - redis

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: django_dev
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  fibonacci-worker:
    command: tools/start_celery.sh -Q fibonacci --concurrency=1
    <<: *worker-opts

  prime-worker:
    command: tools/start_celery.sh -Q prime --concurrency=1
    <<: *worker-opts

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"
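
If YAML anchors are unfamiliar: &worker-opts names a mapping, and <<: *worker-opts merges that mapping into each worker service, with keys written on the service itself (here, command) taking precedence. In Python terms the effect is roughly a shallow dict merge. The snippet below is an illustration only, with a hypothetical make_worker function; it is not something Compose runs.

```python
# Shared options, standing in for the x-worker-opts anchor.
worker_opts = {
    "build": {"context": ".", "dockerfile": "Dockerfile"},
    "volumes": ["${PWD}:/usr/src/app"],
    "environment": ["CELERY_BROKER_URL=redis://redis:6379/0"],
    "depends_on": ["redis"],
}


def make_worker(queue):
    """Merge the shared options with a per-queue command, like the merge key."""
    service = dict(worker_opts)  # shallow copy, as with a YAML merge key
    service["command"] = f"tools/start_celery.sh -Q {queue} --concurrency=1"
    return service


services = {f"{q}-worker": make_worker(q) for q in ("fibonacci", "prime")}
```

Adding a third queue then costs two lines in the Compose file: a command and the merge key.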

We also need to tweak the Python task code. Previously, all tasks were sent to a single worker that handled everything. Now that we're dividing work into separate queues, we need to specify which queue should handle each task.

from celery import shared_task

@shared_task(queue="fibonacci")
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

@shared_task(queue="prime")
def nth_prime(n):
    count = 0
    num = 1

    while count < n:
        num += 1
        if num < 2:
            continue
        if any(num % i == 0 for i in range(2, int(num**0.5) + 1)):
            continue
        count += 1

    return num
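
An alternative to naming the queue in each decorator is Celery's task_routes setting, which keeps all routing in one place. The sketch below assumes the default task names (module path plus function name) and relies on celery.py loading settings with namespace="CELERY", so the setting is spelled CELERY_TASK_ROUTES in settings.py:

```python
# settings.py (sketch): route tasks by name instead of in the decorator.
# Assumes default task names derived from multiple_queues_app/tasks.py.
CELERY_TASK_ROUTES = {
    "multiple_queues_app.tasks.fibonacci": {"queue": "fibonacci"},
    "multiple_queues_app.tasks.nth_prime": {"queue": "prime"},
}
```

Either way, keep in mind that a task with no explicit queue goes to Celery's default queue, named celery, which workers started with only -Q fibonacci or -Q prime will not consume.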

Production Deployment with Django and Celery

There are many ways to deploy Celery workers to a production environment, but this article focuses on a relatively simple technique: deploying with a combination of GitHub Actions and Ansible.

Ansible is a reliable and time-tested framework for applying configuration to remote machines over SSH. Users define playbooks that configure applications and services on remote machines. Ansible takes these playbooks and checks that the target machine matches the desired configuration, installing and updating whatever is necessary to bring the machine up to date.

You might ask: why not use Docker and Compose on the production server as well? That can be a legitimate choice, but a tool like Ansible makes it straightforward to deploy your application across multiple machines. Doing the same with Docker most likely means using a tool like Kubernetes or Nomad, which increases the complexity significantly.

Because Ansible works through SSH, we first need to grant the tool access to the target machine.

In a real production scenario, you would probably want to use a virtual machine image that has a specific Ansible user already provisioned. For testing purposes, I've simply started a DigitalOcean droplet with a root user. We'll generate a new SSH key pair locally and then copy the public key to the target machine. The private key will reside on GitHub as a secret.

ssh-keygen -f ansible-key
ssh-copy-id -i ansible-key.pub root@24.199.126.163

You should of course change the IP address to that of a machine you control. The ssh-copy-id program assumes that your local user can already log in with a valid SSH key or password. The point here is to set up a newly generated key-pair that only Ansible will use.

Now that the public key has been uploaded to the target machine, we'll store the private key on GitHub so that Ansible can log in while running inside an Actions pipeline. The command above produced two files, ansible-key and ansible-key.pub; upload the contents of the private key (the one without the .pub extension) as a GitHub secret. The workflow below expects the secret to be named ANSIBLE_SSH_PRIVATE_KEY.

Using a GitHub Actions Workflow to Trigger Ansible

Before diving into the Ansible configuration itself, we need something to trigger and execute the playbook. GitHub Actions is a convenient tool for this purpose. For this simple tutorial, we'll run the Ansible playbook on every push to the main branch. More robust setups could use the branch name to determine whether to deploy to a staging environment or to production.

For now, however, here is a simple deploy.yml that initiates the Ansible playbook. It also demonstrates how to pass GitHub secrets into Ansible.

name: Deploy

on:
  push:
    branches:
      - main

jobs:
  update_celery:
    runs-on: ubuntu-latest
    environment: production
    container: willhallonline/ansible:alpine
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
        with:
          fetch-depth: 0

      - name: Set up SSH key
        working-directory: ansible
        env:
          ANSIBLE_SSH_PRIVATE_KEY: ${{ secrets.ANSIBLE_SSH_PRIVATE_KEY }}
        run: |
          echo "$ANSIBLE_SSH_PRIVATE_KEY" > id_rsa
          chmod 600 id_rsa

      - name: Run Ansible playbook
        working-directory: ansible
        run: ansible-playbook -i inventory.ini playbook.yml --tags "app,redis,celery" --extra-vars "redis_password=${{ secrets.REDIS_PASSWORD }}"

Using Systemd to Manage Celery Workers

Within our Ansible playbook we'll have a few tasks responsible for setting up Celery workers and keeping the code updated. I find it useful to create an ansible directory to store all of this configuration.

Here's an overview of the directory structure, to give an idea of what's involved with Ansible in our project.

ansible
├── ansible.cfg
├── inventory.ini
├── playbook.yml
└── roles
    ├── app
    │   ├── tasks
    │   │   └── main.yml
    │   └── templates
    │       ├── gunicorn.conf.py.j2
    │       ├── gunicorn-run-dir.service.j2
    │       └── gunicorn.service.j2
    ├── celery
    │   ├── tasks
    │   │   └── main.yml
    │   └── templates
    │       ├── celery-beat.service.j2
    │       └── celery.service.j2
    └── redis
        └── tasks
            └── main.yml

Two of the roles matter most here: one handles the initial setup and installation of Redis, and the other configures the Celery workers. Since this approach only uses Docker for local development, we'll use systemd to manage Celery in production. If you've never used systemd, don't worry, it won't be too complicated. Using systemd is an easy way to ensure that Celery is brought back up if the VM restarts or a worker dies.

We'll have a separate systemd service for each worker, and one for the celery-beat instance that's used for scheduled tasks. I won't show both config files, as they are very similar, but below is the systemd unit file for the Celery worker.

[Unit]
Description=Celery Worker {{ item.name }} for Multiple Queues App
After=network.target

[Service]
Environment="CELERY_BROKER_URL=redis://:{{ redis_password }}@localhost:6379/0"
Environment="PYTHONUNBUFFERED=1"
Type=simple
User=root
Group=root
WorkingDirectory={{ app_path }}
ExecStart={{ app_path }}/.venv/bin/celery -A multiple_queues_app worker -Q {{ item.name }} --loglevel=info
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=celery-{{ item.name }}

[Install]
WantedBy=multi-user.target

You'll notice that the template contains Jinja2 placeholders such as {{ item.name }}. These let Ansible render one unit file per Celery queue from a single template, keeping duplication to a minimum as the number of queues grows.

Defining a new Celery queue is done at the playbook level.

- hosts: production
  become: yes 
  vars:
    app_repo: git@github.com:zchtodd/multiple_queues_app.git
    app_path: /opt/multiple_queues_app
    venv_path: /opt/multiple_queues_app/.venv
    gunicorn_config_path: /etc/systemd/system/gunicorn.service
    run_dir_maker_config_path: /etc/systemd/system/gunicorn-run-dir.service
    celery_workers:
      - name: fibonacci

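To create a new queue, add its name to the celery_workers list and Ansible will generate the corresponding unit file through the templating system. For example, the prime queue from the local setup would need its own "- name: prime" entry; as written, only the fibonacci worker is deployed.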
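
The loop-plus-template idea can be sketched in plain Python. Ansible renders the real template with Jinja2; the snippet below uses string.Template from the standard library as a stand-in, with a shortened unit file, a hypothetical render_units function, and a prime entry added purely for illustration:

```python
from string import Template

# Stand-in for celery.service.j2; $name plays the role of {{ item.name }}.
UNIT_TEMPLATE = Template(
    "[Unit]\n"
    "Description=Celery Worker $name for Multiple Queues App\n"
    "\n"
    "[Service]\n"
    "ExecStart=$app_path/.venv/bin/celery -A multiple_queues_app worker"
    " -Q $name --loglevel=info\n"
)

# Mirrors the celery_workers playbook variable; "prime" is added as an example.
celery_workers = [{"name": "fibonacci"}, {"name": "prime"}]


def render_units(workers, app_path="/opt/multiple_queues_app"):
    """Return {unit file path: rendered contents}, one entry per worker."""
    return {
        f"/etc/systemd/system/celery-{w['name']}.service": UNIT_TEMPLATE.substitute(
            name=w["name"], app_path=app_path
        )
        for w in workers
    }
```

Each entry in the list yields one unit file, which is the same effect the Ansible loop over celery_workers has.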

Deploying Celery with Ansible

The next step is to use Ansible to create the Celery worker services. This process is fairly straightforward and just involves writing out the systemd config files that we saw earlier. A separate Ansible role handles cloning the app code and starting the web server process. You can find all of the details in the project GitHub repository.

- name: Create Celery logs directories
  ansible.builtin.file:
    path: "/var/log/celery/{{ item.name }}"
    state: directory
  loop: "{{ celery_workers }}"

- name: Create Celery Beat Systemd service file
  ansible.builtin.template:
    src: "{{ playbook_dir }}/roles/celery/templates/celery-beat.service.j2"
    dest: "/etc/systemd/system/celery-beat.service"
    mode: "0644"

- name: Create Celery Systemd service files
  ansible.builtin.template:
    src: "{{ playbook_dir }}/roles/celery/templates/celery.service.j2"
    dest: "/etc/systemd/system/celery-{{ item.name }}.service"
    mode: "0644"
  loop: "{{ celery_workers }}"

- name: Reload Systemd configuration
  ansible.builtin.systemd:
    daemon_reload: yes

- name: Enable and start Celery services
  ansible.builtin.systemd:
    name: "celery-{{ item.name }}"
    state: started
    enabled: yes
  loop: "{{ celery_workers }}"

- name: Enable and start Celery beat
  ansible.builtin.systemd:
    name: "celery-beat"
    state: started
    enabled: yes

- name: Restart Celery service
  ansible.builtin.systemd:
    name: "celery-{{ item.name }}"
    state: restarted
    daemon_reload: yes
  loop: "{{ celery_workers }}"

- name: Restart Celery beat
  ansible.builtin.systemd:
    name: "celery-beat"
    state: restarted
    daemon_reload: yes

Once the Celery workers have been set up, you can use the journalctl and systemctl programs to monitor and manage the processes.

To tail the logs of the Fibonacci worker, for example, you would execute the following command.

journalctl -u celery-fibonacci.service -f

Multiple Celery Queues Example App

This tutorial doesn't show all of the code, so if you'd like to see the full implementation, the example app is hosted on GitHub.

The full app should serve as a template that you can expand on.

Happy hacking!
