
x/sync/errgroup: add TryGo and SetLimit to control concurrency #27837

Closed
kurin opened this issue Sep 25, 2018 · 47 comments
Labels
FrozenDueToAge NeedsInvestigation Someone must examine and confirm this is a valid issue and not a duplicate of an existing one. Proposal Proposal-Accepted Proposal-FinalCommentPeriod
Milestone

Comments

@kurin

kurin commented Sep 25, 2018

The errgroup package will currently spawn a new goroutine for each invocation of Group.Go. This is usually fine, but extremely high cardinality fanout can exhaust memory or other resources. It would be neat if the errgroup interface allowed users to specify the maximum number of concurrent goroutines they want the errgroup to spawn.

Proposal

type Group struct {
  N int
  // contains etc
}

N would be copied to an unexported field on the first invocation of Go, so that subsequent modification has no effect. This preserves the validity and behavior of the empty Group.

When calling Go, if the number of functions already running is N, then Go would block until the number drops below N.

The behavior of Go is not otherwise modified; if a subtask returns an error, then subsequent tasks will still be executed, and callers would rely on subtasks handling context cancellation to fall through to the Wait() call and then return, if WithContext was called.

Alternatives considered

An alternative interface would be for Go to never block, but to enqueue instead. That amounts to an unbounded queue, and I'm not a fan.

Another alternative is that the group is context-aware, and that Go return immediately if the group's context is cancelled. This requires that Group retain a reference to the context, which it does not currently do.

@gopherbot gopherbot added this to the Unreleased milestone Sep 25, 2018
@mdlayher
Member

/cc @bcmills who recently was thinking about some changes to this package IIRC

@kevinburke
Contributor

In the meantime I'd suggest acquiring a slot in a buffered channel before calling group.Go() and releasing it when the function returns, or using a package like github.com/kevinburke/semaphore to acquire resources before starting a goroutine.

@bcmills
Contributor

bcmills commented Jan 10, 2019

There is a draft API in slide 119 (in the backup slides) of my GopherCon 2018 talk, Rethinking Classical Concurrency Patterns.

I agree that the Go method should block until it can begin executing the function, not enqueue: enqueuing tasks to a bounded executor is much too prone to deadlocks.

I propose a new TryGo method as a non-blocking alternative. (A non-blocking variant is mostly useful for “concurrency-saturating” operations like tree or graph traversals, where you want to keep the number of concurrent workers as high as possible but can fall back to sequential operation when saturated.)

I would rather have a SetLimit method than an exported field: that way we can more easily enforce invariants like “the limit must not be modified while goroutines are running”.

@bcmills bcmills added the NeedsInvestigation Someone must examine and confirm this is a valid issue and not a duplicate of an existing one. label Jan 10, 2019
@fatih
Member

fatih commented Jul 20, 2019

I also needed something similar and combined it with golang.org/x/sync/semaphore. Here is an example of how I'm using it. It limits the number of simultaneous executions to maxWorkers:

func main() {
	const maxWorkers = 5
	sem := semaphore.NewWeighted(maxWorkers)

	g, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 50; i++ {
		i := i
		fmt.Printf("executing %d\n", i)

		g.Go(func() error {
			err := sem.Acquire(ctx, 1)
			if err != nil {
				return err
			}
			defer sem.Release(1)

			// do work
			time.Sleep(1 * time.Second)
			fmt.Printf("finished %+v\n", i)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("g.Wait() err = %+v\n", err)
	}

	fmt.Println("done!")
}

If anything in this approach is wrong, please let me know. It seems to work fine based on the debug statements.

@alexaandru

@fatih I would personally put the Acquire() outside of/in front of the goroutine. The way you have it, it does NOT prevent the launching of 50 simultaneous goroutines; it only prevents more than maxWorkers of them from actually doing their work at a time.

Look at it another way: if instead of 50 you had 1m, your code would launch 1m goroutines. Of them, maxWorkers would actually do the work (well, in this case sleep), while the remaining 1m - maxWorkers would ALL attempt to acquire the lock (which sits behind the semaphore abstraction).

All the best!

@fatih
Member

fatih commented Jul 25, 2019

@alexaandru thanks for the tip! You're right about that. I've fixed that actually on my end (https://twitter.com/fatih/status/1152991683870633985 and https://play.golang.org/p/h2yfBVC8IjB) but I forgot to update it here.

@alexaandru

You're most welcome @fatih ! Cheers! :)

@tschaub

tschaub commented Apr 9, 2020

Another subtle issue that would ideally be solved by an errgroup with a limit: it is very easy to write code using errgroup and semaphore that swallows significant errors and instead returns only context.Canceled.

For example, it might be non-obvious that the work function below returns context.Canceled instead of errors.New("important message here"):

const (
	maxWorkers = 10
	numTasks   = 1e6
)

func work() error {
	group, ctx := errgroup.WithContext(context.Background())
	sem := semaphore.NewWeighted(maxWorkers)

	for i := 0; i < numTasks; i++ {
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}

		group.Go(func() error {
			defer sem.Release(1)

			time.Sleep(1 * time.Second)
			if rand.Float64() > 0.5 {
				return errors.New("important message here")
			}

			return nil
		})
	}

	return group.Wait()
}

The code can be fixed with something like this, but it is easy to forget:

diff --git a/main.go b/main.go
index 7690b92..9f64dbc 100644
--- a/main.go
+++ b/main.go
@@ -21,6 +21,10 @@ func work() error {
        sem := semaphore.NewWeighted(maxWorkers)
 
        for i := 0; i < numTasks; i++ {
+               if ctx.Err() != nil {
+                       break
+               }
+
                if err := sem.Acquire(ctx, 1); err != nil {
                        return err
                }

@bcmills
Contributor

bcmills commented Apr 10, 2020

@tschaub, note that in general anything that may produce an error as a result of errgroup cancellation should be run within the errgroup itself.

So that example would probably be clearer as:

const (
	maxWorkers = 10
	numTasks   = 1e6
)

func work() error {
	group, ctx := errgroup.WithContext(context.Background())
+
+	group.Go(func() error {
		sem := semaphore.NewWeighted(maxWorkers)

		for i := 0; i < numTasks; i++ {
			if err := sem.Acquire(ctx, 1); err != nil {
				return err
			}

			group.Go(func() error {
				defer sem.Release(1)

				time.Sleep(1 * time.Second)
				if rand.Float64() > 0.5 {
					return errors.New("important message here")
				}

				return nil
			})
		}
+	})

	return group.Wait()
}

@smasher164
Member

We came across this use-case today, and used a semaphore channel instead of x/sync/semaphore. But since context is heavily threaded through, we'll probably switch to using x/sync/semaphore.

Regarding the proposed API, SetLimit makes sense with existing errgroup API, but TryGo always succeeds when there is no limit. Would there be a clearer separation with a LimitGroup type, which is instantiated with WithContextLimit?

@cbozen

cbozen commented Aug 25, 2021

We came across this use-case today, and used a semaphore channel instead of x/sync/semaphore. But since context is heavily threaded through, we'll probably switch to using x/sync/semaphore.

Regarding the proposed API, SetLimit makes sense with existing errgroup API, but TryGo always succeeds when there is no limit. Would there be a clearer separation with a LimitGroup type, which is instantiated with WithContextLimit?

Hello, I just created a package for this: https://pkg.go.dev/github.com/cboudereau/errgroupsem so we could use it as quickly as possible on our side, but feel free to discuss merging both versions :). The WaitGroup.Go variant does not offer the same fail-fast error handling that errgroup's Wait() function does.

@thepudds
Contributor

thepudds commented Mar 4, 2022

Some related discussion here, including several people chiming in to say they felt the need to implement something similar:

https://mobile.twitter.com/fatih/status/1499722289625063427

@bcmills bcmills changed the title x/sync/errgroup: add functionality to limit the number of simultaneous execution proposal: x/sync/errgroup: add functionality to limit the number of simultaneous execution Mar 4, 2022
@bcmills bcmills modified the milestones: Unreleased, Proposal Mar 4, 2022
@ianlancetaylor ianlancetaylor added this to Incoming in Proposals (old) Mar 9, 2022
@rsc
Contributor

rsc commented Mar 16, 2022

@bcmills do you think there is new API that should be added to errgroup along these lines? If so, what is it?

@rsc
Contributor

rsc commented Mar 16, 2022

This proposal has been added to the active column of the proposals project
and will now be reviewed at the weekly proposal review meetings.
— rsc for the proposal review group

@bcmills
Contributor

bcmills commented Apr 1, 2022

Taking the API I drafted for my GopherCon 2018 talk and adding documentation, I suggest:

package errgroup

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (*Group) SetLimit(n int)

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be returned by Wait.
func (*Group) Go(f func() error)

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (*Group) TryGo(f func() error) bool

@bcmills
Contributor

bcmills commented Apr 1, 2022

With the above API, I have one open question: should TryGo return false (and avoid starting the goroutine) if the group is already canceled due to an earlier error?

@seh
Contributor

seh commented Apr 1, 2022

Should there be a variant of (*Group).Go that accepts a context.Context to limit the amount of time a caller is willing to wait? The proposal here introduces (*Group).TryGo to avoid waiting at all, while (*Group).Go threatens to block for a long time. In between lies "try to start a goroutine, but don't keep trying once this Context is done."

@changkun
Member

changkun commented May 7, 2022

I have an implementation concern: this proposal may change behavior and break some existing code. According to the documentation:

// ...
// A zero Group is valid and does not cancel on error.
type Group struct

Hence we can use errgroup as follows:

g := &errgroup.Group{}

What is the default maximum allowed concurrency? If we add a field to Group and use a negative value to indicate no limit:

type Group struct {
	max int64 // maximum allowed concurrency
	...
}

Using WithContext seems fine:

func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel, max: -1}, ctx
}

But according to #27837 (comment), and the proposed SetLimit behavior:

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
// ...
func (*Group) SetLimit(n int)

It seems that users who already have usage like the following

g := &errgroup.Group{}

would see Go block forever once the concurrency control is implemented, because the default max would be 0 and Go would never run a given task. Did I miss anything?

@gopherbot

Change https://go.dev/cl/404515 mentions this issue: x/sync/errgroup: add TryGo and SetLimit to control concurrency

@changkun
Member

changkun commented May 7, 2022

I pushed CL 404515, implementing slightly different SetLimit behavior so as not to break existing usage (if my assumption in #27837 (comment) is sound).

The difference is:

-// A negative value indicates no limit.
+// Zero indicates no limit.
func (*Group) SetLimit(n int)

@Kisesy

Kisesy commented May 7, 2022

func (*Group) SetLimit(n int) *Group would be better.

@ianlancetaylor
Contributor

@bcmills How do you feel about SetLimit(0) meaning that there is no limit, per @changkun 's message above? I don't see a use case for setting the limit to 0, so this seems OK to me.

@bcmills
Contributor

bcmills commented May 9, 2022

I would rather retain SetLimit(0) as a way to force calls to TryGo to always return false — that seems particularly useful in testing.

To make that work with the zero-value Group, we could either store the limit internally as a uint (offset by +1), or use a separate boolean to indicate whether a limit is in effect.

@changkun
Member

changkun commented May 9, 2022

I would rather retain SetLimit(0) as a way to force calls to TryGo to always return false — that seems particularly useful in testing.

Any immediate examples of how it would be more useful in this case?
With SetLimit(1), all subsequent TryGo calls will fail while the first one's goroutine remains blocked. Could that work the same way?

@bcmills
Contributor

bcmills commented May 9, 2022

Any immediate examples of how it would be more useful in this case?
With SetLimit(1), all subsequent TryGo calls will fail while the first one's goroutine remains blocked.

That would require leaking a goroutine for the duration of the test, which is less ergonomic and much more error-prone.

Compare:

	g := new(errgroup.Group)
	g.SetLimit(0)
	use(g)  // All calls to TryGo fail.

vs.

	g := new(errgroup.Group)
	g.SetLimit(1)
	unblock := make(chan struct{})
	g.Go(func() error {
		<-unblock
		return nil
	})
	t.Cleanup(func() {
		close(unblock)
		g.Wait()
	})
	use(g)  // All calls to TryGo fail.

@changkun
Member

changkun commented May 9, 2022

That would require leaking a goroutine for the duration of the test, which is less ergonomic and much more error-prone.

Compare:

	g := new(errgroup.Group)
	g.SetLimit(0)
	use(g)  // All calls to TryGo fail.

vs.

	g := new(errgroup.Group)
	g.SetLimit(1)
	unblock := make(chan struct{})
	g.Go(func() error {
		<-unblock
		return nil
	})
	t.Cleanup(func() {
		close(unblock)
		g.Wait()
	})
	use(g)  // All calls to TryGo fail.

Hm, it's true that in this kind of comparison the first case is simpler. But we are comparing only this single use case of SetLimit(0); if SetLimit(0) meant no limit, this comparison would not exist at all. In general, isn't the more interesting thing to test that once n TryGo calls reach the limit set by SetLimit(n), the next TryGo fails? That would require writing approximately the same code as in the second case.

@bcmills
Contributor

bcmills commented May 9, 2022

SetLimit(0) is semantically well-defined: 0 is a natural number, and it can represent the actual number of goroutines running in a Group. It may be useful in testing, and may also be useful as a sort of barrier: if (at some future point) we allow SetLimit to be called concurrently with Go, then one could queue up some number of Go calls from background goroutines and then allow them to proceed by setting a nonzero limit.

So I would really rather not define SetLimit(0) to set the limit to anything other than 0 goroutines. It may make the implementation marginally more convenient, but it would add an otherwise-unnecessary exception in the semantics.

(Contrast with, say, runtime.GOMAXPROCS, where setting the number to 0 would necessarily prevent the program from running at all.)

@changkun
Member

changkun commented May 9, 2022

SetLimit(0) is semantically well-defined: [...] it can represent the actual number of goroutines running in a Group. It may be useful in testing, and may also be useful as a sort of barrier

A barrier is a bit tricky here. Wouldn't calling Go after SetLimit(0) be very difficult to use? In that case, Go would block forever, and nothing could rescue the leak because the limit cannot be modified:

// ...
// The limit must not be modified while any goroutines in the group are active.
func (*Group) SetLimit(n int)

It's actually not just about implementation convenience, but also about consistency with runtime.GOMAXPROCS: Go after SetLimit(0) would likewise block forever and prevent the goroutine that tries to schedule a group of calls from running at all, wouldn't it?

if (at some future point) we allow SetLimit to be called concurrently with Go, then one could queue up some number of Go calls from background goroutines and then allow them to proceed by setting a nonzero limit.

Should we relax this requirement, given the potential misuse you briefly described above?

@gopherbot

Change https://go.dev/cl/405174 mentions this issue: x/sync/errgroup: add TryGo and SetLimit to control concurrency

@changkun
Member

changkun commented May 9, 2022

I also pushed CL 405174 as an alternative implementation, which implements the behavior of the original proposal, i.e.:

A negative value indicates no limit.

But the behavior of calling Go after SetLimit(0) is, subjectively, not entirely desirable: https://go-review.googlesource.com/c/sync/+/405174/comments/835bf24e_0b7f9958

@rsc
Contributor

rsc commented May 11, 2022

SetLimit(0) should set the limit to 0, to stop all future calls. That can be a useful thing to do.

@changkun
Member

@rsc Could you also clarify what happens to a subsequent SetLimit then? Specifically

g := &errgroup.Group{}
g.SetLimit(0)
go func() {
    time.Sleep(time.Second)
    g.SetLimit(1)                 // What happens to this? panic? or?
}()
g.Go(func() error { return nil }) // Block forever?

@rsc rsc moved this from Likely Accept to Accepted in Proposals (old) May 11, 2022
@rsc
Contributor

rsc commented May 11, 2022

No change in consensus, so accepted. 🎉
This issue now tracks the work of implementing the proposal.
— rsc for the proposal review group

@rsc rsc changed the title proposal: x/sync/errgroup: add TryGo and SetLimit to control concurrency x/sync/errgroup: add TryGo and SetLimit to control concurrency May 11, 2022
@rsc rsc modified the milestones: Proposal, Backlog May 11, 2022
@bcmills
Contributor

bcmills commented May 11, 2022

@changkun, I think for now the second call to g.SetLimit should be reported as a read/write race.

(The call to Go reads the limit, while the call to SetLimit writes it. That leaves open the option to define and implement more permissive behaviors in the future.)

@ianlancetaylor
Contributor

Note that we already said above "// The limit must not be modified while any goroutines in the group are active." We don't necessarily need to go out of our way to detect and report the race, although of course it's fine to do that if it is easy.

@changkun
Member

changkun commented May 12, 2022

@bcmills @ianlancetaylor Yeah, a direct panic might be better than reporting a race via the -race flag.

Nevertheless, I'm subjectively uncomfortable with being able to call a function that may silently block forever (with no other way to rescue it).
