Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions internal/query/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const (
// NOTES:
// 1. regexp_replace() removes extra spaces, tabs and newlines from queries

// PgStatStatementsTimingDefault defines default query for getting timings stats from pg_stat_statements view.
PgStatStatementsTimingDefault = "SELECT pg_get_userbyid(p.userid) AS user, d.datname AS database, " +
// PgStatStatementsTimingPG13 defines timing query for pg_stat_statements (PG 13-16).
PgStatStatementsTimingPG13 = "SELECT pg_get_userbyid(p.userid) AS user, d.datname AS database, " +
"date_trunc('seconds', round(p.total_plan_time + p.total_exec_time) / 1000 * '1 second'::interval)::text AS all_total, " +
"date_trunc('seconds', round(p.blk_read_time) / 1000 * '1 second'::interval)::text AS read_total, " +
"date_trunc('seconds', round(p.blk_write_time) / 1000 * '1 second'::interval)::text AS write_total, " +
Expand All @@ -29,8 +29,9 @@ const (
`regexp_replace({{.PgSSQueryLenFn}}, E'\\s+', ' ', 'g') AS query ` +
"FROM {{.PGSSSchema}}.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid"

// the timing query for Postgres 17 and newer. v17 splits up the read-write timing stats into more granular columns.
PgStatStatementsTimingPG17 = `
// PgStatStatementsTimingDefault defines timing query for pg_stat_statements (PG 17+).
// PG 17 splits read/write timing into shared_blk_*, local_blk_*, temp_blk_* columns.
PgStatStatementsTimingDefault = `
SELECT pg_get_userbyid(p.userid) AS user, d.datname AS database,
date_trunc('seconds', round(p.total_exec_time + p.total_plan_time) / 1000 * '1 second'::interval)::text AS all_total,
date_trunc('seconds', round(p.shared_blk_read_time + p.local_blk_read_time + p.temp_blk_read_time) / 1000 * '1 second'::interval)::text AS read_total,
Expand Down Expand Up @@ -96,8 +97,8 @@ FROM {{.PGSSSchema}}.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid
`regexp_replace({{.PgSSQueryLenFn}}, E'\\s+', ' ', 'g') AS query ` +
"FROM {{.PGSSSchema}}.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid"

// PgStatStatementsReportQueryDefault defines query used for calculating per-statement report based on pg_stat_statements.
PgStatStatementsReportQueryDefault = "WITH totals AS (SELECT " +
// PgStatStatementsReportQueryPG13 defines report query for pg_stat_statements (PG 13-16).
PgStatStatementsReportQueryPG13 = "WITH totals AS (SELECT " +
"sum(calls) AS total_calls," +
"sum(rows) AS total_rows," +
"sum(total_plan_time + total_exec_time) AS total_all_time," +
Expand Down Expand Up @@ -158,8 +159,9 @@ FROM {{.PGSSSchema}}.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid
"to_char(100*s.wal_bytes/(SELECT coalesce(nullif(total_wal_bytes, 0), 1) FROM totals), 'FM990.00') AS wal_bytes_ratio " +
"FROM stmt s JOIN pg_database d ON d.oid=s.dbid LIMIT 1"

// The reports query for Postgres 17 and newer. v17 splits up the read-write timing stats into more granular columns.
PgStatStatementsReportQueryPG17 = `
// PgStatStatementsReportQueryDefault defines report query for pg_stat_statements (PG 17+).
// PG 17 splits read/write timing into shared_blk_*, local_blk_*, temp_blk_* columns.
PgStatStatementsReportQueryDefault = `
WITH totals AS (
SELECT
sum(calls) AS total_calls,
Expand Down Expand Up @@ -306,9 +308,9 @@ func SelectStatStatementsTimingQuery(version int) string {
case version < 130000:
return PgStatStatementsTimingPG12
case version >= 170000:
return PgStatStatementsTimingPG17
default:
return PgStatStatementsTimingDefault
default:
return PgStatStatementsTimingPG13
}
}

Expand All @@ -318,8 +320,8 @@ func SelectQueryReportQuery(version int) string {
case version < 130000:
return PgStatStatementsReportQueryPG12
case version >= 170000:
return PgStatStatementsReportQueryPG17
default:
return PgStatStatementsReportQueryDefault
default:
return PgStatStatementsReportQueryPG13
}
}
78 changes: 47 additions & 31 deletions internal/query/statements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ func TestSelectStatStatementsTimingQuery(t *testing.T) {
{version: 100000, want: PgStatStatementsTimingPG12},
{version: 110000, want: PgStatStatementsTimingPG12},
{version: 120000, want: PgStatStatementsTimingPG12},
{version: 130000, want: PgStatStatementsTimingDefault},
{version: 130000, want: PgStatStatementsTimingPG13},
{version: 140000, want: PgStatStatementsTimingPG13},
{version: 160000, want: PgStatStatementsTimingPG13},
// PG 17+: blk_read_time/blk_write_time replaced by shared_blk_read_time etc.
{version: 170000, want: PgStatStatementsTimingDefault},
{version: 180000, want: PgStatStatementsTimingDefault},
}

for _, tc := range testcases {
got := SelectStatStatementsTimingQuery(tc.version)
assert.Equal(t, tc.want, got)
assert.Equal(t, tc.want, got, "version %d", tc.version)
}
}

Expand Down Expand Up @@ -56,8 +61,11 @@ func Test_StatStatementsQueries(t *testing.T) {
})
}

t.Run("pg_stat_statements_timing", func(t *testing.T) {
for _, version := range versions {
// Each version in its own sub-test so a missing PG instance skips only that version,
// not the entire timing suite (fixes the t.Skipf-in-loop bug).
for _, version := range versions {
version := version
t.Run(fmt.Sprintf("pg_stat_statements_timing/%d", version), func(t *testing.T) {
tmpl := SelectStatStatementsTimingQuery(version)
opts := NewOptions(version, "f", "off", 256, "public")
q, err := Format(tmpl, opts)
Expand All @@ -67,32 +75,31 @@ func Test_StatStatementsQueries(t *testing.T) {
if err != nil {
t.Skipf("postgres %d not available in test environment", version)
}
defer conn.Close()

_, err = conn.Exec(q)
assert.NoError(t, err)
})
}

conn.Close()
}
})

t.Run("pg_stat_statements_wal", func(t *testing.T) {
for _, version := range []int{130000} {
tmpl := PgStatStatementsWalDefault
// WAL stats are available since PG 13; test all supported versions.
for _, version := range []int{130000, 140000, 150000, 160000, 170000, 180000} {
version := version
t.Run(fmt.Sprintf("pg_stat_statements_wal/%d", version), func(t *testing.T) {
opts := NewOptions(version, "f", "off", 256, "public")
q, err := Format(tmpl, opts)
q, err := Format(PgStatStatementsWalDefault, opts)
assert.NoError(t, err)

conn, err := postgres.NewTestConnectVersion(version)
if err != nil {
t.Skipf("postgres %d not available in test environment", version)
}
defer conn.Close()

_, err = conn.Exec(q)
assert.NoError(t, err)

conn.Close()
}
})
})
}
}

func TestSelectQueryReportQuery(t *testing.T) {
Expand All @@ -105,33 +112,42 @@ func TestSelectQueryReportQuery(t *testing.T) {
{version: 100000, want: PgStatStatementsReportQueryPG12},
{version: 110000, want: PgStatStatementsReportQueryPG12},
{version: 120000, want: PgStatStatementsReportQueryPG12},
{version: 130000, want: PgStatStatementsReportQueryDefault},
{version: 130000, want: PgStatStatementsReportQueryPG13},
{version: 140000, want: PgStatStatementsReportQueryPG13},
{version: 160000, want: PgStatStatementsReportQueryPG13},
// PG 17+: blk_read_time/blk_write_time replaced by shared_blk_read_time etc.
{version: 170000, want: PgStatStatementsReportQueryDefault},
{version: 180000, want: PgStatStatementsReportQueryDefault},
}

for _, tc := range testcases {
got := SelectQueryReportQuery(tc.version)
assert.Equal(t, tc.want, got)
assert.Equal(t, tc.want, got, "version %d", tc.version)
}
}

// Test_StatStatementsReportQueries runs each version in its own sub-test so a missing
// PG instance skips only that version, not the entire suite (fixes the t.Skipf-in-loop bug).
func Test_StatStatementsReportQueries(t *testing.T) {
versions := []int{90500, 90600, 100000, 110000, 120000, 130000, 140000, 150000, 160000, 170000, 180000}

for _, version := range versions {
tmpl := SelectQueryReportQuery(version)
opts := NewOptions(version, "f", "off", 256, "public")
q, err := Format(tmpl, opts)
assert.NoError(t, err)

conn, err := postgres.NewTestConnectVersion(version)
if err != nil {
t.Skipf("postgres %d not available in test environment", version)
}
version := version
t.Run(fmt.Sprintf("version/%d", version), func(t *testing.T) {
tmpl := SelectQueryReportQuery(version)
opts := NewOptions(version, "f", "off", 256, "public")
q, err := Format(tmpl, opts)
assert.NoError(t, err)

// Use fake query_id, just test queries are executed with no errors.
_, err = conn.Exec(q, "1234567890")
assert.NoError(t, err)
conn, err := postgres.NewTestConnectVersion(version)
if err != nil {
t.Skipf("postgres %d not available in test environment", version)
}
defer conn.Close()

conn.Close()
// Use fake query_id, just test queries are executed with no errors.
_, err = conn.Exec(q, "1234567890")
assert.NoError(t, err)
})
}
}
12 changes: 6 additions & 6 deletions internal/query/wal.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package query

const (
// PgStatWALDefault defines query for pg_stat_wal (PG 14-17).
PgStatWALDefault = "SELECT 'WAL' AS source, " +
// PgStatWALPG14 defines query for pg_stat_wal (PG 14-17).
PgStatWALPG14 = "SELECT 'WAL' AS source, " +
"(SELECT pg_size_pretty(count(1) * pg_size_bytes(current_setting('wal_segment_size'))) AS waldir_size FROM pg_ls_waldir()) AS waldir_size, " +
`round(wal_bytes / 1024, 2) AS "wal,KiB", ` +
"wal_records AS records, wal_fpi AS fpi, " +
`wal_write AS write, wal_sync AS sync, wal_write_time AS "write,ms", wal_sync_time AS "sync,ms", wal_buffers_full AS buffers_full, ` +
"date_trunc('seconds', now() - stats_reset)::text AS stats_age " +
"FROM pg_stat_wal"

// PgStatWALPG18 defines query for pg_stat_wal (PG 18+).
// PgStatWALDefault defines query for pg_stat_wal (PG 18+).
// wal_write, wal_sync, wal_write_time, wal_sync_time removed in PG 18.
PgStatWALPG18 = "SELECT 'WAL' AS source, " +
PgStatWALDefault = "SELECT 'WAL' AS source, " +
"(SELECT pg_size_pretty(count(1) * pg_size_bytes(current_setting('wal_segment_size'))) AS waldir_size FROM pg_ls_waldir()) AS waldir_size, " +
`round(wal_bytes / 1024, 2) AS "wal,KiB", ` +
"wal_records AS records, wal_fpi AS fpi, " +
Expand All @@ -25,8 +25,8 @@ const (
func SelectStatWALQuery(version int) (string, int, [2]int) {
if version >= 180000 {
// PG 18 removed wal_write/wal_sync columns; stats_age is col 6 and must not be diffed.
return PgStatWALPG18, 7, [2]int{2, 5}
return PgStatWALDefault, 7, [2]int{2, 5}
}
// cols 2-9: wal,KiB..buffers_full; col 10 (stats_age) excluded.
return PgStatWALDefault, 11, [2]int{2, 9}
return PgStatWALPG14, 11, [2]int{2, 9}
}
4 changes: 2 additions & 2 deletions internal/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func New() Views {
"wal": {
Name: "wal",
MinRequiredVersion: query.PostgresV14,
QueryTmpl: query.PgStatWALDefault,
QueryTmpl: query.PgStatWALPG14,
DiffIntvl: [2]int{2, 9},
Ncols: 11,
OrderKey: 0,
Expand All @@ -135,7 +135,7 @@ func New() Views {
},
"statements_timings": {
Name: "statements_timings",
QueryTmpl: query.PgStatStatementsTimingDefault,
QueryTmpl: query.PgStatStatementsTimingPG13,
DiffIntvl: [2]int{6, 10},
Ncols: 13,
OrderKey: 0,
Expand Down
5 changes: 4 additions & 1 deletion report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,11 @@ func processData(app *app, v view.View, config Config, dataCh chan data, doneCh
}

// readMeta creates metadata object from stat.PGresult.
// The metadata query SelectCommonProperties had 7 columns before cbfa0a4 (without
// shared_preload_libraries) and 8 columns after. Accept any result with at least 2
// columns so that tar files recorded with either version can be read.
func readMeta(res stat.PGresult) (metadata, error) {
if res.Nrows != 1 || res.Ncols != 7 {
if res.Nrows != 1 || res.Ncols < 2 {
return metadata{}, fmt.Errorf("invalid result")
}

Expand Down
19 changes: 19 additions & 0 deletions report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,25 @@ func Test_readMeta(t *testing.T) {
},
want: metadata{version: 140000},
},
// Reproduces issue #122: shared_preload_libraries was added to SelectCommonProperties
// in cbfa0a4, making it 8 columns. Tar files recorded after that commit have Ncols=8
// and were incorrectly rejected by the strict "!= 7" check.
{
valid: true,
res: stat.PGresult{
Values: [][]sql.NullString{
{
{String: "14.9", Valid: true}, {String: "140009", Valid: true},
{String: "off", Valid: true}, {String: "100", Valid: true}, {String: "3", Valid: true},
{String: "pg_stat_statements", Valid: true},
{String: "false", Valid: true}, {String: "1622828486655396e-6", Valid: true},
},
},
Cols: []string{"version", "version_num", "track_commit_timestamp", "max_connections", "autovacuum_max_workers", "shared_preload_libraries", "recovery", "start_time_unix"},
Ncols: 8, Nrows: 1, Valid: true,
},
want: metadata{version: 140009},
},
{valid: false, res: stat.PGresult{Ncols: 1, Nrows: 1, Valid: true}},
{valid: false, res: stat.PGresult{Ncols: 7, Nrows: 0, Valid: true}},
}
Expand Down
Loading