Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Drop optimized virtual-table plan storage columns.

Removes ``serialized_lazy_frame``, ``polars_plan``, and ``source_table_versions``
from ``catalog_tables``. These backed the optimized virtual-table read path,
which has been collapsed into the unified ``resolve_virtual_flow_table`` flow.
``is_optimized`` is kept as a derived "producer flow is fully lazy" indicator
that drives the laziness-blocker propagation.

Revision ID: 011
Revises: 010
Create Date: 2026-05-01
"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

revision: str = "011"
down_revision: str | None = "010"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Drop the three optimized virtual-table storage columns from ``catalog_tables``.

    The drops are issued inside a single Alembic batch context with
    ``recreate="auto"``, in the order: ``source_table_versions``,
    ``polars_plan``, ``serialized_lazy_frame``.
    """
    obsolete_columns = (
        "source_table_versions",
        "polars_plan",
        "serialized_lazy_frame",
    )
    with op.batch_alter_table("catalog_tables", recreate="auto") as batch_op:
        for column_name in obsolete_columns:
            batch_op.drop_column(column_name)


def downgrade() -> None:
    """Re-create the columns removed by ``upgrade`` on ``catalog_tables``.

    Each column is added back as nullable; no data is backfilled here, so
    rows will hold NULL in the restored columns.
    """
    # (column name, SQLAlchemy type) pairs, in the order they are re-added.
    restored_columns = (
        ("serialized_lazy_frame", sa.LargeBinary),
        ("polars_plan", sa.Text),
        ("source_table_versions", sa.Text),
    )
    for column_name, column_type in restored_columns:
        op.add_column("catalog_tables", sa.Column(column_name, column_type, nullable=True))
40 changes: 0 additions & 40 deletions flowfile_core/flowfile_core/catalog/delta_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,58 +6,18 @@

from __future__ import annotations

import json
import logging
import shutil
from pathlib import Path

import pyarrow as pa
from deltalake import DeltaTable

from shared.delta_models import SourceTableVersion
from shared.delta_utils import get_delta_size_bytes

logger = logging.getLogger(__name__)


def check_source_versions_current(source_table_versions_json: str | None) -> bool:
"""Return True if all source delta tables are still at their recorded versions.

Returns True when no versions are recorded (backward compat for existing virtual tables).
Returns False if any source table has been updated, deleted, or is unreadable.

Note: There is a known TOCTOU (time-of-check-time-of-use) race condition here.
A source table could be updated between this staleness check and the actual
query execution (collect). This is acceptable for now but means stale reads
are possible in rare concurrent-write scenarios.
"""
if not source_table_versions_json:
return True
try:
raw = json.loads(source_table_versions_json)
versions = [SourceTableVersion(**entry) for entry in raw]
except (ValueError, KeyError, TypeError):
logger.warning("Could not parse source_table_versions JSON, treating as stale")
return False

for sv in versions:
try:
current_version = DeltaTable(sv.file_path, without_files=True).version()
if current_version != sv.version:
logger.info(
"Source table %d at %s changed: expected version %d, current %d",
sv.table_id,
sv.file_path,
sv.version,
current_version,
)
return False
except Exception:
logger.warning("Could not read delta version for source table %d at %s", sv.table_id, sv.file_path)
return False
return True


def is_delta_table(path: str | Path) -> bool:
"""Return ``True`` if *path* is a directory containing ``_delta_log/``."""
p = Path(path)
Expand Down
24 changes: 1 addition & 23 deletions flowfile_core/flowfile_core/catalog/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

# Re-exports preserved so external callers / tests that still reach for the
# underscore-prefixed names continue to work.
from flowfile_core.catalog.text_utils import hash_source_versions as _hash_source_versions # noqa: F401
from flowfile_core.catalog.text_utils import is_table_reference as _is_table_reference # noqa: F401
from flowfile_core.catalog.text_utils import parse_delta_history as _parse_delta_history # noqa: F401
from flowfile_core.catalog.text_utils import rewrite_qualified_references as _rewrite_qualified_references # noqa: F401
Expand Down Expand Up @@ -148,7 +147,7 @@ def __init__(self, repo: CatalogRepository) -> None:
self._sql.bind(virtual_tables=self._virtual_tables)
self._virtual_tables.bind(sql=self._sql)

self._previews = TablePreviewService(repo, self._tables, self._virtual_tables)
self._previews = TablePreviewService(repo, self._tables)
self._visualizations = VisualizationService(
repo, self._namespaces, self._tables, self._virtual_tables, self._sql
)
Expand Down Expand Up @@ -634,11 +633,8 @@ def create_virtual_flow_table(
producer_registration_id: int,
namespace_id: int | None = None,
description: str | None = None,
serialized_lazy_frame: bytes | None = None,
is_optimized: bool = False,
schema_json: str | None = None,
polars_plan: str | None = None,
source_table_versions: str | None = None,
) -> CatalogTableOut:
"""Create a virtual flow table (non-materialised catalog entry)."""
return self._virtual_tables.create_virtual_flow_table(
Expand All @@ -647,11 +643,8 @@ def create_virtual_flow_table(
producer_registration_id,
namespace_id,
description,
serialized_lazy_frame,
is_optimized,
schema_json,
polars_plan,
source_table_versions,
)

def update_virtual_flow_table(
Expand All @@ -661,11 +654,8 @@ def update_virtual_flow_table(
description: str | None = None,
namespace_id: int | None = None,
producer_registration_id: int | None = None,
serialized_lazy_frame: bytes | None = None,
is_optimized: bool | None = None,
schema_json: str | None = None,
polars_plan: str | None = None,
source_table_versions: str | None = None,
) -> CatalogTableOut:
"""Update a virtual flow table's metadata or producer."""
return self._virtual_tables.update_virtual_flow_table(
Expand All @@ -674,11 +664,8 @@ def update_virtual_flow_table(
description,
namespace_id,
producer_registration_id,
serialized_lazy_frame,
is_optimized,
schema_json,
polars_plan,
source_table_versions,
)

def create_query_virtual_table(
Expand Down Expand Up @@ -747,15 +734,6 @@ def get_table_preview(
"""Read the first N rows from a catalog table (physical, virtual or Delta-versioned)."""
return self._previews.get_table_preview(table_id, limit, version, user_id)

def resolve_virtual_flow_table_preview(
    self,
    table_id: int,
    limit: int,
    user_id: int | None = None,
) -> CatalogTablePreview:
    """Return a worker-backed preview of a virtual flow table.

    Pure pass-through: the call is forwarded unchanged to the preview
    sub-service held in ``self._previews``.
    """
    preview = self._previews.resolve_virtual_flow_table_preview(table_id, limit, user_id)
    return preview

def get_table_history(self, table_id: int, limit: int | None = None) -> DeltaTableHistory:
    """Fetch the version history of a Delta catalog table.

    Pure pass-through to ``self._previews.get_table_history``; ``limit`` is
    forwarded as given (``None`` included).
    """
    history = self._previews.get_table_history(table_id, limit)
    return history
Expand Down
92 changes: 13 additions & 79 deletions flowfile_core/flowfile_core/catalog/services/previews.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""Previews for physical and virtual tables, plus Delta history."""
"""Previews for physical tables and Delta history.

Virtual tables don't have a preview path: the catalog UI shows a
"no data preview available" placeholder for them and never calls these
endpoints, so ``get_table_preview`` returns an empty preview when handed
a virtual table id.
"""

from __future__ import annotations

import logging
from pathlib import Path

import polars as pl
from deltalake import DeltaTable

from flowfile_core.catalog.delta_utils import (
Expand All @@ -16,23 +21,18 @@
from flowfile_core.catalog.exceptions import TableNotFoundError
from flowfile_core.catalog.repository import CatalogRepository
from flowfile_core.catalog.serializers import format_pyarrow_preview
from flowfile_core.catalog.services._resolve import resolve_or_log
from flowfile_core.catalog.services.tables import TableService
from flowfile_core.catalog.services.virtual_tables import VirtualTableService
from flowfile_core.catalog.text_utils import hash_source_versions, parse_delta_history
from flowfile_core.catalog.text_utils import parse_delta_history
from flowfile_core.database.models import CatalogTable
from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (
trigger_delta_history,
trigger_delta_version_preview,
trigger_resolve_virtual_table,
)
from flowfile_core.schemas.catalog_schema import (
CatalogTablePreview,
DeltaTableHistory,
)
from flowfile_core.utils.arrow_reader import read_top_n
from shared.delta_utils import validate_catalog_path
from shared.storage_config import storage

logger = logging.getLogger(__name__)

Expand All @@ -45,17 +45,15 @@ def _should_offload() -> bool:


class TablePreviewService:
"""Owns table preview fetching (physical, Delta-versioned, virtual) and Delta history."""
"""Owns physical/Delta-versioned table preview fetching and Delta history."""

def __init__(
self,
repo: CatalogRepository,
tables: TableService,
virtual_tables: VirtualTableService,
) -> None:
self.repo = repo
self._tables = tables
self._virtual_tables = virtual_tables

def get_table_preview(
self,
Expand All @@ -64,79 +62,15 @@ def get_table_preview(
version: int | None = None,
user_id: int | None = None,
) -> CatalogTablePreview:
"""Read the first N rows from a catalog table."""
table = self.repo.get_table(table_id)
if table is None:
raise TableNotFoundError(table_id=table_id)

if getattr(table, "table_type", "physical") == "virtual":
if getattr(table, "sql_query", None):
return self._get_query_virtual_table_preview(table, limit, user_id)
return self._get_virtual_table_preview(table, limit, user_id)

return self._get_physical_table_preview(table, limit, version)

def _format_virtual_preview(
self,
lazy_frame: pl.LazyFrame,
table: CatalogTable,
limit: int,
) -> CatalogTablePreview:
"""Materialise *lazy_frame* on the worker, read top-N rows back as PyArrow.
"""Read the first N rows from a catalog table.

Honours the core-never-collects rule — the plan is shipped to the
worker, written as IPC, then read back via ``read_top_n``.
Virtual tables fall through to ``_get_physical_table_preview`` and
return an empty preview (no ``file_path``).
"""
versions_hash = hash_source_versions(table.source_table_versions)
result = trigger_resolve_virtual_table(table.id, lazy_frame.serialize(), versions_hash)
ipc_path = validate_catalog_path(result["ipc_path"], storage.catalog_virtual_results_directory)
pa_table = read_top_n(str(ipc_path), n=limit)
return format_pyarrow_preview(pa_table, total_rows=result.get("row_count"))

def _get_query_virtual_table_preview(
    self,
    table: CatalogTable,
    limit: int,
    user_id: int | None,
) -> CatalogTablePreview:
    """Build a preview for a query-based (SQL) virtual table.

    The table is resolved through ``resolve_or_log``; when that yields
    ``None`` an empty preview (no columns, no rows, ``total_rows=0``) is
    returned, otherwise the resolved frame is handed to
    ``_format_virtual_preview``.
    """

    def _resolve():
        return self._virtual_tables.resolve_query_virtual_table(table.id, user_id=user_id)

    resolved_frame = resolve_or_log(
        _resolve,
        kind="query virtual table for preview",
        identifier=table.id,
    )
    if resolved_frame is not None:
        return self._format_virtual_preview(resolved_frame, table, limit)
    return CatalogTablePreview(columns=[], dtypes=[], rows=[], total_rows=0)

def _get_virtual_table_preview(
    self,
    table: CatalogTable,
    limit: int,
    user_id: int | None,
) -> CatalogTablePreview:
    """Build a preview for a flow-backed virtual table.

    The table is resolved through ``resolve_or_log``; when that yields
    ``None`` an empty preview (no columns, no rows, ``total_rows=0``) is
    returned, otherwise the resolved frame is handed to
    ``_format_virtual_preview``.
    """

    def _resolve():
        return self._virtual_tables.resolve_virtual_flow_table(table.id, user_id=user_id)

    resolved_frame = resolve_or_log(
        _resolve,
        kind="virtual flow table for preview",
        identifier=table.id,
    )
    if resolved_frame is not None:
        return self._format_virtual_preview(resolved_frame, table, limit)
    return CatalogTablePreview(columns=[], dtypes=[], rows=[], total_rows=0)

def resolve_virtual_flow_table_preview(
    self,
    table_id: int,
    limit: int,
    user_id: int | None = None,
) -> CatalogTablePreview:
    """Resolve a virtual flow table by id and return a worker-backed preview.

    Raises:
        TableNotFoundError: if the repository has no table with ``table_id``.

    The resolve call is made directly (not via ``resolve_or_log``), so any
    error it raises propagates to the caller.
    """
    catalog_table = self.repo.get_table(table_id)
    if catalog_table is None:
        raise TableNotFoundError(table_id=table_id)

    resolved_frame = self._virtual_tables.resolve_virtual_flow_table(table_id, user_id=user_id)
    preview = self._format_virtual_preview(resolved_frame, catalog_table, limit)
    return preview
return self._get_physical_table_preview(table, limit, version)

def _get_physical_table_preview(
self,
Expand Down
4 changes: 1 addition & 3 deletions flowfile_core/flowfile_core/catalog/services/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from flowfile_core.catalog.services._resolve import resolve_or_log
from flowfile_core.catalog.services.flows import FlowRegistrationService
from flowfile_core.catalog.text_utils import (
hash_source_versions,
is_table_reference,
rewrite_qualified_references,
)
Expand Down Expand Up @@ -117,8 +116,7 @@ def execute_sql_query(
def _materialise_virtual_for_sql(self, virtual_id: int, user_id: int | None) -> str:
"""Resolve a virtual table to an IPC path for the SQL worker call."""
lazy_frame = self._resolve_virtual_flow_table_via_facade(virtual_id, user_id=user_id, run_location="remote")
versions_hash = hash_source_versions(self.repo.get_table(virtual_id).source_table_versions)
result = trigger_resolve_virtual_table(virtual_id, lazy_frame.serialize(), versions_hash)
result = trigger_resolve_virtual_table(virtual_id, lazy_frame.serialize())
return result["ipc_path"]

def save_sql_query_as_flow(
Expand Down
4 changes: 0 additions & 4 deletions flowfile_core/flowfile_core/catalog/services/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ def table_to_out(
is_optimized=getattr(table, "is_optimized", None),
laziness_blockers=laziness_blockers,
sql_query=getattr(table, "sql_query", None),
polars_plan=getattr(table, "polars_plan", None),
source_table_versions=getattr(table, "source_table_versions", None),
created_at=table.created_at,
updated_at=table.updated_at,
)
Expand Down Expand Up @@ -369,8 +367,6 @@ def bulk_enrich_tables(self, tables: list[CatalogTable], user_id: int) -> list[C
producer_registration_name=producer_registration_name,
is_optimized=getattr(table, "is_optimized", None),
sql_query=getattr(table, "sql_query", None),
polars_plan=getattr(table, "polars_plan", None),
source_table_versions=getattr(table, "source_table_versions", None),
created_at=table.created_at,
updated_at=table.updated_at,
)
Expand Down
Loading
Loading