Errgroup Explained: Understanding Its Inner Workings
We’ve delved into using errgroup to boost the performance of independent tasks, thanks to the robust features it offers.
To get the most out of our discussion, it’s important to have a good grasp of two key concepts in Go programming: errgroup and channels.
Once you’re comfortable with these concepts, you’ll find it much easier to dive into the more advanced stuff we’re going to talk about.
I aim to make the topic more accessible and valuable for you, drawing from my personal experiences and understanding.
1. What is inside the errgroup?
At the heart of the errgroup package lies the Group struct. Let's take a closer look at its structure:
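type token struct{}

type Group struct {
	cancel  func(error)    // cancels the derived context; nil unless created via WithContext
	wg      sync.WaitGroup // tracks the goroutines started by Go
	sem     chan token     // semaphore; nil means no limit
	errOnce sync.Once      // guarantees err is written only once
	err     error          // first error returned by any goroutine
}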
This struct employs two synchronization primitives: sync.WaitGroup and sync.Once.
I’ve covered these in detail in my article ‘6 Key Sync Package Concepts’, but here’s a brief recap:
sync.WaitGroup: It’s used to wait for a group of goroutines to complete their execution.
sync.Once: It ensures that a piece of code runs only once, no matter how many times it’s called. It’s safe to use concurrently.
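The sketch below shows how the two primitives combine to keep only the first error, using nothing but the standard library. This is the same idea the Group struct relies on. The helper name firstError is hypothetical and is not part of errgroup.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errA = errors.New("task A failed")
	errB = errors.New("task B failed")
)

// firstError (hypothetical helper) runs each task in its own goroutine,
// waits for all of them with a sync.WaitGroup, and uses sync.Once so that
// only the first reported error is kept; later errors are dropped.
func firstError(tasks []func() error) error {
	var (
		wg      sync.WaitGroup
		errOnce sync.Once
		err     error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func() error) {
			defer wg.Done()
			if e := task(); e != nil {
				errOnce.Do(func() { err = e }) // only the first call runs
			}
		}(task)
	}
	wg.Wait() // every Done happens before Wait returns, so reading err is race-free
	return err
}

func main() {
	err := firstError([]func() error{
		func() error { return nil },
		func() error { return errA },
		func() error { return errB },
	})
	// Which error wins depends on scheduling, but it is always one of the two.
	fmt.Println(err == errA || err == errB) // true
}
```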
We also have an err field, which captures only the first error returned by a goroutine; any subsequent errors are ignored.
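The cancel field holds the function that cancels the context created by errgroup.WithContext. It is called when the first error occurs, or when Wait returns after all tasks have completed, whichever happens first. For a zero-value Group, cancel is nil.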
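The cancel field has the type func(error) because, in recent versions of golang.org/x/sync, WithContext is built on context.WithCancelCause (Go 1.20+). I am assuming that version here. The stdlib-only sketch below shows what calling such a function does. The name cancelWithCause is hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errTaskFailed = errors.New("task failed")

// cancelWithCause (hypothetical helper) shows what calling a func(error)
// obtained from context.WithCancelCause does: the context is cancelled and
// the error is recorded as its cause. Only the first call has any effect.
func cancelWithCause() (ctxErr, cause error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errTaskFailed) // what a failing task would trigger
	cancel(nil)           // a later call (e.g. from Wait) does not overwrite the cause
	<-ctx.Done()          // already closed, so this returns immediately
	return ctx.Err(), context.Cause(ctx)
}

func main() {
	ctxErr, cause := cancelWithCause()
	fmt.Println(ctxErr)                          // context canceled
	fmt.Println(errors.Is(cause, errTaskFailed)) // true
}
```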
What really makes this struct special is the sem channel, which acts as a semaphore and is crucial for controlling the number of goroutines that run at the same time.
2. How does it work?
Let’s start with what a semaphore is.
In Go, a semaphore limits the number of goroutines that can run at the same time; it’s a tool for managing how busy our program is.
a. Semaphore
The sem channel in an errgroup starts out as nil, which means there is no limit: any number of goroutines can run together.
But in many cases we need to bound this activity to keep our program efficient, and that’s where SetLimit(n) comes in.
This function sets up the sem channel so that at most n goroutines run at the same time:
g.sem = make(chan token, n)
This line turns sem into a buffered channel with capacity for n elements (tokens).
A buffered channel can hold up to its capacity in values without blocking the sender, so n is the number of goroutines that can run simultaneously.
When the channel is full, any attempt to send another token (that is, to start another goroutine) blocks. It stays blocked until one of the running goroutines finishes its task and receives its token back out of the channel, freeing a slot.
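Here is a minimal, stdlib-only sketch of a buffered channel used as a counting semaphore. As in errgroup, the token is acquired in the launching loop, so the loop itself blocks while n jobs are running. The helper runLimited and its peak counter are hypothetical, added only to make the limit observable.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type token struct{}

// runLimited (hypothetical helper) starts one goroutine per job but lets at
// most n run at once, using a buffered channel of capacity n as a semaphore.
// It returns the highest number of jobs observed running simultaneously.
func runLimited(jobs, n int) int64 {
	sem := make(chan token, n)
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for i := 0; i < jobs; i++ {
		sem <- token{} // acquire: blocks while n tokens are already in the channel
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release: runs before wg.Done, freeing a slot

			cur := running.Add(1)
			for { // record the peak with a compare-and-swap loop
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulate work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(runLimited(20, 3) <= 3) // true
}
```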
Let’s now turn our attention to the methods of the Group type and their role in coordinating these tasks.
b. Go
The Go method in errgroup is essential, yet more complex than it may seem. Here’s a breakdown of what it does:
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{} // acquire a slot; blocks while the limit is reached
	}
	g.wg.Add(1) // register the goroutine before starting it
	go func() {
		defer g.done() // done() releases the sem slot (if any), then calls g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() { // only the first error is recorded
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err) // cancel the context created by WithContext
				}
			})
		}
	}()
}
When you call g.Go(f), several things happen:
Semaphore check: If g.sem is not nil, Go sends a token into the semaphore. If the semaphore's limit has been reached, g.Go(f) blocks until a slot frees up.
WaitGroup management: Go then adds one to the WaitGroup (g.wg.Add(1)) before creating the goroutine that runs f.
Running the function: f runs in its own goroutine. When f returns, whether successfully or with an error, the goroutine gives its token back to the semaphore (if there is one) and marks itself done in the WaitGroup (g.wg.Done()).
Error handling: If f returns the group's first error, that error is stored in g.err (guarded by g.errOnce), and g.cancel, if set, is called to cancel the context.
A key point to remember is that g.Go() doesn’t queue tasks.
It starts the goroutine for f immediately, as long as there is a free slot in the semaphore.
This also means that, once a limit is set, g.Go() itself can block the calling goroutine while the semaphore is full; blocking is not confined to g.Wait().
This aspect is critical to understanding how errgroup manages goroutine execution.
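To see this caller-side blocking in action, here is a stripped-down miniGroup (no context, no TryGo) that follows the same steps using only the standard library. miniGroup and launchOrder are hypothetical names, not errgroup's API. With a limit of 1, Go(1) cannot acquire the only slot until task 0 has finished. That means "task 0 done" is always recorded before "Go(1) returned".

```go
package main

import (
	"fmt"
	"sync"
)

type token struct{}

// miniGroup is a hypothetical, simplified stand-in for errgroup.Group.
type miniGroup struct {
	wg      sync.WaitGroup
	sem     chan token
	errOnce sync.Once
	err     error
}

func (g *miniGroup) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{} // blocks the caller while the limit is reached
	}
	g.wg.Add(1)
	go func() {
		defer func() {
			if g.sem != nil {
				<-g.sem // release the slot
			}
			g.wg.Done()
		}()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

func (g *miniGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

// launchOrder (hypothetical helper) records events with a limit of 1.
func launchOrder() []string {
	var mu sync.Mutex
	var events []string
	record := func(s string) {
		mu.Lock()
		defer mu.Unlock()
		events = append(events, s)
	}

	g := &miniGroup{sem: make(chan token, 1)}
	for i := 0; i < 2; i++ {
		i := i
		g.Go(func() error {
			record(fmt.Sprintf("task %d done", i))
			return nil
		})
		record(fmt.Sprintf("Go(%d) returned", i))
	}
	g.Wait()
	return events
}

func main() {
	fmt.Println(launchOrder())
}
```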
c. Wait
The Wait method ensures that all goroutines launched within the errgroup have completed before the program proceeds:
func (g *Group) Wait() error {
	g.wg.Wait() // Wait for all goroutines to return.
	if g.cancel != nil {
		g.cancel(g.err) // Cancel the derived context, even on success.
	}
	return g.err // First error from the goroutines, or nil.
}
This method blocks until g.wg.Wait() confirms that all goroutines have finished.
Then, if the group was created with WithContext, it calls g.cancel to cancel the derived context. This happens whether or not an error occurred.
Finally, Wait returns g.err, which holds the first error returned by any goroutine. If no errors occurred, g.err is nil, indicating that all goroutines completed successfully.
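One consequence of calling g.cancel unconditionally is easy to miss: the derived context is cancelled once Wait returns, even when every task succeeded. The hypothetical, stdlib-only group below mirrors the Wait code shown above to demonstrate this. It assumes that WithContext wraps context.WithCancelCause, and the names group, withContext and waitThenCheck are my own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// group is a hypothetical minimal errgroup-like type without a limit.
type group struct {
	cancel  func(error)
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// withContext mirrors the shape of errgroup.WithContext (assumed to use
// context.WithCancelCause).
func withContext(parent context.Context) (*group, context.Context) {
	ctx, cancel := context.WithCancelCause(parent)
	return &group{cancel: cancel}, ctx
}

func (g *group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(err) // the first error cancels the context early
				}
			})
		}
	}()
}

func (g *group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel(g.err) // runs even when g.err is nil
	}
	return g.err
}

// waitThenCheck runs two successful tasks, then reports Wait's result and
// the state of the derived context.
func waitThenCheck() (waitErr, ctxErr error) {
	g, ctx := withContext(context.Background())
	for i := 0; i < 2; i++ {
		g.Go(func() error { return nil })
	}
	waitErr = g.Wait()
	return waitErr, ctx.Err()
}

func main() {
	waitErr, ctxErr := waitThenCheck()
	fmt.Println(waitErr) // <nil>
	fmt.Println(ctxErr)  // context canceled
}
```

In practice, this means the context returned by WithContext should not be reused for work after Wait returns.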
d. SetLimit
The SetLimit method of the Group type initializes the semaphore's sem channel with capacity for n tokens. This initialization is critical and should be handled with care.
Adjusting the limit while goroutines are active (indicated by tokens still held in the semaphore) causes the application to panic.
Therefore, call SetLimit only before any g.Go() invocations, or after g.Wait() has confirmed that all goroutines have finished.
func (g *Group) SetLimit(n int) {
	if n < 0 { // a negative n removes the limit
		g.sem = nil
		return
	}
	if len(g.sem) != 0 { // tokens still held means goroutines are active
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}
When n is negative, SetLimit deactivates the semaphore by setting g.sem to nil, which allows an unlimited number of concurrent goroutines. Note that a limit of zero has the opposite effect: it prevents any new goroutines from being added.
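The sketch below triggers the panic path on purpose. It copies the SetLimit logic shown above onto a hypothetical limiter type that holds only the semaphore. It then fakes one active goroutine by holding a token, and recovers the panic so the message can be inspected. The names limiter and tryResize are illustrative only.

```go
package main

import "fmt"

type token struct{}

// limiter is a hypothetical type holding only the semaphore part of a Group.
type limiter struct{ sem chan token }

// SetLimit follows the same logic as errgroup's SetLimit shown above.
func (l *limiter) SetLimit(n int) {
	if n < 0 {
		l.sem = nil // no limit
		return
	}
	if len(l.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(l.sem)))
	}
	l.sem = make(chan token, n)
}

// tryResize (hypothetical helper) holds one token, as a running goroutine
// would, then tries to change the limit and returns the recovered panic.
func tryResize() (msg string) {
	l := &limiter{}
	l.SetLimit(2)
	l.sem <- token{} // pretend one goroutine is still running
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	l.SetLimit(5) // panics: a token is still held
	return "no panic"
}

func main() {
	fmt.Println(tryResize())
	// errgroup: modify limit while 1 goroutines in the group are still active
}
```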
e. TryGo
The TryGo method solves the potential blocking issue of g.Go().
It calls f in a new goroutine only if the semaphore has space, and it reports whether it did so. This is useful when adding a goroutine to the group is contingent on the availability of space in the semaphore.
“If the channel is full, won’t it block? You just said sending to a full channel will be blocked”
The implementation of TryGo addresses this by using a select statement to check the semaphore's capacity without blocking:
select {
case g.sem <- token{}:
	// Proceed if the semaphore has space.
default:
	return false // Semaphore full: give up instead of blocking.
}
Normally, sending to a full channel blocks the sender.
However, TryGo uses a select statement with a default case, and the default case runs immediately when no other case is ready. So if the semaphore is full, TryGo doesn't block but returns false instead, signaling that no goroutine could be added at that moment.
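Below is a small, stdlib-only sketch of the select/default pattern from the snippet above, wrapped in a hypothetical tryGroup. It uses func() rather than errgroup's func() error to keep things short. The token is sent synchronously inside TryGo, before the goroutine starts. As a result, with a limit of 1, a second TryGo issued while the first task is still running always returns false, regardless of scheduling.

```go
package main

import (
	"fmt"
	"sync"
)

type token struct{}

// tryGroup is a hypothetical minimal group used only to show TryGo.
type tryGroup struct {
	wg  sync.WaitGroup
	sem chan token
}

// TryGo starts f only if a slot is free; otherwise it returns false at once.
func (g *tryGroup) TryGo(f func()) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}: // got a slot
		default:
			return false // semaphore full: do not block
		}
	}
	g.wg.Add(1)
	go func() {
		defer func() {
			if g.sem != nil {
				<-g.sem // release the slot
			}
			g.wg.Done()
		}()
		f()
	}()
	return true
}

// fillAndTry (hypothetical helper) occupies the only slot with a task that
// waits on release, then tries to start a second task.
func fillAndTry() (first, second bool) {
	g := &tryGroup{sem: make(chan token, 1)}
	release := make(chan struct{})
	first = g.TryGo(func() { <-release })
	second = g.TryGo(func() {}) // slot still held by the first task
	close(release)
	g.wg.Wait()
	return first, second
}

func main() {
	fmt.Println(fillAndTry()) // true false
}
```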
With this understanding, why not take a shot at creating your own errgroup? Here are some features you could consider including:
Gather all errors that occur, not just the first one.
Initiate goroutines only upon calling errgroup.Wait(), and execute them in batches (like 10 or 20 at a time).
Modify the SetLimit function to return an error instead of causing a panic.
When the maximum number of active goroutines is reached, have group.Go store additional goroutines in a queue to be executed later, rather than blocking us.
…
By the way, I’m active on Twitter, so feel free to connect with me there.