Source file src/internal/poll/splice_linux_test.go

     1  // Copyright 2021 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package poll_test
     6  
     7  import (
     8  	"internal/poll"
     9  	"runtime"
    10  	"sync"
    11  	"sync/atomic"
    12  	"testing"
    13  	"time"
    14  )
    15  
    16  var closeHook atomic.Value // func(fd int)
    17  
    18  func init() {
    19  	closeFunc := poll.CloseFunc
    20  	poll.CloseFunc = func(fd int) (err error) {
    21  		if v := closeHook.Load(); v != nil {
    22  			if hook := v.(func(int)); hook != nil {
    23  				hook(fd)
    24  			}
    25  		}
    26  		return closeFunc(fd)
    27  	}
    28  }
    29  
    30  func TestSplicePipePool(t *testing.T) {
    31  	const N = 64
    32  	var (
    33  		p          *poll.SplicePipe
    34  		ps         []*poll.SplicePipe
    35  		allFDs     []int
    36  		pendingFDs sync.Map // fd → struct{}{}
    37  		err        error
    38  	)
    39  
    40  	closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
    41  	t.Cleanup(func() { closeHook.Store((func(int))(nil)) })
    42  
    43  	for i := 0; i < N; i++ {
    44  		p, _, err = poll.GetPipe()
    45  		if err != nil {
    46  			t.Skipf("failed to create pipe due to error(%v), skip this test", err)
    47  		}
    48  		_, pwfd := poll.GetPipeFds(p)
    49  		allFDs = append(allFDs, pwfd)
    50  		pendingFDs.Store(pwfd, struct{}{})
    51  		ps = append(ps, p)
    52  	}
    53  	for _, p = range ps {
    54  		poll.PutPipe(p)
    55  	}
    56  	ps = nil
    57  	p = nil
    58  
    59  	// Exploit the timeout of "go test" as a timer for the subsequent verification.
    60  	timeout := 5 * time.Minute
    61  	if deadline, ok := t.Deadline(); ok {
    62  		timeout = deadline.Sub(time.Now())
    63  		timeout -= timeout / 10 // Leave 10% headroom for cleanup.
    64  	}
    65  	expiredTime := time.NewTimer(timeout)
    66  	defer expiredTime.Stop()
    67  
    68  	// Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool
    69  	// to either be deallocated and closed, or to time out.
    70  	for {
    71  		runtime.GC()
    72  		time.Sleep(10 * time.Millisecond)
    73  
    74  		// Detect whether all pipes are closed properly.
    75  		var leakedFDs []int
    76  		pendingFDs.Range(func(k, v any) bool {
    77  			leakedFDs = append(leakedFDs, k.(int))
    78  			return true
    79  		})
    80  		if len(leakedFDs) == 0 {
    81  			break
    82  		}
    83  
    84  		select {
    85  		case <-expiredTime.C:
    86  			t.Logf("all descriptors: %v", allFDs)
    87  			t.Fatalf("leaked descriptors: %v", leakedFDs)
    88  		default:
    89  		}
    90  	}
    91  }
    92  
    93  func BenchmarkSplicePipe(b *testing.B) {
    94  	b.Run("SplicePipeWithPool", func(b *testing.B) {
    95  		for i := 0; i < b.N; i++ {
    96  			p, _, err := poll.GetPipe()
    97  			if err != nil {
    98  				continue
    99  			}
   100  			poll.PutPipe(p)
   101  		}
   102  	})
   103  	b.Run("SplicePipeWithoutPool", func(b *testing.B) {
   104  		for i := 0; i < b.N; i++ {
   105  			p := poll.NewPipe()
   106  			if p == nil {
   107  				b.Skip("newPipe returned nil")
   108  			}
   109  			poll.DestroyPipe(p)
   110  		}
   111  	})
   112  }
   113  
   114  func BenchmarkSplicePipePoolParallel(b *testing.B) {
   115  	b.RunParallel(func(pb *testing.PB) {
   116  		for pb.Next() {
   117  			p, _, err := poll.GetPipe()
   118  			if err != nil {
   119  				continue
   120  			}
   121  			poll.PutPipe(p)
   122  		}
   123  	})
   124  }
   125  
   126  func BenchmarkSplicePipeNativeParallel(b *testing.B) {
   127  	b.RunParallel(func(pb *testing.PB) {
   128  		for pb.Next() {
   129  			p := poll.NewPipe()
   130  			if p == nil {
   131  				b.Skip("newPipe returned nil")
   132  			}
   133  			poll.DestroyPipe(p)
   134  		}
   135  	})
   136  }
   137  

View as plain text