Run Format

Source file src/pkg/net/rpc/client.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package rpc
     6	
     7	import (
     8		"bufio"
     9		"encoding/gob"
    10		"errors"
    11		"io"
    12		"log"
    13		"net"
    14		"net/http"
    15		"sync"
    16	)
    17	
    18	// ServerError represents an error that has been returned from
    19	// the remote side of the RPC connection.
    20	type ServerError string
    21	
    22	func (e ServerError) Error() string {
    23		return string(e)
    24	}
    25	
    26	var ErrShutdown = errors.New("connection is shut down")
    27	
    28	// Call represents an active RPC.
    29	type Call struct {
    30		ServiceMethod string      // The name of the service and method to call.
    31		Args          interface{} // The argument to the function (*struct).
    32		Reply         interface{} // The reply from the function (*struct).
    33		Error         error       // After completion, the error status.
    34		Done          chan *Call  // Strobes when call is complete.
    35	}
    36	
    37	// Client represents an RPC Client.
    38	// There may be multiple outstanding Calls associated
    39	// with a single Client, and a Client may be used by
    40	// multiple goroutines simultaneously.
    41	type Client struct {
    42		mutex    sync.Mutex // protects pending, seq, request
    43		sending  sync.Mutex
    44		request  Request
    45		seq      uint64
    46		codec    ClientCodec
    47		pending  map[uint64]*Call
    48		closing  bool
    49		shutdown bool
    50	}
    51	
    52	// A ClientCodec implements writing of RPC requests and
    53	// reading of RPC responses for the client side of an RPC session.
    54	// The client calls WriteRequest to write a request to the connection
    55	// and calls ReadResponseHeader and ReadResponseBody in pairs
    56	// to read responses.  The client calls Close when finished with the
    57	// connection. ReadResponseBody may be called with a nil
    58	// argument to force the body of the response to be read and then
    59	// discarded.
    60	type ClientCodec interface {
    61		// WriteRequest must be safe for concurrent use by multiple goroutines.
    62		WriteRequest(*Request, interface{}) error
    63		ReadResponseHeader(*Response) error
    64		ReadResponseBody(interface{}) error
    65	
    66		Close() error
    67	}
    68	
    69	func (client *Client) send(call *Call) {
    70		client.sending.Lock()
    71		defer client.sending.Unlock()
    72	
    73		// Register this call.
    74		client.mutex.Lock()
    75		if client.shutdown || client.closing {
    76			call.Error = ErrShutdown
    77			client.mutex.Unlock()
    78			call.done()
    79			return
    80		}
    81		seq := client.seq
    82		client.seq++
    83		client.pending[seq] = call
    84		client.mutex.Unlock()
    85	
    86		// Encode and send the request.
    87		client.request.Seq = seq
    88		client.request.ServiceMethod = call.ServiceMethod
    89		err := client.codec.WriteRequest(&client.request, call.Args)
    90		if err != nil {
    91			client.mutex.Lock()
    92			call = client.pending[seq]
    93			delete(client.pending, seq)
    94			client.mutex.Unlock()
    95			if call != nil {
    96				call.Error = err
    97				call.done()
    98			}
    99		}
   100	}
   101	
   102	func (client *Client) input() {
   103		var err error
   104		var response Response
   105		for err == nil {
   106			response = Response{}
   107			err = client.codec.ReadResponseHeader(&response)
   108			if err != nil {
   109				break
   110			}
   111			seq := response.Seq
   112			client.mutex.Lock()
   113			call := client.pending[seq]
   114			delete(client.pending, seq)
   115			client.mutex.Unlock()
   116	
   117			switch {
   118			case call == nil:
   119				// We've got no pending call. That usually means that
   120				// WriteRequest partially failed, and call was already
   121				// removed; response is a server telling us about an
   122				// error reading request body. We should still attempt
   123				// to read error body, but there's no one to give it to.
   124				err = client.codec.ReadResponseBody(nil)
   125				if err != nil {
   126					err = errors.New("reading error body: " + err.Error())
   127				}
   128			case response.Error != "":
   129				// We've got an error response. Give this to the request;
   130				// any subsequent requests will get the ReadResponseBody
   131				// error if there is one.
   132				call.Error = ServerError(response.Error)
   133				err = client.codec.ReadResponseBody(nil)
   134				if err != nil {
   135					err = errors.New("reading error body: " + err.Error())
   136				}
   137				call.done()
   138			default:
   139				err = client.codec.ReadResponseBody(call.Reply)
   140				if err != nil {
   141					call.Error = errors.New("reading body " + err.Error())
   142				}
   143				call.done()
   144			}
   145		}
   146		// Terminate pending calls.
   147		client.sending.Lock()
   148		client.mutex.Lock()
   149		client.shutdown = true
   150		closing := client.closing
   151		if err == io.EOF {
   152			if closing {
   153				err = ErrShutdown
   154			} else {
   155				err = io.ErrUnexpectedEOF
   156			}
   157		}
   158		for _, call := range client.pending {
   159			call.Error = err
   160			call.done()
   161		}
   162		client.mutex.Unlock()
   163		client.sending.Unlock()
   164		if debugLog && err != io.EOF && !closing {
   165			log.Println("rpc: client protocol error:", err)
   166		}
   167	}
   168	
   169	func (call *Call) done() {
   170		select {
   171		case call.Done <- call:
   172			// ok
   173		default:
   174			// We don't want to block here.  It is the caller's responsibility to make
   175			// sure the channel has enough buffer space. See comment in Go().
   176			if debugLog {
   177				log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
   178			}
   179		}
   180	}
   181	
   182	// NewClient returns a new Client to handle requests to the
   183	// set of services at the other end of the connection.
   184	// It adds a buffer to the write side of the connection so
   185	// the header and payload are sent as a unit.
   186	func NewClient(conn io.ReadWriteCloser) *Client {
   187		encBuf := bufio.NewWriter(conn)
   188		client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
   189		return NewClientWithCodec(client)
   190	}
   191	
   192	// NewClientWithCodec is like NewClient but uses the specified
   193	// codec to encode requests and decode responses.
   194	func NewClientWithCodec(codec ClientCodec) *Client {
   195		client := &Client{
   196			codec:   codec,
   197			pending: make(map[uint64]*Call),
   198		}
   199		go client.input()
   200		return client
   201	}
   202	
   203	type gobClientCodec struct {
   204		rwc    io.ReadWriteCloser
   205		dec    *gob.Decoder
   206		enc    *gob.Encoder
   207		encBuf *bufio.Writer
   208	}
   209	
   210	func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
   211		if err = c.enc.Encode(r); err != nil {
   212			return
   213		}
   214		if err = c.enc.Encode(body); err != nil {
   215			return
   216		}
   217		return c.encBuf.Flush()
   218	}
   219	
   220	func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
   221		return c.dec.Decode(r)
   222	}
   223	
   224	func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
   225		return c.dec.Decode(body)
   226	}
   227	
   228	func (c *gobClientCodec) Close() error {
   229		return c.rwc.Close()
   230	}
   231	
   232	// DialHTTP connects to an HTTP RPC server at the specified network address
   233	// listening on the default HTTP RPC path.
   234	func DialHTTP(network, address string) (*Client, error) {
   235		return DialHTTPPath(network, address, DefaultRPCPath)
   236	}
   237	
   238	// DialHTTPPath connects to an HTTP RPC server
   239	// at the specified network address and path.
   240	func DialHTTPPath(network, address, path string) (*Client, error) {
   241		var err error
   242		conn, err := net.Dial(network, address)
   243		if err != nil {
   244			return nil, err
   245		}
   246		io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
   247	
   248		// Require successful HTTP response
   249		// before switching to RPC protocol.
   250		resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
   251		if err == nil && resp.Status == connected {
   252			return NewClient(conn), nil
   253		}
   254		if err == nil {
   255			err = errors.New("unexpected HTTP response: " + resp.Status)
   256		}
   257		conn.Close()
   258		return nil, &net.OpError{
   259			Op:   "dial-http",
   260			Net:  network + " " + address,
   261			Addr: nil,
   262			Err:  err,
   263		}
   264	}
   265	
   266	// Dial connects to an RPC server at the specified network address.
   267	func Dial(network, address string) (*Client, error) {
   268		conn, err := net.Dial(network, address)
   269		if err != nil {
   270			return nil, err
   271		}
   272		return NewClient(conn), nil
   273	}
   274	
   275	func (client *Client) Close() error {
   276		client.mutex.Lock()
   277		if client.shutdown || client.closing {
   278			client.mutex.Unlock()
   279			return ErrShutdown
   280		}
   281		client.closing = true
   282		client.mutex.Unlock()
   283		return client.codec.Close()
   284	}
   285	
   286	// Go invokes the function asynchronously.  It returns the Call structure representing
   287	// the invocation.  The done channel will signal when the call is complete by returning
   288	// the same Call object.  If done is nil, Go will allocate a new channel.
   289	// If non-nil, done must be buffered or Go will deliberately crash.
   290	func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
   291		call := new(Call)
   292		call.ServiceMethod = serviceMethod
   293		call.Args = args
   294		call.Reply = reply
   295		if done == nil {
   296			done = make(chan *Call, 10) // buffered.
   297		} else {
   298			// If caller passes done != nil, it must arrange that
   299			// done has enough buffer for the number of simultaneous
   300			// RPCs that will be using that channel.  If the channel
   301			// is totally unbuffered, it's best not to run at all.
   302			if cap(done) == 0 {
   303				log.Panic("rpc: done channel is unbuffered")
   304			}
   305		}
   306		call.Done = done
   307		client.send(call)
   308		return call
   309	}
   310	
   311	// Call invokes the named function, waits for it to complete, and returns its error status.
   312	func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
   313		call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
   314		return call.Error
   315	}

View as plain text