Source file src/cmd/go/internal/par/work.go

     1  // Copyright 2018 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package par implements parallel execution helpers.
     6  package par
     7  
     8  import (
     9  	"errors"
    10  	"math/rand"
    11  	"sync"
    12  	"sync/atomic"
    13  )
    14  
    15  // Work manages a set of work items to be executed in parallel, at most once each.
    16  // The items in the set must all be valid map keys.
    17  type Work[T comparable] struct {
    18  	f       func(T) // function to run for each item
    19  	running int     // total number of runners
    20  
    21  	mu      sync.Mutex
    22  	added   map[T]bool // items added to set
    23  	todo    []T        // items yet to be run
    24  	wait    sync.Cond  // wait when todo is empty
    25  	waiting int        // number of runners waiting for todo
    26  }
    27  
    28  func (w *Work[T]) init() {
    29  	if w.added == nil {
    30  		w.added = make(map[T]bool)
    31  	}
    32  }
    33  
    34  // Add adds item to the work set, if it hasn't already been added.
    35  func (w *Work[T]) Add(item T) {
    36  	w.mu.Lock()
    37  	w.init()
    38  	if !w.added[item] {
    39  		w.added[item] = true
    40  		w.todo = append(w.todo, item)
    41  		if w.waiting > 0 {
    42  			w.wait.Signal()
    43  		}
    44  	}
    45  	w.mu.Unlock()
    46  }
    47  
    48  // Do runs f in parallel on items from the work set,
    49  // with at most n invocations of f running at a time.
    50  // It returns when everything added to the work set has been processed.
    51  // At least one item should have been added to the work set
    52  // before calling Do (or else Do returns immediately),
    53  // but it is allowed for f(item) to add new items to the set.
    54  // Do should only be used once on a given Work.
    55  func (w *Work[T]) Do(n int, f func(item T)) {
    56  	if n < 1 {
    57  		panic("par.Work.Do: n < 1")
    58  	}
    59  	if w.running >= 1 {
    60  		panic("par.Work.Do: already called Do")
    61  	}
    62  
    63  	w.running = n
    64  	w.f = f
    65  	w.wait.L = &w.mu
    66  
    67  	for i := 0; i < n-1; i++ {
    68  		go w.runner()
    69  	}
    70  	w.runner()
    71  }
    72  
    73  // runner executes work in w until both nothing is left to do
    74  // and all the runners are waiting for work.
    75  // (Then all the runners return.)
    76  func (w *Work[T]) runner() {
    77  	for {
    78  		// Wait for something to do.
    79  		w.mu.Lock()
    80  		for len(w.todo) == 0 {
    81  			w.waiting++
    82  			if w.waiting == w.running {
    83  				// All done.
    84  				w.wait.Broadcast()
    85  				w.mu.Unlock()
    86  				return
    87  			}
    88  			w.wait.Wait()
    89  			w.waiting--
    90  		}
    91  
    92  		// Pick something to do at random,
    93  		// to eliminate pathological contention
    94  		// in case items added at about the same time
    95  		// are most likely to contend.
    96  		i := rand.Intn(len(w.todo))
    97  		item := w.todo[i]
    98  		w.todo[i] = w.todo[len(w.todo)-1]
    99  		w.todo = w.todo[:len(w.todo)-1]
   100  		w.mu.Unlock()
   101  
   102  		w.f(item)
   103  	}
   104  }
   105  
   106  // ErrCache is like Cache except that it also stores
   107  // an error value alongside the cached value V.
   108  type ErrCache[K comparable, V any] struct {
   109  	Cache[K, errValue[V]]
   110  }
   111  
   112  type errValue[V any] struct {
   113  	v   V
   114  	err error
   115  }
   116  
   117  func (c *ErrCache[K, V]) Do(key K, f func() (V, error)) (V, error) {
   118  	v := c.Cache.Do(key, func() errValue[V] {
   119  		v, err := f()
   120  		return errValue[V]{v, err}
   121  	})
   122  	return v.v, v.err
   123  }
   124  
   125  var ErrCacheEntryNotFound = errors.New("cache entry not found")
   126  
   127  // Get returns the cached result associated with key.
   128  // It returns ErrCacheEntryNotFound if there is no such result.
   129  func (c *ErrCache[K, V]) Get(key K) (V, error) {
   130  	v, ok := c.Cache.Get(key)
   131  	if !ok {
   132  		v.err = ErrCacheEntryNotFound
   133  	}
   134  	return v.v, v.err
   135  }
   136  
   137  // Cache runs an action once per key and caches the result.
   138  type Cache[K comparable, V any] struct {
   139  	m sync.Map
   140  }
   141  
   142  type cacheEntry[V any] struct {
   143  	done   atomic.Bool
   144  	mu     sync.Mutex
   145  	result V
   146  }
   147  
   148  // Do calls the function f if and only if Do is being called for the first time with this key.
   149  // No call to Do with a given key returns until the one call to f returns.
   150  // Do returns the value returned by the one call to f.
   151  func (c *Cache[K, V]) Do(key K, f func() V) V {
   152  	entryIface, ok := c.m.Load(key)
   153  	if !ok {
   154  		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry[V]))
   155  	}
   156  	e := entryIface.(*cacheEntry[V])
   157  	if !e.done.Load() {
   158  		e.mu.Lock()
   159  		if !e.done.Load() {
   160  			e.result = f()
   161  			e.done.Store(true)
   162  		}
   163  		e.mu.Unlock()
   164  	}
   165  	return e.result
   166  }
   167  
   168  // Get returns the cached result associated with key
   169  // and reports whether there is such a result.
   170  //
   171  // If the result for key is being computed, Get does not wait for the computation to finish.
   172  func (c *Cache[K, V]) Get(key K) (V, bool) {
   173  	entryIface, ok := c.m.Load(key)
   174  	if !ok {
   175  		return *new(V), false
   176  	}
   177  	e := entryIface.(*cacheEntry[V])
   178  	if !e.done.Load() {
   179  		return *new(V), false
   180  	}
   181  	return e.result, true
   182  }
   183  
   184  // Clear removes all entries in the cache.
   185  //
   186  // Concurrent calls to Get may return old values. Concurrent calls to Do
   187  // may return old values or store results in entries that have been deleted.
   188  //
   189  // TODO(jayconrod): Delete this after the package cache clearing functions
   190  // in internal/load have been removed.
   191  func (c *Cache[K, V]) Clear() {
   192  	c.m.Range(func(key, value any) bool {
   193  		c.m.Delete(key)
   194  		return true
   195  	})
   196  }
   197  
   198  // Delete removes an entry from the map. It is safe to call Delete for an
   199  // entry that does not exist. Delete will return quickly, even if the result
   200  // for a key is still being computed; the computation will finish, but the
   201  // result won't be accessible through the cache.
   202  //
   203  // TODO(jayconrod): Delete this after the package cache clearing functions
   204  // in internal/load have been removed.
   205  func (c *Cache[K, V]) Delete(key K) {
   206  	c.m.Delete(key)
   207  }
   208  
   209  // DeleteIf calls pred for each key in the map. If pred returns true for a key,
   210  // DeleteIf removes the corresponding entry. If the result for a key is
   211  // still being computed, DeleteIf will remove the entry without waiting for
   212  // the computation to finish. The result won't be accessible through the cache.
   213  //
   214  // TODO(jayconrod): Delete this after the package cache clearing functions
   215  // in internal/load have been removed.
   216  func (c *Cache[K, V]) DeleteIf(pred func(key K) bool) {
   217  	c.m.Range(func(key, _ any) bool {
   218  		if key := key.(K); pred(key) {
   219  			c.Delete(key)
   220  		}
   221  		return true
   222  	})
   223  }
   224  

View as plain text