<h2>O que é Celery e por que você precisa disso</h2>
<p>Celery é uma biblioteca Python assíncrona que permite executar tarefas em background de forma distribuída. Se você já se viu preso em uma situação onde sua aplicação web fica travada enquanto processa um arquivo grande, envia emails ou faz requisições lentas para APIs externas, você entendeu o problema que Celery resolve.</p>
<p>A ideia central é simples: em vez de executar operações pesadas no mesmo processo que atende as requisições HTTP, você enfileira essas tarefas em um broker de mensagens (como Redis ou RabbitMQ) e deixa que workers independentes processem tudo em paralelo. Sua aplicação responde instantaneamente ao usuário enquanto o trabalho acontece nos bastidores. Celery também oferece agendamento de tarefas através do Beat Scheduler, permitindo executar jobs em horários específicos, como um cron job tradicional, mas com toda a robustez de um sistema distribuído.</p>
<h3>Arquitetura fundamental do Celery</h3>
<p>O Celery funciona com três componentes principais: a aplicação que enfileira tarefas, o broker que armazena essas mensagens, e os workers que as executem. A aplicação envia mensagens para o broker, dizendo "execute essa função com esses parâmetros". Os workers monitoram o broker continuamente, pegam mensagens, executam o código correspondente e retornam o resultado. Tudo isso sem bloquear a aplicação principal.</p>
<p>O resultado das tarefas é armazenado em um backend de resultados (também pode ser Redis, banco de dados, etc), permitindo que você consulte o status e recupere o resultado quando necessário. Essa separação de responsabilidades torna o sistema escalável: você pode adicionar quantos workers quiser conforme a demanda cresce.</p>
<h2>Configuração inicial e primeiros passos</h2>
<p>Antes de qualquer coisa, você precisa instalar Celery e um broker de mensagens. Redis é a escolha mais simples para começar:</p>
<pre><code class="language-bash">pip install celery redis</code></pre>
<p>Agora vamos criar uma estrutura mínima de projeto. Crie um arquivo <code>celery_app.py</code>:</p>
<pre><code class="language-python">from celery import Celery
Cria a instância do Celery
app = Celery('meu_projeto')
Configura o broker (Redis rodando localmente na porta padrão)
app.conf.broker_url = 'redis://localhost:6379/0'
Configura o backend de resultados
app.conf.result_backend = 'redis://localhost:6379/0'
Define um timeout para tarefas (em segundos)
app.conf.task_time_limit = 30 * 60 # 30 minutos
app.conf.task_soft_time_limit = 25 * 60 # 25 minutos (aviso antes do timeout)</code></pre>
<p>Agora defina suas tarefas em um arquivo <code>tasks.py</code>:</p>
<pre><code class="language-python">from celery_app import app
import time
@app.task
def enviar_email(destinatario, assunto, corpo):
"""Simula envio de email"""
print(f"Enviando email para {destinatario}...")
time.sleep(2) # Simula operação lenta
print(f"Email enviado para {destinatario}")
return f"Email enviado com sucesso para {destinatario}"
@app.task
def processar_arquivo(caminho_arquivo):
"""Simula processamento de arquivo grande"""
print(f"Processando arquivo: {caminho_arquivo}")
time.sleep(5) # Simula trabalho pesado
print(f"Arquivo processado")
return {"status": "concluído", "arquivo": caminho_arquivo}</code></pre>
<p>Para testar localmente sem precisar rodar um worker separado, você pode usar:</p>
<pre><code class="language-python"># teste.py
from tasks import enviar_email, processar_arquivo
Executa a tarefa de forma síncrona (apenas para testes)
resultado = enviar_email.apply_async(
args=('usuario@example.com', 'Bem-vindo', 'Teste de email'),
task_id='teste-123'
)
print(f"ID da tarefa: {resultado.id}")
print(f"Status: {resultado.status}")
print(f"Resultado: {resultado.get(timeout=10)}")</code></pre>
<h3>Iniciando o Redis e os Workers</h3>
<p>Abra um terminal e inicie o Redis (ou use Docker):</p>
<pre><code class="language-bash">docker run -d -p 6379:6379 redis:latest</code></pre>
<p>Em outro terminal, inicie um worker Celery:</p>
<pre><code class="language-bash">celery -A celery_app worker --loglevel=info</code></pre>
<p>Agora você pode executar suas tarefas de forma assíncrona. A resposta será instantânea e o worker processará em background.</p>
<h2>Filas, Prioridades e Roteamento de Tarefas</h2>
<p>Por padrão, todas as tarefas entram em uma única fila chamada <code>celery</code>. Para aplicações maiores, você quer separar tarefas por tipo e dar prioridades diferentes. Isso é feito através do roteamento.</p>
<p>Vamos expandir o arquivo de configuração <code>celery_app.py</code>:</p>
<pre><code class="language-python">from celery import Celery
from kombu import Exchange, Queue
app = Celery('meu_projeto')
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'
Define as exchanges (tópicos/grupos de mensagens)
default_exchange = Exchange('celery', type='direct')
email_exchange = Exchange('email', type='direct')
processamento_exchange = Exchange('processamento', type='direct')
Define as filas
app.conf.task_queues = (
Queue('default', exchange=default_exchange, routing_key='default'),
Queue('email', exchange=email_exchange, routing_key='email'),
Queue('processamento', exchange=processamento_exchange, routing_key='processamento'),
)
Define o roteamento: qual fila cada tarefa deve usar
app.conf.task_routes = {
'tasks.enviar_email': {'queue': 'email', 'routing_key': 'email'},
'tasks.processar_arquivo': {'queue': 'processamento', 'routing_key': 'processamento'},
}</code></pre>
<p>Agora suas tarefas irão para filas específicas:</p>
<pre><code class="language-python"># tasks.py
from celery_app import app
@app.task(queue='email')
def enviar_email(destinatario, assunto, corpo):
print(f"Enviando email para {destinatario}...")
return f"Email enviado para {destinatario}"
@app.task(queue='processamento')
def processar_arquivo(caminho_arquivo):
print(f"Processando arquivo: {caminho_arquivo}")
return {"status": "concluído"}</code></pre>
<h3>Controlando a prioridade de execução</h3>
<p>Se você quer que certos emails urgentes sejam processados antes de outros, use o parâmetro <code>priority</code>:</p>
<pre><code class="language-python">from tasks import enviar_email
Email com alta prioridade
enviar_email.apply_async(
args=('ceo@empresa.com', 'Urgente', 'Relatório crítico'),
priority=9 # 0 a 9, quanto maior, maior a prioridade
)
Email com prioridade normal
enviar_email.apply_async(
args=('usuario@empresa.com', 'Newsletter', 'Conteúdo semanal'),
priority=5
)</code></pre>
<p>Você pode iniciar múltiplos workers especializados, cada um consumindo filas diferentes:</p>
<pre><code class="language-bash"># Worker exclusivo para emails (processa prioritariamente)
celery -A celery_app worker -Q email --concurrency=2 --loglevel=info
Worker para processamento pesado
celery -A celery_app worker -Q processamento --concurrency=1 --loglevel=info
Worker para tarefas gerais
celery -A celery_app worker -Q default --concurrency=4 --loglevel=info</code></pre>
<h2>Celery Beat: Agendamento de Tarefas Periódicas</h2>
<p>O Celery Beat é um scheduler que executa tarefas em intervalos regulares ou em horários específicos. É a solução Celery para o que você faria com cron, mas integrada e distribuída.</p>
<h3>Configurando o Beat Scheduler</h3>
<p>Adicione à sua configuração do Celery:</p>
<pre><code class="language-python"># celery_app.py
from celery import Celery
from celery.schedules import crontab
from kombu import Exchange, Queue
import os
app = Celery('meu_projeto')
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'
Configuração do Beat
app.conf.beat_scheduler = 'celery.beat:PersistentScheduler'
app.conf.beat_schedule = {
Tarefa que executa a cada 30 segundos
'limpar-cache-cada-30s': {
'task': 'tasks.limpar_cache',
'schedule': 30.0, # em segundos
},
Tarefa que executa a cada 5 minutos
'verificar-emails-pendentes': {
'task': 'tasks.verificar_emails_pendentes',
'schedule': 300.0, # 5 minutos
},
Tarefa que executa diariamente às 3 da manhã
'backup-diario': {
'task': 'tasks.fazer_backup',
'schedule': crontab(hour=3, minute=0),
},
Tarefa que executa segunda a sexta às 9 da manhã
'relatorio-semanal': {
'task': 'tasks.enviar_relatorio',
'schedule': crontab(hour=9, minute=0, day_of_week='1-5'),
},
}</code></pre>
<p>Agora defina as tarefas correspondentes:</p>
<pre><code class="language-python"># tasks.py
from celery_app import app
from datetime import datetime
@app.task
def limpar_cache():
"""Remove itens expirados do cache"""
print(f"[{datetime.now()}] Limpando cache...")
Sua lógica de limpeza aqui
return "Cache limpo"
@app.task
def verificar_emails_pendentes():
"""Verifica emails aguardando envio"""
print(f"[{datetime.now()}] Verificando emails pendentes...")
Sua lógica aqui
return "Verificação concluída"
@app.task
def fazer_backup():
"""Faz backup do banco de dados"""
print(f"[{datetime.now()}] Iniciando backup...")
Sua lógica de backup
return "Backup concluído"
@app.task
def enviar_relatorio():
"""Envia relatório semanal"""
print(f"[{datetime.now()}] Gerando e enviando relatório...")
Sua lógica de relatório
return "Relatório enviado"</code></pre>
<h3>Executando o Beat Scheduler</h3>
<p>Em um terminal separado, inicie o Beat:</p>
<pre><code class="language-bash">celery -A celery_app beat --loglevel=info</code></pre>
<p>E deixe um ou mais workers rodando em outros terminais:</p>
<pre><code class="language-bash">celery -A celery_app worker --loglevel=info</code></pre>
<p>O Beat irá enfileirar as tarefas nos horários configurados, e os workers as executarão. Você verá no log do worker as tarefas sendo processadas automaticamente.</p>
<h3>Combinando Beat com roteamento de filas</h3>
<p>Para tarefas agendadas irem para filas específicas:</p>
<pre><code class="language-python"># celery_app.py
app.conf.beat_schedule = {
'backup-diario': {
'task': 'tasks.fazer_backup',
'schedule': crontab(hour=3, minute=0),
'options': {'queue': 'processamento', 'priority': 8}
},
}</code></pre>
<h2>Monitoramento, Tratamento de Erros e Boas Práticas</h2>
<h3>Tratamento de erros e retry automático</h3>
<p>Tarefas podem falhar por vários motivos. Celery oferece mecanismos de retry inteligentes:</p>
<pre><code class="language-python"># tasks.py
from celery_app import app
from celery import Task
import requests
class CallbackTask(Task):
"""Task base com callback para erros"""
autoretry_for = (Exception,)
retry_kwargs = {'max_retries': 3}
retry_backoff = True # Espera exponencial entre tentativas
retry_backoff_max = 600 # Máximo de 10 minutos de espera
retry_jitter = True # Adiciona aleatoriedade para evitar thundering herd
@app.task(bind=True, base=CallbackTask)
def chamar_api_externa(self, url):
"""Chama API externa com retry automático"""
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
return {"status": "sucesso", "dados": response.json()}
except requests.RequestException as exc:
Tenta novamente em 60 segundos na primeira falha,
120 na segunda, 240 na terceira
raise self.retry(exc=exc, countdown=2 * self.request.retries 60)
@app.task(bind=True)
def tarefa_critica(self, dados):
"""Tarefa com retry customizado"""
try:
Sua lógica aqui
resultado = processar(dados)
return resultado
except ValueError as exc:
Não faz retry para ValueError
raise
except Exception as exc:
Faz retry exponencial
self.retry(exc=exc, countdown=10)
def processar(dados):
"""Função auxiliar"""
return {"processado": True}</code></pre>
<h3>Monitoramento com Flower</h3>
<p>Flower é uma interface web para monitorar Celery em tempo real:</p>
<pre><code class="language-bash">pip install flower
Inicie o Flower (acessível em http://localhost:5555)
celery -A celery_app flower</code></pre>
<p>No Flower você pode:</p>
<ul>
<li>Ver workers conectados e seu status</li>
<li>Monitorar tarefas em tempo real</li>
<li>Ver histórico de execução</li>
<li>Executar tarefas manualmente</li>
<li>Visualizar logs detalhados</li>
</ul>
<h3>Padrão de idempotência para tarefas</h3>
<p>Tarefas podem ser executadas mais de uma vez (se o worker falhar ou por deduplicação). Garanta que são idempotentes:</p>
<pre><code class="language-python"># tasks.py
from celery_app import app
import hashlib
Simulando um banco de dados
processados = set()
@app.task(bind=True)
def processar_pagamento(self, pedido_id, valor):
"""Processa pagamento de forma idempotente"""
Cria um ID único para este processamento
task_hash = hashlib.md5(
f"{pedido_id}-{valor}-{self.request.id}".encode()
).hexdigest()
Se já foi processado, retorna sem erro
if task_hash in processados:
print(f"Pagamento já processado (ID: {pedido_id})")
return {"status": "já_processado", "pedido_id": pedido_id}
Processa o pagamento
print(f"Processando pagamento: Pedido {pedido_id}, Valor R${valor}")
Sua lógica de pagamento aqui (chamar API do gateway, etc)
Marca como processado
processados.add(task_hash)
return {"status": "sucesso", "pedido_id": pedido_id, "valor": valor}</code></pre>
<h3>Exemplo completo com Flask</h3>
<p>Aqui está uma aplicação web completa usando Celery:</p>
<pre><code class="language-python"># app.py
from flask import Flask, jsonify, request
from tasks import enviar_email, processar_arquivo
from celery_app import app as celery_app
app = Flask(__name__)
@app.route('/enviar-email', methods=['POST'])
def enviar_email_endpoint():
"""Enfileira um email para envio assíncrono"""
dados = request.get_json()
Enfileira a tarefa sem esperar a resposta
task = enviar_email.apply_async(
args=(
dados['destinatario'],
dados['assunto'],
dados['corpo']
),
priority=dados.get('prioridade', 5)
)
return jsonify({
"mensagem": "Email enfileirado",
"task_id": task.id
}), 202
@app.route('/status/<task_id>', methods=['GET'])
def obter_status(task_id):
"""Retorna status de uma tarefa"""
task = celery_app.AsyncResult(task_id)
return jsonify({
"task_id": task_id,
"status": task.status,
"resultado": task.result if task.successful() else None
})
@app.route('/processar', methods=['POST'])
def processar_endpoint():
"""Enfileira processamento de arquivo"""
dados = request.get_json()
task = processar_arquivo.apply_async(
args=(dados['arquivo'],),
queue='processamento'
)
return jsonify({
"mensagem": "Processamento iniciado",
"task_id": task.id
}), 202
if __name__ == '__main__':
app.run(debug=True)</code></pre>
<p>Use a API assim:</p>
<pre><code class="language-bash"># Enfileira um email
curl -X POST http://localhost:5000/enviar-email \
-H "Content-Type: application/json" \
-d '{
"destinatario": "usuario@example.com",
"assunto": "Bem-vindo",
"corpo": "Teste de email",
"prioridade": 7
}'
Verifica o status (use a task_id retornada)
curl http://localhost:5000/status/abc123def456</code></pre>
<h2>Conclusão</h2>
<p>Você aprendeu que <strong>Celery resolve o problema de operações síncronas bloqueantes</strong> permitindo executar tarefas em background através de um broker de mensagens. Isso torna suas aplicações web responsivas mesmo com operações pesadas acontecendo simultaneamente.</p>
<p>O <strong>roteamento inteligente de tarefas em filas diferentes</strong> com prioridades oferece controle fino sobre como seu sistema processa trabalho, permitindo escalar verticalmente (mais workers) ou horizontalmente (máquinas diferentes) conforme necessário.</p>
<p>O <strong>Celery Beat traz agendamento confiável e distribuído</strong>, eliminando a necessidade de cron jobs tradicionais e integrando perfeitamente com sua arquitetura de workers. Combinado com tratamento de erros, retry exponencial e idempotência, você tem um sistema robusto capaz de processar bilhões de tarefas em produção.</p>
<h2>Referências</h2>
<ul>
<li><a href="https://docs.celeryproject.io/" target="_blank" rel="noopener noreferrer">Documentação Oficial do Celery</a></li>
<li><a href="https://docs.celeryproject.io/en/stable/userguide/routing.html" target="_blank" rel="noopener noreferrer">Celery Task Routing and Queues</a></li>
<li><a href="https://docs.celeryproject.io/en/stable/userguide/periodic-tasks.html" target="_blank" rel="noopener noreferrer">Celery Beat Scheduler</a></li>
<li><a href="https://flower.readthedocs.io/" target="_blank" rel="noopener noreferrer">Flower: Real-time Celery Web Monitor</a></li>
<li><a href="https://redis.io/documentation" target="_blank" rel="noopener noreferrer">Redis: In-Memory Data Structure Store</a></li>
</ul>
<p><!-- FIM --></p>