...
Run Format

Source file src/pkg/net/http/transport.go

     1	// Copyright 2011 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// HTTP client implementation. See RFC 2616.
     6	//
     7	// This is the low-level Transport implementation of RoundTripper.
     8	// The high-level interface is in client.go.
     9	
    10	package http
    11	
    12	import (
    13		"bufio"
    14		"compress/gzip"
    15		"crypto/tls"
    16		"errors"
    17		"fmt"
    18		"io"
    19		"log"
    20		"net"
    21		"net/url"
    22		"os"
    23		"strings"
    24		"sync"
    25		"time"
    26	)
    27	
// DefaultTransport is the default implementation of Transport and is
// used by DefaultClient. It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
// $no_proxy) environment variables.
//
// It dials through a net.Dialer configured with a 30 second Timeout
// and a 30 second KeepAlive, and limits TLS handshakes to 10 seconds.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	Dial: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}).Dial,
	TLSHandshakeTimeout: 10 * time.Second,
}
    41	
// DefaultMaxIdleConnsPerHost is the default value of Transport's
// MaxIdleConnsPerHost. It applies when that field is left at zero.
const DefaultMaxIdleConnsPerHost = 2
    45	
// Transport is an implementation of RoundTripper that supports http,
// https, and http proxies (for either http or https with CONNECT).
// Transport can also cache connections for future re-use.
type Transport struct {
	// Unexported state. idleMu guards idleConn and idleConnCh, reqMu
	// guards reqCanceler, and altMu guards altProto.
	//
	// idleConn holds the idle persistent connections per connect-method
	// key; idleConnCh holds, per key, the channel on which a request
	// that is currently dialing can be handed a connection that became
	// idle in the meantime. reqCanceler maps an in-flight request to
	// the function that cancels it.
	idleMu      sync.Mutex
	idleConn    map[connectMethodKey][]*persistConn
	idleConnCh  map[connectMethodKey]chan *persistConn
	reqMu       sync.Mutex
	reqCanceler map[*Request]func()
	altMu       sync.RWMutex
	altProto    map[string]RoundTripper // nil or map of URI scheme => RoundTripper

	// Proxy specifies a function to return a proxy for a given
	// Request. If the function returns a non-nil error, the
	// request is aborted with the provided error.
	// If Proxy is nil or returns a nil *URL, no proxy is used.
	Proxy func(*Request) (*url.URL, error)

	// Dial specifies the dial function for creating TCP
	// connections.
	// If Dial is nil, net.Dial is used.
	Dial func(network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to
	// wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, prevents re-use of TCP connections
	// between different HTTP requests.
	DisableKeepAlives bool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// MaxIdleConnsPerHost, if non-zero, controls the maximum number
	// of idle (keep-alive) connections to keep per-host. If zero,
	// DefaultMaxIdleConnsPerHost is used. A negative value causes
	// connections to be closed instead of being kept idle.
	MaxIdleConnsPerHost int

	// ResponseHeaderTimeout, if non-zero, specifies the amount of
	// time to wait for a server's response headers after fully
	// writing the request (including its body, if any). This
	// time does not include the time to read the response body.
	ResponseHeaderTimeout time.Duration

	// TODO: tunable on global max cached connections
	// TODO: tunable on timeout on cached connections
}
   105	
   106	// ProxyFromEnvironment returns the URL of the proxy to use for a
   107	// given request, as indicated by the environment variables
   108	// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy).
   109	// An error is returned if the proxy environment is invalid.
   110	// A nil URL and nil error are returned if no proxy is defined in the
   111	// environment, or a proxy should not be used for the given request.
   112	//
   113	// As a special case, if req.URL.Host is "localhost" (with or without
   114	// a port number), then a nil URL and nil error will be returned.
   115	func ProxyFromEnvironment(req *Request) (*url.URL, error) {
   116		proxy := httpProxyEnv.Get()
   117		if proxy == "" {
   118			return nil, nil
   119		}
   120		if !useProxy(canonicalAddr(req.URL)) {
   121			return nil, nil
   122		}
   123		proxyURL, err := url.Parse(proxy)
   124		if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
   125			// proxy was bogus. Try prepending "http://" to it and
   126			// see if that parses correctly. If not, we fall
   127			// through and complain about the original one.
   128			if proxyURL, err := url.Parse("http://" + proxy); err == nil {
   129				return proxyURL, nil
   130			}
   131		}
   132		if err != nil {
   133			return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
   134		}
   135		return proxyURL, nil
   136	}
   137	
   138	// ProxyURL returns a proxy function (for use in a Transport)
   139	// that always returns the same URL.
   140	func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
   141		return func(*Request) (*url.URL, error) {
   142			return fixedURL, nil
   143		}
   144	}
   145	
// transportRequest is a wrapper around a *Request that adds
// optional extra headers to write. The extra headers live beside the
// embedded Request so the caller's Request is left untouched.
type transportRequest struct {
	*Request        // original request, not to be mutated
	extra    Header // extra headers to write, or nil
}
   152	
   153	func (tr *transportRequest) extraHeaders() Header {
   154		if tr.extra == nil {
   155			tr.extra = make(Header)
   156		}
   157		return tr.extra
   158	}
   159	
   160	// RoundTrip implements the RoundTripper interface.
   161	//
   162	// For higher-level HTTP client support (such as handling of cookies
   163	// and redirects), see Get, Post, and the Client type.
   164	func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
   165		if req.URL == nil {
   166			req.closeBody()
   167			return nil, errors.New("http: nil Request.URL")
   168		}
   169		if req.Header == nil {
   170			req.closeBody()
   171			return nil, errors.New("http: nil Request.Header")
   172		}
   173		if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
   174			t.altMu.RLock()
   175			var rt RoundTripper
   176			if t.altProto != nil {
   177				rt = t.altProto[req.URL.Scheme]
   178			}
   179			t.altMu.RUnlock()
   180			if rt == nil {
   181				req.closeBody()
   182				return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
   183			}
   184			return rt.RoundTrip(req)
   185		}
   186		if req.URL.Host == "" {
   187			req.closeBody()
   188			return nil, errors.New("http: no Host in request URL")
   189		}
   190		treq := &transportRequest{Request: req}
   191		cm, err := t.connectMethodForRequest(treq)
   192		if err != nil {
   193			req.closeBody()
   194			return nil, err
   195		}
   196	
   197		// Get the cached or newly-created connection to either the
   198		// host (for http or https), the http proxy, or the http proxy
   199		// pre-CONNECTed to https server.  In any case, we'll be ready
   200		// to send it requests.
   201		pconn, err := t.getConn(req, cm)
   202		if err != nil {
   203			t.setReqCanceler(req, nil)
   204			req.closeBody()
   205			return nil, err
   206		}
   207	
   208		return pconn.roundTrip(treq)
   209	}
   210	
   211	// RegisterProtocol registers a new protocol with scheme.
   212	// The Transport will pass requests using the given scheme to rt.
   213	// It is rt's responsibility to simulate HTTP request semantics.
   214	//
   215	// RegisterProtocol can be used by other packages to provide
   216	// implementations of protocol schemes like "ftp" or "file".
   217	func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
   218		if scheme == "http" || scheme == "https" {
   219			panic("protocol " + scheme + " already registered")
   220		}
   221		t.altMu.Lock()
   222		defer t.altMu.Unlock()
   223		if t.altProto == nil {
   224			t.altProto = make(map[string]RoundTripper)
   225		}
   226		if _, exists := t.altProto[scheme]; exists {
   227			panic("protocol " + scheme + " already registered")
   228		}
   229		t.altProto[scheme] = rt
   230	}
   231	
   232	// CloseIdleConnections closes any connections which were previously
   233	// connected from previous requests but are now sitting idle in
   234	// a "keep-alive" state. It does not interrupt any connections currently
   235	// in use.
   236	func (t *Transport) CloseIdleConnections() {
   237		t.idleMu.Lock()
   238		m := t.idleConn
   239		t.idleConn = nil
   240		t.idleConnCh = nil
   241		t.idleMu.Unlock()
   242		for _, conns := range m {
   243			for _, pconn := range conns {
   244				pconn.close()
   245			}
   246		}
   247	}
   248	
   249	// CancelRequest cancels an in-flight request by closing its
   250	// connection.
   251	func (t *Transport) CancelRequest(req *Request) {
   252		t.reqMu.Lock()
   253		cancel := t.reqCanceler[req]
   254		t.reqMu.Unlock()
   255		if cancel != nil {
   256			cancel()
   257		}
   258	}
   259	
   260	//
   261	// Private implementation past this point.
   262	//
   263	
var (
	// httpProxyEnv caches the proxy setting read from $HTTP_PROXY or,
	// if that is empty, $http_proxy.
	httpProxyEnv = &envOnce{
		names: []string{"HTTP_PROXY", "http_proxy"},
	}
	// noProxyEnv caches the proxy exclusion list read from $NO_PROXY
	// or, if that is empty, $no_proxy.
	noProxyEnv = &envOnce{
		names: []string{"NO_PROXY", "no_proxy"},
	}
)
   272	
   273	// envOnce looks up an environment variable (optionally by multiple
   274	// names) once. It mitigates expensive lookups on some platforms
   275	// (e.g. Windows).
   276	type envOnce struct {
   277		names []string
   278		once  sync.Once
   279		val   string
   280	}
   281	
   282	func (e *envOnce) Get() string {
   283		e.once.Do(e.init)
   284		return e.val
   285	}
   286	
   287	func (e *envOnce) init() {
   288		for _, n := range e.names {
   289			e.val = os.Getenv(n)
   290			if e.val != "" {
   291				return
   292			}
   293		}
   294	}
   295	
   296	// reset is used by tests
   297	func (e *envOnce) reset() {
   298		e.once = sync.Once{}
   299		e.val = ""
   300	}
   301	
   302	func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
   303		cm.targetScheme = treq.URL.Scheme
   304		cm.targetAddr = canonicalAddr(treq.URL)
   305		if t.Proxy != nil {
   306			cm.proxyURL, err = t.Proxy(treq.Request)
   307		}
   308		return cm, nil
   309	}
   310	
   311	// proxyAuth returns the Proxy-Authorization header to set
   312	// on requests, if applicable.
   313	func (cm *connectMethod) proxyAuth() string {
   314		if cm.proxyURL == nil {
   315			return ""
   316		}
   317		if u := cm.proxyURL.User; u != nil {
   318			username := u.Username()
   319			password, _ := u.Password()
   320			return "Basic " + basicAuth(username, password)
   321		}
   322		return ""
   323	}
   324	
// putIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, putIdleConn
// returns false.
//
// A connection is refused (and, except when it is already broken,
// closed here) when keep-alives are disabled, MaxIdleConnsPerHost is
// negative, or the per-key idle list is already full. If a request is
// currently waiting on this key's idle channel, pconn is handed
// directly to it instead of being stored.
func (t *Transport) putIdleConn(pconn *persistConn) bool {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		pconn.close()
		return false
	}
	if pconn.isBroken() {
		return false
	}
	key := pconn.cacheKey
	max := t.MaxIdleConnsPerHost
	if max == 0 {
		max = DefaultMaxIdleConnsPerHost
	}
	t.idleMu.Lock()

	// A nil waitingDialer (no entry for key) makes the send case below
	// never ready, so the default branch is taken.
	waitingDialer := t.idleConnCh[key]
	select {
	case waitingDialer <- pconn:
		// We're done with this pconn and somebody else is
		// currently waiting for a conn of this type (they're
		// actively dialing, but this conn is ready
		// first). Chrome calls this socket late binding.  See
		// https://insouciant.org/tech/connection-management-in-chromium/
		t.idleMu.Unlock()
		return true
	default:
		if waitingDialer != nil {
			// They had populated this, but their dial won
			// first, so we can clean up this map entry.
			delete(t.idleConnCh, key)
		}
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	if len(t.idleConn[key]) >= max {
		// Release the lock before closing the surplus connection.
		t.idleMu.Unlock()
		pconn.close()
		return false
	}
	// Sanity check: the same connection must never be listed twice.
	for _, exist := range t.idleConn[key] {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(t.idleConn[key], pconn)
	t.idleMu.Unlock()
	return true
}
   378	
   379	// getIdleConnCh returns a channel to receive and return idle
   380	// persistent connection for the given connectMethod.
   381	// It may return nil, if persistent connections are not being used.
   382	func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
   383		if t.DisableKeepAlives {
   384			return nil
   385		}
   386		key := cm.key()
   387		t.idleMu.Lock()
   388		defer t.idleMu.Unlock()
   389		if t.idleConnCh == nil {
   390			t.idleConnCh = make(map[connectMethodKey]chan *persistConn)
   391		}
   392		ch, ok := t.idleConnCh[key]
   393		if !ok {
   394			ch = make(chan *persistConn)
   395			t.idleConnCh[key] = ch
   396		}
   397		return ch
   398	}
   399	
   400	func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
   401		key := cm.key()
   402		t.idleMu.Lock()
   403		defer t.idleMu.Unlock()
   404		if t.idleConn == nil {
   405			return nil
   406		}
   407		for {
   408			pconns, ok := t.idleConn[key]
   409			if !ok {
   410				return nil
   411			}
   412			if len(pconns) == 1 {
   413				pconn = pconns[0]
   414				delete(t.idleConn, key)
   415			} else {
   416				// 2 or more cached connections; pop last
   417				// TODO: queue?
   418				pconn = pconns[len(pconns)-1]
   419				t.idleConn[key] = pconns[:len(pconns)-1]
   420			}
   421			if !pconn.isBroken() {
   422				return
   423			}
   424		}
   425	}
   426	
   427	func (t *Transport) setReqCanceler(r *Request, fn func()) {
   428		t.reqMu.Lock()
   429		defer t.reqMu.Unlock()
   430		if t.reqCanceler == nil {
   431			t.reqCanceler = make(map[*Request]func())
   432		}
   433		if fn != nil {
   434			t.reqCanceler[r] = fn
   435		} else {
   436			delete(t.reqCanceler, r)
   437		}
   438	}
   439	
   440	func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
   441		if t.Dial != nil {
   442			return t.Dial(network, addr)
   443		}
   444		return net.Dial(network, addr)
   445	}
   446	
// getConn returns a persistConn for the target specified in the
// connectMethod: an idle cached connection if one is available,
// otherwise whichever arrives first of a freshly dialed connection
// (including any proxy CONNECT and/or TLS setup) and a connection
// that another request released meanwhile. If this doesn't return
// an error, the persistConn is ready to write requests to.
//
// While waiting, req can be canceled through the canceler registered
// here, in which case an error is returned.
func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) {
	if pc := t.getIdleConn(cm); pc != nil {
		return pc, nil
	}

	type dialRes struct {
		pc  *persistConn
		err error
	}
	// dialc is unbuffered, so the dialing goroutine below blocks on
	// its send until someone receives the result.
	dialc := make(chan dialRes)

	// handlePendingDial receives the result of a dial nobody is waiting
	// for any more and, if it succeeded, offers the connection to the
	// idle pool.
	handlePendingDial := func() {
		if v := <-dialc; v.err == nil {
			t.putIdleConn(v.pc)
		}
	}

	cancelc := make(chan struct{})
	t.setReqCanceler(req, func() { close(cancelc) })

	go func() {
		pc, err := t.dialConn(cm)
		dialc <- dialRes{pc, err}
	}()

	// idleConnCh may be nil (keep-alives disabled); receiving from a nil
	// channel never proceeds, so that case is simply never chosen.
	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		// Our dial finished.
		return v.pc, v.err
	case pc := <-idleConnCh:
		// Another request finished first and its net.Conn
		// became available before our dial. Or somebody
		// else's dial that they didn't use.
		// But our dial is still going, so give it away
		// when it finishes:
		go handlePendingDial()
		return pc, nil
	case <-cancelc:
		// Canceled while waiting; still drain the pending dial.
		go handlePendingDial()
		return nil, errors.New("net/http: request canceled while waiting for connection")
	}
}
   494	
   495	func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
   496		conn, err := t.dial("tcp", cm.addr())
   497		if err != nil {
   498			if cm.proxyURL != nil {
   499				err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err)
   500			}
   501			return nil, err
   502		}
   503	
   504		pa := cm.proxyAuth()
   505	
   506		pconn := &persistConn{
   507			t:          t,
   508			cacheKey:   cm.key(),
   509			conn:       conn,
   510			reqch:      make(chan requestAndChan, 1),
   511			writech:    make(chan writeRequest, 1),
   512			closech:    make(chan struct{}),
   513			writeErrCh: make(chan error, 1),
   514		}
   515	
   516		switch {
   517		case cm.proxyURL == nil:
   518			// Do nothing.
   519		case cm.targetScheme == "http":
   520			pconn.isProxy = true
   521			if pa != "" {
   522				pconn.mutateHeaderFunc = func(h Header) {
   523					h.Set("Proxy-Authorization", pa)
   524				}
   525			}
   526		case cm.targetScheme == "https":
   527			connectReq := &Request{
   528				Method: "CONNECT",
   529				URL:    &url.URL{Opaque: cm.targetAddr},
   530				Host:   cm.targetAddr,
   531				Header: make(Header),
   532			}
   533			if pa != "" {
   534				connectReq.Header.Set("Proxy-Authorization", pa)
   535			}
   536			connectReq.Write(conn)
   537	
   538			// Read response.
   539			// Okay to use and discard buffered reader here, because
   540			// TLS server will not speak until spoken to.
   541			br := bufio.NewReader(conn)
   542			resp, err := ReadResponse(br, connectReq)
   543			if err != nil {
   544				conn.Close()
   545				return nil, err
   546			}
   547			if resp.StatusCode != 200 {
   548				f := strings.SplitN(resp.Status, " ", 2)
   549				conn.Close()
   550				return nil, errors.New(f[1])
   551			}
   552		}
   553	
   554		if cm.targetScheme == "https" {
   555			// Initiate TLS and check remote host name against certificate.
   556			cfg := t.TLSClientConfig
   557			if cfg == nil || cfg.ServerName == "" {
   558				host := cm.tlsHost()
   559				if cfg == nil {
   560					cfg = &tls.Config{ServerName: host}
   561				} else {
   562					clone := *cfg // shallow clone
   563					clone.ServerName = host
   564					cfg = &clone
   565				}
   566			}
   567			plainConn := conn
   568			tlsConn := tls.Client(plainConn, cfg)
   569			errc := make(chan error, 2)
   570			var timer *time.Timer // for canceling TLS handshake
   571			if d := t.TLSHandshakeTimeout; d != 0 {
   572				timer = time.AfterFunc(d, func() {
   573					errc <- tlsHandshakeTimeoutError{}
   574				})
   575			}
   576			go func() {
   577				err := tlsConn.Handshake()
   578				if timer != nil {
   579					timer.Stop()
   580				}
   581				errc <- err
   582			}()
   583			if err := <-errc; err != nil {
   584				plainConn.Close()
   585				return nil, err
   586			}
   587			if !cfg.InsecureSkipVerify {
   588				if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
   589					plainConn.Close()
   590					return nil, err
   591				}
   592			}
   593			cs := tlsConn.ConnectionState()
   594			pconn.tlsState = &cs
   595			pconn.conn = tlsConn
   596		}
   597	
   598		pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
   599		pconn.bw = bufio.NewWriter(pconn.conn)
   600		go pconn.readLoop()
   601		go pconn.writeLoop()
   602		return pconn, nil
   603	}
   604	
   605	// useProxy returns true if requests to addr should use a proxy,
   606	// according to the NO_PROXY or no_proxy environment variable.
   607	// addr is always a canonicalAddr with a host and port.
   608	func useProxy(addr string) bool {
   609		if len(addr) == 0 {
   610			return true
   611		}
   612		host, _, err := net.SplitHostPort(addr)
   613		if err != nil {
   614			return false
   615		}
   616		if host == "localhost" {
   617			return false
   618		}
   619		if ip := net.ParseIP(host); ip != nil {
   620			if ip.IsLoopback() {
   621				return false
   622			}
   623		}
   624	
   625		no_proxy := noProxyEnv.Get()
   626		if no_proxy == "*" {
   627			return false
   628		}
   629	
   630		addr = strings.ToLower(strings.TrimSpace(addr))
   631		if hasPort(addr) {
   632			addr = addr[:strings.LastIndex(addr, ":")]
   633		}
   634	
   635		for _, p := range strings.Split(no_proxy, ",") {
   636			p = strings.ToLower(strings.TrimSpace(p))
   637			if len(p) == 0 {
   638				continue
   639			}
   640			if hasPort(p) {
   641				p = p[:strings.LastIndex(p, ":")]
   642			}
   643			if addr == p {
   644				return false
   645			}
   646			if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) {
   647				// no_proxy ".foo.com" matches "bar.foo.com" or "foo.com"
   648				return false
   649			}
   650			if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' {
   651				// no_proxy "foo.com" matches "bar.foo.com"
   652				return false
   653			}
   654		}
   655		return true
   656	}
   657	
// connectMethod describes how to reach the target of a request. Its
// key method yields the connectMethodKey under which persistent TCP
// connections are kept alive for subsequent HTTP requests.
//
// A connect method may be of the following types (the cache key form
// shown is the String form of the corresponding connectMethodKey):
//
// Cache key form                    Description
// -----------------                 -------------------------
// |http|foo.com                     http directly to server, no proxy
// |https|foo.com                    https directly to server, no proxy
// http://proxy.com|https|foo.com    http to proxy, then CONNECT to foo.com
// http://proxy.com|http|            http to proxy, http to anywhere after that
//
// Note: no support to https to the proxy yet.
//
type connectMethod struct {
	proxyURL     *url.URL // nil for no proxy, else full proxy URL
	targetScheme string   // "http" or "https"
	targetAddr   string   // Not used if proxy + http targetScheme (4th example in table)
}
   677	
   678	func (cm *connectMethod) key() connectMethodKey {
   679		proxyStr := ""
   680		targetAddr := cm.targetAddr
   681		if cm.proxyURL != nil {
   682			proxyStr = cm.proxyURL.String()
   683			if cm.targetScheme == "http" {
   684				targetAddr = ""
   685			}
   686		}
   687		return connectMethodKey{
   688			proxy:  proxyStr,
   689			scheme: cm.targetScheme,
   690			addr:   targetAddr,
   691		}
   692	}
   693	
   694	// addr returns the first hop "host:port" to which we need to TCP connect.
   695	func (cm *connectMethod) addr() string {
   696		if cm.proxyURL != nil {
   697			return canonicalAddr(cm.proxyURL)
   698		}
   699		return cm.targetAddr
   700	}
   701	
   702	// tlsHost returns the host name to match against the peer's
   703	// TLS certificate.
   704	func (cm *connectMethod) tlsHost() string {
   705		h := cm.targetAddr
   706		if hasPort(h) {
   707			h = h[:strings.LastIndex(h, ":")]
   708		}
   709		return h
   710	}
   711	
   712	// connectMethodKey is the map key version of connectMethod, with a
   713	// stringified proxy URL (or the empty string) instead of a pointer to
   714	// a URL.
   715	type connectMethodKey struct {
   716		proxy, scheme, addr string
   717	}
   718	
   719	func (k connectMethodKey) String() string {
   720		// Only used by tests.
   721		return fmt.Sprintf("%s|%s|%s", k.proxy, k.scheme, k.addr)
   722	}
   723	
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well).
// Each persistConn is served by a readLoop and a writeLoop goroutine.
type persistConn struct {
	t        *Transport       // owning Transport
	cacheKey connectMethodKey // key under which this conn is cached when idle
	conn     net.Conn
	tlsState *tls.ConnectionState
	br       *bufio.Reader       // from conn
	sawEOF   bool                // whether we've seen EOF from conn; owned by readLoop
	bw       *bufio.Writer       // to conn
	reqch    chan requestAndChan // written by roundTrip; read by readLoop
	writech  chan writeRequest   // written by roundTrip; read by writeLoop
	closech  chan struct{}       // closed when conn closed
	isProxy  bool
	// writeErrCh passes the request write error (usually nil)
	// from the writeLoop goroutine to the readLoop which passes
	// it off to the res.Body reader, which then uses it to decide
	// whether or not a connection can be reused. Issue 7569.
	writeErrCh chan error

	lk                   sync.Mutex // guards following fields
	numExpectedResponses int
	closed               bool // whether conn has been closed
	broken               bool // an error has happened on this connection; marked broken so it's not reused.
	// mutateHeaderFunc is an optional func to modify extra
	// headers on each outbound request before it's written. (the
	// original Request given to RoundTrip is not modified)
	mutateHeaderFunc func(Header)
}
   753	
   754	// isBroken reports whether this connection is in a known broken state.
   755	func (pc *persistConn) isBroken() bool {
   756		pc.lk.Lock()
   757		b := pc.broken
   758		pc.lk.Unlock()
   759		return b
   760	}
   761	
   762	func (pc *persistConn) cancelRequest() {
   763		pc.conn.Close()
   764	}
   765	
   766	var remoteSideClosedFunc func(error) bool // or nil to use default
   767	
   768	func remoteSideClosed(err error) bool {
   769		if err == io.EOF {
   770			return true
   771		}
   772		if remoteSideClosedFunc != nil {
   773			return remoteSideClosedFunc(err)
   774		}
   775		return false
   776	}
   777	
// readLoop runs in its own goroutine for the lifetime of the
// connection. Each iteration waits for data from the server, pairs it
// with the request received on pc.reqch, reads the response, delivers
// it on the request's reply channel, and then decides whether the
// connection stays alive for another request. The loop ends, closing
// the connection, once it is no longer considered alive.
func (pc *persistConn) readLoop() {
	alive := true

	for alive {
		// Block until the server sends something (or the read fails).
		pb, err := pc.br.Peek(1)

		pc.lk.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no response was expected:
			// close the connection and stop.
			if !pc.closed {
				pc.closeLocked()
				if len(pb) > 0 {
					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
						string(pb), err)
				}
			}
			pc.lk.Unlock()
			return
		}
		pc.lk.Unlock()

		rc := <-pc.reqch

		var resp *Response
		if err == nil {
			resp, err = ReadResponse(pc.br, rc.req)
			if err == nil && resp.StatusCode == 100 {
				// Skip any 100-continue for now.
				// TODO(bradfitz): if rc.req had "Expect: 100-continue",
				// actually block the request body write and signal the
				// writeLoop now to begin sending it. (Issue 2184) For now we
				// eat it, since we're never expecting one.
				resp, err = ReadResponse(pc.br, rc.req)
			}
		}

		if resp != nil {
			resp.TLS = pc.tlsState
		}

		hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0

		if err != nil {
			pc.close()
		} else {
			// Transparently decode gzip only when the Transport itself
			// asked for it (rc.addedGzip).
			if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
				resp.Header.Del("Content-Encoding")
				resp.Header.Del("Content-Length")
				resp.ContentLength = -1
				resp.Body = &gzipReader{body: resp.Body}
			}
			resp.Body = &bodyEOFSignal{body: resp.Body}
		}

		// The err != nil test comes first, so resp is only dereferenced
		// when the read succeeded.
		if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		var waitForBodyRead chan bool
		if hasBody {
			// Buffered so that the body callbacks below can send without
			// blocking.
			waitForBodyRead = make(chan bool, 2)
			resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
				// Sending false here sets alive to
				// false and closes the connection
				// below.
				waitForBodyRead <- false
				return nil
			}
			resp.Body.(*bodyEOFSignal).fn = func(err error) {
				// The connection is offered for reuse only if every
				// preceding condition holds; putIdleConn is evaluated last.
				waitForBodyRead <- alive &&
					err == nil &&
					!pc.sawEOF &&
					pc.wroteRequest() &&
					pc.t.putIdleConn(pc)
			}
		}

		if alive && !hasBody {
			// No body to wait for: decide about reuse right away.
			alive = !pc.sawEOF &&
				pc.wroteRequest() &&
				pc.t.putIdleConn(pc)
		}

		rc.ch <- responseAndError{resp, err}

		// Wait for the just-returned response body to be fully consumed
		// before we race and peek on the underlying bufio reader.
		if waitForBodyRead != nil {
			select {
			case alive = <-waitForBodyRead:
			case <-pc.closech:
				alive = false
			}
		}

		pc.t.setReqCanceler(rc.req, nil)

		if !alive {
			pc.close()
		}
	}
}
   882	
   883	func (pc *persistConn) writeLoop() {
   884		for {
   885			select {
   886			case wr := <-pc.writech:
   887				if pc.isBroken() {
   888					wr.ch <- errors.New("http: can't write HTTP request on broken connection")
   889					continue
   890				}
   891				err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
   892				if err == nil {
   893					err = pc.bw.Flush()
   894				}
   895				if err != nil {
   896					pc.markBroken()
   897					wr.req.Request.closeBody()
   898				}
   899				pc.writeErrCh <- err // to the body reader, which might recycle us
   900				wr.ch <- err         // to the roundTrip function
   901			case <-pc.closech:
   902				return
   903			}
   904		}
   905	}
   906	
   907	// wroteRequest is a check before recycling a connection that the previous write
   908	// (from writeLoop above) happened and was successful.
   909	func (pc *persistConn) wroteRequest() bool {
   910		select {
   911		case err := <-pc.writeErrCh:
   912			// Common case: the write happened well before the response, so
   913			// avoid creating a timer.
   914			return err == nil
   915		default:
   916			// Rare case: the request was written in writeLoop above but
   917			// before it could send to pc.writeErrCh, the reader read it
   918			// all, processed it, and called us here. In this case, give the
   919			// write goroutine a bit of time to finish its send.
   920			//
   921			// Less rare case: We also get here in the legitimate case of
   922			// Issue 7569, where the writer is still writing (or stalled),
   923			// but the server has already replied. In this case, we don't
   924			// want to wait too long, and we want to return false so this
   925			// connection isn't re-used.
   926			select {
   927			case err := <-pc.writeErrCh:
   928				return err == nil
   929			case <-time.After(50 * time.Millisecond):
   930				return false
   931			}
   932		}
   933	}
   934	
// responseAndError is the value readLoop sends on a request's reply
// channel: the response that was read and the error, if any, from
// reading it.
type responseAndError struct {
	res *Response
	err error
}
   939	
// requestAndChan is received by readLoop on persistConn.reqch. It
// pairs a request with the channel on which readLoop sends back the
// response or error for it.
type requestAndChan struct {
	req *Request
	ch  chan responseAndError

	// did the Transport (as opposed to the client code) add an
	// Accept-Encoding gzip header? only if we set it do
	// we transparently decode the gzip.
	addedGzip bool
}
   949	
   950	// A writeRequest is sent by the readLoop's goroutine to the
   951	// writeLoop's goroutine to write a request while the read loop
   952	// concurrently waits on both the write response and the server's
   953	// reply.
   954	type writeRequest struct {
   955		req *transportRequest
   956		ch  chan<- error
   957	}
   958	
   959	type httpError struct {
   960		err     string
   961		timeout bool
   962	}
   963	
   964	func (e *httpError) Error() string   { return e.err }
   965	func (e *httpError) Timeout() bool   { return e.timeout }
   966	func (e *httpError) Temporary() bool { return true }
   967	
   968	var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
   969	var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
   970	
   971	func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
   972		pc.t.setReqCanceler(req.Request, pc.cancelRequest)
   973		pc.lk.Lock()
   974		pc.numExpectedResponses++
   975		headerFn := pc.mutateHeaderFunc
   976		pc.lk.Unlock()
   977	
   978		if headerFn != nil {
   979			headerFn(req.extraHeaders())
   980		}
   981	
   982		// Ask for a compressed version if the caller didn't set their
   983		// own value for Accept-Encoding. We only attempted to
   984		// uncompress the gzip stream if we were the layer that
   985		// requested it.
   986		requestedGzip := false
   987		if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" && req.Method != "HEAD" {
   988			// Request gzip only, not deflate. Deflate is ambiguous and
   989			// not as universally supported anyway.
   990			// See: http://www.gzip.org/zlib/zlib_faq.html#faq38
   991			//
   992			// Note that we don't request this for HEAD requests,
   993			// due to a bug in nginx:
   994			//   http://trac.nginx.org/nginx/ticket/358
   995			//   http://golang.org/issue/5522
   996			requestedGzip = true
   997			req.extraHeaders().Set("Accept-Encoding", "gzip")
   998		}
   999	
  1000		// Write the request concurrently with waiting for a response,
  1001		// in case the server decides to reply before reading our full
  1002		// request body.
  1003		writeErrCh := make(chan error, 1)
  1004		pc.writech <- writeRequest{req, writeErrCh}
  1005	
  1006		resc := make(chan responseAndError, 1)
  1007		pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
  1008	
  1009		var re responseAndError
  1010		var pconnDeadCh = pc.closech
  1011		var failTicker <-chan time.Time
  1012		var respHeaderTimer <-chan time.Time
  1013	WaitResponse:
  1014		for {
  1015			select {
  1016			case err := <-writeErrCh:
  1017				if err != nil {
  1018					re = responseAndError{nil, err}
  1019					pc.close()
  1020					break WaitResponse
  1021				}
  1022				if d := pc.t.ResponseHeaderTimeout; d > 0 {
  1023					respHeaderTimer = time.After(d)
  1024				}
  1025			case <-pconnDeadCh:
  1026				// The persist connection is dead. This shouldn't
  1027				// usually happen (only with Connection: close responses
  1028				// with no response bodies), but if it does happen it
  1029				// means either a) the remote server hung up on us
  1030				// prematurely, or b) the readLoop sent us a response &
  1031				// closed its closech at roughly the same time, and we
  1032				// selected this case first, in which case a response
  1033				// might still be coming soon.
  1034				//
  1035				// We can't avoid the select race in b) by using a unbuffered
  1036				// resc channel instead, because then goroutines can
  1037				// leak if we exit due to other errors.
  1038				pconnDeadCh = nil                               // avoid spinning
  1039				failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
  1040			case <-failTicker:
  1041				re = responseAndError{err: errClosed}
  1042				break WaitResponse
  1043			case <-respHeaderTimer:
  1044				pc.close()
  1045				re = responseAndError{err: errTimeout}
  1046				break WaitResponse
  1047			case re = <-resc:
  1048				break WaitResponse
  1049			}
  1050		}
  1051	
  1052		pc.lk.Lock()
  1053		pc.numExpectedResponses--
  1054		pc.lk.Unlock()
  1055	
  1056		if re.err != nil {
  1057			pc.t.setReqCanceler(req.Request, nil)
  1058		}
  1059		return re.res, re.err
  1060	}
  1061	
  1062	// markBroken marks a connection as broken (so it's not reused).
  1063	// It differs from close in that it doesn't close the underlying
  1064	// connection for use when it's still being read.
  1065	func (pc *persistConn) markBroken() {
  1066		pc.lk.Lock()
  1067		defer pc.lk.Unlock()
  1068		pc.broken = true
  1069	}
  1070	
  1071	func (pc *persistConn) close() {
  1072		pc.lk.Lock()
  1073		defer pc.lk.Unlock()
  1074		pc.closeLocked()
  1075	}
  1076	
  1077	func (pc *persistConn) closeLocked() {
  1078		pc.broken = true
  1079		if !pc.closed {
  1080			pc.conn.Close()
  1081			pc.closed = true
  1082			close(pc.closech)
  1083		}
  1084		pc.mutateHeaderFunc = nil
  1085	}
  1086	
  1087	var portMap = map[string]string{
  1088		"http":  "80",
  1089		"https": "443",
  1090	}
  1091	
  1092	// canonicalAddr returns url.Host but always with a ":port" suffix
  1093	func canonicalAddr(url *url.URL) string {
  1094		addr := url.Host
  1095		if !hasPort(addr) {
  1096			return addr + ":" + portMap[url.Scheme]
  1097		}
  1098		return addr
  1099	}
  1100	
  1101	// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
  1102	// once, right before its final (error-producing) Read or Close call
  1103	// returns. If earlyCloseFn is non-nil and Close is called before
  1104	// io.EOF is seen, earlyCloseFn is called instead of fn, and its
  1105	// return value is the return value from Close.
  1106	type bodyEOFSignal struct {
  1107		body         io.ReadCloser
  1108		mu           sync.Mutex   // guards following 4 fields
  1109		closed       bool         // whether Close has been called
  1110		rerr         error        // sticky Read error
  1111		fn           func(error)  // error will be nil on Read io.EOF
  1112		earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
  1113	}
  1114	
  1115	func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
  1116		es.mu.Lock()
  1117		closed, rerr := es.closed, es.rerr
  1118		es.mu.Unlock()
  1119		if closed {
  1120			return 0, errors.New("http: read on closed response body")
  1121		}
  1122		if rerr != nil {
  1123			return 0, rerr
  1124		}
  1125	
  1126		n, err = es.body.Read(p)
  1127		if err != nil {
  1128			es.mu.Lock()
  1129			defer es.mu.Unlock()
  1130			if es.rerr == nil {
  1131				es.rerr = err
  1132			}
  1133			es.condfn(err)
  1134		}
  1135		return
  1136	}
  1137	
  1138	func (es *bodyEOFSignal) Close() error {
  1139		es.mu.Lock()
  1140		defer es.mu.Unlock()
  1141		if es.closed {
  1142			return nil
  1143		}
  1144		es.closed = true
  1145		if es.earlyCloseFn != nil && es.rerr != io.EOF {
  1146			return es.earlyCloseFn()
  1147		}
  1148		err := es.body.Close()
  1149		es.condfn(err)
  1150		return err
  1151	}
  1152	
  1153	// caller must hold es.mu.
  1154	func (es *bodyEOFSignal) condfn(err error) {
  1155		if es.fn == nil {
  1156			return
  1157		}
  1158		if err == io.EOF {
  1159			err = nil
  1160		}
  1161		es.fn(err)
  1162		es.fn = nil
  1163	}
  1164	
  1165	// gzipReader wraps a response body so it can lazily
  1166	// call gzip.NewReader on the first call to Read
  1167	type gzipReader struct {
  1168		body io.ReadCloser // underlying Response.Body
  1169		zr   io.Reader     // lazily-initialized gzip reader
  1170	}
  1171	
  1172	func (gz *gzipReader) Read(p []byte) (n int, err error) {
  1173		if gz.zr == nil {
  1174			gz.zr, err = gzip.NewReader(gz.body)
  1175			if err != nil {
  1176				return 0, err
  1177			}
  1178		}
  1179		return gz.zr.Read(p)
  1180	}
  1181	
  1182	func (gz *gzipReader) Close() error {
  1183		return gz.body.Close()
  1184	}
  1185	
  1186	type readerAndCloser struct {
  1187		io.Reader
  1188		io.Closer
  1189	}
  1190	
  1191	type tlsHandshakeTimeoutError struct{}
  1192	
  1193	func (tlsHandshakeTimeoutError) Timeout() bool   { return true }
  1194	func (tlsHandshakeTimeoutError) Temporary() bool { return true }
  1195	func (tlsHandshakeTimeoutError) Error() string   { return "net/http: TLS handshake timeout" }
  1196	
  1197	type noteEOFReader struct {
  1198		r      io.Reader
  1199		sawEOF *bool
  1200	}
  1201	
  1202	func (nr noteEOFReader) Read(p []byte) (n int, err error) {
  1203		n, err = nr.r.Read(p)
  1204		if err == io.EOF {
  1205			*nr.sawEOF = true
  1206		}
  1207		return
  1208	}
  1209	

View as plain text