1
2
3
4
5 package netchan
6
7 import (
8 "gob"
9 "io"
10 "os"
11 "reflect"
12 "sync"
13 "time"
14 )
15
16
// Dir identifies one of the two channel directions, Recv or Send.
type Dir int

// The channel directions. Recv is the zero value.
const (
	Recv = Dir(iota)
	Send
)

// String returns "Recv" or "Send" for the matching direction and
// "???" for any other value.
func (dir Dir) String() string {
	// Table indexed by direction; anything outside it is unknown.
	names := [...]string{Recv: "Recv", Send: "Send"}
	if dir < 0 || int(dir) >= len(names) {
		return "???"
	}
	return names[dir]
}
33
34
// Payload types, carried in header.PayloadType (encode stores its
// payloadType argument there before writing the header).
// NOTE(review): apart from payAckSend, the meanings below are inferred
// from the names only — confirm against the code that sends and handles them.
const (
	payRequest = iota // presumably a request value follows the header
	payError          // presumably an error value follows the header
	payData           // presumably a user data value follows the header
	payAck            // presumably an acknowledgement
	payClosed         // presumably signals that the channel was closed
	payAckSend        // written by netChan.sendAck with no payload
)
43
44
45
// A header is what encDec.encode writes first for every message; an
// optional payload value is encoded directly after it.
type header struct {
	Id          int   // netChan.sendAck fills this with the netChan's id
	PayloadType int   // one of the pay* constants; overwritten by encode
	SeqNum      int64 // NOTE(review): not used in this part of the file — presumably a per-message sequence number
}
51
52
53
54
// A request is a message payload.
// NOTE(review): presumably sent with payRequest to ask the peer to set
// up a channel; field meanings are inferred from their names and from the
// parallel parameters of newNetChan — confirm against the users.
type request struct {
	Name  string // presumably the channel name
	Count int64  // presumably the number of values to transfer (cf. netChan.count)
	Size  int    // presumably the buffer size (cf. netChan.size)
	Dir   Dir    // channel direction
}
61
62
// An error is a message payload holding an error description as text.
// NOTE(review): presumably sent with payError — confirm against the users.
type error struct {
	Error string
}
66
67
// An unackedCounter is a client whose acknowledgement progress can be
// polled by clientSet.drain and clientSet.sync.
type unackedCounter interface {
	// unackedCount is polled by drain, which keeps waiting while it is > 0.
	unackedCount() int64
	// ack is polled by sync, which keeps waiting while it is below the
	// value seq returned when sync was entered.
	ack() int64
	// seq is sampled once per client at the start of sync.
	seq() int64
}
73
74
// A chanDir pairs a channel, held as a reflect.Value, with its direction.
type chanDir struct {
	ch  reflect.Value // the underlying channel; accessed via Len, TrySend, Send, Recv and Close
	dir Dir           // Recv or Send
}
79
80
81
// A clientSet holds the named channels and the set of clients that
// drain and sync wait on.
type clientSet struct {
	mu      sync.Mutex              // locked by drain and by sync's polling loop while they read names and clients
	names   map[string]*chanDir     // channels by name; drain checks each for buffered values
	clients map[unackedCounter]bool // set of clients; drain and sync poll their counters
}
87
88
// An encDec bundles a gob decoder and a gob encoder, each with its own
// mutex so that decode and encode calls are serialized independently.
type encDec struct {
	decLock sync.Mutex // held for the duration of decode
	dec     *gob.Decoder
	encLock sync.Mutex // held for the duration of encode (header plus payload)
	enc     *gob.Encoder
}
95
96 func newEncDec(conn io.ReadWriter) *encDec {
97 return &encDec{
98 dec: gob.NewDecoder(conn),
99 enc: gob.NewEncoder(conn),
100 }
101 }
102
103
104 func (ed *encDec) decode(value reflect.Value) os.Error {
105 ed.decLock.Lock()
106 err := ed.dec.DecodeValue(value)
107 if err != nil {
108
109 }
110 ed.decLock.Unlock()
111 return err
112 }
113
114
115 func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
116 ed.encLock.Lock()
117 hdr.PayloadType = payloadType
118 err := ed.enc.Encode(hdr)
119 if err == nil {
120 if payload != nil {
121 err = ed.enc.Encode(payload)
122 }
123 }
124 if err != nil {
125
126 }
127 ed.encLock.Unlock()
128 return err
129 }
130
131
132 func (cs *clientSet) drain(timeout int64) os.Error {
133 startTime := time.Nanoseconds()
134 for {
135 pending := false
136 cs.mu.Lock()
137
138 for _, chDir := range cs.names {
139 if chDir.ch.Len() > 0 {
140 pending = true
141 }
142 }
143
144 for client := range cs.clients {
145 n := client.unackedCount()
146 if n > 0 {
147 pending = true
148 break
149 }
150 }
151 cs.mu.Unlock()
152 if !pending {
153 break
154 }
155 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
156 return os.NewError("timeout")
157 }
158 time.Sleep(100 * 1e6)
159 }
160 return nil
161 }
162
163
164 func (cs *clientSet) sync(timeout int64) os.Error {
165 startTime := time.Nanoseconds()
166
167 seq := make(map[unackedCounter]int64)
168 for client := range cs.clients {
169 seq[client] = client.seq()
170 }
171 for {
172 pending := false
173 cs.mu.Lock()
174
175
176 for client := range seq {
177 if _, ok := cs.clients[client]; ok {
178 if client.ack() < seq[client] {
179 pending = true
180 break
181 }
182 }
183 }
184 cs.mu.Unlock()
185 if !pending {
186 break
187 }
188 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
189 return os.NewError("timeout")
190 }
191 time.Sleep(100 * 1e6)
192 }
193 return nil
194 }
195
196
197
198
199
// A netChan wraps a local channel (the embedded chanDir) together with
// the bookkeeping needed to move values through it: a credit counter and
// ack channel for the Send direction, and an overflow queue plus ack
// sender for the Recv direction.
type netChan struct {
	*chanDir
	name   string
	id     int  // placed in header.Id by sendAck
	size   int  // capacity used for ackCh and sendCh; also compared against count in sendAck
	closed bool // set by close; makes further close calls no-ops

	// State used when dir == Send (see recv and acked).
	ackCh chan bool // created by newNetChan with capacity size; acked adds to it, recv takes from it
	space int       // starts at size; recv decrements it and waits on ackCh when it is zero

	// State used when dir == Recv (see send, sender and sendAck).
	sendCh chan reflect.Value // nil until a TrySend on ch fails; then queues values for the sender goroutine
	ed     *encDec            // used by sendAck to write payAckSend headers
	count  int64              // decremented by sendAck while positive; negative is never decremented
}
216
217
218
219
220 func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
221 c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
222 if c.dir == Send {
223 c.ackCh = make(chan bool, size)
224 c.space = size
225 }
226 return c
227 }
228
229
230 func (nch *netChan) close() {
231 if nch.closed {
232 return
233 }
234 if nch.dir == Recv {
235 if nch.sendCh != nil {
236
237
238 close(nch.sendCh)
239 } else {
240 nch.ch.Close()
241 }
242 } else {
243 nch.ch.Close()
244 close(nch.ackCh)
245 }
246 nch.closed = true
247 }
248
249
250 func (nch *netChan) send(val reflect.Value) {
251 if nch.dir != Recv {
252 panic("send on wrong direction of channel")
253 }
254 if nch.sendCh == nil {
255
256 if nch.ch.TrySend(val) {
257 nch.sendAck()
258 return
259 }
260
261 nch.sendCh = make(chan reflect.Value, nch.size)
262 go nch.sender()
263 }
264 select {
265 case nch.sendCh <- val:
266
267 default:
268
269 panic("netchan: remote sender sent more values than allowed")
270 }
271 }
272
273
274
275
276
277 func (nch *netChan) sendAck() {
278 if nch.count < 0 || nch.count > int64(nch.size) {
279 nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
280 }
281 if nch.count > 0 {
282 nch.count--
283 }
284 }
285
286
287
288 func (nch *netChan) sender() {
289 if nch.dir != Recv {
290 panic("sender on wrong direction of channel")
291 }
292
293
294
295
296 defer func() {
297 if r := recover(); r != nil {
298
299 }
300 }()
301 for v := range nch.sendCh {
302 nch.ch.Send(v)
303 nch.sendAck()
304 }
305 nch.ch.Close()
306 }
307
308
309 func (nch *netChan) recv() (val reflect.Value, ok bool) {
310 if nch.dir != Send {
311 panic("recv on wrong direction of channel")
312 }
313
314 if nch.space == 0 {
315
316 <-nch.ackCh
317 nch.space++
318 }
319 nch.space--
320 return nch.ch.Recv()
321 }
322
323
324
325 func (nch *netChan) acked() {
326 if nch.dir != Send {
327 panic("recv on wrong direction of channel")
328 }
329 select {
330 case nch.ackCh <- true:
331
332 default:
333
334 panic("netchan: remote receiver sent too many acks")
335 }
336 }