Run Format

Source file src/pkg/net/http/transport.go

     1	// Copyright 2011 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// HTTP client implementation. See RFC 2616.
     6	//
     7	// This is the low-level Transport implementation of RoundTripper.
     8	// The high-level interface is in client.go.
     9	
    10	package http
    11	
    12	import (
    13		"bufio"
    14		"compress/gzip"
    15		"crypto/tls"
    16		"errors"
    17		"fmt"
    18		"io"
    19		"log"
    20		"net"
    21		"net/url"
    22		"os"
    23		"strings"
    24		"sync"
    25		"time"
    26	)
    27	
    28	// DefaultTransport is the default implementation of Transport and is
    29	// used by DefaultClient. It establishes network connections as needed
    30	// and caches them for reuse by subsequent calls. It uses HTTP proxies
    31	// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
    32	// $no_proxy) environment variables.
    33	var DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment}
    34	
    35	// DefaultMaxIdleConnsPerHost is the default value of Transport's
    36	// MaxIdleConnsPerHost.
    37	const DefaultMaxIdleConnsPerHost = 2
    38	
    39	// Transport is an implementation of RoundTripper that supports http,
    40	// https, and http proxies (for either http or https with CONNECT).
    41	// Transport can also cache connections for future re-use.
    42	type Transport struct {
    43		idleMu     sync.Mutex
    44		idleConn   map[string][]*persistConn
    45		idleConnCh map[string]chan *persistConn
    46		reqMu      sync.Mutex
    47		reqConn    map[*Request]*persistConn
    48		altMu      sync.RWMutex
    49		altProto   map[string]RoundTripper // nil or map of URI scheme => RoundTripper
    50	
    51		// Proxy specifies a function to return a proxy for a given
    52		// Request. If the function returns a non-nil error, the
    53		// request is aborted with the provided error.
    54		// If Proxy is nil or returns a nil *URL, no proxy is used.
    55		Proxy func(*Request) (*url.URL, error)
    56	
    57		// Dial specifies the dial function for creating TCP
    58		// connections.
    59		// If Dial is nil, net.Dial is used.
    60		Dial func(network, addr string) (net.Conn, error)
    61	
    62		// TLSClientConfig specifies the TLS configuration to use with
    63		// tls.Client. If nil, the default configuration is used.
    64		TLSClientConfig *tls.Config
    65	
    66		// DisableKeepAlives, if true, prevents re-use of TCP connections
    67		// between different HTTP requests.
    68		DisableKeepAlives bool
    69	
    70		// DisableCompression, if true, prevents the Transport from
    71		// requesting compression with an "Accept-Encoding: gzip"
    72		// request header when the Request contains no existing
    73		// Accept-Encoding value. If the Transport requests gzip on
    74		// its own and gets a gzipped response, it's transparently
    75		// decoded in the Response.Body. However, if the user
    76		// explicitly requested gzip it is not automatically
    77		// uncompressed.
    78		DisableCompression bool
    79	
    80		// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
    81		// (keep-alive) to keep per-host.  If zero,
    82		// DefaultMaxIdleConnsPerHost is used.
    83		MaxIdleConnsPerHost int
    84	
    85		// ResponseHeaderTimeout, if non-zero, specifies the amount of
    86		// time to wait for a server's response headers after fully
    87		// writing the request (including its body, if any). This
    88		// time does not include the time to read the response body.
    89		ResponseHeaderTimeout time.Duration
    90	
    91		// TODO: tunable on global max cached connections
    92		// TODO: tunable on timeout on cached connections
    93	}
    94	
    95	// ProxyFromEnvironment returns the URL of the proxy to use for a
    96	// given request, as indicated by the environment variables
    97	// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy).
    98	// An error is returned if the proxy environment is invalid.
    99	// A nil URL and nil error are returned if no proxy is defined in the
   100	// environment, or a proxy should not be used for the given request.
   101	func ProxyFromEnvironment(req *Request) (*url.URL, error) {
   102		proxy := getenvEitherCase("HTTP_PROXY")
   103		if proxy == "" {
   104			return nil, nil
   105		}
   106		if !useProxy(canonicalAddr(req.URL)) {
   107			return nil, nil
   108		}
   109		proxyURL, err := url.Parse(proxy)
   110		if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
   111			// proxy was bogus. Try prepending "http://" to it and
   112			// see if that parses correctly. If not, we fall
   113			// through and complain about the original one.
   114			if proxyURL, err := url.Parse("http://" + proxy); err == nil {
   115				return proxyURL, nil
   116			}
   117		}
   118		if err != nil {
   119			return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
   120		}
   121		return proxyURL, nil
   122	}
   123	
   124	// ProxyURL returns a proxy function (for use in a Transport)
   125	// that always returns the same URL.
   126	func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
   127		return func(*Request) (*url.URL, error) {
   128			return fixedURL, nil
   129		}
   130	}
   131	
   132	// transportRequest is a wrapper around a *Request that adds
   133	// optional extra headers to write.
   134	type transportRequest struct {
   135		*Request        // original request, not to be mutated
   136		extra    Header // extra headers to write, or nil
   137	}
   138	
   139	func (tr *transportRequest) extraHeaders() Header {
   140		if tr.extra == nil {
   141			tr.extra = make(Header)
   142		}
   143		return tr.extra
   144	}
   145	
   146	// RoundTrip implements the RoundTripper interface.
   147	//
   148	// For higher-level HTTP client support (such as handling of cookies
   149	// and redirects), see Get, Post, and the Client type.
   150	func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
   151		if req.URL == nil {
   152			return nil, errors.New("http: nil Request.URL")
   153		}
   154		if req.Header == nil {
   155			return nil, errors.New("http: nil Request.Header")
   156		}
   157		if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
   158			t.altMu.RLock()
   159			var rt RoundTripper
   160			if t.altProto != nil {
   161				rt = t.altProto[req.URL.Scheme]
   162			}
   163			t.altMu.RUnlock()
   164			if rt == nil {
   165				return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
   166			}
   167			return rt.RoundTrip(req)
   168		}
   169		if req.URL.Host == "" {
   170			return nil, errors.New("http: no Host in request URL")
   171		}
   172		treq := &transportRequest{Request: req}
   173		cm, err := t.connectMethodForRequest(treq)
   174		if err != nil {
   175			return nil, err
   176		}
   177	
   178		// Get the cached or newly-created connection to either the
   179		// host (for http or https), the http proxy, or the http proxy
   180		// pre-CONNECTed to https server.  In any case, we'll be ready
   181		// to send it requests.
   182		pconn, err := t.getConn(cm)
   183		if err != nil {
   184			return nil, err
   185		}
   186	
   187		return pconn.roundTrip(treq)
   188	}
   189	
   190	// RegisterProtocol registers a new protocol with scheme.
   191	// The Transport will pass requests using the given scheme to rt.
   192	// It is rt's responsibility to simulate HTTP request semantics.
   193	//
   194	// RegisterProtocol can be used by other packages to provide
   195	// implementations of protocol schemes like "ftp" or "file".
   196	func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
   197		if scheme == "http" || scheme == "https" {
   198			panic("protocol " + scheme + " already registered")
   199		}
   200		t.altMu.Lock()
   201		defer t.altMu.Unlock()
   202		if t.altProto == nil {
   203			t.altProto = make(map[string]RoundTripper)
   204		}
   205		if _, exists := t.altProto[scheme]; exists {
   206			panic("protocol " + scheme + " already registered")
   207		}
   208		t.altProto[scheme] = rt
   209	}
   210	
   211	// CloseIdleConnections closes any connections which were previously
   212	// connected from previous requests but are now sitting idle in
   213	// a "keep-alive" state. It does not interrupt any connections currently
   214	// in use.
   215	func (t *Transport) CloseIdleConnections() {
   216		t.idleMu.Lock()
   217		m := t.idleConn
   218		t.idleConn = nil
   219		t.idleConnCh = nil
   220		t.idleMu.Unlock()
   221		if m == nil {
   222			return
   223		}
   224		for _, conns := range m {
   225			for _, pconn := range conns {
   226				pconn.close()
   227			}
   228		}
   229	}
   230	
   231	// CancelRequest cancels an in-flight request by closing its
   232	// connection.
   233	func (t *Transport) CancelRequest(req *Request) {
   234		t.reqMu.Lock()
   235		pc := t.reqConn[req]
   236		t.reqMu.Unlock()
   237		if pc != nil {
   238			pc.conn.Close()
   239		}
   240	}
   241	
   242	//
   243	// Private implementation past this point.
   244	//
   245	
   246	func getenvEitherCase(k string) string {
   247		if v := os.Getenv(strings.ToUpper(k)); v != "" {
   248			return v
   249		}
   250		return os.Getenv(strings.ToLower(k))
   251	}
   252	
   253	func (t *Transport) connectMethodForRequest(treq *transportRequest) (*connectMethod, error) {
   254		cm := &connectMethod{
   255			targetScheme: treq.URL.Scheme,
   256			targetAddr:   canonicalAddr(treq.URL),
   257		}
   258		if t.Proxy != nil {
   259			var err error
   260			cm.proxyURL, err = t.Proxy(treq.Request)
   261			if err != nil {
   262				return nil, err
   263			}
   264		}
   265		return cm, nil
   266	}
   267	
   268	// proxyAuth returns the Proxy-Authorization header to set
   269	// on requests, if applicable.
   270	func (cm *connectMethod) proxyAuth() string {
   271		if cm.proxyURL == nil {
   272			return ""
   273		}
   274		if u := cm.proxyURL.User; u != nil {
   275			username := u.Username()
   276			password, _ := u.Password()
   277			return "Basic " + basicAuth(username, password)
   278		}
   279		return ""
   280	}
   281	
   282	// putIdleConn adds pconn to the list of idle persistent connections awaiting
   283	// a new request.
   284	// If pconn is no longer needed or not in a good state, putIdleConn
   285	// returns false.
   286	func (t *Transport) putIdleConn(pconn *persistConn) bool {
   287		if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
   288			pconn.close()
   289			return false
   290		}
   291		if pconn.isBroken() {
   292			return false
   293		}
   294		key := pconn.cacheKey
   295		max := t.MaxIdleConnsPerHost
   296		if max == 0 {
   297			max = DefaultMaxIdleConnsPerHost
   298		}
   299		t.idleMu.Lock()
   300	
   301		waitingDialer := t.idleConnCh[key]
   302		select {
   303		case waitingDialer <- pconn:
   304			// We're done with this pconn and somebody else is
   305			// currently waiting for a conn of this type (they're
   306			// actively dialing, but this conn is ready
   307			// first). Chrome calls this socket late binding.  See
   308			// https://insouciant.org/tech/connection-management-in-chromium/
   309			t.idleMu.Unlock()
   310			return true
   311		default:
   312			if waitingDialer != nil {
   313				// They had populated this, but their dial won
   314				// first, so we can clean up this map entry.
   315				delete(t.idleConnCh, key)
   316			}
   317		}
   318		if t.idleConn == nil {
   319			t.idleConn = make(map[string][]*persistConn)
   320		}
   321		if len(t.idleConn[key]) >= max {
   322			t.idleMu.Unlock()
   323			pconn.close()
   324			return false
   325		}
   326		for _, exist := range t.idleConn[key] {
   327			if exist == pconn {
   328				log.Fatalf("dup idle pconn %p in freelist", pconn)
   329			}
   330		}
   331		t.idleConn[key] = append(t.idleConn[key], pconn)
   332		t.idleMu.Unlock()
   333		return true
   334	}
   335	
   336	// getIdleConnCh returns a channel to receive and return idle
   337	// persistent connection for the given connectMethod.
   338	// It may return nil, if persistent connections are not being used.
   339	func (t *Transport) getIdleConnCh(cm *connectMethod) chan *persistConn {
   340		if t.DisableKeepAlives {
   341			return nil
   342		}
   343		key := cm.key()
   344		t.idleMu.Lock()
   345		defer t.idleMu.Unlock()
   346		if t.idleConnCh == nil {
   347			t.idleConnCh = make(map[string]chan *persistConn)
   348		}
   349		ch, ok := t.idleConnCh[key]
   350		if !ok {
   351			ch = make(chan *persistConn)
   352			t.idleConnCh[key] = ch
   353		}
   354		return ch
   355	}
   356	
   357	func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
   358		key := cm.key()
   359		t.idleMu.Lock()
   360		defer t.idleMu.Unlock()
   361		if t.idleConn == nil {
   362			return nil
   363		}
   364		for {
   365			pconns, ok := t.idleConn[key]
   366			if !ok {
   367				return nil
   368			}
   369			if len(pconns) == 1 {
   370				pconn = pconns[0]
   371				delete(t.idleConn, key)
   372			} else {
   373				// 2 or more cached connections; pop last
   374				// TODO: queue?
   375				pconn = pconns[len(pconns)-1]
   376				t.idleConn[key] = pconns[0 : len(pconns)-1]
   377			}
   378			if !pconn.isBroken() {
   379				return
   380			}
   381		}
   382	}
   383	
   384	func (t *Transport) setReqConn(r *Request, pc *persistConn) {
   385		t.reqMu.Lock()
   386		defer t.reqMu.Unlock()
   387		if t.reqConn == nil {
   388			t.reqConn = make(map[*Request]*persistConn)
   389		}
   390		if pc != nil {
   391			t.reqConn[r] = pc
   392		} else {
   393			delete(t.reqConn, r)
   394		}
   395	}
   396	
   397	func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
   398		if t.Dial != nil {
   399			return t.Dial(network, addr)
   400		}
   401		return net.Dial(network, addr)
   402	}
   403	
   404	// getConn dials and creates a new persistConn to the target as
   405	// specified in the connectMethod.  This includes doing a proxy CONNECT
   406	// and/or setting up TLS.  If this doesn't return an error, the persistConn
   407	// is ready to write requests to.
   408	func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
   409		if pc := t.getIdleConn(cm); pc != nil {
   410			return pc, nil
   411		}
   412	
   413		type dialRes struct {
   414			pc  *persistConn
   415			err error
   416		}
   417		dialc := make(chan dialRes)
   418		go func() {
   419			pc, err := t.dialConn(cm)
   420			dialc <- dialRes{pc, err}
   421		}()
   422	
   423		idleConnCh := t.getIdleConnCh(cm)
   424		select {
   425		case v := <-dialc:
   426			// Our dial finished.
   427			return v.pc, v.err
   428		case pc := <-idleConnCh:
   429			// Another request finished first and its net.Conn
   430			// became available before our dial. Or somebody
   431			// else's dial that they didn't use.
   432			// But our dial is still going, so give it away
   433			// when it finishes:
   434			go func() {
   435				if v := <-dialc; v.err == nil {
   436					t.putIdleConn(v.pc)
   437				}
   438			}()
   439			return pc, nil
   440		}
   441	}
   442	
   443	func (t *Transport) dialConn(cm *connectMethod) (*persistConn, error) {
   444		conn, err := t.dial("tcp", cm.addr())
   445		if err != nil {
   446			if cm.proxyURL != nil {
   447				err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err)
   448			}
   449			return nil, err
   450		}
   451	
   452		pa := cm.proxyAuth()
   453	
   454		pconn := &persistConn{
   455			t:        t,
   456			cacheKey: cm.key(),
   457			conn:     conn,
   458			reqch:    make(chan requestAndChan, 50),
   459			writech:  make(chan writeRequest, 50),
   460			closech:  make(chan struct{}),
   461		}
   462	
   463		switch {
   464		case cm.proxyURL == nil:
   465			// Do nothing.
   466		case cm.targetScheme == "http":
   467			pconn.isProxy = true
   468			if pa != "" {
   469				pconn.mutateHeaderFunc = func(h Header) {
   470					h.Set("Proxy-Authorization", pa)
   471				}
   472			}
   473		case cm.targetScheme == "https":
   474			connectReq := &Request{
   475				Method: "CONNECT",
   476				URL:    &url.URL{Opaque: cm.targetAddr},
   477				Host:   cm.targetAddr,
   478				Header: make(Header),
   479			}
   480			if pa != "" {
   481				connectReq.Header.Set("Proxy-Authorization", pa)
   482			}
   483			connectReq.Write(conn)
   484	
   485			// Read response.
   486			// Okay to use and discard buffered reader here, because
   487			// TLS server will not speak until spoken to.
   488			br := bufio.NewReader(conn)
   489			resp, err := ReadResponse(br, connectReq)
   490			if err != nil {
   491				conn.Close()
   492				return nil, err
   493			}
   494			if resp.StatusCode != 200 {
   495				f := strings.SplitN(resp.Status, " ", 2)
   496				conn.Close()
   497				return nil, errors.New(f[1])
   498			}
   499		}
   500	
   501		if cm.targetScheme == "https" {
   502			// Initiate TLS and check remote host name against certificate.
   503			cfg := t.TLSClientConfig
   504			if cfg == nil || cfg.ServerName == "" {
   505				host := cm.tlsHost()
   506				if cfg == nil {
   507					cfg = &tls.Config{ServerName: host}
   508				} else {
   509					clone := *cfg // shallow clone
   510					clone.ServerName = host
   511					cfg = &clone
   512				}
   513			}
   514			conn = tls.Client(conn, cfg)
   515			if err = conn.(*tls.Conn).Handshake(); err != nil {
   516				return nil, err
   517			}
   518			if !cfg.InsecureSkipVerify {
   519				if err = conn.(*tls.Conn).VerifyHostname(cfg.ServerName); err != nil {
   520					return nil, err
   521				}
   522			}
   523			pconn.conn = conn
   524		}
   525	
   526		pconn.br = bufio.NewReader(pconn.conn)
   527		pconn.bw = bufio.NewWriter(pconn.conn)
   528		go pconn.readLoop()
   529		go pconn.writeLoop()
   530		return pconn, nil
   531	}
   532	
   533	// useProxy returns true if requests to addr should use a proxy,
   534	// according to the NO_PROXY or no_proxy environment variable.
   535	// addr is always a canonicalAddr with a host and port.
   536	func useProxy(addr string) bool {
   537		if len(addr) == 0 {
   538			return true
   539		}
   540		host, _, err := net.SplitHostPort(addr)
   541		if err != nil {
   542			return false
   543		}
   544		if host == "localhost" {
   545			return false
   546		}
   547		if ip := net.ParseIP(host); ip != nil {
   548			if ip.IsLoopback() {
   549				return false
   550			}
   551		}
   552	
   553		no_proxy := getenvEitherCase("NO_PROXY")
   554		if no_proxy == "*" {
   555			return false
   556		}
   557	
   558		addr = strings.ToLower(strings.TrimSpace(addr))
   559		if hasPort(addr) {
   560			addr = addr[:strings.LastIndex(addr, ":")]
   561		}
   562	
   563		for _, p := range strings.Split(no_proxy, ",") {
   564			p = strings.ToLower(strings.TrimSpace(p))
   565			if len(p) == 0 {
   566				continue
   567			}
   568			if hasPort(p) {
   569				p = p[:strings.LastIndex(p, ":")]
   570			}
   571			if addr == p {
   572				return false
   573			}
   574			if p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:]) {
   575				// no_proxy ".foo.com" matches "bar.foo.com" or "foo.com"
   576				return false
   577			}
   578			if p[0] != '.' && strings.HasSuffix(addr, p) && addr[len(addr)-len(p)-1] == '.' {
   579				// no_proxy "foo.com" matches "bar.foo.com"
   580				return false
   581			}
   582		}
   583		return true
   584	}
   585	
   586	// connectMethod is the map key (in its String form) for keeping persistent
   587	// TCP connections alive for subsequent HTTP requests.
   588	//
   589	// A connect method may be of the following types:
   590	//
   591	// Cache key form                Description
   592	// -----------------             -------------------------
   593	// ||http|foo.com                http directly to server, no proxy
   594	// ||https|foo.com               https directly to server, no proxy
   595	// http://proxy.com|https|foo.com  http to proxy, then CONNECT to foo.com
   596	// http://proxy.com|http           http to proxy, http to anywhere after that
   597	//
   598	// Note: no support to https to the proxy yet.
   599	//
   600	type connectMethod struct {
   601		proxyURL     *url.URL // nil for no proxy, else full proxy URL
   602		targetScheme string   // "http" or "https"
   603		targetAddr   string   // Not used if proxy + http targetScheme (4th example in table)
   604	}
   605	
   606	func (ck *connectMethod) key() string {
   607		return ck.String() // TODO: use a struct type instead
   608	}
   609	
   610	func (ck *connectMethod) String() string {
   611		proxyStr := ""
   612		targetAddr := ck.targetAddr
   613		if ck.proxyURL != nil {
   614			proxyStr = ck.proxyURL.String()
   615			if ck.targetScheme == "http" {
   616				targetAddr = ""
   617			}
   618		}
   619		return strings.Join([]string{proxyStr, ck.targetScheme, targetAddr}, "|")
   620	}
   621	
   622	// addr returns the first hop "host:port" to which we need to TCP connect.
   623	func (cm *connectMethod) addr() string {
   624		if cm.proxyURL != nil {
   625			return canonicalAddr(cm.proxyURL)
   626		}
   627		return cm.targetAddr
   628	}
   629	
   630	// tlsHost returns the host name to match against the peer's
   631	// TLS certificate.
   632	func (cm *connectMethod) tlsHost() string {
   633		h := cm.targetAddr
   634		if hasPort(h) {
   635			h = h[:strings.LastIndex(h, ":")]
   636		}
   637		return h
   638	}
   639	
   640	// persistConn wraps a connection, usually a persistent one
   641	// (but may be used for non-keep-alive requests as well)
   642	type persistConn struct {
   643		t        *Transport
   644		cacheKey string // its connectMethod.String()
   645		conn     net.Conn
   646		closed   bool                // whether conn has been closed
   647		br       *bufio.Reader       // from conn
   648		bw       *bufio.Writer       // to conn
   649		reqch    chan requestAndChan // written by roundTrip; read by readLoop
   650		writech  chan writeRequest   // written by roundTrip; read by writeLoop
   651		closech  chan struct{}       // broadcast close when readLoop (TCP connection) closes
   652		isProxy  bool
   653	
   654		lk                   sync.Mutex // guards following 3 fields
   655		numExpectedResponses int
   656		broken               bool // an error has happened on this connection; marked broken so it's not reused.
   657		// mutateHeaderFunc is an optional func to modify extra
   658		// headers on each outbound request before it's written. (the
   659		// original Request given to RoundTrip is not modified)
   660		mutateHeaderFunc func(Header)
   661	}
   662	
   663	func (pc *persistConn) isBroken() bool {
   664		pc.lk.Lock()
   665		b := pc.broken
   666		pc.lk.Unlock()
   667		return b
   668	}
   669	
   670	var remoteSideClosedFunc func(error) bool // or nil to use default
   671	
   672	func remoteSideClosed(err error) bool {
   673		if err == io.EOF {
   674			return true
   675		}
   676		if remoteSideClosedFunc != nil {
   677			return remoteSideClosedFunc(err)
   678		}
   679		return false
   680	}
   681	
   682	func (pc *persistConn) readLoop() {
   683		defer close(pc.closech)
   684		alive := true
   685	
   686		for alive {
   687			pb, err := pc.br.Peek(1)
   688	
   689			pc.lk.Lock()
   690			if pc.numExpectedResponses == 0 {
   691				pc.closeLocked()
   692				pc.lk.Unlock()
   693				if len(pb) > 0 {
   694					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
   695						string(pb), err)
   696				}
   697				return
   698			}
   699			pc.lk.Unlock()
   700	
   701			rc := <-pc.reqch
   702	
   703			var resp *Response
   704			if err == nil {
   705				resp, err = ReadResponse(pc.br, rc.req)
   706				if err == nil && resp.StatusCode == 100 {
   707					// Skip any 100-continue for now.
   708					// TODO(bradfitz): if rc.req had "Expect: 100-continue",
   709					// actually block the request body write and signal the
   710					// writeLoop now to begin sending it. (Issue 2184) For now we
   711					// eat it, since we're never expecting one.
   712					resp, err = ReadResponse(pc.br, rc.req)
   713				}
   714			}
   715			hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
   716	
   717			if err != nil {
   718				pc.close()
   719			} else {
   720				if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
   721					resp.Header.Del("Content-Encoding")
   722					resp.Header.Del("Content-Length")
   723					resp.ContentLength = -1
   724					gzReader, zerr := gzip.NewReader(resp.Body)
   725					if zerr != nil {
   726						pc.close()
   727						err = zerr
   728					} else {
   729						resp.Body = &readerAndCloser{gzReader, resp.Body}
   730					}
   731				}
   732				resp.Body = &bodyEOFSignal{body: resp.Body}
   733			}
   734	
   735			if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
   736				// Don't do keep-alive on error if either party requested a close
   737				// or we get an unexpected informational (1xx) response.
   738				// StatusCode 100 is already handled above.
   739				alive = false
   740			}
   741	
   742			var waitForBodyRead chan bool
   743			if hasBody {
   744				waitForBodyRead = make(chan bool, 2)
   745				resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
   746					// Sending false here sets alive to
   747					// false and closes the connection
   748					// below.
   749					waitForBodyRead <- false
   750					return nil
   751				}
   752				resp.Body.(*bodyEOFSignal).fn = func(err error) {
   753					alive1 := alive
   754					if err != nil {
   755						alive1 = false
   756					}
   757					if alive1 && !pc.t.putIdleConn(pc) {
   758						alive1 = false
   759					}
   760					if !alive1 || pc.isBroken() {
   761						pc.close()
   762					}
   763					waitForBodyRead <- alive1
   764				}
   765			}
   766	
   767			if alive && !hasBody {
   768				if !pc.t.putIdleConn(pc) {
   769					alive = false
   770				}
   771			}
   772	
   773			rc.ch <- responseAndError{resp, err}
   774	
   775			// Wait for the just-returned response body to be fully consumed
   776			// before we race and peek on the underlying bufio reader.
   777			if waitForBodyRead != nil {
   778				alive = <-waitForBodyRead
   779			}
   780	
   781			pc.t.setReqConn(rc.req, nil)
   782	
   783			if !alive {
   784				pc.close()
   785			}
   786		}
   787	}
   788	
   789	func (pc *persistConn) writeLoop() {
   790		for {
   791			select {
   792			case wr := <-pc.writech:
   793				if pc.isBroken() {
   794					wr.ch <- errors.New("http: can't write HTTP request on broken connection")
   795					continue
   796				}
   797				err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
   798				if err == nil {
   799					err = pc.bw.Flush()
   800				}
   801				if err != nil {
   802					pc.markBroken()
   803				}
   804				wr.ch <- err
   805			case <-pc.closech:
   806				return
   807			}
   808		}
   809	}
   810	
   811	type responseAndError struct {
   812		res *Response
   813		err error
   814	}
   815	
   816	type requestAndChan struct {
   817		req *Request
   818		ch  chan responseAndError
   819	
   820		// did the Transport (as opposed to the client code) add an
   821		// Accept-Encoding gzip header? only if it we set it do
   822		// we transparently decode the gzip.
   823		addedGzip bool
   824	}
   825	
   826	// A writeRequest is sent by the readLoop's goroutine to the
   827	// writeLoop's goroutine to write a request while the read loop
   828	// concurrently waits on both the write response and the server's
   829	// reply.
   830	type writeRequest struct {
   831		req *transportRequest
   832		ch  chan<- error
   833	}
   834	
   835	func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
   836		pc.t.setReqConn(req.Request, pc)
   837		pc.lk.Lock()
   838		pc.numExpectedResponses++
   839		headerFn := pc.mutateHeaderFunc
   840		pc.lk.Unlock()
   841	
   842		if headerFn != nil {
   843			headerFn(req.extraHeaders())
   844		}
   845	
   846		// Ask for a compressed version if the caller didn't set their
   847		// own value for Accept-Encoding. We only attempted to
   848		// uncompress the gzip stream if we were the layer that
   849		// requested it.
   850		requestedGzip := false
   851		if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" && req.Method != "HEAD" {
   852			// Request gzip only, not deflate. Deflate is ambiguous and
   853			// not as universally supported anyway.
   854			// See: http://www.gzip.org/zlib/zlib_faq.html#faq38
   855			//
   856			// Note that we don't request this for HEAD requests,
   857			// due to a bug in nginx:
   858			//   http://trac.nginx.org/nginx/ticket/358
   859			//   http://golang.org/issue/5522
   860			requestedGzip = true
   861			req.extraHeaders().Set("Accept-Encoding", "gzip")
   862		}
   863	
   864		// Write the request concurrently with waiting for a response,
   865		// in case the server decides to reply before reading our full
   866		// request body.
   867		writeErrCh := make(chan error, 1)
   868		pc.writech <- writeRequest{req, writeErrCh}
   869	
   870		resc := make(chan responseAndError, 1)
   871		pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
   872	
   873		var re responseAndError
   874		var pconnDeadCh = pc.closech
   875		var failTicker <-chan time.Time
   876		var respHeaderTimer <-chan time.Time
   877	WaitResponse:
   878		for {
   879			select {
   880			case err := <-writeErrCh:
   881				if err != nil {
   882					re = responseAndError{nil, err}
   883					pc.close()
   884					break WaitResponse
   885				}
   886				if d := pc.t.ResponseHeaderTimeout; d > 0 {
   887					respHeaderTimer = time.After(d)
   888				}
   889			case <-pconnDeadCh:
   890				// The persist connection is dead. This shouldn't
   891				// usually happen (only with Connection: close responses
   892				// with no response bodies), but if it does happen it
   893				// means either a) the remote server hung up on us
   894				// prematurely, or b) the readLoop sent us a response &
   895				// closed its closech at roughly the same time, and we
   896				// selected this case first, in which case a response
   897				// might still be coming soon.
   898				//
   899				// We can't avoid the select race in b) by using a unbuffered
   900				// resc channel instead, because then goroutines can
   901				// leak if we exit due to other errors.
   902				pconnDeadCh = nil                               // avoid spinning
   903				failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
   904			case <-failTicker:
   905				re = responseAndError{err: errors.New("net/http: transport closed before response was received")}
   906				break WaitResponse
   907			case <-respHeaderTimer:
   908				pc.close()
   909				re = responseAndError{err: errors.New("net/http: timeout awaiting response headers")}
   910				break WaitResponse
   911			case re = <-resc:
   912				break WaitResponse
   913			}
   914		}
   915	
   916		pc.lk.Lock()
   917		pc.numExpectedResponses--
   918		pc.lk.Unlock()
   919	
   920		if re.err != nil {
   921			pc.t.setReqConn(req.Request, nil)
   922		}
   923		return re.res, re.err
   924	}
   925	
   926	// markBroken marks a connection as broken (so it's not reused).
   927	// It differs from close in that it doesn't close the underlying
   928	// connection for use when it's still being read.
   929	func (pc *persistConn) markBroken() {
   930		pc.lk.Lock()
   931		defer pc.lk.Unlock()
   932		pc.broken = true
   933	}
   934	
   935	func (pc *persistConn) close() {
   936		pc.lk.Lock()
   937		defer pc.lk.Unlock()
   938		pc.closeLocked()
   939	}
   940	
   941	func (pc *persistConn) closeLocked() {
   942		pc.broken = true
   943		if !pc.closed {
   944			pc.conn.Close()
   945			pc.closed = true
   946		}
   947		pc.mutateHeaderFunc = nil
   948	}
   949	
   950	var portMap = map[string]string{
   951		"http":  "80",
   952		"https": "443",
   953	}
   954	
   955	// canonicalAddr returns url.Host but always with a ":port" suffix
   956	func canonicalAddr(url *url.URL) string {
   957		addr := url.Host
   958		if !hasPort(addr) {
   959			return addr + ":" + portMap[url.Scheme]
   960		}
   961		return addr
   962	}
   963	
   964	// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
   965	// once, right before its final (error-producing) Read or Close call
   966	// returns. If earlyCloseFn is non-nil and Close is called before
   967	// io.EOF is seen, earlyCloseFn is called instead of fn, and its
   968	// return value is the return value from Close.
   969	type bodyEOFSignal struct {
   970		body         io.ReadCloser
   971		mu           sync.Mutex   // guards following 4 fields
   972		closed       bool         // whether Close has been called
   973		rerr         error        // sticky Read error
   974		fn           func(error)  // error will be nil on Read io.EOF
   975		earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
   976	}
   977	
   978	func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
   979		es.mu.Lock()
   980		closed, rerr := es.closed, es.rerr
   981		es.mu.Unlock()
   982		if closed {
   983			return 0, errors.New("http: read on closed response body")
   984		}
   985		if rerr != nil {
   986			return 0, rerr
   987		}
   988	
   989		n, err = es.body.Read(p)
   990		if err != nil {
   991			es.mu.Lock()
   992			defer es.mu.Unlock()
   993			if es.rerr == nil {
   994				es.rerr = err
   995			}
   996			es.condfn(err)
   997		}
   998		return
   999	}
  1000	
  1001	func (es *bodyEOFSignal) Close() error {
  1002		es.mu.Lock()
  1003		defer es.mu.Unlock()
  1004		if es.closed {
  1005			return nil
  1006		}
  1007		es.closed = true
  1008		if es.earlyCloseFn != nil && es.rerr != io.EOF {
  1009			return es.earlyCloseFn()
  1010		}
  1011		err := es.body.Close()
  1012		es.condfn(err)
  1013		return err
  1014	}
  1015	
  1016	// caller must hold es.mu.
  1017	func (es *bodyEOFSignal) condfn(err error) {
  1018		if es.fn == nil {
  1019			return
  1020		}
  1021		if err == io.EOF {
  1022			err = nil
  1023		}
  1024		es.fn(err)
  1025		es.fn = nil
  1026	}
  1027	
  1028	type readerAndCloser struct {
  1029		io.Reader
  1030		io.Closer
  1031	}

View as plain text