distributor: fix pool buffer reuse logic when distributor.max-request-pool-buffer-size is set #9666

Open · wants to merge 1 commit into main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@
* [BUGFIX] Fix issue where sharded queries could return annotations with incorrect or confusing position information. #9536
* [BUGFIX] Fix issue where downstream consumers may not generate correct cache keys for experimental error caching. #9644
* [BUGFIX] Fix issue where active series requests error when encountering a stale posting. #9580
+* [BUGFIX] Fix pool buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666

### Mixin

7 changes: 1 addition & 6 deletions pkg/distributor/distributor.go
@@ -324,12 +324,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

-var requestBufferPool util.Pool
-if cfg.MaxRequestPoolBufferSize > 0 {
-requestBufferPool = util.NewBucketedBufferPool(1<<10, cfg.MaxRequestPoolBufferSize, 4)
-} else {
-requestBufferPool = util.NewBufferPool()
-}
+requestBufferPool := util.NewBufferPool(cfg.MaxRequestPoolBufferSize)

d := &Distributor{
cfg: cfg,
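Note: the collapsed constructor makes the zero value meaningful. A minimal sketch of the new semantics, assuming the util package as modified in this PR (the 1 MiB cap is illustrative, not a Mimir default):

    package main

    import (
        "fmt"

        "github.com/grafana/mimir/pkg/util"
    )

    func main() {
        // Zero means "no cap": Put never discards, matching the old NewBufferPool().
        unbounded := util.NewBufferPool(0)

        // A positive value bounds what Put keeps: buffers whose capacity grew
        // beyond 1 MiB are dropped rather than returned to the pool.
        capped := util.NewBufferPool(1 << 20)

        b := capped.Get()
        fmt.Println(cap(b)) // 262144, i.e. defaultPoolBufferCap (256 KiB)

        capped.Put(b)                         // kept: cap(b) <= 1 MiB
        capped.Put(make([]byte, 0, 2<<20))    // discarded: cap > 1 MiB
        unbounded.Put(make([]byte, 0, 2<<20)) // kept: no cap configured
    }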
2 changes: 1 addition & 1 deletion pkg/distributor/push_test.go
@@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {

return nil
}
-h := OTLPHandler(200, util.NewBufferPool(), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
+h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
srv.HTTP.Handle("/otlp", h)

// start the server
36 changes: 18 additions & 18 deletions pkg/util/requestbuffers.go
@@ -5,45 +5,45 @@ package util
import (
"bytes"
"sync"
-
-"github.com/grafana/mimir/pkg/util/pool"
)

+const defaultPoolBufferCap = 256 * 1024
+
// Pool is an abstraction for a pool of byte slices.
type Pool interface {
-// Get returns a new byte slices that fits the given size.
-Get(sz int) []byte
+// Get returns a new byte slice.
+Get() []byte

// Put puts a slice back into the pool.
Put(s []byte)
}

type bufferPool struct {
-p sync.Pool
+maxBufferCap int
+p sync.Pool
}

-func (p *bufferPool) Get(_ int) []byte { return p.p.Get().([]byte) }
-func (p *bufferPool) Put(s []byte) { p.p.Put(s) } //nolint:staticcheck
+func (p *bufferPool) Get() []byte { return p.p.Get().([]byte) }
+func (p *bufferPool) Put(s []byte) {
+if p.maxBufferCap > 0 && cap(s) > p.maxBufferCap {
+return // Discard large buffers
+}
+p.p.Put(s) //nolint:staticcheck
+}

// NewBufferPool returns a new Pool for byte slices.
-func NewBufferPool() Pool {
+// If maxBufferCapacity is 0, the pool will not have a maximum capacity.
+func NewBufferPool(maxBufferCapacity int) Pool {
return &bufferPool{
+maxBufferCap: maxBufferCapacity,
p: sync.Pool{
New: func() interface{} {
-return make([]byte, 0, 256*1024)
+return make([]byte, 0, defaultPoolBufferCap)
},
},
}
}

-// NewBucketedBufferPool returns a new Pool for byte slices with bucketing.
-// The pool will have buckets for sizes from minSize to maxSize increasing by the given factor.
-func NewBucketedBufferPool(minSize, maxSize int, factor float64) Pool {
-return pool.NewBucketedPool(minSize, maxSize, factor, func(sz int) []byte {
-return make([]byte, 0, sz)
-})
-}
-
// RequestBuffers provides pooled request buffers.
type RequestBuffers struct {
p Pool
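The Put guard above follows the usual sync.Pool hygiene pattern: recycle ordinary buffers, but let unusually large ones be garbage-collected so the pool never pins them in memory. A self-contained sketch of just that pattern (the names and the 1024-byte threshold are illustrative):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const maxCap = 1024 // illustrative discard threshold

        pool := sync.Pool{
            New: func() interface{} { return make([]byte, 0, 256) },
        }

        put := func(b []byte) {
            if cap(b) > maxCap {
                return // too large: leave it to the GC instead of pinning it
            }
            pool.Put(b[:0]) //nolint:staticcheck
        }

        put(pool.Get().([]byte))   // retained for reuse
        put(make([]byte, 0, 4096)) // dropped: exceeds maxCap

        fmt.Println(cap(pool.Get().([]byte))) // 256 either way
    }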
@@ -70,7 +70,7 @@ func (rb *RequestBuffers) Get(size int) *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, size))
}

-b := rb.p.Get(size)
+b := rb.p.Get()
buf := bytes.NewBuffer(b)
buf.Reset()
if size > 0 {
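Because Get() no longer takes a size, a recycled buffer may be larger than the request; the truncated tail of this hunk (`if size > 0 {`) presumably grows the buffer to the requested size after the Reset. This is why the test assertions below switch from Equal to GreaterOrEqual on Cap(). A small illustration of the bytes.Buffer behavior involved:

    package main

    import (
        "bytes"
        "fmt"
    )

    func main() {
        // Pretend this 256 KiB slice came back from the pool.
        recycled := make([]byte, 0, 256*1024)

        buf := bytes.NewBuffer(recycled)
        buf.Reset()
        buf.Grow(1024) // ensure room for the request; capacity is unchanged here

        fmt.Println(buf.Cap() >= 1024) // true: callers get a lower bound, not an exact capacity
    }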
75 changes: 47 additions & 28 deletions pkg/util/requestbuffers_test.go
@@ -10,14 +10,12 @@ import (
)

func TestRequestBuffers(t *testing.T) {
-const maxBufferSize = 32 * 1024
-
-rb := NewRequestBuffers(&fakePool{maxBufferSize: maxBufferSize})
+rb := NewRequestBuffers(&fakePool{})
t.Cleanup(rb.CleanUp)

b := rb.Get(1024)
require.NotNil(t, b)
-assert.Equal(t, 1024, b.Cap())
+assert.GreaterOrEqual(t, b.Cap(), 1024)
assert.Zero(t, b.Len())
// Make sure that the buffer gets reset upon next Get
_, err := b.Write([]byte("test"))
@@ -30,20 +28,9 @@ func TestRequestBuffers(t *testing.T) {
// to test if it reuses the previously returned buffer.
b1 := rb.Get(1024)
assert.Same(t, unsafe.SliceData(b1.Bytes()), unsafe.SliceData(b.Bytes()))
-assert.Equal(t, 1024, b1.Cap())
+assert.GreaterOrEqual(t, b1.Cap(), 1024)
assert.Zero(t, b1.Len())

-// Retrieve a buffer larger than maxBufferSize to ensure
-// it doesn't get reused.
-b2 := rb.Get(maxBufferSize + 1)
-assert.Equal(t, maxBufferSize+1, b2.Cap())
-assert.Zero(t, b2.Len())
-
-rb.CleanUp()
-
-b3 := rb.Get(maxBufferSize + 1)
-assert.NotSame(t, unsafe.SliceData(b2.Bytes()), unsafe.SliceData(b3.Bytes()))
-
t.Run("as nil pointer", func(t *testing.T) {
var rb *RequestBuffers
b := rb.Get(1024)
@@ -60,24 +47,56 @@ })
})
}

+func TestRequestsBuffersMaxPoolBufferSize(t *testing.T) {
+const maxPoolBufferCap = defaultPoolBufferCap
+
+t.Run("pool buffer is reused when size is less than or equal to maxBufferSize", func(t *testing.T) {
+rb := NewRequestBuffers(&fakePool{maxBufferCap: maxPoolBufferCap})
+t.Cleanup(rb.CleanUp)
+
+b0 := rb.Get(maxPoolBufferCap)
+require.NotNil(t, b0)
+assert.Zero(t, b0.Len())
+assert.GreaterOrEqual(t, b0.Cap(), maxPoolBufferCap)
+
+rb.CleanUp()
+
+b1 := rb.Get(maxPoolBufferCap)
+assert.Same(t, unsafe.SliceData(b0.Bytes()), unsafe.SliceData(b1.Bytes()))
+})
+t.Run("pool buffer is not reused when size is greater than maxBufferSize", func(t *testing.T) {
+rb := NewRequestBuffers(NewBufferPool(maxPoolBufferCap))
+t.Cleanup(rb.CleanUp)
+
+b0 := rb.Get(maxPoolBufferCap + 1)
+require.NotNil(t, b0)
+assert.Zero(t, b0.Len())
+assert.GreaterOrEqual(t, b0.Cap(), maxPoolBufferCap+1)
+
+rb.CleanUp()
+
+b1 := rb.Get(maxPoolBufferCap + 1)
+assert.NotSame(t, unsafe.SliceData(b0.Bytes()), unsafe.SliceData(b1.Bytes()))
+})
+}
+
type fakePool struct {
-maxBufferSize int
-buffers [][]byte
+maxBufferCap int
+buffers [][]byte
}

-func (p *fakePool) Get(sz int) []byte {
-if sz <= p.maxBufferSize {
-for i, b := range p.buffers {
-if cap(b) < sz {
-continue
-}
-p.buffers = append(p.buffers[:i], p.buffers[i+1:]...)
-return b
-}
-}
-return make([]byte, 0, sz)
-}
+func (p *fakePool) Get() []byte {
+if len(p.buffers) > 0 {
+buf := p.buffers[0]
+p.buffers = p.buffers[1:]
+return buf
+}
+return make([]byte, 0, defaultPoolBufferCap)
+}

func (p *fakePool) Put(s []byte) {
+if p.maxBufferCap > 0 && cap(s) > p.maxBufferCap {
+return
+}
p.buffers = append(p.buffers, s[:0])
}
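The identity assertions in these tests compare the address of a slice's backing array, which survives the s[:0] re-slice done in Put. A minimal standalone illustration (Go 1.20+ for unsafe.SliceData):

    package main

    import (
        "fmt"
        "unsafe"
    )

    func main() {
        a := make([]byte, 0, 64)
        b := a[:0] // re-slicing keeps the same backing array

        fmt.Println(unsafe.SliceData(a) == unsafe.SliceData(b)) // true: same allocation

        c := make([]byte, 0, 64)
        fmt.Println(unsafe.SliceData(a) == unsafe.SliceData(c)) // false: fresh allocation
    }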
2 changes: 1 addition & 1 deletion tools/trafficdump/parser.go
@@ -26,7 +26,7 @@ import (

const maxBufferPoolSize = 1024 * 1024

-var bufferPool = util.NewBucketedBufferPool(1e3, maxBufferPoolSize, 2)
+var bufferPool = util.NewBufferPool(maxBufferPoolSize)

type parser struct {
processorConfig processorConfig