Concurrency Pattern: Architecting Scalable Systems with Worker Pools

January 27, 2025

Go · Concurrency · Performance

Concurrency in Go enables efficient parallel execution of tasks using goroutines, lightweight units of work that start with a stack of only a few kilobytes. While spawning a large number of goroutines can speed up processing, it can also significantly increase memory usage. Hence, it's important to consider the complexity of the code inside each goroutine: the heavier the logic, the more resources every concurrent task consumes.

Background

During my previous internship, I was part of a team tasked with managing messaging services for the company. One of the key challenges we faced was efficiently sending messages to a large user base of around 20,000 users. Initially, we designed the process to identify eligible users and then batch the work into smaller chunks. Each chunk was assigned to its own goroutine for parallel execution, taking advantage of Go's concurrency model to speed up the process.

However, despite our efforts to optimize the flow, we ran into excessive memory consumption, which ultimately caused the service to crash with an out-of-memory error. Upon closer inspection, we realized that spawning too many goroutines, especially for resource-heavy tasks, had overwhelmed the system. This experience highlighted the need for a more efficient way to handle large-scale concurrent operations: one that could better manage resources and keep the system stable under heavy load.
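
To make the failure mode concrete, here is a minimal sketch of that first design. User, the batch size, and sendMessages are hypothetical stand-ins for our real delivery logic; the point is the unbounded one-goroutine-per-chunk fan-out:

package main

import "sync"

type User struct{ ID int }

// sendMessages stands in for the real, resource-heavy delivery logic.
func sendMessages(batch []User) { /* ... */ }

func main() {
	users := make([]User, 20000)
	batchSize := 100

	var wg sync.WaitGroup
	for start := 0; start < len(users); start += batchSize {
		end := start + batchSize
		if end > len(users) {
			end = len(users)
		}
		wg.Add(1)
		// One goroutine per chunk: 200 at once here, with nothing
		// capping how many are alive as the user base grows.
		go func(batch []User) {
			defer wg.Done()
			sendMessages(batch)
		}(users[start:end])
	}
	wg.Wait()
}

Each goroutine is cheap on its own; the real cost was the heavyweight messaging logic every one of them carried at the same time.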

The Need for a Worker Pool

To resolve this, we shifted to a well-known concurrency pattern: the worker pool. Instead of spawning an unbounded number of goroutines, a worker pool lets us control the number of concurrent workers executing tasks.

[Figure: a tasks queue holds tasks waiting for pickup while Worker 1 through Worker 4 each execute a task concurrently.]
Worker Pool Illustration.

By limiting the number of workers and queuing tasks, we could keep the system efficient and resource-friendly even under heavy concurrency. For the problem above, the worker pool pattern let us manage the distribution of messages while maintaining a balance between parallel execution and resource utilization.

The Hands-On

There are plenty of community packages that provide ready-to-use worker pool interfaces. For instance, golang.org/x/sync/errgroup doubles as a bounded worker pool through its SetLimit method; a minimal sketch (task count and sleep duration are illustrative):
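
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	g.SetLimit(3) // at most 3 tasks run concurrently

	for i := 1; i <= 20; i++ {
		id := i // capture the loop variable (required before Go 1.22)
		g.Go(func() error {
			fmt.Printf("processing task id: %d\n", id)
			time.Sleep(3 * time.Second) // simulate slow work
			return nil
		})
	}

	// Wait blocks until every submitted task has finished.
	_ = g.Wait()
}

However, the core principles behind implementing this pattern remain consistent. Let's dive deeper into the inner workings of a worker pool and uncover how it truly operates behind the scenes.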

Simple Scenario

Let's define a simple Task struct. Each Task has an Id, which helps us identify which task is being processed. The Execute method is where the task's work gets done. In this example, it simply prints the task's Id and then sleeps for 3 seconds to simulate a time-consuming operation.

pkg/pool/pool.go
package pool

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	Id int
}

func (t Task) Execute() {
	fmt.Printf("processing task id: %d\n", t.Id)
	time.Sleep(3 * time.Second)
}

Setting Up the Pool

Next we'll create the main entity that manages our workers and tasks, called WorkerPool. It carries:

  • tasks: a slice that holds all the tasks that need to be processed.
  • queue: a buffered channel that acts as a queue for distributing tasks to the workers.
  • count: the number of workers (goroutines) that will be processing the tasks concurrently.
  • wg: a sync.WaitGroup that ensures the main program waits until all tasks are finished before it exits.

pkg/pool/pool.go
type WorkerPool struct {
	tasks []Task
	queue chan Task
	count int
	wg    sync.WaitGroup
}

We can also create a New function that initializes a WorkerPool. It takes the number of workers and the list of tasks, and sets up the queue channel with a buffer size equal to the number of tasks, so every task can be queued without blocking before processing starts.

pkg/pool/pool.go
func New(workerCount int, tasks []Task) *WorkerPool {
	return &WorkerPool{
		tasks: tasks,
		count: workerCount,
		queue: make(chan Task, len(tasks)),
	}
}
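
Sizing the buffer to len(tasks) is a convenience, not a requirement: it lets every send in Start complete immediately, so queuing never blocks. An unbuffered channel would behave correctly too, because the workers are already running when tasks are sent; the send loop would simply hand each task directly to an idle worker.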

Task Processing

Next, we create a process method in which each worker waits for tasks to arrive on the queue. When a task arrives, the worker grabs it and calls task.Execute() to process it. Once the task finishes, the worker marks it complete by calling p.wg.Done().

pkg/pool/pool.go
func (p *WorkerPool) process() {
	for task := range p.queue {
		task.Execute()
		p.wg.Done()
	}
}

This method runs in a loop where the worker keeps pulling tasks from the queue until the channel is closed, which indicates that there are no more tasks to process.
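
This draining behavior is worth seeing in isolation. Closing a channel does not discard buffered values; a range loop keeps receiving them and only exits once the buffer is empty:

package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // no more sends, but buffered values remain readable

	for v := range ch {
		fmt.Println(v) // prints 1, 2, 3, then the loop exits
	}
}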

Spinning Up Workers

The Start method is responsible for launching the workers. Here's how it works:

  • We launch p.count workers as goroutines, each running the process method to handle tasks.
  • We add the total number of tasks to the sync.WaitGroup with p.wg.Add(len(p.tasks)), so the main function knows when all tasks are complete.
  • We send each task into the queue, and once all tasks are queued, we close the channel to indicate that no more tasks will be added.
  • Finally, we call p.wg.Wait() to block until all tasks are finished.

pkg/pool/pool.go
func (p *WorkerPool) Start() {
	for i := 0; i < p.count; i++ {
		go p.process()
	}
 
	p.wg.Add(len(p.tasks))
	for _, task := range p.tasks {
		p.queue <- task
	}
 
	close(p.queue)
 
	p.wg.Wait()
}
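
One ordering detail is worth noting: the workers are launched first, but they simply block on the empty queue until tasks arrive. Because p.wg.Add runs before any task is sent, no worker can call Done before the matching Add has taken effect, so the WaitGroup counter never dips below zero.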

Bringing It All Together

In the main function, we create 20 tasks with unique Ids, instantiate a WorkerPool with 3 workers, and call Start to begin processing the tasks concurrently (the pool import path shown below is a placeholder for your module name).

cmd/main.go
package main

import (
	"fmt"

	"yourmodule/pkg/pool" // hypothetical import path; adjust to your module name
)

func main() {
	tasks := make([]pool.Task, 20)
	for i := 0; i < 20; i++ {
		tasks[i] = pool.Task{Id: i + 1}
	}

	p := pool.New(3, tasks)
	p.Start()

	fmt.Println("all tasks have been processed")
}
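
With 3 workers and 20 tasks of 3 seconds each, the pool finishes in ceil(20/3) = 7 waves, or roughly 21 seconds, versus about 60 seconds if the tasks ran one after another. Within each wave the order of the "processing task id" lines is nondeterministic, since any idle worker may grab the next task first.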

Wrapping It Up

Implementing the worker pool pattern may seem straightforward, but it's a powerful strategy for managing concurrent processes in large-scale Go applications. By controlling the number of active goroutines, it ensures better resource management, improves performance, and enhances system stability. This approach not only solves immediate concurrency issues but also provides a solid foundation for building more efficient, scalable systems as your application grows.
