<h2>Redis: O Fundamento</h2>
<p>Redis é um armazenamento de dados em memória (in-memory data store) de código aberto que funciona como banco de dados chave-valor. Diferentemente de bancos de dados tradicionais que persistem dados em disco, Redis mantém tudo em RAM, o que o torna extraordinariamente rápido. Sua velocidade o qualifica para três casos de uso principais: cache, sistemas de pub/sub (publicador/assinante) e filas de processamento.</p>
<p>O diferencial do Redis é sua capacidade de trabalhar com estruturas de dados complexas — strings, listas, conjuntos, hashes e sorted sets — todas operadas atomicamente. Isso significa que operações são indivisíveis e ocorrem sem interferência de requisições concorrentes. Quando você precisa de subsistemas que trocam mensagens rapidamente ou precisam armazenar dados temporários com extrema performance, Redis é a ferramenta ideal. A biblioteca <code>redis-py</code> é o cliente Python oficial que abstrai a comunicação com o servidor Redis, permitindo que você interaja com toda essa potência através de uma API Python intuitiva.</p>
<h2>Instalação e Configuração Básica</h2>
<p>Antes de qualquer código, você precisa do servidor Redis rodando e da biblioteca <code>redis-py</code> instalada. Redis é disponibilizado em repositórios oficiais da maioria das distribuições Linux. No macOS, use Homebrew. No Windows, você pode usar WSL2 ou containers Docker.</p>
<pre><code class="language-bash"># Linux (Debian/Ubuntu)
sudo apt-get install redis-server
macOS
brew install redis
Iniciar o servidor Redis
redis-server
Em outro terminal, verificar se está rodando
redis-cli ping</code></pre>
<p>Com o servidor pronto, instale a biblioteca Python:</p>
<pre><code class="language-bash">pip install redis</code></pre>
<p>Agora você consegue conectar:</p>
<pre><code class="language-python">import redis
Conexão padrão localhost:6379
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
Testar a conexão
print(r.ping()) # Output: True</code></pre>
<p>O parâmetro <code>decode_responses=True</code> é essencial na maioria dos casos: ele converte automaticamente bytes retornados pelo Redis em strings Python. Sem ele, você receberia <code>b'valor'</code> em vez de <code>'valor'</code>, dificultando o trabalho. O parâmetro <code>db=0</code> especifica qual banco de dados Redis usar (0 a 15 por padrão).</p>
<h2>Cache com Redis</h2>
<h3>O Conceito de Cache</h3>
<p>Cache é um armazenamento temporário de dados frequentemente acessados. A estratégia funciona assim: antes de executar uma operação custosa (como uma consulta em banco de dados), você verifica se o resultado já existe no cache. Se existir, retorna-o imediatamente. Se não existir, executa a operação, armazena o resultado no cache para futuras requisições e retorna ao cliente. Isso reduz carga no banco de dados e diminui latência drasticamente.</p>
<p>Redis é ideal para cache porque oferece expiração automática de chaves (TTL — Time To Live). Você pode dizer "armazene este valor, mas Delete-o após 5 minutos". Isso é crítico em cache, pois dados que nunca expiram ocupam memória eternamente.</p>
<h3>Implementando Cache de Dados</h3>
<p>Vamos criar um exemplo prático: um sistema que busca informações de usuários de um banco de dados lento. Sem cache, cada requisição query o banco. Com cache, consultas subsequentes são servidas em microsegundos.</p>
<pre><code class="language-python">import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
Simular uma operação custosa (leitura de BD)
def buscar_usuario_no_bd(user_id):
time.sleep(2) # Simula latência de BD
return {
'id': user_id,
'nome': f'Usuário {user_id}',
'email': f'user{user_id}@example.com'
}
Função com cache
def obter_usuario(user_id, cache_ttl=3600):
Criar uma chave única para este usuário
cache_key = f'usuario:{user_id}'
Tentar recuperar do cache
cached = r.get(cache_key)
if cached:
print(f'✓ Retornado do cache')
return json.loads(cached)
Cache miss: buscar do "BD"
print(f'✗ Cache miss - consultando BD')
usuario = buscar_usuario_no_bd(user_id)
Armazenar no cache com TTL
r.setex(cache_key, cache_ttl, json.dumps(usuario))
return usuario
Teste
print('Primeira requisição:')
user1 = obter_usuario(1) # Demora 2 segundos
print(user1)
print('\nSegunda requisição (mesmo usuário):')
user1_again = obter_usuario(1) # Instantâneo
print(user1_again)
print('\nTerceira requisição (usuário diferente):')
user2 = obter_usuario(2) # Demora 2 segundos novamente
print(user2)</code></pre>
<p><strong>Output esperado:</strong></p>
<pre><code>Primeira requisição:
✗ Cache miss - consultando BD
{'id': 1, 'nome': 'Usuário 1', 'email': 'user1@example.com'}
Segunda requisição (mesmo usuário):
✓ Retornado do cache
{'id': 1, 'nome': 'Usuário 1', 'email': 'user1@example.com'}
Terceira requisição (usuário diferente):
✗ Cache miss - consultando BD
{'id': 2, 'nome': 'Usuário 2', 'email': 'user2@example.com'}</code></pre>
<p>Aqui usamos <code>setex()</code> que é a fusão de <code>set()</code> + <code>expire()</code>: armazena um valor e define sua expiração em uma operação atômica. Após 3600 segundos, a chave é deletada automaticamente. Você também pode usar <code>set()</code> com parâmetro <code>ex</code>:</p>
<pre><code class="language-python">r.set(cache_key, json.dumps(usuario), ex=3600)</code></pre>
<h3>Estratégias de Invalidação</h3>
<p>Um cache que nunca é invalidado pode servir dados desatualizados. Existem estratégias para lidar com isso. A mais simples é a expiração por TTL (já vista). Outra é invalidar manualmente quando os dados mudam:</p>
<pre><code class="language-python">def atualizar_usuario(user_id, dados_novos):
Atualizar no BD (pseudo-código)
db.update_user(user_id, dados_novos)
Invalidar o cache
cache_key = f'usuario:{user_id}'
r.delete(cache_key)
print(f'Cache invalidado para usuário {user_id}')
atualizar_usuario(1, {'nome': 'Novo Nome'})
Próxima requisição de obter_usuario(1) terá cache miss e buscará dados novos</code></pre>
<p>Você também pode usar o padrão de invalidação em cascata com chaves padrão:</p>
<pre><code class="language-python"># Deletar todas as chaves que correspondem a um padrão
r.delete(r.keys('usuario:')) # Deleta todos os caches de usuários</code></pre>
<h2>Pub/Sub: Comunicação Entre Componentes</h2>
<h3>Entendendo Pub/Sub</h3>
<p>Pub/Sub (Publicador/Assinante) é um padrão de mensageria onde produtores de dados (publishers) enviam mensagens para tópicos sem conhecer quem as receberá. Consumidores (subscribers) se inscrevem em tópicos de interesse e recebem mensagens assim que são publicadas. Isso desacopla componentes do sistema: publishers não dependem de subscribers existirem, e novos subscribers podem ser adicionados sem alterar publishers.</p>
<p>Redis implementa Pub/Sub de forma elegante. Um publisher envia uma mensagem para um canal, e todos os subscribers daquele canal a recebem instantaneamente. É importante notar que Redis Pub/Sub <strong>não persiste mensagens</strong>: se nenhum subscriber estiver escutando quando a mensagem é publicada, ela se perde. Para persistência, use Streams ou filas (próxima seção).</p>
<h3>Implementando Pub/Sub</h3>
<p>Vamos criar um exemplo de notificações em tempo real: um serviço publica atualizações de pedidos, e múltiplos clientes as recebem.</p>
<pre><code class="language-python">import redis
import threading
import json
from datetime import datetime
def criar_publisher():
"""Simula um serviço que publica atualizações"""
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def publicar_atualizacao_pedido(pedido_id, status):
mensagem = {
'pedido_id': pedido_id,
'status': status,
'timestamp': datetime.now().isoformat()
}
Publicar no canal 'pedidos'
num_subscribers = r.publish('pedidos', json.dumps(mensagem))
print(f'[PUBLISHER] Mensagem publicada - {num_subscribers} subscribers receberam')
Simular publicações
import time
time.sleep(1) # Aguardar subscribers conectarem
publicar_atualizacao_pedido(101, 'confirmado')
time.sleep(1)
publicar_atualizacao_pedido(101, 'processando')
time.sleep(1)
publicar_atualizacao_pedido(101, 'enviado')
def criar_subscriber(nome_subscriber):
"""Simula um cliente que consome notificações"""
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
pubsub = r.pubsub()
Inscrever no canal
pubsub.subscribe('pedidos')
print(f'[{nome_subscriber}] Inscrito no canal pedidos')
Escutar mensagens
for mensagem in pubsub.listen():
if mensagem['type'] == 'message':
dados = json.loads(mensagem['data'])
print(f'[{nome_subscriber}] Recebeu: Pedido {dados["pedido_id"]} - {dados["status"]}')
Executar em threads paralelas
thread_pub = threading.Thread(target=criar_publisher)
thread_sub1 = threading.Thread(target=criar_subscriber, args=('Cliente1',))
thread_sub2 = threading.Thread(target=criar_subscriber, args=('Cliente2',))
thread_sub1.start()
thread_sub2.start()
thread_pub.start()
thread_sub1.join()
thread_sub2.join()
thread_pub.join()</code></pre>
<p><strong>Output esperado:</strong></p>
<pre><code>[Cliente1] Inscrito no canal pedidos
[Cliente2] Inscrito no canal pedidos
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - confirmado
[Cliente2] Recebeu: Pedido 101 - confirmado
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - processando
[Cliente2] Recebeu: Pedido 101 - processando
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - enviado
[Cliente2] Recebeu: Pedido 101 - enviado</code></pre>
<p>O método <code>listen()</code> cria um gerador que bloqueia aguardando mensagens. O loop roda indefinidamente até a conexão ser encerrada. Cada mensagem recebida é um dicionário com chaves <code>type</code> (tipo de evento: <code>'message'</code>, <code>'subscribe'</code>, etc.) e <code>data</code> (conteúdo da mensagem).</p>
<h3>Padrões de Inscrição</h3>
<p>Você pode inscrever-se em múltiplos canais ou usar padrões com wildcards:</p>
<pre><code class="language-python">pubsub = r.pubsub()
Inscrever em múltiplos canais
pubsub.subscribe('pedidos', 'notificacoes', 'alertas')
Inscrever em padrões (requer psubscribe)
pubsub.psubscribe('usuario:*:update') # Recebe de usuario:1:update, usuario:2:update, etc.
No publisher, publicar em canal específico
r.publish('usuario:42:update', json.dumps({'acao': 'perfil_atualizado'}))</code></pre>
<h2>Filas com Redis</h2>
<h3>Diferença Entre Pub/Sub e Filas</h3>
<p>Filas e Pub/Sub parecem similares, mas têm propósitos diferentes. Pub/Sub é <strong>um para muitos e não persiste</strong>: uma mensagem é entregue a todos os subscribers conectados <em>naquele momento</em>. Filas são <strong>um para um (ou múltiplos consumidores) e persistem</strong>: mensagens ficam armazenadas até serem consumidas, e cada mensagem é entregue a um único consumidor (em arquitetura de fila tradicional) ou processada uma única vez mesmo com múltiplos consumidores.</p>
<p>Redis não tem suporte nativo a filas com reconhecimento de entrega (como RabbitMQ), mas você pode implementar uma fila simples e eficiente usando listas. Para casos que exigem garantias de entrega e reprocessamento, use Redis Streams. Aqui focamos em filas com listas pela simplicidade didática.</p>
<h3>Implementando Filas com Listas</h3>
<p>Uma lista Redis é uma sequência ordenada de valores. As operações <code>LPUSH</code> (inserir à esquerda) e <code>RPOP</code> (remover à direita) criam uma estrutura FIFO (First In, First Out).</p>
<pre><code class="language-python">import redis
import json
import threading
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def produtor_tarefas():
"""Enfileira tarefas"""
tarefas = [
{'id': 1, 'tipo': 'email', 'destinatario': 'user1@example.com'},
{'id': 2, 'tipo': 'sms', 'destinatario': '+5511999999999'},
{'id': 3, 'tipo': 'email', 'destinatario': 'user2@example.com'},
{'id': 4, 'tipo': 'push', 'destinatario': 'device_token_xyz'},
]
for tarefa in tarefas:
Enfileirar no final da fila
r.rpush('fila_tarefas', json.dumps(tarefa))
print(f'[PRODUTOR] Enfileirada tarefa {tarefa["id"]}')
time.sleep(0.5)
print('[PRODUTOR] Todas as tarefas enfileiradas')
def consumidor_tarefas(nome_consumidor):
"""Consome e processa tarefas"""
while True:
Remover do início da fila (ou aguardar)
tarefa_json = r.lpop('fila_tarefas')
if tarefa_json is None:
print(f'[{nome_consumidor}] Fila vazia, aguardando...')
time.sleep(1)
continue
tarefa = json.loads(tarefa_json)
print(f'[{nome_consumidor}] Processando tarefa {tarefa["id"]} ({tarefa["tipo"]})')
Simular processamento
time.sleep(1)
print(f'[{nome_consumidor}] Tarefa {tarefa["id"]} concluída')
Iniciar produtor e consumidores em threads paralelas
thread_prod = threading.Thread(target=produtor_tarefas)
thread_cons1 = threading.Thread(target=consumidor_tarefas, args=('Consumidor1',), daemon=True)
thread_cons2 = threading.Thread(target=consumidor_tarefas, args=('Consumidor2',), daemon=True)
thread_cons1.start()
thread_cons2.start()
thread_prod.start()
thread_prod.join()
time.sleep(10) # Aguardar consumidores processarem</code></pre>
<p><strong>Output esperado (pode variar em ordem):</strong></p>
<pre><code>[Consumidor1] Fila vazia, aguardando...
[Consumidor2] Fila vazia, aguardando...
[PRODUTOR] Enfileirada tarefa 1
[PRODUTOR] Enfileirada tarefa 2
[Consumidor1] Processando tarefa 1 (email)
[PRODUTOR] Enfileirada tarefa 3
[Consumidor2] Processando tarefa 2 (sms)
[PRODUTOR] Enfileirada tarefa 4
[PRODUTOR] Todas as tarefas enfileiradas
[Consumidor1] Tarefa 1 concluída
[Consumidor2] Tarefa 2 concluída
[Consumidor1] Processando tarefa 3 (email)
[Consumidor2] Processando tarefa 4 (push)
[Consumidor1] Tarefa 3 concluída
[Consumidor2] Tarefa 4 concluída</code></pre>
<p>Aqui, <code>rpush()</code> adiciona à direita e <code>lpop()</code> remove pela esquerda, garantindo ordem FIFO. Múltiplos consumidores podem trabalhar em paralelo, cada um pegando uma tarefa diferente.</p>
<h3>Bloqueando Pop para Eficiência</h3>
<p>Usar <code>lpop()</code> em loop com sleep é ineficiente: desperdiça CPU verificando a fila continuamente. Redis oferece <code>blpop()</code> (blocking left pop) que aguarda até que um elemento esteja disponível:</p>
<pre><code class="language-python">def consumidor_tarefas_bloqueante(nome_consumidor):
"""Versão mais eficiente com blocking pop"""
while True:
Aguardar até 0 segundos (indefinido) por uma tarefa
resultado = r.blpop('fila_tarefas', timeout=0)
if resultado is None:
print(f'[{nome_consumidor}] Timeout atingido')
continue
blpop retorna tupla (chave, valor)
chave, tarefa_json = resultado
tarefa = json.loads(tarefa_json)
print(f'[{nome_consumidor}] Processando tarefa {tarefa["id"]}')
Processar...
time.sleep(1)
print(f'[{nome_consumidor}] Concluída')</code></pre>
<p><code>blpop()</code> é superior porque não consome CPU enquanto aguarda. O timeout em segundos (0 = indefinido) define quanto tempo esperar. Se excedido sem nenhuma mensagem, retorna <code>None</code>.</p>
<h3>Filas com Prioridade</h3>
<p>Para tarefas com prioridades diferentes, use sorted sets:</p>
<pre><code class="language-python">def enfileirar_com_prioridade(chave_fila, tarefa, prioridade=0):
"""prioridade: -10 (alta) a 10 (baixa)"""
r.zadd(chave_fila, {json.dumps(tarefa): prioridade})
def desfileirar_com_prioridade(chave_fila):
"""Remove tarefa com maior prioridade (menor score)"""
resultado = r.zrange(chave_fila, 0, 0)
if resultado:
tarefa = resultado[0]
r.zrem(chave_fila, tarefa)
return json.loads(tarefa)
return None
Exemplo
enfileirar_com_prioridade('fila_prioritaria',
{'id': 1, 'tipo': 'email'},
prioridade=5)
enfileirar_com_prioridade('fila_prioritaria',
{'id': 2, 'tipo': 'alerta'},
prioridade=-10) # Alta prioridade
tarefa = desfileirar_com_prioridade('fila_prioritaria')
print(tarefa) # {'id': 2, 'tipo': 'alerta'} (menor score = maior prioridade)</code></pre>
<h2>Padrões Avançados e Boas Práticas</h2>
<h3>Connection Pooling</h3>
<p>Em aplicações reais, você não cria uma nova conexão Redis para cada operação. Conexões são custosas. Use connection pooling: um conjunto de conexões reutilizáveis mantidas abertas.</p>
<pre><code class="language-python">import redis
Pool padrão (máximo 50 conexões)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
Ou, mais simples, redis-py cria um pool automaticamente:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
Internamente já usa pooling</code></pre>
<h3>Pipelines para Múltiplas Operações</h3>
<p>Se você precisa executar múltiplas operações Redis, use pipelines para agrupá-las e enviar em uma única requisição:</p>
<pre><code class="language-python"># Sem pipeline: 3 requisições
r.set('chave1', 'valor1')
r.set('chave2', 'valor2')
r.set('chave3', 'valor3')
Com pipeline: 1 requisição
pipe = r.pipeline()
pipe.set('chave1', 'valor1')
pipe.set('chave2', 'valor2')
pipe.set('chave3', 'valor3')
results = pipe.execute() # Execute todas de uma vez</code></pre>
<p>Pipelines reduzem latência de rede significativamente em operações em batch.</p>
<h3>Tratamento de Erros</h3>
<p>Redis pode ficar indisponível. Sempre trate exceções:</p>
<pre><code class="language-python">import redis
from redis.exceptions import ConnectionError, TimeoutError
try:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.ping()
except ConnectionError:
print('Falha ao conectar ao Redis')
except TimeoutError:
print('Timeout na conexão com Redis')
Para operações, use try-except
try:
valor = r.get('chave')
except redis.RedisError as e:
print(f'Erro Redis: {e}')</code></pre>
<h3>Expiração e Limpeza</h3>
<p>Sempre defina TTL em dados temporários. Dados sem expiração causam vazamento de memória:</p>
<pre><code class="language-python"># Ruim: nenhuma expiração
r.set('sessao_user', json.dumps(dados_sessao))
Bom: expira em 24 horas
r.setex('sessao_user', 86400, json.dumps(dados_sessao))
Ou:
r.set('sessao_user', json.dumps(dados_sessao), ex=86400)</code></pre>
<p>Você pode monitorar memória usada com:</p>
<pre><code class="language-python">info = r.info('memory')
print(f"Memória usada: {info['used_memory_human']}")
print(f"Pico de memória: {info['used_memory_peak_human']}")</code></pre>
<h2>Conclusão</h2>
<p>Dominando Redis com Python, você adquiriu três superpoderes. Primeiro, <strong>cache inteligente</strong> que reduz carga massivamente em sistemas: dados frequentemente acessados são servidos em microsegundos, não segundos. Redis transforma performance de aplicações. Segundo, <strong>Pub/Sub para comunicação em tempo real</strong>: desacopla componentes do sistema permitindo que diferentes serviços se comuniquem sem conhecerem-se mutuamente, criando arquiteturas escaláveis e resilientes. Terceiro, <strong>filas robustas para processamento assíncrono</strong>: tarefas custosas são enfileiradas e processadas em background por workers, mantendo a aplicação responsiva.</p>
<p>A biblioteca <code>redis-py</code> abstrai a complexidade do protocolo Redis e oferece uma API Pythônica. Combine essas capacidades — cache + Pub/Sub + filas — e você resolve problemas que muitos desenvolvedores enfrentam com soluções caras e complexas. Use as práticas apresentadas: sempre defina TTLs, trate erros, use connection pooling e pipelines. Redis não é apenas um cache; é um alicerce para sistemas modernos de alta performance.</p>
<h2>Referências</h2>
<ul>
<li><a href="https://redis-py.readthedocs.io/" target="_blank" rel="noopener noreferrer">Documentação Oficial do redis-py</a></li>
<li><a href="https://redis.io/documentation" target="_blank" rel="noopener noreferrer">Documentação Oficial do Redis</a></li>
<li><a href="https://redis.io/docs/data-types/" target="_blank" rel="noopener noreferrer">Redis Data Types - Redis Official</a></li>
<li><a href="https://realpython.com/python-redis/" target="_blank" rel="noopener noreferrer">Building Applications with Redis - Real Python</a></li>
<li><a href="https://redis.io/topics/pubsub" target="_blank" rel="noopener noreferrer">Redis Pub/Sub Architecture - Redis Labs</a></li>
</ul>
<p><!-- FIM --></p>