...
Run Format

Source file src/io/pipe.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Pipe adapter to connect code expecting an io.Reader
     6	// with code expecting an io.Writer.
     7	
     8	package io
     9	
    10	import (
    11		"errors"
    12		"sync"
    13	)
    14	
    15	// ErrClosedPipe is the error used for read or write operations on a closed pipe.
    16	var ErrClosedPipe = errors.New("io: read/write on closed pipe")
    17	
    18	type pipeResult struct {
    19		n   int
    20		err error
    21	}
    22	
    23	// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    24	type pipe struct {
    25		rl    sync.Mutex // gates readers one at a time
    26		wl    sync.Mutex // gates writers one at a time
    27		l     sync.Mutex // protects remaining fields
    28		data  []byte     // data remaining in pending write
    29		rwait sync.Cond  // waiting reader
    30		wwait sync.Cond  // waiting writer
    31		rerr  error      // if reader closed, error to give writes
    32		werr  error      // if writer closed, error to give reads
    33	}
    34	
    35	func (p *pipe) read(b []byte) (n int, err error) {
    36		// One reader at a time.
    37		p.rl.Lock()
    38		defer p.rl.Unlock()
    39	
    40		p.l.Lock()
    41		defer p.l.Unlock()
    42		for {
    43			if p.rerr != nil {
    44				return 0, ErrClosedPipe
    45			}
    46			if p.data != nil {
    47				break
    48			}
    49			if p.werr != nil {
    50				return 0, p.werr
    51			}
    52			p.rwait.Wait()
    53		}
    54		n = copy(b, p.data)
    55		p.data = p.data[n:]
    56		if len(p.data) == 0 {
    57			p.data = nil
    58			p.wwait.Signal()
    59		}
    60		return
    61	}
    62	
    63	var zero [0]byte
    64	
    65	func (p *pipe) write(b []byte) (n int, err error) {
    66		// pipe uses nil to mean not available
    67		if b == nil {
    68			b = zero[:]
    69		}
    70	
    71		// One writer at a time.
    72		p.wl.Lock()
    73		defer p.wl.Unlock()
    74	
    75		p.l.Lock()
    76		defer p.l.Unlock()
    77		if p.werr != nil {
    78			err = ErrClosedPipe
    79			return
    80		}
    81		p.data = b
    82		p.rwait.Signal()
    83		for {
    84			if p.data == nil {
    85				break
    86			}
    87			if p.rerr != nil {
    88				err = p.rerr
    89				break
    90			}
    91			if p.werr != nil {
    92				err = ErrClosedPipe
    93			}
    94			p.wwait.Wait()
    95		}
    96		n = len(b) - len(p.data)
    97		p.data = nil // in case of rerr or werr
    98		return
    99	}
   100	
   101	func (p *pipe) rclose(err error) {
   102		if err == nil {
   103			err = ErrClosedPipe
   104		}
   105		p.l.Lock()
   106		defer p.l.Unlock()
   107		p.rerr = err
   108		p.rwait.Signal()
   109		p.wwait.Signal()
   110	}
   111	
   112	func (p *pipe) wclose(err error) {
   113		if err == nil {
   114			err = EOF
   115		}
   116		p.l.Lock()
   117		defer p.l.Unlock()
   118		p.werr = err
   119		p.rwait.Signal()
   120		p.wwait.Signal()
   121	}
   122	
   123	// A PipeReader is the read half of a pipe.
   124	type PipeReader struct {
   125		p *pipe
   126	}
   127	
   128	// Read implements the standard Read interface:
   129	// it reads data from the pipe, blocking until a writer
   130	// arrives or the write end is closed.
   131	// If the write end is closed with an error, that error is
   132	// returned as err; otherwise err is EOF.
   133	func (r *PipeReader) Read(data []byte) (n int, err error) {
   134		return r.p.read(data)
   135	}
   136	
   137	// Close closes the reader; subsequent writes to the
   138	// write half of the pipe will return the error ErrClosedPipe.
   139	func (r *PipeReader) Close() error {
   140		return r.CloseWithError(nil)
   141	}
   142	
   143	// CloseWithError closes the reader; subsequent writes
   144	// to the write half of the pipe will return the error err.
   145	func (r *PipeReader) CloseWithError(err error) error {
   146		r.p.rclose(err)
   147		return nil
   148	}
   149	
   150	// A PipeWriter is the write half of a pipe.
   151	type PipeWriter struct {
   152		p *pipe
   153	}
   154	
   155	// Write implements the standard Write interface:
   156	// it writes data to the pipe, blocking until readers
   157	// have consumed all the data or the read end is closed.
   158	// If the read end is closed with an error, that err is
   159	// returned as err; otherwise err is ErrClosedPipe.
   160	func (w *PipeWriter) Write(data []byte) (n int, err error) {
   161		return w.p.write(data)
   162	}
   163	
   164	// Close closes the writer; subsequent reads from the
   165	// read half of the pipe will return no bytes and EOF.
   166	func (w *PipeWriter) Close() error {
   167		return w.CloseWithError(nil)
   168	}
   169	
   170	// CloseWithError closes the writer; subsequent reads from the
   171	// read half of the pipe will return no bytes and the error err.
   172	func (w *PipeWriter) CloseWithError(err error) error {
   173		w.p.wclose(err)
   174		return nil
   175	}
   176	
   177	// Pipe creates a synchronous in-memory pipe.
   178	// It can be used to connect code expecting an io.Reader
   179	// with code expecting an io.Writer.
   180	// Reads on one end are matched with writes on the other,
   181	// copying data directly between the two; there is no internal buffering.
   182	// It is safe to call Read and Write in parallel with each other or with
   183	// Close. Close will complete once pending I/O is done. Parallel calls to
   184	// Read, and parallel calls to Write, are also safe:
   185	// the individual calls will be gated sequentially.
   186	func Pipe() (*PipeReader, *PipeWriter) {
   187		p := new(pipe)
   188		p.rwait.L = &p.l
   189		p.wwait.L = &p.l
   190		r := &PipeReader{p}
   191		w := &PipeWriter{p}
   192		return r, w
   193	}
   194	

View as plain text