The Go Programming Language

Source file src/pkg/io/pipe.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Pipe adapter to connect code expecting an io.Reader
     6	// with code expecting an io.Writer.
     7	
     8	package io
     9	
    10	import (
    11		"os"
    12		"sync"
    13	)
    14	
    15	type pipeResult struct {
    16		n   int
    17		err os.Error
    18	}
    19	
    20	// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    21	type pipe struct {
    22		rl    sync.Mutex // gates readers one at a time
    23		wl    sync.Mutex // gates writers one at a time
    24		l     sync.Mutex // protects remaining fields
    25		data  []byte     // data remaining in pending write
    26		rwait sync.Cond  // waiting reader
    27		wwait sync.Cond  // waiting writer
    28		rerr  os.Error   // if reader closed, error to give writes
    29		werr  os.Error   // if writer closed, error to give reads
    30	}
    31	
    32	func (p *pipe) read(b []byte) (n int, err os.Error) {
    33		// One reader at a time.
    34		p.rl.Lock()
    35		defer p.rl.Unlock()
    36	
    37		p.l.Lock()
    38		defer p.l.Unlock()
    39		for {
    40			if p.rerr != nil {
    41				return 0, os.EINVAL
    42			}
    43			if p.data != nil {
    44				break
    45			}
    46			if p.werr != nil {
    47				return 0, p.werr
    48			}
    49			p.rwait.Wait()
    50		}
    51		n = copy(b, p.data)
    52		p.data = p.data[n:]
    53		if len(p.data) == 0 {
    54			p.data = nil
    55			p.wwait.Signal()
    56		}
    57		return
    58	}
    59	
    60	var zero [0]byte
    61	
    62	func (p *pipe) write(b []byte) (n int, err os.Error) {
    63		// pipe uses nil to mean not available
    64		if b == nil {
    65			b = zero[:]
    66		}
    67	
    68		// One writer at a time.
    69		p.wl.Lock()
    70		defer p.wl.Unlock()
    71	
    72		p.l.Lock()
    73		defer p.l.Unlock()
    74		p.data = b
    75		p.rwait.Signal()
    76		for {
    77			if p.data == nil {
    78				break
    79			}
    80			if p.rerr != nil {
    81				err = p.rerr
    82				break
    83			}
    84			if p.werr != nil {
    85				err = os.EINVAL
    86			}
    87			p.wwait.Wait()
    88		}
    89		n = len(b) - len(p.data)
    90		p.data = nil // in case of rerr or werr
    91		return
    92	}
    93	
    94	func (p *pipe) rclose(err os.Error) {
    95		if err == nil {
    96			err = os.EPIPE
    97		}
    98		p.l.Lock()
    99		defer p.l.Unlock()
   100		p.rerr = err
   101		p.rwait.Signal()
   102		p.wwait.Signal()
   103	}
   104	
   105	func (p *pipe) wclose(err os.Error) {
   106		if err == nil {
   107			err = os.EOF
   108		}
   109		p.l.Lock()
   110		defer p.l.Unlock()
   111		p.werr = err
   112		p.rwait.Signal()
   113		p.wwait.Signal()
   114	}
   115	
   116	// A PipeReader is the read half of a pipe.
   117	type PipeReader struct {
   118		p *pipe
   119	}
   120	
   121	// Read implements the standard Read interface:
   122	// it reads data from the pipe, blocking until a writer
   123	// arrives or the write end is closed.
   124	// If the write end is closed with an error, that error is
   125	// returned as err; otherwise err is os.EOF.
   126	func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
   127		return r.p.read(data)
   128	}
   129	
   130	// Close closes the reader; subsequent writes to the
   131	// write half of the pipe will return the error os.EPIPE.
   132	func (r *PipeReader) Close() os.Error {
   133		return r.CloseWithError(nil)
   134	}
   135	
   136	// CloseWithError closes the reader; subsequent writes
   137	// to the write half of the pipe will return the error err.
   138	func (r *PipeReader) CloseWithError(err os.Error) os.Error {
   139		r.p.rclose(err)
   140		return nil
   141	}
   142	
   143	// A PipeWriter is the write half of a pipe.
   144	type PipeWriter struct {
   145		p *pipe
   146	}
   147	
   148	// Write implements the standard Write interface:
   149	// it writes data to the pipe, blocking until readers
   150	// have consumed all the data or the read end is closed.
   151	// If the read end is closed with an error, that err is
   152	// returned as err; otherwise err is os.EPIPE.
   153	func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
   154		return w.p.write(data)
   155	}
   156	
   157	// Close closes the writer; subsequent reads from the
   158	// read half of the pipe will return no bytes and os.EOF.
   159	func (w *PipeWriter) Close() os.Error {
   160		return w.CloseWithError(nil)
   161	}
   162	
   163	// CloseWithError closes the writer; subsequent reads from the
   164	// read half of the pipe will return no bytes and the error err.
   165	func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
   166		w.p.wclose(err)
   167		return nil
   168	}
   169	
   170	// Pipe creates a synchronous in-memory pipe.
   171	// It can be used to connect code expecting an io.Reader
   172	// with code expecting an io.Writer.
   173	// Reads on one end are matched with writes on the other,
   174	// copying data directly between the two; there is no internal buffering.
   175	func Pipe() (*PipeReader, *PipeWriter) {
   176		p := new(pipe)
   177		p.rwait.L = &p.l
   178		p.wwait.L = &p.l
   179		r := &PipeReader{p}
   180		w := &PipeWriter{p}
   181		return r, w
   182	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.