proposal: sync: add package sync/workerpool #53044
Comments
It looks like https://pkg.go.dev/golang.org/x/sync/errgroup has most of the features described here. Have you considered improving errgroup instead of adding a new package? |
Being honest, I didn't know that package existed. My searches related to the worker pool pattern never once surfaced it. While I admit there is definitely significant feature overlap at first look, I'm not quite convinced these aren't two distinct use cases. By my (possibly naïve) understanding of worker pools, a pool simply (and continually) dispatches tasks to a set number of concurrent workers regardless of task outcome, and remains available to run future tasks until the pool is stopped. Assuming my observations are true, I can see distinct value provided by this approach. |
The … That leaves … That makes it more similar to … |
This had actually crossed my mind, but I decided to punt pending some feedback. One thought was to add a … After learning about the existence of par.Queue, I'll need to do some studying of it. On first look, the ergonomics of par.Queue seem a bit more "clever" than I was going for. The WaitGroup-style … Thanks for the feedback! |
I've finally had some time to revisit this and have incorporated feedback from @bcmills (thank you, btw):

sync/workerpool

```go
package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct{}

func New(n int64) WorkerPool

func (p *WorkerPool) Push(t Task, opts ...TaskOption)

func (p *WorkerPool) Run(ctx context.Context) <-chan struct{}
```

Example

```go
package main

import (
	"context"
	"time"

	"golang.org/x/sync/workerpool"
)

func main() {
	pool := workerpool.New(2)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	done := pool.Run(ctx) // runs until the context is cancelled

	// pool.Push(Task 1)
	// pool.Push(Task 2)

	// block until the worker pool is stopped and the done channel is closed
	<-done
}
```

I have also updated my reference implementation: https://pkg.go.dev/github.com/tniswong/workerpool/v2@v2.0.0 |
sync/workerpool
This package would provide a standardized concurrent worker pool implementation with a simple task interface.
Example
Reasoning
While there are many overly simplistic examples published on the internet, the problem space quickly becomes difficult when you try to write a more robust implementation yourself. I believe the community would benefit greatly from having such a robust implementation widely available in the standard library.
I've written github.com/tniswong/workerpool as a draft design that I offer up as a candidate implementation. This design uses golang.org/x/sync/semaphore for bounding the concurrent workers.

Design Notes
- `context.Context` is used for both `WorkerPool.Run(context.Context)` and `Task.Invoke(context.Context)`
- `Push()` is concurrency safe
- `Push()` can be supplied options to specify task invocation behavior, such as Retry (restart the task if it returned an error) and RetryMax (restart the task unless it returns an error more than n times)
- ~~Does not invoke queued tasks when context is cancelled~~ When `Run(ctx)`'s ctx is cancelled, remaining jobs are invoked with the cancelled context to clear out the work queue
- `Run()` runs until the context is cancelled

References
https://brandur.org/go-worker-pool
https://gobyexample.com/worker-pools
https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0
https://medium.com/code-chasm/go-concurrency-pattern-worker-pool-a437117025b1
https://golangbot.com/buffered-channels-worker-pools/
https://github.com/gammazero/workerpool
https://github.com/alitto/pond
https://github.com/cilium/workerpool
https://github.com/vardius/worker-pool