diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index b1ca3a18f52..4d0c3de783f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2183,6 +2183,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, int minShardOffset = INT_MAX; int prevShardCount = 0; Bitmapset *taskRequiredForShardIndex = NULL; + Bitmapset *distributedTableIndex = NULL; /* error if shards are not co-partitioned */ ErrorIfUnsupportedShardDistribution(query); @@ -2198,9 +2199,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, RelationRestriction *relationRestriction = NULL; List *prunedShardList = NULL; - - forboth_ptr(prunedShardList, prunedRelationShardList, - relationRestriction, relationRestrictionContext->relationRestrictionList) + foreach_ptr(relationRestriction, + relationRestrictionContext->relationRestrictionList) { Oid relationId = relationRestriction->relationId; @@ -2220,6 +2220,54 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, return NIL; } prevShardCount = cacheEntry->shardIntervalArrayLength; + distributedTableIndex = bms_add_member(distributedTableIndex, + relationRestriction->index); + } + + bool noDistTables = bms_is_empty(distributedTableIndex); + bool hasRefOrSchemaShardedTable = false; + + forboth_ptr(prunedShardList, prunedRelationShardList, + relationRestriction, relationRestrictionContext->relationRestrictionList) + { + Oid relationId = relationRestriction->relationId; + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + if (!HasDistributionKeyCacheEntry(cacheEntry)) + { + if (noDistTables && !hasRefOrSchemaShardedTable) + { + /* + * Before continuing, check if we're looking at a reference or schema- + * sharded table. 
If so, and it is the first such table we've seen, we + * add a task for shard index 0; all reference and schema sharded tables + * have shard index 0 so we can hard-code the value rather than looking at + * the shardIndex in pruned shard list, as is done further on down for + * distributed tables. + * + * Note that this only needs to be done once, regardless of how many + * reference or schema sharded tables there are; they all have the + * same shard index (0), and will require just one task. + * + * Also note that this is only done if there are no distributed tables + * involved; if there are, later code adds their shard indexes, and + * we don't want to incorrectly add shard index 0 if, for example, a left + * outer join between a reference table and a distributed table also has + * a restriction that prunes out shard index 0 of the distributed table. + */ + CitusTableType currentTableType = GetCitusTableType(cacheEntry); + hasRefOrSchemaShardedTable = currentTableType == REFERENCE_TABLE || + currentTableType == SINGLE_SHARD_DISTRIBUTED; + if (hasRefOrSchemaShardedTable) + { + taskRequiredForShardIndex = bms_add_member(taskRequiredForShardIndex, + 0); + minShardOffset = 0; + } + } + + continue; + } /* * For left joins we don't care about the shards pruned for the right hand side. @@ -2277,6 +2325,12 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, ++taskIdIndex; } + /* If we detected a reference or schema sharded table then there + * should be no distributed tables involved and exactly one task. 
+ */ + Assert(!hasRefOrSchemaShardedTable || (noDistTables && + list_length(sqlTaskList) == 1)); + /* If it is a modify task with multiple tables */ if (taskType == MODIFY_TASK && list_length( relationRestrictionContext->relationRestrictionList) > 1) diff --git a/src/test/regress/expected/issue_8243.out b/src/test/regress/expected/issue_8243.out new file mode 100644 index 00000000000..5ad931dab66 --- /dev/null +++ b/src/test/regress/expected/issue_8243.out @@ -0,0 +1,315 @@ +-- Test the fix for https://github.com/citusdata/citus/issues/8243 +-- Fix empty list for worker subquery tasks when the query has no +-- ditribtued table but at least one reference table or schema +-- sharded table +SET citus.next_shard_id TO 580000; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor to 1; +CREATE SCHEMA issue_8243; +SET search_path TO issue_8243; +-- DDL for the test; we need some schema sharded tables +CREATE SCHEMA schmshrd; +SELECT citus_schema_distribute('schmshrd'); +NOTICE: distributing the schema schmshrd + citus_schema_distribute +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE schmshrd.t1(id bigserial PRIMARY KEY, name text, val int); +CREATE TABLE schmshrd.t2(id bigserial PRIMARY KEY, name text, val int); +CREATE TABLE schmshrd.t3(id bigserial PRIMARY KEY, name text, val int); +-- and some reference tables +CREATE TABLE ref1(id bigserial PRIMARY KEY, name text); +CREATE TABLE ref2(id bigserial PRIMARY KEY, name text); +CREATE TABLE ref3(id bigserial PRIMARY KEY, name text); +SELECT create_reference_table('ref1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('ref2'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('ref3'); + create_reference_table +--------------------------------------------------------------------- + 
+(1 row) + +-- and distributed tables +CREATE TABLE dist1_8243(id bigserial PRIMARY KEY, name text, val int); +CREATE TABLE dist2_8243(id bigserial PRIMARY KEY, name text, val int); +SELECT create_distributed_table('dist1_8243', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist2_8243', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Test some queries that would previously end up with empty subquery tasks on workers. +-- Query characteristics: +-- - no distributed tables +-- - at least one reference table or schema sharded table +-- - some property that prevents a router plan (e.g. nextval() on a citus table in select targets) +-- - worker subquery task(s) needed; the Postgres plan has a subquery scan node +-- Test 1: schema shareded table only; exactly 1 task in the query plan. +EXPLAIN (verbose, costs off) +SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id, name +FROM (select name from schmshrd.t2 group by name) sub ; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Output: nextval('schmshrd.t1_id_seq'::regclass), remote_scan.name + Task Count: 1 + Tasks Shown: All + -> Task + Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT t2.name FROM schmshrd.t2_580001 t2 GROUP BY t2.name) sub) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Output: t2.name + Group Key: t2.name + -> Seq Scan on schmshrd.t2_580001 t2 + Output: t2.id, t2.name, t2.val +(12 rows) + +-- Test 2: bunch of schema sharded tables; still expect 1 task +EXPLAIN (verbose, costs off) +SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id, sub.name +FROM (select t1.name as name from schmshrd.t1 t1, schmshrd.t2 t2, schmshrd.t3 t3 where t1.id = t2.id and t3.name = t2.name group by 
t1.name) sub ; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Output: nextval('schmshrd.t1_id_seq'::regclass), remote_scan.name + Task Count: 1 + Tasks Shown: All + -> Task + Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT t1.name FROM schmshrd.t1_580000 t1, schmshrd.t2_580001 t2, schmshrd.t3_580002 t3 WHERE ((t1.id OPERATOR(pg_catalog.=) t2.id) AND (t3.name OPERATOR(pg_catalog.=) t2.name)) GROUP BY t1.name) sub) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Output: t1.name + Group Key: t1.name + -> Merge Join + Output: t1.name + Merge Cond: (t2.name = t3.name) + -> Sort + Output: t1.name, t2.name + Sort Key: t2.name + -> Hash Join + Output: t1.name, t2.name + Inner Unique: true + Hash Cond: (t1.id = t2.id) + -> Seq Scan on schmshrd.t1_580000 t1 + Output: t1.id, t1.name, t1.val + -> Hash + Output: t2.id, t2.name + -> Seq Scan on schmshrd.t2_580001 t2 + Output: t2.id, t2.name + -> Sort + Output: t3.name + Sort Key: t3.name + -> Seq Scan on schmshrd.t3_580002 t3 + Output: t3.name +(31 rows) + +-- Test 3: reference table only; exactly 1 task in the query plan. 
+EXPLAIN (verbose, costs off) +SELECT nextval('ref1_id_seq'::regclass) AS id, name +FROM (select name from ref2 group by name) sub ; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Output: nextval('ref1_id_seq'::regclass), remote_scan.name + Task Count: 1 + Tasks Shown: All + -> Task + Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT ref2.name FROM issue_8243.ref2_580004 ref2 GROUP BY ref2.name) sub) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Output: ref2.name + Group Key: ref2.name + -> Seq Scan on issue_8243.ref2_580004 ref2 + Output: ref2.id, ref2.name +(12 rows) + +-- Test 4: bunch of reference tables; still expect 1 task +EXPLAIN (verbose, costs off) +SELECT nextval('ref1_id_seq'::regclass) AS id, sub.name +FROM (select r1.name as name from ref1 r1, ref2 r2, ref3 r3 where r1.id = r2.id and r3.name = r2.name group by r1.name) sub ; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Output: nextval('ref1_id_seq'::regclass), remote_scan.name + Task Count: 1 + Tasks Shown: All + -> Task + Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT r1.name FROM issue_8243.ref1_580003 r1, issue_8243.ref2_580004 r2, issue_8243.ref3_580005 r3 WHERE ((r1.id OPERATOR(pg_catalog.=) r2.id) AND (r3.name OPERATOR(pg_catalog.=) r2.name)) GROUP BY r1.name) sub) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Output: r1.name + Group Key: r1.name + -> Merge Join + Output: r1.name + Merge Cond: (r2.name = r3.name) + -> Sort + Output: r1.name, r2.name + Sort Key: r2.name + -> Hash Join + Output: r1.name, r2.name + Inner Unique: true + Hash Cond: (r1.id = r2.id) + -> Seq Scan on issue_8243.ref1_580003 r1 + Output: r1.id, r1.name + -> Hash + Output: r2.id, r2.name + -> Seq Scan on 
issue_8243.ref2_580004 r2 + Output: r2.id, r2.name + -> Sort + Output: r3.name + Sort Key: r3.name + -> Seq Scan on issue_8243.ref3_580005 r3 + Output: r3.name +(31 rows) + +-- Test 5: mix of schema sharded and reference tables; exactly 1 task in the query plan. +EXPLAIN (verbose, costs off) +SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id , sub.name +FROM (select t1.name as name from schmshrd.t1 t1, ref1 r1 where t1.id = r1.id and r1.id IN (select id from ref3) group by t1.name) sub ; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Output: nextval('schmshrd.t1_id_seq'::regclass), remote_scan.name + Task Count: 1 + Tasks Shown: All + -> Task + Query: SELECT worker_column_1 AS name FROM (SELECT sub.name AS worker_column_1 FROM (SELECT t1.name FROM schmshrd.t1_580000 t1, issue_8243.ref1_580003 r1 WHERE ((t1.id OPERATOR(pg_catalog.=) r1.id) AND (r1.id OPERATOR(pg_catalog.=) ANY (SELECT ref3.id FROM issue_8243.ref3_580005 ref3))) GROUP BY t1.name) sub) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Output: t1.name + Group Key: t1.name + -> Hash Join + Output: t1.name + Inner Unique: true + Hash Cond: (t1.id = ref3.id) + -> Hash Join + Output: t1.name, t1.id, r1.id + Inner Unique: true + Hash Cond: (r1.id = t1.id) + -> Seq Scan on issue_8243.ref1_580003 r1 + Output: r1.id, r1.name + -> Hash + Output: t1.name, t1.id + -> Seq Scan on schmshrd.t1_580000 t1 + Output: t1.name, t1.id + -> Hash + Output: ref3.id + -> Seq Scan on issue_8243.ref3_580005 ref3 + Output: ref3.id +(28 rows) + +-- Test 6: sanity tests - the fix does not interfere with outer join between reference and distributed table +-- where a restriction prunes out shard index 0 of the distributed table. 
+-- Plan has 3 tasks +EXPLAIN (verbose, costs off) +SELECT x1.name, dist1_8243.val +FROM ref2 x1 left outer join dist1_8243 using (id) +WHERE dist1_8243.id IN (1, 10001, 999989); + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Output: remote_scan.name, remote_scan.val + Task Count: 3 + Tasks Shown: One of 3 + -> Task + Query: SELECT worker_column_1 AS name, worker_column_2 AS val FROM (SELECT x1.name AS worker_column_1, dist1_8243.val AS worker_column_2 FROM (issue_8243.ref2_580004 x1(id, name) LEFT JOIN issue_8243.dist1_8243_580007 dist1_8243(id, name, val) USING (id)) WHERE ((dist1_8243.id OPERATOR(pg_catalog.=) ANY (ARRAY[(1)::bigint, (10001)::bigint, (999989)::bigint])) AND ((btint4cmp('-2013265920'::integer, hashint8(x1.id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint8(x1.id), '-1879048193'::integer) OPERATOR(pg_catalog.<=) 0)))) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> Nested Loop + Output: x1.name, dist1_8243.val + Inner Unique: true + -> Bitmap Heap Scan on issue_8243.dist1_8243_580007 dist1_8243 + Output: dist1_8243.id, dist1_8243.name, dist1_8243.val + Recheck Cond: (dist1_8243.id = ANY ('{1,10001,999989}'::bigint[])) + -> Bitmap Index Scan on dist1_8243_pkey_580007 + Index Cond: (dist1_8243.id = ANY ('{1,10001,999989}'::bigint[])) + -> Memoize + Output: x1.name, x1.id + Cache Key: dist1_8243.id + Cache Mode: logical + -> Index Scan using ref2_pkey_580004 on issue_8243.ref2_580004 x1 + Output: x1.name, x1.id + Index Cond: (x1.id = dist1_8243.id) + Filter: ((btint4cmp('-2013265920'::integer, hashint8(x1.id)) < 0) AND (btint4cmp(hashint8(x1.id), '-1879048193'::integer) <= 0)) +(23 rows) + +-- Plan has 2 tasks +EXPLAIN (verbose, costs off) +SELECT x1.name, dist2_8243.val +FROM ref3 x1 left outer join dist2_8243 using (id) +WHERE dist2_8243.id IN (10001, 999989); + QUERY PLAN +--------------------------------------------------------------------- + Custom 
Scan (Citus Adaptive) + Output: remote_scan.name, remote_scan.val + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Query: SELECT worker_column_1 AS name, worker_column_2 AS val FROM (SELECT x1.name AS worker_column_1, dist2_8243.val AS worker_column_2 FROM (issue_8243.ref3_580005 x1(id, name) LEFT JOIN issue_8243.dist2_8243_580041 dist2_8243(id, name, val) USING (id)) WHERE ((dist2_8243.id OPERATOR(pg_catalog.=) ANY (ARRAY[(10001)::bigint, (999989)::bigint])) AND ((btint4cmp('-1744830464'::integer, hashint8(x1.id)) OPERATOR(pg_catalog.<) 0) AND (btint4cmp(hashint8(x1.id), '-1610612737'::integer) OPERATOR(pg_catalog.<=) 0)))) worker_subquery + Node: host=localhost port=xxxxx dbname=regression + -> Nested Loop + Output: x1.name, dist2_8243.val + Inner Unique: true + -> Bitmap Heap Scan on issue_8243.dist2_8243_580041 dist2_8243 + Output: dist2_8243.id, dist2_8243.name, dist2_8243.val + Recheck Cond: (dist2_8243.id = ANY ('{10001,999989}'::bigint[])) + -> Bitmap Index Scan on dist2_8243_pkey_580041 + Index Cond: (dist2_8243.id = ANY ('{10001,999989}'::bigint[])) + -> Memoize + Output: x1.name, x1.id + Cache Key: dist2_8243.id + Cache Mode: logical + -> Index Scan using ref3_pkey_580005 on issue_8243.ref3_580005 x1 + Output: x1.name, x1.id + Index Cond: (x1.id = dist2_8243.id) + Filter: ((btint4cmp('-1744830464'::integer, hashint8(x1.id)) < 0) AND (btint4cmp(hashint8(x1.id), '-1610612737'::integer) <= 0)) +(23 rows) + +-- Test 7: failing query from https://github.com/citusdata/citus/issues/8243 +SET search_path TO schmshrd; +INSERT INTO t2 (name) VALUES ('user1'), ('user2'), ('user3'), ('user1'), ('user2'), ('user1'); +INSERT INTO t1 (name) SELECT name FROM (SELECT name FROM t2 GROUP BY name) sub; +SELECT id, name FROM t1 ORDER BY id; + id | name +--------------------------------------------------------------------- + 1 | user3 + 2 | user2 + 3 | user1 +(3 rows) + +-- and for reference tables +SET search_path TO issue_8243; +INSERT INTO ref2 (name) VALUES ('user1'), 
('user2'), ('user3'), ('user1'), ('user2'), ('user1'); +INSERT INTO ref1 (name) SELECT name FROM (SELECT name FROM ref2 GROUP BY name) sub; +SELECT id, name FROM ref1 ORDER BY id; + id | name +--------------------------------------------------------------------- + 1 | user3 + 2 | user2 + 3 | user1 +(3 rows) + +--- clean up: +SET client_min_messages TO WARNING; +DROP SCHEMA schmshrd CASCADE; +DROP SCHEMA issue_8243 CASCADE; +RESET citus.next_shard_id; +RESET citus.shard_count; +RESET citus.shard_replication_factor; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4a5e8a11d5a..842aa11a653 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_8243 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_8243.sql b/src/test/regress/sql/issue_8243.sql new file mode 100644 index 00000000000..e5d43b0ac99 --- /dev/null +++ b/src/test/regress/sql/issue_8243.sql @@ -0,0 +1,104 @@ +-- Test the fix for https://github.com/citusdata/citus/issues/8243 + +-- Fix empty list for worker subquery tasks when the query has no +-- ditribtued table but at least one reference table or schema +-- sharded table + +SET citus.next_shard_id TO 580000; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor to 1; + +CREATE SCHEMA issue_8243; +SET search_path TO issue_8243; + +-- DDL for the test; we need some schema sharded tables +CREATE SCHEMA schmshrd; +SELECT citus_schema_distribute('schmshrd'); +CREATE TABLE schmshrd.t1(id bigserial PRIMARY KEY, name text, val int); +CREATE TABLE schmshrd.t2(id bigserial PRIMARY 
KEY, name text, val int); +CREATE TABLE schmshrd.t3(id bigserial PRIMARY KEY, name text, val int); + +-- and some reference tables +CREATE TABLE ref1(id bigserial PRIMARY KEY, name text); +CREATE TABLE ref2(id bigserial PRIMARY KEY, name text); +CREATE TABLE ref3(id bigserial PRIMARY KEY, name text); + +SELECT create_reference_table('ref1'); +SELECT create_reference_table('ref2'); +SELECT create_reference_table('ref3'); + +-- and distributed tables +CREATE TABLE dist1_8243(id bigserial PRIMARY KEY, name text, val int); +CREATE TABLE dist2_8243(id bigserial PRIMARY KEY, name text, val int); + +SELECT create_distributed_table('dist1_8243', 'id'); +SELECT create_distributed_table('dist2_8243', 'id'); + +-- Test some queries that would previously end up with empty subquery tasks on workers. +-- Query characteristics: +-- - no distributed tables +-- - at least one reference table or schema sharded table +-- - some property that prevents a router plan (e.g. nextval() on a citus table in select targets) +-- - worker subquery task(s) needed; the Postgres plan has a subquery scan node + +-- Test 1: schema shareded table only; exactly 1 task in the query plan. +EXPLAIN (verbose, costs off) +SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id, name +FROM (select name from schmshrd.t2 group by name) sub ; + +-- Test 2: bunch of schema sharded tables; still expect 1 task +EXPLAIN (verbose, costs off) +SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id, sub.name +FROM (select t1.name as name from schmshrd.t1 t1, schmshrd.t2 t2, schmshrd.t3 t3 where t1.id = t2.id and t3.name = t2.name group by t1.name) sub ; + +-- Test 3: reference table only; exactly 1 task in the query plan. 
+EXPLAIN (verbose, costs off) +SELECT nextval('ref1_id_seq'::regclass) AS id, name +FROM (select name from ref2 group by name) sub ; + +-- Test 4: bunch of reference tables; still expect 1 task +EXPLAIN (verbose, costs off) +SELECT nextval('ref1_id_seq'::regclass) AS id, sub.name +FROM (select r1.name as name from ref1 r1, ref2 r2, ref3 r3 where r1.id = r2.id and r3.name = r2.name group by r1.name) sub ; + +-- Test 5: mix of schema sharded and reference tables; exactly 1 task in the query plan. +EXPLAIN (verbose, costs off) +SELECT nextval('schmshrd.t1_id_seq'::regclass) AS id , sub.name +FROM (select t1.name as name from schmshrd.t1 t1, ref1 r1 where t1.id = r1.id and r1.id IN (select id from ref3) group by t1.name) sub ; + +-- Test 6: sanity tests - the fix does not interfere with outer join between reference and distributed table +-- where a restriction prunes out shard index 0 of the distributed table. + +-- Plan has 3 tasks +EXPLAIN (verbose, costs off) +SELECT x1.name, dist1_8243.val +FROM ref2 x1 left outer join dist1_8243 using (id) +WHERE dist1_8243.id IN (1, 10001, 999989); + +-- Plan has 2 tasks +EXPLAIN (verbose, costs off) +SELECT x1.name, dist2_8243.val +FROM ref3 x1 left outer join dist2_8243 using (id) +WHERE dist2_8243.id IN (10001, 999989); + +-- Test 7: failing query from https://github.com/citusdata/citus/issues/8243 +SET search_path TO schmshrd; +INSERT INTO t2 (name) VALUES ('user1'), ('user2'), ('user3'), ('user1'), ('user2'), ('user1'); +INSERT INTO t1 (name) SELECT name FROM (SELECT name FROM t2 GROUP BY name) sub; +SELECT id, name FROM t1 ORDER BY id; + +-- and for reference tables +SET search_path TO issue_8243; +INSERT INTO ref2 (name) VALUES ('user1'), ('user2'), ('user3'), ('user1'), ('user2'), ('user1'); +INSERT INTO ref1 (name) SELECT name FROM (SELECT name FROM ref2 GROUP BY name) sub; +SELECT id, name FROM ref1 ORDER BY id; + +--- clean up: +SET client_min_messages TO WARNING; +DROP SCHEMA schmshrd CASCADE; +DROP SCHEMA issue_8243 
CASCADE; + +RESET citus.next_shard_id; +RESET citus.shard_count; +RESET citus.shard_replication_factor; +