<h2>Introdução aos Padrões de Concorrência em Go</h2>
<p>Go foi projetada desde o início com concorrência em mente. Diferente de linguagens que adicionaram suporte a concorrência posteriormente, Go integra goroutines e canais como primitivas de primeira classe. Isso torna a linguagem naturalmente adequada para construir aplicações que processam múltiplos fluxos de trabalho simultaneamente, sem a complexidade típica de threads tradicionais.</p>
<p>Os padrões que estudaremos neste artigo — Pipeline, Fan-out, Fan-in e Worker Pool — não são exclusivos de Go, mas ganham elegância e simplicidade quando implementados com goroutines e canais. Compreender esses padrões é essencial para qualquer desenvolvedor Go que queira construir sistemas escaláveis, eficientes e maintíveis. Vamos explorar cada um progressivamente, começando pelos conceitos fundamentais e evoluindo para aplicações práticas complexas.</p>
<h2>Pipeline: Processamento em Cadeia</h2>
<h3>O Conceito</h3>
<p>Um pipeline é uma série de estágios de processamento conectados em sequência, onde a saída de um estágio é a entrada do próximo. Cada estágio executa uma transformação ou validação específica nos dados, e as goroutines se comunicam através de canais. Esse padrão é particularmente útil quando você precisa processar dados através de múltiplas etapas sequenciais, mantendo a concorrência entre elas.</p>
<p>A beleza de um pipeline em Go é que você pode processar dados de forma contínua sem precisar acumular resultados intermediários na memória. Dados fluem naturalmente de um estágio para o próximo, permitindo processamento eficiente mesmo com grandes volumes.</p>
<h3>Implementação Prática</h3>
<pre><code class="language-go">package main
import (
"fmt"
)
// Stage 1: Gera números
func generate(count int) <-chan int {
out := make(chan int)
go func() {
for i := 1; i <= count; i++ {
out <- i
}
close(out)
}()
return out
}
// Stage 2: Multiplica cada número por 2
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Stage 3: Adiciona 10 a cada resultado
func addTen(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n + 10
}
close(out)
}()
return out
}
func main() {
// Conecta os estágios em um pipeline
results := addTen(square(generate(5)))
for result := range results {
fmt.Println(result) // 11, 14, 19, 26, 35
}
}</code></pre>
<p>Neste exemplo, criamos um pipeline de três estágios. O estágio de geração produz números de 1 a 5. O estágio de quadrado eleva cada número ao quadrado. O estágio final adiciona 10. Observe como cada função retorna apenas um canal de leitura (<code><-chan</code>), impedindo que estágios subsequentes escrevam nele — isso é uma prática defensiva importante.</p>
<h2>Fan-out e Fan-in: Distribuição e Agregação</h2>
<h3>Fan-out: Distribuindo Trabalho</h3>
<p>Fan-out é o padrão de distribuir uma entrada para múltiplos processadores que trabalham em paralelo. Use-o quando você tem um único fluxo de dados que precisa ser processado por vários workers independentemente. Isso acelera o processamento distribuindo a carga.</p>
<pre><code class="language-go">package main
import (
"fmt"
"sync"
)
// Distribui dados de um canal para múltiplos workers
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
channels[i] = ch
go func(out chan<- int) {
for val := range in {
out <- val
}
close(out)
}(ch)
}
return channels
}
// Processa os dados (simulando trabalho)
func process(in <-chan int, id int) <-chan string {
out := make(chan string)
go func() {
for val := range in {
out <- fmt.Sprintf("Worker %d processou: %d", id, val)
}
close(out)
}()
return out
}
func main() {
// Gera dados
data := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
data <- i
}
close(data)
}()
// Fan-out: distribui para 3 workers
workers := fanOut(data, 3)
// Cada worker processa seus dados
results := make([]<-chan string, 3)
for i, worker := range workers {
results[i] = process(worker, i+1)
}
}</code></pre>
<h3>Fan-in: Agregando Resultados</h3>
<p>Fan-in é o inverso: múltiplos canais de entrada convergem em um único canal de saída. Use este padrão quando vários workers independentes produzem resultados que precisam ser coletados e processados juntos.</p>
<pre><code class="language-go">package main
import (
"fmt"
)
// Combina múltiplos canais em um único canal
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
// Lança uma goroutine para cada canal de entrada
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
for val := range c {
out <- val
}
wg.Done()
}(ch)
}
// Fecha o canal de saída quando todos os inputs terminarem
go func() {
wg.Wait()
close(out)
}()
return out
}
func producer(id int, count int) <-chan int {
out := make(chan int)
go func() {
for i := 1; i <= count; i++ {
out <- id*100 + i
}
close(out)
}()
return out
}
func main() {
// Múltiplos produtores
p1 := producer(1, 3)
p2 := producer(2, 3)
p3 := producer(3, 3)
// Fan-in: combina todos em um canal
results := fanIn(p1, p2, p3)
for result := range results {
fmt.Println(result)
}
}</code></pre>
<p>Note que usamos <code>sync.WaitGroup</code> para rastrear quando todos os canais de entrada foram processados. Isso garante que o canal de saída seja fechado apenas após todos os inputs terminarem, evitando deadlock.</p>
<h3>Combinando Fan-out e Fan-in</h3>
<p>A verdadeira potência emerge quando combinamos fan-out e fan-in. Fan-out distribui o trabalho, cada worker processa de forma independente, e fan-in recolhe os resultados.</p>
<pre><code class="language-go">package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, in <-chan int) <-chan string {
out := make(chan string)
go func() {
for val := range in {
time.Sleep(time.Millisecond * 100) // Simula trabalho
out <- fmt.Sprintf("Worker %d: %d^2 = %d", id, val, val*val)
}
close(out)
}()
return out
}
func main() {
// Entrada: números de 1 a 10
numbers := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
numbers <- i
}
close(numbers)
}()
// Fan-out: distribui para 3 workers
numWorkers := 3
workers := make([]<-chan string, numWorkers)
for i := 0; i < numWorkers; i++ {
workers[i] = worker(i+1, numbers)
}
// Fan-in: coleta resultados
results := fanIn(workers...)
for result := range results {
fmt.Println(result)
}
}
func fanIn(channels ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
for val := range c {
out <- val
}
wg.Done()
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}</code></pre>
<h2>Worker Pool: Pool de Trabalho Reutilizável</h2>
<h3>O Conceito e Benefícios</h3>
<p>Um worker pool é um conjunto fixo de goroutines que aguardam por tarefas em um canal compartilhado. Em vez de criar uma nova goroutine para cada tarefa (o que pode ser caro com milhares de tarefas), reutilizamos um número limitado de workers. Esse padrão é ideal quando você tem muitas tarefas pequenas, quer controlar a concorrência máxima, ou precisa de um limite superior no consumo de recursos.</p>
<p>A diferença crítica entre fan-out/fan-in e worker pool é a quantidade: fan-out tipicamente distribui a entrada para um número pequeno de workers, enquanto um pool recebe múltiplas tarefas e as distribui entre workers reutilizáveis. Worker pool é mais adequado para sistemas com taxa de chegada variável de trabalho.</p>
<h3>Implementação Clássica</h3>
<pre><code class="language-go">package main
import (
"fmt"
"sync"
)
// Task representa uma unidade de trabalho
type Task struct {
ID int
Value int
}
// Result representa o resultado de uma tarefa
type Result struct {
TaskID int
Result int
Error error
}
// WorkerPool gerencia um conjunto de workers
type WorkerPool struct {
tasks chan Task
results chan Result
wg sync.WaitGroup
}
// NewWorkerPool cria um novo pool com numWorkers goroutines
func NewWorkerPool(numWorkers int) *WorkerPool {
return &WorkerPool{
tasks: make(chan Task, 10), // Buffer pequeno
results: make(chan Result, 10),
}
}
// Start inicia os workers
func (wp *WorkerPool) Start(numWorkers int) {
for i := 0; i < numWorkers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
// worker é executado por cada goroutine do pool
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for task := range wp.tasks {
// Processa a tarefa
result := task.Value * task.Value
wp.results <- Result{
TaskID: task.ID,
Result: result,
}
}
}
// Submit adiciona uma tarefa ao pool
func (wp *WorkerPool) Submit(task Task) {
wp.tasks <- task
}
// Close fecha o pool e aguarda conclusão
func (wp *WorkerPool) Close() {
close(wp.tasks)
wp.wg.Wait()
close(wp.results)
}
func main() {
pool := NewWorkerPool(0)
pool.Start(4) // 4 workers
// Submete 20 tarefas
go func() {
for i := 1; i <= 20; i++ {
pool.Submit(Task{ID: i, Value: i})
}
pool.Close()
}()
// Coleta resultados
for result := range pool.results {
fmt.Printf("Task %d: %d\n", result.TaskID, result.Result)
}
}</code></pre>
<h3>Worker Pool Avançado com Contexto</h3>
<p>Em aplicações reais, frequentemente precisamos cancelar tarefas, implementar timeouts ou parar o pool graciosamente. A abordagem abaixo usa <code>context.Context</code>:</p>
<pre><code class="language-go">package main
import (
"context"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Dur time.Duration
}
type JobResult struct {
JobID int
Value string
}
type Pool struct {
jobs chan Job
results chan JobResult
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewPool(ctx context.Context, numWorkers int) *Pool {
ctxWithCancel, cancel := context.WithCancel(ctx)
p := &Pool{
jobs: make(chan Job, 5),
results: make(chan JobResult, 5),
ctx: ctxWithCancel,
cancel: cancel,
}
for i := 0; i < numWorkers; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
return p
}
func (p *Pool) worker(id int) {
defer p.wg.Done()
for {
select {
case <-p.ctx.Done():
fmt.Printf("Worker %d terminando\n", id)
return
case job, ok := <-p.jobs:
if !ok {
return
}
// Simula processamento
time.Sleep(job.Dur)
p.results <- JobResult{
JobID: job.ID,
Value: fmt.Sprintf("Processado por worker %d", id),
}
}
}
}
func (p *Pool) Submit(job Job) {
select {
case p.jobs <- job:
case <-p.ctx.Done():
fmt.Println("Pool foi cancelado")
}
}
func (p *Pool) Shutdown() {
close(p.jobs)
p.wg.Wait()
p.cancel()
close(p.results)
}
func main() {
ctx := context.Background()
pool := NewPool(ctx, 3)
// Submete tarefas
for i := 1; i <= 10; i++ {
pool.Submit(Job{ID: i, Dur: time.Millisecond * 500})
}
// Coleta alguns resultados e cancela
for i := 0; i < 5; i++ {
result := <-pool.results
fmt.Printf("Job %d: %s\n", result.JobID, result.Value)
}
pool.Shutdown()
// Coleta resultados restantes
for result := range pool.results {
fmt.Printf("Job %d: %s\n", result.JobID, result.Value)
}
}</code></pre>
<p>Nesta implementação, o context permite que o caller cancele todas as operações pendentes simultaneamente. Cada worker verifica o contexto antes de processar uma nova tarefa, garantindo um shutdown limpo.</p>
<h2>Escolhendo o Padrão Certo</h2>
<h3>Pipeline</h3>
<p>Use pipeline quando você tem dados fluindo através de uma série de transformações sequenciais. Cada estágio é independente e pode executar simultaneamente. Exemplo: ler arquivo → parsear → validar → salvar.</p>
<h3>Fan-out / Fan-in</h3>
<p>Use fan-out e fan-in quando uma entrada precisa ser processada por múltiplos workers em paralelo, e você precisa agregar os resultados depois. Exemplo: análise de log distribuída, processamento de imagem com múltiplos filtros, ou scraping web com múltiplos clients.</p>
<h3>Worker Pool</h3>
<p>Use worker pool quando você tem muitas tarefas chegando continuamente, e quer controlar o nível de concorrência. Exemplo: processador de fila de mensagens, servidor web com limite de conexões simultâneas, ou batching de requisições.</p>
<pre><code class="language-go">// Exemplo prático: Combinando padrões em um problema real
package main
import (
"fmt"
"sync"
)
// Simula obtenção de URLs de uma fonte
func getURLs() <-chan string {
out := make(chan string)
go func() {
urls := []string{
"http://example.com/1", "http://example.com/2",
"http://example.com/3", "http://example.com/4",
"http://example.com/5", "http://example.com/6",
}
for _, url := range urls {
out <- url
}
close(out)
}()
return out
}
// Worker que "baixa" URLs
func downloader(id int, urls <-chan string) <-chan string {
out := make(chan string)
go func() {
for url := range urls {
out <- fmt.Sprintf("Worker %d baixou %s", id, url)
}
close(out)
}()
return out
}
func mergeResults(channels ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
for val := range c {
out <- val
}
wg.Done()
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Pipeline + Fan-out + Fan-in
urls := getURLs()
// Fan-out para 3 downloaders
results := make([]<-chan string, 3)
for i := 0; i < 3; i++ {
results[i] = downloader(i+1, urls)
}
// Fan-in coleta resultados
combined := mergeResults(results...)
for result := range combined {
fmt.Println(result)
}
}</code></pre>
<h2>Conclusão</h2>
<p>Estudamos quatro padrões fundamentais de concorrência em Go, cada um com seu propósito específico. O <strong>Pipeline</strong> permite processar dados através de múltiplos estágios sequenciais de forma eficiente, reutilizando goroutines para transformações diferentes. O <strong>Fan-out/Fan-in</strong> distribui uma entrada entre múltiplos processadores e depois agrega os resultados, oferecendo paralelismo transparente e elegante. O <strong>Worker Pool</strong> reutiliza um conjunto fixo de goroutines para processar muitas tarefas, controlando o consumo de recursos e evitando a explosão de goroutines.</p>
<p>A chave para dominar esses padrões é entender que eles não são isolados — frequentemente você combinará dois ou mais deles em uma solução real. Um sistema pode usar pipeline para estruturar o fluxo, fan-out para distribuição e um worker pool para escalar o processamento. Comece simples, teste seus canais com tipos bem definidos, e sempre considere o fechamento correto de canais e sincronização com <code>sync.WaitGroup</code> ou <code>context.Context</code>. A elegância de Go está em como esses primitivos simples (goroutines e canais) permitem construir sistemas sofisticados e maintíveis.</p>
<h2>Referências</h2>
<ul>
<li><a href="https://golang.org/doc/effective_go#concurrency" target="_blank" rel="noopener noreferrer">Effective Go: Concurrency</a> — Documentação oficial sobre concorrência em Go</li>
<li><a href="https://go.dev/blog/pipelines" target="_blank" rel="noopener noreferrer">Go Concurrency Patterns</a> — Blog oficial do Go com exemplos de pipelines e padrões</li>
<li><a href="https://go.dev/blog/io2013-talk-concurrency" target="_blank" rel="noopener noreferrer">Advanced Go Concurrency Patterns</a> — Artigo avançado sobre padrões sofisticados</li>
<li><a href="https://www.gopl.io/" target="_blank" rel="noopener noreferrer">The Go Programming Language (Capítulo 8)</a> — Livro clássico cobrindo concorrência em profundidade</li>
</ul>
<p><!-- FIM --></p>