Have you ever wanted to dynamically add schedules for a large base of users in your service and found out that celery-beat is limited? And then you stumbled upon django-celery-beat, but you are using fastapi or flask? This article is made just for you!
TLDR: Use the rdbbeat library to persist dynamic schedules in your RDB.
- Install the library:
pip install rdbbeat
- Run celery-beat with the custom scheduler:
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
- Run a celery worker and use rdbbeat models and controller to add/get schedules.
Introduction
celery-beat runs periodic tasks from schedules that are defined before run time. We want a tool that can add schedules dynamically to our systems during operation.
rdbbeat extends the celery-beat scheduling system to solve the above problem in a generic pattern. The library is built with sqlalchemy models and persists the schedules in your RDB. It implements listeners on the database models to capture the addition, deletion, and modification of schedules during run time.
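The listener idea can be pictured with a small, framework-free sketch. This is not rdbbeat's implementation (the library hooks into its sqlalchemy models); `ScheduleStore` and `ToyScheduler` are hypothetical names, used only to show how a change notification lets a running scheduler pick up new entries without a restart.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ScheduleStore:
    """Stand-in for the schedule tables; notifies listeners on every change."""

    entries: Dict[str, str] = field(default_factory=dict)
    listeners: List[Callable[[], None]] = field(default_factory=list)

    def add(self, name: str, cron: str) -> None:
        self.entries[name] = cron
        self._notify()

    def remove(self, name: str) -> None:
        self.entries.pop(name, None)
        self._notify()

    def _notify(self) -> None:
        for listener in self.listeners:
            listener()


class ToyScheduler:
    """Keeps a cached copy of the schedule and reloads it only when it is stale."""

    def __init__(self, store: ScheduleStore) -> None:
        self.store = store
        self.cache: Dict[str, str] = {}
        self.stale = True
        store.listeners.append(self.mark_stale)

    def mark_stale(self) -> None:
        self.stale = True

    def tick(self) -> Dict[str, str]:
        # Reload from the store only if a listener reported a change.
        if self.stale:
            self.cache = dict(self.store.entries)
            self.stale = False
        return self.cache


store = ScheduleStore()
scheduler = ToyScheduler(store)
store.add("1_birthday_greeting", "0 9 1 1 *")
print(scheduler.tick())
```

The scheduler never polls for differences; it reloads only after the store reports a change, which is the property that makes run-time additions cheap.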
Usage Example
We can look at this example:
Build a company's admin tool that sends "Happy Birthday" messages to employees on their birthdays via email.
We can use rdbbeat to add schedules for each employee's birthday dynamically during their on-boarding. The schedules are persisted in the database and can be modified or deleted at any time.
The complete code for this example can be found on GitHub.
1. Basic service setup with flask and celery
mkdir rdbbeat-flask-example
cd rdbbeat-flask-example
python -m venv venv
source venv/bin/activate
pip install flask celery # view requirements.txt for other dependencies
2. Basic model and DB setup
- Create and run a database server (I used postgres)
- Create an employee model in SQLAlchemy
# server/models.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Employee(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False)
    surname = db.Column(db.String(80), nullable=False)
    date_of_birth = db.Column(db.Date, nullable=False)

    def to_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "surname": self.surname,
            "date_of_birth": self.date_of_birth.strftime("%Y-%m-%d"),
        }
3. Basic Flask app setup
- A simple flask app with a blueprint for employee routes
# server/app.py
import os
from dotenv import load_dotenv
from flask import Flask
from flask_cors import CORS
from flask_migrate import Migrate
from server.db_connection import DATABASE_URL
from server.models import db
load_dotenv()
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = DATABASE_URL
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
# Celery configuration
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "database"
app.config["CELERY_RESULT_DBURI"] = DATABASE_URL
app.config["CELERY_TRACK_STARTED"] = True
app.config["CELERY_SEND_EVENTS"] = True
app.config["BROKER_TRANSPORT_OPTIONS"] = {"visibility_timeout": 3600}
app.config["CELERY_DEFAULT_QUEUE"] = "default"
migrate = Migrate(app, db, directory="server/migrations", compare_type=True)
CORS(app)
db.init_app(app)
from server.views import employee_router # noqa isort:skip
app.register_blueprint(employee_router)
@app.route("/")
def index():
    return "Learn to use the celery-rdbbeat scheduler!"
NB: The celery config is added here because we will use it later.
The db_connection.py file holds the creation of DB connections with session management.
The views.py file holds the employee routes:
# server/views.py
from dateutil.parser import parse
from flask import Blueprint, jsonify, request
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task

from server.db_connection import session_scope
from server.models import Employee

employee_router = Blueprint("employee_router", __name__, url_prefix="/api/v1")


@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()
    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()
        # Build the response while the session is still open
        return jsonify(employee.to_dict()), 201


@employee_router.get("/employees/<int:employee_id>")
def get_employee(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        if not employee:
            return jsonify({"error": "Employee not found"}), 404
        return jsonify(employee.to_dict())
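The create route above assumes every field is present and well-formed; a missing key would surface as a server error rather than a helpful message. Here is a minimal sketch of validating the payload first, using only the standard library. `parse_employee_payload` is a hypothetical helper, not part of the example repository, and it expects ISO dates such as 1990-01-01, which is stricter than the dateutil parser used in the route.

```python
from datetime import date
from typing import Any, Dict, Tuple


def parse_employee_payload(data: Dict[str, Any]) -> Tuple[str, str, date]:
    """Return (name, surname, date_of_birth) or raise ValueError with a readable message."""
    missing = [key for key in ("name", "surname", "date_of_birth") if not data.get(key)]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    try:
        born = date.fromisoformat(data["date_of_birth"])
    except (TypeError, ValueError) as exc:
        raise ValueError("date_of_birth must be an ISO date such as 1990-01-01") from exc
    return data["name"], data["surname"], born


print(parse_employee_payload({"name": "John", "surname": "Doe", "date_of_birth": "1990-01-01"}))
```

In the route, you could catch the ValueError and return jsonify({"error": str(exc)}) with a 400 status instead of letting the request fail.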
Test Point
- Use flask-migrate or pure alembic to create migrations and run them
# with flask-migrate
flask db init
flask db migrate -m "create employee table" # create migrations
flask db upgrade # create the table in the DB
- Check that your table is created in the DB (I used TablePlus)
- Run the flask app
export FLASK_APP=server/app.py
flask run # (or python -m flask run)
- At this point, you should be able to create employees and get them from the DB using the routes we created above. You can use Postman or Insomnia here; I just used curl.
# Create an employee
curl -X POST -H "Content-Type: application/json" -d '{"name": "John", "surname": "Doe", "date_of_birth": "1990-01-01"}' http://localhost:5000/api/v1/employees/
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
# Get an employee with id=1
curl http://localhost:5000/api/v1/employees/1
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
Now, let the scheduling fun begin!
4. Running celery-beat with rdbbeat as a custom scheduler
- Run the migrations to create the rdbbeat tables (note that they live in the scheduler schema, not the public schema)
python -m alembic -n scheduler upgrade head
- Then run the celery-beat (in a separate terminal)
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
5. Create a simple celery app and link it to the flask app
# server/celery_worker.py
from celery import Celery

from server.app import app as flask_app
from server.db_connection import session_scope

REDIS_URL = "redis://localhost:6379/0"


def create_celery(flask_app=flask_app):
    celery_app = Celery("flask", broker=REDIS_URL)
    celery_app.conf.task_default_queue = "default"
    celery_app.conf.broker_transport_options = {
        "max_retries": 3,
        "interval_start": 0,
        "interval_step": 0.2,
        "interval_max": 0.2,
    }
    # Provide session scope to `rdbbeat`
    celery_app.conf.session_scope = session_scope
    celery_app.conf.update(flask_app.config)

    TaskBase = celery_app.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with flask_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_app.Task = ContextTask
    return celery_app


app = create_celery(flask_app=flask_app)
NB: Be sure to provide the session scope to rdbbeat so that it can access the DB.
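For reference, a session scope is usually a context manager that commits on success, rolls back on error, and always closes the session. The sketch below shows that shape; it is not the db_connection.py from the example repository. `make_session_scope` is a hypothetical helper, and `FakeSession` only stands in for a SQLAlchemy session so that the snippet runs without a database. With SQLAlchemy you would pass something like `sessionmaker(bind=engine)` as the factory.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List


def make_session_scope(session_factory: Callable[[], Any]):
    """Build a `session_scope()` context manager around any session factory."""

    @contextmanager
    def session_scope() -> Iterator[Any]:
        session = session_factory()
        try:
            yield session
            session.commit()      # persist everything if the block succeeded
        except Exception:
            session.rollback()    # undo partial work on any error
            raise
        finally:
            session.close()       # always release the connection

    return session_scope


class FakeSession:
    """Records calls so the control flow is visible without a real database."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def commit(self) -> None:
        self.calls.append("commit")

    def rollback(self) -> None:
        self.calls.append("rollback")

    def close(self) -> None:
        self.calls.append("close")


fake = FakeSession()
session_scope = make_session_scope(lambda: fake)
with session_scope() as session:
    pass  # real code would call session.add(...) here
print(fake.calls)
```

Whatever shape your own session scope takes, the important part is that the same callable is what you assign to `celery_app.conf.session_scope`.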
- Create a simple task
# server/tasks.py
from server.celery_worker import app
from server.db_connection import session_scope
from server.models import Employee, db


@app.task(name="birthday_greeting")
def birthday_greeting(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        print(f"Happy birthday, {employee.name} {employee.surname}!")
        # Send email to employee
        # email_service.send_email(template="birthday_greeting", to=employee.email, context={"employee": employee.to_dict()})
        # Remind his manager too, in case they forgot :)
- Now, modify the views.py file to add a schedule for each employee's birthday message
# server/views.py
...
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task


@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()
    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()
        # Create birthday greeting task
        db_employee = session.query(Employee).get(employee.id)
        schedule = Schedule(
            minute="0",  # "*" here and in hour would fire every minute of the birthday
            hour="9",  # 09:00 is an arbitrary choice
            day_of_week="*",
            day_of_month=str(date_of_birth.day),
            month_of_year=str(date_of_birth.month),
            timezone="UTC",  # FIX-ME: get timezone from employee
        )
        task_to_schedule = ScheduledTask(
            name=f"{db_employee.id}_birthday_greeting",  # All tasks must have a unique name
            task="birthday_greeting",
            schedule=schedule,
        )
        # Provide task kwargs for when the task is executed
        task_kwargs = {"employee_id": db_employee.id}
        schedule_task(session=session, scheduled_task=task_to_schedule, **task_kwargs)
        return jsonify(db_employee.to_dict()), 201
...
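To make the field mapping explicit, here is a small sketch that derives the crontab fields from a date of birth and checks them against a moment in time. With "*" in both the minute and hour fields, a crontab matches every minute of the birthday, so pinning both is what makes the greeting fire once. `birthday_cron_fields` and `fires_at` are hypothetical helpers for illustration; the matcher handles only "*" and plain integers, far less than Celery's own crontab parser, and the 09:00 send time is an assumption.

```python
from datetime import date, datetime
from typing import Dict


def birthday_cron_fields(born: date, hour: int = 9, minute: int = 0) -> Dict[str, str]:
    """Crontab fields that match once a year, on the birthday, at hour:minute."""
    return {
        "minute": str(minute),
        "hour": str(hour),
        "day_of_week": "*",
        "day_of_month": str(born.day),
        "month_of_year": str(born.month),
    }


def fires_at(fields: Dict[str, str], moment: datetime) -> bool:
    """True if every field is '*' or equals the matching part of `moment`."""
    actual = {
        "minute": moment.minute,
        "hour": moment.hour,
        "day_of_week": (moment.weekday() + 1) % 7,  # crontab counts Sunday as 0
        "day_of_month": moment.day,
        "month_of_year": moment.month,
    }
    return all(value == "*" or int(value) == actual[key] for key, value in fields.items())


fields = birthday_cron_fields(date(1990, 1, 1))
print(fields)
print(fires_at(fields, datetime(2024, 1, 1, 9, 0)))  # on the birthday, at the send time
print(fires_at(fields, datetime(2024, 1, 1, 9, 1)))  # one minute later
```

One edge case to keep in mind: a schedule built this way for a 29 February birthday only matches in leap years.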
6. Run everything and test!
- As a sanity check, let's modify the schedule to run every minute
# server/views.py
...
        schedule = Schedule(
            minute="*",
            hour="*",
            day_of_week="*",
            day_of_month="*",
            month_of_year="*",
            timezone="UTC",
        )
...
- Check that the flask app is running in one terminal
- Check that celery-beat is running in another terminal
- Then run the celery worker in a third terminal
python -m celery --app=server.tasks worker --loglevel=info
- Run the curl commands again to create an employee.
7. It works!!
- You should see logs like these in terminal 2:
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
- And the celery worker logs should be happy like:
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
8. Advanced stuff
8.1. Single-beat
You can use single-beat to run celery-beat as a single process. Its main benefit is that it safeguards against multiple celery-beat processes running at the same time.
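The underlying idea is mutual exclusion: only the process that holds a lock may run beat. single-beat coordinates through Redis; the sketch below uses an exclusive lock file instead, purely to illustrate the pattern on one machine. `single_instance` and the lock path are hypothetical.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def single_instance(lock_path: str) -> Iterator[None]:
    """Hold an exclusive lock file for the duration of the block."""
    # O_EXCL makes creation fail atomically if the file already exists.
    fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        os.write(fd, str(os.getpid()).encode())
        yield
    finally:
        os.close(fd)
        os.remove(lock_path)


lock_path = os.path.join(tempfile.mkdtemp(), "beat.lock")
with single_instance(lock_path):
    try:
        with single_instance(lock_path):
            second_started = True
    except FileExistsError:
        second_started = False  # a second beat would refuse to start
print(second_started)
```

A lock file like this is left behind if the process crashes, which is one reason a lock with an expiry in a shared store is the more robust choice across several machines.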
8.2. Deploy with k8s
If you are running your system in kubernetes, you can start celery-beat in one pod, the celery worker in a second pod, and your server application in a third. As long as they all have access to the same DB and the same broker, they should be able to communicate with each other.
9. Alternatives
9.1. RedBeat
According to its GitHub docs, RedBeat is a Celery Beat Scheduler that stores the scheduled tasks and runtime metadata in Redis. The trade-off here is that your task schedules are stored in Redis, so you have to ensure that your Redis instance is highly available to avoid losing your schedules.
9.2. Django Celery Beat
rdbbeat does what Django Celery Beat does for django, but for SQLAlchemy. If you are using the django framework for your server application, you can use Django Celery Beat to schedule your tasks.