Source file src/pkg/net/rpc/client.go
1
2
3
4
5 package rpc
6
7 import (
8 "bufio"
9 "encoding/gob"
10 "errors"
11 "io"
12 "log"
13 "net"
14 "net/http"
15 "sync"
16 )
17
18
19
// ServerError is the error type used for failures reported by the remote
// side: input converts a non-empty Response.Error string into a ServerError
// and stores it in the corresponding Call's Error field.
type ServerError string

// Error implements the error interface by returning the underlying string
// unchanged.
func (e ServerError) Error() string {
	msg := string(e)
	return msg
}
25
// ErrShutdown is reported for calls issued on a client that is closing or
// already shut down, by a second call to Close, and for calls still pending
// when the connection ends after Close.
var ErrShutdown error = errors.New("connection is shut down")
27
28
// Call represents a single RPC, from the moment it is issued by Go until its
// completion is signalled on Done.
type Call struct {
	// ServiceMethod is copied into the request header when the call is sent.
	ServiceMethod string
	// Args is the request body handed to the codec's WriteRequest.
	Args interface{}
	// Reply is the value the response body is decoded into.
	Reply interface{}
	// Error is set before completion is signalled if the call failed;
	// it stays nil on success.
	Error error
	// Done receives this Call itself once the call has completed.
	Done chan *Call
}
36
37
38
39
40
41 type Client struct {
42 mutex sync.Mutex
43 sending sync.Mutex
44 request Request
45 seq uint64
46 codec ClientCodec
47 pending map[uint64]*Call
48 closing bool
49 shutdown bool
50 }
51
52
53
54
55
56
57
58
59
60 type ClientCodec interface {
61 WriteRequest(*Request, interface{}) error
62 ReadResponseHeader(*Response) error
63 ReadResponseBody(interface{}) error
64
65 Close() error
66 }
67
68 func (client *Client) send(call *Call) {
69 client.sending.Lock()
70 defer client.sending.Unlock()
71
72
73 client.mutex.Lock()
74 if client.shutdown || client.closing {
75 call.Error = ErrShutdown
76 client.mutex.Unlock()
77 call.done()
78 return
79 }
80 seq := client.seq
81 client.seq++
82 client.pending[seq] = call
83 client.mutex.Unlock()
84
85
86 client.request.Seq = seq
87 client.request.ServiceMethod = call.ServiceMethod
88 err := client.codec.WriteRequest(&client.request, call.Args)
89 if err != nil {
90 client.mutex.Lock()
91 call = client.pending[seq]
92 delete(client.pending, seq)
93 client.mutex.Unlock()
94 if call != nil {
95 call.Error = err
96 call.done()
97 }
98 }
99 }
100
101 func (client *Client) input() {
102 var err error
103 var response Response
104 for err == nil {
105 response = Response{}
106 err = client.codec.ReadResponseHeader(&response)
107 if err != nil {
108 break
109 }
110 seq := response.Seq
111 client.mutex.Lock()
112 call := client.pending[seq]
113 delete(client.pending, seq)
114 client.mutex.Unlock()
115
116 switch {
117 case call == nil:
118
119
120
121
122
123 err = client.codec.ReadResponseBody(nil)
124 if err != nil {
125 err = errors.New("reading error body: " + err.Error())
126 }
127 case response.Error != "":
128
129
130
131 call.Error = ServerError(response.Error)
132 err = client.codec.ReadResponseBody(nil)
133 if err != nil {
134 err = errors.New("reading error body: " + err.Error())
135 }
136 call.done()
137 default:
138 err = client.codec.ReadResponseBody(call.Reply)
139 if err != nil {
140 call.Error = errors.New("reading body " + err.Error())
141 }
142 call.done()
143 }
144 }
145
146 client.sending.Lock()
147 client.mutex.Lock()
148 client.shutdown = true
149 closing := client.closing
150 if err == io.EOF {
151 if closing {
152 err = ErrShutdown
153 } else {
154 err = io.ErrUnexpectedEOF
155 }
156 }
157 for _, call := range client.pending {
158 call.Error = err
159 call.done()
160 }
161 client.mutex.Unlock()
162 client.sending.Unlock()
163 if err != io.EOF && !closing {
164 log.Println("rpc: client protocol error:", err)
165 }
166 }
167
168 func (call *Call) done() {
169 select {
170 case call.Done <- call:
171
172 default:
173
174
175 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
176 }
177 }
178
179
180
181
182
183 func NewClient(conn io.ReadWriteCloser) *Client {
184 encBuf := bufio.NewWriter(conn)
185 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
186 return NewClientWithCodec(client)
187 }
188
189
190
191 func NewClientWithCodec(codec ClientCodec) *Client {
192 client := &Client{
193 codec: codec,
194 pending: make(map[uint64]*Call),
195 }
196 go client.input()
197 return client
198 }
199
200 type gobClientCodec struct {
201 rwc io.ReadWriteCloser
202 dec *gob.Decoder
203 enc *gob.Encoder
204 encBuf *bufio.Writer
205 }
206
207 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
208 if err = c.enc.Encode(r); err != nil {
209 return
210 }
211 if err = c.enc.Encode(body); err != nil {
212 return
213 }
214 return c.encBuf.Flush()
215 }
216
217 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
218 return c.dec.Decode(r)
219 }
220
221 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
222 return c.dec.Decode(body)
223 }
224
225 func (c *gobClientCodec) Close() error {
226 return c.rwc.Close()
227 }
228
229
230
231 func DialHTTP(network, address string) (*Client, error) {
232 return DialHTTPPath(network, address, DefaultRPCPath)
233 }
234
235
236
237 func DialHTTPPath(network, address, path string) (*Client, error) {
238 var err error
239 conn, err := net.Dial(network, address)
240 if err != nil {
241 return nil, err
242 }
243 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
244
245
246
247 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
248 if err == nil && resp.Status == connected {
249 return NewClient(conn), nil
250 }
251 if err == nil {
252 err = errors.New("unexpected HTTP response: " + resp.Status)
253 }
254 conn.Close()
255 return nil, &net.OpError{
256 Op: "dial-http",
257 Net: network + " " + address,
258 Addr: nil,
259 Err: err,
260 }
261 }
262
263
264 func Dial(network, address string) (*Client, error) {
265 conn, err := net.Dial(network, address)
266 if err != nil {
267 return nil, err
268 }
269 return NewClient(conn), nil
270 }
271
272 func (client *Client) Close() error {
273 client.mutex.Lock()
274 if client.shutdown || client.closing {
275 client.mutex.Unlock()
276 return ErrShutdown
277 }
278 client.closing = true
279 client.mutex.Unlock()
280 return client.codec.Close()
281 }
282
283
284
285
286
287 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
288 call := new(Call)
289 call.ServiceMethod = serviceMethod
290 call.Args = args
291 call.Reply = reply
292 if done == nil {
293 done = make(chan *Call, 10)
294 } else {
295
296
297
298
299 if cap(done) == 0 {
300 log.Panic("rpc: done channel is unbuffered")
301 }
302 }
303 call.Done = done
304 client.send(call)
305 return call
306 }
307
308
309 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
310 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
311 return call.Error
312 }
View as plain text