Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "io"
21 "log"
22 "net"
23 "net/http/httptrace"
24 "net/textproto"
25 "net/url"
26 "os"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "golang.org/x/net/http/httpguts"
34 "golang.org/x/net/http/httpproxy"
35 )
36
37
38
39
40
41
// DefaultTransport is a package-level RoundTripper backed by a *Transport.
// It resolves proxies through ProxyFromEnvironment, dials with a net.Dialer
// using a 30 second Timeout and KeepAlive, sets ForceAttemptHTTP2, caps the
// idle pool at 100 connections, and uses the timeouts listed below.
42 var DefaultTransport RoundTripper = &Transport{
43 Proxy: ProxyFromEnvironment,
44 DialContext: (&net.Dialer{
45 Timeout: 30 * time.Second,
46 KeepAlive: 30 * time.Second,
47 }).DialContext,
48 ForceAttemptHTTP2: true,
49 MaxIdleConns: 100,
50 IdleConnTimeout: 90 * time.Second,
51 TLSHandshakeTimeout: 10 * time.Second,
52 ExpectContinueTimeout: 1 * time.Second,
53 }
54
55
56
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that
// Transport.maxIdleConnsPerHost falls back to when MaxIdleConnsPerHost is zero.
57 const DefaultMaxIdleConnsPerHost = 2
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Transport holds the connection pool, cancellation registry, alternate
// protocol table and user-configurable hooks used to perform HTTP round trips.
// Unexported fields come first; each group is guarded by the mutex that
// precedes it.
94 type Transport struct {
95 idleMu sync.Mutex // guards closeIdle, idleConn, idleConnWait and idleLRU
96 closeIdle bool // set by CloseIdleConnections; cleared by queueForIdleConn
97 idleConn map[connectMethodKey][]*persistConn // idle conns per key; newest is appended last
98 idleConnWait map[connectMethodKey]wantConnQueue // callers waiting for an idle conn
99 idleLRU connLRU // used to evict the oldest idle conn when MaxIdleConns is exceeded
100
101 reqMu sync.Mutex // guards reqCanceler
102 reqCanceler map[cancelKey]func(error) // per-request cancel callbacks
103
104 altMu sync.Mutex // serializes writers in RegisterProtocol
105 altProto atomic.Value // holds a map[string]RoundTripper keyed by URL scheme
106
107 connsPerHostMu sync.Mutex // guards connsPerHost and connsPerHostWait
108 connsPerHost map[connectMethodKey]int // count of dialed/dialing conns per key
109 connsPerHostWait map[connectMethodKey]wantConnQueue // dials waiting for a per-host slot
110
111
112
113
114
115
116
117 // Proxy, when non-nil, is called with each request to obtain the proxy URL
118 // stored in the connect method. Its error is returned to the caller of
119 // connectMethodForRequest. A nil Proxy means no proxy is consulted.
120 Proxy func(*Request) (*url.URL, error)
121
122
123
124
125
126
127
128 // DialContext, when non-nil, is the first choice for creating plain
129 // (non-TLS) connections; see dial.
130 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
131
132
133
134
135
136
137
138
139 // Dial is used for plain connections only when DialContext is nil. When
140 // both are nil a zero-value net.Dialer is used. A (nil, nil) result from
141 // Dial is converted into an error by dial.
142 Dial func(network, addr string) (net.Conn, error)
143
144
145
146
147
148
149
150
151 // DialTLSContext, when non-nil, is used for "https" connect methods in
152 // place of dial followed by addTLS. It takes precedence over DialTLS.
153 // A returned *tls.Conn is handshaken by dialConn.
154 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
155
156
157
158
159
160 // DialTLS is the context-free variant, used by customDialTLS only when
161 // DialTLSContext is nil.
162 DialTLS func(network, addr string) (net.Conn, error)
163
164
165
166 // TLSClientConfig is cloned by addTLS for each handshake; ServerName is
167 // filled in from the target host when it is empty.
168 TLSClientConfig *tls.Config
169
170 // TLSHandshakeTimeout bounds the handshake performed by addTLS.
171 // Zero means no timeout is armed.
172 TLSHandshakeTimeout time.Duration
173
174
175
176
177 // DisableKeepAlives, when true, makes tryPutIdleConn refuse to pool
178 // connections and queueForIdleConn never hand out an idle one.
179 DisableKeepAlives bool
180
181
182
183
184
185
186
187 // NOTE(review): not read anywhere in this chunk; presumably turns off
188 // transparent response compression handling — confirm against the rest of the file.
189 DisableCompression bool
190
191 // MaxIdleConns caps the total number of pooled idle connections.
192 // Zero means no limit (see tryPutIdleConn).
193 MaxIdleConns int
194
195
196 // MaxIdleConnsPerHost caps idle connections per connect-method key. Zero
197 // selects DefaultMaxIdleConnsPerHost; a negative value disables pooling.
198 MaxIdleConnsPerHost int
199
200
201
202
203 // MaxConnsPerHost caps connections per connect-method key. When the cap is
204 // reached, new dials queue in connsPerHostWait. Zero or negative means no limit.
205 MaxConnsPerHost int
206
207
208
209 // IdleConnTimeout, when positive, is how long a pooled non-alternate
210 // connection may sit idle before its idle timer fires.
211 IdleConnTimeout time.Duration
212
213
214
215 // NOTE(review): not read in this chunk; presumably bounds the wait for
216 // response headers after the request is written — confirm.
217 ResponseHeaderTimeout time.Duration
218
219
220
221
222
223
224 // NOTE(review): not read in this chunk; presumably bounds the wait for a
225 // "100 Continue" response — confirm.
226 ExpectContinueTimeout time.Duration
227
228
229
230
231
232
233
234
235 // TLSNextProto maps a negotiated TLS protocol name to a factory that takes
236 // over the connection (see dialConn). A non-nil map stops
237 // onceSetNextProtoDefaults from configuring the bundled HTTP/2 support.
238 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
239
240
241 // ProxyConnectHeader supplies the headers for proxy CONNECT requests when
242 // GetProxyConnectHeader is nil.
243 ProxyConnectHeader Header
244
245
246
247
248
249 // GetProxyConnectHeader, when non-nil, is called to produce the CONNECT
250 // request headers and takes precedence over ProxyConnectHeader. A returned
251 // error aborts the dial.
252 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
253
254
255
256 // MaxResponseHeaderBytes, when non-zero, seeds the HTTP/2 transport's
257 // MaxHeaderListSize (clamped to 1<<32 - 1) if that is still zero.
258 // NOTE(review): HTTP/1 use is not visible in this chunk.
259 MaxResponseHeaderBytes int64
260
261
262 // WriteBufferSize is the write buffer size for new connections.
263 // Non-positive values select 4 KiB (see writeBufferSize).
264 WriteBufferSize int
265
266
267 // ReadBufferSize is the read buffer size for new connections.
268 // Non-positive values select 4 KiB (see readBufferSize).
269 ReadBufferSize int
270
271 // nextProtoOnce guards the one-time call to onceSetNextProtoDefaults, which
272 // fills in h2transport and tlsNextProtoWasNil.
273 nextProtoOnce sync.Once
274 h2transport h2Transport // non-nil once an HTTP/2 transport has been discovered or configured
275 tlsNextProtoWasNil bool // whether TLSNextProto was nil when nextProtoOnce ran
276
277
278
279 // ForceAttemptHTTP2 lets onceSetNextProtoDefaults configure the bundled
280 // HTTP/2 support even when TLSClientConfig or one of the dial hooks is set,
281 // which would otherwise suppress it.
282 ForceAttemptHTTP2 bool
283 }
284
285
286
287
// cancelKey is the key type of Transport.reqCanceler. It wraps the *Request
// that the caller originally passed in (roundTrip builds it from origReq).
288 type cancelKey struct {
289 req *Request
290 }
291
292 func (t *Transport) writeBufferSize() int {
293 if t.WriteBufferSize > 0 {
294 return t.WriteBufferSize
295 }
296 return 4 << 10
297 }
298
299 func (t *Transport) readBufferSize() int {
300 if t.ReadBufferSize > 0 {
301 return t.ReadBufferSize
302 }
303 return 4 << 10
304 }
305
306
307 func (t *Transport) Clone() *Transport {
308 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
309 t2 := &Transport{
310 Proxy: t.Proxy,
311 DialContext: t.DialContext,
312 Dial: t.Dial,
313 DialTLS: t.DialTLS,
314 DialTLSContext: t.DialTLSContext,
315 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
316 DisableKeepAlives: t.DisableKeepAlives,
317 DisableCompression: t.DisableCompression,
318 MaxIdleConns: t.MaxIdleConns,
319 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
320 MaxConnsPerHost: t.MaxConnsPerHost,
321 IdleConnTimeout: t.IdleConnTimeout,
322 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
323 ExpectContinueTimeout: t.ExpectContinueTimeout,
324 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
325 GetProxyConnectHeader: t.GetProxyConnectHeader,
326 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
327 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
328 WriteBufferSize: t.WriteBufferSize,
329 ReadBufferSize: t.ReadBufferSize,
330 }
331 if t.TLSClientConfig != nil {
332 t2.TLSClientConfig = t.TLSClientConfig.Clone()
333 }
334 if !t.tlsNextProtoWasNil {
335 npm := map[string]func(authority string, c *tls.Conn) RoundTripper{}
336 for k, v := range t.TLSNextProto {
337 npm[k] = v
338 }
339 t2.TLSNextProto = npm
340 }
341 return t2
342 }
343
344
345
346
347
348
349
// h2Transport is the subset of an HTTP/2 transport that Transport needs:
// CloseIdleConnections forwards to it when Transport.h2transport is set.
350 type h2Transport interface {
351 CloseIdleConnections()
352 }
353
354 func (t *Transport) hasCustomTLSDialer() bool {
355 return t.DialTLS != nil || t.DialTLSContext != nil
356 }
357
358
359
// onceSetNextProtoDefaults performs the one-time HTTP/2 setup for t. Callers
// run it through t.nextProtoOnce. It records whether TLSNextProto was nil,
// then either adopts an already-registered HTTP/2 transport, leaves the
// Transport alone, or configures the bundled HTTP/2 support.
360 func (t *Transport) onceSetNextProtoDefaults() {
361 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
362 if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") {
363 return // HTTP/2 client support disabled via GODEBUG
364 }
365
366
367 // If an "https" alternate protocol is registered and it is a struct with
368 // exactly one field whose value implements h2Transport, adopt that value so
369 // CloseIdleConnections can forward to it, and skip the bundled setup.
370
371 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
372 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
373 if v := rv.Field(0); v.CanInterface() {
374 if h2i, ok := v.Interface().(h2Transport); ok {
375 t.h2transport = h2i
376 return
377 }
378 }
379 }
380
381 if t.TLSNextProto != nil {
382 // The user populated TLSNextProto themselves; do not add to it.
383
384 return
385 }
386 if !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()) {
387 // A custom TLS config or dial hook is present and the user did not opt in
388 // with ForceAttemptHTTP2, so HTTP/2 is not enabled automatically.
389
390
391
392
393 return
394 }
395 if omitBundledHTTP2 {
396 return
397 }
398 t2, err := http2configureTransports(t)
399 if err != nil {
400 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
401 return
402 }
403 t.h2transport = t2
404
405
406
407
408 // Carry MaxResponseHeaderBytes over to the HTTP/2 transport when it has no
409 // MaxHeaderListSize of its own, clamping to the uint32 range.
410
411 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
412 const h2max = 1<<32 - 1
413 if limit1 >= h2max {
414 t2.MaxHeaderListSize = h2max
415 } else {
416 t2.MaxHeaderListSize = uint32(limit1)
417 }
418 }
419 }
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
438 return envProxyFunc()(req.URL)
439 }
440
441
442
443 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
444 return func(*Request) (*url.URL, error) {
445 return fixedURL, nil
446 }
447 }
448
449
450
451
// transportRequest wraps a *Request with per-attempt transport state.
// roundTrip creates a new one for every iteration of its retry loop.
452 type transportRequest struct {
453 *Request // the request being sent on this attempt
454 extra Header // lazily allocated by extraHeaders
455 trace *httptrace.ClientTrace // trace hooks taken from the request context; may be nil
456 cancelKey cancelKey // key under which this request's canceler is registered
457
458 mu sync.Mutex // guards err
459 err error // first error recorded through setError
460 }
461
462 func (tr *transportRequest) extraHeaders() Header {
463 if tr.extra == nil {
464 tr.extra = make(Header)
465 }
466 return tr.extra
467 }
468
469 func (tr *transportRequest) setError(err error) {
470 tr.mu.Lock()
471 if tr.err == nil {
472 tr.err = err
473 }
474 tr.mu.Unlock()
475 }
476
477
478
479 func (t *Transport) useRegisteredProtocol(req *Request) bool {
480 if req.URL.Scheme == "https" && req.requiresHTTP1() {
481
482
483
484
485 return false
486 }
487 return true
488 }
489
490
491
492
493 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
494 if !t.useRegisteredProtocol(req) {
495 return nil
496 }
497 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
498 return altProto[req.URL.Scheme]
499 }
500
501
// roundTrip sends req and returns its response. It validates the request,
// gives a registered alternate protocol the first chance to handle it, and
// otherwise loops: obtain a connection, send the request, and retry on a
// fresh connection when the failure is one that shouldRetryRequest (or a
// "no cached connection" HTTP/2 error) allows. On every error return taken
// before a connection is obtained, the request body is closed.
502 func (t *Transport) roundTrip(req *Request) (*Response, error) {
503 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
504 ctx := req.Context()
505 trace := httptrace.ContextClientTrace(ctx)
506
507 if req.URL == nil {
508 req.closeBody()
509 return nil, errors.New("http: nil Request.URL")
510 }
511 if req.Header == nil {
512 req.closeBody()
513 return nil, errors.New("http: nil Request.Header")
514 }
515 scheme := req.URL.Scheme
516 isHTTP := scheme == "http" || scheme == "https"
517 if isHTTP { // header validation applies to http/https only, not alternate schemes
518 for k, vv := range req.Header {
519 if !httpguts.ValidHeaderFieldName(k) {
520 req.closeBody()
521 return nil, fmt.Errorf("net/http: invalid header field name %q", k)
522 }
523 for _, v := range vv {
524 if !httpguts.ValidHeaderFieldValue(v) {
525 req.closeBody()
526 return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
527 }
528 }
529 }
530 }
531
532 origReq := req // kept so the canceler key and resp.Request refer to the caller's request
533 cancelKey := cancelKey{origReq}
534 req = setupRewindBody(req) // wrap the body so reads/closes are tracked for rewinding
535
536 if altRT := t.alternateRoundTripper(req); altRT != nil {
537 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
538 return resp, err
539 }
540 var err error
541 req, err = rewindBody(req) // the alternate protocol declined; restore the body if it was touched
542 if err != nil {
543 return nil, err
544 }
545 }
546 if !isHTTP {
547 req.closeBody()
548 return nil, badStringError("unsupported protocol scheme", scheme)
549 }
550 if req.Method != "" && !validMethod(req.Method) {
551 req.closeBody()
552 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
553 }
554 if req.URL.Host == "" {
555 req.closeBody()
556 return nil, errors.New("http: no Host in request URL")
557 }
558
559 for {
560 select {
561 case <-ctx.Done():
562 req.closeBody()
563 return nil, ctx.Err()
564 default:
565 }
566
567 // A fresh transportRequest is built for each attempt.
568 treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
569 cm, err := t.connectMethodForRequest(treq)
570 if err != nil {
571 req.closeBody()
572 return nil, err
573 }
574
575
576
577 // Obtain a connection: either an idle one or a newly dialed one.
578
579 pconn, err := t.getConn(treq, cm)
580 if err != nil {
581 t.setReqCanceler(cancelKey, nil)
582 req.closeBody()
583 return nil, err
584 }
585
586 var resp *Response
587 if pconn.alt != nil {
588 // The connection was taken over by an alternate protocol; hand the request to it.
589 t.setReqCanceler(cancelKey, nil)
590 resp, err = pconn.alt.RoundTrip(req)
591 } else {
592 resp, err = pconn.roundTrip(treq)
593 }
594 if err == nil {
595 resp.Request = origReq
596 return resp, nil
597 }
598
599 // Failed. Decide whether to retry on another connection.
600 if http2isNoCachedConnError(err) {
601 if t.removeIdleConn(pconn) {
602 t.decConnsPerHost(pconn.cacheKey)
603 }
604 } else if !pconn.shouldRetryRequest(req, err) {
605 // Not retryable. Unwrap transportReadFromServerError so the caller
606 // sees the underlying error.
607 if e, ok := err.(transportReadFromServerError); ok {
608 err = e.err
609 }
610 return nil, err
611 }
612 testHookRoundTripRetried()
613
614 // Restore the body, if it was consumed, before the next attempt.
615 req, err = rewindBody(req)
616 if err != nil {
617 return nil, err
618 }
619 }
620 }
621
// errCannotRewind is returned by rewindBody when the body has been read or
// closed and the request has no GetBody function to recreate it.
622 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
623
// readTrackingBody wraps a request body and remembers whether Read or Close
// has ever been called on it, which rewindBody uses to decide whether the
// body must be recreated.
624 type readTrackingBody struct {
625 io.ReadCloser
626 didRead bool // set by Read
627 didClose bool // set by Close
628 }
629
630 func (r *readTrackingBody) Read(data []byte) (int, error) {
631 r.didRead = true
632 return r.ReadCloser.Read(data)
633 }
634
635 func (r *readTrackingBody) Close() error {
636 r.didClose = true
637 return r.ReadCloser.Close()
638 }
639
640
641
642
643
644 func setupRewindBody(req *Request) *Request {
645 if req.Body == nil || req.Body == NoBody {
646 return req
647 }
648 newReq := *req
649 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
650 return &newReq
651 }
652
653
654
655
656
657 func rewindBody(req *Request) (rewound *Request, err error) {
658 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
659 return req, nil
660 }
661 if !req.Body.(*readTrackingBody).didClose {
662 req.closeBody()
663 }
664 if req.GetBody == nil {
665 return nil, errCannotRewind
666 }
667 body, err := req.GetBody()
668 if err != nil {
669 return nil, err
670 }
671 newReq := *req
672 newReq.Body = &readTrackingBody{ReadCloser: body}
673 return &newReq, nil
674 }
675
676
677
678
679 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
680 if http2isNoCachedConnError(err) {
681
682
683
684
685
686
687 return true
688 }
689 if err == errMissingHost {
690
691 return false
692 }
693 if !pc.isReused() {
694
695
696
697
698
699
700
701 return false
702 }
703 if _, ok := err.(nothingWrittenError); ok {
704
705
706 return req.outgoingLength() == 0 || req.GetBody != nil
707 }
708 if !req.isReplayable() {
709
710 return false
711 }
712 if _, ok := err.(transportReadFromServerError); ok {
713
714
715 return true
716 }
717 if err == errServerClosedIdle {
718
719
720
721 return true
722 }
723 return false
724 }
725
726
// ErrSkipAltProtocol is a sentinel a registered alternate RoundTripper can
// return from RoundTrip: roundTrip treats it as "not handled" and continues
// with the Transport's own handling of the request.
727 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
728
729
730
731
732
733
734
735
736
737
738
739 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
740 t.altMu.Lock()
741 defer t.altMu.Unlock()
742 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
743 if _, exists := oldMap[scheme]; exists {
744 panic("protocol " + scheme + " already registered")
745 }
746 newMap := make(map[string]RoundTripper)
747 for k, v := range oldMap {
748 newMap[k] = v
749 }
750 newMap[scheme] = rt
751 t.altProto.Store(newMap)
752 }
753
754
755
756
757
758 func (t *Transport) CloseIdleConnections() {
759 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
760 t.idleMu.Lock()
761 m := t.idleConn
762 t.idleConn = nil
763 t.closeIdle = true
764 t.idleLRU = connLRU{}
765 t.idleMu.Unlock()
766 for _, conns := range m {
767 for _, pconn := range conns {
768 pconn.close(errCloseIdleConns)
769 }
770 }
771 if t2 := t.h2transport; t2 != nil {
772 t2.CloseIdleConnections()
773 }
774 }
775
776
777
778
779
780
781
782 func (t *Transport) CancelRequest(req *Request) {
783 t.cancelRequest(cancelKey{req}, errRequestCanceled)
784 }
785
786
787
788 func (t *Transport) cancelRequest(key cancelKey, err error) bool {
789 t.reqMu.Lock()
790 cancel := t.reqCanceler[key]
791 delete(t.reqCanceler, key)
792 t.reqMu.Unlock()
793 if cancel != nil {
794 cancel(err)
795 }
796
797 return cancel != nil
798 }
799
800
801
802
803
// Cached environment proxy configuration, populated by envProxyFunc and
// cleared by resetProxyConfig.
804 var (
805 // envProxyOnce guards the one-time initialization of envProxyFuncValue.
806 envProxyOnce sync.Once
807 envProxyFuncValue func(*url.URL) (*url.URL, error)
808 )
809
810
811
812
813 func envProxyFunc() func(*url.URL) (*url.URL, error) {
814 envProxyOnce.Do(func() {
815 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
816 })
817 return envProxyFuncValue
818 }
819
820
821 func resetProxyConfig() {
822 envProxyOnce = sync.Once{}
823 envProxyFuncValue = nil
824 }
825
826 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
827 cm.targetScheme = treq.URL.Scheme
828 cm.targetAddr = canonicalAddr(treq.URL)
829 if t.Proxy != nil {
830 cm.proxyURL, err = t.Proxy(treq.Request)
831 }
832 cm.onlyH1 = treq.requiresHTTP1()
833 return cm, err
834 }
835
836
837
838 func (cm *connectMethod) proxyAuth() string {
839 if cm.proxyURL == nil {
840 return ""
841 }
842 if u := cm.proxyURL.User; u != nil {
843 username := u.Username()
844 password, _ := u.Password()
845 return "Basic " + basicAuth(username, password)
846 }
847 return ""
848 }
849
850
// Errors describing why a connection was not pooled or why a pooled
// connection was closed.
851 var (
852 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
853 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
854 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
855 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
856 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
857 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
858 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
859 errIdleConnTimeout = errors.New("http: idle connection timeout")
860
861
862
863 // errServerClosedIdle is one of the errors shouldRetryRequest treats as
864 // retryable for replayable requests on a reused connection.
865 errServerClosedIdle = errors.New("http: server closed idle connection")
866 )
867
868
869
870
871
872
873
874
875
// transportReadFromServerError wraps an error encountered while reading from
// the server. shouldRetryRequest treats it as retryable for replayable
// requests on reused connections, and roundTrip unwraps it before returning
// a non-retryable failure to the caller.
876 type transportReadFromServerError struct {
877 err error
878 }
879
880 func (e transportReadFromServerError) Unwrap() error { return e.err }
881
882 func (e transportReadFromServerError) Error() string {
883 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
884 }
885
886 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
887 if err := t.tryPutIdleConn(pconn); err != nil {
888 pconn.close(err)
889 }
890 }
891
892 func (t *Transport) maxIdleConnsPerHost() int {
893 if v := t.MaxIdleConnsPerHost; v != 0 {
894 return v
895 }
896 return DefaultMaxIdleConnsPerHost
897 }
898
899
900
901
902
903
// tryPutIdleConn offers pconn for reuse. It first tries to hand the connection
// directly to a caller waiting in idleConnWait; failing that, it adds the
// connection to the idle pool. A nil return means the connection was delivered
// or pooled; a non-nil error explains why it was not, and the caller is
// expected to close it (see putOrCloseIdleConn).
904 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
905 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
906 return errKeepAlivesDisabled
907 }
908 if pconn.isBroken() {
909 return errConnBroken
910 }
911 pconn.markReused()
912
913 t.idleMu.Lock()
914 defer t.idleMu.Unlock()
915
916
917 // An alternate-protocol connection that is already tracked in the LRU is
918 // already pooled; there is nothing more to do.
919 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
920 return nil
921 }
922
923
924
925 // Deliver pconn to waiting callers, if any, instead of pooling it. The
926 // local copy q is written back (or the entry deleted) after draining.
927 key := pconn.cacheKey
928 if q, ok := t.idleConnWait[key]; ok {
929 done := false
930 if pconn.alt == nil {
931 // Ordinary connection: stop at the first waiter that accepts it,
932 // since it then belongs to that waiter.
933 for q.len() > 0 {
934 w := q.popFront()
935 if w.tryDeliver(pconn, nil) {
936 done = true
937 break
938 }
939 }
940 } else {
941 // Alternate-protocol connection: offer it to every waiter, and still
942 // fall through to pool it below (done stays false).
943 // NOTE(review): presumably because such a connection can serve many
944 // requests at once — confirm.
945 for q.len() > 0 {
946 w := q.popFront()
947 w.tryDeliver(pconn, nil)
948 }
949 }
950 if q.len() == 0 {
951 delete(t.idleConnWait, key)
952 } else {
953 t.idleConnWait[key] = q
954 }
955 if done {
956 return nil
957 }
958 }
959
960 if t.closeIdle {
961 return errCloseIdle
962 }
963 if t.idleConn == nil {
964 t.idleConn = make(map[connectMethodKey][]*persistConn)
965 }
966 idles := t.idleConn[key]
967 if len(idles) >= t.maxIdleConnsPerHost() {
968 return errTooManyIdleHost
969 }
970 for _, exist := range idles {
971 if exist == pconn {
972 log.Fatalf("dup idle pconn %p in freelist", pconn)
973 }
974 }
975 t.idleConn[key] = append(idles, pconn)
976 t.idleLRU.add(pconn)
977 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
978 oldest := t.idleLRU.removeOldest()
979 oldest.close(errTooManyIdle)
980 t.removeIdleConnLocked(oldest)
981 }
982
983
984 // Arm (or re-arm) the idle timer. Alternate-protocol connections are
985 // excluded from this timer.
986 if t.IdleConnTimeout > 0 && pconn.alt == nil {
987 if pconn.idleTimer != nil {
988 pconn.idleTimer.Reset(t.IdleConnTimeout)
989 } else {
990 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
991 }
992 }
993 pconn.idleAt = time.Now()
994 return nil
995 }
996
997
998
999
// queueForIdleConn tries to satisfy w from the idle pool. It reports whether
// an idle connection was delivered to w. When the pool holds no connection
// for w's key, w is appended to idleConnWait so that a later tryPutIdleConn
// can deliver to it. As a side effect, every call clears closeIdle.
1000 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1001 if t.DisableKeepAlives {
1002 return false
1003 }
1004
1005 t.idleMu.Lock()
1006 defer t.idleMu.Unlock()
1007
1008 // A new request is looking for a connection, so pooling is allowed again
1009 // after an earlier CloseIdleConnections.
1010 t.closeIdle = false
1011
1012 if w == nil {
1013 // A nil w only resets closeIdle above.
1014 return false
1015 }
1016
1017
1018 // oldTime is the cutoff before which an idle connection counts as expired.
1019 // It stays zero when no IdleConnTimeout is configured.
1020 var oldTime time.Time
1021 if t.IdleConnTimeout > 0 {
1022 oldTime = time.Now().Add(-t.IdleConnTimeout)
1023 }
1024
1025 // Look for the most recently pooled usable connection, scanning from the end.
1026 if list, ok := t.idleConn[w.key]; ok {
1027 stop := false
1028 delivered := false
1029 for len(list) > 0 && !stop {
1030 pconn := list[len(list)-1]
1031
1032
1033 // Round(0) strips the monotonic clock reading before comparing.
1034
1035 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1036 if tooOld {
1037 // Close expired connections asynchronously.
1038 // NOTE(review): presumably in a goroutine because closeConnIfStillIdle
1039 // needs idleMu, which is held here — confirm.
1040 go pconn.closeConnIfStillIdle()
1041 }
1042 if pconn.isBroken() || tooOld {
1043 // Drop unusable connections from the list and keep looking.
1044
1045
1046
1047
1048 list = list[:len(list)-1]
1049 continue
1050 }
1051 delivered = w.tryDeliver(pconn, nil)
1052 if delivered {
1053 if pconn.alt != nil {
1054 // Alternate-protocol connections stay in the pool after delivery.
1055
1056 } else {
1057 // An ordinary connection now belongs to w; remove it from the
1058 // LRU and the list.
1059 t.idleLRU.remove(pconn)
1060 list = list[:len(list)-1]
1061 }
1062 }
1063 stop = true
1064 }
1065 if len(list) > 0 {
1066 t.idleConn[w.key] = list
1067 } else {
1068 delete(t.idleConn, w.key)
1069 }
1070 if stop {
1071 return delivered
1072 }
1073 }
1074
1075 // No usable idle connection: register w as a waiter.
1076 if t.idleConnWait == nil {
1077 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1078 }
1079 q := t.idleConnWait[w.key]
1080 q.cleanFront()
1081 q.pushBack(w)
1082 t.idleConnWait[w.key] = q
1083 return false
1084 }
1085
1086
1087 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1088 t.idleMu.Lock()
1089 defer t.idleMu.Unlock()
1090 return t.removeIdleConnLocked(pconn)
1091 }
1092
1093
1094 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1095 if pconn.idleTimer != nil {
1096 pconn.idleTimer.Stop()
1097 }
1098 t.idleLRU.remove(pconn)
1099 key := pconn.cacheKey
1100 pconns := t.idleConn[key]
1101 var removed bool
1102 switch len(pconns) {
1103 case 0:
1104
1105 case 1:
1106 if pconns[0] == pconn {
1107 delete(t.idleConn, key)
1108 removed = true
1109 }
1110 default:
1111 for i, v := range pconns {
1112 if v != pconn {
1113 continue
1114 }
1115
1116
1117 copy(pconns[i:], pconns[i+1:])
1118 t.idleConn[key] = pconns[:len(pconns)-1]
1119 removed = true
1120 break
1121 }
1122 }
1123 return removed
1124 }
1125
1126 func (t *Transport) setReqCanceler(key cancelKey, fn func(error)) {
1127 t.reqMu.Lock()
1128 defer t.reqMu.Unlock()
1129 if t.reqCanceler == nil {
1130 t.reqCanceler = make(map[cancelKey]func(error))
1131 }
1132 if fn != nil {
1133 t.reqCanceler[key] = fn
1134 } else {
1135 delete(t.reqCanceler, key)
1136 }
1137 }
1138
1139
1140
1141
1142
1143 func (t *Transport) replaceReqCanceler(key cancelKey, fn func(error)) bool {
1144 t.reqMu.Lock()
1145 defer t.reqMu.Unlock()
1146 _, ok := t.reqCanceler[key]
1147 if !ok {
1148 return false
1149 }
1150 if fn != nil {
1151 t.reqCanceler[key] = fn
1152 } else {
1153 delete(t.reqCanceler, key)
1154 }
1155 return true
1156 }
1157
// zeroDialer is the zero-value net.Dialer that Transport.dial falls back to
// when neither DialContext nor Dial is set.
1158 var zeroDialer net.Dialer
1159
1160 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1161 if t.DialContext != nil {
1162 return t.DialContext(ctx, network, addr)
1163 }
1164 if t.Dial != nil {
1165 c, err := t.Dial(network, addr)
1166 if c == nil && err == nil {
1167 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1168 }
1169 return c, err
1170 }
1171 return zeroDialer.DialContext(ctx, network, addr)
1172 }
1173
1174
1175
1176
1177
1178
1179
// wantConn records one caller's request for a connection. It is queued for
// an idle connection and/or a new dial; the first result delivered through
// tryDeliver wins and closes ready.
1180 type wantConn struct {
1181 cm connectMethod // how to connect; passed to dialConn
1182 key connectMethodKey // cm.key(); index into the Transport's per-key maps
1183 ctx context.Context // context used for dialing
1184 ready chan struct{} // closed once pc or err has been set (or the want is canceled)
1185
1186
1187 // beforeDial and afterDial are hooks invoked around the dial attempt
1188 // (see queueForDial and dialConnFor); getConn sets them to test hooks.
1189 beforeDial func()
1190 afterDial func()
1191
1192 mu sync.Mutex // guards pc and err
1193 pc *persistConn
1194 err error
1195 }
1196
1197
1198 func (w *wantConn) waiting() bool {
1199 select {
1200 case <-w.ready:
1201 return false
1202 default:
1203 return true
1204 }
1205 }
1206
1207
1208 func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
1209 w.mu.Lock()
1210 defer w.mu.Unlock()
1211
1212 if w.pc != nil || w.err != nil {
1213 return false
1214 }
1215
1216 w.pc = pc
1217 w.err = err
1218 if w.pc == nil && w.err == nil {
1219 panic("net/http: internal error: misuse of tryDeliver")
1220 }
1221 close(w.ready)
1222 return true
1223 }
1224
1225
1226
1227 func (w *wantConn) cancel(t *Transport, err error) {
1228 w.mu.Lock()
1229 if w.pc == nil && w.err == nil {
1230 close(w.ready)
1231 }
1232 pc := w.pc
1233 w.pc = nil
1234 w.err = err
1235 w.mu.Unlock()
1236
1237 if pc != nil {
1238 t.putOrCloseIdleConn(pc)
1239 }
1240 }
1241
1242
// wantConnQueue is a FIFO queue of *wantConn built from two slices.
// pushBack appends to tail; popFront consumes head starting at headPos and,
// once head is exhausted, swaps tail in as the new head and reuses the old
// head's storage as the new, empty tail.
1243 type wantConnQueue struct {
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254 head []*wantConn // front portion; entries before headPos are already popped
1255 headPos int // index of the next entry to pop from head
1256 tail []*wantConn // back portion, in push order
1257 }
1258
1259
1260 func (q *wantConnQueue) len() int {
1261 return len(q.head) - q.headPos + len(q.tail)
1262 }
1263
1264
1265 func (q *wantConnQueue) pushBack(w *wantConn) {
1266 q.tail = append(q.tail, w)
1267 }
1268
1269
1270 func (q *wantConnQueue) popFront() *wantConn {
1271 if q.headPos >= len(q.head) {
1272 if len(q.tail) == 0 {
1273 return nil
1274 }
1275
1276 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1277 }
1278 w := q.head[q.headPos]
1279 q.head[q.headPos] = nil
1280 q.headPos++
1281 return w
1282 }
1283
1284
1285 func (q *wantConnQueue) peekFront() *wantConn {
1286 if q.headPos < len(q.head) {
1287 return q.head[q.headPos]
1288 }
1289 if len(q.tail) > 0 {
1290 return q.tail[0]
1291 }
1292 return nil
1293 }
1294
1295
1296
1297 func (q *wantConnQueue) cleanFront() (cleaned bool) {
1298 for {
1299 w := q.peekFront()
1300 if w == nil || w.waiting() {
1301 return cleaned
1302 }
1303 q.popFront()
1304 cleaned = true
1305 }
1306 }
1307
1308 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1309 if t.DialTLSContext != nil {
1310 conn, err = t.DialTLSContext(ctx, network, addr)
1311 } else {
1312 conn, err = t.DialTLS(network, addr)
1313 }
1314 if conn == nil && err == nil {
1315 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1316 }
1317 return
1318 }
1319
1320
1321
1322
1323
// getConn returns a connection for treq using connect method cm. It first
// asks the idle pool; if nothing is delivered immediately it also queues a
// dial and then waits for whichever arrives first: a delivered connection or
// error, cancellation through req.Cancel, the request context, or the
// request's registered canceler. On any error return the deferred function
// cancels the wantConn, which returns an already-delivered connection to the
// pool.
1324 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
1325 req := treq.Request
1326 trace := treq.trace
1327 ctx := req.Context()
1328 if trace != nil && trace.GetConn != nil {
1329 trace.GetConn(cm.addr())
1330 }
1331
1332 w := &wantConn{
1333 cm: cm,
1334 key: cm.key(),
1335 ctx: ctx,
1336 ready: make(chan struct{}, 1),
1337 beforeDial: testHookPrePendingDial,
1338 afterDial: testHookPostPendingDial,
1339 }
1340 defer func() {
1341 if err != nil {
1342 w.cancel(t, err)
1343 }
1344 }()
1345
1346 // Fast path: an idle connection is available right now.
1347 if delivered := t.queueForIdleConn(w); delivered {
1348 pc := w.pc
1349 // The GotConn trace hook is only fired here for ordinary connections,
1350 // not alternate-protocol ones.
1351 if pc.alt == nil && trace != nil && trace.GotConn != nil {
1352 trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
1353 }
1354
1355 // Register a placeholder canceler so an entry for this key exists;
1356 // replaceReqCanceler only succeeds when the key is already present.
1357 t.setReqCanceler(treq.cancelKey, func(error) {})
1358 return pc, nil
1359 }
1360
1361 cancelc := make(chan error, 1)
1362 t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
1363
1364 // Queue a dial as well; w stays registered as an idle-connection waiter.
1365 t.queueForDial(w)
1366
1367 // Wait for a result or for cancellation.
1368 select {
1369 case <-w.ready:
1370
1371
1372 if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
1373 trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
1374 }
1375 if w.err != nil {
1376 // The attempt failed. If the request was also canceled, report the
1377 // cancellation rather than the delivered error; otherwise fall through
1378 // and return w.err.
1379 select {
1380 case <-req.Cancel:
1381 return nil, errRequestCanceledConn
1382 case <-req.Context().Done():
1383 return nil, req.Context().Err()
1384 case err := <-cancelc:
1385 if err == errRequestCanceled {
1386 err = errRequestCanceledConn
1387 }
1388 return nil, err
1389 default:
1390 // Not canceled; return the delivered error below.
1391 }
1392 }
1393 return w.pc, w.err
1394 case <-req.Cancel:
1395 return nil, errRequestCanceledConn
1396 case <-req.Context().Done():
1397 return nil, req.Context().Err()
1398 case err := <-cancelc:
1399 if err == errRequestCanceled {
1400 err = errRequestCanceledConn
1401 }
1402 return nil, err
1403 }
1404 }
1405
1406
1407
1408 func (t *Transport) queueForDial(w *wantConn) {
1409 w.beforeDial()
1410 if t.MaxConnsPerHost <= 0 {
1411 go t.dialConnFor(w)
1412 return
1413 }
1414
1415 t.connsPerHostMu.Lock()
1416 defer t.connsPerHostMu.Unlock()
1417
1418 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1419 if t.connsPerHost == nil {
1420 t.connsPerHost = make(map[connectMethodKey]int)
1421 }
1422 t.connsPerHost[w.key] = n + 1
1423 go t.dialConnFor(w)
1424 return
1425 }
1426
1427 if t.connsPerHostWait == nil {
1428 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1429 }
1430 q := t.connsPerHostWait[w.key]
1431 q.cleanFront()
1432 q.pushBack(w)
1433 t.connsPerHostWait[w.key] = q
1434 }
1435
1436
1437
1438
1439 func (t *Transport) dialConnFor(w *wantConn) {
1440 defer w.afterDial()
1441
1442 pc, err := t.dialConn(w.ctx, w.cm)
1443 delivered := w.tryDeliver(pc, err)
1444 if err == nil && (!delivered || pc.alt != nil) {
1445
1446
1447
1448 t.putOrCloseIdleConn(pc)
1449 }
1450 if err != nil {
1451 t.decConnsPerHost(w.key)
1452 }
1453 }
1454
1455
1456
// decConnsPerHost releases one per-host connection slot for key. If a caller
// is still waiting for a slot, the slot is handed over by starting that
// caller's dial and the count is left unchanged; otherwise the count is
// decremented. It does nothing when no positive MaxConnsPerHost is set.
1457 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1458 if t.MaxConnsPerHost <= 0 {
1459 return
1460 }
1461
1462 t.connsPerHostMu.Lock()
1463 defer t.connsPerHostMu.Unlock()
1464 n := t.connsPerHost[key]
1465 if n == 0 {
1466 // A release without a matching claim indicates a bookkeeping bug.
1467
1468 panic("net/http: internal error: connCount underflow")
1469 }
1470
1471
1472 // Hand the freed slot to the first waiter that is still waiting. Waiters
1473 // that have already been answered are popped and skipped.
1474
1475 if q := t.connsPerHostWait[key]; q.len() > 0 {
1476 done := false
1477 for q.len() > 0 {
1478 w := q.popFront()
1479 if w.waiting() {
1480 go t.dialConnFor(w)
1481 done = true
1482 break
1483 }
1484 }
1485 if q.len() == 0 {
1486 delete(t.connsPerHostWait, key)
1487 } else {
1488 // q is a local copy of the map value, so the updated queue must be
1489 // stored back.
1490 t.connsPerHostWait[key] = q
1491 }
1492 if done {
1493 return
1494 }
1495 }
1496
1497 // No waiter took the slot: decrement the count, dropping the key at zero.
1498 if n--; n == 0 {
1499 delete(t.connsPerHost, key)
1500 } else {
1501 t.connsPerHost[key] = n
1502 }
1503 }
1504
1505
1506
1507
1508 func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
1509
1510 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1511 if cfg.ServerName == "" {
1512 cfg.ServerName = name
1513 }
1514 if pconn.cacheKey.onlyH1 {
1515 cfg.NextProtos = nil
1516 }
1517 plainConn := pconn.conn
1518 tlsConn := tls.Client(plainConn, cfg)
1519 errc := make(chan error, 2)
1520 var timer *time.Timer
1521 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1522 timer = time.AfterFunc(d, func() {
1523 errc <- tlsHandshakeTimeoutError{}
1524 })
1525 }
1526 go func() {
1527 if trace != nil && trace.TLSHandshakeStart != nil {
1528 trace.TLSHandshakeStart()
1529 }
1530 err := tlsConn.Handshake()
1531 if timer != nil {
1532 timer.Stop()
1533 }
1534 errc <- err
1535 }()
1536 if err := <-errc; err != nil {
1537 plainConn.Close()
1538 if trace != nil && trace.TLSHandshakeDone != nil {
1539 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1540 }
1541 return err
1542 }
1543 cs := tlsConn.ConnectionState()
1544 if trace != nil && trace.TLSHandshakeDone != nil {
1545 trace.TLSHandshakeDone(cs, nil)
1546 }
1547 pconn.tlsState = &cs
1548 pconn.conn = tlsConn
1549 return nil
1550 }
1551
// erringRoundTripper is implemented by RoundTrippers that exist only to carry
// an error. dialConn checks the value returned by a TLSNextProto factory for
// this interface and, if present, returns RoundTripErr() as the dial error.
1552 type erringRoundTripper interface {
1553 RoundTripErr() error
1554 }
1555
1556 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1557 pconn = &persistConn{
1558 t: t,
1559 cacheKey: cm.key(),
1560 reqch: make(chan requestAndChan, 1),
1561 writech: make(chan writeRequest, 1),
1562 closech: make(chan struct{}),
1563 writeErrCh: make(chan error, 1),
1564 writeLoopDone: make(chan struct{}),
1565 }
1566 trace := httptrace.ContextClientTrace(ctx)
1567 wrapErr := func(err error) error {
1568 if cm.proxyURL != nil {
1569
1570 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1571 }
1572 return err
1573 }
1574 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1575 var err error
1576 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1577 if err != nil {
1578 return nil, wrapErr(err)
1579 }
1580 if tc, ok := pconn.conn.(*tls.Conn); ok {
1581
1582
1583 if trace != nil && trace.TLSHandshakeStart != nil {
1584 trace.TLSHandshakeStart()
1585 }
1586 if err := tc.Handshake(); err != nil {
1587 go pconn.conn.Close()
1588 if trace != nil && trace.TLSHandshakeDone != nil {
1589 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1590 }
1591 return nil, err
1592 }
1593 cs := tc.ConnectionState()
1594 if trace != nil && trace.TLSHandshakeDone != nil {
1595 trace.TLSHandshakeDone(cs, nil)
1596 }
1597 pconn.tlsState = &cs
1598 }
1599 } else {
1600 conn, err := t.dial(ctx, "tcp", cm.addr())
1601 if err != nil {
1602 return nil, wrapErr(err)
1603 }
1604 pconn.conn = conn
1605 if cm.scheme() == "https" {
1606 var firstTLSHost string
1607 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1608 return nil, wrapErr(err)
1609 }
1610 if err = pconn.addTLS(firstTLSHost, trace); err != nil {
1611 return nil, wrapErr(err)
1612 }
1613 }
1614 }
1615
1616
1617 switch {
1618 case cm.proxyURL == nil:
1619
1620 case cm.proxyURL.Scheme == "socks5":
1621 conn := pconn.conn
1622 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1623 if u := cm.proxyURL.User; u != nil {
1624 auth := &socksUsernamePassword{
1625 Username: u.Username(),
1626 }
1627 auth.Password, _ = u.Password()
1628 d.AuthMethods = []socksAuthMethod{
1629 socksAuthMethodNotRequired,
1630 socksAuthMethodUsernamePassword,
1631 }
1632 d.Authenticate = auth.Authenticate
1633 }
1634 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1635 conn.Close()
1636 return nil, err
1637 }
1638 case cm.targetScheme == "http":
1639 pconn.isProxy = true
1640 if pa := cm.proxyAuth(); pa != "" {
1641 pconn.mutateHeaderFunc = func(h Header) {
1642 h.Set("Proxy-Authorization", pa)
1643 }
1644 }
1645 case cm.targetScheme == "https":
1646 conn := pconn.conn
1647 var hdr Header
1648 if t.GetProxyConnectHeader != nil {
1649 var err error
1650 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1651 if err != nil {
1652 conn.Close()
1653 return nil, err
1654 }
1655 } else {
1656 hdr = t.ProxyConnectHeader
1657 }
1658 if hdr == nil {
1659 hdr = make(Header)
1660 }
1661 if pa := cm.proxyAuth(); pa != "" {
1662 hdr = hdr.Clone()
1663 hdr.Set("Proxy-Authorization", pa)
1664 }
1665 connectReq := &Request{
1666 Method: "CONNECT",
1667 URL: &url.URL{Opaque: cm.targetAddr},
1668 Host: cm.targetAddr,
1669 Header: hdr,
1670 }
1671
1672
1673
1674
1675
1676
1677 connectCtx := ctx
1678 if ctx.Done() == nil {
1679 newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
1680 defer cancel()
1681 connectCtx = newCtx
1682 }
1683
1684 didReadResponse := make(chan struct{})
1685 var (
1686 resp *Response
1687 err error
1688 )
1689
1690 go func() {
1691 defer close(didReadResponse)
1692 err = connectReq.Write(conn)
1693 if err != nil {
1694 return
1695 }
1696
1697
1698 br := bufio.NewReader(conn)
1699 resp, err = ReadResponse(br, connectReq)
1700 }()
1701 select {
1702 case <-connectCtx.Done():
1703 conn.Close()
1704 <-didReadResponse
1705 return nil, connectCtx.Err()
1706 case <-didReadResponse:
1707
1708 }
1709 if err != nil {
1710 conn.Close()
1711 return nil, err
1712 }
1713 if resp.StatusCode != 200 {
1714 f := strings.SplitN(resp.Status, " ", 2)
1715 conn.Close()
1716 if len(f) < 2 {
1717 return nil, errors.New("unknown status code")
1718 }
1719 return nil, errors.New(f[1])
1720 }
1721 }
1722
1723 if cm.proxyURL != nil && cm.targetScheme == "https" {
1724 if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
1725 return nil, err
1726 }
1727 }
1728
1729 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1730 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1731 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1732 if e, ok := alt.(erringRoundTripper); ok {
1733
1734 return nil, e.RoundTripErr()
1735 }
1736 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1737 }
1738 }
1739
1740 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1741 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1742
1743 go pconn.readLoop()
1744 go pconn.writeLoop()
1745 return pconn, nil
1746 }
1747
1748
1749
1750
1751
1752
1753
// persistConnWriter is the writer that a persistConn's buffered writer
// (pc.bw) is layered on top of. Its methods forward data to pc.conn and
// add the number of bytes written to pc.nwrite, which the error-mapping
// code compares against a starting value to tell whether anything was
// written for a request.
type persistConnWriter struct {
	pc *persistConn
}
1757
1758 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1759 n, err = w.pc.conn.Write(p)
1760 w.pc.nwrite += int64(n)
1761 return
1762 }
1763
1764
1765
1766
1767 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1768 n, err = io.Copy(w.pc.conn, r)
1769 w.pc.nwrite += n
1770 return
1771 }
1772
1773 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
// connectMethod describes how to reach a target: either directly or via
// the proxy in proxyURL. The key method condenses it into a
// connectMethodKey.
type connectMethod struct {
	_            incomparable // project-declared marker type; presumably forbids == on this struct — confirm at its definition
	proxyURL     *url.URL     // nil when no proxy is used
	targetScheme string       // compared against "http" and "https" by the code in this file

	// targetAddr is the address of the target. When a proxy is in use it
	// is the address handed to the proxy (SOCKS5 dial target or CONNECT
	// host) rather than the address that is dialed; see addr.
	targetAddr string
	onlyH1     bool // copied verbatim into connectMethodKey.onlyH1; presumably restricts the connection to HTTP/1 — confirm
}
1802
1803 func (cm *connectMethod) key() connectMethodKey {
1804 proxyStr := ""
1805 targetAddr := cm.targetAddr
1806 if cm.proxyURL != nil {
1807 proxyStr = cm.proxyURL.String()
1808 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1809 targetAddr = ""
1810 }
1811 }
1812 return connectMethodKey{
1813 proxy: proxyStr,
1814 scheme: cm.targetScheme,
1815 addr: targetAddr,
1816 onlyH1: cm.onlyH1,
1817 }
1818 }
1819
1820
1821 func (cm *connectMethod) scheme() string {
1822 if cm.proxyURL != nil {
1823 return cm.proxyURL.Scheme
1824 }
1825 return cm.targetScheme
1826 }
1827
1828
1829 func (cm *connectMethod) addr() string {
1830 if cm.proxyURL != nil {
1831 return canonicalAddr(cm.proxyURL)
1832 }
1833 return cm.targetAddr
1834 }
1835
1836
1837
1838 func (cm *connectMethod) tlsHost() string {
1839 h := cm.targetAddr
1840 if hasPort(h) {
1841 h = h[:strings.LastIndex(h, ":")]
1842 }
1843 return h
1844 }
1845
1846
1847
1848
// connectMethodKey is the comparable, map-key form of a connectMethod. It
// is used as the key of the Transport's idle-connection and per-host
// maps.
type connectMethodKey struct {
	proxy  string
	scheme string
	addr   string
	onlyH1 bool
}

// String renders the key as "proxy|scheme|addr", appending ",h1" to the
// scheme component when onlyH1 is set.
func (k connectMethodKey) String() string {
	schemePart := k.scheme
	if k.onlyH1 {
		schemePart += ",h1"
	}
	return fmt.Sprintf("%s|%s|%s", k.proxy, schemePart, k.addr)
}
1862
1863
1864
// persistConn wraps a single connection to a server or proxy, together
// with the state used by its read and write loops.
type persistConn struct {
	// alt optionally holds an alternate RoundTripper (set when a
	// Transport.TLSNextProto entry takes over the connection). When alt
	// is non-nil, closeLocked neither closes conn nor closes closech.
	alt RoundTripper

	t         *Transport
	cacheKey  connectMethodKey
	conn      net.Conn
	tlsState  *tls.ConnectionState // nil unless a TLS handshake was performed; copied into Response.TLS
	br        *bufio.Reader        // reads from the persistConn itself (see Read)
	bw        *bufio.Writer        // writes through persistConnWriter to conn
	nwrite    int64                // bytes written to conn, maintained by persistConnWriter
	reqch     chan requestAndChan  // sent to by roundTrip; received from by readLoop
	writech   chan writeRequest    // sent to by roundTrip; received from by writeLoop
	closech   chan struct{}        // closed by closeLocked when the connection is closed
	isProxy   bool                 // set for plain-http targets reached through a proxy; passed to Request.write
	sawEOF    bool                 // set by Read when conn.Read returns io.EOF
	readLimit int64                // bytes Read may still return; reset by readLoop/readResponse

	// writeErrCh receives the result of every request write performed by
	// writeLoop. wroteRequest consumes it to decide whether the
	// connection may be reused.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop returns

	// idleAt and idleTimer are not touched by the code visible in this
	// part of the file; presumably they track when the connection became
	// idle and the timer that closes it — confirm at their use sites.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // the fields below are accessed under mu in the visible code (roundTrip reads closed without it)
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // non-nil once closeLocked has run; the first close error
	canceledErr          error      // set by cancelRequest
	broken               bool       // set by closeLocked
	reused               bool       // set by markReused

	// mutateHeaderFunc, if non-nil, is called by roundTrip with the
	// request's extra headers (used to add Proxy-Authorization). It is
	// cleared by closeLocked.
	mutateHeaderFunc func(Header)
}
1907
1908 func (pc *persistConn) maxHeaderResponseSize() int64 {
1909 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
1910 return v
1911 }
1912 return 10 << 20
1913 }
1914
1915 func (pc *persistConn) Read(p []byte) (n int, err error) {
1916 if pc.readLimit <= 0 {
1917 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
1918 }
1919 if int64(len(p)) > pc.readLimit {
1920 p = p[:pc.readLimit]
1921 }
1922 n, err = pc.conn.Read(p)
1923 if err == io.EOF {
1924 pc.sawEOF = true
1925 }
1926 pc.readLimit -= int64(n)
1927 return
1928 }
1929
1930
1931 func (pc *persistConn) isBroken() bool {
1932 pc.mu.Lock()
1933 b := pc.closed != nil
1934 pc.mu.Unlock()
1935 return b
1936 }
1937
1938
1939
1940 func (pc *persistConn) canceled() error {
1941 pc.mu.Lock()
1942 defer pc.mu.Unlock()
1943 return pc.canceledErr
1944 }
1945
1946
1947 func (pc *persistConn) isReused() bool {
1948 pc.mu.Lock()
1949 r := pc.reused
1950 pc.mu.Unlock()
1951 return r
1952 }
1953
1954 func (pc *persistConn) gotIdleConnTrace(idleAt time.Time) (t httptrace.GotConnInfo) {
1955 pc.mu.Lock()
1956 defer pc.mu.Unlock()
1957 t.Reused = pc.reused
1958 t.Conn = pc.conn
1959 t.WasIdle = true
1960 if !idleAt.IsZero() {
1961 t.IdleTime = time.Since(idleAt)
1962 }
1963 return
1964 }
1965
// cancelRequest records err as the cancellation reason (later returned by
// canceled) and closes the connection with errRequestCanceled. roundTrip
// installs it as the request's canceler.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
1972
1973
1974
1975
1976 func (pc *persistConn) closeConnIfStillIdle() {
1977 t := pc.t
1978 t.idleMu.Lock()
1979 defer t.idleMu.Unlock()
1980 if _, ok := t.idleLRU.m[pc]; !ok {
1981
1982 return
1983 }
1984 t.removeIdleConnLocked(pc)
1985 pc.close(errIdleConnTimeout)
1986 }
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
1997 if err == nil {
1998 return nil
1999 }
2000
2001
2002
2003
2004
2005
2006
2007
2008 <-pc.writeLoopDone
2009
2010
2011
2012
2013 if cerr := pc.canceled(); cerr != nil {
2014 return cerr
2015 }
2016
2017
2018 req.mu.Lock()
2019 reqErr := req.err
2020 req.mu.Unlock()
2021 if reqErr != nil {
2022 return reqErr
2023 }
2024
2025 if err == errServerClosedIdle {
2026
2027 return err
2028 }
2029
2030 if _, ok := err.(transportReadFromServerError); ok {
2031
2032 return err
2033 }
2034 if pc.isBroken() {
2035 if pc.nwrite == startBytesWritten {
2036 return nothingWrittenError{err}
2037 }
2038 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)
2039 }
2040 return err
2041 }
2042
2043
2044
2045
// errCallerOwnsConn is the close error used by readLoop when the response
// body is writable and the caller therefore takes over the underlying
// connection. closeLocked does not close pc.conn when given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2047
// readLoop is the connection's reader goroutine. For each request
// announced on pc.reqch it reads one response, delivers it (or an error)
// on the request's channel, waits for the body to be consumed when there
// is one, and then either offers the connection back to the idle pool or
// stops. On exit it closes the connection with closeErr and removes it
// from the idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default; overwritten below with more specific reasons
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn offers pc to the Transport's idle pool, reports the
	// outcome to trace.PutIdleConn (except for errKeepAlivesDisabled),
	// and on failure records the error as the loop's close reason.
	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc holds back the goroutine that reads or closes the response
	// body (see the bodyEOFSignal callbacks below) until this loop has
	// finished its bookkeeping for that body. Closing it on exit releases
	// any such waiter.
	eofc := make(chan struct{})
	defer close(eofc)

	// Take a local copy of the test hook under testHookMu.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no request was outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			// The Peek failed: tag the error as a read-side failure.
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			// Deliver the error unless the caller has already gone away.
			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit while reading the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
			// Either side asked for the connection to be closed, the
			// final response is informational, or the caller takes over
			// the connection: do not reuse it.
			alive = false
		}

		if !hasBody || bodyWritable {
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)

			// Offer the connection for reuse before handing the response
			// to the caller. Every condition must hold: no EOF seen, the
			// request fully written, and the canceler still in place.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			testHookReadLoopBeforeNextRead()
			continue
		}

		// Buffered for two sends: earlyCloseFn and fn below each send at
		// most once.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				// Body closed before EOF: report "not EOF" and wait for
				// the loop to finish with this response.
				waitForBodyRead <- false
				<-eofc
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					// Wait until the loop has handled the EOF.
					<-eofc
				} else if err != nil {
					// Prefer a recorded cancellation error over the raw
					// read or close error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// Decompress transparently, since the transport itself asked
			// for gzip, and drop the headers that describe the
			// compressed form.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Wait for the body to be fully read or closed, for the request
		// to be canceled, or for the connection to be closed, before
		// reading the next response.
		select {
		case bodyEOF := <-waitForBodyRead:
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)
			if bodyEOF {
				// Release the reader blocked in fn above.
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}
2227
2228 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2229 if pc.closed != nil {
2230 return
2231 }
2232 if n := pc.br.Buffered(); n > 0 {
2233 buf, _ := pc.br.Peek(n)
2234 if is408Message(buf) {
2235 pc.closeLocked(errServerClosedIdle)
2236 return
2237 } else {
2238 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2239 }
2240 }
2241 if peekErr == io.EOF {
2242
2243 pc.closeLocked(errServerClosedIdle)
2244 } else {
2245 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
2246 }
2247 }
2248
2249
2250
2251
// is408Message reports whether buf starts with an HTTP/1.x status line
// carrying status 408: the bytes "HTTP/1.", any single byte, then " 408".
// Buffers shorter than "HTTP/1.x 408" never match.
func is408Message(buf []byte) bool {
	const shortest = "HTTP/1.x 408"
	if len(buf) < len(shortest) {
		return false
	}
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2261
2262
2263
2264
// readResponse reads the response for rc from pc.br. It skips up to
// max1xxResponses informational (1xx) responses other than 101 Switching
// Protocols, and returns the first other response or an error. trace may
// be nil.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}
	num1xx := 0               // number of informational 1xx responses seen so far
	const max1xxResponses = 5 // cap on skipped 1xx responses

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.req)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil {
			// Signal the writer side at most once: a value for
			// "100 Continue", a close for any final (>= 200) status.
			if resCode == 100 {
				if trace != nil && trace.Got100Continue != nil {
					trace.Got100Continue()
				}
				continueCh <- struct{}{}
				continueCh = nil
			} else if resCode >= 200 {
				close(continueCh)
				continueCh = nil
			}
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols is treated as a final response.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			num1xx++
			if num1xx > max1xxResponses {
				return nil, errors.New("net/http: too many 1xx informational responses")
			}
			pc.readLimit = pc.maxHeaderResponseSize() // reset the header limit for the next response
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		// Give the caller a body that can both read and write the
		// connection.
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}

	resp.TLS = pc.tlsState
	return
}
2318
2319
2320
2321
2322 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2323 if continueCh == nil {
2324 return nil
2325 }
2326 return func() bool {
2327 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2328 defer timer.Stop()
2329
2330 select {
2331 case _, ok := <-continueCh:
2332 return ok
2333 case <-timer.C:
2334 return true
2335 case <-pc.closech:
2336 return false
2337 }
2338 }
2339 }
2340
2341 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2342 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2343 if br.Buffered() != 0 {
2344 body.br = br
2345 }
2346 return body
2347 }
2348
2349
2350
2351
2352
2353
// readWriteCloserBody is the Response.Body used for protocol-switch
// responses (see readResponse). Write and Close come from the embedded
// io.ReadWriteCloser; Read first drains br when it is set.
type readWriteCloserBody struct {
	_  incomparable  // project-declared marker type; presumably forbids == on this struct — confirm at its definition
	br *bufio.Reader // set only while it still holds buffered bytes; nil afterwards
	io.ReadWriteCloser
}
2359
2360 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2361 if b.br != nil {
2362 if n := b.br.Buffered(); len(p) > n {
2363 p = p[:n]
2364 }
2365 n, err = b.br.Read(p)
2366 if b.br.Buffered() == 0 {
2367 b.br = nil
2368 }
2369 return n, err
2370 }
2371 return b.ReadWriteCloser.Read(p)
2372 }
2373
2374
// nothingWrittenError wraps an error that occurred while pc.nwrite was
// still equal to its value at the start of the request, i.e. before any
// bytes were written to the connection for that request.
type nothingWrittenError struct {
	error
}
2378
// writeLoop is the connection's writer goroutine. It writes each request
// received on pc.writech to the buffered writer, flushes it, and reports
// the outcome on both pc.writeErrCh and the request's own channel. It
// returns after the first failed write (closing the connection) or when
// the connection is closed; writeLoopDone is closed on return.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				// The failure came from reading the request body: unwrap
				// it and record it on the request, where
				// mapRoundTripError picks it up.
				err = bre.error
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Nothing reached the connection for this request.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // consumed by wroteRequest in readLoop
			wr.ch <- err         // consumed by roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2416
2417
2418
2419
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for the
// write loop to report the result of the request write before it gives up
// and reports the request as not written.
const maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2421
2422
2423
2424 func (pc *persistConn) wroteRequest() bool {
2425 select {
2426 case err := <-pc.writeErrCh:
2427
2428
2429 return err == nil
2430 default:
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2442 defer t.Stop()
2443 select {
2444 case err := <-pc.writeErrCh:
2445 return err == nil
2446 case <-t.C:
2447 return false
2448 }
2449 }
2450 }
2451
2452
2453
// responseAndError is the message readLoop sends to roundTrip: exactly
// one of res and err is set (roundTrip panics otherwise).
type responseAndError struct {
	_   incomparable // project-declared marker type; presumably forbids == on this struct — confirm at its definition
	res *Response    // non-nil on success
	err error        // non-nil on failure
}
2459
// requestAndChan is the message roundTrip sends to readLoop on pc.reqch
// to announce a request whose response should be read.
type requestAndChan struct {
	_         incomparable // project-declared marker type; presumably forbids == on this struct — confirm at its definition
	req       *Request
	cancelKey cancelKey
	ch        chan responseAndError // where readLoop delivers the result; created unbuffered by roundTrip

	// addedGzip records that the transport itself added
	// "Accept-Encoding: gzip" to the request. readLoop then transparently
	// decompresses a gzip-encoded response body.
	addedGzip bool

	// continueCh, when non-nil, is the send side of the channel on which
	// readResponse signals the writer: a value for "100 Continue", a
	// close for a final (>= 200) status.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2479
2480
2481
2482
2483
// writeRequest is the message roundTrip sends to writeLoop on pc.writech
// to have a request written to the connection.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the write result; roundTrip passes a channel with capacity 1

	// continueCh, when non-nil, is the receive side of the channel that
	// waitForContinue blocks on before the request body is sent.
	continueCh <-chan struct{}
}
2493
// httpError is a simple error with a fixed message and a flag saying
// whether it represents a timeout.
type httpError struct {
	err     string
	timeout bool
}

// Error returns the stored message.
func (e *httpError) Error() string {
	return e.err
}

// Timeout reports whether the error represents a timeout.
func (e *httpError) Timeout() bool {
	return e.timeout
}

// Temporary always reports true.
func (e *httpError) Temporary() bool {
	return true
}

// errTimeout is returned by roundTrip when no response headers arrive
// within Transport.ResponseHeaderTimeout.
var errTimeout error = &httpError{
	err:     "net/http: timeout awaiting response headers",
	timeout: true,
}
2504
2505
2506
// errRequestCanceled is the same error value as http2errRequestCanceled.
// cancelRequest closes the connection with it, and roundTrip returns it
// when the request was canceled before it could be started.
var errRequestCanceled = http2errRequestCanceled

// errRequestCanceledConn is not used in this part of the file; per its
// message it reports a request canceled while waiting for a connection.
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2509
// nop does nothing; it is the default value of the test hooks below.
func nop() {}

// Test hooks. All default to nop; presumably tests replace them to
// observe or stall specific points in the transport — confirm in the
// package's tests.
var (
	testHookEnterRoundTrip   = nop // called on entry to persistConn.roundTrip
	testHookWaitResLoop      = nop // called at the top of each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	// testHookMu is locked by readLoop while it copies
	// testHookReadLoopBeforeNextRead. The default fakeLocker makes that
	// a no-op.
	testHookMu                     sync.Locker = fakeLocker{}
	testHookReadLoopBeforeNextRead             = nop // called by readLoop after each response has been handled
)
2523
// roundTrip sends req over this connection and waits for the response
// headers. It hands the request to writeLoop and readLoop, then waits
// for whichever comes first: a write error, the connection closing, the
// response-header timeout, the response itself, or cancellation via
// req.Cancel or the request context. Errors other than the header
// timeout are passed through mapRoundTripError.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	// Install this connection's canceler. Failure means the request was
	// already canceled, so give the connection back and bail out.
	if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
		pc.t.putOrCloseIdleConn(pc)
		return nil, errRequestCanceled
	}
	pc.mu.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when compression is enabled, the caller set
	// neither Accept-Encoding nor Range, and the method is not HEAD.
	// requestedGzip tells readLoop to decompress transparently.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh carries the "100 Continue" signal from readResponse to
	// the writer. Capacity 1 lets the reader signal without blocking.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when this function returns, so that readLoop does
	// not block trying to deliver a response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	// On failure, clear the request's canceler.
	defer func() {
		if err != nil {
			pc.t.setReqCanceler(req.cancelKey, nil)
		}
	}()

	const debugRoundTrip = false

	// Hand the request to the write loop. startBytesWritten lets
	// mapRoundTripError tell whether any bytes went out for it.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	// Announce the request to the read loop.
	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		cancelKey:  req.cancelKey,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	var respHeaderTimer <-chan time.Time // nil (never ready) until the request has been written
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	pcClosed := pc.closech
	canceled := false
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written: start the response-header timer
			// if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			pcClosed = nil // handle the close only once
			// Return the close error if this request was canceled or its
			// canceler was still installed. Otherwise keep waiting for
			// readLoop to deliver on resc.
			if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
				if debugRoundTrip {
					req.logf("closech recv: %T %#v", pc.closed, pc.closed)
				}
				return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
			}
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			return re.res, nil
		case <-cancelChan:
			canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
			cancelChan = nil
		case <-ctxDoneChan:
			canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}
}
2662
2663
2664
2665 type tLogKey struct{}
2666
2667 func (tr *transportRequest) logf(format string, args ...interface{}) {
2668 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...interface{})); ok {
2669 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2670 }
2671 }
2672
2673
2674
2675 func (pc *persistConn) markReused() {
2676 pc.mu.Lock()
2677 pc.reused = true
2678 pc.mu.Unlock()
2679 }
2680
2681
2682
2683
2684
2685
// close closes the connection with err as the reason, taking pc.mu and
// delegating to closeLocked. err must be non-nil (closeLocked panics
// otherwise). Only the first call's error is recorded.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
2691
// closeLocked marks the connection broken and, on the first call only,
// records err as the close reason, releases the connection's per-host
// slot, and (unless the connection has an alternate RoundTripper) closes
// the underlying conn and closech. It panics if err is nil. Callers in
// this file hold pc.mu.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	pc.broken = true
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// With an alternate RoundTripper this struct is only a shell, so
		// there is no conn or closech to close here.
		if pc.alt == nil {
			if err != errCallerOwnsConn {
				// With errCallerOwnsConn the caller has taken over the
				// conn and is responsible for closing it.
				pc.conn.Close()
			}
			close(pc.closech)
		}
	}
	pc.mutateHeaderFunc = nil
}
2711
// portMap gives the default port for each URL scheme that canonicalAddr
// knows about. Schemes not listed map to the empty string.
var portMap = map[string]string{
	"http":   "80",
	"https":  "443",
	"socks5": "1080",
}
2717
2718
2719 func canonicalAddr(url *url.URL) string {
2720 addr := url.Hostname()
2721 if v, err := idnaASCII(addr); err == nil {
2722 addr = v
2723 }
2724 port := url.Port()
2725 if port == "" {
2726 port = portMap[url.Scheme]
2727 }
2728 return net.JoinHostPort(addr, port)
2729 }
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
// bodyEOFSignal wraps a response body and notifies its owner, through
// callbacks, when the body stops being usable:
//
//   - fn is called at most once, with the first error returned by a Read
//     of the underlying body (including io.EOF) or with the result of
//     closing the underlying body, whichever happens first. The error the
//     caller sees is whatever fn returns.
//   - earlyCloseFn, if set, is called instead when Close happens before a
//     Read has returned io.EOF. In that case the underlying body is not
//     closed by Close.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // whether Close has been called
	rerr         error             // sticky: first error returned by body.Read
	fn           func(error) error // one-shot callback; nil once it has run
	earlyCloseFn func() error      // optional callback for Close before io.EOF
}

// errReadOnClosedResBody is returned by reads that happen after the body
// has been closed.
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// Read reads from the underlying body. After Close it returns
// errReadOnClosedResBody, and once a Read has failed it keeps returning
// that same error without touching the underlying body again. The first
// failing Read passes its error through fn.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, sticky := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case sticky != nil:
		return 0, sticky
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed. Later calls are no-ops that return nil. If
// io.EOF has not been read yet and earlyCloseFn is set, its result is
// returned. Otherwise the underlying body is closed and the close error
// is passed through fn.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true
	if es.earlyCloseFn != nil && es.rerr != io.EOF {
		return es.earlyCloseFn()
	}
	return es.condfn(es.body.Close())
}

// condfn runs fn on err if fn is still set, then clears fn so that it
// never runs twice. It returns fn's result, or err unchanged when fn is
// unset. es.mu must be held.
func (es *bodyEOFSignal) condfn(err error) error {
	if es.fn != nil {
		err = es.fn(err)
		es.fn = nil
	}
	return err
}
2799
2800
2801
// gzipReader wraps a response body so that it is decompressed as it is
// read. The gzip.Reader is created on the first Read.
type gzipReader struct {
	_    incomparable   // project-declared marker type; presumably forbids == on this struct — confirm at its definition
	body *bodyEOFSignal // the compressed body
	zr   *gzip.Reader   // created on first Read; nil until then
	zerr error          // sticky error from creating zr
}
2808
2809 func (gz *gzipReader) Read(p []byte) (n int, err error) {
2810 if gz.zr == nil {
2811 if gz.zerr == nil {
2812 gz.zr, gz.zerr = gzip.NewReader(gz.body)
2813 }
2814 if gz.zerr != nil {
2815 return 0, gz.zerr
2816 }
2817 }
2818
2819 gz.body.mu.Lock()
2820 if gz.body.closed {
2821 err = errReadOnClosedResBody
2822 }
2823 gz.body.mu.Unlock()
2824
2825 if err != nil {
2826 return 0, err
2827 }
2828 return gz.zr.Read(p)
2829 }
2830
2831 func (gz *gzipReader) Close() error {
2832 return gz.body.Close()
2833 }
2834
// tlsHandshakeTimeoutError is the error for a TLS handshake that timed
// out. It reports itself as both a timeout and a temporary error.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed handshake-timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
2840
2841
2842
2843
// fakeLocker is a sync.Locker whose Lock and Unlock do nothing. It is the
// default value of testHookMu.
type fakeLocker struct{}

func (fakeLocker) Lock()   {}
func (fakeLocker) Unlock() {}
2848
2849
2850
2851
// cloneTLSConfig returns a copy of cfg made with (*tls.Config).Clone, or
// a fresh empty config when cfg is nil. The result is never nil.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return &tls.Config{}
}
2858
// connLRU tracks persistConns in least-recently-added order. Both fields
// are created lazily by add.
type connLRU struct {
	ll *list.List                     // list of *persistConn values, most recently added at the front
	m  map[*persistConn]*list.Element // each tracked connection's element in ll
}
2863
2864
2865 func (cl *connLRU) add(pc *persistConn) {
2866 if cl.ll == nil {
2867 cl.ll = list.New()
2868 cl.m = make(map[*persistConn]*list.Element)
2869 }
2870 ele := cl.ll.PushFront(pc)
2871 if _, ok := cl.m[pc]; ok {
2872 panic("persistConn was already in LRU")
2873 }
2874 cl.m[pc] = ele
2875 }
2876
2877 func (cl *connLRU) removeOldest() *persistConn {
2878 ele := cl.ll.Back()
2879 pc := ele.Value.(*persistConn)
2880 cl.ll.Remove(ele)
2881 delete(cl.m, pc)
2882 return pc
2883 }
2884
2885
2886 func (cl *connLRU) remove(pc *persistConn) {
2887 if ele, ok := cl.m[pc]; ok {
2888 cl.ll.Remove(ele)
2889 delete(cl.m, pc)
2890 }
2891 }
2892
2893
// len returns the number of connections currently tracked. It is safe to
// call on a connLRU that has never been added to (the map is nil and has
// length zero).
func (cl *connLRU) len() int {
	return len(cl.m)
}
2897
View as plain text