1
0
mirror of https://github.com/prometheus-community/postgres_exporter.git synced 2025-08-05 06:21:12 +03:00
Files
postgres_exporter/pkg/servers/server.go
2020-03-01 01:20:42 +11:00

317 lines
9.2 KiB
Go

package servers
import (
"database/sql"
"fmt"
"github.com/blang/semver"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/wrouesnel/postgres_exporter/pkg/pgdbconv"
"github.com/wrouesnel/postgres_exporter/pkg/queries/metricmaps"
"sync"
"time"
)
// Server describes a connection to a Postgresql database, and describes the
// metric maps and overrides in-place for it.
type Server struct {
// Database handle used for every query issued by this server.
db *sql.DB
// Labels attached to metrics from this server. NewServer stores the DSN
// fingerprint here under metricmaps.ServerLabelName.
labels prometheus.Labels
// When false, Scrape skips the settings query and namespace mappings
// flagged as master-only are skipped. NewServer initialises it to false.
master bool
// Last version used to calculate metric map. If mismatch on scrape,
// then maps are recalculated.
lastMapVersion semver.Version
// Currently active metric map
metricMap map[string]metricmaps.MetricMapNamespace
// Currently active query overrides, keyed by namespace. An entry that is
// present but empty marks the namespace's query as disabled.
queryOverrides map[string]string
// Scrape holds the read side of this lock for its whole duration.
mappingMtx sync.RWMutex
// Currently cached metrics, keyed by namespace.
metricCache map[string]cachedMetrics
// Held around every read and write of metricCache in this file.
cacheMtx sync.Mutex
}
// NewServer builds a Server for the given DSN.
//
// The DSN is first reduced to a fingerprint, which becomes the server label
// on the returned Server. The database handle is opened with the "postgres"
// driver and limited to a single open and a single idle connection. Each
// supplied option is applied, in order, to the freshly constructed Server.
//
// An error is returned if the fingerprint cannot be derived from the DSN or
// if opening the database handle fails.
func NewServer(dsn string, opts ...ServerOpt) (*Server, error) {
	fingerprint, err := parseFingerprint(dsn)
	if err != nil {
		return nil, err
	}

	conn, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}

	// Restrict the handle to one connection, open or idle.
	conn.SetMaxOpenConns(1)
	conn.SetMaxIdleConns(1)

	log.Infof("Established new database connection to %q.", fingerprint)

	srv := &Server{
		db:          conn,
		master:      false,
		labels:      prometheus.Labels{metricmaps.ServerLabelName: fingerprint},
		metricCache: make(map[string]cachedMetrics),
	}

	// Options run last so they can adjust any of the defaults set above.
	for i := range opts {
		opts[i](srv)
	}

	return srv, nil
}
// Close disconnects from Postgres by closing the underlying database handle
// and returns whatever error that close reports.
func (s *Server) Close() error {
	closeErr := s.db.Close()
	return closeErr
}
// Ping checks that the database is reachable.
//
// On success it returns nil. On failure the connection is closed (a failure
// to close is only logged) and the original ping error is returned.
func (s *Server) Ping() error {
	pingErr := s.db.Ping()
	if pingErr == nil {
		return nil
	}

	// The ping failed: drop the connection and report the ping error.
	closeErr := s.Close()
	if closeErr != nil {
		log.Errorf("Error while closing non-pinging DB connection to %q: %v", s, closeErr)
	}
	return pingErr
}
// String returns the server's fingerprint, i.e. the value stored in the
// server's labels under metricmaps.ServerLabelName.
func (s *Server) String() string {
	fingerprint := s.labels[metricmaps.ServerLabelName]
	return fingerprint
}
// Scrape loads metrics from the server and sends them to ch.
//
// A read lock on the metric mappings is held for the whole scrape. When
// settings metrics are not disabled and the server is flagged as master,
// the pg_settings view is queried first; afterwards every namespace mapping
// is queried.
//
// The returned error is nil when everything succeeded. A settings failure
// and namespace failures are each reported; when both happen in the same
// scrape they are combined into one error so that neither is dropped.
func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
	s.mappingMtx.RLock()
	defer s.mappingMtx.RUnlock()

	var settingsErr error
	if !disableSettingsMetrics && s.master {
		if err := s.querySettings(ch); err != nil {
			settingsErr = fmt.Errorf("error retrieving settings: %s", err)
		}
	}

	errMap := s.queryNamespaceMappings(ch)
	if len(errMap) == 0 {
		return settingsErr
	}

	namespaceErr := fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap))
	if settingsErr != nil {
		// Keep the settings failure visible instead of overwriting it.
		return fmt.Errorf("%s; %s", settingsErr, namespaceErr)
	}
	return namespaceErr
}
// queryNamespaceMapping runs the query for a single namespace and converts
// the result rows into metrics.
//
// The query is the namespace's entry in s.queryOverrides when one exists;
// otherwise every column of the relation named by namespace is selected. An
// override that is present but empty means the namespace is disabled, and
// empty results are returned without touching the database.
//
// It returns the metrics built from the rows, a slice of non-fatal errors
// (columns whose values could not be converted to float64), and a fatal
// error when the query, the column lookup, a row scan, or the row iteration
// itself failed.
func (s *Server) queryNamespaceMapping(namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
	// Check for a query override for this namespace
	query, found := s.queryOverrides[namespace]

	// Was this query disabled (i.e. nothing sensible can be queried on the
	// current version of PostgreSQL)?
	if query == "" && found {
		// Return success (no pertinent data)
		return []prometheus.Metric{}, []error{}, nil
	}

	// Don't fail on a bad scrape of one metric
	var rows *sql.Rows
	var err error

	if !found {
		// I've no idea how to avoid this properly at the moment, but this is
		// an admin tool so you're not injecting SQL right?
		rows, err = s.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas, safesql
	} else {
		rows, err = s.db.Query(query) // nolint: safesql
	}
	if err != nil {
		return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", s, namespace, err)
	}
	defer rows.Close() // nolint: errcheck

	var columnNames []string
	columnNames, err = rows.Columns()
	if err != nil {
		return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err))
	}

	// Make a lookup map for the column indices
	var columnIdx = make(map[string]int, len(columnNames))
	for i, n := range columnNames {
		columnIdx[n] = i
	}

	// Each scan argument points at the matching slot of columnData, so a
	// single Scan call fills the whole row.
	var columnData = make([]interface{}, len(columnNames))
	var scanArgs = make([]interface{}, len(columnNames))
	for i := range columnData {
		scanArgs[i] = &columnData[i]
	}

	nonfatalErrors := []error{}
	metrics := make([]prometheus.Metric, 0)

	for rows.Next() {
		err = rows.Scan(scanArgs...)
		if err != nil {
			return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err))
		}

		// Get the label values for this row.
		// NOTE(review): a label that is not among the returned columns
		// resolves to index 0 here and silently takes the first column's
		// value; the second result of DBToString is ignored as well.
		labels := make([]string, len(mapping.labels))
		for idx, label := range mapping.labels {
			labels[idx], _ = pgdbconv.DBToString(columnData[columnIdx[label]])
		}

		// Loop over column names, and match to scan data. Unknown columns
		// will be filled with an untyped metric number *if* they can be
		// converted to float64s. NULLs are allowed and treated as NaN.
		for idx, columnName := range columnNames {
			var metric prometheus.Metric
			if metricMapping, ok := mapping.columnMappings[columnName]; ok {
				// Is this a metricy metric?
				if metricMapping.discard {
					continue
				}

				value, ok := pgdbconv.DBToFloat64(columnData[idx])
				if !ok {
					nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx])))
					continue
				}
				// Generate the metric
				metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
			} else {
				// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
				metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)
				desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, s.labels)

				// Its not an error to fail here, since the values are
				// unexpected anyway.
				value, ok := pgdbconv.DBToFloat64(columnData[idx])
				if !ok {
					// Report the offending value; err is always nil at this
					// point because the preceding Scan succeeded.
					nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unparseable column type - discarding: ", namespace, columnName, columnData[idx])))
					continue
				}
				metric = prometheus.MustNewConstMetric(desc, prometheus.UntypedValue, value, labels...)
			}
			metrics = append(metrics, metric)
		}
	}

	// rows.Next returning false may mean the iteration failed part-way
	// through rather than that the result set was exhausted.
	if err = rows.Err(); err != nil {
		return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err))
	}

	return metrics, nonfatalErrors, nil
}
// queryNamespaceMappings iterates over every namespace mapping of the server,
// obtains its metrics (from the cache while the cached entry is still fresh,
// otherwise by querying the database) and sends them to ch.
//
// Mappings flagged as master-only are skipped when the server is not a
// master. Non-fatal errors are only logged. Fatal errors are logged and
// returned in a map keyed by namespace; the map is empty when every
// namespace was processed without a fatal error.
//
// A freshly scraped result is cached only when the mapping has a positive
// cache duration and the scrape did not fail, so a failed scrape is retried
// on the next call instead of being served from the cache as an empty
// result.
func (s *Server) queryNamespaceMappings(ch chan<- prometheus.Metric) map[string]error {
	// Return a map of namespace -> errors
	namespaceErrors := make(map[string]error)

	scrapeStart := time.Now()

	for namespace, mapping := range s.metricMap {
		log.Debugln("Querying namespace: ", namespace)

		if mapping.master && !s.master {
			log.Debugln("Query skipped...")
			continue
		}

		// Check if the metric is cached
		s.cacheMtx.Lock()
		cachedMetric, found := s.metricCache[namespace]
		s.cacheMtx.Unlock()

		// Scrape when nothing is cached or the cached entry has expired.
		scrapeMetric := !found ||
			scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds)

		var metrics []prometheus.Metric
		var nonFatalErrors []error
		var err error
		if scrapeMetric {
			metrics, nonFatalErrors, err = s.queryNamespaceMapping(namespace, mapping)
		} else {
			metrics = cachedMetric.metrics
		}

		// Serious error - a namespace disappeared
		if err != nil {
			namespaceErrors[namespace] = err
			log.Infoln(err)
		}

		// Non-serious errors - likely version or parsing problems.
		for _, nonFatal := range nonFatalErrors {
			log.Infoln(nonFatal.Error())
		}

		// Emit the metrics into the channel
		for _, metric := range metrics {
			ch <- metric
		}

		// Only cache if the metric is meaningfully cacheable and the scrape
		// actually succeeded.
		if scrapeMetric && err == nil && mapping.cacheSeconds > 0 {
			s.cacheMtx.Lock()
			s.metricCache[namespace] = cachedMetrics{
				metrics:    metrics,
				lastScrape: scrapeStart,
			}
			s.cacheMtx.Unlock()
		}
	}

	return namespaceErrors
}
// AddQueries adds queries to the builtinMetricMaps and queryOverrides maps.
// Added queries do not respect version requirements, because it is assumed
// that the user knows what they are doing with their version of postgres.
//
// NOTE(review): the body is currently empty, so calling this method has no
// effect; the behaviour described above is not implemented here.
func (s *Server) AddQueries() {
}
// querySettings queries the pg_settings view containing runtime variables
// and sends one metric per returned row to ch.
//
// Only settings whose vartype is bool, integer or real are selected. An
// error is returned when the query fails, when a row cannot be scanned, or
// when iterating over the result set ends with an error.
func (s *Server) querySettings(ch chan<- prometheus.Metric) error {
	log.Debugf("Querying pg_setting view on %q", s)

	// pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html
	//
	// NOTE: If you add more vartypes here, you must update the supported
	// types in normaliseUnit() below
	query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');"

	rows, err := s.db.Query(query)
	if err != nil {
		return fmt.Errorf("Error running query on database %q: %s %v", s, metricmaps.ExporterNamespaceLabel, err)
	}
	defer rows.Close() // nolint: errcheck

	for rows.Next() {
		setting := &pgSetting{}
		err = rows.Scan(&setting.name, &setting.setting, &setting.unit, &setting.shortDesc, &setting.vartype)
		if err != nil {
			return fmt.Errorf("Error retrieving rows on %q: %s %v", s, metricmaps.ExporterNamespaceLabel, err)
		}

		ch <- setting.metric(s.labels)
	}

	// rows.Next returning false may mean the iteration failed part-way
	// through rather than that the result set was exhausted.
	if err = rows.Err(); err != nil {
		return fmt.Errorf("Error retrieving rows on %q: %s %v", s, metricmaps.ExporterNamespaceLabel, err)
	}

	return nil
}