<h2>Entendendo os Três Pilares do Kinesis</h2>
<p>O AWS Kinesis é um serviço gerenciado para processar dados em tempo real em larga escala. Existem três componentes principais: <strong>Kinesis Data Streams</strong> (para captura de dados brutos contínuos), <strong>Kinesis Firehose</strong> (para entrega automática em data lakes) e <strong>Kinesis Analytics</strong> (para análise SQL em tempo real). Cada um resolve um problema específico e frequentemente são usados em conjunto em arquiteturas modernas.</p>
<p>Compreender quando usar cada um é fundamental. Streams oferece controle fino e baixa latência—ideal para processamento real-time exigente. Firehose simplifica a ingestão com transformações automáticas—perfeito para arquivos em S3 ou Redshift. Analytics permite consultas SQL sem escrever código de processamento—indicado para monitoramento e alertas rápidos.</p>
<h2>Kinesis Data Streams na Prática</h2>
<h3>Produzindo Dados</h3>
<p>O Data Streams funciona como fila distribuída com shards paralelos. Você envia registros com uma <code>PartitionKey</code> que determina qual shard receberá o dado. Veja um produtor Python real:</p>
<pre><code class="language-python">import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def enviar_evento_usuario(user_id, acao):
payload = {
'user_id': user_id,
'acao': acao,
'timestamp': datetime.utcnow().isoformat(),
'origem': 'mobile-app'
}
response = kinesis.put_record(
StreamName='eventos-usuarios',
Data=json.dumps(payload),
PartitionKey=str(user_id) # Agrupa eventos do mesmo usuário
)
print(f"Registro {response['ShardId']}: {response['SequenceNumber']}")
Uso
enviar_evento_usuario(12345, 'login')
enviar_evento_usuario(12345, 'visualizou_produto')</code></pre>
<h3>Consumindo Dados</h3>
<p>Consumidores leem dados em ordem dentro de cada shard. Este exemplo processa eventos em tempo real:</p>
<pre><code class="language-python">import json
kinesis = boto3.client('kinesis')
def processar_stream():
response = kinesis.describe_stream(StreamName='eventos-usuarios')
shard_ids = [shard['ShardId'] for shard in response['StreamDescription']['Shards']]
for shard_id in shard_ids:
shard_iterator = kinesis.get_shard_iterator(
StreamName='eventos-usuarios',
ShardId=shard_id,
ShardIteratorType='LATEST'
)['ShardIterator']
while shard_iterator:
records = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for record in records['Records']:
data = json.loads(record['Data'])
print(f"Processando: {data['acao']} do usuário {data['user_id']}")
shard_iterator = records['NextShardIterator']
processar_stream()</code></pre>
<p>Use <strong>Kinesis Client Library (KCL)</strong> ou Lambda com event source mappings para produção—eles gerenciam shards automaticamente.</p>
<h2>Firehose: Entrega Automatizada</h2>
<h3>Configuração e Transformação</h3>
<p>Firehose entrega dados em lotes para S3, Redshift ou Splunk com latência de 1-5 minutos. Você pode transformar registros com Lambda durante a ingestão:</p>
<pre><code class="language-python">import boto3
import json
import base64
firehose = boto3.client('firehose')
def lambda_handler(event, context):
"""Função Lambda chamada pelo Firehose para transformar registros"""
output = []
for record in event['records']:
data = json.loads(base64.b64decode(record['data']))
Transformação: adicionar campo processado
data['processado_em'] = 'lambda-transform'
data['preco'] = float(data.get('preco', 0))
Re-codificar
transformed = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(
json.dumps(data).encode('utf-8')
).decode('utf-8')
}
output.append(transformed)
return {'records': output}</code></pre>
<h3>Enviando para Firehose</h3>
<pre><code class="language-python">def enviar_para_data_lake(produto_id, preco, quantidade):
firehose.put_record(
DeliveryStreamName='vendas-data-lake',
Record={
'Data': json.dumps({
'produto_id': produto_id,
'preco': preco,
'quantidade': quantidade
})
}
)
Firehose agrupa 128 MB ou 15 minutos antes de escrever em S3
enviar_para_data_lake('PROD001', 99.90, 5)</code></pre>
<p>Firehose é ideal para pipelines ETL simples onde latência de minutos é aceitável. Elimina necessidade de gerenciar consumers, scaling ou checkpoints.</p>
<h2>Kinesis Analytics para Análise em Tempo Real</h2>
<h3>Consultas SQL em Streaming</h3>
<p>Analytics transforma dados Streams em tabelas que você consulta com SQL. Perfeito para alertas e agregações imediatas:</p>
<pre><code class="language-sql">-- Criar aplicação Analytics conectada ao Stream
CREATE APPLICATION vendas_tempo_real;
-- Criar input (source)
CREATE OR REPLACE STREAM SOURCE_SQL_STREAM (
product_id VARCHAR(32),
valor DOUBLE,
timestamp_evento BIGINT
);
-- Criar sink (resultado)
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
categoria VARCHAR(32),
total_vendas DOUBLE,
quantidade_vendas BIGINT,
tempo_janela TIMESTAMP
);
-- Agregação com janela de tempo (tumbling window)
INSERT INTO DESTINATION_SQL_STREAM
SELECT
product_id,
SUM(valor) as total_vendas,
COUNT(*) as quantidade_vendas,
TUMBLE_START(ROWTIME, INTERVAL '1' MINUTE) as tempo_janela
FROM SOURCE_SQL_STREAM
GROUP BY TUMBLE(ROWTIME, INTERVAL '1' MINUTE), product_id
HAVING SUM(valor) > 1000; -- Alerta: mais de R$ 1000 por minuto</code></pre>
<h3>Enviando Resultados</h3>
<p>Os resultados da Analytics podem ser consumidos por Lambda ou enviados direto para SNS para alertas:</p>
<pre><code class="language-python"># No seu código de alerta
if total_vendas > 1000:
sns = boto3.client('sns')
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789:alertas-vendas',
Subject='Alerta: Pico de Vendas',
Message=f'Total: R$ {total_vendas} em {quantidade_vendas} transações'
)</code></pre>
<p>Combine Streams → Analytics → SNS para um sistema de monitoramento robusto sem escrever código de processamento distribuído.</p>
<h2>Caso de Uso Integrado: Sistema de Fraude em Tempo Real</h2>
<p>Em um projeto real, você captura transações com Streams, processa com Analytics, enriquece com Lambda, e archiva no S3 via Firehose. Transações suspeitas disparam alertas instantaneamente enquanto dados históricos alimentam modelos de ML.</p>
<p>Essa arquitetura processa milhões de eventos diários com latência de segundos, escalando automaticamente. Custos são previsíveis (você paga por shards e volume processado) e operação é mínima comparada a Kafka self-hosted.</p>
<h2>Conclusão</h2>
<p><strong>Tres aprendizados principais</strong>: (1) Kinesis Data Streams oferece controle e baixa latência para processamento complexo—use quando precisar de throughput garantido e lógica customizada; (2) Firehose elimina complexidade operacional para entrega em data lakes—escolha quando latência de minutos é aceitável e transformações são simples; (3) Analytics ativa SQL em tempo real—implemente quando precisar de agregações, alertas e janelas de tempo sem escrever código distribuído.</p>
<p>Domine a escolha certa entre os três e você construirá pipelines de dados que escalam com seu negócio.</p>
<h2>Referências</h2>
<ul>
<li><a href="https://docs.aws.amazon.com/kinesis/" target="_blank" rel="noopener noreferrer">AWS Kinesis Documentation</a></li>
<li><a href="https://docs.aws.amazon.com/streams/latest/dev/best-practices.html" target="_blank" rel="noopener noreferrer">Kinesis Best Practices</a></li>
<li><a href="https://docs.aws.amazon.com/kinesisanalytics/latest/dev/what-is.html" target="_blank" rel="noopener noreferrer">Building Real-time Applications with Kinesis Analytics</a></li>
<li><a href="https://docs.aws.amazon.com/kinesis/latest/dev/lambda-preprocessing.html" target="_blank" rel="noopener noreferrer">AWS Kinesis Firehose Data Transformation</a></li>
<li><a href="https://aws.amazon.com/blogs/big-data/" target="_blank" rel="noopener noreferrer">Real-time Data Processing with AWS</a></li>
</ul>