Run Format

Source file src/pkg/net/rpc/server.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	/*
     6		Package rpc provides access to the exported methods of an object across a
     7		network or other I/O connection.  A server registers an object, making it visible
     8		as a service with the name of the type of the object.  After registration, exported
     9		methods of the object will be accessible remotely.  A server may register multiple
    10		objects (services) of different types but it is an error to register multiple
    11		objects of the same type.
    12	
    13		Only methods that satisfy these criteria will be made available for remote access;
    14		other methods will be ignored:
    15	
    16			- the method is exported.
    17			- the method has two arguments, both exported (or builtin) types.
    18			- the method's second argument is a pointer.
    19			- the method has return type error.
    20	
    21		In effect, the method must look schematically like
    22	
    23			func (t *T) MethodName(argType T1, replyType *T2) error
    24	
    25		where T, T1 and T2 can be marshaled by encoding/gob.
    26		These requirements apply even if a different codec is used.
    27		(In the future, these requirements may soften for custom codecs.)
    28	
    29		The method's first argument represents the arguments provided by the caller; the
    30		second argument represents the result parameters to be returned to the caller.
    31		The method's return value, if non-nil, is passed back as a string that the client
    32		sees as if created by errors.New.  If an error is returned, the reply parameter
    33		will not be sent back to the client.
    34	
    35		The server may handle requests on a single connection by calling ServeConn.  More
    36		typically it will create a network listener and call Accept or, for an HTTP
    37		listener, HandleHTTP and http.Serve.
    38	
    39		A client wishing to use the service establishes a connection and then invokes
    40		NewClient on the connection.  The convenience function Dial (DialHTTP) performs
    41		both steps for a raw network connection (an HTTP connection).  The resulting
    42		Client object has two methods, Call and Go, that specify the service and method to
    43		call, a pointer containing the arguments, and a pointer to receive the result
    44		parameters.
    45	
    46		The Call method waits for the remote call to complete while the Go method
    47		launches the call asynchronously and signals completion using the Call
    48		structure's Done channel.
    49	
    50		Unless an explicit codec is set up, package encoding/gob is used to
    51		transport the data.
    52	
    53		Here is a simple example.  A server wishes to export an object of type Arith:
    54	
    55			package server
    56	
    57			type Args struct {
    58				A, B int
    59			}
    60	
    61			type Quotient struct {
    62				Quo, Rem int
    63			}
    64	
    65			type Arith int
    66	
    67			func (t *Arith) Multiply(args *Args, reply *int) error {
    68				*reply = args.A * args.B
    69				return nil
    70			}
    71	
    72			func (t *Arith) Divide(args *Args, quo *Quotient) error {
    73				if args.B == 0 {
    74					return errors.New("divide by zero")
    75				}
    76				quo.Quo = args.A / args.B
    77				quo.Rem = args.A % args.B
    78				return nil
    79			}
    80	
    81		The server calls (for HTTP service):
    82	
    83			arith := new(Arith)
    84			rpc.Register(arith)
    85			rpc.HandleHTTP()
    86			l, e := net.Listen("tcp", ":1234")
    87			if e != nil {
    88				log.Fatal("listen error:", e)
    89			}
    90			go http.Serve(l, nil)
    91	
    92		At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
    93		"Arith.Divide".  To invoke one, a client first dials the server:
    94	
    95			client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
    96			if err != nil {
    97				log.Fatal("dialing:", err)
    98			}
    99	
   100		Then it can make a remote call:
   101	
   102			// Synchronous call
   103			args := &server.Args{7,8}
   104			var reply int
   105			err = client.Call("Arith.Multiply", args, &reply)
   106			if err != nil {
   107				log.Fatal("arith error:", err)
   108			}
   109			fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
   110	
   111		or
   112	
   113			// Asynchronous call
   114			quotient := new(server.Quotient)
   115			divCall := client.Go("Arith.Divide", args, quotient, nil)
   116			replyCall := <-divCall.Done	// will be equal to divCall
   117			// check errors, print, etc.
   118	
   119		A server implementation will often provide a simple, type-safe wrapper for the
   120		client.
   121	*/
   122	package rpc
   123	
   124	import (
   125		"bufio"
   126		"encoding/gob"
   127		"errors"
   128		"io"
   129		"log"
   130		"net"
   131		"net/http"
   132		"reflect"
   133		"strings"
   134		"sync"
   135		"unicode"
   136		"unicode/utf8"
   137	)
   138	
   139	const (
   140		// Defaults used by HandleHTTP
   141		DefaultRPCPath   = "/_goRPC_"
   142		DefaultDebugPath = "/debug/rpc"
   143	)
   144	
   145	// Precompute the reflect type for error.  Can't use error directly
   146	// because Typeof takes an empty interface value.  This is annoying.
   147	var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
   148	
   149	type methodType struct {
   150		sync.Mutex // protects counters
   151		method     reflect.Method
   152		ArgType    reflect.Type
   153		ReplyType  reflect.Type
   154		numCalls   uint
   155	}
   156	
   157	type service struct {
   158		name   string                 // name of service
   159		rcvr   reflect.Value          // receiver of methods for the service
   160		typ    reflect.Type           // type of the receiver
   161		method map[string]*methodType // registered methods
   162	}
   163	
   164	// Request is a header written before every RPC call.  It is used internally
   165	// but documented here as an aid to debugging, such as when analyzing
   166	// network traffic.
   167	type Request struct {
   168		ServiceMethod string   // format: "Service.Method"
   169		Seq           uint64   // sequence number chosen by client
   170		next          *Request // for free list in Server
   171	}
   172	
   173	// Response is a header written before every RPC return.  It is used internally
   174	// but documented here as an aid to debugging, such as when analyzing
   175	// network traffic.
   176	type Response struct {
   177		ServiceMethod string    // echoes that of the Request
   178		Seq           uint64    // echoes that of the request
   179		Error         string    // error, if any.
   180		next          *Response // for free list in Server
   181	}
   182	
   183	// Server represents an RPC Server.
   184	type Server struct {
   185		mu         sync.RWMutex // protects the serviceMap
   186		serviceMap map[string]*service
   187		reqLock    sync.Mutex // protects freeReq
   188		freeReq    *Request
   189		respLock   sync.Mutex // protects freeResp
   190		freeResp   *Response
   191	}
   192	
   193	// NewServer returns a new Server.
   194	func NewServer() *Server {
   195		return &Server{serviceMap: make(map[string]*service)}
   196	}
   197	
   198	// DefaultServer is the default instance of *Server.
   199	var DefaultServer = NewServer()
   200	
   201	// Is this an exported - upper case - name?
   202	func isExported(name string) bool {
   203		rune, _ := utf8.DecodeRuneInString(name)
   204		return unicode.IsUpper(rune)
   205	}
   206	
   207	// Is this type exported or a builtin?
   208	func isExportedOrBuiltinType(t reflect.Type) bool {
   209		for t.Kind() == reflect.Ptr {
   210			t = t.Elem()
   211		}
   212		// PkgPath will be non-empty even for an exported type,
   213		// so we need to check the type name as well.
   214		return isExported(t.Name()) || t.PkgPath() == ""
   215	}
   216	
   217	// Register publishes in the server the set of methods of the
   218	// receiver value that satisfy the following conditions:
   219	//	- exported method
   220	//	- two arguments, both pointers to exported structs
   221	//	- one return value, of type error
   222	// It returns an error if the receiver is not an exported type or has
   223	// no methods or unsuitable methods. It also logs the error using package log.
   224	// The client accesses each method using a string of the form "Type.Method",
   225	// where Type is the receiver's concrete type.
   226	func (server *Server) Register(rcvr interface{}) error {
   227		return server.register(rcvr, "", false)
   228	}
   229	
   230	// RegisterName is like Register but uses the provided name for the type
   231	// instead of the receiver's concrete type.
   232	func (server *Server) RegisterName(name string, rcvr interface{}) error {
   233		return server.register(rcvr, name, true)
   234	}
   235	
   236	func (server *Server) register(rcvr interface{}, name string, useName bool) error {
   237		server.mu.Lock()
   238		defer server.mu.Unlock()
   239		if server.serviceMap == nil {
   240			server.serviceMap = make(map[string]*service)
   241		}
   242		s := new(service)
   243		s.typ = reflect.TypeOf(rcvr)
   244		s.rcvr = reflect.ValueOf(rcvr)
   245		sname := reflect.Indirect(s.rcvr).Type().Name()
   246		if useName {
   247			sname = name
   248		}
   249		if sname == "" {
   250			log.Fatal("rpc: no service name for type", s.typ.String())
   251		}
   252		if !isExported(sname) && !useName {
   253			s := "rpc Register: type " + sname + " is not exported"
   254			log.Print(s)
   255			return errors.New(s)
   256		}
   257		if _, present := server.serviceMap[sname]; present {
   258			return errors.New("rpc: service already defined: " + sname)
   259		}
   260		s.name = sname
   261		s.method = make(map[string]*methodType)
   262	
   263		// Install the methods
   264		s.method = suitableMethods(s.typ, true)
   265	
   266		if len(s.method) == 0 {
   267			str := ""
   268			// To help the user, see if a pointer receiver would work.
   269			method := suitableMethods(reflect.PtrTo(s.typ), false)
   270			if len(method) != 0 {
   271				str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
   272			} else {
   273				str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
   274			}
   275			log.Print(str)
   276			return errors.New(str)
   277		}
   278		server.serviceMap[s.name] = s
   279		return nil
   280	}
   281	
   282	// suitableMethods returns suitable Rpc methods of typ, it will report
   283	// error using log if reportErr is true.
   284	func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
   285		methods := make(map[string]*methodType)
   286		for m := 0; m < typ.NumMethod(); m++ {
   287			method := typ.Method(m)
   288			mtype := method.Type
   289			mname := method.Name
   290			// Method must be exported.
   291			if method.PkgPath != "" {
   292				continue
   293			}
   294			// Method needs three ins: receiver, *args, *reply.
   295			if mtype.NumIn() != 3 {
   296				if reportErr {
   297					log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
   298				}
   299				continue
   300			}
   301			// First arg need not be a pointer.
   302			argType := mtype.In(1)
   303			if !isExportedOrBuiltinType(argType) {
   304				if reportErr {
   305					log.Println(mname, "argument type not exported:", argType)
   306				}
   307				continue
   308			}
   309			// Second arg must be a pointer.
   310			replyType := mtype.In(2)
   311			if replyType.Kind() != reflect.Ptr {
   312				if reportErr {
   313					log.Println("method", mname, "reply type not a pointer:", replyType)
   314				}
   315				continue
   316			}
   317			// Reply type must be exported.
   318			if !isExportedOrBuiltinType(replyType) {
   319				if reportErr {
   320					log.Println("method", mname, "reply type not exported:", replyType)
   321				}
   322				continue
   323			}
   324			// Method needs one out.
   325			if mtype.NumOut() != 1 {
   326				if reportErr {
   327					log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
   328				}
   329				continue
   330			}
   331			// The return type of the method must be error.
   332			if returnType := mtype.Out(0); returnType != typeOfError {
   333				if reportErr {
   334					log.Println("method", mname, "returns", returnType.String(), "not error")
   335				}
   336				continue
   337			}
   338			methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
   339		}
   340		return methods
   341	}
   342	
   343	// A value sent as a placeholder for the server's response value when the server
   344	// receives an invalid request. It is never decoded by the client since the Response
   345	// contains an error when it is used.
   346	var invalidRequest = struct{}{}
   347	
   348	func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
   349		resp := server.getResponse()
   350		// Encode the response header
   351		resp.ServiceMethod = req.ServiceMethod
   352		if errmsg != "" {
   353			resp.Error = errmsg
   354			reply = invalidRequest
   355		}
   356		resp.Seq = req.Seq
   357		sending.Lock()
   358		err := codec.WriteResponse(resp, reply)
   359		if err != nil {
   360			log.Println("rpc: writing response:", err)
   361		}
   362		sending.Unlock()
   363		server.freeResponse(resp)
   364	}
   365	
   366	func (m *methodType) NumCalls() (n uint) {
   367		m.Lock()
   368		n = m.numCalls
   369		m.Unlock()
   370		return n
   371	}
   372	
   373	func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
   374		mtype.Lock()
   375		mtype.numCalls++
   376		mtype.Unlock()
   377		function := mtype.method.Func
   378		// Invoke the method, providing a new value for the reply.
   379		returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
   380		// The return value for the method is an error.
   381		errInter := returnValues[0].Interface()
   382		errmsg := ""
   383		if errInter != nil {
   384			errmsg = errInter.(error).Error()
   385		}
   386		server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
   387		server.freeRequest(req)
   388	}
   389	
   390	type gobServerCodec struct {
   391		rwc    io.ReadWriteCloser
   392		dec    *gob.Decoder
   393		enc    *gob.Encoder
   394		encBuf *bufio.Writer
   395	}
   396	
   397	func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
   398		return c.dec.Decode(r)
   399	}
   400	
   401	func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
   402		return c.dec.Decode(body)
   403	}
   404	
   405	func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
   406		if err = c.enc.Encode(r); err != nil {
   407			return
   408		}
   409		if err = c.enc.Encode(body); err != nil {
   410			return
   411		}
   412		return c.encBuf.Flush()
   413	}
   414	
   415	func (c *gobServerCodec) Close() error {
   416		return c.rwc.Close()
   417	}
   418	
   419	// ServeConn runs the server on a single connection.
   420	// ServeConn blocks, serving the connection until the client hangs up.
   421	// The caller typically invokes ServeConn in a go statement.
   422	// ServeConn uses the gob wire format (see package gob) on the
   423	// connection.  To use an alternate codec, use ServeCodec.
   424	func (server *Server) ServeConn(conn io.ReadWriteCloser) {
   425		buf := bufio.NewWriter(conn)
   426		srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf}
   427		server.ServeCodec(srv)
   428	}
   429	
   430	// ServeCodec is like ServeConn but uses the specified codec to
   431	// decode requests and encode responses.
   432	func (server *Server) ServeCodec(codec ServerCodec) {
   433		sending := new(sync.Mutex)
   434		for {
   435			service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
   436			if err != nil {
   437				if err != io.EOF {
   438					log.Println("rpc:", err)
   439				}
   440				if !keepReading {
   441					break
   442				}
   443				// send a response if we actually managed to read a header.
   444				if req != nil {
   445					server.sendResponse(sending, req, invalidRequest, codec, err.Error())
   446					server.freeRequest(req)
   447				}
   448				continue
   449			}
   450			go service.call(server, sending, mtype, req, argv, replyv, codec)
   451		}
   452		codec.Close()
   453	}
   454	
   455	// ServeRequest is like ServeCodec but synchronously serves a single request.
   456	// It does not close the codec upon completion.
   457	func (server *Server) ServeRequest(codec ServerCodec) error {
   458		sending := new(sync.Mutex)
   459		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
   460		if err != nil {
   461			if !keepReading {
   462				return err
   463			}
   464			// send a response if we actually managed to read a header.
   465			if req != nil {
   466				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
   467				server.freeRequest(req)
   468			}
   469			return err
   470		}
   471		service.call(server, sending, mtype, req, argv, replyv, codec)
   472		return nil
   473	}
   474	
   475	func (server *Server) getRequest() *Request {
   476		server.reqLock.Lock()
   477		req := server.freeReq
   478		if req == nil {
   479			req = new(Request)
   480		} else {
   481			server.freeReq = req.next
   482			*req = Request{}
   483		}
   484		server.reqLock.Unlock()
   485		return req
   486	}
   487	
   488	func (server *Server) freeRequest(req *Request) {
   489		server.reqLock.Lock()
   490		req.next = server.freeReq
   491		server.freeReq = req
   492		server.reqLock.Unlock()
   493	}
   494	
   495	func (server *Server) getResponse() *Response {
   496		server.respLock.Lock()
   497		resp := server.freeResp
   498		if resp == nil {
   499			resp = new(Response)
   500		} else {
   501			server.freeResp = resp.next
   502			*resp = Response{}
   503		}
   504		server.respLock.Unlock()
   505		return resp
   506	}
   507	
   508	func (server *Server) freeResponse(resp *Response) {
   509		server.respLock.Lock()
   510		resp.next = server.freeResp
   511		server.freeResp = resp
   512		server.respLock.Unlock()
   513	}
   514	
   515	func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
   516		service, mtype, req, keepReading, err = server.readRequestHeader(codec)
   517		if err != nil {
   518			if !keepReading {
   519				return
   520			}
   521			// discard body
   522			codec.ReadRequestBody(nil)
   523			return
   524		}
   525	
   526		// Decode the argument value.
   527		argIsValue := false // if true, need to indirect before calling.
   528		if mtype.ArgType.Kind() == reflect.Ptr {
   529			argv = reflect.New(mtype.ArgType.Elem())
   530		} else {
   531			argv = reflect.New(mtype.ArgType)
   532			argIsValue = true
   533		}
   534		// argv guaranteed to be a pointer now.
   535		if err = codec.ReadRequestBody(argv.Interface()); err != nil {
   536			return
   537		}
   538		if argIsValue {
   539			argv = argv.Elem()
   540		}
   541	
   542		replyv = reflect.New(mtype.ReplyType.Elem())
   543		return
   544	}
   545	
   546	func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
   547		// Grab the request header.
   548		req = server.getRequest()
   549		err = codec.ReadRequestHeader(req)
   550		if err != nil {
   551			req = nil
   552			if err == io.EOF || err == io.ErrUnexpectedEOF {
   553				return
   554			}
   555			err = errors.New("rpc: server cannot decode request: " + err.Error())
   556			return
   557		}
   558	
   559		// We read the header successfully.  If we see an error now,
   560		// we can still recover and move on to the next request.
   561		keepReading = true
   562	
   563		serviceMethod := strings.Split(req.ServiceMethod, ".")
   564		if len(serviceMethod) != 2 {
   565			err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
   566			return
   567		}
   568		// Look up the request.
   569		server.mu.RLock()
   570		service = server.serviceMap[serviceMethod[0]]
   571		server.mu.RUnlock()
   572		if service == nil {
   573			err = errors.New("rpc: can't find service " + req.ServiceMethod)
   574			return
   575		}
   576		mtype = service.method[serviceMethod[1]]
   577		if mtype == nil {
   578			err = errors.New("rpc: can't find method " + req.ServiceMethod)
   579		}
   580		return
   581	}
   582	
   583	// Accept accepts connections on the listener and serves requests
   584	// for each incoming connection.  Accept blocks; the caller typically
   585	// invokes it in a go statement.
   586	func (server *Server) Accept(lis net.Listener) {
   587		for {
   588			conn, err := lis.Accept()
   589			if err != nil {
   590				log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit?
   591			}
   592			go server.ServeConn(conn)
   593		}
   594	}
   595	
   596	// Register publishes the receiver's methods in the DefaultServer.
   597	func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
   598	
   599	// RegisterName is like Register but uses the provided name for the type
   600	// instead of the receiver's concrete type.
   601	func RegisterName(name string, rcvr interface{}) error {
   602		return DefaultServer.RegisterName(name, rcvr)
   603	}
   604	
   605	// A ServerCodec implements reading of RPC requests and writing of
   606	// RPC responses for the server side of an RPC session.
   607	// The server calls ReadRequestHeader and ReadRequestBody in pairs
   608	// to read requests from the connection, and it calls WriteResponse to
   609	// write a response back.  The server calls Close when finished with the
   610	// connection. ReadRequestBody may be called with a nil
   611	// argument to force the body of the request to be read and discarded.
   612	type ServerCodec interface {
   613		ReadRequestHeader(*Request) error
   614		ReadRequestBody(interface{}) error
   615		WriteResponse(*Response, interface{}) error
   616	
   617		Close() error
   618	}
   619	
   620	// ServeConn runs the DefaultServer on a single connection.
   621	// ServeConn blocks, serving the connection until the client hangs up.
   622	// The caller typically invokes ServeConn in a go statement.
   623	// ServeConn uses the gob wire format (see package gob) on the
   624	// connection.  To use an alternate codec, use ServeCodec.
   625	func ServeConn(conn io.ReadWriteCloser) {
   626		DefaultServer.ServeConn(conn)
   627	}
   628	
   629	// ServeCodec is like ServeConn but uses the specified codec to
   630	// decode requests and encode responses.
   631	func ServeCodec(codec ServerCodec) {
   632		DefaultServer.ServeCodec(codec)
   633	}
   634	
   635	// ServeRequest is like ServeCodec but synchronously serves a single request.
   636	// It does not close the codec upon completion.
   637	func ServeRequest(codec ServerCodec) error {
   638		return DefaultServer.ServeRequest(codec)
   639	}
   640	
   641	// Accept accepts connections on the listener and serves requests
   642	// to DefaultServer for each incoming connection.
   643	// Accept blocks; the caller typically invokes it in a go statement.
   644	func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
   645	
   646	// Can connect to RPC service using HTTP CONNECT to rpcPath.
   647	var connected = "200 Connected to Go RPC"
   648	
   649	// ServeHTTP implements an http.Handler that answers RPC requests.
   650	func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   651		if req.Method != "CONNECT" {
   652			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
   653			w.WriteHeader(http.StatusMethodNotAllowed)
   654			io.WriteString(w, "405 must CONNECT\n")
   655			return
   656		}
   657		conn, _, err := w.(http.Hijacker).Hijack()
   658		if err != nil {
   659			log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
   660			return
   661		}
   662		io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
   663		server.ServeConn(conn)
   664	}
   665	
   666	// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
   667	// and a debugging handler on debugPath.
   668	// It is still necessary to invoke http.Serve(), typically in a go statement.
   669	func (server *Server) HandleHTTP(rpcPath, debugPath string) {
   670		http.Handle(rpcPath, server)
   671		http.Handle(debugPath, debugHTTP{server})
   672	}
   673	
   674	// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
   675	// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
   676	// It is still necessary to invoke http.Serve(), typically in a go statement.
   677	func HandleHTTP() {
   678		DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
   679	}

View as plain text