Source file src/internal/trace/v2/reader.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"bufio"
     9  	"fmt"
    10  	"io"
    11  	"slices"
    12  	"strings"
    13  
    14  	"internal/trace/v2/event/go122"
    15  	"internal/trace/v2/version"
    16  )
    17  
// Reader reads a byte stream, validates it, and produces trace events.
//
// A Reader is created with NewReader and drained by calling ReadEvent
// until it returns io.EOF.
type Reader struct {
	r           *bufio.Reader  // buffered wrapper around the input; generations are read from it
	lastTs      Time           // timestamp of the last event returned by ReadEvent
	gen         *generation    // generation currently being emitted; nil until the first one is read
	spill       *spilledBatch  // batch carried between readGeneration calls; nil means no further generation
	frontier    []*batchCursor // min-heap of per-M cursors, ordered by the timestamp of each cursor's next event
	cpuSamples  []cpuSample    // CPU samples of the current generation that have not been emitted yet
	order       ordering       // ordering state consulted to decide whether a cursor's event may be emitted
	emittedSync bool           // whether the sync event for the current generation has been emitted
}
    29  
    30  // NewReader creates a new trace reader.
    31  func NewReader(r io.Reader) (*Reader, error) {
    32  	br := bufio.NewReader(r)
    33  	v, err := version.ReadHeader(br)
    34  	if err != nil {
    35  		return nil, err
    36  	}
    37  	if v != version.Go122 {
    38  		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
    39  	}
    40  	return &Reader{
    41  		r: br,
    42  		order: ordering{
    43  			mStates:     make(map[ThreadID]*mState),
    44  			pStates:     make(map[ProcID]*pState),
    45  			gStates:     make(map[GoID]*gState),
    46  			activeTasks: make(map[TaskID]taskState),
    47  		},
    48  		// Don't emit a sync event when we first go to emit events.
    49  		emittedSync: true,
    50  	}, nil
    51  }
    52  
    53  // ReadEvent reads a single event from the stream.
    54  //
    55  // If the stream has been exhausted, it returns an invalid
    56  // event and io.EOF.
    57  func (r *Reader) ReadEvent() (e Event, err error) {
    58  	// Go 1.22+ trace parsing algorithm.
    59  	//
    60  	// (1) Read in all the batches for the next generation from the stream.
    61  	//   (a) Use the size field in the header to quickly find all batches.
    62  	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
    63  	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
    64  	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
    65  	// (5) Try to advance the next event for the M at the top of the min-heap.
    66  	//   (a) On success, select that M.
    67  	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
    68  	//   (c) If there's nothing left to advance, goto (1).
    69  	// (6) Select the latest event for the selected M and get it ready to be returned.
    70  	// (7) Read the next event for the selected M and update the min-heap.
    71  	// (8) Return the selected event, goto (5) on the next call.
    72  
    73  	// Set us up to track the last timestamp and fix up
    74  	// the timestamp of any event that comes through.
    75  	defer func() {
    76  		if err != nil {
    77  			return
    78  		}
    79  		if err = e.validateTableIDs(); err != nil {
    80  			return
    81  		}
    82  		if e.base.time <= r.lastTs {
    83  			e.base.time = r.lastTs + 1
    84  		}
    85  		r.lastTs = e.base.time
    86  	}()
    87  
    88  	// Consume any extra events produced during parsing.
    89  	if ev := r.order.consumeExtraEvent(); ev.Kind() != EventBad {
    90  		return ev, nil
    91  	}
    92  
    93  	// Check if we need to refresh the generation.
    94  	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
    95  		if !r.emittedSync {
    96  			r.emittedSync = true
    97  			return syncEvent(r.gen.evTable, r.lastTs), nil
    98  		}
    99  		if r.gen != nil && r.spill == nil {
   100  			// If we have a generation from the last read,
   101  			// and there's nothing left in the frontier, and
   102  			// there's no spilled batch, indicating that there's
   103  			// no further generation, it means we're done.
   104  			// Return io.EOF.
   105  			return Event{}, io.EOF
   106  		}
   107  		// Read the next generation.
   108  		r.gen, r.spill, err = readGeneration(r.r, r.spill)
   109  		if err != nil {
   110  			return Event{}, err
   111  		}
   112  
   113  		// Reset CPU samples cursor.
   114  		r.cpuSamples = r.gen.cpuSamples
   115  
   116  		// Reset frontier.
   117  		for m, batches := range r.gen.batches {
   118  			bc := &batchCursor{m: m}
   119  			ok, err := bc.nextEvent(batches, r.gen.freq)
   120  			if err != nil {
   121  				return Event{}, err
   122  			}
   123  			if !ok {
   124  				// Turns out there aren't actually any events in these batches.
   125  				continue
   126  			}
   127  			r.frontier = heapInsert(r.frontier, bc)
   128  		}
   129  
   130  		// Reset emittedSync.
   131  		r.emittedSync = false
   132  	}
   133  	refresh := func(i int) error {
   134  		bc := r.frontier[i]
   135  
   136  		// Refresh the cursor's event.
   137  		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
   138  		if err != nil {
   139  			return err
   140  		}
   141  		if ok {
   142  			// If we successfully refreshed, update the heap.
   143  			heapUpdate(r.frontier, i)
   144  		} else {
   145  			// There's nothing else to read. Delete this cursor from the frontier.
   146  			r.frontier = heapRemove(r.frontier, i)
   147  		}
   148  		return nil
   149  	}
   150  	// Inject a CPU sample if it comes next.
   151  	if len(r.cpuSamples) != 0 {
   152  		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
   153  			e := r.cpuSamples[0].asEvent(r.gen.evTable)
   154  			r.cpuSamples = r.cpuSamples[1:]
   155  			return e, nil
   156  		}
   157  	}
   158  	// Try to advance the head of the frontier, which should have the minimum timestamp.
   159  	// This should be by far the most common case
   160  	if len(r.frontier) == 0 {
   161  		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
   162  	}
   163  	bc := r.frontier[0]
   164  	if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
   165  		return Event{}, err
   166  	} else if ok {
   167  		e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
   168  		return e, refresh(0)
   169  	}
   170  	// Sort the min-heap. A sorted min-heap is still a min-heap,
   171  	// but now we can iterate over the rest and try to advance in
   172  	// order. This path should be rare.
   173  	slices.SortFunc(r.frontier, (*batchCursor).compare)
   174  	// Try to advance the rest of the frontier, in timestamp order.
   175  	for i := 1; i < len(r.frontier); i++ {
   176  		bc := r.frontier[i]
   177  		if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
   178  			return Event{}, err
   179  		} else if ok {
   180  			e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
   181  			return e, refresh(i)
   182  		}
   183  	}
   184  	return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
   185  }
   186  
   187  func dumpFrontier(frontier []*batchCursor) string {
   188  	var sb strings.Builder
   189  	for _, bc := range frontier {
   190  		spec := go122.Specs()[bc.ev.typ]
   191  		fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
   192  		for i, arg := range spec.Args[1:] {
   193  			fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
   194  		}
   195  		fmt.Fprintf(&sb, "]\n")
   196  	}
   197  	return sb.String()
   198  }
   199  

View as plain text