
Source file src/runtime/netpoll.go

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit()			// to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32	// to arm edge-triggered notifications
// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(gpp *guintptr, pd *pollDesc, mode int32)
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but is not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil respectively
//             and unparks the goroutine.
// nil - none of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

const pollBlockSize = 4 * 1024
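
Editor's note: the rg/wg state machine described above can be imitated in ordinary Go with sync/atomic. The following is a minimal, hypothetical sketch for a single waiter (toySema, block, and wake are invented names, not runtime API); a channel stands in for the parked G, and storing a pointer in a uintptr deliberately copies the runtime's trick noted in the pollDesc comment below.

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

const (
	semaNil   uintptr = 0 // nothing pending
	semaReady uintptr = 1 // like pdReady
	semaWait  uintptr = 2 // like pdWait
)

type toySema struct{ state uintptr }

// block waits for a readiness notification, mirroring netpollblock:
// consume a pending semaReady, or go nil -> semaWait -> waiter pointer and park.
// Assumes a single waiter; the real code throws on "double wait".
func (s *toySema) block() {
	ch := make(chan struct{}) // stands in for the parked G
	for {
		old := atomic.LoadUintptr(&s.state)
		if old == semaReady {
			if atomic.CompareAndSwapUintptr(&s.state, semaReady, semaNil) {
				return // consumed a pending notification
			}
			continue
		}
		if old == semaNil && atomic.CompareAndSwapUintptr(&s.state, semaNil, semaWait) {
			// Commit to parking (like netpollblockcommit). Storing the pointer
			// as a uintptr copies the runtime's trick and is not GC-safe in
			// general code; it is confined to this toy.
			if atomic.CompareAndSwapUintptr(&s.state, semaWait, uintptr(unsafe.Pointer(&ch))) {
				<-ch                                   // "parked" until wake closes the channel
				atomic.StoreUintptr(&s.state, semaNil) // consume, like the Xchg in netpollblock
				return
			}
			// A concurrent wake turned semaWait into semaReady; retry and consume.
		}
	}
}

// wake delivers an io-ready notification, mirroring netpollunblock with
// ioready=true: set semaReady and unpark a committed waiter if there is one.
func (s *toySema) wake() {
	for {
		old := atomic.LoadUintptr(&s.state)
		if old == semaReady {
			return // notification already pending
		}
		if atomic.CompareAndSwapUintptr(&s.state, old, semaReady) {
			if old != semaNil && old != semaWait {
				close(*(*chan struct{})(unsafe.Pointer(old))) // unpark the waiter
			}
			return
		}
	}
}

func main() {
	var s toySema
	done := make(chan struct{})
	go func() { s.block(); close(done) }()
	s.wake()
	<-done
	fmt.Println("woken")
}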

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the pollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// which will blow up when the GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	seq     uintptr // protects from stale timers and ready notifications
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
	user    uint32  // user settable cookie
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// pollDesc objects must be type-stable,
	// because we can get ready notifications from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using the seq variable,
	// which is incremented when deadlines are changed or the descriptor is reused.
}
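
Editor's note: a hypothetical illustration of the seq-based staleness check described in the comment above (desc and armTimer are invented names). A callback remembers the seq current when it was armed, just as pd.rt.seq/pd.wt.seq do below; if the descriptor is reused before the callback runs, the numbers no longer match and the event is dropped.

package main

import (
	"fmt"
	"sync"
	"time"
)

type desc struct {
	mu  sync.Mutex
	seq uintptr
}

// armTimer schedules f and remembers the seq current at arming time.
func (d *desc) armTimer(delay time.Duration, f func()) {
	d.mu.Lock()
	armed := d.seq
	d.mu.Unlock()
	time.AfterFunc(delay, func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		if armed != d.seq {
			// Descriptor reused or timers reset since arming: stale event.
			fmt.Println("stale event dropped")
			return
		}
		f()
	})
}

// reuse simulates handing the descriptor to a new owner.
func (d *desc) reuse() {
	d.mu.Lock()
	d.seq++
	d.mu.Unlock()
}

func main() {
	d := new(desc)
	d.armTimer(10*time.Millisecond, func() { fmt.Println("deadline fired") })
	d.reuse() // invalidates the armed timer
	time.Sleep(50 * time.Millisecond)
}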

var (
	netpollInited uint32
	pollcache     pollCache
)

//go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
func net_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

//go:linkname net_runtime_pollOpen net.runtime_pollOpen
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("netpollOpen: blocked write on free descriptor")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("netpollOpen: blocked read on free descriptor")
	}
	pd.fd = fd
	pd.closing = false
	pd.seq++
	pd.rg = 0
	pd.rd = 0
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	return pd, int(errno)
}

//go:linkname net_runtime_pollClose net.runtime_pollClose
func net_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("netpollClose: close w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("netpollClose: blocked write on closing descriptor")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("netpollClose: blocked read on closing descriptor")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

//go:linkname net_runtime_pollReset net.runtime_pollReset
func net_runtime_pollReset(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return 0
}

//go:linkname net_runtime_pollWait net.runtime_pollWait
func net_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// For now, only Solaris uses level-triggered IO.
	if GOOS == "solaris" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// This can happen if the timeout fired and unblocked us,
		// but the timeout was reset before we had a chance to run.
		// Pretend it did not happen and retry.
	}
	return 0
}

//go:linkname net_runtime_pollWaitCanceled net.runtime_pollWaitCanceled
func net_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on Windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignoring closing and timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

//go:linkname net_runtime_pollSetDeadline net.runtime_pollSetDeadline
func net_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	pd.seq++ // invalidate current timers
	// Reset current timers.
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	// Set up new timers.
	if d != 0 && d <= nanotime() {
		d = -1
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	if pd.rd > 0 && pd.rd == pd.wd {
		pd.rt.f = netpollDeadline
		pd.rt.when = pd.rd
		// Copy the current seq into the timer arg.
		// The timer func will check it against the descriptor's current seq;
		// if they differ, the descriptor was reused or the timers were reset.
		pd.rt.arg = pd
		pd.rt.seq = pd.seq
		addtimer(&pd.rt)
	} else {
		if pd.rd > 0 {
			pd.rt.f = netpollReadDeadline
			pd.rt.when = pd.rd
			pd.rt.arg = pd
			pd.rt.seq = pd.seq
			addtimer(&pd.rt)
		}
		if pd.wd > 0 {
			pd.wt.f = netpollWriteDeadline
			pd.wt.when = pd.wd
			pd.wt.arg = pd
			pd.wt.seq = pd.seq
			addtimer(&pd.wt)
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&wg), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 3)
	}
	if wg != nil {
		goready(wg, 3)
	}
}
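
Editor's note: a hypothetical sketch of the deadline bookkeeping in net_runtime_pollSetDeadline above (normalize and pickTimers are invented names, and time.Now stands in for the runtime's monotonic nanotime): a nonzero deadline already in the past collapses to -1, and equal read/write deadlines share a single combined timer.

package main

import (
	"fmt"
	"time"
)

// normalize mirrors the "d != 0 && d <= nanotime()" check: 0 means no
// deadline, and a deadline in the past collapses to -1 ("already expired").
func normalize(d int64) int64 {
	if d != 0 && d <= time.Now().UnixNano() {
		return -1
	}
	return d
}

// pickTimers reports how many timers would be armed for the given read/write
// deadlines: one combined timer when both are equal and pending, otherwise
// one per pending deadline.
func pickTimers(rd, wd int64) int {
	if rd > 0 && rd == wd {
		return 1 // combined read+write deadline timer (netpollDeadline)
	}
	n := 0
	if rd > 0 {
		n++ // netpollReadDeadline
	}
	if wd > 0 {
		n++ // netpollWriteDeadline
	}
	return n
}

func main() {
	past := time.Now().Add(-time.Second).UnixNano()
	fmt.Println(normalize(0), normalize(past)) // 0 -1
	d := time.Now().Add(time.Second).UnixNano()
	fmt.Println(pickTimers(d, d), pickTimers(d, 0)) // 1 1
}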

//go:linkname net_runtime_pollUnblock net.runtime_pollUnblock
func net_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("netpollUnblock: already closing")
	}
	pd.closing = true
	pd.seq++
	var rg, wg *g
	atomicstorep(unsafe.Pointer(&rg), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 3)
	}
	if wg != nil {
		goready(wg, 3)
	}
}

// netpollready makes pd ready; newly runnable goroutines (if any) are added
// to the list headed by *gpp.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
	var rg, wg guintptr
	if mode == 'r' || mode == 'r'+'w' {
		rg.set(netpollunblock(pd, 'r', true))
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg.set(netpollunblock(pd, 'w', true))
	}
	if rg != 0 {
		rg.ptr().schedlink = *gpp
		*gpp = rg
	}
	if wg != 0 {
		wg.ptr().schedlink = *gpp
		*gpp = wg
	}
}
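
Editor's note: netpollready threads woken goroutines onto a caller-owned list through the g's own schedlink field, so no allocation is needed while the world may be stopped. A hypothetical analogue of that intrusive prepend (task and push are invented names):

package main

import "fmt"

type task struct {
	name string
	link *task // intrusive list pointer, like g.schedlink
}

// push prepends t to the list headed by *head, mirroring how netpollready
// prepends rg/wg to *gpp without allocating any list nodes.
func push(head **task, t *task) {
	t.link = *head
	*head = t
}

func main() {
	var ready *task
	push(&ready, &task{name: "reader"})
	push(&ready, &task{name: "writer"})
	for t := ready; t != nil; t = t.link {
		fmt.Println(t.name) // writer, then reader (LIFO)
	}
}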

func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return 1 // errClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return 2 // errTimeout
	}
	return 0
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
}

// netpollblock returns true if IO is ready, or false if it timed out or the
// descriptor was closed.
// waitio - wait only for completed IO, ignore errors.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// Set the gpp semaphore to pdWait.
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("netpollblock: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// Need to recheck error states after setting gpp to pdWait.
	// This is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg.
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
	}
	// Be careful not to lose a concurrent pdReady notification.
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("netpollblock: corrupted state")
	}
	return old == pdReady
}
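
Editor's note: a hypothetical sketch of the store/barrier/load handshake that the recheck comment above describes (blocker and closer are invented names). Each side publishes its own flag with a sequentially consistent atomic store and then reads the other side's flag; at least one side is guaranteed to observe the other, so a goroutine can never park just as the descriptor is closed without someone noticing.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var (
	wait    uint32 // blocker's "about to park" flag, like pdWait in rg/wg
	closing uint32 // closer's flag, like pd.closing
)

func blocker(wg *sync.WaitGroup) {
	defer wg.Done()
	atomic.StoreUint32(&wait, 1)          // publish intent to park (store)
	if atomic.LoadUint32(&closing) == 1 { // then recheck error state (load)
		fmt.Println("blocker: saw close, will not park")
	}
}

func closer(wg *sync.WaitGroup) {
	defer wg.Done()
	atomic.StoreUint32(&closing, 1)    // publish close (store)
	if atomic.LoadUint32(&wait) == 1 { // then look for a committed waiter (load)
		fmt.Println("closer: saw waiter, will wake it")
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go blocker(&wg)
	go closer(&wg)
	wg.Wait() // with sequentially consistent atomics, at least one line prints
}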

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdReady || old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// The seq arg is the descriptor's seq at the time the timer was set.
	// If it is stale, ignore the timer event.
	if seq != pd.seq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("netpolldeadlineimpl: inconsistent read deadline")
		}
		pd.rd = -1
		atomicstorep(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		// When a single combined timer covers both deadlines, wt.f is nil
		// and read is also set, which the condition below permits.
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("netpolldeadlineimpl: inconsistent write deadline")
		}
		pd.wd = -1
		atomicstorep(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		goready(rg, 0)
	}
	if wg != nil {
		goready(wg, 0)
	}
}

func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because it can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}
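
Editor's note: a hypothetical, GC-friendly analogue of the free list above (cache, node, and nodesPerSlab are invented names). The runtime carves pollDescs out of persistentalloc'd, non-GC memory because epoll/kqueue keep raw pointers to them; ordinary Go code can simply let the slab slice keep its nodes alive.

package main

import (
	"fmt"
	"sync"
)

// node plays the role of pollDesc: it lives in a slab and links into a free list.
type node struct {
	link *node
	buf  [64]byte
}

type cache struct {
	mu    sync.Mutex
	first *node
}

const nodesPerSlab = 16 // like pollBlockSize / pdSize

// alloc pops a node from the free list, carving a new slab when the list is
// empty, mirroring pollCache.alloc: one allocation yields many nodes.
func (c *cache) alloc() *node {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.first == nil {
		slab := make([]node, nodesPerSlab)
		for i := range slab {
			slab[i].link = c.first
			c.first = &slab[i]
		}
	}
	n := c.first
	c.first = n.link
	return n
}

// free pushes a node back, mirroring pollCache.free: nodes are type-stable
// and never returned to the allocator.
func (c *cache) free(n *node) {
	c.mu.Lock()
	n.link = c.first
	c.first = n
	c.mu.Unlock()
}

func main() {
	var c cache
	a := c.alloc()
	c.free(a)
	b := c.alloc()
	fmt.Println(a == b) // true: the node was reused, type-stable
}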
