Skip to content

Commit

Permalink
Upgrade to google.golang.org/grpc v1.66.2
Browse files Browse the repository at this point in the history
Also upgrade github.com/googleapis/enterprise-certificate-proxy to v0.3.4, to fix a checksum issue in v0.3.3.

Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Oct 8, 2024
1 parent ec15d48 commit f620574
Show file tree
Hide file tree
Showing 98 changed files with 4,125 additions and 2,501 deletions.
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
golang.org/x/net v0.30.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
google.golang.org/grpc v1.66.0
google.golang.org/grpc v1.66.2
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -316,7 +316,3 @@ replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-aler
// - https://github.com/grafana/franz-go/pull/3
// - https://github.com/grafana/franz-go/pull/4
replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003081803-835b5cb1ddcf

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
1,182 changes: 61 additions & 1,121 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := NewParsedRequest(req)
pushReq.AddCleanup(func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})

Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

defer func() {
resp.FreeBuffer()
}()

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/frontend/querymiddleware/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"google.golang.org/grpc/mem"

"github.com/grafana/mimir/pkg/mimirpb"
)

var _ mimirpb.UnmarshalerV2 = &PrometheusResponse{}

func (m *PrometheusResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *PrometheusResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}
4 changes: 4 additions & 0 deletions pkg/frontend/querymiddleware/model.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pkg/frontend/querymiddleware/model.pb.go.expdiff
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
diff --git a/pkg/frontend/querymiddleware/model.pb.go b/pkg/frontend/querymiddleware/model.pb.go
index fe81fabf6..47f80838c 100644
--- a/pkg/frontend/querymiddleware/model.pb.go
+++ b/pkg/frontend/querymiddleware/model.pb.go
@@ -13,7 +13,6 @@ import (
types "github.com/gogo/protobuf/types"
github_com_grafana_mimir_pkg_mimirpb "github.com/grafana/mimir/pkg/mimirpb"
mimirpb "github.com/grafana/mimir/pkg/mimirpb"
- "google.golang.org/grpc/mem"
io "io"
math "math"
math_bits "math/bits"
@@ -91,9 +90,6 @@ type PrometheusResponse struct {
Headers []*PrometheusHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"`
Warnings []string `protobuf:"bytes,6,rep,name=Warnings,proto3" json:"warnings,omitempty"`
Infos []string `protobuf:"bytes,7,rep,name=Infos,proto3" json:"infos,omitempty"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
}

func (m *PrometheusResponse) Reset() { *m = PrometheusResponse{} }
23 changes: 20 additions & 3 deletions pkg/ingester/client/buffering_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -68,7 +70,12 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
require.Equal(t, requestsToSend, reqs)
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
})

t.Run("push with pooling", func(t *testing.T) {
Expand All @@ -85,7 +92,12 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
require.Equal(t, requestsToSend, reqs)
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)

// Verify that pool was used.
require.Greater(t, pool.Gets.Load(), int64(0))
Expand Down Expand Up @@ -149,7 +161,12 @@ func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T)
_, err := bufferingClient.Push(ctx, req)
require.NoError(t, err)

require.Equal(t, serv.requests(), []*mimirpb.WriteRequest{req})
diff := cmp.Diff([]*mimirpb.WriteRequest{req}, serv.requests(), cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)

// Verify that all buffers from the pool were returned.
require.Greater(t, pool.Gets.Load(), int64(0))
Expand Down
67 changes: 67 additions & 0 deletions pkg/ingester/client/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,51 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"google.golang.org/grpc/mem"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/chunk"
)

var _ mimirpb.UnmarshalerV2 = &QueryResponse{}

func (m *QueryResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *QueryResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

var _ mimirpb.UnmarshalerV2 = &QueryStreamResponse{}

func (m *QueryStreamResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *QueryStreamResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

var _ mimirpb.UnmarshalerV2 = &ExemplarQueryResponse{}

func (m *ExemplarQueryResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *ExemplarQueryResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

func ChunksCount(series []TimeSeriesChunk) int {
if len(series) == 0 {
return 0
Expand Down Expand Up @@ -65,3 +106,29 @@ func ChunkFromMeta(meta chunks.Meta) (Chunk, error) {
func DefaultMetricsMetadataRequest() *MetricsMetadataRequest {
return &MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}
}

var _ mimirpb.UnmarshalerV2 = &MetricsForLabelMatchersResponse{}

func (m *MetricsForLabelMatchersResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *MetricsForLabelMatchersResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

var _ mimirpb.UnmarshalerV2 = &ActiveSeriesResponse{}

func (m *ActiveSeriesResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *ActiveSeriesResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}
16 changes: 16 additions & 0 deletions pkg/ingester/client/ingester.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions pkg/ingester/client/ingester.pb.go.expdiff
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go
index 36860bb02..bbefc14b1 100644
--- a/pkg/ingester/client/ingester.pb.go
+++ b/pkg/ingester/client/ingester.pb.go
@@ -16,7 +16,6 @@ import (
mimirpb "github.com/grafana/mimir/pkg/mimirpb"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
- "google.golang.org/grpc/mem"
status "google.golang.org/grpc/status"
io "io"
math "math"
@@ -584,9 +583,6 @@ func (m *ActiveSeriesRequest) GetType() ActiveSeriesRequest_RequestType {

type QueryResponse struct {
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
}

func (m *QueryResponse) Reset() { *m = QueryResponse{} }
@@ -642,9 +638,6 @@ type QueryStreamResponse struct {
StreamingSeries []QueryStreamSeries `protobuf:"bytes,3,rep,name=streaming_series,json=streamingSeries,proto3" json:"streaming_series"`
IsEndOfSeriesStream bool `protobuf:"varint,4,opt,name=is_end_of_series_stream,json=isEndOfSeriesStream,proto3" json:"is_end_of_series_stream,omitempty"`
StreamingSeriesChunks []QueryStreamSeriesChunks `protobuf:"bytes,5,rep,name=streaming_series_chunks,json=streamingSeriesChunks,proto3" json:"streaming_series_chunks"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
}

func (m *QueryStreamResponse) Reset() { *m = QueryStreamResponse{} }
@@ -811,9 +804,6 @@ func (m *QueryStreamSeriesChunks) GetChunks() []Chunk {

type ExemplarQueryResponse struct {
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
}

func (m *ExemplarQueryResponse) Reset() { *m = ExemplarQueryResponse{} }
@@ -1332,9 +1322,6 @@ func (m *MetricsForLabelMatchersRequest) GetMatchersSet() []*LabelMatchers {

type MetricsForLabelMatchersResponse struct {
Metric []*mimirpb.Metric `protobuf:"bytes,1,rep,name=metric,proto3" json:"metric,omitempty"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
}

func (m *MetricsForLabelMatchersResponse) Reset() { *m = MetricsForLabelMatchersResponse{} }
@@ -1483,9 +1470,6 @@ type ActiveSeriesResponse struct {
// bucket_count is only used when the request type was NATIVE_HISTOGRAM_SERIES.
// bucket_count contains the native histogram active buckets count for each series in "metric" above.
BucketCount []uint64 `protobuf:"varint,2,rep,packed,name=bucket_count,json=bucketCount,proto3" json:"bucket_count,omitempty"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
}

func (m *ActiveSeriesResponse) Reset() { *m = ActiveSeriesResponse{} }
5 changes: 4 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3849,7 +3849,10 @@ func (i *Ingester) checkAvailableForPush() error {

// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
err := i.PushWithCleanup(ctx, req, func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})
if err != nil {
return mapPushErrorToErrorWithStatus(err)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3211,8 +3211,10 @@ func TestIngester_Push(t *testing.T) {

// Push timeseries
for idx, req := range testData.reqs {
// Push metrics to the ingester. Override the default cleanup method of mimirpb.ReuseSlice with a no-op one.
err := i.PushWithCleanup(ctx, req, func() {})
// Push metrics to the ingester.
err := i.PushWithCleanup(ctx, req, func() {
req.FreeBuffer()
})

// We expect no error on any request except the last one
// which may error (and in that case we assert on it)
Expand Down Expand Up @@ -5446,7 +5448,7 @@ func TestIngester_QueryStream_StreamingWithManySamples(t *testing.T) {
IsEndOfSeriesStream: true,
}

require.Equal(t, seriesLabelsMsg, *resp)
require.EqualExportedValues(t, seriesLabelsMsg, *resp)

recvMsgs := 0
series := 0
Expand Down
Loading

0 comments on commit f620574

Please sign in to comment.