Source file src/pkg/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "crypto/tls"
16 "encoding/base64"
17 "errors"
18 "fmt"
19 "io"
20 "log"
21 "net"
22 "net/url"
23 "os"
24 "strings"
25 "sync"
26 "time"
27 )
28
29
30
31
32
33
// DefaultTransport is the package's ready-made RoundTripper: a *Transport
// whose Proxy hook is ProxyFromEnvironment, so proxy selection is driven
// by environment variables.
var DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment}
35
36
37
// DefaultMaxIdleConnsPerHost is the per-key idle connection limit that
// putIdleConn applies when Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
39
40
41
42
// Transport is a RoundTripper for "http" and "https" URLs (optionally via a
// proxy) that keeps idle connections around for reuse. Other schemes are
// delegated to RoundTrippers added with RegisterProtocol.
type Transport struct {
	idleMu     sync.Mutex                   // guards idleConn and idleConnCh
	idleConn   map[string][]*persistConn    // idle connections, keyed by connectMethod.key()
	idleConnCh map[string]chan *persistConn // per-key hand-off channel from putIdleConn to a waiting getConn
	reqMu      sync.Mutex                   // guards reqConn
	reqConn    map[*Request]*persistConn    // in-flight request -> its connection (used by CancelRequest)
	altMu      sync.RWMutex                 // guards altProto
	altProto   map[string]RoundTripper      // scheme -> RoundTripper registered via RegisterProtocol

	// Proxy returns the proxy URL to use for a request. A non-nil error
	// aborts the request. If Proxy is nil, or it returns a nil *url.URL,
	// no proxy is used.
	Proxy func(*Request) (*url.URL, error)

	// Dial, if non-nil, is used to create new TCP connections.
	// If nil, net.Dial is used.
	Dial func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration handed to tls.Client for
	// https targets. If nil, or if its ServerName is empty, a config whose
	// ServerName is the target host is used instead.
	TLSClientConfig *tls.Config

	// DisableKeepAlives, if true, makes putIdleConn close connections
	// instead of caching them for reuse.
	DisableKeepAlives bool

	// DisableCompression, if true, stops the transport from adding its own
	// "Accept-Encoding: gzip" header. When false and the request carries no
	// Accept-Encoding header, the transport asks for gzip itself and
	// transparently decodes a gzip-encoded response body.
	DisableCompression bool

	// MaxIdleConnsPerHost bounds the idle connections kept per cache key.
	// Zero means DefaultMaxIdleConnsPerHost; a negative value disables
	// idle connection caching.
	MaxIdleConnsPerHost int

	// ResponseHeaderTimeout, if positive, is how long roundTrip waits for a
	// response once the request has been written successfully.
	ResponseHeaderTimeout time.Duration
}
95
96
97
98
99
100
101
102 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
103 proxy := getenvEitherCase("HTTP_PROXY")
104 if proxy == "" {
105 return nil, nil
106 }
107 if !useProxy(canonicalAddr(req.URL)) {
108 return nil, nil
109 }
110 proxyURL, err := url.Parse(proxy)
111 if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
112 if u, err := url.Parse("http://" + proxy); err == nil {
113 proxyURL = u
114 err = nil
115 }
116 }
117 if err != nil {
118 return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
119 }
120 return proxyURL, nil
121 }
122
123
124
125 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
126 return func(*Request) (*url.URL, error) {
127 return fixedURL, nil
128 }
129 }
130
131
132
// transportRequest pairs the caller's *Request with headers the transport
// wants to send in addition, so the caller's Request is not modified.
type transportRequest struct {
	*Request        // the original request
	extra    Header // extra headers to write; nil until extraHeaders is first called
}
137
138 func (tr *transportRequest) extraHeaders() Header {
139 if tr.extra == nil {
140 tr.extra = make(Header)
141 }
142 return tr.extra
143 }
144
145
146
147
148
149 func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
150 if req.URL == nil {
151 return nil, errors.New("http: nil Request.URL")
152 }
153 if req.Header == nil {
154 return nil, errors.New("http: nil Request.Header")
155 }
156 if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
157 t.altMu.RLock()
158 var rt RoundTripper
159 if t.altProto != nil {
160 rt = t.altProto[req.URL.Scheme]
161 }
162 t.altMu.RUnlock()
163 if rt == nil {
164 return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
165 }
166 return rt.RoundTrip(req)
167 }
168 if req.URL.Host == "" {
169 return nil, errors.New("http: no Host in request URL")
170 }
171 treq := &transportRequest{Request: req}
172 cm, err := t.connectMethodForRequest(treq)
173 if err != nil {
174 return nil, err
175 }
176
177
178
179
180
181 pconn, err := t.getConn(cm)
182 if err != nil {
183 return nil, err
184 }
185
186 return pconn.roundTrip(treq)
187 }
188
189
190
191
192
193
194
195 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
196 if scheme == "http" || scheme == "https" {
197 panic("protocol " + scheme + " already registered")
198 }
199 t.altMu.Lock()
200 defer t.altMu.Unlock()
201 if t.altProto == nil {
202 t.altProto = make(map[string]RoundTripper)
203 }
204 if _, exists := t.altProto[scheme]; exists {
205 panic("protocol " + scheme + " already registered")
206 }
207 t.altProto[scheme] = rt
208 }
209
210
211
212
213
214 func (t *Transport) CloseIdleConnections() {
215 t.idleMu.Lock()
216 m := t.idleConn
217 t.idleConn = nil
218 t.idleMu.Unlock()
219 if m == nil {
220 return
221 }
222 for _, conns := range m {
223 for _, pconn := range conns {
224 pconn.close()
225 }
226 }
227 }
228
229
230
231 func (t *Transport) CancelRequest(req *Request) {
232 t.reqMu.Lock()
233 pc := t.reqConn[req]
234 t.reqMu.Unlock()
235 if pc != nil {
236 pc.conn.Close()
237 }
238 }
239
240
241
242
243
// getenvEitherCase looks up the environment variable k, preferring the
// upper-case spelling of the name and falling back to the lower-case
// spelling when the upper-case one is unset or empty.
func getenvEitherCase(k string) string {
	upper := os.Getenv(strings.ToUpper(k))
	if upper != "" {
		return upper
	}
	return os.Getenv(strings.ToLower(k))
}
250
251 func (t *Transport) connectMethodForRequest(treq *transportRequest) (*connectMethod, error) {
252 cm := &connectMethod{
253 targetScheme: treq.URL.Scheme,
254 targetAddr: canonicalAddr(treq.URL),
255 }
256 if t.Proxy != nil {
257 var err error
258 cm.proxyURL, err = t.Proxy(treq.Request)
259 if err != nil {
260 return nil, err
261 }
262 }
263 return cm, nil
264 }
265
266
267
268 func (cm *connectMethod) proxyAuth() string {
269 if cm.proxyURL == nil {
270 return ""
271 }
272 if u := cm.proxyURL.User; u != nil {
273 return "Basic " + base64.URLEncoding.EncodeToString([]byte(u.String()))
274 }
275 return ""
276 }
277
278
279
280
281
// putIdleConn offers pconn for reuse and reports whether the Transport
// took it. The connection is refused (false) when keep-alives are disabled
// or MaxIdleConnsPerHost is negative, when pconn is already broken, or
// when the idle list for its key is full; in the first and last of these
// cases pconn is closed here.
func (t *Transport) putIdleConn(pconn *persistConn) bool {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		pconn.close()
		return false
	}
	if pconn.isBroken() {
		return false
	}
	key := pconn.cacheKey
	max := t.MaxIdleConnsPerHost
	if max == 0 {
		max = DefaultMaxIdleConnsPerHost
	}
	t.idleMu.Lock()
	select {
	case t.idleConnCh[key] <- pconn:
		// A getConn call is blocked waiting for a connection with this
		// key (the channel is unbuffered), so hand pconn to it directly
		// instead of parking it in the idle list. If no channel exists for
		// the key the send is on a nil channel, which is never ready, and
		// the default case runs.
		t.idleMu.Unlock()
		return true
	default:
	}
	if t.idleConn == nil {
		t.idleConn = make(map[string][]*persistConn)
	}
	if len(t.idleConn[key]) >= max {
		t.idleMu.Unlock()
		pconn.close()
		return false
	}
	// Invariant check: a connection must never be in the idle list twice.
	for _, exist := range t.idleConn[key] {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(t.idleConn[key], pconn)
	t.idleMu.Unlock()
	return true
}
324
325 func (t *Transport) getIdleConnCh(cm *connectMethod) chan *persistConn {
326 key := cm.key()
327 t.idleMu.Lock()
328 defer t.idleMu.Unlock()
329 if t.idleConnCh == nil {
330 t.idleConnCh = make(map[string]chan *persistConn)
331 }
332 ch, ok := t.idleConnCh[key]
333 if !ok {
334 ch = make(chan *persistConn)
335 t.idleConnCh[key] = ch
336 }
337 return ch
338 }
339
340 func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
341 key := cm.key()
342 t.idleMu.Lock()
343 defer t.idleMu.Unlock()
344 if t.idleConn == nil {
345 return nil
346 }
347 for {
348 pconns, ok := t.idleConn[key]
349 if !ok {
350 return nil
351 }
352 if len(pconns) == 1 {
353 pconn = pconns[0]
354 delete(t.idleConn, key)
355 } else {
356
357
358 pconn = pconns[len(pconns)-1]
359 t.idleConn[key] = pconns[0 : len(pconns)-1]
360 }
361 if !pconn.isBroken() {
362 return
363 }
364 }
365 }
366
367 func (t *Transport) setReqConn(r *Request, pc *persistConn) {
368 t.reqMu.Lock()
369 defer t.reqMu.Unlock()
370 if t.reqConn == nil {
371 t.reqConn = make(map[*Request]*persistConn)
372 }
373 if pc != nil {
374 t.reqConn[r] = pc
375 } else {
376 delete(t.reqConn, r)
377 }
378 }
379
380 func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
381 if t.Dial != nil {
382 return t.Dial(network, addr)
383 }
384 return net.Dial(network, addr)
385 }
386
387
388
389
390
// getConn returns a connection for cm. A cached idle connection is used
// when one exists; otherwise a dial is started in the background and the
// caller gets whichever arrives first: the freshly dialed connection or a
// connection that another request hands back through putIdleConn.
func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
	if pc := t.getIdleConn(cm); pc != nil {
		return pc, nil
	}

	type dialRes struct {
		pc  *persistConn
		err error
	}
	dialc := make(chan dialRes)
	go func() {
		pc, err := t.dialConn(cm)
		dialc <- dialRes{pc, err}
	}()

	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		// Our own dial completed first.
		return v.pc, v.err
	case pc := <-idleConnCh:
		// Another request released a connection before our dial finished,
		// so use that one. The dial goroutine is still going to send on the
		// unbuffered dialc; receive from it in the background so it can
		// exit, and offer a successfully dialed connection to the idle
		// pool rather than discarding it.
		go func() {
			if v := <-dialc; v.err == nil {
				t.putIdleConn(v.pc)
			}
		}()
		return pc, nil
	}
}
425
426 func (t *Transport) dialConn(cm *connectMethod) (*persistConn, error) {
427 conn, err := t.dial("tcp", cm.addr())
428 if err != nil {
429 if cm.proxyURL != nil {
430 err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err)
431 }
432 return nil, err
433 }
434
435 pa := cm.proxyAuth()
436
437 pconn := &persistConn{
438 t: t,
439 cacheKey: cm.key(),
440 conn: conn,
441 reqch: make(chan requestAndChan, 50),
442 writech: make(chan writeRequest, 50),
443 closech: make(chan struct{}),
444 }
445
446 switch {
447 case cm.proxyURL == nil:
448
449 case cm.targetScheme == "http":
450 pconn.isProxy = true
451 if pa != "" {
452 pconn.mutateHeaderFunc = func(h Header) {
453 h.Set("Proxy-Authorization", pa)
454 }
455 }
456 case cm.targetScheme == "https":
457 connectReq := &Request{
458 Method: "CONNECT",
459 URL: &url.URL{Opaque: cm.targetAddr},
460 Host: cm.targetAddr,
461 Header: make(Header),
462 }
463 if pa != "" {
464 connectReq.Header.Set("Proxy-Authorization", pa)
465 }
466 connectReq.Write(conn)
467
468
469
470
471 br := bufio.NewReader(conn)
472 resp, err := ReadResponse(br, connectReq)
473 if err != nil {
474 conn.Close()
475 return nil, err
476 }
477 if resp.StatusCode != 200 {
478 f := strings.SplitN(resp.Status, " ", 2)
479 conn.Close()
480 return nil, errors.New(f[1])
481 }
482 }
483
484 if cm.targetScheme == "https" {
485
486 cfg := t.TLSClientConfig
487 if cfg == nil || cfg.ServerName == "" {
488 host := cm.tlsHost()
489 if cfg == nil {
490 cfg = &tls.Config{ServerName: host}
491 } else {
492 clone := *cfg
493 clone.ServerName = host
494 cfg = &clone
495 }
496 }
497 conn = tls.Client(conn, cfg)
498 if err = conn.(*tls.Conn).Handshake(); err != nil {
499 return nil, err
500 }
501 if t.TLSClientConfig == nil || !t.TLSClientConfig.InsecureSkipVerify {
502 if err = conn.(*tls.Conn).VerifyHostname(cm.tlsHost()); err != nil {
503 return nil, err
504 }
505 }
506 pconn.conn = conn
507 }
508
509 pconn.br = bufio.NewReader(pconn.conn)
510 pconn.bw = bufio.NewWriter(pconn.conn)
511 go pconn.readLoop()
512 go pconn.writeLoop()
513 return pconn, nil
514 }
515
516
517
518
519 func useProxy(addr string) bool {
520 if len(addr) == 0 {
521 return true
522 }
523 host, _, err := net.SplitHostPort(addr)
524 if err != nil {
525 return false
526 }
527 if host == "localhost" {
528 return false
529 }
530 if ip := net.ParseIP(host); ip != nil {
531 if ip.IsLoopback() {
532 return false
533 }
534 }
535
536 no_proxy := getenvEitherCase("NO_PROXY")
537 if no_proxy == "*" {
538 return false
539 }
540
541 addr = strings.ToLower(strings.TrimSpace(addr))
542 if hasPort(addr) {
543 addr = addr[:strings.LastIndex(addr, ":")]
544 }
545
546 for _, p := range strings.Split(no_proxy, ",") {
547 p = strings.ToLower(strings.TrimSpace(p))
548 if len(p) == 0 {
549 continue
550 }
551 if hasPort(p) {
552 p = p[:strings.LastIndex(p, ":")]
553 }
554 if addr == p {
555 return false
556 }
557 if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) {
558
559 return false
560 }
561 if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' {
562
563 return false
564 }
565 }
566 return true
567 }
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
// connectMethod describes how to reach a server: which proxy to go through
// (if any) and which scheme and address are the final target. Its string
// form is used as the key for the idle connection cache.
type connectMethod struct {
	proxyURL     *url.URL // nil when connecting directly
	targetScheme string   // "http" or "https"
	targetAddr   string   // target "host:port", as produced by canonicalAddr
}
588
589 func (ck *connectMethod) key() string {
590 return ck.String()
591 }
592
593 func (ck *connectMethod) String() string {
594 proxyStr := ""
595 targetAddr := ck.targetAddr
596 if ck.proxyURL != nil {
597 proxyStr = ck.proxyURL.String()
598 if ck.targetScheme == "http" {
599 targetAddr = ""
600 }
601 }
602 return strings.Join([]string{proxyStr, ck.targetScheme, targetAddr}, "|")
603 }
604
605
606 func (cm *connectMethod) addr() string {
607 if cm.proxyURL != nil {
608 return canonicalAddr(cm.proxyURL)
609 }
610 return cm.targetAddr
611 }
612
613
614
615 func (cm *connectMethod) tlsHost() string {
616 h := cm.targetAddr
617 if hasPort(h) {
618 h = h[:strings.LastIndex(h, ":")]
619 }
620 return h
621 }
622
623
624
// persistConn is one connection managed by a Transport, together with the
// channels that connect roundTrip to the connection's read and write loops.
type persistConn struct {
	t        *Transport
	cacheKey string              // key under which the conn is stored in Transport.idleConn
	conn     net.Conn            // underlying connection; the TLS connection for https targets
	closed   bool                // whether conn.Close has been called; written by closeLocked
	br       *bufio.Reader       // buffered reader over conn
	bw       *bufio.Writer       // buffered writer over conn
	reqch    chan requestAndChan // roundTrip -> readLoop: responses to expect
	writech  chan writeRequest   // roundTrip -> writeLoop: requests to write
	closech  chan struct{}       // closed when readLoop returns
	isProxy  bool                // true for plain http via a proxy; passed to Request.write

	lk                   sync.Mutex // guards the fields below
	numExpectedResponses int        // requests sent whose responses have not yet been delivered
	broken               bool       // set on write failure or close; a broken conn is never reused

	// mutateHeaderFunc, if non-nil, is called by roundTrip with the
	// request's extra headers before the request is written. dialConn uses
	// it to add Proxy-Authorization; closeLocked resets it to nil.
	mutateHeaderFunc func(Header)
}
645
646 func (pc *persistConn) isBroken() bool {
647 pc.lk.Lock()
648 b := pc.broken
649 pc.lk.Unlock()
650 return b
651 }
652
// remoteSideClosedFunc is an optional hook consulted by remoteSideClosed
// for errors other than io.EOF. It is nil unless something assigns it.
var remoteSideClosedFunc func(error) bool

// remoteSideClosed reports whether err indicates that the peer closed the
// connection: io.EOF always does; any other error is judged by
// remoteSideClosedFunc when that hook is set, and is false otherwise.
func remoteSideClosed(err error) bool {
	switch {
	case err == io.EOF:
		return true
	case remoteSideClosedFunc == nil:
		return false
	}
	return remoteSideClosedFunc(err)
}
664
665 func (pc *persistConn) readLoop() {
666 defer close(pc.closech)
667 alive := true
668
669 for alive {
670 pb, err := pc.br.Peek(1)
671
672 pc.lk.Lock()
673 if pc.numExpectedResponses == 0 {
674 pc.closeLocked()
675 pc.lk.Unlock()
676 if len(pb) > 0 {
677 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
678 string(pb), err)
679 }
680 return
681 }
682 pc.lk.Unlock()
683
684 rc := <-pc.reqch
685
686 var resp *Response
687 if err == nil {
688 resp, err = ReadResponse(pc.br, rc.req)
689 if err == nil && resp.StatusCode == 100 {
690
691
692
693
694
695 resp, err = ReadResponse(pc.br, rc.req)
696 }
697 }
698 hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
699
700 if err != nil {
701 pc.close()
702 } else {
703 if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
704 resp.Header.Del("Content-Encoding")
705 resp.Header.Del("Content-Length")
706 resp.ContentLength = -1
707 gzReader, zerr := gzip.NewReader(resp.Body)
708 if zerr != nil {
709 pc.close()
710 err = zerr
711 } else {
712 resp.Body = &readerAndCloser{gzReader, resp.Body}
713 }
714 }
715 resp.Body = &bodyEOFSignal{body: resp.Body}
716 }
717
718 if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
719
720
721
722 alive = false
723 }
724
725 var waitForBodyRead chan bool
726 if hasBody {
727 waitForBodyRead = make(chan bool, 2)
728 resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
729
730
731
732 waitForBodyRead <- false
733 return nil
734 }
735 resp.Body.(*bodyEOFSignal).fn = func(err error) {
736 alive1 := alive
737 if err != nil {
738 alive1 = false
739 }
740 if alive1 && !pc.t.putIdleConn(pc) {
741 alive1 = false
742 }
743 if !alive1 || pc.isBroken() {
744 pc.close()
745 }
746 waitForBodyRead <- alive1
747 }
748 }
749
750 if alive && !hasBody {
751 if !pc.t.putIdleConn(pc) {
752 alive = false
753 }
754 }
755
756 rc.ch <- responseAndError{resp, err}
757
758
759
760 if waitForBodyRead != nil {
761 alive = <-waitForBodyRead
762 }
763
764 pc.t.setReqConn(rc.req, nil)
765
766 if !alive {
767 pc.close()
768 }
769 }
770 }
771
772 func (pc *persistConn) writeLoop() {
773 for {
774 select {
775 case wr := <-pc.writech:
776 if pc.isBroken() {
777 wr.ch <- errors.New("http: can't write HTTP request on broken connection")
778 continue
779 }
780 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
781 if err == nil {
782 err = pc.bw.Flush()
783 }
784 if err != nil {
785 pc.markBroken()
786 }
787 wr.ch <- err
788 case <-pc.closech:
789 return
790 }
791 }
792 }
793
// responseAndError is the outcome of one round trip, as passed from
// readLoop back to roundTrip.
type responseAndError struct {
	res *Response
	err error
}
798
// requestAndChan tells readLoop which request to read a response for and
// where to deliver the result.
type requestAndChan struct {
	req *Request
	ch  chan responseAndError

	// addedGzip records that the transport itself asked for gzip (the
	// caller set no Accept-Encoding header). readLoop then decodes a
	// gzip-encoded response body transparently.
	addedGzip bool
}
808
809
810
811
812
// writeRequest is sent by roundTrip to writeLoop: the request to write and
// the channel on which writeLoop reports the result of writing it (nil on
// success).
type writeRequest struct {
	req *transportRequest
	ch  chan<- error
}
817
// roundTrip sends req on this connection and waits for its outcome. It
// queues the request for writeLoop and the expected response for readLoop,
// then waits for the first of: a write error, the response, the
// connection's read loop ending, or the response header timeout.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	pc.t.setReqConn(req.Request, pc)
	pc.lk.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.lk.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip ourselves only when compression is enabled and the
	// caller has not set Accept-Encoding. The header goes into the extra
	// set, leaving the caller's Header alone, and readLoop is told (via
	// requestedGzip) to decode the body transparently.
	requestedGzip := false
	if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// Hand the request to writeLoop. The result channel is buffered so
	// writeLoop never blocks if we have already stopped waiting.
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh}

	resc := make(chan responseAndError, 1)
	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}

	var re responseAndError
	var pconnDeadCh = pc.closech
	var failTicker <-chan time.Time      // nil (never ready) until the read loop has ended
	var respHeaderTimer <-chan time.Time // nil (never ready) until the write has succeeded
WaitResponse:
	for {
		select {
		case err := <-writeErrCh:
			if err != nil {
				re = responseAndError{nil, err}
				pc.close()
				break WaitResponse
			}
			// The request is on the wire; start the header timeout now.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				respHeaderTimer = time.After(d)
			}
		case <-pconnDeadCh:
			// The read loop has exited. A response may still be sitting in
			// resc, so rather than failing at once, stop watching this
			// channel (a nil channel is never ready) and give the response
			// a short grace period to show up.
			pconnDeadCh = nil
			failTicker = time.After(100 * time.Millisecond)
		case <-failTicker:
			re = responseAndError{err: errors.New("net/http: transport closed before response was received")}
			break WaitResponse
		case <-respHeaderTimer:
			pc.close()
			re = responseAndError{err: errors.New("net/http: timeout awaiting response headers")}
			break WaitResponse
		case re = <-resc:
			break WaitResponse
		}
	}

	pc.lk.Lock()
	pc.numExpectedResponses--
	pc.lk.Unlock()

	// On success readLoop clears the request's connection record itself;
	// on failure it is cleared here.
	if re.err != nil {
		pc.t.setReqConn(req.Request, nil)
	}
	return re.res, re.err
}
903
904
905
906
907 func (pc *persistConn) markBroken() {
908 pc.lk.Lock()
909 defer pc.lk.Unlock()
910 pc.broken = true
911 }
912
913 func (pc *persistConn) close() {
914 pc.lk.Lock()
915 defer pc.lk.Unlock()
916 pc.closeLocked()
917 }
918
919 func (pc *persistConn) closeLocked() {
920 pc.broken = true
921 if !pc.closed {
922 pc.conn.Close()
923 pc.closed = true
924 }
925 pc.mutateHeaderFunc = nil
926 }
927
// portMap gives the default port for each URL scheme the transport
// handles; canonicalAddr uses it when a URL's host has no explicit port.
var portMap = map[string]string{
	"http":  "80",
	"https": "443",
}
932
933
934 func canonicalAddr(url *url.URL) string {
935 addr := url.Host
936 if !hasPort(addr) {
937 return addr + ":" + portMap[url.Scheme]
938 }
939 return addr
940 }
941
942
943
944
945
946
// bodyEOFSignal wraps a response body so the transport learns when the
// caller is done with it: fn is invoked at most once, on the first read
// error (io.EOF is reported to fn as nil) or when the body is closed.
// If earlyCloseFn is set and Close is called before a read has returned
// io.EOF, Close calls earlyCloseFn instead of closing body and calling fn.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex   // guards closed, rerr and the call to fn
	closed       bool         // whether Close has been called
	rerr         error        // first error returned by body.Read, replayed on later reads
	fn           func(error)  // completion callback; set to nil after it runs
	earlyCloseFn func() error // called by Close when the body was not read to EOF
}
955
956 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
957 es.mu.Lock()
958 closed, rerr := es.closed, es.rerr
959 es.mu.Unlock()
960 if closed {
961 return 0, errors.New("http: read on closed response body")
962 }
963 if rerr != nil {
964 return 0, rerr
965 }
966
967 n, err = es.body.Read(p)
968 if err != nil {
969 es.mu.Lock()
970 defer es.mu.Unlock()
971 if es.rerr == nil {
972 es.rerr = err
973 }
974 es.condfn(err)
975 }
976 return
977 }
978
979 func (es *bodyEOFSignal) Close() error {
980 es.mu.Lock()
981 defer es.mu.Unlock()
982 if es.closed {
983 return nil
984 }
985 es.closed = true
986 if es.earlyCloseFn != nil && es.rerr != io.EOF {
987 return es.earlyCloseFn()
988 }
989 err := es.body.Close()
990 es.condfn(err)
991 return err
992 }
993
994
995 func (es *bodyEOFSignal) condfn(err error) {
996 if es.fn == nil {
997 return
998 }
999 if err == io.EOF {
1000 err = nil
1001 }
1002 es.fn(err)
1003 es.fn = nil
1004 }
1005
// readerAndCloser combines a Reader and a Closer from different sources
// into one io.ReadCloser. readLoop uses it to read through a gzip reader
// while Close goes to the original response body.
type readerAndCloser struct {
	io.Reader
	io.Closer
}
View as plain text