1
2
3
4
5 package http
6
7 import (
8 "bufio"
9 "io"
10 "net"
11 "net/textproto"
12 "os"
13 "sync"
14 )
15
16 var (
17 ErrPersistEOF = &ProtocolError{"persistent connection closed"}
18 ErrPipeline = &ProtocolError{"pipeline error"}
19 )
20
21
22
23
24
25
26
27
28
29
30 type ServerConn struct {
31 lk sync.Mutex
32 c net.Conn
33 r *bufio.Reader
34 re, we os.Error
35 lastbody io.ReadCloser
36 nread, nwritten int
37 pipereq map[*Request]uint
38
39 pipe textproto.Pipeline
40 }
41
42
43
44 func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {
45 if r == nil {
46 r = bufio.NewReader(c)
47 }
48 return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)}
49 }
50
51
52
53
54
55 func (sc *ServerConn) Hijack() (c net.Conn, r *bufio.Reader) {
56 sc.lk.Lock()
57 defer sc.lk.Unlock()
58 c = sc.c
59 r = sc.r
60 sc.c = nil
61 sc.r = nil
62 return
63 }
64
65
66 func (sc *ServerConn) Close() os.Error {
67 c, _ := sc.Hijack()
68 if c != nil {
69 return c.Close()
70 }
71 return nil
72 }
73
74
75
76
77
78 func (sc *ServerConn) Read() (req *Request, err os.Error) {
79
80
81 id := sc.pipe.Next()
82 sc.pipe.StartRequest(id)
83 defer func() {
84 sc.pipe.EndRequest(id)
85 if req == nil {
86 sc.pipe.StartResponse(id)
87 sc.pipe.EndResponse(id)
88 } else {
89
90 sc.lk.Lock()
91 sc.pipereq[req] = id
92 sc.lk.Unlock()
93 }
94 }()
95
96 sc.lk.Lock()
97 if sc.we != nil {
98 defer sc.lk.Unlock()
99 return nil, sc.we
100 }
101 if sc.re != nil {
102 defer sc.lk.Unlock()
103 return nil, sc.re
104 }
105 if sc.r == nil {
106 defer sc.lk.Unlock()
107 return nil, os.EBADF
108 }
109 r := sc.r
110 lastbody := sc.lastbody
111 sc.lastbody = nil
112 sc.lk.Unlock()
113
114
115 if lastbody != nil {
116
117
118
119 err = lastbody.Close()
120 if err != nil {
121 sc.lk.Lock()
122 defer sc.lk.Unlock()
123 sc.re = err
124 return nil, err
125 }
126 }
127
128 req, err = ReadRequest(r)
129 sc.lk.Lock()
130 defer sc.lk.Unlock()
131 if err != nil {
132 if err == io.ErrUnexpectedEOF {
133
134
135
136 sc.re = ErrPersistEOF
137 return nil, sc.re
138 } else {
139 sc.re = err
140 return req, err
141 }
142 }
143 sc.lastbody = req.Body
144 sc.nread++
145 if req.Close {
146 sc.re = ErrPersistEOF
147 return req, sc.re
148 }
149 return req, err
150 }
151
152
153
154 func (sc *ServerConn) Pending() int {
155 sc.lk.Lock()
156 defer sc.lk.Unlock()
157 return sc.nread - sc.nwritten
158 }
159
160
161
162
163 func (sc *ServerConn) Write(req *Request, resp *Response) os.Error {
164
165
166 sc.lk.Lock()
167 id, ok := sc.pipereq[req]
168 sc.pipereq[req] = 0, false
169 if !ok {
170 sc.lk.Unlock()
171 return ErrPipeline
172 }
173 sc.lk.Unlock()
174
175
176 sc.pipe.StartResponse(id)
177 defer sc.pipe.EndResponse(id)
178
179 sc.lk.Lock()
180 if sc.we != nil {
181 defer sc.lk.Unlock()
182 return sc.we
183 }
184 if sc.c == nil {
185 defer sc.lk.Unlock()
186 return os.EBADF
187 }
188 c := sc.c
189 if sc.nread <= sc.nwritten {
190 defer sc.lk.Unlock()
191 return os.NewError("persist server pipe count")
192 }
193 if resp.Close {
194
195
196
197 sc.re = ErrPersistEOF
198 }
199 sc.lk.Unlock()
200
201 err := resp.Write(c)
202 sc.lk.Lock()
203 defer sc.lk.Unlock()
204 if err != nil {
205 sc.we = err
206 return err
207 }
208 sc.nwritten++
209
210 return nil
211 }
212
213
214
215
216
217
218
219
220 type ClientConn struct {
221 lk sync.Mutex
222 c net.Conn
223 r *bufio.Reader
224 re, we os.Error
225 lastbody io.ReadCloser
226 nread, nwritten int
227 pipereq map[*Request]uint
228
229 pipe textproto.Pipeline
230 writeReq func(*Request, io.Writer) os.Error
231 }
232
233
234
235 func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
236 if r == nil {
237 r = bufio.NewReader(c)
238 }
239 return &ClientConn{
240 c: c,
241 r: r,
242 pipereq: make(map[*Request]uint),
243 writeReq: (*Request).Write,
244 }
245 }
246
247
248
249 func NewProxyClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
250 cc := NewClientConn(c, r)
251 cc.writeReq = (*Request).WriteProxy
252 return cc
253 }
254
255
256
257
258
259 func (cc *ClientConn) Hijack() (c net.Conn, r *bufio.Reader) {
260 cc.lk.Lock()
261 defer cc.lk.Unlock()
262 c = cc.c
263 r = cc.r
264 cc.c = nil
265 cc.r = nil
266 return
267 }
268
269
270 func (cc *ClientConn) Close() os.Error {
271 c, _ := cc.Hijack()
272 if c != nil {
273 return c.Close()
274 }
275 return nil
276 }
277
278
279
280
281
282
283 func (cc *ClientConn) Write(req *Request) (err os.Error) {
284
285
286 id := cc.pipe.Next()
287 cc.pipe.StartRequest(id)
288 defer func() {
289 cc.pipe.EndRequest(id)
290 if err != nil {
291 cc.pipe.StartResponse(id)
292 cc.pipe.EndResponse(id)
293 } else {
294
295 cc.lk.Lock()
296 cc.pipereq[req] = id
297 cc.lk.Unlock()
298 }
299 }()
300
301 cc.lk.Lock()
302 if cc.re != nil {
303 defer cc.lk.Unlock()
304 return cc.re
305 }
306 if cc.we != nil {
307 defer cc.lk.Unlock()
308 return cc.we
309 }
310 if cc.c == nil {
311 defer cc.lk.Unlock()
312 return os.EBADF
313 }
314 c := cc.c
315 if req.Close {
316
317
318 cc.we = ErrPersistEOF
319 }
320 cc.lk.Unlock()
321
322 err = cc.writeReq(req, c)
323 cc.lk.Lock()
324 defer cc.lk.Unlock()
325 if err != nil {
326 cc.we = err
327 return err
328 }
329 cc.nwritten++
330
331 return nil
332 }
333
334
335
336 func (cc *ClientConn) Pending() int {
337 cc.lk.Lock()
338 defer cc.lk.Unlock()
339 return cc.nwritten - cc.nread
340 }
341
342
343
344
345
346 func (cc *ClientConn) Read(req *Request) (*Response, os.Error) {
347 return cc.readUsing(req, ReadResponse)
348 }
349
350
351
352 func (cc *ClientConn) readUsing(req *Request, readRes func(*bufio.Reader, *Request) (*Response, os.Error)) (resp *Response, err os.Error) {
353
354 cc.lk.Lock()
355 id, ok := cc.pipereq[req]
356 cc.pipereq[req] = 0, false
357 if !ok {
358 cc.lk.Unlock()
359 return nil, ErrPipeline
360 }
361 cc.lk.Unlock()
362
363
364 cc.pipe.StartResponse(id)
365 defer cc.pipe.EndResponse(id)
366
367 cc.lk.Lock()
368 if cc.re != nil {
369 defer cc.lk.Unlock()
370 return nil, cc.re
371 }
372 if cc.r == nil {
373 defer cc.lk.Unlock()
374 return nil, os.EBADF
375 }
376 r := cc.r
377 lastbody := cc.lastbody
378 cc.lastbody = nil
379 cc.lk.Unlock()
380
381
382 if lastbody != nil {
383
384
385
386 err = lastbody.Close()
387 if err != nil {
388 cc.lk.Lock()
389 defer cc.lk.Unlock()
390 cc.re = err
391 return nil, err
392 }
393 }
394
395 resp, err = readRes(r, req)
396 cc.lk.Lock()
397 defer cc.lk.Unlock()
398 if err != nil {
399 cc.re = err
400 return resp, err
401 }
402 cc.lastbody = resp.Body
403
404 cc.nread++
405
406 if resp.Close {
407 cc.re = ErrPersistEOF
408 return resp, cc.re
409 }
410 return resp, err
411 }
412
413
414 func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) {
415 err = cc.Write(req)
416 if err != nil {
417 return
418 }
419 return cc.Read(req)
420 }