Rust

O que Todo Dev Deve Saber sobre Streams Assíncronos e Concorrência Avançada com Tokio

8 min de leitura

O que Todo Dev Deve Saber sobre Streams Assíncronos e Concorrência Avançada com Tokio

Introdução ao Tokio: Fundações de Assincronismo em Rust Tokio é o runtime assíncrono mais maduro e utilizado em Rust, baseado no modelo de trabalho com Tasks e um scheduler eficiente. Diferente de linguagens como JavaScript que possuem uma única event loop, Tokio usa múltiplas threads de trabalho por padrão, permitindo verdadeiro paralelismo em sistemas multicore. Antes de avançarmos para streams e concorrência, você precisa compreender que Tokio não cria threads para cada tarefa — em vez disso, agrupa múltiplas tasks em threads de trabalho através de work-stealing. A chave para dominar Tokio é entender que código assíncrono em Rust é apenas syntactic sugar para máquinas de estado compiladas. Quando você escreve , o compilador transforma isso em uma Future que pode ser pausada e retomada. O runtime Tokio gerencia quando essas Futures são executadas, sempre respeitando dependências de I/O e disponibilidade de recursos. Streams Assíncronos Avançados O que são Streams e por que importam Um Stream assíncrono é essencialmente um

<h2>Introdução ao Tokio: Fundações de Assincronismo em Rust</h2>

<p>Tokio é o runtime assíncrono mais maduro e utilizado em Rust, baseado no modelo de trabalho com Tasks e um scheduler eficiente. Diferente de linguagens como JavaScript que possuem uma única event loop, Tokio usa múltiplas threads de trabalho por padrão, permitindo verdadeiro paralelismo em sistemas multicore. Antes de avançarmos para streams e concorrência, você precisa compreender que Tokio não cria threads para cada tarefa — em vez disso, agrupa múltiplas tasks em threads de trabalho através de work-stealing.</p>

<p>A chave para dominar Tokio é entender que código assíncrono em Rust é apenas syntactic sugar para máquinas de estado compiladas. Quando você escreve <code>async fn</code>, o compilador transforma isso em uma Future que pode ser pausada e retomada. O runtime Tokio gerencia quando essas Futures são executadas, sempre respeitando dependências de I/O e disponibilidade de recursos.</p>

<h2>Streams Assíncronos Avançados</h2>

<h3>O que são Streams e por que importam</h3>

<p>Um Stream assíncrono é essencialmente um Iterator que produz valores de forma assíncrona. Enquanto um Iterator tradicional é síncrono e bloqueia até produzir o próximo item, um Stream pode &quot;aguardar&quot; I/O ou outras operações sem bloquear a thread. A trait <code>Stream</code> do crate <code>futures</code> é o padrão, mas Tokio também oferece <code>tokio_stream</code> com implementações otimizadas.</p>

<pre><code class="language-rust">use tokio_stream::StreamExt;

use tokio::time::{interval, Duration};

#[tokio::main]

async fn main() {

// Criando um stream que emite a cada 100ms

let mut stream = interval(Duration::from_millis(100));

// Consumindo o stream com take() e filter()

let mut contador = 0;

while let Some(_) = stream.next().await {

contador += 1;

println!(&quot;Tick: {}&quot;, contador);

if contador &gt;= 5 {

break;

}

}

}</code></pre>

<p>Neste exemplo, <code>interval()</code> cria um stream que emite periodicamente. O <code>StreamExt</code> trait nos dá acesso a combinadores como <code>take()</code>, <code>filter()</code>, <code>map()</code> — exatamente como Iterators, mas assincronamente. A grande diferença é que cada <code>.next().await</code> respeita o tempo de espera sem bloquear outras tasks.</p>

<h3>Composição de Streams com Combinadores</h3>

<p>Streams brilham quando você precisa transformar e compor fluxos de dados. Imagine um servidor que precisa processar requisições vindas de múltiplas fontes simultaneamente:</p>

<pre><code class="language-rust">use tokio_stream::{StreamExt, wrappers::ReceiverStream};

use tokio::sync::mpsc;

#[tokio::main]

async fn main() {

let (tx, rx) = mpsc::channel(32);

// Spawn tasks que enviam dados

tokio::spawn(async move {

for i in 0..10 {

let _ = tx.send(i).await;

tokio::time::sleep(Duration::from_millis(50)).await;

}

});

// Converter receiver em stream

let mut stream = ReceiverStream::new(rx);

// Aplicar transformações

let mut processado = stream

.map( | x | x * 2) .filter(|x| x % 3 != 0)

.take(5);

while let Some(valor) = processado.next().await {

println!(&quot;Processado: {}&quot;, valor);

}

}</code></pre>

<p>Aqui utilizamos channels do Tokio — <code>mpsc</code> (multi-producer, single-consumer) — e convertemos o receiver em um Stream via <code>ReceiverStream</code>. Os combinadores <code>map</code> e <code>filter</code> funcionam assincronamente, permitindo que transformações sejam aplicadas sem bloquear. O segredo é que cada item passa por toda a pipeline de transformação antes do próximo ser consumido.</p>

<h2>Concorrência Avançada com Tokio</h2>

<h3>Tasks, Spawning e Sincronização</h3>

<p>Tokio permite criar tasks independentes com <code>tokio::spawn()</code>. Diferente de threads do SO, tasks são extremamente leves — você pode ter milhares delas. O verdadeiro desafio em concorrência avançada é coordenar essas tasks sem deadlocks ou race conditions.</p>

<pre><code class="language-rust">use tokio::sync::{Mutex, RwLock, Barrier};

use std::sync::Arc;

#[tokio::main]

async fn main() {

let contador = Arc::new(Mutex::new(0));

let barrier = Arc::new(Barrier::new(3));

let mut handles = vec![];

for i in 0..3 {

let contador = Arc::clone(&amp;contador);

let barrier = Arc::clone(&amp;barrier);

let handle = tokio::spawn(async move {

// Sincronizar o início das 3 tasks

barrier.wait().await;

let mut num = contador.lock().await;

*num += i;

println!(&quot;Task {} incrementou para: {}&quot;, i, *num);

});

handles.push(handle);

}

for handle in handles {

let _ = handle.await;

}

println!(&quot;Valor final: {}&quot;, *contador.lock().await);

}</code></pre>

<p>Aqui demonstro sincronização correta: <code>Barrier</code> força tasks a aguardar até que todas cheguem ao ponto de sincronização, <code>Mutex</code> protege dados compartilhados. Note que usamos <code>Arc</code> (Atomic Reference Counting) para compartilhar ownership — essencial em Rust. <code>Mutex::lock().await</code> é não-bloqueante, diferente do <code>std::sync::Mutex</code> que bloquearia a thread.</p>

<h3>Select e Timeouts para Operações Concorrentes</h3>

<p>Em cenários reais, você frequentemente precisa executar múltiplas operações concorrentes e reagir ao primeiro resultado ou a timeouts. Tokio oferece <code>tokio::select!</code> para isso:</p>

<pre><code class="language-rust">use tokio::time::{sleep, Duration};

#[tokio::main]

async fn main() {

let mut intervalo = tokio::time::interval(Duration::from_millis(100));

loop {

tokio::select! {

_ = intervalo.tick() =&gt; {

println!(&quot;Tick!&quot;);

}

_ = async {

sleep(Duration::from_secs(2)).await

} =&gt; {

println!(&quot;Timeout de 2 segundos atingido!&quot;);

break;

}

}

}

}</code></pre>

<p><code>select!</code> é uma macro que monitora múltiplas Futures. Assim que uma delas completa, o código correspondente é executado. Isso é fundamental para timeouts, operações de fallback e tratamento de múltiplos eventos simultaneamente. Diferente de <code>join!</code>, que aguarda todas as Futures, <code>select!</code> é não-bloqueante e reativo.</p>

<h2>Padrões de Erro Comuns e Otimização</h2>

<p>Iniciantes frequentemente cometem erros ao misturar código síncrono com assíncrono. Nunca use <code>block_on()</code> dentro de uma task — isso paralisa o scheduler. Se você precisa chamar uma função bloqueante (como I/O de arquivo), use <code>tokio::task::block_in_place()</code> ou <code>tokio::task::spawn_blocking()</code>:</p>

<pre><code class="language-rust">#[tokio::main]

async fn main() {

let resultado = tokio::task::spawn_blocking(|| {

// Código síncrono pesado

std::thread::sleep(std::time::Duration::from_secs(1));

42

}).await.unwrap();

println!(&quot;Resultado: {}&quot;, resultado);

}</code></pre>

<p>Para otimização, sempre considere o tamanho do channel buffer (padrão 32 é conservador), ajuste o número de threads worker conforme sua carga, e use <code>tokio_util::codec</code> para protocolar streams de bytes eficientemente.</p>

<h2>Conclusão</h2>

<p>Dominando Tokio, você aprendeu que: <strong>(1) Streams assíncronos são Iterators que respeitam I/O não-bloqueante</strong>, permitindo composição elegante de pipelines de dados através de combinadores; <strong>(2) Concorrência em Tokio exige sincronização explícita com Mutex/RwLock assincronos, nunca síncrono</strong>, e <code>select!</code> é seu aliado para operações reativas; <strong>(3) O paradigma de task-based concurrency elimina a complexidade de threads tradicionais</strong>, mas exige compreensão de Futures e ownership para ser efetivo.</p>

<h2>Referências</h2>

<ul>

<li><a href="https://tokio.rs/" target="_blank" rel="noopener noreferrer">Tokio Official Documentation</a></li>

<li><a href="https://tokio.rs/tokio/tutorial" target="_blank" rel="noopener noreferrer">Tokio Tutorial - Async in Depth</a></li>

<li><a href="https://rust-lang.github.io/async-book/" target="_blank" rel="noopener noreferrer">Async Rust Book</a></li>

<li><a href="https://docs.rs/tokio-stream/latest/tokio_stream/" target="_blank" rel="noopener noreferrer">Tokio Stream Crate</a></li>

<li><a href="https://github.com/rust-lang/rustlings" target="_blank" rel="noopener noreferrer">Rustlings Async Exercises</a></li>

</ul>

Comentários

Mais em Rust

Dominando Macros em Rust: macro_rules! e Metaprogramação em Projetos Reais
Dominando Macros em Rust: macro_rules! e Metaprogramação em Projetos Reais

Introdução às Macros em Rust Macros são uma das características mais poderosa...

O que Todo Dev Deve Saber sobre Funções, Expressões e o Sistema de Tipos de Rust
O que Todo Dev Deve Saber sobre Funções, Expressões e o Sistema de Tipos de Rust

Funções em Rust: Sintaxe, Parâmetros e Retornos Funções são a base da organiz...

Como Usar Iteradores em Rust: map, filter, fold e Lazy Evaluation em Produção
Como Usar Iteradores em Rust: map, filter, fold e Lazy Evaluation em Produção

Entendendo Iteradores em Rust Iteradores são um conceito fundamental em Rust...