...
Run Format

Source file src/pkg/net/rpc/client.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package rpc
     6	
     7	import (
     8		"bufio"
     9		"encoding/gob"
    10		"errors"
    11		"io"
    12		"log"
    13		"net"
    14		"net/http"
    15		"sync"
    16	)
    17	
    18	// ServerError represents an error that has been returned from
    19	// the remote side of the RPC connection.
    20	type ServerError string
    21	
    22	func (e ServerError) Error() string {
    23		return string(e)
    24	}
    25	
// ErrShutdown is the error stored in a Call that is issued or still
// pending once the client is closing or shut down, and the error
// returned by Close when Close has already been called.
var ErrShutdown = errors.New("connection is shut down")
    27	
// Call represents an active RPC.
type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Receives this Call when it completes; the send never blocks, so the channel needs spare buffer space.
}
    36	
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	codec ClientCodec // encodes requests and decodes responses

	// sending is held by send for the whole of one request write, and by
	// input while it terminates pending calls. It is also the lock under
	// which send fills in the shared request header below.
	sending sync.Mutex

	mutex    sync.Mutex       // protects seq, pending, closing and shutdown
	request  Request          // header reused for every outgoing request; written by send while holding sending, not mutex
	seq      uint64           // sequence number to assign to the next call
	pending  map[uint64]*Call // calls registered by send, keyed by sequence number, awaiting a response
	closing  bool             // user has called Close
	shutdown bool             // set by input once its read loop has ended
}
    53	
// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses.  The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
type ClientCodec interface {
	// WriteRequest writes one request: the header followed by its body.
	// WriteRequest must be safe for concurrent use by multiple goroutines.
	WriteRequest(*Request, interface{}) error
	// ReadResponseHeader reads the header of the next response into
	// the supplied Response.
	ReadResponseHeader(*Response) error
	// ReadResponseBody reads the body that follows the most recently
	// read header into its argument, or discards it when the argument
	// is nil.
	ReadResponseBody(interface{}) error

	// Close is called by Client.Close when the user is finished with
	// the connection.
	Close() error
}
    70	
    71	func (client *Client) send(call *Call) {
    72		client.sending.Lock()
    73		defer client.sending.Unlock()
    74	
    75		// Register this call.
    76		client.mutex.Lock()
    77		if client.shutdown || client.closing {
    78			call.Error = ErrShutdown
    79			client.mutex.Unlock()
    80			call.done()
    81			return
    82		}
    83		seq := client.seq
    84		client.seq++
    85		client.pending[seq] = call
    86		client.mutex.Unlock()
    87	
    88		// Encode and send the request.
    89		client.request.Seq = seq
    90		client.request.ServiceMethod = call.ServiceMethod
    91		err := client.codec.WriteRequest(&client.request, call.Args)
    92		if err != nil {
    93			client.mutex.Lock()
    94			call = client.pending[seq]
    95			delete(client.pending, seq)
    96			client.mutex.Unlock()
    97			if call != nil {
    98				call.Error = err
    99				call.done()
   100			}
   101		}
   102	}
   103	
   104	func (client *Client) input() {
   105		var err error
   106		var response Response
   107		for err == nil {
   108			response = Response{}
   109			err = client.codec.ReadResponseHeader(&response)
   110			if err != nil {
   111				break
   112			}
   113			seq := response.Seq
   114			client.mutex.Lock()
   115			call := client.pending[seq]
   116			delete(client.pending, seq)
   117			client.mutex.Unlock()
   118	
   119			switch {
   120			case call == nil:
   121				// We've got no pending call. That usually means that
   122				// WriteRequest partially failed, and call was already
   123				// removed; response is a server telling us about an
   124				// error reading request body. We should still attempt
   125				// to read error body, but there's no one to give it to.
   126				err = client.codec.ReadResponseBody(nil)
   127				if err != nil {
   128					err = errors.New("reading error body: " + err.Error())
   129				}
   130			case response.Error != "":
   131				// We've got an error response. Give this to the request;
   132				// any subsequent requests will get the ReadResponseBody
   133				// error if there is one.
   134				call.Error = ServerError(response.Error)
   135				err = client.codec.ReadResponseBody(nil)
   136				if err != nil {
   137					err = errors.New("reading error body: " + err.Error())
   138				}
   139				call.done()
   140			default:
   141				err = client.codec.ReadResponseBody(call.Reply)
   142				if err != nil {
   143					call.Error = errors.New("reading body " + err.Error())
   144				}
   145				call.done()
   146			}
   147		}
   148		// Terminate pending calls.
   149		client.sending.Lock()
   150		client.mutex.Lock()
   151		client.shutdown = true
   152		closing := client.closing
   153		if err == io.EOF {
   154			if closing {
   155				err = ErrShutdown
   156			} else {
   157				err = io.ErrUnexpectedEOF
   158			}
   159		}
   160		for _, call := range client.pending {
   161			call.Error = err
   162			call.done()
   163		}
   164		client.mutex.Unlock()
   165		client.sending.Unlock()
   166		if debugLog && err != io.EOF && !closing {
   167			log.Println("rpc: client protocol error:", err)
   168		}
   169	}
   170	
   171	func (call *Call) done() {
   172		select {
   173		case call.Done <- call:
   174			// ok
   175		default:
   176			// We don't want to block here.  It is the caller's responsibility to make
   177			// sure the channel has enough buffer space. See comment in Go().
   178			if debugLog {
   179				log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
   180			}
   181		}
   182	}
   183	
   184	// NewClient returns a new Client to handle requests to the
   185	// set of services at the other end of the connection.
   186	// It adds a buffer to the write side of the connection so
   187	// the header and payload are sent as a unit.
   188	func NewClient(conn io.ReadWriteCloser) *Client {
   189		encBuf := bufio.NewWriter(conn)
   190		client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
   191		return NewClientWithCodec(client)
   192	}
   193	
   194	// NewClientWithCodec is like NewClient but uses the specified
   195	// codec to encode requests and decode responses.
   196	func NewClientWithCodec(codec ClientCodec) *Client {
   197		client := &Client{
   198			codec:   codec,
   199			pending: make(map[uint64]*Call),
   200		}
   201		go client.input()
   202		return client
   203	}
   204	
// gobClientCodec is the codec NewClient builds: it encodes requests
// and decodes responses with encoding/gob.
type gobClientCodec struct {
	rwc    io.ReadWriteCloser // the underlying connection; closed by Close
	dec    *gob.Decoder       // decodes responses read from rwc
	enc    *gob.Encoder       // encodes requests into encBuf
	encBuf *bufio.Writer      // buffers encoded output; flushed at the end of WriteRequest
}
   211	
   212	func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
   213		if err = c.enc.Encode(r); err != nil {
   214			return
   215		}
   216		if err = c.enc.Encode(body); err != nil {
   217			return
   218		}
   219		return c.encBuf.Flush()
   220	}
   221	
// ReadResponseHeader gob-decodes the next value from the connection
// into r and returns the decoder's error, if any.
func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
	return c.dec.Decode(r)
}
   225	
// ReadResponseBody gob-decodes the next value from the connection
// into body and returns the decoder's error, if any. The client
// passes a nil body when it wants the value read and thrown away.
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
	return c.dec.Decode(body)
}
   229	
// Close closes the underlying connection and returns its error.
func (c *gobClientCodec) Close() error {
	return c.rwc.Close()
}
   233	
// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
// It is shorthand for DialHTTPPath with DefaultRPCPath as the path.
func DialHTTP(network, address string) (*Client, error) {
	return DialHTTPPath(network, address, DefaultRPCPath)
}
   239	
   240	// DialHTTPPath connects to an HTTP RPC server
   241	// at the specified network address and path.
   242	func DialHTTPPath(network, address, path string) (*Client, error) {
   243		var err error
   244		conn, err := net.Dial(network, address)
   245		if err != nil {
   246			return nil, err
   247		}
   248		io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
   249	
   250		// Require successful HTTP response
   251		// before switching to RPC protocol.
   252		resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
   253		if err == nil && resp.Status == connected {
   254			return NewClient(conn), nil
   255		}
   256		if err == nil {
   257			err = errors.New("unexpected HTTP response: " + resp.Status)
   258		}
   259		conn.Close()
   260		return nil, &net.OpError{
   261			Op:   "dial-http",
   262			Net:  network + " " + address,
   263			Addr: nil,
   264			Err:  err,
   265		}
   266	}
   267	
   268	// Dial connects to an RPC server at the specified network address.
   269	func Dial(network, address string) (*Client, error) {
   270		conn, err := net.Dial(network, address)
   271		if err != nil {
   272			return nil, err
   273		}
   274		return NewClient(conn), nil
   275	}
   276	
   277	func (client *Client) Close() error {
   278		client.mutex.Lock()
   279		if client.closing {
   280			client.mutex.Unlock()
   281			return ErrShutdown
   282		}
   283		client.closing = true
   284		client.mutex.Unlock()
   285		return client.codec.Close()
   286	}
   287	
   288	// Go invokes the function asynchronously.  It returns the Call structure representing
   289	// the invocation.  The done channel will signal when the call is complete by returning
   290	// the same Call object.  If done is nil, Go will allocate a new channel.
   291	// If non-nil, done must be buffered or Go will deliberately crash.
   292	func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
   293		call := new(Call)
   294		call.ServiceMethod = serviceMethod
   295		call.Args = args
   296		call.Reply = reply
   297		if done == nil {
   298			done = make(chan *Call, 10) // buffered.
   299		} else {
   300			// If caller passes done != nil, it must arrange that
   301			// done has enough buffer for the number of simultaneous
   302			// RPCs that will be using that channel.  If the channel
   303			// is totally unbuffered, it's best not to run at all.
   304			if cap(done) == 0 {
   305				log.Panic("rpc: done channel is unbuffered")
   306			}
   307		}
   308		call.Done = done
   309		client.send(call)
   310		return call
   311	}
   312	
   313	// Call invokes the named function, waits for it to complete, and returns its error status.
   314	func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
   315		call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
   316		return call.Error
   317	}
   318	

View as plain text