...
Run Format

Source file src/runtime/sema.go

Documentation: runtime

     1  // Copyright 2009 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Semaphore implementation exposed to Go.
     6  // Intended use is to provide a sleep and wakeup
     7  // primitive that can be used in the contended case
     8  // of other synchronization primitives.
     9  // Thus it targets the same goal as Linux's futex,
    10  // but it has much simpler semantics.
    11  //
    12  // That is, don't think of these as semaphores.
    13  // Think of them as a way to implement sleep and wakeup
    14  // such that every sleep is paired with a single wakeup,
    15  // even if, due to races, the wakeup happens before the sleep.
    16  //
    17  // See Mullender and Cox, ``Semaphores in Plan 9,''
    18  // https://swtch.com/semaphore.pdf
    19  
    20  package runtime
    21  
    22  import (
    23  	"runtime/internal/atomic"
    24  	"runtime/internal/sys"
    25  	"unsafe"
    26  )
    27  
    28  // Asynchronous semaphore for sync.Mutex.
    29  
// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and test/locklinear.go
// for a test that exercises this.
//
// semacquire1 and semrelease1 take lock around their calls to queue and
// dequeue; nwait is updated with atomic.Xadd and is also read with
// atomic.Load before the lock is taken (see semrelease1).
type semaRoot struct {
	lock  mutex  // held around queue and dequeue calls on this root.
	treap *sudog // root of balanced tree of unique waiters.
	nwait uint32 // Number of waiters. Read w/o the lock.
}
    45  
// semTabSize is the number of entries in semtable; semroot reduces an
// address modulo this value to pick an entry.
// Prime to not correlate with any user patterns.
const semTabSize = 251
    48  
// semtable is the fixed table of semaRoots. semroot maps a semaphore
// address to one entry, so distinct addresses may share a root.
var semtable [semTabSize]struct {
	root semaRoot
	pad  [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte // fills the entry up to sys.CacheLineSize bytes.
}
    53  
    54  //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
    55  func sync_runtime_Semacquire(addr *uint32) {
    56  	semacquire1(addr, false, semaBlockProfile)
    57  }
    58  
    59  //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
    60  func poll_runtime_Semacquire(addr *uint32) {
    61  	semacquire1(addr, false, semaBlockProfile)
    62  }
    63  
// sync_runtime_Semrelease is linked into package sync as runtime_Semrelease.
// It forwards addr and handoff unchanged to semrelease1.
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool) {
	semrelease1(addr, handoff)
}
    68  
    69  //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
    70  func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
    71  	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
    72  }
    73  
// poll_runtime_Semrelease is linked into internal/poll as runtime_Semrelease.
// It releases the semaphore at addr via semrelease (no handoff).
//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
func poll_runtime_Semrelease(addr *uint32) {
	semrelease(addr)
}
    78  
    79  func readyWithTime(s *sudog, traceskip int) {
    80  	if s.releasetime != 0 {
    81  		s.releasetime = cputicks()
    82  	}
    83  	goready(s.g, traceskip)
    84  }
    85  
// semaProfileFlags is a bit set passed to semacquire1 that selects which
// profiling bookkeeping a blocking acquire performs.
type semaProfileFlags int

const (
	// semaBlockProfile makes semacquire1 time the wait and report it
	// through blockevent, provided blockprofilerate > 0.
	semaBlockProfile semaProfileFlags = 1 << iota
	// semaMutexProfile makes semacquire1 stamp the sudog's acquiretime,
	// provided mutexprofilerate > 0; semrelease1 uses it for mutexevent.
	semaMutexProfile
)
    92  
    93  // Called from runtime.
    94  func semacquire(addr *uint32) {
    95  	semacquire1(addr, false, 0)
    96  }
    97  
    98  func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
    99  	gp := getg()
   100  	if gp != gp.m.curg {
   101  		throw("semacquire not on the G stack")
   102  	}
   103  
   104  	// Easy case.
   105  	if cansemacquire(addr) {
   106  		return
   107  	}
   108  
   109  	// Harder case:
   110  	//	increment waiter count
   111  	//	try cansemacquire one more time, return if succeeded
   112  	//	enqueue itself as a waiter
   113  	//	sleep
   114  	//	(waiter descriptor is dequeued by signaler)
   115  	s := acquireSudog()
   116  	root := semroot(addr)
   117  	t0 := int64(0)
   118  	s.releasetime = 0
   119  	s.acquiretime = 0
   120  	s.ticket = 0
   121  	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
   122  		t0 = cputicks()
   123  		s.releasetime = -1
   124  	}
   125  	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
   126  		if t0 == 0 {
   127  			t0 = cputicks()
   128  		}
   129  		s.acquiretime = t0
   130  	}
   131  	for {
   132  		lock(&root.lock)
   133  		// Add ourselves to nwait to disable "easy case" in semrelease.
   134  		atomic.Xadd(&root.nwait, 1)
   135  		// Check cansemacquire to avoid missed wakeup.
   136  		if cansemacquire(addr) {
   137  			atomic.Xadd(&root.nwait, -1)
   138  			unlock(&root.lock)
   139  			break
   140  		}
   141  		// Any semrelease after the cansemacquire knows we're waiting
   142  		// (we set nwait above), so go to sleep.
   143  		root.queue(addr, s, lifo)
   144  		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4)
   145  		if s.ticket != 0 || cansemacquire(addr) {
   146  			break
   147  		}
   148  	}
   149  	if s.releasetime > 0 {
   150  		blockevent(s.releasetime-t0, 3)
   151  	}
   152  	releaseSudog(s)
   153  }
   154  
   155  func semrelease(addr *uint32) {
   156  	semrelease1(addr, false)
   157  }
   158  
   159  func semrelease1(addr *uint32, handoff bool) {
   160  	root := semroot(addr)
   161  	atomic.Xadd(addr, 1)
   162  
   163  	// Easy case: no waiters?
   164  	// This check must happen after the xadd, to avoid a missed wakeup
   165  	// (see loop in semacquire).
   166  	if atomic.Load(&root.nwait) == 0 {
   167  		return
   168  	}
   169  
   170  	// Harder case: search for a waiter and wake it.
   171  	lock(&root.lock)
   172  	if atomic.Load(&root.nwait) == 0 {
   173  		// The count is already consumed by another goroutine,
   174  		// so no need to wake up another goroutine.
   175  		unlock(&root.lock)
   176  		return
   177  	}
   178  	s, t0 := root.dequeue(addr)
   179  	if s != nil {
   180  		atomic.Xadd(&root.nwait, -1)
   181  	}
   182  	unlock(&root.lock)
   183  	if s != nil { // May be slow, so unlock first
   184  		acquiretime := s.acquiretime
   185  		if acquiretime != 0 {
   186  			mutexevent(t0-acquiretime, 3)
   187  		}
   188  		if s.ticket != 0 {
   189  			throw("corrupted semaphore ticket")
   190  		}
   191  		if handoff && cansemacquire(addr) {
   192  			s.ticket = 1
   193  		}
   194  		readyWithTime(s, 5)
   195  	}
   196  }
   197  
   198  func semroot(addr *uint32) *semaRoot {
   199  	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
   200  }
   201  
   202  func cansemacquire(addr *uint32) bool {
   203  	for {
   204  		v := atomic.Load(addr)
   205  		if v == 0 {
   206  			return false
   207  		}
   208  		if atomic.Cas(addr, v, v-1) {
   209  			return true
   210  		}
   211  	}
   212  }
   213  
   214  // queue adds s to the blocked goroutines in semaRoot.
   215  func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
   216  	s.g = getg()
   217  	s.elem = unsafe.Pointer(addr)
   218  	s.next = nil
   219  	s.prev = nil
   220  
   221  	var last *sudog
   222  	pt := &root.treap
   223  	for t := *pt; t != nil; t = *pt {
   224  		if t.elem == unsafe.Pointer(addr) {
   225  			// Already have addr in list.
   226  			if lifo {
   227  				// Substitute s in t's place in treap.
   228  				*pt = s
   229  				s.ticket = t.ticket
   230  				s.acquiretime = t.acquiretime
   231  				s.parent = t.parent
   232  				s.prev = t.prev
   233  				s.next = t.next
   234  				if s.prev != nil {
   235  					s.prev.parent = s
   236  				}
   237  				if s.next != nil {
   238  					s.next.parent = s
   239  				}
   240  				// Add t first in s's wait list.
   241  				s.waitlink = t
   242  				s.waittail = t.waittail
   243  				if s.waittail == nil {
   244  					s.waittail = t
   245  				}
   246  				t.parent = nil
   247  				t.prev = nil
   248  				t.next = nil
   249  				t.waittail = nil
   250  			} else {
   251  				// Add s to end of t's wait list.
   252  				if t.waittail == nil {
   253  					t.waitlink = s
   254  				} else {
   255  					t.waittail.waitlink = s
   256  				}
   257  				t.waittail = s
   258  				s.waitlink = nil
   259  			}
   260  			return
   261  		}
   262  		last = t
   263  		if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
   264  			pt = &t.prev
   265  		} else {
   266  			pt = &t.next
   267  		}
   268  	}
   269  
   270  	// Add s as new leaf in tree of unique addrs.
   271  	// The balanced tree is a treap using ticket as the random heap priority.
   272  	// That is, it is a binary tree ordered according to the elem addresses,
   273  	// but then among the space of possible binary trees respecting those
   274  	// addresses, it is kept balanced on average by maintaining a heap ordering
   275  	// on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket.
   276  	// https://en.wikipedia.org/wiki/Treap
   277  	// https://faculty.washington.edu/aragon/pubs/rst89.pdf
   278  	//
   279  	// s.ticket compared with zero in couple of places, therefore set lowest bit.
   280  	// It will not affect treap's quality noticeably.
   281  	s.ticket = fastrand() | 1
   282  	s.parent = last
   283  	*pt = s
   284  
   285  	// Rotate up into tree according to ticket (priority).
   286  	for s.parent != nil && s.parent.ticket > s.ticket {
   287  		if s.parent.prev == s {
   288  			root.rotateRight(s.parent)
   289  		} else {
   290  			if s.parent.next != s {
   291  				panic("semaRoot queue")
   292  			}
   293  			root.rotateLeft(s.parent)
   294  		}
   295  	}
   296  }
   297  
   298  // dequeue searches for and finds the first goroutine
   299  // in semaRoot blocked on addr.
   300  // If the sudog was being profiled, dequeue returns the time
   301  // at which it was woken up as now. Otherwise now is 0.
   302  func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
   303  	ps := &root.treap
   304  	s := *ps
   305  	for ; s != nil; s = *ps {
   306  		if s.elem == unsafe.Pointer(addr) {
   307  			goto Found
   308  		}
   309  		if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
   310  			ps = &s.prev
   311  		} else {
   312  			ps = &s.next
   313  		}
   314  	}
   315  	return nil, 0
   316  
   317  Found:
   318  	now = int64(0)
   319  	if s.acquiretime != 0 {
   320  		now = cputicks()
   321  	}
   322  	if t := s.waitlink; t != nil {
   323  		// Substitute t, also waiting on addr, for s in root tree of unique addrs.
   324  		*ps = t
   325  		t.ticket = s.ticket
   326  		t.parent = s.parent
   327  		t.prev = s.prev
   328  		if t.prev != nil {
   329  			t.prev.parent = t
   330  		}
   331  		t.next = s.next
   332  		if t.next != nil {
   333  			t.next.parent = t
   334  		}
   335  		if t.waitlink != nil {
   336  			t.waittail = s.waittail
   337  		} else {
   338  			t.waittail = nil
   339  		}
   340  		t.acquiretime = now
   341  		s.waitlink = nil
   342  		s.waittail = nil
   343  	} else {
   344  		// Rotate s down to be leaf of tree for removal, respecting priorities.
   345  		for s.next != nil || s.prev != nil {
   346  			if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
   347  				root.rotateRight(s)
   348  			} else {
   349  				root.rotateLeft(s)
   350  			}
   351  		}
   352  		// Remove s, now a leaf.
   353  		if s.parent != nil {
   354  			if s.parent.prev == s {
   355  				s.parent.prev = nil
   356  			} else {
   357  				s.parent.next = nil
   358  			}
   359  		} else {
   360  			root.treap = nil
   361  		}
   362  	}
   363  	s.parent = nil
   364  	s.elem = nil
   365  	s.next = nil
   366  	s.prev = nil
   367  	s.ticket = 0
   368  	return s, now
   369  }
   370  
   371  // rotateLeft rotates the tree rooted at node x.
   372  // turning (x a (y b c)) into (y (x a b) c).
   373  func (root *semaRoot) rotateLeft(x *sudog) {
   374  	// p -> (x a (y b c))
   375  	p := x.parent
   376  	a, y := x.prev, x.next
   377  	b, c := y.prev, y.next
   378  
   379  	y.prev = x
   380  	x.parent = y
   381  	y.next = c
   382  	if c != nil {
   383  		c.parent = y
   384  	}
   385  	x.prev = a
   386  	if a != nil {
   387  		a.parent = x
   388  	}
   389  	x.next = b
   390  	if b != nil {
   391  		b.parent = x
   392  	}
   393  
   394  	y.parent = p
   395  	if p == nil {
   396  		root.treap = y
   397  	} else if p.prev == x {
   398  		p.prev = y
   399  	} else {
   400  		if p.next != x {
   401  			throw("semaRoot rotateLeft")
   402  		}
   403  		p.next = y
   404  	}
   405  }
   406  
   407  // rotateRight rotates the tree rooted at node y.
   408  // turning (y (x a b) c) into (x a (y b c)).
   409  func (root *semaRoot) rotateRight(y *sudog) {
   410  	// p -> (y (x a b) c)
   411  	p := y.parent
   412  	x, c := y.prev, y.next
   413  	a, b := x.prev, x.next
   414  
   415  	x.prev = a
   416  	if a != nil {
   417  		a.parent = x
   418  	}
   419  	x.next = y
   420  	y.parent = x
   421  	y.prev = b
   422  	if b != nil {
   423  		b.parent = y
   424  	}
   425  	y.next = c
   426  	if c != nil {
   427  		c.parent = y
   428  	}
   429  
   430  	x.parent = p
   431  	if p == nil {
   432  		root.treap = x
   433  	} else if p.prev == y {
   434  		p.prev = x
   435  	} else {
   436  		if p.next != y {
   437  			throw("semaRoot rotateRight")
   438  		}
   439  		p.next = x
   440  	}
   441  }
   442  
// notifyList is a ticket-based notification list used to implement sync.Cond.
//
// It must be kept in sync with the sync package.
// (notifyListCheck compares the two sizes.)
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters.
	lock mutex  // held while head, tail or notify is modified.
	head *sudog // first parked waiter; waiters are chained through sudog.next.
	tail *sudog // last parked waiter; nil when the list is empty.
}
   465  
   466  // less checks if a < b, considering a & b running counts that may overflow the
   467  // 32-bit range, and that their "unwrapped" difference is always less than 2^31.
   468  func less(a, b uint32) bool {
   469  	return int32(a-b) < 0
   470  }
   471  
   472  // notifyListAdd adds the caller to a notify list such that it can receive
   473  // notifications. The caller must eventually call notifyListWait to wait for
   474  // such a notification, passing the returned ticket number.
   475  //go:linkname notifyListAdd sync.runtime_notifyListAdd
   476  func notifyListAdd(l *notifyList) uint32 {
   477  	// This may be called concurrently, for example, when called from
   478  	// sync.Cond.Wait while holding a RWMutex in read mode.
   479  	return atomic.Xadd(&l.wait, 1) - 1
   480  }
   481  
   482  // notifyListWait waits for a notification. If one has been sent since
   483  // notifyListAdd was called, it returns immediately. Otherwise, it blocks.
   484  //go:linkname notifyListWait sync.runtime_notifyListWait
   485  func notifyListWait(l *notifyList, t uint32) {
   486  	lock(&l.lock)
   487  
   488  	// Return right away if this ticket has already been notified.
   489  	if less(t, l.notify) {
   490  		unlock(&l.lock)
   491  		return
   492  	}
   493  
   494  	// Enqueue itself.
   495  	s := acquireSudog()
   496  	s.g = getg()
   497  	s.ticket = t
   498  	s.releasetime = 0
   499  	t0 := int64(0)
   500  	if blockprofilerate > 0 {
   501  		t0 = cputicks()
   502  		s.releasetime = -1
   503  	}
   504  	if l.tail == nil {
   505  		l.head = s
   506  	} else {
   507  		l.tail.next = s
   508  	}
   509  	l.tail = s
   510  	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
   511  	if t0 != 0 {
   512  		blockevent(s.releasetime-t0, 2)
   513  	}
   514  	releaseSudog(s)
   515  }
   516  
   517  // notifyListNotifyAll notifies all entries in the list.
   518  //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
   519  func notifyListNotifyAll(l *notifyList) {
   520  	// Fast-path: if there are no new waiters since the last notification
   521  	// we don't need to acquire the lock.
   522  	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
   523  		return
   524  	}
   525  
   526  	// Pull the list out into a local variable, waiters will be readied
   527  	// outside the lock.
   528  	lock(&l.lock)
   529  	s := l.head
   530  	l.head = nil
   531  	l.tail = nil
   532  
   533  	// Update the next ticket to be notified. We can set it to the current
   534  	// value of wait because any previous waiters are already in the list
   535  	// or will notice that they have already been notified when trying to
   536  	// add themselves to the list.
   537  	atomic.Store(&l.notify, atomic.Load(&l.wait))
   538  	unlock(&l.lock)
   539  
   540  	// Go through the local list and ready all waiters.
   541  	for s != nil {
   542  		next := s.next
   543  		s.next = nil
   544  		readyWithTime(s, 4)
   545  		s = next
   546  	}
   547  }
   548  
   549  // notifyListNotifyOne notifies one entry in the list.
   550  //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
   551  func notifyListNotifyOne(l *notifyList) {
   552  	// Fast-path: if there are no new waiters since the last notification
   553  	// we don't need to acquire the lock at all.
   554  	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
   555  		return
   556  	}
   557  
   558  	lock(&l.lock)
   559  
   560  	// Re-check under the lock if we need to do anything.
   561  	t := l.notify
   562  	if t == atomic.Load(&l.wait) {
   563  		unlock(&l.lock)
   564  		return
   565  	}
   566  
   567  	// Update the next notify ticket number.
   568  	atomic.Store(&l.notify, t+1)
   569  
   570  	// Try to find the g that needs to be notified.
   571  	// If it hasn't made it to the list yet we won't find it,
   572  	// but it won't park itself once it sees the new notify number.
   573  	//
   574  	// This scan looks linear but essentially always stops quickly.
   575  	// Because g's queue separately from taking numbers,
   576  	// there may be minor reorderings in the list, but we
   577  	// expect the g we're looking for to be near the front.
   578  	// The g has others in front of it on the list only to the
   579  	// extent that it lost the race, so the iteration will not
   580  	// be too long. This applies even when the g is missing:
   581  	// it hasn't yet gotten to sleep and has lost the race to
   582  	// the (few) other g's that we find on the list.
   583  	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
   584  		if s.ticket == t {
   585  			n := s.next
   586  			if p != nil {
   587  				p.next = n
   588  			} else {
   589  				l.head = n
   590  			}
   591  			if n == nil {
   592  				l.tail = p
   593  			}
   594  			unlock(&l.lock)
   595  			s.next = nil
   596  			readyWithTime(s, 4)
   597  			return
   598  		}
   599  	}
   600  	unlock(&l.lock)
   601  }
   602  
   603  //go:linkname notifyListCheck sync.runtime_notifyListCheck
   604  func notifyListCheck(sz uintptr) {
   605  	if sz != unsafe.Sizeof(notifyList{}) {
   606  		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
   607  		throw("bad notifyList size")
   608  	}
   609  }
   610  
// sync_nanotime is linked into package sync as runtime_nanotime.
// It returns the result of nanotime unchanged.
//go:linkname sync_nanotime sync.runtime_nanotime
func sync_nanotime() int64 {
	return nanotime()
}
   615  

View as plain text