
Source file src/sync/waitgroup.go

// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"internal/race"
	"sync/atomic"
	"unsafe"
)

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state.
	state1 [12]byte
	sema   uint32
}
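// Minimal usage sketch for the doc comment above (illustrative, not part
// of the original file; the worker body is hypothetical):
//
//	var wg WaitGroup
//	for i := 0; i < 3; i++ {
//		wg.Add(1) // count each goroutine before it starts
//		go func() {
//			defer wg.Done() // decrement exactly once per goroutine
//			// ... do work ...
//		}()
//	}
//	wg.Wait() // blocks until the counter drops back to zero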

func (wg *WaitGroup) state() *uint64 {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1))
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[4]))
	}
}
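// Illustrative note on the branch above (not part of the original file):
// the uint32 sema field gives WaitGroup 4-byte alignment, so &state1 mod 8
// is either 0 or 4. In the first case bytes [0:8] of the 12-byte buffer
// are 8-byte aligned; in the second, bytes [4:12] are. Either way the
// buffer always contains an aligned uint64 for the atomics below.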

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
	statep := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
	if race.Enabled {
		if delta > 0 && v == int32(delta) {
			// The first increment must be synchronized with Wait.
			// Need to model this as a read, because there can be
			// several concurrent wg.counter transitions from 0.
			race.Read(unsafe.Pointer(&wg.sema))
		}
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema)
	}
}
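// Worked example for the AddUint64 call above (illustrative, not part of
// the original file): a negative delta works through two's-complement
// wraparound, and because the low 32 bits of the addend are zero, the
// waiter count is never disturbed.
//
//	delta := -1
//	state := uint64(2)<<32 | 1   // counter=2, waiters=1
//	state += uint64(delta) << 32 // addend is 0xffffffff00000000
//	// state is now uint64(1)<<32 | 1: counter=1, waiters=1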

// Done decrements the WaitGroup counter.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	statep := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(&wg.sema))
			}
			runtime_Semacquire(&wg.sema)
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}
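// Illustrative reuse pattern for the panic above (not part of the original
// file): a WaitGroup may be recycled for a second, independent batch of
// events only after Wait has returned for the first batch.
//
//	var wg WaitGroup
//	wg.Add(1)
//	go func() { defer wg.Done() /* batch 1 */ }()
//	wg.Wait() // must return before the next Add
//	wg.Add(1)
//	go func() { defer wg.Done() /* batch 2 */ }()
//	wg.Wait()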