Source file src/net/pipe.go

     1  // Copyright 2010 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package net
     6  
     7  import (
     8  	"io"
     9  	"os"
    10  	"sync"
    11  	"time"
    12  )
    13  
    14  // pipeDeadline is an abstraction for handling timeouts.
    15  type pipeDeadline struct {
    16  	mu     sync.Mutex // Guards timer and cancel
    17  	timer  *time.Timer
    18  	cancel chan struct{} // Must be non-nil
    19  }
    20  
    21  func makePipeDeadline() pipeDeadline {
    22  	return pipeDeadline{cancel: make(chan struct{})}
    23  }
    24  
    25  // set sets the point in time when the deadline will time out.
    26  // A timeout event is signaled by closing the channel returned by waiter.
    27  // Once a timeout has occurred, the deadline can be refreshed by specifying a
    28  // t value in the future.
    29  //
    30  // A zero value for t prevents timeout.
    31  func (d *pipeDeadline) set(t time.Time) {
    32  	d.mu.Lock()
    33  	defer d.mu.Unlock()
    34  
    35  	if d.timer != nil && !d.timer.Stop() {
    36  		<-d.cancel // Wait for the timer callback to finish and close cancel
    37  	}
    38  	d.timer = nil
    39  
    40  	// Time is zero, then there is no deadline.
    41  	closed := isClosedChan(d.cancel)
    42  	if t.IsZero() {
    43  		if closed {
    44  			d.cancel = make(chan struct{})
    45  		}
    46  		return
    47  	}
    48  
    49  	// Time in the future, setup a timer to cancel in the future.
    50  	if dur := time.Until(t); dur > 0 {
    51  		if closed {
    52  			d.cancel = make(chan struct{})
    53  		}
    54  		d.timer = time.AfterFunc(dur, func() {
    55  			close(d.cancel)
    56  		})
    57  		return
    58  	}
    59  
    60  	// Time in the past, so close immediately.
    61  	if !closed {
    62  		close(d.cancel)
    63  	}
    64  }
    65  
    66  // wait returns a channel that is closed when the deadline is exceeded.
    67  func (d *pipeDeadline) wait() chan struct{} {
    68  	d.mu.Lock()
    69  	defer d.mu.Unlock()
    70  	return d.cancel
    71  }
    72  
    73  func isClosedChan(c <-chan struct{}) bool {
    74  	select {
    75  	case <-c:
    76  		return true
    77  	default:
    78  		return false
    79  	}
    80  }
    81  
    82  type pipeAddr struct{}
    83  
    84  func (pipeAddr) Network() string { return "pipe" }
    85  func (pipeAddr) String() string  { return "pipe" }
    86  
    87  type pipe struct {
    88  	wrMu sync.Mutex // Serialize Write operations
    89  
    90  	// Used by local Read to interact with remote Write.
    91  	// Successful receive on rdRx is always followed by send on rdTx.
    92  	rdRx <-chan []byte
    93  	rdTx chan<- int
    94  
    95  	// Used by local Write to interact with remote Read.
    96  	// Successful send on wrTx is always followed by receive on wrRx.
    97  	wrTx chan<- []byte
    98  	wrRx <-chan int
    99  
   100  	once       sync.Once // Protects closing localDone
   101  	localDone  chan struct{}
   102  	remoteDone <-chan struct{}
   103  
   104  	readDeadline  pipeDeadline
   105  	writeDeadline pipeDeadline
   106  }
   107  
   108  // Pipe creates a synchronous, in-memory, full duplex
   109  // network connection; both ends implement the [Conn] interface.
   110  // Reads on one end are matched with writes on the other,
   111  // copying data directly between the two; there is no internal
   112  // buffering.
   113  func Pipe() (Conn, Conn) {
   114  	cb1 := make(chan []byte)
   115  	cb2 := make(chan []byte)
   116  	cn1 := make(chan int)
   117  	cn2 := make(chan int)
   118  	done1 := make(chan struct{})
   119  	done2 := make(chan struct{})
   120  
   121  	p1 := &pipe{
   122  		rdRx: cb1, rdTx: cn1,
   123  		wrTx: cb2, wrRx: cn2,
   124  		localDone: done1, remoteDone: done2,
   125  		readDeadline:  makePipeDeadline(),
   126  		writeDeadline: makePipeDeadline(),
   127  	}
   128  	p2 := &pipe{
   129  		rdRx: cb2, rdTx: cn2,
   130  		wrTx: cb1, wrRx: cn1,
   131  		localDone: done2, remoteDone: done1,
   132  		readDeadline:  makePipeDeadline(),
   133  		writeDeadline: makePipeDeadline(),
   134  	}
   135  	return p1, p2
   136  }
   137  
   138  func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
   139  func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
   140  
   141  func (p *pipe) Read(b []byte) (int, error) {
   142  	n, err := p.read(b)
   143  	if err != nil && err != io.EOF && err != io.ErrClosedPipe {
   144  		err = &OpError{Op: "read", Net: "pipe", Err: err}
   145  	}
   146  	return n, err
   147  }
   148  
   149  func (p *pipe) read(b []byte) (n int, err error) {
   150  	switch {
   151  	case isClosedChan(p.localDone):
   152  		return 0, io.ErrClosedPipe
   153  	case isClosedChan(p.remoteDone):
   154  		return 0, io.EOF
   155  	case isClosedChan(p.readDeadline.wait()):
   156  		return 0, os.ErrDeadlineExceeded
   157  	}
   158  
   159  	select {
   160  	case bw := <-p.rdRx:
   161  		nr := copy(b, bw)
   162  		p.rdTx <- nr
   163  		return nr, nil
   164  	case <-p.localDone:
   165  		return 0, io.ErrClosedPipe
   166  	case <-p.remoteDone:
   167  		return 0, io.EOF
   168  	case <-p.readDeadline.wait():
   169  		return 0, os.ErrDeadlineExceeded
   170  	}
   171  }
   172  
   173  func (p *pipe) Write(b []byte) (int, error) {
   174  	n, err := p.write(b)
   175  	if err != nil && err != io.ErrClosedPipe {
   176  		err = &OpError{Op: "write", Net: "pipe", Err: err}
   177  	}
   178  	return n, err
   179  }
   180  
   181  func (p *pipe) write(b []byte) (n int, err error) {
   182  	switch {
   183  	case isClosedChan(p.localDone):
   184  		return 0, io.ErrClosedPipe
   185  	case isClosedChan(p.remoteDone):
   186  		return 0, io.ErrClosedPipe
   187  	case isClosedChan(p.writeDeadline.wait()):
   188  		return 0, os.ErrDeadlineExceeded
   189  	}
   190  
   191  	p.wrMu.Lock() // Ensure entirety of b is written together
   192  	defer p.wrMu.Unlock()
   193  	for once := true; once || len(b) > 0; once = false {
   194  		select {
   195  		case p.wrTx <- b:
   196  			nw := <-p.wrRx
   197  			b = b[nw:]
   198  			n += nw
   199  		case <-p.localDone:
   200  			return n, io.ErrClosedPipe
   201  		case <-p.remoteDone:
   202  			return n, io.ErrClosedPipe
   203  		case <-p.writeDeadline.wait():
   204  			return n, os.ErrDeadlineExceeded
   205  		}
   206  	}
   207  	return n, nil
   208  }
   209  
   210  func (p *pipe) SetDeadline(t time.Time) error {
   211  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
   212  		return io.ErrClosedPipe
   213  	}
   214  	p.readDeadline.set(t)
   215  	p.writeDeadline.set(t)
   216  	return nil
   217  }
   218  
   219  func (p *pipe) SetReadDeadline(t time.Time) error {
   220  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
   221  		return io.ErrClosedPipe
   222  	}
   223  	p.readDeadline.set(t)
   224  	return nil
   225  }
   226  
   227  func (p *pipe) SetWriteDeadline(t time.Time) error {
   228  	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
   229  		return io.ErrClosedPipe
   230  	}
   231  	p.writeDeadline.set(t)
   232  	return nil
   233  }
   234  
   235  func (p *pipe) Close() error {
   236  	p.once.Do(func() { close(p.localDone) })
   237  	return nil
   238  }
   239  

View as plain text