diff --git a/flowfile_core/flowfile_core/alembic/versions/011_drop_optimized_plan_columns.py b/flowfile_core/flowfile_core/alembic/versions/011_drop_optimized_plan_columns.py new file mode 100644 index 000000000..85021024b --- /dev/null +++ b/flowfile_core/flowfile_core/alembic/versions/011_drop_optimized_plan_columns.py @@ -0,0 +1,35 @@ +"""Drop optimized virtual-table plan storage columns. + +Removes ``serialized_lazy_frame``, ``polars_plan``, and ``source_table_versions`` +from ``catalog_tables``. These backed the optimized virtual-table read path, +which has been collapsed into the unified ``resolve_virtual_flow_table`` flow. +``is_optimized`` is kept as a derived "producer flow is fully lazy" indicator +that drives the laziness-blocker propagation. + +Revision ID: 011 +Revises: 010 +Create Date: 2026-05-01 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +revision: str = "011" +down_revision: str | None = "010" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + with op.batch_alter_table("catalog_tables", recreate="auto") as batch_op: + batch_op.drop_column("source_table_versions") + batch_op.drop_column("polars_plan") + batch_op.drop_column("serialized_lazy_frame") + + +def downgrade() -> None: + op.add_column("catalog_tables", sa.Column("serialized_lazy_frame", sa.LargeBinary, nullable=True)) + op.add_column("catalog_tables", sa.Column("polars_plan", sa.Text, nullable=True)) + op.add_column("catalog_tables", sa.Column("source_table_versions", sa.Text, nullable=True)) diff --git a/flowfile_core/flowfile_core/catalog/delta_utils.py b/flowfile_core/flowfile_core/catalog/delta_utils.py index ab1022bb7..3da0a4720 100644 --- a/flowfile_core/flowfile_core/catalog/delta_utils.py +++ b/flowfile_core/flowfile_core/catalog/delta_utils.py @@ -6,7 +6,6 @@ from __future__ import annotations -import json import logging import shutil from pathlib import Path @@ -14,50 +13,11 @@ import pyarrow as pa from deltalake import DeltaTable -from shared.delta_models import SourceTableVersion from shared.delta_utils import get_delta_size_bytes logger = logging.getLogger(__name__) -def check_source_versions_current(source_table_versions_json: str | None) -> bool: - """Return True if all source delta tables are still at their recorded versions. - - Returns True when no versions are recorded (backward compat for existing virtual tables). - Returns False if any source table has been updated, deleted, or is unreadable. - - Note: There is a known TOCTOU (time-of-check-time-of-use) race condition here. - A source table could be updated between this staleness check and the actual - query execution (collect). This is acceptable for now but means stale reads - are possible in rare concurrent-write scenarios. - """ - if not source_table_versions_json: - return True - try: - raw = json.loads(source_table_versions_json) - versions = [SourceTableVersion(**entry) for entry in raw] - except (ValueError, KeyError, TypeError): - logger.warning("Could not parse source_table_versions JSON, treating as stale") - return False - - for sv in versions: - try: - current_version = DeltaTable(sv.file_path, without_files=True).version() - if current_version != sv.version: - logger.info( - "Source table %d at %s changed: expected version %d, current %d", - sv.table_id, - sv.file_path, - sv.version, - current_version, - ) - return False - except Exception: - logger.warning("Could not read delta version for source table %d at %s", sv.table_id, sv.file_path) - return False - return True - - def is_delta_table(path: str | Path) -> bool: """Return ``True`` if *path* is a directory containing ``_delta_log/``.""" p = Path(path) diff --git a/flowfile_core/flowfile_core/catalog/service.py b/flowfile_core/flowfile_core/catalog/service.py index 149ddc40d..30c1e3159 100644 --- a/flowfile_core/flowfile_core/catalog/service.py +++ b/flowfile_core/flowfile_core/catalog/service.py @@ -38,7 +38,6 @@ # Re-exports preserved so external callers / tests that still reach for the # underscore-prefixed names continue to work. -from flowfile_core.catalog.text_utils import hash_source_versions as _hash_source_versions # noqa: F401 from flowfile_core.catalog.text_utils import is_table_reference as _is_table_reference # noqa: F401 from flowfile_core.catalog.text_utils import parse_delta_history as _parse_delta_history # noqa: F401 from flowfile_core.catalog.text_utils import rewrite_qualified_references as _rewrite_qualified_references # noqa: F401 @@ -148,7 +147,7 @@ def __init__(self, repo: CatalogRepository) -> None: self._sql.bind(virtual_tables=self._virtual_tables) self._virtual_tables.bind(sql=self._sql) - self._previews = TablePreviewService(repo, self._tables, self._virtual_tables) + self._previews = TablePreviewService(repo, self._tables) self._visualizations = VisualizationService( repo, self._namespaces, self._tables, self._virtual_tables, self._sql ) @@ -634,11 +633,8 @@ def create_virtual_flow_table( producer_registration_id: int, namespace_id: int | None = None, description: str | None = None, - serialized_lazy_frame: bytes | None = None, is_optimized: bool = False, schema_json: str | None = None, - polars_plan: str | None = None, - source_table_versions: str | None = None, ) -> CatalogTableOut: """Create a virtual flow table (non-materialised catalog entry).""" return self._virtual_tables.create_virtual_flow_table( @@ -647,11 +643,8 @@ def create_virtual_flow_table( producer_registration_id, namespace_id, description, - serialized_lazy_frame, is_optimized, schema_json, - polars_plan, - source_table_versions, ) def update_virtual_flow_table( @@ -661,11 +654,8 @@ def update_virtual_flow_table( description: str | None = None, namespace_id: int | None = None, producer_registration_id: int | None = None, - serialized_lazy_frame: bytes | None = None, is_optimized: bool | None = None, schema_json: str | None = None, - polars_plan: str | None = None, - source_table_versions: str | None = None, ) -> CatalogTableOut: """Update a virtual flow table's metadata or producer.""" return self._virtual_tables.update_virtual_flow_table( @@ -674,11 +664,8 @@ def update_virtual_flow_table( description, namespace_id, producer_registration_id, - serialized_lazy_frame, is_optimized, schema_json, - polars_plan, - source_table_versions, ) def create_query_virtual_table( @@ -747,15 +734,6 @@ def get_table_preview( """Read the first N rows from a catalog table (physical, virtual or Delta-versioned).""" return self._previews.get_table_preview(table_id, limit, version, user_id) - def resolve_virtual_flow_table_preview( - self, - table_id: int, - limit: int, - user_id: int | None = None, - ) -> CatalogTablePreview: - """Resolve a virtual flow table and return a preview (worker-backed).""" - return self._previews.resolve_virtual_flow_table_preview(table_id, limit, user_id) - def get_table_history(self, table_id: int, limit: int | None = None) -> DeltaTableHistory: """Return the version history for a Delta catalog table.""" return self._previews.get_table_history(table_id, limit) diff --git a/flowfile_core/flowfile_core/catalog/services/previews.py b/flowfile_core/flowfile_core/catalog/services/previews.py index e6c040ae2..35c0c93fe 100644 --- a/flowfile_core/flowfile_core/catalog/services/previews.py +++ b/flowfile_core/flowfile_core/catalog/services/previews.py @@ -1,11 +1,16 @@ -"""Previews for physical and virtual tables, plus Delta history.""" +"""Previews for physical tables and Delta history. + +Virtual tables don't have a preview path: the catalog UI shows a +"no data preview available" placeholder for them and never calls these +endpoints, so ``get_table_preview`` returns an empty preview when handed +a virtual table id. +""" from __future__ import annotations import logging from pathlib import Path -import polars as pl from deltalake import DeltaTable from flowfile_core.catalog.delta_utils import ( @@ -16,23 +21,18 @@ from flowfile_core.catalog.exceptions import TableNotFoundError from flowfile_core.catalog.repository import CatalogRepository from flowfile_core.catalog.serializers import format_pyarrow_preview -from flowfile_core.catalog.services._resolve import resolve_or_log from flowfile_core.catalog.services.tables import TableService -from flowfile_core.catalog.services.virtual_tables import VirtualTableService -from flowfile_core.catalog.text_utils import hash_source_versions, parse_delta_history +from flowfile_core.catalog.text_utils import parse_delta_history from flowfile_core.database.models import CatalogTable from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import ( trigger_delta_history, trigger_delta_version_preview, - trigger_resolve_virtual_table, ) from flowfile_core.schemas.catalog_schema import ( CatalogTablePreview, DeltaTableHistory, ) from flowfile_core.utils.arrow_reader import read_top_n -from shared.delta_utils import validate_catalog_path -from shared.storage_config import storage logger = logging.getLogger(__name__) @@ -45,17 +45,15 @@ def _should_offload() -> bool: class TablePreviewService: - """Owns table preview fetching (physical, Delta-versioned, virtual) and Delta history.""" + """Owns physical/Delta-versioned table preview fetching and Delta history.""" def __init__( self, repo: CatalogRepository, tables: TableService, - virtual_tables: VirtualTableService, ) -> None: self.repo = repo self._tables = tables - self._virtual_tables = virtual_tables def get_table_preview( self, @@ -64,79 +62,15 @@ def get_table_preview( version: int | None = None, user_id: int | None = None, ) -> CatalogTablePreview: - """Read the first N rows from a catalog table.""" - table = self.repo.get_table(table_id) - if table is None: - raise TableNotFoundError(table_id=table_id) - - if getattr(table, "table_type", "physical") == "virtual": - if getattr(table, "sql_query", None): - return self._get_query_virtual_table_preview(table, limit, user_id) - return self._get_virtual_table_preview(table, limit, user_id) - - return self._get_physical_table_preview(table, limit, version) - - def _format_virtual_preview( - self, - lazy_frame: pl.LazyFrame, - table: CatalogTable, - limit: int, - ) -> CatalogTablePreview: - """Materialise *lazy_frame* on the worker, read top-N rows back as PyArrow. + """Read the first N rows from a catalog table. - Honours the core-never-collects rule — the plan is shipped to the - worker, written as IPC, then read back via ``read_top_n``. + Virtual tables fall through to ``_get_physical_table_preview`` and + return an empty preview (no ``file_path``). """ - versions_hash = hash_source_versions(table.source_table_versions) - result = trigger_resolve_virtual_table(table.id, lazy_frame.serialize(), versions_hash) - ipc_path = validate_catalog_path(result["ipc_path"], storage.catalog_virtual_results_directory) - pa_table = read_top_n(str(ipc_path), n=limit) - return format_pyarrow_preview(pa_table, total_rows=result.get("row_count")) - - def _get_query_virtual_table_preview( - self, - table: CatalogTable, - limit: int, - user_id: int | None, - ) -> CatalogTablePreview: - """Resolve a query-based virtual table and return a preview.""" - lazy_frame = resolve_or_log( - lambda: self._virtual_tables.resolve_query_virtual_table(table.id, user_id=user_id), - kind="query virtual table for preview", - identifier=table.id, - ) - if lazy_frame is None: - return CatalogTablePreview(columns=[], dtypes=[], rows=[], total_rows=0) - return self._format_virtual_preview(lazy_frame, table, limit) - - def _get_virtual_table_preview( - self, - table: CatalogTable, - limit: int, - user_id: int | None, - ) -> CatalogTablePreview: - """Resolve a virtual flow table and return a preview of the collected result.""" - lazy_frame = resolve_or_log( - lambda: self._virtual_tables.resolve_virtual_flow_table(table.id, user_id=user_id), - kind="virtual flow table for preview", - identifier=table.id, - ) - if lazy_frame is None: - return CatalogTablePreview(columns=[], dtypes=[], rows=[], total_rows=0) - return self._format_virtual_preview(lazy_frame, table, limit) - - def resolve_virtual_flow_table_preview( - self, - table_id: int, - limit: int, - user_id: int | None = None, - ) -> CatalogTablePreview: - """Resolve a virtual flow table and return a preview (worker-backed).""" table = self.repo.get_table(table_id) if table is None: raise TableNotFoundError(table_id=table_id) - lazy_frame = self._virtual_tables.resolve_virtual_flow_table(table_id, user_id=user_id) - return self._format_virtual_preview(lazy_frame, table, limit) + return self._get_physical_table_preview(table, limit, version) def _get_physical_table_preview( self, diff --git a/flowfile_core/flowfile_core/catalog/services/sql.py b/flowfile_core/flowfile_core/catalog/services/sql.py index a674a3b1d..a890415e4 100644 --- a/flowfile_core/flowfile_core/catalog/services/sql.py +++ b/flowfile_core/flowfile_core/catalog/services/sql.py @@ -18,7 +18,6 @@ from flowfile_core.catalog.services._resolve import resolve_or_log from flowfile_core.catalog.services.flows import FlowRegistrationService from flowfile_core.catalog.text_utils import ( - hash_source_versions, is_table_reference, rewrite_qualified_references, ) @@ -117,8 +116,7 @@ def execute_sql_query( def _materialise_virtual_for_sql(self, virtual_id: int, user_id: int | None) -> str: """Resolve a virtual table to an IPC path for the SQL worker call.""" lazy_frame = self._resolve_virtual_flow_table_via_facade(virtual_id, user_id=user_id, run_location="remote") - versions_hash = hash_source_versions(self.repo.get_table(virtual_id).source_table_versions) - result = trigger_resolve_virtual_table(virtual_id, lazy_frame.serialize(), versions_hash) + result = trigger_resolve_virtual_table(virtual_id, lazy_frame.serialize()) return result["ipc_path"] def save_sql_query_as_flow( diff --git a/flowfile_core/flowfile_core/catalog/services/tables.py b/flowfile_core/flowfile_core/catalog/services/tables.py index 792e3a620..b28fd947c 100644 --- a/flowfile_core/flowfile_core/catalog/services/tables.py +++ b/flowfile_core/flowfile_core/catalog/services/tables.py @@ -312,8 +312,6 @@ def table_to_out( is_optimized=getattr(table, "is_optimized", None), laziness_blockers=laziness_blockers, sql_query=getattr(table, "sql_query", None), - polars_plan=getattr(table, "polars_plan", None), - source_table_versions=getattr(table, "source_table_versions", None), created_at=table.created_at, updated_at=table.updated_at, ) @@ -369,8 +367,6 @@ def bulk_enrich_tables(self, tables: list[CatalogTable], user_id: int) -> list[C producer_registration_name=producer_registration_name, is_optimized=getattr(table, "is_optimized", None), sql_query=getattr(table, "sql_query", None), - polars_plan=getattr(table, "polars_plan", None), - source_table_versions=getattr(table, "source_table_versions", None), created_at=table.created_at, updated_at=table.updated_at, ) diff --git a/flowfile_core/flowfile_core/catalog/services/virtual_tables.py b/flowfile_core/flowfile_core/catalog/services/virtual_tables.py index 8624f2763..9fe51d0ac 100644 --- a/flowfile_core/flowfile_core/catalog/services/virtual_tables.py +++ b/flowfile_core/flowfile_core/catalog/services/virtual_tables.py @@ -2,7 +2,6 @@ from __future__ import annotations -import io import json import logging from pathlib import Path @@ -11,7 +10,7 @@ import polars as pl from flowfile_core.catalog.constants import QUERY_VIRTUAL_TABLE_RECURSION_LIMIT -from flowfile_core.catalog.delta_utils import check_source_versions_current, is_delta_table +from flowfile_core.catalog.delta_utils import is_delta_table from flowfile_core.catalog.exceptions import ( FlowNotFoundError, TableNotFoundError, @@ -77,11 +76,8 @@ def create_virtual_flow_table( producer_registration_id: int, namespace_id: int | None = None, description: str | None = None, - serialized_lazy_frame: bytes | None = None, is_optimized: bool = False, schema_json: str | None = None, - polars_plan: str | None = None, - source_table_versions: str | None = None, ) -> CatalogTableOut: """Create a virtual flow table (non-materialized catalog entry).""" producer = self.repo.get_flow(producer_registration_id) @@ -99,11 +95,8 @@ def create_virtual_flow_table( storage_format="delta", table_type="virtual", producer_registration_id=producer_registration_id, - serialized_lazy_frame=serialized_lazy_frame, is_optimized=is_optimized, schema_json=schema_json, - polars_plan=polars_plan, - source_table_versions=source_table_versions, ) table = self.repo.create_table(table) return self._tables.table_to_out(table) @@ -115,11 +108,8 @@ def update_virtual_flow_table( description: str | None = None, namespace_id: int | None = None, producer_registration_id: int | None = None, - serialized_lazy_frame: bytes | None = None, is_optimized: bool | None = None, schema_json: str | None = None, - polars_plan: str | None = None, - source_table_versions: str | None = None, ) -> CatalogTableOut: """Update a virtual flow table's metadata or producer.""" table = self.repo.get_table(table_id) @@ -137,16 +127,10 @@ def update_virtual_flow_table( if producer is None: raise FlowNotFoundError(registration_id=producer_registration_id) table.producer_registration_id = producer_registration_id - if serialized_lazy_frame is not None: - table.serialized_lazy_frame = serialized_lazy_frame if is_optimized is not None: table.is_optimized = is_optimized if schema_json is not None: table.schema_json = schema_json - if polars_plan is not None: - table.polars_plan = polars_plan - if source_table_versions is not None: - table.source_table_versions = source_table_versions table = self.repo.update_table(table) @@ -194,7 +178,6 @@ def create_query_virtual_table( storage_format="delta", table_type="virtual", producer_registration_id=None, - serialized_lazy_frame=None, is_optimized=False, sql_query=sql_query, schema_json=schema_json, @@ -316,8 +299,8 @@ def _resolve_table_for_sql_context( """Return a LazyFrame for a single referenced table when resolving a SQL context. ``resolve_or_log`` covers the broad set of errors that can come out of - nested virtual-table resolution — corrupt serialized frames, missing - producer files, polars eval errors, recursion bugs. + nested virtual-table resolution — missing producer files, polars eval + errors, recursion bugs. """ if t.table_type == "virtual": if getattr(t, "sql_query", None): @@ -326,8 +309,6 @@ def _resolve_table_for_sql_context( kind="nested query virtual table", identifier=t.name, ) - if t.is_optimized and t.serialized_lazy_frame and check_source_versions_current(t.source_table_versions): - return pl.LazyFrame.deserialize(io.BytesIO(t.serialized_lazy_frame)) if t.producer_registration_id: return resolve_or_log( lambda: self.resolve_virtual_flow_table(t.id, user_id=user_id), @@ -346,11 +327,9 @@ def resolve_virtual_flow_table( run_location: Literal["remote", "local"] | None = None, node_logger: NodeLogger | None = None, ) -> pl.LazyFrame: - """Resolve a virtual flow table to a LazyFrame. + """Resolve a virtual flow table to a LazyFrame by re-executing its producer flow. - For optimized tables, deserializes the stored LazyFrame directly. For query-based virtual tables, delegates to resolve_query_virtual_table. - For non-optimized tables, triggers flow execution via the worker. """ if run_location is None: run_location = "remote" if _should_offload() else "local" @@ -364,13 +343,6 @@ def resolve_virtual_flow_table( if table.sql_query: return self.resolve_query_virtual_table(table_id, user_id=user_id) - if table.is_optimized and table.serialized_lazy_frame: - if check_source_versions_current(table.source_table_versions): - return pl.LazyFrame.deserialize(io.BytesIO(table.serialized_lazy_frame)) - logger.info( - "Source table versions changed for virtual table %r, falling back to flow execution", table.name - ) - if not table.producer_registration_id: raise ValueError(f"Virtual table {table.name} has no producer flow") diff --git a/flowfile_core/flowfile_core/catalog/services/visualizations.py b/flowfile_core/flowfile_core/catalog/services/visualizations.py index 4fffdfed5..8f6662c07 100644 --- a/flowfile_core/flowfile_core/catalog/services/visualizations.py +++ b/flowfile_core/flowfile_core/catalog/services/visualizations.py @@ -31,7 +31,6 @@ from flowfile_core.catalog.services.tables import TableService from flowfile_core.catalog.services.virtual_tables import VirtualTableService from flowfile_core.catalog.text_utils import ( - hash_source_versions, is_table_reference, rewrite_qualified_references, ) @@ -425,8 +424,7 @@ def _resolve_source_for_worker(self, source: VizSourceDescriptor, user_id: int | from flowfile_core.catalog import service as _service_module lazy_frame = self._resolve_virtual_flow_table_via_facade(table.id, user_id=user_id, run_location="remote") - versions_hash = hash_source_versions(table.source_table_versions) - result = _service_module.trigger_resolve_virtual_table(table.id, lazy_frame.serialize(), versions_hash) + result = _service_module.trigger_resolve_virtual_table(table.id, lazy_frame.serialize()) return { "kind": "ipc_path", "session_key": f"fvt:{table.id}:{int(result['mtime'])}", @@ -486,8 +484,7 @@ def _materialise_virtual_for_viz(self, vname: str, virtual_map: dict[str, int], vid = virtual_map[vname] lazy_frame = self._resolve_virtual_flow_table_via_facade(vid, user_id=user_id, run_location="remote") - versions_hash = hash_source_versions(self.repo.get_table(vid).source_table_versions) - result = _service_module.trigger_resolve_virtual_table(vid, lazy_frame.serialize(), versions_hash) + result = _service_module.trigger_resolve_virtual_table(vid, lazy_frame.serialize()) return result["ipc_path"] def _dispatch_visualize_query( diff --git a/flowfile_core/flowfile_core/catalog/text_utils.py b/flowfile_core/flowfile_core/catalog/text_utils.py index 32237da5f..c851689b7 100644 --- a/flowfile_core/flowfile_core/catalog/text_utils.py +++ b/flowfile_core/flowfile_core/catalog/text_utils.py @@ -7,7 +7,6 @@ from __future__ import annotations -import hashlib import re from collections.abc import Iterable @@ -63,10 +62,3 @@ def parse_delta_history(raw_history: list[dict]) -> list[DeltaVersionCommit]: ) for h in raw_history ] - - -def hash_source_versions(versions_json: str | None) -> str: - """Stable cache key for a virtual table's source-version state.""" - if not versions_json: - return "noversions" - return hashlib.sha256(versions_json.encode()).hexdigest() diff --git a/flowfile_core/flowfile_core/database/models.py b/flowfile_core/flowfile_core/database/models.py index 41968452c..a840294a8 100644 --- a/flowfile_core/flowfile_core/database/models.py +++ b/flowfile_core/flowfile_core/database/models.py @@ -8,7 +8,6 @@ Float, ForeignKey, Integer, - LargeBinary, String, Text, UniqueConstraint, @@ -365,11 +364,8 @@ class CatalogTable(Base): # Pydantic schemas: schemas/catalog_schema.py; interf # Virtual Table fields (NULL for physical tables) table_type = Column(String, nullable=False, default="physical", server_default="physical") producer_registration_id = Column(Integer, ForeignKey("flow_registrations.id"), nullable=True) - serialized_lazy_frame = Column(LargeBinary, nullable=True) # TODO Should be hashed is_optimized = Column(Boolean, nullable=True, default=False) sql_query = Column(Text, nullable=True) # SQL definition for query-based virtual tables - polars_plan = Column(Text, nullable=True) # Polars explain() plan for optimized virtual tables - source_table_versions = Column(Text, nullable=True) # JSON list of SourceTableVersion for staleness detection # Timestamps created_at = Column(DateTime, default=func.now(), nullable=False) diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py index 87bf29270..a6c1f5a21 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py @@ -247,20 +247,18 @@ def trigger_catalog_materialize( def trigger_resolve_virtual_table( table_id: int, plan_bytes: bytes, - source_versions_hash: str, ) -> dict: - """Ask the worker to materialise a flow-virtual table to its IPC cache. + """Ask the worker to materialise a flow-virtual table to disk. Ships *plan_bytes* (output of ``pl.LazyFrame.serialize()``); the worker - deserialises in a spawned child, collects, and writes IPC. Idempotent on - ``(table_id, source_versions_hash)``. + deserialises in a spawned child, collects, and writes IPC. Each call + re-executes the plan; the worker holds no cache. """ from base64 import b64encode payload = { "table_id": table_id, "plan_bytes": b64encode(plan_bytes).decode("ascii"), - "source_versions_hash": source_versions_hash, } response = requests.post(f"{WORKER_URL}/flow/resolve_virtual_table", json=payload, timeout=300) if not response.ok: @@ -379,7 +377,8 @@ def trigger_visualize_column_stats(worker_source: dict, column: str, limit: int) raise RuntimeError(f"Worker visualize_column_stats failed: {response.text}") data = response.json() logger.info( - "[viz] <- worker /catalog/visualize_column_stats session_key=%s status=%d cache_hit=%s value_count=%d truncated=%s", + "[viz] <- worker /catalog/visualize_column_stats session_key=%s status=%d " + "cache_hit=%s value_count=%d truncated=%s", session_key, response.status_code, data.get("cache_hit"), diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 64a9e3738..c698bb818 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -1,6 +1,5 @@ import datetime import functools -import io as _io import json import os import threading @@ -21,8 +20,9 @@ from pyarrow.parquet import ParquetFile from flowfile_core.catalog import CatalogService -from flowfile_core.catalog.delta_utils import check_source_versions_current, delete_table_storage, is_delta_table +from flowfile_core.catalog.delta_utils import delete_table_storage, is_delta_table from flowfile_core.catalog.repository import SQLAlchemyCatalogRepository +from flowfile_core.catalog.text_utils import is_table_reference from flowfile_core.configs import logger from flowfile_core.configs.app_settings import get_google_oauth_config from flowfile_core.configs.flow_logger import FlowLogger, NodeLogger @@ -349,21 +349,11 @@ def get_cloud_connection_settings( def _resolve_virtual_table( - is_optimized: bool, - serialized_lf: bytes | None, catalog_table_id: int, run_location: Literal["remote", "local"] | None = None, node_logger: NodeLogger = None, - source_table_versions: str | None = None, ) -> pl.LazyFrame: - """Resolve a virtual table to a LazyFrame. - - Optimized tables deserialize a stored execution plan if source table - versions are still current; otherwise falls back to re-executing - the producer flow via CatalogService. - """ - if is_optimized and serialized_lf and check_source_versions_current(source_table_versions): - return pl.LazyFrame.deserialize(_io.BytesIO(serialized_lf)) + """Resolve a virtual table to a LazyFrame by re-executing its producer flow.""" with get_db_context() as db: repo = SQLAlchemyCatalogRepository(db) svc = CatalogService(repo) @@ -371,10 +361,13 @@ def _resolve_virtual_table( class CatalogSqlTables(NamedTuple): - """Resolved catalog tables for SQL execution.""" + """Resolved catalog tables for SQL execution. + + ``virtual_tables`` maps logical table name -> ``(table_id, is_optimized)``. + """ table_paths: dict[str, str] - virtual_tables: dict[str, tuple[bool, bytes | None, int, str | None]] + virtual_tables: dict[str, tuple[int, bool]] def ml_flow_model_path(flow_id: int, train_node_id: int | str) -> Path: @@ -389,7 +382,7 @@ def ml_flow_model_path(flow_id: int, train_node_id: int | str) -> Path: def _resolve_catalog_sql_tables(node_id: int | str) -> CatalogSqlTables: """Resolve all catalog tables (physical Delta + virtual) for a SQL query node.""" table_paths: dict[str, str] = {} - virtual_tables: dict[str, tuple[bool, bytes | None, int, str | None]] = {} + virtual_tables: dict[str, tuple[int, bool]] = {} try: with get_db_context() as db: repo = SQLAlchemyCatalogRepository(db) @@ -404,12 +397,7 @@ def _resolve_catalog_sql_tables(node_id: int | str) -> CatalogSqlTables: ) seen_names.add(t.name) if t.table_type == "virtual": - virtual_tables[t.name] = ( - t.is_optimized, - t.serialized_lazy_frame, - t.id, - t.source_table_versions, - ) + virtual_tables[t.name] = (t.id, bool(t.is_optimized)) elif t.file_path and is_delta_table(Path(t.file_path)): table_paths[t.name] = t.file_path @@ -427,18 +415,14 @@ class CatalogTableInfo(NamedTuple): file_path: str | None table_type: str - serialized_lf: bytes | None is_optimized: bool - source_table_versions: str | None = None def _resolve_catalog_table_info(node_catalog_reader: "input_schema.NodeCatalogReader") -> CatalogTableInfo: """Resolve a single catalog table (physical or virtual) for a table reader node.""" file_path: str | None = None table_type: str = "physical" - serialized_lf: bytes | None = None is_optimized: bool = False - source_table_versions: str | None = None try: with get_db_context() as db: repo = SQLAlchemyCatalogRepository(db) @@ -467,8 +451,6 @@ def _resolve_catalog_table_info(node_catalog_reader: "input_schema.NodeCatalogRe table_type = table_record.table_type if table_type == "virtual": is_optimized = table_record.is_optimized - serialized_lf = table_record.serialized_lazy_frame - source_table_versions = table_record.source_table_versions else: file_path = table_record.file_path else: @@ -482,9 +464,7 @@ def _resolve_catalog_table_info(node_catalog_reader: "input_schema.NodeCatalogRe return CatalogTableInfo( file_path=file_path, table_type=table_type, - serialized_lf=serialized_lf, is_optimized=is_optimized, - source_table_versions=source_table_versions, ) @@ -579,71 +559,6 @@ def _register_catalog_table( raise -def _collect_source_table_versions(graph: "FlowGraph") -> str | None: - """Collect delta versions of upstream physical catalog tables used by this flow. - - For each catalog_reader node that reads a physical delta table, records - the current delta version. For optimized virtual table sources, includes - their transitive source_table_versions. - - Returns a JSON string of SourceTableVersion entries, or None if no sources found. - """ - from deltalake import DeltaTable as _DeltaTable - - from shared.delta_models import SourceTableVersion - - versions: list[SourceTableVersion] = [] - seen_table_ids: set[int] = set() - - # Collect all catalog_reader table IDs first - table_ids: list[int] = [] - for node in graph.nodes: - if node.node_type != "catalog_reader": - continue - setting = node.setting_input - table_id = getattr(setting, "catalog_table_id", None) - if not table_id or table_id in seen_table_ids: - continue - seen_table_ids.add(table_id) - table_ids.append(table_id) - - # Single DB session for all lookups - try: - with get_db_context() as db: - repo = SQLAlchemyCatalogRepository(db) - for table_id in table_ids: - table_record = repo.get_table(table_id) - if table_record is None: - continue - if table_record.table_type == "virtual": - # Include transitive versions from optimized virtual sources - if table_record.source_table_versions: - existing = json.loads(table_record.source_table_versions) - for entry in existing: - sv = SourceTableVersion(**entry) - if sv.table_id not in seen_table_ids: - seen_table_ids.add(sv.table_id) - versions.append(sv) - elif table_record.file_path and is_delta_table(table_record.file_path): - try: - current_version = _DeltaTable(table_record.file_path, without_files=True).version() - versions.append( - SourceTableVersion( - table_id=table_id, - file_path=table_record.file_path, - version=current_version, - ) - ) - except Exception: - logger.warning("Could not read delta version for source table %d", table_id, exc_info=True) - except Exception: - logger.warning("Could not collect source table versions", exc_info=True) - - if not versions: - return None - return json.dumps([v.model_dump() for v in versions]) - - def _handle_virtual_table_write( graph: "FlowGraph", node_catalog_writer: input_schema.NodeCatalogWriter, @@ -658,28 +573,9 @@ def _handle_virtual_table_write( "Open the flow from the catalog, or register it first." ) - serialized_lf: bytes | None = None - polars_plan: str | None = None - source_table_versions: str | None = None - changed_execution_mode = False - writer_node = graph.get_node(node_catalog_writer.node_id) is_lazy, _reasons = writer_node.check_upstream_laziness() - if is_lazy: - if graph.execution_mode != "performance": - graph.execution_mode = "performance" - graph.reset() - changed_execution_mode = True - incoming_node = graph.get_node(node_catalog_writer.node_id).node_inputs.main_inputs[0] - df = incoming_node.get_resulting_data() - polars_plan = df.data_frame.explain() - graph.flow_logger.info(f"creating a virtual table with: {polars_plan}") - buf = _io.BytesIO() - df.data_frame.serialize(buf) - serialized_lf = buf.getvalue() - source_table_versions = _collect_source_table_versions(graph) - else: - graph.flow_logger.info("creating a virtual table from workflow") + graph.flow_logger.info("creating a virtual table from workflow") schema_json = json.dumps([{"name": c.column_name, "dtype": c.data_type} for c in df.schema]) @@ -698,11 +594,8 @@ def _handle_virtual_table_write( name=settings.table_name or None, producer_registration_id=reg_id, description=settings.description, - serialized_lazy_frame=serialized_lf, is_optimized=is_lazy, schema_json=schema_json, - polars_plan=polars_plan, - source_table_versions=source_table_versions, ) else: svc.create_virtual_flow_table( @@ -711,15 +604,10 @@ def _handle_virtual_table_write( producer_registration_id=reg_id, namespace_id=settings.namespace_id, description=settings.description, - serialized_lazy_frame=serialized_lf, is_optimized=is_lazy, schema_json=schema_json, - polars_plan=polars_plan, - source_table_versions=source_table_versions, ) - if changed_execution_mode: - graph.execution_mode = "Development" return df @@ -3023,32 +2911,36 @@ def _add_catalog_sql_reader(self, node_catalog_reader: input_schema.NodeCatalogR sql_code = node_catalog_reader.sql_query resolved = _resolve_catalog_sql_tables(node_catalog_reader.node_id) table_paths = resolved.table_paths - virtual_tables = resolved.virtual_tables + # Only register virtuals actually referenced by the query — registering + # every virtual would force its producer flow to execute even when + # unused, which is both wasteful and a source of false errors when an + # unrelated virtual's producer can't be resolved. + referenced_virtuals = { + name: (tid, is_opt) + for name, (tid, is_opt) in resolved.virtual_tables.items() + if is_table_reference(name, sql_code or "") + } def _func() -> FlowDataEngine: - if not table_paths and not virtual_tables: + if not table_paths and not referenced_virtuals: raise ValueError("No catalog tables available to query") ctx = pl.SQLContext() for name, path in table_paths.items(): ctx.register(name, pl.scan_delta(path)) - for name, (is_opt, ser_lf, tid, stv) in virtual_tables.items(): + for name, (tid, _is_opt) in referenced_virtuals.items(): ctx.register( name, _resolve_virtual_table( - is_opt, - ser_lf, tid, node_logger=self.flow_logger.get_node_logger(node_catalog_reader.node_id), run_location=self.execution_location, - source_table_versions=stv, ), ) return FlowDataEngine(ctx.execute(sql_code)) - # todo: There are quite some round-trips happening here because the Flowgraph tries to predict the schema. is_virtual_optimized: bool | None = None - if virtual_tables: - is_virtual_optimized = all(is_opt for is_opt, _, _, _ in virtual_tables.values()) + if referenced_virtuals: + is_virtual_optimized = all(is_opt for _, is_opt in referenced_virtuals.values()) self.add_node_step( node_id=node_catalog_reader.node_id, @@ -3081,21 +2973,15 @@ def _add_catalog_table_reader(self, node_catalog_reader: input_schema.NodeCatalo resolved_path = info.file_path delta_version = node_catalog_reader.delta_version _table_type = info.table_type - _serialized_lf = info.serialized_lf - _is_optimized = info.is_optimized _catalog_table_id = node_catalog_reader.catalog_table_id - _source_table_versions = info.source_table_versions def _func() -> FlowDataEngine: if _table_type == "virtual": return FlowDataEngine( _resolve_virtual_table( - _is_optimized, - _serialized_lf, _catalog_table_id, node_logger=self.flow_logger.get_node_logger(node_catalog_reader.node_id), run_location=self.execution_location, - source_table_versions=_source_table_versions, ) ) diff --git a/flowfile_core/flowfile_core/routes/catalog.py b/flowfile_core/flowfile_core/routes/catalog.py index 137c1028c..474a7a7f5 100644 --- a/flowfile_core/flowfile_core/routes/catalog.py +++ b/flowfile_core/flowfile_core/routes/catalog.py @@ -898,18 +898,6 @@ def update_virtual_flow_table( ) -@router.post("/virtual-tables/{table_id}/resolve", response_model=CatalogTablePreview) -@handle_catalog_exceptions(TableNotFoundError="Virtual table not found", FlowNotFoundError="Producer flow not found") -def resolve_virtual_flow_table( - table_id: int, - limit: int = Query(100, ge=1, le=10000), - current_user=Depends(get_current_active_user), - service: CatalogService = Depends(get_catalog_service), -) -> CatalogTablePreview: - """Resolve a virtual flow table and return a preview of the result.""" - return service.resolve_virtual_flow_table_preview(table_id, limit, user_id=current_user.id) - - # --------------------------------------------------------------------------- # Query-based Virtual Tables # --------------------------------------------------------------------------- diff --git a/flowfile_core/flowfile_core/schemas/catalog_schema.py b/flowfile_core/flowfile_core/schemas/catalog_schema.py index 90eb5cba1..508ed5b46 100644 --- a/flowfile_core/flowfile_core/schemas/catalog_schema.py +++ b/flowfile_core/flowfile_core/schemas/catalog_schema.py @@ -266,8 +266,6 @@ class CatalogTableOut(BaseModel): is_optimized: bool | None = None laziness_blockers: list[str] | None = None sql_query: str | None = None - polars_plan: str | None = None - source_table_versions: str | None = None created_at: datetime updated_at: datetime diff --git a/flowfile_core/tests/flowfile/conftest.py b/flowfile_core/tests/flowfile/conftest.py index 21c2db89b..50cfa57fe 100644 --- a/flowfile_core/tests/flowfile/conftest.py +++ b/flowfile_core/tests/flowfile/conftest.py @@ -78,9 +78,9 @@ def sqlite_db(tmp_path): def catalog_cleanup(): """Remove all catalog / flow-registration rows so tests start clean. - Also wipes the worker's catalog_virtual_results cache so a previous - test's ``fvt-{table_id}-{hash}.arrow`` file doesn't satisfy the next - test's resolve request when fixtures recycle the same table id. + Also wipes the worker's catalog_virtual_results directory so a previous + test's ``fvt-{table_id}.arrow`` file doesn't leak into the next test + when fixtures recycle the same table id. """ from shared.storage_config import storage diff --git a/flowfile_core/tests/flowfile/test_catalog_flow_graph.py b/flowfile_core/tests/flowfile/test_catalog_flow_graph.py index 4b9f31d37..09019338f 100644 --- a/flowfile_core/tests/flowfile/test_catalog_flow_graph.py +++ b/flowfile_core/tests/flowfile/test_catalog_flow_graph.py @@ -7,7 +7,6 @@ - Round-trip: write → read → verify data integrity """ -import io as _io import os import tempfile from pathlib import Path @@ -750,22 +749,8 @@ def test_sql_query_no_tables_raises(self): class TestResolveVirtualTable: """Test _resolve_virtual_table helper.""" - def test_resolve_optimized_virtual_table(self): - """An optimized virtual table should deserialize the stored LazyFrame.""" - lf = pl.LazyFrame({"x": [1, 2, 3]}) - buf = _io.BytesIO() - lf.serialize(buf) - serialized = buf.getvalue() - result = _resolve_virtual_table( - is_optimized=True, serialized_lf=serialized, catalog_table_id=-1, run_location="local" - ) - - assert isinstance(result, pl.LazyFrame) - df = result.collect() - assert df["x"].to_list() == [1, 2, 3] - - def test_resolve_non_optimized_virtual_table(self): - """A non-optimized virtual table should call CatalogService.resolve_virtual_flow_table.""" + def test_resolve_virtual_table_delegates_to_service(self): + """_resolve_virtual_table should delegate to CatalogService.resolve_virtual_flow_table.""" expected_lf = pl.LazyFrame({"y": [10, 20]}) with patch("flowfile_core.flowfile.flow_graph.get_db_context") as mock_ctx: @@ -778,29 +763,11 @@ def test_resolve_non_optimized_virtual_table(self): mock_svc_instance.resolve_virtual_flow_table.return_value = expected_lf MockSvc.return_value = mock_svc_instance - result = _resolve_virtual_table(is_optimized=False, serialized_lf=None, catalog_table_id=42) + result = _resolve_virtual_table(catalog_table_id=42) assert isinstance(result, pl.LazyFrame) mock_svc_instance.resolve_virtual_flow_table.assert_called_once_with(42, run_location=None, node_logger=None) - def test_resolve_optimized_without_serialized_lf_falls_back(self): - """When is_optimized=True but serialized_lf is None, should fall back to service resolution.""" - expected_lf = pl.LazyFrame({"z": [5]}) - - with patch("flowfile_core.flowfile.flow_graph.get_db_context") as mock_ctx: - mock_db = MagicMock() - mock_ctx.return_value.__enter__ = MagicMock(return_value=mock_db) - mock_ctx.return_value.__exit__ = MagicMock(return_value=False) - - with patch("flowfile_core.flowfile.flow_graph.CatalogService") as MockSvc: - mock_svc_instance = MagicMock() - mock_svc_instance.resolve_virtual_flow_table.return_value = expected_lf - MockSvc.return_value = mock_svc_instance - - _resolve_virtual_table(is_optimized=True, serialized_lf=None, catalog_table_id=99) - - mock_svc_instance.resolve_virtual_flow_table.assert_called_once_with(99, run_location=None, node_logger=None) - class TestWriteCatalogDeltaLocal: """Test _write_catalog_delta_local helper.""" diff --git a/flowfile_core/tests/flowfile/test_virtual_catalog_writer.py b/flowfile_core/tests/flowfile/test_virtual_catalog_writer.py index e10094943..540de5b30 100644 --- a/flowfile_core/tests/flowfile/test_virtual_catalog_writer.py +++ b/flowfile_core/tests/flowfile/test_virtual_catalog_writer.py @@ -3,20 +3,17 @@ Covers: - Virtual catalog writer: creates virtual tables without materializing data - Virtual writer requires a source_registration_id -- Serialized lazy frame storage and is_optimized flag +- is_optimized flag on virtual tables - check_flow_laziness / check_upstream_laziness scoping - FlowRegistration.flow_path usage (regression for file_path AttributeError) -- Source table version tracking and staleness detection """ -import json import tempfile import polars as pl import pytest from flowfile_core.catalog import CatalogService -from flowfile_core.catalog.delta_utils import check_source_versions_current from flowfile_core.catalog.repository import SQLAlchemyCatalogRepository from flowfile_core.database.connection import get_db_context from flowfile_core.database.models import ( @@ -26,7 +23,6 @@ from flowfile_core.flowfile.flow_graph import add_connection from flowfile_core.schemas import input_schema from flowfile_core.schemas.transform_schema import BasicFilter, FilterInput, PivotInput -from shared.delta_models import SourceTableVersion from tests.flowfile.conftest import ( CATALOG_SAMPLE_DATA as SAMPLE_DATA, ) @@ -340,7 +336,6 @@ def test_virtual_writer_stores_serialized_lazyframe_when_lazy(self): assert len(tables) == 1 table = tables[0] assert table.table_type == "virtual" - assert table.serialized_lazy_frame is not None assert table.is_optimized is True def test_virtual_writer_non_optimized_when_upstream_eager(self): @@ -393,13 +388,6 @@ def test_virtual_writer_non_optimized_when_upstream_eager(self): class TestReadVirtualFlowTables: """Tests that virtual tables can be resolved and previewed.""" - def test_lazy_frame_table_preview(self, lazy_virtual_table_id, catalog_service): - """Preview of an optimized virtual table should return filtered rows.""" - preview = catalog_service.get_table_preview(lazy_virtual_table_id) - assert len(preview.columns) > 0 - # Filter is age > 28, so Alice (30) and Charlie (35) match - assert len(preview.rows) == 2 - def test_lazy_frame_resolve(self, lazy_virtual_table_id, catalog_service): """resolve_virtual_flow_table on an optimized table returns a LazyFrame.""" lf = catalog_service.resolve_virtual_flow_table(lazy_virtual_table_id) @@ -407,13 +395,6 @@ def test_lazy_frame_resolve(self, lazy_virtual_table_id, catalog_service): df = lf.collect() assert df.height == 2 - def test_eager_frame_table_preview(self, eager_virtual_table_id, catalog_service): - """Preview of a non-optimized (eager) virtual table should re-execute the flow.""" - preview = catalog_service.get_table_preview(eager_virtual_table_id) - assert len(preview.columns) > 0 - # Pivot of 3 rows by name produces 3 rows - assert len(preview.rows) == 3 - def test_eager_frame_resolve(self, eager_virtual_table_id, catalog_service): """resolve_virtual_flow_table on a non-optimized table re-executes and returns LazyFrame.""" lf = catalog_service.resolve_virtual_flow_table(eager_virtual_table_id) @@ -482,7 +463,6 @@ def test_resolve_query_virtual_only_materializes_referenced_tables( owner_id=1, producer_registration_id=other_reg_id, namespace_id=ns_id, - serialized_lazy_frame=b"\x00", is_optimized=True, ) svc.create_query_virtual_table( @@ -534,7 +514,6 @@ def test_sql_query_filter_ignores_alias_lookalike( owner_id=1, producer_registration_id=reg_id, namespace_id=ns_id, - serialized_lazy_frame=b"\x00", is_optimized=True, ) @@ -582,7 +561,6 @@ def test_sql_query_does_not_materialize_unreferenced_virtuals( owner_id=1, producer_registration_id=reg_id, namespace_id=ns_id, - serialized_lazy_frame=b"\x00", # invalid bytes on purpose is_optimized=True, ) @@ -1016,201 +994,3 @@ def test_sql_reader_not_optimized_when_source_virtual_table_is_not_optimized(sel _run_graph(graph) -# --------------------------------------------------------------------------- -# Source table version tracking tests -# --------------------------------------------------------------------------- - - -class TestCheckSourceVersionsCurrent: - """Unit tests for the check_source_versions_current utility.""" - - def test_none_returns_true(self): - """Backward compat: None source_table_versions should return True.""" - assert check_source_versions_current(None) is True - - def test_empty_string_returns_true(self): - """Empty string should return True.""" - assert check_source_versions_current("") is True - - def test_empty_list_returns_true(self): - """Empty JSON list should return True (no sources to check).""" - assert check_source_versions_current("[]") is True - - def test_invalid_json_returns_false(self): - """Malformed JSON should return False (treat as stale).""" - assert check_source_versions_current("not json") is False - - def test_matching_versions_returns_true(self): - """When source delta table versions match, should return True.""" - from shared.delta_utils import write_delta - - with tempfile.TemporaryDirectory() as tmp_dir: - delta_path = f"{tmp_dir}/test_table" - df = pl.DataFrame({"x": [1, 2, 3]}) - write_delta(df, delta_path, mode="overwrite") - - from deltalake import DeltaTable - - current_version = DeltaTable(delta_path, without_files=True).version() - - versions_json = json.dumps( - [SourceTableVersion(table_id=1, file_path=delta_path, version=current_version).model_dump()] - ) - assert check_source_versions_current(versions_json) is True - - def test_stale_version_returns_false(self): - """When source delta table has been updated, should return False.""" - from shared.delta_utils import write_delta - - with tempfile.TemporaryDirectory() as tmp_dir: - delta_path = f"{tmp_dir}/test_table" - df = pl.DataFrame({"x": [1, 2, 3]}) - write_delta(df, delta_path, mode="overwrite") - - # Record version 0 - versions_json = json.dumps([SourceTableVersion(table_id=1, file_path=delta_path, version=0).model_dump()]) - - # Write again to bump version - write_delta(pl.DataFrame({"x": [4, 5, 6]}), delta_path, mode="overwrite") - - assert check_source_versions_current(versions_json) is False - - def test_missing_file_path_returns_false(self): - """When the delta table path doesn't exist, should return False.""" - versions_json = json.dumps( - [SourceTableVersion(table_id=1, file_path="/nonexistent/path", version=0).model_dump()] - ) - assert check_source_versions_current(versions_json) is False - - -class TestSourceTableVersionCapture: - """Test that source_table_versions are captured when creating virtual tables - from flows that read physical delta catalog tables.""" - - def test_virtual_table_from_delta_source_stores_versions(self): - """A virtual table created from a flow reading a physical delta table - should have source_table_versions populated.""" - from shared.delta_utils import write_delta - - ns_id = _create_namespace() - - # Create a physical delta catalog table - with tempfile.TemporaryDirectory() as tmp_dir: - delta_path = f"{tmp_dir}/source_delta" - df = pl.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]}) - write_delta(df, delta_path, mode="overwrite") - - with get_db_context() as db: - repo = SQLAlchemyCatalogRepository(db) - svc = CatalogService(repo) - physical_table = svc.register_table_from_data( - name="source_physical", - table_path=delta_path, - owner_id=1, - namespace_id=ns_id, - storage_format="delta", - schema=[{"name": "name", "dtype": "Utf8"}, {"name": "age", "dtype": "Int64"}], - row_count=2, - column_count=2, - size_bytes=100, - ) - physical_table_id = physical_table.id - - # Create a flow that reads from the physical table and writes a virtual table - with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f: - flow_path = f.name - reg_id = _create_flow_registration(ns_id, name="version_tracking_flow", path=flow_path) - graph = _create_graph(source_registration_id=reg_id) - - # Node 1: catalog reader - promise_reader = input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="catalog_reader") - graph.add_node_promise(promise_reader) - reader = input_schema.NodeCatalogReader( - flow_id=graph.flow_id, - node_id=1, - catalog_table_id=physical_table_id, - ) - graph.add_catalog_reader(reader) - - # Node 2: filter (lazy) - promise_filter = input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="filter") - graph.add_node_promise(promise_filter) - filter_settings = input_schema.NodeFilter( - flow_id=graph.flow_id, - node_id=2, - depending_on_id=1, - filter_input=FilterInput( - mode="basic", - basic_filter=BasicFilter(field="age", operator="greater_than", value="20"), - ), - ) - graph.add_filter(filter_settings) - add_connection(graph, input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=2)) - - # Node 3: virtual catalog writer - _add_catalog_writer( - graph, - node_id=3, - depending_on_id=2, - table_name="version_tracked_virtual", - namespace_id=ns_id, - write_mode="virtual", - ) - - _run_graph(graph) - - # Verify source_table_versions is populated - with get_db_context() as db: - repo = SQLAlchemyCatalogRepository(db) - tables = repo.list_tables(namespace_id=ns_id) - vt = next(t for t in tables if t.name == "version_tracked_virtual") - assert vt.is_optimized is True - assert vt.source_table_versions is not None - - versions = json.loads(vt.source_table_versions) - assert len(versions) == 1 - assert versions[0]["table_id"] == physical_table_id - assert versions[0]["file_path"] == delta_path - assert isinstance(versions[0]["version"], int) - - def test_virtual_table_without_catalog_source_has_no_versions(self): - """A virtual table from a flow with only manual input should have - source_table_versions=None (no delta sources to track).""" - ns_id = _create_namespace() - reg_id = _create_flow_registration(ns_id, name="no_source_flow") - graph = _create_graph(source_registration_id=reg_id) - - _add_manual_input(graph, SAMPLE_DATA, node_id=1) - - # Node 2: filter (lazy) - promise_filter = input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="filter") - graph.add_node_promise(promise_filter) - filter_settings = input_schema.NodeFilter( - flow_id=graph.flow_id, - node_id=2, - depending_on_id=1, - filter_input=FilterInput( - mode="basic", - basic_filter=BasicFilter(field="age", operator="greater_than", value="28"), - ), - ) - graph.add_filter(filter_settings) - add_connection(graph, input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=2)) - - _add_catalog_writer( - graph, - node_id=3, - depending_on_id=2, - table_name="no_source_virtual", - namespace_id=ns_id, - write_mode="virtual", - ) - - _run_graph(graph) - - with get_db_context() as db: - repo = SQLAlchemyCatalogRepository(db) - tables = repo.list_tables(namespace_id=ns_id) - vt = next(t for t in tables if t.name == "no_source_virtual") - assert vt.is_optimized is True - assert vt.source_table_versions is None diff --git a/flowfile_core/tests/test_catalog_visualizations.py b/flowfile_core/tests/test_catalog_visualizations.py index 271859285..4f816686e 100644 --- a/flowfile_core/tests/test_catalog_visualizations.py +++ b/flowfile_core/tests/test_catalog_visualizations.py @@ -449,7 +449,6 @@ def test_compute_flow_virtual_table_ships_ipc_path(self, client): owner_id=1, table_type="virtual", producer_registration_id=None, - source_table_versions=None, ) db.add(table) db.commit() @@ -480,12 +479,9 @@ def fake_trigger(worker_source, payload, max_rows): def fake_resolve(self, table_id, **kwargs): return pl.LazyFrame({"x": [1, 2, 3]}) - def fake_resolve_virtual(table_id, plan_bytes, source_versions_hash): - captured["resolve"] = { - "table_id": table_id, - "hash": source_versions_hash, - } - return {"ipc_path": f"fvt-{table_id}-noversions00000.arrow", "mtime": 1234.5, "row_count": 3} + def fake_resolve_virtual(table_id, plan_bytes): + captured["resolve"] = {"table_id": table_id} + return {"ipc_path": f"fvt-{table_id}.arrow", "mtime": 1234.5, "row_count": 3} with ( patch.object(svc_module.CatalogService, "resolve_virtual_flow_table", fake_resolve), @@ -496,8 +492,7 @@ def fake_resolve_virtual(table_id, plan_bytes, source_versions_hash): assert resp.status_code == 200, resp.text assert captured["source"]["kind"] == "ipc_path" - assert captured["source"]["ipc_path"].startswith(f"fvt-{tid}-") + assert captured["source"]["ipc_path"] == f"fvt-{tid}.arrow" assert captured["source"]["mtime"] == 1234.5 assert captured["source"]["session_key"] == f"fvt:{tid}:1234" assert captured["resolve"]["table_id"] == tid - assert captured["resolve"]["hash"] == "noversions" diff --git a/flowfile_frontend/src/renderer/app/api/catalog.api.ts b/flowfile_frontend/src/renderer/app/api/catalog.api.ts index e98f3cd08..cd920ab1d 100644 --- a/flowfile_frontend/src/renderer/app/api/catalog.api.ts +++ b/flowfile_frontend/src/renderer/app/api/catalog.api.ts @@ -273,18 +273,6 @@ export class CatalogApi { return response.data; } - static async resolveVirtualTable( - tableId: number, - limit = 100, - ): Promise { - const response = await axios.post( - `/catalog/virtual-tables/${tableId}/resolve`, - null, - { params: { limit } }, - ); - return response.data; - } - // ====== Query-based Virtual Tables ====== static async createQueryVirtualTable(body: QueryVirtualTableCreate): Promise { diff --git a/flowfile_frontend/src/renderer/app/types/catalog.types.ts b/flowfile_frontend/src/renderer/app/types/catalog.types.ts index f7413a774..d2f5dd604 100644 --- a/flowfile_frontend/src/renderer/app/types/catalog.types.ts +++ b/flowfile_frontend/src/renderer/app/types/catalog.types.ts @@ -177,8 +177,6 @@ export interface CatalogTable { is_optimized: boolean | null; laziness_blockers: string[] | null; sql_query: string | null; - polars_plan: string | null; - source_table_versions: string | null; created_at: string; updated_at: string; } diff --git a/flowfile_frontend/src/renderer/app/views/CatalogView/TableDetailPanel.vue b/flowfile_frontend/src/renderer/app/views/CatalogView/TableDetailPanel.vue index 1fd3b61b6..8318e6617 100644 --- a/flowfile_frontend/src/renderer/app/views/CatalogView/TableDetailPanel.vue +++ b/flowfile_frontend/src/renderer/app/views/CatalogView/TableDetailPanel.vue @@ -159,12 +159,6 @@
{{ table.sql_query }}
- -
-

Polars Query Plan

-
{{ table.polars_plan }}
-
-

Version History

diff --git a/flowfile_worker/flowfile_worker/funcs.py b/flowfile_worker/flowfile_worker/funcs.py index 618e8a812..9cd4a789b 100644 --- a/flowfile_worker/flowfile_worker/funcs.py +++ b/flowfile_worker/flowfile_worker/funcs.py @@ -32,10 +32,6 @@ def _validate_virtual_results_path(name: str) -> Path: return validate_catalog_path(name, storage.catalog_virtual_results_directory) -def _row_count_ipc(p: Path) -> int: - return int(pl.scan_ipc(str(p)).select(pl.len()).collect().item()) - - def _get_delta_size_bytes(delta_dir: Path) -> int: """Delegate to ``shared.delta_utils.get_delta_size_bytes``.""" return get_delta_size_bytes(delta_dir) @@ -939,16 +935,14 @@ def _resolve_virtual_table_child(plan_bytes: bytes, target_path: str, queue: Que def resolve_virtual_table(req: models.ResolveVirtualTableRequest) -> models.ResolveVirtualTableResponse: - """Materialise a virtual flow table to disk; idempotent on (table_id, source_versions_hash).""" + """Materialise a virtual flow table to disk by collecting the supplied plan in a subprocess. + + No caching: every call re-executes the plan and overwrites + ``fvt-{table_id}.arrow`` atomically. + """ target_dir = storage.catalog_virtual_results_directory target_dir.mkdir(parents=True, exist_ok=True) - target = target_dir / f"fvt-{req.table_id}-{req.source_versions_hash[:16]}.arrow" - if target.exists(): - return models.ResolveVirtualTableResponse( - ipc_path=target.name, - mtime=target.stat().st_mtime, - row_count=_row_count_ipc(target), - ) + target = target_dir / f"fvt-{req.table_id}.arrow" queue: Queue = mp_context.Queue(maxsize=1) p = mp_context.Process( diff --git a/flowfile_worker/flowfile_worker/models.py b/flowfile_worker/flowfile_worker/models.py index 697a4d7c8..dbf8bddbb 100644 --- a/flowfile_worker/flowfile_worker/models.py +++ b/flowfile_worker/flowfile_worker/models.py @@ -261,7 +261,6 @@ class ResolveVirtualTableRequest(BaseModel): table_id: int plan_bytes: Base64Bytes - source_versions_hash: str class ResolveVirtualTableResponse(BaseModel): diff --git a/flowfile_worker/flowfile_worker/routes.py b/flowfile_worker/flowfile_worker/routes.py index 8af2382c7..150062ac4 100644 --- a/flowfile_worker/flowfile_worker/routes.py +++ b/flowfile_worker/flowfile_worker/routes.py @@ -445,8 +445,7 @@ def create_table( def resolve_virtual_table(payload: models.ResolveVirtualTableRequest) -> models.ResolveVirtualTableResponse: """Materialise a flow-virtual table from a serialised polars plan. - Idempotent on ``(table_id, source_versions_hash)`` — repeated calls return - the same IPC file without re-executing the producer plan. + Every call re-executes the plan and overwrites the per-table IPC file. """ try: return funcs.resolve_virtual_table(payload) diff --git a/flowfile_worker/tests/test_catalog_visualize.py b/flowfile_worker/tests/test_catalog_visualize.py index 36ddce49c..daf7c669e 100644 --- a/flowfile_worker/tests/test_catalog_visualize.py +++ b/flowfile_worker/tests/test_catalog_visualize.py @@ -581,7 +581,7 @@ def test_visualize_query_endpoint_ipc_path(tmp_path): target_dir = storage.catalog_virtual_results_directory target_dir.mkdir(parents=True, exist_ok=True) - ipc_name = "fvt-1-deadbeefdeadbeef.arrow" + ipc_name = "fvt-1.arrow" pl.DataFrame({"category": ["a", "b", "a"], "value": [1, 2, 3]}).write_ipc(str(target_dir / ipc_name)) client = TestClient(main.app) @@ -610,7 +610,7 @@ def test_visualize_query_sql_with_virtual_refs(tmp_path): target_dir = storage.catalog_virtual_results_directory target_dir.mkdir(parents=True, exist_ok=True) - ipc_name = "fvt-2-cafebabecafebabe.arrow" + ipc_name = "fvt-2.arrow" pl.DataFrame({"category": ["a", "b"], "boost": [10, 20]}).write_ipc(str(target_dir / ipc_name)) client = TestClient(main.app) diff --git a/flowfile_worker/tests/test_resolve_virtual_table.py b/flowfile_worker/tests/test_resolve_virtual_table.py index 414cfd45d..ca66eeae9 100644 --- a/flowfile_worker/tests/test_resolve_virtual_table.py +++ b/flowfile_worker/tests/test_resolve_virtual_table.py @@ -2,7 +2,6 @@ from __future__ import annotations from base64 import b64encode -from unittest.mock import patch import polars as pl import pytest @@ -34,7 +33,6 @@ def test_resolve_virtual_table_round_trip(): payload = { "table_id": 7, "plan_bytes": b64encode(_plan_bytes()).decode("ascii"), - "source_versions_hash": "deadbeef" * 4, } r = client.post("/flow/resolve_virtual_table", json=payload) @@ -42,8 +40,7 @@ def test_resolve_virtual_table_round_trip(): body = r.json() assert body["row_count"] == 3 assert body["mtime"] > 0 - assert body["ipc_path"].startswith("fvt-7-") - assert body["ipc_path"].endswith(".arrow") + assert body["ipc_path"] == "fvt-7.arrow" target = storage.catalog_virtual_results_directory / body["ipc_path"] assert target.exists() @@ -53,33 +50,23 @@ def test_resolve_virtual_table_round_trip(): @pytest.mark.worker -def test_resolve_virtual_table_is_idempotent_on_versions_hash(): - """Second call with the same (table_id, source_versions_hash) is a cache hit. - - The cache file's mtime must not change and the spawn helper must not run. - """ +def test_resolve_virtual_table_overwrites_on_repeat_call(): + """Repeat calls re-execute the plan and overwrite the per-table IPC file.""" client = TestClient(main.app) payload = { "table_id": 11, "plan_bytes": b64encode(_plan_bytes()).decode("ascii"), - "source_versions_hash": "cafebabe" * 4, } r1 = client.post("/flow/resolve_virtual_table", json=payload) assert r1.status_code == 200, r1.text first = r1.json() - target = storage.catalog_virtual_results_directory / first["ipc_path"] - assert target.exists() - - with patch("flowfile_worker.funcs.mp_context") as mock_mp: - r2 = client.post("/flow/resolve_virtual_table", json=payload) - assert mock_mp.Process.call_count == 0 - assert mock_mp.Queue.call_count == 0 + r2 = client.post("/flow/resolve_virtual_table", json=payload) assert r2.status_code == 200, r2.text second = r2.json() + assert second["ipc_path"] == first["ipc_path"] - assert second["mtime"] == first["mtime"] assert second["row_count"] == first["row_count"] @@ -89,7 +76,6 @@ def test_resolve_virtual_table_invalid_plan_returns_500(): payload = { "table_id": 99, "plan_bytes": b64encode(b"not a valid polars plan").decode("ascii"), - "source_versions_hash": "facade01" * 4, } r = client.post("/flow/resolve_virtual_table", json=payload) assert r.status_code == 500 @@ -102,7 +88,6 @@ def test_resolve_virtual_table_function_path_normalises_under_results_dir(): req = models.ResolveVirtualTableRequest( table_id=42, plan_bytes=_plan_bytes(), - source_versions_hash="abcdef01" * 4, ) res = funcs.resolve_virtual_table(req) target = storage.catalog_virtual_results_directory / res.ipc_path diff --git a/shared/delta_models.py b/shared/delta_models.py index b287f4bab..2fdaaa49a 100644 --- a/shared/delta_models.py +++ b/shared/delta_models.py @@ -12,11 +12,3 @@ class DeltaVersionCommit(BaseModel): timestamp: str | None = None operation: str | None = None parameters: dict | None = None - - -class SourceTableVersion(BaseModel): - """Delta version of a source catalog table captured when a virtual table plan was generated.""" - - table_id: int - file_path: str - version: int