diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index c482c25fa46..ec5b539fbfc 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -10,18 +10,24 @@ #include "postgres.h" +#include "miscadmin.h" + #include "access/genam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/index.h" #include "catalog/pg_attrdef.h" #include "catalog/pg_class.h" +#include "catalog/pg_collation.h" #include "catalog/pg_constraint.h" #include "catalog/pg_depend.h" #include "catalog/pg_type.h" +#include "commands/defrem.h" #include "commands/tablecmds.h" +#include "executor/spi.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" +#include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" #include "parser/parse_expr.h" #include "parser/parse_type.h" @@ -29,10 +35,13 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/ruleutils.h" #include "utils/syscache.h" +#include "utils/varlena.h" #include "pg_version_constants.h" +#include "distributed/backend_data.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" @@ -66,6 +75,87 @@ bool EnableLocalReferenceForeignKeys = true; */ bool AllowUnsafeConstraints = false; +/* + * GUC citus.distribution_columns: a comma-separated priority list of column + * names (e.g. 'tenant_id,customer_id,department'). When a new table is created, + * Citus walks the list in order and distributes by the first column that exists + * in the table. Applies to CREATE TABLE and CREATE TABLE AS SELECT. + */ +char *DistributionColumns = ""; + +/* Pre-parsed list of distribution column names (List of String nodes). + * Updated by AssignDistributionColumns whenever the GUC changes. */ +List *ParsedDistributionColumns = NIL; + + +/* + * CheckDistributionColumns is the GUC check hook for + * citus.distribution_columns. 
It validates the comma-separated identifier + * list using SplitIdentifierString and rejects invalid input (e.g. empty + * tokens from double commas) before the assign hook runs. + */ +bool +CheckDistributionColumns(char **newval, void **extra, GucSource source) +{ + if (*newval == NULL || (*newval)[0] == '\0') + { + return true; + } + + char *rawCopy = pstrdup(*newval); + List *parsed = NIL; + bool valid = SplitIdentifierString(rawCopy, ',', &parsed); + + list_free(parsed); + pfree(rawCopy); + + return valid; +} + + +/* + * AssignDistributionColumns is the GUC assign hook for + * citus.distribution_columns. It parses the comma-separated string into + * a list of char* pointers (via SplitIdentifierString) so that callers + * don't re-tokenize on every use. The check hook has already validated + * the input, so SplitIdentifierString will always succeed here. + */ +void +AssignDistributionColumns(const char *newval, void *extra) +{ + /* + * SplitIdentifierString modifies the input string in-place and returns + * a list of char* pointers into that string. We must keep the backing + * string alive as long as ParsedDistributionColumns references it, and + * free it on the next call. Everything must live in TopMemoryContext + * so it survives transaction boundaries. 
+ */ + static char *previousRawList = NULL; + + list_free(ParsedDistributionColumns); + ParsedDistributionColumns = NIL; + + if (previousRawList != NULL) + { + pfree(previousRawList); + previousRawList = NULL; + } + + if (!newval || newval[0] == '\0') + { + return; + } + + MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); + + previousRawList = pstrdup(newval); + + SplitIdentifierString(previousRawList, ',', &ParsedDistributionColumns); + + MemoryContextSwitchTo(oldContext); +} + + /* Local functions forward declarations for unsupported command checks */ static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement); static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, @@ -127,6 +217,11 @@ static char * GetAddColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, bool ifNotExists); static void ErrorIfAlterTableDropTableNameFromPostgresFdw(List *optionList, Oid relationId); +static char * FindMatchingDistributionColumnFromTargetList(List *targetList, + List *colNames); +static char * FindMatchingDistributionColumn(Oid relationId); +static bool ShouldAutoDistributeNewTable(Oid relationId); +static void AutoDistributeNewTable(Oid relationId); /* @@ -4232,11 +4327,650 @@ ErrorIfTableHasIdentityColumn(Oid relationId) } +/* + * FindMatchingDistributionColumnFromTargetList walks the GUC priority list + * and returns the first column name that appears in the given Query's + * targetList. This is used to determine the distribution column before + * the table exists (for CTAS optimization). colNames, if non-NULL, + * overrides the column names from the targetList (as used by IntoClause). 
+ */ +static char * +FindMatchingDistributionColumnFromTargetList(List *targetList, List *colNames) +{ + if (ParsedDistributionColumns == NIL) + { + return NULL; + } + + /* Build list of available column names from colNames or targetList */ + List *availableCols = NIL; + bool freeAvailableCols = false; + + if (colNames != NIL) + { + /* colNames is a List of String nodes; extract raw char* */ + ListCell *cnCell = NULL; + foreach(cnCell, colNames) + { + availableCols = lappend(availableCols, strVal(lfirst(cnCell))); + } + freeAvailableCols = true; + } + else + { + ListCell *tlCell = NULL; + foreach(tlCell, targetList) + { + TargetEntry *tle = lfirst_node(TargetEntry, tlCell); + if (!tle->resjunk && tle->resname) + { + availableCols = lappend(availableCols, tle->resname); + } + } + freeAvailableCols = true; + } + + /* Walk priority list, return the first match */ + ListCell *tokenCell = NULL; + foreach(tokenCell, ParsedDistributionColumns) + { + const char *candidate = (const char *) lfirst(tokenCell); + + ListCell *colCell = NULL; + foreach(colCell, availableCols) + { + const char *colName = (const char *) lfirst(colCell); + if (pg_strcasecmp(candidate, colName) == 0) + { + if (freeAvailableCols) + { + list_free(availableCols); + } + return pstrdup(candidate); + } + } + } + + if (freeAvailableCols) + { + list_free(availableCols); + } + + return NULL; +} + +/* + * FindMatchingDistributionColumn walks the comma-separated priority list in + * citus.distribution_columns and returns a palloc'd copy of the first column + * name that exists in the given relation. Returns NULL if none match or if + * the GUC is empty. 
+ */ +static char * +FindMatchingDistributionColumn(Oid relationId) +{ + if (ParsedDistributionColumns == NIL) + { + return NULL; + } + + ListCell *cell = NULL; + foreach(cell, ParsedDistributionColumns) + { + const char *colName = (const char *) lfirst(cell); + AttrNumber attNum = get_attnum(relationId, colName); + if (attNum != InvalidAttrNumber) + { + return pstrdup(colName); + } + } + + return NULL; +} + + +/* + * ShouldAutoDistributeNewTable returns true if citus.distribution_columns is + * set and the given relation has a column matching one of the names in the + * priority list. + */ +static bool +ShouldAutoDistributeNewTable(Oid relationId) +{ + if (ParsedDistributionColumns == NIL) + { + return false; + } + + if (IsBinaryUpgrade) + { + return false; + } + + /* internal backends (metadata sync, rebalancer) should not auto-distribute */ + if (IsCitusInternalBackend() || IsRebalancerInternalBackend()) + { + return false; + } + + /* skip temp tables */ + if (get_rel_persistence(relationId) == RELPERSISTENCE_TEMP) + { + return false; + } + + /* + * Skip tables that are already Citus tables (e.g. partitions that were + * already distributed by PostprocessCreateTableStmtPartitionOf). + */ + if (IsCitusTable(relationId)) + { + return false; + } + + /* + * Skip partitions of tables that are already distributed. They will be + * distributed automatically by Citus when attached to their parent. + * For partitions of local parents, the parent itself will be auto- + * distributed (if it matches the GUC) and the partition will follow. + */ + if (PartitionTable(relationId)) + { + return false; + } + + /* + * Skip foreign tables, materialized views, and bare inherited tables — + * Citus cannot hash-distribute these relation kinds. 
+ */ + char relkind = get_rel_relkind(relationId); + if (relkind == RELKIND_FOREIGN_TABLE || + relkind == RELKIND_MATVIEW) + { + return false; + } + + /* Citus does not support distributing tables with inheritance parents */ + if (IsChildTable(relationId) || IsParentTable(relationId)) + { + return false; + } + + /* check whether any column in the priority list exists in this table */ + char *matchedCol = FindMatchingDistributionColumn(relationId); + if (matchedCol == NULL) + { + return false; + } + + pfree(matchedCol); + return true; +} + + +/* + * AutoDistributeNewTable distributes the given relation using the first + * matching column from the citus.distribution_columns priority list as + * the hash distribution column. + */ +static void +AutoDistributeNewTable(Oid relationId) +{ + char *distributionColumn = FindMatchingDistributionColumn(relationId); + Assert(distributionColumn != NULL); + + char *colocateWith = "default"; + bool shardCountIsStrict = false; + + ereport(NOTICE, (errmsg("auto-distributing table \"%s\" by column \"%s\" " + "(from citus.distribution_columns)", + get_rel_name(relationId), distributionColumn))); + + CreateDistributedTable(relationId, distributionColumn, + DISTRIBUTE_BY_HASH, ShardCount, + shardCountIsStrict, colocateWith); + + pfree(distributionColumn); +} + + +/* + * TryOptimizeCTASForAutoDistribution intercepts CREATE TABLE AS SELECT + * and converts it into CREATE TABLE + INSERT INTO ... SELECT to avoid + * pulling all data through the coordinator. When the auto-distribution + * GUC is set and the output columns match, we: + * 1. Create the empty target table + * 2. Distribute it (auto-distribution) + * 3. Execute INSERT INTO target SELECT ... (Citus pushes this down) + * + * Returns true if the CTAS was handled via the optimized path. + * Returns false if the normal (unoptimized) path should be used. 
+ */ +bool +TryOptimizeCTASForAutoDistribution(CreateTableAsStmt *ctasStmt, + const char *queryString) +{ + /* Quick bail-out if the GUC is not set */ + if (ParsedDistributionColumns == NIL) + { + return false; + } + + if (IsBinaryUpgrade) + { + return false; + } + + /* internal backends should not auto-distribute */ + if (IsCitusInternalBackend() || IsRebalancerInternalBackend()) + { + return false; + } + + /* only handle regular tables, not materialized views */ + if (ctasStmt->objtype != OBJECT_TABLE) + { + return false; + } + + /* skip temp tables */ + IntoClause *into = ctasStmt->into; + if (into->rel->relpersistence == RELPERSISTENCE_TEMP) + { + return false; + } + + /* + * If schema-based sharding is enabled and the target table would go + * into a tenant schema, fall back to the normal path so that + * ConvertNewTableIfNecessary can create a single-shard tenant table + * (tenant schema takes precedence over auto-distribution). + */ + if (EnableSchemaBasedSharding) + { + Oid schemaOid = InvalidOid; + if (into->rel->schemaname != NULL) + { + schemaOid = get_namespace_oid(into->rel->schemaname, true); + } + else + { + List *searchPath = fetch_search_path(false); + if (searchPath != NIL) + { + schemaOid = linitial_oid(searchPath); + list_free(searchPath); + } + } + if (OidIsValid(schemaOid) && IsTenantSchema(schemaOid)) + { + return false; + } + } + + /* + * Skip SELECT INTO syntax — it doesn't use "AS" so we can't easily + * extract the SELECT part from the query string. Fall back to the + * normal post-creation path for these. + */ + if (ctasStmt->is_select_into) + { + return false; + } + + /* + * The query must be an analyzed Query node by the time we get here. + * If it's not (e.g. it's an EXECUTE), fall through to the normal path. + */ + if (!IsA(ctasStmt->query, Query)) + { + return false; + } + + Query *selectQuery = (Query *) ctasStmt->query; + + /* + * Check if any output column matches the distribution_columns GUC. 
+ * colNames from IntoClause override the target list column names. + */ + char *distColumn = FindMatchingDistributionColumnFromTargetList( + selectQuery->targetList, into->colNames); + if (distColumn == NULL) + { + return false; + } + + /* + * Build qualified table name for the target. + */ + const char *schemaName = into->rel->schemaname; + const char *tableName = into->rel->relname; + const char *qualifiedName = schemaName ? + quote_qualified_identifier(schemaName, tableName) : + quote_identifier(tableName); + + /* + * If IF NOT EXISTS is set and the table already exists, skip. + */ + if (ctasStmt->if_not_exists) + { + bool missingOk = true; + Oid existingOid = RangeVarGetRelid(into->rel, NoLock, missingOk); + if (OidIsValid(existingOid)) + { + pfree(distColumn); + return false; + } + } + + /* + * Step 1: Build and execute CREATE TABLE with column definitions + * derived from the SELECT's target list. + */ + StringInfoData createBuf; + initStringInfo(&createBuf); + appendStringInfo(&createBuf, "CREATE TABLE %s (", qualifiedName); + + int colIdx = 0; + ListCell *colNameCell = list_head(into->colNames); + TargetEntry *tle = NULL; + foreach_declared_ptr(tle, selectQuery->targetList) + { + if (tle->resjunk) + { + continue; + } + + const char *colName = NULL; + if (colNameCell != NULL) + { + colName = strVal(lfirst(colNameCell)); + colNameCell = lnext(into->colNames, colNameCell); + } + else + { + colName = tle->resname; + } + + if (colName == NULL) + { + /* Cannot determine column name, fall back to normal path */ + pfree(createBuf.data); + pfree(distColumn); + return false; + } + + Oid colType = exprType((Node *) tle->expr); + int32 colTypmod = exprTypmod((Node *) tle->expr); + Oid colCollation = exprCollation((Node *) tle->expr); + + if (colIdx > 0) + { + appendStringInfoString(&createBuf, ", "); + } + + bits16 formatFlags = FORMAT_TYPE_TYPEMOD_GIVEN | FORMAT_TYPE_FORCE_QUALIFY; + appendStringInfo(&createBuf, "%s %s", + quote_identifier(colName), + 
format_type_extended(colType, colTypmod, formatFlags)); + + /* Add COLLATE clause if non-default collation */ + if (OidIsValid(colCollation) && colCollation != DEFAULT_COLLATION_OID) + { + appendStringInfo(&createBuf, " COLLATE %s", + generate_collation_name(colCollation)); + } + + colIdx++; + } + + if (colIdx == 0) + { + /* No columns — fall back */ + pfree(createBuf.data); + pfree(distColumn); + return false; + } + + appendStringInfoChar(&createBuf, ')'); + + /* Add WITH clause options if present */ + if (into->options != NIL) + { + appendStringInfoString(&createBuf, " WITH ("); + int optIdx = 0; + DefElem *opt = NULL; + foreach_declared_ptr(opt, into->options) + { + if (optIdx > 0) + { + appendStringInfoString(&createBuf, ", "); + } + + if (opt->arg != NULL) + { + appendStringInfo(&createBuf, "%s = %s", + opt->defname, + defGetString(opt)); + } + else + { + appendStringInfoString(&createBuf, opt->defname); + } + optIdx++; + } + appendStringInfoChar(&createBuf, ')'); + } + + /* Add tablespace if specified */ + if (into->tableSpaceName != NULL) + { + appendStringInfo(&createBuf, " TABLESPACE %s", + quote_identifier(into->tableSpaceName)); + } + + /* Add access method if specified (e.g. USING columnar) */ + if (into->accessMethod != NULL) + { + appendStringInfo(&createBuf, " USING %s", + quote_identifier(into->accessMethod)); + } + + /* + * Execute the CREATE TABLE via SPI (utility commands can't go through + * ExecuteQueryStringIntoDestReceiver). This will trigger + * ConvertNewTableIfNecessary for the CREATE TABLE path, + * which will auto-distribute the empty table. 
+ */ + ereport(DEBUG1, (errmsg("optimized CTAS: creating empty distributed table " + "before INSERT...SELECT"))); + + int spiResult = SPI_connect(); + if (spiResult != SPI_OK_CONNECT) + { + ereport(ERROR, (errmsg("could not connect to SPI manager"))); + } + + spiResult = SPI_execute(createBuf.data, false, 0); + if (spiResult != SPI_OK_UTILITY) + { + ereport(ERROR, (errmsg("failed to execute CREATE TABLE via SPI: %s", + createBuf.data))); + } + + /* + * Need to increment command counter so that subsequent commands + * can see the new table. + */ + CommandCounterIncrement(); + + /* + * The table was created by SPI as a sub-command, so the utility hook + * won't auto-distribute it (ConvertNewTableIfNecessary only runs for + * top-level commands). We must explicitly distribute it here. + */ + bool missingOk = false; + Oid createdRelationId = RangeVarGetRelid(into->rel, NoLock, missingOk); + + if (ShouldAutoDistributeNewTable(createdRelationId)) + { + AutoDistributeNewTable(createdRelationId); + } + + if (!IsCitusTable(createdRelationId)) + { + /* + * Table was created but not distributed. Execute INSERT ... SELECT + * to populate it, but this won't benefit from pushdown. + */ + ereport(DEBUG1, (errmsg("optimized CTAS: table was not auto-distributed, " + "falling back to coordinator INSERT"))); + } + else + { + ereport(NOTICE, (errmsg("optimized CTAS: table \"%s\" auto-distributed by " + "column \"%s\", using INSERT...SELECT for data", + tableName, distColumn))); + } + + /* + * Step 2: Build INSERT INTO target SELECT ... to populate the table. + * We extract the SELECT portion from the original queryString by + * scanning for the AS keyword that separates CREATE TABLE ... from + * the query body. + */ + StringInfoData insertBuf; + initStringInfo(&insertBuf); + + /* + * Find the SELECT part of the CTAS statement. We look for AS followed + * by SELECT, (, or WITH (for CTEs). We need to handle: + * CREATE TABLE t AS SELECT ... + * CREATE TABLE t AS (SELECT ...) 
+ * CREATE TABLE t (col1, col2) AS SELECT ... + * CREATE TABLE t AS WITH cte AS (...) SELECT ... + */ + const char *selectStart = NULL; + const char *ptr = queryString; + + /* + * Scan for the AS keyword that precedes the SELECT query. + * We need to skip past the table name and any column list. + * Look for pattern: AS followed by SELECT, (, or WITH. + */ + while (*ptr != '\0') + { + /* skip string literals (handling '' escape sequences) */ + if (*ptr == '\'') + { + ptr++; + while (*ptr != '\0') + { + if (*ptr == '\'') + { + if (*(ptr + 1) == '\'') + { + ptr += 2; /* skip escaped quote */ + } + else + { + break; /* end of string literal */ + } + } + else + { + ptr++; + } + } + if (*ptr != '\0') + { + ptr++; + } + continue; + } + + /* skip quoted identifiers */ + if (*ptr == '\"') + { + ptr++; + while (*ptr != '\0' && *ptr != '\"') + { + ptr++; + } + if (*ptr != '\0') + { + ptr++; + } + continue; + } + + /* Check for AS keyword (case-insensitive) */ + if ((ptr[0] == 'A' || ptr[0] == 'a') && + (ptr[1] == 'S' || ptr[1] == 's') && + (ptr == queryString || !isalnum((unsigned char) ptr[-1])) && + !isalnum((unsigned char) ptr[2]) && ptr[2] != '_') + { + const char *afterAs = ptr + 2; + + /* skip whitespace after AS */ + while (*afterAs == ' ' || *afterAs == '\t' || *afterAs == '\n' || + *afterAs == '\r') + { + afterAs++; + } + + /* Check if what follows is a SELECT query indicator */ + if (pg_strncasecmp(afterAs, "SELECT", 6) == 0 || + pg_strncasecmp(afterAs, "WITH", 4) == 0 || + pg_strncasecmp(afterAs, "TABLE", 5) == 0 || + pg_strncasecmp(afterAs, "VALUES", 6) == 0 || + *afterAs == '(') + { + selectStart = afterAs; + break; + } + } + + ptr++; + } + + if (selectStart == NULL) + { + /* + * Could not find the SELECT part — this shouldn't happen for + * valid CTAS, but fall back gracefully. The table is already + * created (empty), so just let the caller know we handled it. 
+ */ + ereport(WARNING, (errmsg("optimized CTAS: could not extract SELECT " + "from query string, table is empty"))); + SPI_finish(); + return true; + } + + appendStringInfo(&insertBuf, "INSERT INTO %s %s", + qualifiedName, selectStart); + + spiResult = SPI_execute(insertBuf.data, false, 0); + if (spiResult != SPI_OK_INSERT) + { + ereport(ERROR, (errmsg("failed to execute INSERT...SELECT via SPI"))); + } + + SPI_finish(); + + return true; +} + + /* * ConvertNewTableIfNecessary converts the given table to a tenant schema - * table or a Citus managed table if necessary. + * table, an auto-distributed table, or a Citus managed table if necessary. * * Input node is expected to be a CreateStmt or a CreateTableAsStmt. + * + * The precedence is: + * 1. Tenant schema tables (citus.enable_schema_based_sharding) + * 2. Auto-distributed tables (citus.distribution_columns) + * 3. Citus managed tables (citus.use_citus_managed_tables) */ void ConvertNewTableIfNecessary(Node *createStmt) @@ -4274,6 +5008,16 @@ ConvertNewTableIfNecessary(Node *createStmt) CreateTenantSchemaTable(createdRelationId); } + else if (ShouldAutoDistributeNewTable(createdRelationId)) + { + /* + * citus.distribution_columns is set and the table has a matching column. + * Distribute the table by that column. Because CREATE TABLE AS SELECT + * already loaded data into the local table, CreateDistributedTable will + * move the data to the shards automatically. + */ + AutoDistributeNewTable(createdRelationId); + } /* * We simply ignore the tables created by using that syntax when using @@ -4295,9 +5039,10 @@ ConvertNewTableIfNecessary(Node *createStmt) } /* - * Check ShouldCreateTenantSchemaTable() before ShouldAddNewTableToMetadata() - * because we don't want to unnecessarily add the table into metadata - * (as a Citus managed table) before distributing it as a tenant table. 
+ * Check ShouldCreateTenantSchemaTable() before ShouldAutoDistributeNewTable() + * and ShouldAddNewTableToMetadata() because we don't want to unnecessarily + * add the table into metadata (as a Citus managed table) before distributing + * it as a tenant table. */ if (ShouldCreateTenantSchemaTable(createdRelationId)) { @@ -4311,6 +5056,15 @@ ConvertNewTableIfNecessary(Node *createStmt) CreateTenantSchemaTable(createdRelationId); } } + else if (ShouldAutoDistributeNewTable(createdRelationId)) + { + /* + * citus.distribution_columns is set and the table has a matching column. + * For CREATE TABLE (without AS SELECT), the table is empty so this is + * very fast — no data movement needed. + */ + AutoDistributeNewTable(createdRelationId); + } else if (ShouldAddNewTableToMetadata(createdRelationId)) { /* diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index f09d7ced39e..d1246ca2f5b 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -349,8 +349,24 @@ citus_ProcessUtility(PlannedStmt *pstmt, PG_TRY(); { - citus_ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest, - completionTag); + /* + * For CREATE TABLE AS SELECT with auto-distribution enabled, + * try the optimized path that avoids pulling data through the + * coordinator. If successful, skip normal CTAS processing. + */ + bool ctasHandled = false; + if (context == PROCESS_UTILITY_TOPLEVEL && + IsA(parsetree, CreateTableAsStmt)) + { + ctasHandled = TryOptimizeCTASForAutoDistribution( + (CreateTableAsStmt *) parsetree, queryString); + } + + if (!ctasHandled) + { + citus_ProcessUtilityInternal(pstmt, queryString, context, params, + queryEnv, dest, completionTag); + } if (UtilityHookLevel == 1) { @@ -370,6 +386,7 @@ citus_ProcessUtility(PlannedStmt *pstmt, * to create a tenant schema table or a Citus managed table. 
*/ if (context == PROCESS_UTILITY_TOPLEVEL && + !ctasHandled && (IsA(parsetree, CreateStmt) || IsA(parsetree, CreateForeignTableStmt) || IsA(parsetree, CreateTableAsStmt))) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index d11a4257bc6..accb6b773d2 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1251,6 +1251,21 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, ErrorIfNotASuitableDeadlockFactor, NULL, NULL); + DefineCustomStringVariable( + "citus.distribution_columns", + gettext_noop("Sets a priority list of distribution columns for new tables."), + gettext_noop("A comma-separated list of column names in priority order " + "(e.g. 'tenant_id,customer_id,department'). When a new table " + "is created, Citus walks the list in order and distributes " + "the table by the first column name that exists in the table. " + "Applies to CREATE TABLE and CREATE TABLE AS SELECT. " + "Set to empty string to disable."), + &DistributionColumns, + "", + PGC_USERSET, + GUC_STANDARD, + CheckDistributionColumns, AssignDistributionColumns, NULL); + DefineCustomBoolVariable( "citus.enable_alter_database_owner", gettext_noop("Enables propagating ALTER DATABASE ... OWNER TO ... 
statements to " diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 2d8ed3b2f09..20ca3ebac86 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -26,6 +26,11 @@ extern bool AddAllLocalTablesToMetadata; extern bool EnableSchemaBasedSharding; +extern char *DistributionColumns; +extern List *ParsedDistributionColumns; + +extern bool CheckDistributionColumns(char **newval, void **extra, GucSource source); +extern void AssignDistributionColumns(const char *newval, void *extra); /* controlled via GUC, should be accessed via EnableLocalReferenceForeignKeys() */ extern bool EnableLocalReferenceForeignKeys; @@ -669,6 +674,8 @@ extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationI char *colname, bool missingTableOk); extern void ErrorIfTableHasIdentityColumn(Oid relationId); +extern bool TryOptimizeCTASForAutoDistribution(CreateTableAsStmt *ctasStmt, + const char *queryString); extern void ConvertNewTableIfNecessary(Node *createStmt); extern void ConvertToTenantTableIfNecessary(AlterObjectSchemaStmt *alterObjectSchemaStmt); diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index fc865ef4e8c..69b2200a25d 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -56,7 +56,7 @@ vanilla_diffs_file = $(citus_abs_srcdir)/pg_vanilla_outputs/$(MAJORVERSION)/regr # intermediate, for muscle memory backward compatibility. 
check: check-full check-enterprise-full # check-full triggers all tests that ought to be run routinely -check-full: check-multi check-multi-mx check-multi-1 check-multi-1-create-citus check-operations check-add-backup-node check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local check-pytest check-query-generator check-tap +check-full: check-multi check-multi-mx check-multi-1 check-multi-1-create-citus check-post-citus14 check-operations check-add-backup-node check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local check-pytest check-query-generator check-tap # check-enterprise-full triggers all enterprise specific tests check-enterprise-full: check-enterprise check-enterprise-isolation check-enterprise-failure check-enterprise-isolation-logicalrep-1 check-enterprise-isolation-logicalrep-2 check-enterprise-isolation-logicalrep-3 @@ -173,6 +173,10 @@ check-multi-1: all $(pg_regress_multi_check) --load-extension=citus \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_1_schedule $(EXTRA_TESTS) +check-post-citus14: all + $(pg_regress_multi_check) --load-extension=citus \ + -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/post_citus14_schedule $(EXTRA_TESTS) + check-multi-1-create-citus: all $(pg_regress_multi_check) --load-extension=citus \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_1_create_citus_schedule $(EXTRA_TESTS) diff --git a/src/test/regress/expected/auto_distribution_columns.out b/src/test/regress/expected/auto_distribution_columns.out new file mode 100644 index 00000000000..da09972e431 --- /dev/null +++ 
b/src/test/regress/expected/auto_distribution_columns.out @@ -0,0 +1,1278 @@ +-- +-- AUTO_DISTRIBUTION_COLUMNS +-- +-- Tests for the citus.distribution_columns GUC that auto-distributes +-- tables by a priority list of column names on CREATE TABLE / CREATE TABLE AS SELECT. +-- +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 7800000; +-- add a worker so we can actually distribute +SELECT 1 FROM citus_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- ===== Basic: single column in list ===== +SET citus.distribution_columns TO 'tenant_id'; +CREATE TABLE t_basic (id bigserial, tenant_id bigint, data text); +NOTICE: auto-distributing table "t_basic" by column "tenant_id" (from citus.distribution_columns) +-- verify it was auto-distributed by tenant_id +SELECT distribution_column FROM citus_tables WHERE table_name = 't_basic'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_basic; +-- ===== Priority list: first match wins ===== +SET citus.distribution_columns TO 'tenant_id, customer_id, department'; +-- Table has tenant_id → should distribute by tenant_id +CREATE TABLE t_prio1 (id int, tenant_id int, customer_id int, department text); +NOTICE: auto-distributing table "t_prio1" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_prio1'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +-- Table only has customer_id → should distribute by customer_id +CREATE TABLE t_prio2 (id int, customer_id int, department text); +NOTICE: auto-distributing table "t_prio2" by column "customer_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_prio2'::regclass; 
+ distribution_column +--------------------------------------------------------------------- + customer_id +(1 row) + +-- Table only has department → should distribute by department +CREATE TABLE t_prio3 (id int, department text); +NOTICE: auto-distributing table "t_prio3" by column "department" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_prio3'::regclass; + distribution_column +--------------------------------------------------------------------- + department +(1 row) + +-- Table has none of the columns → should NOT be distributed +CREATE TABLE t_prio_none (id int, other_col text); +SELECT count(*) FROM citus_tables WHERE table_name = 't_prio_none'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_prio1, t_prio2, t_prio3, t_prio_none; +-- ===== Priority list fallback in CTAS ===== +-- First token doesn't match, second does → should distribute by tenant_id +RESET citus.distribution_columns; +CREATE TABLE source_data (id int, tenant_id int, val text); +SELECT create_distributed_table('source_data', 'tenant_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_data VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c'); +SET citus.distribution_columns TO 'nonexistent, tenant_id'; +CREATE TABLE t_ctas_fallback AS SELECT * FROM source_data; +NOTICE: auto-distributing table "t_ctas_fallback" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "t_ctas_fallback" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ctas_fallback'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_ctas_fallback; +DROP TABLE source_data; +-- ===== Whitespace handling in list ===== +SET 
citus.distribution_columns TO ' tenant_id , customer_id '; +CREATE TABLE t_ws (id int, customer_id int); +NOTICE: auto-distributing table "t_ws" by column "customer_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ws'::regclass; + distribution_column +--------------------------------------------------------------------- + customer_id +(1 row) + +DROP TABLE t_ws; +-- ===== Empty / disabled ===== +SET citus.distribution_columns TO ''; +CREATE TABLE t_disabled (id int, tenant_id int); +SELECT count(*) FROM citus_tables WHERE table_name = 't_disabled'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_disabled; +RESET citus.distribution_columns; +CREATE TABLE t_reset (id int, tenant_id int); +SELECT count(*) FROM citus_tables WHERE table_name = 't_reset'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_reset; +-- ===== Temp tables should NOT be auto-distributed ===== +SET citus.distribution_columns TO 'tenant_id'; +CREATE TEMP TABLE t_temp (id int, tenant_id int); +-- should not appear in citus_tables (temp tables can't be distributed) +SELECT count(*) FROM citus_tables WHERE table_name = 't_temp'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_temp; +-- ===== Schema-based sharding takes precedence ===== +SET citus.enable_schema_based_sharding TO ON; +SET citus.distribution_columns TO 'tenant_id'; +CREATE SCHEMA auto_dist_tenant_schema; +CREATE TABLE auto_dist_tenant_schema.t_tenant (id int, tenant_id int); +-- should be a single-shard (tenant) table, not hash-distributed by tenant_id +SELECT distribution_column FROM citus_tables WHERE table_name = 'auto_dist_tenant_schema.t_tenant'::regclass; + distribution_column +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + 
SET LOCAL client_min_messages TO WARNING; + DROP SCHEMA auto_dist_tenant_schema CASCADE; +COMMIT; +RESET citus.enable_schema_based_sharding; +-- ===== NOTICE message shows which column is chosen ===== +SET citus.distribution_columns TO 'nonexistent, department'; +CREATE TABLE t_notice (id int, department text); +NOTICE: auto-distributing table "t_notice" by column "department" (from citus.distribution_columns) +-- The NOTICE should say: auto-distributing table "t_notice" by column "department" +DROP TABLE t_notice; +-- ===== Colocated tables ===== +SET citus.distribution_columns TO 'tenant_id'; +CREATE TABLE t_coloc1 (id int, tenant_id int); +NOTICE: auto-distributing table "t_coloc1" by column "tenant_id" (from citus.distribution_columns) +CREATE TABLE t_coloc2 (id int, tenant_id int); +NOTICE: auto-distributing table "t_coloc2" by column "tenant_id" (from citus.distribution_columns) +-- both should be colocated (same distribution column type, same shard count) +SELECT c1.colocation_id = c2.colocation_id AS colocated +FROM citus_tables c1, citus_tables c2 +WHERE c1.table_name = 't_coloc1'::regclass + AND c2.table_name = 't_coloc2'::regclass; + colocated +--------------------------------------------------------------------- + t +(1 row) + +DROP TABLE t_coloc1, t_coloc2; +-- ===== Reference tables: use SET LOCAL to disable GUC temporarily ===== +SET citus.distribution_columns TO 'tenant_id'; +-- A table with a matching column gets auto-distributed as hash +CREATE TABLE lookup_bad (id int, tenant_id int, name text); +NOTICE: auto-distributing table "lookup_bad" by column "tenant_id" (from citus.distribution_columns) +-- This would fail because table is already distributed: +-- SELECT create_reference_table('lookup_bad'); +SELECT citus_table_type FROM citus_tables WHERE table_name = 'lookup_bad'::regclass; + citus_table_type +--------------------------------------------------------------------- + distributed +(1 row) + +DROP TABLE lookup_bad; +-- The correct pattern: 
use SET LOCAL inside a transaction to temporarily +-- disable the GUC, then create the reference table normally +BEGIN; + SET LOCAL citus.distribution_columns TO ''; + CREATE TABLE lookup_ref (id int, tenant_id int, name text); + SELECT create_reference_table('lookup_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +-- verify it's a reference table, not hash-distributed +SELECT citus_table_type FROM citus_tables WHERE table_name = 'lookup_ref'::regclass; + citus_table_type +--------------------------------------------------------------------- + reference +(1 row) + +-- also works for tables that have no matching column (no GUC conflict) +CREATE TABLE no_match_ref (id int, code text); +-- no matching column → table is local, so we can make it a reference table +SELECT create_reference_table('no_match_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_table_type FROM citus_tables WHERE table_name = 'no_match_ref'::regclass; + citus_table_type +--------------------------------------------------------------------- + reference +(1 row) + +DROP TABLE lookup_ref, no_match_ref; +-- ===== Partitioned tables: parent auto-distributed, partitions follow ===== +SET citus.distribution_columns TO 'tenant_id'; +-- Range-partitioned table +CREATE TABLE orders ( + id int, + tenant_id int, + order_date date, + amount numeric +) PARTITION BY RANGE (order_date); +NOTICE: auto-distributing table "orders" by column "tenant_id" (from citus.distribution_columns) +-- parent should be auto-distributed by tenant_id +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'orders'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +-- create partitions — they should inherit the distribution from the parent 
+CREATE TABLE orders_2024 PARTITION OF orders + FOR VALUES FROM ('2024-01-01') TO ('2025-01-01'); +CREATE TABLE orders_2025 PARTITION OF orders + FOR VALUES FROM ('2025-01-01') TO ('2026-01-01'); +-- partitions should also be distributed by tenant_id +SELECT p.table_name::text, distribution_column +FROM citus_tables p +WHERE p.table_name::text IN ('orders_2024', 'orders_2025') +ORDER BY p.table_name::text; + table_name | distribution_column +--------------------------------------------------------------------- + orders_2024 | tenant_id + orders_2025 | tenant_id +(2 rows) + +-- insert data and verify it goes to the right partitions +INSERT INTO orders VALUES (1, 10, '2024-06-15', 100.00); +INSERT INTO orders VALUES (2, 20, '2025-03-01', 200.00); +SELECT count(*) FROM orders; + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT count(*) FROM orders_2024; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM orders_2025; + count +--------------------------------------------------------------------- + 1 +(1 row) + +DROP TABLE orders; +-- List-partitioned table +CREATE TABLE events ( + id int, + tenant_id int, + event_type text, + payload text +) PARTITION BY LIST (event_type); +NOTICE: auto-distributing table "events" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 'events'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +CREATE TABLE events_click PARTITION OF events FOR VALUES IN ('click'); +CREATE TABLE events_view PARTITION OF events FOR VALUES IN ('view'); +SELECT p.table_name::text, distribution_column +FROM citus_tables p +WHERE p.table_name::text IN ('events_click', 'events_view') +ORDER BY p.table_name::text; + table_name | distribution_column 
+--------------------------------------------------------------------- + events_click | tenant_id + events_view | tenant_id +(2 rows) + +INSERT INTO events VALUES (1, 10, 'click', 'data1'), (2, 20, 'view', 'data2'); +SELECT count(*) FROM events; + count +--------------------------------------------------------------------- + 2 +(1 row) + +DROP TABLE events; +-- Hash-partitioned table +CREATE TABLE metrics ( + id int, + tenant_id int, + metric_name text, + value float +) PARTITION BY HASH (id); +NOTICE: auto-distributing table "metrics" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 'metrics'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +CREATE TABLE metrics_p0 PARTITION OF metrics FOR VALUES WITH (MODULUS 2, REMAINDER 0); +CREATE TABLE metrics_p1 PARTITION OF metrics FOR VALUES WITH (MODULUS 2, REMAINDER 1); +SELECT p.table_name::text, distribution_column +FROM citus_tables p +WHERE p.table_name::text IN ('metrics_p0', 'metrics_p1') +ORDER BY p.table_name::text; + table_name | distribution_column +--------------------------------------------------------------------- + metrics_p0 | tenant_id + metrics_p1 | tenant_id +(2 rows) + +DROP TABLE metrics; +-- ===== Partitioned table with no matching column stays local ===== +CREATE TABLE local_partitioned ( + id int, + created_at date +) PARTITION BY RANGE (created_at); +-- no tenant_id column → should NOT be distributed +SELECT count(*) FROM citus_tables WHERE table_name = 'local_partitioned'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +CREATE TABLE local_partitioned_2024 PARTITION OF local_partitioned + FOR VALUES FROM ('2024-01-01') TO ('2025-01-01'); +SELECT count(*) FROM citus_tables WHERE table_name = 'local_partitioned_2024'::regclass; + count 
+--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE local_partitioned; +-- ===== ATTACH PARTITION to an already auto-distributed table ===== +CREATE TABLE sales ( + id int, + tenant_id int, + sale_date date +) PARTITION BY RANGE (sale_date); +NOTICE: auto-distributing table "sales" by column "tenant_id" (from citus.distribution_columns) +-- auto-distributed +SELECT distribution_column FROM citus_tables WHERE table_name = 'sales'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +-- create a standalone table, then attach it as a partition +RESET citus.distribution_columns; +CREATE TABLE sales_2026 (id int, tenant_id int, sale_date date); +-- not distributed yet +SELECT count(*) FROM citus_tables WHERE table_name = 'sales_2026'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.distribution_columns TO 'tenant_id'; +ALTER TABLE sales ATTACH PARTITION sales_2026 FOR VALUES FROM ('2026-01-01') TO ('2027-01-01'); +-- now it should be distributed as part of the parent +SELECT distribution_column FROM citus_tables WHERE table_name = 'sales_2026'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE sales; +-- ===== Views should NOT be auto-distributed ===== +CREATE TABLE base_for_view (id int, tenant_id int, val text); +NOTICE: auto-distributing table "base_for_view" by column "tenant_id" (from citus.distribution_columns) +-- base table gets distributed +SELECT distribution_column FROM citus_tables WHERE table_name = 'base_for_view'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +CREATE VIEW v_base AS SELECT * FROM base_for_view; +-- views are not in citus_tables +SELECT count(*) FROM citus_tables WHERE table_name = 
'v_base'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP VIEW v_base; +DROP TABLE base_for_view; +-- ===== IF NOT EXISTS on an already-distributed table ===== +CREATE TABLE t_ifne (id int, tenant_id int); +NOTICE: auto-distributing table "t_ifne" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ifne'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +-- should not error, just skip +CREATE TABLE IF NOT EXISTS t_ifne (id int, tenant_id int); +NOTICE: relation "t_ifne" already exists, skipping +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ifne'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_ifne; +-- ===== Multiple tables in sequence (different distribution columns) ===== +SET citus.distribution_columns TO 'org_id, tenant_id'; +CREATE TABLE by_org (id int, org_id int); +NOTICE: auto-distributing table "by_org" by column "org_id" (from citus.distribution_columns) +CREATE TABLE by_tenant (id int, tenant_id int); +NOTICE: auto-distributing table "by_tenant" by column "tenant_id" (from citus.distribution_columns) +CREATE TABLE by_org_and_tenant (id int, org_id int, tenant_id int); +NOTICE: auto-distributing table "by_org_and_tenant" by column "org_id" (from citus.distribution_columns) +-- org_id wins for table that has both +SELECT table_name::text, distribution_column +FROM citus_tables +WHERE table_name::text IN ('by_org', 'by_tenant', 'by_org_and_tenant') +ORDER BY table_name::text; + table_name | distribution_column +--------------------------------------------------------------------- + by_org | org_id + by_org_and_tenant | org_id + by_tenant | tenant_id +(3 rows) + +DROP TABLE by_org, by_tenant, by_org_and_tenant; +-- ===== Foreign tables 
should NOT be auto-distributed ===== +SET citus.distribution_columns TO 'tenant_id'; +CREATE FOREIGN TABLE t_foreign (id int, tenant_id int) + SERVER fake_fdw_server; +-- foreign tables cannot be hash-distributed, should be skipped +SELECT count(*) FROM citus_tables WHERE table_name = 't_foreign'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP FOREIGN TABLE t_foreign; +-- ===== Unlogged tables SHOULD be auto-distributed ===== +CREATE UNLOGGED TABLE t_unlogged (id int, tenant_id int); +NOTICE: auto-distributing table "t_unlogged" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_unlogged'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_unlogged; +-- ===== Materialized views should NOT be auto-distributed ===== +-- create a source table first (reset GUC to avoid auto-distribution) +RESET citus.distribution_columns; +CREATE TABLE matview_source (id int, tenant_id int, val text); +SELECT create_distributed_table('matview_source', 'tenant_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO matview_source VALUES (1, 10, 'a'), (2, 20, 'b'); +SET citus.distribution_columns TO 'tenant_id'; +CREATE MATERIALIZED VIEW mv_test AS SELECT * FROM matview_source; +-- matviews should NOT appear in citus_tables +SELECT count(*) FROM citus_tables WHERE table_name = 'mv_test'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP MATERIALIZED VIEW mv_test; +DROP TABLE matview_source; +-- ===== SELECT INTO (another form of CTAS) ===== +RESET citus.distribution_columns; +CREATE TABLE select_into_source (id int, tenant_id int, data text); +SELECT create_distributed_table('select_into_source', 'tenant_id'); + 
create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO select_into_source VALUES (1, 10, 'x'), (2, 20, 'y'); +SET citus.distribution_columns TO 'tenant_id'; +SELECT * INTO t_select_into FROM select_into_source; +NOTICE: auto-distributing table "t_select_into" by column "tenant_id" (from citus.distribution_columns) +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t_select_into$$) +-- should be auto-distributed by tenant_id +SELECT distribution_column FROM citus_tables WHERE table_name = 't_select_into'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +SELECT count(*) FROM t_select_into; + count +--------------------------------------------------------------------- + 2 +(1 row) + +DROP TABLE t_select_into; +DROP TABLE select_into_source; +-- ===== CREATE TABLE ... 
LIKE ===== +RESET citus.distribution_columns; +CREATE TABLE template_table (id int, tenant_id int, name text, created_at timestamptz DEFAULT now()); +SET citus.distribution_columns TO 'tenant_id'; +CREATE TABLE t_like (LIKE template_table INCLUDING ALL); +NOTICE: auto-distributing table "t_like" by column "tenant_id" (from citus.distribution_columns) +-- should be auto-distributed by tenant_id (inherited from LIKE) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_like'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_like; +DROP TABLE template_table; +-- ===== Table inheritance (INHERITS) should NOT be auto-distributed ===== +RESET citus.distribution_columns; +CREATE TABLE parent_inherit (id int, tenant_id int); +SET citus.distribution_columns TO 'tenant_id'; +CREATE TABLE child_inherit (extra text) INHERITS (parent_inherit); +-- Citus doesn't support distributing tables with inheritance, should be skipped +SELECT count(*) FROM citus_tables WHERE table_name = 'child_inherit'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- parent with children should also not be auto-distributed +SELECT count(*) FROM citus_tables WHERE table_name = 'parent_inherit'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE child_inherit; +DROP TABLE parent_inherit; +-- ===== Non-hashable distribution column type (jsonb) ===== +-- In PG 18+, jsonb has a hash function, so it CAN be distributed. +-- This verifies auto-distribution works with non-trivial column types. 
+CREATE TABLE t_jsonb (id int, tenant_id jsonb); +NOTICE: auto-distributing table "t_jsonb" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_jsonb'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_jsonb; +-- ===== Quoted / case-sensitive column names ===== +-- GUC value uses standard PG identifier rules via SplitIdentifierString: +-- unquoted names are downcased, quoted names preserve case. +CREATE TABLE t_case1 (id int, tenant_id int); +NOTICE: auto-distributing table "t_case1" by column "tenant_id" (from citus.distribution_columns) +-- 'tenant_id' in GUC (already lowercase) matches 'tenant_id' → distributed +SELECT distribution_column FROM citus_tables WHERE table_name = 't_case1'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_case1; +-- Quoted column name preserves case in the table +CREATE TABLE t_case2 (id int, "Tenant_Id" int); +-- GUC 'tenant_id' (lowercase) does NOT match "Tenant_Id" → should NOT be distributed +SELECT count(*) FROM citus_tables WHERE table_name = 't_case2'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_case2; +-- Unquoted mixed-case GUC value gets downcased → does NOT match quoted column +SET citus.distribution_columns TO 'Tenant_Id'; +CREATE TABLE t_case3a (id int, "Tenant_Id" int); +-- 'Tenant_Id' downcased to 'tenant_id', does NOT match "Tenant_Id" → NOT distributed +SELECT count(*) FROM citus_tables WHERE table_name = 't_case3a'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_case3a; +-- Quoted GUC value preserves case → matches quoted column +SET citus.distribution_columns TO '"Tenant_Id"'; +CREATE TABLE t_case3b (id int, "Tenant_Id" 
int); +NOTICE: auto-distributing table "t_case3b" by column "Tenant_Id" (from citus.distribution_columns) +-- '"Tenant_Id"' preserves case → matches column "Tenant_Id" → distributed +SELECT distribution_column FROM citus_tables WHERE table_name = 't_case3b'::regclass; + distribution_column +--------------------------------------------------------------------- + Tenant_Id +(1 row) + +DROP TABLE t_case3b; +SET citus.distribution_columns TO 'tenant_id'; +-- ===== UNIQUE constraint without distribution column → error on CREATE ===== +-- UNIQUE on non-distribution column causes auto-distribution to fail, +-- which rolls back the entire CREATE TABLE statement +CREATE TABLE t_unique_bad (id int UNIQUE, tenant_id int); +NOTICE: auto-distributing table "t_unique_bad" by column "tenant_id" (from citus.distribution_columns) +ERROR: cannot create constraint on "t_unique_bad" +DETAIL: Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE). +-- Should error: cannot create constraint ... 
that does not include partition column +-- UNIQUE including the distribution column → should succeed +CREATE TABLE t_unique_good (id int, tenant_id int, UNIQUE(tenant_id, id)); +NOTICE: auto-distributing table "t_unique_good" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_unique_good'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_unique_good; +-- ===== Transaction rollback: auto-distributed table should not persist ===== +BEGIN; + CREATE TABLE t_rollback (id int, tenant_id int); +NOTICE: auto-distributing table "t_rollback" by column "tenant_id" (from citus.distribution_columns) + -- should exist inside the transaction + SELECT distribution_column FROM citus_tables WHERE table_name = 't_rollback'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +ROLLBACK; +-- should not exist after rollback +SELECT count(*) FROM pg_class WHERE relname = 't_rollback'; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- ===== Transaction commit: auto-distributed table should persist ===== +BEGIN; + CREATE TABLE t_commit (id int, tenant_id int); +NOTICE: auto-distributing table "t_commit" by column "tenant_id" (from citus.distribution_columns) + SELECT distribution_column FROM citus_tables WHERE table_name = 't_commit'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +COMMIT; +SELECT distribution_column FROM citus_tables WHERE table_name = 't_commit'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_commit; +-- ===== Multiple tables in one transaction ===== +BEGIN; + CREATE TABLE t_txn1 (id int, tenant_id int); +NOTICE: 
auto-distributing table "t_txn1" by column "tenant_id" (from citus.distribution_columns) + CREATE TABLE t_txn2 (id int, tenant_id int); +NOTICE: auto-distributing table "t_txn2" by column "tenant_id" (from citus.distribution_columns) +COMMIT; +SELECT table_name::text, distribution_column +FROM citus_tables +WHERE table_name::text IN ('t_txn1', 't_txn2') +ORDER BY table_name::text; + table_name | distribution_column +--------------------------------------------------------------------- + t_txn1 | tenant_id + t_txn2 | tenant_id +(2 rows) + +DROP TABLE t_txn1, t_txn2; +-- ===== Interaction with citus.use_citus_managed_tables ===== +SET citus.use_citus_managed_tables TO ON; +SET citus.distribution_columns TO 'tenant_id'; +-- table with matching column → auto-distributed (distribution_columns wins) +CREATE TABLE t_guc_interact1 (id int, tenant_id int); +NOTICE: auto-distributing table "t_guc_interact1" by column "tenant_id" (from citus.distribution_columns) +SELECT citus_table_type, distribution_column +FROM citus_tables WHERE table_name = 't_guc_interact1'::regclass; + citus_table_type | distribution_column +--------------------------------------------------------------------- + distributed | tenant_id +(1 row) + +-- table without matching column → becomes citus managed table +CREATE TABLE t_guc_interact2 (id int, other_col text); +SELECT citus_table_type +FROM citus_tables WHERE table_name = 't_guc_interact2'::regclass; + citus_table_type +--------------------------------------------------------------------- + local +(1 row) + +DROP TABLE t_guc_interact1, t_guc_interact2; +RESET citus.use_citus_managed_tables; +-- ===== Table in non-public schema ===== +CREATE SCHEMA auto_dist_test_schema; +CREATE TABLE auto_dist_test_schema.t_schema (id int, tenant_id int); +NOTICE: auto-distributing table "t_schema" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables +WHERE table_name = 'auto_dist_test_schema.t_schema'::regclass; + 
distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE auto_dist_test_schema.t_schema; +DROP SCHEMA auto_dist_test_schema; +-- ===== ALTER TABLE ADD COLUMN should NOT retroactively distribute ===== +RESET citus.distribution_columns; +CREATE TABLE t_alter_add (id int, other_col text); +-- not distributed (no GUC, no matching column) +SELECT count(*) FROM citus_tables WHERE table_name = 't_alter_add'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.distribution_columns TO 'tenant_id'; +ALTER TABLE t_alter_add ADD COLUMN tenant_id int; +-- should still NOT be distributed (auto-distribution only on CREATE TABLE) +SELECT count(*) FROM citus_tables WHERE table_name = 't_alter_add'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE t_alter_add; +-- ===== Empty tokens in GUC list (double comma) ===== +-- SplitIdentifierString rejects empty identifiers between commas +SET citus.distribution_columns TO 'nonexistent,,tenant_id'; +ERROR: invalid value for parameter "citus.distribution_columns": "nonexistent,,tenant_id" +-- GUC was not changed (SET failed), so previous value is still active +CREATE TABLE t_double_comma (id int, tenant_id int); +NOTICE: auto-distributing table "t_double_comma" by column "tenant_id" (from citus.distribution_columns) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_double_comma'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +DROP TABLE t_double_comma; +-- ============================================================================= +-- CTAS WITH NESTED QUERIES +-- ============================================================================= +-- Test CREATE TABLE AS SELECT with various source query patterns to verify +-- auto-distribution works 
correctly and data is fully preserved. +SET citus.distribution_columns TO 'tenant_id'; +-- Setup: create source tables with known data +-- Distributed table WITH tenant_id (matches GUC) +RESET citus.distribution_columns; +CREATE TABLE src_distributed (id int, tenant_id int, val text); +SELECT create_distributed_table('src_distributed', 'tenant_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO src_distributed VALUES (1,10,'a'),(2,10,'b'),(3,20,'c'),(4,20,'d'),(5,30,'e'); +-- Distributed table WITHOUT tenant_id (no matching GUC column) +CREATE TABLE src_no_match (id int, category_id int, info text); +SELECT create_distributed_table('src_no_match', 'category_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO src_no_match VALUES (1,100,'x'),(2,100,'y'),(3,200,'z'),(4,300,'w'),(5,300,'v'); +-- Reference table +CREATE TABLE src_ref (code int, label text); +SELECT create_reference_table('src_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO src_ref VALUES (10,'ten'),(20,'twenty'),(30,'thirty'); +-- Local (plain) table +CREATE TABLE src_local (id int, tenant_id int, note text); +INSERT INTO src_local VALUES (1,10,'n1'),(2,20,'n2'),(3,30,'n3'); +SET citus.distribution_columns TO 'tenant_id'; +-- ----- CTAS from a nested join query ----- +CREATE TABLE ctas_join AS ( + SELECT d.id, d.tenant_id, d.val, r.label + FROM src_distributed d + JOIN src_ref r ON d.tenant_id = r.code +); +NOTICE: auto-distributing table "ctas_join" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_join" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_join'::regclass; + distribution_column | citus_table_type 
+--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +-- row count must match and be non-empty +SELECT count(*) AS join_count FROM ctas_join; + join_count +--------------------------------------------------------------------- + 5 +(1 row) + +SELECT (SELECT count(*) FROM ctas_join) = + (SELECT count(*) FROM src_distributed d JOIN src_ref r ON d.tenant_id = r.code) + AS counts_match; + counts_match +--------------------------------------------------------------------- + t +(1 row) + +DROP TABLE ctas_join; +-- ----- CTAS from a distributed table that does NOT have the GUC column ----- +-- src_no_match has (id, category_id, info) — no tenant_id column +CREATE TABLE ctas_no_match AS ( + SELECT id, category_id, info FROM src_no_match +); +-- no matching column → should NOT be auto-distributed +SELECT count(*) AS is_distributed FROM citus_tables WHERE table_name = 'ctas_no_match'::regclass; + is_distributed +--------------------------------------------------------------------- + 0 +(1 row) + +-- data must still be complete +SELECT count(*) AS no_match_count FROM ctas_no_match; + no_match_count +--------------------------------------------------------------------- + 5 +(1 row) + +SELECT (SELECT count(*) FROM ctas_no_match) = + (SELECT count(*) FROM src_no_match) + AS counts_match; + counts_match +--------------------------------------------------------------------- + t +(1 row) + +DROP TABLE ctas_no_match; +-- ----- CTAS from a local table ----- +CREATE TABLE ctas_from_local AS ( + SELECT id, tenant_id, note FROM src_local +); +NOTICE: auto-distributing table "ctas_from_local" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_from_local" auto-distributed by column "tenant_id", using INSERT...SELECT for data +-- has tenant_id → should be auto-distributed +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_from_local'::regclass; + 
distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS local_count FROM ctas_from_local; + local_count +--------------------------------------------------------------------- + 3 +(1 row) + +DROP TABLE ctas_from_local; +-- ----- CTAS from a distributed table with the same distribution column ----- +CREATE TABLE ctas_same_dist AS ( + SELECT id, tenant_id, val FROM src_distributed +); +NOTICE: auto-distributing table "ctas_same_dist" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_same_dist" auto-distributed by column "tenant_id", using INSERT...SELECT for data +-- auto-distributed by tenant_id (same as source) +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_same_dist'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS same_dist_count FROM ctas_same_dist; + same_dist_count +--------------------------------------------------------------------- + 5 +(1 row) + +DROP TABLE ctas_same_dist; +-- ----- CTAS from a reference table ----- +CREATE TABLE ctas_from_ref AS ( + SELECT code AS tenant_id, label FROM src_ref +); +NOTICE: auto-distributing table "ctas_from_ref" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_from_ref" auto-distributed by column "tenant_id", using INSERT...SELECT for data +-- has tenant_id (aliased from code) → should be auto-distributed +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_from_ref'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS ref_count FROM ctas_from_ref; + ref_count 
+--------------------------------------------------------------------- + 3 +(1 row) + +DROP TABLE ctas_from_ref; +-- ----- CTAS from a multi-table nested subquery with aggregation ----- +CREATE TABLE ctas_nested_agg AS ( + SELECT sub.tenant_id, sub.total_val, r.label + FROM ( + SELECT tenant_id, count(*) AS total_val + FROM src_distributed + GROUP BY tenant_id + ) sub + JOIN src_ref r ON sub.tenant_id = r.code +); +NOTICE: auto-distributing table "ctas_nested_agg" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_nested_agg" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_nested_agg'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS nested_agg_count FROM ctas_nested_agg; + nested_agg_count +--------------------------------------------------------------------- + 3 +(1 row) + +DROP TABLE ctas_nested_agg; +-- ============================================================================= +-- EXPLAIN CREATE TABLE AS SELECT — plan pushdown analysis +-- ============================================================================= +-- PostgreSQL supports EXPLAIN CREATE TABLE AS SELECT — it shows the plan +-- for the SELECT without actually creating the table. With Citus, this shows +-- whether the query is pushed down to workers or pulled to coordinator. +-- +-- Note: EXPLAIN doesn't trigger auto-distribution (no table is created), +-- so we first EXPLAIN the CTAS to see the plan, then execute the actual +-- CTAS and verify the result. 
+SET citus.distribution_columns TO 'tenant_id'; +-- Setup source tables +RESET citus.distribution_columns; +CREATE TABLE explain_src (id int, tenant_id int, val text); +SELECT create_distributed_table('explain_src', 'tenant_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO explain_src VALUES (1,10,'a'),(2,20,'b'),(3,30,'c'); +SET citus.distribution_columns TO 'tenant_id'; +-- Case 1: CTAS from a distributed table with the SAME distribution column +-- The source table is distributed by tenant_id, the new table will also +-- be auto-distributed by tenant_id → same colocation group +EXPLAIN (COSTS FALSE) CREATE TABLE ctas_explain_same AS + SELECT * FROM explain_src; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on explain_src_7800205 explain_src +(6 rows) + +-- Now actually create it and verify +CREATE TABLE ctas_explain_same AS + SELECT * FROM explain_src; +NOTICE: auto-distributing table "ctas_explain_same" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_explain_same" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_explain_same'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT a.colocation_id = b.colocation_id AS colocated +FROM citus_tables a, citus_tables b +WHERE a.table_name = 'explain_src'::regclass + AND b.table_name = 'ctas_explain_same'::regclass; + colocated +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) AS row_count FROM ctas_explain_same; + row_count 
+--------------------------------------------------------------------- + 3 +(1 row) + +DROP TABLE ctas_explain_same; +-- Case 2: CTAS from a table with a DIFFERENT distribution column +-- Source is distributed by category_id with 3 shards. The new table gets +-- tenant_id from an alias, making it auto-distributed by tenant_id with +-- 4 shards. EXPLAIN shows the SELECT plan scanning the source (3 shards). +-- After creation, the new table is NOT co-located with the source +-- (different shard count and distribution column). +RESET citus.distribution_columns; +SET citus.shard_count TO 3; -- different shard count to guarantee non-colocation +CREATE TABLE explain_src_diff (id int, category_id int, val text); +SELECT create_distributed_table('explain_src_diff', 'category_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO explain_src_diff VALUES (1,100,'x'),(2,200,'y'),(3,300,'z'); +SET citus.shard_count TO 4; +SET citus.distribution_columns TO 'tenant_id'; +-- EXPLAIN the CTAS: shows the SELECT plan scanning source with 3 shards +EXPLAIN (COSTS FALSE) CREATE TABLE ctas_explain_diff AS + SELECT id, category_id AS tenant_id, val FROM explain_src_diff; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 3 + Tasks Shown: One of 3 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on explain_src_diff_7800213 explain_src_diff +(6 rows) + +-- Now actually create it +CREATE TABLE ctas_explain_diff AS + SELECT id, category_id AS tenant_id, val FROM explain_src_diff; +NOTICE: auto-distributing table "ctas_explain_diff" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_explain_diff" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 
'ctas_explain_diff'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +-- NOT co-located with source table (different shard count and column) +SELECT a.colocation_id = b.colocation_id AS colocated +FROM citus_tables a, citus_tables b +WHERE a.table_name = 'explain_src_diff'::regclass + AND b.table_name = 'ctas_explain_diff'::regclass; + colocated +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) AS row_count FROM ctas_explain_diff; + row_count +--------------------------------------------------------------------- + 3 +(1 row) + +DROP TABLE ctas_explain_diff; +-- Case 3: CTAS from a JOIN between two co-located distributed tables +CREATE TABLE explain_items (id int, tenant_id int, qty int); +NOTICE: auto-distributing table "explain_items" by column "tenant_id" (from citus.distribution_columns) +INSERT INTO explain_items VALUES (1,10,5),(2,20,10),(3,30,15); +-- Both explain_src and explain_items are distributed by tenant_id +SELECT a.colocation_id = b.colocation_id AS colocated +FROM citus_tables a, citus_tables b +WHERE a.table_name = 'explain_src'::regclass + AND b.table_name = 'explain_items'::regclass; + colocated +--------------------------------------------------------------------- + t +(1 row) + +-- EXPLAIN the CTAS with a co-located join → should push down +EXPLAIN (COSTS FALSE) CREATE TABLE ctas_explain_join AS + SELECT s.id, s.tenant_id, s.val, i.qty + FROM explain_src s JOIN explain_items i ON s.tenant_id = i.tenant_id; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge Join + Merge Cond: (s.tenant_id = i.tenant_id) + -> Sort + Sort Key: s.tenant_id + -> Seq Scan on explain_src_7800205 s + -> Sort + Sort Key: i.tenant_id + 
-> Seq Scan on explain_items_7800220 i +(13 rows) + +CREATE TABLE ctas_explain_join AS + SELECT s.id, s.tenant_id, s.val, i.qty + FROM explain_src s JOIN explain_items i ON s.tenant_id = i.tenant_id; +NOTICE: auto-distributing table "ctas_explain_join" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_explain_join" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'ctas_explain_join'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS row_count FROM ctas_explain_join; + row_count +--------------------------------------------------------------------- + 3 +(1 row) + +SELECT (SELECT count(*) FROM ctas_explain_join) = + (SELECT count(*) + FROM explain_src s JOIN explain_items i ON s.tenant_id = i.tenant_id) + AS counts_match; + counts_match +--------------------------------------------------------------------- + t +(1 row) + +DROP TABLE ctas_explain_join, explain_items; +DROP TABLE explain_src, explain_src_diff; +-- Cleanup CTAS source tables +DROP TABLE src_distributed, src_no_match, src_ref, src_local; +-- CTAS in tenant schema with distribution_columns set ===== +-- Tenant schema should take precedence over distribution_columns +SET citus.enable_schema_based_sharding TO ON; +SET citus.distribution_columns TO 'tenant_id'; +CREATE SCHEMA tenant_ctas_schema; +CREATE TABLE tenant_ctas_schema.ctas_src (id int, tenant_id int, val text); +INSERT INTO tenant_ctas_schema.ctas_src VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c'); +-- CTAS in tenant schema: should be single-shard tenant table, NOT hash-distributed +CREATE TABLE tenant_ctas_schema.t_ctas AS SELECT * FROM tenant_ctas_schema.ctas_src; +NOTICE: Copying data from local table... 
+NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$tenant_ctas_schema.t_ctas$$) +-- Verify it is a single-shard (tenant) table, not hash-distributed by tenant_id +SELECT citus_table_type FROM citus_tables WHERE table_name = 'tenant_ctas_schema.t_ctas'::regclass; + citus_table_type +--------------------------------------------------------------------- + schema +(1 row) + +SELECT count(*) AS row_count FROM tenant_ctas_schema.t_ctas; + row_count +--------------------------------------------------------------------- + 3 +(1 row) + +BEGIN; + SET LOCAL client_min_messages TO WARNING; + DROP SCHEMA tenant_ctas_schema CASCADE; +COMMIT; +RESET citus.enable_schema_based_sharding; +-- CTAS with CTE, TABLE syntax, and parenthesized subquery ===== +-- Test various SQL forms that the AS keyword scanner must handle +-- Temporarily reset GUC so we can manually distribute the source table +RESET citus.distribution_columns; +CREATE TABLE ctas_syntax_src (id int, tenant_id int, val text); +SELECT create_distributed_table('ctas_syntax_src', 'tenant_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ctas_syntax_src VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c'); +-- Re-enable auto-distribution for the CTAS tests +SET citus.distribution_columns TO 'tenant_id'; +-- CTAS with CTE (WITH ... AS ... 
SELECT) +CREATE TABLE ctas_cte AS WITH src AS (SELECT * FROM ctas_syntax_src WHERE tenant_id = 10) SELECT * FROM src; +NOTICE: auto-distributing table "ctas_cte" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_cte" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type FROM citus_tables WHERE table_name = 'ctas_cte'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS row_count FROM ctas_cte; + row_count +--------------------------------------------------------------------- + 2 +(1 row) + +-- CTAS with TABLE keyword +CREATE TABLE ctas_table_kw AS TABLE ctas_syntax_src; +NOTICE: auto-distributing table "ctas_table_kw" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_table_kw" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type FROM citus_tables WHERE table_name = 'ctas_table_kw'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + tenant_id | distributed +(1 row) + +SELECT count(*) AS row_count FROM ctas_table_kw; + row_count +--------------------------------------------------------------------- + 3 +(1 row) + +-- CTAS with VALUES keyword +CREATE TABLE ctas_values (id, tenant_id, val) AS VALUES (1, 10, 'a'), (2, 20, 'b'); +NOTICE: auto-distributing table "ctas_values" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_values" auto-distributed by column "tenant_id", using INSERT...SELECT for data +SELECT distribution_column, citus_table_type FROM citus_tables WHERE table_name = 'ctas_values'::regclass; + distribution_column | citus_table_type +--------------------------------------------------------------------- + 
tenant_id | distributed +(1 row) + +SELECT count(*) AS row_count FROM ctas_values; + row_count +--------------------------------------------------------------------- + 2 +(1 row) + +DROP TABLE ctas_cte, ctas_table_kw, ctas_values; +-- CTAS with explicit column name override ===== +-- IntoClause.colNames overrides the targetList column names +CREATE TABLE ctas_colnames (a, tenant_id, c) AS SELECT id, tenant_id, val FROM ctas_syntax_src; +NOTICE: auto-distributing table "ctas_colnames" by column "tenant_id" (from citus.distribution_columns) +NOTICE: optimized CTAS: table "ctas_colnames" auto-distributed by column "tenant_id", using INSERT...SELECT for data +-- Should distribute by tenant_id (the overridden name matches) +SELECT distribution_column FROM citus_tables WHERE table_name = 'ctas_colnames'::regclass; + distribution_column +--------------------------------------------------------------------- + tenant_id +(1 row) + +SELECT count(*) AS row_count FROM ctas_colnames; + row_count +--------------------------------------------------------------------- + 3 +(1 row) + +-- Override renames 'tenant_id' to 'other_name' — should NOT match when +-- 'other_name' is not in the distribution_columns GUC +CREATE TABLE ctas_rename (a, other_name, c) AS SELECT id, tenant_id, val FROM ctas_syntax_src; +SELECT count(*) FROM citus_tables WHERE table_name = 'ctas_rename'::regclass; + count +--------------------------------------------------------------------- + 0 +(1 row) + +DROP TABLE ctas_colnames; +DROP TABLE IF EXISTS ctas_rename; +DROP TABLE ctas_syntax_src; +-- ===== Cleanup ===== +RESET citus.distribution_columns; +RESET citus.shard_count; +RESET citus.shard_replication_factor; +SELECT citus_remove_node('localhost', :worker_1_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/post_citus14_schedule b/src/test/regress/post_citus14_schedule new file mode 100644 index 
00000000000..71f63bbb074 --- /dev/null +++ b/src/test/regress/post_citus14_schedule @@ -0,0 +1,17 @@ +# ---------- +# post_citus14_schedule +# +# Tests for features introduced after Citus 14. These tests are expected +# to fail in n/n-1 mode since the worker nodes run the previous major +# version which does not have these features. +# +# This schedule is meant to be run standalone (not appended to multi_1_schedule) +# so it includes the minimal cluster setup as its first tests. +# ---------- + +# --- cluster setup (same as minimal_schedule) --- +test: minimal_cluster_management +test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw multi_test_catalog_views tablespace + +# --- post-citus-14 feature tests --- +test: auto_distribution_columns diff --git a/src/test/regress/sql/auto_distribution_columns.sql b/src/test/regress/sql/auto_distribution_columns.sql new file mode 100644 index 00000000000..9746c477740 --- /dev/null +++ b/src/test/regress/sql/auto_distribution_columns.sql @@ -0,0 +1,852 @@ +-- +-- AUTO_DISTRIBUTION_COLUMNS +-- +-- Tests for the citus.distribution_columns GUC that auto-distributes +-- tables by a priority list of column names on CREATE TABLE / CREATE TABLE AS SELECT. 
+-- + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 7800000; + +-- add a worker so we can actually distribute +SELECT 1 FROM citus_add_node('localhost', :worker_1_port); + +-- ===== Basic: single column in list ===== + +SET citus.distribution_columns TO 'tenant_id'; + +CREATE TABLE t_basic (id bigserial, tenant_id bigint, data text); + +-- verify it was auto-distributed by tenant_id +SELECT distribution_column FROM citus_tables WHERE table_name = 't_basic'::regclass; + +DROP TABLE t_basic; + +-- ===== Priority list: first match wins ===== + +SET citus.distribution_columns TO 'tenant_id, customer_id, department'; + +-- Table has tenant_id → should distribute by tenant_id +CREATE TABLE t_prio1 (id int, tenant_id int, customer_id int, department text); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_prio1'::regclass; + +-- Table only has customer_id → should distribute by customer_id +CREATE TABLE t_prio2 (id int, customer_id int, department text); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_prio2'::regclass; + +-- Table only has department → should distribute by department +CREATE TABLE t_prio3 (id int, department text); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_prio3'::regclass; + +-- Table has none of the columns → should NOT be distributed +CREATE TABLE t_prio_none (id int, other_col text); +SELECT count(*) FROM citus_tables WHERE table_name = 't_prio_none'::regclass; + +DROP TABLE t_prio1, t_prio2, t_prio3, t_prio_none; + +-- ===== Priority list fallback in CTAS ===== +-- First token doesn't match, second does → should distribute by tenant_id +RESET citus.distribution_columns; +CREATE TABLE source_data (id int, tenant_id int, val text); +SELECT create_distributed_table('source_data', 'tenant_id'); +INSERT INTO source_data VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c'); + +SET citus.distribution_columns TO 'nonexistent, tenant_id'; +CREATE 
TABLE t_ctas_fallback AS SELECT * FROM source_data; +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ctas_fallback'::regclass; +DROP TABLE t_ctas_fallback; +DROP TABLE source_data; + +-- ===== Whitespace handling in list ===== + +SET citus.distribution_columns TO ' tenant_id , customer_id '; + +CREATE TABLE t_ws (id int, customer_id int); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ws'::regclass; +DROP TABLE t_ws; + +-- ===== Empty / disabled ===== + +SET citus.distribution_columns TO ''; +CREATE TABLE t_disabled (id int, tenant_id int); +SELECT count(*) FROM citus_tables WHERE table_name = 't_disabled'::regclass; +DROP TABLE t_disabled; + +RESET citus.distribution_columns; +CREATE TABLE t_reset (id int, tenant_id int); +SELECT count(*) FROM citus_tables WHERE table_name = 't_reset'::regclass; +DROP TABLE t_reset; + +-- ===== Temp tables should NOT be auto-distributed ===== + +SET citus.distribution_columns TO 'tenant_id'; +CREATE TEMP TABLE t_temp (id int, tenant_id int); +-- should not appear in citus_tables (temp tables can't be distributed) +SELECT count(*) FROM citus_tables WHERE table_name = 't_temp'::regclass; +DROP TABLE t_temp; + +-- ===== Schema-based sharding takes precedence ===== + +SET citus.enable_schema_based_sharding TO ON; +SET citus.distribution_columns TO 'tenant_id'; + +CREATE SCHEMA auto_dist_tenant_schema; +CREATE TABLE auto_dist_tenant_schema.t_tenant (id int, tenant_id int); + +-- should be a single-shard (tenant) table, not hash-distributed by tenant_id +SELECT distribution_column FROM citus_tables WHERE table_name = 'auto_dist_tenant_schema.t_tenant'::regclass; + +BEGIN; + SET LOCAL client_min_messages TO WARNING; + DROP SCHEMA auto_dist_tenant_schema CASCADE; +COMMIT; + +RESET citus.enable_schema_based_sharding; + +-- ===== NOTICE message shows which column is chosen ===== + +SET citus.distribution_columns TO 'nonexistent, department'; +CREATE TABLE t_notice (id int, department text); +-- The 
NOTICE should say: auto-distributing table "t_notice" by column "department" +DROP TABLE t_notice; + +-- ===== Colocated tables ===== + +SET citus.distribution_columns TO 'tenant_id'; + +CREATE TABLE t_coloc1 (id int, tenant_id int); +CREATE TABLE t_coloc2 (id int, tenant_id int); + +-- both should be colocated (same distribution column type, same shard count) +SELECT c1.colocation_id = c2.colocation_id AS colocated +FROM citus_tables c1, citus_tables c2 +WHERE c1.table_name = 't_coloc1'::regclass + AND c2.table_name = 't_coloc2'::regclass; + +DROP TABLE t_coloc1, t_coloc2; + +-- ===== Reference tables: use SET LOCAL to disable GUC temporarily ===== + +SET citus.distribution_columns TO 'tenant_id'; + +-- A table with a matching column gets auto-distributed as hash +CREATE TABLE lookup_bad (id int, tenant_id int, name text); +-- This would fail because table is already distributed: +-- SELECT create_reference_table('lookup_bad'); +SELECT citus_table_type FROM citus_tables WHERE table_name = 'lookup_bad'::regclass; +DROP TABLE lookup_bad; + +-- The correct pattern: use SET LOCAL inside a transaction to temporarily +-- disable the GUC, then create the reference table normally +BEGIN; + SET LOCAL citus.distribution_columns TO ''; + CREATE TABLE lookup_ref (id int, tenant_id int, name text); + SELECT create_reference_table('lookup_ref'); +COMMIT; + +-- verify it's a reference table, not hash-distributed +SELECT citus_table_type FROM citus_tables WHERE table_name = 'lookup_ref'::regclass; + +-- also works for tables that have no matching column (no GUC conflict) +CREATE TABLE no_match_ref (id int, code text); +-- no matching column → table is local, so we can make it a reference table +SELECT create_reference_table('no_match_ref'); +SELECT citus_table_type FROM citus_tables WHERE table_name = 'no_match_ref'::regclass; + +DROP TABLE lookup_ref, no_match_ref; + +-- ===== Partitioned tables: parent auto-distributed, partitions follow ===== + +SET citus.distribution_columns 
TO 'tenant_id'; + +-- Range-partitioned table +CREATE TABLE orders ( + id int, + tenant_id int, + order_date date, + amount numeric +) PARTITION BY RANGE (order_date); + +-- parent should be auto-distributed by tenant_id +SELECT distribution_column, citus_table_type +FROM citus_tables WHERE table_name = 'orders'::regclass; + +-- create partitions — they should inherit the distribution from the parent +CREATE TABLE orders_2024 PARTITION OF orders + FOR VALUES FROM ('2024-01-01') TO ('2025-01-01'); + +CREATE TABLE orders_2025 PARTITION OF orders + FOR VALUES FROM ('2025-01-01') TO ('2026-01-01'); + +-- partitions should also be distributed by tenant_id +SELECT p.table_name::text, distribution_column +FROM citus_tables p +WHERE p.table_name::text IN ('orders_2024', 'orders_2025') +ORDER BY p.table_name::text; + +-- insert data and verify it goes to the right partitions +INSERT INTO orders VALUES (1, 10, '2024-06-15', 100.00); +INSERT INTO orders VALUES (2, 20, '2025-03-01', 200.00); +SELECT count(*) FROM orders; +SELECT count(*) FROM orders_2024; +SELECT count(*) FROM orders_2025; + +DROP TABLE orders; + +-- List-partitioned table +CREATE TABLE events ( + id int, + tenant_id int, + event_type text, + payload text +) PARTITION BY LIST (event_type); + +SELECT distribution_column FROM citus_tables WHERE table_name = 'events'::regclass; + +CREATE TABLE events_click PARTITION OF events FOR VALUES IN ('click'); +CREATE TABLE events_view PARTITION OF events FOR VALUES IN ('view'); + +SELECT p.table_name::text, distribution_column +FROM citus_tables p +WHERE p.table_name::text IN ('events_click', 'events_view') +ORDER BY p.table_name::text; + +INSERT INTO events VALUES (1, 10, 'click', 'data1'), (2, 20, 'view', 'data2'); +SELECT count(*) FROM events; + +DROP TABLE events; + +-- Hash-partitioned table +CREATE TABLE metrics ( + id int, + tenant_id int, + metric_name text, + value float +) PARTITION BY HASH (id); + +SELECT distribution_column FROM citus_tables WHERE table_name = 
'metrics'::regclass; + +CREATE TABLE metrics_p0 PARTITION OF metrics FOR VALUES WITH (MODULUS 2, REMAINDER 0); +CREATE TABLE metrics_p1 PARTITION OF metrics FOR VALUES WITH (MODULUS 2, REMAINDER 1); + +SELECT p.table_name::text, distribution_column +FROM citus_tables p +WHERE p.table_name::text IN ('metrics_p0', 'metrics_p1') +ORDER BY p.table_name::text; + +DROP TABLE metrics; + +-- ===== Partitioned table with no matching column stays local ===== + +CREATE TABLE local_partitioned ( + id int, + created_at date +) PARTITION BY RANGE (created_at); + +-- no tenant_id column → should NOT be distributed +SELECT count(*) FROM citus_tables WHERE table_name = 'local_partitioned'::regclass; + +CREATE TABLE local_partitioned_2024 PARTITION OF local_partitioned + FOR VALUES FROM ('2024-01-01') TO ('2025-01-01'); + +SELECT count(*) FROM citus_tables WHERE table_name = 'local_partitioned_2024'::regclass; + +DROP TABLE local_partitioned; + +-- ===== ATTACH PARTITION to an already auto-distributed table ===== + +CREATE TABLE sales ( + id int, + tenant_id int, + sale_date date +) PARTITION BY RANGE (sale_date); + +-- auto-distributed +SELECT distribution_column FROM citus_tables WHERE table_name = 'sales'::regclass; + +-- create a standalone table, then attach it as a partition +RESET citus.distribution_columns; +CREATE TABLE sales_2026 (id int, tenant_id int, sale_date date); +-- not distributed yet +SELECT count(*) FROM citus_tables WHERE table_name = 'sales_2026'::regclass; + +SET citus.distribution_columns TO 'tenant_id'; +ALTER TABLE sales ATTACH PARTITION sales_2026 FOR VALUES FROM ('2026-01-01') TO ('2027-01-01'); + +-- now it should be distributed as part of the parent +SELECT distribution_column FROM citus_tables WHERE table_name = 'sales_2026'::regclass; + +DROP TABLE sales; + +-- ===== Views should NOT be auto-distributed ===== + +CREATE TABLE base_for_view (id int, tenant_id int, val text); +-- base table gets distributed +SELECT distribution_column FROM citus_tables 
WHERE table_name = 'base_for_view'::regclass; + +CREATE VIEW v_base AS SELECT * FROM base_for_view; +-- views are not in citus_tables +SELECT count(*) FROM citus_tables WHERE table_name = 'v_base'::regclass; + +DROP VIEW v_base; +DROP TABLE base_for_view; + +-- ===== IF NOT EXISTS on an already-distributed table ===== + +CREATE TABLE t_ifne (id int, tenant_id int); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ifne'::regclass; + +-- should not error, just skip +CREATE TABLE IF NOT EXISTS t_ifne (id int, tenant_id int); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_ifne'::regclass; + +DROP TABLE t_ifne; + +-- ===== Multiple tables in sequence (different distribution columns) ===== + +SET citus.distribution_columns TO 'org_id, tenant_id'; + +CREATE TABLE by_org (id int, org_id int); +CREATE TABLE by_tenant (id int, tenant_id int); +CREATE TABLE by_org_and_tenant (id int, org_id int, tenant_id int); + +-- org_id wins for table that has both +SELECT table_name::text, distribution_column +FROM citus_tables +WHERE table_name::text IN ('by_org', 'by_tenant', 'by_org_and_tenant') +ORDER BY table_name::text; + +DROP TABLE by_org, by_tenant, by_org_and_tenant; + +-- ===== Foreign tables should NOT be auto-distributed ===== + +SET citus.distribution_columns TO 'tenant_id'; + +CREATE FOREIGN TABLE t_foreign (id int, tenant_id int) + SERVER fake_fdw_server; +-- foreign tables cannot be hash-distributed, should be skipped +SELECT count(*) FROM citus_tables WHERE table_name = 't_foreign'::regclass; +DROP FOREIGN TABLE t_foreign; + +-- ===== Unlogged tables SHOULD be auto-distributed ===== + +CREATE UNLOGGED TABLE t_unlogged (id int, tenant_id int); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_unlogged'::regclass; +DROP TABLE t_unlogged; + +-- ===== Materialized views should NOT be auto-distributed ===== + +-- create a source table first (reset GUC to avoid auto-distribution) +RESET citus.distribution_columns; 
+CREATE TABLE matview_source (id int, tenant_id int, val text); +SELECT create_distributed_table('matview_source', 'tenant_id'); +INSERT INTO matview_source VALUES (1, 10, 'a'), (2, 20, 'b'); + +SET citus.distribution_columns TO 'tenant_id'; + +CREATE MATERIALIZED VIEW mv_test AS SELECT * FROM matview_source; +-- matviews should NOT appear in citus_tables +SELECT count(*) FROM citus_tables WHERE table_name = 'mv_test'::regclass; + +DROP MATERIALIZED VIEW mv_test; +DROP TABLE matview_source; + +-- ===== SELECT INTO (another form of CTAS) ===== + +RESET citus.distribution_columns; +CREATE TABLE select_into_source (id int, tenant_id int, data text); +SELECT create_distributed_table('select_into_source', 'tenant_id'); +INSERT INTO select_into_source VALUES (1, 10, 'x'), (2, 20, 'y'); + +SET citus.distribution_columns TO 'tenant_id'; + +SELECT * INTO t_select_into FROM select_into_source; +-- should be auto-distributed by tenant_id +SELECT distribution_column FROM citus_tables WHERE table_name = 't_select_into'::regclass; +SELECT count(*) FROM t_select_into; + +DROP TABLE t_select_into; +DROP TABLE select_into_source; + +-- ===== CREATE TABLE ... 
LIKE ===== + +RESET citus.distribution_columns; +CREATE TABLE template_table (id int, tenant_id int, name text, created_at timestamptz DEFAULT now()); + +SET citus.distribution_columns TO 'tenant_id'; + +CREATE TABLE t_like (LIKE template_table INCLUDING ALL); +-- should be auto-distributed by tenant_id (inherited from LIKE) +SELECT distribution_column FROM citus_tables WHERE table_name = 't_like'::regclass; + +DROP TABLE t_like; +DROP TABLE template_table; + +-- ===== Table inheritance (INHERITS) should NOT be auto-distributed ===== + +RESET citus.distribution_columns; +CREATE TABLE parent_inherit (id int, tenant_id int); + +SET citus.distribution_columns TO 'tenant_id'; + +CREATE TABLE child_inherit (extra text) INHERITS (parent_inherit); +-- Citus doesn't support distributing tables with inheritance, should be skipped +SELECT count(*) FROM citus_tables WHERE table_name = 'child_inherit'::regclass; +-- parent with children should also not be auto-distributed +SELECT count(*) FROM citus_tables WHERE table_name = 'parent_inherit'::regclass; + +DROP TABLE child_inherit; +DROP TABLE parent_inherit; + +-- ===== Non-hashable distribution column type (jsonb) ===== + +-- In PG 18+, jsonb has a hash function, so it CAN be distributed. +-- This verifies auto-distribution works with non-trivial column types. +CREATE TABLE t_jsonb (id int, tenant_id jsonb); +SELECT distribution_column FROM citus_tables WHERE table_name = 't_jsonb'::regclass; +DROP TABLE t_jsonb; + +-- ===== Quoted / case-sensitive column names ===== + +-- GUC value uses standard PG identifier rules via SplitIdentifierString: +-- unquoted names are downcased, quoted names preserve case. 
+CREATE TABLE t_case1 (id int, tenant_id int);
+-- 'tenant_id' in GUC (already lowercase) matches 'tenant_id' → distributed
+SELECT distribution_column FROM citus_tables WHERE table_name = 't_case1'::regclass;
+DROP TABLE t_case1;
+
+-- Quoted column name preserves case in the table
+CREATE TABLE t_case2 (id int, "Tenant_Id" int);
+-- GUC 'tenant_id' (lowercase) does NOT match "Tenant_Id" → should NOT be distributed
+SELECT count(*) FROM citus_tables WHERE table_name = 't_case2'::regclass;
+DROP TABLE t_case2;
+
+-- Unquoted mixed-case GUC value gets downcased → does NOT match quoted column
+SET citus.distribution_columns TO 'Tenant_Id';
+CREATE TABLE t_case3a (id int, "Tenant_Id" int);
+-- 'Tenant_Id' downcased to 'tenant_id', does NOT match "Tenant_Id" → NOT distributed
+SELECT count(*) FROM citus_tables WHERE table_name = 't_case3a'::regclass;
+DROP TABLE t_case3a;
+
+-- Quoted GUC value preserves case → matches quoted column
+SET citus.distribution_columns TO '"Tenant_Id"';
+CREATE TABLE t_case3b (id int, "Tenant_Id" int);
+-- '"Tenant_Id"' preserves case → matches column "Tenant_Id" → distributed
+SELECT distribution_column FROM citus_tables WHERE table_name = 't_case3b'::regclass;
+DROP TABLE t_case3b;
+
+SET citus.distribution_columns TO 'tenant_id';
+
+-- ===== UNIQUE constraint without distribution column → error on CREATE =====
+
+-- UNIQUE on non-distribution column causes auto-distribution to fail,
+-- which rolls back the entire CREATE TABLE statement
+CREATE TABLE t_unique_bad (id int UNIQUE, tenant_id int);
+-- Should error: cannot create constraint ... that does not include partition column
+
+-- UNIQUE including the distribution column → should succeed
+CREATE TABLE t_unique_good (id int, tenant_id int, UNIQUE(tenant_id, id));
+SELECT distribution_column FROM citus_tables WHERE table_name = 't_unique_good'::regclass;
+DROP TABLE t_unique_good;
+
+-- ===== Transaction rollback: auto-distributed table should not persist =====
+
+BEGIN;
+  CREATE TABLE t_rollback (id int, tenant_id int);
+  -- should exist inside the transaction
+  SELECT distribution_column FROM citus_tables WHERE table_name = 't_rollback'::regclass;
+ROLLBACK;
+
+-- should not exist after rollback
+SELECT count(*) FROM pg_class WHERE relname = 't_rollback';
+
+-- ===== Transaction commit: auto-distributed table should persist =====
+
+BEGIN;
+  CREATE TABLE t_commit (id int, tenant_id int);
+  SELECT distribution_column FROM citus_tables WHERE table_name = 't_commit'::regclass;
+COMMIT;
+
+SELECT distribution_column FROM citus_tables WHERE table_name = 't_commit'::regclass;
+DROP TABLE t_commit;
+
+-- ===== Multiple tables in one transaction =====
+
+BEGIN;
+  CREATE TABLE t_txn1 (id int, tenant_id int);
+  CREATE TABLE t_txn2 (id int, tenant_id int);
+COMMIT;
+
+SELECT table_name::text, distribution_column
+FROM citus_tables
+WHERE table_name::text IN ('t_txn1', 't_txn2')
+ORDER BY table_name::text;
+
+DROP TABLE t_txn1, t_txn2;
+
+-- ===== Interaction with citus.use_citus_managed_tables =====
+
+SET citus.use_citus_managed_tables TO ON;
+SET citus.distribution_columns TO 'tenant_id';
+
+-- table with matching column → auto-distributed (distribution_columns wins)
+CREATE TABLE t_guc_interact1 (id int, tenant_id int);
+SELECT citus_table_type, distribution_column
+FROM citus_tables WHERE table_name = 't_guc_interact1'::regclass;
+
+-- table without matching column → becomes citus managed table
+CREATE TABLE t_guc_interact2 (id int, other_col text);
+SELECT citus_table_type
+FROM citus_tables WHERE table_name = 't_guc_interact2'::regclass;
+
+DROP TABLE t_guc_interact1, t_guc_interact2;
+RESET citus.use_citus_managed_tables;
+
+-- ===== Table in non-public schema =====
+
+CREATE SCHEMA auto_dist_test_schema;
+
+CREATE TABLE auto_dist_test_schema.t_schema (id int, tenant_id int);
+SELECT distribution_column FROM citus_tables
+WHERE table_name = 'auto_dist_test_schema.t_schema'::regclass;
+
+DROP TABLE auto_dist_test_schema.t_schema;
+DROP SCHEMA auto_dist_test_schema;
+
+-- ===== ALTER TABLE ADD COLUMN should NOT retroactively distribute =====
+
+RESET citus.distribution_columns;
+CREATE TABLE t_alter_add (id int, other_col text);
+-- not distributed (no GUC, no matching column)
+SELECT count(*) FROM citus_tables WHERE table_name = 't_alter_add'::regclass;
+
+SET citus.distribution_columns TO 'tenant_id';
+ALTER TABLE t_alter_add ADD COLUMN tenant_id int;
+-- should still NOT be distributed (auto-distribution only on CREATE TABLE)
+SELECT count(*) FROM citus_tables WHERE table_name = 't_alter_add'::regclass;
+
+DROP TABLE t_alter_add;
+
+-- ===== Empty tokens in GUC list (double comma) =====
+-- SplitIdentifierString rejects empty identifiers between commas
+SET citus.distribution_columns TO 'nonexistent,,tenant_id';
+
+-- GUC was not changed (SET failed), so previous value is still active
+CREATE TABLE t_double_comma (id int, tenant_id int);
+SELECT distribution_column FROM citus_tables WHERE table_name = 't_double_comma'::regclass;
+DROP TABLE t_double_comma;
+
+-- =============================================================================
+-- CTAS WITH NESTED QUERIES
+-- =============================================================================
+-- Test CREATE TABLE AS SELECT with various source query patterns to verify
+-- auto-distribution works correctly and data is fully preserved.
+
+SET citus.distribution_columns TO 'tenant_id';
+-- NOTE(review): the SET above is undone by the RESET a few lines below before
+-- any table is created, so it appears to have no effect — confirm and remove.
+
+-- Setup: create source tables with known data
+
+-- Distributed table WITH tenant_id (matches GUC)
+RESET citus.distribution_columns;
+CREATE TABLE src_distributed (id int, tenant_id int, val text);
+SELECT create_distributed_table('src_distributed', 'tenant_id');
+INSERT INTO src_distributed VALUES (1,10,'a'),(2,10,'b'),(3,20,'c'),(4,20,'d'),(5,30,'e');
+
+-- Distributed table WITHOUT tenant_id (no matching GUC column)
+CREATE TABLE src_no_match (id int, category_id int, info text);
+SELECT create_distributed_table('src_no_match', 'category_id');
+INSERT INTO src_no_match VALUES (1,100,'x'),(2,100,'y'),(3,200,'z'),(4,300,'w'),(5,300,'v');
+
+-- Reference table
+CREATE TABLE src_ref (code int, label text);
+SELECT create_reference_table('src_ref');
+INSERT INTO src_ref VALUES (10,'ten'),(20,'twenty'),(30,'thirty');
+
+-- Local (plain) table
+CREATE TABLE src_local (id int, tenant_id int, note text);
+INSERT INTO src_local VALUES (1,10,'n1'),(2,20,'n2'),(3,30,'n3');
+
+SET citus.distribution_columns TO 'tenant_id';
+
+-- ----- CTAS from a nested join query -----
+CREATE TABLE ctas_join AS (
+    SELECT d.id, d.tenant_id, d.val, r.label
+    FROM src_distributed d
+    JOIN src_ref r ON d.tenant_id = r.code
+);
+
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_join'::regclass;
+
+-- row count must match and be non-empty
+SELECT count(*) AS join_count FROM ctas_join;
+SELECT (SELECT count(*) FROM ctas_join) =
+       (SELECT count(*) FROM src_distributed d JOIN src_ref r ON d.tenant_id = r.code)
+       AS counts_match;
+
+DROP TABLE ctas_join;
+
+-- ----- CTAS from a distributed table that does NOT have the GUC column -----
+-- src_no_match has (id, category_id, info) — no tenant_id column
+CREATE TABLE ctas_no_match AS (
+    SELECT id, category_id, info FROM src_no_match
+);
+
+-- no matching column → should NOT be auto-distributed
+SELECT count(*) AS is_distributed FROM citus_tables WHERE table_name = 'ctas_no_match'::regclass;
+
+-- data must still be complete
+SELECT count(*) AS no_match_count FROM ctas_no_match;
+SELECT (SELECT count(*) FROM ctas_no_match) =
+       (SELECT count(*) FROM src_no_match)
+       AS counts_match;
+
+DROP TABLE ctas_no_match;
+
+-- ----- CTAS from a local table -----
+CREATE TABLE ctas_from_local AS (
+    SELECT id, tenant_id, note FROM src_local
+);
+
+-- has tenant_id → should be auto-distributed
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_from_local'::regclass;
+
+SELECT count(*) AS local_count FROM ctas_from_local;
+
+DROP TABLE ctas_from_local;
+
+-- ----- CTAS from a distributed table with the same distribution column -----
+CREATE TABLE ctas_same_dist AS (
+    SELECT id, tenant_id, val FROM src_distributed
+);
+
+-- auto-distributed by tenant_id (same as source)
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_same_dist'::regclass;
+
+SELECT count(*) AS same_dist_count FROM ctas_same_dist;
+
+DROP TABLE ctas_same_dist;
+
+-- ----- CTAS from a reference table -----
+CREATE TABLE ctas_from_ref AS (
+    SELECT code AS tenant_id, label FROM src_ref
+);
+
+-- has tenant_id (aliased from code) → should be auto-distributed
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_from_ref'::regclass;
+
+SELECT count(*) AS ref_count FROM ctas_from_ref;
+
+DROP TABLE ctas_from_ref;
+
+-- ----- CTAS from a multi-table nested subquery with aggregation -----
+CREATE TABLE ctas_nested_agg AS (
+    SELECT sub.tenant_id, sub.total_val, r.label
+    FROM (
+        SELECT tenant_id, count(*) AS total_val
+        FROM src_distributed
+        GROUP BY tenant_id
+    ) sub
+    JOIN src_ref r ON sub.tenant_id = r.code
+);
+
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_nested_agg'::regclass;
+
+SELECT count(*) AS nested_agg_count FROM ctas_nested_agg;
+
+DROP TABLE ctas_nested_agg;
+
+-- =============================================================================
+-- EXPLAIN CREATE TABLE AS SELECT — plan pushdown analysis
+-- =============================================================================
+-- PostgreSQL supports EXPLAIN CREATE TABLE AS SELECT — it shows the plan
+-- for the SELECT without actually creating the table. With Citus, this shows
+-- whether the query is pushed down to workers or pulled to coordinator.
+--
+-- Note: EXPLAIN doesn't trigger auto-distribution (no table is created),
+-- so we first EXPLAIN the CTAS to see the plan, then execute the actual
+-- CTAS and verify the result.
+
+SET citus.distribution_columns TO 'tenant_id';
+-- NOTE(review): this SET is also undone by the RESET in the setup below before
+-- it can matter — confirm and remove.
+
+-- Setup source tables
+RESET citus.distribution_columns;
+CREATE TABLE explain_src (id int, tenant_id int, val text);
+SELECT create_distributed_table('explain_src', 'tenant_id');
+INSERT INTO explain_src VALUES (1,10,'a'),(2,20,'b'),(3,30,'c');
+SET citus.distribution_columns TO 'tenant_id';
+
+-- Case 1: CTAS from a distributed table with the SAME distribution column
+-- The source table is distributed by tenant_id, the new table will also
+-- be auto-distributed by tenant_id → same colocation group
+
+EXPLAIN (COSTS FALSE) CREATE TABLE ctas_explain_same AS
+    SELECT * FROM explain_src;
+
+-- Now actually create it and verify
+CREATE TABLE ctas_explain_same AS
+    SELECT * FROM explain_src;
+
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_explain_same'::regclass;
+
+SELECT a.colocation_id = b.colocation_id AS colocated
+FROM citus_tables a, citus_tables b
+WHERE a.table_name = 'explain_src'::regclass
+  AND b.table_name = 'ctas_explain_same'::regclass;
+
+SELECT count(*) AS row_count FROM ctas_explain_same;
+
+DROP TABLE ctas_explain_same;
+
+-- Case 2: CTAS from a table with a DIFFERENT distribution column
+-- Source is distributed by category_id with 3 shards. The new table gets
+-- tenant_id from an alias, making it auto-distributed by tenant_id with
+-- 4 shards. EXPLAIN shows the SELECT plan scanning the source (3 shards).
+-- After creation, the new table is NOT co-located with the source
+-- (different shard count and distribution column).
+
+RESET citus.distribution_columns;
+SET citus.shard_count TO 3; -- different shard count to guarantee non-colocation
+CREATE TABLE explain_src_diff (id int, category_id int, val text);
+SELECT create_distributed_table('explain_src_diff', 'category_id');
+INSERT INTO explain_src_diff VALUES (1,100,'x'),(2,200,'y'),(3,300,'z');
+SET citus.shard_count TO 4;
+SET citus.distribution_columns TO 'tenant_id';
+
+-- EXPLAIN the CTAS: shows the SELECT plan scanning source with 3 shards
+EXPLAIN (COSTS FALSE) CREATE TABLE ctas_explain_diff AS
+    SELECT id, category_id AS tenant_id, val FROM explain_src_diff;
+
+-- Now actually create it
+CREATE TABLE ctas_explain_diff AS
+    SELECT id, category_id AS tenant_id, val FROM explain_src_diff;
+
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_explain_diff'::regclass;
+
+-- NOT co-located with source table (different shard count and column)
+SELECT a.colocation_id = b.colocation_id AS colocated
+FROM citus_tables a, citus_tables b
+WHERE a.table_name = 'explain_src_diff'::regclass
+  AND b.table_name = 'ctas_explain_diff'::regclass;
+
+SELECT count(*) AS row_count FROM ctas_explain_diff;
+
+DROP TABLE ctas_explain_diff;
+
+-- Case 3: CTAS from a JOIN between two co-located distributed tables
+CREATE TABLE explain_items (id int, tenant_id int, qty int);
+INSERT INTO explain_items VALUES (1,10,5),(2,20,10),(3,30,15);
+
+-- Both explain_src and explain_items are distributed by tenant_id
+SELECT a.colocation_id = b.colocation_id AS colocated
+FROM citus_tables a, citus_tables b
+WHERE a.table_name = 'explain_src'::regclass
+  AND b.table_name = 'explain_items'::regclass;
+
+-- EXPLAIN the CTAS with a co-located join → should push down
+EXPLAIN (COSTS FALSE) CREATE TABLE ctas_explain_join AS
+    SELECT s.id, s.tenant_id, s.val, i.qty
+    FROM explain_src s JOIN explain_items i ON s.tenant_id = i.tenant_id;
+
+CREATE TABLE ctas_explain_join AS
+    SELECT s.id, s.tenant_id, s.val, i.qty
+    FROM explain_src s JOIN explain_items i ON s.tenant_id = i.tenant_id;
+
+SELECT distribution_column, citus_table_type
+FROM citus_tables WHERE table_name = 'ctas_explain_join'::regclass;
+
+SELECT count(*) AS row_count FROM ctas_explain_join;
+SELECT (SELECT count(*) FROM ctas_explain_join) =
+       (SELECT count(*)
+        FROM explain_src s JOIN explain_items i ON s.tenant_id = i.tenant_id)
+       AS counts_match;
+
+DROP TABLE ctas_explain_join, explain_items;
+DROP TABLE explain_src, explain_src_diff;
+
+-- Cleanup CTAS source tables
+DROP TABLE src_distributed, src_no_match, src_ref, src_local;
+
+-- ===== CTAS in tenant schema with distribution_columns set =====
+-- Tenant schema should take precedence over distribution_columns
+
+SET citus.enable_schema_based_sharding TO ON;
+SET citus.distribution_columns TO 'tenant_id';
+
+CREATE SCHEMA tenant_ctas_schema;
+CREATE TABLE tenant_ctas_schema.ctas_src (id int, tenant_id int, val text);
+INSERT INTO tenant_ctas_schema.ctas_src VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c');
+
+-- CTAS in tenant schema: should be single-shard tenant table, NOT hash-distributed
+CREATE TABLE tenant_ctas_schema.t_ctas AS SELECT * FROM tenant_ctas_schema.ctas_src;
+
+-- Verify it is a single-shard (tenant) table, not hash-distributed by tenant_id
+SELECT citus_table_type FROM citus_tables WHERE table_name = 'tenant_ctas_schema.t_ctas'::regclass;
+SELECT count(*) AS row_count FROM tenant_ctas_schema.t_ctas;
+
+BEGIN;
+  SET LOCAL client_min_messages TO WARNING;
+  DROP SCHEMA tenant_ctas_schema CASCADE;
+COMMIT;
+
+RESET citus.enable_schema_based_sharding;
+
+-- ===== CTAS with CTE, TABLE, and VALUES syntax =====
+-- Test various SQL forms that the AS keyword scanner must handle
+
+-- Temporarily reset GUC so we can manually distribute the source table
+RESET citus.distribution_columns;
+CREATE TABLE ctas_syntax_src (id int, tenant_id int, val text);
+SELECT create_distributed_table('ctas_syntax_src', 'tenant_id');
+INSERT INTO ctas_syntax_src VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c');
+
+-- Re-enable auto-distribution for the CTAS tests
+SET citus.distribution_columns TO 'tenant_id';
+
+-- CTAS with CTE (WITH ... AS ... SELECT)
+CREATE TABLE ctas_cte AS WITH src AS (SELECT * FROM ctas_syntax_src WHERE tenant_id = 10) SELECT * FROM src;
+SELECT distribution_column, citus_table_type FROM citus_tables WHERE table_name = 'ctas_cte'::regclass;
+SELECT count(*) AS row_count FROM ctas_cte;
+
+-- CTAS with TABLE keyword
+CREATE TABLE ctas_table_kw AS TABLE ctas_syntax_src;
+SELECT distribution_column, citus_table_type FROM citus_tables WHERE table_name = 'ctas_table_kw'::regclass;
+SELECT count(*) AS row_count FROM ctas_table_kw;
+
+-- CTAS with VALUES keyword
+CREATE TABLE ctas_values (id, tenant_id, val) AS VALUES (1, 10, 'a'), (2, 20, 'b');
+SELECT distribution_column, citus_table_type FROM citus_tables WHERE table_name = 'ctas_values'::regclass;
+SELECT count(*) AS row_count FROM ctas_values;
+
+DROP TABLE ctas_cte, ctas_table_kw, ctas_values;
+
+-- ===== CTAS with explicit column name override =====
+-- IntoClause.colNames overrides the targetList column names
+
+CREATE TABLE ctas_colnames (a, tenant_id, c) AS SELECT id, tenant_id, val FROM ctas_syntax_src;
+-- Should distribute by tenant_id (the overridden name matches)
+SELECT distribution_column FROM citus_tables WHERE table_name = 'ctas_colnames'::regclass;
+SELECT count(*) AS row_count FROM ctas_colnames;
+
+-- Override renames 'tenant_id' to 'other_name' — should NOT match when
+-- 'other_name' is not in the distribution_columns GUC
+CREATE TABLE ctas_rename (a, other_name, c) AS SELECT id, tenant_id, val FROM ctas_syntax_src;
+SELECT count(*) FROM citus_tables WHERE table_name = 'ctas_rename'::regclass;
+
+DROP TABLE ctas_colnames;
+DROP TABLE IF EXISTS ctas_rename;
+DROP TABLE ctas_syntax_src;
+
+-- ===== Cleanup =====
+RESET citus.distribution_columns;
+RESET citus.shard_count;
+RESET citus.shard_replication_factor;
+
+SELECT citus_remove_node('localhost', :worker_1_port);