Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "io"
21 "log"
22 "net"
23 "net/http/httptrace"
24 "net/textproto"
25 "net/url"
26 "os"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "golang.org/x/net/http/httpguts"
34 "golang.org/x/net/http/httpproxy"
35 )
36
37
38
39
40
41
42 var DefaultTransport RoundTripper = &Transport{
43 Proxy: ProxyFromEnvironment,
44 DialContext: (&net.Dialer{
45 Timeout: 30 * time.Second,
46 KeepAlive: 30 * time.Second,
47 DualStack: true,
48 }).DialContext,
49 ForceAttemptHTTP2: true,
50 MaxIdleConns: 100,
51 IdleConnTimeout: 90 * time.Second,
52 TLSHandshakeTimeout: 10 * time.Second,
53 ExpectContinueTimeout: 1 * time.Second,
54 }
55
56
57
58 const DefaultMaxIdleConnsPerHost = 2
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 type Transport struct {
96 idleMu sync.Mutex
97 closeIdle bool
98 idleConn map[connectMethodKey][]*persistConn
99 idleConnWait map[connectMethodKey]wantConnQueue
100 idleLRU connLRU
101
102 reqMu sync.Mutex
103 reqCanceler map[cancelKey]func(error)
104
105 altMu sync.Mutex
106 altProto atomic.Value
107
108 connsPerHostMu sync.Mutex
109 connsPerHost map[connectMethodKey]int
110 connsPerHostWait map[connectMethodKey]wantConnQueue
111
112
113
114
115
116
117
118
119
120
121 Proxy func(*Request) (*url.URL, error)
122
123
124
125
126
127
128
129
130
131 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
132
133
134
135
136
137
138
139
140
141
142
143 Dial func(network, addr string) (net.Conn, error)
144
145
146
147
148
149
150
151
152
153
154
155 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
156
157
158
159
160
161
162
163 DialTLS func(network, addr string) (net.Conn, error)
164
165
166
167
168
169 TLSClientConfig *tls.Config
170
171
172
173 TLSHandshakeTimeout time.Duration
174
175
176
177
178
179
180 DisableKeepAlives bool
181
182
183
184
185
186
187
188
189
190 DisableCompression bool
191
192
193
194 MaxIdleConns int
195
196
197
198
199 MaxIdleConnsPerHost int
200
201
202
203
204
205
206 MaxConnsPerHost int
207
208
209
210
211
212 IdleConnTimeout time.Duration
213
214
215
216
217
218 ResponseHeaderTimeout time.Duration
219
220
221
222
223
224
225
226
227 ExpectContinueTimeout time.Duration
228
229
230
231
232
233
234
235
236
237
238
239 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
240
241
242
243 ProxyConnectHeader Header
244
245
246
247
248
249
250 MaxResponseHeaderBytes int64
251
252
253
254
255 WriteBufferSize int
256
257
258
259
260 ReadBufferSize int
261
262
263
264 nextProtoOnce sync.Once
265 h2transport h2Transport
266 tlsNextProtoWasNil bool
267
268
269
270
271
272
273 ForceAttemptHTTP2 bool
274 }
275
276
277
278
279 type cancelKey struct {
280 req *Request
281 }
282
283 func (t *Transport) writeBufferSize() int {
284 if t.WriteBufferSize > 0 {
285 return t.WriteBufferSize
286 }
287 return 4 << 10
288 }
289
290 func (t *Transport) readBufferSize() int {
291 if t.ReadBufferSize > 0 {
292 return t.ReadBufferSize
293 }
294 return 4 << 10
295 }
296
297
298 func (t *Transport) Clone() *Transport {
299 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
300 t2 := &Transport{
301 Proxy: t.Proxy,
302 DialContext: t.DialContext,
303 Dial: t.Dial,
304 DialTLS: t.DialTLS,
305 DialTLSContext: t.DialTLSContext,
306 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
307 DisableKeepAlives: t.DisableKeepAlives,
308 DisableCompression: t.DisableCompression,
309 MaxIdleConns: t.MaxIdleConns,
310 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
311 MaxConnsPerHost: t.MaxConnsPerHost,
312 IdleConnTimeout: t.IdleConnTimeout,
313 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
314 ExpectContinueTimeout: t.ExpectContinueTimeout,
315 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
316 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
317 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
318 WriteBufferSize: t.WriteBufferSize,
319 ReadBufferSize: t.ReadBufferSize,
320 }
321 if t.TLSClientConfig != nil {
322 t2.TLSClientConfig = t.TLSClientConfig.Clone()
323 }
324 if !t.tlsNextProtoWasNil {
325 npm := map[string]func(authority string, c *tls.Conn) RoundTripper{}
326 for k, v := range t.TLSNextProto {
327 npm[k] = v
328 }
329 t2.TLSNextProto = npm
330 }
331 return t2
332 }
333
334
335
336
337
338
339
340 type h2Transport interface {
341 CloseIdleConnections()
342 }
343
344 func (t *Transport) hasCustomTLSDialer() bool {
345 return t.DialTLS != nil || t.DialTLSContext != nil
346 }
347
348
349
350 func (t *Transport) onceSetNextProtoDefaults() {
351 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
352 if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") {
353 return
354 }
355
356
357
358
359
360
361 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
362 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
363 if v := rv.Field(0); v.CanInterface() {
364 if h2i, ok := v.Interface().(h2Transport); ok {
365 t.h2transport = h2i
366 return
367 }
368 }
369 }
370
371 if t.TLSNextProto != nil {
372
373
374 return
375 }
376 if !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()) {
377
378
379
380
381
382
383 return
384 }
385 if omitBundledHTTP2 {
386 return
387 }
388 t2, err := http2configureTransport(t)
389 if err != nil {
390 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
391 return
392 }
393 t.h2transport = t2
394
395
396
397
398
399
400
401 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
402 const h2max = 1<<32 - 1
403 if limit1 >= h2max {
404 t2.MaxHeaderListSize = h2max
405 } else {
406 t2.MaxHeaderListSize = uint32(limit1)
407 }
408 }
409 }
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
428 return envProxyFunc()(req.URL)
429 }
430
431
432
433 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
434 return func(*Request) (*url.URL, error) {
435 return fixedURL, nil
436 }
437 }
438
439
440
441
442 type transportRequest struct {
443 *Request
444 extra Header
445 trace *httptrace.ClientTrace
446 cancelKey cancelKey
447
448 mu sync.Mutex
449 err error
450 }
451
452 func (tr *transportRequest) extraHeaders() Header {
453 if tr.extra == nil {
454 tr.extra = make(Header)
455 }
456 return tr.extra
457 }
458
459 func (tr *transportRequest) setError(err error) {
460 tr.mu.Lock()
461 if tr.err == nil {
462 tr.err = err
463 }
464 tr.mu.Unlock()
465 }
466
467
468
469 func (t *Transport) useRegisteredProtocol(req *Request) bool {
470 if req.URL.Scheme == "https" && req.requiresHTTP1() {
471
472
473
474
475 return false
476 }
477 return true
478 }
479
480
481
482
483 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
484 if !t.useRegisteredProtocol(req) {
485 return nil
486 }
487 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
488 return altProto[req.URL.Scheme]
489 }
490
491
492 func (t *Transport) roundTrip(req *Request) (*Response, error) {
493 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
494 ctx := req.Context()
495 trace := httptrace.ContextClientTrace(ctx)
496
497 if req.URL == nil {
498 req.closeBody()
499 return nil, errors.New("http: nil Request.URL")
500 }
501 if req.Header == nil {
502 req.closeBody()
503 return nil, errors.New("http: nil Request.Header")
504 }
505 scheme := req.URL.Scheme
506 isHTTP := scheme == "http" || scheme == "https"
507 if isHTTP {
508 for k, vv := range req.Header {
509 if !httpguts.ValidHeaderFieldName(k) {
510 req.closeBody()
511 return nil, fmt.Errorf("net/http: invalid header field name %q", k)
512 }
513 for _, v := range vv {
514 if !httpguts.ValidHeaderFieldValue(v) {
515 req.closeBody()
516 return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
517 }
518 }
519 }
520 }
521
522 origReq := req
523 cancelKey := cancelKey{origReq}
524 req = setupRewindBody(req)
525
526 if altRT := t.alternateRoundTripper(req); altRT != nil {
527 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
528 return resp, err
529 }
530 var err error
531 req, err = rewindBody(req)
532 if err != nil {
533 return nil, err
534 }
535 }
536 if !isHTTP {
537 req.closeBody()
538 return nil, badStringError("unsupported protocol scheme", scheme)
539 }
540 if req.Method != "" && !validMethod(req.Method) {
541 req.closeBody()
542 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
543 }
544 if req.URL.Host == "" {
545 req.closeBody()
546 return nil, errors.New("http: no Host in request URL")
547 }
548
549 for {
550 select {
551 case <-ctx.Done():
552 req.closeBody()
553 return nil, ctx.Err()
554 default:
555 }
556
557
558 treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
559 cm, err := t.connectMethodForRequest(treq)
560 if err != nil {
561 req.closeBody()
562 return nil, err
563 }
564
565
566
567
568
569 pconn, err := t.getConn(treq, cm)
570 if err != nil {
571 t.setReqCanceler(cancelKey, nil)
572 req.closeBody()
573 return nil, err
574 }
575
576 var resp *Response
577 if pconn.alt != nil {
578
579 t.setReqCanceler(cancelKey, nil)
580 resp, err = pconn.alt.RoundTrip(req)
581 } else {
582 resp, err = pconn.roundTrip(treq)
583 }
584 if err == nil {
585 resp.Request = origReq
586 return resp, nil
587 }
588
589
590 if http2isNoCachedConnError(err) {
591 if t.removeIdleConn(pconn) {
592 t.decConnsPerHost(pconn.cacheKey)
593 }
594 } else if !pconn.shouldRetryRequest(req, err) {
595
596
597 if e, ok := err.(transportReadFromServerError); ok {
598 err = e.err
599 }
600 return nil, err
601 }
602 testHookRoundTripRetried()
603
604
605 req, err = rewindBody(req)
606 if err != nil {
607 return nil, err
608 }
609 }
610 }
611
612 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
613
614 type readTrackingBody struct {
615 io.ReadCloser
616 didRead bool
617 }
618
619 func (r *readTrackingBody) Read(data []byte) (int, error) {
620 r.didRead = true
621 return r.ReadCloser.Read(data)
622 }
623
624
625
626
627
628 func setupRewindBody(req *Request) *Request {
629 if req.Body == nil || req.Body == NoBody {
630 return req
631 }
632 newReq := *req
633 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
634 return &newReq
635 }
636
637
638
639
640
641 func rewindBody(req *Request) (rewound *Request, err error) {
642 if req.Body == nil || req.Body == NoBody || !req.Body.(*readTrackingBody).didRead {
643 return req, nil
644 }
645 req.closeBody()
646 if req.GetBody == nil {
647 return nil, errCannotRewind
648 }
649 body, err := req.GetBody()
650 if err != nil {
651 return nil, err
652 }
653 newReq := *req
654 newReq.Body = &readTrackingBody{ReadCloser: body}
655 return &newReq, nil
656 }
657
658
659
660
661 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
662 if http2isNoCachedConnError(err) {
663
664
665
666
667
668
669 return true
670 }
671 if err == errMissingHost {
672
673 return false
674 }
675 if !pc.isReused() {
676
677
678
679
680
681
682
683 return false
684 }
685 if _, ok := err.(nothingWrittenError); ok {
686
687
688 return req.outgoingLength() == 0 || req.GetBody != nil
689 }
690 if !req.isReplayable() {
691
692 return false
693 }
694 if _, ok := err.(transportReadFromServerError); ok {
695
696
697 return true
698 }
699 if err == errServerClosedIdle {
700
701
702
703 return true
704 }
705 return false
706 }
707
708
709 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
710
711
712
713
714
715
716
717
718
719
720
721 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
722 t.altMu.Lock()
723 defer t.altMu.Unlock()
724 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
725 if _, exists := oldMap[scheme]; exists {
726 panic("protocol " + scheme + " already registered")
727 }
728 newMap := make(map[string]RoundTripper)
729 for k, v := range oldMap {
730 newMap[k] = v
731 }
732 newMap[scheme] = rt
733 t.altProto.Store(newMap)
734 }
735
736
737
738
739
740 func (t *Transport) CloseIdleConnections() {
741 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
742 t.idleMu.Lock()
743 m := t.idleConn
744 t.idleConn = nil
745 t.closeIdle = true
746 t.idleLRU = connLRU{}
747 t.idleMu.Unlock()
748 for _, conns := range m {
749 for _, pconn := range conns {
750 pconn.close(errCloseIdleConns)
751 }
752 }
753 if t2 := t.h2transport; t2 != nil {
754 t2.CloseIdleConnections()
755 }
756 }
757
758
759
760
761
762
763
764 func (t *Transport) CancelRequest(req *Request) {
765 t.cancelRequest(cancelKey{req}, errRequestCanceled)
766 }
767
768
769 func (t *Transport) cancelRequest(key cancelKey, err error) {
770 t.reqMu.Lock()
771 cancel := t.reqCanceler[key]
772 delete(t.reqCanceler, key)
773 t.reqMu.Unlock()
774 if cancel != nil {
775 cancel(err)
776 }
777 }
778
779
780
781
782
783 var (
784
785 envProxyOnce sync.Once
786 envProxyFuncValue func(*url.URL) (*url.URL, error)
787 )
788
789
790
791
792 func envProxyFunc() func(*url.URL) (*url.URL, error) {
793 envProxyOnce.Do(func() {
794 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
795 })
796 return envProxyFuncValue
797 }
798
799
800 func resetProxyConfig() {
801 envProxyOnce = sync.Once{}
802 envProxyFuncValue = nil
803 }
804
805 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
806 cm.targetScheme = treq.URL.Scheme
807 cm.targetAddr = canonicalAddr(treq.URL)
808 if t.Proxy != nil {
809 cm.proxyURL, err = t.Proxy(treq.Request)
810 }
811 cm.onlyH1 = treq.requiresHTTP1()
812 return cm, err
813 }
814
815
816
817 func (cm *connectMethod) proxyAuth() string {
818 if cm.proxyURL == nil {
819 return ""
820 }
821 if u := cm.proxyURL.User; u != nil {
822 username := u.Username()
823 password, _ := u.Password()
824 return "Basic " + basicAuth(username, password)
825 }
826 return ""
827 }
828
829
830 var (
831 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
832 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
833 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
834 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
835 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
836 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
837 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
838 errIdleConnTimeout = errors.New("http: idle connection timeout")
839
840
841
842
843
844 errServerClosedIdle = errors.New("http: server closed idle connection")
845 )
846
847
848
849
850
851
852
853
854
855 type transportReadFromServerError struct {
856 err error
857 }
858
859 func (e transportReadFromServerError) Unwrap() error { return e.err }
860
861 func (e transportReadFromServerError) Error() string {
862 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
863 }
864
865 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
866 if err := t.tryPutIdleConn(pconn); err != nil {
867 pconn.close(err)
868 }
869 }
870
871 func (t *Transport) maxIdleConnsPerHost() int {
872 if v := t.MaxIdleConnsPerHost; v != 0 {
873 return v
874 }
875 return DefaultMaxIdleConnsPerHost
876 }
877
878
879
880
881
882
883 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
884 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
885 return errKeepAlivesDisabled
886 }
887 if pconn.isBroken() {
888 return errConnBroken
889 }
890 pconn.markReused()
891
892 t.idleMu.Lock()
893 defer t.idleMu.Unlock()
894
895
896
897
898 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
899 return nil
900 }
901
902
903
904
905
906 key := pconn.cacheKey
907 if q, ok := t.idleConnWait[key]; ok {
908 done := false
909 if pconn.alt == nil {
910
911
912 for q.len() > 0 {
913 w := q.popFront()
914 if w.tryDeliver(pconn, nil) {
915 done = true
916 break
917 }
918 }
919 } else {
920
921
922
923
924 for q.len() > 0 {
925 w := q.popFront()
926 w.tryDeliver(pconn, nil)
927 }
928 }
929 if q.len() == 0 {
930 delete(t.idleConnWait, key)
931 } else {
932 t.idleConnWait[key] = q
933 }
934 if done {
935 return nil
936 }
937 }
938
939 if t.closeIdle {
940 return errCloseIdle
941 }
942 if t.idleConn == nil {
943 t.idleConn = make(map[connectMethodKey][]*persistConn)
944 }
945 idles := t.idleConn[key]
946 if len(idles) >= t.maxIdleConnsPerHost() {
947 return errTooManyIdleHost
948 }
949 for _, exist := range idles {
950 if exist == pconn {
951 log.Fatalf("dup idle pconn %p in freelist", pconn)
952 }
953 }
954 t.idleConn[key] = append(idles, pconn)
955 t.idleLRU.add(pconn)
956 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
957 oldest := t.idleLRU.removeOldest()
958 oldest.close(errTooManyIdle)
959 t.removeIdleConnLocked(oldest)
960 }
961
962
963
964
965 if t.IdleConnTimeout > 0 && pconn.alt == nil {
966 if pconn.idleTimer != nil {
967 pconn.idleTimer.Reset(t.IdleConnTimeout)
968 } else {
969 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
970 }
971 }
972 pconn.idleAt = time.Now()
973 return nil
974 }
975
976
977
978
979 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
980 if t.DisableKeepAlives {
981 return false
982 }
983
984 t.idleMu.Lock()
985 defer t.idleMu.Unlock()
986
987
988
989 t.closeIdle = false
990
991 if w == nil {
992
993 return false
994 }
995
996
997
998
999 var oldTime time.Time
1000 if t.IdleConnTimeout > 0 {
1001 oldTime = time.Now().Add(-t.IdleConnTimeout)
1002 }
1003
1004
1005 if list, ok := t.idleConn[w.key]; ok {
1006 stop := false
1007 delivered := false
1008 for len(list) > 0 && !stop {
1009 pconn := list[len(list)-1]
1010
1011
1012
1013
1014 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1015 if tooOld {
1016
1017
1018
1019 go pconn.closeConnIfStillIdle()
1020 }
1021 if pconn.isBroken() || tooOld {
1022
1023
1024
1025
1026
1027 list = list[:len(list)-1]
1028 continue
1029 }
1030 delivered = w.tryDeliver(pconn, nil)
1031 if delivered {
1032 if pconn.alt != nil {
1033
1034
1035 } else {
1036
1037
1038 t.idleLRU.remove(pconn)
1039 list = list[:len(list)-1]
1040 }
1041 }
1042 stop = true
1043 }
1044 if len(list) > 0 {
1045 t.idleConn[w.key] = list
1046 } else {
1047 delete(t.idleConn, w.key)
1048 }
1049 if stop {
1050 return delivered
1051 }
1052 }
1053
1054
1055 if t.idleConnWait == nil {
1056 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1057 }
1058 q := t.idleConnWait[w.key]
1059 q.cleanFront()
1060 q.pushBack(w)
1061 t.idleConnWait[w.key] = q
1062 return false
1063 }
1064
1065
1066 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1067 t.idleMu.Lock()
1068 defer t.idleMu.Unlock()
1069 return t.removeIdleConnLocked(pconn)
1070 }
1071
1072
1073 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1074 if pconn.idleTimer != nil {
1075 pconn.idleTimer.Stop()
1076 }
1077 t.idleLRU.remove(pconn)
1078 key := pconn.cacheKey
1079 pconns := t.idleConn[key]
1080 var removed bool
1081 switch len(pconns) {
1082 case 0:
1083
1084 case 1:
1085 if pconns[0] == pconn {
1086 delete(t.idleConn, key)
1087 removed = true
1088 }
1089 default:
1090 for i, v := range pconns {
1091 if v != pconn {
1092 continue
1093 }
1094
1095
1096 copy(pconns[i:], pconns[i+1:])
1097 t.idleConn[key] = pconns[:len(pconns)-1]
1098 removed = true
1099 break
1100 }
1101 }
1102 return removed
1103 }
1104
1105 func (t *Transport) setReqCanceler(key cancelKey, fn func(error)) {
1106 t.reqMu.Lock()
1107 defer t.reqMu.Unlock()
1108 if t.reqCanceler == nil {
1109 t.reqCanceler = make(map[cancelKey]func(error))
1110 }
1111 if fn != nil {
1112 t.reqCanceler[key] = fn
1113 } else {
1114 delete(t.reqCanceler, key)
1115 }
1116 }
1117
1118
1119
1120
1121
1122 func (t *Transport) replaceReqCanceler(key cancelKey, fn func(error)) bool {
1123 t.reqMu.Lock()
1124 defer t.reqMu.Unlock()
1125 _, ok := t.reqCanceler[key]
1126 if !ok {
1127 return false
1128 }
1129 if fn != nil {
1130 t.reqCanceler[key] = fn
1131 } else {
1132 delete(t.reqCanceler, key)
1133 }
1134 return true
1135 }
1136
1137 var zeroDialer net.Dialer
1138
1139 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1140 if t.DialContext != nil {
1141 return t.DialContext(ctx, network, addr)
1142 }
1143 if t.Dial != nil {
1144 c, err := t.Dial(network, addr)
1145 if c == nil && err == nil {
1146 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1147 }
1148 return c, err
1149 }
1150 return zeroDialer.DialContext(ctx, network, addr)
1151 }
1152
1153
1154
1155
1156
1157
1158
1159 type wantConn struct {
1160 cm connectMethod
1161 key connectMethodKey
1162 ctx context.Context
1163 ready chan struct{}
1164
1165
1166
1167
1168 beforeDial func()
1169 afterDial func()
1170
1171 mu sync.Mutex
1172 pc *persistConn
1173 err error
1174 }
1175
1176
1177 func (w *wantConn) waiting() bool {
1178 select {
1179 case <-w.ready:
1180 return false
1181 default:
1182 return true
1183 }
1184 }
1185
1186
1187 func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
1188 w.mu.Lock()
1189 defer w.mu.Unlock()
1190
1191 if w.pc != nil || w.err != nil {
1192 return false
1193 }
1194
1195 w.pc = pc
1196 w.err = err
1197 if w.pc == nil && w.err == nil {
1198 panic("net/http: internal error: misuse of tryDeliver")
1199 }
1200 close(w.ready)
1201 return true
1202 }
1203
1204
1205
1206 func (w *wantConn) cancel(t *Transport, err error) {
1207 w.mu.Lock()
1208 if w.pc == nil && w.err == nil {
1209 close(w.ready)
1210 }
1211 pc := w.pc
1212 w.pc = nil
1213 w.err = err
1214 w.mu.Unlock()
1215
1216 if pc != nil {
1217 t.putOrCloseIdleConn(pc)
1218 }
1219 }
1220
1221
1222 type wantConnQueue struct {
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233 head []*wantConn
1234 headPos int
1235 tail []*wantConn
1236 }
1237
1238
1239 func (q *wantConnQueue) len() int {
1240 return len(q.head) - q.headPos + len(q.tail)
1241 }
1242
1243
1244 func (q *wantConnQueue) pushBack(w *wantConn) {
1245 q.tail = append(q.tail, w)
1246 }
1247
1248
1249 func (q *wantConnQueue) popFront() *wantConn {
1250 if q.headPos >= len(q.head) {
1251 if len(q.tail) == 0 {
1252 return nil
1253 }
1254
1255 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1256 }
1257 w := q.head[q.headPos]
1258 q.head[q.headPos] = nil
1259 q.headPos++
1260 return w
1261 }
1262
1263
1264 func (q *wantConnQueue) peekFront() *wantConn {
1265 if q.headPos < len(q.head) {
1266 return q.head[q.headPos]
1267 }
1268 if len(q.tail) > 0 {
1269 return q.tail[0]
1270 }
1271 return nil
1272 }
1273
1274
1275
1276 func (q *wantConnQueue) cleanFront() (cleaned bool) {
1277 for {
1278 w := q.peekFront()
1279 if w == nil || w.waiting() {
1280 return cleaned
1281 }
1282 q.popFront()
1283 cleaned = true
1284 }
1285 }
1286
1287 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1288 if t.DialTLSContext != nil {
1289 conn, err = t.DialTLSContext(ctx, network, addr)
1290 } else {
1291 conn, err = t.DialTLS(network, addr)
1292 }
1293 if conn == nil && err == nil {
1294 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1295 }
1296 return
1297 }
1298
1299
1300
1301
1302
1303 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
1304 req := treq.Request
1305 trace := treq.trace
1306 ctx := req.Context()
1307 if trace != nil && trace.GetConn != nil {
1308 trace.GetConn(cm.addr())
1309 }
1310
1311 w := &wantConn{
1312 cm: cm,
1313 key: cm.key(),
1314 ctx: ctx,
1315 ready: make(chan struct{}, 1),
1316 beforeDial: testHookPrePendingDial,
1317 afterDial: testHookPostPendingDial,
1318 }
1319 defer func() {
1320 if err != nil {
1321 w.cancel(t, err)
1322 }
1323 }()
1324
1325
1326 if delivered := t.queueForIdleConn(w); delivered {
1327 pc := w.pc
1328
1329
1330 if pc.alt == nil && trace != nil && trace.GotConn != nil {
1331 trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
1332 }
1333
1334
1335
1336 t.setReqCanceler(treq.cancelKey, func(error) {})
1337 return pc, nil
1338 }
1339
1340 cancelc := make(chan error, 1)
1341 t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
1342
1343
1344 t.queueForDial(w)
1345
1346
1347 select {
1348 case <-w.ready:
1349
1350
1351 if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
1352 trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
1353 }
1354 if w.err != nil {
1355
1356
1357
1358 select {
1359 case <-req.Cancel:
1360 return nil, errRequestCanceledConn
1361 case <-req.Context().Done():
1362 return nil, req.Context().Err()
1363 case err := <-cancelc:
1364 if err == errRequestCanceled {
1365 err = errRequestCanceledConn
1366 }
1367 return nil, err
1368 default:
1369
1370 }
1371 }
1372 return w.pc, w.err
1373 case <-req.Cancel:
1374 return nil, errRequestCanceledConn
1375 case <-req.Context().Done():
1376 return nil, req.Context().Err()
1377 case err := <-cancelc:
1378 if err == errRequestCanceled {
1379 err = errRequestCanceledConn
1380 }
1381 return nil, err
1382 }
1383 }
1384
1385
1386
1387 func (t *Transport) queueForDial(w *wantConn) {
1388 w.beforeDial()
1389 if t.MaxConnsPerHost <= 0 {
1390 go t.dialConnFor(w)
1391 return
1392 }
1393
1394 t.connsPerHostMu.Lock()
1395 defer t.connsPerHostMu.Unlock()
1396
1397 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1398 if t.connsPerHost == nil {
1399 t.connsPerHost = make(map[connectMethodKey]int)
1400 }
1401 t.connsPerHost[w.key] = n + 1
1402 go t.dialConnFor(w)
1403 return
1404 }
1405
1406 if t.connsPerHostWait == nil {
1407 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1408 }
1409 q := t.connsPerHostWait[w.key]
1410 q.cleanFront()
1411 q.pushBack(w)
1412 t.connsPerHostWait[w.key] = q
1413 }
1414
1415
1416
1417
1418 func (t *Transport) dialConnFor(w *wantConn) {
1419 defer w.afterDial()
1420
1421 pc, err := t.dialConn(w.ctx, w.cm)
1422 delivered := w.tryDeliver(pc, err)
1423 if err == nil && (!delivered || pc.alt != nil) {
1424
1425
1426
1427 t.putOrCloseIdleConn(pc)
1428 }
1429 if err != nil {
1430 t.decConnsPerHost(w.key)
1431 }
1432 }
1433
1434
1435
1436 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1437 if t.MaxConnsPerHost <= 0 {
1438 return
1439 }
1440
1441 t.connsPerHostMu.Lock()
1442 defer t.connsPerHostMu.Unlock()
1443 n := t.connsPerHost[key]
1444 if n == 0 {
1445
1446
1447 panic("net/http: internal error: connCount underflow")
1448 }
1449
1450
1451
1452
1453
1454 if q := t.connsPerHostWait[key]; q.len() > 0 {
1455 done := false
1456 for q.len() > 0 {
1457 w := q.popFront()
1458 if w.waiting() {
1459 go t.dialConnFor(w)
1460 done = true
1461 break
1462 }
1463 }
1464 if q.len() == 0 {
1465 delete(t.connsPerHostWait, key)
1466 } else {
1467
1468
1469 t.connsPerHostWait[key] = q
1470 }
1471 if done {
1472 return
1473 }
1474 }
1475
1476
1477 if n--; n == 0 {
1478 delete(t.connsPerHost, key)
1479 } else {
1480 t.connsPerHost[key] = n
1481 }
1482 }
1483
1484
1485
1486
1487 func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
1488
1489 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1490 if cfg.ServerName == "" {
1491 cfg.ServerName = name
1492 }
1493 if pconn.cacheKey.onlyH1 {
1494 cfg.NextProtos = nil
1495 }
1496 plainConn := pconn.conn
1497 tlsConn := tls.Client(plainConn, cfg)
1498 errc := make(chan error, 2)
1499 var timer *time.Timer
1500 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1501 timer = time.AfterFunc(d, func() {
1502 errc <- tlsHandshakeTimeoutError{}
1503 })
1504 }
1505 go func() {
1506 if trace != nil && trace.TLSHandshakeStart != nil {
1507 trace.TLSHandshakeStart()
1508 }
1509 err := tlsConn.Handshake()
1510 if timer != nil {
1511 timer.Stop()
1512 }
1513 errc <- err
1514 }()
1515 if err := <-errc; err != nil {
1516 plainConn.Close()
1517 if trace != nil && trace.TLSHandshakeDone != nil {
1518 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1519 }
1520 return err
1521 }
1522 cs := tlsConn.ConnectionState()
1523 if trace != nil && trace.TLSHandshakeDone != nil {
1524 trace.TLSHandshakeDone(cs, nil)
1525 }
1526 pconn.tlsState = &cs
1527 pconn.conn = tlsConn
1528 return nil
1529 }
1530
1531 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1532 pconn = &persistConn{
1533 t: t,
1534 cacheKey: cm.key(),
1535 reqch: make(chan requestAndChan, 1),
1536 writech: make(chan writeRequest, 1),
1537 closech: make(chan struct{}),
1538 writeErrCh: make(chan error, 1),
1539 writeLoopDone: make(chan struct{}),
1540 }
1541 trace := httptrace.ContextClientTrace(ctx)
1542 wrapErr := func(err error) error {
1543 if cm.proxyURL != nil {
1544
1545 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1546 }
1547 return err
1548 }
1549 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1550 var err error
1551 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1552 if err != nil {
1553 return nil, wrapErr(err)
1554 }
1555 if tc, ok := pconn.conn.(*tls.Conn); ok {
1556
1557
1558 if trace != nil && trace.TLSHandshakeStart != nil {
1559 trace.TLSHandshakeStart()
1560 }
1561 if err := tc.Handshake(); err != nil {
1562 go pconn.conn.Close()
1563 if trace != nil && trace.TLSHandshakeDone != nil {
1564 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1565 }
1566 return nil, err
1567 }
1568 cs := tc.ConnectionState()
1569 if trace != nil && trace.TLSHandshakeDone != nil {
1570 trace.TLSHandshakeDone(cs, nil)
1571 }
1572 pconn.tlsState = &cs
1573 }
1574 } else {
1575 conn, err := t.dial(ctx, "tcp", cm.addr())
1576 if err != nil {
1577 return nil, wrapErr(err)
1578 }
1579 pconn.conn = conn
1580 if cm.scheme() == "https" {
1581 var firstTLSHost string
1582 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1583 return nil, wrapErr(err)
1584 }
1585 if err = pconn.addTLS(firstTLSHost, trace); err != nil {
1586 return nil, wrapErr(err)
1587 }
1588 }
1589 }
1590
1591
1592 switch {
1593 case cm.proxyURL == nil:
1594
1595 case cm.proxyURL.Scheme == "socks5":
1596 conn := pconn.conn
1597 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1598 if u := cm.proxyURL.User; u != nil {
1599 auth := &socksUsernamePassword{
1600 Username: u.Username(),
1601 }
1602 auth.Password, _ = u.Password()
1603 d.AuthMethods = []socksAuthMethod{
1604 socksAuthMethodNotRequired,
1605 socksAuthMethodUsernamePassword,
1606 }
1607 d.Authenticate = auth.Authenticate
1608 }
1609 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1610 conn.Close()
1611 return nil, err
1612 }
1613 case cm.targetScheme == "http":
1614 pconn.isProxy = true
1615 if pa := cm.proxyAuth(); pa != "" {
1616 pconn.mutateHeaderFunc = func(h Header) {
1617 h.Set("Proxy-Authorization", pa)
1618 }
1619 }
1620 case cm.targetScheme == "https":
1621 conn := pconn.conn
1622 hdr := t.ProxyConnectHeader
1623 if hdr == nil {
1624 hdr = make(Header)
1625 }
1626 if pa := cm.proxyAuth(); pa != "" {
1627 hdr = hdr.Clone()
1628 hdr.Set("Proxy-Authorization", pa)
1629 }
1630 connectReq := &Request{
1631 Method: "CONNECT",
1632 URL: &url.URL{Opaque: cm.targetAddr},
1633 Host: cm.targetAddr,
1634 Header: hdr,
1635 }
1636
1637
1638
1639
1640
1641
1642 connectCtx := ctx
1643 if ctx.Done() == nil {
1644 newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
1645 defer cancel()
1646 connectCtx = newCtx
1647 }
1648
1649 didReadResponse := make(chan struct{})
1650 var (
1651 resp *Response
1652 err error
1653 )
1654
1655 go func() {
1656 defer close(didReadResponse)
1657 err = connectReq.Write(conn)
1658 if err != nil {
1659 return
1660 }
1661
1662
1663 br := bufio.NewReader(conn)
1664 resp, err = ReadResponse(br, connectReq)
1665 }()
1666 select {
1667 case <-connectCtx.Done():
1668 conn.Close()
1669 <-didReadResponse
1670 return nil, connectCtx.Err()
1671 case <-didReadResponse:
1672
1673 }
1674 if err != nil {
1675 conn.Close()
1676 return nil, err
1677 }
1678 if resp.StatusCode != 200 {
1679 f := strings.SplitN(resp.Status, " ", 2)
1680 conn.Close()
1681 if len(f) < 2 {
1682 return nil, errors.New("unknown status code")
1683 }
1684 return nil, errors.New(f[1])
1685 }
1686 }
1687
1688 if cm.proxyURL != nil && cm.targetScheme == "https" {
1689 if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
1690 return nil, err
1691 }
1692 }
1693
1694 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1695 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1696 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1697 if e, ok := alt.(http2erringRoundTripper); ok {
1698
1699 return nil, e.err
1700 }
1701 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1702 }
1703 }
1704
1705 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1706 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1707
1708 go pconn.readLoop()
1709 go pconn.writeLoop()
1710 return pconn, nil
1711 }
1712
1713
1714
1715
1716
1717
1718
1719 type persistConnWriter struct {
1720 pc *persistConn
1721 }
1722
1723 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1724 n, err = w.pc.conn.Write(p)
1725 w.pc.nwrite += int64(n)
1726 return
1727 }
1728
1729
1730
1731
1732 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1733 n, err = io.Copy(w.pc.conn, r)
1734 w.pc.nwrite += n
1735 return
1736 }
1737
1738 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757 type connectMethod struct {
1758 _ incomparable
1759 proxyURL *url.URL
1760 targetScheme string
1761
1762
1763
1764 targetAddr string
1765 onlyH1 bool
1766 }
1767
1768 func (cm *connectMethod) key() connectMethodKey {
1769 proxyStr := ""
1770 targetAddr := cm.targetAddr
1771 if cm.proxyURL != nil {
1772 proxyStr = cm.proxyURL.String()
1773 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
1774 targetAddr = ""
1775 }
1776 }
1777 return connectMethodKey{
1778 proxy: proxyStr,
1779 scheme: cm.targetScheme,
1780 addr: targetAddr,
1781 onlyH1: cm.onlyH1,
1782 }
1783 }
1784
1785
1786 func (cm *connectMethod) scheme() string {
1787 if cm.proxyURL != nil {
1788 return cm.proxyURL.Scheme
1789 }
1790 return cm.targetScheme
1791 }
1792
1793
1794 func (cm *connectMethod) addr() string {
1795 if cm.proxyURL != nil {
1796 return canonicalAddr(cm.proxyURL)
1797 }
1798 return cm.targetAddr
1799 }
1800
1801
1802
1803 func (cm *connectMethod) tlsHost() string {
1804 h := cm.targetAddr
1805 if hasPort(h) {
1806 h = h[:strings.LastIndex(h, ":")]
1807 }
1808 return h
1809 }
1810
1811
1812
1813
1814 type connectMethodKey struct {
1815 proxy, scheme, addr string
1816 onlyH1 bool
1817 }
1818
1819 func (k connectMethodKey) String() string {
1820
1821 var h1 string
1822 if k.onlyH1 {
1823 h1 = ",h1"
1824 }
1825 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
1826 }
1827
1828
1829
1830 type persistConn struct {
1831
1832
1833
1834 alt RoundTripper
1835
1836 t *Transport
1837 cacheKey connectMethodKey
1838 conn net.Conn
1839 tlsState *tls.ConnectionState
1840 br *bufio.Reader
1841 bw *bufio.Writer
1842 nwrite int64
1843 reqch chan requestAndChan
1844 writech chan writeRequest
1845 closech chan struct{}
1846 isProxy bool
1847 sawEOF bool
1848 readLimit int64
1849
1850
1851
1852
1853 writeErrCh chan error
1854
1855 writeLoopDone chan struct{}
1856
1857
1858 idleAt time.Time
1859 idleTimer *time.Timer
1860
1861 mu sync.Mutex
1862 numExpectedResponses int
1863 closed error
1864 canceledErr error
1865 broken bool
1866 reused bool
1867
1868
1869
1870 mutateHeaderFunc func(Header)
1871 }
1872
1873 func (pc *persistConn) maxHeaderResponseSize() int64 {
1874 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
1875 return v
1876 }
1877 return 10 << 20
1878 }
1879
1880 func (pc *persistConn) Read(p []byte) (n int, err error) {
1881 if pc.readLimit <= 0 {
1882 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
1883 }
1884 if int64(len(p)) > pc.readLimit {
1885 p = p[:pc.readLimit]
1886 }
1887 n, err = pc.conn.Read(p)
1888 if err == io.EOF {
1889 pc.sawEOF = true
1890 }
1891 pc.readLimit -= int64(n)
1892 return
1893 }
1894
1895
1896 func (pc *persistConn) isBroken() bool {
1897 pc.mu.Lock()
1898 b := pc.closed != nil
1899 pc.mu.Unlock()
1900 return b
1901 }
1902
1903
1904
1905 func (pc *persistConn) canceled() error {
1906 pc.mu.Lock()
1907 defer pc.mu.Unlock()
1908 return pc.canceledErr
1909 }
1910
1911
1912 func (pc *persistConn) isReused() bool {
1913 pc.mu.Lock()
1914 r := pc.reused
1915 pc.mu.Unlock()
1916 return r
1917 }
1918
1919 func (pc *persistConn) gotIdleConnTrace(idleAt time.Time) (t httptrace.GotConnInfo) {
1920 pc.mu.Lock()
1921 defer pc.mu.Unlock()
1922 t.Reused = pc.reused
1923 t.Conn = pc.conn
1924 t.WasIdle = true
1925 if !idleAt.IsZero() {
1926 t.IdleTime = time.Since(idleAt)
1927 }
1928 return
1929 }
1930
1931 func (pc *persistConn) cancelRequest(err error) {
1932 pc.mu.Lock()
1933 defer pc.mu.Unlock()
1934 pc.canceledErr = err
1935 pc.closeLocked(errRequestCanceled)
1936 }
1937
1938
1939
1940
1941 func (pc *persistConn) closeConnIfStillIdle() {
1942 t := pc.t
1943 t.idleMu.Lock()
1944 defer t.idleMu.Unlock()
1945 if _, ok := t.idleLRU.m[pc]; !ok {
1946
1947 return
1948 }
1949 t.removeIdleConnLocked(pc)
1950 pc.close(errIdleConnTimeout)
1951 }
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
1962 if err == nil {
1963 return nil
1964 }
1965
1966
1967
1968
1969 if cerr := pc.canceled(); cerr != nil {
1970 return cerr
1971 }
1972
1973
1974 req.mu.Lock()
1975 reqErr := req.err
1976 req.mu.Unlock()
1977 if reqErr != nil {
1978 return reqErr
1979 }
1980
1981 if err == errServerClosedIdle {
1982
1983 return err
1984 }
1985
1986 if _, ok := err.(transportReadFromServerError); ok {
1987
1988 return err
1989 }
1990 if pc.isBroken() {
1991 <-pc.writeLoopDone
1992 if pc.nwrite == startBytesWritten {
1993 return nothingWrittenError{err}
1994 }
1995 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)
1996 }
1997 return err
1998 }
1999
2000
2001
2002
2003 var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2004
2005 func (pc *persistConn) readLoop() {
2006 closeErr := errReadLoopExiting
2007 defer func() {
2008 pc.close(closeErr)
2009 pc.t.removeIdleConn(pc)
2010 }()
2011
2012 tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
2013 if err := pc.t.tryPutIdleConn(pc); err != nil {
2014 closeErr = err
2015 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
2016 trace.PutIdleConn(err)
2017 }
2018 return false
2019 }
2020 if trace != nil && trace.PutIdleConn != nil {
2021 trace.PutIdleConn(nil)
2022 }
2023 return true
2024 }
2025
2026
2027
2028
2029 eofc := make(chan struct{})
2030 defer close(eofc)
2031
2032
2033 testHookMu.Lock()
2034 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
2035 testHookMu.Unlock()
2036
2037 alive := true
2038 for alive {
2039 pc.readLimit = pc.maxHeaderResponseSize()
2040 _, err := pc.br.Peek(1)
2041
2042 pc.mu.Lock()
2043 if pc.numExpectedResponses == 0 {
2044 pc.readLoopPeekFailLocked(err)
2045 pc.mu.Unlock()
2046 return
2047 }
2048 pc.mu.Unlock()
2049
2050 rc := <-pc.reqch
2051 trace := httptrace.ContextClientTrace(rc.req.Context())
2052
2053 var resp *Response
2054 if err == nil {
2055 resp, err = pc.readResponse(rc, trace)
2056 } else {
2057 err = transportReadFromServerError{err}
2058 closeErr = err
2059 }
2060
2061 if err != nil {
2062 if pc.readLimit <= 0 {
2063 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
2064 }
2065
2066 select {
2067 case rc.ch <- responseAndError{err: err}:
2068 case <-rc.callerGone:
2069 return
2070 }
2071 return
2072 }
2073 pc.readLimit = maxInt64
2074
2075 pc.mu.Lock()
2076 pc.numExpectedResponses--
2077 pc.mu.Unlock()
2078
2079 bodyWritable := resp.bodyIsWritable()
2080 hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
2081
2082 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
2083
2084
2085
2086 alive = false
2087 }
2088
2089 if !hasBody || bodyWritable {
2090 pc.t.setReqCanceler(rc.cancelKey, nil)
2091
2092
2093
2094
2095
2096
2097
2098 alive = alive &&
2099 !pc.sawEOF &&
2100 pc.wroteRequest() &&
2101 tryPutIdleConn(trace)
2102
2103 if bodyWritable {
2104 closeErr = errCallerOwnsConn
2105 }
2106
2107 select {
2108 case rc.ch <- responseAndError{res: resp}:
2109 case <-rc.callerGone:
2110 return
2111 }
2112
2113
2114
2115
2116 testHookReadLoopBeforeNextRead()
2117 continue
2118 }
2119
2120 waitForBodyRead := make(chan bool, 2)
2121 body := &bodyEOFSignal{
2122 body: resp.Body,
2123 earlyCloseFn: func() error {
2124 waitForBodyRead <- false
2125 <-eofc
2126 return nil
2127
2128 },
2129 fn: func(err error) error {
2130 isEOF := err == io.EOF
2131 waitForBodyRead <- isEOF
2132 if isEOF {
2133 <-eofc
2134 } else if err != nil {
2135 if cerr := pc.canceled(); cerr != nil {
2136 return cerr
2137 }
2138 }
2139 return err
2140 },
2141 }
2142
2143 resp.Body = body
2144 if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
2145 resp.Body = &gzipReader{body: body}
2146 resp.Header.Del("Content-Encoding")
2147 resp.Header.Del("Content-Length")
2148 resp.ContentLength = -1
2149 resp.Uncompressed = true
2150 }
2151
2152 select {
2153 case rc.ch <- responseAndError{res: resp}:
2154 case <-rc.callerGone:
2155 return
2156 }
2157
2158
2159
2160
2161 select {
2162 case bodyEOF := <-waitForBodyRead:
2163 pc.t.setReqCanceler(rc.cancelKey, nil)
2164 alive = alive &&
2165 bodyEOF &&
2166 !pc.sawEOF &&
2167 pc.wroteRequest() &&
2168 tryPutIdleConn(trace)
2169 if bodyEOF {
2170 eofc <- struct{}{}
2171 }
2172 case <-rc.req.Cancel:
2173 alive = false
2174 pc.t.CancelRequest(rc.req)
2175 case <-rc.req.Context().Done():
2176 alive = false
2177 pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
2178 case <-pc.closech:
2179 alive = false
2180 }
2181
2182 testHookReadLoopBeforeNextRead()
2183 }
2184 }
2185
2186 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2187 if pc.closed != nil {
2188 return
2189 }
2190 if n := pc.br.Buffered(); n > 0 {
2191 buf, _ := pc.br.Peek(n)
2192 if is408Message(buf) {
2193 pc.closeLocked(errServerClosedIdle)
2194 return
2195 } else {
2196 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2197 }
2198 }
2199 if peekErr == io.EOF {
2200
2201 pc.closeLocked(errServerClosedIdle)
2202 } else {
2203 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
2204 }
2205 }
2206
2207
2208
2209
2210 func is408Message(buf []byte) bool {
2211 if len(buf) < len("HTTP/1.x 408") {
2212 return false
2213 }
2214 if string(buf[:7]) != "HTTP/1." {
2215 return false
2216 }
2217 return string(buf[8:12]) == " 408"
2218 }
2219
2220
2221
2222
2223 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2224 if trace != nil && trace.GotFirstResponseByte != nil {
2225 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2226 trace.GotFirstResponseByte()
2227 }
2228 }
2229 num1xx := 0
2230 const max1xxResponses = 5
2231
2232 continueCh := rc.continueCh
2233 for {
2234 resp, err = ReadResponse(pc.br, rc.req)
2235 if err != nil {
2236 return
2237 }
2238 resCode := resp.StatusCode
2239 if continueCh != nil {
2240 if resCode == 100 {
2241 if trace != nil && trace.Got100Continue != nil {
2242 trace.Got100Continue()
2243 }
2244 continueCh <- struct{}{}
2245 continueCh = nil
2246 } else if resCode >= 200 {
2247 close(continueCh)
2248 continueCh = nil
2249 }
2250 }
2251 is1xx := 100 <= resCode && resCode <= 199
2252
2253 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2254 if is1xxNonTerminal {
2255 num1xx++
2256 if num1xx > max1xxResponses {
2257 return nil, errors.New("net/http: too many 1xx informational responses")
2258 }
2259 pc.readLimit = pc.maxHeaderResponseSize()
2260 if trace != nil && trace.Got1xxResponse != nil {
2261 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2262 return nil, err
2263 }
2264 }
2265 continue
2266 }
2267 break
2268 }
2269 if resp.isProtocolSwitch() {
2270 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2271 }
2272
2273 resp.TLS = pc.tlsState
2274 return
2275 }
2276
2277
2278
2279
2280 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2281 if continueCh == nil {
2282 return nil
2283 }
2284 return func() bool {
2285 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2286 defer timer.Stop()
2287
2288 select {
2289 case _, ok := <-continueCh:
2290 return ok
2291 case <-timer.C:
2292 return true
2293 case <-pc.closech:
2294 return false
2295 }
2296 }
2297 }
2298
2299 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2300 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2301 if br.Buffered() != 0 {
2302 body.br = br
2303 }
2304 return body
2305 }
2306
2307
2308
2309
2310
2311
2312 type readWriteCloserBody struct {
2313 _ incomparable
2314 br *bufio.Reader
2315 io.ReadWriteCloser
2316 }
2317
2318 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2319 if b.br != nil {
2320 if n := b.br.Buffered(); len(p) > n {
2321 p = p[:n]
2322 }
2323 n, err = b.br.Read(p)
2324 if b.br.Buffered() == 0 {
2325 b.br = nil
2326 }
2327 return n, err
2328 }
2329 return b.ReadWriteCloser.Read(p)
2330 }
2331
2332
2333 type nothingWrittenError struct {
2334 error
2335 }
2336
2337 func (pc *persistConn) writeLoop() {
2338 defer close(pc.writeLoopDone)
2339 for {
2340 select {
2341 case wr := <-pc.writech:
2342 startBytesWritten := pc.nwrite
2343 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
2344 if bre, ok := err.(requestBodyReadError); ok {
2345 err = bre.error
2346
2347
2348
2349
2350
2351
2352
2353 wr.req.setError(err)
2354 }
2355 if err == nil {
2356 err = pc.bw.Flush()
2357 }
2358 if err != nil {
2359 wr.req.Request.closeBody()
2360 if pc.nwrite == startBytesWritten {
2361 err = nothingWrittenError{err}
2362 }
2363 }
2364 pc.writeErrCh <- err
2365 wr.ch <- err
2366 if err != nil {
2367 pc.close(err)
2368 return
2369 }
2370 case <-pc.closech:
2371 return
2372 }
2373 }
2374 }
2375
2376
2377
2378
2379 const maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2380
2381
2382
2383 func (pc *persistConn) wroteRequest() bool {
2384 select {
2385 case err := <-pc.writeErrCh:
2386
2387
2388 return err == nil
2389 default:
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2401 defer t.Stop()
2402 select {
2403 case err := <-pc.writeErrCh:
2404 return err == nil
2405 case <-t.C:
2406 return false
2407 }
2408 }
2409 }
2410
2411
2412
2413 type responseAndError struct {
2414 _ incomparable
2415 res *Response
2416 err error
2417 }
2418
2419 type requestAndChan struct {
2420 _ incomparable
2421 req *Request
2422 cancelKey cancelKey
2423 ch chan responseAndError
2424
2425
2426
2427
2428 addedGzip bool
2429
2430
2431
2432
2433
2434 continueCh chan<- struct{}
2435
2436 callerGone <-chan struct{}
2437 }
2438
2439
2440
2441
2442
2443 type writeRequest struct {
2444 req *transportRequest
2445 ch chan<- error
2446
2447
2448
2449
2450 continueCh <-chan struct{}
2451 }
2452
2453 type httpError struct {
2454 err string
2455 timeout bool
2456 }
2457
2458 func (e *httpError) Error() string { return e.err }
2459 func (e *httpError) Timeout() bool { return e.timeout }
2460 func (e *httpError) Temporary() bool { return true }
2461
2462 var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
2463
2464
2465
2466 var errRequestCanceled = http2errRequestCanceled
2467 var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2468
2469 func nop() {}
2470
2471
2472 var (
2473 testHookEnterRoundTrip = nop
2474 testHookWaitResLoop = nop
2475 testHookRoundTripRetried = nop
2476 testHookPrePendingDial = nop
2477 testHookPostPendingDial = nop
2478
2479 testHookMu sync.Locker = fakeLocker{}
2480 testHookReadLoopBeforeNextRead = nop
2481 )
2482
2483 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2484 testHookEnterRoundTrip()
2485 if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
2486 pc.t.putOrCloseIdleConn(pc)
2487 return nil, errRequestCanceled
2488 }
2489 pc.mu.Lock()
2490 pc.numExpectedResponses++
2491 headerFn := pc.mutateHeaderFunc
2492 pc.mu.Unlock()
2493
2494 if headerFn != nil {
2495 headerFn(req.extraHeaders())
2496 }
2497
2498
2499
2500
2501
2502 requestedGzip := false
2503 if !pc.t.DisableCompression &&
2504 req.Header.Get("Accept-Encoding") == "" &&
2505 req.Header.Get("Range") == "" &&
2506 req.Method != "HEAD" {
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519 requestedGzip = true
2520 req.extraHeaders().Set("Accept-Encoding", "gzip")
2521 }
2522
2523 var continueCh chan struct{}
2524 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
2525 continueCh = make(chan struct{}, 1)
2526 }
2527
2528 if pc.t.DisableKeepAlives && !req.wantsClose() {
2529 req.extraHeaders().Set("Connection", "close")
2530 }
2531
2532 gone := make(chan struct{})
2533 defer close(gone)
2534
2535 defer func() {
2536 if err != nil {
2537 pc.t.setReqCanceler(req.cancelKey, nil)
2538 }
2539 }()
2540
2541 const debugRoundTrip = false
2542
2543
2544
2545
2546 startBytesWritten := pc.nwrite
2547 writeErrCh := make(chan error, 1)
2548 pc.writech <- writeRequest{req, writeErrCh, continueCh}
2549
2550 resc := make(chan responseAndError)
2551 pc.reqch <- requestAndChan{
2552 req: req.Request,
2553 cancelKey: req.cancelKey,
2554 ch: resc,
2555 addedGzip: requestedGzip,
2556 continueCh: continueCh,
2557 callerGone: gone,
2558 }
2559
2560 var respHeaderTimer <-chan time.Time
2561 cancelChan := req.Request.Cancel
2562 ctxDoneChan := req.Context().Done()
2563 for {
2564 testHookWaitResLoop()
2565 select {
2566 case err := <-writeErrCh:
2567 if debugRoundTrip {
2568 req.logf("writeErrCh resv: %T/%#v", err, err)
2569 }
2570 if err != nil {
2571 pc.close(fmt.Errorf("write error: %v", err))
2572 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
2573 }
2574 if d := pc.t.ResponseHeaderTimeout; d > 0 {
2575 if debugRoundTrip {
2576 req.logf("starting timer for %v", d)
2577 }
2578 timer := time.NewTimer(d)
2579 defer timer.Stop()
2580 respHeaderTimer = timer.C
2581 }
2582 case <-pc.closech:
2583 if debugRoundTrip {
2584 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
2585 }
2586 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
2587 case <-respHeaderTimer:
2588 if debugRoundTrip {
2589 req.logf("timeout waiting for response headers.")
2590 }
2591 pc.close(errTimeout)
2592 return nil, errTimeout
2593 case re := <-resc:
2594 if (re.res == nil) == (re.err == nil) {
2595 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
2596 }
2597 if debugRoundTrip {
2598 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
2599 }
2600 if re.err != nil {
2601 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
2602 }
2603 return re.res, nil
2604 case <-cancelChan:
2605 pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
2606 cancelChan = nil
2607 case <-ctxDoneChan:
2608 pc.t.cancelRequest(req.cancelKey, req.Context().Err())
2609 cancelChan = nil
2610 ctxDoneChan = nil
2611 }
2612 }
2613 }
2614
2615
2616
2617 type tLogKey struct{}
2618
2619 func (tr *transportRequest) logf(format string, args ...interface{}) {
2620 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...interface{})); ok {
2621 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2622 }
2623 }
2624
2625
2626
2627 func (pc *persistConn) markReused() {
2628 pc.mu.Lock()
2629 pc.reused = true
2630 pc.mu.Unlock()
2631 }
2632
2633
2634
2635
2636
2637
2638 func (pc *persistConn) close(err error) {
2639 pc.mu.Lock()
2640 defer pc.mu.Unlock()
2641 pc.closeLocked(err)
2642 }
2643
2644 func (pc *persistConn) closeLocked(err error) {
2645 if err == nil {
2646 panic("nil error")
2647 }
2648 pc.broken = true
2649 if pc.closed == nil {
2650 pc.closed = err
2651 pc.t.decConnsPerHost(pc.cacheKey)
2652
2653
2654 if pc.alt == nil {
2655 if err != errCallerOwnsConn {
2656 pc.conn.Close()
2657 }
2658 close(pc.closech)
2659 }
2660 }
2661 pc.mutateHeaderFunc = nil
2662 }
2663
2664 var portMap = map[string]string{
2665 "http": "80",
2666 "https": "443",
2667 "socks5": "1080",
2668 }
2669
2670
2671 func canonicalAddr(url *url.URL) string {
2672 addr := url.Hostname()
2673 if v, err := idnaASCII(addr); err == nil {
2674 addr = v
2675 }
2676 port := url.Port()
2677 if port == "" {
2678 port = portMap[url.Scheme]
2679 }
2680 return net.JoinHostPort(addr, port)
2681 }
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694 type bodyEOFSignal struct {
2695 body io.ReadCloser
2696 mu sync.Mutex
2697 closed bool
2698 rerr error
2699 fn func(error) error
2700 earlyCloseFn func() error
2701 }
2702
2703 var errReadOnClosedResBody = errors.New("http: read on closed response body")
2704
2705 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
2706 es.mu.Lock()
2707 closed, rerr := es.closed, es.rerr
2708 es.mu.Unlock()
2709 if closed {
2710 return 0, errReadOnClosedResBody
2711 }
2712 if rerr != nil {
2713 return 0, rerr
2714 }
2715
2716 n, err = es.body.Read(p)
2717 if err != nil {
2718 es.mu.Lock()
2719 defer es.mu.Unlock()
2720 if es.rerr == nil {
2721 es.rerr = err
2722 }
2723 err = es.condfn(err)
2724 }
2725 return
2726 }
2727
2728 func (es *bodyEOFSignal) Close() error {
2729 es.mu.Lock()
2730 defer es.mu.Unlock()
2731 if es.closed {
2732 return nil
2733 }
2734 es.closed = true
2735 if es.earlyCloseFn != nil && es.rerr != io.EOF {
2736 return es.earlyCloseFn()
2737 }
2738 err := es.body.Close()
2739 return es.condfn(err)
2740 }
2741
2742
2743 func (es *bodyEOFSignal) condfn(err error) error {
2744 if es.fn == nil {
2745 return err
2746 }
2747 err = es.fn(err)
2748 es.fn = nil
2749 return err
2750 }
2751
2752
2753
2754 type gzipReader struct {
2755 _ incomparable
2756 body *bodyEOFSignal
2757 zr *gzip.Reader
2758 zerr error
2759 }
2760
2761 func (gz *gzipReader) Read(p []byte) (n int, err error) {
2762 if gz.zr == nil {
2763 if gz.zerr == nil {
2764 gz.zr, gz.zerr = gzip.NewReader(gz.body)
2765 }
2766 if gz.zerr != nil {
2767 return 0, gz.zerr
2768 }
2769 }
2770
2771 gz.body.mu.Lock()
2772 if gz.body.closed {
2773 err = errReadOnClosedResBody
2774 }
2775 gz.body.mu.Unlock()
2776
2777 if err != nil {
2778 return 0, err
2779 }
2780 return gz.zr.Read(p)
2781 }
2782
2783 func (gz *gzipReader) Close() error {
2784 return gz.body.Close()
2785 }
2786
2787 type tlsHandshakeTimeoutError struct{}
2788
2789 func (tlsHandshakeTimeoutError) Timeout() bool { return true }
2790 func (tlsHandshakeTimeoutError) Temporary() bool { return true }
2791 func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" }
2792
2793
2794
2795
2796 type fakeLocker struct{}
2797
2798 func (fakeLocker) Lock() {}
2799 func (fakeLocker) Unlock() {}
2800
2801
2802
2803
2804 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
2805 if cfg == nil {
2806 return &tls.Config{}
2807 }
2808 return cfg.Clone()
2809 }
2810
2811 type connLRU struct {
2812 ll *list.List
2813 m map[*persistConn]*list.Element
2814 }
2815
2816
2817 func (cl *connLRU) add(pc *persistConn) {
2818 if cl.ll == nil {
2819 cl.ll = list.New()
2820 cl.m = make(map[*persistConn]*list.Element)
2821 }
2822 ele := cl.ll.PushFront(pc)
2823 if _, ok := cl.m[pc]; ok {
2824 panic("persistConn was already in LRU")
2825 }
2826 cl.m[pc] = ele
2827 }
2828
2829 func (cl *connLRU) removeOldest() *persistConn {
2830 ele := cl.ll.Back()
2831 pc := ele.Value.(*persistConn)
2832 cl.ll.Remove(ele)
2833 delete(cl.m, pc)
2834 return pc
2835 }
2836
2837
2838 func (cl *connLRU) remove(pc *persistConn) {
2839 if ele, ok := cl.m[pc]; ok {
2840 cl.ll.Remove(ele)
2841 delete(cl.m, pc)
2842 }
2843 }
2844
2845
2846 func (cl *connLRU) len() int {
2847 return len(cl.m)
2848 }
2849
View as plain text