Source file src/net/textproto/pipeline.go

     1  // Copyright 2010 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package textproto
     6  
     7  import (
     8  	"sync"
     9  )
    10  
    11  // A Pipeline manages a pipelined in-order request/response sequence.
    12  //
    13  // To use a Pipeline p to manage multiple clients on a connection,
    14  // each client should run:
    15  //
    16  //	id := p.Next()	// take a number
    17  //
    18  //	p.StartRequest(id)	// wait for turn to send request
    19  //	«send request»
    20  //	p.EndRequest(id)	// notify Pipeline that request is sent
    21  //
    22  //	p.StartResponse(id)	// wait for turn to read response
    23  //	«read response»
    24  //	p.EndResponse(id)	// notify Pipeline that response is read
    25  //
    26  // A pipelined server can use the same calls to ensure that
    27  // responses computed in parallel are written in the correct order.
    28  type Pipeline struct {
    29  	mu       sync.Mutex
    30  	id       uint
    31  	request  sequencer
    32  	response sequencer
    33  }
    34  
    35  // Next returns the next id for a request/response pair.
    36  func (p *Pipeline) Next() uint {
    37  	p.mu.Lock()
    38  	id := p.id
    39  	p.id++
    40  	p.mu.Unlock()
    41  	return id
    42  }
    43  
    44  // StartRequest blocks until it is time to send (or, if this is a server, receive)
    45  // the request with the given id.
    46  func (p *Pipeline) StartRequest(id uint) {
    47  	p.request.Start(id)
    48  }
    49  
    50  // EndRequest notifies p that the request with the given id has been sent
    51  // (or, if this is a server, received).
    52  func (p *Pipeline) EndRequest(id uint) {
    53  	p.request.End(id)
    54  }
    55  
    56  // StartResponse blocks until it is time to receive (or, if this is a server, send)
    57  // the request with the given id.
    58  func (p *Pipeline) StartResponse(id uint) {
    59  	p.response.Start(id)
    60  }
    61  
    62  // EndResponse notifies p that the response with the given id has been received
    63  // (or, if this is a server, sent).
    64  func (p *Pipeline) EndResponse(id uint) {
    65  	p.response.End(id)
    66  }
    67  
    // A sequencer schedules a sequence of numbered events that must
    // happen in order, one after the other. The event numbering must start
    // at 0 and increment without skipping. Ids are only ever compared for
    // equality, so the event number wraps around safely as long as there
    // are not 2^32 simultaneous events pending.
    //
    // The zero value is ready to use.
    type sequencer struct {
    	mu sync.Mutex

    	// id is the number of the event currently allowed to run.
    	// Guarded by mu.
    	id uint

    	// wait holds one channel per blocked Start call, keyed by the event
    	// id being waited for. It is allocated lazily by Start and stays nil
    	// until the first caller actually has to wait. Guarded by mu.
    	wait map[uint]chan struct{}
    }

    // Start waits until it is time for the event numbered id to begin.
    // If id is the sequencer's current event (0 for a fresh sequencer) it
    // returns immediately; otherwise it waits until End(id-1) has been
    // called.
    //
    // At most one goroutine may wait on a given id: a second Start with
    // the same id replaces the first waiter's channel in the table, and the
    // first waiter is then never woken.
    func (s *sequencer) Start(id uint) {
    	s.mu.Lock()
    	if s.id == id {
    		s.mu.Unlock()
    		return
    	}
    	// Not our turn yet: register a channel for End(id-1) to close.
    	c := make(chan struct{})
    	if s.wait == nil {
    		s.wait = make(map[uint]chan struct{})
    	}
    	s.wait[id] = c
    	s.mu.Unlock()
    	<-c
    }

    // End notifies the sequencer that the event numbered id has completed,
    // allowing it to schedule the event numbered id+1. It is a run-time
    // error to call End with an id that is not the number of the active
    // event: End panics with "out of sync", after releasing the lock.
    func (s *sequencer) End(id uint) {
    	s.mu.Lock()
    	if s.id != id {
    		s.mu.Unlock()
    		panic("out of sync")
    	}
    	next := id + 1
    	s.id = next
    	// Indexing and deleting on a nil map are both safe, so s.wait is not
    	// allocated here. Registered channels are never nil, hence c is nil
    	// exactly when nobody is waiting for the next event.
    	c := s.wait[next]
    	delete(s.wait, next)
    	s.mu.Unlock()
    	// Wake the waiter only after the lock has been released.
    	if c != nil {
    		close(c)
    	}
    }
   119  

View as plain text