
Django Celery Beat: How to get the last time a PeriodicTask was run

Pavel Vergeev ・ 3 min read

The problem

Suppose you have the following task:

import time

from django_project.celery import app  # set up according to https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#using-celery-with-django


@app.task
def my_task():
    print("I'm running!")
    time.sleep(10)
    print("I've finished!")

Suppose further that my_task runs once every few days via django-celery-beat, in a single worker process.

The question is: how can my_task get the last time it was run?

Take number one: Bind and Get

The first thing that comes to mind is to find where django-celery-beat stores the last run time of a PeriodicTask and read that value.

So first you need to bind the task in order to get its name, then get the appropriate PeriodicTask and take its last_run_at attribute, like so:

from django_celery_beat.models import PeriodicTask


@app.task(bind=True)
def my_task(self):
    last_run = PeriodicTask.objects.get(
        task=self.name  # notice PeriodicTask.name and self.name are different things
    ).last_run_at
    print(f"I'm running! Last time I ran at {last_run}")
    time.sleep(10)
    print("I've finished!")

The problem: PeriodicTask entries accumulate each time you restart the celery process. Soon you'll be getting

django_celery_beat.models.PeriodicTask.MultipleObjectsReturned: get() returned more than one PeriodicTask -- it returned 16!

Take number two: Bind and Filter

In order to avoid the MultipleObjectsReturned exception, we can filter out all the irrelevant periodic tasks:

@app.task(bind=True)
def my_task(self):
    last_run = PeriodicTask.objects.filter(
        task=self.name
    ).order_by('last_run_at').last().last_run_at
    print(f"I'm running! Last time I ran at {last_run}")
    time.sleep(10)
    print("I've finished!")

The first problem: if we run my_task more often than once in five minutes, the same last_run_at value gets reported for several consecutive runs:

[2020-02-13 20:15:41,884: WARNING/ForkPoolWorker-2] I'm running! Last time I ran at 2020-02-13 17:13:00.616557+00:00
[2020-02-13 20:15:51,886: WARNING/ForkPoolWorker-2] I've finished!
[2020-02-13 20:15:51,956: INFO/ForkPoolWorker-2] Task integration.tasks.test_task[1e413b12-455c-4740-8438-d6d1f8b18a35] succeeded in 10.409016128978692s: None
[2020-02-13 20:16:01,787: WARNING/ForkPoolWorker-2] I'm running! Last time I ran at 2020-02-13 17:13:00.616557+00:00
[2020-02-13 20:16:11,788: WARNING/ForkPoolWorker-2] I've finished!
[2020-02-13 20:16:11,825: INFO/ForkPoolWorker-2] Task integration.tasks.test_task[80a4467f-d0b2-4839-a0a4-1270bf02c2e8] succeeded in 10.18468949093949s: None
[2020-02-13 20:16:21,734: INFO/MainProcess] Received task: integration.tasks.test_task[5d4c126e-da9d-4986-8d37-d2d9a072bf60]
[2020-02-13 20:16:21,906: WARNING/ForkPoolWorker-2] I'm running! Last time I ran at 2020-02-13 17:16:01.635268+00:00
[2020-02-13 20:16:31,907: WARNING/ForkPoolWorker-2] I've finished!
[2020-02-13 20:16:31,939: INFO/ForkPoolWorker-2] Task integration.tasks.test_task[5d4c126e-da9d-4986-8d37-d2d9a072bf60] succeeded in 10.203773486078717s: None

Fortunately, the problem does not occur if the task runs once every few days. So this would be a valid solution, as long as you don't need to change the schedule of the periodic task.

The second problem: suppose you do need to change the schedule of the periodic task. You can't do that and keep the history, or at least I haven't found a way to do it in code. After the PeriodicTask gets recreated with the new schedule, last_run_at becomes None.

The third problem: last_run_at also becomes None if you change the time zone.

So what I'm saying is, last_run_at is not a reliable solution for a long-running task.

Take number three: Store it somewhere else

By this time I knew what I wanted: a reliable way to store the last running time of a task that gets executed rarely in a single process. So I've decided to just store it in the database:

# models.py

class PeriodicTaskRun(models.Model):
    task = models.CharField(max_length=200, verbose_name='Task Name')
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        get_latest_by = 'created_at'

# tasks.py

@app.task(bind=True)
def my_task(self):
    try:
        last_run = PeriodicTaskRun.objects.filter(task=self.name).latest('created_at')
    except PeriodicTaskRun.DoesNotExist:
        last_run = None  # the very first run has nothing to look up
    PeriodicTaskRun.objects.create(task=self.name)

    print(f"I'm running! Last time I ran at {last_run.created_at if last_run else 'never'}")
    time.sleep(10)
    print("I've finished!")
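To see the read-latest-then-record pattern in isolation, here is a sketch of the same logic outside Django, using plain sqlite3. The table and column names mirror the model above, but the raw SQL is my own illustration of roughly what the ORM issues, not code from this project:

```python
import sqlite3
from datetime import datetime, timezone

def get_last_run(conn, task_name):
    # Latest created_at for the task, or None on the very first run
    row = conn.execute(
        "SELECT created_at FROM periodic_task_run"
        " WHERE task = ? ORDER BY created_at DESC LIMIT 1",
        (task_name,),
    ).fetchone()
    return row[0] if row else None

def record_run(conn, task_name):
    # Append a run record; history is kept, never overwritten
    conn.execute(
        "INSERT INTO periodic_task_run (task, created_at) VALUES (?, ?)",
        (task_name, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE periodic_task_run (task TEXT, created_at TEXT)")
print(get_last_run(conn, "my_task"))  # None: nothing recorded yet
record_run(conn, "my_task")
print(get_last_run(conn, "my_task"))  # ISO timestamp of the run we just recorded
```

Note that reading and then inserting is not atomic; with a single worker process, as assumed throughout this post, that's fine.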

If you know any caveats to this solution I should know about, or if you have a better one, please let me know in the comments!

Discussion

Ahmed Mej

My application has 2 periodic tasks which run for a couple of seconds, and run once per day.
Do you recommend using this method ? It's the most natural thing that comes to mind

Pavel Vergeev Author

Yeah! Especially if you don't need to scale this solution to thousands of different tasks that launch millions of times.

Since writing this article I've moved on to a different project with a big load of users. And you know what? It uses the same solution! Except on this project we store last_run in Redis, not Postgres, since we don't need to persist it forever.
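The Redis variant of the idea can be sketched like this. The key layout and function names are my own, and the dict-backed stand-in client is only there so the snippet runs without a Redis server; with redis-py you would pass a real redis.Redis() instance, whose set/get calls have the same shape:

```python
from datetime import datetime, timezone

LAST_RUN_KEY = "last_run:{task}"  # key naming is an assumption, not from the article

def record_run(client, task_name):
    # Store the current UTC time as an ISO string under a per-task key
    client.set(LAST_RUN_KEY.format(task=task_name),
               datetime.now(timezone.utc).isoformat())

def get_last_run(client, task_name):
    raw = client.get(LAST_RUN_KEY.format(task=task_name))
    if raw is None:
        return None  # nothing stored yet: first run
    if isinstance(raw, bytes):  # redis-py returns bytes by default
        raw = raw.decode()
    return datetime.fromisoformat(raw)

class InMemoryClient:
    """Dict-backed stand-in for redis.Redis, for demonstration only."""
    def __init__(self):
        self.store = {}
    def set(self, key, value):
        self.store[key] = value
    def get(self, key):
        return self.store.get(key)

client = InMemoryClient()
print(get_last_run(client, "my_task"))  # None before the first run
record_run(client, "my_task")
print(get_last_run(client, "my_task"))  # the timestamp we just stored
```

Unlike the Postgres model, this keeps only the latest run per task, which matches the "don't need to persist it forever" trade-off mentioned above.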

Jason Held

Curious, what is the scale of your tasks? django-celery-beat was not necessarily built to handle load, though if you have a separate database used only for beat, that may help.

Pavel Vergeev Author

My tasks are network-intensive, and are supposed to run for a month or so.

A separate database is a great idea for this kind of task, thanks!