Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions src/backend/distributed/planner/multi_physical_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -2183,6 +2183,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int minShardOffset = INT_MAX;
int prevShardCount = 0;
Bitmapset *taskRequiredForShardIndex = NULL;
Bitmapset *distributedTableIndex = NULL;

/* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(query);
Expand All @@ -2198,9 +2199,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,

RelationRestriction *relationRestriction = NULL;
List *prunedShardList = NULL;

forboth_ptr(prunedShardList, prunedRelationShardList,
relationRestriction, relationRestrictionContext->relationRestrictionList)
foreach_ptr(relationRestriction,
relationRestrictionContext->relationRestrictionList)
{
Oid relationId = relationRestriction->relationId;

Expand All @@ -2220,6 +2220,54 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
return NIL;
}
prevShardCount = cacheEntry->shardIntervalArrayLength;
distributedTableIndex = bms_add_member(distributedTableIndex,
relationRestriction->index);
}

bool noDistTables = bms_is_empty(distributedTableIndex);
bool hasRefOrSchemaShardedTable = false;

forboth_ptr(prunedShardList, prunedRelationShardList,
relationRestriction, relationRestrictionContext->relationRestrictionList)
{
Oid relationId = relationRestriction->relationId;

CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
if (noDistTables && !hasRefOrSchemaShardedTable)
{
/*
* Before continuing, check if we're looking at a reference or schema-
* sharded table. If so, and it is the first such table we've seen, we
* add a task for shard index 0; all reference and schema sharded tables
* have shard index 0 so we can hard-code the value rather than looking at
* the shardIndex in pruned shard list, as is done further on down for
* distributed tables.
*
* Note that this only needs to be done once, regardless of how many
* reference or schema sharded tables there are; they all have the
* same shard index (0), and will require just one task.
*
* Also note that this is only done if there are no distributed tables
* involved; the relevant shard indexes will get added, and furthermore
* we don't want to incorrectly add shard index 0 if for example a left
* outer join between a reference table and a distributed table also has
* a restriction that prunes out shard index 0 of the distributed table.
*/
CitusTableType currentTableType = GetCitusTableType(cacheEntry);
hasRefOrSchemaShardedTable = currentTableType == REFERENCE_TABLE ||
currentTableType == SINGLE_SHARD_DISTRIBUTED;
if (hasRefOrSchemaShardedTable)
{
taskRequiredForShardIndex = bms_add_member(taskRequiredForShardIndex,
0);
minShardOffset = 0;
}
}

continue;
}

/*
* For left joins we don't care about the shards pruned for the right hand side.
Expand Down Expand Up @@ -2277,6 +2325,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
++taskIdIndex;
}

/* If we detected a reference or schema sharded table then there
* should be no distributed tables involved and exactly one task.
*/
Assert(!hasRefOrSchemaShardedTable || (noDistTables &&
list_length(sqlTaskList) == 1));

/* If it is a modify task with multiple tables */
if (taskType == MODIFY_TASK && list_length(
relationRestrictionContext->relationRestrictionList) > 1)
Expand Down
Loading
Loading