<h2>Introdução ao Tokio: Fundações de Assincronismo em Rust</h2>
<p>Tokio é o runtime assíncrono mais maduro e utilizado em Rust, baseado no modelo de trabalho com Tasks e um scheduler eficiente. Diferente de linguagens como JavaScript que possuem uma única event loop, Tokio usa múltiplas threads de trabalho por padrão, permitindo verdadeiro paralelismo em sistemas multicore. Antes de avançarmos para streams e concorrência, você precisa compreender que Tokio não cria threads para cada tarefa — em vez disso, agrupa múltiplas tasks em threads de trabalho através de work-stealing.</p>
<p>A chave para dominar Tokio é entender que código assíncrono em Rust é apenas syntactic sugar para máquinas de estado compiladas. Quando você escreve <code>async fn</code>, o compilador transforma isso em uma Future que pode ser pausada e retomada. O runtime Tokio gerencia quando essas Futures são executadas, sempre respeitando dependências de I/O e disponibilidade de recursos.</p>
<h2>Streams Assíncronos Avançados</h2>
<h3>O que são Streams e por que importam</h3>
<p>Um Stream assíncrono é essencialmente um Iterator que produz valores de forma assíncrona. Enquanto um Iterator tradicional é síncrono e bloqueia até produzir o próximo item, um Stream pode "aguardar" I/O ou outras operações sem bloquear a thread. A trait <code>Stream</code> do crate <code>futures</code> é o padrão, mas Tokio também oferece <code>tokio_stream</code> com implementações otimizadas.</p>
<pre><code class="language-rust">use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
// Criando um stream que emite a cada 100ms
let mut stream = interval(Duration::from_millis(100));
// Consumindo o stream com take() e filter()
let mut contador = 0;
while let Some(_) = stream.next().await {
contador += 1;
println!("Tick: {}", contador);
if contador >= 5 {
break;
}
}
}</code></pre>
<p>Neste exemplo, <code>interval()</code> cria um stream que emite periodicamente. O <code>StreamExt</code> trait nos dá acesso a combinadores como <code>take()</code>, <code>filter()</code>, <code>map()</code> — exatamente como Iterators, mas assincronamente. A grande diferença é que cada <code>.next().await</code> respeita o tempo de espera sem bloquear outras tasks.</p>
<h3>Composição de Streams com Combinadores</h3>
<p>Streams brilham quando você precisa transformar e compor fluxos de dados. Imagine um servidor que precisa processar requisições vindas de múltiplas fontes simultaneamente:</p>
<pre><code class="language-rust">use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(32);
// Spawn tasks que enviam dados
tokio::spawn(async move {
for i in 0..10 {
let _ = tx.send(i).await;
tokio::time::sleep(Duration::from_millis(50)).await;
}
});
// Converter receiver em stream
let mut stream = ReceiverStream::new(rx);
// Aplicar transformações
let mut processado = stream
.map( | x | x * 2) .filter(|x| x % 3 != 0)
.take(5);
while let Some(valor) = processado.next().await {
println!("Processado: {}", valor);
}
}</code></pre>
<p>Aqui utilizamos channels do Tokio — <code>mpsc</code> (multi-producer, single-consumer) — e convertemos o receiver em um Stream via <code>ReceiverStream</code>. Os combinadores <code>map</code> e <code>filter</code> funcionam assincronamente, permitindo que transformações sejam aplicadas sem bloquear. O segredo é que cada item passa por toda a pipeline de transformação antes do próximo ser consumido.</p>
<h2>Concorrência Avançada com Tokio</h2>
<h3>Tasks, Spawning e Sincronização</h3>
<p>Tokio permite criar tasks independentes com <code>tokio::spawn()</code>. Diferente de threads do SO, tasks são extremamente leves — você pode ter milhares delas. O verdadeiro desafio em concorrência avançada é coordenar essas tasks sem deadlocks ou race conditions.</p>
<pre><code class="language-rust">use tokio::sync::{Mutex, RwLock, Barrier};
use std::sync::Arc;
#[tokio::main]
async fn main() {
let contador = Arc::new(Mutex::new(0));
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
for i in 0..3 {
let contador = Arc::clone(&contador);
let barrier = Arc::clone(&barrier);
let handle = tokio::spawn(async move {
// Sincronizar o início das 3 tasks
barrier.wait().await;
let mut num = contador.lock().await;
*num += i;
println!("Task {} incrementou para: {}", i, *num);
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
println!("Valor final: {}", *contador.lock().await);
}</code></pre>
<p>Aqui demonstro sincronização correta: <code>Barrier</code> força tasks a aguardar até que todas cheguem ao ponto de sincronização, <code>Mutex</code> protege dados compartilhados. Note que usamos <code>Arc</code> (Atomic Reference Counting) para compartilhar ownership — essencial em Rust. <code>Mutex::lock().await</code> é não-bloqueante, diferente do <code>std::sync::Mutex</code> que bloquearia a thread.</p>
<h3>Select e Timeouts para Operações Concorrentes</h3>
<p>Em cenários reais, você frequentemente precisa executar múltiplas operações concorrentes e reagir ao primeiro resultado ou a timeouts. Tokio oferece <code>tokio::select!</code> para isso:</p>
<pre><code class="language-rust">use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let mut intervalo = tokio::time::interval(Duration::from_millis(100));
loop {
tokio::select! {
_ = intervalo.tick() => {
println!("Tick!");
}
_ = async {
sleep(Duration::from_secs(2)).await
} => {
println!("Timeout de 2 segundos atingido!");
break;
}
}
}
}</code></pre>
<p><code>select!</code> é uma macro que monitora múltiplas Futures. Assim que uma delas completa, o código correspondente é executado. Isso é fundamental para timeouts, operações de fallback e tratamento de múltiplos eventos simultaneamente. Diferente de <code>join!</code>, que aguarda todas as Futures, <code>select!</code> é não-bloqueante e reativo.</p>
<h2>Padrões de Erro Comuns e Otimização</h2>
<p>Iniciantes frequentemente cometem erros ao misturar código síncrono com assíncrono. Nunca use <code>block_on()</code> dentro de uma task — isso paralisa o scheduler. Se você precisa chamar uma função bloqueante (como I/O de arquivo), use <code>tokio::task::block_in_place()</code> ou <code>tokio::task::spawn_blocking()</code>:</p>
<pre><code class="language-rust">#[tokio::main]
async fn main() {
let resultado = tokio::task::spawn_blocking(|| {
// Código síncrono pesado
std::thread::sleep(std::time::Duration::from_secs(1));
42
}).await.unwrap();
println!("Resultado: {}", resultado);
}</code></pre>
<p>Para otimização, sempre considere o tamanho do channel buffer (padrão 32 é conservador), ajuste o número de threads worker conforme sua carga, e use <code>tokio_util::codec</code> para protocolar streams de bytes eficientemente.</p>
<h2>Conclusão</h2>
<p>Dominando Tokio, você aprendeu que: <strong>(1) Streams assíncronos são Iterators que respeitam I/O não-bloqueante</strong>, permitindo composição elegante de pipelines de dados através de combinadores; <strong>(2) Concorrência em Tokio exige sincronização explícita com Mutex/RwLock assincronos, nunca síncrono</strong>, e <code>select!</code> é seu aliado para operações reativas; <strong>(3) O paradigma de task-based concurrency elimina a complexidade de threads tradicionais</strong>, mas exige compreensão de Futures e ownership para ser efetivo.</p>
<h2>Referências</h2>
<ul>
<li><a href="https://tokio.rs/" target="_blank" rel="noopener noreferrer">Tokio Official Documentation</a></li>
<li><a href="https://tokio.rs/tokio/tutorial" target="_blank" rel="noopener noreferrer">Tokio Tutorial - Async in Depth</a></li>
<li><a href="https://rust-lang.github.io/async-book/" target="_blank" rel="noopener noreferrer">Async Rust Book</a></li>
<li><a href="https://docs.rs/tokio-stream/latest/tokio_stream/" target="_blank" rel="noopener noreferrer">Tokio Stream Crate</a></li>
<li><a href="https://github.com/rust-lang/rustlings" target="_blank" rel="noopener noreferrer">Rustlings Async Exercises</a></li>
</ul>