diff --git a/CHANGELOG.md b/CHANGELOG.md
index ceccc576737..da479b1a02e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,7 @@
 * [BUGFIX] Fix issue where sharded queries could return annotations with incorrect or confusing position information. #9536
 * [BUGFIX] Fix issue where downstream consumers may not generate correct cache keys for experimental error caching. #9644
 * [BUGFIX] Fix issue where active series requests error when encountering a stale posting. #9580
+* [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666
 
 ### Mixin
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index d7feae79ef1..7ca90a4185e 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -324,12 +324,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	subservices := []services.Service(nil)
 	subservices = append(subservices, haTracker)
 
-	var requestBufferPool util.Pool
-	if cfg.MaxRequestPoolBufferSize > 0 {
-		requestBufferPool = util.NewBucketedBufferPool(1<<10, cfg.MaxRequestPoolBufferSize, 4)
-	} else {
-		requestBufferPool = util.NewBufferPool()
-	}
+	requestBufferPool := util.NewBufferPool(cfg.MaxRequestPoolBufferSize)
 
 	d := &Distributor{
 		cfg: cfg,
diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go
index 48c6f00ba41..686373bc823 100644
--- a/pkg/distributor/push_test.go
+++ b/pkg/distributor/push_test.go
@@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {
 		return nil
 	}
 
-	h := OTLPHandler(200, util.NewBufferPool(), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
+	h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
 	srv.HTTP.Handle("/otlp", h)
 
 	// start the server
diff --git a/pkg/util/requestbuffers.go b/pkg/util/requestbuffers.go
index 68c39d3a59d..41cbf9eb97e 100644
--- a/pkg/util/requestbuffers.go
+++ b/pkg/util/requestbuffers.go
@@ -5,45 +5,45 @@ package util
 import (
 	"bytes"
 	"sync"
-
-	"github.com/grafana/mimir/pkg/util/pool"
 )
 
+const defaultPoolBufferCap = 256 * 1024
+
 // Pool is an abstraction for a pool of byte slices.
 type Pool interface {
-	// Get returns a new byte slices that fits the given size.
-	Get(sz int) []byte
+	// Get returns a new byte slice.
+	Get() []byte
 	// Put puts a slice back into the pool.
 	Put(s []byte)
 }
 
 type bufferPool struct {
-	p sync.Pool
+	maxBufferCap int
+	p            sync.Pool
 }
 
-func (p *bufferPool) Get(_ int) []byte { return p.p.Get().([]byte) }
-func (p *bufferPool) Put(s []byte)     { p.p.Put(s) } //nolint:staticcheck
+func (p *bufferPool) Get() []byte { return p.p.Get().([]byte) }
+func (p *bufferPool) Put(s []byte) {
+	if p.maxBufferCap > 0 && cap(s) > p.maxBufferCap {
+		return // Discard large buffers
+	}
+	p.p.Put(s) //nolint:staticcheck
+}
 
 // NewBufferPool returns a new Pool for byte slices.
-func NewBufferPool() Pool {
+// If maxBufferCapacity is 0, the pool will not have a maximum capacity.
+func NewBufferPool(maxBufferCapacity int) Pool {
 	return &bufferPool{
+		maxBufferCap: maxBufferCapacity,
 		p: sync.Pool{
 			New: func() interface{} {
-				return make([]byte, 0, 256*1024)
+				return make([]byte, 0, defaultPoolBufferCap)
 			},
 		},
 	}
 }
 
-// NewBucketedBufferPool returns a new Pool for byte slices with bucketing.
-// The pool will have buckets for sizes from minSize to maxSize increasing by the given factor.
-func NewBucketedBufferPool(minSize, maxSize int, factor float64) Pool {
-	return pool.NewBucketedPool(minSize, maxSize, factor, func(sz int) []byte {
-		return make([]byte, 0, sz)
-	})
-}
-
 // RequestBuffers provides pooled request buffers.
 type RequestBuffers struct {
 	p Pool
@@ -70,7 +70,7 @@ func (rb *RequestBuffers) Get(size int) *bytes.Buffer {
 		return bytes.NewBuffer(make([]byte, 0, size))
 	}
 
-	b := rb.p.Get(size)
+	b := rb.p.Get()
 	buf := bytes.NewBuffer(b)
 	buf.Reset()
 	if size > 0 {
diff --git a/pkg/util/requestbuffers_test.go b/pkg/util/requestbuffers_test.go
index a72901bdd3e..3f857190ff7 100644
--- a/pkg/util/requestbuffers_test.go
+++ b/pkg/util/requestbuffers_test.go
@@ -10,14 +10,12 @@ import (
 )
 
 func TestRequestBuffers(t *testing.T) {
-	const maxBufferSize = 32 * 1024
-
-	rb := NewRequestBuffers(&fakePool{maxBufferSize: maxBufferSize})
+	rb := NewRequestBuffers(&fakePool{})
 	t.Cleanup(rb.CleanUp)
 
 	b := rb.Get(1024)
 	require.NotNil(t, b)
-	assert.Equal(t, 1024, b.Cap())
+	assert.GreaterOrEqual(t, b.Cap(), 1024)
 	assert.Zero(t, b.Len())
 	// Make sure that the buffer gets reset upon next Get
 	_, err := b.Write([]byte("test"))
@@ -30,20 +28,9 @@ func TestRequestBuffers(t *testing.T) {
 	// to test if it reuses the previously returned buffer.
 	b1 := rb.Get(1024)
 	assert.Same(t, unsafe.SliceData(b1.Bytes()), unsafe.SliceData(b.Bytes()))
-	assert.Equal(t, 1024, b1.Cap())
+	assert.GreaterOrEqual(t, b1.Cap(), 1024)
 	assert.Zero(t, b1.Len())
 
-	// Retrieve a buffer larger than maxBufferSize to ensure
-	// it doesn't get reused.
-	b2 := rb.Get(maxBufferSize + 1)
-	assert.Equal(t, maxBufferSize+1, b2.Cap())
-	assert.Zero(t, b2.Len())
-
-	rb.CleanUp()
-
-	b3 := rb.Get(maxBufferSize + 1)
-	assert.NotSame(t, unsafe.SliceData(b2.Bytes()), unsafe.SliceData(b3.Bytes()))
-
 	t.Run("as nil pointer", func(t *testing.T) {
 		var rb *RequestBuffers
 		b := rb.Get(1024)
@@ -60,24 +47,56 @@ func TestRequestBuffers(t *testing.T) {
 	})
 }
 
+func TestRequestsBuffersMaxPoolBufferSize(t *testing.T) {
+	const maxPoolBufferCap = defaultPoolBufferCap
+
+	t.Run("pool buffer is reused when size is less than or equal to maxBufferSize", func(t *testing.T) {
+		rb := NewRequestBuffers(&fakePool{maxBufferCap: maxPoolBufferCap})
+		t.Cleanup(rb.CleanUp)
+
+		b0 := rb.Get(maxPoolBufferCap)
+		require.NotNil(t, b0)
+		assert.Zero(t, b0.Len())
+		assert.GreaterOrEqual(t, b0.Cap(), maxPoolBufferCap)
+
+		rb.CleanUp()
+
+		b1 := rb.Get(maxPoolBufferCap)
+		assert.Same(t, unsafe.SliceData(b0.Bytes()), unsafe.SliceData(b1.Bytes()))
+	})
+	t.Run("pool buffer is not reused when size is greater than maxBufferSize", func(t *testing.T) {
+		rb := NewRequestBuffers(NewBufferPool(maxPoolBufferCap))
+		t.Cleanup(rb.CleanUp)
+
+		b0 := rb.Get(maxPoolBufferCap + 1)
+		require.NotNil(t, b0)
+		assert.Zero(t, b0.Len())
+		assert.GreaterOrEqual(t, b0.Cap(), maxPoolBufferCap+1)
+
+		rb.CleanUp()
+
+		b1 := rb.Get(maxPoolBufferCap + 1)
+		assert.NotSame(t, unsafe.SliceData(b0.Bytes()), unsafe.SliceData(b1.Bytes()))
+	})
+}
+
 type fakePool struct {
-	maxBufferSize int
-	buffers       [][]byte
+	maxBufferCap int
+	buffers      [][]byte
 }
 
-func (p *fakePool) Get(sz int) []byte {
-	if sz <= p.maxBufferSize {
-		for i, b := range p.buffers {
-			if cap(b) < sz {
-				continue
-			}
-			p.buffers = append(p.buffers[:i], p.buffers[i+1:]...)
-			return b
-		}
+func (p *fakePool) Get() []byte {
+	if len(p.buffers) > 0 {
+		buf := p.buffers[0]
+		p.buffers = p.buffers[1:]
+		return buf
 	}
-	return make([]byte, 0, sz)
+	return make([]byte, 0, defaultPoolBufferCap)
 }
 
 func (p *fakePool) Put(s []byte) {
+	if p.maxBufferCap > 0 && cap(s) > p.maxBufferCap {
+		return
+	}
 	p.buffers = append(p.buffers, s[:0])
 }
diff --git a/tools/trafficdump/parser.go b/tools/trafficdump/parser.go
index 99fe2ca95c5..d00fadd993e 100644
--- a/tools/trafficdump/parser.go
+++ b/tools/trafficdump/parser.go
@@ -26,7 +26,7 @@ import (
 
 const maxBufferPoolSize = 1024 * 1024
 
-var bufferPool = util.NewBucketedBufferPool(1e3, maxBufferPoolSize, 2)
+var bufferPool = util.NewBufferPool(maxBufferPoolSize)
 
 type parser struct {
 	processorConfig processorConfig
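
For context on the change, here is a minimal, self-contained sketch of the capacity-capping idea the new `bufferPool.Put` applies. It is not part of this diff: the `maxBufferCap` constant, the `bufPool` variable, the `put` helper, and the sizes are illustrative only. The point it shows is that buffers which have grown past the configured maximum are dropped instead of being returned to the `sync.Pool`, so a burst of oversized requests cannot keep large allocations alive in the pool.

```go
package main

import (
	"fmt"
	"sync"
)

// maxBufferCap plays the role of -distributor.max-request-pool-buffer-size here:
// buffers that grow beyond this capacity are discarded instead of pooled.
// A value of 0 would disable the cap entirely.
const maxBufferCap = 64 * 1024

var bufPool = sync.Pool{
	New: func() interface{} {
		// Capacity handed out when the pool is empty.
		return make([]byte, 0, 4*1024)
	},
}

// put returns a buffer to the pool unless it exceeds maxBufferCap,
// so a few oversized requests cannot pin large allocations forever.
func put(b []byte) {
	if maxBufferCap > 0 && cap(b) > maxBufferCap {
		return // drop it; the GC reclaims the large backing array
	}
	bufPool.Put(b[:0]) //nolint:staticcheck // slices are pooled by value, as in the diff above
}

func main() {
	small := bufPool.Get().([]byte)
	small = append(small, make([]byte, 1024)...) // stays within the cap
	put(small)                                   // pooled for reuse

	large := bufPool.Get().([]byte)
	large = append(large, make([]byte, 2*maxBufferCap)...) // grows past the cap
	put(large)                                             // silently discarded

	fmt.Println("small buffer pooled, large buffer discarded")
}
```

With `-distributor.max-request-pool-buffer-size` left at 0, the cap check is skipped and every buffer is pooled, matching the behaviour of the old no-argument `util.NewBufferPool()`.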