diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index cd48dcf1f894..57a4b34dd538 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -12,6 +12,7 @@ Greenplum source module """ +import re import traceback from collections import namedtuple from typing import Iterable, Optional, Tuple @@ -19,6 +20,7 @@ from sqlalchemy import sql, text from sqlalchemy.dialects.postgresql.base import PGDialect from sqlalchemy.engine import Inspector +from sqlalchemy.exc import SQLAlchemyError from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( @@ -48,8 +50,11 @@ ) from metadata.ingestion.source.database.greenplum.queries import ( GREENPLUM_GET_DB_NAMES, - GREENPLUM_GET_TABLE_NAMES, - GREENPLUM_PARTITION_DETAILS, + GREENPLUM_GET_TABLE_NAMES_V6, + GREENPLUM_GET_TABLE_NAMES_V7, + GREENPLUM_GET_VERSION, + GREENPLUM_PARTITION_DETAILS_V6, + GREENPLUM_PARTITION_DETAILS_V7, ) from metadata.ingestion.source.database.greenplum.utils import ( get_column_info, @@ -92,6 +97,10 @@ class GreenplumSource(CommonDbSourceService, MultiDBSource): Database metadata from Greenplum Source """ + def __init__(self, config, metadata): + super().__init__(config, metadata) + self._greenplum_version: Optional[int] = None + @classmethod def create( cls, @@ -105,13 +114,43 @@ def create( raise InvalidSourceException(f"Expected GreenplumConnection, but got {connection}") return cls(config, metadata) + def _get_greenplum_major_version(self) -> int: + if self._greenplum_version is None: + try: + with self.engine.connect() as conn: + result = conn.execute(text(GREENPLUM_GET_VERSION)) + version_string = result.scalar() or "" + match = re.search(r"Greenplum Database (\d+)", version_string) + if match: + self._greenplum_version = int(match.group(1)) + logger.info("Detected Greenplum major version %d", self._greenplum_version) + else: + logger.warning( + "Could not parse Greenplum major version from SELECT version() output, " + "defaulting conservatively to 6" + ) + logger.debug("Full version() output: %s", version_string) + self._greenplum_version = 6 + except SQLAlchemyError as exc: + logger.debug(traceback.format_exc()) + logger.warning( + "Could not determine Greenplum version (%s), defaulting conservatively to 6", + exc, + ) + self._greenplum_version = 6 + return self._greenplum_version + + def _is_v7_or_later(self) -> bool: + return self._get_greenplum_major_version() >= 7 + def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAndType]: """ Overwrite the inspector implementation to handle partitioned and foreign types """ + table_names_query = GREENPLUM_GET_TABLE_NAMES_V7 if self._is_v7_or_later() else GREENPLUM_GET_TABLE_NAMES_V6 result = self.connection.execute( - sql.text(GREENPLUM_GET_TABLE_NAMES), + sql.text(table_names_query), {"schema": schema_name}, ) @@ -157,26 +196,58 @@ def get_database_names(self) -> Iterable[str]: def get_table_partition_details( self, table_name: str, schema_name: str, inspector: Inspector + ) -> Tuple[bool, Optional[TablePartition]]: + if self._is_v7_or_later(): + return self._get_table_partition_details_v7(table_name, schema_name) + return self._get_table_partition_details_v6(table_name, schema_name) + + def _get_table_partition_details_v6( + self, table_name: str, schema_name: str ) -> Tuple[bool, Optional[TablePartition]]: with self.engine.connect() as conn: result = conn.execute( - text(GREENPLUM_PARTITION_DETAILS.format(table_name=table_name, schema_name=schema_name)) + text(GREENPLUM_PARTITION_DETAILS_V6), + {"table_name": table_name, "schema_name": schema_name}, ).all() - if result: - partition_details = TablePartition( - columns=[ - PartitionColumnDetails( - columnName=row.column_name, - intervalType=INTERVAL_TYPE_MAP.get( - result[0].partition_strategy, - PartitionIntervalTypes.COLUMN_VALUE, - ), - interval=None, - ) - for row in result - if row.column_name - ] + return self._build_partition_result(result) + + def _get_table_partition_details_v7( + self, table_name: str, schema_name: str + ) -> Tuple[bool, Optional[TablePartition]]: + with self.engine.connect() as conn: + result = conn.execute( + text(GREENPLUM_PARTITION_DETAILS_V7), + {"table_name": table_name, "schema_name": schema_name}, + ).all() + + return self._build_partition_result(result) + + @staticmethod + def _build_partition_result( + result, + ) -> Tuple[bool, Optional[TablePartition]]: + # When a partition key is an expression (not a plain column), + # pg_partitioned_table.partattrs / pg_partition.paratts contains 0, so the + # join to information_schema.columns yields NULL column_name and the row is + # filtered out below. The table is still partitioned; we just cannot surface + # the partition-column details yet. Return (True, None) so it is ingested as + # TableType.Partitioned rather than TableType.Regular. + if not result: + return False, None + columns = [ + PartitionColumnDetails( + columnName=row.column_name, + intervalType=INTERVAL_TYPE_MAP.get( + row.partition_strategy, + PartitionIntervalTypes.COLUMN_VALUE, + ), + interval=None, ) - return True, partition_details - return False, None + for row in result + if row.column_name + ] + if not columns: + # Partition exists but all keys are expression-based (column_name is NULL). + return True, None + return True, TablePartition(columns=columns) diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py index 3a66c86f3e61..808184f799fc 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/queries.py @@ -16,7 +16,9 @@ # https://www.postgresql.org/docs/current/catalog-pg-class.html # r = ordinary table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table, -GREENPLUM_GET_TABLE_NAMES = """ + +# Greenplum 6: uses pg_partition_rule to filter out child partitions +GREENPLUM_GET_TABLE_NAMES_V6 = """ select c.relname, c.relkind from pg_catalog.pg_class c left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid @@ -26,7 +28,15 @@ and n.nspname = :schema """ -GREENPLUM_PARTITION_DETAILS = textwrap.dedent( +# Greenplum 7+: uses PostgreSQL-style relispartition to filter out child partitions +GREENPLUM_GET_TABLE_NAMES_V7 = """ + SELECT c.relname, c.relkind FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = :schema AND c.relkind in ('r', 'p', 'f') AND c.relispartition = false +""" + +# Greenplum 6: uses pg_partition catalog with parkind/paratts columns +GREENPLUM_PARTITION_DETAILS_V6 = textwrap.dedent( """ select ns.nspname as schema, @@ -45,7 +55,7 @@ from pg_catalog.pg_partition) pt join - pg_class par + pg_catalog.pg_class par on par.oid = pt.parrelid left join @@ -56,7 +66,42 @@ col.table_schema = ns.nspname and col.table_name = par.relname and ordinal_position = pt.column_index - where par.relname='{table_name}' and ns.nspname='{schema_name}' + where par.relname=:table_name and ns.nspname=:schema_name + """ +) + +# Greenplum 7+: uses PostgreSQL-standard pg_partitioned_table with partstrat/partattrs +GREENPLUM_PARTITION_DETAILS_V7 = textwrap.dedent( + """ + select + ns.nspname as schema, + par.relname as table_name, + partition_strategy, + col.column_name + from + (select + partrelid, + partnatts, + case partstrat + when 'l' then 'list' + when 'h' then 'hash' + when 'r' then 'range' end as partition_strategy, + unnest(partattrs) column_index + from + pg_catalog.pg_partitioned_table) pt + join + pg_catalog.pg_class par + on + par.oid = pt.partrelid + join + pg_catalog.pg_namespace ns on ns.oid = par.relnamespace + left join + information_schema.columns col + on + col.table_schema = ns.nspname + and col.table_name = par.relname + and ordinal_position = pt.column_index + where par.relname=:table_name and ns.nspname=:schema_name """ ) @@ -139,3 +184,7 @@ GREENPLUM_GET_SERVER_VERSION = """ show server_version """ + +GREENPLUM_GET_VERSION = """ +SELECT version() +""" diff --git a/ingestion/tests/unit/topology/database/test_greenplum.py b/ingestion/tests/unit/topology/database/test_greenplum.py index e7e1e4706c40..47f203b76f42 100644 --- a/ingestion/tests/unit/topology/database/test_greenplum.py +++ b/ingestion/tests/unit/topology/database/test_greenplum.py @@ -13,9 +13,17 @@ Test Greenplum using the topology """ +from collections import namedtuple from unittest import TestCase -from unittest.mock import patch +from unittest.mock import MagicMock, patch +from sqlalchemy.exc import OperationalError + +from metadata.generated.schema.entity.data.table import ( + PartitionColumnDetails, + PartitionIntervalTypes, + TablePartition, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -55,6 +63,8 @@ }, } +PartitionRow = namedtuple("PartitionRow", ["schema", "table_name", "partition_strategy", "column_name"]) + class greenplumUnitTest(TestCase): @patch("metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection") @@ -72,3 +82,213 @@ def __init__(self, methodName, test_connection) -> None: def test_close_connection(self, engine, connection): connection.return_value = True self.greenplum_source.close() + + def test_version_detection_v6(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 9.4.26 (Greenplum Database 6.25.3 build dev) on x86_64-pc-linux-gnu compiled by gcc 6.4.0" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 6 + assert not self.greenplum_source._is_v7_or_later() + + def test_version_detection_v7(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 12.12 (Greenplum Database 7.0.0 build dev) on x86_64-pc-linux-gnu compiled by gcc 12.3.1" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 7 + assert self.greenplum_source._is_v7_or_later() + + def test_version_detection_caches_result(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = ( + "PostgreSQL 12.12 (Greenplum Database 7.2.0 build dev) on x86_64-pc-linux-gnu" + ) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + self.greenplum_source._get_greenplum_major_version() + self.greenplum_source._get_greenplum_major_version() + mock_engine.connect.assert_called_once() + + def test_version_detection_defaults_to_6_on_error(self): + mock_engine = MagicMock() + mock_engine.connect.side_effect = OperationalError("SELECT version()", {}, Exception("connection error")) + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 6 + + def test_version_detection_defaults_to_6_on_unparseable(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.scalar.return_value = "PostgreSQL 14.0 on x86_64-pc-linux-gnu" + self.greenplum_source.engine = mock_engine + + self.greenplum_source._greenplum_version = None + assert self.greenplum_source._get_greenplum_major_version() == 6 + + def test_partition_details_v7(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + + mock_conn.execute.return_value.all.return_value = [ + PartitionRow( + schema="public", + table_name="sales", + partition_strategy="range", + column_name="sale_date", + ) + ] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 7 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock()) + + assert is_partitioned is True + assert partition == TablePartition( + columns=[ + PartitionColumnDetails( + columnName="sale_date", + intervalType=PartitionIntervalTypes.TIME_UNIT, + interval=None, + ) + ] + ) + + def test_partition_details_v6(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + + mock_conn.execute.return_value.all.return_value = [ + PartitionRow( + schema="public", + table_name="sales", + partition_strategy="list", + column_name="region", + ) + ] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 6 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock()) + + assert is_partitioned is True + assert partition == TablePartition( + columns=[ + PartitionColumnDetails( + columnName="region", + intervalType=PartitionIntervalTypes.COLUMN_VALUE, + interval=None, + ) + ] + ) + + def test_partition_details_no_results(self): + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.all.return_value = [] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 7 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details( + "not_partitioned", "public", MagicMock() + ) + + assert is_partitioned is False + assert partition is None + + def test_partition_details_expression_key_returns_true_no_columns(self): + """Expression-based partition keys yield NULL column_name rows; the + table should still be reported as partitioned (is_partitioned=True) + because a row exists in the partitioning catalog, even though we + cannot resolve the column names yet. partition details are None.""" + mock_engine = MagicMock() + mock_conn = MagicMock() + mock_engine.connect.return_value.__enter__ = MagicMock(return_value=mock_conn) + mock_engine.connect.return_value.__exit__ = MagicMock(return_value=False) + mock_conn.execute.return_value.all.return_value = [ + PartitionRow( + schema="public", + table_name="sales", + partition_strategy="range", + column_name=None, + ) + ] + + self.greenplum_source.engine = mock_engine + self.greenplum_source._greenplum_version = 7 + + is_partitioned, partition = self.greenplum_source.get_table_partition_details("sales", "public", MagicMock()) + + assert is_partitioned is True + assert partition is None + + def test_query_table_names_uses_v6_query_for_v6(self): + """Issue #24559: ensure GP6 uses pg_partition_rule-based table-listing query.""" + from metadata.ingestion.source.database.greenplum.queries import ( + GREENPLUM_GET_TABLE_NAMES_V6, + ) + + mock_connection = MagicMock() + mock_connection.execute.return_value = [] + self.greenplum_source._greenplum_version = 6 + + with patch.object( + type(self.greenplum_source), + "connection", + new_callable=lambda: property(lambda self: mock_connection), + ): + list(self.greenplum_source.query_table_names_and_types("public")) + + executed_sql = str(mock_connection.execute.call_args[0][0]) + assert executed_sql.strip() == GREENPLUM_GET_TABLE_NAMES_V6.strip() + assert "pg_partition_rule" in executed_sql + + def test_query_table_names_uses_v7_query_for_v7(self): + """Issue #24559: ensure GP7 uses relispartition-based table-listing query.""" + from metadata.ingestion.source.database.greenplum.queries import ( + GREENPLUM_GET_TABLE_NAMES_V7, + ) + + mock_connection = MagicMock() + mock_connection.execute.return_value = [] + self.greenplum_source._greenplum_version = 7 + + with patch.object( + type(self.greenplum_source), + "connection", + new_callable=lambda: property(lambda self: mock_connection), + ): + list(self.greenplum_source.query_table_names_and_types("public")) + + executed_sql = str(mock_connection.execute.call_args[0][0]) + assert executed_sql.strip() == GREENPLUM_GET_TABLE_NAMES_V7.strip() + assert "relispartition" in executed_sql + assert "pg_partition_rule" not in executed_sql