1
2
3
4
5
6
7
8 package io
9
10 import (
11 "os"
12 "sync"
13 )
14
15 type pipeResult struct {
16 n int
17 err os.Error
18 }
19
20
21 type pipe struct {
22 rl sync.Mutex
23 wl sync.Mutex
24 l sync.Mutex
25 data []byte
26 rwait sync.Cond
27 wwait sync.Cond
28 rerr os.Error
29 werr os.Error
30 }
31
32 func (p *pipe) read(b []byte) (n int, err os.Error) {
33
34 p.rl.Lock()
35 defer p.rl.Unlock()
36
37 p.l.Lock()
38 defer p.l.Unlock()
39 for {
40 if p.rerr != nil {
41 return 0, os.EINVAL
42 }
43 if p.data != nil {
44 break
45 }
46 if p.werr != nil {
47 return 0, p.werr
48 }
49 p.rwait.Wait()
50 }
51 n = copy(b, p.data)
52 p.data = p.data[n:]
53 if len(p.data) == 0 {
54 p.data = nil
55 p.wwait.Signal()
56 }
57 return
58 }
59
60 var zero [0]byte
61
62 func (p *pipe) write(b []byte) (n int, err os.Error) {
63
64 if b == nil {
65 b = zero[:]
66 }
67
68
69 p.wl.Lock()
70 defer p.wl.Unlock()
71
72 p.l.Lock()
73 defer p.l.Unlock()
74 p.data = b
75 p.rwait.Signal()
76 for {
77 if p.data == nil {
78 break
79 }
80 if p.rerr != nil {
81 err = p.rerr
82 break
83 }
84 if p.werr != nil {
85 err = os.EINVAL
86 }
87 p.wwait.Wait()
88 }
89 n = len(b) - len(p.data)
90 p.data = nil
91 return
92 }
93
94 func (p *pipe) rclose(err os.Error) {
95 if err == nil {
96 err = os.EPIPE
97 }
98 p.l.Lock()
99 defer p.l.Unlock()
100 p.rerr = err
101 p.rwait.Signal()
102 p.wwait.Signal()
103 }
104
105 func (p *pipe) wclose(err os.Error) {
106 if err == nil {
107 err = os.EOF
108 }
109 p.l.Lock()
110 defer p.l.Unlock()
111 p.werr = err
112 p.rwait.Signal()
113 p.wwait.Signal()
114 }
115
116
117 type PipeReader struct {
118 p *pipe
119 }
120
121
122
123
124
125
126 func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
127 return r.p.read(data)
128 }
129
130
131
132 func (r *PipeReader) Close() os.Error {
133 return r.CloseWithError(nil)
134 }
135
136
137
138 func (r *PipeReader) CloseWithError(err os.Error) os.Error {
139 r.p.rclose(err)
140 return nil
141 }
142
143
144 type PipeWriter struct {
145 p *pipe
146 }
147
148
149
150
151
152
153 func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
154 return w.p.write(data)
155 }
156
157
158
159 func (w *PipeWriter) Close() os.Error {
160 return w.CloseWithError(nil)
161 }
162
163
164
165 func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
166 w.p.wclose(err)
167 return nil
168 }
169
170
171
172
173
174
175 func Pipe() (*PipeReader, *PipeWriter) {
176 p := new(pipe)
177 p.rwait.L = &p.l
178 p.wwait.L = &p.l
179 r := &PipeReader{p}
180 w := &PipeWriter{p}
181 return r, w
182 }