The Go Programming Language

Source file src/pkg/net/fd_windows.go

     1	// Copyright 2010 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package net
     6	
     7	import (
     8		"os"
     9		"runtime"
    10		"sync"
    11		"syscall"
    12		"time"
    13		"unsafe"
    14	)
    15	
    16	type InvalidConnError struct{}
    17	
    18	func (e *InvalidConnError) String() string  { return "invalid net.Conn" }
    19	func (e *InvalidConnError) Temporary() bool { return false }
    20	func (e *InvalidConnError) Timeout() bool   { return false }
    21	
    22	var initErr os.Error
    23	
    24	func init() {
    25		var d syscall.WSAData
    26		e := syscall.WSAStartup(uint32(0x101), &d)
    27		if e != 0 {
    28			initErr = os.NewSyscallError("WSAStartup", e)
    29		}
    30	}
    31	
    32	func closesocket(s syscall.Handle) (errno int) {
    33		return syscall.Closesocket(s)
    34	}
    35	
// anOpIface is the interface for all io operations. ioSrv.ExecIO and
// ioSrv.ProcessRemoteIO drive an operation exclusively through it.
type anOpIface interface {
	// Op returns the common operation state (channels and owning fd).
	Op() *anOp
	// Name returns the operation name that ExecIO places in OpError.
	Name() string
	// Submit starts the operation and returns the resulting errno.
	Submit() (errno int)
}
    42	
// ioResult holds the IO completion result parameters that resultSrv.Run
// obtains from GetQueuedCompletionStatus and delivers on anOp.resultc.
type ioResult struct {
	// qty is the quantity reported for the completed operation;
	// ExecIO returns it as n.
	qty uint32
	// err is the completion errno; 0 means success.
	err int
}
    48	
    49	// anOp implements functionality common to all io operations.
    50	type anOp struct {
    51		// Used by IOCP interface, it must be first field
    52		// of the struct, as our code rely on it.
    53		o syscall.Overlapped
    54	
    55		resultc chan ioResult // io completion results
    56		errnoc  chan int      // io submit / cancel operation errors
    57		fd      *netFD
    58	}
    59	
    60	func (o *anOp) Init(fd *netFD) {
    61		o.fd = fd
    62		o.resultc = make(chan ioResult, 1)
    63		o.errnoc = make(chan int)
    64	}
    65	
    66	func (o *anOp) Op() *anOp {
    67		return o
    68	}
    69	
    70	// bufOp is used by io operations that read / write
    71	// data from / to client buffer.
    72	type bufOp struct {
    73		anOp
    74		buf syscall.WSABuf
    75	}
    76	
    77	func (o *bufOp) Init(fd *netFD, buf []byte) {
    78		o.anOp.Init(fd)
    79		o.buf.Len = uint32(len(buf))
    80		if len(buf) == 0 {
    81			o.buf.Buf = nil
    82		} else {
    83			o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
    84		}
    85	}
    86	
    87	// resultSrv will retrieve all io completion results from
    88	// iocp and send them to the correspondent waiting client
    89	// goroutine via channel supplied in the request.
    90	type resultSrv struct {
    91		iocp syscall.Handle
    92	}
    93	
    94	func (s *resultSrv) Run() {
    95		var o *syscall.Overlapped
    96		var key uint32
    97		var r ioResult
    98		for {
    99			r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE)
   100			switch {
   101			case r.err == 0:
   102				// Dequeued successfully completed io packet.
   103			case r.err == syscall.WAIT_TIMEOUT && o == nil:
   104				// Wait has timed out (should not happen now, but might be used in the future).
   105				panic("GetQueuedCompletionStatus timed out")
   106			case o == nil:
   107				// Failed to dequeue anything -> report the error.
   108				panic("GetQueuedCompletionStatus failed " + syscall.Errstr(r.err))
   109			default:
   110				// Dequeued failed io packet.
   111			}
   112			(*anOp)(unsafe.Pointer(o)).resultc <- r
   113		}
   114	}
   115	
   116	// ioSrv executes net io requests.
   117	type ioSrv struct {
   118		submchan chan anOpIface // submit io requests
   119		canchan  chan anOpIface // cancel io requests
   120	}
   121	
   122	// ProcessRemoteIO will execute submit io requests on behalf
   123	// of other goroutines, all on a single os thread, so it can
   124	// cancel them later. Results of all operations will be sent
   125	// back to their requesters via channel supplied in request.
   126	func (s *ioSrv) ProcessRemoteIO() {
   127		runtime.LockOSThread()
   128		defer runtime.UnlockOSThread()
   129		for {
   130			select {
   131			case o := <-s.submchan:
   132				o.Op().errnoc <- o.Submit()
   133			case o := <-s.canchan:
   134				o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
   135			}
   136		}
   137	}
   138	
   139	// ExecIO executes a single io operation. It either executes it
   140	// inline, or, if timeouts are employed, passes the request onto
   141	// a special goroutine and waits for completion or cancels request.
   142	func (s *ioSrv) ExecIO(oi anOpIface, deadline_delta int64) (n int, err os.Error) {
   143		var e int
   144		o := oi.Op()
   145		if deadline_delta > 0 {
   146			// Send request to a special dedicated thread,
   147			// so it can stop the io with CancelIO later.
   148			s.submchan <- oi
   149			e = <-o.errnoc
   150		} else {
   151			e = oi.Submit()
   152		}
   153		switch e {
   154		case 0:
   155			// IO completed immediately, but we need to get our completion message anyway.
   156		case syscall.ERROR_IO_PENDING:
   157			// IO started, and we have to wait for its completion.
   158		default:
   159			return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(e)}
   160		}
   161		// Wait for our request to complete.
   162		var r ioResult
   163		if deadline_delta > 0 {
   164			select {
   165			case r = <-o.resultc:
   166			case <-time.After(deadline_delta):
   167				s.canchan <- oi
   168				<-o.errnoc
   169				r = <-o.resultc
   170				if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
   171					r.err = syscall.EWOULDBLOCK
   172				}
   173			}
   174		} else {
   175			r = <-o.resultc
   176		}
   177		if r.err != 0 {
   178			err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(r.err)}
   179		}
   180		return int(r.qty), err
   181	}
   182	
   183	// Start helper goroutines.
   184	var resultsrv *resultSrv
   185	var iosrv *ioSrv
   186	var onceStartServer sync.Once
   187	
   188	func startServer() {
   189		resultsrv = new(resultSrv)
   190		var errno int
   191		resultsrv.iocp, errno = syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 1)
   192		if errno != 0 {
   193			panic("CreateIoCompletionPort failed " + syscall.Errstr(errno))
   194		}
   195		go resultsrv.Run()
   196	
   197		iosrv = new(ioSrv)
   198		iosrv.submchan = make(chan anOpIface)
   199		iosrv.canchan = make(chan anOpIface)
   200		go iosrv.ProcessRemoteIO()
   201	}
   202	
// Network file descriptor.
type netFD struct {
	// locking/lifetime of sysfd
	// sysmu is held by incref and decref while they update sysref.
	// decref closes the socket once closing is set and sysref drops to 0.
	sysmu   sync.Mutex
	sysref  int
	closing bool

	// immutable until Close
	// sysfd is replaced with syscall.InvalidHandle by decref after the
	// socket has been closed. family and proto are reused by accept to
	// create the new socket; net and laddr appear in OpError values.
	sysfd  syscall.Handle
	family int
	proto  int
	net    string
	laddr  Addr
	raddr  Addr

	// owned by client
	// rdeadline_delta / wdeadline_delta are passed to ioSrv.ExecIO as
	// the timeout of read / write operations; a value <= 0 means no
	// timeout. rio is held for the duration of Read and ReadFrom, wio
	// for Write and WriteTo.
	// NOTE(review): rdeadline and wdeadline are not referenced in this
	// file; presumably they are used elsewhere in the package - confirm.
	rdeadline_delta int64
	rdeadline       int64
	rio             sync.Mutex
	wdeadline_delta int64
	wdeadline       int64
	wio             sync.Mutex
}
   226	
   227	func allocFD(fd syscall.Handle, family, proto int, net string) (f *netFD) {
   228		f = &netFD{
   229			sysfd:  fd,
   230			family: family,
   231			proto:  proto,
   232			net:    net,
   233		}
   234		runtime.SetFinalizer(f, (*netFD).Close)
   235		return f
   236	}
   237	
   238	func newFD(fd syscall.Handle, family, proto int, net string) (f *netFD, err os.Error) {
   239		if initErr != nil {
   240			return nil, initErr
   241		}
   242		onceStartServer.Do(startServer)
   243		// Associate our socket with resultsrv.iocp.
   244		if _, e := syscall.CreateIoCompletionPort(syscall.Handle(fd), resultsrv.iocp, 0, 0); e != 0 {
   245			return nil, os.Errno(e)
   246		}
   247		return allocFD(fd, family, proto, net), nil
   248	}
   249	
   250	func (fd *netFD) setAddr(laddr, raddr Addr) {
   251		fd.laddr = laddr
   252		fd.raddr = raddr
   253	}
   254	
   255	func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) {
   256		e := syscall.Connect(fd.sysfd, ra)
   257		if e != 0 {
   258			return os.Errno(e)
   259		}
   260		return nil
   261	}
   262	
   263	// Add a reference to this fd.
   264	func (fd *netFD) incref() {
   265		fd.sysmu.Lock()
   266		fd.sysref++
   267		fd.sysmu.Unlock()
   268	}
   269	
   270	// Remove a reference to this FD and close if we've been asked to do so (and
   271	// there are no references left.
   272	func (fd *netFD) decref() {
   273		fd.sysmu.Lock()
   274		fd.sysref--
   275		if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
   276			// In case the user has set linger, switch to blocking mode so
   277			// the close blocks.  As long as this doesn't happen often, we
   278			// can handle the extra OS processes.  Otherwise we'll need to
   279			// use the resultsrv for Close too.  Sigh.
   280			syscall.SetNonblock(fd.sysfd, false)
   281			closesocket(fd.sysfd)
   282			fd.sysfd = syscall.InvalidHandle
   283			// no need for a finalizer anymore
   284			runtime.SetFinalizer(fd, nil)
   285		}
   286		fd.sysmu.Unlock()
   287	}
   288	
   289	func (fd *netFD) Close() os.Error {
   290		if fd == nil || fd.sysfd == syscall.InvalidHandle {
   291			return os.EINVAL
   292		}
   293	
   294		fd.incref()
   295		syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
   296		fd.closing = true
   297		fd.decref()
   298		return nil
   299	}
   300	
   301	// Read from network.
   302	
   303	type readOp struct {
   304		bufOp
   305	}
   306	
   307	func (o *readOp) Submit() (errno int) {
   308		var d, f uint32
   309		return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
   310	}
   311	
   312	func (o *readOp) Name() string {
   313		return "WSARecv"
   314	}
   315	
   316	func (fd *netFD) Read(buf []byte) (n int, err os.Error) {
   317		if fd == nil {
   318			return 0, os.EINVAL
   319		}
   320		fd.rio.Lock()
   321		defer fd.rio.Unlock()
   322		fd.incref()
   323		defer fd.decref()
   324		if fd.sysfd == syscall.InvalidHandle {
   325			return 0, os.EINVAL
   326		}
   327		var o readOp
   328		o.Init(fd, buf)
   329		n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
   330		if err == nil && n == 0 {
   331			err = os.EOF
   332		}
   333		return
   334	}
   335	
   336	// ReadFrom from network.
   337	
   338	type readFromOp struct {
   339		bufOp
   340		rsa  syscall.RawSockaddrAny
   341		rsan int32
   342	}
   343	
   344	func (o *readFromOp) Submit() (errno int) {
   345		var d, f uint32
   346		return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
   347	}
   348	
   349	func (o *readFromOp) Name() string {
   350		return "WSARecvFrom"
   351	}
   352	
   353	func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err os.Error) {
   354		if fd == nil {
   355			return 0, nil, os.EINVAL
   356		}
   357		if len(buf) == 0 {
   358			return 0, nil, nil
   359		}
   360		fd.rio.Lock()
   361		defer fd.rio.Unlock()
   362		fd.incref()
   363		defer fd.decref()
   364		if fd.sysfd == syscall.InvalidHandle {
   365			return 0, nil, os.EINVAL
   366		}
   367		var o readFromOp
   368		o.Init(fd, buf)
   369		o.rsan = int32(unsafe.Sizeof(o.rsa))
   370		n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
   371		if err != nil {
   372			return 0, nil, err
   373		}
   374		sa, _ = o.rsa.Sockaddr()
   375		return
   376	}
   377	
   378	// Write to network.
   379	
   380	type writeOp struct {
   381		bufOp
   382	}
   383	
   384	func (o *writeOp) Submit() (errno int) {
   385		var d uint32
   386		return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
   387	}
   388	
   389	func (o *writeOp) Name() string {
   390		return "WSASend"
   391	}
   392	
   393	func (fd *netFD) Write(buf []byte) (n int, err os.Error) {
   394		if fd == nil {
   395			return 0, os.EINVAL
   396		}
   397		fd.wio.Lock()
   398		defer fd.wio.Unlock()
   399		fd.incref()
   400		defer fd.decref()
   401		if fd.sysfd == syscall.InvalidHandle {
   402			return 0, os.EINVAL
   403		}
   404		var o writeOp
   405		o.Init(fd, buf)
   406		return iosrv.ExecIO(&o, fd.wdeadline_delta)
   407	}
   408	
   409	// WriteTo to network.
   410	
   411	type writeToOp struct {
   412		bufOp
   413		sa syscall.Sockaddr
   414	}
   415	
   416	func (o *writeToOp) Submit() (errno int) {
   417		var d uint32
   418		return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
   419	}
   420	
   421	func (o *writeToOp) Name() string {
   422		return "WSASendto"
   423	}
   424	
   425	func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (n int, err os.Error) {
   426		if fd == nil {
   427			return 0, os.EINVAL
   428		}
   429		if len(buf) == 0 {
   430			return 0, nil
   431		}
   432		fd.wio.Lock()
   433		defer fd.wio.Unlock()
   434		fd.incref()
   435		defer fd.decref()
   436		if fd.sysfd == syscall.InvalidHandle {
   437			return 0, os.EINVAL
   438		}
   439		var o writeToOp
   440		o.Init(fd, buf)
   441		o.sa = sa
   442		return iosrv.ExecIO(&o, fd.wdeadline_delta)
   443	}
   444	
   445	// Accept new network connections.
   446	
   447	type acceptOp struct {
   448		anOp
   449		newsock syscall.Handle
   450		attrs   [2]syscall.RawSockaddrAny // space for local and remote address only
   451	}
   452	
   453	func (o *acceptOp) Submit() (errno int) {
   454		var d uint32
   455		l := uint32(unsafe.Sizeof(o.attrs[0]))
   456		return syscall.AcceptEx(o.fd.sysfd, o.newsock,
   457			(*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
   458	}
   459	
   460	func (o *acceptOp) Name() string {
   461		return "AcceptEx"
   462	}
   463	
   464	func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
   465		if fd == nil || fd.sysfd == syscall.InvalidHandle {
   466			return nil, os.EINVAL
   467		}
   468		fd.incref()
   469		defer fd.decref()
   470	
   471		// Get new socket.
   472		// See ../syscall/exec.go for description of ForkLock.
   473		syscall.ForkLock.RLock()
   474		s, e := syscall.Socket(fd.family, fd.proto, 0)
   475		if e != 0 {
   476			syscall.ForkLock.RUnlock()
   477			return nil, os.Errno(e)
   478		}
   479		syscall.CloseOnExec(s)
   480		syscall.ForkLock.RUnlock()
   481	
   482		// Associate our new socket with IOCP.
   483		onceStartServer.Do(startServer)
   484		if _, e = syscall.CreateIoCompletionPort(s, resultsrv.iocp, 0, 0); e != 0 {
   485			return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)}
   486		}
   487	
   488		// Submit accept request.
   489		var o acceptOp
   490		o.Init(fd)
   491		o.newsock = s
   492		_, err = iosrv.ExecIO(&o, 0)
   493		if err != nil {
   494			closesocket(s)
   495			return nil, err
   496		}
   497	
   498		// Inherit properties of the listening socket.
   499		e = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
   500		if e != 0 {
   501			closesocket(s)
   502			return nil, err
   503		}
   504	
   505		// Get local and peer addr out of AcceptEx buffer.
   506		var lrsa, rrsa *syscall.RawSockaddrAny
   507		var llen, rlen int32
   508		l := uint32(unsafe.Sizeof(*lrsa))
   509		syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
   510			0, l, l, &lrsa, &llen, &rrsa, &rlen)
   511		lsa, _ := lrsa.Sockaddr()
   512		rsa, _ := rrsa.Sockaddr()
   513	
   514		nfd = allocFD(s, fd.family, fd.proto, fd.net)
   515		nfd.setAddr(toAddr(lsa), toAddr(rsa))
   516		return nfd, nil
   517	}
   518	
   519	// Unimplemented functions.
   520	
   521	func (fd *netFD) dup() (f *os.File, err os.Error) {
   522		// TODO: Implement this
   523		return nil, os.NewSyscallError("dup", syscall.EWINDOWS)
   524	}
   525	
   526	func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
   527		return 0, 0, 0, nil, os.EAFNOSUPPORT
   528	}
   529	
   530	func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
   531		return 0, 0, os.EAFNOSUPPORT
   532	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.