DEV Community

Cover image for Django + Celery: testando sistemas com filas
Eduardo Oliveira
Eduardo Oliveira

Posted on • Updated on

Django + Celery: testando sistemas com filas

Esse texto tem o objetivo de descrever como foram implementados os testes com o django e o celery rodando juntos no back-end de um MVP de um produto. Não é objetivo desse texto descrever ou discutir as melhores práticas para testes com filas (melhores práticas, geralmente, dependem de contextos mais amplos), mas sim exemplificar um caso de uso em que precisávamos testar as nossas tarefas de segundo plano.

Conteúdos


Introdução

O celery é, de forma bem resumida, um sistema de filas que permite a execução de tarefas em segundo plano.

Imagine, por exemplo, uma aplicação de um fórum que, quando você envia uma requisição POST para comentar um tópico, seu sistema precise enviar um e-mail de notificação para todos os membros que já comentaram no tópico.

Suponha, seguindo o exemplo, que o envio de cada e-mail leve 500 milissegundos e que seu sistema precise enviar 15 e-mails. Perceba que, nessa lógica, se você faz essa requisição e espera o envio dos e-mails, comentar no tópico do seu fórum vai levar cerca de 7 segundos, o que é um tempo muito grande.

Uma forma de contornar isso: filas e processamento paralelo (com o que a gente costuma chamar de workers). O sistema agenda numa fila o envio dos e-mails e termina a requisição devolvendo as informações para o app/front-end. Nesse contexto, o envio dos e-mails será feito "em segundo plano".

Objetivos

Foi feita, até aqui, uma breve descrição do que é o celery e, agora, é importante deixarmos claro algumas coisas: esse texto não tem como objetivo ensinar você a usar o celery em conjunto (ou separado) com o django.

O objetivo desse texto é tratar sobre algumas estratégias de como escrever testes para o django junto com o celery.

Exemplo de Uso

Existem dois conceitos (ou duas formas de fazer a mesma coisa) que precisamos levar em consideração para implementar nossos testes e, portanto, precisamos detalhar isso com um exemplo de uso.

Suponha um model de compras que registra o preço da compra e suponha um outro model de produto que indica o valor vendido em produtos daquele tipo (a modelagem de dados abaixo é só pra exemplificar e não foi elaborada da melhor forma possível) e, para completar, uma tarefa (entenda tarefa como um método, uma função) que executa um cálculo de soma de valores vendidos desse produto específico:

# products/models.py
from decimal import Decimal
from django.db import models

class Product(models.Model):
    name = models.CharField(max_length=100)
    price = models.DecimalField(max_digits=15, decimal_places=2)
    total_purchased_value = models.DecimalField(max_digits=15, decimal_places=2, default=Decimal(0))

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = 'Product'
        verbose_name_plural = 'Products'

class Purchase(models.Model):
    product =  models.ForeignKey(Product, on_delete=models.CASCADE)
    purchase_final_value = models.DecimalField(max_digits=15, decimal_places=2)

    def __str__(self):
        return str(self.product)

    class Meta:
        verbose_name = 'Purchase'
        verbose_name_plural = 'Purchases'
Enter fullscreen mode Exit fullscreen mode
# products/tasks.py
from celery import shared_task

@shared_task(name="calculate_total_purchased_value")
def calculate_total_purchased_value(id_product):
    from app.models import Product, Purchase

    product = Product.objects.filter(pk=id_product).first()
    purchases = Purchase.objects.filter(product=product).all()

    value = 0
    for purchase in purchases:
        value += purchase.purchase_final_value

    product.total_purchased_value = value
    product.save()
Enter fullscreen mode Exit fullscreen mode

Até aqui, é importante perceber que não chamamos a nossa tarefa em nenhum momento e, também, que ela faz um cálculo bem mal feito e que existem formas melhores de fazer isso (Referência Aqui), porém esse exemplo tem apenas o objetivo de mostrar como testar.

Outra questão pertinente é que o import dos models (from app.models import Product, Purchase) em app/tasks.py foi feito dentro do método calculate_total_purchased_value com o objetivo de evitar problemas de dependências circulares, tendo em vista que iremos importar esse método no app/models.py.

Por fim, para a chamada da nossa tarefa ao criar novas purchases, podemos fazer o seguinte:

# products/models.py
from decimal import Decimal
from django.db import models
from app.tasks import calculate_total_purchased_value

class Product(models.Model):
    name = models.CharField(max_length=100)
    price = models.DecimalField(max_digits=15, decimal_places=2)
    total_purchased_value = models.DecimalField(max_digits=15, decimal_places=2, default=Decimal(0))

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = 'Product'
        verbose_name_plural = 'Products'

class Purchase(models.Model):
    product =  models.ForeignKey(Product, on_delete=models.CASCADE)
    purchase_final_value = models.DecimalField(max_digits=15, decimal_places=2)

    def __str__(self):
        return str(self.product)

    def save(self, *args, **kwargs):
        super(Purchase, self).save(*args, **kwargs)

        calculate_total_purchased_value.apply_async(args=[self.product.pk], countdown=2)

    class Meta:
        verbose_name = 'Purchase'
        verbose_name_plural = 'Purchases'
Enter fullscreen mode Exit fullscreen mode

Aqui vamos apenas chamar o método quando algum model Purchase for salvo pois é um exemplo e não precisamos lidar com outros casos de uso (deletar, etc.). A outra consideração é: existem duas formas de executar a tarefa em segundo plano:

  1. .apply_async(): mais completa, os argumentos devem ser passados como o parâmetro args e permite que eu adicione um countdown (tempo para que ela seja executada, referência aqui).
  2. .delay(): não permite countdown ou alguns outros parâmetros, a chamada é bem parecida com a chamada de uma função normal: calculate_total_purchased_value.delay(self.product.pk).

É importante para os próximos passos desse texto, sobre os testes, ter em mente qual dos dois métodos foi usado pois para a estratégia que será utilizada, terá algumas diferenças entre o uso de um ou de outro.

Estratégias

Podemos pensar, de forma básica, em duas estratégias para escrever testes (testar) o nosso processamento paralelo:

  1. Testar se as tarefas estão sendo "enfileiradas", isso é, se o método apply_async ou delay está sendo executado e, depois, testar o método calculate_total_purchased_value separadamente.

  2. Fazer com que ao invés de rodar o processamento paralelo, ele seja executado síncrono e então conseguimos fazer todos os testes como se não existisse processamento em segundo plano.

Nesse texto, iremos tratar da primeira estratégia. Nem sempre é a melhor estratégia, porém, é a que teve menor custo (adequações, cognitivo e de tempo) para ser implementada no projeto no momento em que foi decidido adicionar testes de integração no back-end.

Mocks com unittest

Pra usarmos a estratégia comentada acima (Item 1), o que faremos é: criar um mock para a função apply_async (ou delay) que é adicionada na função da nossa tarefa (pelo decorator do celery). Podemos começar criando um TestCase:

# products/tests.py
from django.test import TestCase
from products.models import Product, Purchase

class TaskTestCase(TestCase):
    def setUp(self):
        self.product = Product.objects.create(name="First Product", price=60)

    def test_save_purchase_call_task(self):
        # TODO
        pass
Enter fullscreen mode Exit fullscreen mode

A próxima coisa que precisamos fazer é criar nosso mock, podemos utilizar o decorator @patch do unittest (referência):

# products/tests.py
# ...
from unittest.mock import patch
# ...

   @patch("products.tasks.calculate_total_purchased_value.apply_async")
    def test_save_purchase_call_task(self, apply_async_mock):
        # TODO
        pass
# ...
Enter fullscreen mode Exit fullscreen mode

Perceba que o parâmetro utilizado dentro do @patch. É o caminho absoluto de importação do app (app do django), arquivo tasks.py, método calculate_total_purchased_value, método apply_async adicionado pelo decorator.

Agora, precisamos criar um item do Purchase para podermos validar se nossa tarefa foi executada:

# products/tests.py
# ...

    @patch("products.tasks.calculate_total_purchased_value.apply_async")
    def test_save_purchase_call_task(self, apply_async_mock):
        item = Purchase()
        item.product = self.product
        item.purchase_final_value = 50
        item.save()
# ...
Enter fullscreen mode Exit fullscreen mode

Finalmente, precisamos fazer a nossa assertion sobre a função apply_async ter sido chamada ou não:

# products/tests.py
# ...

    @patch("products.tasks.calculate_total_purchased_value.apply_async")
    def test_save_purchase_call_task(self, apply_async_mock):
        item = Purchase()
        item.product = self.product
        item.purchase_final_value = 50
        item.save()

        apply_async_mock.assert_called_once_with(args=[self.product.pk], countdown=2)

# ...
Enter fullscreen mode Exit fullscreen mode

Finalmente, rodando um comando python manage.py test, obtemos o incrível resultado esperado:

Terminal mostrando testes passando

Agora, podemos testar o outro lado da história (verificar se a tarefa calculate_total_purchased_value faz o que desejamos que ela faça). Vamos continuar utilizando o @patch nesse momento pois vamos precisar criar models Purchase e não queremos que ele chame a tarefa original nesse momento. Fora isso, o processo de testar agora é bem parecido com testar uma função comum:

# products/tests.py
# ...
from products.tasks import calculate_total_purchased_value
# ...


    @patch("products.tasks.calculate_total_purchased_value.apply_async")
    def test_background_task(self, apply_async_mock):
        purchase1 = Purchase()
        purchase1.product = self.product
        purchase1.purchase_final_value = 50
        purchase1.save()

        purchase2 = Purchase()
        purchase2.product = self.product
        purchase2.purchase_final_value = 40
        purchase2.save()

        calculate_total_purchased_value(self.product.pk)

        product = Product.objects.get(id=self.product.pk)

        self.assertEqual(product.total_purchased_value, 90)

# ...
Enter fullscreen mode Exit fullscreen mode

Executando nossos testes novamente, vemos agora dois testes passando:

Dois testes passando

Nosso arquivo tests.py final ficou dessa forma:

from django.test import TestCase
from unittest.mock import patch
from products.models import Product, Purchase
from products.tasks import calculate_total_purchased_value

class TaskTestCase(TestCase):
    def setUp(self):
        self.product = Product.objects.create(name="First Product", price=60)

    @patch("products.tasks.calculate_total_purchased_value.apply_async")
    def test_save_purchase_call_task(self, apply_async_mock):
        item = Purchase()
        item.product = self.product
        item.purchase_final_value = 50
        item.save()

        apply_async_mock.assert_called_once_with(args=[self.product.pk], countdown=2)

    @patch("products.tasks.calculate_total_purchased_value.apply_async")
    def test_background_task(self, apply_async_mock):
        purchase1 = Purchase()
        purchase1.product = self.product
        purchase1.purchase_final_value = 50
        purchase1.save()

        purchase2 = Purchase()
        purchase2.product = self.product
        purchase2.purchase_final_value = 40
        purchase2.save()

        calculate_total_purchased_value(self.product.pk)

        product = Product.objects.get(id=self.product.pk)

        self.assertEqual(product.total_purchased_value, 90)
Enter fullscreen mode Exit fullscreen mode

Perceba que aqui só escrevemos testes extremamente simples e não estamos nos preocupando com cenários de testes, efeitos colaterais nem nada do gênero, afinal esse texto é apenas um exemplo.

O código escrito nesse texto pode ser encontrado no seguinte repositório: testing-django-celery-tasks.


Referências

Top comments (0)