package netchan
import (
"gob"
"net"
"os"
"reflect"
"sync"
"time"
)
// Dir is an integer enumeration with two values, Recv and Send.
// NOTE(review): presumably the direction of data flow on a networked
// channel; confirm against the code that creates chanDir values.
type Dir int
const (
// Recv is the zero value of Dir.
Recv Dir = iota
// Send is the value that follows Recv.
Send
)
// Untyped integer constants numbered consecutively from zero by iota.
// NOTE(review): presumably the values passed as the payloadType argument of
// encDec.encode and stored in header.payloadType (request, error, data and
// acknowledgement messages respectively); confirm against the callers.
const (
payRequest = iota
payError
payData
payAck
)
// header is the record that encDec.encode writes to the encoder ahead of the
// optional payload.
type header struct {
// NOTE(review): presumably the name of the channel the message belongs to; confirm.
name string
// payloadType is overwritten by encDec.encode with its payloadType argument
// just before the header is encoded.
payloadType int
// NOTE(review): presumably the per-channel message sequence number; confirm.
seqNum int64
}
// request holds a count and a direction.
// NOTE(review): presumably the payload that accompanies a payRequest header;
// the meaning of count is not visible in this part of the file; confirm.
type request struct {
count int64
dir Dir
}
// error wraps a single string.
// NOTE(review): presumably the payload that accompanies a payError header,
// carrying the error text; confirm against the sender.
type error struct {
error string
}
// unackedCounter is the view of a client that clientSet.drain and
// clientSet.sync need in order to decide whether work is still outstanding.
type unackedCounter interface {
// unackedCount is polled by drain, which treats any value greater than
// zero as "still pending".
unackedCount() int64
// ack is polled by sync and compared against the value seq returned when
// sync started; sync keeps waiting while ack is smaller.
ack() int64
// seq is read once per client at the start of sync to record the target
// that ack must reach.
seq() int64
}
// chanDir pairs a reflected channel value with a Dir.
type chanDir struct {
// ch is the channel; clientSet.drain checks ch.Len() to see whether it
// still holds elements.
ch *reflect.ChanValue
dir Dir
}
// clientSet groups a table of channels with a set of clients. Its drain and
// sync methods poll both to wait for outstanding traffic to complete.
type clientSet struct {
// mu is locked by drain and sync around their polling of chans and clients.
mu sync.Mutex
// chans maps a string key to a channel and its direction.
// NOTE(review): the key is presumably the channel name; confirm.
chans map[string]*chanDir
// clients is used as a set: drain and sync only look at the keys.
clients map[unackedCounter]bool
}
// encDec bundles a gob decoder and a gob encoder, each with its own mutex.
// newEncDec builds both on the same net.Conn.
type encDec struct {
// decLock is held by decode for the duration of each DecodeValue call.
decLock sync.Mutex
dec *gob.Decoder
// encLock is held by encode while the header and optional payload are
// written, so the pair is not interleaved with another encode call.
encLock sync.Mutex
enc *gob.Encoder
}
// newEncDec returns an encDec whose decoder reads from conn and whose
// encoder writes to conn. Both mutexes start out unlocked (zero value).
func newEncDec(conn net.Conn) *encDec {
	ed := new(encDec)
	ed.dec = gob.NewDecoder(conn)
	ed.enc = gob.NewEncoder(conn)
	return ed
}
// decode reads the next gob value from ed's decoder into value.
//
// decLock is held for the whole call so that concurrent callers cannot
// interleave reads on the shared decoder. The decoder's result is returned
// unchanged: nil on success, otherwise the error DecodeValue reported.
func (ed *encDec) decode(value reflect.Value) os.Error {
	ed.decLock.Lock()
	// Deferred so the lock is released even if DecodeValue panics.
	defer ed.decLock.Unlock()
	return ed.dec.DecodeValue(value)
}
// encode stamps hdr with payloadType and writes it to ed's encoder, followed
// by payload when payload is non-nil.
//
// encLock is held across both writes so a header and its payload are never
// separated by output from another encode call. Note that hdr is modified:
// its payloadType field is overwritten.
//
// It returns the first error reported by the encoder, or nil. If encoding the
// header fails, the payload is not written.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
	ed.encLock.Lock()
	// Deferred so the lock is released even if Encode panics.
	defer ed.encLock.Unlock()

	hdr.payloadType = payloadType
	if err := ed.enc.Encode(hdr); err != nil {
		return err
	}
	if payload == nil {
		// Header-only message.
		return nil
	}
	return ed.enc.Encode(payload)
}
// drain polls until no channel in cs.chans holds buffered elements and no
// client in cs.clients reports unacknowledged messages.
//
// timeout is compared against differences of time.Nanoseconds(), so it is in
// nanoseconds; a value <= 0 disables the deadline and drain waits
// indefinitely. It returns nil once nothing is pending, or
// os.ErrorString("timeout") if the deadline passes first.
func (cs *clientSet) drain(timeout int64) os.Error {
	begin := time.Nanoseconds()

	// busy takes one snapshot, under cs.mu, of whether anything is still
	// outstanding.
	busy := func() bool {
		found := false
		cs.mu.Lock()
		for _, cd := range cs.chans {
			if cd.ch.Len() > 0 {
				found = true
			}
		}
		for c := range cs.clients {
			if c.unackedCount() > 0 {
				found = true
				break
			}
		}
		cs.mu.Unlock()
		return found
	}

	for busy() {
		if timeout > 0 && time.Nanoseconds()-begin >= timeout {
			return os.ErrorString("timeout")
		}
		// Back off between polls; the lock is not held while sleeping.
		// NOTE(review): 100 * 1e6 is 100ms if Sleep takes nanoseconds,
		// matching the units used for timeout above; confirm.
		time.Sleep(100 * 1e6)
	}
	return nil
}
// sync polls until every client that was in cs.clients on entry, and is still
// in the set, reports an ack() value at least as large as the seq() value
// recorded for it at the start of the call. Clients added after entry are
// ignored, and clients removed in the meantime no longer block the wait.
//
// timeout is compared against differences of time.Nanoseconds(), so it is in
// nanoseconds; a value <= 0 disables the deadline and sync waits
// indefinitely. It returns nil once nothing is pending, or
// os.ErrorString("timeout") if the deadline passes first.
func (cs *clientSet) sync(timeout int64) os.Error {
	startTime := time.Nanoseconds()

	// seq remembers the clients present on entry and the sequence number
	// each one had reached. Membership is copied while holding cs.mu, because
	// cs.clients is guarded by that mutex everywhere else in this file and
	// ranging over it unlocked races with concurrent updates.
	seq := make(map[unackedCounter]int64)
	cs.mu.Lock()
	for client := range cs.clients {
		seq[client] = 0
	}
	cs.mu.Unlock()
	// The sequence numbers are read outside the lock, as before, so this
	// change adds no new lock ordering between cs.mu and whatever
	// synchronization seq() uses internally. Only existing keys are
	// assigned, which is permitted while ranging over the map.
	for client := range seq {
		seq[client] = client.seq()
	}

	for {
		pending := false
		cs.mu.Lock()
		for client, want := range seq {
			// Only clients still in the set can hold up the wait.
			if _, ok := cs.clients[client]; ok && client.ack() < want {
				pending = true
				break
			}
		}
		cs.mu.Unlock()
		if !pending {
			break
		}
		if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
			return os.ErrorString("timeout")
		}
		// Back off between polls; the lock is not held while sleeping.
		// NOTE(review): 100 * 1e6 is 100ms if Sleep takes nanoseconds,
		// matching the units used for timeout above; confirm.
		time.Sleep(100 * 1e6)
	}
	return nil
}