Source file src/pkg/io/pipe.go
1
2
3
4
5
6
7
8 package io
9
10 import (
11 "errors"
12 "sync"
13 )
14
15
16 var ErrClosedPipe = errors.New("io: read/write on closed pipe")
17
18 type pipeResult struct {
19 n int
20 err error
21 }
22
23
24 type pipe struct {
25 rl sync.Mutex
26 wl sync.Mutex
27 l sync.Mutex
28 data []byte
29 rwait sync.Cond
30 wwait sync.Cond
31 rerr error
32 werr error
33 }
34
35 func (p *pipe) read(b []byte) (n int, err error) {
36
37 p.rl.Lock()
38 defer p.rl.Unlock()
39
40 p.l.Lock()
41 defer p.l.Unlock()
42 for {
43 if p.rerr != nil {
44 return 0, ErrClosedPipe
45 }
46 if p.data != nil {
47 break
48 }
49 if p.werr != nil {
50 return 0, p.werr
51 }
52 p.rwait.Wait()
53 }
54 n = copy(b, p.data)
55 p.data = p.data[n:]
56 if len(p.data) == 0 {
57 p.data = nil
58 p.wwait.Signal()
59 }
60 return
61 }
62
63 var zero [0]byte
64
65 func (p *pipe) write(b []byte) (n int, err error) {
66
67 if b == nil {
68 b = zero[:]
69 }
70
71
72 p.wl.Lock()
73 defer p.wl.Unlock()
74
75 p.l.Lock()
76 defer p.l.Unlock()
77 p.data = b
78 p.rwait.Signal()
79 for {
80 if p.data == nil {
81 break
82 }
83 if p.rerr != nil {
84 err = p.rerr
85 break
86 }
87 if p.werr != nil {
88 err = ErrClosedPipe
89 }
90 p.wwait.Wait()
91 }
92 n = len(b) - len(p.data)
93 p.data = nil
94 return
95 }
96
97 func (p *pipe) rclose(err error) {
98 if err == nil {
99 err = ErrClosedPipe
100 }
101 p.l.Lock()
102 defer p.l.Unlock()
103 p.rerr = err
104 p.rwait.Signal()
105 p.wwait.Signal()
106 }
107
108 func (p *pipe) wclose(err error) {
109 if err == nil {
110 err = EOF
111 }
112 p.l.Lock()
113 defer p.l.Unlock()
114 p.werr = err
115 p.rwait.Signal()
116 p.wwait.Signal()
117 }
118
119
120 type PipeReader struct {
121 p *pipe
122 }
123
124
125
126
127
128
129 func (r *PipeReader) Read(data []byte) (n int, err error) {
130 return r.p.read(data)
131 }
132
133
134
135 func (r *PipeReader) Close() error {
136 return r.CloseWithError(nil)
137 }
138
139
140
141 func (r *PipeReader) CloseWithError(err error) error {
142 r.p.rclose(err)
143 return nil
144 }
145
146
147 type PipeWriter struct {
148 p *pipe
149 }
150
151
152
153
154
155
156 func (w *PipeWriter) Write(data []byte) (n int, err error) {
157 return w.p.write(data)
158 }
159
160
161
162 func (w *PipeWriter) Close() error {
163 return w.CloseWithError(nil)
164 }
165
166
167
168 func (w *PipeWriter) CloseWithError(err error) error {
169 w.p.wclose(err)
170 return nil
171 }
172
173
174
175
176
177
178
179
180
181
182 func Pipe() (*PipeReader, *PipeWriter) {
183 p := new(pipe)
184 p.rwait.L = &p.l
185 p.wwait.L = &p.l
186 r := &PipeReader{p}
187 w := &PipeWriter{p}
188 return r, w
189 }
View as plain text