Source file src/cmd/go/internal/par/queue.go

     1  // Copyright 2020 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package par
     6  
     7  import "fmt"
     8  
     9  // Queue manages a set of work items to be executed in parallel. The number of
    10  // active work items is limited, and excess items are queued sequentially.
    11  type Queue struct {
    12  	maxActive int
    13  	st        chan queueState
    14  }
    15  
    16  type queueState struct {
    17  	active  int // number of goroutines processing work; always nonzero when len(backlog) > 0
    18  	backlog []func()
    19  	idle    chan struct{} // if non-nil, closed when active becomes 0
    20  }
    21  
    22  // NewQueue returns a Queue that executes up to maxActive items in parallel.
    23  //
    24  // maxActive must be positive.
    25  func NewQueue(maxActive int) *Queue {
    26  	if maxActive < 1 {
    27  		panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
    28  	}
    29  
    30  	q := &Queue{
    31  		maxActive: maxActive,
    32  		st:        make(chan queueState, 1),
    33  	}
    34  	q.st <- queueState{}
    35  	return q
    36  }
    37  
    38  // Add adds f as a work item in the queue.
    39  //
    40  // Add returns immediately, but the queue will be marked as non-idle until after
    41  // f (and any subsequently-added work) has completed.
    42  func (q *Queue) Add(f func()) {
    43  	st := <-q.st
    44  	if st.active == q.maxActive {
    45  		st.backlog = append(st.backlog, f)
    46  		q.st <- st
    47  		return
    48  	}
    49  	if st.active == 0 {
    50  		// Mark q as non-idle.
    51  		st.idle = nil
    52  	}
    53  	st.active++
    54  	q.st <- st
    55  
    56  	go func() {
    57  		for {
    58  			f()
    59  
    60  			st := <-q.st
    61  			if len(st.backlog) == 0 {
    62  				if st.active--; st.active == 0 && st.idle != nil {
    63  					close(st.idle)
    64  				}
    65  				q.st <- st
    66  				return
    67  			}
    68  			f, st.backlog = st.backlog[0], st.backlog[1:]
    69  			q.st <- st
    70  		}
    71  	}()
    72  }
    73  
    74  // Idle returns a channel that will be closed when q has no (active or enqueued)
    75  // work outstanding.
    76  func (q *Queue) Idle() <-chan struct{} {
    77  	st := <-q.st
    78  	defer func() { q.st <- st }()
    79  
    80  	if st.idle == nil {
    81  		st.idle = make(chan struct{})
    82  		if st.active == 0 {
    83  			close(st.idle)
    84  		}
    85  	}
    86  
    87  	return st.idle
    88  }
    89  

View as plain text