DEV Community

Wesley de Morais
Wesley de Morais

Posted on

Entendendo Task Queue com Django, Celery e RabbitMQ

Sumário

  • Introdução sobre task queue
  • Entendendo o que é celery e rabbitmq
  • Estudo de caso com django
  • Criando a Api com DRF sem Task Queue
  • Usando task queue na API
  • Referências

Introdução sobre Task Queue

Quando estamos criando os nossos projetos existem determinadas tarefas que demoram um pouco(Ou muito) para serem processadas, exemplo disso é a geração de relatórios e acesso a determinadas APis que podem estar fora do ar por vários motivos, como acesso a api do governo para checar o CPF de alguém, afim de verificar se o portador tem algum problema legal. Desta forma é imprescindível que o usuário não fique esperando determinadas tarefas serem concluídas para ter um feedback do que aconteceu ou o que poderá acontecer. Portanto, a task queue, ou também chamada de fila de tarefas, é um modo de postergar determinadas tarefas, dessa maneira essas tarefas vão ser feitas de forma assíncronas em relação ao fluxo da aplicação.

taskqueue

Na imagem acima vemos como funciona esse conceito, assim existe um produtor(Producers) que cria as Tasks para serem processadas, essas vão ser armazenadas dentro de uma fila, dentro do message broker, posteriormente essas tasks vão ser consumidas pelos consumidores(Consumers) para serem processadas, esses podem ou não fazer chamadas ao banco de dados, dependendo da tarefa.

Entendendo o que é o Celery e RabbitMQ

O Celery é uma aplicação python que nos possibilita ter uma abstração para construção de uma Task Queue, este tem uma dependência da existência de um message broker para funcionar. Os consumidores do celery são chamados de Workers que são processos no qual sempre ficam fazendo solicitações para o broker por novas tasks para serem processadas. O RabbitMQ é um dos brokers suportados, no qual tem o objetivo de guardar determinadas tasks que serão consumidas pelos workers. Junto com o celery é possível criar múltiplas filas de prioridades, afim de determinadas tarefas serem processadas antes de outras. O protocolo de comunicação entre o produtor e o broker é chamado de AMQP.

Estudo de caso com Django

Para entender melhor como funciona a task queue com o celery vamos para uma situação problema, imagine que temos uma aplicação que faz o gerenciamento de uma loja de roupas, no qual podemos gerenciar nossas vendas, um dos endpoints que temos é para gerar relatórios no formato xlsx(Excel) das nossas vendas. Assim podemos ter o seguinte diagrama entidade relacionamento:

models
Em resumo no nosso diagrama entidade relacionamento acima temos que um vendedor fazendo muitas vendas e uma venda tendo muitas roupas.

Criando a Api com DRF sem Task Queue

Dependências

Primeiramente, crie uma ambiente virtual para instalar as dependências da aplicação. Dentro do ambiente virtual instale as seguintes dependências:

pip install django djangorestframework model_bakery openpyxl
Enter fullscreen mode Exit fullscreen mode

Instalamos o django e django rest framework para fazer a criação da nossa aplicação; também instalamos o model bakery e openpyxml, a primeira para fazer determinados fakes para a gente fazer o nosso teste, e a segunda para criar o nosso relatório.

Dentro da pasta do ambiente virtual digite o comando para criar um projeto e a nossa aplicação:

django-admin startproject store_app .
django-admin startapp core
Enter fullscreen mode Exit fullscreen mode

Models

Vamos mapear o nosso diagrama em modelos

from django.db import models
from django.utils import timezone
from django.core.exceptions import ValidationError
# Create your models here.

class Seller(models.Model):
    name = models.CharField(max_length=30)

    def __str__(self) -> str:
        return self.name

class Sale(models.Model):
    total = models.FloatField(default=0)
    created_at = models.DateTimeField(default=timezone.now)

    saller = models.ForeignKey(Seller, on_delete=models.DO_NOTHING)

SIZE_CLOTHES = (
    ('p','P'),
    ('m','M'),
    ('g','G')
)

class Clothe(models.Model):
    name = models.CharField(max_length=50)
    size = models.CharField(max_length=1, choices=SIZE_CLOTHES)
    amount = models.PositiveIntegerField()

    def __str__(self) -> str:
        return f"{self.name}({self.size})"

class ClotheItem(models.Model):
    price_unit = models.FloatField()
    amount_items = models.PositiveIntegerField()

    sale = models.ForeignKey(Sale, on_delete=models.CASCADE, related_name="items", null=False)
    clothe = models.ForeignKey(Clothe, on_delete=models.CASCADE, null=False)
    def clean(self) -> None:
        if self.amount_items < self.clothe.amount:
            raise ValidationError("Amount Item more than Amount Clothes ")
        return super(ClotheItem, self).clean()

    def __str__(self) -> str:
        return f"{self.clothe}"
Enter fullscreen mode Exit fullscreen mode

Criando as migrações e o banco:

python manage.py makemigrations
python manage.py migrate
Enter fullscreen mode Exit fullscreen mode

Tasks

O nosso objetivo é criar uma rota que gere um relatório sobre as vendas da nossa loja que pode ser lida pelo Excel, então vamos criar um arquivo na aplicação chamada tasks.py contendo uma função chamada create_report_of_sale

from openpyxl import Workbook
from core.models import Sale

def create_report_of_sale():
    wb = Workbook()
    ws = wb.active
    ws.append(["ID", "Criado Em", "Total","Vendedor","Roupas", "Quantidade de Roupas"])

    sales = Sale.objects.prefetch_related("items")

    for sale in sales:
        date = sale.created_at.strftime('%Y-%m-%d')
        items = []
        for item in sale.items.all():
            items.append(item.clothe.name)

        ws.append([sale.pk,date,sale.total, sale.saller.name, str(items), len(items)])

    wb.save("store_sales.xlsx")
Enter fullscreen mode Exit fullscreen mode

O código acima apenas cria um arquivo store_sales.xlsx contendo uma tabela com alguns dados das vendas.

Urls

from django.contrib import admin
from django.urls import path
from core.views import CreateReportToSale

urlpatterns = [
    path('admin/', admin.site.urls),
    path('report/sales/', CreateReportToSale.as_view()),
]
Enter fullscreen mode Exit fullscreen mode

Views

Vamos criar a nossa view para gerenciar a nossa rota, adicione o seguinte conteúdo:

from rest_framework.views import APIView
from core.tasks import create_report_of_sale
from rest_framework.response import Response
import time
# Create your views here.
class CreateReportToSale(APIView):

    def get(self, request):
        tb = time.time()
        create_report_of_sale()
        ta = time.time()
        print(f"Tempo Processando:{ta-tb}s")
        return Response(data={"msg":f"Relatório criado com sucesso !"},status=200)
Enter fullscreen mode Exit fullscreen mode

O código acima cria uma view para gerenciar a rota com o método HTTP GET e criar o nosso relatório. Adicionamos um print para saber o tempo que demora isso.

Fazendo acontecer

Para a gente gerar o nosso relatório vamos necessitar de dados, então vamos usar a nossa biblioteca model_bakery. Abra o shell do django com os seguinte comando:

python manage.py shell
Enter fullscreen mode Exit fullscreen mode

Dentro dele, execute os seguintes comandos:

>>> from model_bakery import baker
>>> baker.make("core.ClotheItem", _quantity=3000)
Enter fullscreen mode Exit fullscreen mode

O código acima vai criar 3000 Itens de roupas, 3000 roupas e 3000 vendas, pois o nosso modelo ClotheItem está relacionado com esses modelos e temos o argumento null=False. O código serve para simular a criação de 3000 vendas com 1 item de roupa cada.

Depois entre na rota da criação do relatório, com o projeto rodando, e vamos ver que existe uma demora para ser processada, e também podemos ver o tempo que demorou quando vamos no terminal.

Usando Task Queue na API

Dependências

Esse é um exemplo de código que demora para ser processado, também temos outros exemplos como acesso a API que esteja fora do ar, envio de email e etc. Assim para resolver o nosso problema vamos utilizar Task Queue. Primeiramente vamos instalar a nossa biblioteca celery.

pip install celery
Enter fullscreen mode Exit fullscreen mode

Posteriormente temos que subir o nosso message broker(RabbitMQ) com o seguinte comando docker:

docker run -d -p 5672:5672 rabbitmq
Enter fullscreen mode Exit fullscreen mode

Celery

Com essas duas dependências podemos dá continuidade no nosso projeto. Comece criando um arquivo chamado celery.py com o conteúdo:

from celery import Celery
import django
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'store_app.settings')
django.setup()
app = Celery("store_app", broker='pyamqp://guest@localhost//')

app.autodiscover_tasks()
Enter fullscreen mode Exit fullscreen mode

O código acima está usando as configurações do arquivo settings.py e também está criando uma nova instância do celery, dando o nome dela de store_app e fazendo conexão ao nosso broker pelo protocolo AMQP.

A linha que se segue é para que todos os apps que foram colocados no settings.py possam fazer uso dessa instância.

Uma outra configuração que devemos fazer é dentro do arquivo “init.py” do projeto, afim de inicializar a instância do celery quando o django inicializar o projeto, e assim as nossas aplicações poderão fazer acesso a partir da função shared_task.

from celery import app

__all__ = ('app',)
Enter fullscreen mode Exit fullscreen mode

Tasks

No arquivo tasks.py podemos modificar o código para o seguinte:

from openpyxl import Workbook
from core.models import Sale
from celery import shared_task #<-new
@shared_task(
        bind=True,
        max_retries=5,
        default_retry_delay=30) #<-new
def create_report_of_sale(self): #<-new
    try: #<-new
        wb = Workbook()
        ws = wb.active
        ws.append(["ID", "Criado Em", "Total","Vendedor","Roupas", "Quantidade de Roupas"])

        sales = Sale.objects.prefetch_related("items")

        for sale in sales:
            date = sale.created_at.strftime('%Y-%m-%d')
            items = []
            for item in sale.items.all():
                items.append(item.clothe.name)

            ws.append([sale.pk,date,sale.total, sale.saller.name, str(items), len(items)])

        wb.save("sample.xlsx")
    except: #<-new
        raise self.retry() #<-new
Enter fullscreen mode Exit fullscreen mode

No conteúdo acima estamos decorando a nossa função usando o decorator shared_task, usando este decorator estamos criando uma nova task que poderá ser chamada por qualquer código, e ainda estamos passando alguns argumentos para ele, como:

  • bind - Nos possibilita acessar determinadas propriedades da task específica, como o objeto de requisição ou a função retry, que nos possibilita executar novamente a task caso algo tenha dado errado;
  • max_retries - É o contador que adiciona um número máximo de tentativas de chamada da função retry;
  • default_retry_delay - É o tempo entre uma chamada ao retry e outra. ### View

Na nossa view podemos modificar o código para o seguinte:

from rest_framework.views import APIView
from core.tasks import create_report_of_sale
from rest_framework.response import Response
import time
# Create your views here.
class CreateReportToSale(APIView):

    def get(self, request):
        tb = time.time()
        create_report_of_sale.delay() #<-new
        ta = time.time()
        print(f"Tempo de Processamento:{ta-tb}")
        return Response(data={"msg":f"Relatório criado com sucesso !"},status=200)
Enter fullscreen mode Exit fullscreen mode

Com essa simples chamada a “mágica” pode acontecer, mas para isso é importante criarmos nossos consumidores de tasks, então crie um outro terminal e adicione o seguinte código:

celery -A store_app worker --loglevel=INFO
Enter fullscreen mode Exit fullscreen mode

Obs.: Lembre que o nosso process worker não tem hot reloading, ou seja, todas as vezes que modificar sua task rode o código acima.

Entre no seu projeto e acesse a rota de criação do nosso relatório. Como podemos ver as coisas ocorrem bem mais rápido, pois quando acessamos a nossa rota criamos uma task que foi adicionada ao broker e consumida por um worker disponível, e o tempo de adicionar um task no broker é bem mais rápida do que processa-la.

Repositório:
Código Desenvolvido

Referências

https://docs.celeryq.dev/en/stable/index.html

https://denibertovic.com/posts/celery-best-practices/

https://www.django-rest-framework.org

https://docs.djangoproject.com/en/4.1/

Top comments (0)