JavaScript Avançado

Streams Avançados em Node.js: Transform, Duplex e Backpressure na Prática

7 min de leitura

Streams Avançados em Node.js: Transform, Duplex e Backpressure na Prática

Streams Avançados em Node.js: Transform, Duplex e Backpressure Streams são um dos pilares do Node.js para processar grandes volumes de dados de forma eficiente. Enquanto streams básicos (Readable e Writable) já resolvem muitos problemas, os padrões avançados — Transform, Duplex e o controle de backpressure — são essenciais para construir aplicações robustas e performáticas. Nesta aula, você aprenderá a dominar esses conceitos com exemplos práticos e imediatamente aplicáveis. Transform Streams: Transformando Dados em Tempo Real Um Transform stream é um stream que modifica os dados conforme passam por ele. Você implementa a lógica de transformação no método . É perfeito para casos como compressão, criptografia, parsing JSON em linhas ou conversão de formatos. Implementando um Transform Stream Note que recebe o chunk de dados, realiza a transformação e chama quando pronto. Você pode chamar múltiplas vezes se a transformação gerar vários chunks de saída. Um exemplo mais realista é um parser CSV que converte cada linha em JSON: Duplex Streams:

<h2>Streams Avançados em Node.js: Transform, Duplex e Backpressure</h2>

<p>Streams são um dos pilares do Node.js para processar grandes volumes de dados de forma eficiente. Enquanto streams básicos (Readable e Writable) já resolvem muitos problemas, os padrões avançados — Transform, Duplex e o controle de backpressure — são essenciais para construir aplicações robustas e performáticas. Nesta aula, você aprenderá a dominar esses conceitos com exemplos práticos e imediatamente aplicáveis.</p>

<h2>Transform Streams: Transformando Dados em Tempo Real</h2>

<p>Um Transform stream é um stream que modifica os dados conforme passam por ele. Você implementa a lógica de transformação no método <code>_transform()</code>. É perfeito para casos como compressão, criptografia, parsing JSON em linhas ou conversão de formatos.</p>

<h3>Implementando um Transform Stream</h3>

<pre><code class="language-javascript">const { Transform } = require(&#039;stream&#039;);

class UppercaseTransform extends Transform {

_transform(chunk, encoding, callback) {

// chunk é um Buffer com os dados

const transformed = chunk.toString().toUpperCase();

this.push(transformed);

callback(); // sinaliza que acabou a transformação

}

}

// Uso

const fs = require(&#039;fs&#039;);

fs.createReadStream(&#039;input.txt&#039;)

.pipe(new UppercaseTransform())

.pipe(fs.createWriteStream(&#039;output.txt&#039;));</code></pre>

<p>Note que <code>_transform()</code> recebe o chunk de dados, realiza a transformação e chama <code>callback()</code> quando pronto. Você pode chamar <code>this.push()</code> múltiplas vezes se a transformação gerar vários chunks de saída. Um exemplo mais realista é um parser CSV que converte cada linha em JSON:</p>

<pre><code class="language-javascript">const { Transform } = require(&#039;stream&#039;);

class CSVtoJSON extends Transform {

constructor(options = {}) {

super(options);

this.headers = null;

}

_transform(chunk, encoding, callback) {

const lines = chunk.toString().split(&#039;\n&#039;);

lines.forEach((line, index) =&gt; {

if (index === 0 &amp;&amp; !this.headers) {

this.headers = line.split(&#039;,&#039;);

return;

}

if (line.trim()) {

const values = line.split(&#039;,&#039;);

const obj = {};

this.headers.forEach((header, i) =&gt; {

obj[header] = values[i];

});

this.push(JSON.stringify(obj) + &#039;\n&#039;);

}

});

callback();

}

}

// Teste

fs.createReadStream(&#039;dados.csv&#039;)

.pipe(new CSVtoJSON())

.pipe(fs.createWriteStream(&#039;dados.json&#039;));</code></pre>

<h2>Duplex Streams: Leitura e Escrita Simultâneas</h2>

<p>Um Duplex stream é aquele que funciona como Readable e Writable ao mesmo tempo — dados entram por um lado, saem por outro. Exemplos reais incluem conexões TCP, WebSockets e pipes bidirecionais. Você implementa <code>_read()</code> e <code>_write()</code> separadamente.</p>

<h3>Criando um Duplex Stream</h3>

<pre><code class="language-javascript">const { Duplex } = require(&#039;stream&#039;);

const net = require(&#039;net&#039;);

class EchoServer extends Duplex {

constructor(options) {

super(options);

this.buffer = [];

}

_write(chunk, encoding, callback) {

// Quando dados chegam, ecoamos de volta

console.log(&#039;Recebido:&#039;, chunk.toString());

this.buffer.push(chunk);

callback();

}

_read(size) {

// Quando o outro lado quer ler, enviamos dados armazenados

if (this.buffer.length &gt; 0) {

this.push(this.buffer.shift());

} else {

this.push(null); // FIM do stream

}

}

}

// Teste manual

const echo = new EchoServer();

echo.write(&#039;Olá&#039;);

echo.write(&#039;Mundo&#039;);

echo.end();

echo.on(&#039;data&#039;, (chunk) =&gt; {

console.log(&#039;Lido:&#039;, chunk.toString());

});</code></pre>

<p>Um uso mais prático é um Duplex que funciona como um transformador bidirecional em uma conexão HTTP:</p>

<pre><code class="language-javascript">const { Duplex } = require(&#039;stream&#039;);

class BiDirectionalProcessor extends Duplex {

_write(chunk, encoding, callback) {

// Processa dados que chegam (escrita)

const processed = chunk.toString().toUpperCase();

console.log(&#039;→ Enviado ao cliente:&#039;, processed);

callback();

}

_read(size) {

// Simula dados sendo lidos do servidor

if (!this.done) {

this.push(&#039;Dados do servidor\n&#039;);

this.done = true;

} else {

this.push(null);

}

}

}

const processor = new BiDirectionalProcessor();

processor.pipe(process.stdout);

processor.write(&#039;request do cliente&#039;);

processor.end();</code></pre>

<h2>Backpressure: O Controle de Fluxo Crítico</h2>

<p>Backpressure ocorre quando o lado consumidor (Writable) não consegue processar dados tão rápido quanto o produtor (Readable) os envia. Se ignorado, acumula-se na memória. Node.js oferece mecanismos para detectar e resolver isso.</p>

<h3>Detectando e Tratando Backpressure</h3>

<p>Quando você chama <code>stream.write()</code>, ele retorna <code>false</code> se há backpressure. Além disso, o evento <code>&#039;drain&#039;</code> é emitido quando o buffer interno foi esvaziado:</p>

<pre><code class="language-javascript">const fs = require(&#039;fs&#039;);

const source = fs.createReadStream(&#039;arquivo-grande.txt&#039;);

const destination = fs.createWriteStream(&#039;copia.txt&#039;);

source.on(&#039;data&#039;, (chunk) =&gt; {

const continuar = destination.write(chunk);

if (!continuar) {

console.log(&#039;⚠️ Backpressure detectado!&#039;);

source.pause(); // Pausa leitura

}

});

destination.on(&#039;drain&#039;, () =&gt; {

console.log(&#039;✓ Buffer esvaziado, retomando leitura&#039;);

source.resume();

});

source.on(&#039;end&#039;, () =&gt; console.log(&#039;Concluído&#039;));</code></pre>

<p>A melhor prática, porém, é usar <code>pipe()</code> diretamente, que gerencia backpressure automaticamente:</p>

<pre><code class="language-javascript">// Automático e seguro — Node.js controla tudo

fs.createReadStream(&#039;grande.txt&#039;)

.pipe(new UppercaseTransform())

.pipe(fs.createWriteStream(&#039;resultado.txt&#039;));</code></pre>

<p>Se você implementar um Transform customizado, <strong>sempre respeite o retorno do <code>this.push()</code></strong>:</p>

<pre><code class="language-javascript">class SafeTransform extends Transform {

_transform(chunk, encoding, callback) {

const transformed = chunk.toString().toUpperCase();

// Se push() retorna false, há backpressure

if (!this.push(transformed)) {

console.log(&#039;⚠️ Backpressure no transform&#039;);

}

callback();

}

}</code></pre>

<h2>Conclusão</h2>

<p>Você aprendeu que <strong>Transform streams permitem modificar dados em pipeline</strong>, sendo fundamentais para processamento de grandes arquivos sem carregar tudo na memória. <strong>Duplex streams funcionam bidirecionalmente</strong>, ideais para protocolos de comunicação como HTTP/2 e WebSocket. E mais importante: <strong>backpressure é crítico</strong> — ignorá-lo causa vazamento de memória. Use sempre <code>pipe()</code> quando possível, ou implemente manualmente com <code>pause()</code>/<code>resume()</code> e o evento <code>drain</code>. Com esses três conceitos solidificados, você está pronto para construir aplicações escaláveis em Node.js.</p>

<h2>Referências</h2>

<ul>

<li><a href="https://nodejs.org/api/stream.html" target="_blank" rel="noopener noreferrer">Node.js Stream Documentation</a></li>

<li><a href="https://github.com/substack/stream-handbook" target="_blank" rel="noopener noreferrer">Stream Handbook by Substack</a></li>

<li><a href="https://nodejs.org/en/docs/guides/backpressuring-in-streams/" target="_blank" rel="noopener noreferrer">Backpressure Guide</a></li>

<li><a href="https://www.packtpub.com/product/high-performance-nodejs/" target="_blank" rel="noopener noreferrer">High Performance Node.js - Streams Chapter</a></li>

<li><a href="https://www.digitalocean.com/community/tutorials/nodejs-streams" target="_blank" rel="noopener noreferrer">Understanding Node.js Streams</a></li>

</ul>

Comentários

Mais em JavaScript Avançado

Dominando Testes Unitários Avançados com Vitest: Mocks, Spies e Stubs em Projetos Reais
Dominando Testes Unitários Avançados com Vitest: Mocks, Spies e Stubs em Projetos Reais

Introdução: Por que Mocks, Spies e Stubs são Essenciais Testes unitários avan...

Performance em React: memo, useMemo, useCallback e Profiler na Prática
Performance em React: memo, useMemo, useCallback e Profiler na Prática

Performance em React: memo, useMemo, useCallback e Profiler React é declarati...

Micro-frontends com React: Module Federation e Arquitetura Distribuída na Prática
Micro-frontends com React: Module Federation e Arquitetura Distribuída na Prática

O que são Micro-frontends e Module Federation Micro-frontends é uma arquitetu...