1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package rpc 6 7 import ( 8 "bufio" 9 "encoding/gob" 10 "errors" 11 "io" 12 "log" 13 "net" 14 "net/http" 15 "sync" 16 ) 17 18 // ServerError represents an error that has been returned from 19 // the remote side of the RPC connection. 20 type ServerError string 21 22 func (e ServerError) Error() string { 23 return string(e) 24 } 25 26 var ErrShutdown = errors.New("connection is shut down") 27 28 // Call represents an active RPC. 29 type Call struct { 30 ServiceMethod string // The name of the service and method to call. 31 Args interface{} // The argument to the function (*struct). 32 Reply interface{} // The reply from the function (*struct). 33 Error error // After completion, the error status. 34 Done chan *Call // Strobes when call is complete. 35 } 36 37 // Client represents an RPC Client. 38 // There may be multiple outstanding Calls associated 39 // with a single Client, and a Client may be used by 40 // multiple goroutines simultaneously. 41 type Client struct { 42 codec ClientCodec 43 44 reqMutex sync.Mutex // protects following 45 request Request 46 47 mutex sync.Mutex // protects following 48 seq uint64 49 pending map[uint64]*Call 50 closing bool // user has called Close 51 shutdown bool // server has told us to stop 52 } 53 54 // A ClientCodec implements writing of RPC requests and 55 // reading of RPC responses for the client side of an RPC session. 56 // The client calls WriteRequest to write a request to the connection 57 // and calls ReadResponseHeader and ReadResponseBody in pairs 58 // to read responses. The client calls Close when finished with the 59 // connection. ReadResponseBody may be called with a nil 60 // argument to force the body of the response to be read and then 61 // discarded. 62 // See NewClient's comment for information about concurrent access. 63 type ClientCodec interface { 64 WriteRequest(*Request, interface{}) error 65 ReadResponseHeader(*Response) error 66 ReadResponseBody(interface{}) error 67 68 Close() error 69 } 70 71 func (client *Client) send(call *Call) { 72 client.reqMutex.Lock() 73 defer client.reqMutex.Unlock() 74 75 // Register this call. 76 client.mutex.Lock() 77 if client.shutdown || client.closing { 78 client.mutex.Unlock() 79 call.Error = ErrShutdown 80 call.done() 81 return 82 } 83 seq := client.seq 84 client.seq++ 85 client.pending[seq] = call 86 client.mutex.Unlock() 87 88 // Encode and send the request. 89 client.request.Seq = seq 90 client.request.ServiceMethod = call.ServiceMethod 91 err := client.codec.WriteRequest(&client.request, call.Args) 92 if err != nil { 93 client.mutex.Lock() 94 call = client.pending[seq] 95 delete(client.pending, seq) 96 client.mutex.Unlock() 97 if call != nil { 98 call.Error = err 99 call.done() 100 } 101 } 102 } 103 104 func (client *Client) input() { 105 var err error 106 var response Response 107 for err == nil { 108 response = Response{} 109 err = client.codec.ReadResponseHeader(&response) 110 if err != nil { 111 break 112 } 113 seq := response.Seq 114 client.mutex.Lock() 115 call := client.pending[seq] 116 delete(client.pending, seq) 117 client.mutex.Unlock() 118 119 switch { 120 case call == nil: 121 // We've got no pending call. That usually means that 122 // WriteRequest partially failed, and call was already 123 // removed; response is a server telling us about an 124 // error reading request body. We should still attempt 125 // to read error body, but there's no one to give it to. 126 err = client.codec.ReadResponseBody(nil) 127 if err != nil { 128 err = errors.New("reading error body: " + err.Error()) 129 } 130 case response.Error != "": 131 // We've got an error response. Give this to the request; 132 // any subsequent requests will get the ReadResponseBody 133 // error if there is one. 134 call.Error = ServerError(response.Error) 135 err = client.codec.ReadResponseBody(nil) 136 if err != nil { 137 err = errors.New("reading error body: " + err.Error()) 138 } 139 call.done() 140 default: 141 err = client.codec.ReadResponseBody(call.Reply) 142 if err != nil { 143 call.Error = errors.New("reading body " + err.Error()) 144 } 145 call.done() 146 } 147 } 148 // Terminate pending calls. 149 client.reqMutex.Lock() 150 client.mutex.Lock() 151 client.shutdown = true 152 closing := client.closing 153 if err == io.EOF { 154 if closing { 155 err = ErrShutdown 156 } else { 157 err = io.ErrUnexpectedEOF 158 } 159 } 160 for _, call := range client.pending { 161 call.Error = err 162 call.done() 163 } 164 client.mutex.Unlock() 165 client.reqMutex.Unlock() 166 if debugLog && err != io.EOF && !closing { 167 log.Println("rpc: client protocol error:", err) 168 } 169 } 170 171 func (call *Call) done() { 172 select { 173 case call.Done <- call: 174 // ok 175 default: 176 // We don't want to block here. It is the caller's responsibility to make 177 // sure the channel has enough buffer space. See comment in Go(). 178 if debugLog { 179 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") 180 } 181 } 182 } 183 184 // NewClient returns a new Client to handle requests to the 185 // set of services at the other end of the connection. 186 // It adds a buffer to the write side of the connection so 187 // the header and payload are sent as a unit. 188 // 189 // The read and write halves of the connection are serialized independently, 190 // so no interlocking is required. However each half may be accessed 191 // concurrently so the implementation of conn should protect against 192 // concurrent reads or concurrent writes. 193 func NewClient(conn io.ReadWriteCloser) *Client { 194 encBuf := bufio.NewWriter(conn) 195 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} 196 return NewClientWithCodec(client) 197 } 198 199 // NewClientWithCodec is like NewClient but uses the specified 200 // codec to encode requests and decode responses. 201 func NewClientWithCodec(codec ClientCodec) *Client { 202 client := &Client{ 203 codec: codec, 204 pending: make(map[uint64]*Call), 205 } 206 go client.input() 207 return client 208 } 209 210 type gobClientCodec struct { 211 rwc io.ReadWriteCloser 212 dec *gob.Decoder 213 enc *gob.Encoder 214 encBuf *bufio.Writer 215 } 216 217 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { 218 if err = c.enc.Encode(r); err != nil { 219 return 220 } 221 if err = c.enc.Encode(body); err != nil { 222 return 223 } 224 return c.encBuf.Flush() 225 } 226 227 func (c *gobClientCodec) ReadResponseHeader(r *Response) error { 228 return c.dec.Decode(r) 229 } 230 231 func (c *gobClientCodec) ReadResponseBody(body interface{}) error { 232 return c.dec.Decode(body) 233 } 234 235 func (c *gobClientCodec) Close() error { 236 return c.rwc.Close() 237 } 238 239 // DialHTTP connects to an HTTP RPC server at the specified network address 240 // listening on the default HTTP RPC path. 241 func DialHTTP(network, address string) (*Client, error) { 242 return DialHTTPPath(network, address, DefaultRPCPath) 243 } 244 245 // DialHTTPPath connects to an HTTP RPC server 246 // at the specified network address and path. 247 func DialHTTPPath(network, address, path string) (*Client, error) { 248 var err error 249 conn, err := net.Dial(network, address) 250 if err != nil { 251 return nil, err 252 } 253 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") 254 255 // Require successful HTTP response 256 // before switching to RPC protocol. 257 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) 258 if err == nil && resp.Status == connected { 259 return NewClient(conn), nil 260 } 261 if err == nil { 262 err = errors.New("unexpected HTTP response: " + resp.Status) 263 } 264 conn.Close() 265 return nil, &net.OpError{ 266 Op: "dial-http", 267 Net: network + " " + address, 268 Addr: nil, 269 Err: err, 270 } 271 } 272 273 // Dial connects to an RPC server at the specified network address. 274 func Dial(network, address string) (*Client, error) { 275 conn, err := net.Dial(network, address) 276 if err != nil { 277 return nil, err 278 } 279 return NewClient(conn), nil 280 } 281 282 // Close calls the underlying codec's Close method. If the connection is already 283 // shutting down, ErrShutdown is returned. 284 func (client *Client) Close() error { 285 client.mutex.Lock() 286 if client.closing { 287 client.mutex.Unlock() 288 return ErrShutdown 289 } 290 client.closing = true 291 client.mutex.Unlock() 292 return client.codec.Close() 293 } 294 295 // Go invokes the function asynchronously. It returns the Call structure representing 296 // the invocation. The done channel will signal when the call is complete by returning 297 // the same Call object. If done is nil, Go will allocate a new channel. 298 // If non-nil, done must be buffered or Go will deliberately crash. 299 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { 300 call := new(Call) 301 call.ServiceMethod = serviceMethod 302 call.Args = args 303 call.Reply = reply 304 if done == nil { 305 done = make(chan *Call, 10) // buffered. 306 } else { 307 // If caller passes done != nil, it must arrange that 308 // done has enough buffer for the number of simultaneous 309 // RPCs that will be using that channel. If the channel 310 // is totally unbuffered, it's best not to run at all. 311 if cap(done) == 0 { 312 log.Panic("rpc: done channel is unbuffered") 313 } 314 } 315 call.Done = done 316 client.send(call) 317 return call 318 } 319 320 // Call invokes the named function, waits for it to complete, and returns its error status. 321 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { 322 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done 323 return call.Error 324 } 325
View as plain text