// Copyright 2010 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package net import ( "io" "os" "sync" "time" ) // pipeDeadline is an abstraction for handling timeouts. type pipeDeadline struct { mu sync.Mutex // Guards timer and cancel timer *time.Timer cancel chan struct{} // Must be non-nil } func makePipeDeadline() pipeDeadline { return pipeDeadline{cancel: make(chan struct{})} } // set sets the point in time when the deadline will time out. // A timeout event is signaled by closing the channel returned by waiter. // Once a timeout has occurred, the deadline can be refreshed by specifying a // t value in the future. // // A zero value for t prevents timeout. func (d *pipeDeadline) set(t time.Time) { d.mu.Lock() defer d.mu.Unlock() if d.timer != nil && !d.timer.Stop() { <-d.cancel // Wait for the timer callback to finish and close cancel } d.timer = nil // Time is zero, then there is no deadline. closed := isClosedChan(d.cancel) if t.IsZero() { if closed { d.cancel = make(chan struct{}) } return } // Time in the future, setup a timer to cancel in the future. if dur := time.Until(t); dur > 0 { if closed { d.cancel = make(chan struct{}) } d.timer = time.AfterFunc(dur, func() { close(d.cancel) }) return } // Time in the past, so close immediately. if !closed { close(d.cancel) } } // wait returns a channel that is closed when the deadline is exceeded. func (d *pipeDeadline) wait() chan struct{} { d.mu.Lock() defer d.mu.Unlock() return d.cancel } func isClosedChan(c <-chan struct{}) bool { select { case <-c: return true default: return false } } type pipeAddr struct{} func (pipeAddr) Network() string { return "pipe" } func (pipeAddr) String() string { return "pipe" } type pipe struct { wrMu sync.Mutex // Serialize Write operations // Used by local Read to interact with remote Write. // Successful receive on rdRx is always followed by send on rdTx. rdRx <-chan []byte rdTx chan<- int // Used by local Write to interact with remote Read. // Successful send on wrTx is always followed by receive on wrRx. wrTx chan<- []byte wrRx <-chan int once sync.Once // Protects closing localDone localDone chan struct{} remoteDone <-chan struct{} readDeadline pipeDeadline writeDeadline pipeDeadline } // Pipe creates a synchronous, in-memory, full duplex // network connection; both ends implement the [Conn] interface. // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal // buffering. func Pipe() (Conn, Conn) { cb1 := make(chan []byte) cb2 := make(chan []byte) cn1 := make(chan int) cn2 := make(chan int) done1 := make(chan struct{}) done2 := make(chan struct{}) p1 := &pipe{ rdRx: cb1, rdTx: cn1, wrTx: cb2, wrRx: cn2, localDone: done1, remoteDone: done2, readDeadline: makePipeDeadline(), writeDeadline: makePipeDeadline(), } p2 := &pipe{ rdRx: cb2, rdTx: cn2, wrTx: cb1, wrRx: cn1, localDone: done2, remoteDone: done1, readDeadline: makePipeDeadline(), writeDeadline: makePipeDeadline(), } return p1, p2 } func (*pipe) LocalAddr() Addr { return pipeAddr{} } func (*pipe) RemoteAddr() Addr { return pipeAddr{} } func (p *pipe) Read(b []byte) (int, error) { n, err := p.read(b) if err != nil && err != io.EOF && err != io.ErrClosedPipe { err = &OpError{Op: "read", Net: "pipe", Err: err} } return n, err } func (p *pipe) read(b []byte) (n int, err error) { switch { case isClosedChan(p.localDone): return 0, io.ErrClosedPipe case isClosedChan(p.remoteDone): return 0, io.EOF case isClosedChan(p.readDeadline.wait()): return 0, os.ErrDeadlineExceeded } select { case bw := <-p.rdRx: nr := copy(b, bw) p.rdTx <- nr return nr, nil case <-p.localDone: return 0, io.ErrClosedPipe case <-p.remoteDone: return 0, io.EOF case <-p.readDeadline.wait(): return 0, os.ErrDeadlineExceeded } } func (p *pipe) Write(b []byte) (int, error) { n, err := p.write(b) if err != nil && err != io.ErrClosedPipe { err = &OpError{Op: "write", Net: "pipe", Err: err} } return n, err } func (p *pipe) write(b []byte) (n int, err error) { switch { case isClosedChan(p.localDone): return 0, io.ErrClosedPipe case isClosedChan(p.remoteDone): return 0, io.ErrClosedPipe case isClosedChan(p.writeDeadline.wait()): return 0, os.ErrDeadlineExceeded } p.wrMu.Lock() // Ensure entirety of b is written together defer p.wrMu.Unlock() for once := true; once || len(b) > 0; once = false { select { case p.wrTx <- b: nw := <-p.wrRx b = b[nw:] n += nw case <-p.localDone: return n, io.ErrClosedPipe case <-p.remoteDone: return n, io.ErrClosedPipe case <-p.writeDeadline.wait(): return n, os.ErrDeadlineExceeded } } return n, nil } func (p *pipe) SetDeadline(t time.Time) error { if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { return io.ErrClosedPipe } p.readDeadline.set(t) p.writeDeadline.set(t) return nil } func (p *pipe) SetReadDeadline(t time.Time) error { if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { return io.ErrClosedPipe } p.readDeadline.set(t) return nil } func (p *pipe) SetWriteDeadline(t time.Time) error { if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { return io.ErrClosedPipe } p.writeDeadline.set(t) return nil } func (p *pipe) Close() error { p.once.Do(func() { close(p.localDone) }) return nil }