The Go Programming Language

Source file src/pkg/netchan/import.go

     1	// Copyright 2010 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package netchan
     6	
     7	import (
     8		"io"
     9		"log"
    10		"net"
    11		"os"
    12		"reflect"
    13		"sync"
    14		"time"
    15	)
    16	
    17	// Import
    18	
    19	// impLog is a logging convenience function.  The first argument must be a string.
    20	func impLog(args ...interface{}) {
    21		args[0] = "netchan import: " + args[0].(string)
    22		log.Print(args...)
    23	}
    24	
    25	// An Importer allows a set of channels to be imported from a single
    26	// remote machine/network port.  A machine may have multiple
    27	// importers, even from the same machine/network port.
    28	type Importer struct {
    29		*encDec
    30		chanLock sync.Mutex // protects access to channel map
    31		names    map[string]*netChan
    32		chans    map[int]*netChan
    33		errors   chan os.Error
    34		maxId    int
    35		mu       sync.Mutex // protects remaining fields
    36		unacked  int64      // number of unacknowledged sends.
    37		seqLock  sync.Mutex // guarantees messages are in sequence, only locked under mu
    38	}
    39	
    40	// NewImporter creates a new Importer object to import a set of channels
    41	// from the given connection. The Exporter must be available and serving when
    42	// the Importer is created.
    43	func NewImporter(conn io.ReadWriter) *Importer {
    44		imp := new(Importer)
    45		imp.encDec = newEncDec(conn)
    46		imp.chans = make(map[int]*netChan)
    47		imp.names = make(map[string]*netChan)
    48		imp.errors = make(chan os.Error, 10)
    49		imp.unacked = 0
    50		go imp.run()
    51		return imp
    52	}
    53	
    54	// Import imports a set of channels from the given network and address.
    55	func Import(network, remoteaddr string) (*Importer, os.Error) {
    56		conn, err := net.Dial(network, remoteaddr)
    57		if err != nil {
    58			return nil, err
    59		}
    60		return NewImporter(conn), nil
    61	}
    62	
    63	// shutdown closes all channels for which we are receiving data from the remote side.
    64	func (imp *Importer) shutdown() {
    65		imp.chanLock.Lock()
    66		for _, ich := range imp.chans {
    67			if ich.dir == Recv {
    68				ich.close()
    69			}
    70		}
    71		imp.chanLock.Unlock()
    72	}
    73	
    74	// Handle the data from a single imported data stream, which will
    75	// have the form
    76	//	(response, data)*
    77	// The response identifies by name which channel is transmitting data.
    78	func (imp *Importer) run() {
    79		// Loop on responses; requests are sent by ImportNValues()
    80		hdr := new(header)
    81		hdrValue := reflect.ValueOf(hdr)
    82		ackHdr := new(header)
    83		err := new(error)
    84		errValue := reflect.ValueOf(err)
    85		for {
    86			*hdr = header{}
    87			if e := imp.decode(hdrValue); e != nil {
    88				if e != os.EOF {
    89					impLog("header:", e)
    90					imp.shutdown()
    91				}
    92				return
    93			}
    94			switch hdr.PayloadType {
    95			case payData:
    96				// done lower in loop
    97			case payError:
    98				if e := imp.decode(errValue); e != nil {
    99					impLog("error:", e)
   100					return
   101				}
   102				if err.Error != "" {
   103					impLog("response error:", err.Error)
   104					select {
   105					case imp.errors <- os.NewError(err.Error):
   106						continue // errors are not acknowledged
   107					default:
   108						imp.shutdown()
   109						return
   110					}
   111				}
   112			case payClosed:
   113				nch := imp.getChan(hdr.Id, false)
   114				if nch != nil {
   115					nch.close()
   116				}
   117				continue // closes are not acknowledged.
   118			case payAckSend:
   119				// we can receive spurious acks if the channel is
   120				// hung up, so we ask getChan to ignore any errors.
   121				nch := imp.getChan(hdr.Id, true)
   122				if nch != nil {
   123					nch.acked()
   124					imp.mu.Lock()
   125					imp.unacked--
   126					imp.mu.Unlock()
   127				}
   128				continue
   129			default:
   130				impLog("unexpected payload type:", hdr.PayloadType)
   131				return
   132			}
   133			nch := imp.getChan(hdr.Id, false)
   134			if nch == nil {
   135				continue
   136			}
   137			if nch.dir != Recv {
   138				impLog("cannot happen: receive from non-Recv channel")
   139				return
   140			}
   141			// Acknowledge receipt
   142			ackHdr.Id = hdr.Id
   143			ackHdr.SeqNum = hdr.SeqNum
   144			imp.encode(ackHdr, payAck, nil)
   145			// Create a new value for each received item.
   146			value := reflect.New(nch.ch.Type().Elem()).Elem()
   147			if e := imp.decode(value); e != nil {
   148				impLog("importer value decode:", e)
   149				return
   150			}
   151			nch.send(value)
   152		}
   153	}
   154	
   155	func (imp *Importer) getChan(id int, errOk bool) *netChan {
   156		imp.chanLock.Lock()
   157		ich := imp.chans[id]
   158		imp.chanLock.Unlock()
   159		if ich == nil {
   160			if !errOk {
   161				impLog("unknown id in netchan request: ", id)
   162			}
   163			return nil
   164		}
   165		return ich
   166	}
   167	
   168	// Errors returns a channel from which transmission and protocol errors
   169	// can be read. Clients of the importer are not required to read the error
   170	// channel for correct execution. However, if too many errors occur
   171	// without being read from the error channel, the importer will shut down.
   172	func (imp *Importer) Errors() chan os.Error {
   173		return imp.errors
   174	}
   175	
   176	// Import imports a channel of the given type, size and specified direction.
   177	// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
   178	func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) os.Error {
   179		return imp.ImportNValues(name, chT, dir, size, -1)
   180	}
   181	
   182	// ImportNValues imports a channel of the given type and specified
   183	// direction and then receives or transmits up to n values on that
   184	// channel.  A value of n==-1 implies an unbounded number of values.  The
   185	// channel will have buffer space for size values, or 1 value if size < 1.
   186	// The channel to be bound to the remote site's channel is provided
   187	// in the call and may be of arbitrary channel type.
   188	// Despite the literal signature, the effective signature is
   189	//	ImportNValues(name string, chT chan T, dir Dir, size, n int) os.Error
   190	// Example usage:
   191	//	imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
   192	//	if err != nil { log.Fatal(err) }
   193	//	ch := make(chan myType)
   194	//	err = imp.ImportNValues("name", ch, Recv, 1, 1)
   195	//	if err != nil { log.Fatal(err) }
   196	//	fmt.Printf("%+v\n", <-ch)
   197	func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) os.Error {
   198		ch, err := checkChan(chT, dir)
   199		if err != nil {
   200			return err
   201		}
   202		imp.chanLock.Lock()
   203		defer imp.chanLock.Unlock()
   204		_, present := imp.names[name]
   205		if present {
   206			return os.NewError("channel name already being imported:" + name)
   207		}
   208		if size < 1 {
   209			size = 1
   210		}
   211		id := imp.maxId
   212		imp.maxId++
   213		nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
   214		imp.names[name] = nch
   215		imp.chans[id] = nch
   216		// Tell the other side about this channel.
   217		hdr := &header{Id: id}
   218		req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
   219		if err = imp.encode(hdr, payRequest, req); err != nil {
   220			impLog("request encode:", err)
   221			return err
   222		}
   223		if dir == Send {
   224			go func() {
   225				for i := 0; n == -1 || i < n; i++ {
   226					val, ok := nch.recv()
   227					if !ok {
   228						if err = imp.encode(hdr, payClosed, nil); err != nil {
   229							impLog("error encoding client closed message:", err)
   230						}
   231						return
   232					}
   233					// We hold the lock during transmission to guarantee messages are
   234					// sent in order.
   235					imp.mu.Lock()
   236					imp.unacked++
   237					imp.seqLock.Lock()
   238					imp.mu.Unlock()
   239					if err = imp.encode(hdr, payData, val.Interface()); err != nil {
   240						impLog("error encoding client send:", err)
   241						return
   242					}
   243					imp.seqLock.Unlock()
   244				}
   245			}()
   246		}
   247		return nil
   248	}
   249	
   250	// Hangup disassociates the named channel from the Importer and closes
   251	// the channel.  Messages in flight for the channel may be dropped.
   252	func (imp *Importer) Hangup(name string) os.Error {
   253		imp.chanLock.Lock()
   254		defer imp.chanLock.Unlock()
   255		nc := imp.names[name]
   256		if nc == nil {
   257			return os.NewError("netchan import: hangup: no such channel: " + name)
   258		}
   259		imp.names[name] = nil, false
   260		imp.chans[nc.id] = nil, false
   261		nc.close()
   262		return nil
   263	}
   264	
   265	func (imp *Importer) unackedCount() int64 {
   266		imp.mu.Lock()
   267		n := imp.unacked
   268		imp.mu.Unlock()
   269		return n
   270	}
   271	
   272	// Drain waits until all messages sent from this exporter/importer, including
   273	// those not yet sent to any server and possibly including those sent while
   274	// Drain was executing, have been received by the exporter.  In short, it
   275	// waits until all the importer's messages have been received.
   276	// If the timeout (measured in nanoseconds) is positive and Drain takes
   277	// longer than that to complete, an error is returned.
   278	func (imp *Importer) Drain(timeout int64) os.Error {
   279		startTime := time.Nanoseconds()
   280		for imp.unackedCount() > 0 {
   281			if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
   282				return os.NewError("timeout")
   283			}
   284			time.Sleep(100 * 1e6)
   285		}
   286		return nil
   287	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.