...
Run Format

Source file src/io/pipe.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Pipe adapter to connect code expecting an io.Reader
     6	// with code expecting an io.Writer.
     7	
     8	package io
     9	
    10	import (
    11		"errors"
    12		"sync"
    13	)
    14	
    15	// ErrClosedPipe is the error used for read or write operations on a closed pipe.
    16	var ErrClosedPipe = errors.New("io: read/write on closed pipe")
    17	
    18	// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    19	type pipe struct {
    20		rl    sync.Mutex // gates readers one at a time
    21		wl    sync.Mutex // gates writers one at a time
    22		l     sync.Mutex // protects remaining fields
    23		data  []byte     // data remaining in pending write
    24		rwait sync.Cond  // waiting reader
    25		wwait sync.Cond  // waiting writer
    26		rerr  error      // if reader closed, error to give writes
    27		werr  error      // if writer closed, error to give reads
    28	}
    29	
    30	func (p *pipe) read(b []byte) (n int, err error) {
    31		// One reader at a time.
    32		p.rl.Lock()
    33		defer p.rl.Unlock()
    34	
    35		p.l.Lock()
    36		defer p.l.Unlock()
    37		for {
    38			if p.rerr != nil {
    39				return 0, ErrClosedPipe
    40			}
    41			if p.data != nil {
    42				break
    43			}
    44			if p.werr != nil {
    45				return 0, p.werr
    46			}
    47			p.rwait.Wait()
    48		}
    49		n = copy(b, p.data)
    50		p.data = p.data[n:]
    51		if len(p.data) == 0 {
    52			p.data = nil
    53			p.wwait.Signal()
    54		}
    55		return
    56	}
    57	
    58	var zero [0]byte
    59	
    60	func (p *pipe) write(b []byte) (n int, err error) {
    61		// pipe uses nil to mean not available
    62		if b == nil {
    63			b = zero[:]
    64		}
    65	
    66		// One writer at a time.
    67		p.wl.Lock()
    68		defer p.wl.Unlock()
    69	
    70		p.l.Lock()
    71		defer p.l.Unlock()
    72		if p.werr != nil {
    73			err = ErrClosedPipe
    74			return
    75		}
    76		p.data = b
    77		p.rwait.Signal()
    78		for {
    79			if p.data == nil {
    80				break
    81			}
    82			if p.rerr != nil {
    83				err = p.rerr
    84				break
    85			}
    86			if p.werr != nil {
    87				err = ErrClosedPipe
    88			}
    89			p.wwait.Wait()
    90		}
    91		n = len(b) - len(p.data)
    92		p.data = nil // in case of rerr or werr
    93		return
    94	}
    95	
    96	func (p *pipe) rclose(err error) {
    97		if err == nil {
    98			err = ErrClosedPipe
    99		}
   100		p.l.Lock()
   101		defer p.l.Unlock()
   102		p.rerr = err
   103		p.rwait.Signal()
   104		p.wwait.Signal()
   105	}
   106	
   107	func (p *pipe) wclose(err error) {
   108		if err == nil {
   109			err = EOF
   110		}
   111		p.l.Lock()
   112		defer p.l.Unlock()
   113		p.werr = err
   114		p.rwait.Signal()
   115		p.wwait.Signal()
   116	}
   117	
   118	// A PipeReader is the read half of a pipe.
   119	type PipeReader struct {
   120		p *pipe
   121	}
   122	
   123	// Read implements the standard Read interface:
   124	// it reads data from the pipe, blocking until a writer
   125	// arrives or the write end is closed.
   126	// If the write end is closed with an error, that error is
   127	// returned as err; otherwise err is EOF.
   128	func (r *PipeReader) Read(data []byte) (n int, err error) {
   129		return r.p.read(data)
   130	}
   131	
   132	// Close closes the reader; subsequent writes to the
   133	// write half of the pipe will return the error ErrClosedPipe.
   134	func (r *PipeReader) Close() error {
   135		return r.CloseWithError(nil)
   136	}
   137	
   138	// CloseWithError closes the reader; subsequent writes
   139	// to the write half of the pipe will return the error err.
   140	func (r *PipeReader) CloseWithError(err error) error {
   141		r.p.rclose(err)
   142		return nil
   143	}
   144	
   145	// A PipeWriter is the write half of a pipe.
   146	type PipeWriter struct {
   147		p *pipe
   148	}
   149	
   150	// Write implements the standard Write interface:
   151	// it writes data to the pipe, blocking until readers
   152	// have consumed all the data or the read end is closed.
   153	// If the read end is closed with an error, that err is
   154	// returned as err; otherwise err is ErrClosedPipe.
   155	func (w *PipeWriter) Write(data []byte) (n int, err error) {
   156		return w.p.write(data)
   157	}
   158	
   159	// Close closes the writer; subsequent reads from the
   160	// read half of the pipe will return no bytes and EOF.
   161	func (w *PipeWriter) Close() error {
   162		return w.CloseWithError(nil)
   163	}
   164	
   165	// CloseWithError closes the writer; subsequent reads from the
   166	// read half of the pipe will return no bytes and the error err,
   167	// or EOF if err is nil.
   168	//
   169	// CloseWithError always returns nil.
   170	func (w *PipeWriter) CloseWithError(err error) error {
   171		w.p.wclose(err)
   172		return nil
   173	}
   174	
   175	// Pipe creates a synchronous in-memory pipe.
   176	// It can be used to connect code expecting an io.Reader
   177	// with code expecting an io.Writer.
   178	// Reads on one end are matched with writes on the other,
   179	// copying data directly between the two; there is no internal buffering.
   180	// It is safe to call Read and Write in parallel with each other or with
   181	// Close. Close will complete once pending I/O is done. Parallel calls to
   182	// Read, and parallel calls to Write, are also safe:
   183	// the individual calls will be gated sequentially.
   184	func Pipe() (*PipeReader, *PipeWriter) {
   185		p := new(pipe)
   186		p.rwait.L = &p.l
   187		p.wwait.L = &p.l
   188		r := &PipeReader{p}
   189		w := &PipeWriter{p}
   190		return r, w
   191	}
   192	

View as plain text