Black Lives Matter. Support the Equal Justice Initiative.

Source file src/runtime/netpoll.go

Documentation: runtime

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // +build aix darwin dragonfly freebsd js,wasm linux netbsd openbsd solaris windows
     6  
     7  package runtime
     8  
     9  import (
    10  	"runtime/internal/atomic"
    11  	"unsafe"
    12  )
    13  
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//     Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//     Arm edge-triggered notifications for fd. The pd argument is to pass
//     back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
//     Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
//     Poll the network. If delta < 0, block indefinitely. If delta == 0,
//     poll without blocking. If delta > 0, block for up to delta nanoseconds.
//     Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//     Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//     Reports whether fd is a file descriptor used by the poller.
//
// Level-triggered implementations (currently Solaris, illumos and AIX)
// must also define func netpollarm(pd *pollDesc, mode int), which
// poll_runtime_pollWait calls before every wait.
    35  
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil respectively
//             and unparks the goroutine.
// nil - nothing of the above.
//
// "nil" is the value 0. Any value greater than pdWait is a *g stored as a
// uintptr (netpollblock throws "corrupted polldesc" if one is left behind).
const (
	pdReady uintptr = 1 // readiness notification pending
	pdWait  uintptr = 2 // a goroutine is about to park
)
    52  
    53  const pollBlockSize = 4 * 1024
    54  
// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	//
	// Deadlines (rd, wd) are absolute nanotime values; 0 means no deadline
	// and a negative value means the deadline has already expired.
	// When rd == wd > 0 ("combo"), the single timer rt serves both
	// directions and wt is left unarmed.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	everr   bool    // marks event scanning error happened
	user    uint32  // user settable cookie
	rseq    uintptr // protects from stale read timers; bumped on reuse, close and read-timer reset
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read (or combined) deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wseq    uintptr // protects from stale write timers; bumped on reuse, close and write-timer reset
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer (set if wt.f != nil)
	wd      int64   // write deadline
}
    84  
// pollCache is a free list of pollDesc objects guarded by lock.
// alloc refills it a block at a time from non-GC memory and free pushes
// descriptors back onto it; nothing in this file ever releases that memory.
type pollCache struct {
	lock  mutex
	first *pollDesc // head of the free list, linked through pollDesc.link
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}
    94  
// Poller-wide state.
//
// netpollInitLock and netpollInited guard the one-time call to netpollinit
// (see netpollGenericInit); netpollInited is read and set atomically.
// pollcache is the global free list of pollDesc objects.
// netpollWaiters counts goroutines parked in netpollblock: it is incremented
// in netpollblockcommit and decremented in netpollgoready, and the scheduler
// consults it when deciding whether to block in the poller.
var (
	netpollInitLock mutex
	netpollInited   uint32

	pollcache      pollCache
	netpollWaiters uint32
)
   102  
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit

// poll_runtime_pollServerInit is the runtime implementation of
// internal/poll.runtime_pollServerInit. It initializes the network poller
// if that has not happened yet, so repeated calls are harmless.
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}
   107  
   108  func netpollGenericInit() {
   109  	if atomic.Load(&netpollInited) == 0 {
   110  		lock(&netpollInitLock)
   111  		if netpollInited == 0 {
   112  			netpollinit()
   113  			atomic.Store(&netpollInited, 1)
   114  		}
   115  		unlock(&netpollInitLock)
   116  	}
   117  }
   118  
// netpollinited reports whether netpollinit has completed
// (netpollGenericInit sets the flag atomically after it returns).
func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}
   122  
//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor

// poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll. It delegates to the
// platform-specific netpollIsPollDescriptor.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return netpollIsPollDescriptor(fd)
}
   130  
   131  //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
   132  func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
   133  	pd := pollcache.alloc()
   134  	lock(&pd.lock)
   135  	if pd.wg != 0 && pd.wg != pdReady {
   136  		throw("runtime: blocked write on free polldesc")
   137  	}
   138  	if pd.rg != 0 && pd.rg != pdReady {
   139  		throw("runtime: blocked read on free polldesc")
   140  	}
   141  	pd.fd = fd
   142  	pd.closing = false
   143  	pd.everr = false
   144  	pd.rseq++
   145  	pd.rg = 0
   146  	pd.rd = 0
   147  	pd.wseq++
   148  	pd.wg = 0
   149  	pd.wd = 0
   150  	unlock(&pd.lock)
   151  
   152  	var errno int32
   153  	errno = netpollopen(fd, pd)
   154  	return pd, int(errno)
   155  }
   156  
   157  //go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
   158  func poll_runtime_pollClose(pd *pollDesc) {
   159  	if !pd.closing {
   160  		throw("runtime: close polldesc w/o unblock")
   161  	}
   162  	if pd.wg != 0 && pd.wg != pdReady {
   163  		throw("runtime: blocked write on closing polldesc")
   164  	}
   165  	if pd.rg != 0 && pd.rg != pdReady {
   166  		throw("runtime: blocked read on closing polldesc")
   167  	}
   168  	netpollclose(pd.fd)
   169  	pollcache.free(pd)
   170  }
   171  
// free pushes pd onto the cache's free list so a later alloc can reuse it.
// The memory stays type-stable: pd is only recycled, never released, so a
// stale readiness notification that still refers to pd touches a valid pollDesc.
func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}
   178  
   179  //go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
   180  func poll_runtime_pollReset(pd *pollDesc, mode int) int {
   181  	err := netpollcheckerr(pd, int32(mode))
   182  	if err != 0 {
   183  		return err
   184  	}
   185  	if mode == 'r' {
   186  		pd.rg = 0
   187  	} else if mode == 'w' {
   188  		pd.wg = 0
   189  	}
   190  	return 0
   191  }
   192  
   193  //go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
   194  func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   195  	err := netpollcheckerr(pd, int32(mode))
   196  	if err != 0 {
   197  		return err
   198  	}
   199  	// As for now only Solaris, illumos, and AIX use level-triggered IO.
   200  	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
   201  		netpollarm(pd, mode)
   202  	}
   203  	for !netpollblock(pd, int32(mode), false) {
   204  		err = netpollcheckerr(pd, int32(mode))
   205  		if err != 0 {
   206  			return err
   207  		}
   208  		// Can happen if timeout has fired and unblocked us,
   209  		// but before we had a chance to run, timeout has been reset.
   210  		// Pretend it has not happened and retry.
   211  	}
   212  	return 0
   213  }
   214  
   215  //go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
   216  func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
   217  	// This function is used only on windows after a failed attempt to cancel
   218  	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
   219  	for !netpollblock(pd, int32(mode), true) {
   220  	}
   221  }
   222  
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline

// poll_runtime_pollSetDeadline sets the deadline for mode ('r', 'w', or
// 'r'+'w' for both) on pd. d is relative, in nanoseconds:
//   - d > 0 sets the deadline to nanotime()+d, saturating at the maximum
//     int64 if the sum overflows;
//   - d == 0 clears the deadline;
//   - d < 0 marks the deadline as already expired and wakes any goroutine
//     currently waiting in that mode.
// It does nothing if pd is already closing.
//
// When the read and write deadlines are equal and positive ("combo"), a
// single timer (pd.rt running netpollDeadline) serves both directions and
// pd.wt is not armed.
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	// Remember the previous state so the timers are only touched when
	// something actually changed.
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		d += nanotime()
		if d <= 0 {
			// If the user has a deadline in the future, but the delay calculation
			// overflows, then set the deadline to the maximum possible value.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	// Read (or combined) deadline timer.
	if pd.rt.f == nil {
		if pd.rd > 0 {
			pd.rt.f = rtf
			// Copy current seq into the timer arg.
			// Timer func will check the seq against current descriptor seq,
			// if they differ the descriptor was reused or timers were reset.
			pd.rt.arg = pd
			pd.rt.seq = pd.rseq
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // invalidate current timers
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}
	// Write deadline timer; never armed in combo mode because rt covers it.
	if pd.wt.f == nil {
		if pd.wd > 0 && !combo {
			pd.wt.f = netpollWriteDeadline
			pd.wt.arg = pd
			pd.wt.seq = pd.wseq
			resettimer(&pd.wt, pd.wd)
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // invalidate current timers
		if pd.wd > 0 && !combo {
			modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd, pd.wseq)
		} else {
			deltimer(&pd.wt)
			pd.wt.f = nil
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	if pd.rd < 0 || pd.wd < 0 {
		// The store writes nil into wg, which is already nil. It exists only
		// for the full memory barrier that StorepNoWB provides.
		atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
		if pd.rd < 0 {
			rg = netpollunblock(pd, 'r', false)
		}
		if pd.wd < 0 {
			wg = netpollunblock(pd, 'w', false)
		}
	}
	unlock(&pd.lock)
	// Make the unblocked goroutines runnable only after releasing pd.lock.
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}
   305  
//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock

// poll_runtime_pollUnblock marks pd as closing and wakes any goroutines
// parked on it for reading or writing. Those goroutines then see the
// closing state through netpollcheckerr. Bumping rseq/wseq makes a deadline
// timer that is already in flight see a stale seq and do nothing. Armed
// deadline timers are deleted.
// Calling it twice on the same pd throws; poll_runtime_pollClose requires
// that it has been called.
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.rseq++
	pd.wseq++
	var rg, wg *g
	// The store writes nil into rg, which is already nil. It exists only
	// for the full memory barrier that StorepNoWB provides.
	atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	// Make the unblocked goroutines runnable only after releasing pd.lock.
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}
   335  
   336  // netpollready is called by the platform-specific netpoll function.
   337  // It declares that the fd associated with pd is ready for I/O.
   338  // The toRun argument is used to build a list of goroutines to return
   339  // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
   340  // whether the fd is ready for reading or writing or both.
   341  //
   342  // This may run while the world is stopped, so write barriers are not allowed.
   343  //go:nowritebarrier
   344  func netpollready(toRun *gList, pd *pollDesc, mode int32) {
   345  	var rg, wg *g
   346  	if mode == 'r' || mode == 'r'+'w' {
   347  		rg = netpollunblock(pd, 'r', true)
   348  	}
   349  	if mode == 'w' || mode == 'r'+'w' {
   350  		wg = netpollunblock(pd, 'w', true)
   351  	}
   352  	if rg != nil {
   353  		toRun.push(rg)
   354  	}
   355  	if wg != nil {
   356  		toRun.push(wg)
   357  	}
   358  }
   359  
   360  func netpollcheckerr(pd *pollDesc, mode int32) int {
   361  	if pd.closing {
   362  		return 1 // ErrFileClosing or ErrNetClosing
   363  	}
   364  	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
   365  		return 2 // ErrTimeout
   366  	}
   367  	// Report an event scanning error only on a read event.
   368  	// An error on a write event will be captured in a subsequent
   369  	// write call that is able to report a more specific error.
   370  	if mode == 'r' && pd.everr {
   371  		return 3 // ErrNotPollable
   372  	}
   373  	return 0
   374  }
   375  
   376  func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
   377  	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
   378  	if r {
   379  		// Bump the count of goroutines waiting for the poller.
   380  		// The scheduler uses this to decide whether to block
   381  		// waiting for the poller if there is nothing else to do.
   382  		atomic.Xadd(&netpollWaiters, 1)
   383  	}
   384  	return r
   385  }
   386  
// netpollgoready makes gp runnable again. gp must have been parked by
// netpollblock. It also decrements netpollWaiters, undoing the increment
// made in netpollblockcommit. traceskip is forwarded to goready, plus one
// to account for this frame.
func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}
   391  
   392  // returns true if IO is ready, or false if timedout or closed
   393  // waitio - wait only for completed IO, ignore errors
   394  func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   395  	gpp := &pd.rg
   396  	if mode == 'w' {
   397  		gpp = &pd.wg
   398  	}
   399  
   400  	// set the gpp semaphore to WAIT
   401  	for {
   402  		old := *gpp
   403  		if old == pdReady {
   404  			*gpp = 0
   405  			return true
   406  		}
   407  		if old != 0 {
   408  			throw("runtime: double wait")
   409  		}
   410  		if atomic.Casuintptr(gpp, 0, pdWait) {
   411  			break
   412  		}
   413  	}
   414  
   415  	// need to recheck error states after setting gpp to WAIT
   416  	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
   417  	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
   418  	if waitio || netpollcheckerr(pd, mode) == 0 {
   419  		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
   420  	}
   421  	// be careful to not lose concurrent READY notification
   422  	old := atomic.Xchguintptr(gpp, 0)
   423  	if old > pdWait {
   424  		throw("runtime: corrupted polldesc")
   425  	}
   426  	return old == pdReady
   427  }
   428  
   429  func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
   430  	gpp := &pd.rg
   431  	if mode == 'w' {
   432  		gpp = &pd.wg
   433  	}
   434  
   435  	for {
   436  		old := *gpp
   437  		if old == pdReady {
   438  			return nil
   439  		}
   440  		if old == 0 && !ioready {
   441  			// Only set READY for ioready. runtime_pollWait
   442  			// will check for timeout/cancel before waiting.
   443  			return nil
   444  		}
   445  		var new uintptr
   446  		if ioready {
   447  			new = pdReady
   448  		}
   449  		if atomic.Casuintptr(gpp, old, new) {
   450  			if old == pdReady || old == pdWait {
   451  				old = 0
   452  			}
   453  			return (*g)(unsafe.Pointer(old))
   454  		}
   455  	}
   456  }
   457  
// netpolldeadlineimpl is the shared body of the deadline timer callbacks.
// seq is the pd.rseq (read or combined timer) or pd.wseq (write timer)
// captured when the timer was set. If it no longer matches, the timer is
// stale and the event is ignored. Otherwise each expiring direction gets a
// deadline of -1, so netpollcheckerr reports a timeout. Its timer func is
// cleared, and any goroutine parked in that direction is made runnable.
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	currentSeq := pd.rseq
	if !read {
		currentSeq = pd.wseq
	}
	if seq != currentSeq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		// Parsed as pd.wd <= 0 || (pd.wt.f == nil && !read). In combo mode
		// (read && write) the write side has no timer of its own, so a nil
		// pd.wt.f is expected there.
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	// Make the unblocked goroutines runnable only after releasing pd.lock.
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}
   497  
// netpollDeadline is the timer callback for the combined timer, used when
// the read and write deadlines coincide. It expires both directions.
// arg is the *pollDesc; seq is the pd.rseq captured when the timer was set.
func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}
   501  
// netpollReadDeadline is the timer callback for a read-only deadline.
// arg is the *pollDesc; seq is the pd.rseq captured when the timer was set.
func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}
   505  
// netpollWriteDeadline is the timer callback for a write-only deadline.
// arg is the *pollDesc; seq is the pd.wseq captured when the timer was set.
func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}
   509  
   510  func (c *pollCache) alloc() *pollDesc {
   511  	lock(&c.lock)
   512  	if c.first == nil {
   513  		const pdSize = unsafe.Sizeof(pollDesc{})
   514  		n := pollBlockSize / pdSize
   515  		if n == 0 {
   516  			n = 1
   517  		}
   518  		// Must be in non-GC memory because can be referenced
   519  		// only from epoll/kqueue internals.
   520  		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
   521  		for i := uintptr(0); i < n; i++ {
   522  			pd := (*pollDesc)(add(mem, i*pdSize))
   523  			pd.link = c.first
   524  			c.first = pd
   525  		}
   526  	}
   527  	pd := c.first
   528  	c.first = pd.link
   529  	unlock(&c.lock)
   530  	return pd
   531  }
   532  

View as plain text