...
Run Format

Source file src/internal/poll/fd_mutex.go

Documentation: internal/poll

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package poll
     6  
     7  import "sync/atomic"
     8  
     9  // fdMutex is a specialized synchronization primitive that manages
    10  // lifetime of an fd and serializes access to Read, Write and Close
    11  // methods on FD.
    12  type fdMutex struct {
    13  	state uint64
    14  	rsema uint32
    15  	wsema uint32
    16  }
    17  
    18  // fdMutex.state is organized as follows:
    19  // 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
    20  // 1 bit - lock for read operations.
    21  // 1 bit - lock for write operations.
    22  // 20 bits - total number of references (read+write+misc).
    23  // 20 bits - number of outstanding read waiters.
    24  // 20 bits - number of outstanding write waiters.
    25  const (
    26  	mutexClosed  = 1 << 0
    27  	mutexRLock   = 1 << 1
    28  	mutexWLock   = 1 << 2
    29  	mutexRef     = 1 << 3
    30  	mutexRefMask = (1<<20 - 1) << 3
    31  	mutexRWait   = 1 << 23
    32  	mutexRMask   = (1<<20 - 1) << 23
    33  	mutexWWait   = 1 << 43
    34  	mutexWMask   = (1<<20 - 1) << 43
    35  )
    36  
    37  // Read operations must do rwlock(true)/rwunlock(true).
    38  //
    39  // Write operations must do rwlock(false)/rwunlock(false).
    40  //
    41  // Misc operations must do incref/decref.
    42  // Misc operations include functions like setsockopt and setDeadline.
    43  // They need to use incref/decref to ensure that they operate on the
    44  // correct fd in presence of a concurrent close call (otherwise fd can
    45  // be closed under their feet).
    46  //
    47  // Close operations must do increfAndClose/decref.
    48  
    49  // incref adds a reference to mu.
    50  // It reports whether mu is available for reading or writing.
    51  func (mu *fdMutex) incref() bool {
    52  	for {
    53  		old := atomic.LoadUint64(&mu.state)
    54  		if old&mutexClosed != 0 {
    55  			return false
    56  		}
    57  		new := old + mutexRef
    58  		if new&mutexRefMask == 0 {
    59  			panic("inconsistent poll.fdMutex")
    60  		}
    61  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    62  			return true
    63  		}
    64  	}
    65  }
    66  
    67  // increfAndClose sets the state of mu to closed.
    68  // It returns false if the file was already closed.
    69  func (mu *fdMutex) increfAndClose() bool {
    70  	for {
    71  		old := atomic.LoadUint64(&mu.state)
    72  		if old&mutexClosed != 0 {
    73  			return false
    74  		}
    75  		// Mark as closed and acquire a reference.
    76  		new := (old | mutexClosed) + mutexRef
    77  		if new&mutexRefMask == 0 {
    78  			panic("inconsistent poll.fdMutex")
    79  		}
    80  		// Remove all read and write waiters.
    81  		new &^= mutexRMask | mutexWMask
    82  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    83  			// Wake all read and write waiters,
    84  			// they will observe closed flag after wakeup.
    85  			for old&mutexRMask != 0 {
    86  				old -= mutexRWait
    87  				runtime_Semrelease(&mu.rsema)
    88  			}
    89  			for old&mutexWMask != 0 {
    90  				old -= mutexWWait
    91  				runtime_Semrelease(&mu.wsema)
    92  			}
    93  			return true
    94  		}
    95  	}
    96  }
    97  
    98  // decref removes a reference from mu.
    99  // It reports whether there is no remaining reference.
   100  func (mu *fdMutex) decref() bool {
   101  	for {
   102  		old := atomic.LoadUint64(&mu.state)
   103  		if old&mutexRefMask == 0 {
   104  			panic("inconsistent poll.fdMutex")
   105  		}
   106  		new := old - mutexRef
   107  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   108  			return new&(mutexClosed|mutexRefMask) == mutexClosed
   109  		}
   110  	}
   111  }
   112  
   113  // lock adds a reference to mu and locks mu.
   114  // It reports whether mu is available for reading or writing.
   115  func (mu *fdMutex) rwlock(read bool) bool {
   116  	var mutexBit, mutexWait, mutexMask uint64
   117  	var mutexSema *uint32
   118  	if read {
   119  		mutexBit = mutexRLock
   120  		mutexWait = mutexRWait
   121  		mutexMask = mutexRMask
   122  		mutexSema = &mu.rsema
   123  	} else {
   124  		mutexBit = mutexWLock
   125  		mutexWait = mutexWWait
   126  		mutexMask = mutexWMask
   127  		mutexSema = &mu.wsema
   128  	}
   129  	for {
   130  		old := atomic.LoadUint64(&mu.state)
   131  		if old&mutexClosed != 0 {
   132  			return false
   133  		}
   134  		var new uint64
   135  		if old&mutexBit == 0 {
   136  			// Lock is free, acquire it.
   137  			new = (old | mutexBit) + mutexRef
   138  			if new&mutexRefMask == 0 {
   139  				panic("inconsistent poll.fdMutex")
   140  			}
   141  		} else {
   142  			// Wait for lock.
   143  			new = old + mutexWait
   144  			if new&mutexMask == 0 {
   145  				panic("inconsistent poll.fdMutex")
   146  			}
   147  		}
   148  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   149  			if old&mutexBit == 0 {
   150  				return true
   151  			}
   152  			runtime_Semacquire(mutexSema)
   153  			// The signaller has subtracted mutexWait.
   154  		}
   155  	}
   156  }
   157  
   158  // unlock removes a reference from mu and unlocks mu.
   159  // It reports whether there is no remaining reference.
   160  func (mu *fdMutex) rwunlock(read bool) bool {
   161  	var mutexBit, mutexWait, mutexMask uint64
   162  	var mutexSema *uint32
   163  	if read {
   164  		mutexBit = mutexRLock
   165  		mutexWait = mutexRWait
   166  		mutexMask = mutexRMask
   167  		mutexSema = &mu.rsema
   168  	} else {
   169  		mutexBit = mutexWLock
   170  		mutexWait = mutexWWait
   171  		mutexMask = mutexWMask
   172  		mutexSema = &mu.wsema
   173  	}
   174  	for {
   175  		old := atomic.LoadUint64(&mu.state)
   176  		if old&mutexBit == 0 || old&mutexRefMask == 0 {
   177  			panic("inconsistent poll.fdMutex")
   178  		}
   179  		// Drop lock, drop reference and wake read waiter if present.
   180  		new := (old &^ mutexBit) - mutexRef
   181  		if old&mutexMask != 0 {
   182  			new -= mutexWait
   183  		}
   184  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   185  			if old&mutexMask != 0 {
   186  				runtime_Semrelease(mutexSema)
   187  			}
   188  			return new&(mutexClosed|mutexRefMask) == mutexClosed
   189  		}
   190  	}
   191  }
   192  
   193  // Implemented in runtime package.
   194  func runtime_Semacquire(sema *uint32)
   195  func runtime_Semrelease(sema *uint32)
   196  
   197  // incref adds a reference to fd.
   198  // It returns an error when fd cannot be used.
   199  func (fd *FD) incref() error {
   200  	if !fd.fdmu.incref() {
   201  		return errClosing(fd.isFile)
   202  	}
   203  	return nil
   204  }
   205  
   206  // decref removes a reference from fd.
   207  // It also closes fd when the state of fd is set to closed and there
   208  // is no remaining reference.
   209  func (fd *FD) decref() error {
   210  	if fd.fdmu.decref() {
   211  		return fd.destroy()
   212  	}
   213  	return nil
   214  }
   215  
   216  // readLock adds a reference to fd and locks fd for reading.
   217  // It returns an error when fd cannot be used for reading.
   218  func (fd *FD) readLock() error {
   219  	if !fd.fdmu.rwlock(true) {
   220  		return errClosing(fd.isFile)
   221  	}
   222  	return nil
   223  }
   224  
   225  // readUnlock removes a reference from fd and unlocks fd for reading.
   226  // It also closes fd when the state of fd is set to closed and there
   227  // is no remaining reference.
   228  func (fd *FD) readUnlock() {
   229  	if fd.fdmu.rwunlock(true) {
   230  		fd.destroy()
   231  	}
   232  }
   233  
   234  // writeLock adds a reference to fd and locks fd for writing.
   235  // It returns an error when fd cannot be used for writing.
   236  func (fd *FD) writeLock() error {
   237  	if !fd.fdmu.rwlock(false) {
   238  		return errClosing(fd.isFile)
   239  	}
   240  	return nil
   241  }
   242  
   243  // writeUnlock removes a reference from fd and unlocks fd for writing.
   244  // It also closes fd when the state of fd is set to closed and there
   245  // is no remaining reference.
   246  func (fd *FD) writeUnlock() {
   247  	if fd.fdmu.rwunlock(false) {
   248  		fd.destroy()
   249  	}
   250  }
   251  

View as plain text