The Go Programming Language

Source file src/pkg/net/fd.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package net
     6	
     7	import (
     8		"io"
     9		"os"
    10		"sync"
    11		"syscall"
    12		"time"
    13	)
    14	
// Network file descriptor.
//
// netFD wraps a non-blocking socket descriptor together with the state
// needed to park goroutines on the shared pollServer while I/O would block.
type netFD struct {
	// locking/lifetime of sysfd
	// sysref counts operations currently using the descriptor (incref/decref);
	// closing is set by Close, and the last decref then closes the descriptor.
	sysmu   sync.Mutex
	sysref  int
	closing bool

	// immutable until Close
	sysfd   int       // OS descriptor; set to -1 by decref once closed
	family  int
	proto   int
	sysfile *os.File  // created by setAddr; closed and cleared by decref
	cr      chan bool // receives a value when a waiting read may proceed
	cw      chan bool // receives a value when a waiting write may proceed
	net     string
	laddr   Addr
	raddr   Addr

	// owned by client
	// *deadline_delta is the per-operation timeout in nanoseconds (<= 0 means
	// none). rdeadline/wdeadline hold the absolute deadline of the current
	// operation (nsec since 1970), 0 for none, or -1 once the pollServer has
	// expired it. rio/wio serialize readers and writers respectively.
	rdeadline_delta int64
	rdeadline       int64
	rio             sync.Mutex
	wdeadline_delta int64
	wdeadline       int64
	wio             sync.Mutex

	// owned by fd wait server
	// ncr/ncw count pending read/write wait requests; WakeFD drains them.
	ncr, ncw int
}
    44	
    45	type InvalidConnError struct{}
    46	
    47	func (e *InvalidConnError) String() string  { return "invalid net.Conn" }
    48	func (e *InvalidConnError) Temporary() bool { return false }
    49	func (e *InvalidConnError) Timeout() bool   { return false }
    50	
// A pollServer helps FDs determine when to retry a non-blocking
// read or write after they get EAGAIN.  When an FD needs to wait,
// send the fd on s.cr (for a read) or s.cw (for a write) to pass the
// request to the poll server.  Then receive on fd.cr/fd.cw.
// When the pollServer finds that i/o on FD should be possible
// again, it will send on fd.cr/fd.cw to wake any waiting goroutines.
// This protocol is implemented as s.WaitRead() and s.WaitWrite().
//
// There is one subtlety: when sending on s.cr/s.cw, the
// poll server is probably in a system call, waiting for an fd
// to become ready.  It's not looking at the request channels.
// To resolve this, the poll server waits not just on the FDs it has
// been given but also its own pipe.  After sending on the
// buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
// byte to the pipe, causing the pollServer's poll system call to
// return.  In response to the pipe being readable, the pollServer
// re-polls its request channels.
//
// Note that the ordering is "send request" and then "wake up server".
// If the operations were reversed, there would be a race: the poll
// server might wake up and look at the request channel, see that it
// was empty, and go back to sleep, all before the requester managed
// to send the request.  Because the send must complete before the wakeup,
// the request channel must be buffered.  A buffer of size 1 is sufficient
// for any request load.  If many processes are trying to submit requests,
// one will succeed, the pollServer will read the request, and then the
// channel will be empty for the next process's request.  A larger buffer
// might help batch requests.
//
// NOTE(review): AddFD in this file does not send on s.cr/s.cw; it records
// the request in s.pending while holding s's mutex and writes to the wakeup
// pipe only when needed. The channel-based description above may predate
// that design — confirm before relying on it.
//
// To avoid races in closing, all fd operations are locked and
// refcounted. When netFD.Close() is called, it calls syscall.Shutdown
// and sets a closing flag. Only when the last reference is removed
// will the fd be closed.

type pollServer struct {
	cr, cw     chan *netFD // buffered >= 1
	pr, pw     *os.File    // wakeup pipe: Wakeup writes to pw, Run drains pr
	poll       *pollster // low-level OS hooks
	sync.Mutex           // controls pending and deadline
	// pending maps fd<<1 (read wait) or fd<<1+1 (write wait) to the waiting netFD.
	pending    map[int]*netFD
	deadline   int64 // next deadline (nsec since 1970); 0 means none
}
    93	
    94	func (s *pollServer) AddFD(fd *netFD, mode int) {
    95		intfd := fd.sysfd
    96		if intfd < 0 {
    97			// fd closed underfoot
    98			if mode == 'r' {
    99				fd.cr <- true
   100			} else {
   101				fd.cw <- true
   102			}
   103			return
   104		}
   105	
   106		s.Lock()
   107	
   108		var t int64
   109		key := intfd << 1
   110		if mode == 'r' {
   111			fd.ncr++
   112			t = fd.rdeadline
   113		} else {
   114			fd.ncw++
   115			key++
   116			t = fd.wdeadline
   117		}
   118		s.pending[key] = fd
   119		doWakeup := false
   120		if t > 0 && (s.deadline == 0 || t < s.deadline) {
   121			s.deadline = t
   122			doWakeup = true
   123		}
   124	
   125		wake, err := s.poll.AddFD(intfd, mode, false)
   126		if err != nil {
   127			panic("pollServer AddFD " + err.String())
   128		}
   129		if wake {
   130			doWakeup = true
   131		}
   132	
   133		s.Unlock()
   134	
   135		if doWakeup {
   136			s.Wakeup()
   137		}
   138	}
   139	
   140	var wakeupbuf [1]byte
   141	
   142	func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
   143	
   144	func (s *pollServer) LookupFD(fd int, mode int) *netFD {
   145		key := fd << 1
   146		if mode == 'w' {
   147			key++
   148		}
   149		netfd, ok := s.pending[key]
   150		if !ok {
   151			return nil
   152		}
   153		s.pending[key] = nil, false
   154		return netfd
   155	}
   156	
   157	func (s *pollServer) WakeFD(fd *netFD, mode int) {
   158		if mode == 'r' {
   159			for fd.ncr > 0 {
   160				fd.ncr--
   161				fd.cr <- true
   162			}
   163		} else {
   164			for fd.ncw > 0 {
   165				fd.ncw--
   166				fd.cw <- true
   167			}
   168		}
   169	}
   170	
   171	func (s *pollServer) Now() int64 {
   172		return time.Nanoseconds()
   173	}
   174	
   175	func (s *pollServer) CheckDeadlines() {
   176		now := s.Now()
   177		// TODO(rsc): This will need to be handled more efficiently,
   178		// probably with a heap indexed by wakeup time.
   179	
   180		var next_deadline int64
   181		for key, fd := range s.pending {
   182			var t int64
   183			var mode int
   184			if key&1 == 0 {
   185				mode = 'r'
   186			} else {
   187				mode = 'w'
   188			}
   189			if mode == 'r' {
   190				t = fd.rdeadline
   191			} else {
   192				t = fd.wdeadline
   193			}
   194			if t > 0 {
   195				if t <= now {
   196					s.pending[key] = nil, false
   197					if mode == 'r' {
   198						s.poll.DelFD(fd.sysfd, mode)
   199						fd.rdeadline = -1
   200					} else {
   201						s.poll.DelFD(fd.sysfd, mode)
   202						fd.wdeadline = -1
   203					}
   204					s.WakeFD(fd, mode)
   205				} else if next_deadline == 0 || t < next_deadline {
   206					next_deadline = t
   207				}
   208			}
   209		}
   210		s.deadline = next_deadline
   211	}
   212	
   213	func (s *pollServer) Run() {
   214		var scratch [100]byte
   215		s.Lock()
   216		defer s.Unlock()
   217		for {
   218			var t = s.deadline
   219			if t > 0 {
   220				t = t - s.Now()
   221				if t <= 0 {
   222					s.CheckDeadlines()
   223					continue
   224				}
   225			}
   226			fd, mode, err := s.poll.WaitFD(s, t)
   227			if err != nil {
   228				print("pollServer WaitFD: ", err.String(), "\n")
   229				return
   230			}
   231			if fd < 0 {
   232				// Timeout happened.
   233				s.CheckDeadlines()
   234				continue
   235			}
   236			if fd == s.pr.Fd() {
   237				// Drain our wakeup pipe (we could loop here,
   238				// but it's unlikely that there are more than
   239				// len(scratch) wakeup calls).
   240				s.pr.Read(scratch[0:])
   241				s.CheckDeadlines()
   242			} else {
   243				netfd := s.LookupFD(fd, mode)
   244				if netfd == nil {
   245					print("pollServer: unexpected wakeup for fd=", fd, " mode=", string(mode), "\n")
   246					continue
   247				}
   248				s.WakeFD(netfd, mode)
   249			}
   250		}
   251	}
   252	
   253	func (s *pollServer) WaitRead(fd *netFD) {
   254		s.AddFD(fd, 'r')
   255		<-fd.cr
   256	}
   257	
   258	func (s *pollServer) WaitWrite(fd *netFD) {
   259		s.AddFD(fd, 'w')
   260		<-fd.cw
   261	}
   262	
   263	// Network FD methods.
   264	// All the network FDs use a single pollServer.
   265	
   266	var pollserver *pollServer
   267	var onceStartServer sync.Once
   268	
   269	func startServer() {
   270		p, err := newPollServer()
   271		if err != nil {
   272			print("Start pollServer: ", err.String(), "\n")
   273		}
   274		pollserver = p
   275	}
   276	
   277	func newFD(fd, family, proto int, net string) (f *netFD, err os.Error) {
   278		onceStartServer.Do(startServer)
   279		if e := syscall.SetNonblock(fd, true); e != 0 {
   280			return nil, os.Errno(e)
   281		}
   282		f = &netFD{
   283			sysfd:  fd,
   284			family: family,
   285			proto:  proto,
   286			net:    net,
   287		}
   288		f.cr = make(chan bool, 1)
   289		f.cw = make(chan bool, 1)
   290		return f, nil
   291	}
   292	
   293	func (fd *netFD) setAddr(laddr, raddr Addr) {
   294		fd.laddr = laddr
   295		fd.raddr = raddr
   296		var ls, rs string
   297		if laddr != nil {
   298			ls = laddr.String()
   299		}
   300		if raddr != nil {
   301			rs = raddr.String()
   302		}
   303		fd.sysfile = os.NewFile(fd.sysfd, fd.net+":"+ls+"->"+rs)
   304	}
   305	
   306	func (fd *netFD) connect(ra syscall.Sockaddr) (err os.Error) {
   307		e := syscall.Connect(fd.sysfd, ra)
   308		if e == syscall.EINPROGRESS {
   309			var errno int
   310			pollserver.WaitWrite(fd)
   311			e, errno = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
   312			if errno != 0 {
   313				return os.NewSyscallError("getsockopt", errno)
   314			}
   315		}
   316		if e != 0 {
   317			return os.Errno(e)
   318		}
   319		return nil
   320	}
   321	
   322	// Add a reference to this fd.
   323	func (fd *netFD) incref() {
   324		fd.sysmu.Lock()
   325		fd.sysref++
   326		fd.sysmu.Unlock()
   327	}
   328	
   329	// Remove a reference to this FD and close if we've been asked to do so (and
   330	// there are no references left.
   331	func (fd *netFD) decref() {
   332		fd.sysmu.Lock()
   333		fd.sysref--
   334		if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 {
   335			// In case the user has set linger, switch to blocking mode so
   336			// the close blocks.  As long as this doesn't happen often, we
   337			// can handle the extra OS processes.  Otherwise we'll need to
   338			// use the pollserver for Close too.  Sigh.
   339			syscall.SetNonblock(fd.sysfd, false)
   340			fd.sysfile.Close()
   341			fd.sysfile = nil
   342			fd.sysfd = -1
   343		}
   344		fd.sysmu.Unlock()
   345	}
   346	
   347	func (fd *netFD) Close() os.Error {
   348		if fd == nil || fd.sysfile == nil {
   349			return os.EINVAL
   350		}
   351	
   352		fd.incref()
   353		syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
   354		fd.closing = true
   355		fd.decref()
   356		return nil
   357	}
   358	
   359	func (fd *netFD) Read(p []byte) (n int, err os.Error) {
   360		if fd == nil {
   361			return 0, os.EINVAL
   362		}
   363		fd.rio.Lock()
   364		defer fd.rio.Unlock()
   365		fd.incref()
   366		defer fd.decref()
   367		if fd.sysfile == nil {
   368			return 0, os.EINVAL
   369		}
   370		if fd.rdeadline_delta > 0 {
   371			fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
   372		} else {
   373			fd.rdeadline = 0
   374		}
   375		var oserr os.Error
   376		for {
   377			var errno int
   378			n, errno = syscall.Read(fd.sysfile.Fd(), p)
   379			if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
   380				pollserver.WaitRead(fd)
   381				continue
   382			}
   383			if errno != 0 {
   384				n = 0
   385				oserr = os.Errno(errno)
   386			} else if n == 0 && errno == 0 && fd.proto != syscall.SOCK_DGRAM {
   387				err = os.EOF
   388			}
   389			break
   390		}
   391		if oserr != nil {
   392			err = &OpError{"read", fd.net, fd.raddr, oserr}
   393		}
   394		return
   395	}
   396	
   397	func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) {
   398		if fd == nil || fd.sysfile == nil {
   399			return 0, nil, os.EINVAL
   400		}
   401		fd.rio.Lock()
   402		defer fd.rio.Unlock()
   403		fd.incref()
   404		defer fd.decref()
   405		if fd.rdeadline_delta > 0 {
   406			fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
   407		} else {
   408			fd.rdeadline = 0
   409		}
   410		var oserr os.Error
   411		for {
   412			var errno int
   413			n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0)
   414			if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
   415				pollserver.WaitRead(fd)
   416				continue
   417			}
   418			if errno != 0 {
   419				n = 0
   420				oserr = os.Errno(errno)
   421			}
   422			break
   423		}
   424		if oserr != nil {
   425			err = &OpError{"read", fd.net, fd.laddr, oserr}
   426		}
   427		return
   428	}
   429	
   430	func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
   431		if fd == nil || fd.sysfile == nil {
   432			return 0, 0, 0, nil, os.EINVAL
   433		}
   434		fd.rio.Lock()
   435		defer fd.rio.Unlock()
   436		fd.incref()
   437		defer fd.decref()
   438		if fd.rdeadline_delta > 0 {
   439			fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
   440		} else {
   441			fd.rdeadline = 0
   442		}
   443		var oserr os.Error
   444		for {
   445			var errno int
   446			n, oobn, flags, sa, errno = syscall.Recvmsg(fd.sysfd, p, oob, 0)
   447			if errno == syscall.EAGAIN && fd.rdeadline >= 0 {
   448				pollserver.WaitRead(fd)
   449				continue
   450			}
   451			if errno != 0 {
   452				oserr = os.Errno(errno)
   453			}
   454			if n == 0 {
   455				oserr = os.EOF
   456			}
   457			break
   458		}
   459		if oserr != nil {
   460			err = &OpError{"read", fd.net, fd.laddr, oserr}
   461			return
   462		}
   463		return
   464	}
   465	
   466	func (fd *netFD) Write(p []byte) (n int, err os.Error) {
   467		if fd == nil {
   468			return 0, os.EINVAL
   469		}
   470		fd.wio.Lock()
   471		defer fd.wio.Unlock()
   472		fd.incref()
   473		defer fd.decref()
   474		if fd.sysfile == nil {
   475			return 0, os.EINVAL
   476		}
   477		if fd.wdeadline_delta > 0 {
   478			fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
   479		} else {
   480			fd.wdeadline = 0
   481		}
   482		nn := 0
   483		var oserr os.Error
   484	
   485		for {
   486			n, errno := syscall.Write(fd.sysfile.Fd(), p[nn:])
   487			if n > 0 {
   488				nn += n
   489			}
   490			if nn == len(p) {
   491				break
   492			}
   493			if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
   494				pollserver.WaitWrite(fd)
   495				continue
   496			}
   497			if errno != 0 {
   498				n = 0
   499				oserr = os.Errno(errno)
   500				break
   501			}
   502			if n == 0 {
   503				oserr = io.ErrUnexpectedEOF
   504				break
   505			}
   506		}
   507		if oserr != nil {
   508			err = &OpError{"write", fd.net, fd.raddr, oserr}
   509		}
   510		return nn, err
   511	}
   512	
   513	func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) {
   514		if fd == nil || fd.sysfile == nil {
   515			return 0, os.EINVAL
   516		}
   517		fd.wio.Lock()
   518		defer fd.wio.Unlock()
   519		fd.incref()
   520		defer fd.decref()
   521		if fd.wdeadline_delta > 0 {
   522			fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
   523		} else {
   524			fd.wdeadline = 0
   525		}
   526		var oserr os.Error
   527		for {
   528			errno := syscall.Sendto(fd.sysfd, p, 0, sa)
   529			if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
   530				pollserver.WaitWrite(fd)
   531				continue
   532			}
   533			if errno != 0 {
   534				oserr = os.Errno(errno)
   535			}
   536			break
   537		}
   538		if oserr == nil {
   539			n = len(p)
   540		} else {
   541			err = &OpError{"write", fd.net, fd.raddr, oserr}
   542		}
   543		return
   544	}
   545	
   546	func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
   547		if fd == nil || fd.sysfile == nil {
   548			return 0, 0, os.EINVAL
   549		}
   550		fd.wio.Lock()
   551		defer fd.wio.Unlock()
   552		fd.incref()
   553		defer fd.decref()
   554		if fd.wdeadline_delta > 0 {
   555			fd.wdeadline = pollserver.Now() + fd.wdeadline_delta
   556		} else {
   557			fd.wdeadline = 0
   558		}
   559		var oserr os.Error
   560		for {
   561			var errno int
   562			errno = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
   563			if errno == syscall.EAGAIN && fd.wdeadline >= 0 {
   564				pollserver.WaitWrite(fd)
   565				continue
   566			}
   567			if errno != 0 {
   568				oserr = os.Errno(errno)
   569			}
   570			break
   571		}
   572		if oserr == nil {
   573			n = len(p)
   574			oobn = len(oob)
   575		} else {
   576			err = &OpError{"write", fd.net, fd.raddr, oserr}
   577		}
   578		return
   579	}
   580	
// accept accepts one incoming connection on fd and returns it as a new
// non-blocking netFD. The new netFD's local and remote addresses are
// produced from socket addresses by toAddr.
//
// While no connection is pending it parks on the poll server, honoring the
// read deadline, and releases syscall.ForkLock for the duration of the wait.
// It returns os.EINVAL if fd is nil, has no file, or is being closed.
// Accept failures are wrapped in an *OpError naming the local address.
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
	if fd == nil || fd.sysfile == nil {
		return nil, os.EINVAL
	}

	fd.incref()
	defer fd.decref()
	if fd.rdeadline_delta > 0 {
		fd.rdeadline = pollserver.Now() + fd.rdeadline_delta
	} else {
		fd.rdeadline = 0
	}

	// See ../syscall/exec.go for description of ForkLock.
	// It is okay to hold the lock across syscall.Accept
	// because we have put fd.sysfd into non-blocking mode.
	syscall.ForkLock.RLock()
	var s, e int
	var rsa syscall.Sockaddr
	for {
		// NOTE(review): fd.closing is read here without fd.sysmu.
		if fd.closing {
			syscall.ForkLock.RUnlock()
			return nil, os.EINVAL
		}
		s, rsa, e = syscall.Accept(fd.sysfd)
		// Stop on success, on any error other than EAGAIN, or once the
		// poll server has expired the read deadline (rdeadline < 0).
		if e != syscall.EAGAIN || fd.rdeadline < 0 {
			break
		}
		// Never block on the poll server while holding ForkLock.
		syscall.ForkLock.RUnlock()
		pollserver.WaitRead(fd)
		syscall.ForkLock.RLock()
	}
	if e != 0 {
		syscall.ForkLock.RUnlock()
		return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)}
	}
	// Mark the new socket close-on-exec before ForkLock is released.
	syscall.CloseOnExec(s)
	syscall.ForkLock.RUnlock()

	if nfd, err = newFD(s, fd.family, fd.proto, fd.net); err != nil {
		syscall.Close(s)
		return nil, err
	}
	// The Getsockname error is ignored. On failure lsa is presumably nil,
	// so toAddr must tolerate a nil Sockaddr — confirm.
	lsa, _ := syscall.Getsockname(nfd.sysfd)
	nfd.setAddr(toAddr(lsa), toAddr(rsa))
	return nfd, nil
}
   628	
   629	func (fd *netFD) dup() (f *os.File, err os.Error) {
   630		ns, e := syscall.Dup(fd.sysfd)
   631		if e != 0 {
   632			return nil, &OpError{"dup", fd.net, fd.laddr, os.Errno(e)}
   633		}
   634	
   635		// We want blocking mode for the new fd, hence the double negative.
   636		if e = syscall.SetNonblock(ns, false); e != 0 {
   637			return nil, &OpError{"setnonblock", fd.net, fd.laddr, os.Errno(e)}
   638		}
   639	
   640		return os.NewFile(ns, fd.sysfile.Name()), nil
   641	}
   642	
   643	func closesocket(s int) (errno int) {
   644		return syscall.Close(s)
   645	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.