Source file src/internal/trace/v2/generation.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"bufio"
     9  	"bytes"
    10  	"cmp"
    11  	"encoding/binary"
    12  	"fmt"
    13  	"io"
    14  	"slices"
    15  	"strings"
    16  
    17  	"internal/trace/v2/event"
    18  	"internal/trace/v2/event/go122"
    19  )
    20  
// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	// gen is the generation number shared by all of this generation's
	// batches. readGeneration takes it from the spilled batch, or else
	// from the first batch it reads.
	gen        uint64
	// batches holds every batch that is not a strings, stacks, CPU samples,
	// or frequency batch, grouped by the batch's thread (batch.m).
	batches    map[ThreadID][]batch
	// cpuSamples holds the samples decoded from CPU samples batches. By the
	// time readGeneration returns, their times have been converted using the
	// generation's frequency and the slice is sorted by time.
	cpuSamples []cpuSample
	// The embedded evTable supplies the freq, stacks, and strings fields
	// that readGeneration and processBatch populate and check.
	*evTable
}
    31  
// spilledBatch is the first batch of the next generation. readGeneration
// reads it (without processing it) while reading the current generation,
// and it is passed on to become the starting point when parsing the next
// generation.
type spilledBatch struct {
	// gen is the generation number readBatch reported for the batch.
	gen uint64
	// batch is the batch itself; readGeneration processes it before reading
	// anything else for its generation.
	*batch
}
    39  
    40  // readGeneration buffers and decodes the structural elements of a trace generation
    41  // out of r. spill is the first batch of the new generation (already buffered and
    42  // parsed from reading the last generation). Returns the generation and the first
    43  // batch read of the next generation, if any.
    44  func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
    45  	g := &generation{
    46  		evTable: new(evTable),
    47  		batches: make(map[ThreadID][]batch),
    48  	}
    49  	// Process the spilled batch.
    50  	if spill != nil {
    51  		g.gen = spill.gen
    52  		if err := processBatch(g, *spill.batch); err != nil {
    53  			return nil, nil, err
    54  		}
    55  		spill = nil
    56  	}
    57  	// Read batches one at a time until we either hit EOF or
    58  	// the next generation.
    59  	for {
    60  		b, gen, err := readBatch(r)
    61  		if err == io.EOF {
    62  			break
    63  		}
    64  		if err != nil {
    65  			return nil, nil, err
    66  		}
    67  		if gen == 0 {
    68  			// 0 is a sentinel used by the runtime, so we'll never see it.
    69  			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
    70  		}
    71  		if g.gen == 0 {
    72  			// Initialize gen.
    73  			g.gen = gen
    74  		}
    75  		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
    76  			spill = &spilledBatch{gen: gen, batch: &b}
    77  			break
    78  		}
    79  		if gen != g.gen {
    80  			// N.B. Fail as fast as possible if we see this. At first it
    81  			// may seem prudent to be fault-tolerant and assume we have a
    82  			// complete generation, parsing and returning that first. However,
    83  			// if the batches are mixed across generations then it's likely
    84  			// we won't be able to parse this generation correctly at all.
    85  			// Rather than return a cryptic error in that case, indicate the
    86  			// problem as soon as we see it.
    87  			return nil, nil, fmt.Errorf("generations out of order")
    88  		}
    89  		if err := processBatch(g, b); err != nil {
    90  			return nil, nil, err
    91  		}
    92  	}
    93  
    94  	// Check some invariants.
    95  	if g.freq == 0 {
    96  		return nil, nil, fmt.Errorf("no frequency event found")
    97  	}
    98  	// N.B. Trust that the batch order is correct. We can't validate the batch order
    99  	// by timestamp because the timestamps could just be plain wrong. The source of
   100  	// truth is the order things appear in the trace and the partial order sequence
   101  	// numbers on certain events. If it turns out the batch order is actually incorrect
   102  	// we'll very likely fail to advance a partial order from the frontier.
   103  
   104  	// Compactify stacks and strings for better lookup performance later.
   105  	g.stacks.compactify()
   106  	g.strings.compactify()
   107  
   108  	// Validate stacks.
   109  	if err := validateStackStrings(&g.stacks, &g.strings); err != nil {
   110  		return nil, nil, err
   111  	}
   112  
   113  	// Fix up the CPU sample timestamps, now that we have freq.
   114  	for i := range g.cpuSamples {
   115  		s := &g.cpuSamples[i]
   116  		s.time = g.freq.mul(timestamp(s.time))
   117  	}
   118  	// Sort the CPU samples.
   119  	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
   120  		return cmp.Compare(a.time, b.time)
   121  	})
   122  	return g, spill, nil
   123  }
   124  
   125  // processBatch adds the batch to the generation.
   126  func processBatch(g *generation, b batch) error {
   127  	switch {
   128  	case b.isStringsBatch():
   129  		if err := addStrings(&g.strings, b); err != nil {
   130  			return err
   131  		}
   132  	case b.isStacksBatch():
   133  		if err := addStacks(&g.stacks, b); err != nil {
   134  			return err
   135  		}
   136  	case b.isCPUSamplesBatch():
   137  		samples, err := addCPUSamples(g.cpuSamples, b)
   138  		if err != nil {
   139  			return err
   140  		}
   141  		g.cpuSamples = samples
   142  	case b.isFreqBatch():
   143  		freq, err := parseFreq(b)
   144  		if err != nil {
   145  			return err
   146  		}
   147  		if g.freq != 0 {
   148  			return fmt.Errorf("found multiple frequency events")
   149  		}
   150  		g.freq = freq
   151  	default:
   152  		g.batches[b.m] = append(g.batches[b.m], b)
   153  	}
   154  	return nil
   155  }
   156  
   157  // validateStackStrings makes sure all the string references in
   158  // the stack table are present in the string table.
   159  func validateStackStrings(stacks *dataTable[stackID, stack], strings *dataTable[stringID, string]) error {
   160  	var err error
   161  	stacks.forEach(func(id stackID, stk stack) bool {
   162  		for _, frame := range stk.frames {
   163  			_, ok := strings.get(frame.funcID)
   164  			if !ok {
   165  				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
   166  				return false
   167  			}
   168  			_, ok = strings.get(frame.fileID)
   169  			if !ok {
   170  				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
   171  				return false
   172  			}
   173  		}
   174  		return true
   175  	})
   176  	return err
   177  }
   178  
   179  // addStrings takes a batch whose first byte is an EvStrings event
   180  // (indicating that the batch contains only strings) and adds each
   181  // string contained therein to the provided strings map.
   182  func addStrings(stringTable *dataTable[stringID, string], b batch) error {
   183  	if !b.isStringsBatch() {
   184  		return fmt.Errorf("internal error: addStrings called on non-string batch")
   185  	}
   186  	r := bytes.NewReader(b.data)
   187  	hdr, err := r.ReadByte() // Consume the EvStrings byte.
   188  	if err != nil || event.Type(hdr) != go122.EvStrings {
   189  		return fmt.Errorf("missing strings batch header")
   190  	}
   191  
   192  	var sb strings.Builder
   193  	for r.Len() != 0 {
   194  		// Read the header.
   195  		ev, err := r.ReadByte()
   196  		if err != nil {
   197  			return err
   198  		}
   199  		if event.Type(ev) != go122.EvString {
   200  			return fmt.Errorf("expected string event, got %d", ev)
   201  		}
   202  
   203  		// Read the string's ID.
   204  		id, err := binary.ReadUvarint(r)
   205  		if err != nil {
   206  			return err
   207  		}
   208  
   209  		// Read the string's length.
   210  		len, err := binary.ReadUvarint(r)
   211  		if err != nil {
   212  			return err
   213  		}
   214  		if len > go122.MaxStringSize {
   215  			return fmt.Errorf("invalid string size %d, maximum is %d", len, go122.MaxStringSize)
   216  		}
   217  
   218  		// Copy out the string.
   219  		n, err := io.CopyN(&sb, r, int64(len))
   220  		if n != int64(len) {
   221  			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
   222  		}
   223  		if err != nil {
   224  			return fmt.Errorf("copying string data: %w", err)
   225  		}
   226  
   227  		// Add the string to the map.
   228  		s := sb.String()
   229  		sb.Reset()
   230  		if err := stringTable.insert(stringID(id), s); err != nil {
   231  			return err
   232  		}
   233  	}
   234  	return nil
   235  }
   236  
   237  // addStacks takes a batch whose first byte is an EvStacks event
   238  // (indicating that the batch contains only stacks) and adds each
   239  // string contained therein to the provided stacks map.
   240  func addStacks(stackTable *dataTable[stackID, stack], b batch) error {
   241  	if !b.isStacksBatch() {
   242  		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
   243  	}
   244  	r := bytes.NewReader(b.data)
   245  	hdr, err := r.ReadByte() // Consume the EvStacks byte.
   246  	if err != nil || event.Type(hdr) != go122.EvStacks {
   247  		return fmt.Errorf("missing stacks batch header")
   248  	}
   249  
   250  	for r.Len() != 0 {
   251  		// Read the header.
   252  		ev, err := r.ReadByte()
   253  		if err != nil {
   254  			return err
   255  		}
   256  		if event.Type(ev) != go122.EvStack {
   257  			return fmt.Errorf("expected stack event, got %d", ev)
   258  		}
   259  
   260  		// Read the stack's ID.
   261  		id, err := binary.ReadUvarint(r)
   262  		if err != nil {
   263  			return err
   264  		}
   265  
   266  		// Read how many frames are in each stack.
   267  		nFrames, err := binary.ReadUvarint(r)
   268  		if err != nil {
   269  			return err
   270  		}
   271  		if nFrames > go122.MaxFramesPerStack {
   272  			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, go122.MaxFramesPerStack)
   273  		}
   274  
   275  		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
   276  		frames := make([]frame, 0, nFrames)
   277  		for i := uint64(0); i < nFrames; i++ {
   278  			// Read the frame data.
   279  			pc, err := binary.ReadUvarint(r)
   280  			if err != nil {
   281  				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
   282  			}
   283  			funcID, err := binary.ReadUvarint(r)
   284  			if err != nil {
   285  				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
   286  			}
   287  			fileID, err := binary.ReadUvarint(r)
   288  			if err != nil {
   289  				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
   290  			}
   291  			line, err := binary.ReadUvarint(r)
   292  			if err != nil {
   293  				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
   294  			}
   295  			frames = append(frames, frame{
   296  				pc:     pc,
   297  				funcID: stringID(funcID),
   298  				fileID: stringID(fileID),
   299  				line:   line,
   300  			})
   301  		}
   302  
   303  		// Add the stack to the map.
   304  		if err := stackTable.insert(stackID(id), stack{frames: frames}); err != nil {
   305  			return err
   306  		}
   307  	}
   308  	return nil
   309  }
   310  
   311  // addCPUSamples takes a batch whose first byte is an EvCPUSamples event
   312  // (indicating that the batch contains only CPU samples) and adds each
   313  // sample contained therein to the provided samples list.
   314  func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
   315  	if !b.isCPUSamplesBatch() {
   316  		return nil, fmt.Errorf("internal error: addStrings called on non-string batch")
   317  	}
   318  	r := bytes.NewReader(b.data)
   319  	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
   320  	if err != nil || event.Type(hdr) != go122.EvCPUSamples {
   321  		return nil, fmt.Errorf("missing CPU samples batch header")
   322  	}
   323  
   324  	for r.Len() != 0 {
   325  		// Read the header.
   326  		ev, err := r.ReadByte()
   327  		if err != nil {
   328  			return nil, err
   329  		}
   330  		if event.Type(ev) != go122.EvCPUSample {
   331  			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
   332  		}
   333  
   334  		// Read the sample's timestamp.
   335  		ts, err := binary.ReadUvarint(r)
   336  		if err != nil {
   337  			return nil, err
   338  		}
   339  
   340  		// Read the sample's M.
   341  		m, err := binary.ReadUvarint(r)
   342  		if err != nil {
   343  			return nil, err
   344  		}
   345  		mid := ThreadID(m)
   346  
   347  		// Read the sample's P.
   348  		p, err := binary.ReadUvarint(r)
   349  		if err != nil {
   350  			return nil, err
   351  		}
   352  		pid := ProcID(p)
   353  
   354  		// Read the sample's G.
   355  		g, err := binary.ReadUvarint(r)
   356  		if err != nil {
   357  			return nil, err
   358  		}
   359  		goid := GoID(g)
   360  		if g == 0 {
   361  			goid = NoGoroutine
   362  		}
   363  
   364  		// Read the sample's stack.
   365  		s, err := binary.ReadUvarint(r)
   366  		if err != nil {
   367  			return nil, err
   368  		}
   369  
   370  		// Add the sample to the slice.
   371  		samples = append(samples, cpuSample{
   372  			schedCtx: schedCtx{
   373  				M: mid,
   374  				P: pid,
   375  				G: goid,
   376  			},
   377  			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
   378  			stack: stackID(s),
   379  		})
   380  	}
   381  	return samples, nil
   382  }
   383  
   384  // parseFreq parses out a lone EvFrequency from a batch.
   385  func parseFreq(b batch) (frequency, error) {
   386  	if !b.isFreqBatch() {
   387  		return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
   388  	}
   389  	r := bytes.NewReader(b.data)
   390  	r.ReadByte() // Consume the EvFrequency byte.
   391  
   392  	// Read the frequency. It'll come out as timestamp units per second.
   393  	f, err := binary.ReadUvarint(r)
   394  	if err != nil {
   395  		return 0, err
   396  	}
   397  	// Convert to nanoseconds per timestamp unit.
   398  	return frequency(1.0 / (float64(f) / 1e9)), nil
   399  }
   400  

View as plain text