1
2
3
4
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
20 package netchan
21
22
23
24 import (
25 "log"
26 "io"
27 "net"
28 "os"
29 "reflect"
30 "strconv"
31 "sync"
32 )
33
34
35
36
// expLog writes a message to the standard logger, prefixed with
// "netchan export: ". The operands are formatted as by log.Print.
//
// When the first argument is a string the prefix is joined directly onto it.
// Otherwise (including when no arguments are given) the prefix is logged as a
// separate leading operand instead of panicking on the type assertion.
// The caller's argument slice is never modified.
func expLog(args ...interface{}) {
	const prefix = "netchan export: "
	if len(args) > 0 {
		if first, ok := args[0].(string); ok {
			// Work on a copy: when invoked as expLog(s...), args aliases
			// a slice the caller still owns.
			msg := make([]interface{}, len(args))
			copy(msg, args)
			msg[0] = prefix + first
			log.Print(msg...)
			return
		}
	}
	msg := make([]interface{}, 0, len(args)+1)
	msg = append(msg, prefix)
	msg = append(msg, args...)
	log.Print(msg...)
}
41
42
43
44
// An Exporter publishes channels under names and serves them to clients
// over network connections. It embeds *clientSet, which holds the table of
// exported names, the set of connected clients, and the mutex (mu) that
// this file locks around accesses to both.
type Exporter struct {
	*clientSet
}
48
// expClient is the exporter's per-connection record for one client.
type expClient struct {
	// Encoder/decoder for the connection; created by newEncDec in newClient.
	*encDec
	// The Exporter that owns this client.
	exp *Exporter
	// Channels opened by this client, keyed by the Id of the request header.
	chans map[int]*netChan
	// mu guards errored, seqNum and ackNum.
	mu sync.Mutex
	// Set to true by sendError once an error has been sent to the client.
	errored bool
	// Incremented by serveRecv for every data message it sends.
	seqNum int64
	// Highest sequence number seen in a payAck message from the client.
	ackNum int64
	// Held by serveRecv while a data message is encoded; it is acquired
	// before mu is released, presumably so that messages are written in
	// sequence-number order.
	seqLock sync.Mutex
}
59
60 func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
61 client := new(expClient)
62 client.exp = exp
63 client.encDec = newEncDec(conn)
64 client.seqNum = 0
65 client.ackNum = 0
66 client.chans = make(map[int]*netChan)
67 return client
68 }
69
70 func (client *expClient) sendError(hdr *header, err string) {
71 error := &error{err}
72 expLog("sending error to client:", error.Error)
73 client.encode(hdr, payError, error)
74 client.mu.Lock()
75 client.errored = true
76 client.mu.Unlock()
77 }
78
79 func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
80 exp := client.exp
81 exp.mu.Lock()
82 ech, ok := exp.names[name]
83 exp.mu.Unlock()
84 if !ok {
85 client.sendError(hdr, "no such channel: "+name)
86 return nil
87 }
88 if ech.dir != dir {
89 client.sendError(hdr, "wrong direction for channel: "+name)
90 return nil
91 }
92 nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
93 client.chans[hdr.Id] = nch
94 return nch
95 }
96
97 func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
98 nch := client.chans[hdr.Id]
99 if nch == nil {
100 return nil
101 }
102 if nch.dir != dir {
103 client.sendError(hdr, "wrong direction for channel: "+nch.name)
104 }
105 return nch
106 }
107
108
109
110
111
112 func (client *expClient) run() {
113 hdr := new(header)
114 hdrValue := reflect.ValueOf(hdr)
115 req := new(request)
116 reqValue := reflect.ValueOf(req)
117 error := new(error)
118 for {
119 *hdr = header{}
120 if err := client.decode(hdrValue); err != nil {
121 if err != os.EOF {
122 expLog("error decoding client header:", err)
123 }
124 break
125 }
126 switch hdr.PayloadType {
127 case payRequest:
128 *req = request{}
129 if err := client.decode(reqValue); err != nil {
130 expLog("error decoding client request:", err)
131 break
132 }
133 if req.Size < 1 {
134 panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
135 }
136 switch req.Dir {
137 case Recv:
138
139
140 if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
141 go client.serveRecv(nch, *hdr, req.Count)
142 }
143 case Send:
144 client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
145
146
147 default:
148 error.Error = "request: can't handle channel direction"
149 expLog(error.Error, req.Dir)
150 client.encode(hdr, payError, error)
151 }
152 case payData:
153 client.serveSend(*hdr)
154 case payClosed:
155 client.serveClosed(*hdr)
156 case payAck:
157 client.mu.Lock()
158 if client.ackNum != hdr.SeqNum-1 {
159
160
161
162
163 expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
164 }
165 if client.ackNum < hdr.SeqNum {
166 client.ackNum = hdr.SeqNum
167 }
168 client.mu.Unlock()
169 case payAckSend:
170 if nch := client.getChan(hdr, Send); nch != nil {
171 nch.acked()
172 }
173 default:
174 log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
175 }
176 }
177 client.exp.delClient(client)
178 }
179
180
181
182 func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
183 for {
184 val, ok := nch.recv()
185 if !ok {
186 if err := client.encode(&hdr, payClosed, nil); err != nil {
187 expLog("error encoding server closed message:", err)
188 }
189 break
190 }
191
192
193
194
195 client.mu.Lock()
196 client.seqNum++
197 hdr.SeqNum = client.seqNum
198 client.seqLock.Lock()
199 client.mu.Unlock()
200 err := client.encode(&hdr, payData, val.Interface())
201 client.seqLock.Unlock()
202 if err != nil {
203 expLog("error encoding client response:", err)
204 client.sendError(&hdr, err.String())
205 break
206 }
207
208 if count >= 0 {
209 if count--; count <= 0 {
210 break
211 }
212 }
213 }
214 }
215
216
217
218 func (client *expClient) serveSend(hdr header) {
219 nch := client.getChan(&hdr, Recv)
220 if nch == nil {
221 return
222 }
223
224 val := reflect.New(nch.ch.Type().Elem()).Elem()
225 if err := client.decode(val); err != nil {
226 expLog("value decode:", err, "; type ", nch.ch.Type())
227 return
228 }
229 nch.send(val)
230 }
231
232
233
234 func (client *expClient) serveClosed(hdr header) {
235 nch := client.getChan(&hdr, Recv)
236 if nch == nil {
237 return
238 }
239 nch.close()
240 }
241
242 func (client *expClient) unackedCount() int64 {
243 client.mu.Lock()
244 n := client.seqNum - client.ackNum
245 client.mu.Unlock()
246 return n
247 }
248
249 func (client *expClient) seq() int64 {
250 client.mu.Lock()
251 n := client.seqNum
252 client.mu.Unlock()
253 return n
254 }
255
256 func (client *expClient) ack() int64 {
257 client.mu.Lock()
258 n := client.seqNum
259 client.mu.Unlock()
260 return n
261 }
262
263
264
265
266 func (exp *Exporter) Serve(listener net.Listener) {
267 for {
268 conn, err := listener.Accept()
269 if err != nil {
270 expLog("listen:", err)
271 break
272 }
273 go exp.ServeConn(conn)
274 }
275 }
276
277
278
279 func (exp *Exporter) ServeConn(conn io.ReadWriter) {
280 exp.addClient(conn).run()
281 }
282
283
284 func NewExporter() *Exporter {
285 e := &Exporter{
286 clientSet: &clientSet{
287 names: make(map[string]*chanDir),
288 clients: make(map[unackedCounter]bool),
289 },
290 }
291 return e
292 }
293
294
295
296 func (exp *Exporter) ListenAndServe(network, localaddr string) os.Error {
297 listener, err := net.Listen(network, localaddr)
298 if err != nil {
299 return err
300 }
301 go exp.Serve(listener)
302 return nil
303 }
304
305
306 func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
307 client := newClient(exp, conn)
308 exp.mu.Lock()
309 exp.clients[client] = true
310 exp.mu.Unlock()
311 return client
312 }
313
314
315 func (exp *Exporter) delClient(client *expClient) {
316 exp.mu.Lock()
317 exp.clients[client] = false, false
318 exp.mu.Unlock()
319 }
320
321
322
323
324
325
326
327 func (exp *Exporter) Drain(timeout int64) os.Error {
328
329 return exp.clientSet.drain(timeout)
330 }
331
332
333
334
335
336
337
338 func (exp *Exporter) Sync(timeout int64) os.Error {
339
340 return exp.clientSet.sync(timeout)
341 }
342
343 func checkChan(chT interface{}, dir Dir) (reflect.Value, os.Error) {
344 chanType := reflect.TypeOf(chT)
345 if chanType.Kind() != reflect.Chan {
346 return reflect.Value{}, os.NewError("not a channel")
347 }
348 if dir != Send && dir != Recv {
349 return reflect.Value{}, os.NewError("unknown channel direction")
350 }
351 switch chanType.ChanDir() {
352 case reflect.BothDir:
353 case reflect.SendDir:
354 if dir != Recv {
355 return reflect.Value{}, os.NewError("to import/export with Send, must provide <-chan")
356 }
357 case reflect.RecvDir:
358 if dir != Send {
359 return reflect.Value{}, os.NewError("to import/export with Recv, must provide chan<-")
360 }
361 }
362 return reflect.ValueOf(chT), nil
363 }
364
365
366
367
368
369
370 func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
371 ch, err := checkChan(chT, dir)
372 if err != nil {
373 return err
374 }
375 exp.mu.Lock()
376 defer exp.mu.Unlock()
377 _, present := exp.names[name]
378 if present {
379 return os.NewError("channel name already being exported:" + name)
380 }
381 exp.names[name] = &chanDir{ch, dir}
382 return nil
383 }
384
385
386
387 func (exp *Exporter) Hangup(name string) os.Error {
388 exp.mu.Lock()
389 chDir, ok := exp.names[name]
390 if ok {
391 exp.names[name] = nil, false
392 }
393
394 exp.mu.Unlock()
395 if !ok {
396 return os.NewError("netchan export: hangup: no such channel: " + name)
397 }
398 chDir.ch.Close()
399 return nil
400 }