Go

O que Todo Dev Deve Saber sobre Padrões de Concorrência em Go: Pipeline, Fan-out, Fan-in e Worker Pool

15 min de leitura

O que Todo Dev Deve Saber sobre Padrões de Concorrência em Go: Pipeline, Fan-out, Fan-in e Worker Pool

Introdução aos Padrões de Concorrência em Go Go foi projetada desde o início com concorrência em mente. Diferente de linguagens que adicionaram suporte a concorrência posteriormente, Go integra goroutines e canais como primitivas de primeira classe. Isso torna a linguagem naturalmente adequada para construir aplicações que processam múltiplos fluxos de trabalho simultaneamente, sem a complexidade típica de threads tradicionais. Os padrões que estudaremos neste artigo — Pipeline, Fan-out, Fan-in e Worker Pool — não são exclusivos de Go, mas ganham elegância e simplicidade quando implementados com goroutines e canais. Compreender esses padrões é essencial para qualquer desenvolvedor Go que queira construir sistemas escaláveis, eficientes e maintíveis. Vamos explorar cada um progressivamente, começando pelos conceitos fundamentais e evoluindo para aplicações práticas complexas. Pipeline: Processamento em Cadeia O Conceito Um pipeline é uma série de estágios de processamento conectados em sequência, onde a saída de um estágio é a entrada do próximo. Cada estágio executa uma transformação ou validação específica nos dados,

<h2>Introdução aos Padrões de Concorrência em Go</h2>

<p>Go foi projetada desde o início com concorrência em mente. Diferente de linguagens que adicionaram suporte a concorrência posteriormente, Go integra goroutines e canais como primitivas de primeira classe. Isso torna a linguagem naturalmente adequada para construir aplicações que processam múltiplos fluxos de trabalho simultaneamente, sem a complexidade típica de threads tradicionais.</p>

<p>Os padrões que estudaremos neste artigo — Pipeline, Fan-out, Fan-in e Worker Pool — não são exclusivos de Go, mas ganham elegância e simplicidade quando implementados com goroutines e canais. Compreender esses padrões é essencial para qualquer desenvolvedor Go que queira construir sistemas escaláveis, eficientes e maintíveis. Vamos explorar cada um progressivamente, começando pelos conceitos fundamentais e evoluindo para aplicações práticas complexas.</p>

<h2>Pipeline: Processamento em Cadeia</h2>

<h3>O Conceito</h3>

<p>Um pipeline é uma série de estágios de processamento conectados em sequência, onde a saída de um estágio é a entrada do próximo. Cada estágio executa uma transformação ou validação específica nos dados, e as goroutines se comunicam através de canais. Esse padrão é particularmente útil quando você precisa processar dados através de múltiplas etapas sequenciais, mantendo a concorrência entre elas.</p>

<p>A beleza de um pipeline em Go é que você pode processar dados de forma contínua sem precisar acumular resultados intermediários na memória. Dados fluem naturalmente de um estágio para o próximo, permitindo processamento eficiente mesmo com grandes volumes.</p>

<h3>Implementação Prática</h3>

<pre><code class="language-go">package main

import (

&quot;fmt&quot;

)

// Stage 1: Gera números

func generate(count int) &lt;-chan int {

out := make(chan int)

go func() {

for i := 1; i &lt;= count; i++ {

out &lt;- i

}

close(out)

}()

return out

}

// Stage 2: Multiplica cada número por 2

func square(in &lt;-chan int) &lt;-chan int {

out := make(chan int)

go func() {

for n := range in {

out &lt;- n * n

}

close(out)

}()

return out

}

// Stage 3: Adiciona 10 a cada resultado

func addTen(in &lt;-chan int) &lt;-chan int {

out := make(chan int)

go func() {

for n := range in {

out &lt;- n + 10

}

close(out)

}()

return out

}

func main() {

// Conecta os estágios em um pipeline

results := addTen(square(generate(5)))

for result := range results {

fmt.Println(result) // 11, 14, 19, 26, 35

}

}</code></pre>

<p>Neste exemplo, criamos um pipeline de três estágios. O estágio de geração produz números de 1 a 5. O estágio de quadrado eleva cada número ao quadrado. O estágio final adiciona 10. Observe como cada função retorna apenas um canal de leitura (<code>&lt;-chan</code>), impedindo que estágios subsequentes escrevam nele — isso é uma prática defensiva importante.</p>

<h2>Fan-out e Fan-in: Distribuição e Agregação</h2>

<h3>Fan-out: Distribuindo Trabalho</h3>

<p>Fan-out é o padrão de distribuir uma entrada para múltiplos processadores que trabalham em paralelo. Use-o quando você tem um único fluxo de dados que precisa ser processado por vários workers independentemente. Isso acelera o processamento distribuindo a carga.</p>

<pre><code class="language-go">package main

import (

&quot;fmt&quot;

&quot;sync&quot;

)

// Distribui dados de um canal para múltiplos workers

func fanOut(in &lt;-chan int, workers int) []&lt;-chan int {

channels := make([]&lt;-chan int, workers)

for i := 0; i &lt; workers; i++ {

ch := make(chan int)

channels[i] = ch

go func(out chan&lt;- int) {

for val := range in {

out &lt;- val

}

close(out)

}(ch)

}

return channels

}

// Processa os dados (simulando trabalho)

func process(in &lt;-chan int, id int) &lt;-chan string {

out := make(chan string)

go func() {

for val := range in {

out &lt;- fmt.Sprintf(&quot;Worker %d processou: %d&quot;, id, val)

}

close(out)

}()

return out

}

func main() {

// Gera dados

data := make(chan int)

go func() {

for i := 1; i &lt;= 10; i++ {

data &lt;- i

}

close(data)

}()

// Fan-out: distribui para 3 workers

workers := fanOut(data, 3)

// Cada worker processa seus dados

results := make([]&lt;-chan string, 3)

for i, worker := range workers {

results[i] = process(worker, i+1)

}

}</code></pre>

<h3>Fan-in: Agregando Resultados</h3>

<p>Fan-in é o inverso: múltiplos canais de entrada convergem em um único canal de saída. Use este padrão quando vários workers independentes produzem resultados que precisam ser coletados e processados juntos.</p>

<pre><code class="language-go">package main

import (

&quot;fmt&quot;

)

// Combina múltiplos canais em um único canal

func fanIn(channels ...&lt;-chan int) &lt;-chan int {

out := make(chan int)

var wg sync.WaitGroup

// Lança uma goroutine para cada canal de entrada

for _, ch := range channels {

wg.Add(1)

go func(c &lt;-chan int) {

for val := range c {

out &lt;- val

}

wg.Done()

}(ch)

}

// Fecha o canal de saída quando todos os inputs terminarem

go func() {

wg.Wait()

close(out)

}()

return out

}

func producer(id int, count int) &lt;-chan int {

out := make(chan int)

go func() {

for i := 1; i &lt;= count; i++ {

out &lt;- id*100 + i

}

close(out)

}()

return out

}

func main() {

// Múltiplos produtores

p1 := producer(1, 3)

p2 := producer(2, 3)

p3 := producer(3, 3)

// Fan-in: combina todos em um canal

results := fanIn(p1, p2, p3)

for result := range results {

fmt.Println(result)

}

}</code></pre>

<p>Note que usamos <code>sync.WaitGroup</code> para rastrear quando todos os canais de entrada foram processados. Isso garante que o canal de saída seja fechado apenas após todos os inputs terminarem, evitando deadlock.</p>

<h3>Combinando Fan-out e Fan-in</h3>

<p>A verdadeira potência emerge quando combinamos fan-out e fan-in. Fan-out distribui o trabalho, cada worker processa de forma independente, e fan-in recolhe os resultados.</p>

<pre><code class="language-go">package main

import (

&quot;fmt&quot;

&quot;sync&quot;

&quot;time&quot;

)

func worker(id int, in &lt;-chan int) &lt;-chan string {

out := make(chan string)

go func() {

for val := range in {

time.Sleep(time.Millisecond * 100) // Simula trabalho

out &lt;- fmt.Sprintf(&quot;Worker %d: %d^2 = %d&quot;, id, val, val*val)

}

close(out)

}()

return out

}

func main() {

// Entrada: números de 1 a 10

numbers := make(chan int)

go func() {

for i := 1; i &lt;= 10; i++ {

numbers &lt;- i

}

close(numbers)

}()

// Fan-out: distribui para 3 workers

numWorkers := 3

workers := make([]&lt;-chan string, numWorkers)

for i := 0; i &lt; numWorkers; i++ {

workers[i] = worker(i+1, numbers)

}

// Fan-in: coleta resultados

results := fanIn(workers...)

for result := range results {

fmt.Println(result)

}

}

func fanIn(channels ...&lt;-chan string) &lt;-chan string {

out := make(chan string)

var wg sync.WaitGroup

for _, ch := range channels {

wg.Add(1)

go func(c &lt;-chan string) {

for val := range c {

out &lt;- val

}

wg.Done()

}(ch)

}

go func() {

wg.Wait()

close(out)

}()

return out

}</code></pre>

<h2>Worker Pool: Pool de Trabalho Reutilizável</h2>

<h3>O Conceito e Benefícios</h3>

<p>Um worker pool é um conjunto fixo de goroutines que aguardam por tarefas em um canal compartilhado. Em vez de criar uma nova goroutine para cada tarefa (o que pode ser caro com milhares de tarefas), reutilizamos um número limitado de workers. Esse padrão é ideal quando você tem muitas tarefas pequenas, quer controlar a concorrência máxima, ou precisa de um limite superior no consumo de recursos.</p>

<p>A diferença crítica entre fan-out/fan-in e worker pool é a quantidade: fan-out tipicamente distribui a entrada para um número pequeno de workers, enquanto um pool recebe múltiplas tarefas e as distribui entre workers reutilizáveis. Worker pool é mais adequado para sistemas com taxa de chegada variável de trabalho.</p>

<h3>Implementação Clássica</h3>

<pre><code class="language-go">package main

import (

&quot;fmt&quot;

&quot;sync&quot;

)

// Task representa uma unidade de trabalho

type Task struct {

ID int

Value int

}

// Result representa o resultado de uma tarefa

type Result struct {

TaskID int

Result int

Error error

}

// WorkerPool gerencia um conjunto de workers

type WorkerPool struct {

tasks chan Task

results chan Result

wg sync.WaitGroup

}

// NewWorkerPool cria um novo pool com numWorkers goroutines

func NewWorkerPool(numWorkers int) *WorkerPool {

return &amp;WorkerPool{

tasks: make(chan Task, 10), // Buffer pequeno

results: make(chan Result, 10),

}

}

// Start inicia os workers

func (wp *WorkerPool) Start(numWorkers int) {

for i := 0; i &lt; numWorkers; i++ {

wp.wg.Add(1)

go wp.worker(i)

}

}

// worker é executado por cada goroutine do pool

func (wp *WorkerPool) worker(id int) {

defer wp.wg.Done()

for task := range wp.tasks {

// Processa a tarefa

result := task.Value * task.Value

wp.results &lt;- Result{

TaskID: task.ID,

Result: result,

}

}

}

// Submit adiciona uma tarefa ao pool

func (wp *WorkerPool) Submit(task Task) {

wp.tasks &lt;- task

}

// Close fecha o pool e aguarda conclusão

func (wp *WorkerPool) Close() {

close(wp.tasks)

wp.wg.Wait()

close(wp.results)

}

func main() {

pool := NewWorkerPool(0)

pool.Start(4) // 4 workers

// Submete 20 tarefas

go func() {

for i := 1; i &lt;= 20; i++ {

pool.Submit(Task{ID: i, Value: i})

}

pool.Close()

}()

// Coleta resultados

for result := range pool.results {

fmt.Printf(&quot;Task %d: %d\n&quot;, result.TaskID, result.Result)

}

}</code></pre>

<h3>Worker Pool Avançado com Contexto</h3>

<p>Em aplicações reais, frequentemente precisamos cancelar tarefas, implementar timeouts ou parar o pool graciosamente. A abordagem abaixo usa <code>context.Context</code>:</p>

<pre><code class="language-go">package main

import (

&quot;context&quot;

&quot;fmt&quot;

&quot;sync&quot;

&quot;time&quot;

)

type Job struct {

ID int

Dur time.Duration

}

type JobResult struct {

JobID int

Value string

}

type Pool struct {

jobs chan Job

results chan JobResult

ctx context.Context

cancel context.CancelFunc

wg sync.WaitGroup

}

func NewPool(ctx context.Context, numWorkers int) *Pool {

ctxWithCancel, cancel := context.WithCancel(ctx)

p := &amp;Pool{

jobs: make(chan Job, 5),

results: make(chan JobResult, 5),

ctx: ctxWithCancel,

cancel: cancel,

}

for i := 0; i &lt; numWorkers; i++ {

p.wg.Add(1)

go p.worker(i + 1)

}

return p

}

func (p *Pool) worker(id int) {

defer p.wg.Done()

for {

select {

case &lt;-p.ctx.Done():

fmt.Printf(&quot;Worker %d terminando\n&quot;, id)

return

case job, ok := &lt;-p.jobs:

if !ok {

return

}

// Simula processamento

time.Sleep(job.Dur)

p.results &lt;- JobResult{

JobID: job.ID,

Value: fmt.Sprintf(&quot;Processado por worker %d&quot;, id),

}

}

}

}

func (p *Pool) Submit(job Job) {

select {

case p.jobs &lt;- job:

case &lt;-p.ctx.Done():

fmt.Println(&quot;Pool foi cancelado&quot;)

}

}

func (p *Pool) Shutdown() {

close(p.jobs)

p.wg.Wait()

p.cancel()

close(p.results)

}

func main() {

ctx := context.Background()

pool := NewPool(ctx, 3)

// Submete tarefas

for i := 1; i &lt;= 10; i++ {

pool.Submit(Job{ID: i, Dur: time.Millisecond * 500})

}

// Coleta alguns resultados e cancela

for i := 0; i &lt; 5; i++ {

result := &lt;-pool.results

fmt.Printf(&quot;Job %d: %s\n&quot;, result.JobID, result.Value)

}

pool.Shutdown()

// Coleta resultados restantes

for result := range pool.results {

fmt.Printf(&quot;Job %d: %s\n&quot;, result.JobID, result.Value)

}

}</code></pre>

<p>Nesta implementação, o context permite que o caller cancele todas as operações pendentes simultaneamente. Cada worker verifica o contexto antes de processar uma nova tarefa, garantindo um shutdown limpo.</p>

<h2>Escolhendo o Padrão Certo</h2>

<h3>Pipeline</h3>

<p>Use pipeline quando você tem dados fluindo através de uma série de transformações sequenciais. Cada estágio é independente e pode executar simultaneamente. Exemplo: ler arquivo → parsear → validar → salvar.</p>

<h3>Fan-out / Fan-in</h3>

<p>Use fan-out e fan-in quando uma entrada precisa ser processada por múltiplos workers em paralelo, e você precisa agregar os resultados depois. Exemplo: análise de log distribuída, processamento de imagem com múltiplos filtros, ou scraping web com múltiplos clients.</p>

<h3>Worker Pool</h3>

<p>Use worker pool quando você tem muitas tarefas chegando continuamente, e quer controlar o nível de concorrência. Exemplo: processador de fila de mensagens, servidor web com limite de conexões simultâneas, ou batching de requisições.</p>

<pre><code class="language-go">// Exemplo prático: Combinando padrões em um problema real

package main

import (

&quot;fmt&quot;

&quot;sync&quot;

)

// Simula obtenção de URLs de uma fonte

func getURLs() &lt;-chan string {

out := make(chan string)

go func() {

urls := []string{

&quot;http://example.com/1&quot;, &quot;http://example.com/2&quot;,

&quot;http://example.com/3&quot;, &quot;http://example.com/4&quot;,

&quot;http://example.com/5&quot;, &quot;http://example.com/6&quot;,

}

for _, url := range urls {

out &lt;- url

}

close(out)

}()

return out

}

// Worker que &quot;baixa&quot; URLs

func downloader(id int, urls &lt;-chan string) &lt;-chan string {

out := make(chan string)

go func() {

for url := range urls {

out &lt;- fmt.Sprintf(&quot;Worker %d baixou %s&quot;, id, url)

}

close(out)

}()

return out

}

func mergeResults(channels ...&lt;-chan string) &lt;-chan string {

out := make(chan string)

var wg sync.WaitGroup

for _, ch := range channels {

wg.Add(1)

go func(c &lt;-chan string) {

for val := range c {

out &lt;- val

}

wg.Done()

}(ch)

}

go func() {

wg.Wait()

close(out)

}()

return out

}

func main() {

// Pipeline + Fan-out + Fan-in

urls := getURLs()

// Fan-out para 3 downloaders

results := make([]&lt;-chan string, 3)

for i := 0; i &lt; 3; i++ {

results[i] = downloader(i+1, urls)

}

// Fan-in coleta resultados

combined := mergeResults(results...)

for result := range combined {

fmt.Println(result)

}

}</code></pre>

<h2>Conclusão</h2>

<p>Estudamos quatro padrões fundamentais de concorrência em Go, cada um com seu propósito específico. O <strong>Pipeline</strong> permite processar dados através de múltiplos estágios sequenciais de forma eficiente, reutilizando goroutines para transformações diferentes. O <strong>Fan-out/Fan-in</strong> distribui uma entrada entre múltiplos processadores e depois agrega os resultados, oferecendo paralelismo transparente e elegante. O <strong>Worker Pool</strong> reutiliza um conjunto fixo de goroutines para processar muitas tarefas, controlando o consumo de recursos e evitando a explosão de goroutines.</p>

<p>A chave para dominar esses padrões é entender que eles não são isolados — frequentemente você combinará dois ou mais deles em uma solução real. Um sistema pode usar pipeline para estruturar o fluxo, fan-out para distribuição e um worker pool para escalar o processamento. Comece simples, teste seus canais com tipos bem definidos, e sempre considere o fechamento correto de canais e sincronização com <code>sync.WaitGroup</code> ou <code>context.Context</code>. A elegância de Go está em como esses primitivos simples (goroutines e canais) permitem construir sistemas sofisticados e maintíveis.</p>

<h2>Referências</h2>

<ul>

<li><a href="https://golang.org/doc/effective_go#concurrency" target="_blank" rel="noopener noreferrer">Effective Go: Concurrency</a> — Documentação oficial sobre concorrência em Go</li>

<li><a href="https://go.dev/blog/pipelines" target="_blank" rel="noopener noreferrer">Go Concurrency Patterns</a> — Blog oficial do Go com exemplos de pipelines e padrões</li>

<li><a href="https://go.dev/blog/io2013-talk-concurrency" target="_blank" rel="noopener noreferrer">Advanced Go Concurrency Patterns</a> — Artigo avançado sobre padrões sofisticados</li>

<li><a href="https://www.gopl.io/" target="_blank" rel="noopener noreferrer">The Go Programming Language (Capítulo 8)</a> — Livro clássico cobrindo concorrência em profundidade</li>

</ul>

<p>&lt;!-- FIM --&gt;</p>

Comentários

Mais em Go

O que Todo Dev Deve Saber sobre SQLC em Go: Gerando Código Tipado a partir de Queries SQL
O que Todo Dev Deve Saber sobre SQLC em Go: Gerando Código Tipado a partir de Queries SQL

O que é SQLC e Por que Você Deveria Usar SQLC é uma ferramenta que gera códig...

Boas Práticas de CQRS e Event Sourcing em Go: Implementação Prática para Times Ágeis
Boas Práticas de CQRS e Event Sourcing em Go: Implementação Prática para Times Ágeis

Entendendo CQRS: O Padrão de Separação de Responsabilidades CQRS significa Co...

Como Usar Deploy de APIs Go em Produção: VPS, Kubernetes e GitHub Actions em Produção
Como Usar Deploy de APIs Go em Produção: VPS, Kubernetes e GitHub Actions em Produção

Introdução: Por Que Go é Ideal para APIs em Produção Go é uma linguagem compi...