...
Run Format

Source file src/runtime/chan.go

Documentation: runtime

     1  // Copyright 2014 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  // This file contains the implementation of Go channels.
     8  
     9  // Invariants:
    10  //  At least one of c.sendq and c.recvq is empty,
    11  //  except for the case of an unbuffered channel with a single goroutine
    12  //  blocked on it for both sending and receiving using a select statement,
    13  //  in which case the length of c.sendq and c.recvq is limited only by the
    14  //  size of the select statement.
    15  //
    16  // For buffered channels, also:
    17  //  c.qcount > 0 implies that c.recvq is empty.
    18  //  c.qcount < c.dataqsiz implies that c.sendq is empty.
    19  
    20  import (
    21  	"runtime/internal/atomic"
    22  	"runtime/internal/math"
    23  	"unsafe"
    24  )
    25  
    26  const (
    27  	maxAlign  = 8
    28  	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    29  	debugChan = false
    30  )
    31  
// hchan is the runtime representation of a channel: a circular buffer of
// dataqsiz elements plus the queues of goroutines blocked receiving
// (recvq) and sending (sendq) on it.
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of one element; set from elemtype.size by makechan
	closed   uint32         // zero while open; closechan sets it to 1
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // list of recv waiters
	sendq    waitq          // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
    52  
// waitq is a doubly-linked FIFO list of sudogs waiting on a channel.
// enqueue appends at last; dequeue removes from first.
type waitq struct {
	first *sudog // head of the list, nil when the queue is empty
	last  *sudog // tail of the list, nil when the queue is empty
}
    57  
    58  //go:linkname reflect_makechan reflect.makechan
    59  func reflect_makechan(t *chantype, size int) *hchan {
    60  	return makechan(t, size)
    61  }
    62  
    63  func makechan64(t *chantype, size int64) *hchan {
    64  	if int64(int(size)) != size {
    65  		panic(plainError("makechan: size out of range"))
    66  	}
    67  
    68  	return makechan(t, int(size))
    69  }
    70  
    71  func makechan(t *chantype, size int) *hchan {
    72  	elem := t.elem
    73  
    74  	// compiler checks this but be safe.
    75  	if elem.size >= 1<<16 {
    76  		throw("makechan: invalid channel element type")
    77  	}
    78  	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    79  		throw("makechan: bad alignment")
    80  	}
    81  
    82  	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    83  	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    84  		panic(plainError("makechan: size out of range"))
    85  	}
    86  
    87  	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    88  	// buf points into the same allocation, elemtype is persistent.
    89  	// SudoG's are referenced from their owning thread so they can't be collected.
    90  	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    91  	var c *hchan
    92  	switch {
    93  	case mem == 0:
    94  		// Queue or element size is zero.
    95  		c = (*hchan)(mallocgc(hchanSize, nil, true))
    96  		// Race detector uses this location for synchronization.
    97  		c.buf = c.raceaddr()
    98  	case elem.kind&kindNoPointers != 0:
    99  		// Elements do not contain pointers.
   100  		// Allocate hchan and buf in one call.
   101  		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
   102  		c.buf = add(unsafe.Pointer(c), hchanSize)
   103  	default:
   104  		// Elements contain pointers.
   105  		c = new(hchan)
   106  		c.buf = mallocgc(mem, elem, true)
   107  	}
   108  
   109  	c.elemsize = uint16(elem.size)
   110  	c.elemtype = elem
   111  	c.dataqsiz = uint(size)
   112  
   113  	if debugChan {
   114  		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
   115  	}
   116  	return c
   117  }
   118  
   119  // chanbuf(c, i) is pointer to the i'th slot in the buffer.
   120  func chanbuf(c *hchan, i uint) unsafe.Pointer {
   121  	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
   122  }
   123  
   124  // entry point for c <- x from compiled code
   125  //go:nosplit
   126  func chansend1(c *hchan, elem unsafe.Pointer) {
   127  	chansend(c, elem, true, getcallerpc())
   128  }
   129  
   130  /*
   131   * generic single channel send/recv
   132   * If block is not nil,
   133   * then the protocol will not
   134   * sleep but return if it could
   135   * not complete.
   136   *
   137   * sleep can wake up with g.param == nil
   138   * when a channel involved in the sleep has
   139   * been closed.  it is easiest to loop and re-run
   140   * the operation; we'll see that it's now closed.
   141   */
   142  func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   143  	if c == nil {
   144  		if !block {
   145  			return false
   146  		}
   147  		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
   148  		throw("unreachable")
   149  	}
   150  
   151  	if debugChan {
   152  		print("chansend: chan=", c, "\n")
   153  	}
   154  
   155  	if raceenabled {
   156  		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
   157  	}
   158  
   159  	// Fast path: check for failed non-blocking operation without acquiring the lock.
   160  	//
   161  	// After observing that the channel is not closed, we observe that the channel is
   162  	// not ready for sending. Each of these observations is a single word-sized read
   163  	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
   164  	// Because a closed channel cannot transition from 'ready for sending' to
   165  	// 'not ready for sending', even if the channel is closed between the two observations,
   166  	// they imply a moment between the two when the channel was both not yet closed
   167  	// and not ready for sending. We behave as if we observed the channel at that moment,
   168  	// and report that the send cannot proceed.
   169  	//
   170  	// It is okay if the reads are reordered here: if we observe that the channel is not
   171  	// ready for sending and then observe that it is not closed, that implies that the
   172  	// channel wasn't closed during the first observation.
   173  	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
   174  		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
   175  		return false
   176  	}
   177  
   178  	var t0 int64
   179  	if blockprofilerate > 0 {
   180  		t0 = cputicks()
   181  	}
   182  
   183  	lock(&c.lock)
   184  
   185  	if c.closed != 0 {
   186  		unlock(&c.lock)
   187  		panic(plainError("send on closed channel"))
   188  	}
   189  
   190  	if sg := c.recvq.dequeue(); sg != nil {
   191  		// Found a waiting receiver. We pass the value we want to send
   192  		// directly to the receiver, bypassing the channel buffer (if any).
   193  		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
   194  		return true
   195  	}
   196  
   197  	if c.qcount < c.dataqsiz {
   198  		// Space is available in the channel buffer. Enqueue the element to send.
   199  		qp := chanbuf(c, c.sendx)
   200  		if raceenabled {
   201  			raceacquire(qp)
   202  			racerelease(qp)
   203  		}
   204  		typedmemmove(c.elemtype, qp, ep)
   205  		c.sendx++
   206  		if c.sendx == c.dataqsiz {
   207  			c.sendx = 0
   208  		}
   209  		c.qcount++
   210  		unlock(&c.lock)
   211  		return true
   212  	}
   213  
   214  	if !block {
   215  		unlock(&c.lock)
   216  		return false
   217  	}
   218  
   219  	// Block on the channel. Some receiver will complete our operation for us.
   220  	gp := getg()
   221  	mysg := acquireSudog()
   222  	mysg.releasetime = 0
   223  	if t0 != 0 {
   224  		mysg.releasetime = -1
   225  	}
   226  	// No stack splits between assigning elem and enqueuing mysg
   227  	// on gp.waiting where copystack can find it.
   228  	mysg.elem = ep
   229  	mysg.waitlink = nil
   230  	mysg.g = gp
   231  	mysg.isSelect = false
   232  	mysg.c = c
   233  	gp.waiting = mysg
   234  	gp.param = nil
   235  	c.sendq.enqueue(mysg)
   236  	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   237  	// Ensure the value being sent is kept alive until the
   238  	// receiver copies it out. The sudog has a pointer to the
   239  	// stack object, but sudogs aren't considered as roots of the
   240  	// stack tracer.
   241  	KeepAlive(ep)
   242  
   243  	// someone woke us up.
   244  	if mysg != gp.waiting {
   245  		throw("G waiting list is corrupted")
   246  	}
   247  	gp.waiting = nil
   248  	if gp.param == nil {
   249  		if c.closed == 0 {
   250  			throw("chansend: spurious wakeup")
   251  		}
   252  		panic(plainError("send on closed channel"))
   253  	}
   254  	gp.param = nil
   255  	if mysg.releasetime > 0 {
   256  		blockevent(mysg.releasetime-t0, 2)
   257  	}
   258  	mysg.c = nil
   259  	releaseSudog(mysg)
   260  	return true
   261  }
   262  
   263  // send processes a send operation on an empty channel c.
   264  // The value ep sent by the sender is copied to the receiver sg.
   265  // The receiver is then woken up to go on its merry way.
   266  // Channel c must be empty and locked.  send unlocks c with unlockf.
   267  // sg must already be dequeued from c.
   268  // ep must be non-nil and point to the heap or the caller's stack.
   269  func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   270  	if raceenabled {
   271  		if c.dataqsiz == 0 {
   272  			racesync(c, sg)
   273  		} else {
   274  			// Pretend we go through the buffer, even though
   275  			// we copy directly. Note that we need to increment
   276  			// the head/tail locations only when raceenabled.
   277  			qp := chanbuf(c, c.recvx)
   278  			raceacquire(qp)
   279  			racerelease(qp)
   280  			raceacquireg(sg.g, qp)
   281  			racereleaseg(sg.g, qp)
   282  			c.recvx++
   283  			if c.recvx == c.dataqsiz {
   284  				c.recvx = 0
   285  			}
   286  			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   287  		}
   288  	}
   289  	if sg.elem != nil {
   290  		sendDirect(c.elemtype, sg, ep)
   291  		sg.elem = nil
   292  	}
   293  	gp := sg.g
   294  	unlockf()
   295  	gp.param = unsafe.Pointer(sg)
   296  	if sg.releasetime != 0 {
   297  		sg.releasetime = cputicks()
   298  	}
   299  	goready(gp, skip+1)
   300  }
   301  
   302  // Sends and receives on unbuffered or empty-buffered channels are the
   303  // only operations where one running goroutine writes to the stack of
   304  // another running goroutine. The GC assumes that stack writes only
   305  // happen when the goroutine is running and are only done by that
   306  // goroutine. Using a write barrier is sufficient to make up for
   307  // violating that assumption, but the write barrier has to work.
   308  // typedmemmove will call bulkBarrierPreWrite, but the target bytes
   309  // are not in the heap, so that will not help. We arrange to call
   310  // memmove and typeBitsBulkBarrier instead.
   311  
   312  func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
   313  	// src is on our stack, dst is a slot on another stack.
   314  
   315  	// Once we read sg.elem out of sg, it will no longer
   316  	// be updated if the destination's stack gets copied (shrunk).
   317  	// So make sure that no preemption points can happen between read & use.
   318  	dst := sg.elem
   319  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   320  	// No need for cgo write barrier checks because dst is always
   321  	// Go memory.
   322  	memmove(dst, src, t.size)
   323  }
   324  
   325  func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
   326  	// dst is on our stack or the heap, src is on another stack.
   327  	// The channel is locked, so src will not move during this
   328  	// operation.
   329  	src := sg.elem
   330  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   331  	memmove(dst, src, t.size)
   332  }
   333  
   334  func closechan(c *hchan) {
   335  	if c == nil {
   336  		panic(plainError("close of nil channel"))
   337  	}
   338  
   339  	lock(&c.lock)
   340  	if c.closed != 0 {
   341  		unlock(&c.lock)
   342  		panic(plainError("close of closed channel"))
   343  	}
   344  
   345  	if raceenabled {
   346  		callerpc := getcallerpc()
   347  		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
   348  		racerelease(c.raceaddr())
   349  	}
   350  
   351  	c.closed = 1
   352  
   353  	var glist gList
   354  
   355  	// release all readers
   356  	for {
   357  		sg := c.recvq.dequeue()
   358  		if sg == nil {
   359  			break
   360  		}
   361  		if sg.elem != nil {
   362  			typedmemclr(c.elemtype, sg.elem)
   363  			sg.elem = nil
   364  		}
   365  		if sg.releasetime != 0 {
   366  			sg.releasetime = cputicks()
   367  		}
   368  		gp := sg.g
   369  		gp.param = nil
   370  		if raceenabled {
   371  			raceacquireg(gp, c.raceaddr())
   372  		}
   373  		glist.push(gp)
   374  	}
   375  
   376  	// release all writers (they will panic)
   377  	for {
   378  		sg := c.sendq.dequeue()
   379  		if sg == nil {
   380  			break
   381  		}
   382  		sg.elem = nil
   383  		if sg.releasetime != 0 {
   384  			sg.releasetime = cputicks()
   385  		}
   386  		gp := sg.g
   387  		gp.param = nil
   388  		if raceenabled {
   389  			raceacquireg(gp, c.raceaddr())
   390  		}
   391  		glist.push(gp)
   392  	}
   393  	unlock(&c.lock)
   394  
   395  	// Ready all Gs now that we've dropped the channel lock.
   396  	for !glist.empty() {
   397  		gp := glist.pop()
   398  		gp.schedlink = 0
   399  		goready(gp, 3)
   400  	}
   401  }
   402  
   403  // entry points for <- c from compiled code
   404  //go:nosplit
   405  func chanrecv1(c *hchan, elem unsafe.Pointer) {
   406  	chanrecv(c, elem, true)
   407  }
   408  
   409  //go:nosplit
   410  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   411  	_, received = chanrecv(c, elem, true)
   412  	return
   413  }
   414  
   415  // chanrecv receives on channel c and writes the received data to ep.
   416  // ep may be nil, in which case received data is ignored.
   417  // If block == false and no elements are available, returns (false, false).
   418  // Otherwise, if c is closed, zeros *ep and returns (true, false).
   419  // Otherwise, fills in *ep with an element and returns (true, true).
   420  // A non-nil ep must point to the heap or the caller's stack.
   421  func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   422  	// raceenabled: don't need to check ep, as it is always on the stack
   423  	// or is new memory allocated by reflect.
   424  
   425  	if debugChan {
   426  		print("chanrecv: chan=", c, "\n")
   427  	}
   428  
   429  	if c == nil {
   430  		if !block {
   431  			return
   432  		}
   433  		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
   434  		throw("unreachable")
   435  	}
   436  
   437  	// Fast path: check for failed non-blocking operation without acquiring the lock.
   438  	//
   439  	// After observing that the channel is not ready for receiving, we observe that the
   440  	// channel is not closed. Each of these observations is a single word-sized read
   441  	// (first c.sendq.first or c.qcount, and second c.closed).
   442  	// Because a channel cannot be reopened, the later observation of the channel
   443  	// being not closed implies that it was also not closed at the moment of the
   444  	// first observation. We behave as if we observed the channel at that moment
   445  	// and report that the receive cannot proceed.
   446  	//
   447  	// The order of operations is important here: reversing the operations can lead to
   448  	// incorrect behavior when racing with a close.
   449  	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
   450  		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
   451  		atomic.Load(&c.closed) == 0 {
   452  		return
   453  	}
   454  
   455  	var t0 int64
   456  	if blockprofilerate > 0 {
   457  		t0 = cputicks()
   458  	}
   459  
   460  	lock(&c.lock)
   461  
   462  	if c.closed != 0 && c.qcount == 0 {
   463  		if raceenabled {
   464  			raceacquire(c.raceaddr())
   465  		}
   466  		unlock(&c.lock)
   467  		if ep != nil {
   468  			typedmemclr(c.elemtype, ep)
   469  		}
   470  		return true, false
   471  	}
   472  
   473  	if sg := c.sendq.dequeue(); sg != nil {
   474  		// Found a waiting sender. If buffer is size 0, receive value
   475  		// directly from sender. Otherwise, receive from head of queue
   476  		// and add sender's value to the tail of the queue (both map to
   477  		// the same buffer slot because the queue is full).
   478  		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
   479  		return true, true
   480  	}
   481  
   482  	if c.qcount > 0 {
   483  		// Receive directly from queue
   484  		qp := chanbuf(c, c.recvx)
   485  		if raceenabled {
   486  			raceacquire(qp)
   487  			racerelease(qp)
   488  		}
   489  		if ep != nil {
   490  			typedmemmove(c.elemtype, ep, qp)
   491  		}
   492  		typedmemclr(c.elemtype, qp)
   493  		c.recvx++
   494  		if c.recvx == c.dataqsiz {
   495  			c.recvx = 0
   496  		}
   497  		c.qcount--
   498  		unlock(&c.lock)
   499  		return true, true
   500  	}
   501  
   502  	if !block {
   503  		unlock(&c.lock)
   504  		return false, false
   505  	}
   506  
   507  	// no sender available: block on this channel.
   508  	gp := getg()
   509  	mysg := acquireSudog()
   510  	mysg.releasetime = 0
   511  	if t0 != 0 {
   512  		mysg.releasetime = -1
   513  	}
   514  	// No stack splits between assigning elem and enqueuing mysg
   515  	// on gp.waiting where copystack can find it.
   516  	mysg.elem = ep
   517  	mysg.waitlink = nil
   518  	gp.waiting = mysg
   519  	mysg.g = gp
   520  	mysg.isSelect = false
   521  	mysg.c = c
   522  	gp.param = nil
   523  	c.recvq.enqueue(mysg)
   524  	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
   525  
   526  	// someone woke us up
   527  	if mysg != gp.waiting {
   528  		throw("G waiting list is corrupted")
   529  	}
   530  	gp.waiting = nil
   531  	if mysg.releasetime > 0 {
   532  		blockevent(mysg.releasetime-t0, 2)
   533  	}
   534  	closed := gp.param == nil
   535  	gp.param = nil
   536  	mysg.c = nil
   537  	releaseSudog(mysg)
   538  	return true, !closed
   539  }
   540  
   541  // recv processes a receive operation on a full channel c.
   542  // There are 2 parts:
   543  // 1) The value sent by the sender sg is put into the channel
   544  //    and the sender is woken up to go on its merry way.
   545  // 2) The value received by the receiver (the current G) is
   546  //    written to ep.
   547  // For synchronous channels, both values are the same.
   548  // For asynchronous channels, the receiver gets its data from
   549  // the channel buffer and the sender's data is put in the
   550  // channel buffer.
   551  // Channel c must be full and locked. recv unlocks c with unlockf.
   552  // sg must already be dequeued from c.
   553  // A non-nil ep must point to the heap or the caller's stack.
   554  func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   555  	if c.dataqsiz == 0 {
   556  		if raceenabled {
   557  			racesync(c, sg)
   558  		}
   559  		if ep != nil {
   560  			// copy data from sender
   561  			recvDirect(c.elemtype, sg, ep)
   562  		}
   563  	} else {
   564  		// Queue is full. Take the item at the
   565  		// head of the queue. Make the sender enqueue
   566  		// its item at the tail of the queue. Since the
   567  		// queue is full, those are both the same slot.
   568  		qp := chanbuf(c, c.recvx)
   569  		if raceenabled {
   570  			raceacquire(qp)
   571  			racerelease(qp)
   572  			raceacquireg(sg.g, qp)
   573  			racereleaseg(sg.g, qp)
   574  		}
   575  		// copy data from queue to receiver
   576  		if ep != nil {
   577  			typedmemmove(c.elemtype, ep, qp)
   578  		}
   579  		// copy data from sender to queue
   580  		typedmemmove(c.elemtype, qp, sg.elem)
   581  		c.recvx++
   582  		if c.recvx == c.dataqsiz {
   583  			c.recvx = 0
   584  		}
   585  		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   586  	}
   587  	sg.elem = nil
   588  	gp := sg.g
   589  	unlockf()
   590  	gp.param = unsafe.Pointer(sg)
   591  	if sg.releasetime != 0 {
   592  		sg.releasetime = cputicks()
   593  	}
   594  	goready(gp, skip+1)
   595  }
   596  
   597  // compiler implements
   598  //
   599  //	select {
   600  //	case c <- v:
   601  //		... foo
   602  //	default:
   603  //		... bar
   604  //	}
   605  //
   606  // as
   607  //
   608  //	if selectnbsend(c, v) {
   609  //		... foo
   610  //	} else {
   611  //		... bar
   612  //	}
   613  //
   614  func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
   615  	return chansend(c, elem, false, getcallerpc())
   616  }
   617  
   618  // compiler implements
   619  //
   620  //	select {
   621  //	case v = <-c:
   622  //		... foo
   623  //	default:
   624  //		... bar
   625  //	}
   626  //
   627  // as
   628  //
   629  //	if selectnbrecv(&v, c) {
   630  //		... foo
   631  //	} else {
   632  //		... bar
   633  //	}
   634  //
   635  func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
   636  	selected, _ = chanrecv(c, elem, false)
   637  	return
   638  }
   639  
   640  // compiler implements
   641  //
   642  //	select {
   643  //	case v, ok = <-c:
   644  //		... foo
   645  //	default:
   646  //		... bar
   647  //	}
   648  //
   649  // as
   650  //
   651  //	if c != nil && selectnbrecv2(&v, &ok, c) {
   652  //		... foo
   653  //	} else {
   654  //		... bar
   655  //	}
   656  //
   657  func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
   658  	// TODO(khr): just return 2 values from this function, now that it is in Go.
   659  	selected, *received = chanrecv(c, elem, false)
   660  	return
   661  }
   662  
   663  //go:linkname reflect_chansend reflect.chansend
   664  func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
   665  	return chansend(c, elem, !nb, getcallerpc())
   666  }
   667  
   668  //go:linkname reflect_chanrecv reflect.chanrecv
   669  func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
   670  	return chanrecv(c, elem, !nb)
   671  }
   672  
   673  //go:linkname reflect_chanlen reflect.chanlen
   674  func reflect_chanlen(c *hchan) int {
   675  	if c == nil {
   676  		return 0
   677  	}
   678  	return int(c.qcount)
   679  }
   680  
   681  //go:linkname reflect_chancap reflect.chancap
   682  func reflect_chancap(c *hchan) int {
   683  	if c == nil {
   684  		return 0
   685  	}
   686  	return int(c.dataqsiz)
   687  }
   688  
// reflect_chanclose is the implementation behind reflect.chanclose (bound
// through the linkname directive). It forwards to closechan, so it panics
// under the same conditions (nil or already-closed channel).
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}
   693  
   694  func (q *waitq) enqueue(sgp *sudog) {
   695  	sgp.next = nil
   696  	x := q.last
   697  	if x == nil {
   698  		sgp.prev = nil
   699  		q.first = sgp
   700  		q.last = sgp
   701  		return
   702  	}
   703  	sgp.prev = x
   704  	x.next = sgp
   705  	q.last = sgp
   706  }
   707  
   708  func (q *waitq) dequeue() *sudog {
   709  	for {
   710  		sgp := q.first
   711  		if sgp == nil {
   712  			return nil
   713  		}
   714  		y := sgp.next
   715  		if y == nil {
   716  			q.first = nil
   717  			q.last = nil
   718  		} else {
   719  			y.prev = nil
   720  			q.first = y
   721  			sgp.next = nil // mark as removed (see dequeueSudog)
   722  		}
   723  
   724  		// if a goroutine was put on this queue because of a
   725  		// select, there is a small window between the goroutine
   726  		// being woken up by a different case and it grabbing the
   727  		// channel locks. Once it has the lock
   728  		// it removes itself from the queue, so we won't see it after that.
   729  		// We use a flag in the G struct to tell us when someone
   730  		// else has won the race to signal this goroutine but the goroutine
   731  		// hasn't removed itself from the queue yet.
   732  		if sgp.isSelect {
   733  			if !atomic.Cas(&sgp.g.selectDone, 0, 1) {
   734  				continue
   735  			}
   736  		}
   737  
   738  		return sgp
   739  	}
   740  }
   741  
   742  func (c *hchan) raceaddr() unsafe.Pointer {
   743  	// Treat read-like and write-like operations on the channel to
   744  	// happen at this address. Avoid using the address of qcount
   745  	// or dataqsiz, because the len() and cap() builtins read
   746  	// those addresses, and we don't want them racing with
   747  	// operations like close().
   748  	return unsafe.Pointer(&c.buf)
   749  }
   750  
   751  func racesync(c *hchan, sg *sudog) {
   752  	racerelease(chanbuf(c, 0))
   753  	raceacquireg(sg.g, chanbuf(c, 0))
   754  	racereleaseg(sg.g, chanbuf(c, 0))
   755  	raceacquire(chanbuf(c, 0))
   756  }
   757  

View as plain text