x/time/rate: Limiter allows more configured with multiple goroutines #65508

Open
ianzhang1988 opened this issue Feb 4, 2024 · 3 comments
Labels
NeedsInvestigation Someone must examine and confirm this is a valid issue and not a duplicate of an existing one.

ianzhang1988 commented Feb 4, 2024

Go version

go version go1.18.5 linux/amd64

Output of go env in your module/workspace:

GO111MODULE="on"
GOARCH="amd64"
GOBIN=""
GOCACHE="/root/.cache/go-build"
GOENV="/root/.config/go/env"
GOEXE=""
GOEXPERIMENT=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GOINSECURE=""
GOMODCACHE="/root/.gvm/pkgsets/go1.18.5/global/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/root/.gvm/pkgsets/go1.18.5/global"
GOPRIVATE=""
GOPROXY="https://goproxy.cn,direct"
GOROOT="/root/.gvm/gos/go1.18.5"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/root/.gvm/gos/go1.18.5/pkg/tool/linux_amd64"
GOVCS=""
GOVERSION="go1.18.5"
GCCGO="gccgo"
GOAMD64="v1"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="0"
GOMOD="/data/zhangyang/goproject/go.mod"
GOWORK=""
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build3572669853=/tmp/go-build -gno-record-gcc-switches"

What did you do?

Here is my code

package main

import (
	"fmt"
	"time"
	. "golang.org/x/time/rate"
)

func Produce(ch chan uint64) {
	var counter uint64 = 0
	for {
		ch <- counter
		counter += 1
	}
}

func ConsumWithLimitDelay(ch chan uint64, chout chan uint64, lim *Limiter) {
	for {
		n := lim.Reserve()
		if !n.OK() {
			continue
		}

		time.Sleep(n.Delay())
		chout <- <-ch
	}
}

func Count(ch chan uint64) {
	var counter uint64 = 0

	lastTime := time.Now()

	for {
		_ = <-ch
		counter += 1
		du := time.Since(lastTime)
		if du > 1*time.Second {
			fmt.Printf("%d %v\n", counter, du)
			counter = 0
			lastTime = lastTime.Add(du)
		}
	}
}

func main() {
	ch := make(chan uint64, 1000)
	ch2 := make(chan uint64, 100)
	go Produce(ch)
	lim := NewLimiter(200000.0, 1000)
	for i := 0; i < 100; i++ {
		go ConsumWithLimitDelay(ch, ch2, lim)
	}
	Count(ch2)
}

The limit is set to 200k events per second, but the measured rate is about 300k per second on my machine.

Here is what I think is going wrong in rate.go:
sometimes lim.advance(t) returns a time t that is in the past; that time is then stored in lim.last, which causes the problem.
(See the // !!! comments in the code below.)

func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()

	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: t,
		}
	} else if lim.limit == 0 {
		var ok bool
		if lim.burst >= n {
			ok = true
			lim.burst -= n
		}
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: t,
		}
	}

	// !!! uncommenting this block makes rate.go work correctly
	// if t.Before(lim.last) {
	// 	return Reservation{
	// 		ok:    false,
	// 		lim:   lim,
	// 		limit: lim.limit,
	// 	}
	// }

	// !!! t can be in the past when reserveN is called from multiple goroutines
	t, tokens := lim.advance(t)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = t.Add(waitDuration)

		// !!! sometimes this sets last to a time in the past, causing the problem
		// Update state
		lim.last = t
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}

	return r
}

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
	last := lim.last
	if t.Before(last) {
		last = t
	}

	// Calculate the new number of tokens, due to time that passed.
	elapsed := t.Sub(last)
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return t, tokens
}

What did you see happen?

The limiter does not limit properly when it is shared by multiple goroutines.

What did you expect to see?

The limiter should enforce the configured limit when it is shared by multiple goroutines.

@gopherbot added this to the Unreleased milestone Feb 4, 2024
@seankhliao changed the title from "x/time: rate limiter not working properly in multiple goroutine" to "x/time/rate: Limiter allows more configured with multiple goroutines" Feb 6, 2024
@seankhliao added the NeedsInvestigation label Feb 6, 2024
@seankhliao
Member

cc @Sajmani maybe?

It does appear that passing reserveN a time that was captured before the lock is obtained can cause drift (setting t = time.Now() inside reserveN results in an accurate limit).

related #23145 ?

@ianzhang1988
Author

Is there any update on this?

@ianzhang1988
Author

I think the fix is as simple as the change below.

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
	last := lim.last
	if t.Before(last) {
		// last = t
		t = last // here
	}

	// Calculate the new number of tokens, due to time that passed.
	elapsed := t.Sub(last)
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return t, tokens
}

If t is in the past, advance should not produce any tokens, because the tokens for that time were already produced by the previous call to advance.
