Python

Dominando Celery em Python: Filas de Tarefas, Workers e Beat Scheduler em Projetos Reais

18 min de leitura

Dominando Celery em Python: Filas de Tarefas, Workers e Beat Scheduler em Projetos Reais

O que é Celery e por que você precisa disso Celery é uma biblioteca Python assíncrona que permite executar tarefas em background de forma distribuída. Se você já se viu preso em uma situação onde sua aplicação web fica travada enquanto processa um arquivo grande, envia emails ou faz requisições lentas para APIs externas, você entendeu o problema que Celery resolve. A ideia central é simples: em vez de executar operações pesadas no mesmo processo que atende as requisições HTTP, você enfileira essas tarefas em um broker de mensagens (como Redis ou RabbitMQ) e deixa que workers independentes processem tudo em paralelo. Sua aplicação responde instantaneamente ao usuário enquanto o trabalho acontece nos bastidores. Celery também oferece agendamento de tarefas através do Beat Scheduler, permitindo executar jobs em horários específicos, como um cron job tradicional, mas com toda a robustez de um sistema distribuído. Arquitetura fundamental do Celery O Celery funciona com três componentes principais: a aplicação que enfileira tarefas,

<h2>O que é Celery e por que você precisa disso</h2>

<p>Celery é uma biblioteca Python assíncrona que permite executar tarefas em background de forma distribuída. Se você já se viu preso em uma situação onde sua aplicação web fica travada enquanto processa um arquivo grande, envia emails ou faz requisições lentas para APIs externas, você entendeu o problema que Celery resolve.</p>

<p>A ideia central é simples: em vez de executar operações pesadas no mesmo processo que atende as requisições HTTP, você enfileira essas tarefas em um broker de mensagens (como Redis ou RabbitMQ) e deixa que workers independentes processem tudo em paralelo. Sua aplicação responde instantaneamente ao usuário enquanto o trabalho acontece nos bastidores. Celery também oferece agendamento de tarefas através do Beat Scheduler, permitindo executar jobs em horários específicos, como um cron job tradicional, mas com toda a robustez de um sistema distribuído.</p>

<h3>Arquitetura fundamental do Celery</h3>

<p>O Celery funciona com três componentes principais: a aplicação que enfileira tarefas, o broker que armazena essas mensagens, e os workers que as executem. A aplicação envia mensagens para o broker, dizendo &quot;execute essa função com esses parâmetros&quot;. Os workers monitoram o broker continuamente, pegam mensagens, executam o código correspondente e retornam o resultado. Tudo isso sem bloquear a aplicação principal.</p>

<p>O resultado das tarefas é armazenado em um backend de resultados (também pode ser Redis, banco de dados, etc), permitindo que você consulte o status e recupere o resultado quando necessário. Essa separação de responsabilidades torna o sistema escalável: você pode adicionar quantos workers quiser conforme a demanda cresce.</p>

<h2>Configuração inicial e primeiros passos</h2>

<p>Antes de qualquer coisa, você precisa instalar Celery e um broker de mensagens. Redis é a escolha mais simples para começar:</p>

<pre><code class="language-bash">pip install celery redis</code></pre>

<p>Agora vamos criar uma estrutura mínima de projeto. Crie um arquivo <code>celery_app.py</code>:</p>

<pre><code class="language-python">from celery import Celery

Cria a instância do Celery

app = Celery(&#039;meu_projeto&#039;)

Configura o broker (Redis rodando localmente na porta padrão)

app.conf.broker_url = &#039;redis://localhost:6379/0&#039;

Configura o backend de resultados

app.conf.result_backend = &#039;redis://localhost:6379/0&#039;

Define um timeout para tarefas (em segundos)

app.conf.task_time_limit = 30 * 60 # 30 minutos

app.conf.task_soft_time_limit = 25 * 60 # 25 minutos (aviso antes do timeout)</code></pre>

<p>Agora defina suas tarefas em um arquivo <code>tasks.py</code>:</p>

<pre><code class="language-python">from celery_app import app

import time

@app.task

def enviar_email(destinatario, assunto, corpo):

&quot;&quot;&quot;Simula envio de email&quot;&quot;&quot;

print(f&quot;Enviando email para {destinatario}...&quot;)

time.sleep(2) # Simula operação lenta

print(f&quot;Email enviado para {destinatario}&quot;)

return f&quot;Email enviado com sucesso para {destinatario}&quot;

@app.task

def processar_arquivo(caminho_arquivo):

&quot;&quot;&quot;Simula processamento de arquivo grande&quot;&quot;&quot;

print(f&quot;Processando arquivo: {caminho_arquivo}&quot;)

time.sleep(5) # Simula trabalho pesado

print(f&quot;Arquivo processado&quot;)

return {&quot;status&quot;: &quot;concluído&quot;, &quot;arquivo&quot;: caminho_arquivo}</code></pre>

<p>Para testar localmente sem precisar rodar um worker separado, você pode usar:</p>

<pre><code class="language-python"># teste.py

from tasks import enviar_email, processar_arquivo

Executa a tarefa de forma síncrona (apenas para testes)

resultado = enviar_email.apply_async(

args=(&#039;usuario@example.com&#039;, &#039;Bem-vindo&#039;, &#039;Teste de email&#039;),

task_id=&#039;teste-123&#039;

)

print(f&quot;ID da tarefa: {resultado.id}&quot;)

print(f&quot;Status: {resultado.status}&quot;)

print(f&quot;Resultado: {resultado.get(timeout=10)}&quot;)</code></pre>

<h3>Iniciando o Redis e os Workers</h3>

<p>Abra um terminal e inicie o Redis (ou use Docker):</p>

<pre><code class="language-bash">docker run -d -p 6379:6379 redis:latest</code></pre>

<p>Em outro terminal, inicie um worker Celery:</p>

<pre><code class="language-bash">celery -A celery_app worker --loglevel=info</code></pre>

<p>Agora você pode executar suas tarefas de forma assíncrona. A resposta será instantânea e o worker processará em background.</p>

<h2>Filas, Prioridades e Roteamento de Tarefas</h2>

<p>Por padrão, todas as tarefas entram em uma única fila chamada <code>celery</code>. Para aplicações maiores, você quer separar tarefas por tipo e dar prioridades diferentes. Isso é feito através do roteamento.</p>

<p>Vamos expandir o arquivo de configuração <code>celery_app.py</code>:</p>

<pre><code class="language-python">from celery import Celery

from kombu import Exchange, Queue

app = Celery(&#039;meu_projeto&#039;)

app.conf.broker_url = &#039;redis://localhost:6379/0&#039;

app.conf.result_backend = &#039;redis://localhost:6379/0&#039;

Define as exchanges (tópicos/grupos de mensagens)

default_exchange = Exchange(&#039;celery&#039;, type=&#039;direct&#039;)

email_exchange = Exchange(&#039;email&#039;, type=&#039;direct&#039;)

processamento_exchange = Exchange(&#039;processamento&#039;, type=&#039;direct&#039;)

Define as filas

app.conf.task_queues = (

Queue(&#039;default&#039;, exchange=default_exchange, routing_key=&#039;default&#039;),

Queue(&#039;email&#039;, exchange=email_exchange, routing_key=&#039;email&#039;),

Queue(&#039;processamento&#039;, exchange=processamento_exchange, routing_key=&#039;processamento&#039;),

)

Define o roteamento: qual fila cada tarefa deve usar

app.conf.task_routes = {

&#039;tasks.enviar_email&#039;: {&#039;queue&#039;: &#039;email&#039;, &#039;routing_key&#039;: &#039;email&#039;},

&#039;tasks.processar_arquivo&#039;: {&#039;queue&#039;: &#039;processamento&#039;, &#039;routing_key&#039;: &#039;processamento&#039;},

}</code></pre>

<p>Agora suas tarefas irão para filas específicas:</p>

<pre><code class="language-python"># tasks.py

from celery_app import app

@app.task(queue=&#039;email&#039;)

def enviar_email(destinatario, assunto, corpo):

print(f&quot;Enviando email para {destinatario}...&quot;)

return f&quot;Email enviado para {destinatario}&quot;

@app.task(queue=&#039;processamento&#039;)

def processar_arquivo(caminho_arquivo):

print(f&quot;Processando arquivo: {caminho_arquivo}&quot;)

return {&quot;status&quot;: &quot;concluído&quot;}</code></pre>

<h3>Controlando a prioridade de execução</h3>

<p>Se você quer que certos emails urgentes sejam processados antes de outros, use o parâmetro <code>priority</code>:</p>

<pre><code class="language-python">from tasks import enviar_email

Email com alta prioridade

enviar_email.apply_async(

args=(&#039;ceo@empresa.com&#039;, &#039;Urgente&#039;, &#039;Relatório crítico&#039;),

priority=9 # 0 a 9, quanto maior, maior a prioridade

)

Email com prioridade normal

enviar_email.apply_async(

args=(&#039;usuario@empresa.com&#039;, &#039;Newsletter&#039;, &#039;Conteúdo semanal&#039;),

priority=5

)</code></pre>

<p>Você pode iniciar múltiplos workers especializados, cada um consumindo filas diferentes:</p>

<pre><code class="language-bash"># Worker exclusivo para emails (processa prioritariamente)

celery -A celery_app worker -Q email --concurrency=2 --loglevel=info

Worker para processamento pesado

celery -A celery_app worker -Q processamento --concurrency=1 --loglevel=info

Worker para tarefas gerais

celery -A celery_app worker -Q default --concurrency=4 --loglevel=info</code></pre>

<h2>Celery Beat: Agendamento de Tarefas Periódicas</h2>

<p>O Celery Beat é um scheduler que executa tarefas em intervalos regulares ou em horários específicos. É a solução Celery para o que você faria com cron, mas integrada e distribuída.</p>

<h3>Configurando o Beat Scheduler</h3>

<p>Adicione à sua configuração do Celery:</p>

<pre><code class="language-python"># celery_app.py

from celery import Celery

from celery.schedules import crontab

from kombu import Exchange, Queue

import os

app = Celery(&#039;meu_projeto&#039;)

app.conf.broker_url = &#039;redis://localhost:6379/0&#039;

app.conf.result_backend = &#039;redis://localhost:6379/0&#039;

Configuração do Beat

app.conf.beat_scheduler = &#039;celery.beat:PersistentScheduler&#039;

app.conf.beat_schedule = {

Tarefa que executa a cada 30 segundos

&#039;limpar-cache-cada-30s&#039;: {

&#039;task&#039;: &#039;tasks.limpar_cache&#039;,

&#039;schedule&#039;: 30.0, # em segundos

},

Tarefa que executa a cada 5 minutos

&#039;verificar-emails-pendentes&#039;: {

&#039;task&#039;: &#039;tasks.verificar_emails_pendentes&#039;,

&#039;schedule&#039;: 300.0, # 5 minutos

},

Tarefa que executa diariamente às 3 da manhã

&#039;backup-diario&#039;: {

&#039;task&#039;: &#039;tasks.fazer_backup&#039;,

&#039;schedule&#039;: crontab(hour=3, minute=0),

},

Tarefa que executa segunda a sexta às 9 da manhã

&#039;relatorio-semanal&#039;: {

&#039;task&#039;: &#039;tasks.enviar_relatorio&#039;,

&#039;schedule&#039;: crontab(hour=9, minute=0, day_of_week=&#039;1-5&#039;),

},

}</code></pre>

<p>Agora defina as tarefas correspondentes:</p>

<pre><code class="language-python"># tasks.py

from celery_app import app

from datetime import datetime

@app.task

def limpar_cache():

&quot;&quot;&quot;Remove itens expirados do cache&quot;&quot;&quot;

print(f&quot;[{datetime.now()}] Limpando cache...&quot;)

Sua lógica de limpeza aqui

return &quot;Cache limpo&quot;

@app.task

def verificar_emails_pendentes():

&quot;&quot;&quot;Verifica emails aguardando envio&quot;&quot;&quot;

print(f&quot;[{datetime.now()}] Verificando emails pendentes...&quot;)

Sua lógica aqui

return &quot;Verificação concluída&quot;

@app.task

def fazer_backup():

&quot;&quot;&quot;Faz backup do banco de dados&quot;&quot;&quot;

print(f&quot;[{datetime.now()}] Iniciando backup...&quot;)

Sua lógica de backup

return &quot;Backup concluído&quot;

@app.task

def enviar_relatorio():

&quot;&quot;&quot;Envia relatório semanal&quot;&quot;&quot;

print(f&quot;[{datetime.now()}] Gerando e enviando relatório...&quot;)

Sua lógica de relatório

return &quot;Relatório enviado&quot;</code></pre>

<h3>Executando o Beat Scheduler</h3>

<p>Em um terminal separado, inicie o Beat:</p>

<pre><code class="language-bash">celery -A celery_app beat --loglevel=info</code></pre>

<p>E deixe um ou mais workers rodando em outros terminais:</p>

<pre><code class="language-bash">celery -A celery_app worker --loglevel=info</code></pre>

<p>O Beat irá enfileirar as tarefas nos horários configurados, e os workers as executarão. Você verá no log do worker as tarefas sendo processadas automaticamente.</p>

<h3>Combinando Beat com roteamento de filas</h3>

<p>Para tarefas agendadas irem para filas específicas:</p>

<pre><code class="language-python"># celery_app.py

app.conf.beat_schedule = {

&#039;backup-diario&#039;: {

&#039;task&#039;: &#039;tasks.fazer_backup&#039;,

&#039;schedule&#039;: crontab(hour=3, minute=0),

&#039;options&#039;: {&#039;queue&#039;: &#039;processamento&#039;, &#039;priority&#039;: 8}

},

}</code></pre>

<h2>Monitoramento, Tratamento de Erros e Boas Práticas</h2>

<h3>Tratamento de erros e retry automático</h3>

<p>Tarefas podem falhar por vários motivos. Celery oferece mecanismos de retry inteligentes:</p>

<pre><code class="language-python"># tasks.py

from celery_app import app

from celery import Task

import requests

class CallbackTask(Task):

&quot;&quot;&quot;Task base com callback para erros&quot;&quot;&quot;

autoretry_for = (Exception,)

retry_kwargs = {&#039;max_retries&#039;: 3}

retry_backoff = True # Espera exponencial entre tentativas

retry_backoff_max = 600 # Máximo de 10 minutos de espera

retry_jitter = True # Adiciona aleatoriedade para evitar thundering herd

@app.task(bind=True, base=CallbackTask)

def chamar_api_externa(self, url):

&quot;&quot;&quot;Chama API externa com retry automático&quot;&quot;&quot;

try:

response = requests.get(url, timeout=5)

response.raise_for_status()

return {&quot;status&quot;: &quot;sucesso&quot;, &quot;dados&quot;: response.json()}

except requests.RequestException as exc:

Tenta novamente em 60 segundos na primeira falha,

120 na segunda, 240 na terceira

raise self.retry(exc=exc, countdown=2 * self.request.retries 60)

@app.task(bind=True)

def tarefa_critica(self, dados):

&quot;&quot;&quot;Tarefa com retry customizado&quot;&quot;&quot;

try:

Sua lógica aqui

resultado = processar(dados)

return resultado

except ValueError as exc:

Não faz retry para ValueError

raise

except Exception as exc:

Faz retry exponencial

self.retry(exc=exc, countdown=10)

def processar(dados):

&quot;&quot;&quot;Função auxiliar&quot;&quot;&quot;

return {&quot;processado&quot;: True}</code></pre>

<h3>Monitoramento com Flower</h3>

<p>Flower é uma interface web para monitorar Celery em tempo real:</p>

<pre><code class="language-bash">pip install flower

Inicie o Flower (acessível em http://localhost:5555)

celery -A celery_app flower</code></pre>

<p>No Flower você pode:</p>

<ul>

<li>Ver workers conectados e seu status</li>

<li>Monitorar tarefas em tempo real</li>

<li>Ver histórico de execução</li>

<li>Executar tarefas manualmente</li>

<li>Visualizar logs detalhados</li>

</ul>

<h3>Padrão de idempotência para tarefas</h3>

<p>Tarefas podem ser executadas mais de uma vez (se o worker falhar ou por deduplicação). Garanta que são idempotentes:</p>

<pre><code class="language-python"># tasks.py

from celery_app import app

import hashlib

Simulando um banco de dados

processados = set()

@app.task(bind=True)

def processar_pagamento(self, pedido_id, valor):

&quot;&quot;&quot;Processa pagamento de forma idempotente&quot;&quot;&quot;

Cria um ID único para este processamento

task_hash = hashlib.md5(

f&quot;{pedido_id}-{valor}-{self.request.id}&quot;.encode()

).hexdigest()

Se já foi processado, retorna sem erro

if task_hash in processados:

print(f&quot;Pagamento já processado (ID: {pedido_id})&quot;)

return {&quot;status&quot;: &quot;já_processado&quot;, &quot;pedido_id&quot;: pedido_id}

Processa o pagamento

print(f&quot;Processando pagamento: Pedido {pedido_id}, Valor R${valor}&quot;)

Sua lógica de pagamento aqui (chamar API do gateway, etc)

Marca como processado

processados.add(task_hash)

return {&quot;status&quot;: &quot;sucesso&quot;, &quot;pedido_id&quot;: pedido_id, &quot;valor&quot;: valor}</code></pre>

<h3>Exemplo completo com Flask</h3>

<p>Aqui está uma aplicação web completa usando Celery:</p>

<pre><code class="language-python"># app.py

from flask import Flask, jsonify, request

from tasks import enviar_email, processar_arquivo

from celery_app import app as celery_app

app = Flask(__name__)

@app.route(&#039;/enviar-email&#039;, methods=[&#039;POST&#039;])

def enviar_email_endpoint():

&quot;&quot;&quot;Enfileira um email para envio assíncrono&quot;&quot;&quot;

dados = request.get_json()

Enfileira a tarefa sem esperar a resposta

task = enviar_email.apply_async(

args=(

dados[&#039;destinatario&#039;],

dados[&#039;assunto&#039;],

dados[&#039;corpo&#039;]

),

priority=dados.get(&#039;prioridade&#039;, 5)

)

return jsonify({

&quot;mensagem&quot;: &quot;Email enfileirado&quot;,

&quot;task_id&quot;: task.id

}), 202

@app.route(&#039;/status/&lt;task_id&gt;&#039;, methods=[&#039;GET&#039;])

def obter_status(task_id):

&quot;&quot;&quot;Retorna status de uma tarefa&quot;&quot;&quot;

task = celery_app.AsyncResult(task_id)

return jsonify({

&quot;task_id&quot;: task_id,

&quot;status&quot;: task.status,

&quot;resultado&quot;: task.result if task.successful() else None

})

@app.route(&#039;/processar&#039;, methods=[&#039;POST&#039;])

def processar_endpoint():

&quot;&quot;&quot;Enfileira processamento de arquivo&quot;&quot;&quot;

dados = request.get_json()

task = processar_arquivo.apply_async(

args=(dados[&#039;arquivo&#039;],),

queue=&#039;processamento&#039;

)

return jsonify({

&quot;mensagem&quot;: &quot;Processamento iniciado&quot;,

&quot;task_id&quot;: task.id

}), 202

if __name__ == &#039;__main__&#039;:

app.run(debug=True)</code></pre>

<p>Use a API assim:</p>

<pre><code class="language-bash"># Enfileira um email

curl -X POST http://localhost:5000/enviar-email \

-H &quot;Content-Type: application/json&quot; \

-d &#039;{

&quot;destinatario&quot;: &quot;usuario@example.com&quot;,

&quot;assunto&quot;: &quot;Bem-vindo&quot;,

&quot;corpo&quot;: &quot;Teste de email&quot;,

&quot;prioridade&quot;: 7

}&#039;

Verifica o status (use a task_id retornada)

curl http://localhost:5000/status/abc123def456</code></pre>

<h2>Conclusão</h2>

<p>Você aprendeu que <strong>Celery resolve o problema de operações síncronas bloqueantes</strong> permitindo executar tarefas em background através de um broker de mensagens. Isso torna suas aplicações web responsivas mesmo com operações pesadas acontecendo simultaneamente.</p>

<p>O <strong>roteamento inteligente de tarefas em filas diferentes</strong> com prioridades oferece controle fino sobre como seu sistema processa trabalho, permitindo escalar verticalmente (mais workers) ou horizontalmente (máquinas diferentes) conforme necessário.</p>

<p>O <strong>Celery Beat traz agendamento confiável e distribuído</strong>, eliminando a necessidade de cron jobs tradicionais e integrando perfeitamente com sua arquitetura de workers. Combinado com tratamento de erros, retry exponencial e idempotência, você tem um sistema robusto capaz de processar bilhões de tarefas em produção.</p>

<h2>Referências</h2>

<ul>

<li><a href="https://docs.celeryproject.io/" target="_blank" rel="noopener noreferrer">Documentação Oficial do Celery</a></li>

<li><a href="https://docs.celeryproject.io/en/stable/userguide/routing.html" target="_blank" rel="noopener noreferrer">Celery Task Routing and Queues</a></li>

<li><a href="https://docs.celeryproject.io/en/stable/userguide/periodic-tasks.html" target="_blank" rel="noopener noreferrer">Celery Beat Scheduler</a></li>

<li><a href="https://flower.readthedocs.io/" target="_blank" rel="noopener noreferrer">Flower: Real-time Celery Web Monitor</a></li>

<li><a href="https://redis.io/documentation" target="_blank" rel="noopener noreferrer">Redis: In-Memory Data Structure Store</a></li>

</ul>

<p>&lt;!-- FIM --&gt;</p>

Comentários

Mais em Python

Dominando TDD em Python: Desenvolvendo Guiado por Testes na Prática em Projetos Reais
Dominando TDD em Python: Desenvolvendo Guiado por Testes na Prática em Projetos Reais

O que é TDD e por que você deveria se importar Test-Driven Development (TDD)...

Dominando Manipulação de Arquivos em Python: CSV, JSON, XML e Excel com openpyxl em Projetos Reais
Dominando Manipulação de Arquivos em Python: CSV, JSON, XML e Excel com openpyxl em Projetos Reais

Introdução: Por que dominar manipulação de arquivos? A manipulação de arquivo...

Guia Completo de Módulos e Pacotes em Python: import, __init__ e Organização de Projetos
Guia Completo de Módulos e Pacotes em Python: import, __init__ e Organização de Projetos

Entendendo o Sistema de Módulos e Pacotes em Python Um módulo em Python é sim...