You've already forked postgres_exporter
							
							
				mirror of
				https://github.com/prometheus-community/postgres_exporter.git
				synced 2025-10-31 09:10:25 +03:00 
			
		
		
		
	feat: allow setting limit in pg_stat_statements (#1205)
				
					
				
			Add `.limit` CLI flag to `stat_statements` collector to allow setting a custom number of queries to be returned. Signed-off-by: Cristian Greco <cristian@regolo.cc>
This commit is contained in:
		| @@ -24,11 +24,15 @@ import ( | |||||||
| 	"github.com/prometheus/client_golang/prometheus" | 	"github.com/prometheus/client_golang/prometheus" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const statStatementsSubsystem = "stat_statements" | const ( | ||||||
|  | 	statStatementsSubsystem = "stat_statements" | ||||||
|  | 	defaultStatementLimit   = "100" | ||||||
|  | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	includeQueryFlag    *bool = nil | 	includeQueryFlag    *bool = nil | ||||||
| 	statementLengthFlag *uint = nil | 	statementLengthFlag *uint = nil | ||||||
|  | 	statementLimitFlag  *uint = nil | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func init() { | func init() { | ||||||
| @@ -47,12 +51,18 @@ func init() { | |||||||
| 		"Maximum length of the statement text."). | 		"Maximum length of the statement text."). | ||||||
| 		Default("120"). | 		Default("120"). | ||||||
| 		Uint() | 		Uint() | ||||||
|  | 	statementLimitFlag = kingpin.Flag( | ||||||
|  | 		fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".limit"), | ||||||
|  | 		"Maximum number of statements to return."). | ||||||
|  | 		Default(defaultStatementLimit). | ||||||
|  | 		Uint() | ||||||
| } | } | ||||||
|  |  | ||||||
| type PGStatStatementsCollector struct { | type PGStatStatementsCollector struct { | ||||||
| 	log                   *slog.Logger | 	log                   *slog.Logger | ||||||
| 	includeQueryStatement bool | 	includeQueryStatement bool | ||||||
| 	statementLength       uint | 	statementLength       uint | ||||||
|  | 	statementLimit        uint | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) { | func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) { | ||||||
| @@ -60,6 +70,7 @@ func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) { | |||||||
| 		log:                   config.logger, | 		log:                   config.logger, | ||||||
| 		includeQueryStatement: *includeQueryFlag, | 		includeQueryStatement: *includeQueryFlag, | ||||||
| 		statementLength:       *statementLengthFlag, | 		statementLength:       *statementLengthFlag, | ||||||
|  | 		statementLimit:        *statementLimitFlag, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -126,9 +137,9 @@ const ( | |||||||
| 			FROM pg_stat_statements | 			FROM pg_stat_statements | ||||||
| 		) | 		) | ||||||
| 	ORDER BY seconds_total DESC | 	ORDER BY seconds_total DESC | ||||||
| 	LIMIT 100;` | 	LIMIT %s;` | ||||||
|  |  | ||||||
| 	pgStatStatementsNewQuery = `SELECT | 	pgStatStatementsQuery_PG13 = `SELECT | ||||||
| 		pg_get_userbyid(userid) as user, | 		pg_get_userbyid(userid) as user, | ||||||
| 		pg_database.datname, | 		pg_database.datname, | ||||||
| 		pg_stat_statements.queryid, | 		pg_stat_statements.queryid, | ||||||
| @@ -148,7 +159,7 @@ const ( | |||||||
| 			FROM pg_stat_statements | 			FROM pg_stat_statements | ||||||
| 		) | 		) | ||||||
| 	ORDER BY seconds_total DESC | 	ORDER BY seconds_total DESC | ||||||
| 	LIMIT 100;` | 	LIMIT %s;` | ||||||
|  |  | ||||||
| 	pgStatStatementsQuery_PG17 = `SELECT | 	pgStatStatementsQuery_PG17 = `SELECT | ||||||
| 		pg_get_userbyid(userid) as user, | 		pg_get_userbyid(userid) as user, | ||||||
| @@ -170,7 +181,7 @@ const ( | |||||||
| 			FROM pg_stat_statements | 			FROM pg_stat_statements | ||||||
| 		) | 		) | ||||||
| 	ORDER BY seconds_total DESC | 	ORDER BY seconds_total DESC | ||||||
| 	LIMIT 100;` | 	LIMIT %s;` | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { | func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { | ||||||
| @@ -179,20 +190,24 @@ func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instanc | |||||||
| 	case instance.version.GE(semver.MustParse("17.0.0")): | 	case instance.version.GE(semver.MustParse("17.0.0")): | ||||||
| 		queryTemplate = pgStatStatementsQuery_PG17 | 		queryTemplate = pgStatStatementsQuery_PG17 | ||||||
| 	case instance.version.GE(semver.MustParse("13.0.0")): | 	case instance.version.GE(semver.MustParse("13.0.0")): | ||||||
| 		queryTemplate = pgStatStatementsNewQuery | 		queryTemplate = pgStatStatementsQuery_PG13 | ||||||
| 	default: | 	default: | ||||||
| 		queryTemplate = pgStatStatementsQuery | 		queryTemplate = pgStatStatementsQuery | ||||||
| 	} | 	} | ||||||
| 	var querySelect = "" | 	querySelect := "" | ||||||
| 	if c.includeQueryStatement { | 	if c.includeQueryStatement { | ||||||
| 		querySelect = fmt.Sprintf(pgStatStatementQuerySelect, c.statementLength) | 		querySelect = fmt.Sprintf(pgStatStatementQuerySelect, c.statementLength) | ||||||
| 	} | 	} | ||||||
| 	query := fmt.Sprintf(queryTemplate, querySelect) | 	statementLimit := defaultStatementLimit | ||||||
|  | 	if c.statementLimit > 0 { | ||||||
|  | 		statementLimit = fmt.Sprintf("%d", c.statementLimit) | ||||||
|  | 	} | ||||||
|  | 	query := fmt.Sprintf(queryTemplate, querySelect, statementLimit) | ||||||
|  |  | ||||||
| 	db := instance.getDB() | 	db := instance.getDB() | ||||||
| 	rows, err := db.QueryContext(ctx, query) | 	rows, err := db.QueryContext(ctx, query) | ||||||
|  |  | ||||||
| 	var presentQueryIds = make(map[string]struct{}) | 	presentQueryIds := make(map[string]struct{}) | ||||||
|  |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
|   | |||||||
| @@ -24,7 +24,7 @@ import ( | |||||||
| 	"github.com/smartystreets/goconvey/convey" | 	"github.com/smartystreets/goconvey/convey" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollector(t *testing.T) { | func TestPGStatStatementsCollector(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -36,7 +36,7 @@ func TestPGStateStatementsCollector(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) | 		AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, ""))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, "", defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -67,7 +67,7 @@ func TestPGStateStatementsCollector(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollectorWithStatement(t *testing.T) { | func TestPGStatStatementsCollectorWithStatement(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -79,7 +79,7 @@ func TestPGStateStatementsCollectorWithStatement(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100)))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100), defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -111,7 +111,51 @@ func TestPGStateStatementsCollectorWithStatement(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollectorNull(t *testing.T) { | func TestPGStatStatementsCollectorWithStatementAndLimit(t *testing.T) { | ||||||
|  | 	db, mock, err := sqlmock.New() | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
|  | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
|  | 	inst := &instance{db: db, version: semver.MustParse("12.0.0")} | ||||||
|  |  | ||||||
|  | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
|  | 	rows := sqlmock.NewRows(columns). | ||||||
|  | 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | ||||||
|  | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100), "10"))).WillReturnRows(rows) | ||||||
|  |  | ||||||
|  | 	ch := make(chan prometheus.Metric) | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(ch) | ||||||
|  | 		c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 100, statementLimit: 10} | ||||||
|  |  | ||||||
|  | 		if err := c.Update(context.Background(), inst, ch); err != nil { | ||||||
|  | 			t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	expected := []MetricResult{ | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, | ||||||
|  | 		{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	convey.Convey("Metrics comparison", t, func() { | ||||||
|  | 		for _, expect := range expected { | ||||||
|  | 			m := readMetric(<-ch) | ||||||
|  | 			convey.So(expect, convey.ShouldResemble, m) | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	if err := mock.ExpectationsWereMet(); err != nil { | ||||||
|  | 		t.Errorf("there were unfulfilled exceptions: %s", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPGStatStatementsCollectorNull(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -123,7 +167,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow(nil, nil, nil, nil, nil, nil, nil, nil) | 		AddRow(nil, nil, nil, nil, nil, nil, nil, nil) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, "", defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -154,7 +198,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { | func TestPGStatStatementsCollectorNullWithStatement(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -166,7 +210,7 @@ func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil) | 		AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 200)))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 200), defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -198,7 +242,51 @@ func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollectorNewPG(t *testing.T) { | func TestPGStatStatementsCollectorNullWithStatementAndLimit(t *testing.T) { | ||||||
|  | 	db, mock, err := sqlmock.New() | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
|  | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
|  | 	inst := &instance{db: db, version: semver.MustParse("13.3.7")} | ||||||
|  |  | ||||||
|  | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
|  | 	rows := sqlmock.NewRows(columns). | ||||||
|  | 		AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil) | ||||||
|  | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 200), "10"))).WillReturnRows(rows) | ||||||
|  |  | ||||||
|  | 	ch := make(chan prometheus.Metric) | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(ch) | ||||||
|  | 		c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 200, statementLimit: 10} | ||||||
|  |  | ||||||
|  | 		if err := c.Update(context.Background(), inst, ch); err != nil { | ||||||
|  | 			t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	expected := []MetricResult{ | ||||||
|  | 		{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, | ||||||
|  | 		{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, | ||||||
|  | 		{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, | ||||||
|  | 		{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, | ||||||
|  | 		{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, | ||||||
|  | 		{labels: labelMap{"queryid": "unknown", "query": "unknown"}, metricType: dto.MetricType_COUNTER, value: 1}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	convey.Convey("Metrics comparison", t, func() { | ||||||
|  | 		for _, expect := range expected { | ||||||
|  | 			m := readMetric(<-ch) | ||||||
|  | 			convey.So(expect, convey.ShouldResemble, m) | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	if err := mock.ExpectationsWereMet(); err != nil { | ||||||
|  | 		t.Errorf("there were unfulfilled exceptions: %s", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPGStatStatementsCollector_PG13(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -210,7 +298,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) | 		AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, "", defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -241,7 +329,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { | func TestPGStatStatementsCollector_PG13_WithStatement(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -253,7 +341,7 @@ func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 300), defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -285,7 +373,51 @@ func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollector_PG17(t *testing.T) { | func TestPGStatStatementsCollector_PG13_WithStatementAndLimit(t *testing.T) { | ||||||
|  | 	db, mock, err := sqlmock.New() | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
|  | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
|  | 	inst := &instance{db: db, version: semver.MustParse("13.3.7")} | ||||||
|  |  | ||||||
|  | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
|  | 	rows := sqlmock.NewRows(columns). | ||||||
|  | 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | ||||||
|  | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG13, fmt.Sprintf(pgStatStatementQuerySelect, 300), "10"))).WillReturnRows(rows) | ||||||
|  |  | ||||||
|  | 	ch := make(chan prometheus.Metric) | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(ch) | ||||||
|  | 		c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300, statementLimit: 10} | ||||||
|  |  | ||||||
|  | 		if err := c.Update(context.Background(), inst, ch); err != nil { | ||||||
|  | 			t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	expected := []MetricResult{ | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, | ||||||
|  | 		{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	convey.Convey("Metrics comparison", t, func() { | ||||||
|  | 		for _, expect := range expected { | ||||||
|  | 			m := readMetric(<-ch) | ||||||
|  | 			convey.So(expect, convey.ShouldResemble, m) | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	if err := mock.ExpectationsWereMet(); err != nil { | ||||||
|  | 		t.Errorf("there were unfulfilled exceptions: %s", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPGStatStatementsCollector_PG17(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -297,7 +429,7 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) | 		AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, ""))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, "", defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -328,7 +460,7 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { | func TestPGStatStatementsCollector_PG17_WithStatement(t *testing.T) { | ||||||
| 	db, mock, err := sqlmock.New() | 	db, mock, err := sqlmock.New() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Error opening a stub db connection: %s", err) | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
| @@ -340,7 +472,7 @@ func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { | |||||||
| 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
| 	rows := sqlmock.NewRows(columns). | 	rows := sqlmock.NewRows(columns). | ||||||
| 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | ||||||
| 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows) | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300), defaultStatementLimit))).WillReturnRows(rows) | ||||||
|  |  | ||||||
| 	ch := make(chan prometheus.Metric) | 	ch := make(chan prometheus.Metric) | ||||||
| 	go func() { | 	go func() { | ||||||
| @@ -371,3 +503,47 @@ func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { | |||||||
| 		t.Errorf("there were unfulfilled exceptions: %s", err) | 		t.Errorf("there were unfulfilled exceptions: %s", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestPGStatStatementsCollector_PG17_WithStatementAndLimit(t *testing.T) { | ||||||
|  | 	db, mock, err := sqlmock.New() | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Error opening a stub db connection: %s", err) | ||||||
|  | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
|  | 	inst := &instance{db: db, version: semver.MustParse("17.0.0")} | ||||||
|  |  | ||||||
|  | 	columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} | ||||||
|  | 	rows := sqlmock.NewRows(columns). | ||||||
|  | 		AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2) | ||||||
|  | 	mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300), "10"))).WillReturnRows(rows) | ||||||
|  |  | ||||||
|  | 	ch := make(chan prometheus.Metric) | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(ch) | ||||||
|  | 		c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300, statementLimit: 10} | ||||||
|  |  | ||||||
|  | 		if err := c.Update(context.Background(), inst, ch); err != nil { | ||||||
|  | 			t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	expected := []MetricResult{ | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, | ||||||
|  | 		{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, | ||||||
|  | 		{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	convey.Convey("Metrics comparison", t, func() { | ||||||
|  | 		for _, expect := range expected { | ||||||
|  | 			m := readMetric(<-ch) | ||||||
|  | 			convey.So(expect, convey.ShouldResemble, m) | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	if err := mock.ExpectationsWereMet(); err != nil { | ||||||
|  | 		t.Errorf("there were unfulfilled exceptions: %s", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user