Concurrency Pattern: Architecting Scalable Systems with Worker Pools
January 27, 2025
Go
Concurrency
Performance
Concurrency in Go enables efficient parallel execution of tasks using goroutines, which are lightweight, memory-efficient units of execution. Running a large number of goroutines can speed up processing, but it can also significantly increase memory usage, because the memory each goroutine needs depends on the work it performs. It is therefore important to consider the complexity of the code inside each goroutine.
Background
During my previous internship, I was part of a team tasked with managing messaging services for the company. One of the key challenges we faced was efficiently sending messages to a large user base—around 20,000 users. Initially, we designed the process to handle this load by identifying eligible users and then batching the task into smaller chunks. Each chunk was assigned to a goroutine for parallel execution, taking advantage of Go's concurrency model to speed up the process.
However, despite our efforts to optimize the flow, we ran into excessive memory consumption, which ultimately caused the service to crash with an out-of-memory error. On closer inspection, we realized that spawning too many goroutines, especially for resource-heavy tasks, overwhelmed the system. This experience highlighted the need for a more efficient way to handle large-scale concurrent operations, one that manages resources better and keeps the system stable under heavy load.
The Need for a Worker Pool
To resolve this, we switched to a concurrency pattern known as the worker pool. Instead of spawning an unbounded number of goroutines, a worker pool lets us control the number of concurrent workers executing tasks.
By limiting the number of workers and queuing tasks, we kept the system efficient and resource-friendly, even under heavy load. In our case, the worker pool pattern helped us distribute messages while balancing parallel execution against resource utilization.
The Hands-On
There are plenty of community packages that provide ready-to-use worker pool interfaces. However, the core principles behind implementing this pattern remain consistent. Let's dive deeper into the inner workings of a worker pool and uncover how it truly operates behind the scenes.
Simple Scenario
Let's define a simple Task struct. Each Task has an Id, which identifies the task being processed. The Execute method is where the task's work gets done. In this example, it simply prints the task's Id and then sleeps for 3 seconds to simulate a time-consuming operation.
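// Part of package pool; this snippet uses the "fmt" and "time" packages.

// Task is a unit of work identified by Id.
type Task struct {
	Id int
}

// Execute prints the task's Id, then sleeps to simulate slow work.
func (t Task) Execute() {
	fmt.Printf("processing task id: %d\n", t.Id)
	time.Sleep(3 * time.Second)
}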
Setting Up the Pool
Next, we'll create the main entity that manages our workers and tasks, called WorkerPool. It holds:
- tasks: a slice that holds all the tasks that need to be processed.
- queue: a buffered channel that acts as a queue for distributing tasks to the workers.
- count: the number of workers (goroutines) that will be processing the tasks concurrently.
- wg: a sync.WaitGroup that ensures the main program waits until all tasks are finished before it exits.
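// WorkerPool distributes tasks to a fixed number of workers.
// It uses the "sync" package for the WaitGroup.
type WorkerPool struct {
	tasks []Task         // all tasks to process
	queue chan Task      // buffered channel feeding the workers
	count int            // number of worker goroutines
	wg    sync.WaitGroup // tracks unfinished tasks
}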
We can also create a function New that initializes a WorkerPool. It takes the number of workers and the list of tasks, and sets up the channel (queue) with a buffer size equal to the number of tasks. This lets every task be queued without blocking, even if the workers have not picked anything up yet.
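// New builds a WorkerPool with workerCount workers. The queue buffer is
// sized to len(tasks) so every task can be enqueued without blocking.
// It returns a pointer so the sync.WaitGroup inside it is never copied.
func New(workerCount int, tasks []Task) *WorkerPool {
	return &WorkerPool{
		tasks: tasks,
		count: workerCount,
		queue: make(chan Task, len(tasks)),
	}
}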
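To make the buffer-size choice concrete, here is a minimal sketch showing that a channel with capacity len(tasks) accepts every task even when no worker is receiving yet. The helper name enqueueAll is hypothetical and is not part of the pool built in this article.

```go
package main

import "fmt"

// Task mirrors the Task type from the article.
type Task struct {
	Id int
}

// enqueueAll is a hypothetical helper: it creates a channel whose buffer
// matches len(tasks) and sends every task into it with no receiver running.
func enqueueAll(tasks []Task) chan Task {
	queue := make(chan Task, len(tasks))
	for _, t := range tasks {
		queue <- t // does not block: the buffer has a free slot for each task
	}
	return queue
}

func main() {
	tasks := []Task{{Id: 1}, {Id: 2}, {Id: 3}}
	queue := enqueueAll(tasks)
	fmt.Println(len(queue), cap(queue)) // prints "3 3"
}
```

With a smaller buffer (or an unbuffered channel), the same send loop would block until a worker received a task. That is also fine as long as the workers are already running, and it trades a little throughput for lower memory use on very large task lists.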
Task Processing
Then we create a process method in which each worker waits for tasks to arrive in the queue. When a task arrives, the worker grabs it and calls task.Execute() to process it. After processing, the worker signals that the task is complete by calling p.wg.Done().
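// process is the worker loop: it receives tasks until the queue is
// closed and empty, marking each one done after it runs.
func (p *WorkerPool) process() {
	for task := range p.queue {
		task.Execute()
		p.wg.Done()
	}
}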
The worker keeps pulling tasks from the queue in a loop until the channel is closed and drained, which indicates that there are no more tasks to process.
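The loop-exit behavior can be checked in isolation. The sketch below uses plain ints instead of Task values and a hypothetical helper named drain. It shows that closing a buffered channel does not discard what is already queued, and that range stops only after the last value has been received.

```go
package main

import "fmt"

// drain is a hypothetical helper that collects values the same way the
// worker loop does: by ranging over the channel until it is closed and empty.
func drain(queue <-chan int) []int {
	var got []int
	for v := range queue {
		got = append(got, v)
	}
	return got
}

func main() {
	queue := make(chan int, 3)
	queue <- 1
	queue <- 2
	queue <- 3
	close(queue) // closing does not discard buffered values

	fmt.Println(drain(queue)) // prints "[1 2 3]"
}
```

If the channel were never closed, the range loop would block forever once the buffer was empty, which is why the pool must close the queue after the last task is sent.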
Spinning Up Workers
The Start method is responsible for launching the workers. Here's how it works:
- We launch p.count workers as goroutines, each running the process method to handle tasks.
- We add the total number of tasks to the sync.WaitGroup with p.wg.Add(len(p.tasks)), so the pool knows when all tasks are complete.
- We send each task into the queue, and once all tasks are queued, we close the channel to indicate that no more tasks will be added.
- Finally, we call p.wg.Wait() to block until all tasks are finished.
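// Start launches the workers, enqueues every task, and blocks until all
// tasks have been processed.
func (p *WorkerPool) Start() {
	// Launch a fixed number of workers.
	for i := 0; i < p.count; i++ {
		go p.process()
	}
	// Register every task before any of them is sent to the queue.
	p.wg.Add(len(p.tasks))
	for _, task := range p.tasks {
		p.queue <- task
	}
	// Closing the queue lets each worker's range loop exit once it is drained.
	close(p.queue)
	p.wg.Wait()
}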
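One way to convince yourself that the pool really caps concurrency is to count how many tasks are in flight at once. The following is a condensed, single-file sketch of the same pattern, not the pool package itself. The function runPool, its counters, and the 10 ms sleep are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runPool is a hypothetical, condensed variant of the pool in this article.
// It processes n simulated tasks with the given number of workers and reports
// how many tasks completed and the highest number that ran at the same time.
func runPool(workers, n int) (done, peak int) {
	queue := make(chan int, n)
	var wg sync.WaitGroup
	var mu sync.Mutex // guards active, done, and peak
	active := 0

	for i := 0; i < workers; i++ {
		go func() {
			for range queue {
				mu.Lock()
				active++
				if active > peak {
					peak = active
				}
				mu.Unlock()

				time.Sleep(10 * time.Millisecond) // stand-in for real work

				mu.Lock()
				active--
				done++
				mu.Unlock()
				wg.Done()
			}
		}()
	}

	wg.Add(n)
	for i := 0; i < n; i++ {
		queue <- i
	}
	close(queue)
	wg.Wait()
	return done, peak
}

func main() {
	done, peak := runPool(3, 23)
	// done is 23, and peak never exceeds the worker count of 3.
	fmt.Printf("done=%d peak=%d\n", done, peak)
}
```

Because only three goroutines ever read from the queue, the in-flight count cannot rise above three no matter how many tasks are queued. That bound is what keeps memory use predictable.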
Bringing It All Together
In the main function, we create 23 tasks with unique Ids, instantiate a WorkerPool with 3 workers, and call Start to begin processing the tasks concurrently.
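func main() {
	// Build 23 tasks with Ids 1 through 23.
	tasks := make([]pool.Task, 23)
	for i := 0; i < 23; i++ {
		tasks[i] = pool.Task{Id: i + 1}
	}
	// Process them with 3 workers; Start blocks until every task is done.
	p := pool.New(3, tasks)
	p.Start()
	fmt.Println("all tasks have been processed")
}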
Wrapping It Up
Implementing the worker pool pattern may seem straightforward, but it's a powerful strategy for managing concurrent processes in large-scale Go applications. By controlling the number of active goroutines, it ensures better resource management, improves performance, and enhances system stability. This approach not only solves immediate concurrency issues but also provides a solid foundation for building more efficient, scalable systems as your application grows.