From ba65d2207405f9bc4b57da7bd1e35b0999ecef67 Mon Sep 17 00:00:00 2001 From: Pavel Borisov Date: Mon, 18 May 2026 19:30:15 +0400 Subject: [PATCH] Support checking Orioledb tables in pg_amcheck - Add support for extra access method and call verify_orioledb() sql function for them. - As verify_orioledb function resides in orioledb schema do necessary schema checks before the call. No changes in amcheck utility that still supports only heap and btree access methods. --- src/bin/pg_amcheck/pg_amcheck.c | 173 ++++++++++++++++++++++++++++---- 1 file changed, 154 insertions(+), 19 deletions(-) diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c index 8844bfb6a0e..8ee60a591ff 100644 --- a/src/bin/pg_amcheck/pg_amcheck.c +++ b/src/bin/pg_amcheck/pg_amcheck.c @@ -150,6 +150,7 @@ typedef struct DatabaseInfo { char *datname; char *amcheck_schema; /* escaped, quoted literal */ + char *orioledb_schema; /* escaped, quoted; NULL if not installed */ bool is_checkunique; } DatabaseInfo; @@ -157,7 +158,8 @@ typedef struct RelationInfo { const DatabaseInfo *datinfo; /* shared by other relinfos */ Oid reloid; - bool is_heap; /* true if heap, false if btree */ + bool is_heap; /* heap table */ + bool is_orioledb; /* orioledb table */ char *nspname; char *relname; int relpages; @@ -174,13 +176,25 @@ static const char *const amcheck_sql = "\nJOIN pg_catalog.pg_namespace n ON x.extnamespace = n.oid" "\nWHERE x.extname = 'amcheck'"; +/* + * Lookup the schema where the orioledb extension lives, if installed. + */ +static const char *const orioledb_sql = +"SELECT n.nspname FROM pg_catalog.pg_extension x" +"\nJOIN pg_catalog.pg_namespace n ON x.extnamespace = n.oid" +"\nWHERE x.extname = 'orioledb'"; + static void prepare_heap_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn); +static void prepare_orioledb_command(PQExpBuffer sql, RelationInfo *rel, + PGconn *conn); static void prepare_btree_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn); static void run_command(ParallelSlot *slot, const char *sql); static bool verify_heap_slot_handler(PGresult *res, PGconn *conn, void *context); +static bool verify_orioledb_slot_handler(PGresult *res, PGconn *conn, + void *context); static bool verify_btree_slot_handler(PGresult *res, PGconn *conn, void *context); static void help(const char *progname); static void progress_report(uint64 relations_total, uint64 relations_checked, @@ -631,6 +645,21 @@ main(int argc, char *argv[]) PQclear(result); + /* + * Look up the schema where the orioledb extension is installed (if + * any). Required to schema-qualify orioledb_verify_tableam() for any + * orioledb tables we might encounter. + */ + result = executeQuery(conn, orioledb_sql, opts.echo); + if (PQresultStatus(result) == PGRES_TUPLES_OK && PQntuples(result) > 0) + { + const char *schema = PQgetvalue(result, 0, 0); + + dat->orioledb_schema = PQescapeIdentifier(conn, schema, + strlen(schema)); + } + PQclear(result); + compile_relation_list_one_db(conn, &relations, dat, &pagestotal); } @@ -776,6 +805,27 @@ main(int argc, char *argv[]) ParallelSlotSetHandler(free_slot, verify_heap_slot_handler, rel); run_command(free_slot, rel->sql); } + else if (rel->is_orioledb) + { + if (!rel->datinfo->orioledb_schema) + { + pg_log_warning("skipping orioledb relation \"%s.%s.%s\": orioledb extension not found", + rel->datinfo->datname, rel->nspname, rel->relname); + continue; + } + if (opts.verbose) + { + if (opts.show_progress && progress_since_last_stderr) + fprintf(stderr, "\n"); + pg_log_info("checking orioledb table \"%s.%s.%s\"", + rel->datinfo->datname, rel->nspname, rel->relname); + progress_since_last_stderr = false; + } + prepare_orioledb_command(&sql, rel, free_slot->connection); + rel->sql = pstrdup(sql.data); /* pg_free'd after command */ + ParallelSlotSetHandler(free_slot, verify_orioledb_slot_handler, rel); + run_command(free_slot, rel->sql); + } else { if (opts.verbose) @@ -861,6 +911,27 @@ prepare_heap_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn) rel->reloid); } +/* + * prepare_orioledb_command + * + * Build a SQL command that invokes verify_orioledb() on the given orioledb + * relation. Returns (index_name, msg) per failed index or an empty result + * if the relation passed all checks. + * + * The --check-toast pg_amcheck flag is reused to request the deeper + * (force_map_check) variant. + */ +static void +prepare_orioledb_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn) +{ + resetPQExpBuffer(sql); + appendPQExpBuffer(sql, + "SELECT index_name, msg FROM %s.verify_orioledb(%u, %s)", + rel->datinfo->orioledb_schema, + rel->reloid, + opts.reconcile_toast ? "true" : "false"); +} + /* * prepare_btree_command * @@ -1101,6 +1172,56 @@ verify_heap_slot_handler(PGresult *res, PGconn *conn, void *context) return should_processing_continue(res); } +/* + * verify_orioledb_slot_handler + * + * Handles results from verify_orioledb(). Each row is (index_name, msg) + * describing a failed index; non-empty result means failure. + */ +static bool +verify_orioledb_slot_handler(PGresult *res, PGconn *conn, void *context) +{ + RelationInfo *rel = (RelationInfo *) context; + + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + int ntups = PQntuples(res); + int i; + + if (ntups > 0) + all_checks_pass = false; + + for (i = 0; i < ntups; i++) + { + const char *index_name = PQgetvalue(res, i, 0); + const char *msg = PQgetvalue(res, i, 1); + + printf(_("orioledb table \"%s.%s.%s\", index \"%s\":\n"), + rel->datinfo->datname, rel->nspname, rel->relname, + index_name); + printf(" %s\n", msg); + } + } + else + { + char *msg = indent_lines(PQerrorMessage(conn)); + + all_checks_pass = false; + printf(_("orioledb table \"%s.%s.%s\":\n"), + rel->datinfo->datname, rel->nspname, rel->relname); + printf("%s", msg); + if (opts.verbose) + printf(_("query was: %s\n"), rel->sql); + FREE_AND_SET_NULL(msg); + } + + FREE_AND_SET_NULL(rel->sql); + FREE_AND_SET_NULL(rel->nspname); + FREE_AND_SET_NULL(rel->relname); + + return should_processing_continue(res); +} + /* * verify_btree_slot_handler * @@ -1914,7 +2035,7 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, /* Append the relation CTE. */ appendPQExpBufferStr(&sql, - " relation (pattern_id, oid, nspname, relname, reltoastrelid, relpages, is_heap, is_btree) AS (" + " relation (pattern_id, oid, nspname, relname, reltoastrelid, relpages, is_heap, is_btree, is_orioledb) AS (" "\nSELECT DISTINCT ON (c.oid"); if (!opts.allrel) appendPQExpBufferStr(&sql, ", ip.pattern_id) ip.pattern_id,"); @@ -1923,17 +2044,20 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, appendPQExpBuffer(&sql, "\nc.oid, n.nspname, c.relname, c.reltoastrelid, c.relpages, " "c.relam = %u AS is_heap, " - "c.relam = %u AS is_btree" + "c.relam = %u AS is_btree, " + "am.amname = 'orioledb' AS is_orioledb" "\nFROM pg_catalog.pg_class c " "INNER JOIN pg_catalog.pg_namespace n " - "ON c.relnamespace = n.oid", + "ON c.relnamespace = n.oid " + "LEFT JOIN pg_catalog.pg_am am " + "ON am.oid = c.relam", HEAP_TABLE_AM_OID, BTREE_AM_OID); if (!opts.allrel) appendPQExpBuffer(&sql, "\nINNER JOIN include_pat ip" "\nON (n.nspname ~ ip.nsp_regex OR ip.nsp_regex IS NULL)" "\nAND (c.relname ~ ip.rel_regex OR ip.rel_regex IS NULL)" - "\nAND (c.relam = %u OR NOT ip.heap_only)" + "\nAND (c.relam = %u OR am.amname = 'orioledb' OR NOT ip.heap_only)" "\nAND (c.relam = %u OR NOT ip.btree_only)", HEAP_TABLE_AM_OID, BTREE_AM_OID); if (opts.excludetbl || opts.excludeidx || opts.excludensp) @@ -1941,7 +2065,7 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, "\nLEFT OUTER JOIN exclude_pat ep" "\nON (n.nspname ~ ep.nsp_regex OR ep.nsp_regex IS NULL)" "\nAND (c.relname ~ ep.rel_regex OR ep.rel_regex IS NULL)" - "\nAND (c.relam = %u OR NOT ep.heap_only OR ep.rel_regex IS NULL)" + "\nAND (c.relam = %u OR am.amname = 'orioledb' OR NOT ep.heap_only OR ep.rel_regex IS NULL)" "\nAND (c.relam = %u OR NOT ep.btree_only OR ep.rel_regex IS NULL)", HEAP_TABLE_AM_OID, BTREE_AM_OID); @@ -1971,15 +2095,15 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, */ if (opts.allrel) appendPQExpBuffer(&sql, - " AND c.relam = %u " + " AND (c.relam = %u OR am.amname = 'orioledb') " "AND c.relkind IN ('r', 'S', 'm', 't') " "AND c.relnamespace != %u", HEAP_TABLE_AM_OID, PG_TOAST_NAMESPACE); else appendPQExpBuffer(&sql, - " AND c.relam IN (%u, %u)" + " AND (c.relam IN (%u, %u) OR am.amname = 'orioledb') " "AND c.relkind IN ('r', 'S', 'm', 't', 'i') " - "AND ((c.relam = %u AND c.relkind IN ('r', 'S', 'm', 't')) OR " + "AND (((c.relam = %u OR am.amname = 'orioledb') AND c.relkind IN ('r', 'S', 'm', 't')) OR " "(c.relam = %u AND c.relkind = 'i'))", HEAP_TABLE_AM_OID, BTREE_AM_OID, HEAP_TABLE_AM_OID, BTREE_AM_OID); @@ -2018,15 +2142,18 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, * selected above, filtering by exclusion patterns (if any) that match * btree index names. */ - appendPQExpBufferStr(&sql, + appendPQExpBuffer(&sql, ", index (oid, nspname, relname, relpages) AS (" "\nSELECT c.oid, r.nspname, c.relname, c.relpages " "FROM relation r" + "\nINNER JOIN pg_catalog.pg_class pc " + "ON pc.oid = r.oid AND pc.relam = %u" "\nINNER JOIN pg_catalog.pg_index i " "ON r.oid = i.indrelid " "INNER JOIN pg_catalog.pg_class c " "ON i.indexrelid = c.oid " - "AND c.relpersistence != 't'"); + "AND c.relpersistence != 't'", + HEAP_TABLE_AM_OID); if (opts.excludeidx || opts.excludensp) appendPQExpBufferStr(&sql, "\nINNER JOIN pg_catalog.pg_namespace n " @@ -2091,7 +2218,7 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, * list. */ appendPQExpBufferStr(&sql, - "\nSELECT pattern_id, is_heap, is_btree, oid, nspname, relname, relpages " + "\nSELECT pattern_id, is_heap, is_btree, oid, nspname, relname, relpages, is_orioledb " "FROM ("); appendPQExpBufferStr(&sql, /* Inclusion patterns that failed to match */ @@ -2099,34 +2226,38 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, "NULL::OID AS oid, " "NULL::TEXT AS nspname, " "NULL::TEXT AS relname, " - "NULL::INTEGER AS relpages" + "NULL::INTEGER AS relpages, " + "is_orioledb" "\nFROM relation " "WHERE pattern_id IS NOT NULL " "UNION" /* Primary relations */ "\nSELECT NULL::INTEGER AS pattern_id, " - "is_heap, is_btree, oid, nspname, relname, relpages " + "is_heap, is_btree, oid, nspname, relname, relpages, is_orioledb " "FROM relation"); if (!opts.no_toast_expansion) appendPQExpBufferStr(&sql, " UNION" /* Toast tables for primary relations */ "\nSELECT NULL::INTEGER AS pattern_id, TRUE AS is_heap, " - "FALSE AS is_btree, oid, nspname, relname, relpages " + "FALSE AS is_btree, oid, nspname, relname, relpages, " + "FALSE AS is_orioledb " "FROM toast"); if (!opts.no_btree_expansion) appendPQExpBufferStr(&sql, " UNION" /* Indexes for primary relations */ "\nSELECT NULL::INTEGER AS pattern_id, FALSE AS is_heap, " - "TRUE AS is_btree, oid, nspname, relname, relpages " + "TRUE AS is_btree, oid, nspname, relname, relpages, " + "FALSE AS is_orioledb " "FROM index"); if (!opts.no_toast_expansion && !opts.no_btree_expansion) appendPQExpBufferStr(&sql, " UNION" /* Indexes for toast relations */ "\nSELECT NULL::INTEGER AS pattern_id, FALSE AS is_heap, " - "TRUE AS is_btree, oid, nspname, relname, relpages " + "TRUE AS is_btree, oid, nspname, relname, relpages, " + "FALSE AS is_orioledb " "FROM toast_index"); appendPQExpBufferStr(&sql, "\n) AS combined_records " @@ -2148,6 +2279,7 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, int pattern_id = -1; bool is_heap = false; bool is_btree PG_USED_FOR_ASSERTS_ONLY = false; + bool is_orioledb = false; Oid oid = InvalidOid; const char *nspname = NULL; const char *relname = NULL; @@ -2167,6 +2299,8 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, relname = PQgetvalue(res, i, 5); if (!PQgetisnull(res, i, 6)) relpages = atoi(PQgetvalue(res, i, 6)); + if (!PQgetisnull(res, i, 7)) + is_orioledb = (PQgetvalue(res, i, 7)[0] == 't'); if (pattern_id >= 0) { @@ -2184,15 +2318,16 @@ compile_relation_list_one_db(PGconn *conn, SimplePtrList *relations, else { /* Current record pertains to a relation */ - + bool is_table PG_USED_FOR_ASSERTS_ONLY = is_heap || is_orioledb; RelationInfo *rel = (RelationInfo *) pg_malloc0(sizeof(RelationInfo)); Assert(OidIsValid(oid)); - Assert((is_heap && !is_btree) || (is_btree && !is_heap)); + Assert((is_table && !is_btree) || (is_btree && !is_table)); rel->datinfo = dat; rel->reloid = oid; rel->is_heap = is_heap; + rel->is_orioledb = is_orioledb; rel->nspname = pstrdup(nspname); rel->relname = pstrdup(relname); rel->relpages = relpages;