Refactor collectors to not depend on kingpin to be configured.
This commit removes references to kingpin.CommandLine, allowing the
collector package to be used and configured with a custom kingpin
application (or no kingpin at all).

The configuration for collectors has been moved to struct fields, which
the kingpin flags populate at flag parse time.

Co-authored-by: Robert Fratto <[email protected]>
Signed-off-by: Marc Tuduri <[email protected]>
2 people authored and cristiangreco committed Oct 14, 2024
1 parent 04268cc commit def4a11
Showing 16 changed files with 246 additions and 171 deletions.
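To make the new pattern concrete before the per-file diffs, here is a minimal sketch of a consumer registering collector flags on its own kingpin application instead of kingpin.CommandLine. This is not code from the commit: the program name is invented and the import path is assumed to be the exporter's collector package.

package main

import (
	"os"

	"github.com/alecthomas/kingpin/v2"

	"github.com/prometheus/mysqld_exporter/collector" // assumed import path
)

func main() {
	// A private application instead of the global kingpin.CommandLine.
	app := kingpin.New("my-exporter", "Example consumer of the collector package.")

	// Exporter-level options; the registered flags write into cfg at parse time.
	var cfg collector.Config
	cfg.RegisterFlags(app)

	// Per-scraper options work the same way.
	heartbeat := &collector.ScrapeHeartbeat{}
	heartbeat.RegisterFlags(app)

	// Parsing populates cfg.LockTimeout, cfg.SlowLogFilter, heartbeat.Database, etc.
	kingpin.MustParse(app.Parse(os.Args[1:]))
}

After parsing, cfg and the configured scrapers can be handed to collector.New, as the diffs below show.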
28 changes: 20 additions & 8 deletions collector/exporter.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"regexp"
 	"strings"
 	"sync"
 	"time"
@@ -42,15 +43,26 @@ const (
 
 // Tunable flags.
 var (
-	exporterLockTimeout = kingpin.Flag(
+	versionRE = regexp.MustCompile(`^\d+\.\d+`)
+)
+
+// Config holds configuration options for the exporter.
+type Config struct {
+	LockTimeout   int
+	SlowLogFilter bool
+}
+
+// RegisterFlags adds flags to configure the exporter.
+func (c *Config) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
 		"exporter.lock_wait_timeout",
 		"Set a lock_wait_timeout (in seconds) on the connection to avoid long metadata locking.",
-	).Default("2").Int()
-	slowLogFilter = kingpin.Flag(
+	).Default("2").IntVar(&c.LockTimeout)
+	application.Flag(
 		"exporter.log_slow_filter",
 		"Add a log_slow_filter to avoid slow query logging of scrapes. NOTE: Not supported by Oracle MySQL.",
-	).Default("false").Bool()
-)
+	).Default("false").BoolVar(&c.SlowLogFilter)
+}
 
 // metric definition
 var (
@@ -86,11 +98,11 @@ type Exporter struct {
 }
 
 // New returns a new MySQL exporter for the provided DSN.
-func New(ctx context.Context, dsn string, scrapers []Scraper, logger *slog.Logger) *Exporter {
+func New(ctx context.Context, dsn string, scrapers []Scraper, logger *slog.Logger, cfg Config) *Exporter {
 	// Setup extra params for the DSN, default to having a lock timeout.
-	dsnParams := []string{fmt.Sprintf(timeoutParam, *exporterLockTimeout)}
+	dsnParams := []string{fmt.Sprintf(timeoutParam, cfg.LockTimeout)}
 
-	if *slowLogFilter {
+	if cfg.SlowLogFilter {
 		dsnParams = append(dsnParams, sessionSettingsParam)
 	}
 
10 changes: 10 additions & 0 deletions collector/exporter_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/alecthomas/kingpin/v2"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/promslog"
@@ -30,13 +31,22 @@ func TestExporter(t *testing.T) {
 		t.Skip("-short is passed, skipping test")
 	}
 
+	var exporterConfig Config
+	kingpinApp := kingpin.New("TestExporter", "")
+	exporterConfig.RegisterFlags(kingpinApp)
+	_, err := kingpinApp.Parse([]string{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	exporter := New(
 		context.Background(),
 		dsn,
 		[]Scraper{
 			ScrapeGlobalStatus{},
 		},
 		promslog.NewNopLogger(),
+		exporterConfig,
 	)
 
 	convey.Convey("Metrics describing", t, func() {
45 changes: 25 additions & 20 deletions collector/heartbeat.go
@@ -36,21 +36,6 @@ const (
 	heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`"
 )
 
-var (
-	collectHeartbeatDatabase = kingpin.Flag(
-		"collect.heartbeat.database",
-		"Database from where to collect heartbeat data",
-	).Default("heartbeat").String()
-	collectHeartbeatTable = kingpin.Flag(
-		"collect.heartbeat.table",
-		"Table from where to collect heartbeat data",
-	).Default("heartbeat").String()
-	collectHeartbeatUtc = kingpin.Flag(
-		"collect.heartbeat.utc",
-		"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
-	).Bool()
-)
-
 // Metric descriptors.
 var (
 	HeartbeatStoredDesc = prometheus.NewDesc(
@@ -74,7 +59,11 @@ var (
 // server_id int unsigned NOT NULL PRIMARY KEY,
 //
 // );
-type ScrapeHeartbeat struct{}
+type ScrapeHeartbeat struct {
+	Database string
+	Table    string
+	UTC      bool
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeHeartbeat) Name() string {
@@ -91,18 +80,34 @@ func (ScrapeHeartbeat) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeHeartbeat) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.heartbeat.database",
+		"Database from where to collect heartbeat data",
+	).Default("heartbeat").StringVar(&s.Database)
+	application.Flag(
+		"collect.heartbeat.table",
+		"Table from where to collect heartbeat data",
+	).Default("heartbeat").StringVar(&s.Table)
+	application.Flag(
+		"collect.heartbeat.utc",
+		"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
+	).BoolVar(&s.UTC)
+}
+
 // nowExpr returns a current timestamp expression.
-func nowExpr() string {
-	if *collectHeartbeatUtc {
+func (s ScrapeHeartbeat) nowExpr() string {
+	if s.UTC {
 		return "UTC_TIMESTAMP(6)"
 	}
 	return "NOW(6)"
 }
 
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
+func (s ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 	db := instance.getDB()
-	query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
+	query := fmt.Sprintf(heartbeatQuery, s.nowExpr(), s.Database, s.Table)
 	heartbeatRows, err := db.QueryContext(ctx, query)
 	if err != nil {
 		return err
9 changes: 7 additions & 2 deletions collector/heartbeat_test.go
@@ -55,7 +55,12 @@ var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{
 func TestScrapeHeartbeat(t *testing.T) {
 	for _, tt := range ScrapeHeartbeatTestCases {
 		t.Run(fmt.Sprint(tt.Args), func(t *testing.T) {
-			_, err := kingpin.CommandLine.Parse(tt.Args)
+			scraper := ScrapeHeartbeat{}
+
+			app := kingpin.New("TestScrapeHeartbeat", "")
+			scraper.RegisterFlags(app)
+
+			_, err := app.Parse(tt.Args)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -73,7 +78,7 @@ func TestScrapeHeartbeat(t *testing.T) {
 
 			ch := make(chan prometheus.Metric)
 			go func() {
-				if err = (ScrapeHeartbeat{}).Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
+				if err = scraper.Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
 					t.Errorf("error calling function on test: %s", err)
 				}
 				close(ch)
47 changes: 26 additions & 21 deletions collector/info_schema_processlist.go
@@ -41,22 +41,6 @@ const infoSchemaProcesslistQuery = `
 	  GROUP BY user, host, command, state
 `
 
-// Tunable flags.
-var (
-	processlistMinTime = kingpin.Flag(
-		"collect.info_schema.processlist.min_time",
-		"Minimum time a thread must be in each state to be counted",
-	).Default("0").Int()
-	processesByUserFlag = kingpin.Flag(
-		"collect.info_schema.processlist.processes_by_user",
-		"Enable collecting the number of processes by user",
-	).Default("true").Bool()
-	processesByHostFlag = kingpin.Flag(
-		"collect.info_schema.processlist.processes_by_host",
-		"Enable collecting the number of processes by host",
-	).Default("true").Bool()
-)
-
 // Metric descriptors.
 var (
 	processlistCountDesc = prometheus.NewDesc(
@@ -78,7 +62,11 @@ var (
 	)
 
 // ScrapeProcesslist collects from `information_schema.processlist`.
-type ScrapeProcesslist struct{}
+type ScrapeProcesslist struct {
+	ProcessListMinTime  int
+	ProcessesByUserFlag bool
+	ProcessesByHostFlag bool
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeProcesslist) Name() string {
@@ -95,11 +83,27 @@ func (ScrapeProcesslist) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeProcesslist) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.info_schema.processlist.min_time",
+		"Minimum time a thread must be in each state to be counted",
+	).Default("0").IntVar(&s.ProcessListMinTime)
+	application.Flag(
+		"collect.info_schema.processlist.processes_by_user",
+		"Enable collecting the number of processes by user",
+	).Default("true").BoolVar(&s.ProcessesByUserFlag)
+	application.Flag(
+		"collect.info_schema.processlist.processes_by_host",
+		"Enable collecting the number of processes by host",
+	).Default("true").BoolVar(&s.ProcessesByHostFlag)
+}
+
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
+func (s ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 	processQuery := fmt.Sprintf(
 		infoSchemaProcesslistQuery,
-		*processlistMinTime,
+		s.ProcessListMinTime,
 	)
 	db := instance.getDB()
 	processlistRows, err := db.QueryContext(ctx, processQuery)
@@ -162,12 +166,13 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 		}
 	}
 
-	if *processesByHostFlag {
+	if s.ProcessesByHostFlag {
 		for _, host := range sortedMapKeys(stateHostCounts) {
 			ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
 		}
 	}
-	if *processesByUserFlag {
+
+	if s.ProcessesByUserFlag {
 		for _, user := range sortedMapKeys(stateUserCounts) {
 			ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
 		}
8 changes: 6 additions & 2 deletions collector/info_schema_processlist_test.go
@@ -27,7 +27,11 @@ import (
 )
 
 func TestScrapeProcesslist(t *testing.T) {
-	_, err := kingpin.CommandLine.Parse([]string{
+	scraper := ScrapeProcesslist{}
+	app := kingpin.New("TestScrapeProcesslist", "")
+	scraper.RegisterFlags(app)
+
+	_, err := app.Parse([]string{
 		"--collect.info_schema.processlist.processes_by_user",
 		"--collect.info_schema.processlist.processes_by_host",
 	})
@@ -57,7 +61,7 @@ func TestScrapeProcesslist(t *testing.T) {
 
 	ch := make(chan prometheus.Metric)
 	go func() {
-		if err = (ScrapeProcesslist{}).Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
+		if err = scraper.Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
 			t.Errorf("error calling function on test: %s", err)
 		}
 		close(ch)
26 changes: 14 additions & 12 deletions collector/info_schema_tables.go
@@ -50,14 +50,6 @@ const (
 	`
 )
 
-// Tunable flags.
-var (
-	tableSchemaDatabases = kingpin.Flag(
-		"collect.info_schema.tables.databases",
-		"The list of databases to collect table stats for, or '*' for all",
-	).Default("*").String()
-)
-
 // Metric descriptors.
 var (
 	infoSchemaTablesVersionDesc = prometheus.NewDesc(
@@ -78,7 +70,9 @@ var (
 	)
 
 // ScrapeTableSchema collects from `information_schema.tables`.
-type ScrapeTableSchema struct{}
+type ScrapeTableSchema struct {
+	Databases string
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeTableSchema) Name() string {
@@ -95,11 +89,19 @@ func (ScrapeTableSchema) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeTableSchema) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.info_schema.tables.databases",
+		"The list of databases to collect table stats for, or '*' for all",
+	).Default("*").StringVar(&s.Databases)
+}
+
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
+func (s ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 	var dbList []string
 	db := instance.getDB()
-	if *tableSchemaDatabases == "*" {
+	if s.Databases == "*" {
 		dbListRows, err := db.QueryContext(ctx, dbListQuery)
 		if err != nil {
 			return err
@@ -117,7 +119,7 @@ func (ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 			dbList = append(dbList, database)
 		}
 	} else {
-		dbList = strings.Split(*tableSchemaDatabases, ",")
+		dbList = strings.Split(s.Databases, ",")
 	}
 
 	for _, database := range dbList {
24 changes: 13 additions & 11 deletions collector/mysql_user.go
@@ -69,14 +69,6 @@ const mysqlUserQuery = `
 	  FROM mysql.user
 `
 
-// Tunable flags.
-var (
-	userPrivilegesFlag = kingpin.Flag(
-		"collect.mysql.user.privileges",
-		"Enable collecting user privileges from mysql.user",
-	).Default("false").Bool()
-)
-
 var (
 	labelNames = []string{"mysql_user", "hostmask"}
 )
@@ -102,7 +94,9 @@ var (
 	)
 
 // ScrapeUser collects from `information_schema.processlist`.
-type ScrapeUser struct{}
+type ScrapeUser struct {
+	Privileges bool
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeUser) Name() string {
@@ -119,8 +113,16 @@ func (ScrapeUser) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeUser) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.mysql.user.privileges",
+		"Enable collecting user privileges from mysql.user",
+	).Default("false").BoolVar(&s.Privileges)
+}
+
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
+func (s ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 	db := instance.getDB()
 	var (
 		userRows *sql.Rows
@@ -214,7 +216,7 @@ func (ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
 		return err
 	}
 
-	if *userPrivilegesFlag {
+	if s.Privileges {
 		userCols, err := userRows.Columns()
 		if err != nil {
 			return err
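Since nothing registers against kingpin.CommandLine any more, the collectors can also be configured with no kingpin at all, as the commit message promises. A minimal sketch: the DSN is a placeholder, the import path is assumed, and it assumes *Exporter still implements prometheus.Collector, as the Describe/Collect tests in this commit suggest.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/mysqld_exporter/collector" // assumed import path
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Configuration is now plain struct fields; no flag parsing required.
	exporter := collector.New(
		context.Background(),
		"user:password@(localhost:3306)/", // placeholder DSN
		[]collector.Scraper{
			collector.ScrapeHeartbeat{Database: "heartbeat", Table: "heartbeat", UTC: true},
		},
		logger,
		collector.Config{LockTimeout: 2, SlowLogFilter: false},
	)

	// Assumes *Exporter satisfies prometheus.Collector.
	prometheus.MustRegister(exporter)
}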
