Source file src/pkg/net/http/httputil/persist.go
1
2
3
4
5
6
7 package httputil
8
9 import (
10 "bufio"
11 "errors"
12 "io"
13 "net"
14 "net/http"
15 "net/textproto"
16 "sync"
17 )
18
// ErrPersistEOF is recorded as the sticky read- or write-side error of a
// connection once either peer has signalled that it will not be kept alive
// (a Close flag on a request or response, or an unexpected EOF while a
// server is reading the next request).
var ErrPersistEOF = &http.ProtocolError{ErrorString: "persistent connection closed"}

// ErrClosed is returned by ServerConn.Write when the underlying connection
// has already been taken away with Close or Hijack.
var ErrClosed = &http.ProtocolError{ErrorString: "connection closed by user"}

// ErrPipeline is returned when a request handed to ServerConn.Write or
// ClientConn.Read has no pipeline id on record for this connection.
var ErrPipeline = &http.ProtocolError{ErrorString: "pipeline error"}

// errClosed reports an operation attempted after the local side gave up the
// connection (Close or Hijack); contrast with ErrPersistEOF, which records
// that keep-alive has ended.
var errClosed = errors.New("i/o operation on closed connection")
28
29
30
31
32
33
34
35
36
37
// ServerConn is the server side of a persistent HTTP connection: requests
// are pulled off the wire with Read and answered with Write, and several
// requests may be outstanding at once (see Pending).
type ServerConn struct {
	// lk guards every field below except pipe.
	lk sync.Mutex
	// c is the underlying connection; nil once Close or Hijack has run.
	c net.Conn
	// r is the buffered reader over c; nil once Close or Hijack has run.
	r *bufio.Reader
	// re is the sticky read-side error; once set, Read keeps returning it.
	re error
	// we is the sticky write-side error; once set, Read and Write keep
	// returning it.
	we error
	// lastbody is the body of the most recently read request. Read closes
	// it before parsing the next request.
	lastbody io.ReadCloser
	// nread counts requests successfully read.
	nread int
	// nwritten counts responses successfully written.
	nwritten int
	// pipereq remembers the pipeline id assigned to each request that has
	// been read but not yet answered.
	pipereq map[*http.Request]uint

	// pipe orders Reads and Writes; it is used without holding lk.
	pipe textproto.Pipeline
}
49
50
51
52 func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {
53 if r == nil {
54 r = bufio.NewReader(c)
55 }
56 return &ServerConn{c: c, r: r, pipereq: make(map[*http.Request]uint)}
57 }
58
59
60
61
62
63 func (sc *ServerConn) Hijack() (c net.Conn, r *bufio.Reader) {
64 sc.lk.Lock()
65 defer sc.lk.Unlock()
66 c = sc.c
67 r = sc.r
68 sc.c = nil
69 sc.r = nil
70 return
71 }
72
73
74 func (sc *ServerConn) Close() error {
75 c, _ := sc.Hijack()
76 if c != nil {
77 return c.Close()
78 }
79 return nil
80 }
81
82
83
84
85
86 func (sc *ServerConn) Read() (req *http.Request, err error) {
87
88
89 id := sc.pipe.Next()
90 sc.pipe.StartRequest(id)
91 defer func() {
92 sc.pipe.EndRequest(id)
93 if req == nil {
94 sc.pipe.StartResponse(id)
95 sc.pipe.EndResponse(id)
96 } else {
97
98 sc.lk.Lock()
99 sc.pipereq[req] = id
100 sc.lk.Unlock()
101 }
102 }()
103
104 sc.lk.Lock()
105 if sc.we != nil {
106 defer sc.lk.Unlock()
107 return nil, sc.we
108 }
109 if sc.re != nil {
110 defer sc.lk.Unlock()
111 return nil, sc.re
112 }
113 if sc.r == nil {
114 defer sc.lk.Unlock()
115 return nil, errClosed
116 }
117 r := sc.r
118 lastbody := sc.lastbody
119 sc.lastbody = nil
120 sc.lk.Unlock()
121
122
123 if lastbody != nil {
124
125
126
127 err = lastbody.Close()
128 if err != nil {
129 sc.lk.Lock()
130 defer sc.lk.Unlock()
131 sc.re = err
132 return nil, err
133 }
134 }
135
136 req, err = http.ReadRequest(r)
137 sc.lk.Lock()
138 defer sc.lk.Unlock()
139 if err != nil {
140 if err == io.ErrUnexpectedEOF {
141
142
143
144 sc.re = ErrPersistEOF
145 return nil, sc.re
146 } else {
147 sc.re = err
148 return req, err
149 }
150 }
151 sc.lastbody = req.Body
152 sc.nread++
153 if req.Close {
154 sc.re = ErrPersistEOF
155 return req, sc.re
156 }
157 return req, err
158 }
159
160
161
162 func (sc *ServerConn) Pending() int {
163 sc.lk.Lock()
164 defer sc.lk.Unlock()
165 return sc.nread - sc.nwritten
166 }
167
168
169
170
171 func (sc *ServerConn) Write(req *http.Request, resp *http.Response) error {
172
173
174 sc.lk.Lock()
175 id, ok := sc.pipereq[req]
176 delete(sc.pipereq, req)
177 if !ok {
178 sc.lk.Unlock()
179 return ErrPipeline
180 }
181 sc.lk.Unlock()
182
183
184 sc.pipe.StartResponse(id)
185 defer sc.pipe.EndResponse(id)
186
187 sc.lk.Lock()
188 if sc.we != nil {
189 defer sc.lk.Unlock()
190 return sc.we
191 }
192 if sc.c == nil {
193 defer sc.lk.Unlock()
194 return ErrClosed
195 }
196 c := sc.c
197 if sc.nread <= sc.nwritten {
198 defer sc.lk.Unlock()
199 return errors.New("persist server pipe count")
200 }
201 if resp.Close {
202
203
204
205 sc.re = ErrPersistEOF
206 }
207 sc.lk.Unlock()
208
209 err := resp.Write(c)
210 sc.lk.Lock()
211 defer sc.lk.Unlock()
212 if err != nil {
213 sc.we = err
214 return err
215 }
216 sc.nwritten++
217
218 return nil
219 }
220
221
222
223
224
225
226
227
// ClientConn is the client side of a persistent HTTP connection: requests
// are sent with Write and their responses collected with Read, and several
// requests may be in flight at once (see Pending).
type ClientConn struct {
	// lk guards the fields from c through pipereq.
	lk sync.Mutex
	// c is the underlying connection; nil once Close or Hijack has run.
	c net.Conn
	// r is the buffered reader over c; nil once Close or Hijack has run.
	r *bufio.Reader
	// re is the sticky read-side error; once set, Read and Write keep
	// returning it.
	re error
	// we is the sticky write-side error; once set, Write keeps returning it.
	we error
	// lastbody is the body of the most recently read response. Read closes
	// it before parsing the next response.
	lastbody io.ReadCloser
	// nread counts responses successfully read.
	nread int
	// nwritten counts requests successfully written.
	nwritten int
	// pipereq remembers the pipeline id assigned to each request that has
	// been written but whose response has not yet been read.
	pipereq map[*http.Request]uint

	// pipe orders Writes and Reads; it is used without holding lk.
	pipe textproto.Pipeline
	// writeReq serialises a request onto the wire. The constructors set it
	// to (*http.Request).Write or (*http.Request).WriteProxy.
	writeReq func(*http.Request, io.Writer) error
}
240
241
242
243 func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
244 if r == nil {
245 r = bufio.NewReader(c)
246 }
247 return &ClientConn{
248 c: c,
249 r: r,
250 pipereq: make(map[*http.Request]uint),
251 writeReq: (*http.Request).Write,
252 }
253 }
254
255
256
257 func NewProxyClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
258 cc := NewClientConn(c, r)
259 cc.writeReq = (*http.Request).WriteProxy
260 return cc
261 }
262
263
264
265
266
267 func (cc *ClientConn) Hijack() (c net.Conn, r *bufio.Reader) {
268 cc.lk.Lock()
269 defer cc.lk.Unlock()
270 c = cc.c
271 r = cc.r
272 cc.c = nil
273 cc.r = nil
274 return
275 }
276
277
278 func (cc *ClientConn) Close() error {
279 c, _ := cc.Hijack()
280 if c != nil {
281 return c.Close()
282 }
283 return nil
284 }
285
286
287
288
289
290
291 func (cc *ClientConn) Write(req *http.Request) (err error) {
292
293
294 id := cc.pipe.Next()
295 cc.pipe.StartRequest(id)
296 defer func() {
297 cc.pipe.EndRequest(id)
298 if err != nil {
299 cc.pipe.StartResponse(id)
300 cc.pipe.EndResponse(id)
301 } else {
302
303 cc.lk.Lock()
304 cc.pipereq[req] = id
305 cc.lk.Unlock()
306 }
307 }()
308
309 cc.lk.Lock()
310 if cc.re != nil {
311 defer cc.lk.Unlock()
312 return cc.re
313 }
314 if cc.we != nil {
315 defer cc.lk.Unlock()
316 return cc.we
317 }
318 if cc.c == nil {
319 defer cc.lk.Unlock()
320 return errClosed
321 }
322 c := cc.c
323 if req.Close {
324
325
326 cc.we = ErrPersistEOF
327 }
328 cc.lk.Unlock()
329
330 err = cc.writeReq(req, c)
331 cc.lk.Lock()
332 defer cc.lk.Unlock()
333 if err != nil {
334 cc.we = err
335 return err
336 }
337 cc.nwritten++
338
339 return nil
340 }
341
342
343
344 func (cc *ClientConn) Pending() int {
345 cc.lk.Lock()
346 defer cc.lk.Unlock()
347 return cc.nwritten - cc.nread
348 }
349
350
351
352
353
354 func (cc *ClientConn) Read(req *http.Request) (resp *http.Response, err error) {
355
356 cc.lk.Lock()
357 id, ok := cc.pipereq[req]
358 delete(cc.pipereq, req)
359 if !ok {
360 cc.lk.Unlock()
361 return nil, ErrPipeline
362 }
363 cc.lk.Unlock()
364
365
366 cc.pipe.StartResponse(id)
367 defer cc.pipe.EndResponse(id)
368
369 cc.lk.Lock()
370 if cc.re != nil {
371 defer cc.lk.Unlock()
372 return nil, cc.re
373 }
374 if cc.r == nil {
375 defer cc.lk.Unlock()
376 return nil, errClosed
377 }
378 r := cc.r
379 lastbody := cc.lastbody
380 cc.lastbody = nil
381 cc.lk.Unlock()
382
383
384 if lastbody != nil {
385
386
387
388 err = lastbody.Close()
389 if err != nil {
390 cc.lk.Lock()
391 defer cc.lk.Unlock()
392 cc.re = err
393 return nil, err
394 }
395 }
396
397 resp, err = http.ReadResponse(r, req)
398 cc.lk.Lock()
399 defer cc.lk.Unlock()
400 if err != nil {
401 cc.re = err
402 return resp, err
403 }
404 cc.lastbody = resp.Body
405
406 cc.nread++
407
408 if resp.Close {
409 cc.re = ErrPersistEOF
410 return resp, cc.re
411 }
412 return resp, err
413 }
414
415
416 func (cc *ClientConn) Do(req *http.Request) (resp *http.Response, err error) {
417 err = cc.Write(req)
418 if err != nil {
419 return
420 }
421 return cc.Read(req)
422 }
View as plain text