The Go Programming Language

Source file src/pkg/netchan/export.go

     1	// Copyright 2010 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	/*
     6		Package netchan implements type-safe networked channels:
     7		it allows the two ends of a channel to appear on different
     8		computers connected by a network.  It does this by transporting
     9		data sent to a channel on one machine so it can be recovered
    10		by a receive of a channel of the same type on the other.
    11	
    12		An exporter publishes a set of channels by name.  An importer
    13		connects to the exporting machine and imports the channels
    14		by name. After importing the channels, the two machines can
    15		use the channels in the usual way.
    16	
    17		Networked channels are not synchronized; they always behave
    18		as if they are buffered channels of at least one element.
    19	*/
    20	package netchan
    21	
    22	// BUG: can't use range clause to receive when using ImportNValues to limit the count.
    23	
    24	import (
    25		"log"
    26		"io"
    27		"net"
    28		"os"
    29		"reflect"
    30		"strconv"
    31		"sync"
    32	)
    33	
    34	// Export
    35	
    36	// expLog is a logging convenience function.  The first argument must be a string.
    37	func expLog(args ...interface{}) {
    38		args[0] = "netchan export: " + args[0].(string)
    39		log.Print(args...)
    40	}
    41	
    42	// An Exporter allows a set of channels to be published on a single
    43	// network port.  A single machine may have multiple Exporters
    44	// but they must use different ports.
    45	type Exporter struct {
    46		*clientSet
    47	}
    48	
    49	type expClient struct {
    50		*encDec
    51		exp     *Exporter
    52		chans   map[int]*netChan // channels in use by client
    53		mu      sync.Mutex       // protects remaining fields
    54		errored bool             // client has been sent an error
    55		seqNum  int64            // sequences messages sent to client; has value of highest sent
    56		ackNum  int64            // highest sequence number acknowledged
    57		seqLock sync.Mutex       // guarantees messages are in sequence, only locked under mu
    58	}
    59	
    60	func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
    61		client := new(expClient)
    62		client.exp = exp
    63		client.encDec = newEncDec(conn)
    64		client.seqNum = 0
    65		client.ackNum = 0
    66		client.chans = make(map[int]*netChan)
    67		return client
    68	}
    69	
    70	func (client *expClient) sendError(hdr *header, err string) {
    71		error := &error{err}
    72		expLog("sending error to client:", error.Error)
    73		client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
    74		client.mu.Lock()
    75		client.errored = true
    76		client.mu.Unlock()
    77	}
    78	
    79	func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
    80		exp := client.exp
    81		exp.mu.Lock()
    82		ech, ok := exp.names[name]
    83		exp.mu.Unlock()
    84		if !ok {
    85			client.sendError(hdr, "no such channel: "+name)
    86			return nil
    87		}
    88		if ech.dir != dir {
    89			client.sendError(hdr, "wrong direction for channel: "+name)
    90			return nil
    91		}
    92		nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
    93		client.chans[hdr.Id] = nch
    94		return nch
    95	}
    96	
    97	func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
    98		nch := client.chans[hdr.Id]
    99		if nch == nil {
   100			return nil
   101		}
   102		if nch.dir != dir {
   103			client.sendError(hdr, "wrong direction for channel: "+nch.name)
   104		}
   105		return nch
   106	}
   107	
   108	// The function run manages sends and receives for a single client.  For each
   109	// (client Recv) request, this will launch a serveRecv goroutine to deliver
   110	// the data for that channel, while (client Send) requests are handled as
   111	// data arrives from the client.
   112	func (client *expClient) run() {
   113		hdr := new(header)
   114		hdrValue := reflect.ValueOf(hdr)
   115		req := new(request)
   116		reqValue := reflect.ValueOf(req)
   117		error := new(error)
   118		for {
   119			*hdr = header{}
   120			if err := client.decode(hdrValue); err != nil {
   121				if err != os.EOF {
   122					expLog("error decoding client header:", err)
   123				}
   124				break
   125			}
   126			switch hdr.PayloadType {
   127			case payRequest:
   128				*req = request{}
   129				if err := client.decode(reqValue); err != nil {
   130					expLog("error decoding client request:", err)
   131					break
   132				}
   133				if req.Size < 1 {
   134					panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
   135				}
   136				switch req.Dir {
   137				case Recv:
   138					// look up channel before calling serveRecv to
   139					// avoid a lock around client.chans.
   140					if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
   141						go client.serveRecv(nch, *hdr, req.Count)
   142					}
   143				case Send:
   144					client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
   145					// The actual sends will have payload type payData.
   146					// TODO: manage the count?
   147				default:
   148					error.Error = "request: can't handle channel direction"
   149					expLog(error.Error, req.Dir)
   150					client.encode(hdr, payError, error)
   151				}
   152			case payData:
   153				client.serveSend(*hdr)
   154			case payClosed:
   155				client.serveClosed(*hdr)
   156			case payAck:
   157				client.mu.Lock()
   158				if client.ackNum != hdr.SeqNum-1 {
   159					// Since the sequence number is incremented and the message is sent
   160					// in a single instance of locking client.mu, the messages are guaranteed
   161					// to be sent in order.  Therefore receipt of acknowledgement N means
   162					// all messages <=N have been seen by the recipient.  We check anyway.
   163					expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
   164				}
   165				if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. 
   166					client.ackNum = hdr.SeqNum
   167				}
   168				client.mu.Unlock()
   169			case payAckSend:
   170				if nch := client.getChan(hdr, Send); nch != nil {
   171					nch.acked()
   172				}
   173			default:
   174				log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
   175			}
   176		}
   177		client.exp.delClient(client)
   178	}
   179	
   180	// Send all the data on a single channel to a client asking for a Recv.
   181	// The header is passed by value to avoid issues of overwriting.
   182	func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
   183		for {
   184			val, ok := nch.recv()
   185			if !ok {
   186				if err := client.encode(&hdr, payClosed, nil); err != nil {
   187					expLog("error encoding server closed message:", err)
   188				}
   189				break
   190			}
   191			// We hold the lock during transmission to guarantee messages are
   192			// sent in sequence number order.  Also, we increment first so the
   193			// value of client.SeqNum is the value of the highest used sequence
   194			// number, not one beyond.
   195			client.mu.Lock()
   196			client.seqNum++
   197			hdr.SeqNum = client.seqNum
   198			client.seqLock.Lock() // guarantee ordering of messages
   199			client.mu.Unlock()
   200			err := client.encode(&hdr, payData, val.Interface())
   201			client.seqLock.Unlock()
   202			if err != nil {
   203				expLog("error encoding client response:", err)
   204				client.sendError(&hdr, err.String())
   205				break
   206			}
   207			// Negative count means run forever.
   208			if count >= 0 {
   209				if count--; count <= 0 {
   210					break
   211				}
   212			}
   213		}
   214	}
   215	
   216	// Receive and deliver locally one item from a client asking for a Send
   217	// The header is passed by value to avoid issues of overwriting.
   218	func (client *expClient) serveSend(hdr header) {
   219		nch := client.getChan(&hdr, Recv)
   220		if nch == nil {
   221			return
   222		}
   223		// Create a new value for each received item.
   224		val := reflect.New(nch.ch.Type().Elem()).Elem()
   225		if err := client.decode(val); err != nil {
   226			expLog("value decode:", err, "; type ", nch.ch.Type())
   227			return
   228		}
   229		nch.send(val)
   230	}
   231	
   232	// Report that client has closed the channel that is sending to us.
   233	// The header is passed by value to avoid issues of overwriting.
   234	func (client *expClient) serveClosed(hdr header) {
   235		nch := client.getChan(&hdr, Recv)
   236		if nch == nil {
   237			return
   238		}
   239		nch.close()
   240	}
   241	
   242	func (client *expClient) unackedCount() int64 {
   243		client.mu.Lock()
   244		n := client.seqNum - client.ackNum
   245		client.mu.Unlock()
   246		return n
   247	}
   248	
   249	func (client *expClient) seq() int64 {
   250		client.mu.Lock()
   251		n := client.seqNum
   252		client.mu.Unlock()
   253		return n
   254	}
   255	
   256	func (client *expClient) ack() int64 {
   257		client.mu.Lock()
   258		n := client.seqNum
   259		client.mu.Unlock()
   260		return n
   261	}
   262	
   263	// Serve waits for incoming connections on the listener
   264	// and serves the Exporter's channels on each.
   265	// It blocks until the listener is closed.
   266	func (exp *Exporter) Serve(listener net.Listener) {
   267		for {
   268			conn, err := listener.Accept()
   269			if err != nil {
   270				expLog("listen:", err)
   271				break
   272			}
   273			go exp.ServeConn(conn)
   274		}
   275	}
   276	
   277	// ServeConn exports the Exporter's channels on conn.
   278	// It blocks until the connection is terminated.
   279	func (exp *Exporter) ServeConn(conn io.ReadWriter) {
   280		exp.addClient(conn).run()
   281	}
   282	
   283	// NewExporter creates a new Exporter that exports a set of channels.
   284	func NewExporter() *Exporter {
   285		e := &Exporter{
   286			clientSet: &clientSet{
   287				names:   make(map[string]*chanDir),
   288				clients: make(map[unackedCounter]bool),
   289			},
   290		}
   291		return e
   292	}
   293	
   294	// ListenAndServe exports the exporter's channels through the
   295	// given network and local address defined as in net.Listen.
   296	func (exp *Exporter) ListenAndServe(network, localaddr string) os.Error {
   297		listener, err := net.Listen(network, localaddr)
   298		if err != nil {
   299			return err
   300		}
   301		go exp.Serve(listener)
   302		return nil
   303	}
   304	
   305	// addClient creates a new expClient and records its existence
   306	func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
   307		client := newClient(exp, conn)
   308		exp.mu.Lock()
   309		exp.clients[client] = true
   310		exp.mu.Unlock()
   311		return client
   312	}
   313	
   314	// delClient forgets the client existed
   315	func (exp *Exporter) delClient(client *expClient) {
   316		exp.mu.Lock()
   317		exp.clients[client] = false, false
   318		exp.mu.Unlock()
   319	}
   320	
   321	// Drain waits until all messages sent from this exporter/importer, including
   322	// those not yet sent to any client and possibly including those sent while
   323	// Drain was executing, have been received by the importer.  In short, it
   324	// waits until all the exporter's messages have been received by a client.
   325	// If the timeout (measured in nanoseconds) is positive and Drain takes
   326	// longer than that to complete, an error is returned.
   327	func (exp *Exporter) Drain(timeout int64) os.Error {
   328		// This wrapper function is here so the method's comment will appear in godoc.
   329		return exp.clientSet.drain(timeout)
   330	}
   331	
   332	// Sync waits until all clients of the exporter have received the messages
   333	// that were sent at the time Sync was invoked.  Unlike Drain, it does not
   334	// wait for messages sent while it is running or messages that have not been
   335	// dispatched to any client.  If the timeout (measured in nanoseconds) is
   336	// positive and Sync takes longer than that to complete, an error is
   337	// returned.
   338	func (exp *Exporter) Sync(timeout int64) os.Error {
   339		// This wrapper function is here so the method's comment will appear in godoc.
   340		return exp.clientSet.sync(timeout)
   341	}
   342	
   343	func checkChan(chT interface{}, dir Dir) (reflect.Value, os.Error) {
   344		chanType := reflect.TypeOf(chT)
   345		if chanType.Kind() != reflect.Chan {
   346			return reflect.Value{}, os.NewError("not a channel")
   347		}
   348		if dir != Send && dir != Recv {
   349			return reflect.Value{}, os.NewError("unknown channel direction")
   350		}
   351		switch chanType.ChanDir() {
   352		case reflect.BothDir:
   353		case reflect.SendDir:
   354			if dir != Recv {
   355				return reflect.Value{}, os.NewError("to import/export with Send, must provide <-chan")
   356			}
   357		case reflect.RecvDir:
   358			if dir != Send {
   359				return reflect.Value{}, os.NewError("to import/export with Recv, must provide chan<-")
   360			}
   361		}
   362		return reflect.ValueOf(chT), nil
   363	}
   364	
   365	// Export exports a channel of a given type and specified direction.  The
   366	// channel to be exported is provided in the call and may be of arbitrary
   367	// channel type.
   368	// Despite the literal signature, the effective signature is
   369	//	Export(name string, chT chan T, dir Dir)
   370	func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
   371		ch, err := checkChan(chT, dir)
   372		if err != nil {
   373			return err
   374		}
   375		exp.mu.Lock()
   376		defer exp.mu.Unlock()
   377		_, present := exp.names[name]
   378		if present {
   379			return os.NewError("channel name already being exported:" + name)
   380		}
   381		exp.names[name] = &chanDir{ch, dir}
   382		return nil
   383	}
   384	
   385	// Hangup disassociates the named channel from the Exporter and closes
   386	// the channel.  Messages in flight for the channel may be dropped.
   387	func (exp *Exporter) Hangup(name string) os.Error {
   388		exp.mu.Lock()
   389		chDir, ok := exp.names[name]
   390		if ok {
   391			exp.names[name] = nil, false
   392		}
   393		// TODO drop all instances of channel from client sets
   394		exp.mu.Unlock()
   395		if !ok {
   396			return os.NewError("netchan export: hangup: no such channel: " + name)
   397		}
   398		chDir.ch.Close()
   399		return nil
   400	}

release.r60.3. Except as noted, this content is licensed under a Creative Commons Attribution 3.0 License.