Run Format

Source file src/pkg/net/http/transport.go

     1	// Copyright 2011 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// HTTP client implementation. See RFC 2616.
     6	//
     7	// This is the low-level Transport implementation of RoundTripper.
     8	// The high-level interface is in client.go.
     9	
    10	package http
    11	
    12	import (
    13		"bufio"
    14		"compress/gzip"
    15		"crypto/tls"
    16		"encoding/base64"
    17		"errors"
    18		"fmt"
    19		"io"
    20		"log"
    21		"net"
    22		"net/url"
    23		"os"
    24		"strings"
    25		"sync"
    26		"time"
    27	)
    28	
    29	// DefaultTransport is the default implementation of Transport and is
    30	// used by DefaultClient. It establishes network connections as needed
    31	// and caches them for reuse by subsequent calls. It uses HTTP proxies
    32	// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
    33	// $no_proxy) environment variables.
    34	var DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment}
    35	
    36	// DefaultMaxIdleConnsPerHost is the default value of Transport's
    37	// MaxIdleConnsPerHost.
    38	const DefaultMaxIdleConnsPerHost = 2
    39	
    40	// Transport is an implementation of RoundTripper that supports http,
    41	// https, and http proxies (for either http or https with CONNECT).
    42	// Transport can also cache connections for future re-use.
    43	type Transport struct {
    44		idleMu     sync.Mutex
    45		idleConn   map[string][]*persistConn
    46		idleConnCh map[string]chan *persistConn
    47		reqMu      sync.Mutex
    48		reqConn    map[*Request]*persistConn
    49		altMu      sync.RWMutex
    50		altProto   map[string]RoundTripper // nil or map of URI scheme => RoundTripper
    51	
    52		// Proxy specifies a function to return a proxy for a given
    53		// Request. If the function returns a non-nil error, the
    54		// request is aborted with the provided error.
    55		// If Proxy is nil or returns a nil *URL, no proxy is used.
    56		Proxy func(*Request) (*url.URL, error)
    57	
    58		// Dial specifies the dial function for creating TCP
    59		// connections.
    60		// If Dial is nil, net.Dial is used.
    61		Dial func(network, addr string) (net.Conn, error)
    62	
    63		// TLSClientConfig specifies the TLS configuration to use with
    64		// tls.Client. If nil, the default configuration is used.
    65		TLSClientConfig *tls.Config
    66	
    67		// DisableKeepAlives, if true, prevents re-use of TCP connections
    68		// between different HTTP requests.
    69		DisableKeepAlives bool
    70	
    71		// DisableCompression, if true, prevents the Transport from
    72		// requesting compression with an "Accept-Encoding: gzip"
    73		// request header when the Request contains no existing
    74		// Accept-Encoding value. If the Transport requests gzip on
    75		// its own and gets a gzipped response, it's transparently
    76		// decoded in the Response.Body. However, if the user
    77		// explicitly requested gzip it is not automatically
    78		// uncompressed.
    79		DisableCompression bool
    80	
    81		// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
    82		// (keep-alive) to keep per-host.  If zero,
    83		// DefaultMaxIdleConnsPerHost is used.
    84		MaxIdleConnsPerHost int
    85	
    86		// ResponseHeaderTimeout, if non-zero, specifies the amount of
    87		// time to wait for a server's response headers after fully
    88		// writing the request (including its body, if any). This
    89		// time does not include the time to read the response body.
    90		ResponseHeaderTimeout time.Duration
    91	
    92		// TODO: tunable on global max cached connections
    93		// TODO: tunable on timeout on cached connections
    94	}
    95	
    96	// ProxyFromEnvironment returns the URL of the proxy to use for a
    97	// given request, as indicated by the environment variables
    98	// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy).
    99	// An error is returned if the proxy environment is invalid.
   100	// A nil URL and nil error are returned if no proxy is defined in the
   101	// environment, or a proxy should not be used for the given request.
   102	func ProxyFromEnvironment(req *Request) (*url.URL, error) {
   103		proxy := getenvEitherCase("HTTP_PROXY")
   104		if proxy == "" {
   105			return nil, nil
   106		}
   107		if !useProxy(canonicalAddr(req.URL)) {
   108			return nil, nil
   109		}
   110		proxyURL, err := url.Parse(proxy)
   111		if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
   112			if u, err := url.Parse("http://" + proxy); err == nil {
   113				proxyURL = u
   114				err = nil
   115			}
   116		}
   117		if err != nil {
   118			return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
   119		}
   120		return proxyURL, nil
   121	}
   122	
   123	// ProxyURL returns a proxy function (for use in a Transport)
   124	// that always returns the same URL.
   125	func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
   126		return func(*Request) (*url.URL, error) {
   127			return fixedURL, nil
   128		}
   129	}
   130	
   131	// transportRequest is a wrapper around a *Request that adds
   132	// optional extra headers to write.
   133	type transportRequest struct {
   134		*Request        // original request, not to be mutated
   135		extra    Header // extra headers to write, or nil
   136	}
   137	
   138	func (tr *transportRequest) extraHeaders() Header {
   139		if tr.extra == nil {
   140			tr.extra = make(Header)
   141		}
   142		return tr.extra
   143	}
   144	
   145	// RoundTrip implements the RoundTripper interface.
   146	//
   147	// For higher-level HTTP client support (such as handling of cookies
   148	// and redirects), see Get, Post, and the Client type.
   149	func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
   150		if req.URL == nil {
   151			return nil, errors.New("http: nil Request.URL")
   152		}
   153		if req.Header == nil {
   154			return nil, errors.New("http: nil Request.Header")
   155		}
   156		if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
   157			t.altMu.RLock()
   158			var rt RoundTripper
   159			if t.altProto != nil {
   160				rt = t.altProto[req.URL.Scheme]
   161			}
   162			t.altMu.RUnlock()
   163			if rt == nil {
   164				return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
   165			}
   166			return rt.RoundTrip(req)
   167		}
   168		if req.URL.Host == "" {
   169			return nil, errors.New("http: no Host in request URL")
   170		}
   171		treq := &transportRequest{Request: req}
   172		cm, err := t.connectMethodForRequest(treq)
   173		if err != nil {
   174			return nil, err
   175		}
   176	
   177		// Get the cached or newly-created connection to either the
   178		// host (for http or https), the http proxy, or the http proxy
   179		// pre-CONNECTed to https server.  In any case, we'll be ready
   180		// to send it requests.
   181		pconn, err := t.getConn(cm)
   182		if err != nil {
   183			return nil, err
   184		}
   185	
   186		return pconn.roundTrip(treq)
   187	}
   188	
   189	// RegisterProtocol registers a new protocol with scheme.
   190	// The Transport will pass requests using the given scheme to rt.
   191	// It is rt's responsibility to simulate HTTP request semantics.
   192	//
   193	// RegisterProtocol can be used by other packages to provide
   194	// implementations of protocol schemes like "ftp" or "file".
   195	func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
   196		if scheme == "http" || scheme == "https" {
   197			panic("protocol " + scheme + " already registered")
   198		}
   199		t.altMu.Lock()
   200		defer t.altMu.Unlock()
   201		if t.altProto == nil {
   202			t.altProto = make(map[string]RoundTripper)
   203		}
   204		if _, exists := t.altProto[scheme]; exists {
   205			panic("protocol " + scheme + " already registered")
   206		}
   207		t.altProto[scheme] = rt
   208	}
   209	
   210	// CloseIdleConnections closes any connections which were previously
   211	// connected from previous requests but are now sitting idle in
   212	// a "keep-alive" state. It does not interrupt any connections currently
   213	// in use.
   214	func (t *Transport) CloseIdleConnections() {
   215		t.idleMu.Lock()
   216		m := t.idleConn
   217		t.idleConn = nil
   218		t.idleMu.Unlock()
   219		if m == nil {
   220			return
   221		}
   222		for _, conns := range m {
   223			for _, pconn := range conns {
   224				pconn.close()
   225			}
   226		}
   227	}
   228	
   229	// CancelRequest cancels an in-flight request by closing its
   230	// connection.
   231	func (t *Transport) CancelRequest(req *Request) {
   232		t.reqMu.Lock()
   233		pc := t.reqConn[req]
   234		t.reqMu.Unlock()
   235		if pc != nil {
   236			pc.conn.Close()
   237		}
   238	}
   239	
   240	//
   241	// Private implementation past this point.
   242	//
   243	
   244	func getenvEitherCase(k string) string {
   245		if v := os.Getenv(strings.ToUpper(k)); v != "" {
   246			return v
   247		}
   248		return os.Getenv(strings.ToLower(k))
   249	}
   250	
   251	func (t *Transport) connectMethodForRequest(treq *transportRequest) (*connectMethod, error) {
   252		cm := &connectMethod{
   253			targetScheme: treq.URL.Scheme,
   254			targetAddr:   canonicalAddr(treq.URL),
   255		}
   256		if t.Proxy != nil {
   257			var err error
   258			cm.proxyURL, err = t.Proxy(treq.Request)
   259			if err != nil {
   260				return nil, err
   261			}
   262		}
   263		return cm, nil
   264	}
   265	
   266	// proxyAuth returns the Proxy-Authorization header to set
   267	// on requests, if applicable.
   268	func (cm *connectMethod) proxyAuth() string {
   269		if cm.proxyURL == nil {
   270			return ""
   271		}
   272		if u := cm.proxyURL.User; u != nil {
   273			return "Basic " + base64.URLEncoding.EncodeToString([]byte(u.String()))
   274		}
   275		return ""
   276	}
   277	
   278	// putIdleConn adds pconn to the list of idle persistent connections awaiting
   279	// a new request.
   280	// If pconn is no longer needed or not in a good state, putIdleConn
   281	// returns false.
   282	func (t *Transport) putIdleConn(pconn *persistConn) bool {
   283		if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
   284			pconn.close()
   285			return false
   286		}
   287		if pconn.isBroken() {
   288			return false
   289		}
   290		key := pconn.cacheKey
   291		max := t.MaxIdleConnsPerHost
   292		if max == 0 {
   293			max = DefaultMaxIdleConnsPerHost
   294		}
   295		t.idleMu.Lock()
   296		select {
   297		case t.idleConnCh[key] <- pconn:
   298			// We're done with this pconn and somebody else is
   299			// currently waiting for a conn of this type (they're
   300			// actively dialing, but this conn is ready
   301			// first). Chrome calls this socket late binding.  See
   302			// https://insouciant.org/tech/connection-management-in-chromium/
   303			t.idleMu.Unlock()
   304			return true
   305		default:
   306		}
   307		if t.idleConn == nil {
   308			t.idleConn = make(map[string][]*persistConn)
   309		}
   310		if len(t.idleConn[key]) >= max {
   311			t.idleMu.Unlock()
   312			pconn.close()
   313			return false
   314		}
   315		for _, exist := range t.idleConn[key] {
   316			if exist == pconn {
   317				log.Fatalf("dup idle pconn %p in freelist", pconn)
   318			}
   319		}
   320		t.idleConn[key] = append(t.idleConn[key], pconn)
   321		t.idleMu.Unlock()
   322		return true
   323	}
   324	
   325	func (t *Transport) getIdleConnCh(cm *connectMethod) chan *persistConn {
   326		key := cm.key()
   327		t.idleMu.Lock()
   328		defer t.idleMu.Unlock()
   329		if t.idleConnCh == nil {
   330			t.idleConnCh = make(map[string]chan *persistConn)
   331		}
   332		ch, ok := t.idleConnCh[key]
   333		if !ok {
   334			ch = make(chan *persistConn)
   335			t.idleConnCh[key] = ch
   336		}
   337		return ch
   338	}
   339	
   340	func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
   341		key := cm.key()
   342		t.idleMu.Lock()
   343		defer t.idleMu.Unlock()
   344		if t.idleConn == nil {
   345			return nil
   346		}
   347		for {
   348			pconns, ok := t.idleConn[key]
   349			if !ok {
   350				return nil
   351			}
   352			if len(pconns) == 1 {
   353				pconn = pconns[0]
   354				delete(t.idleConn, key)
   355			} else {
   356				// 2 or more cached connections; pop last
   357				// TODO: queue?
   358				pconn = pconns[len(pconns)-1]
   359				t.idleConn[key] = pconns[0 : len(pconns)-1]
   360			}
   361			if !pconn.isBroken() {
   362				return
   363			}
   364		}
   365	}
   366	
   367	func (t *Transport) setReqConn(r *Request, pc *persistConn) {
   368		t.reqMu.Lock()
   369		defer t.reqMu.Unlock()
   370		if t.reqConn == nil {
   371			t.reqConn = make(map[*Request]*persistConn)
   372		}
   373		if pc != nil {
   374			t.reqConn[r] = pc
   375		} else {
   376			delete(t.reqConn, r)
   377		}
   378	}
   379	
   380	func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
   381		if t.Dial != nil {
   382			return t.Dial(network, addr)
   383		}
   384		return net.Dial(network, addr)
   385	}
   386	
   387	// getConn dials and creates a new persistConn to the target as
   388	// specified in the connectMethod.  This includes doing a proxy CONNECT
   389	// and/or setting up TLS.  If this doesn't return an error, the persistConn
   390	// is ready to write requests to.
   391	func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
   392		if pc := t.getIdleConn(cm); pc != nil {
   393			return pc, nil
   394		}
   395	
   396		type dialRes struct {
   397			pc  *persistConn
   398			err error
   399		}
   400		dialc := make(chan dialRes)
   401		go func() {
   402			pc, err := t.dialConn(cm)
   403			dialc <- dialRes{pc, err}
   404		}()
   405	
   406		idleConnCh := t.getIdleConnCh(cm)
   407		select {
   408		case v := <-dialc:
   409			// Our dial finished.
   410			return v.pc, v.err
   411		case pc := <-idleConnCh:
   412			// Another request finished first and its net.Conn
   413			// became available before our dial. Or somebody
   414			// else's dial that they didn't use.
   415			// But our dial is still going, so give it away
   416			// when it finishes:
   417			go func() {
   418				if v := <-dialc; v.err == nil {
   419					t.putIdleConn(v.pc)
   420				}
   421			}()
   422			return pc, nil
   423		}
   424	}
   425	
   426	func (t *Transport) dialConn(cm *connectMethod) (*persistConn, error) {
   427		conn, err := t.dial("tcp", cm.addr())
   428		if err != nil {
   429			if cm.proxyURL != nil {
   430				err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err)
   431			}
   432			return nil, err
   433		}
   434	
   435		pa := cm.proxyAuth()
   436	
   437		pconn := &persistConn{
   438			t:        t,
   439			cacheKey: cm.key(),
   440			conn:     conn,
   441			reqch:    make(chan requestAndChan, 50),
   442			writech:  make(chan writeRequest, 50),
   443			closech:  make(chan struct{}),
   444		}
   445	
   446		switch {
   447		case cm.proxyURL == nil:
   448			// Do nothing.
   449		case cm.targetScheme == "http":
   450			pconn.isProxy = true
   451			if pa != "" {
   452				pconn.mutateHeaderFunc = func(h Header) {
   453					h.Set("Proxy-Authorization", pa)
   454				}
   455			}
   456		case cm.targetScheme == "https":
   457			connectReq := &Request{
   458				Method: "CONNECT",
   459				URL:    &url.URL{Opaque: cm.targetAddr},
   460				Host:   cm.targetAddr,
   461				Header: make(Header),
   462			}
   463			if pa != "" {
   464				connectReq.Header.Set("Proxy-Authorization", pa)
   465			}
   466			connectReq.Write(conn)
   467	
   468			// Read response.
   469			// Okay to use and discard buffered reader here, because
   470			// TLS server will not speak until spoken to.
   471			br := bufio.NewReader(conn)
   472			resp, err := ReadResponse(br, connectReq)
   473			if err != nil {
   474				conn.Close()
   475				return nil, err
   476			}
   477			if resp.StatusCode != 200 {
   478				f := strings.SplitN(resp.Status, " ", 2)
   479				conn.Close()
   480				return nil, errors.New(f[1])
   481			}
   482		}
   483	
   484		if cm.targetScheme == "https" {
   485			// Initiate TLS and check remote host name against certificate.
   486			cfg := t.TLSClientConfig
   487			if cfg == nil || cfg.ServerName == "" {
   488				host := cm.tlsHost()
   489				if cfg == nil {
   490					cfg = &tls.Config{ServerName: host}
   491				} else {
   492					clone := *cfg // shallow clone
   493					clone.ServerName = host
   494					cfg = &clone
   495				}
   496			}
   497			conn = tls.Client(conn, cfg)
   498			if err = conn.(*tls.Conn).Handshake(); err != nil {
   499				return nil, err
   500			}
   501			if t.TLSClientConfig == nil || !t.TLSClientConfig.InsecureSkipVerify {
   502				if err = conn.(*tls.Conn).VerifyHostname(cm.tlsHost()); err != nil {
   503					return nil, err
   504				}
   505			}
   506			pconn.conn = conn
   507		}
   508	
   509		pconn.br = bufio.NewReader(pconn.conn)
   510		pconn.bw = bufio.NewWriter(pconn.conn)
   511		go pconn.readLoop()
   512		go pconn.writeLoop()
   513		return pconn, nil
   514	}
   515	
   516	// useProxy returns true if requests to addr should use a proxy,
   517	// according to the NO_PROXY or no_proxy environment variable.
   518	// addr is always a canonicalAddr with a host and port.
   519	func useProxy(addr string) bool {
   520		if len(addr) == 0 {
   521			return true
   522		}
   523		host, _, err := net.SplitHostPort(addr)
   524		if err != nil {
   525			return false
   526		}
   527		if host == "localhost" {
   528			return false
   529		}
   530		if ip := net.ParseIP(host); ip != nil {
   531			if ip.IsLoopback() {
   532				return false
   533			}
   534		}
   535	
   536		no_proxy := getenvEitherCase("NO_PROXY")
   537		if no_proxy == "*" {
   538			return false
   539		}
   540	
   541		addr = strings.ToLower(strings.TrimSpace(addr))
   542		if hasPort(addr) {
   543			addr = addr[:strings.LastIndex(addr, ":")]
   544		}
   545	
   546		for _, p := range strings.Split(no_proxy, ",") {
   547			p = strings.ToLower(strings.TrimSpace(p))
   548			if len(p) == 0 {
   549				continue
   550			}
   551			if hasPort(p) {
   552				p = p[:strings.LastIndex(p, ":")]
   553			}
   554			if addr == p {
   555				return false
   556			}
   557			if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) {
   558				// no_proxy ".foo.com" matches "bar.foo.com" or "foo.com"
   559				return false
   560			}
   561			if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' {
   562				// no_proxy "foo.com" matches "bar.foo.com"
   563				return false
   564			}
   565		}
   566		return true
   567	}
   568	
   569	// connectMethod is the map key (in its String form) for keeping persistent
   570	// TCP connections alive for subsequent HTTP requests.
   571	//
   572	// A connect method may be of the following types:
   573	//
   574	// Cache key form                Description
   575	// -----------------             -------------------------
   576	// ||http|foo.com                http directly to server, no proxy
   577	// ||https|foo.com               https directly to server, no proxy
   578	// http://proxy.com|https|foo.com  http to proxy, then CONNECT to foo.com
   579	// http://proxy.com|http           http to proxy, http to anywhere after that
   580	//
   581	// Note: no support to https to the proxy yet.
   582	//
   583	type connectMethod struct {
   584		proxyURL     *url.URL // nil for no proxy, else full proxy URL
   585		targetScheme string   // "http" or "https"
   586		targetAddr   string   // Not used if proxy + http targetScheme (4th example in table)
   587	}
   588	
   589	func (ck *connectMethod) key() string {
   590		return ck.String() // TODO: use a struct type instead
   591	}
   592	
   593	func (ck *connectMethod) String() string {
   594		proxyStr := ""
   595		targetAddr := ck.targetAddr
   596		if ck.proxyURL != nil {
   597			proxyStr = ck.proxyURL.String()
   598			if ck.targetScheme == "http" {
   599				targetAddr = ""
   600			}
   601		}
   602		return strings.Join([]string{proxyStr, ck.targetScheme, targetAddr}, "|")
   603	}
   604	
   605	// addr returns the first hop "host:port" to which we need to TCP connect.
   606	func (cm *connectMethod) addr() string {
   607		if cm.proxyURL != nil {
   608			return canonicalAddr(cm.proxyURL)
   609		}
   610		return cm.targetAddr
   611	}
   612	
   613	// tlsHost returns the host name to match against the peer's
   614	// TLS certificate.
   615	func (cm *connectMethod) tlsHost() string {
   616		h := cm.targetAddr
   617		if hasPort(h) {
   618			h = h[:strings.LastIndex(h, ":")]
   619		}
   620		return h
   621	}
   622	
   623	// persistConn wraps a connection, usually a persistent one
   624	// (but may be used for non-keep-alive requests as well)
   625	type persistConn struct {
   626		t        *Transport
   627		cacheKey string // its connectMethod.String()
   628		conn     net.Conn
   629		closed   bool                // whether conn has been closed
   630		br       *bufio.Reader       // from conn
   631		bw       *bufio.Writer       // to conn
   632		reqch    chan requestAndChan // written by roundTrip; read by readLoop
   633		writech  chan writeRequest   // written by roundTrip; read by writeLoop
   634		closech  chan struct{}       // broadcast close when readLoop (TCP connection) closes
   635		isProxy  bool
   636	
   637		lk                   sync.Mutex // guards following 3 fields
   638		numExpectedResponses int
   639		broken               bool // an error has happened on this connection; marked broken so it's not reused.
   640		// mutateHeaderFunc is an optional func to modify extra
   641		// headers on each outbound request before it's written. (the
   642		// original Request given to RoundTrip is not modified)
   643		mutateHeaderFunc func(Header)
   644	}
   645	
   646	func (pc *persistConn) isBroken() bool {
   647		pc.lk.Lock()
   648		b := pc.broken
   649		pc.lk.Unlock()
   650		return b
   651	}
   652	
   653	var remoteSideClosedFunc func(error) bool // or nil to use default
   654	
   655	func remoteSideClosed(err error) bool {
   656		if err == io.EOF {
   657			return true
   658		}
   659		if remoteSideClosedFunc != nil {
   660			return remoteSideClosedFunc(err)
   661		}
   662		return false
   663	}
   664	
   665	func (pc *persistConn) readLoop() {
   666		defer close(pc.closech)
   667		alive := true
   668	
   669		for alive {
   670			pb, err := pc.br.Peek(1)
   671	
   672			pc.lk.Lock()
   673			if pc.numExpectedResponses == 0 {
   674				pc.closeLocked()
   675				pc.lk.Unlock()
   676				if len(pb) > 0 {
   677					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
   678						string(pb), err)
   679				}
   680				return
   681			}
   682			pc.lk.Unlock()
   683	
   684			rc := <-pc.reqch
   685	
   686			var resp *Response
   687			if err == nil {
   688				resp, err = ReadResponse(pc.br, rc.req)
   689				if err == nil && resp.StatusCode == 100 {
   690					// Skip any 100-continue for now.
   691					// TODO(bradfitz): if rc.req had "Expect: 100-continue",
   692					// actually block the request body write and signal the
   693					// writeLoop now to begin sending it. (Issue 2184) For now we
   694					// eat it, since we're never expecting one.
   695					resp, err = ReadResponse(pc.br, rc.req)
   696				}
   697			}
   698			hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
   699	
   700			if err != nil {
   701				pc.close()
   702			} else {
   703				if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
   704					resp.Header.Del("Content-Encoding")
   705					resp.Header.Del("Content-Length")
   706					resp.ContentLength = -1
   707					gzReader, zerr := gzip.NewReader(resp.Body)
   708					if zerr != nil {
   709						pc.close()
   710						err = zerr
   711					} else {
   712						resp.Body = &readerAndCloser{gzReader, resp.Body}
   713					}
   714				}
   715				resp.Body = &bodyEOFSignal{body: resp.Body}
   716			}
   717	
   718			if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
   719				// Don't do keep-alive on error if either party requested a close
   720				// or we get an unexpected informational (1xx) response.
   721				// StatusCode 100 is already handled above.
   722				alive = false
   723			}
   724	
   725			var waitForBodyRead chan bool
   726			if hasBody {
   727				waitForBodyRead = make(chan bool, 2)
   728				resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
   729					// Sending false here sets alive to
   730					// false and closes the connection
   731					// below.
   732					waitForBodyRead <- false
   733					return nil
   734				}
   735				resp.Body.(*bodyEOFSignal).fn = func(err error) {
   736					alive1 := alive
   737					if err != nil {
   738						alive1 = false
   739					}
   740					if alive1 && !pc.t.putIdleConn(pc) {
   741						alive1 = false
   742					}
   743					if !alive1 || pc.isBroken() {
   744						pc.close()
   745					}
   746					waitForBodyRead <- alive1
   747				}
   748			}
   749	
   750			if alive && !hasBody {
   751				if !pc.t.putIdleConn(pc) {
   752					alive = false
   753				}
   754			}
   755	
   756			rc.ch <- responseAndError{resp, err}
   757	
   758			// Wait for the just-returned response body to be fully consumed
   759			// before we race and peek on the underlying bufio reader.
   760			if waitForBodyRead != nil {
   761				alive = <-waitForBodyRead
   762			}
   763	
   764			pc.t.setReqConn(rc.req, nil)
   765	
   766			if !alive {
   767				pc.close()
   768			}
   769		}
   770	}
   771	
   772	func (pc *persistConn) writeLoop() {
   773		for {
   774			select {
   775			case wr := <-pc.writech:
   776				if pc.isBroken() {
   777					wr.ch <- errors.New("http: can't write HTTP request on broken connection")
   778					continue
   779				}
   780				err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
   781				if err == nil {
   782					err = pc.bw.Flush()
   783				}
   784				if err != nil {
   785					pc.markBroken()
   786				}
   787				wr.ch <- err
   788			case <-pc.closech:
   789				return
   790			}
   791		}
   792	}
   793	
   794	type responseAndError struct {
   795		res *Response
   796		err error
   797	}
   798	
   799	type requestAndChan struct {
   800		req *Request
   801		ch  chan responseAndError
   802	
   803		// did the Transport (as opposed to the client code) add an
   804		// Accept-Encoding gzip header? only if it we set it do
   805		// we transparently decode the gzip.
   806		addedGzip bool
   807	}
   808	
   809	// A writeRequest is sent by the readLoop's goroutine to the
   810	// writeLoop's goroutine to write a request while the read loop
   811	// concurrently waits on both the write response and the server's
   812	// reply.
   813	type writeRequest struct {
   814		req *transportRequest
   815		ch  chan<- error
   816	}
   817	
   818	func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
   819		pc.t.setReqConn(req.Request, pc)
   820		pc.lk.Lock()
   821		pc.numExpectedResponses++
   822		headerFn := pc.mutateHeaderFunc
   823		pc.lk.Unlock()
   824	
   825		if headerFn != nil {
   826			headerFn(req.extraHeaders())
   827		}
   828	
   829		// Ask for a compressed version if the caller didn't set their
   830		// own value for Accept-Encoding. We only attempted to
   831		// uncompress the gzip stream if we were the layer that
   832		// requested it.
   833		requestedGzip := false
   834		if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" {
   835			// Request gzip only, not deflate. Deflate is ambiguous and
   836			// not as universally supported anyway.
   837			// See: http://www.gzip.org/zlib/zlib_faq.html#faq38
   838			requestedGzip = true
   839			req.extraHeaders().Set("Accept-Encoding", "gzip")
   840		}
   841	
   842		// Write the request concurrently with waiting for a response,
   843		// in case the server decides to reply before reading our full
   844		// request body.
   845		writeErrCh := make(chan error, 1)
   846		pc.writech <- writeRequest{req, writeErrCh}
   847	
   848		resc := make(chan responseAndError, 1)
   849		pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
   850	
   851		var re responseAndError
   852		var pconnDeadCh = pc.closech
   853		var failTicker <-chan time.Time
   854		var respHeaderTimer <-chan time.Time
   855	WaitResponse:
   856		for {
   857			select {
   858			case err := <-writeErrCh:
   859				if err != nil {
   860					re = responseAndError{nil, err}
   861					pc.close()
   862					break WaitResponse
   863				}
   864				if d := pc.t.ResponseHeaderTimeout; d > 0 {
   865					respHeaderTimer = time.After(d)
   866				}
   867			case <-pconnDeadCh:
   868				// The persist connection is dead. This shouldn't
   869				// usually happen (only with Connection: close responses
   870				// with no response bodies), but if it does happen it
   871				// means either a) the remote server hung up on us
   872				// prematurely, or b) the readLoop sent us a response &
   873				// closed its closech at roughly the same time, and we
   874				// selected this case first, in which case a response
   875				// might still be coming soon.
   876				//
   877				// We can't avoid the select race in b) by using a unbuffered
   878				// resc channel instead, because then goroutines can
   879				// leak if we exit due to other errors.
   880				pconnDeadCh = nil                               // avoid spinning
   881				failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
   882			case <-failTicker:
   883				re = responseAndError{err: errors.New("net/http: transport closed before response was received")}
   884				break WaitResponse
   885			case <-respHeaderTimer:
   886				pc.close()
   887				re = responseAndError{err: errors.New("net/http: timeout awaiting response headers")}
   888				break WaitResponse
   889			case re = <-resc:
   890				break WaitResponse
   891			}
   892		}
   893	
   894		pc.lk.Lock()
   895		pc.numExpectedResponses--
   896		pc.lk.Unlock()
   897	
   898		if re.err != nil {
   899			pc.t.setReqConn(req.Request, nil)
   900		}
   901		return re.res, re.err
   902	}
   903	
   904	// markBroken marks a connection as broken (so it's not reused).
   905	// It differs from close in that it doesn't close the underlying
   906	// connection for use when it's still being read.
   907	func (pc *persistConn) markBroken() {
   908		pc.lk.Lock()
   909		defer pc.lk.Unlock()
   910		pc.broken = true
   911	}
   912	
   913	func (pc *persistConn) close() {
   914		pc.lk.Lock()
   915		defer pc.lk.Unlock()
   916		pc.closeLocked()
   917	}
   918	
   919	func (pc *persistConn) closeLocked() {
   920		pc.broken = true
   921		if !pc.closed {
   922			pc.conn.Close()
   923			pc.closed = true
   924		}
   925		pc.mutateHeaderFunc = nil
   926	}
   927	
   928	var portMap = map[string]string{
   929		"http":  "80",
   930		"https": "443",
   931	}
   932	
   933	// canonicalAddr returns url.Host but always with a ":port" suffix
   934	func canonicalAddr(url *url.URL) string {
   935		addr := url.Host
   936		if !hasPort(addr) {
   937			return addr + ":" + portMap[url.Scheme]
   938		}
   939		return addr
   940	}
   941	
   942	// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
   943	// once, right before its final (error-producing) Read or Close call
   944	// returns. If earlyCloseFn is non-nil and Close is called before
   945	// io.EOF is seen, earlyCloseFn is called instead of fn, and its
   946	// return value is the return value from Close.
   947	type bodyEOFSignal struct {
   948		body         io.ReadCloser
   949		mu           sync.Mutex   // guards following 4 fields
   950		closed       bool         // whether Close has been called
   951		rerr         error        // sticky Read error
   952		fn           func(error)  // error will be nil on Read io.EOF
   953		earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
   954	}
   955	
   956	func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
   957		es.mu.Lock()
   958		closed, rerr := es.closed, es.rerr
   959		es.mu.Unlock()
   960		if closed {
   961			return 0, errors.New("http: read on closed response body")
   962		}
   963		if rerr != nil {
   964			return 0, rerr
   965		}
   966	
   967		n, err = es.body.Read(p)
   968		if err != nil {
   969			es.mu.Lock()
   970			defer es.mu.Unlock()
   971			if es.rerr == nil {
   972				es.rerr = err
   973			}
   974			es.condfn(err)
   975		}
   976		return
   977	}
   978	
   979	func (es *bodyEOFSignal) Close() error {
   980		es.mu.Lock()
   981		defer es.mu.Unlock()
   982		if es.closed {
   983			return nil
   984		}
   985		es.closed = true
   986		if es.earlyCloseFn != nil && es.rerr != io.EOF {
   987			return es.earlyCloseFn()
   988		}
   989		err := es.body.Close()
   990		es.condfn(err)
   991		return err
   992	}
   993	
   994	// caller must hold es.mu.
   995	func (es *bodyEOFSignal) condfn(err error) {
   996		if es.fn == nil {
   997			return
   998		}
   999		if err == io.EOF {
  1000			err = nil
  1001		}
  1002		es.fn(err)
  1003		es.fn = nil
  1004	}
  1005	
  1006	type readerAndCloser struct {
  1007		io.Reader
  1008		io.Closer
  1009	}

View as plain text