1
2
3
4
5 package netchan
6
7 import (
8 "io"
9 "log"
10 "net"
11 "os"
12 "reflect"
13 "sync"
14 "time"
15 )
16
17
18
19
20 func impLog(args ...interface{}) {
21 args[0] = "netchan import: " + args[0].(string)
22 log.Print(args...)
23 }
24
25
26
27
28 type Importer struct {
29 *encDec
30 chanLock sync.Mutex
31 names map[string]*netChan
32 chans map[int]*netChan
33 errors chan os.Error
34 maxId int
35 mu sync.Mutex
36 unacked int64
37 seqLock sync.Mutex
38 }
39
40
41
42
43 func NewImporter(conn io.ReadWriter) *Importer {
44 imp := new(Importer)
45 imp.encDec = newEncDec(conn)
46 imp.chans = make(map[int]*netChan)
47 imp.names = make(map[string]*netChan)
48 imp.errors = make(chan os.Error, 10)
49 imp.unacked = 0
50 go imp.run()
51 return imp
52 }
53
54
55 func Import(network, remoteaddr string) (*Importer, os.Error) {
56 conn, err := net.Dial(network, remoteaddr)
57 if err != nil {
58 return nil, err
59 }
60 return NewImporter(conn), nil
61 }
62
63
64 func (imp *Importer) shutdown() {
65 imp.chanLock.Lock()
66 for _, ich := range imp.chans {
67 if ich.dir == Recv {
68 ich.close()
69 }
70 }
71 imp.chanLock.Unlock()
72 }
73
74
75
76
77
78 func (imp *Importer) run() {
79
80 hdr := new(header)
81 hdrValue := reflect.ValueOf(hdr)
82 ackHdr := new(header)
83 err := new(error)
84 errValue := reflect.ValueOf(err)
85 for {
86 *hdr = header{}
87 if e := imp.decode(hdrValue); e != nil {
88 if e != os.EOF {
89 impLog("header:", e)
90 imp.shutdown()
91 }
92 return
93 }
94 switch hdr.PayloadType {
95 case payData:
96
97 case payError:
98 if e := imp.decode(errValue); e != nil {
99 impLog("error:", e)
100 return
101 }
102 if err.Error != "" {
103 impLog("response error:", err.Error)
104 select {
105 case imp.errors <- os.NewError(err.Error):
106 continue
107 default:
108 imp.shutdown()
109 return
110 }
111 }
112 case payClosed:
113 nch := imp.getChan(hdr.Id, false)
114 if nch != nil {
115 nch.close()
116 }
117 continue
118 case payAckSend:
119
120
121 nch := imp.getChan(hdr.Id, true)
122 if nch != nil {
123 nch.acked()
124 imp.mu.Lock()
125 imp.unacked--
126 imp.mu.Unlock()
127 }
128 continue
129 default:
130 impLog("unexpected payload type:", hdr.PayloadType)
131 return
132 }
133 nch := imp.getChan(hdr.Id, false)
134 if nch == nil {
135 continue
136 }
137 if nch.dir != Recv {
138 impLog("cannot happen: receive from non-Recv channel")
139 return
140 }
141
142 ackHdr.Id = hdr.Id
143 ackHdr.SeqNum = hdr.SeqNum
144 imp.encode(ackHdr, payAck, nil)
145
146 value := reflect.New(nch.ch.Type().Elem()).Elem()
147 if e := imp.decode(value); e != nil {
148 impLog("importer value decode:", e)
149 return
150 }
151 nch.send(value)
152 }
153 }
154
155 func (imp *Importer) getChan(id int, errOk bool) *netChan {
156 imp.chanLock.Lock()
157 ich := imp.chans[id]
158 imp.chanLock.Unlock()
159 if ich == nil {
160 if !errOk {
161 impLog("unknown id in netchan request: ", id)
162 }
163 return nil
164 }
165 return ich
166 }
167
168
169
170
171
172 func (imp *Importer) Errors() chan os.Error {
173 return imp.errors
174 }
175
176
177
178 func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error {
179 return imp.ImportNValues(name, chT, dir, size, -1)
180 }
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error {
198 ch, err := checkChan(chT, dir)
199 if err != nil {
200 return err
201 }
202 imp.chanLock.Lock()
203 defer imp.chanLock.Unlock()
204 _, present := imp.names[name]
205 if present {
206 return os.NewError("channel name already being imported:" + name)
207 }
208 if size < 1 {
209 size = 1
210 }
211 id := imp.maxId
212 imp.maxId++
213 nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
214 imp.names[name] = nch
215 imp.chans[id] = nch
216
217 hdr := &header{Id: id}
218 req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
219 if err = imp.encode(hdr, payRequest, req); err != nil {
220 impLog("request encode:", err)
221 return err
222 }
223 if dir == Send {
224 go func() {
225 for i := 0; n == -1 || i < n; i++ {
226 val, ok := nch.recv()
227 if !ok {
228 if err = imp.encode(hdr, payClosed, nil); err != nil {
229 impLog("error encoding client closed message:", err)
230 }
231 return
232 }
233
234
235 imp.mu.Lock()
236 imp.unacked++
237 imp.seqLock.Lock()
238 imp.mu.Unlock()
239 if err = imp.encode(hdr, payData, val.Interface()); err != nil {
240 impLog("error encoding client send:", err)
241 return
242 }
243 imp.seqLock.Unlock()
244 }
245 }()
246 }
247 return nil
248 }
249
250
251
252 func (imp *Importer) Hangup(name string) os.Error {
253 imp.chanLock.Lock()
254 defer imp.chanLock.Unlock()
255 nc := imp.names[name]
256 if nc == nil {
257 return os.NewError("netchan import: hangup: no such channel: " + name)
258 }
259 imp.names[name] = nil, false
260 imp.chans[nc.id] = nil, false
261 nc.close()
262 return nil
263 }
264
265 func (imp *Importer) unackedCount() int64 {
266 imp.mu.Lock()
267 n := imp.unacked
268 imp.mu.Unlock()
269 return n
270 }
271
272
273
274
275
276
277
278 func (imp *Importer) Drain(timeout int64) os.Error {
279 startTime := time.Nanoseconds()
280 for imp.unackedCount() > 0 {
281 if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
282 return os.NewError("timeout")
283 }
284 time.Sleep(100 * 1e6)
285 }
286 return nil
287 }