proposal: sync: add package sync/workerpool #53044

Open
tniswong opened this issue May 24, 2022 · 5 comments

Comments

@tniswong

tniswong commented May 24, 2022

sync/workerpool

This package would provide a standardized concurrent worker pool implementation with a simple task interface.

package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context)
func (p WorkerPool) Wait()

Example

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
)

func main() {

    wp := workerpool.New(2)
    ctx, cancel := context.WithCancel(context.Background())
    
    go wp.Run(ctx) // runs until context is cancelled

    // wp.Push(Task 1)
    // wp.Push(Task 2)

    wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
    cancel() // stops the workerpool

    // wait for the workerpool to be stopped
    select {
    case <-ctx.Done():
    }

}

Reasoning

While there are many overly simplistic examples published on the internet, the problem space gains difficulty quickly when trying to write a more robust custom implementation. I believe the community would benefit greatly by having such a robust implementation widely available in the standard library.

I've written github.com/tniswong/workerpool as a draft design that I offer up as a candidate implementation. This design uses golang.org/x/sync/semaphore for bounding the concurrent workers.

Design Notes

  • Uses context.Context for both WorkerPool.Run(context.Context) and Task.Invoke(context.Context)
  • The task queue has no code-defined size limitations
  • Tasks are responsible for collecting their own results upon completion (if applicable)
  • Push() is concurrency safe
  • Push() can be supplied options to specify task invocation behavior such as Retry (restart the task if it returned an error) and RetryMax (restart the task unless it returns an error more than n times)
  • When the context passed to Run(ctx) is cancelled, any remaining queued tasks are invoked with the cancelled context to clear out the work queue
  • No hanging worker goroutines to clean up thanks to the semaphore
  • Run() runs until the context is cancelled

References

https://brandur.org/go-worker-pool
https://gobyexample.com/worker-pools
https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0
https://medium.com/code-chasm/go-concurrency-pattern-worker-pool-a437117025b1
https://golangbot.com/buffered-channels-worker-pools/

https://github.com/gammazero/workerpool
https://github.com/alitto/pond
https://github.com/cilium/workerpool
https://github.com/vardius/worker-pool

@gopherbot gopherbot added this to the Proposal milestone May 24, 2022
@ZekeLu
Contributor

ZekeLu commented May 24, 2022

It looks like https://pkg.go.dev/golang.org/x/sync/errgroup has most of the features described here. Have you considered improving https://pkg.go.dev/golang.org/x/sync/errgroup instead of adding a new package?

@tniswong
Author

Being honest, I didn't know that package existed. My searches related to the worker pool pattern never once surfaced sync/errgroup as an option.

While I admit there is definitely significant feature overlap upon first look, I'm not quite convinced these aren't two distinct use-cases.

By my (possibly naïve) understanding of worker pools, a pool simply (and continually) dispatches tasks to a set number of concurrent workers regardless of task outcome, and remains available to run future tasks until the pool is stopped. I believe that sync/errgroup (and I've only briefly studied this package, so it's very possible I'm wrong here), on the other hand, will terminate when any task returns an error, and only remains usable so long as no task has errored.

Assuming my observations are true, I can see distinct value provided by the approach of sync/errgroup and that of this proposal.

@bcmills
Contributor

bcmills commented May 24, 2022

The Run method makes this API prone to goroutine leaks and synchronization bugs: it is too easy to accidentally leak the Run goroutine, especially given that it doesn't come with a mechanism to wait for that goroutine to finish. (For more detail, see my GopherCon '18 talk, particularly starting around slide 75.)

That leaves New, Push, and Wait, which are analogous to errgroup.Group's SetLimit (#27837), Go, and Wait respectively, but you are correct that errgroup specifically focuses on error aggregation (to facilitate fork/join-style concurrency) whereas the API proposed here intentionally does not (to facilitate reuse).

That makes it more similar to cmd/go/internal/par.Queue, which provides only NewQueue(maxActive int), Add(func()), and Idle() <-chan struct{}. I think that is the more appropriate API here — it has the same concurrency-limiting properties, but without the leak-prone Run method and with a somewhat more flexible way to select on completion.

@ianlancetaylor ianlancetaylor added this to Incoming in Proposals (old) May 24, 2022
@tniswong
Author

The Run method makes this API prone to goroutine leaks and synchronization bugs: it is too easy to accidentally leak the Run goroutine, especially given that it doesn't come with a mechanism to wait for that goroutine to finish. (For more detail, see my GopherCon '18 talk, particularly starting around slide 75.)

This actually had crossed my mind, but I decided to punt pending some feedback. One thought was to add a func (p WorkerPool) WaitStop() to wait for the pool itself to stop gracefully. Also, this design doesn't require that Run(context.Context) be run as a goroutine; it can absolutely be called normally and will block until the context is cancelled. Tasks can also be added via Push() before the call to Run(context.Context) or from another goroutine, but I'm not sure that alleviates your concerns.

After learning about the existence of sync/errgroup, I've been reading through its history and came across your talk. Will be listening to it tonight. In the meantime, very interested in hearing other ideas on how to combat this.

That leaves New, Push, and Wait, which are analogous to errgroup.Group's SetLimit (#27837), Go, and Wait respectively, but you are correct that errgroup specifically focuses on error aggregation (to facilitate fork/join-style concurrency) whereas the API proposed here intentionally does not (to facilitate reuse).

That makes it more similar to cmd/go/internal/par.Queue, which provides only NewQueue(maxActive int), Add(func()), and Idle() <-chan struct{}. I think that is the more appropriate API here — it has the same concurrency-limiting properties, but without the leak-prone Run method and with a somewhat more flexible way to select on completion.

I'll need to do some studying of par.Queue to competently comment, but my understanding is that it is an internal package, thus not available for use. Are you suggesting making that API commonly available or using it as an influence to modify this proposal's defined API?

On first look, the ergonomics of par.Queue seem a bit more "clever" than I was going for. The WaitGroup-style Wait() feels very clear and natural.

Thanks for the feedback!

@tniswong
Author

I've finally had some time to revisit this and have incorporated feedback from @bcmills (thank you, btw):

sync/workerpool

package workerpool

type Task interface {
	Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context) <-chan struct{}

Example

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
    "time"
)

func main() {

    pool := workerpool.New(2)
    ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
    defer cancel()
    
    done := pool.Run(ctx) // runs until context is cancelled

    // pool.Push(Task 1)
    // pool.Push(Task 2)

    // block until the workerpool is stopped and done channel is closed
    <-done

}

I have also updated my reference implementation:

https://pkg.go.dev/github.com/tniswong/workerpool/v2@v2.0.0
https://github.com/tniswong/workerpool/tree/master/v2
