...
Run Format

Source file src/io/pipe.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Pipe adapter to connect code expecting an io.Reader
     6	// with code expecting an io.Writer.
     7	
     8	package io
     9	
    10	import (
    11		"errors"
    12		"sync"
    13	)
    14	
    15	// ErrClosedPipe is the error used for read or write operations on a closed pipe.
    16	var ErrClosedPipe = errors.New("io: read/write on closed pipe")
    17	
    18	// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    19	type pipe struct {
    20		rl    sync.Mutex // gates readers one at a time
    21		wl    sync.Mutex // gates writers one at a time
    22		l     sync.Mutex // protects remaining fields
    23		data  []byte     // data remaining in pending write
    24		rwait sync.Cond  // waiting reader
    25		wwait sync.Cond  // waiting writer
    26		rerr  error      // if reader closed, error to give writes
    27		werr  error      // if writer closed, error to give reads
    28	}
    29	
    30	func (p *pipe) read(b []byte) (n int, err error) {
    31		// One reader at a time.
    32		p.rl.Lock()
    33		defer p.rl.Unlock()
    34	
    35		p.l.Lock()
    36		defer p.l.Unlock()
    37		for {
    38			if p.rerr != nil {
    39				return 0, ErrClosedPipe
    40			}
    41			if p.data != nil {
    42				break
    43			}
    44			if p.werr != nil {
    45				return 0, p.werr
    46			}
    47			p.rwait.Wait()
    48		}
    49		n = copy(b, p.data)
    50		p.data = p.data[n:]
    51		if len(p.data) == 0 {
    52			p.data = nil
    53			p.wwait.Signal()
    54		}
    55		return
    56	}
    57	
    58	var zero [0]byte
    59	
    60	func (p *pipe) write(b []byte) (n int, err error) {
    61		// pipe uses nil to mean not available
    62		if b == nil {
    63			b = zero[:]
    64		}
    65	
    66		// One writer at a time.
    67		p.wl.Lock()
    68		defer p.wl.Unlock()
    69	
    70		p.l.Lock()
    71		defer p.l.Unlock()
    72		if p.werr != nil {
    73			err = ErrClosedPipe
    74			return
    75		}
    76		p.data = b
    77		p.rwait.Signal()
    78		for {
    79			if p.data == nil {
    80				break
    81			}
    82			if p.rerr != nil {
    83				err = p.rerr
    84				break
    85			}
    86			if p.werr != nil {
    87				err = ErrClosedPipe
    88				break
    89			}
    90			p.wwait.Wait()
    91		}
    92		n = len(b) - len(p.data)
    93		p.data = nil // in case of rerr or werr
    94		return
    95	}
    96	
    97	func (p *pipe) rclose(err error) {
    98		if err == nil {
    99			err = ErrClosedPipe
   100		}
   101		p.l.Lock()
   102		defer p.l.Unlock()
   103		p.rerr = err
   104		p.rwait.Signal()
   105		p.wwait.Signal()
   106	}
   107	
   108	func (p *pipe) wclose(err error) {
   109		if err == nil {
   110			err = EOF
   111		}
   112		p.l.Lock()
   113		defer p.l.Unlock()
   114		p.werr = err
   115		p.rwait.Signal()
   116		p.wwait.Signal()
   117	}
   118	
   119	// A PipeReader is the read half of a pipe.
   120	type PipeReader struct {
   121		p *pipe
   122	}
   123	
   124	// Read implements the standard Read interface:
   125	// it reads data from the pipe, blocking until a writer
   126	// arrives or the write end is closed.
   127	// If the write end is closed with an error, that error is
   128	// returned as err; otherwise err is EOF.
   129	func (r *PipeReader) Read(data []byte) (n int, err error) {
   130		return r.p.read(data)
   131	}
   132	
   133	// Close closes the reader; subsequent writes to the
   134	// write half of the pipe will return the error ErrClosedPipe.
   135	func (r *PipeReader) Close() error {
   136		return r.CloseWithError(nil)
   137	}
   138	
   139	// CloseWithError closes the reader; subsequent writes
   140	// to the write half of the pipe will return the error err.
   141	func (r *PipeReader) CloseWithError(err error) error {
   142		r.p.rclose(err)
   143		return nil
   144	}
   145	
   146	// A PipeWriter is the write half of a pipe.
   147	type PipeWriter struct {
   148		p *pipe
   149	}
   150	
   151	// Write implements the standard Write interface:
   152	// it writes data to the pipe, blocking until one or more readers
   153	// have consumed all the data or the read end is closed.
   154	// If the read end is closed with an error, that err is
   155	// returned as err; otherwise err is ErrClosedPipe.
   156	func (w *PipeWriter) Write(data []byte) (n int, err error) {
   157		return w.p.write(data)
   158	}
   159	
   160	// Close closes the writer; subsequent reads from the
   161	// read half of the pipe will return no bytes and EOF.
   162	func (w *PipeWriter) Close() error {
   163		return w.CloseWithError(nil)
   164	}
   165	
   166	// CloseWithError closes the writer; subsequent reads from the
   167	// read half of the pipe will return no bytes and the error err,
   168	// or EOF if err is nil.
   169	//
   170	// CloseWithError always returns nil.
   171	func (w *PipeWriter) CloseWithError(err error) error {
   172		w.p.wclose(err)
   173		return nil
   174	}
   175	
   176	// Pipe creates a synchronous in-memory pipe.
   177	// It can be used to connect code expecting an io.Reader
   178	// with code expecting an io.Writer.
   179	//
   180	// Reads and Writes on the pipe are matched one to one
   181	// except when multiple Reads are needed to consume a single Write.
   182	// That is, each Write to the PipeWriter blocks until it has satisfied
   183	// one or more Reads from the PipeReader that fully consume
   184	// the written data.
   185	// The data is copied directly from the Write to the corresponding
   186	// Read (or Reads); there is no internal buffering.
   187	//
   188	// It is safe to call Read and Write in parallel with each other or with Close.
   189	// Parallel calls to Read and parallel calls to Write are also safe:
   190	// the individual calls will be gated sequentially.
   191	func Pipe() (*PipeReader, *PipeWriter) {
   192		p := new(pipe)
   193		p.rwait.L = &p.l
   194		p.wwait.L = &p.l
   195		r := &PipeReader{p}
   196		w := &PipeWriter{p}
   197		return r, w
   198	}
   199	

View as plain text