1
2
3
4
5 package net
6
7 import (
8 "os"
9 "runtime"
10 "sync"
11 "syscall"
12 "time"
13 "unsafe"
14 )
15
// InvalidConnError is the error value that describes an invalid net.Conn.
type InvalidConnError struct{}

// String returns the fixed description of the error.
func (e *InvalidConnError) String() string {
	return "invalid net.Conn"
}

// Temporary always reports false.
func (e *InvalidConnError) Temporary() bool {
	return false
}

// Timeout always reports false.
func (e *InvalidConnError) Timeout() bool {
	return false
}
21
22 var initErr os.Error
23
24 func init() {
25 var d syscall.WSAData
26 e := syscall.WSAStartup(uint32(0x101), &d)
27 if e != 0 {
28 initErr = os.NewSyscallError("WSAStartup", e)
29 }
30 }
31
32 func closesocket(s syscall.Handle) (errno int) {
33 return syscall.Closesocket(s)
34 }
35
36
37 type anOpIface interface {
38 Op() *anOp
39 Name() string
40 Submit() (errno int)
41 }
42
43
// ioResult is the outcome of one completed operation, as filled in from
// GetQueuedCompletionStatus by resultSrv.Run.
type ioResult struct {
	qty uint32 // quantity reported for the completion; ExecIO returns it as n
	err int    // errno of the completion; 0 means success
}
48
49
50 type anOp struct {
51
52
53 o syscall.Overlapped
54
55 resultc chan ioResult
56 errnoc chan int
57 fd *netFD
58 }
59
60 func (o *anOp) Init(fd *netFD) {
61 o.fd = fd
62 o.resultc = make(chan ioResult, 1)
63 o.errnoc = make(chan int)
64 }
65
66 func (o *anOp) Op() *anOp {
67 return o
68 }
69
70
71
72 type bufOp struct {
73 anOp
74 buf syscall.WSABuf
75 }
76
77 func (o *bufOp) Init(fd *netFD, buf []byte) {
78 o.anOp.Init(fd)
79 o.buf.Len = uint32(len(buf))
80 if len(buf) == 0 {
81 o.buf.Buf = nil
82 } else {
83 o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
84 }
85 }
86
87
88
89
90 type resultSrv struct {
91 iocp syscall.Handle
92 }
93
94 func (s *resultSrv) Run() {
95 var o *syscall.Overlapped
96 var key uint32
97 var r ioResult
98 for {
99 r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE)
100 switch {
101 case r.err == 0:
102
103 case r.err == syscall.WAIT_TIMEOUT && o == nil:
104
105 panic("GetQueuedCompletionStatus timed out")
106 case o == nil:
107
108 panic("GetQueuedCompletionStatus failed " + syscall.Errstr(r.err))
109 default:
110
111 }
112 (*anOp)(unsafe.Pointer(o)).resultc <- r
113 }
114 }
115
116
117 type ioSrv struct {
118 submchan chan anOpIface
119 canchan chan anOpIface
120 }
121
122
123
124
125
126 func (s *ioSrv) ProcessRemoteIO() {
127 runtime.LockOSThread()
128 defer runtime.UnlockOSThread()
129 for {
130 select {
131 case o := <-s.submchan:
132 o.Op().errnoc <- o.Submit()
133 case o := <-s.canchan:
134 o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
135 }
136 }
137 }
138
139
140
141
142 func (s *ioSrv) ExecIO(oi anOpIface, deadline_delta int64) (n int, err os.Error) {
143 var e int
144 o := oi.Op()
145 if deadline_delta > 0 {
146
147
148 s.submchan <- oi
149 e = <-o.errnoc
150 } else {
151 e = oi.Submit()
152 }
153 switch e {
154 case 0:
155
156 case syscall.ERROR_IO_PENDING:
157
158 default:
159 return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(e)}
160 }
161
162 var r ioResult
163 if deadline_delta > 0 {
164 select {
165 case r = <-o.resultc:
166 case <-time.After(deadline_delta):
167 s.canchan <- oi
168 <-o.errnoc
169 r = <-o.resultc
170 if r.err == syscall.ERROR_OPERATION_ABORTED {
171 r.err = syscall.EWOULDBLOCK
172 }
173 }
174 } else {
175 r = <-o.resultc
176 }
177 if r.err != 0 {
178 err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(r.err)}
179 }
180 return int(r.qty), err
181 }
182
183
184 var resultsrv *resultSrv
185 var iosrv *ioSrv
186 var onceStartServer sync.Once
187
188 func startServer() {
189 resultsrv = new(resultSrv)
190 var errno int
191 resultsrv.iocp, errno = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1)
192 if errno != 0 {
193 panic("CreateIoCompletionPort failed " + syscall.Errstr(errno))
194 }
195 go resultsrv.Run()
196
197 iosrv = new(ioSrv)
198 iosrv.submchan = make(chan anOpIface)
199 iosrv.canchan = make(chan anOpIface)
200 go iosrv.ProcessRemoteIO()
201 }
202
203
204 type netFD struct {
205
206 sysmu sync.Mutex
207 sysref int
208 closing bool
209
210
211 sysfd syscall.Handle
212 family int
213 proto int
214 net string
215 laddr Addr
216 raddr Addr
217
218
219 rdeadline_delta int64
220 rdeadline int64
221 rio sync.Mutex
222 wdeadline_delta int64
223 wdeadline int64
224 wio sync.Mutex
225 }
226
227 func allocFD(fd syscall.Handle, family, proto int, net string) (f *netFD) {
228 f = &netFD{
229 sysfd: fd,
230 family: family,
231 proto: proto,
232 net: net,
233 }
234 runtime.SetFinalizer(f, (*netFD).Close)
235 return f
236 }
237
238 func newFD(fd syscall.Handle, family, proto int, net string) (f *netFD, err os.Error) {
239 if initErr != nil {
240 return nil, initErr
241 }
242 onceStartServer.Do(startServer)
243
244 if _, e := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); e != 0 {
245 return nil, os.Errno(e)
246 }
247 return allocFD(fd, family, proto, net), nil
248 }
249
250 func (fd *netFD) setAddr(laddr, raddr Addr) {
251 fd.laddr = laddr
252 fd.raddr = raddr
253 }
254
255 func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) {
256 e := syscall.Connect(fd.sysfd, ra)
257 if e != 0 {
258 return os.Errno(e)
259 }
260 return nil
261 }
262
263
264 func (fd *netFD) incref() {
265 fd.sysmu.Lock()
266 fd.sysref++
267 fd.sysmu.Unlock()
268 }
269
270
271
272 func (fd *netFD) decref() {
273 fd.sysmu.Lock()
274 fd.sysref--
275 if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
276
277
278
279
280 syscall.SetNonblock(fd.sysfd, false)
281 closesocket(fd.sysfd)
282 fd.sysfd = syscall.InvalidHandle
283
284 runtime.SetFinalizer(fd, nil)
285 }
286 fd.sysmu.Unlock()
287 }
288
289 func (fd *netFD) Close() os.Error {
290 if fd == nil || fd.sysfd == syscall.InvalidHandle {
291 return os.EINVAL
292 }
293
294 fd.incref()
295 syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
296 fd.closing = true
297 fd.decref()
298 return nil
299 }
300
301
302
303 type readOp struct {
304 bufOp
305 }
306
307 func (o *readOp) Submit() (errno int) {
308 var d, f uint32
309 return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
310 }
311
312 func (o *readOp) Name() string {
313 return "WSARecv"
314 }
315
316 func (fd *netFD) Read(buf []byte) (n int, err os.Error) {
317 if fd == nil {
318 return 0, os.EINVAL
319 }
320 fd.rio.Lock()
321 defer fd.rio.Unlock()
322 fd.incref()
323 defer fd.decref()
324 if fd.sysfd == syscall.InvalidHandle {
325 return 0, os.EINVAL
326 }
327 var o readOp
328 o.Init(fd, buf)
329 n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
330 if err == nil && n == 0 {
331 err = os.EOF
332 }
333 return
334 }
335
336
337
338 type readFromOp struct {
339 bufOp
340 rsa syscall.RawSockaddrAny
341 rsan int32
342 }
343
344 func (o *readFromOp) Submit() (errno int) {
345 var d, f uint32
346 return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
347 }
348
349 func (o *readFromOp) Name() string {
350 return "WSARecvFrom"
351 }
352
353 func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err os.Error) {
354 if fd == nil {
355 return 0, nil, os.EINVAL
356 }
357 if len(buf) == 0 {
358 return 0, nil, nil
359 }
360 fd.rio.Lock()
361 defer fd.rio.Unlock()
362 fd.incref()
363 defer fd.decref()
364 if fd.sysfd == syscall.InvalidHandle {
365 return 0, nil, os.EINVAL
366 }
367 var o readFromOp
368 o.Init(fd, buf)
369 o.rsan = int32(unsafe.Sizeof(o.rsa))
370 n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
371 if err != nil {
372 return 0, nil, err
373 }
374 sa, _ = o.rsa.Sockaddr()
375 return
376 }
377
378
379
380 type writeOp struct {
381 bufOp
382 }
383
384 func (o *writeOp) Submit() (errno int) {
385 var d uint32
386 return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
387 }
388
389 func (o *writeOp) Name() string {
390 return "WSASend"
391 }
392
393 func (fd *netFD) Write(buf []byte) (n int, err os.Error) {
394 if fd == nil {
395 return 0, os.EINVAL
396 }
397 fd.wio.Lock()
398 defer fd.wio.Unlock()
399 fd.incref()
400 defer fd.decref()
401 if fd.sysfd == syscall.InvalidHandle {
402 return 0, os.EINVAL
403 }
404 var o writeOp
405 o.Init(fd, buf)
406 return iosrv.ExecIO(&o, fd.wdeadline_delta)
407 }
408
409
410
411 type writeToOp struct {
412 bufOp
413 sa syscall.Sockaddr
414 }
415
416 func (o *writeToOp) Submit() (errno int) {
417 var d uint32
418 return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
419 }
420
421 func (o *writeToOp) Name() string {
422 return "WSASendto"
423 }
424
425 func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (n int, err os.Error) {
426 if fd == nil {
427 return 0, os.EINVAL
428 }
429 if len(buf) == 0 {
430 return 0, nil
431 }
432 fd.wio.Lock()
433 defer fd.wio.Unlock()
434 fd.incref()
435 defer fd.decref()
436 if fd.sysfd == syscall.InvalidHandle {
437 return 0, os.EINVAL
438 }
439 var o writeToOp
440 o.Init(fd, buf)
441 o.sa = sa
442 return iosrv.ExecIO(&o, fd.wdeadline_delta)
443 }
444
445
446
447 type acceptOp struct {
448 anOp
449 newsock syscall.Handle
450 attrs [2]syscall.RawSockaddrAny
451 }
452
453 func (o *acceptOp) Submit() (errno int) {
454 var d uint32
455 l := uint32(unsafe.Sizeof(o.attrs[0]))
456 return syscall.AcceptEx(o.fd.sysfd, o.newsock,
457 (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
458 }
459
460 func (o *acceptOp) Name() string {
461 return "AcceptEx"
462 }
463
464 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
465 if fd == nil || fd.sysfd == syscall.InvalidHandle {
466 return nil, os.EINVAL
467 }
468 fd.incref()
469 defer fd.decref()
470
471
472
473 syscall.ForkLock.RLock()
474 s, e := syscall.Socket(fd.family, fd.proto, 0)
475 if e != 0 {
476 syscall.ForkLock.RUnlock()
477 return nil, os.Errno(e)
478 }
479 syscall.CloseOnExec(s)
480 syscall.ForkLock.RUnlock()
481
482
483 onceStartServer.Do(startServer)
484 if _, e = syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); e != 0 {
485 return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)}
486 }
487
488
489 var o acceptOp
490 o.Init(fd)
491 o.newsock = s
492 _, err = iosrv.ExecIO(&o, 0)
493 if err != nil {
494 closesocket(s)
495 return nil, err
496 }
497
498
499 e = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
500 if e != 0 {
501 closesocket(s)
502 return nil, err
503 }
504
505
506 var lrsa, rrsa *syscall.RawSockaddrAny
507 var llen, rlen int32
508 l := uint32(unsafe.Sizeof(*lrsa))
509 syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
510 0, l, l, &lrsa, &llen, &rrsa, &rlen)
511 lsa, _ := lrsa.Sockaddr()
512 rsa, _ := rrsa.Sockaddr()
513
514 nfd = allocFD(s, fd.family, fd.proto, fd.net)
515 nfd.setAddr(toAddr(lsa), toAddr(rsa))
516 return nfd, nil
517 }
518
519
520
521 func (fd *netFD) dup() (f *os.File, err os.Error) {
522
523 return nil, os.NewSyscallError("dup", syscall.EWINDOWS)
524 }
525
526 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
527 return 0, 0, 0, nil, os.EAFNOSUPPORT
528 }
529
530 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
531 return 0, 0, os.EAFNOSUPPORT
532 }