Run Format

Source file src/pkg/net/rpc/client.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package rpc
     6	
     7	import (
     8		"bufio"
     9		"encoding/gob"
    10		"errors"
    11		"io"
    12		"log"
    13		"net"
    14		"net/http"
    15		"sync"
    16	)
    17	
    18	// ServerError represents an error that has been returned from
    19	// the remote side of the RPC connection.
    20	type ServerError string
    21	
    22	func (e ServerError) Error() string {
    23		return string(e)
    24	}
    25	
    26	var ErrShutdown = errors.New("connection is shut down")
    27	
    28	// Call represents an active RPC.
    29	type Call struct {
    30		ServiceMethod string      // The name of the service and method to call.
    31		Args          interface{} // The argument to the function (*struct).
    32		Reply         interface{} // The reply from the function (*struct).
    33		Error         error       // After completion, the error status.
    34		Done          chan *Call  // Strobes when call is complete.
    35	}
    36	
    37	// Client represents an RPC Client.
    38	// There may be multiple outstanding Calls associated
    39	// with a single Client, and a Client may be used by
    40	// multiple goroutines simultaneously.
    41	type Client struct {
    42		mutex    sync.Mutex // protects pending, seq, request
    43		sending  sync.Mutex
    44		request  Request
    45		seq      uint64
    46		codec    ClientCodec
    47		pending  map[uint64]*Call
    48		closing  bool
    49		shutdown bool
    50	}
    51	
    52	// A ClientCodec implements writing of RPC requests and
    53	// reading of RPC responses for the client side of an RPC session.
    54	// The client calls WriteRequest to write a request to the connection
    55	// and calls ReadResponseHeader and ReadResponseBody in pairs
    56	// to read responses.  The client calls Close when finished with the
    57	// connection. ReadResponseBody may be called with a nil
    58	// argument to force the body of the response to be read and then
    59	// discarded.
    60	type ClientCodec interface {
    61		WriteRequest(*Request, interface{}) error
    62		ReadResponseHeader(*Response) error
    63		ReadResponseBody(interface{}) error
    64	
    65		Close() error
    66	}
    67	
    68	func (client *Client) send(call *Call) {
    69		client.sending.Lock()
    70		defer client.sending.Unlock()
    71	
    72		// Register this call.
    73		client.mutex.Lock()
    74		if client.shutdown || client.closing {
    75			call.Error = ErrShutdown
    76			client.mutex.Unlock()
    77			call.done()
    78			return
    79		}
    80		seq := client.seq
    81		client.seq++
    82		client.pending[seq] = call
    83		client.mutex.Unlock()
    84	
    85		// Encode and send the request.
    86		client.request.Seq = seq
    87		client.request.ServiceMethod = call.ServiceMethod
    88		err := client.codec.WriteRequest(&client.request, call.Args)
    89		if err != nil {
    90			client.mutex.Lock()
    91			call = client.pending[seq]
    92			delete(client.pending, seq)
    93			client.mutex.Unlock()
    94			if call != nil {
    95				call.Error = err
    96				call.done()
    97			}
    98		}
    99	}
   100	
   101	func (client *Client) input() {
   102		var err error
   103		var response Response
   104		for err == nil {
   105			response = Response{}
   106			err = client.codec.ReadResponseHeader(&response)
   107			if err != nil {
   108				break
   109			}
   110			seq := response.Seq
   111			client.mutex.Lock()
   112			call := client.pending[seq]
   113			delete(client.pending, seq)
   114			client.mutex.Unlock()
   115	
   116			switch {
   117			case call == nil:
   118				// We've got no pending call. That usually means that
   119				// WriteRequest partially failed, and call was already
   120				// removed; response is a server telling us about an
   121				// error reading request body. We should still attempt
   122				// to read error body, but there's no one to give it to.
   123				err = client.codec.ReadResponseBody(nil)
   124				if err != nil {
   125					err = errors.New("reading error body: " + err.Error())
   126				}
   127			case response.Error != "":
   128				// We've got an error response. Give this to the request;
   129				// any subsequent requests will get the ReadResponseBody
   130				// error if there is one.
   131				call.Error = ServerError(response.Error)
   132				err = client.codec.ReadResponseBody(nil)
   133				if err != nil {
   134					err = errors.New("reading error body: " + err.Error())
   135				}
   136				call.done()
   137			default:
   138				err = client.codec.ReadResponseBody(call.Reply)
   139				if err != nil {
   140					call.Error = errors.New("reading body " + err.Error())
   141				}
   142				call.done()
   143			}
   144		}
   145		// Terminate pending calls.
   146		client.sending.Lock()
   147		client.mutex.Lock()
   148		client.shutdown = true
   149		closing := client.closing
   150		if err == io.EOF {
   151			if closing {
   152				err = ErrShutdown
   153			} else {
   154				err = io.ErrUnexpectedEOF
   155			}
   156		}
   157		for _, call := range client.pending {
   158			call.Error = err
   159			call.done()
   160		}
   161		client.mutex.Unlock()
   162		client.sending.Unlock()
   163		if err != io.EOF && !closing {
   164			log.Println("rpc: client protocol error:", err)
   165		}
   166	}
   167	
   168	func (call *Call) done() {
   169		select {
   170		case call.Done <- call:
   171			// ok
   172		default:
   173			// We don't want to block here.  It is the caller's responsibility to make
   174			// sure the channel has enough buffer space. See comment in Go().
   175			log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
   176		}
   177	}
   178	
   179	// NewClient returns a new Client to handle requests to the
   180	// set of services at the other end of the connection.
   181	// It adds a buffer to the write side of the connection so
   182	// the header and payload are sent as a unit.
   183	func NewClient(conn io.ReadWriteCloser) *Client {
   184		encBuf := bufio.NewWriter(conn)
   185		client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
   186		return NewClientWithCodec(client)
   187	}
   188	
   189	// NewClientWithCodec is like NewClient but uses the specified
   190	// codec to encode requests and decode responses.
   191	func NewClientWithCodec(codec ClientCodec) *Client {
   192		client := &Client{
   193			codec:   codec,
   194			pending: make(map[uint64]*Call),
   195		}
   196		go client.input()
   197		return client
   198	}
   199	
   200	type gobClientCodec struct {
   201		rwc    io.ReadWriteCloser
   202		dec    *gob.Decoder
   203		enc    *gob.Encoder
   204		encBuf *bufio.Writer
   205	}
   206	
   207	func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
   208		if err = c.enc.Encode(r); err != nil {
   209			return
   210		}
   211		if err = c.enc.Encode(body); err != nil {
   212			return
   213		}
   214		return c.encBuf.Flush()
   215	}
   216	
   217	func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
   218		return c.dec.Decode(r)
   219	}
   220	
   221	func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
   222		return c.dec.Decode(body)
   223	}
   224	
   225	func (c *gobClientCodec) Close() error {
   226		return c.rwc.Close()
   227	}
   228	
   229	// DialHTTP connects to an HTTP RPC server at the specified network address
   230	// listening on the default HTTP RPC path.
   231	func DialHTTP(network, address string) (*Client, error) {
   232		return DialHTTPPath(network, address, DefaultRPCPath)
   233	}
   234	
   235	// DialHTTPPath connects to an HTTP RPC server
   236	// at the specified network address and path.
   237	func DialHTTPPath(network, address, path string) (*Client, error) {
   238		var err error
   239		conn, err := net.Dial(network, address)
   240		if err != nil {
   241			return nil, err
   242		}
   243		io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
   244	
   245		// Require successful HTTP response
   246		// before switching to RPC protocol.
   247		resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
   248		if err == nil && resp.Status == connected {
   249			return NewClient(conn), nil
   250		}
   251		if err == nil {
   252			err = errors.New("unexpected HTTP response: " + resp.Status)
   253		}
   254		conn.Close()
   255		return nil, &net.OpError{
   256			Op:   "dial-http",
   257			Net:  network + " " + address,
   258			Addr: nil,
   259			Err:  err,
   260		}
   261	}
   262	
   263	// Dial connects to an RPC server at the specified network address.
   264	func Dial(network, address string) (*Client, error) {
   265		conn, err := net.Dial(network, address)
   266		if err != nil {
   267			return nil, err
   268		}
   269		return NewClient(conn), nil
   270	}
   271	
   272	func (client *Client) Close() error {
   273		client.mutex.Lock()
   274		if client.shutdown || client.closing {
   275			client.mutex.Unlock()
   276			return ErrShutdown
   277		}
   278		client.closing = true
   279		client.mutex.Unlock()
   280		return client.codec.Close()
   281	}
   282	
   283	// Go invokes the function asynchronously.  It returns the Call structure representing
   284	// the invocation.  The done channel will signal when the call is complete by returning
   285	// the same Call object.  If done is nil, Go will allocate a new channel.
   286	// If non-nil, done must be buffered or Go will deliberately crash.
   287	func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
   288		call := new(Call)
   289		call.ServiceMethod = serviceMethod
   290		call.Args = args
   291		call.Reply = reply
   292		if done == nil {
   293			done = make(chan *Call, 10) // buffered.
   294		} else {
   295			// If caller passes done != nil, it must arrange that
   296			// done has enough buffer for the number of simultaneous
   297			// RPCs that will be using that channel.  If the channel
   298			// is totally unbuffered, it's best not to run at all.
   299			if cap(done) == 0 {
   300				log.Panic("rpc: done channel is unbuffered")
   301			}
   302		}
   303		call.Done = done
   304		client.send(call)
   305		return call
   306	}
   307	
   308	// Call invokes the named function, waits for it to complete, and returns its error status.
   309	func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
   310		call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
   311		return call.Error
   312	}

View as plain text