From ccb7ce5f9be87556f1c7e179c3a71c96ee2bfec4 Mon Sep 17 00:00:00 2001 From: Rajdeep Singh Date: Sun, 12 Apr 2026 02:16:18 +0530 Subject: [PATCH 1/4] Fixes #24559: Add Greenplum 7 partition support Greenplum 7 adopted PostgreSQL 10+ declarative partitioning, replacing the GP-specific pg_partition and pg_partition_rule catalog tables with the standard pg_partitioned_table and relispartition column in pg_class. The connector now detects the Greenplum major version at runtime by parsing SELECT version() output with a regex, and selects the appropriate queries: - GP6: pg_partition_rule JOIN for table names, pg_partition with parkind/paratts for partition details (existing behavior) - GP7+: relispartition=false for table names, pg_partitioned_table with partstrat/partattrs for partition details (PostgreSQL-compatible) The version is cached per source instance to avoid repeated queries. --- .../source/database/greenplum/metadata.py | 127 +++++++++--- .../source/database/greenplum/queries.py | 55 ++++- .../unit/topology/database/test_greenplum.py | 193 +++++++++++++++++- 3 files changed, 344 insertions(+), 31 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index 81b98ddab5d4..188102197358 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -11,6 +11,8 @@ """ Greenplum source module """ + +import re import traceback from collections import namedtuple from typing import Iterable, Optional, Tuple @@ -18,6 +20,7 @@ from sqlalchemy import sql, text from sqlalchemy.dialects.postgresql.base import PGDialect from sqlalchemy.engine import Inspector +from sqlalchemy.exc import SQLAlchemyError from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( @@ -47,8 +50,11 @@ ) from metadata.ingestion.source.database.greenplum.queries import ( GREENPLUM_GET_DB_NAMES, - GREENPLUM_GET_TABLE_NAMES, - GREENPLUM_PARTITION_DETAILS, + GREENPLUM_GET_TABLE_NAMES_V6, + GREENPLUM_GET_TABLE_NAMES_V7, + GREENPLUM_GET_VERSION, + GREENPLUM_PARTITION_DETAILS_V6, + GREENPLUM_PARTITION_DETAILS_V7, ) from metadata.ingestion.source.database.greenplum.utils import ( get_column_info, @@ -91,6 +97,10 @@ class GreenplumSource(CommonDbSourceService, MultiDBSource): Database metadata from Greenplum Source """ + def __init__(self, config, metadata): + super().__init__(config, metadata) + self._greenplum_version: Optional[int] = None + @classmethod def create( cls, @@ -106,6 +116,37 @@ def create( ) return cls(config, metadata) + def _get_greenplum_major_version(self) -> int: + if self._greenplum_version is None: + try: + with self.engine.connect() as conn: + result = conn.execute(text(GREENPLUM_GET_VERSION)) + version_string = result.scalar() or "" + match = re.search(r"Greenplum Database (\d+)", version_string) + if match: + self._greenplum_version = int(match.group(1)) + logger.info( + "Detected Greenplum major version %d", self._greenplum_version + ) + else: + logger.warning( + "Could not parse Greenplum major version from" + " SELECT version() output, defaulting to 7" + ) + logger.debug("Full version() output: %s", version_string) + self._greenplum_version = 7 + except SQLAlchemyError as exc: + logger.debug(traceback.format_exc()) + logger.warning( + "Could not determine Greenplum version (%s), defaulting to 7", + exc, + ) + self._greenplum_version = 7 + return self._greenplum_version + + def _is_v7_or_later(self) -> bool: + return self._get_greenplum_major_version() >= 7 + def query_table_names_and_types( self, schema_name: str ) -> Iterable[TableNameAndType]: @@ -113,8 +154,13 @@ def query_table_names_and_types( Overwrite the inspector implementation to handle partitioned and foreign types """ + table_names_query = ( + GREENPLUM_GET_TABLE_NAMES_V7 + if self._is_v7_or_later() + else GREENPLUM_GET_TABLE_NAMES_V6 + ) result = self.connection.execute( - sql.text(GREENPLUM_GET_TABLE_NAMES), + sql.text(table_names_query), {"schema": schema_name}, ) @@ -149,9 +195,11 @@ def get_database_names(self) -> Iterable[str]: if filter_by_database( self.source_config.databaseFilterPattern, - database_fqn - if self.source_config.useFqnForFiltering - else new_database, + ( + database_fqn + if self.source_config.useFqnForFiltering + else new_database + ), ): self.status.filter(database_fqn, "Database Filtered Out") continue @@ -167,30 +215,55 @@ def get_database_names(self) -> Iterable[str]: def get_table_partition_details( self, table_name: str, schema_name: str, inspector: Inspector + ) -> Tuple[bool, Optional[TablePartition]]: + if self._is_v7_or_later(): + return self._get_table_partition_details_v7(table_name, schema_name) + return self._get_table_partition_details_v6(table_name, schema_name) + + def _get_table_partition_details_v6( + self, table_name: str, schema_name: str ) -> Tuple[bool, Optional[TablePartition]]: with self.engine.connect() as conn: result = conn.execute( - text( - GREENPLUM_PARTITION_DETAILS.format( - table_name=table_name, schema_name=schema_name - ) - ) + text(GREENPLUM_PARTITION_DETAILS_V6), + {"table_name": table_name, "schema_name": schema_name}, ).all() - if result: - partition_details = TablePartition( - columns=[ - PartitionColumnDetails( - columnName=row.column_name, - intervalType=INTERVAL_TYPE_MAP.get( - result[0].partition_strategy, - PartitionIntervalTypes.COLUMN_VALUE, - ), - interval=None, - ) - for row in result - if row.column_name - ] + return self._build_partition_result(result) + + def _get_table_partition_details_v7( + self, table_name: str, schema_name: str + ) -> Tuple[bool, Optional[TablePartition]]: + with self.engine.connect() as conn: + result = conn.execute( + text(GREENPLUM_PARTITION_DETAILS_V7), + {"table_name": table_name, "schema_name": schema_name}, + ).all() + + return self._build_partition_result(result) + + @staticmethod + def _build_partition_result( + result, + ) -> Tuple[bool, Optional[TablePartition]]: + # TODO: support expression-based partition keys. When a partition key is + # an expression (not a plain column), pg_partitioned_table.partattrs / + # pg_partition.paratts contains 0, so the join to information_schema.columns + # yields NULL column_name and the row is filtered out below. + if not result: + return False, None + columns = [ + PartitionColumnDetails( + columnName=row.column_name, + intervalType=INTERVAL_TYPE_MAP.get( + row.partition_strategy, + PartitionIntervalTypes.COLUMN_VALUE, + ), + interval=None, ) - return True, partition_details - return False, None + for row in result + if row.column_name + ] + if not columns: + return False, None + return True, TablePartition(columns=columns) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py index 3a66c86f3e61..610464c40586 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py @@ -16,7 +16,9 @@ # https://www.postgresql.org/docs/current/catalog-pg-class.html # r = ordinary table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table, -GREENPLUM_GET_TABLE_NAMES = """ + +# Greenplum 6: uses pg_partition_rule to filter out child partitions +GREENPLUM_GET_TABLE_NAMES_V6 = """ select c.relname, c.relkind from pg_catalog.pg_class c left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid @@ -26,7 +28,15 @@ and n.nspname = :schema """ -GREENPLUM_PARTITION_DETAILS = textwrap.dedent( +# Greenplum 7+: uses PostgreSQL-style relispartition to filter out child partitions +GREENPLUM_GET_TABLE_NAMES_V7 = """ + SELECT c.relname, c.relkind FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = :schema AND c.relkind in ('r', 'p', 'f') AND c.relispartition = false +""" + +# Greenplum 6: uses pg_partition catalog with parkind/paratts columns +GREENPLUM_PARTITION_DETAILS_V6 = textwrap.dedent( """ select ns.nspname as schema, @@ -56,7 +66,42 @@ col.table_schema = ns.nspname and col.table_name = par.relname and ordinal_position = pt.column_index - where par.relname='{table_name}' and ns.nspname='{schema_name}' + where par.relname=:table_name and ns.nspname=:schema_name + """ +) + +# Greenplum 7+: uses PostgreSQL-standard pg_partitioned_table with partstrat/partattrs +GREENPLUM_PARTITION_DETAILS_V7 = textwrap.dedent( + """ + select + ns.nspname as schema, + par.relname as table_name, + partition_strategy, + col.column_name + from + (select + partrelid, + partnatts, + case partstrat + when 'l' then 'list' + when 'h' then 'hash' + when 'r' then 'range' end as partition_strategy, + unnest(partattrs) column_index + from + pg_partitioned_table) pt + join + pg_class par + on + par.oid = pt.partrelid + join + pg_namespace ns on ns.oid = par.relnamespace + left join + information_schema.columns col + on + col.table_schema = ns.nspname + and col.table_name = par.relname + and ordinal_position = pt.column_index + where par.relname=:table_name and ns.nspname=:schema_name """ ) @@ -139,3 +184,7 @@ GREENPLUM_GET_SERVER_VERSION = """ show server_version """ + +GREENPLUM_GET_VERSION = """ +SELECT version() +""" diff --git a/ingestion/tests/unit/topology/database/test_greenplum.py b/ingestion/tests/unit/topology/database/test_greenplum.py index e98813a0e5d1..809ed67b1858 100644 --- a/ingestion/tests/unit/topology/database/test_greenplum.py +++ b/ingestion/tests/unit/topology/database/test_greenplum.py @@ -13,9 +13,17 @@ Test Greenplum using the topology """ +from collections import namedtuple from unittest import TestCase -from unittest.mock import patch +from unittest.mock import MagicMock, patch +from sqlalchemy.exc import OperationalError + +from metadata.generated.schema.entity.data.table import ( + PartitionColumnDetails, + PartitionIntervalTypes, + TablePartition, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -55,6 +63,10 @@ }, } +PartitionRow = namedtuple( + "PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"] +) + class greenplumUnitTest(TestCase): @patch( @@ -76,3 +88,182 @@ def __init__(self, methodName, test_connection) -> None: def test_close_connection(self, engine, connection): connection.return_value = True self.greenplum_source.close() + + def test_version_detection_v6(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on" + " x86_64-pc-linux-gnu compiled by gcc 6.4.0" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 6 + assert not self.greenplum_source._is_v7_or_later() + + def test_version_detection_v7(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on" + " x86_64-pc-linux-gnu compiled by gcc 12.3.1" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 7 + assert self.greenplum_source._is_v7_or_later() + + def test_version_detection_caches_result(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on" + " x86_64-pc-linux-gnu" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + self.greenplum_source._get_greenplum_major_version() + self.greenplum_source._get_greenplum_major_version() + mock_engine.connect.assert_called_once() + + def test_version_detection_defaults_to_7_on_error(self): + mock_engine = MagicMock() + mock_engine.connect.side_effect = OperationalError( + "SELECT version()", {}, Exception("connection error") + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 7 + + def test_version_detection_defaults_to_7_on_unparseable(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 14.0 on x86_64-pc-linux-gnu" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 7 + + def test_partition_details_v7(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + + mock_conn.execute.return_value.all.return_value = [ + PartitionRow( + schema="public", + table_name="sales", + partition_strategy="range", + column_name="sale_date", + ) + ] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 7 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details( + "sales", "public", MagicMock() + ) + + assert is_partitioned is True + assert partition == TablePartition( + columns=[ + PartitionColumnDetails( + columnName="sale_date", + intervalType=PartitionIntervalTypes.TIME_UNIT, + interval=None, + ) + ] + ) + + def test_partition_details_v6(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + + mock_conn.execute.return_value.all.return_value = [ + PartitionRow( + schema="public", + table_name="sales", + partition_strategy="list", + column_name="region", + ) + ] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 6 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details( + "sales", "public", MagicMock() + ) + + assert is_partitioned is True + assert partition == TablePartition( + columns=[ + PartitionColumnDetails( + columnName="region", + intervalType=PartitionIntervalTypes.COLUMN_VALUE, + interval=None, + ) + ] + ) + + def test_partition_details_no_results(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.all.return_value = [] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 7 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details( + "not_partitioned", "public", MagicMock() + ) + + assert is_partitioned is False + assert partition is None + + def test_partition_details_expression_key_returns_false(self): + """Expression-based partition keys yield NULL column_name rows; the + table should be reported as non-partitioned rather than partitioned + with an empty columns list.""" + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.all.return_value = [ + PartitionRow( + schema="public", + table_name="sales", + partition_strategy="range", + column_name=None, + ) + ] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 7 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details( + "sales", "public", MagicMock() + ) + + assert is_partitioned is False + assert partition is None From f5942f581335ce5b9d959f9c620a0ade5d7b25c2 Mon Sep 17 00:00:00 2001 From: Rajdeep Singh Date: Mon, 27 Apr 2026 13:15:25 +0530 Subject: [PATCH 2/4] fix(greenplum): return is_partitioned=True for expression-based partition keys When _build_partition_result receives rows from the partition catalog but all column_name values are NULL (expression-based partition keys), the table IS partitioned - we just cannot surface the column details yet. Previously returned (False, None) which caused expression-key partitioned tables to be ingested as TableType.Regular. Now returns (True, None) so they are correctly ingested as TableType.Partitioned. Updated test name and assertion to reflect the corrected behavior. Addresses Copilot review comment. --- .../ingestion/source/database/greenplum/metadata.py | 13 ++++++++----- .../tests/unit/topology/database/test_greenplum.py | 9 +++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index 188102197358..0b7dabf42467 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -246,10 +246,12 @@ def _get_table_partition_details_v7( def _build_partition_result( result, ) -> Tuple[bool, Optional[TablePartition]]: - # TODO: support expression-based partition keys. When a partition key is - # an expression (not a plain column), pg_partitioned_table.partattrs / - # pg_partition.paratts contains 0, so the join to information_schema.columns - # yields NULL column_name and the row is filtered out below. + # When a partition key is an expression (not a plain column), + # pg_partitioned_table.partattrs / pg_partition.paratts contains 0, so the + # join to information_schema.columns yields NULL column_name and the row is + # filtered out below. The table is still partitioned; we just cannot surface + # the partition-column details yet. Return (True, None) so it is ingested as + # TableType.Partitioned rather than TableType.Regular. if not result: return False, None columns = [ @@ -265,5 +267,6 @@ def _build_partition_result( if row.column_name ] if not columns: - return False, None + # Partition exists but all keys are expression-based (column_name is NULL). + return True, None return True, TablePartition(columns=columns) diff --git a/ingestion/tests/unit/topology/database/test_greenplum.py b/ingestion/tests/unit/topology/database/test_greenplum.py index 809ed67b1858..041e57053f08 100644 --- a/ingestion/tests/unit/topology/database/test_greenplum.py +++ b/ingestion/tests/unit/topology/database/test_greenplum.py @@ -241,10 +241,11 @@ def test_partition_details_no_results(self): assert is_partitioned is False assert partition is None - def test_partition_details_expression_key_returns_false(self): + def test_partition_details_expression_key_returns_true_no_columns(self): """Expression-based partition keys yield NULL column_name rows; the - table should be reported as non-partitioned rather than partitioned - with an empty columns list.""" + table should still be reported as partitioned (is_partitioned=True) + because a row exists in the partitioning catalog, even though we + cannot resolve the column names yet. partition details are None.""" mock_engine = MagicMock() mock_conn = MagicMock() mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) @@ -265,5 +266,5 @@ def test_partition_details_expression_key_returns_false(self): "sales", "public", MagicMock() ) - assert is_partitioned is False + assert is_partitioned is True assert partition is None From 0c42b13b077f669f574a4a9ced52a3c06a09856a Mon Sep 17 00:00:00 2001 From: Rajdeep Singh Date: Mon, 27 Apr 2026 13:44:33 +0530 Subject: [PATCH 3/4] fix(greenplum): schema-qualify all pg_catalog references in partition queries Qualify pg_class, pg_namespace, and pg_partitioned_table with pg_catalog. in both GREENPLUM_PARTITION_DETAILS_V6 and GREENPLUM_PARTITION_DETAILS_V7 to avoid search_path dependency and prevent potential name shadowing by user-defined objects with the same names. Addresses Copilot review comment. --- .../ingestion/source/database/greenplum/queries.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py index 610464c40586..808184f799fc 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py @@ -55,7 +55,7 @@ from pg_catalog.pg_partition) pt join - pg_class par + pg_catalog.pg_class par on par.oid = pt.parrelid left join @@ -88,13 +88,13 @@ when 'r' then 'range' end as partition_strategy, unnest(partattrs) column_index from - pg_partitioned_table) pt + pg_catalog.pg_partitioned_table) pt join - pg_class par + pg_catalog.pg_class par on par.oid = pt.partrelid join - pg_namespace ns on ns.oid = par.relnamespace + pg_catalog.pg_namespace ns on ns.oid = par.relnamespace left join information_schema.columns col on From 5d84a09ea3036ff7fa940b6bb566ff832b424d08 Mon Sep 17 00:00:00 2001 From: Rajdeep Singh Date: Mon, 27 Apr 2026 21:11:18 +0530 Subject: [PATCH 4/4] fix(greenplum): default to v6 on version detection failure; add table-listing query branch tests Address reviewer feedback on PR #27288: - Default to Greenplum 6 (conservative) when version detection fails or output is unparseable, since GP7 queries reference catalogs/columns that do not exist on GP6 (was defaulting to 7, which could regress GP6 clusters). - Add unit tests verifying query_table_names_and_types selects GREENPLUM_GET_TABLE_NAMES_V6 vs GREENPLUM_GET_TABLE_NAMES_V7 based on detected major version, covering the table discovery path that originally failed in #24559. - Apply ruff format to satisfy py-checkstyle CI. --- .../source/database/greenplum/metadata.py | 24 ++---- .../unit/topology/database/test_greenplum.py | 84 ++++++++++++------- 2 files changed, 64 insertions(+), 44 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index 8c21d16f5faa..57a4b34dd538 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -123,40 +123,32 @@ def _get_greenplum_major_version(self) -> int: match = re.search(r"Greenplum Database (\d+)", version_string) if match: self._greenplum_version = int(match.group(1)) - logger.info( - "Detected Greenplum major version %d", self._greenplum_version - ) + logger.info("Detected Greenplum major version %d", self._greenplum_version) else: logger.warning( - "Could not parse Greenplum major version from" - " SELECT version() output, defaulting to 7" + "Could not parse Greenplum major version from SELECT version() output, " + "defaulting conservatively to 6" ) logger.debug("Full version() output: %s", version_string) - self._greenplum_version = 7 + self._greenplum_version = 6 except SQLAlchemyError as exc: logger.debug(traceback.format_exc()) logger.warning( - "Could not determine Greenplum version (%s), defaulting to 7", + "Could not determine Greenplum version (%s), defaulting conservatively to 6", exc, ) - self._greenplum_version = 7 + self._greenplum_version = 6 return self._greenplum_version def _is_v7_or_later(self) -> bool: return self._get_greenplum_major_version() >= 7 - def query_table_names_and_types( - self, schema_name: str - ) -> Iterable[TableNameAndType]: + def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAndType]: """ Overwrite the inspector implementation to handle partitioned and foreign types """ - table_names_query = ( - GREENPLUM_GET_TABLE_NAMES_V7 - if self._is_v7_or_later() - else GREENPLUM_GET_TABLE_NAMES_V6 - ) + table_names_query = GREENPLUM_GET_TABLE_NAMES_V7 if self._is_v7_or_later() else GREENPLUM_GET_TABLE_NAMES_V6 result = self.connection.execute( sql.text(table_names_query), {"schema": schema_name}, diff --git a/ingestion/tests/unit/topology/database/test_greenplum.py b/ingestion/tests/unit/topology/database/test_greenplum.py index 5a2dedd36cf8..47f203b76f42 100644 --- a/ingestion/tests/unit/topology/database/test_greenplum.py +++ b/ingestion/tests/unit/topology/database/test_greenplum.py @@ -63,9 +63,7 @@ }, } -PartitionRow = namedtuple( - "PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"] -) +PartitionRow = namedtuple("PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"]) class greenplumUnitTest(TestCase): @@ -91,8 +89,7 @@ def test_version_detection_v6(self): mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) mock_conn.execute.return_value.scalar.return_value = ( - "PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on" - " x86_64-pc-linux-gnu compiled by gcc 6.4.0" + "PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on x86_64-pc-linux-gnu compiled by gcc 6.4.0" ) self.greenplum_source.engine = mock_engine @@ -106,8 +103,7 @@ def test_version_detection_v7(self): mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) mock_conn.execute.return_value.scalar.return_value = ( - "PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on" - " x86_64-pc-linux-gnu compiled by gcc 12.3.1" + "PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on x86_64-pc-linux-gnu compiled by gcc 12.3.1" ) self.greenplum_source.engine = mock_engine @@ -121,8 +117,7 @@ def test_version_detection_caches_result(self): mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) mock_conn.execute.return_value.scalar.return_value = ( - "PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on" - " x86_64-pc-linux-gnu" + "PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on x86_64-pc-linux-gnu" ) self.greenplum_source.engine = mock_engine @@ -131,28 +126,24 @@ def test_version_detection_caches_result(self): self.greenplum_source._get_greenplum_major_version() mock_engine.connect.assert_called_once() - def test_version_detection_defaults_to_7_on_error(self): + def test_version_detection_defaults_to_6_on_error(self): mock_engine = MagicMock() - mock_engine.connect.side_effect = OperationalError( - "SELECT version()", {}, Exception("connection error") - ) + mock_engine.connect.side_effect = OperationalError("SELECT version()", {}, Exception("connection error")) self.greenplum_source.engine = mock_engine self.greenplum_source._greenplum_version = None - assert self.greenplum_source._get_greenplum_major_version() == 7 + assert self.greenplum_source._get_greenplum_major_version() == 6 - def test_version_detection_defaults_to_7_on_unparseable(self): + def test_version_detection_defaults_to_6_on_unparseable(self): mock_engine = MagicMock() mock_conn = MagicMock() mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) - mock_conn.execute.return_value.scalar.return_value = ( - "PostgreSQL 14.0 on x86_64-pc-linux-gnu" - ) + mock_conn.execute.return_value.scalar.return_value = "PostgreSQL 14.0 on x86_64-pc-linux-gnu" self.greenplum_source.engine = mock_engine self.greenplum_source._greenplum_version = None - assert self.greenplum_source._get_greenplum_major_version() == 7 + assert self.greenplum_source._get_greenplum_major_version() == 6 def test_partition_details_v7(self): mock_engine = MagicMock() @@ -172,9 +163,7 @@ def test_partition_details_v7(self): self.greenplum_source.engine = mock_engine self.greenplum_source._greenplum_version = 7 - is_partitioned, partition = self.greenplum_source.get_table_partition_details( - "sales", "public", MagicMock() - ) + is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock()) assert is_partitioned is True assert partition == TablePartition( @@ -205,9 +194,7 @@ def test_partition_details_v6(self): self.greenplum_source.engine = mock_engine self.greenplum_source._greenplum_version = 6 - is_partitioned, partition = self.greenplum_source.get_table_partition_details( - "sales", "public", MagicMock() - ) + is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock()) assert is_partitioned is True assert partition == TablePartition( @@ -258,9 +245,50 @@ def test_partition_details_expression_key_returns_true_no_columns(self): self.greenplum_source.engine = mock_engine self.greenplum_source._greenplum_version = 7 - is_partitioned, partition = self.greenplum_source.get_table_partition_details( - "sales", "public", MagicMock() - ) + is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock()) assert is_partitioned is True assert partition is None + + def test_query_table_names_uses_v6_query_for_v6(self): + """Issue #24559: ensure GP6 uses pg_partition_rule-based table-listing query.""" + from metadata.ingestion.source.database.greenplum.queries import ( + GREENPLUM_GET_TABLE_NAMES_V6, + ) + + mock_connection = MagicMock() + mock_connection.execute.return_value = [] + self.greenplum_source._greenplum_version = 6 + + with patch.object( + type(self.greenplum_source), + "connection", + new_callable=lambda: property(lambda self: mock_connection), + ): + list(self.greenplum_source.query_table_names_and_types("public")) + + executed_sql = str(mock_connection.execute.call_args[0][0]) + assert executed_sql.strip() == GREENPLUM_GET_TABLE_NAMES_V6.strip() + assert "pg_partition_rule" in executed_sql + + def test_query_table_names_uses_v7_query_for_v7(self): + """Issue #24559: ensure GP7 uses relispartition-based table-listing query.""" + from metadata.ingestion.source.database.greenplum.queries import ( + GREENPLUM_GET_TABLE_NAMES_V7, + ) + + mock_connection = MagicMock() + mock_connection.execute.return_value = [] + self.greenplum_source._greenplum_version = 7 + + with patch.object( + type(self.greenplum_source), + "connection", + new_callable=lambda: property(lambda self: mock_connection), + ): + list(self.greenplum_source.query_table_names_and_types("public")) + + executed_sql = str(mock_connection.execute.call_args[0][0]) + assert executed_sql.strip() == GREENPLUM_GET_TABLE_NAMES_V7.strip() + assert "relispartition" in executed_sql + assert "pg_partition_rule" not in executed_sql