...
Run Format

Source file src/runtime/sema.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Semaphore implementation exposed to Go.
     6	// Intended use is provide a sleep and wakeup
     7	// primitive that can be used in the contended case
     8	// of other synchronization primitives.
     9	// Thus it targets the same goal as Linux's futex,
    10	// but it has much simpler semantics.
    11	//
    12	// That is, don't think of these as semaphores.
    13	// Think of them as a way to implement sleep and wakeup
    14	// such that every sleep is paired with a single wakeup,
    15	// even if, due to races, the wakeup happens before the sleep.
    16	//
    17	// See Mullender and Cox, ``Semaphores in Plan 9,''
    18	// http://swtch.com/semaphore.pdf
    19	
    20	package runtime
    21	
    22	import (
    23		"runtime/internal/atomic"
    24		"runtime/internal/sys"
    25		"unsafe"
    26	)
    27	
    28	// Asynchronous semaphore for sync.Mutex.
    29	
// A semaRoot holds one bucket of the semaphore wait table: a doubly
// linked list of sudogs, one per goroutine blocked in semacquire on
// some address that hashes to this root.
type semaRoot struct {
	lock  mutex
	head  *sudog // oldest waiter; dequeued first by semrelease
	tail  *sudog // newest waiter; semacquire enqueues here
	nwait uint32 // Number of waiters. Read w/o the lock.
}
    36	
// Prime to not correlate with any user patterns.
const semTabSize = 251

// semtable maps semaphore addresses to semaRoots (see semroot).
// Each entry is padded out to a full cache line so that adjacent
// roots do not false-share.
var semtable [semTabSize]struct {
	root semaRoot
	pad  [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}
    44	
// sync_runtime_Semacquire blocks until the semaphore at addr can be
// decremented, recording block-profile events while waiting.
// Linked to sync.runtime_Semacquire for use by the sync package.
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
	semacquire(addr, semaBlockProfile)
}
    49	
// net_runtime_Semacquire is the net package's entry point to
// semacquire (block profiling only).
//go:linkname net_runtime_Semacquire net.runtime_Semacquire
func net_runtime_Semacquire(addr *uint32) {
	semacquire(addr, semaBlockProfile)
}
    54	
// sync_runtime_Semrelease increments the semaphore at addr and wakes a
// blocked waiter, if any. Linked to sync.runtime_Semrelease.
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32) {
	semrelease(addr)
}
    59	
// sync_runtime_SemacquireMutex is like sync_runtime_Semacquire but also
// records mutex-contention profile samples (semaMutexProfile).
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) {
	semacquire(addr, semaBlockProfile|semaMutexProfile)
}
    64	
// net_runtime_Semrelease is the net package's entry point to semrelease.
//go:linkname net_runtime_Semrelease net.runtime_Semrelease
func net_runtime_Semrelease(addr *uint32) {
	semrelease(addr)
}
    69	
// readyWithTime marks the parked goroutine in s runnable.
// If s was tagged for block profiling (releasetime set to -1 by the
// sleeper), the wakeup timestamp is stored first so the woken
// goroutine can compute how long it was blocked.
func readyWithTime(s *sudog, traceskip int) {
	if s.releasetime != 0 {
		s.releasetime = cputicks()
	}
	goready(s.g, traceskip)
}
    76	
// semaProfileFlags selects which profiling events semacquire records
// while a goroutine waits.
type semaProfileFlags int

const (
	semaBlockProfile semaProfileFlags = 1 << iota // sample for the block profile
	semaMutexProfile                              // sample for the mutex-contention profile
)
    83	
// Called from runtime.
//
// semacquire waits until the semaphore at addr has a positive count
// and atomically decrements it, sleeping via the scheduler while it
// waits. profile selects which profiling events are sampled.
func semacquire(addr *uint32, profile semaProfileFlags) {
	gp := getg()
	if gp != gp.m.curg {
		// Sleeping is only legal on a user goroutine's stack.
		throw("semacquire not on the G stack")
	}

	// Easy case.
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	s := acquireSudog()
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	// releasetime == -1 tags s for block profiling; semrelease's
	// readyWithTime overwrites it with the wakeup timestamp.
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// acquiretime != 0 tags s for mutex-contention profiling,
	// consumed by semrelease.
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lock(&root.lock)
		// Add ourselves to nwait to disable "easy case" in semrelease.
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid missed wakeup.
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		root.queue(addr, s)
		goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
		// Woken by semrelease (which dequeued us); loop in case the
		// count was consumed by someone else before we ran.
		if cansemacquire(addr) {
			break
		}
	}
	// releasetime > 0 only if we were tagged (-1) and readyWithTime
	// stamped the wakeup; record the block interval.
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3)
	}
	releaseSudog(s)
}
   140	
// semrelease atomically increments the semaphore at addr and, if some
// goroutine is blocked on it in semacquire, wakes one such waiter.
func semrelease(addr *uint32) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
	// The root is shared by every address hashing to this table slot,
	// so scan for the first waiter queued on this exact address.
	s := root.head
	for ; s != nil; s = s.next {
		if s.elem == unsafe.Pointer(addr) {
			atomic.Xadd(&root.nwait, -1)
			root.dequeue(s)
			break
		}
	}
	if s != nil {
		if s.acquiretime != 0 {
			// Mutex-contention profiling: charge the elapsed wait to
			// the dequeued waiter, and restart the clock on the next
			// waiter for this address (if any) so its sample covers
			// only its own remaining wait.
			t0 := cputicks()
			for x := root.head; x != nil; x = x.next {
				if x.elem == unsafe.Pointer(addr) {
					x.acquiretime = t0
					break
				}
			}
			mutexevent(t0-s.acquiretime, 3)
		}
	}
	unlock(&root.lock)
	if s != nil { // May be slow, so unlock first
		readyWithTime(s, 5)
	}
}
   185	
   186	func semroot(addr *uint32) *semaRoot {
   187		return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
   188	}
   189	
   190	func cansemacquire(addr *uint32) bool {
   191		for {
   192			v := atomic.Load(addr)
   193			if v == 0 {
   194				return false
   195			}
   196			if atomic.Cas(addr, v, v-1) {
   197				return true
   198			}
   199		}
   200	}
   201	
   202	func (root *semaRoot) queue(addr *uint32, s *sudog) {
   203		s.g = getg()
   204		s.elem = unsafe.Pointer(addr)
   205		s.next = nil
   206		s.prev = root.tail
   207		if root.tail != nil {
   208			root.tail.next = s
   209		} else {
   210			root.head = s
   211		}
   212		root.tail = s
   213	}
   214	
   215	func (root *semaRoot) dequeue(s *sudog) {
   216		if s.next != nil {
   217			s.next.prev = s.prev
   218		} else {
   219			root.tail = s.prev
   220		}
   221		if s.prev != nil {
   222			s.prev.next = s.next
   223		} else {
   224			root.head = s.next
   225		}
   226		s.elem = nil
   227		s.next = nil
   228		s.prev = nil
   229	}
   230	
// notifyList is a ticket-based notification list used to implement sync.Cond.
//
// It must be kept in sync with the sync package.
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters, in arrival (ticket) order.
	// head, tail, and all list links are protected by lock.
	lock mutex
	head *sudog
	tail *sudog
}
   253	
   254	// less checks if a < b, considering a & b running counts that may overflow the
   255	// 32-bit range, and that their "unwrapped" difference is always less than 2^31.
   256	func less(a, b uint32) bool {
   257		return int32(a-b) < 0
   258	}
   259	
// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
// The ticket is the value of l.wait before the increment.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}
   269	
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
// t is the ticket number returned by the corresponding notifyListAdd.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lock(&l.lock)

	// Return right away if this ticket has already been notified.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	// releasetime == -1 tags s for block profiling; readyWithTime
	// overwrites it with the wakeup timestamp.
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// Append to the tail so waiters stay in ticket (arrival) order.
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	// NOTE(review): the park reason string is "semacquire" even though
	// this is a Cond wait (the trace event is traceEvGoBlockCond) —
	// confirm the label is intentional.
	goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}
   304	
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable, waiters will be readied
	// outside the lock.
	lock(&l.lock)
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	for s != nil {
		next := s.next
		// Break the link before handing s back to its goroutine;
		// the sudog may be reused as soon as it is readied.
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}
   336	
// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lock(&l.lock)

	// Re-check under the lock if we need to do anything.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number, and try to find the G that
	// needs to be notified. If it hasn't made it to the list yet we won't
	// find it, but it won't park itself once it sees the new notify number.
	atomic.Store(&l.notify, t+1)
	// Scan for the sudog holding ticket t; p trails s one node behind
	// so the singly linked list can be spliced.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			// Ready the goroutine outside the lock; it may be slow.
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}
   378	
   379	//go:linkname notifyListCheck sync.runtime_notifyListCheck
   380	func notifyListCheck(sz uintptr) {
   381		if sz != unsafe.Sizeof(notifyList{}) {
   382			print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
   383			throw("bad notifyList size")
   384		}
   385	}
   386	

View as plain text