<h2>DynamoDB Streams: Capturando Mudanças em Tempo Real</h2>
<p>DynamoDB Streams permite capturar modificações (insert, update, delete) em uma tabela e processá-las de forma assíncrona. Cada stream registra a sequência exata de alterações com informações da chave, imagem anterior e nova. Isso é essencial para arquiteturas que precisam sincronizar dados entre sistemas, alimentar data lakes ou disparar notificações em tempo real.</p>
<p>Para ativar streams, você configura na tabela o tipo de informação capturada: <code>KEYS_ONLY</code>, <code>NEW_IMAGE</code>, <code>OLD_IMAGE</code> ou <code>NEW_AND_OLD_IMAGES</code>. A integração natural com Lambda permite processar registros automaticamente sem gerenciar infraestrutura. Aqui está um exemplo funcional que processa eventos de stream:</p>
<pre><code class="language-python">import json
import boto3
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('usuarios')
def lambda_handler(event, context):
"""Processa eventos do DynamoDB Stream"""
for record in event['Records']:
event_name = record['eventName'] # INSERT, MODIFY, REMOVE
if event_name == 'INSERT':
new_image = record['dynamodb']['NewImage']
user_id = new_image['userId']['S']
email = new_image['email']['S']
print(f"Novo usuário: {user_id} - {email}")
Aqui você poderia enviar email de boas-vindas via SNS
elif event_name == 'MODIFY':
old_image = record['dynamodb']['OldImage']
new_image = record['dynamodb']['NewImage']
print(f"Alteração detectada: {json.dumps(new_image)}")
Comparar e registrar mudanças em auditoria
elif event_name == 'REMOVE':
keys = record['dynamodb']['Keys']
print(f"Registro deletado: {keys}")
return {'statusCode': 200, 'body': 'Processado com sucesso'}</code></pre>
<h3>Configuração de Stream na Tabela</h3>
<p>Para criar uma tabela com stream ativo via Terraform ou CloudFormation, defina o parâmetro <code>StreamSpecification</code>. A melhor prática é usar <code>NEW_AND_OLD_IMAGES</code> apenas quando realmente necessário, pois aumenta o custo. Para auditoria simples, <code>KEYS_ONLY</code> é mais econômico.</p>
<p>---</p>
<h2>DynamoDB Transactions: Garantindo Consistência Multi-Registro</h2>
<p>Transactions no DynamoDB garantem que múltiplas operações sejam executadas atomicamente: todas bem-sucedidas ou todas revertidas. Isso é crítico em cenários como transferência entre contas, atualização de estoque com desconto aplicado ou criação de relacionamentos que não podem ficar inconsistentes.</p>
<p>Existem dois modos: <code>TransactWriteItems</code> (até 25 operações) e <code>TransactGetItems</code> (até 25 leituras). Ao contrário de SQL, não há locks de leitura no DynamoDB — use transações apenas quando escritas precisam ser atômicas. Veja um exemplo prático de transferência de pontos entre dois usuários:</p>
<pre><code class="language-python">import boto3
from botocore.exceptions import ClientError
dynamodb = boto3.client('dynamodb')
def transferir_pontos(usuario_origem, usuario_destino, pontos):
"""
Transfere pontos entre usuários de forma atômica.
Se qualquer operação falhar, ambas são revertidas.
"""
try:
response = dynamodb.transact_write_items(
TransactItems=[
{
'Update': {
'TableName': 'usuarios',
'Key': {'userId': {'S': usuario_origem}},
'UpdateExpression': 'SET pontos = pontos - :pts, atualizadoEm = :agora',
'ExpressionAttributeValues': {
':pts': {'N': str(pontos)},
':agora': {'S': datetime.now().isoformat()}
},
'ConditionExpression': 'pontos >= :pts' # Valida antes
}
},
{
'Update': {
'TableName': 'usuarios',
'Key': {'userId': {'S': usuario_destino}},
'UpdateExpression': 'SET pontos = pontos + :pts, atualizadoEm = :agora',
'ExpressionAttributeValues': {
':pts': {'N': str(pontos)},
':agora': {'S': datetime.now().isoformat()}
}
}
}
]
)
print("Transferência bem-sucedida")
return True
except ClientError as e:
if e.response['Error']['Code'] == 'TransactionCanceledException':
print(f"Transação cancelada: {e.response['Error']['Message']}")
Pode ser falta de saldo ou outro erro de validação
return False
from datetime import datetime</code></pre>
<h3>Padrões de Erro e Tratamento</h3>
<p>Exceções de transação requerem retry logic com exponential backoff. A chave é implementar idempotência: sua operação deve produzir o mesmo resultado se executada múltiplas vezes. Inclua um <code>requestId</code> único para detectar duplicatas.</p>
<p>---</p>
<h2>DAX Cache: Acelerando Leituras com Cache em Memória</h2>
<p>DAX (DynamoDB Accelerator) é um cache gerenciado que fica entre sua aplicação e DynamoDB, reduzindo latência de milissegundos para microsegundos e economizando unidades de leitura. É especialmente eficaz para workloads com hot keys (poucos registros acessados repetidamente).</p>
<p>DAX funciona transparentemente: você usa o mesmo SDK do DynamoDB, apenas apontando para o endpoint do cluster DAX. Caches são invalidados automaticamente quando você escreve via DAX, mantendo consistência. Veja como integrar:</p>
<pre><code class="language-python">import amazondax
import boto3
Criar cliente DAX em vez do DynamoDB normal
dax_endpoint = 'my-cluster.dax.amazonaws.com'
dax = amazondax.AmazonDaxClient.resource(endpoint_url=f'http://{dax_endpoint}:8111')
Usar normalmente como uma tabela DynamoDB
table = dax.Table('usuarios')
def buscar_usuario_com_cache(user_id):
"""
Primeira chamada consulta DynamoDB e armazena em cache (TTL padrão 5min).
Chamadas subsequentes vêm do cache sem custo de RCU.
"""
try:
response = table.get_item(Key={'userId': user_id})
user = response.get('Item')
print(f"Usuário {user_id} recuperado - TTL padrão: 5 minutos")
return user
except Exception as e:
print(f"Erro ao acessar DAX: {e}")
Fallback para DynamoDB direto
dynamodb = boto3.resource('dynamodb')
return dynamodb.Table('usuarios').get_item(Key={'userId': user_id}).get('Item')
Simular carga com acesso repetido
for i in range(100):
usuario = buscar_usuario_com_cache('user-123') # Mesmo user a cada iteração</code></pre>
<h3>Quando Usar DAX</h3>
<p>Use DAX quando você tem padrão de leitura repetida ou workload de leitura pesada. <strong>Não</strong> recomendo para aplicações com muitas escritas distintas ou quando dados mudam frequentemente. Custo típico: $0.25/hora para cluster com 3 nós. Analise ROI antes de implementar.</p>
<p>---</p>
<h2>Conclusão</h2>
<p>Dominando esses três pilares você constrói aplicações DynamoDB verdadeiramente resilientes. <strong>Streams</strong> permitem propagação de dados assíncrona sem acoplamento. <strong>Transactions</strong> garantem consistência onde importa: transferências, inventário, estados críticos. <strong>DAX</strong> multiplica performance de leitura quando você tem padrões previsíveis. A combinação desses recursos resolve 95% dos problemas reais em produção.</p>
<p>---</p>
<h2>Referências</h2>
<ul>
<li><a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html" target="_blank" rel="noopener noreferrer">AWS DynamoDB Streams Documentation</a></li>
<li><a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html" target="_blank" rel="noopener noreferrer">DynamoDB Transactions Guide</a></li>
<li><a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAX.html" target="_blank" rel="noopener noreferrer">DAX Developer Guide</a></li>
<li><a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/best-practices.html" target="_blank" rel="noopener noreferrer">AWS DynamoDB Best Practices</a></li>
<li><a href="https://www.dynamodbbook.com" target="_blank" rel="noopener noreferrer">Designing with DynamoDB - Alex DeBrie</a></li>
</ul>