...
Run Format

Source file src/runtime/chan.go

Documentation: runtime

  // Copyright 2014 The Go Authors. All rights reserved.
  // Use of this source code is governed by a BSD-style
  // license that can be found in the LICENSE file.
  
  package runtime
  
  // This file contains the implementation of Go channels.
  
  // Invariants:
  //  At least one of c.sendq and c.recvq is empty,
  //  except for the case of an unbuffered channel with a single goroutine
  //  blocked on it for both sending and receiving using a select statement,
  //  in which case the length of c.sendq and c.recvq is limited only by the
  //  size of the select statement.
  //
  // For buffered channels, also:
  //  c.qcount > 0 implies that c.recvq is empty.
  //  c.qcount < c.dataqsiz implies that c.sendq is empty.
  
  import (
  	"runtime/internal/atomic"
  	"unsafe"
  )
  
  // Layout and debugging constants for channels.
  //
  // maxAlign is the largest element alignment makechan accepts; makechan
  // throws if an element type needs more, or if hchanSize is not a
  // multiple of it.
  //
  // hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
  // maxAlign (the added term is -size mod maxAlign). makechan uses it as
  // the offset of the element buffer when the header and the buffer share
  // one allocation.
  //
  // debugChan guards the diagnostic print calls in this file.
  const (
  	maxAlign  = 8
  	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
  	debugChan = false
  )
  
  // hchan is the runtime representation of a channel.
  //
  // Fields without an inline comment:
  //   - elemsize is the size of one element; makechan copies it from the
  //     element type and chanbuf uses it to compute slot offsets.
  //   - closed is zero while the channel is open; closechan sets it to 1.
  type hchan struct {
  	qcount   uint           // total data in the queue
  	dataqsiz uint           // size of the circular queue
  	buf      unsafe.Pointer // points to an array of dataqsiz elements
  	elemsize uint16
  	closed   uint32
  	elemtype *_type // element type
  	sendx    uint   // send index
  	recvx    uint   // receive index
  	recvq    waitq  // list of recv waiters
  	sendq    waitq  // list of send waiters
  
  	// lock protects all fields in hchan, as well as several
  	// fields in sudogs blocked on this channel.
  	//
  	// Do not change another G's status while holding this lock
  	// (in particular, do not ready a G), as this can deadlock
  	// with stack shrinking.
  	lock mutex
  }
  
  // waitq is a FIFO queue of sudogs waiting on a channel. enqueue appends
  // at last and dequeue removes from first; the elements are chained
  // through the sudog next and prev fields. Both pointers are nil when the
  // queue is empty.
  type waitq struct {
  	first *sudog
  	last  *sudog
  }
  
  // reflect_makechan makes makechan available to package reflect under
  // the name reflect.makechan (see the go:linkname directive below).
  //go:linkname reflect_makechan reflect.makechan
  func reflect_makechan(t *chantype, size int64) *hchan {
  	return makechan(t, size)
  }
  
  func makechan(t *chantype, size int64) *hchan {
  	elem := t.elem
  
  	// compiler checks this but be safe.
  	if elem.size >= 1<<16 {
  		throw("makechan: invalid channel element type")
  	}
  	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  		throw("makechan: bad alignment")
  	}
  	if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
  		panic(plainError("makechan: size out of range"))
  	}
  
  	var c *hchan
  	if elem.kind&kindNoPointers != 0 || size == 0 {
  		// Allocate memory in one call.
  		// Hchan does not contain pointers interesting for GC in this case:
  		// buf points into the same allocation, elemtype is persistent.
  		// SudoG's are referenced from their owning thread so they can't be collected.
  		// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
  		if size > 0 && elem.size != 0 {
  			c.buf = add(unsafe.Pointer(c), hchanSize)
  		} else {
  			// race detector uses this location for synchronization
  			// Also prevents us from pointing beyond the allocation (see issue 9401).
  			c.buf = unsafe.Pointer(c)
  		}
  	} else {
  		c = new(hchan)
  		c.buf = newarray(elem, int(size))
  	}
  	c.elemsize = uint16(elem.size)
  	c.elemtype = elem
  	c.dataqsiz = uint(size)
  
  	if debugChan {
  		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  	}
  	return c
  }
  
  // chanbuf(c, i) is pointer to the i'th slot in the buffer.
  func chanbuf(c *hchan, i uint) unsafe.Pointer {
  	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
  }
  
  // chansend1 is the entry point for c <- x from compiled code.
  // It performs a blocking send (block == true) of the value at elem and
  // hands the PC obtained from getcallerpc to chansend as callerpc.
  //go:nosplit
  func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
  	chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
  }
  
  // chansend is the generic single-channel send. It sends the value at ep
  // on c and reports whether the send completed.
  //
  // If block is false, chansend never sleeps: it returns false when the
  // send cannot complete immediately. If block is true, it parks the
  // calling goroutine until a receiver takes the value or the channel is
  // closed. A send on a closed channel panics. For a nil channel it
  // returns false when block is false and parks otherwise.
  //
  // callerpc is passed only to the race-detector hooks.
  //
  // A parked sender can wake up with g.param == nil when the channel was
  // closed while it slept; that case ends in the "send on closed channel"
  // panic below.
  func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  	if raceenabled {
  		raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
  	}
  	if msanenabled {
  		msanread(ep, t.elem.size)
  	}
  
  	// A send on a nil channel can never proceed: fail if non-blocking,
  	// otherwise park; control is not expected to come back.
  	if c == nil {
  		if !block {
  			return false
  		}
  		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
  		throw("unreachable")
  	}
  
  	if debugChan {
  		print("chansend: chan=", c, "\n")
  	}
  
  	if raceenabled {
  		racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
  	}
  
  	// Fast path: check for failed non-blocking operation without acquiring the lock.
  	//
  	// After observing that the channel is not closed, we observe that the channel is
  	// not ready for sending. Each of these observations is a single word-sized read
  	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
  	// Because a closed channel cannot transition from 'ready for sending' to
  	// 'not ready for sending', even if the channel is closed between the two observations,
  	// they imply a moment between the two when the channel was both not yet closed
  	// and not ready for sending. We behave as if we observed the channel at that moment,
  	// and report that the send cannot proceed.
  	//
  	// It is okay if the reads are reordered here: if we observe that the channel is not
  	// ready for sending and then observe that it is not closed, that implies that the
  	// channel wasn't closed during the first observation.
  	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
  		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
  		return false
  	}
  
  	// t0 is set only when block profiling is enabled; it marks the start
  	// of the operation for the blockevent call after wakeup.
  	var t0 int64
  	if blockprofilerate > 0 {
  		t0 = cputicks()
  	}
  
  	lock(&c.lock)
  
  	if c.closed != 0 {
  		unlock(&c.lock)
  		panic(plainError("send on closed channel"))
  	}
  
  	if sg := c.recvq.dequeue(); sg != nil {
  		// Found a waiting receiver. We pass the value we want to send
  		// directly to the receiver, bypassing the channel buffer (if any).
  		send(c, sg, ep, func() { unlock(&c.lock) })
  		return true
  	}
  
  	if c.qcount < c.dataqsiz {
  		// Space is available in the channel buffer. Enqueue the element to send.
  		qp := chanbuf(c, c.sendx)
  		if raceenabled {
  			raceacquire(qp)
  			racerelease(qp)
  		}
  		typedmemmove(c.elemtype, qp, ep)
  		// Advance the send index, wrapping at the end of the circular buffer.
  		c.sendx++
  		if c.sendx == c.dataqsiz {
  			c.sendx = 0
  		}
  		c.qcount++
  		unlock(&c.lock)
  		return true
  	}
  
  	if !block {
  		unlock(&c.lock)
  		return false
  	}
  
  	// Block on the channel. Some receiver will complete our operation for us.
  	gp := getg()
  	mysg := acquireSudog()
  	mysg.releasetime = 0
  	if t0 != 0 {
  		// A nonzero releasetime makes whoever wakes us (recv or closechan)
  		// overwrite it with the wakeup time.
  		mysg.releasetime = -1
  	}
  	// No stack splits between assigning elem and enqueuing mysg
  	// on gp.waiting where copystack can find it.
  	mysg.elem = ep
  	mysg.waitlink = nil
  	mysg.g = gp
  	mysg.selectdone = nil
  	mysg.c = c
  	gp.waiting = mysg
  	gp.param = nil
  	c.sendq.enqueue(mysg)
  	goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
  
  	// someone woke us up.
  	if mysg != gp.waiting {
  		throw("G waiting list is corrupted")
  	}
  	gp.waiting = nil
  	// recv sets gp.param to the sudog when it takes our value; closechan
  	// leaves it nil. A nil param therefore means the channel was closed.
  	if gp.param == nil {
  		if c.closed == 0 {
  			throw("chansend: spurious wakeup")
  		}
  		panic(plainError("send on closed channel"))
  	}
  	gp.param = nil
  	if mysg.releasetime > 0 {
  		blockevent(mysg.releasetime-t0, 2)
  	}
  	mysg.c = nil
  	releaseSudog(mysg)
  	return true
  }
  
  // send processes a send operation on an empty channel c.
  // The value ep sent by the sender is copied to the receiver sg.
  // The receiver is then woken up to go on its merry way.
  // Channel c must be empty and locked.  send unlocks c with unlockf.
  // sg must already be dequeued from c.
  // ep must be non-nil and point to the heap or the caller's stack.
  //
  // If sg.elem is nil the receiver did not ask for the value and the copy
  // is skipped. The receiver's g.param is set to sg so that, on wakeup, it
  // can tell a completed receive from a close (which leaves param nil).
  func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
  	if raceenabled {
  		if c.dataqsiz == 0 {
  			racesync(c, sg)
  		} else {
  			// Pretend we go through the buffer, even though
  			// we copy directly. Note that we need to increment
  			// the head/tail locations only when raceenabled.
  			qp := chanbuf(c, c.recvx)
  			raceacquire(qp)
  			racerelease(qp)
  			raceacquireg(sg.g, qp)
  			racereleaseg(sg.g, qp)
  			c.recvx++
  			if c.recvx == c.dataqsiz {
  				c.recvx = 0
  			}
  			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  		}
  	}
  	if sg.elem != nil {
  		sendDirect(c.elemtype, sg, ep)
  		sg.elem = nil
  	}
  	gp := sg.g
  	// Release the channel lock before readying gp: hchan.lock must not be
  	// held while changing another G's status.
  	unlockf()
  	gp.param = unsafe.Pointer(sg)
  	if sg.releasetime != 0 {
  		sg.releasetime = cputicks()
  	}
  	goready(gp, 4)
  }
  
  // Sends and receives on unbuffered or empty-buffered channels are the
  // only operations where one running goroutine writes to the stack of
  // another running goroutine. The GC assumes that stack writes only
  // happen when the goroutine is running and are only done by that
  // goroutine. Using a write barrier is sufficient to make up for
  // violating that assumption, but the write barrier has to work.
  // typedmemmove will call bulkBarrierPreWrite, but the target bytes
  // are not in the heap, so that will not help. We arrange to call
  // memmove and typeBitsBulkBarrier instead.
  
  // sendDirect copies one value of type t from src into the slot sg.elem
  // of the waiting receiver. It issues typeBitsBulkBarrier for the copy
  // and then moves the bytes with memmove rather than typedmemmove.
  func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  	// src is on our stack, dst is a slot on another stack.
  
  	// Once we read sg.elem out of sg, it will no longer
  	// be updated if the destination's stack gets copied (shrunk).
  	// So make sure that no preemption points can happen between read & use.
  	dst := sg.elem
  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
  	memmove(dst, src, t.size)
  }
  
  // recvDirect copies one value of type t from the waiting sender's slot
  // sg.elem into dst. Like sendDirect, it issues typeBitsBulkBarrier and
  // then moves the bytes with memmove.
  func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
  	// dst is on our stack or the heap, src is on another stack.
  	// The channel is locked, so src will not move during this
  	// operation.
  	src := sg.elem
  	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
  	memmove(dst, src, t.size)
  }
  
  // closechan closes channel c. It panics if c is nil or already closed.
  // Otherwise it marks the channel closed under the channel lock, dequeues
  // every waiting receiver and sender, and makes them runnable after the
  // lock has been released. Each woken goroutine has g.param set to nil,
  // which is how chansend and chanrecv recognize a wakeup caused by close.
  func closechan(c *hchan) {
  	if c == nil {
  		panic(plainError("close of nil channel"))
  	}
  
  	lock(&c.lock)
  	if c.closed != 0 {
  		unlock(&c.lock)
  		panic(plainError("close of closed channel"))
  	}
  
  	if raceenabled {
  		callerpc := getcallerpc(unsafe.Pointer(&c))
  		racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
  		racerelease(unsafe.Pointer(c))
  	}
  
  	c.closed = 1
  
  	// glist collects the goroutines to wake, linked through g.schedlink.
  	// They are readied only after the lock is dropped.
  	var glist *g
  
  	// release all readers
  	for {
  		sg := c.recvq.dequeue()
  		if sg == nil {
  			break
  		}
  		if sg.elem != nil {
  			// The receiver gets a zeroed value.
  			typedmemclr(c.elemtype, sg.elem)
  			sg.elem = nil
  		}
  		if sg.releasetime != 0 {
  			sg.releasetime = cputicks()
  		}
  		gp := sg.g
  		gp.param = nil
  		if raceenabled {
  			raceacquireg(gp, unsafe.Pointer(c))
  		}
  		gp.schedlink.set(glist)
  		glist = gp
  	}
  
  	// release all writers (they will panic)
  	for {
  		sg := c.sendq.dequeue()
  		if sg == nil {
  			break
  		}
  		sg.elem = nil
  		if sg.releasetime != 0 {
  			sg.releasetime = cputicks()
  		}
  		gp := sg.g
  		gp.param = nil
  		if raceenabled {
  			raceacquireg(gp, unsafe.Pointer(c))
  		}
  		gp.schedlink.set(glist)
  		glist = gp
  	}
  	unlock(&c.lock)
  
  	// Ready all Gs now that we've dropped the channel lock.
  	for glist != nil {
  		gp := glist
  		glist = glist.schedlink.ptr()
  		gp.schedlink = 0
  		goready(gp, 3)
  	}
  }
  
  // entry points for <- c from compiled code
  //
  // chanrecv1 performs a blocking receive from c into elem and discards
  // both results of chanrecv.
  //go:nosplit
  func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
  	chanrecv(t, c, elem, true)
  }
  
  // chanrecv2 performs a blocking receive from c into elem and returns
  // chanrecv's second result: true if an element was received, false if
  // the channel was closed.
  //go:nosplit
  func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
  	_, received = chanrecv(t, c, elem, true)
  	return
  }
  
  // chanrecv receives on channel c and writes the received data to ep.
  // ep may be nil, in which case received data is ignored.
  // If block == false and no elements are available, returns (false, false).
  // Otherwise, if c is closed, zeros *ep and returns (true, false).
  // Otherwise, fills in *ep with an element and returns (true, true).
  // A non-nil ep must point to the heap or the caller's stack.
  //
  // For a nil channel it returns (false, false) when block is false and
  // parks the calling goroutine otherwise.
  func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  	// raceenabled: don't need to check ep, as it is always on the stack
  	// or is new memory allocated by reflect.
  
  	if debugChan {
  		print("chanrecv: chan=", c, "\n")
  	}
  
  	if c == nil {
  		if !block {
  			return
  		}
  		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
  		throw("unreachable")
  	}
  
  	// Fast path: check for failed non-blocking operation without acquiring the lock.
  	//
  	// After observing that the channel is not ready for receiving, we observe that the
  	// channel is not closed. Each of these observations is a single word-sized read
  	// (first c.sendq.first or c.qcount, and second c.closed).
  	// Because a channel cannot be reopened, the later observation of the channel
  	// being not closed implies that it was also not closed at the moment of the
  	// first observation. We behave as if we observed the channel at that moment
  	// and report that the receive cannot proceed.
  	//
  	// The order of operations is important here: reversing the operations can lead to
  	// incorrect behavior when racing with a close.
  	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
  		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
  		atomic.Load(&c.closed) == 0 {
  		return
  	}
  
  	// t0 is set only when block profiling is enabled; it marks the start
  	// of the operation for the blockevent call after wakeup.
  	var t0 int64
  	if blockprofilerate > 0 {
  		t0 = cputicks()
  	}
  
  	lock(&c.lock)
  
  	// Closed and drained: report the close. Elements still buffered in a
  	// closed channel are delivered by the code further down.
  	if c.closed != 0 && c.qcount == 0 {
  		if raceenabled {
  			raceacquire(unsafe.Pointer(c))
  		}
  		unlock(&c.lock)
  		if ep != nil {
  			typedmemclr(c.elemtype, ep)
  		}
  		return true, false
  	}
  
  	if sg := c.sendq.dequeue(); sg != nil {
  		// Found a waiting sender. If buffer is size 0, receive value
  		// directly from sender. Otherwise, receive from head of queue
  		// and add sender's value to the tail of the queue (both map to
  		// the same buffer slot because the queue is full).
  		recv(c, sg, ep, func() { unlock(&c.lock) })
  		return true, true
  	}
  
  	if c.qcount > 0 {
  		// Receive directly from queue
  		qp := chanbuf(c, c.recvx)
  		if raceenabled {
  			raceacquire(qp)
  			racerelease(qp)
  		}
  		if ep != nil {
  			typedmemmove(c.elemtype, ep, qp)
  		}
  		// Clear the vacated slot, then advance the receive index,
  		// wrapping at the end of the circular buffer.
  		typedmemclr(c.elemtype, qp)
  		c.recvx++
  		if c.recvx == c.dataqsiz {
  			c.recvx = 0
  		}
  		c.qcount--
  		unlock(&c.lock)
  		return true, true
  	}
  
  	if !block {
  		unlock(&c.lock)
  		return false, false
  	}
  
  	// no sender available: block on this channel.
  	gp := getg()
  	mysg := acquireSudog()
  	mysg.releasetime = 0
  	if t0 != 0 {
  		// A nonzero releasetime makes whoever wakes us (send or closechan)
  		// overwrite it with the wakeup time.
  		mysg.releasetime = -1
  	}
  	// No stack splits between assigning elem and enqueuing mysg
  	// on gp.waiting where copystack can find it.
  	mysg.elem = ep
  	mysg.waitlink = nil
  	gp.waiting = mysg
  	mysg.g = gp
  	mysg.selectdone = nil
  	mysg.c = c
  	gp.param = nil
  	c.recvq.enqueue(mysg)
  	goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
  
  	// someone woke us up
  	if mysg != gp.waiting {
  		throw("G waiting list is corrupted")
  	}
  	gp.waiting = nil
  	if mysg.releasetime > 0 {
  		blockevent(mysg.releasetime-t0, 2)
  	}
  	// send sets gp.param to the sudog when it delivers a value; closechan
  	// leaves it nil. A nil param therefore means the channel was closed.
  	closed := gp.param == nil
  	gp.param = nil
  	mysg.c = nil
  	releaseSudog(mysg)
  	return true, !closed
  }
  
  // recv processes a receive operation on a full channel c.
  // There are 2 parts:
  // 1) The value sent by the sender sg is put into the channel
  //    and the sender is woken up to go on its merry way.
  // 2) The value received by the receiver (the current G) is
  //    written to ep.
  // For synchronous channels, both values are the same.
  // For asynchronous channels, the receiver gets its data from
  // the channel buffer and the sender's data is put in the
  // channel buffer.
  // Channel c must be full and locked. recv unlocks c with unlockf.
  // sg must already be dequeued from c.
  // A non-nil ep must point to the heap or the caller's stack.
  //
  // The sender's g.param is set to sg so that, on wakeup, it can tell a
  // completed send from a close (which leaves param nil).
  func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
  	if c.dataqsiz == 0 {
  		if raceenabled {
  			racesync(c, sg)
  		}
  		if ep != nil {
  			// copy data from sender
  			recvDirect(c.elemtype, sg, ep)
  		}
  	} else {
  		// Queue is full. Take the item at the
  		// head of the queue. Make the sender enqueue
  		// its item at the tail of the queue. Since the
  		// queue is full, those are both the same slot.
  		qp := chanbuf(c, c.recvx)
  		if raceenabled {
  			raceacquire(qp)
  			racerelease(qp)
  			raceacquireg(sg.g, qp)
  			racereleaseg(sg.g, qp)
  		}
  		// copy data from queue to receiver
  		if ep != nil {
  			typedmemmove(c.elemtype, ep, qp)
  		}
  		// copy data from sender to queue
  		typedmemmove(c.elemtype, qp, sg.elem)
  		c.recvx++
  		if c.recvx == c.dataqsiz {
  			c.recvx = 0
  		}
  		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  	}
  	sg.elem = nil
  	gp := sg.g
  	// Release the channel lock before readying gp: hchan.lock must not be
  	// held while changing another G's status.
  	unlockf()
  	gp.param = unsafe.Pointer(sg)
  	if sg.releasetime != 0 {
  		sg.releasetime = cputicks()
  	}
  	goready(gp, 4)
  }
  
  // compiler implements
  //
  //	select {
  //	case c <- v:
  //		... foo
  //	default:
  //		... bar
  //	}
  //
  // as
  //
  //	if selectnbsend(c, v) {
  //		... foo
  //	} else {
  //		... bar
  //	}
  //
  // selectnbsend is a non-blocking send: it calls chansend with block set
  // to false and returns its result, so selected reports whether the value
  // at elem was sent.
  func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
  	return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
  }
  
  // compiler implements
  //
  //	select {
  //	case v = <-c:
  //		... foo
  //	default:
  //		... bar
  //	}
  //
  // as
  //
  //	if selectnbrecv(&v, c) {
  //		... foo
  //	} else {
  //		... bar
  //	}
  //
  func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
  	selected, _ = chanrecv(t, c, elem, false)
  	return
  }
  
  // compiler implements
  //
  //	select {
  //	case v, ok = <-c:
  //		... foo
  //	default:
  //		... bar
  //	}
  //
  // as
  //
  //	if c != nil && selectnbrecv2(&v, &ok, c) {
  //		... foo
  //	} else {
  //		... bar
  //	}
  //
  func selectnbrecv2(t *chantype, elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
  	// TODO(khr): just return 2 values from this function, now that it is in Go.
  	selected, *received = chanrecv(t, c, elem, false)
  	return
  }
  
  // reflect_chansend makes chansend available to package reflect under
  // the name reflect.chansend. nb requests a non-blocking send; it is
  // inverted to form chansend's block argument.
  //go:linkname reflect_chansend reflect.chansend
  func reflect_chansend(t *chantype, c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
  	return chansend(t, c, elem, !nb, getcallerpc(unsafe.Pointer(&t)))
  }
  
  //go:linkname reflect_chanrecv reflect.chanrecv
  func reflect_chanrecv(t *chantype, c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
  	return chanrecv(t, c, elem, !nb)
  }
  
  //go:linkname reflect_chanlen reflect.chanlen
  func reflect_chanlen(c *hchan) int {
  	if c == nil {
  		return 0
  	}
  	return int(c.qcount)
  }
  
  //go:linkname reflect_chancap reflect.chancap
  func reflect_chancap(c *hchan) int {
  	if c == nil {
  		return 0
  	}
  	return int(c.dataqsiz)
  }
  
  // reflect_chanclose makes closechan available to package reflect under
  // the name reflect.chanclose. Like closechan, it panics on a nil or
  // already closed channel.
  //go:linkname reflect_chanclose reflect.chanclose
  func reflect_chanclose(c *hchan) {
  	closechan(c)
  }
  
  func (q *waitq) enqueue(sgp *sudog) {
  	sgp.next = nil
  	x := q.last
  	if x == nil {
  		sgp.prev = nil
  		q.first = sgp
  		q.last = sgp
  		return
  	}
  	sgp.prev = x
  	x.next = sgp
  	q.last = sgp
  }
  
  // dequeue removes and returns the first usable sudog in q, or nil if
  // none is left.
  //
  // A sudog with a non-nil selectdone belongs to a select statement. It is
  // returned only if this caller changes *selectdone from 0 to 1;
  // otherwise it stays unlinked from the queue and the next sudog is
  // tried.
  func (q *waitq) dequeue() *sudog {
  	for {
  		sgp := q.first
  		if sgp == nil {
  			return nil
  		}
  		// Unlink sgp from the head of the list.
  		y := sgp.next
  		if y == nil {
  			q.first = nil
  			q.last = nil
  		} else {
  			y.prev = nil
  			q.first = y
  			sgp.next = nil // mark as removed (see dequeueSudog)
  		}
  
  		// if sgp participates in a select and is already signaled, ignore it
  		if sgp.selectdone != nil {
  			// claim the right to signal
  			if *sgp.selectdone != 0 || !atomic.Cas(sgp.selectdone, 0, 1) {
  				continue
  			}
  		}
  
  		return sgp
  	}
  }
  
  func racesync(c *hchan, sg *sudog) {
  	racerelease(chanbuf(c, 0))
  	raceacquireg(sg.g, chanbuf(c, 0))
  	racereleaseg(sg.g, chanbuf(c, 0))
  	raceacquire(chanbuf(c, 0))
  }
  

View as plain text