...
Run Format

Source file src/runtime/sema.go

Documentation: runtime

  // Copyright 2009 The Go Authors. All rights reserved.
  // Use of this source code is governed by a BSD-style
  // license that can be found in the LICENSE file.
  
  // Semaphore implementation exposed to Go.
  // The intended use is to provide a sleep and wakeup
  // primitive that can be used in the contended case
  // of other synchronization primitives.
  // Thus it targets the same goal as Linux's futex,
  // but it has much simpler semantics.
  //
  // That is, don't think of these as semaphores.
  // Think of them as a way to implement sleep and wakeup
  // such that every sleep is paired with a single wakeup,
  // even if, due to races, the wakeup happens before the sleep.
  //
  // See Mullender and Cox, ``Semaphores in Plan 9,''
  // http://swtch.com/semaphore.pdf
  
  package runtime
  
  import (
  	"runtime/internal/atomic"
  	"runtime/internal/sys"
  	"unsafe"
  )
  
  // Asynchronous semaphore for sync.Mutex.
  
  // A semaRoot is one bucket of the semaphore hash table (see semtable and
  // semroot). It holds a balanced tree of sudogs with distinct addresses
  // (s.elem). Each of those sudogs may in turn point (through s.waitlink) to
  // a list of other sudogs waiting on the same address.
  // The operations on the inner lists of sudogs with the same address
  // are all O(1) (insertion uses s.waittail). Searching the top-level tree is
  // O(log n) on average, where n is the number of distinct addresses with
  // goroutines blocked on them that hash to the given semaRoot.
  // See golang.org/issue/17953 for a program that worked badly
  // before we introduced the second level of list, and test/locklinear.go
  // for a test that exercises this.
  type semaRoot struct {
  	lock  mutex  // held by semacquire1/semrelease1 around queue/dequeue.
  	treap *sudog // root of balanced tree of unique waiters.
  	nwait uint32 // Number of waiters. Updated atomically; read w/o the lock in semrelease1's fast path.
  }
  
  // semTabSize is the number of semaRoot buckets that addresses are hashed
  // into by semroot. It is prime so the modulus does not correlate with any
  // user patterns.
  const semTabSize = 251
  
  // semtable holds the semaRoot buckets. Each entry is padded so that it
  // occupies exactly sys.CacheLineSize bytes, presumably to keep different
  // roots from sharing a cache line. This requires
  // unsafe.Sizeof(semaRoot{}) <= sys.CacheLineSize; otherwise the pad array
  // length would be negative and the file would not compile.
  var semtable [semTabSize]struct {
  	root semaRoot
  	pad  [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
  }
  
  //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
  func sync_runtime_Semacquire(addr *uint32) {
  	semacquire1(addr, false, semaBlockProfile)
  }
  
  //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
  func poll_runtime_Semacquire(addr *uint32) {
  	semacquire1(addr, false, semaBlockProfile)
  }
  
  // sync_runtime_Semrelease implements sync.runtime_Semrelease by forwarding
  // directly to semrelease1. When handoff is true, semrelease1 tries to pass
  // the released count straight to the goroutine it wakes.
  //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
  func sync_runtime_Semrelease(addr *uint32, handoff bool) {
  	semrelease1(addr, handoff)
  }
  
  //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
  func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
  	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
  }
  
  // poll_runtime_Semrelease implements internal/poll.runtime_Semrelease.
  // It releases through semrelease, i.e. without handoff.
  //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
  func poll_runtime_Semrelease(addr *uint32) {
  	semrelease(addr)
  }
  
  func readyWithTime(s *sudog, traceskip int) {
  	if s.releasetime != 0 {
  		s.releasetime = cputicks()
  	}
  	goready(s.g, traceskip)
  }
  
  type semaProfileFlags int
  
  const (
  	semaBlockProfile semaProfileFlags = 1 << iota
  	semaMutexProfile
  )
  
  // Called from runtime.
  func semacquire(addr *uint32) {
  	semacquire1(addr, false, 0)
  }
  
  // semacquire1 blocks until it can decrement *addr.
  //
  // If lifo is true the caller is queued at the head of addr's wait list
  // instead of the tail (see semaRoot.queue). profile selects whether the
  // blocking time is reported via blockevent (semaBlockProfile) and whether
  // the sudog records an acquire time that semrelease1 later charges via
  // mutexevent (semaMutexProfile); each only applies if the corresponding
  // profiling rate is > 0. It throws unless called on gp.m.curg.
  func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
  	gp := getg()
  	if gp != gp.m.curg {
  		throw("semacquire not on the G stack")
  	}
  
  	// Easy case.
  	if cansemacquire(addr) {
  		return
  	}
  
  	// Harder case:
  	//	increment waiter count
  	//	try cansemacquire one more time, return if succeeded
  	//	enqueue itself as a waiter
  	//	sleep
  	//	(waiter descriptor is dequeued by signaler)
  	s := acquireSudog()
  	root := semroot(addr)
  	t0 := int64(0)
  	s.releasetime = 0
  	s.acquiretime = 0
  	s.ticket = 0
  	// releasetime == -1 asks readyWithTime to stamp the wakeup time.
  	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
  		t0 = cputicks()
  		s.releasetime = -1
  	}
  	// A nonzero acquiretime makes dequeue/semrelease1 charge this wait to
  	// the mutex profile.
  	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
  		if t0 == 0 {
  			t0 = cputicks()
  		}
  		s.acquiretime = t0
  	}
  	// Loop until we either take the count ourselves or are handed it.
  	for {
  		lock(&root.lock)
  		// Add ourselves to nwait to disable "easy case" in semrelease.
  		atomic.Xadd(&root.nwait, 1)
  		// Check cansemacquire to avoid missed wakeup.
  		if cansemacquire(addr) {
  			atomic.Xadd(&root.nwait, -1)
  			unlock(&root.lock)
  			break
  		}
  		// Any semrelease after the cansemacquire knows we're waiting
  		// (we set nwait above), so go to sleep.
  		root.queue(addr, s, lifo)
  		// goparkunlock releases root.lock as we park. semrelease1 dequeues
  		// s and decrements nwait on our behalf before readying us.
  		goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
  		// s.ticket != 0: the releaser already consumed the count for us
  		// (handoff). Otherwise compete for it, re-queueing on failure.
  		if s.ticket != 0 || cansemacquire(addr) {
  			break
  		}
  	}
  	// releasetime is positive only if block profiling set it to -1 above
  	// and readyWithTime then stamped it with the wakeup time.
  	if s.releasetime > 0 {
  		blockevent(s.releasetime-t0, 3)
  	}
  	releaseSudog(s)
  }
  
  func semrelease(addr *uint32) {
  	semrelease1(addr, false)
  }
  
  // semrelease1 increments *addr and, if a goroutine is blocked on addr,
  // dequeues it and makes it runnable.
  //
  // If handoff is true and the count can be taken straight back with
  // cansemacquire, it is consumed on the woken goroutine's behalf and
  // s.ticket is set to 1, which tells semacquire1 it already owns the
  // count and must not compete for it.
  func semrelease1(addr *uint32, handoff bool) {
  	root := semroot(addr)
  	atomic.Xadd(addr, 1)
  
  	// Easy case: no waiters?
  	// This check must happen after the xadd, to avoid a missed wakeup
  	// (see loop in semacquire1).
  	if atomic.Load(&root.nwait) == 0 {
  		return
  	}
  
  	// Harder case: search for a waiter and wake it.
  	lock(&root.lock)
  	if atomic.Load(&root.nwait) == 0 {
  		// The count is already consumed by another goroutine,
  		// so no need to wake up another goroutine.
  		unlock(&root.lock)
  		return
  	}
  	// nwait counts waiters on every address that hashes to root, so there
  	// may be no waiter on addr itself; dequeue then returns nil.
  	// t0 is the dequeue time if the waiter is mutex-profiled, else 0.
  	s, t0 := root.dequeue(addr)
  	if s != nil {
  		atomic.Xadd(&root.nwait, -1)
  	}
  	unlock(&root.lock)
  	if s != nil { // May be slow, so unlock first
  		acquiretime := s.acquiretime
  		if acquiretime != 0 {
  			// Charge the time from s.acquiretime to the dequeue to the
  			// mutex profile.
  			mutexevent(t0-acquiretime, 3)
  		}
  		// dequeue clears ticket, so a nonzero value means corruption.
  		if s.ticket != 0 {
  			throw("corrupted semaphore ticket")
  		}
  		if handoff && cansemacquire(addr) {
  			s.ticket = 1
  		}
  		readyWithTime(s, 5)
  	}
  }
  
  func semroot(addr *uint32) *semaRoot {
  	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
  }
  
  func cansemacquire(addr *uint32) bool {
  	for {
  		v := atomic.Load(addr)
  		if v == 0 {
  			return false
  		}
  		if atomic.Cas(addr, v, v-1) {
  			return true
  		}
  	}
  }
  
  // queue adds s to the blocked goroutines in semaRoot.
  func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
  	s.g = getg()
  	s.elem = unsafe.Pointer(addr)
  	s.next = nil
  	s.prev = nil
  
  	var last *sudog
  	pt := &root.treap
  	for t := *pt; t != nil; t = *pt {
  		if t.elem == unsafe.Pointer(addr) {
  			// Already have addr in list.
  			if lifo {
  				// Substitute s in t's place in treap.
  				*pt = s
  				s.ticket = t.ticket
  				s.acquiretime = t.acquiretime
  				s.parent = t.parent
  				s.prev = t.prev
  				s.next = t.next
  				if s.prev != nil {
  					s.prev.parent = s
  				}
  				if s.next != nil {
  					s.next.parent = s
  				}
  				// Add t first in s's wait list.
  				s.waitlink = t
  				s.waittail = t.waittail
  				if s.waittail == nil {
  					s.waittail = t
  				}
  				t.parent = nil
  				t.prev = nil
  				t.next = nil
  				t.waittail = nil
  			} else {
  				// Add s to end of t's wait list.
  				if t.waittail == nil {
  					t.waitlink = s
  				} else {
  					t.waittail.waitlink = s
  				}
  				t.waittail = s
  				s.waitlink = nil
  			}
  			return
  		}
  		last = t
  		if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
  			pt = &t.prev
  		} else {
  			pt = &t.next
  		}
  	}
  
  	// Add s as new leaf in tree of unique addrs.
  	// The balanced tree is a treap using ticket as the random heap priority.
  	// That is, it is a binary tree ordered according to the elem addresses,
  	// but then among the space of possible binary trees respecting those
  	// addresses, it is kept balanced on average by maintaining a heap ordering
  	// on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket.
  	// https://en.wikipedia.org/wiki/Treap
  	// http://faculty.washington.edu/aragon/pubs/rst89.pdf
  	s.ticket = fastrand()
  	s.parent = last
  	*pt = s
  
  	// Rotate up into tree according to ticket (priority).
  	for s.parent != nil && s.parent.ticket > s.ticket {
  		if s.parent.prev == s {
  			root.rotateRight(s.parent)
  		} else {
  			if s.parent.next != s {
  				panic("semaRoot queue")
  			}
  			root.rotateLeft(s.parent)
  		}
  	}
  }
  
  // dequeue searches for and removes the first goroutine
  // in semaRoot blocked on addr, returning it as found.
  // If the sudog was being profiled, dequeue returns the time
  // at which it was woken up as now. Otherwise now is 0.
  // If no goroutine is blocked on addr it returns (nil, 0).
  //
  // If other sudogs wait on addr (s.waitlink != nil), the next one takes over
  // s's position and ticket in the treap. Otherwise s is rotated down to a
  // leaf and unlinked. Either way, the returned sudog has its tree links,
  // elem and ticket cleared.
  func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
  	ps := &root.treap
  	s := *ps
  	// Search the treap by address; ps tracks the link that points at s.
  	for ; s != nil; s = *ps {
  		if s.elem == unsafe.Pointer(addr) {
  			goto Found
  		}
  		if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
  			ps = &s.prev
  		} else {
  			ps = &s.next
  		}
  	}
  	return nil, 0
  
  Found:
  	now = int64(0)
  	if s.acquiretime != 0 {
  		now = cputicks()
  	}
  	if t := s.waitlink; t != nil {
  		// Substitute t, also waiting on addr, for s in root tree of unique addrs.
  		*ps = t
  		t.ticket = s.ticket
  		t.parent = s.parent
  		t.prev = s.prev
  		if t.prev != nil {
  			t.prev.parent = t
  		}
  		t.next = s.next
  		if t.next != nil {
  			t.next.parent = t
  		}
  		// t's wait list is the rest of s's list, so it shares s's tail,
  		// unless t was the only other waiter.
  		if t.waitlink != nil {
  			t.waittail = s.waittail
  		} else {
  			t.waittail = nil
  		}
  		// The new head starts its acquire time at now (0 if s was not
  		// being mutex-profiled).
  		t.acquiretime = now
  		s.waitlink = nil
  		s.waittail = nil
  	} else {
  		// Rotate s down to be leaf of tree for removal, respecting priorities:
  		// the child with the smaller ticket is promoted.
  		for s.next != nil || s.prev != nil {
  			if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
  				root.rotateRight(s)
  			} else {
  				root.rotateLeft(s)
  			}
  		}
  		// Remove s, now a leaf.
  		if s.parent != nil {
  			if s.parent.prev == s {
  				s.parent.prev = nil
  			} else {
  				s.parent.next = nil
  			}
  		} else {
  			root.treap = nil
  		}
  	}
  	s.parent = nil
  	s.elem = nil
  	s.next = nil
  	s.prev = nil
  	// semrelease1 requires ticket == 0 before it may set 1 for a handoff.
  	s.ticket = 0
  	return s, now
  }
  
  // rotateLeft rotates the tree rooted at node x.
  // turning (x a (y b c)) into (y (x a b) c).
  func (root *semaRoot) rotateLeft(x *sudog) {
  	// p -> (x a (y b c))
  	p := x.parent
  	a, y := x.prev, x.next
  	b, c := y.prev, y.next
  
  	y.prev = x
  	x.parent = y
  	y.next = c
  	if c != nil {
  		c.parent = y
  	}
  	x.prev = a
  	if a != nil {
  		a.parent = x
  	}
  	x.next = b
  	if b != nil {
  		b.parent = x
  	}
  
  	y.parent = p
  	if p == nil {
  		root.treap = y
  	} else if p.prev == x {
  		p.prev = y
  	} else {
  		if p.next != x {
  			throw("semaRoot rotateLeft")
  		}
  		p.next = y
  	}
  }
  
  // rotateRight rotates the tree rooted at node y.
  // turning (y (x a b) c) into (x a (y b c)).
  func (root *semaRoot) rotateRight(y *sudog) {
  	// p -> (y (x a b) c)
  	p := y.parent
  	x, c := y.prev, y.next
  	a, b := x.prev, x.next
  
  	x.prev = a
  	if a != nil {
  		a.parent = x
  	}
  	x.next = y
  	y.parent = x
  	y.prev = b
  	if b != nil {
  		b.parent = y
  	}
  	y.next = c
  	if c != nil {
  		c.parent = y
  	}
  
  	x.parent = p
  	if p == nil {
  		root.treap = x
  	} else if p.prev == y {
  		p.prev = x
  	} else {
  		if p.next != y {
  			throw("semaRoot rotateRight")
  		}
  		p.next = x
  	}
  }
  
  // notifyList is a ticket-based notification list used to implement sync.Cond.
  //
  // It must be kept in sync with the sync package; notifyListCheck compares
  // the size of sync's copy against this definition.
  type notifyList struct {
  	// wait is the ticket number of the next waiter. It is atomically
  	// incremented outside the lock.
  	wait uint32
  
  	// notify is the ticket number of the next waiter to be notified. It can
  	// be read outside the lock, but is only written to with lock held.
  	//
  	// Both wait & notify can wrap around, and such cases will be correctly
  	// handled as long as their "unwrapped" difference is bounded by 2^31.
  	// For this not to be the case, we'd need to have 2^31+ goroutines
  	// blocked on the same condvar, which is currently not possible.
  	notify uint32
  
  	// List of parked waiters, linked through sudog.next and guarded by
  	// lock. notifyListWait appends at tail; notifiers scan from head.
  	lock mutex
  	head *sudog
  	tail *sudog
  }
  
  // less checks if a < b, considering a & b running counts that may overflow the
  // 32-bit range, and that their "unwrapped" difference is always less than 2^31.
  func less(a, b uint32) bool {
  	return int32(a-b) < 0
  }
  
  // notifyListAdd adds the caller to a notify list such that it can receive
  // notifications. The caller must eventually call notifyListWait to wait for
  // such a notification, passing the returned ticket number.
  //go:linkname notifyListAdd sync.runtime_notifyListAdd
  func notifyListAdd(l *notifyList) uint32 {
  	// This may be called concurrently, for example, when called from
  	// sync.Cond.Wait while holding a RWMutex in read mode.
  	return atomic.Xadd(&l.wait, 1) - 1
  }
  
  // notifyListWait waits for a notification. If one has been sent since
  // notifyListAdd was called, it returns immediately. Otherwise, it blocks.
  // t is the ticket returned by notifyListAdd.
  //go:linkname notifyListWait sync.runtime_notifyListWait
  func notifyListWait(l *notifyList, t uint32) {
  	lock(&l.lock)
  
  	// Return right away if this ticket has already been notified.
  	if less(t, l.notify) {
  		unlock(&l.lock)
  		return
  	}
  
  	// Enqueue itself.
  	s := acquireSudog()
  	s.g = getg()
  	s.ticket = t
  	s.releasetime = 0
  	t0 := int64(0)
  	// releasetime == -1 asks readyWithTime to record the wakeup time.
  	if blockprofilerate > 0 {
  		t0 = cputicks()
  		s.releasetime = -1
  	}
  	// Append s at the tail of the list (still under l.lock).
  	if l.tail == nil {
  		l.head = s
  	} else {
  		l.tail.next = s
  	}
  	l.tail = s
  	// goparkunlock releases l.lock as we park.
  	// NOTE(review): the wait reason is "semacquire" even though this is a
  	// condition-variable wait.
  	goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
  	if t0 != 0 {
  		blockevent(s.releasetime-t0, 2)
  	}
  	releaseSudog(s)
  }
  
  // notifyListNotifyAll notifies all entries in the list.
  //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
  func notifyListNotifyAll(l *notifyList) {
  	// Fast-path: if there are no new waiters since the last notification
  	// we don't need to acquire the lock.
  	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  		return
  	}
  
  	// Pull the list out into a local variable, waiters will be readied
  	// outside the lock.
  	lock(&l.lock)
  	s := l.head
  	l.head = nil
  	l.tail = nil
  
  	// Update the next ticket to be notified. We can set it to the current
  	// value of wait because any previous waiters are already in the list
  	// or will notice that they have already been notified when trying to
  	// add themselves to the list.
  	atomic.Store(&l.notify, atomic.Load(&l.wait))
  	unlock(&l.lock)
  
  	// Go through the local list and ready all waiters.
  	for s != nil {
  		next := s.next
  		s.next = nil
  		readyWithTime(s, 4)
  		s = next
  	}
  }
  
  // notifyListNotifyOne notifies one entry in the list.
  //
  // It advances l.notify by one ticket. If the goroutine holding that ticket
  // is already parked in the list, it is unlinked and made runnable.
  //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
  func notifyListNotifyOne(l *notifyList) {
  	// Fast-path: if there are no new waiters since the last notification
  	// we don't need to acquire the lock at all.
  	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  		return
  	}
  
  	lock(&l.lock)
  
  	// Re-check under the lock if we need to do anything.
  	t := l.notify
  	if t == atomic.Load(&l.wait) {
  		unlock(&l.lock)
  		return
  	}
  
  	// Update the next notify ticket number.
  	atomic.Store(&l.notify, t+1)
  
  	// Try to find the g that needs to be notified.
  	// If it hasn't made it to the list yet we won't find it,
  	// but it won't park itself once it sees the new notify number.
  	//
  	// This scan looks linear but essentially always stops quickly.
  	// Because g's queue separately from taking numbers,
  	// there may be minor reorderings in the list, but we
  	// expect the g we're looking for to be near the front.
  	// The g has others in front of it on the list only to the
  	// extent that it lost the race, so the iteration will not
  	// be too long. This applies even when the g is missing:
  	// it hasn't yet gotten to sleep and has lost the race to
  	// the (few) other g's that we find on the list.
  	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  		if s.ticket == t {
  			// Unlink s; p is its predecessor (nil when s is the head).
  			n := s.next
  			if p != nil {
  				p.next = n
  			} else {
  				l.head = n
  			}
  			if n == nil {
  				l.tail = p
  			}
  			// s is no longer reachable from l, so ready it outside the lock.
  			unlock(&l.lock)
  			s.next = nil
  			readyWithTime(s, 4)
  			return
  		}
  	}
  	unlock(&l.lock)
  }
  
  //go:linkname notifyListCheck sync.runtime_notifyListCheck
  func notifyListCheck(sz uintptr) {
  	if sz != unsafe.Sizeof(notifyList{}) {
  		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
  		throw("bad notifyList size")
  	}
  }
  
  //go:linkname sync_nanotime sync.runtime_nanotime
  func sync_nanotime() int64 {
  	return nanotime()
  }
  

View as plain text