We don't want an user waiting too long for a response i.e we want our server's response time to be as minimal as possible. But, sometimes an user request can require time consuming tasks e.g: Image/Video processing, Exporting data, Generating reports etc and then, it becomes impossible to respond to the user very fast synchronously.
The solution to this problem is, using another server to perform these time consuming tasks in the background while the main server responds to client requests.
This is where the concept of performing asynchronous tasks come in. In python based web frameworks, a good way to do it is using Celery. We would also need a message broker which will manage the message queue(s) for us. We will use RabbitMQ as our message broker today.
So, Here's the overview of what we will be designing today:
We will be inserting tasks into the queue from our celery client (django app). Then, we will be initiating some celery workers who will consume the tasks sequentially from the message queues managed by RabbitMQ. Multiple celery workers can subscribe to a single queue. We can utilize this to allocate more workers to a queue that contains high priority tasks. Finally, we will dockerize our solution for deployment as we did throughout this series.
First, lets update our docker-compose.yml to add a RabbitMQ container.
version: '3.7'
services:
rabbit:
image: rabbitmq:3.7-management
restart: always
ports:
- "5672:5672"
- "15672:15672"
env_file:
- .live.env
db:
image: mysql:5.7
ports:
- "3306:3306"
restart: always
volumes:
- production_db_volume:/var/lib/mysql
env_file:
- .live.env
# or
# db:
# image: postgres:12.5
# ports:
# - "5432:5432"
# restart: always
# volumes:
# - production_db_volume:/var/lib/postgresql/data/
# env_file:
# - .live.env
app:
build:
context: .
ports:
- "8000:8000"
volumes:
- production_static_data:/vol/web
restart: always
env_file:
- .live.env
depends_on:
- db
proxy:
build:
context: ./proxy
volumes:
- production_static_data:/vol/static
restart: always
ports:
- "80:80"
depends_on:
- app
volumes:
production_static_data:
production_db_volume:
By default, Rabbit uses 5672 port for the main rabbit server and 15672 for the rabbit admin.
Now, we need to add the following to our .env file for RabbitMQ:
RABBITMQ_DEFAULT_USER=blah_blah_bleh
RABBITMQ_DEFAULT_PASS=blah_blah_bleh
Now, if we build and run our docker-compose, our RabbitMQ server will be ready.
docker-compose up --build
Our message broker is ready. Now, we need to setup the producer and consumers.
First we need to install celery in our python virtual environment.
pip install Celery
# optional but super convenient way to check results of the tasks right in django admin
pip install django-celery-results
We will now create a new file celery.py in the same folder as our django settings.py. This portion is taken straight from the celery documentation here
celery.py
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
app = Celery('mysite')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Now, we need to edit the init.py file in the same folder as the settings.py file.
init.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
Now we need to update the settings.py file with the celery configuration like so,
INSTALLED_APPS = (
...,
'django_celery_results', # optional but super convenient way to check results of the tasks right in django admin
)
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_TIMEZONE = "YourTimeZone"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
# CELERY_RESULT_BACKEND_DB = f'db+mysql+pymysql://{os.environ.get("MYSQL_USER")}:{os.environ.get("MYSQL_PASSWORD")}@db/{os.environ.get("MYSQL_DATABASE")}'
CELERY_BROKER_URL = f'amqp://{os.environ.get("RABBITMQ_DEFAULT_USER")}:{os.environ.get("RABBITMQ_DEFAULT_PASS")}@rabbit//'
CELERY_TASK_RESULT_EXPIRES = 18000
Note, the CELERY_BROKER_URL value, this is how we tell celery where our message broker is. If we used Redis instead of RabbitMQ our broker url would start with redis://
. The @rabbit
in the end of the url just maps to the host docker-compose service for the RabbitMQ container we just built.
If you chose to install django-celery-results
, you need to apply the migrations for django_celery_results:
python manage.py migrate django_celery_results
Now, we need to edit the docker entrypoint.sh to create the celery worker(s):
#!/bin/sh
set -e # exit if errors happen anywhere
python manage.py collectstatic --noinput
python manage.py migrate
celery -A mysite worker -l info --detach
uwsgi --socket :8000 --master --enable-threads --module mysite.wsgi
Thats all for the setup. Now, you can just create your tasks in a tasks.py file under each django app directory like so:
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
Now, celery will auto discover all the tasks in the django apps since we called app.autodiscover_tasks()
in our celery.py file.
If you want help on the syntax for writing and executing celery tasks check the official documentation:
1) Writing Celery Tasks
2) Executing a Celery Task
BONUS
If you have some tasks that need to be executed periodically you can use Celery Beat
. In order to setup beat you just need to do a few more things.
pip install django-celery-beat
Then add the app in your settings.py.
INSTALLED_APPS = (
...,
'django_celery_beat'
)
Now, there are two ways to define periodic tasks. One is statically defining them in your settings.py
file which i wouldn't recommend for django. This is because,
django celery beat actually enables you to store the periodic task schedules in the database. The periodic tasks can be managed from the Django Admin interface, where you can create, edit and delete periodic tasks and how often they should run. straight outta django-celery-beat docs
So, in order to setup the database backed beat scheduler all we need to do is add the following command to our docker entrypoint.sh
.
celery -A mysite beat -l info --detach --scheduler django_celery_beat.schedulers:DatabaseScheduler
Make sure to run the django migrate command to run the necessary migrations for the django celery beat app (Our design of the docker entrypoint already does this for us).
#!/bin/sh
set -e # exit if errors happen anywhere
python manage.py collectstatic --noinput
python manage.py migrate
celery -A mysite worker -l info --detach
celery -A mysite beat -l info --detach --scheduler django_celery_beat.schedulers:DatabaseScheduler
uwsgi --socket :8000 --master --enable-threads --module mysite.wsgi
Top comments (0)