Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
Greenplum source module
"""

import re
import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple

from sqlalchemy import sql, text
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.engine import Inspector
from sqlalchemy.exc import SQLAlchemyError

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
Expand Down Expand Up @@ -48,8 +50,11 @@
)
from metadata.ingestion.source.database.greenplum.queries import (
GREENPLUM_GET_DB_NAMES,
GREENPLUM_GET_TABLE_NAMES,
GREENPLUM_PARTITION_DETAILS,
GREENPLUM_GET_TABLE_NAMES_V6,
GREENPLUM_GET_TABLE_NAMES_V7,
GREENPLUM_GET_VERSION,
GREENPLUM_PARTITION_DETAILS_V6,
GREENPLUM_PARTITION_DETAILS_V7,
)
from metadata.ingestion.source.database.greenplum.utils import (
get_column_info,
Expand Down Expand Up @@ -92,6 +97,10 @@
Database metadata from Greenplum Source
"""

def __init__(self, config, metadata):

Check warning on line 100 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type annotation is missing for parameter "metadata" (reportMissingParameterType)

Check warning on line 100 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type of parameter "metadata" is partially unknown   Parameter type is "OpenMetadata[Unknown, Unknown]" (reportUnknownParameterType)

Check warning on line 100 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type annotation is missing for parameter "config" (reportMissingParameterType)
super().__init__(config, metadata)

Check warning on line 101 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type of "__init__" is partially unknown   Type of "__init__" is "(config: Source, metadata: OpenMetadata[Unknown, Unknown]) -> None" (reportUnknownMemberType)
self._greenplum_version: Optional[int] = None

@classmethod
def create(
cls,
Expand All @@ -105,13 +114,43 @@
raise InvalidSourceException(f"Expected GreenplumConnection, but got {connection}")
return cls(config, metadata)

def _get_greenplum_major_version(self) -> int:
if self._greenplum_version is None:
try:
with self.engine.connect() as conn:
result = conn.execute(text(GREENPLUM_GET_VERSION))
version_string = result.scalar() or ""
match = re.search(r"Greenplum Database (\d+)", version_string)
if match:
self._greenplum_version = int(match.group(1))
logger.info("Detected Greenplum major version %d", self._greenplum_version)
else:
logger.warning(
"Could not parse Greenplum major version from SELECT version() output, "

Check warning on line 129 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Implicit string concatenation not allowed (reportImplicitStringConcatenation)
"defaulting conservatively to 6"
)
Comment on lines +128 to +131
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning log on an unparseable SELECT version() result logs the full version_string, which can include OS/compiler/build details. Consider logging only a redacted/trimmed value (or logging it at debug) to reduce inadvertent environment information exposure in ingestion logs.

Copilot uses AI. Check for mistakes.
logger.debug("Full version() output: %s", version_string)
self._greenplum_version = 6
except SQLAlchemyError as exc:
logger.debug(traceback.format_exc())
logger.warning(
"Could not determine Greenplum version (%s), defaulting conservatively to 6",
exc,
)
self._greenplum_version = 6
return self._greenplum_version

def _is_v7_or_later(self) -> bool:
return self._get_greenplum_major_version() >= 7

def query_table_names_and_types(self, schema_name: str) -> Iterable[TableNameAndType]:
"""
Overwrite the inspector implementation to handle partitioned
and foreign types
"""
table_names_query = GREENPLUM_GET_TABLE_NAMES_V7 if self._is_v7_or_later() else GREENPLUM_GET_TABLE_NAMES_V6
result = self.connection.execute(
sql.text(GREENPLUM_GET_TABLE_NAMES),
sql.text(table_names_query),
{"schema": schema_name},
)

Expand Down Expand Up @@ -157,26 +196,58 @@

def get_table_partition_details(
self, table_name: str, schema_name: str, inspector: Inspector
) -> Tuple[bool, Optional[TablePartition]]:
if self._is_v7_or_later():
return self._get_table_partition_details_v7(table_name, schema_name)
return self._get_table_partition_details_v6(table_name, schema_name)

def _get_table_partition_details_v6(
self, table_name: str, schema_name: str
) -> Tuple[bool, Optional[TablePartition]]:
with self.engine.connect() as conn:
result = conn.execute(
text(GREENPLUM_PARTITION_DETAILS.format(table_name=table_name, schema_name=schema_name))
text(GREENPLUM_PARTITION_DETAILS_V6),
{"table_name": table_name, "schema_name": schema_name},
).all()

if result:
partition_details = TablePartition(
columns=[
PartitionColumnDetails(
columnName=row.column_name,
intervalType=INTERVAL_TYPE_MAP.get(
result[0].partition_strategy,
PartitionIntervalTypes.COLUMN_VALUE,
),
interval=None,
)
for row in result
if row.column_name
]
return self._build_partition_result(result)

Check warning on line 213 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type of "_build_partition_result" is partially unknown   Type of "_build_partition_result" is "(result: Unknown) -> Tuple[bool, TablePartition | None]" (reportUnknownMemberType)

def _get_table_partition_details_v7(
self, table_name: str, schema_name: str
) -> Tuple[bool, Optional[TablePartition]]:
with self.engine.connect() as conn:
result = conn.execute(
text(GREENPLUM_PARTITION_DETAILS_V7),
{"table_name": table_name, "schema_name": schema_name},
).all()

return self._build_partition_result(result)

Check warning on line 224 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type of "_build_partition_result" is partially unknown   Type of "_build_partition_result" is "(result: Unknown) -> Tuple[bool, TablePartition | None]" (reportUnknownMemberType)

@staticmethod
def _build_partition_result(
result,

Check warning on line 228 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type annotation is missing for parameter "result" (reportMissingParameterType)

Check warning on line 228 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type of parameter "result" is unknown (reportUnknownParameterType)
) -> Tuple[bool, Optional[TablePartition]]:
# When a partition key is an expression (not a plain column),
# pg_partitioned_table.partattrs / pg_partition.paratts contains 0, so the
# join to information_schema.columns yields NULL column_name and the row is
# filtered out below. The table is still partitioned; we just cannot surface
# the partition-column details yet. Return (True, None) so it is ingested as
# TableType.Partitioned rather than TableType.Regular.
if not result:
return False, None
columns = [
PartitionColumnDetails(
columnName=row.column_name,

Check warning on line 240 in ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

View workflow job for this annotation

GitHub Actions / Unit Tests & Static Checks (3.10)

Type of "column_name" is unknown (reportUnknownMemberType)
intervalType=INTERVAL_TYPE_MAP.get(
row.partition_strategy,
PartitionIntervalTypes.COLUMN_VALUE,
),
interval=None,
)
return True, partition_details
return False, None
for row in result
if row.column_name
]
if not columns:
# Partition exists but all keys are expression-based (column_name is NULL).
return True, None
return True, TablePartition(columns=columns)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

# https://www.postgresql.org/docs/current/catalog-pg-class.html
# r = ordinary table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table,
GREENPLUM_GET_TABLE_NAMES = """

# Greenplum 6: uses pg_partition_rule to filter out child partitions
GREENPLUM_GET_TABLE_NAMES_V6 = """
select c.relname, c.relkind
from pg_catalog.pg_class c
left outer join pg_catalog.pg_partition_rule pr on c.oid = pr.parchildrelid
Expand All @@ -26,7 +28,15 @@
and n.nspname = :schema
"""

GREENPLUM_PARTITION_DETAILS = textwrap.dedent(
# Greenplum 7+: uses PostgreSQL-style relispartition to filter out child partitions
GREENPLUM_GET_TABLE_NAMES_V7 = """
SELECT c.relname, c.relkind FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = :schema AND c.relkind in ('r', 'p', 'f') AND c.relispartition = false
"""

# Greenplum 6: uses pg_partition catalog with parkind/paratts columns
GREENPLUM_PARTITION_DETAILS_V6 = textwrap.dedent(
"""
select
ns.nspname as schema,
Expand All @@ -45,7 +55,7 @@
from
pg_catalog.pg_partition) pt
join
pg_class par
pg_catalog.pg_class par
on
par.oid = pt.parrelid
left join
Expand All @@ -56,7 +66,42 @@
col.table_schema = ns.nspname
and col.table_name = par.relname
and ordinal_position = pt.column_index
where par.relname='{table_name}' and ns.nspname='{schema_name}'
where par.relname=:table_name and ns.nspname=:schema_name
"""
)

# Greenplum 7+: uses PostgreSQL-standard pg_partitioned_table with partstrat/partattrs
GREENPLUM_PARTITION_DETAILS_V7 = textwrap.dedent(
"""
select
ns.nspname as schema,
par.relname as table_name,
partition_strategy,
col.column_name
from
(select
partrelid,
partnatts,
case partstrat
when 'l' then 'list'
when 'h' then 'hash'
when 'r' then 'range' end as partition_strategy,
unnest(partattrs) column_index
from
pg_catalog.pg_partitioned_table) pt
join
pg_catalog.pg_class par
on
par.oid = pt.partrelid
join
pg_catalog.pg_namespace ns on ns.oid = par.relnamespace
left join
information_schema.columns col
on
col.table_schema = ns.nspname
and col.table_name = par.relname
and ordinal_position = pt.column_index
where par.relname=:table_name and ns.nspname=:schema_name
"""
)

Expand Down Expand Up @@ -139,3 +184,7 @@
GREENPLUM_GET_SERVER_VERSION = """
show server_version
"""

GREENPLUM_GET_VERSION = """
SELECT version()
"""
Loading
Loading