Source file src/runtime/netpoll.go

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build aix darwin dragonfly freebsd js,wasm linux nacl netbsd openbsd solaris windows

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit()			// to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32	// to arm edge-triggered notifications
// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(toRun *gList, pd *pollDesc, mode int32)

// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but is not yet parked;
//          the goroutine commits to park by changing the state to a G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil respectively
//             and unparks the goroutine.
// nil - none of the above.
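//
// For illustration, a typical successful read wait moves rg through these states:
//
//	nil:     the reader calls netpollblock, which CASes rg to pdWait
//	pdWait:  gopark runs netpollblockcommit, which CASes rg to the reader's G pointer
//	G:       io readiness arrives; netpollunblock CASes rg to pdReady and returns the G
//	pdReady: the woken reader exchanges rg back to nil and reports IO ready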
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

const pollBlockSize = 4 * 1024

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	everr   bool    // marks that an event scanning error happened
	user    uint32  // user settable cookie
	rseq    uintptr // protects from stale read timers
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wseq    uintptr // protects from stale write timers
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notifications from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using the seq variables (rseq/wseq),
	// which are incremented when deadlines are changed or the descriptor is reused.
}

var (
	netpollInited  uint32
	pollcache      pollCache
	netpollWaiters uint32
)

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor

// poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	fds := netpolldescriptor()
	if GOOS != "aix" {
		return fd == fds
	} else {
		// AIX has a pipe in its netpoll implementation.
		// Therefore, netpolldescriptor returns two fds packed into one word with a mask.
		return fd == fds&0xFFFF || fd == (fds>>16)&0xFFFF
	}
}

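// poll_runtime_pollOpen allocates a pollDesc for fd from the cache,
// resets its state, and registers fd with the platform poller.
// The returned errno is the result of netpollopen (0 on success).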
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	return pd, int(errno)
}

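// poll_runtime_pollClose deregisters pd.fd from the platform poller and
// returns pd to the cache. pd must already have been unblocked (closing
// set by poll_runtime_pollUnblock) and must have no goroutines parked on it.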
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

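// free returns pd to the cache. The backing memory is persistently
// allocated and never released; descriptors are only recycled through
// this free list, which keeps them type-stable (see pollCache).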
func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

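// poll_runtime_pollReset prepares pd for the next read (mode 'r') or
// write (mode 'w') by clearing the corresponding semaphore back to nil.
// It returns an error code as defined by netpollcheckerr (0 means ok).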
//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

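// poll_runtime_pollWait blocks until pd is ready for mode ('r' or 'w')
// or until it is unblocked by a deadline or by closing; it returns an
// error code as defined by netpollcheckerr (0 means IO is ready).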
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// For now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// This can happen if the timeout fired and unblocked us,
		// but the timeout was reset before we had a chance to run.
		// Pretend it has not happened and retry.
	}
	return 0
}

//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on Windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

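// poll_runtime_pollSetDeadline sets pd's read and/or write deadline,
// per mode ('r', 'w', or 'r'+'w'). d is a delta from now: d > 0 arms a
// deadline timer, d == 0 clears the deadline, and d < 0 means the
// deadline has already expired, so pending IO is unblocked immediately.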
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		d += nanotime()
		if d <= 0 {
			// If the user has a deadline in the future, but the delay calculation
			// overflows, then set the deadline to the maximum possible value.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
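	// When the read and write deadlines are both set and equal ("combo"),
	// a single timer running netpollDeadline serves both directions;
	// otherwise separate read and write timers are armed.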
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	if pd.rt.f == nil {
		if pd.rd > 0 {
			pd.rt.f = rtf
			pd.rt.when = pd.rd
			// Copy current seq into the timer arg.
			// Timer func will check the seq against the current descriptor seq;
			// if they differ, the descriptor was reused or timers were reset.
			pd.rt.arg = pd
			pd.rt.seq = pd.rseq
			addtimer(&pd.rt)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // invalidate current timers
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}
	if pd.wt.f == nil {
		if pd.wd > 0 && !combo {
			pd.wt.f = netpollWriteDeadline
			pd.wt.when = pd.wd
			pd.wt.arg = pd
			pd.wt.seq = pd.wseq
			addtimer(&pd.wt)
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // invalidate current timers
		if pd.wd > 0 && !combo {
			modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd, pd.wseq)
		} else {
			deltimer(&pd.wt)
			pd.wt.f = nil
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	if pd.rd < 0 || pd.wd < 0 {
		atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
		if pd.rd < 0 {
			rg = netpollunblock(pd, 'r', false)
		}
		if pd.wd < 0 {
			wg = netpollunblock(pd, 'w', false)
		}
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

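// poll_runtime_pollUnblock marks pd as closing, invalidates its timers,
// and wakes any goroutines parked on it so they observe the closing state.
// It must be called before poll_runtime_pollClose.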
//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.rseq++
	pd.wseq++
	var rg, wg *g
	atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

// netpollready makes pd ready; newly runnable goroutines (if any) are
// added to toRun.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

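// netpollcheckerr reports whether pd is usable for mode ('r' or 'w').
// The nonzero codes are translated into the corresponding errors
// (ErrFileClosing/ErrNetClosing, ErrTimeout, ErrNotPollable) by internal/poll.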
func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1 // ErrFileClosing or ErrNetClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2 // ErrTimeout
	}
	// Report an event scanning error only on a read event.
	// An error on a write event will be captured in a subsequent
	// write call that is able to report a more specific error.
	if mode == 'r' && pd.everr {
		return 3 // ErrNotPollable
	}
	return 0
}

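// netpollblockcommit is the gopark unlock function used by netpollblock:
// it publishes the parking goroutine gp into the semaphore word gpp,
// succeeding only if the word still holds pdWait. If a concurrent
// notification or timeout has changed the word, parking is abandoned.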
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}

// netpollblock returns true if IO is ready, or false if it timed out or was closed.
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// Need to recheck error states after setting gpp to pdWait.
	// This is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg.
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

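// netpollunblock takes any goroutine parked on pd's semaphore for mode
// ('r' or 'w') out of the semaphore and returns it for the caller to
// unpark; it returns nil if there is nothing to wake. If ioready, the
// semaphore is left in the pdReady state so the notification is not lost.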
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

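// netpolldeadlineimpl is the common implementation of the three deadline
// timer callbacks below. read and write select which direction(s) the
// expired timer covers (both for a combined deadline timer).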
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// The seq argument is the descriptor's sequence number at the time
	// the timer was armed. If it is stale, ignore the timer event.
	currentSeq := pd.rseq
	if !read {
		currentSeq = pd.wseq
	}
	if seq != currentSeq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		// For a combined deadline (read && write), wt.f is nil because a
		// single timer serves both directions; only a pure write timer
		// must have wt.f set.
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}


func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

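// alloc returns a pollDesc from the cache, carving a new pollBlockSize
// block of descriptors out of persistently allocated (non-GC) memory
// when the cache is empty.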
func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because it can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}