Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for round #9651

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
56 changes: 47 additions & 9 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type InstantVectorFunctionOperatorFactory func(
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
timeRange types.QueryTimeRange,
) (types.InstantVectorOperator, error)

type ScalarFunctionOperatorFactory func(
Expand All @@ -37,7 +38,7 @@ type ScalarFunctionOperatorFactory func(
// - name: The name of the function
// - f: The function implementation
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVector) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand Down Expand Up @@ -101,7 +102,7 @@ func FunctionOverRangeVectorOperatorFactory(
name string,
f functions.FunctionOverRangeVector,
) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand All @@ -123,7 +124,7 @@ func FunctionOverRangeVectorOperatorFactory(
}
}

func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) {
func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args))
Expand All @@ -139,7 +140,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem
}

func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 5 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 5 argument for label_replace, got %v", len(args))
Expand Down Expand Up @@ -190,7 +191,7 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory
}

func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 argument for clamp, got %v", len(args))
Expand Down Expand Up @@ -224,7 +225,7 @@ func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
}

func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 argument for %s, got %v", functionName, len(args))
Expand All @@ -251,6 +252,42 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
}
}

func RoundFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 && len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 or 2 argument for round, got %v", len(args))
}

inner, ok := args[0].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector for 1st argument for round, got %T", args[0])
}

var toNearest types.ScalarOperator
if len(args) == 2 {
toNearest, ok = args[1].(types.ScalarOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected a scalar for 2nd argument for round, got %T", args[1])
}
} else {
toNearest = operators.NewScalarConstant(float64(1), timeRange, memoryConsumptionTracker, expressionPosition)
}

f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.Round,
// TODO(jhesketh): With the currently vendored prometheus, round does not consistently drop the __name__ label
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not consistently drop the __name__ label, or doesn't drop it at all?

Is there an issue link for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tricky. Against main, round does not return any name (as evidenced by the tests in prometheus/prometheus#15176).

Against what we have vendored it does have the name. So the bug has seemingly since been fixed. I have not found an exact issue but assume it is related to the more recent delayed label name dropping work fixing something incidentally.

Also worth noting is that in the vendored version it has `DropName: true`, so I'm not sure if that isn't being honoured somewhere (I haven't tracked down where).

The only upstream tests of round are in the aggregators.test file, such as:

eval instant at 50m round(0.005 * http_requests{group="production",job="api-server"})
	{group="production", instance="0", job="api-server"} 1
	{group="production", instance="1", job="api-server"} 1

Which due to the multiplication has the resulting label dropped.

So it wouldn't surprise me if it went unnoticed upstream and was also fixed as part of the label work (also unnoticed).

Copy link
Contributor

@charleskorn charleskorn Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, thanks for the explanation.

Also worth noting is that in the vendored version it has `DropName: true`, so I'm not sure if that isn't being honoured somewhere (I haven't tracked down where).

DropName is only used when the experimental new delayed __name__ dropping is enabled, and this is disabled by default.

I would rephrase the comment slightly to make it clearer, something like this perhaps:

Suggested change
// TODO(jhesketh): With the currently vendored prometheus, round does not consistently drop the __name__ label
// TODO(jhesketh): With the version of Prometheus vendored at the time of writing, round does not drop the __name__ label, and this is verified by our tests.
// We match this behaviour for consistency, but this behaviour has changed in Prometheus 3.0, so we'll need to match that once Prometheus 3.0 is vendored in.

// (as verified by our tests). We match this for consistency, but will
// need to drop them once prometheus 3.0 is vendored in.
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{},
}

return operators.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition), nil
}
}

// These functions return an instant-vector.
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
Expand All @@ -265,8 +302,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"avg_over_time": FunctionOverRangeVectorOperatorFactory("avg_over_time", functions.AvgOverTime),
"ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil),
"clamp": ClampFunctionOperatorFactory(),
"clamp_min": ClampMinMaxFunctionOperatorFactory("clamp_min", true),
"clamp_max": ClampMinMaxFunctionOperatorFactory("clamp_max", false),
"clamp_min": ClampMinMaxFunctionOperatorFactory("clamp_min", true),
"cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos),
"cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
"count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime),
Expand All @@ -278,14 +315,15 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"label_replace": LabelReplaceFunctionOperatorFactory(),
"last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime),
"max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime),
"min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime),
"ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln),
"log10": InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10),
"log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2),
"max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime),
"min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime),
"present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime),
"rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad),
"rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate),
"round": RoundFunctionOperatorFactory(),
"sgn": InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn),
"sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin),
"sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh),
Expand Down
29 changes: 29 additions & 0 deletions pkg/streamingpromql/functions/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,32 @@ func ClampMinMaxFactory(isMin bool) InstantVectorSeriesFunction {
return seriesData, nil
}
}

// Round rounds each float sample of a series to the nearest multiple of the
// scalar argument in effect at that sample's timestamp.
// Ties are resolved by rounding up, and any histogram samples are discarded.
//
// The float samples are modified in place and the same series data is returned.
var Round InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
	nearestSamples := scalarArgsData[0].Samples

	// The scalar argument is expected to carry a sample for every step of the
	// query, whereas the series may have gaps. The cursor therefore only ever
	// moves forward through the scalar samples, catching up to the timestamp
	// of whichever float sample is being processed.
	cursor := 0

	floats := seriesData.Floats
	for i := range floats {
		point := floats[i]

		for point.T > nearestSamples[cursor].T {
			cursor++
		}

		// Multiplying and dividing by the reciprocal (rather than dividing and
		// multiplying by the value itself) seems to cause fewer floating point
		// accuracy issues.
		reciprocal := 1.0 / nearestSamples[cursor].F

		// Overwrite the value in the existing FPoint slice rather than allocating a new one.
		floats[i].F = math.Floor(point.F*reciprocal+0.5) / reciprocal
	}

	// Round does not produce histograms: hand the slice back to the pool and drop it.
	types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker)
	seriesData.Histograms = nil

	return seriesData, nil
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call) (type
args[i] = a
}

return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange)
return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, q.timeRange)
}

func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVectorOperator, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ eval range from 0 to 5m step 1m clamp_min(mixed_metric, 2)

eval range from 0 to 5m step 1m clamp_max(mixed_metric, 2)
{} _ 1 2 2

# round ignores any histograms
# Prometheus currently returns 0 instead of no value as per the documentation
# https://github.com/prometheus/prometheus/pull/15176
eval range from 0 to 5m step 1m round(mixed_metric)
mixed_metric _ 1 2 3
18 changes: 18 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,21 @@ eval range from 0 to 54m step 6m clamp_min(series, scalar(mins))

eval range from 0 to 54m step 6m clamp_max(series, scalar(maxes))
{} 0 _ 0 0 5 _ 3 NaN 2 NaN

clear

load 6m
toNearest 1 _ 3 4 0.1
series{a="b"} -5.5 2.75 _ _ 6.87

eval range from 0 to 24m step 6m round(series, scalar(toNearest))
series{a="b"} -5 NaN _ _ 6.9

clear

load 6m
toNearest 1 -0.1 0.5 10 0.1 0.25 100 0.0001 5 1000 NaN _ _ NaN _ 2 2
series{a="b"} -5.5 2.75 0.25 15.5 9.999 3.14159 -0.999999 1000.01 0.49999999 -1e6 10 20 NaN NaN _ _ 10.5

eval range from 0 to 96m step 6m round(series, scalar(toNearest))
series{a="b"} -5 2.7 0.5 20 10 3.25 0 1000.01 0 -1000000 NaN NaN NaN NaN _ _ 10
84 changes: 36 additions & 48 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -132,70 +132,58 @@ eval instant at 50m ceil(0.004 * http_requests{group="production",job="api-serve
{group="production", instance="0", job="api-server"} 1
{group="production", instance="1", job="api-server"} 1

# Unsupported by streaming engine.
# eval instant at 50m round(0.004 * http_requests{group="production",job="api-server"})
# {group="production", instance="0", job="api-server"} 0
# {group="production", instance="1", job="api-server"} 1
eval instant at 50m round(0.004 * http_requests{group="production",job="api-server"})
{group="production", instance="0", job="api-server"} 0
{group="production", instance="1", job="api-server"} 1

# Round should correctly handle negative numbers.
# Unsupported by streaming engine.
# eval instant at 50m round(-1 * (0.004 * http_requests{group="production",job="api-server"}))
# {group="production", instance="0", job="api-server"} 0
# {group="production", instance="1", job="api-server"} -1
eval instant at 50m round(-1 * (0.004 * http_requests{group="production",job="api-server"}))
{group="production", instance="0", job="api-server"} 0
{group="production", instance="1", job="api-server"} -1

# Round should round half up.
# Unsupported by streaming engine.
# eval instant at 50m round(0.005 * http_requests{group="production",job="api-server"})
# {group="production", instance="0", job="api-server"} 1
# {group="production", instance="1", job="api-server"} 1
eval instant at 50m round(0.005 * http_requests{group="production",job="api-server"})
{group="production", instance="0", job="api-server"} 1
{group="production", instance="1", job="api-server"} 1

# Unsupported by streaming engine.
# eval instant at 50m round(-1 * (0.005 * http_requests{group="production",job="api-server"}))
# {group="production", instance="0", job="api-server"} 0
# {group="production", instance="1", job="api-server"} -1
eval instant at 50m round(-1 * (0.005 * http_requests{group="production",job="api-server"}))
{group="production", instance="0", job="api-server"} 0
{group="production", instance="1", job="api-server"} -1

# Unsupported by streaming engine.
# eval instant at 50m round(1 + 0.005 * http_requests{group="production",job="api-server"})
# {group="production", instance="0", job="api-server"} 2
# {group="production", instance="1", job="api-server"} 2
eval instant at 50m round(1 + 0.005 * http_requests{group="production",job="api-server"})
{group="production", instance="0", job="api-server"} 2
{group="production", instance="1", job="api-server"} 2

# Unsupported by streaming engine.
# eval instant at 50m round(-1 * (1 + 0.005 * http_requests{group="production",job="api-server"}))
# {group="production", instance="0", job="api-server"} -1
# {group="production", instance="1", job="api-server"} -2
eval instant at 50m round(-1 * (1 + 0.005 * http_requests{group="production",job="api-server"}))
{group="production", instance="0", job="api-server"} -1
{group="production", instance="1", job="api-server"} -2

# Round should accept the number to round nearest to.
# Unsupported by streaming engine.
# eval instant at 50m round(0.0005 * http_requests{group="production",job="api-server"}, 0.1)
# {group="production", instance="0", job="api-server"} 0.1
# {group="production", instance="1", job="api-server"} 0.1
eval instant at 50m round(0.0005 * http_requests{group="production",job="api-server"}, 0.1)
{group="production", instance="0", job="api-server"} 0.1
{group="production", instance="1", job="api-server"} 0.1

# Unsupported by streaming engine.
# eval instant at 50m round(2.1 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1)
# {group="production", instance="0", job="api-server"} 2.2
# {group="production", instance="1", job="api-server"} 2.2
eval instant at 50m round(2.1 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1)
{group="production", instance="0", job="api-server"} 2.2
{group="production", instance="1", job="api-server"} 2.2

# Unsupported by streaming engine.
# eval instant at 50m round(5.2 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1)
# {group="production", instance="0", job="api-server"} 5.3
# {group="production", instance="1", job="api-server"} 5.3
eval instant at 50m round(5.2 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1)
{group="production", instance="0", job="api-server"} 5.3
{group="production", instance="1", job="api-server"} 5.3

# Round should work correctly with negative numbers and multiple decimal places.
# Unsupported by streaming engine.
# eval instant at 50m round(-1 * (5.2 + 0.0005 * http_requests{group="production",job="api-server"}), 0.1)
# {group="production", instance="0", job="api-server"} -5.2
# {group="production", instance="1", job="api-server"} -5.3
eval instant at 50m round(-1 * (5.2 + 0.0005 * http_requests{group="production",job="api-server"}), 0.1)
{group="production", instance="0", job="api-server"} -5.2
{group="production", instance="1", job="api-server"} -5.3

# Round should work correctly with big toNearests.
# Unsupported by streaming engine.
# eval instant at 50m round(0.025 * http_requests{group="production",job="api-server"}, 5)
# {group="production", instance="0", job="api-server"} 5
# {group="production", instance="1", job="api-server"} 5
eval instant at 50m round(0.025 * http_requests{group="production",job="api-server"}, 5)
{group="production", instance="0", job="api-server"} 5
{group="production", instance="1", job="api-server"} 5

# Unsupported by streaming engine.
# eval instant at 50m round(0.045 * http_requests{group="production",job="api-server"}, 5)
# {group="production", instance="0", job="api-server"} 5
# {group="production", instance="1", job="api-server"} 10
eval instant at 50m round(0.045 * http_requests{group="production",job="api-server"}, 5)
{group="production", instance="0", job="api-server"} 5
{group="production", instance="1", job="api-server"} 10

# Standard deviation and variance.
eval instant at 50m stddev(http_requests)
Expand Down