The Go Programming Language

Source file src/pkg/http/persist.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package http
     6	
     7	import (
     8		"bufio"
     9		"io"
    10		"net"
    11		"net/textproto"
    12		"os"
    13		"sync"
    14	)
    15	
    16	var (
    17		ErrPersistEOF = &ProtocolError{"persistent connection closed"}
    18		ErrPipeline   = &ProtocolError{"pipeline error"}
    19	)
    20	
    21	// A ServerConn reads requests and sends responses over an underlying
    22	// connection, until the HTTP keepalive logic commands an end. ServerConn
    23	// also allows hijacking the underlying connection by calling Hijack
    24	// to regain control over the connection. ServerConn supports pipe-lining,
    25	// i.e. requests can be read out of sync (but in the same order) while the
    26	// respective responses are sent.
    27	//
    28	// ServerConn is low-level and should not be needed by most applications.
    29	// See Server.
    30	type ServerConn struct {
    31		lk              sync.Mutex // read-write protects the following fields
    32		c               net.Conn
    33		r               *bufio.Reader
    34		re, we          os.Error // read/write errors
    35		lastbody        io.ReadCloser
    36		nread, nwritten int
    37		pipereq         map[*Request]uint
    38	
    39		pipe textproto.Pipeline
    40	}
    41	
    42	// NewServerConn returns a new ServerConn reading and writing c.  If r is not
    43	// nil, it is the buffer to use when reading c.
    44	func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {
    45		if r == nil {
    46			r = bufio.NewReader(c)
    47		}
    48		return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)}
    49	}
    50	
    51	// Hijack detaches the ServerConn and returns the underlying connection as well
    52	// as the read-side bufio which may have some left over data. Hijack may be
    53	// called before Read has signaled the end of the keep-alive logic. The user
    54	// should not call Hijack while Read or Write is in progress.
    55	func (sc *ServerConn) Hijack() (c net.Conn, r *bufio.Reader) {
    56		sc.lk.Lock()
    57		defer sc.lk.Unlock()
    58		c = sc.c
    59		r = sc.r
    60		sc.c = nil
    61		sc.r = nil
    62		return
    63	}
    64	
    65	// Close calls Hijack and then also closes the underlying connection
    66	func (sc *ServerConn) Close() os.Error {
    67		c, _ := sc.Hijack()
    68		if c != nil {
    69			return c.Close()
    70		}
    71		return nil
    72	}
    73	
    74	// Read returns the next request on the wire. An ErrPersistEOF is returned if
    75	// it is gracefully determined that there are no more requests (e.g. after the
    76	// first request on an HTTP/1.0 connection, or after a Connection:close on a
    77	// HTTP/1.1 connection).
    78	func (sc *ServerConn) Read() (req *Request, err os.Error) {
    79	
    80		// Ensure ordered execution of Reads and Writes
    81		id := sc.pipe.Next()
    82		sc.pipe.StartRequest(id)
    83		defer func() {
    84			sc.pipe.EndRequest(id)
    85			if req == nil {
    86				sc.pipe.StartResponse(id)
    87				sc.pipe.EndResponse(id)
    88			} else {
    89				// Remember the pipeline id of this request
    90				sc.lk.Lock()
    91				sc.pipereq[req] = id
    92				sc.lk.Unlock()
    93			}
    94		}()
    95	
    96		sc.lk.Lock()
    97		if sc.we != nil { // no point receiving if write-side broken or closed
    98			defer sc.lk.Unlock()
    99			return nil, sc.we
   100		}
   101		if sc.re != nil {
   102			defer sc.lk.Unlock()
   103			return nil, sc.re
   104		}
   105		if sc.r == nil { // connection closed by user in the meantime
   106			defer sc.lk.Unlock()
   107			return nil, os.EBADF
   108		}
   109		r := sc.r
   110		lastbody := sc.lastbody
   111		sc.lastbody = nil
   112		sc.lk.Unlock()
   113	
   114		// Make sure body is fully consumed, even if user does not call body.Close
   115		if lastbody != nil {
   116			// body.Close is assumed to be idempotent and multiple calls to
   117			// it should return the error that its first invocation
   118			// returned.
   119			err = lastbody.Close()
   120			if err != nil {
   121				sc.lk.Lock()
   122				defer sc.lk.Unlock()
   123				sc.re = err
   124				return nil, err
   125			}
   126		}
   127	
   128		req, err = ReadRequest(r)
   129		sc.lk.Lock()
   130		defer sc.lk.Unlock()
   131		if err != nil {
   132			if err == io.ErrUnexpectedEOF {
   133				// A close from the opposing client is treated as a
   134				// graceful close, even if there was some unparse-able
   135				// data before the close.
   136				sc.re = ErrPersistEOF
   137				return nil, sc.re
   138			} else {
   139				sc.re = err
   140				return req, err
   141			}
   142		}
   143		sc.lastbody = req.Body
   144		sc.nread++
   145		if req.Close {
   146			sc.re = ErrPersistEOF
   147			return req, sc.re
   148		}
   149		return req, err
   150	}
   151	
   152	// Pending returns the number of unanswered requests
   153	// that have been received on the connection.
   154	func (sc *ServerConn) Pending() int {
   155		sc.lk.Lock()
   156		defer sc.lk.Unlock()
   157		return sc.nread - sc.nwritten
   158	}
   159	
   160	// Write writes resp in response to req. To close the connection gracefully, set the
   161	// Response.Close field to true. Write should be considered operational until
   162	// it returns an error, regardless of any errors returned on the Read side.
   163	func (sc *ServerConn) Write(req *Request, resp *Response) os.Error {
   164	
   165		// Retrieve the pipeline ID of this request/response pair
   166		sc.lk.Lock()
   167		id, ok := sc.pipereq[req]
   168		sc.pipereq[req] = 0, false
   169		if !ok {
   170			sc.lk.Unlock()
   171			return ErrPipeline
   172		}
   173		sc.lk.Unlock()
   174	
   175		// Ensure pipeline order
   176		sc.pipe.StartResponse(id)
   177		defer sc.pipe.EndResponse(id)
   178	
   179		sc.lk.Lock()
   180		if sc.we != nil {
   181			defer sc.lk.Unlock()
   182			return sc.we
   183		}
   184		if sc.c == nil { // connection closed by user in the meantime
   185			defer sc.lk.Unlock()
   186			return os.EBADF
   187		}
   188		c := sc.c
   189		if sc.nread <= sc.nwritten {
   190			defer sc.lk.Unlock()
   191			return os.NewError("persist server pipe count")
   192		}
   193		if resp.Close {
   194			// After signaling a keep-alive close, any pipelined unread
   195			// requests will be lost. It is up to the user to drain them
   196			// before signaling.
   197			sc.re = ErrPersistEOF
   198		}
   199		sc.lk.Unlock()
   200	
   201		err := resp.Write(c)
   202		sc.lk.Lock()
   203		defer sc.lk.Unlock()
   204		if err != nil {
   205			sc.we = err
   206			return err
   207		}
   208		sc.nwritten++
   209	
   210		return nil
   211	}
   212	
   213	// A ClientConn sends request and receives headers over an underlying
   214	// connection, while respecting the HTTP keepalive logic. ClientConn
   215	// supports hijacking the connection calling Hijack to
   216	// regain control of the underlying net.Conn and deal with it as desired.
   217	//
   218	// ClientConn is low-level and should not be needed by most applications.
   219	// See Client.
   220	type ClientConn struct {
   221		lk              sync.Mutex // read-write protects the following fields
   222		c               net.Conn
   223		r               *bufio.Reader
   224		re, we          os.Error // read/write errors
   225		lastbody        io.ReadCloser
   226		nread, nwritten int
   227		pipereq         map[*Request]uint
   228	
   229		pipe     textproto.Pipeline
   230		writeReq func(*Request, io.Writer) os.Error
   231	}
   232	
   233	// NewClientConn returns a new ClientConn reading and writing c.  If r is not
   234	// nil, it is the buffer to use when reading c.
   235	func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
   236		if r == nil {
   237			r = bufio.NewReader(c)
   238		}
   239		return &ClientConn{
   240			c:        c,
   241			r:        r,
   242			pipereq:  make(map[*Request]uint),
   243			writeReq: (*Request).Write,
   244		}
   245	}
   246	
   247	// NewProxyClientConn works like NewClientConn but writes Requests
   248	// using Request's WriteProxy method.
   249	func NewProxyClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
   250		cc := NewClientConn(c, r)
   251		cc.writeReq = (*Request).WriteProxy
   252		return cc
   253	}
   254	
   255	// Hijack detaches the ClientConn and returns the underlying connection as well
   256	// as the read-side bufio which may have some left over data. Hijack may be
   257	// called before the user or Read have signaled the end of the keep-alive
   258	// logic. The user should not call Hijack while Read or Write is in progress.
   259	func (cc *ClientConn) Hijack() (c net.Conn, r *bufio.Reader) {
   260		cc.lk.Lock()
   261		defer cc.lk.Unlock()
   262		c = cc.c
   263		r = cc.r
   264		cc.c = nil
   265		cc.r = nil
   266		return
   267	}
   268	
   269	// Close calls Hijack and then also closes the underlying connection
   270	func (cc *ClientConn) Close() os.Error {
   271		c, _ := cc.Hijack()
   272		if c != nil {
   273			return c.Close()
   274		}
   275		return nil
   276	}
   277	
   278	// Write writes a request. An ErrPersistEOF error is returned if the connection
   279	// has been closed in an HTTP keepalive sense. If req.Close equals true, the
   280	// keepalive connection is logically closed after this request and the opposing
   281	// server is informed. An ErrUnexpectedEOF indicates the remote closed the
   282	// underlying TCP connection, which is usually considered as graceful close.
   283	func (cc *ClientConn) Write(req *Request) (err os.Error) {
   284	
   285		// Ensure ordered execution of Writes
   286		id := cc.pipe.Next()
   287		cc.pipe.StartRequest(id)
   288		defer func() {
   289			cc.pipe.EndRequest(id)
   290			if err != nil {
   291				cc.pipe.StartResponse(id)
   292				cc.pipe.EndResponse(id)
   293			} else {
   294				// Remember the pipeline id of this request
   295				cc.lk.Lock()
   296				cc.pipereq[req] = id
   297				cc.lk.Unlock()
   298			}
   299		}()
   300	
   301		cc.lk.Lock()
   302		if cc.re != nil { // no point sending if read-side closed or broken
   303			defer cc.lk.Unlock()
   304			return cc.re
   305		}
   306		if cc.we != nil {
   307			defer cc.lk.Unlock()
   308			return cc.we
   309		}
   310		if cc.c == nil { // connection closed by user in the meantime
   311			defer cc.lk.Unlock()
   312			return os.EBADF
   313		}
   314		c := cc.c
   315		if req.Close {
   316			// We write the EOF to the write-side error, because there
   317			// still might be some pipelined reads
   318			cc.we = ErrPersistEOF
   319		}
   320		cc.lk.Unlock()
   321	
   322		err = cc.writeReq(req, c)
   323		cc.lk.Lock()
   324		defer cc.lk.Unlock()
   325		if err != nil {
   326			cc.we = err
   327			return err
   328		}
   329		cc.nwritten++
   330	
   331		return nil
   332	}
   333	
   334	// Pending returns the number of unanswered requests
   335	// that have been sent on the connection.
   336	func (cc *ClientConn) Pending() int {
   337		cc.lk.Lock()
   338		defer cc.lk.Unlock()
   339		return cc.nwritten - cc.nread
   340	}
   341	
   342	// Read reads the next response from the wire. A valid response might be
   343	// returned together with an ErrPersistEOF, which means that the remote
   344	// requested that this be the last request serviced. Read can be called
   345	// concurrently with Write, but not with another Read.
   346	func (cc *ClientConn) Read(req *Request) (*Response, os.Error) {
   347		return cc.readUsing(req, ReadResponse)
   348	}
   349	
   350	// readUsing is the implementation of Read with a replaceable
   351	// ReadResponse-like function, used by the Transport.
   352	func (cc *ClientConn) readUsing(req *Request, readRes func(*bufio.Reader, *Request) (*Response, os.Error)) (resp *Response, err os.Error) {
   353		// Retrieve the pipeline ID of this request/response pair
   354		cc.lk.Lock()
   355		id, ok := cc.pipereq[req]
   356		cc.pipereq[req] = 0, false
   357		if !ok {
   358			cc.lk.Unlock()
   359			return nil, ErrPipeline
   360		}
   361		cc.lk.Unlock()
   362	
   363		// Ensure pipeline order
   364		cc.pipe.StartResponse(id)
   365		defer cc.pipe.EndResponse(id)
   366	
   367		cc.lk.Lock()
   368		if cc.re != nil {
   369			defer cc.lk.Unlock()
   370			return nil, cc.re
   371		}
   372		if cc.r == nil { // connection closed by user in the meantime
   373			defer cc.lk.Unlock()
   374			return nil, os.EBADF
   375		}
   376		r := cc.r
   377		lastbody := cc.lastbody
   378		cc.lastbody = nil
   379		cc.lk.Unlock()
   380	
   381		// Make sure body is fully consumed, even if user does not call body.Close
   382		if lastbody != nil {
   383			// body.Close is assumed to be idempotent and multiple calls to
   384			// it should return the error that its first invokation
   385			// returned.
   386			err = lastbody.Close()
   387			if err != nil {
   388				cc.lk.Lock()
   389				defer cc.lk.Unlock()
   390				cc.re = err
   391				return nil, err
   392			}
   393		}
   394	
   395		resp, err = readRes(r, req)
   396		cc.lk.Lock()
   397		defer cc.lk.Unlock()
   398		if err != nil {
   399			cc.re = err
   400			return resp, err
   401		}
   402		cc.lastbody = resp.Body
   403	
   404		cc.nread++
   405	
   406		if resp.Close {
   407			cc.re = ErrPersistEOF // don't send any more requests
   408			return resp, cc.re
   409		}
   410		return resp, err
   411	}
   412	
   413	// Do is convenience method that writes a request and reads a response.
   414	func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) {
   415		err = cc.Write(req)
   416		if err != nil {
   417			return
   418		}
   419		return cc.Read(req)
   420	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.