Cloud & Infraestrutura

Dominando Kinesis: Data Streams, Firehose e Analytics para Dados em Tempo Real em Projetos Reais

8 min de leitura

Dominando Kinesis: Data Streams, Firehose e Analytics para Dados em Tempo Real em Projetos Reais

Entendendo os Três Pilares do Kinesis O AWS Kinesis é um serviço gerenciado para processar dados em tempo real em larga escala. Existem três componentes principais: Kinesis Data Streams (para captura de dados brutos contínuos), Kinesis Firehose (para entrega automática em data lakes) e Kinesis Analytics (para análise SQL em tempo real). Cada um resolve um problema específico e frequentemente são usados em conjunto em arquiteturas modernas. Compreender quando usar cada um é fundamental. Streams oferece controle fino e baixa latência—ideal para processamento real-time exigente. Firehose simplifica a ingestão com transformações automáticas—perfeito para arquivos em S3 ou Redshift. Analytics permite consultas SQL sem escrever código de processamento—indicado para monitoramento e alertas rápidos. Kinesis Data Streams na Prática Produzindo Dados O Data Streams funciona como fila distribuída com shards paralelos. Você envia registros com uma que determina qual shard receberá o dado. Veja um produtor Python real: Consumindo Dados Consumidores leem dados em ordem dentro de cada shard. Este exemplo processa

<h2>Entendendo os Três Pilares do Kinesis</h2>

<p>O AWS Kinesis é um serviço gerenciado para processar dados em tempo real em larga escala. Existem três componentes principais: <strong>Kinesis Data Streams</strong> (para captura de dados brutos contínuos), <strong>Kinesis Firehose</strong> (para entrega automática em data lakes) e <strong>Kinesis Analytics</strong> (para análise SQL em tempo real). Cada um resolve um problema específico e frequentemente são usados em conjunto em arquiteturas modernas.</p>

<p>Compreender quando usar cada um é fundamental. Streams oferece controle fino e baixa latência—ideal para processamento real-time exigente. Firehose simplifica a ingestão com transformações automáticas—perfeito para arquivos em S3 ou Redshift. Analytics permite consultas SQL sem escrever código de processamento—indicado para monitoramento e alertas rápidos.</p>

<h2>Kinesis Data Streams na Prática</h2>

<h3>Produzindo Dados</h3>

<p>O Data Streams funciona como fila distribuída com shards paralelos. Você envia registros com uma <code>PartitionKey</code> que determina qual shard receberá o dado. Veja um produtor Python real:</p>

<pre><code class="language-python">import boto3

import json

from datetime import datetime

kinesis = boto3.client(&#039;kinesis&#039;, region_name=&#039;us-east-1&#039;)

def enviar_evento_usuario(user_id, acao):

payload = {

&#039;user_id&#039;: user_id,

&#039;acao&#039;: acao,

&#039;timestamp&#039;: datetime.utcnow().isoformat(),

&#039;origem&#039;: &#039;mobile-app&#039;

}

response = kinesis.put_record(

StreamName=&#039;eventos-usuarios&#039;,

Data=json.dumps(payload),

PartitionKey=str(user_id) # Agrupa eventos do mesmo usuário

)

print(f&quot;Registro {response[&#039;ShardId&#039;]}: {response[&#039;SequenceNumber&#039;]}&quot;)

Uso

enviar_evento_usuario(12345, &#039;login&#039;)

enviar_evento_usuario(12345, &#039;visualizou_produto&#039;)</code></pre>

<h3>Consumindo Dados</h3>

<p>Consumidores leem dados em ordem dentro de cada shard. Este exemplo processa eventos em tempo real:</p>

<pre><code class="language-python">import json

kinesis = boto3.client(&#039;kinesis&#039;)

def processar_stream():

response = kinesis.describe_stream(StreamName=&#039;eventos-usuarios&#039;)

shard_ids = [shard[&#039;ShardId&#039;] for shard in response[&#039;StreamDescription&#039;][&#039;Shards&#039;]]

for shard_id in shard_ids:

shard_iterator = kinesis.get_shard_iterator(

StreamName=&#039;eventos-usuarios&#039;,

ShardId=shard_id,

ShardIteratorType=&#039;LATEST&#039;

)[&#039;ShardIterator&#039;]

while shard_iterator:

records = kinesis.get_records(

ShardIterator=shard_iterator,

Limit=100

)

for record in records[&#039;Records&#039;]:

data = json.loads(record[&#039;Data&#039;])

print(f&quot;Processando: {data[&#039;acao&#039;]} do usuário {data[&#039;user_id&#039;]}&quot;)

shard_iterator = records[&#039;NextShardIterator&#039;]

processar_stream()</code></pre>

<p>Use <strong>Kinesis Client Library (KCL)</strong> ou Lambda com event source mappings para produção—eles gerenciam shards automaticamente.</p>

<h2>Firehose: Entrega Automatizada</h2>

<h3>Configuração e Transformação</h3>

<p>Firehose entrega dados em lotes para S3, Redshift ou Splunk com latência de 1-5 minutos. Você pode transformar registros com Lambda durante a ingestão:</p>

<pre><code class="language-python">import boto3

import json

import base64

firehose = boto3.client(&#039;firehose&#039;)

def lambda_handler(event, context):

&quot;&quot;&quot;Função Lambda chamada pelo Firehose para transformar registros&quot;&quot;&quot;

output = []

for record in event[&#039;records&#039;]:

data = json.loads(base64.b64decode(record[&#039;data&#039;]))

Transformação: adicionar campo processado

data[&#039;processado_em&#039;] = &#039;lambda-transform&#039;

data[&#039;preco&#039;] = float(data.get(&#039;preco&#039;, 0))

Re-codificar

transformed = {

&#039;recordId&#039;: record[&#039;recordId&#039;],

&#039;result&#039;: &#039;Ok&#039;,

&#039;data&#039;: base64.b64encode(

json.dumps(data).encode(&#039;utf-8&#039;)

).decode(&#039;utf-8&#039;)

}

output.append(transformed)

return {&#039;records&#039;: output}</code></pre>

<h3>Enviando para Firehose</h3>

<pre><code class="language-python">def enviar_para_data_lake(produto_id, preco, quantidade):

firehose.put_record(

DeliveryStreamName=&#039;vendas-data-lake&#039;,

Record={

&#039;Data&#039;: json.dumps({

&#039;produto_id&#039;: produto_id,

&#039;preco&#039;: preco,

&#039;quantidade&#039;: quantidade

})

}

)

Firehose agrupa 128 MB ou 15 minutos antes de escrever em S3

enviar_para_data_lake(&#039;PROD001&#039;, 99.90, 5)</code></pre>

<p>Firehose é ideal para pipelines ETL simples onde latência de minutos é aceitável. Elimina necessidade de gerenciar consumers, scaling ou checkpoints.</p>

<h2>Kinesis Analytics para Análise em Tempo Real</h2>

<h3>Consultas SQL em Streaming</h3>

<p>Analytics transforma dados Streams em tabelas que você consulta com SQL. Perfeito para alertas e agregações imediatas:</p>

<pre><code class="language-sql">-- Criar aplicação Analytics conectada ao Stream

CREATE APPLICATION vendas_tempo_real;

-- Criar input (source)

CREATE OR REPLACE STREAM SOURCE_SQL_STREAM (

product_id VARCHAR(32),

valor DOUBLE,

timestamp_evento BIGINT

);

-- Criar sink (resultado)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

categoria VARCHAR(32),

total_vendas DOUBLE,

quantidade_vendas BIGINT,

tempo_janela TIMESTAMP

);

-- Agregação com janela de tempo (tumbling window)

INSERT INTO DESTINATION_SQL_STREAM

SELECT

product_id,

SUM(valor) as total_vendas,

COUNT(*) as quantidade_vendas,

TUMBLE_START(ROWTIME, INTERVAL &#039;1&#039; MINUTE) as tempo_janela

FROM SOURCE_SQL_STREAM

GROUP BY TUMBLE(ROWTIME, INTERVAL &#039;1&#039; MINUTE), product_id

HAVING SUM(valor) &gt; 1000; -- Alerta: mais de R$ 1000 por minuto</code></pre>

<h3>Enviando Resultados</h3>

<p>Os resultados da Analytics podem ser consumidos por Lambda ou enviados direto para SNS para alertas:</p>

<pre><code class="language-python"># No seu código de alerta

if total_vendas &gt; 1000:

sns = boto3.client(&#039;sns&#039;)

sns.publish(

TopicArn=&#039;arn:aws:sns:us-east-1:123456789:alertas-vendas&#039;,

Subject=&#039;Alerta: Pico de Vendas&#039;,

Message=f&#039;Total: R$ {total_vendas} em {quantidade_vendas} transações&#039;

)</code></pre>

<p>Combine Streams → Analytics → SNS para um sistema de monitoramento robusto sem escrever código de processamento distribuído.</p>

<h2>Caso de Uso Integrado: Sistema de Fraude em Tempo Real</h2>

<p>Em um projeto real, você captura transações com Streams, processa com Analytics, enriquece com Lambda, e archiva no S3 via Firehose. Transações suspeitas disparam alertas instantaneamente enquanto dados históricos alimentam modelos de ML.</p>

<p>Essa arquitetura processa milhões de eventos diários com latência de segundos, escalando automaticamente. Custos são previsíveis (você paga por shards e volume processado) e operação é mínima comparada a Kafka self-hosted.</p>

<h2>Conclusão</h2>

<p><strong>Tres aprendizados principais</strong>: (1) Kinesis Data Streams oferece controle e baixa latência para processamento complexo—use quando precisar de throughput garantido e lógica customizada; (2) Firehose elimina complexidade operacional para entrega em data lakes—escolha quando latência de minutos é aceitável e transformações são simples; (3) Analytics ativa SQL em tempo real—implemente quando precisar de agregações, alertas e janelas de tempo sem escrever código distribuído.</p>

<p>Domine a escolha certa entre os três e você construirá pipelines de dados que escalam com seu negócio.</p>

<h2>Referências</h2>

<ul>

<li><a href="https://docs.aws.amazon.com/kinesis/" target="_blank" rel="noopener noreferrer">AWS Kinesis Documentation</a></li>

<li><a href="https://docs.aws.amazon.com/streams/latest/dev/best-practices.html" target="_blank" rel="noopener noreferrer">Kinesis Best Practices</a></li>

<li><a href="https://docs.aws.amazon.com/kinesisanalytics/latest/dev/what-is.html" target="_blank" rel="noopener noreferrer">Building Real-time Applications with Kinesis Analytics</a></li>

<li><a href="https://docs.aws.amazon.com/kinesis/latest/dev/lambda-preprocessing.html" target="_blank" rel="noopener noreferrer">AWS Kinesis Firehose Data Transformation</a></li>

<li><a href="https://aws.amazon.com/blogs/big-data/" target="_blank" rel="noopener noreferrer">Real-time Data Processing with AWS</a></li>

</ul>

Comentários

Mais em Cloud & Infraestrutura

Route 53: DNS, Health Checks e Roteamento de Tráfego Global na Prática
Route 53: DNS, Health Checks e Roteamento de Tráfego Global na Prática

Fundamentos do Route 53: DNS na AWS O Route 53 é o serviço de DNS gerenciado...

Como Usar ECS com Fargate: Task Definitions, Services e Service Discovery em Produção
Como Usar ECS com Fargate: Task Definitions, Services e Service Discovery em Produção

Fundamentos do ECS com Fargate: Arquitetura e Componentes O Amazon ECS (Elast...

Dominando AWS Lambda: Fundamentos, Triggers e Execution Model em Projetos Reais
Dominando AWS Lambda: Fundamentos, Triggers e Execution Model em Projetos Reais

Fundamentos do AWS Lambda AWS Lambda é um serviço de computação sem servidor...