The Go Programming Language

Source file src/pkg/netchan/common.go

     1	// Copyright 2010 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package netchan
     6	
     7	import (
     8		"gob"
     9		"io"
    10		"os"
    11		"reflect"
    12		"sync"
    13		"time"
    14	)
    15	
    16	// The direction of a connection from the client's perspective.
    17	type Dir int
    18	
    19	const (
    20		Recv Dir = iota
    21		Send
    22	)
    23	
    24	func (dir Dir) String() string {
    25		switch dir {
    26		case Recv:
    27			return "Recv"
    28		case Send:
    29			return "Send"
    30		}
    31		return "???"
    32	}
    33	
    34	// Payload types
    35	const (
    36		payRequest = iota // request structure follows
    37		payError          // error structure follows
    38		payData           // user payload follows
    39		payAck            // acknowledgement; no payload
    40		payClosed         // channel is now closed
    41		payAckSend        // payload has been delivered.
    42	)
    43	
    44	// A header is sent as a prefix to every transmission.  It will be followed by
    45	// a request structure, an error structure, or an arbitrary user payload structure.
    46	type header struct {
    47		Id          int
    48		PayloadType int
    49		SeqNum      int64
    50	}
    51	
    52	// Sent with a header once per channel from importer to exporter to report
    53	// that it wants to bind to a channel with the specified direction for count
    54	// messages, with space for size buffered values. If count is -1, it means unlimited.
    55	type request struct {
    56		Name  string
    57		Count int64
    58		Size  int
    59		Dir   Dir
    60	}
    61	
    62	// Sent with a header to report an error.
    63	type error struct {
    64		Error string
    65	}
    66	
    67	// Used to unify management of acknowledgements for import and export.
    68	type unackedCounter interface {
    69		unackedCount() int64
    70		ack() int64
    71		seq() int64
    72	}
    73	
    74	// A channel and its direction.
    75	type chanDir struct {
    76		ch  reflect.Value
    77		dir Dir
    78	}
    79	
    80	// clientSet contains the objects and methods needed for tracking
    81	// clients of an exporter and draining outstanding messages.
    82	type clientSet struct {
    83		mu      sync.Mutex // protects access to channel and client maps
    84		names   map[string]*chanDir
    85		clients map[unackedCounter]bool
    86	}
    87	
    88	// Mutex-protected encoder and decoder pair.
    89	type encDec struct {
    90		decLock sync.Mutex
    91		dec     *gob.Decoder
    92		encLock sync.Mutex
    93		enc     *gob.Encoder
    94	}
    95	
    96	func newEncDec(conn io.ReadWriter) *encDec {
    97		return &encDec{
    98			dec: gob.NewDecoder(conn),
    99			enc: gob.NewEncoder(conn),
   100		}
   101	}
   102	
   103	// Decode an item from the connection.
   104	func (ed *encDec) decode(value reflect.Value) os.Error {
   105		ed.decLock.Lock()
   106		err := ed.dec.DecodeValue(value)
   107		if err != nil {
   108			// TODO: tear down connection?
   109		}
   110		ed.decLock.Unlock()
   111		return err
   112	}
   113	
   114	// Encode a header and payload onto the connection.
   115	func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
   116		ed.encLock.Lock()
   117		hdr.PayloadType = payloadType
   118		err := ed.enc.Encode(hdr)
   119		if err == nil {
   120			if payload != nil {
   121				err = ed.enc.Encode(payload)
   122			}
   123		}
   124		if err != nil {
   125			// TODO: tear down connection if there is an error?
   126		}
   127		ed.encLock.Unlock()
   128		return err
   129	}
   130	
   131	// See the comment for Exporter.Drain.
   132	func (cs *clientSet) drain(timeout int64) os.Error {
   133		startTime := time.Nanoseconds()
   134		for {
   135			pending := false
   136			cs.mu.Lock()
   137			// Any messages waiting for a client?
   138			for _, chDir := range cs.names {
   139				if chDir.ch.Len() > 0 {
   140					pending = true
   141				}
   142			}
   143			// Any unacknowledged messages?
   144			for client := range cs.clients {
   145				n := client.unackedCount()
   146				if n > 0 { // Check for > rather than != just to be safe.
   147					pending = true
   148					break
   149				}
   150			}
   151			cs.mu.Unlock()
   152			if !pending {
   153				break
   154			}
   155			if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
   156				return os.NewError("timeout")
   157			}
   158			time.Sleep(100 * 1e6) // 100 milliseconds
   159		}
   160		return nil
   161	}
   162	
   163	// See the comment for Exporter.Sync.
   164	func (cs *clientSet) sync(timeout int64) os.Error {
   165		startTime := time.Nanoseconds()
   166		// seq remembers the clients and their seqNum at point of entry.
   167		seq := make(map[unackedCounter]int64)
   168		for client := range cs.clients {
   169			seq[client] = client.seq()
   170		}
   171		for {
   172			pending := false
   173			cs.mu.Lock()
   174			// Any unacknowledged messages?  Look only at clients that existed
   175			// when we started and are still in this client set.
   176			for client := range seq {
   177				if _, ok := cs.clients[client]; ok {
   178					if client.ack() < seq[client] {
   179						pending = true
   180						break
   181					}
   182				}
   183			}
   184			cs.mu.Unlock()
   185			if !pending {
   186				break
   187			}
   188			if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
   189				return os.NewError("timeout")
   190			}
   191			time.Sleep(100 * 1e6) // 100 milliseconds
   192		}
   193		return nil
   194	}
   195	
   196	// A netChan represents a channel imported or exported
   197	// on a single connection. Flow is controlled by the receiving
   198	// side by sending payAckSend messages when values
   199	// are delivered into the local channel.
   200	type netChan struct {
   201		*chanDir
   202		name   string
   203		id     int
   204		size   int // buffer size of channel.
   205		closed bool
   206	
   207		// sender-specific state
   208		ackCh chan bool // buffered with space for all the acks we need
   209		space int       // available space.
   210	
   211		// receiver-specific state
   212		sendCh chan reflect.Value // buffered channel of values received from other end.
   213		ed     *encDec            // so that we can send acks.
   214		count  int64              // number of values still to receive.
   215	}
   216	
   217	// Create a new netChan with the given name (only used for
   218	// messages), id, direction, buffer size, and count.
   219	// The connection to the other side is represented by ed.
   220	func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
   221		c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
   222		if c.dir == Send {
   223			c.ackCh = make(chan bool, size)
   224			c.space = size
   225		}
   226		return c
   227	}
   228	
   229	// Close the channel.
   230	func (nch *netChan) close() {
   231		if nch.closed {
   232			return
   233		}
   234		if nch.dir == Recv {
   235			if nch.sendCh != nil {
   236				// If the sender goroutine is active, close the channel to it.
   237				// It will close nch.ch when it can.
   238				close(nch.sendCh)
   239			} else {
   240				nch.ch.Close()
   241			}
   242		} else {
   243			nch.ch.Close()
   244			close(nch.ackCh)
   245		}
   246		nch.closed = true
   247	}
   248	
   249	// Send message from remote side to local receiver.
   250	func (nch *netChan) send(val reflect.Value) {
   251		if nch.dir != Recv {
   252			panic("send on wrong direction of channel")
   253		}
   254		if nch.sendCh == nil {
   255			// If possible, do local send directly and ack immediately.
   256			if nch.ch.TrySend(val) {
   257				nch.sendAck()
   258				return
   259			}
   260			// Start sender goroutine to manage delayed delivery of values.
   261			nch.sendCh = make(chan reflect.Value, nch.size)
   262			go nch.sender()
   263		}
   264		select {
   265		case nch.sendCh <- val:
   266			// ok
   267		default:
   268			// TODO: should this be more resilient?
   269			panic("netchan: remote sender sent more values than allowed")
   270		}
   271	}
   272	
   273	// sendAck sends an acknowledgment that a message has left
   274	// the channel's buffer. If the messages remaining to be sent
   275	// will fit in the channel's buffer, then we don't
   276	// need to send an ack.
   277	func (nch *netChan) sendAck() {
   278		if nch.count < 0 || nch.count > int64(nch.size) {
   279			nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
   280		}
   281		if nch.count > 0 {
   282			nch.count--
   283		}
   284	}
   285	
   286	// The sender process forwards items from the sending queue
   287	// to the destination channel, acknowledging each item.
   288	func (nch *netChan) sender() {
   289		if nch.dir != Recv {
   290			panic("sender on wrong direction of channel")
   291		}
   292		// When Exporter.Hangup is called, the underlying channel is closed,
   293		// and so we may get a "too many operations on closed channel" error
   294		// if there are outstanding messages in sendCh.
   295		// Make sure that this doesn't panic the whole program.
   296		defer func() {
   297			if r := recover(); r != nil {
   298				// TODO check that r is "too many operations", otherwise re-panic.
   299			}
   300		}()
   301		for v := range nch.sendCh {
   302			nch.ch.Send(v)
   303			nch.sendAck()
   304		}
   305		nch.ch.Close()
   306	}
   307	
   308	// Receive value from local side for sending to remote side.
   309	func (nch *netChan) recv() (val reflect.Value, ok bool) {
   310		if nch.dir != Send {
   311			panic("recv on wrong direction of channel")
   312		}
   313	
   314		if nch.space == 0 {
   315			// Wait for buffer space.
   316			<-nch.ackCh
   317			nch.space++
   318		}
   319		nch.space--
   320		return nch.ch.Recv()
   321	}
   322	
   323	// acked is called when the remote side indicates that
   324	// a value has been delivered.
   325	func (nch *netChan) acked() {
   326		if nch.dir != Send {
   327			panic("recv on wrong direction of channel")
   328		}
   329		select {
   330		case nch.ackCh <- true:
   331			// ok
   332		default:
   333			// TODO: should this be more resilient?
   334			panic("netchan: remote receiver sent too many acks")
   335		}
   336	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.