strings: Join memory leak #54203
What happens to the strings that are returned? Are they being saved somewhere?
The strings are used by this consumer:

```go
func (b *BulkConsumer) Consume(batchSize int, interval int, ctx context.Context, wg *sync.WaitGroup) {
	chunk := make([]*params, batchSize)
	i := 0
	timer := time.NewTimer(time.Duration(interval) * time.Second)
	commit := func() {
		defer func() {
			i = 0
		}()
		if i == 0 {
			return
		}
		log.Debug("es update body: " + getUpdateStr(chunk, i))
		// elasticsearch bulk
		res, err := b.c.Bulk(strings.NewReader(getUpdateStr(chunk, i)))
		log.Debug(res, err)
		if err != nil || res.StatusCode >= 300 {
			log.Errorf("es bulk fail. res: %s, error: %s", res, err)
		}
		pipe := b.r.Pipeline()
		for j := 0; j < i; j++ {
			pipe.SAdd("member_refresh_tag", chunk[j].memberId)
		}
		_, err = pipe.Exec()
		if err != nil {
			log.Errorf("set member_refresh_tag cache error. error: %s", err)
		}
	}
	for {
		select {
		case <-timer.C:
			commit()
			timer.Reset(time.Duration(interval) * time.Second)
		case p := <-b.ch:
			chunk[i] = &p
			i++
			if i >= batchSize {
				if !timer.Stop() {
					<-timer.C
				}
				commit()
				timer.Reset(time.Duration(interval) * time.Second)
			}
		case <-ctx.Done():
			log.Info("ctx.Done()")
			if !timer.Stop() {
				<-timer.C
			}
			log.Info("bulk_consumer exiting")
			commit()
			log.Info("bulk_consumer exit")
			wg.Done()
			return
		}
	}
}
```
They end up in …

I don't think it is efficient to debug this back and forth on an issue. If you can, please try to build a reproducible example that we can run ourselves. I suspect something is hanging on to the results. Nothing in the implementation of …
I notice this additional "consumption":

What logging library do you use?
logrus
It's a bug of … Reproducible code:

```go
package main

import (
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
)

func getUpdateStr() string {
	return strings.Join([]string{
		`{"update": { "_id": 33141, "_index": "member_tag_0" }}`,
		`{"doc": { "channel_uid": 94248 }}`,
	}, "")
}

func main() {
	es, _ := elasticsearch.NewDefaultClient()
	for {
		_, _ = es.Bulk(strings.NewReader(getUpdateStr()))
		time.Sleep(100 * time.Millisecond)
	}
}
```
Closing as not a bug in std |
The bug is that I didn't call the …

The flame graph looks weird: the total memory in the flame graph is much less than the real memory used by the project.
What version of Go are you using (`go version`)?

Does this issue reproduce with the latest release?

yes

What operating system and processor architecture are you using (`go env`)?

`go env` Output

What did you do?

I run Elasticsearch bulk updates in a goroutine: it continually reads update params from a channel (about 20/s) and generates the update string using the code below.

What did you expect to see?

Stable memory usage.

What did you see instead?

Steadily increasing memory usage.