...
Run Format

Source file src/net/fd_mutex.go

     1	// Copyright 2013 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package net
     6	
     7	import "sync/atomic"
     8	
     9	// fdMutex is a specialized synchronization primitive that manages
    10	// lifetime of an fd and serializes access to Read, Write and Close
    11	// methods on netFD.
    12	type fdMutex struct {
    13		state uint64
    14		rsema uint32
    15		wsema uint32
    16	}
    17	
    18	// fdMutex.state is organized as follows:
    19	// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail.
    20	// 1 bit - lock for read operations.
    21	// 1 bit - lock for write operations.
    22	// 20 bits - total number of references (read+write+misc).
    23	// 20 bits - number of outstanding read waiters.
    24	// 20 bits - number of outstanding write waiters.
    25	const (
    26		mutexClosed  = 1 << 0
    27		mutexRLock   = 1 << 1
    28		mutexWLock   = 1 << 2
    29		mutexRef     = 1 << 3
    30		mutexRefMask = (1<<20 - 1) << 3
    31		mutexRWait   = 1 << 23
    32		mutexRMask   = (1<<20 - 1) << 23
    33		mutexWWait   = 1 << 43
    34		mutexWMask   = (1<<20 - 1) << 43
    35	)
    36	
    37	// Read operations must do rwlock(true)/rwunlock(true).
    38	//
    39	// Write operations must do rwlock(false)/rwunlock(false).
    40	//
    41	// Misc operations must do incref/decref.
    42	// Misc operations include functions like setsockopt and setDeadline.
    43	// They need to use incref/decref to ensure that they operate on the
    44	// correct fd in presence of a concurrent close call (otherwise fd can
    45	// be closed under their feet).
    46	//
    47	// Close operations must do increfAndClose/decref.
    48	
    49	// incref adds a reference to mu.
    50	// It reports whether mu is available for reading or writing.
    51	func (mu *fdMutex) incref() bool {
    52		for {
    53			old := atomic.LoadUint64(&mu.state)
    54			if old&mutexClosed != 0 {
    55				return false
    56			}
    57			new := old + mutexRef
    58			if new&mutexRefMask == 0 {
    59				panic("net: inconsistent fdMutex")
    60			}
    61			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    62				return true
    63			}
    64		}
    65	}
    66	
    67	// increfAndClose sets the state of mu to closed.
    68	// It reports whether there is no remaining reference.
    69	func (mu *fdMutex) increfAndClose() bool {
    70		for {
    71			old := atomic.LoadUint64(&mu.state)
    72			if old&mutexClosed != 0 {
    73				return false
    74			}
    75			// Mark as closed and acquire a reference.
    76			new := (old | mutexClosed) + mutexRef
    77			if new&mutexRefMask == 0 {
    78				panic("net: inconsistent fdMutex")
    79			}
    80			// Remove all read and write waiters.
    81			new &^= mutexRMask | mutexWMask
    82			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    83				// Wake all read and write waiters,
    84				// they will observe closed flag after wakeup.
    85				for old&mutexRMask != 0 {
    86					old -= mutexRWait
    87					runtime_Semrelease(&mu.rsema)
    88				}
    89				for old&mutexWMask != 0 {
    90					old -= mutexWWait
    91					runtime_Semrelease(&mu.wsema)
    92				}
    93				return true
    94			}
    95		}
    96	}
    97	
    98	// decref removes a reference from mu.
    99	// It reports whether there is no remaining reference.
   100	func (mu *fdMutex) decref() bool {
   101		for {
   102			old := atomic.LoadUint64(&mu.state)
   103			if old&mutexRefMask == 0 {
   104				panic("net: inconsistent fdMutex")
   105			}
   106			new := old - mutexRef
   107			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   108				return new&(mutexClosed|mutexRefMask) == mutexClosed
   109			}
   110		}
   111	}
   112	
   113	// lock adds a reference to mu and locks mu.
   114	// It reports whether mu is available for reading or writing.
   115	func (mu *fdMutex) rwlock(read bool) bool {
   116		var mutexBit, mutexWait, mutexMask uint64
   117		var mutexSema *uint32
   118		if read {
   119			mutexBit = mutexRLock
   120			mutexWait = mutexRWait
   121			mutexMask = mutexRMask
   122			mutexSema = &mu.rsema
   123		} else {
   124			mutexBit = mutexWLock
   125			mutexWait = mutexWWait
   126			mutexMask = mutexWMask
   127			mutexSema = &mu.wsema
   128		}
   129		for {
   130			old := atomic.LoadUint64(&mu.state)
   131			if old&mutexClosed != 0 {
   132				return false
   133			}
   134			var new uint64
   135			if old&mutexBit == 0 {
   136				// Lock is free, acquire it.
   137				new = (old | mutexBit) + mutexRef
   138				if new&mutexRefMask == 0 {
   139					panic("net: inconsistent fdMutex")
   140				}
   141			} else {
   142				// Wait for lock.
   143				new = old + mutexWait
   144				if new&mutexMask == 0 {
   145					panic("net: inconsistent fdMutex")
   146				}
   147			}
   148			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   149				if old&mutexBit == 0 {
   150					return true
   151				}
   152				runtime_Semacquire(mutexSema)
   153				// The signaller has subtracted mutexWait.
   154			}
   155		}
   156	}
   157	
   158	// unlock removes a reference from mu and unlocks mu.
   159	// It reports whether there is no remaining reference.
   160	func (mu *fdMutex) rwunlock(read bool) bool {
   161		var mutexBit, mutexWait, mutexMask uint64
   162		var mutexSema *uint32
   163		if read {
   164			mutexBit = mutexRLock
   165			mutexWait = mutexRWait
   166			mutexMask = mutexRMask
   167			mutexSema = &mu.rsema
   168		} else {
   169			mutexBit = mutexWLock
   170			mutexWait = mutexWWait
   171			mutexMask = mutexWMask
   172			mutexSema = &mu.wsema
   173		}
   174		for {
   175			old := atomic.LoadUint64(&mu.state)
   176			if old&mutexBit == 0 || old&mutexRefMask == 0 {
   177				panic("net: inconsistent fdMutex")
   178			}
   179			// Drop lock, drop reference and wake read waiter if present.
   180			new := (old &^ mutexBit) - mutexRef
   181			if old&mutexMask != 0 {
   182				new -= mutexWait
   183			}
   184			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   185				if old&mutexMask != 0 {
   186					runtime_Semrelease(mutexSema)
   187				}
   188				return new&(mutexClosed|mutexRefMask) == mutexClosed
   189			}
   190		}
   191	}
   192	
   193	// Implemented in runtime package.
      	// Both declarations are body-less here. In this file, rwlock calls
      	// runtime_Semacquire after registering itself as a waiter, and
      	// rwunlock and increfAndClose call runtime_Semrelease once for each
      	// waiter they remove from the state word.
   194	func runtime_Semacquire(sema *uint32)
   195	func runtime_Semrelease(sema *uint32)
   196	
   197	// incref adds a reference to fd.
   198	// It returns an error when fd cannot be used.
   199	func (fd *netFD) incref() error {
   200		if !fd.fdmu.incref() {
   201			return errClosing
   202		}
   203		return nil
   204	}
   205	
   206	// decref removes a reference from fd.
   207	// It also closes fd when the state of fd is set to closed and there
   208	// is no remaining reference.
   209	func (fd *netFD) decref() {
   210		if fd.fdmu.decref() {
   211			fd.destroy()
   212		}
   213	}
   214	
   215	// readLock adds a reference to fd and locks fd for reading.
   216	// It returns an error when fd cannot be used for reading.
   217	func (fd *netFD) readLock() error {
   218		if !fd.fdmu.rwlock(true) {
   219			return errClosing
   220		}
   221		return nil
   222	}
   223	
   224	// readUnlock removes a reference from fd and unlocks fd for reading.
   225	// It also closes fd when the state of fd is set to closed and there
   226	// is no remaining reference.
   227	func (fd *netFD) readUnlock() {
   228		if fd.fdmu.rwunlock(true) {
   229			fd.destroy()
   230		}
   231	}
   232	
   233	// writeLock adds a reference to fd and locks fd for writing.
   234	// It returns an error when fd cannot be used for writing.
   235	func (fd *netFD) writeLock() error {
   236		if !fd.fdmu.rwlock(false) {
   237			return errClosing
   238		}
   239		return nil
   240	}
   241	
   242	// writeUnlock removes a reference from fd and unlocks fd for writing.
   243	// It also closes fd when the state of fd is set to closed and there
   244	// is no remaining reference.
   245	func (fd *netFD) writeUnlock() {
   246		if fd.fdmu.rwunlock(false) {
   247			fd.destroy()
   248		}
   249	}
   250	

View as plain text