<h2>Streams Avançados em Node.js: Transform, Duplex e Backpressure</h2>
<p>Streams são um dos pilares do Node.js para processar grandes volumes de dados de forma eficiente. Enquanto streams básicos (Readable e Writable) já resolvem muitos problemas, os padrões avançados — Transform, Duplex e o controle de backpressure — são essenciais para construir aplicações robustas e performáticas. Nesta aula, você aprenderá a dominar esses conceitos com exemplos práticos e imediatamente aplicáveis.</p>
<h2>Transform Streams: Transformando Dados em Tempo Real</h2>
<p>Um Transform stream é um stream que modifica os dados conforme passam por ele. Você implementa a lógica de transformação no método <code>_transform()</code>. É perfeito para casos como compressão, criptografia, parsing JSON em linhas ou conversão de formatos.</p>
<h3>Implementando um Transform Stream</h3>
<pre><code class="language-javascript">const { Transform } = require('stream');
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// chunk é um Buffer com os dados
const transformed = chunk.toString().toUpperCase();
this.push(transformed);
callback(); // sinaliza que acabou a transformação
}
}
// Uso
const fs = require('fs');
fs.createReadStream('input.txt')
.pipe(new UppercaseTransform())
.pipe(fs.createWriteStream('output.txt'));</code></pre>
<p>Note que <code>_transform()</code> recebe o chunk de dados, realiza a transformação e chama <code>callback()</code> quando pronto. Você pode chamar <code>this.push()</code> múltiplas vezes se a transformação gerar vários chunks de saída. Um exemplo mais realista é um parser CSV que converte cada linha em JSON:</p>
<pre><code class="language-javascript">const { Transform } = require('stream');
class CSVtoJSON extends Transform {
constructor(options = {}) {
super(options);
this.headers = null;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line, index) => {
if (index === 0 && !this.headers) {
this.headers = line.split(',');
return;
}
if (line.trim()) {
const values = line.split(',');
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(JSON.stringify(obj) + '\n');
}
});
callback();
}
}
// Teste
fs.createReadStream('dados.csv')
.pipe(new CSVtoJSON())
.pipe(fs.createWriteStream('dados.json'));</code></pre>
<h2>Duplex Streams: Leitura e Escrita Simultâneas</h2>
<p>Um Duplex stream é aquele que funciona como Readable e Writable ao mesmo tempo — dados entram por um lado, saem por outro. Exemplos reais incluem conexões TCP, WebSockets e pipes bidirecionais. Você implementa <code>_read()</code> e <code>_write()</code> separadamente.</p>
<h3>Criando um Duplex Stream</h3>
<pre><code class="language-javascript">const { Duplex } = require('stream');
const net = require('net');
class EchoServer extends Duplex {
constructor(options) {
super(options);
this.buffer = [];
}
_write(chunk, encoding, callback) {
// Quando dados chegam, ecoamos de volta
console.log('Recebido:', chunk.toString());
this.buffer.push(chunk);
callback();
}
_read(size) {
// Quando o outro lado quer ler, enviamos dados armazenados
if (this.buffer.length > 0) {
this.push(this.buffer.shift());
} else {
this.push(null); // FIM do stream
}
}
}
// Teste manual
const echo = new EchoServer();
echo.write('Olá');
echo.write('Mundo');
echo.end();
echo.on('data', (chunk) => {
console.log('Lido:', chunk.toString());
});</code></pre>
<p>Um uso mais prático é um Duplex que funciona como um transformador bidirecional em uma conexão HTTP:</p>
<pre><code class="language-javascript">const { Duplex } = require('stream');
class BiDirectionalProcessor extends Duplex {
_write(chunk, encoding, callback) {
// Processa dados que chegam (escrita)
const processed = chunk.toString().toUpperCase();
console.log('→ Enviado ao cliente:', processed);
callback();
}
_read(size) {
// Simula dados sendo lidos do servidor
if (!this.done) {
this.push('Dados do servidor\n');
this.done = true;
} else {
this.push(null);
}
}
}
const processor = new BiDirectionalProcessor();
processor.pipe(process.stdout);
processor.write('request do cliente');
processor.end();</code></pre>
<h2>Backpressure: O Controle de Fluxo Crítico</h2>
<p>Backpressure ocorre quando o lado consumidor (Writable) não consegue processar dados tão rápido quanto o produtor (Readable) os envia. Se ignorado, acumula-se na memória. Node.js oferece mecanismos para detectar e resolver isso.</p>
<h3>Detectando e Tratando Backpressure</h3>
<p>Quando você chama <code>stream.write()</code>, ele retorna <code>false</code> se há backpressure. Além disso, o evento <code>'drain'</code> é emitido quando o buffer interno foi esvaziado:</p>
<pre><code class="language-javascript">const fs = require('fs');
const source = fs.createReadStream('arquivo-grande.txt');
const destination = fs.createWriteStream('copia.txt');
source.on('data', (chunk) => {
const continuar = destination.write(chunk);
if (!continuar) {
console.log('⚠️ Backpressure detectado!');
source.pause(); // Pausa leitura
}
});
destination.on('drain', () => {
console.log('✓ Buffer esvaziado, retomando leitura');
source.resume();
});
source.on('end', () => console.log('Concluído'));</code></pre>
<p>A melhor prática, porém, é usar <code>pipe()</code> diretamente, que gerencia backpressure automaticamente:</p>
<pre><code class="language-javascript">// Automático e seguro — Node.js controla tudo
fs.createReadStream('grande.txt')
.pipe(new UppercaseTransform())
.pipe(fs.createWriteStream('resultado.txt'));</code></pre>
<p>Se você implementar um Transform customizado, <strong>sempre respeite o retorno do <code>this.push()</code></strong>:</p>
<pre><code class="language-javascript">class SafeTransform extends Transform {
_transform(chunk, encoding, callback) {
const transformed = chunk.toString().toUpperCase();
// Se push() retorna false, há backpressure
if (!this.push(transformed)) {
console.log('⚠️ Backpressure no transform');
}
callback();
}
}</code></pre>
<h2>Conclusão</h2>
<p>Você aprendeu que <strong>Transform streams permitem modificar dados em pipeline</strong>, sendo fundamentais para processamento de grandes arquivos sem carregar tudo na memória. <strong>Duplex streams funcionam bidirecionalmente</strong>, ideais para protocolos de comunicação como HTTP/2 e WebSocket. E mais importante: <strong>backpressure é crítico</strong> — ignorá-lo causa vazamento de memória. Use sempre <code>pipe()</code> quando possível, ou implemente manualmente com <code>pause()</code>/<code>resume()</code> e o evento <code>drain</code>. Com esses três conceitos solidificados, você está pronto para construir aplicações escaláveis em Node.js.</p>
<h2>Referências</h2>
<ul>
<li><a href="https://nodejs.org/api/stream.html" target="_blank" rel="noopener noreferrer">Node.js Stream Documentation</a></li>
<li><a href="https://github.com/substack/stream-handbook" target="_blank" rel="noopener noreferrer">Stream Handbook by Substack</a></li>
<li><a href="https://nodejs.org/en/docs/guides/backpressuring-in-streams/" target="_blank" rel="noopener noreferrer">Backpressure Guide</a></li>
<li><a href="https://www.packtpub.com/product/high-performance-nodejs/" target="_blank" rel="noopener noreferrer">High Performance Node.js - Streams Chapter</a></li>
<li><a href="https://www.digitalocean.com/community/tutorials/nodejs-streams" target="_blank" rel="noopener noreferrer">Understanding Node.js Streams</a></li>
</ul>