1
2
3
4
5 package net
6
7 import (
8 "io"
9 "os"
10 "sync"
11 "syscall"
12 "time"
13 )
14
15
16 type netFD struct {
17
18 sysmu sync.Mutex
19 sysref int
20 closing bool
21
22
23 sysfd int
24 family int
25 proto int
26 sysfile *os.File
27 cr chan bool
28 cw chan bool
29 net string
30 laddr Addr
31 raddr Addr
32
33
34 rdeadline_delta int64
35 rdeadline int64
36 rio sync.Mutex
37 wdeadline_delta int64
38 wdeadline int64
39 wio sync.Mutex
40
41
42 ncr, ncw int
43 }
44
45 type InvalidConnError struct{}
46
47 func (e *InvalidConnError) String() string { return "invalid net.Conn" }
48 func (e *InvalidConnError) Temporary() bool { return false }
49 func (e *InvalidConnError) Timeout() bool { return false }
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 type pollServer struct {
86 cr, cw chan *netFD
87 pr, pw *os.File
88 poll *pollster
89 sync.Mutex
90 pending map[int]*netFD
91 deadline int64
92 }
93
94 func (s *pollServer) AddFD(fd *netFD, mode int) {
95 intfd := fd.sysfd
96 if intfd < 0 {
97
98 if mode == 'r' {
99 fd.cr <- true
100 } else {
101 fd.cw <- true
102 }
103 return
104 }
105
106 s.Lock()
107
108 var t int64
109 key := intfd << 1
110 if mode == 'r' {
111 fd.ncr++
112 t = fd.rdeadline
113 } else {
114 fd.ncw++
115 key++
116 t = fd.wdeadline
117 }
118 s.pending[key] = fd
119 doWakeup := false
120 if t > 0 && (s.deadline == 0 || t < s.deadline) {
121 s.deadline = t
122 doWakeup = true
123 }
124
125 wake, err := s.poll.AddFD(intfd, mode, false)
126 if err != nil {
127 panic("pollServer AddFD " + err.String())
128 }
129 if wake {
130 doWakeup = true
131 }
132
133 s.Unlock()
134
135 if doWakeup {
136 s.Wakeup()
137 }
138 }
139
140 var wakeupbuf [1]byte
141
142 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
143
144 func (s *pollServer) LookupFD(fd int, mode int) *netFD {
145 key := fd << 1
146 if mode == 'w' {
147 key++
148 }
149 netfd, ok := s.pending[key]
150 if !ok {
151 return nil
152 }
153 s.pending[key] = nil, false
154 return netfd
155 }
156
157 func (s *pollServer) WakeFD(fd *netFD, mode int) {
158 if mode == 'r' {
159 for fd.ncr > 0 {
160 fd.ncr--
161 fd.cr <- true
162 }
163 } else {
164 for fd.ncw > 0 {
165 fd.ncw--
166 fd.cw <- true
167 }
168 }
169 }
170
171 func (s *pollServer) Now() int64 {
172 return time.Nanoseconds()
173 }
174
175 func (s *pollServer) CheckDeadlines() {
176 now := s.Now()
177
178
179
180 var next_deadline int64
181 for key, fd := range s.pending {
182 var t int64
183 var mode int
184 if key&1 == 0 {
185 mode = 'r'
186 } else {
187 mode = 'w'
188 }
189 if mode == 'r' {
190 t = fd.rdeadline
191 } else {
192 t = fd.wdeadline
193 }
194 if t > 0 {
195 if t <= now {
196 s.pending[key] = nil, false
197 if mode == 'r' {
198 s.poll.DelFD(fd.sysfd, mode)
199 fd.rdeadline = -1
200 } else {
201 s.poll.DelFD(fd.sysfd, mode)
202 fd.wdeadline = -1
203 }
204 s.WakeFD(fd, mode)
205 } else if next_deadline == 0 || t < next_deadline {
206 next_deadline = t
207 }
208 }
209 }
210 s.deadline = next_deadline
211 }
212
213 func (s *pollServer) Run() {
214 var scratch [100]byte
215 s.Lock()
216 defer s.Unlock()
217 for {
218 var t = s.deadline
219 if t > 0 {
220 t = t - s.Now()
221 if t <= 0 {
222 s.CheckDeadlines()
223 continue
224 }
225 }
226 fd, mode, err := s.poll.WaitFD(s, t)
227 if err != nil {
228 print("pollServer WaitFD: ", err.String(), "\n")
229 return
230 }
231 if fd < 0 {
232
233 s.CheckDeadlines()
234 continue
235 }
236 if fd == s.pr.Fd() {
237
238
239
240 s.pr.Read(scratch[0:])
241 s.CheckDeadlines()
242 } else {
243 netfd := s.LookupFD(fd, mode)
244 if netfd == nil {
245 print("pollServer: unexpected wakeup for fd=", fd, " mode=", string(mode), "\n")
246 continue
247 }
248 s.WakeFD(netfd, mode)
249 }
250 }
251 }
252
253 func (s *pollServer) WaitRead(fd *netFD) {
254 s.AddFD(fd, 'r')
255 <-fd.cr
256 }
257
258 func (s *pollServer) WaitWrite(fd *netFD) {
259 s.AddFD(fd, 'w')
260 <-fd.cw
261 }
262
263
264
265
266 var pollserver *pollServer
267 var onceStartServer sync.Once
268
269 func startServer() {
270 p, err := newPollServer()
271 if err != nil {
272 print("Start pollServer: ", err.String(), "\n")
273 }
274 pollserver = p
275 }
276
277 func newFD(fd, family, proto int, net string) (f *netFD, err os.Error) {
278 onceStartServer.Do(startServer)
279 if e := syscall.SetNonblock(fd, true); e != 0 {
280 return nil, os.Errno(e)
281 }
282 f = &netFD{
283 sysfd: fd,
284 family: family,
285 proto: proto,
286 net: net,
287 }
288 f.cr = make(chan bool, 1)
289 f.cw = make(chan bool, 1)
290 return f, nil
291 }
292
293 func (fd *netFD) setAddr(laddr, raddr Addr) {
294 fd.laddr = laddr
295 fd.raddr = raddr
296 var ls, rs string
297 if laddr != nil {
298 ls = laddr.String()
299 }
300 if raddr != nil {
301 rs = raddr.String()
302 }
303 fd.sysfile = os.NewFile(fd.sysfd, fd.net+":"+ls+"->"+rs)
304 }
305
306 func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) {
307 e := syscall.Connect(fd.sysfd, ra)
308 if e == syscall.EINPROGRESS {
309 var errno int
310 pollserver.WaitWrite(fd)
311 e, errno = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
312 if errno != 0 {
313 return os.NewSyscallError("getsockopt", errno)
314 }
315 }
316 if e != 0 {
317 return os.Errno(e)
318 }
319 return nil
320 }
321
322
323 func (fd *netFD) incref() {
324 fd.sysmu.Lock()
325 fd.sysref++
326 fd.sysmu.Unlock()
327 }
328
329
330
331 func (fd *netFD) decref() {
332 fd.sysmu.Lock()
333 fd.sysref--
334 if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 {
335
336
337
338
339 syscall.SetNonblock(fd.sysfd, false)
340 fd.sysfile.Close()
341 fd.sysfile = nil
342 fd.sysfd = -1
343 }
344 fd.sysmu.Unlock()
345 }
346
347 func (fd *netFD) Close() os.Error {
348 if fd == nil || fd.sysfile == nil {
349 return os.EINVAL
350 }
351
352 fd.incref()
353 syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
354 fd.closing = true
355 fd.decref()
356 return nil
357 }
358
359 func (fd *netFD) Read(p []byte) (n int, err os.Error) {
360 if fd == nil {
361 return 0, os.EINVAL
362 }
363 fd.rio.Lock()
364 defer fd.rio.Unlock()
365 fd.incref()
366 defer fd.decref()
367 if fd.sysfile == nil {
368 return 0, os.EINVAL
369 }
370 if fd.rdeadline_delta > 0 {
371 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
372 } else {
373 fd.rdeadline = 0
374 }
375 var oserr os.Error
376 for {
377 var errno int
378 n, errno = syscall.Read(fd.sysfile.Fd(), p)
379 if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
380 pollserver.WaitRead(fd)
381 continue
382 }
383 if errno != 0 {
384 n = 0
385 oserr = os.Errno(errno)
386 } else if n == 0 && errno == 0 && fd.proto != syscall.SOCK_DGRAM {
387 err = os.EOF
388 }
389 break
390 }
391 if oserr != nil {
392 err = &OpError{"read", fd.net, fd.raddr, oserr}
393 }
394 return
395 }
396
397 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) {
398 if fd == nil || fd.sysfile == nil {
399 return 0, nil, os.EINVAL
400 }
401 fd.rio.Lock()
402 defer fd.rio.Unlock()
403 fd.incref()
404 defer fd.decref()
405 if fd.rdeadline_delta > 0 {
406 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
407 } else {
408 fd.rdeadline = 0
409 }
410 var oserr os.Error
411 for {
412 var errno int
413 n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0)
414 if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
415 pollserver.WaitRead(fd)
416 continue
417 }
418 if errno != 0 {
419 n = 0
420 oserr = os.Errno(errno)
421 }
422 break
423 }
424 if oserr != nil {
425 err = &OpError{"read", fd.net, fd.laddr, oserr}
426 }
427 return
428 }
429
430 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
431 if fd == nil || fd.sysfile == nil {
432 return 0, 0, 0, nil, os.EINVAL
433 }
434 fd.rio.Lock()
435 defer fd.rio.Unlock()
436 fd.incref()
437 defer fd.decref()
438 if fd.rdeadline_delta > 0 {
439 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
440 } else {
441 fd.rdeadline = 0
442 }
443 var oserr os.Error
444 for {
445 var errno int
446 n, oobn, flags, sa, errno = syscall.Recvmsg(fd.sysfd, p, oob, 0)
447 if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
448 pollserver.WaitRead(fd)
449 continue
450 }
451 if errno != 0 {
452 oserr = os.Errno(errno)
453 }
454 if n == 0 {
455 oserr = os.EOF
456 }
457 break
458 }
459 if oserr != nil {
460 err = &OpError{"read", fd.net, fd.laddr, oserr}
461 return
462 }
463 return
464 }
465
466 func (fd *netFD) Write(p []byte) (n int, err os.Error) {
467 if fd == nil {
468 return 0, os.EINVAL
469 }
470 fd.wio.Lock()
471 defer fd.wio.Unlock()
472 fd.incref()
473 defer fd.decref()
474 if fd.sysfile == nil {
475 return 0, os.EINVAL
476 }
477 if fd.wdeadline_delta > 0 {
478 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
479 } else {
480 fd.wdeadline = 0
481 }
482 nn := 0
483 var oserr os.Error
484
485 for {
486 n, errno := syscall.Write(fd.sysfile.Fd(), p[nn:])
487 if n > 0 {
488 nn += n
489 }
490 if nn == len(p) {
491 break
492 }
493 if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
494 pollserver.WaitWrite(fd)
495 continue
496 }
497 if errno != 0 {
498 n = 0
499 oserr = os.Errno(errno)
500 break
501 }
502 if n == 0 {
503 oserr = io.ErrUnexpectedEOF
504 break
505 }
506 }
507 if oserr != nil {
508 err = &OpError{"write", fd.net, fd.raddr, oserr}
509 }
510 return nn, err
511 }
512
513 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) {
514 if fd == nil || fd.sysfile == nil {
515 return 0, os.EINVAL
516 }
517 fd.wio.Lock()
518 defer fd.wio.Unlock()
519 fd.incref()
520 defer fd.decref()
521 if fd.wdeadline_delta > 0 {
522 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
523 } else {
524 fd.wdeadline = 0
525 }
526 var oserr os.Error
527 for {
528 errno := syscall.Sendto(fd.sysfd, p, 0, sa)
529 if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
530 pollserver.WaitWrite(fd)
531 continue
532 }
533 if errno != 0 {
534 oserr = os.Errno(errno)
535 }
536 break
537 }
538 if oserr == nil {
539 n = len(p)
540 } else {
541 err = &OpError{"write", fd.net, fd.raddr, oserr}
542 }
543 return
544 }
545
546 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
547 if fd == nil || fd.sysfile == nil {
548 return 0, 0, os.EINVAL
549 }
550 fd.wio.Lock()
551 defer fd.wio.Unlock()
552 fd.incref()
553 defer fd.decref()
554 if fd.wdeadline_delta > 0 {
555 fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
556 } else {
557 fd.wdeadline = 0
558 }
559 var oserr os.Error
560 for {
561 var errno int
562 errno = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
563 if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
564 pollserver.WaitWrite(fd)
565 continue
566 }
567 if errno != 0 {
568 oserr = os.Errno(errno)
569 }
570 break
571 }
572 if oserr == nil {
573 n = len(p)
574 oobn = len(oob)
575 } else {
576 err = &OpError{"write", fd.net, fd.raddr, oserr}
577 }
578 return
579 }
580
581 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
582 if fd == nil || fd.sysfile == nil {
583 return nil, os.EINVAL
584 }
585
586 fd.incref()
587 defer fd.decref()
588 if fd.rdeadline_delta > 0 {
589 fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
590 } else {
591 fd.rdeadline = 0
592 }
593
594
595
596
597 syscall.ForkLock.RLock()
598 var s, e int
599 var rsa syscall.Sockaddr
600 for {
601 if fd.closing {
602 syscall.ForkLock.RUnlock()
603 return nil, os.EINVAL
604 }
605 s, rsa, e = syscall.Accept(fd.sysfd)
606 if e != syscall.EAGAIN || fd.rdeadline < 0 {
607 break
608 }
609 syscall.ForkLock.RUnlock()
610 pollserver.WaitRead(fd)
611 syscall.ForkLock.RLock()
612 }
613 if e != 0 {
614 syscall.ForkLock.RUnlock()
615 return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)}
616 }
617 syscall.CloseOnExec(s)
618 syscall.ForkLock.RUnlock()
619
620 if nfd, err = newFD(s, fd.family, fd.proto, fd.net); err != nil {
621 syscall.Close(s)
622 return nil, err
623 }
624 lsa, _ := syscall.Getsockname(nfd.sysfd)
625 nfd.setAddr(toAddr(lsa), toAddr(rsa))
626 return nfd, nil
627 }
628
629 func (fd *netFD) dup() (f *os.File, err os.Error) {
630 ns, e := syscall.Dup(fd.sysfd)
631 if e != 0 {
632 return nil, &OpError{"dup", fd.net, fd.laddr, os.Errno(e)}
633 }
634
635
636 if e = syscall.SetNonblock(ns, false); e != 0 {
637 return nil, &OpError{"setnonblock", fd.net, fd.laddr, os.Errno(e)}
638 }
639
640 return os.NewFile(ns, fd.sysfile.Name()), nil
641 }
642
643 func closesocket(s int) (errno int) {
644 return syscall.Close(s)
645 }