Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc cost attribution #9392

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
55 changes: 55 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -4292,6 +4292,28 @@
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_label",
"required": false,
"desc": "Label used to define the cost attribution. This label will be included in the specified distributor and ingester metrics for each write request, allowing them to be distinguished by the label. The label applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total. Set to an empty string to disable cost attribution.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "validation.cost-attribution-label",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "Maximum number of cost attribution labels allowed per user.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "validation.max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "ruler_evaluation_delay_duration",
Expand Down Expand Up @@ -18128,6 +18150,17 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "custom_registry_path",
"required": false,
"desc": "Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "custom-registry-path",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "timeseries_unmarshal_caching_optimization_enabled",
Expand All @@ -18138,6 +18171,28 @@
"fieldFlag": "timeseries-unmarshal-caching-optimization-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_eviction_interval",
"required": false,
"desc": "Time interval at which inactive cost attributions will be evicted from the cache.",
"fieldValue": null,
"fieldDefaultValue": 1800000000000,
"fieldFlag": "cost-attribution-eviction-interval",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_cool_down_duration",
"required": false,
"desc": "Duration during which any cost attribution for a user will be marked as __unaccounted__ after exceeding the specified limit, prior to resetting the cache.",
"fieldValue": null,
"fieldDefaultValue": 1200000000000,
"fieldFlag": "cost-attribution-cool-down-duration",
"fieldType": "duration",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
10 changes: 10 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,12 @@ Usage of ./cmd/mimir/mimir:
Expands ${var} or $var in config according to the values of the environment variables.
-config.file value
Configuration file to load.
-cost-attribution-cool-down-duration duration
[experimental] Duration during which any cost attribution for a user will be marked as __unaccounted__ after exceeding the specified limit, prior to resetting the cache. (default 20m0s)
-cost-attribution-eviction-interval duration
[experimental] Time interval at which inactive cost attributions will be evicted from the cache. (default 30m0s)
-custom-registry-path string
Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed.
-debug.block-profile-rate int
Fraction of goroutine blocking events that are reported in the blocking profile. 1 to include every blocking event in the profile, 0 to disable.
-debug.mutex-profile-fraction int
Expand Down Expand Up @@ -3059,10 +3065,14 @@ Usage of ./cmd/mimir/mimir:
Enable anonymous usage reporting. (default true)
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.cost-attribution-label string
[experimental] Label used to define the cost attribution. This label will be included in the specified distributor and ingester metrics for each write request, allowing them to be distinguished by the label. The label applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total. Set to an empty string to disable cost attribution.
-validation.create-grace-period duration
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-cost-attribution-per-user int
[experimental] Maximum number of cost attribution labels allowed per user.
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
Expand Down
158 changes: 158 additions & 0 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package costattribution

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/mimir/pkg/util/validation"
)

type Manager struct {
services.Service
logger log.Logger
attributionTracker *attributionTrackerGroup
inactiveTimeout time.Duration
invalidValue string
}

// NewManager creates a new cost attribution manager. which is responsible for managing the cost attribution of series.
// It will clean up inactive series and update the cost attribution of series every 3 minutes.
func NewManager(cleanupInterval, inactiveTimeout time.Duration, cooldownTimeout time.Duration, logger log.Logger, limits *validation.Overrides) *Manager {
s := &Manager{
attributionTracker: newAttributionTrackerGroup(limits, cooldownTimeout),
inactiveTimeout: inactiveTimeout,
logger: logger,
invalidValue: "__unaccounted__",
}

s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution manager")
return s
}

func (m *Manager) iteration(_ context.Context) error {
m.attributionTracker.purgeInactiveAttributions(m.inactiveTimeout)
return nil
}

// EnabledForUser returns true if the cost attribution is enabled for the user
func (m *Manager) EnabledForUser(userID string) bool {
return m.attributionTracker.limits.CostAttributionLabel(userID) != ""
}

// UserAttributionLabel returns the cost attribution label for the user, first it will try to get the label from the cache,
// If not found, it will get the label from the config
// If the user is not enabled for cost attribution, it would clean the cache and return empty string
func (m *Manager) UserAttributionLabel(userID string) string {
if m.EnabledForUser(userID) {
return m.attributionTracker.getUserAttributionLabelFromCache(userID)
}
m.attributionTracker.deleteUserTracerFromCache(userID)
return ""
}

// UserAttributionLimit returns the cost attribution limit for the user, first it will try to get the limit from the cache,
// If not found, it will get the limit from the config
// If the user is not enabled for cost attribution, it would clean the cache and return 0
func (m *Manager) UserAttributionLimit(userID string) int {
if m.EnabledForUser(userID) {
return m.attributionTracker.getUserAttributionLimitFromCache(userID)
}
m.attributionTracker.deleteUserTracerFromCache(userID)
return 0
}

func (m *Manager) UpdateAttributionTimestamp(user string, calb string, lbs labels.Labels, now time.Time) (bool, string) {
// if cost attribution is not enabled for the user, return empty string
if !m.EnabledForUser(user) {
m.attributionTracker.deleteUserTracerFromCache(user)
return false, ""
}

// when cost attribution is enabled, the label has to be set. the cache would be updated with the label
lb := m.attributionTracker.getUserAttributionLabelFromCache(user)
// this should not happened, if user is enabled for cost attribution, the label has to be set

isUpdated := calb != lb
if lb == "" {
return isUpdated, ""
}
val := lbs.Get(lb)

if m.attributionTracker.attributionLimitExceeded(user, val, now) {
val = m.invalidValue
level.Error(m.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", m.invalidValue))
}
m.attributionTracker.updateAttributionCacheForUser(user, lb, val, now)
return isUpdated, val
}

// SetActiveSeries adjust the input attribution and sets the active series gauge for the given user and attribution
func (m *Manager) SetActiveSeries(userID, calb, attribution string, value float64) {
// if the input label is outdated, we skip the update
if calb != m.UserAttributionLabel(userID) {
return
}
attribution = m.adjustUserAttribution(userID, attribution)

m.attributionTracker.mu.Lock()
defer m.attributionTracker.mu.Unlock()
if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attribution).Set(value)
}
}

// IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution
func (m *Manager) IncrementDiscardedSamples(userID string, lbs labels.Labels, value float64, now time.Time) {
if !m.EnabledForUser(userID) {
return
}
calb := m.UserAttributionLabel(userID)
_, attribution := m.UpdateAttributionTimestamp(userID, calb, lbs, now)

m.attributionTracker.mu.RLock()
defer m.attributionTracker.mu.RUnlock()
if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
tracker.discardedSampleAttribution.WithLabelValues(userID, attribution).Add(value)
}
}

// IncrementReceivedSamples increments the received samples counter for a given user and attribution
func (m *Manager) IncrementReceivedSamples(userID string, lbs labels.Labels, value float64, now time.Time) {
if !m.EnabledForUser(userID) {
return
}
calb := m.UserAttributionLabel(userID)
_, attribution := m.UpdateAttributionTimestamp(userID, calb, lbs, now)
m.attributionTracker.mu.RLock()
defer m.attributionTracker.mu.RUnlock()
if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
tracker.receivedSamplesAttribution.WithLabelValues(userID, attribution).Add(value)
}
}

func (m *Manager) adjustUserAttribution(userID, attribution string) string {
if m.attributionTracker.attributionLimitExceeded(userID, attribution, time.Now()) {
return m.invalidValue
}
return attribution
}

func (m *Manager) Collect(out chan<- prometheus.Metric) {
m.attributionTracker.mu.RLock()
defer m.attributionTracker.mu.RUnlock()
for _, tracker := range m.attributionTracker.trackersByUserID {
tracker.Collect(out)
}
}

// Describe implements prometheus.Collector.
func (m *Manager) Describe(chan<- *prometheus.Desc) {
// this is an unchecked collector
}
Loading
Loading