diff --git a/flowfile/flowfile/__init__.py b/flowfile/flowfile/__init__.py index 773a689d8..74b5301e6 100644 --- a/flowfile/flowfile/__init__.py +++ b/flowfile/flowfile/__init__.py @@ -51,8 +51,6 @@ Utf8, ) -from flowfile.api import open_graph_in_editor -from flowfile.web import start_server as start_web_ui from flowfile_core.flowfile import node_designer from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn @@ -249,4 +247,21 @@ "Field", "start_web_ui", ] + + +def __getattr__(name: str): + # The web UI / editor entry points pull the FastAPI server stack (uvicorn, + # fastapi) and `requests`; resolve them lazily so a plain `import flowfile` + # for the dataframe API stays light. + if name == "open_graph_in_editor": + from flowfile.api import open_graph_in_editor + + return open_graph_in_editor + if name == "start_web_ui": + from flowfile.web import start_server as start_web_ui + + return start_web_ui + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + logging.getLogger("PipelineHandler").setLevel(logging.WARNING) diff --git a/flowfile/flowfile/__main__.py b/flowfile/flowfile/__main__.py index 25492ad17..52ba7c4b2 100644 --- a/flowfile/flowfile/__main__.py +++ b/flowfile/flowfile/__main__.py @@ -20,6 +20,12 @@ def run_flow(flow_path: str, param_overrides: list[str] | None = None, run_id: i OFFLOAD_TO_WORKER.set(False) + # DB init is no longer an import side effect; get_run_user_id below reads the + # catalog DB via shared models (bypassing core's get_db guard), so init here. + from flowfile_core.database.connection import ensure_db_initialized + + ensure_db_initialized() + from flowfile_core.flowfile.manage.io_flowfile import open_flow path = Path(flow_path) @@ -150,9 +156,8 @@ def _complete_run_if_needed( def _run_project_command(action: str | None, arg: str | None) -> None: """Headless project init/open/save — the proof that build-from-scratch works without the UI.""" - from fastapi import HTTPException - from flowfile_core.auth.utils import get_local_user_id + from flowfile_core.exceptions import FlowfileHTTPException from flowfile_core.fileExplorer.funcs import validate_path_under_cwd from flowfile_core.project import project_sync @@ -162,7 +167,7 @@ def _validated_folder(usage: str) -> str: sys.exit(1) try: return validate_path_under_cwd(arg) - except HTTPException as e: + except FlowfileHTTPException as e: print(f"Invalid project path: {e.detail}", file=sys.stderr) sys.exit(1) diff --git a/flowfile_core/flowfile_core/__init__.py b/flowfile_core/flowfile_core/__init__.py index b3cbb1822..6297055de 100644 --- a/flowfile_core/flowfile_core/__init__.py +++ b/flowfile_core/flowfile_core/__init__.py @@ -6,14 +6,11 @@ from shared._version import get_version validate_setup() -from flowfile_core.database.init_db import init_db from flowfile_core.flowfile.handler import FlowfileHandler if "FLOWFILE_MODE" not in os.environ: os.environ["FLOWFILE_MODE"] = "electron" -init_db() - class ServerRun: exit: bool = False diff --git a/flowfile_core/flowfile_core/ai/audit.py b/flowfile_core/flowfile_core/ai/audit.py index b16a6ec7f..5a05c7fd1 100644 --- a/flowfile_core/flowfile_core/ai/audit.py +++ b/flowfile_core/flowfile_core/ai/audit.py @@ -28,7 +28,7 @@ from sqlalchemy.orm import Session -from flowfile_core.database.connection import SessionLocal +from flowfile_core.database.connection import SessionLocal, ensure_db_initialized from flowfile_core.database.models import AiAuditEvent logger = logging.getLogger(__name__) @@ -101,6 +101,7 @@ def record_event(event: AuditEvent, db: Session | None = None) -> AiAuditEvent: db.add(row) db.flush() else: + ensure_db_initialized() with SessionLocal() as session: session.add(row) session.commit() @@ -140,6 +141,7 @@ def _apply(session: Session) -> AiAuditEvent | None: if db is not None: return _apply(db) + ensure_db_initialized() with SessionLocal() as session: row = _apply(session) if row is None: @@ -184,6 +186,7 @@ def _query(session: Session) -> list[AiAuditEvent]: if db is not None: return _query(db) + ensure_db_initialized() with SessionLocal() as session: return _query(session) diff --git a/flowfile_core/flowfile_core/ai/diff.py b/flowfile_core/flowfile_core/ai/diff.py index e9a7c2b60..60274eec4 100644 --- a/flowfile_core/flowfile_core/ai/diff.py +++ b/flowfile_core/flowfile_core/ai/diff.py @@ -610,8 +610,7 @@ def apply_diff(flow, diff: GraphDiff) -> ApplyResult: # AI modules (executor, audit, providers) deliberately keep out of # their import-time graph; this ``diff`` module imports lightly so # tests can stub ``flow_graph`` symbols where needed. - from fastapi import HTTPException - + from flowfile_core.exceptions import FlowfileHTTPException from flowfile_core.flowfile.flow_graph import add_connection, delete_connection for add in diff.additions: @@ -670,7 +669,7 @@ def apply_diff(flow, diff: GraphDiff) -> ApplyResult: connection = input_schema.NodeConnection.model_validate(c.connection) try: delete_connection(flow, connection) - except HTTPException as exc: + except FlowfileHTTPException as exc: # Narrow catch: only the "Connection does not exist" 422 gets # swallowed. ``add_connection``'s cycle-detection 422 lives in # the other loop and is unaffected; any other HTTPException diff --git a/flowfile_core/flowfile_core/ai/diff_routes.py b/flowfile_core/flowfile_core/ai/diff_routes.py index 015d7fdc8..a051bbfb4 100644 --- a/flowfile_core/flowfile_core/ai/diff_routes.py +++ b/flowfile_core/flowfile_core/ai/diff_routes.py @@ -41,7 +41,7 @@ from flowfile_core import flow_file_handler from flowfile_core.ai import audit, diff from flowfile_core.auth.jwt import get_current_active_user -from flowfile_core.database.connection import SessionLocal, get_db +from flowfile_core.database.connection import SessionLocal, ensure_db_initialized, get_db router = APIRouter() @@ -165,6 +165,7 @@ def _flip_audit_actions(audit_ids: list[int], action: audit.DiffAction) -> list[ if not audit_ids: return [] updated: list[int] = [] + ensure_db_initialized() with SessionLocal() as session: for aid in audit_ids: row = audit.update_diff_action(aid, action, db=session) diff --git a/flowfile_core/flowfile_core/ai/metrics.py b/flowfile_core/flowfile_core/ai/metrics.py index 033a4b58e..c6d573094 100644 --- a/flowfile_core/flowfile_core/ai/metrics.py +++ b/flowfile_core/flowfile_core/ai/metrics.py @@ -38,7 +38,7 @@ from sqlalchemy.orm import Session from flowfile_core.ai.audit import query_events -from flowfile_core.database.connection import SessionLocal +from flowfile_core.database.connection import SessionLocal, ensure_db_initialized from flowfile_core.database.models import AiAuditEvent logger = logging.getLogger(__name__) @@ -84,6 +84,7 @@ def _aggregate(session: Session) -> dict[str, float | int]: if db is not None: return _aggregate(db) + ensure_db_initialized() with SessionLocal() as session: return _aggregate(session) diff --git a/flowfile_core/flowfile_core/ai/tools/executor/handlers/connections.py b/flowfile_core/flowfile_core/ai/tools/executor/handlers/connections.py index 8437a8c96..ddc11f6a6 100644 --- a/flowfile_core/flowfile_core/ai/tools/executor/handlers/connections.py +++ b/flowfile_core/flowfile_core/ai/tools/executor/handlers/connections.py @@ -511,11 +511,11 @@ def _handle_delete_connection( staged_node_payload={"delete_connection": connection.model_dump()}, ) - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException try: delete_connection(flow, connection) - except HTTPException as exc: + except FlowfileHTTPException as exc: # LLM-redundant-op tolerance: swallow only the "Connection does not # exist" 422 (an ``update_node_settings`` in the same diff already # implicitly rewired, so this delete targets a wire that's already diff --git a/flowfile_core/flowfile_core/auth/secrets.py b/flowfile_core/flowfile_core/auth/secrets.py index dc1ba6dfb..6ef200bc6 100644 --- a/flowfile_core/flowfile_core/auth/secrets.py +++ b/flowfile_core/flowfile_core/auth/secrets.py @@ -6,8 +6,19 @@ import logging import os from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from cryptography.fernet import Fernet + + +def _fernet() -> "type[Fernet]": + # cryptography's Rust bindings are costly to import and this module sits on + # the `import flowfile_frame` path (via secret_manager) — load on first use. + from cryptography.fernet import Fernet + + return Fernet -from cryptography.fernet import Fernet logger = logging.getLogger(__name__) @@ -33,7 +44,7 @@ def __init__(self): self.key_path = self.storage_path / ".secret_key" if not self.key_path.exists(): with open(self.key_path, "wb") as f: - f.write(Fernet.generate_key()) + f.write(_fernet().generate_key()) try: os.chmod(self.key_path, 0o600) except Exception as e: @@ -55,7 +66,7 @@ def _read_store(self, service_name): with open(path, "rb") as f: data = f.read() - return json.loads(Fernet(key).decrypt(data).decode()) + return json.loads(_fernet()(key).decrypt(data).decode()) except Exception as e: logger.debug(f"Error reading from encrypted store: {e}") return {} @@ -66,7 +77,7 @@ def _write_store(self, service_name, data): with open(self.key_path, "rb") as f: key = f.read() - encrypted = Fernet(key).encrypt(json.dumps(data).encode()) + encrypted = _fernet()(key).encrypt(json.dumps(data).encode()) path = self._get_store_path(service_name) with open(path, "wb") as f: @@ -151,7 +162,7 @@ def get_docker_secret_key() -> str | None: if env_key: env_key = env_key.strip().strip('"').strip("'") try: - Fernet(env_key.encode()) + _fernet()(env_key.encode()) return env_key except Exception: raise RuntimeError("FLOWFILE_MASTER_KEY is not a valid Fernet key") from None @@ -161,7 +172,7 @@ def get_docker_secret_key() -> str | None: try: with open(secret_path) as f: key = f.read().strip() - Fernet(key.encode()) + _fernet()(key.encode()) return key except Exception: raise RuntimeError("Failed to read master key from Docker secret") from None @@ -191,7 +202,7 @@ def generate_master_key() -> str: Returns: str: A new valid Fernet encryption key. """ - return Fernet.generate_key().decode() + return _fernet().generate_key().decode() def get_master_key(): @@ -218,6 +229,6 @@ def get_master_key(): key = get_password("flowfile", "master_key") if not key: - key = Fernet.generate_key().decode() + key = _fernet().generate_key().decode() set_password("flowfile", "master_key", key) return key diff --git a/flowfile_core/flowfile_core/auth/utils.py b/flowfile_core/flowfile_core/auth/utils.py index 5ea371060..d2e59c545 100644 --- a/flowfile_core/flowfile_core/auth/utils.py +++ b/flowfile_core/flowfile_core/auth/utils.py @@ -7,8 +7,9 @@ def get_local_user_id() -> int: """Resolve the local_user's ID from the database for CLI execution.""" try: from flowfile_core.database import models as db_models - from flowfile_core.database.connection import SessionLocal + from flowfile_core.database.connection import SessionLocal, ensure_db_initialized + ensure_db_initialized() db = SessionLocal() try: local_user = db.query(db_models.User).filter(db_models.User.username == "local_user").first() diff --git a/flowfile_core/flowfile_core/catalog/__init__.py b/flowfile_core/flowfile_core/catalog/__init__.py index a09a92d35..7ffb55a46 100644 --- a/flowfile_core/flowfile_core/catalog/__init__.py +++ b/flowfile_core/flowfile_core/catalog/__init__.py @@ -8,6 +8,8 @@ * Domain exceptions (``CatalogError`` hierarchy) """ +from typing import TYPE_CHECKING + from .exceptions import ( AmbiguousTableError, CatalogError, @@ -39,7 +41,22 @@ VisualizationNotFoundError, ) from .repository import CatalogRepository, SQLAlchemyCatalogRepository -from .service import CatalogService + +if TYPE_CHECKING: + from .service import CatalogService + + +def __getattr__(name: str): + # ``.service`` transitively builds the whole catalog schema/serializer stack + # (~150ms of pydantic model construction). flow_graph — and through it + # `import flowfile_frame` — imports this package eagerly, so resolve + # CatalogService on first use instead. + if name == "CatalogService": + from .service import CatalogService + + return CatalogService + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + __all__ = [ "CatalogService", diff --git a/flowfile_core/flowfile_core/catalog/delta_utils.py b/flowfile_core/flowfile_core/catalog/delta_utils.py index 6f426b9cd..bcc716de7 100644 --- a/flowfile_core/flowfile_core/catalog/delta_utils.py +++ b/flowfile_core/flowfile_core/catalog/delta_utils.py @@ -10,9 +10,10 @@ import logging import shutil from pathlib import Path +from typing import TYPE_CHECKING -import pyarrow as pa -from deltalake import DeltaTable +if TYPE_CHECKING: + import pyarrow as pa from shared.delta_models import SourceTableVersion from shared.delta_utils import get_delta_size_bytes @@ -40,6 +41,8 @@ def check_source_versions_current(source_table_versions_json: str | None) -> boo logger.warning("Could not parse source_table_versions JSON, treating as stale") return False + from deltalake import DeltaTable + for sv in versions: try: current_version = DeltaTable(sv.file_path, without_files=True).version() @@ -85,6 +88,8 @@ def get_delta_table_size_bytes(path: str | Path) -> int: def read_delta_preview(path: str, n_rows: int = 100) -> pa.Table: """Read the first N rows from a Delta table using PyArrow.""" + from deltalake import DeltaTable + dt = DeltaTable(str(path)) dataset = dt.to_pyarrow_dataset() diff --git a/flowfile_core/flowfile_core/catalog/migrate_parquet_to_delta.py b/flowfile_core/flowfile_core/catalog/migrate_parquet_to_delta.py index f08e37a39..50445ea9c 100644 --- a/flowfile_core/flowfile_core/catalog/migrate_parquet_to_delta.py +++ b/flowfile_core/flowfile_core/catalog/migrate_parquet_to_delta.py @@ -23,7 +23,7 @@ import polars as pl from flowfile_core.catalog.delta_utils import is_delta_table -from flowfile_core.database.connection import SessionLocal +from flowfile_core.database.connection import SessionLocal, ensure_db_initialized from flowfile_core.database.models import CatalogTable logger = logging.getLogger(__name__) @@ -88,6 +88,7 @@ def migrate_table(table: CatalogTable, *, dry_run: bool = False) -> bool: def main(dry_run: bool = False) -> int: + ensure_db_initialized() db = SessionLocal() try: tables = db.query(CatalogTable).filter(CatalogTable.storage_format == "parquet").all() diff --git a/flowfile_core/flowfile_core/catalog/services/notebook_store.py b/flowfile_core/flowfile_core/catalog/services/notebook_store.py index 11be59d21..c6264b3a8 100644 --- a/flowfile_core/flowfile_core/catalog/services/notebook_store.py +++ b/flowfile_core/flowfile_core/catalog/services/notebook_store.py @@ -15,10 +15,9 @@ import os import tempfile import uuid +from functools import cache from pathlib import Path -import yaml - from flowfile_core.fileExplorer.funcs import _is_contained from flowfile_core.schemas.catalog_schema import NotebookCellModel from shared.storage_config import storage @@ -45,21 +44,29 @@ def _atomic_write(path: Path, content: str) -> None: _SUFFIX = ".notebook.yaml" -class _NotebookDumper(yaml.SafeDumper): - pass - - -def _represent_str(dumper: yaml.SafeDumper, data: str): +def _represent_str(dumper, data: str): style = "|" if "\n" in data else None return dumper.represent_scalar("tag:yaml.org,2002:str", data, style=style) -_NotebookDumper.add_representer(str, _represent_str) +@cache +def _notebook_dumper(): + """Build the SafeDumper subclass on first use — PyYAML stays off the + `import flowfile_frame` path (this module loads via catalog.service).""" + import yaml + + class _NotebookDumper(yaml.SafeDumper): + pass + + _NotebookDumper.add_representer(str, _represent_str) + return _NotebookDumper def _dump(data: dict) -> str: + import yaml + return yaml.dump( - data, Dumper=_NotebookDumper, default_flow_style=False, sort_keys=False, allow_unicode=True, width=4096 + data, Dumper=_notebook_dumper(), default_flow_style=False, sort_keys=False, allow_unicode=True, width=4096 ) @@ -140,6 +147,8 @@ def read_notebook_cells(owner_id: int, notebook_uuid: str) -> list[NotebookCellM if not path.is_file(): logger.warning("Notebook file missing for %s; returning empty cells", ctx) return [] + import yaml + try: raw = yaml.safe_load(path.read_text(encoding="utf-8")) except (OSError, yaml.YAMLError) as exc: diff --git a/flowfile_core/flowfile_core/catalog/services/previews.py b/flowfile_core/flowfile_core/catalog/services/previews.py index 876925f76..6748c6e89 100644 --- a/flowfile_core/flowfile_core/catalog/services/previews.py +++ b/flowfile_core/flowfile_core/catalog/services/previews.py @@ -6,7 +6,6 @@ from pathlib import Path import polars as pl -from deltalake import DeltaTable from flowfile_core.catalog.delta_utils import ( is_delta_table, @@ -188,6 +187,8 @@ def _get_delta_version_preview( except (RuntimeError, OSError, ValueError, KeyError): logger.warning("Worker delta version preview failed, falling back to local", exc_info=True) + from deltalake import DeltaTable + delta_table = DeltaTable(table_path, version=version, storage_options=storage_options) dataset = delta_table.to_pyarrow_dataset() pa_table = dataset.head(limit) @@ -239,6 +240,8 @@ def get_table_history(self, table_id: int, limit: int | None = None) -> DeltaTab except (RuntimeError, OSError, ValueError, KeyError): logger.warning("Worker delta history read failed, falling back to local", exc_info=True) + from deltalake import DeltaTable + delta_table = DeltaTable(table_path, without_files=True, storage_options=storage_options) raw_history = delta_table.history(limit) current_version = delta_table.version() diff --git a/flowfile_core/flowfile_core/catalog/services/tables.py b/flowfile_core/flowfile_core/catalog/services/tables.py index 1a657a350..b76eda7c0 100644 --- a/flowfile_core/flowfile_core/catalog/services/tables.py +++ b/flowfile_core/flowfile_core/catalog/services/tables.py @@ -8,8 +8,6 @@ from pathlib import Path from uuid import uuid4 -from deltalake import DeltaTable -from pyarrow import dataset as ds from sqlalchemy.orm.attributes import flag_modified from flowfile_core.catalog.delta_utils import ( @@ -231,12 +229,16 @@ def _read_table_metadata( path = Path(table_path) if storage_format == "delta" or (storage_format is None and is_delta_table(path)): + from deltalake import DeltaTable + delta_table = DeltaTable(str(path)) pa_schema = delta_table.schema().to_arrow() schema_list = [{"name": field.name, "dtype": str(field.type)} for field in pa_schema] row_count = delta_table.to_pyarrow_dataset().count_rows() size_bytes = get_delta_table_size_bytes(path) else: + from pyarrow import dataset as ds + dataset = ds.dataset(str(path), format="parquet") schema_list = [{"name": field.name, "dtype": str(field.type)} for field in dataset.schema] row_count = dataset.count_rows() diff --git a/flowfile_core/flowfile_core/configs/settings.py b/flowfile_core/flowfile_core/configs/settings.py index ed1f1f358..d58e7721b 100644 --- a/flowfile_core/flowfile_core/configs/settings.py +++ b/flowfile_core/flowfile_core/configs/settings.py @@ -4,7 +4,6 @@ import platform import tempfile -from passlib.context import CryptContext from starlette.config import Config from flowfile_core.configs.utils import MutableBool @@ -158,7 +157,6 @@ def is_package_mode() -> bool: ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 120 -PWD_CONTEXT = CryptContext(schemes=["bcrypt"], deprecated="auto") GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID", "") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET", "") @@ -166,3 +164,15 @@ def is_package_mode() -> bool: "GOOGLE_OAUTH_REDIRECT_URI", f"http://localhost:{SERVER_PORT}/ga_connections/oauth/callback", ) + + +def __getattr__(name: str): + # Nothing in-repo uses PWD_CONTEXT (auth/password.py owns password hashing), + # but it stays resolvable for external callers. Built lazily so importing + # settings — which sits on the `import flowfile_frame` path — doesn't pay + # for loading passlib. + if name == "PWD_CONTEXT": + from passlib.context import CryptContext + + return CryptContext(schemes=["bcrypt"], deprecated="auto") + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/flowfile_core/flowfile_core/database/connection.py b/flowfile_core/flowfile_core/database/connection.py index bfe6f31ba..597242611 100644 --- a/flowfile_core/flowfile_core/database/connection.py +++ b/flowfile_core/flowfile_core/database/connection.py @@ -1,4 +1,6 @@ +import os import sys +import threading from contextlib import contextmanager from pathlib import Path @@ -27,9 +29,38 @@ def get_database_path() -> Path | None: SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +_DB_INIT_LOCK = threading.Lock() +_db_initialized = False + + +def ensure_db_initialized() -> None: + """Idempotently run the startup Alembic migration + seed default rows. + + Previously this ran as a side effect of ``import flowfile_core`` (via + ``init_db.py``), which made importing the dataframe API drag in Alembic and + create the catalog DB on disk. It is now deferred to first actual DB access + (``get_db``/``get_db_context``) and to explicit server startup. Cheap on + every call after the first. Honors ``FLOWFILE_SKIP_STARTUP_MIGRATION``. + """ + global _db_initialized + if _db_initialized: + return + with _DB_INIT_LOCK: + if _db_initialized: + return + if not os.environ.get("FLOWFILE_SKIP_STARTUP_MIGRATION"): + from flowfile_core.database.migration import run_startup_migration + + run_startup_migration() + from flowfile_core.database.init_db import init_db + + init_db() + _db_initialized = True + def get_db(): """Dependency for FastAPI to get database session.""" + ensure_db_initialized() db = SessionLocal() try: yield db @@ -40,6 +71,7 @@ def get_db(): @contextmanager def get_db_context(): """Context manager for getting database session.""" + ensure_db_initialized() db = SessionLocal() try: yield db diff --git a/flowfile_core/flowfile_core/database/init_db.py b/flowfile_core/flowfile_core/database/init_db.py index b23f76d77..b3f554667 100644 --- a/flowfile_core/flowfile_core/database/init_db.py +++ b/flowfile_core/flowfile_core/database/init_db.py @@ -9,7 +9,6 @@ from flowfile_core.auth.password import get_password_hash from flowfile_core.database import models as db_models from flowfile_core.database.connection import SessionLocal -from flowfile_core.database.migration import run_startup_migration from shared._version import get_version pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") @@ -20,12 +19,6 @@ logger = logging.getLogger(__name__) -# Run Alembic-based migrations (replaces the old manual run_migrations + create_all). -# Skipped when FLOWFILE_SKIP_STARTUP_MIGRATION is set so the alembic CLI can import -# our metadata without recursively re-entering migration machinery. -if not os.environ.get("FLOWFILE_SKIP_STARTUP_MIGRATION"): - run_startup_migration() - def create_default_local_user(db: Session): local_user = db.query(db_models.User).filter(db_models.User.username == "local_user").first() @@ -180,5 +173,7 @@ def init_db(): if __name__ == "__main__": - init_db() + from flowfile_core.database.connection import ensure_db_initialized + + ensure_db_initialized() print("Local user created successfully") diff --git a/flowfile_core/flowfile_core/exceptions.py b/flowfile_core/flowfile_core/exceptions.py new file mode 100644 index 000000000..49f7681a6 --- /dev/null +++ b/flowfile_core/flowfile_core/exceptions.py @@ -0,0 +1,12 @@ +class FlowfileHTTPException(Exception): + """HTTP-style error raised by engine/secret code without importing FastAPI. + + A handler registered in ``flowfile_core.main`` maps this to the same JSON + response FastAPI's built-in ``HTTPException`` produces, so route behaviour is + unchanged while keeping ``flowfile_core.flowfile`` importable without FastAPI. + """ + + def __init__(self, status_code: int, detail: str | None = None): + self.status_code = status_code + self.detail = detail + super().__init__(detail) diff --git a/flowfile_core/flowfile_core/fileExplorer/funcs.py b/flowfile_core/flowfile_core/fileExplorer/funcs.py index 8f5e9a414..b5f622938 100644 --- a/flowfile_core/flowfile_core/fileExplorer/funcs.py +++ b/flowfile_core/flowfile_core/fileExplorer/funcs.py @@ -5,10 +5,10 @@ from pathlib import Path from typing import Literal -from fastapi import HTTPException from pydantic import BaseModel from flowfile_core.configs import settings +from flowfile_core.exceptions import FlowfileHTTPException from shared.storage_config import storage @@ -432,10 +432,10 @@ def validate_path_under_cwd(user_path: str) -> str: The validated, normalized full path as a string Raises: - HTTPException: 403 if path escapes the allowed directories + FlowfileHTTPException: 403 if path escapes the allowed directories """ if ".." in user_path: - raise HTTPException(403, "Access denied: path traversal not allowed") + raise FlowfileHTTPException(403, "Access denied: path traversal not allowed") expanded = os.path.expanduser(user_path) @@ -446,7 +446,7 @@ def validate_path_under_cwd(user_path: str) -> str: base_real = os.path.realpath(root) if _is_contained(base_real, candidate_real): return candidate_real - raise HTTPException(403, "Access denied") + raise FlowfileHTTPException(403, "Access denied") # In Docker/package mode, enforce strict sandboxing to trusted roots. for base in (os.getcwd(), str(storage.base_directory), str(storage.user_data_directory)): @@ -456,7 +456,7 @@ def validate_path_under_cwd(user_path: str) -> str: if _is_contained(base_real, candidate_real): return candidate_real - raise HTTPException(403, "Access denied") + raise FlowfileHTTPException(403, "Access denied") _MANAGED_FLOW_FILENAME_RE = re.compile(r"^[A-Za-z0-9_\-]+\.(yaml|yml|json)$") @@ -468,7 +468,7 @@ def resolve_managed_flow_path(filename: str) -> str: Accepts ONLY filenames matching ``[A-Za-z0-9_-]+\\.(yaml|yml|json)`` — no directory components, no dots other than the extension separator. Returns an absolute path strictly under storage.flows_directory. Raises - HTTPException(403) on violation. + FlowfileHTTPException(403) on violation. Sanitization is layered so CodeQL's ``py/path-injection`` query sees a recognized sanitizer on every taint path: @@ -480,17 +480,17 @@ def resolve_managed_flow_path(filename: str) -> str: validate_path_under_cwd above). """ if not filename or not _MANAGED_FLOW_FILENAME_RE.fullmatch(filename): - raise HTTPException(status_code=403, detail="invalid managed flow filename") + raise FlowfileHTTPException(status_code=403, detail="invalid managed flow filename") if filename != os.path.basename(filename): - raise HTTPException(status_code=403, detail="invalid managed flow filename") + raise FlowfileHTTPException(status_code=403, detail="invalid managed flow filename") if ".." in filename or "/" in filename or "\\" in filename: - raise HTTPException(status_code=403, detail="invalid managed flow filename") + raise FlowfileHTTPException(status_code=403, detail="invalid managed flow filename") base_path = os.path.normpath(str(storage.flows_directory)) fullpath = os.path.normpath(os.path.join(base_path, filename)) if fullpath.startswith(base_path): return fullpath - raise HTTPException(status_code=403, detail="invalid managed flow filename") + raise FlowfileHTTPException(status_code=403, detail="invalid managed flow filename") # Alias for backward compatibility diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/create/funcs.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/create/funcs.py index 2f44e5a97..8f83b38bf 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/create/funcs.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/create/funcs.py @@ -3,7 +3,6 @@ import polars as pl from polars._typing import CsvEncoding -from flowfile_core.flowfile.flow_data_engine.read_excel_tables import df_from_calamine_xlsx, df_from_openpyxl from flowfile_core.flowfile.flow_data_engine.sample_data import create_fake_data from flowfile_core.schemas import input_schema from shared.path_utils import is_url @@ -170,6 +169,10 @@ def create_from_path_avro(received_table: input_schema.ReceivedTable) -> pl.Data def create_from_path_excel(received_table: input_schema.ReceivedTable): + # read_excel_tables pulls openpyxl at module level — defer it to the excel + # read path so `import flowfile_frame` doesn't pay for it. + from flowfile_core.flowfile.flow_data_engine.read_excel_tables import df_from_calamine_xlsx, df_from_openpyxl + if not isinstance(received_table.table_settings, input_schema.InputExcelTable): raise ValueError("Received table settings are not of type InputExcelTable") table_settings: input_schema.InputExcelTable = received_table.table_settings diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py index 0f81d2d9b..c36f05dfe 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py @@ -10,6 +10,11 @@ from typing import TYPE_CHECKING, Any, Literal, TypeVar if TYPE_CHECKING: + # pyarrow is expensive to import and only needed when arrow/parquet paths + # actually run — keep it off the `import flowfile_frame` hot path. + from pyarrow import Table as PaTable + from pyarrow.parquet import ParquetFile + from flowfile_core.flowfile.flow_node.multi_output import NamedOutputs import polars as pl @@ -18,8 +23,6 @@ from polars.exceptions import PanicException from polars_expr_transformer import simple_function_to_expr as to_expr from polars_grouper import graph_solver -from pyarrow import Table as PaTable -from pyarrow.parquet import ParquetFile from flowfile_core.configs import logger from flowfile_core.configs.flow_logger import NodeLogger @@ -813,6 +816,8 @@ def _read_json_from_cloud( def _handle_path_ref(self, path_ref: str, optimize_memory: bool): """Handles file path reference input.""" + from pyarrow.parquet import ParquetFile + try: pf = ParquetFile(path_ref) except Exception as e: diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/sample_data.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/sample_data.py index 4c92f2584..6b497d071 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/sample_data.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/sample_data.py @@ -5,7 +5,6 @@ from typing import Any import polars as pl -from faker import Faker def create_fake_data(n_records: int = 1000, optimized: bool = True) -> pl.DataFrame: @@ -18,6 +17,8 @@ def create_fake_data(n_records: int = 1000, optimized: bool = True) -> pl.DataFr Returns: pl.DataFrame """ + from faker import Faker + fake = Faker() selector = partial(randint, 0) @@ -78,6 +79,8 @@ def generate_phone_number(): def create_fake_data_raw( n_records: int = 1000, col_selection: list[str] = None ) -> Generator[dict[str, Any], None, None]: + from faker import Faker + fake = Faker() selector = partial(randint, 0) diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/streaming.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/streaming.py index 6687c45b1..5f8441a0a 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/streaming.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/streaming.py @@ -15,7 +15,6 @@ from typing import Any import polars as pl -from websockets.sync.client import connect from flowfile_core.configs.settings import WORKER_URL from flowfile_core.flowfile.flow_data_engine.subprocess_operations.models import Status @@ -148,6 +147,10 @@ def streaming_start( Raises immediately on connection failure or send error. """ + # websockets loads only when a task is actually streamed to the worker — + # keeps it off the `import flowfile_frame` path. + from websockets.sync.client import connect + ws_url = _get_ws_url() + "/ws/submit" metadata = _build_metadata(task_id, operation_type, flow_id, node_id, kwargs) diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py index c7be0ed4f..fa08df988 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py @@ -1,15 +1,38 @@ # Standard library imports +from __future__ import annotations + import io import json import threading from base64 import b64decode -from typing import Any, Literal +from typing import TYPE_CHECKING, Any, Literal from uuid import uuid4 import polars as pl -import requests from pl_fuzzy_frame_match.models import FuzzyMapping +if TYPE_CHECKING: + import requests + + from flowfile_core.schemas.catalog_schema import CatalogTablePreview, DeltaTableHistory +else: + + class _LazyRequests: + """Stand-in that defers `import requests` (~80ms) until the first worker call. + + Every use in this module is a network round-trip to the worker, so the + import cost never belongs on the `import flowfile_frame` path. On first + attribute access the real module replaces this proxy in globals(). + """ + + def __getattr__(self, name): + import requests + + globals()["requests"] = requests + return getattr(requests, name) + + requests = _LazyRequests() + from flowfile_core.configs import logger from flowfile_core.configs.settings import OFFLOAD_TO_WORKER, WORKER_URL from flowfile_core.flowfile.flow_data_engine.subprocess_operations.models import ( @@ -29,7 +52,6 @@ DatabaseExternalReadSettings, DatabaseExternalWriteSettings, ) -from flowfile_core.schemas.catalog_schema import CatalogTablePreview, DeltaTableHistory from flowfile_core.schemas.cloud_storage_schemas import CloudStorageWriteSettingsWorkerInterface from flowfile_core.schemas.input_schema import ReceivedTable from flowfile_core.utils.arrow_reader import read @@ -481,6 +503,8 @@ def trigger_delta_history( response = requests.post(f"{WORKER_URL}/catalog/delta_history", json=payload) if not response.ok: raise RuntimeError(f"Worker delta history read failed: {response.text}") + from flowfile_core.schemas.catalog_schema import DeltaTableHistory + return DeltaTableHistory.model_validate(response.json()) @@ -501,6 +525,8 @@ def trigger_delta_version_preview( response = requests.post(f"{WORKER_URL}/catalog/delta_version_preview", json=payload) if not response.ok: raise RuntimeError(f"Worker delta version preview failed: {response.text}") + from flowfile_core.schemas.catalog_schema import CatalogTablePreview + return CatalogTablePreview.model_validate(response.json()) @@ -520,6 +546,8 @@ def trigger_delta_preview( response = requests.post(f"{WORKER_URL}/catalog/delta_preview", json=payload) if not response.ok: raise RuntimeError(f"Worker delta preview failed: {response.text}") + from flowfile_core.schemas.catalog_schema import CatalogTablePreview + return CatalogTablePreview.model_validate(response.json()) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 847eec76d..e08322959 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -10,17 +10,24 @@ from functools import partial from pathlib import Path from time import time -from typing import Any, Literal, NamedTuple, Union +from typing import TYPE_CHECKING, Any, Literal, NamedTuple, Union from uuid import uuid1 -import fastexcel import polars as pl -import yaml -from fastapi.exceptions import HTTPException -from pyarrow.parquet import ParquetFile + +if TYPE_CHECKING: + # fastexcel/pyarrow/yaml are expensive to import and only needed on the + # excel-read / parquet-input / flow-save paths — keep them off the + # `import flowfile_frame` hot path (see tests/test_lazy_imports.py). + # CatalogService/TableWriteMetadata drag in the catalog service + schema + # stack (~150ms of pydantic model building) and resolve lazily for the + # same reason. + from pyarrow.parquet import ParquetFile + + from flowfile_core.catalog import CatalogService + from flowfile_core.schemas.catalog_schema import TableWriteMetadata from flowfile_core.auth import sharing -from flowfile_core.catalog import CatalogService from flowfile_core.catalog.delta_utils import check_source_versions_current, delete_table_storage, is_delta_table from flowfile_core.catalog.repository import SQLAlchemyCatalogRepository from flowfile_core.catalog.storage_backend import _is_cloud_uri, resolve_for_namespace, serialized_frame_uses_cloud @@ -31,6 +38,7 @@ from flowfile_core.configs.node_store.nodes import get_source_node_types, get_source_node_types_str from flowfile_core.database import models as db_models from flowfile_core.database.connection import get_db_context +from flowfile_core.exceptions import FlowfileHTTPException from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.artifacts import ArtifactContext from flowfile_core.flowfile.database_connection_manager.db_connections import ( @@ -49,10 +57,6 @@ ) from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn, cast_str_to_polars_type from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser -from flowfile_core.flowfile.flow_data_engine.read_excel_tables import ( - get_calamine_xlsx_data_types, - get_open_xlsx_datatypes, -) from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import ( ExternalCloudWriter, ExternalDatabaseFetcher, @@ -107,11 +111,6 @@ from flowfile_core.flowfile.util.calculate_layout import calculate_layered_layout from flowfile_core.flowfile.util.execution_orderer import ExecutionPlan, ExecutionStage, compute_execution_plan from flowfile_core.flowfile.utils import snake_case_to_camel_case -from flowfile_core.kafka.connection_manager import ( - build_consumer_config, - get_kafka_connection, - get_kafka_connection_by_name, -) from flowfile_core.kernel import get_kernel_manager from flowfile_core.kernel.execution import ( build_execute_request, @@ -121,7 +120,6 @@ write_inputs_to_parquet, ) from flowfile_core.schemas import input_schema, schemas, transform_schema -from flowfile_core.schemas.catalog_schema import TableWriteMetadata from flowfile_core.schemas.cloud_storage_schemas import ( AuthMethod, CloudStorageReadSettingsInternal, @@ -150,7 +148,6 @@ from shared.google_analytics.models import ( GoogleAnalyticsReadSettings as WorkerGoogleAnalyticsReadSettings, ) -from shared.kafka.consumer import infer_topic_schema, make_kafka_commit_callback, read_kafka_source from shared.kafka.models import KafkaReadSettings from shared.storage_config import storage @@ -164,7 +161,17 @@ def represent_list_json(dumper, data): return dumper.represent_sequence("tag:yaml.org,2002:seq", data, flow_style=False) -yaml.add_representer(list, represent_list_json) +def _get_yaml(): + """Import yaml on first use (flow save/load), with our list representer installed. + + Registration is a plain dict assignment, so re-running it on every call is + idempotent and cheap; the win is that `import flowfile_frame` no longer pays + for loading PyYAML. + """ + import yaml + + yaml.add_representer(list, represent_list_json) + return yaml def with_history_capture(action_type: "HistoryActionType", description_template: str = "Update {node_type} settings"): @@ -245,6 +252,13 @@ def get_xlsx_schema( Returns: A list of FlowfileColumn objects representing the schema. """ + # read_excel_tables pulls openpyxl (and numpy) at module level — defer it to + # the xlsx schema path so `import flowfile_frame` doesn't pay for it. + from flowfile_core.flowfile.flow_data_engine.read_excel_tables import ( + get_calamine_xlsx_data_types, + get_open_xlsx_datatypes, + ) + try: logger.info("Starting to calculate the schema") if engine == "openpyxl": @@ -347,7 +361,7 @@ def get_cloud_connection_settings( A FullCloudStorageConnection object with the connection details. Raises: - HTTPException: If the connection settings cannot be found. + FlowfileHTTPException: If the connection settings cannot be found. """ cloud_connection_settings = get_local_cloud_connection(connection_name, user_id) if cloud_connection_settings is None and auth_mode in ("env_vars", transform_schema.AUTO_DATA_TYPE): @@ -356,7 +370,7 @@ def get_cloud_connection_settings( elif cloud_connection_settings is None and auth_mode == "aws-cli": cloud_connection_settings = FullCloudStorageConnection(storage_type="s3", auth_method="aws-cli") if cloud_connection_settings is None: - raise HTTPException(status_code=400, detail="Cloud connection settings not found") + raise FlowfileHTTPException(status_code=400, detail="Cloud connection settings not found") return cloud_connection_settings @@ -391,6 +405,8 @@ def _resolve_virtual_table( ): return pl.LazyFrame.deserialize(_io.BytesIO(serialized_lf)) with get_db_context() as db: + from flowfile_core.catalog import CatalogService + repo = SQLAlchemyCatalogRepository(db) svc = CatalogService(repo) return svc.resolve_virtual_flow_table( @@ -511,6 +527,8 @@ def _resolve_catalog_table_info(node_catalog_reader: "input_schema.NodeCatalogRe resolved_table_name: str | None = None try: with get_db_context() as db: + from flowfile_core.catalog import CatalogService + repo = SQLAlchemyCatalogRepository(db) svc = CatalogService(repo) @@ -585,7 +603,7 @@ def _write_catalog_delta_local( merge_keys: list[str] | None, partition_by: list[str] | None = None, storage_options: dict[str, str] | None = None, -) -> TableWriteMetadata | None: +) -> "TableWriteMetadata | None": """Write a Delta table in-process. Returns metadata dict, or ``None`` when the write was skipped. *storage_options* (passed verbatim, ``None`` ⇒ local filesystem) routes the write to object @@ -624,7 +642,7 @@ def _write_catalog_delta_remote( op_type: str, op_kwargs: dict, table_name: str, -) -> TableWriteMetadata | None: +) -> "TableWriteMetadata | None": """Write a Delta table via the worker service. Returns metadata dict, or ``None`` when skipped.""" fetcher = ExternalDfFetcher( flow_id=flow_id, @@ -647,7 +665,7 @@ def _write_catalog_delta_remote( return meta -def _effective_namespace_id(svc: CatalogService, settings) -> int | None: +def _effective_namespace_id(svc: "CatalogService", settings) -> int | None: """Resolve a node's target namespace name-first (portable ``namespace_full_name``), falling back to the numeric ``namespace_id`` for flows saved before names were stored. The numeric id is install-local and goes stale when namespaces are recreated, so the name is preferred whenever present.""" @@ -661,7 +679,7 @@ def _register_catalog_table( settings: input_schema.CatalogWriteSettings, source_registration_id: int | None, user_id: int, - meta_kwargs: TableWriteMetadata, + meta_kwargs: "TableWriteMetadata", storage_options: dict[str, str] | None = None, is_cloud: bool = False, ) -> None: @@ -676,6 +694,8 @@ def _register_catalog_table( partition_columns = get_delta_partition_columns(dest_path) if is_delta_table(dest_path) else [] try: with get_db_context() as db: + from flowfile_core.catalog import CatalogService + repo = SQLAlchemyCatalogRepository(db) svc = CatalogService(repo) if existing is not None: @@ -872,6 +892,8 @@ def _handle_virtual_table_write( schema_json = json.dumps([{"name": c.column_name, "dtype": c.data_type} for c in df.schema]) with get_db_context() as db: + from flowfile_core.catalog import CatalogService + repo = SQLAlchemyCatalogRepository(db) svc = CatalogService(repo) ns_id = _effective_namespace_id(svc, settings) @@ -922,6 +944,8 @@ def _handle_physical_table_write( user_id = node_catalog_writer.user_id or 1 with get_db_context() as db: + from flowfile_core.catalog import CatalogService + repo = SQLAlchemyCatalogRepository(db) svc = CatalogService(repo) namespace_id = _effective_namespace_id(svc, settings) @@ -1015,14 +1039,14 @@ def _resolve_database_credentials( database_connection = database_settings.database_connection encrypted_password = get_encrypted_secret(current_user_id=user_id, secret_name=database_connection.password_ref) if encrypted_password is None: - raise HTTPException(status_code=400, detail="Password not found") + raise FlowfileHTTPException(status_code=400, detail="Password not found") return database_connection, encrypted_password, None elif is_sqlite: return database_settings.database_connection, None, None else: ref_settings = get_local_database_connection(database_settings.database_connection_name, user_id) if ref_settings is None: - raise HTTPException( + raise FlowfileHTTPException( status_code=400, detail=( f"Database connection '{database_settings.database_connection_name}' not found " @@ -1043,14 +1067,14 @@ class FlowGraph: depends_on: dict[ int, Union[ - ParquetFile, + "ParquetFile", FlowDataEngine, "FlowGraph", pl.DataFrame, ], ] _flow_id: int - _input_data: Union[ParquetFile, FlowDataEngine, "FlowGraph"] + _input_data: Union["ParquetFile", FlowDataEngine, "FlowGraph"] _input_cols: list[str] _output_cols: list[str] _node_db: dict[str | int, FlowNode] @@ -1073,7 +1097,7 @@ def __init__( input_cols: list[str] = None, output_cols: list[str] = None, path_ref: str = None, - input_flow: Union[ParquetFile, FlowDataEngine, "FlowGraph"] = None, + input_flow: Union["ParquetFile", FlowDataEngine, "FlowGraph"] = None, cache_results: bool = False, ): """Initializes a new FlowGraph instance. @@ -2645,6 +2669,8 @@ def _func(data: FlowDataEngine) -> FlowDataEngine: ) tags = list({"ml", trainer.task_type, settings.model_type, *settings.catalog_tags}) + from flowfile_core.catalog import CatalogService + with get_db_context() as _ns_db: effective_namespace_id = _effective_namespace_id( CatalogService(SQLAlchemyCatalogRepository(_ns_db)), settings @@ -2827,6 +2853,8 @@ def _func(data: FlowDataEngine) -> FlowDataEngine: else: if not settings.model_name: raise ValueError("Apply Model: 'model_name' is required when source='catalog'.") + from flowfile_core.catalog import CatalogService + storage_backend = get_storage_backend() with get_db_context() as db: effective_namespace_id = _effective_namespace_id( @@ -3979,6 +4007,14 @@ def add_kafka_source(self, node_kafka_source: input_schema.NodeKafkaSource): Args: node_kafka_source: The settings for the Kafka source node. """ + # confluent_kafka + pyarrow.ipc load only when a kafka node is actually + # added — they must stay off the `import flowfile_frame` path. + from flowfile_core.kafka.connection_manager import ( + build_consumer_config, + get_kafka_connection, + get_kafka_connection_by_name, + ) + from shared.kafka.consumer import infer_topic_schema, make_kafka_commit_callback, read_kafka_source logger.info("Adding kafka source") node_type = "kafka_source" @@ -4023,7 +4059,7 @@ def _get_kafka_read_settings() -> KafkaReadSettings: db, kafka_settings.kafka_connection_name, node_kafka_source.user_id ) if db_conn is None: - raise HTTPException(status_code=400, detail="Kafka connection not found") + raise FlowfileHTTPException(status_code=400, detail="Kafka connection not found") consumer_config = build_consumer_config(db, db_conn, node_kafka_source.user_id) _read_settings["v"] = KafkaReadSettings.from_consumer_config( consumer_config, @@ -4164,7 +4200,7 @@ def _build_worker_settings() -> WorkerGoogleAnalyticsReadSettings: with get_db_context() as db: db_conn = get_ga_connection(db, ga_settings.ga_connection_name, node_ga_reader.user_id) if db_conn is None: - raise HTTPException( + raise FlowfileHTTPException( status_code=400, detail=( f"Google Analytics connection '{ga_settings.ga_connection_name}' not found " @@ -4176,7 +4212,7 @@ def _build_worker_settings() -> WorkerGoogleAnalyticsReadSettings: db, ga_settings.ga_connection_name, node_ga_reader.user_id ) if encrypted_credential is None: - raise HTTPException( + raise FlowfileHTTPException( status_code=400, detail=( f"Google Analytics connection '{ga_settings.ga_connection_name}' has no stored credential" @@ -4220,7 +4256,7 @@ def _build_worker_settings() -> WorkerGoogleAnalyticsReadSettings: # ``oauth_cfg`` is only fetched for the oauth auth method; guard against # an unknown auth_method value reaching this branch with ``None``. if not oauth_cfg or not oauth_cfg["client_id"] or not oauth_cfg["client_secret"]: - raise HTTPException( + raise FlowfileHTTPException( status_code=500, detail=( "Google OAuth is not configured on this instance. Open Admin → Google OAuth " @@ -4545,6 +4581,8 @@ def add_read(self, input_file: input_schema.NodeRead): input_file.received_file.file_type in ("xlsx", "excel") and input_file.received_file.table_settings.sheet_name == "" ): + import fastexcel + sheet_name = fastexcel.read_excel(input_file.received_file.path).sheet_names[0] input_file.received_file.table_settings.sheet_name = sheet_name @@ -5401,6 +5439,7 @@ def save_flow(self, flow_path: str): "Or stay on.1 if you still need .flowfile support.\n\n" ) elif suffix in (".yaml", ".yml"): + yaml = _get_yaml() flowfile_data = self.get_flowfile_data() data = flowfile_data.model_dump(mode="json") with open(flow_path, "w", encoding="utf-8") as f: @@ -5412,6 +5451,7 @@ def save_flow(self, flow_path: str): json.dump(data, f, indent=2, ensure_ascii=False) else: + yaml = _get_yaml() flowfile_data = self.get_flowfile_data() logger.warning(f"Unknown file extension {suffix}. Defaulting to YAML format.") data = flowfile_data.model_dump(mode="json") @@ -5710,10 +5750,10 @@ def add_connection(flow: FlowGraph, node_connection: input_schema.NodeConnection ) if not n ] - raise HTTPException(404, f"Node(s) not found: {', '.join(missing)}") + raise FlowfileHTTPException(404, f"Node(s) not found: {', '.join(missing)}") error = validate_connection(from_node, to_node) if error is not None: - raise HTTPException(422, error.detail) + raise FlowfileHTTPException(422, error.detail) to_node.add_node_connection( from_node, node_connection.input_connection.get_node_input_connection_type(), @@ -5734,13 +5774,13 @@ def delete_connection(graph, node_connection: input_schema.NodeConnection): # already removed) surfaces as an AttributeError → 500, which also drops # CORS headers and shows up as a CORS error in the browser. if from_node is None or to_node is None: - raise HTTPException(422, "Connection does not exist on the input node") + raise FlowfileHTTPException(422, "Connection does not exist on the input node") connection_valid = to_node.node_inputs.validate_if_input_connection_exists( node_input_id=from_node.node_id, connection_name=node_connection.input_connection.get_node_input_connection_type(), ) if not connection_valid: - raise HTTPException(422, "Connection does not exist on the input node") + raise FlowfileHTTPException(422, "Connection does not exist on the input node") from_node.delete_lead_to_node(node_connection.input_connection.node_id) to_node.delete_input_node( node_connection.output_connection.node_id, diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/models.py b/flowfile_core/flowfile_core/flowfile/flow_node/models.py index b5ac5ce62..47c8a0d47 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_node/models.py +++ b/flowfile_core/flowfile_core/flowfile/flow_node/models.py @@ -1,9 +1,14 @@ +from __future__ import annotations + from collections.abc import Callable from dataclasses import dataclass from enum import Enum, auto -from typing import Literal +from typing import TYPE_CHECKING, Literal -import pyarrow as pa +if TYPE_CHECKING: + # Annotation-only: pyarrow is expensive and must stay off the + # `import flowfile_frame` path. + import pyarrow as pa class ExecutionStrategy(Enum): @@ -179,9 +184,9 @@ class NodeStepInputs: main_inputs: A list of `FlowNode` objects connected to the main input port(s). """ - left_input: "FlowNode" = None - right_input: "FlowNode" = None - main_inputs: list["FlowNode"] = None + left_input: FlowNode = None + right_input: FlowNode = None + main_inputs: list[FlowNode] = None @property def input_ids(self) -> list[int]: @@ -193,7 +198,7 @@ def input_ids(self) -> list[int]: return [node_input.node_information.id for node_input in self.get_all_inputs()] return [] - def get_all_inputs(self) -> list["FlowNode"]: + def get_all_inputs(self) -> list[FlowNode]: """ Retrieves a single list containing all input nodes (main, left, and right). :return: A list of all connected `FlowNode` objects. diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/state.py b/flowfile_core/flowfile_core/flowfile/flow_node/state.py index 61e29c672..19d81f1d5 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_node/state.py +++ b/flowfile_core/flowfile_core/flowfile/flow_node/state.py @@ -10,9 +10,10 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING -import pyarrow as pa - if TYPE_CHECKING: + # Annotation-only: keep pyarrow off the `import flowfile_frame` path. + import pyarrow as pa + from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn diff --git a/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py b/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py index c89559bf5..67028746e 100644 --- a/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py +++ b/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py @@ -9,7 +9,6 @@ from typing import Any from flowfile_core.schemas import input_schema, schemas, transform_schema -from tools.migrate.legacy_schemas import LEGACY_CLASS_MAP # LEGACY PICKLE LOADING @@ -25,6 +24,10 @@ class LegacyUnpickler(pickle.Unpickler): def find_class(self, module: str, name: str): """Override to redirect transform_schema dataclasses to legacy definitions.""" + # Deferred: the legacy-schema module is sizable and only needed when a + # pre-YAML `.flowfile` pickle is actually opened. + from tools.migrate.legacy_schemas import LEGACY_CLASS_MAP + if name in LEGACY_CLASS_MAP: return LEGACY_CLASS_MAP[name] return super().find_class(module, name) diff --git a/flowfile_core/flowfile_core/flowfile/manage/io_flowfile.py b/flowfile_core/flowfile_core/flowfile/manage/io_flowfile.py index f7ebee789..f7462589c 100644 --- a/flowfile_core/flowfile_core/flowfile/manage/io_flowfile.py +++ b/flowfile_core/flowfile_core/flowfile/manage/io_flowfile.py @@ -10,11 +10,6 @@ from flowfile_core.schemas.schemas import get_settings_class_for_node_type from shared.storage_config import storage -try: - import yaml -except ImportError: - yaml = None - def _validate_flow_path(flow_path: Path) -> Path: """Validate flow path is within allowed directories or is an explicit absolute path.""" @@ -115,8 +110,11 @@ def _load_flowfile_yaml(flow_path: Path) -> schemas.FlowInformation: Returns: FlowInformation object """ - if yaml is None: - raise ImportError("PyYAML is required for YAML files. Install with: pip install pyyaml") + try: + # Deferred so PyYAML loads only when a YAML flow is actually opened. + import yaml + except ImportError: + raise ImportError("PyYAML is required for YAML files. Install with: pip install pyyaml") from None flow_path = _validate_flow_path(flow_path) with open(flow_path, encoding="utf-8") as f: data = yaml.safe_load(f) diff --git a/flowfile_core/flowfile_core/flowfile/sources/external_sources/custom_external_sources/sample_users.py b/flowfile_core/flowfile_core/flowfile/sources/external_sources/custom_external_sources/sample_users.py index b5875ff6b..63d2b7ff2 100644 --- a/flowfile_core/flowfile_core/flowfile/sources/external_sources/custom_external_sources/sample_users.py +++ b/flowfile_core/flowfile_core/flowfile/sources/external_sources/custom_external_sources/sample_users.py @@ -2,8 +2,6 @@ from time import sleep from typing import Any -import requests - from flowfile_core.schemas.input_schema import SampleUsers @@ -17,6 +15,9 @@ def getter(data: SampleUsers) -> Generator[dict[str, Any], None, None]: Returns: """ + # requests loads only when this demo source actually fetches data. + import requests + index_pos = 0 for _i in range(data.size): sleep(0.01) diff --git a/flowfile_core/flowfile_core/kernel/__init__.py b/flowfile_core/flowfile_core/kernel/__init__.py index dcd57f0c8..37e5b336c 100644 --- a/flowfile_core/flowfile_core/kernel/__init__.py +++ b/flowfile_core/flowfile_core/kernel/__init__.py @@ -1,4 +1,5 @@ -from flowfile_core.kernel.manager import KernelManager +from typing import TYPE_CHECKING + from flowfile_core.kernel.models import ( ArtifactIdentifier, ArtifactPersistenceInfo, @@ -16,7 +17,6 @@ RecoveryMode, RecoveryStatus, ) -from flowfile_core.kernel.routes import router __all__ = [ "KernelManager", @@ -39,12 +39,16 @@ "get_kernel_manager", ] -_manager: KernelManager | None = None +if TYPE_CHECKING: + from flowfile_core.kernel.manager import KernelManager + +_manager: "KernelManager | None" = None -def get_kernel_manager() -> KernelManager: +def get_kernel_manager() -> "KernelManager": global _manager if _manager is None: + from flowfile_core.kernel.manager import KernelManager from shared.storage_config import storage # Use a sub-directory of the standard temp/internal_storage tree. @@ -54,3 +58,20 @@ def get_kernel_manager() -> KernelManager: shared_path = str(storage.temp_directory / "kernel_shared") _manager = KernelManager(shared_volume_path=shared_path) return _manager + + +def __getattr__(name: str): + # `router` is imported only by the FastAPI server (main.py). Loading it + # eagerly would pull fastapi into every importer of this package (incl. + # flowfile_frame via flow_graph's KernelManager/execution imports), so + # resolve it lazily to keep the dataframe-API import light. KernelManager + # is deferred for the same reason: manager.py imports docker + httpx. + if name == "router": + from flowfile_core.kernel.routes import router + + return router + if name == "KernelManager": + from flowfile_core.kernel.manager import KernelManager + + return KernelManager + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/flowfile_core/flowfile_core/kernel/execution.py b/flowfile_core/flowfile_core/kernel/execution.py index af45b0f66..36fdf6639 100644 --- a/flowfile_core/flowfile_core/kernel/execution.py +++ b/flowfile_core/flowfile_core/kernel/execution.py @@ -4,18 +4,25 @@ each piece independently testable. """ +from __future__ import annotations + import logging import os import re +from typing import TYPE_CHECKING import polars as pl from flowfile_core.configs.settings import OFFLOAD_TO_WORKER, SERVER_PORT from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import ExternalDfFetcher -from flowfile_core.kernel.manager import KernelManager from flowfile_core.kernel.models import ExecuteRequest, ExecuteResult +if TYPE_CHECKING: + # KernelManager is annotation-only here; manager.py imports docker + httpx, + # which must stay off the `import flowfile_frame` path. + from flowfile_core.kernel.manager import KernelManager + logger = logging.getLogger(__name__) _SAFE_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$") diff --git a/flowfile_core/flowfile_core/main.py b/flowfile_core/flowfile_core/main.py index de8e3a3ea..7f596453e 100644 --- a/flowfile_core/flowfile_core/main.py +++ b/flowfile_core/flowfile_core/main.py @@ -9,8 +9,9 @@ from pathlib import Path import uvicorn -from fastapi import BackgroundTasks, FastAPI +from fastapi import BackgroundTasks, FastAPI, Request from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse from flowfile_core.ai import router as ai_router from flowfile_core.ai.admin_routes import router as ai_admin_router @@ -23,6 +24,8 @@ WORKER_PORT, WORKER_URL, ) +from flowfile_core.database.connection import ensure_db_initialized +from flowfile_core.exceptions import FlowfileHTTPException from flowfile_core.kernel import router as kernel_router from flowfile_core.lsp.admin_routes import router as lsp_admin_router from flowfile_core.lsp.routes import router as lsp_router @@ -69,6 +72,10 @@ async def shutdown_handler(app: FastAPI): print("Starting core application...") + # DB init/migration used to run as an import side effect of flowfile_core; + # it's now deferred, so run it explicitly at server startup (idempotent). + ensure_db_initialized() + # Only auto-start scheduler if explicitly opted in via env var if os.environ.get("FLOWFILE_SCHEDULER_ENABLED", "").lower() in ("true", "1", "yes"): scheduler = FlowScheduler() @@ -123,6 +130,14 @@ def _shutdown_local_model(): lifespan=shutdown_handler, ) + +@app.exception_handler(FlowfileHTTPException) +async def _flowfile_http_exception_handler(request: Request, exc: FlowfileHTTPException): + """Map engine/secret FlowfileHTTPExceptions to the same JSON response FastAPI's + built-in HTTPException handler produces (keeps flow_graph fastapi-import-free).""" + return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) + + # The Tauri 2 desktop shell loads the renderer from a custom protocol — the # exact origin differs per OS (`tauri://localhost` on macOS/iOS, `http://tauri.localhost` # on Linux, `https://tauri.localhost` on Windows/Android). A regex covers all @@ -276,6 +291,10 @@ def _run_flow_cli(flow_path: str, run_id: int) -> int: OFFLOAD_TO_WORKER.set(False) + # Standalone CLI run: no FastAPI lifespan, and get_run_user_id below reads the + # catalog DB via shared models (bypassing core's get_db guard), so init here. + ensure_db_initialized() + from flowfile_core.flowfile.manage.io_flowfile import open_flow path = Path(flow_path) diff --git a/flowfile_core/flowfile_core/secret_manager/secret_manager.py b/flowfile_core/flowfile_core/secret_manager/secret_manager.py index 6ca612bed..5a783d669 100644 --- a/flowfile_core/flowfile_core/secret_manager/secret_manager.py +++ b/flowfile_core/flowfile_core/secret_manager/secret_manager.py @@ -1,9 +1,6 @@ import base64 +from typing import TYPE_CHECKING -from cryptography.fernet import Fernet -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.hkdf import HKDF -from fastapi.exceptions import HTTPException from pydantic import SecretStr from sqlalchemy import and_ from sqlalchemy.orm import Session @@ -13,6 +10,19 @@ from flowfile_core.auth.secrets import get_master_key from flowfile_core.database import models as db_models from flowfile_core.database.connection import get_db_context +from flowfile_core.exceptions import FlowfileHTTPException + +if TYPE_CHECKING: + from cryptography.fernet import Fernet + + +def _fernet() -> "type[Fernet]": + # cryptography's Rust bindings are costly to import; secrets are en/decrypted + # far from the `import flowfile_frame` hot path, so load on first use. + from cryptography.fernet import Fernet + + return Fernet + # Version identifier for key derivation scheme (allows future migrations) KEY_DERIVATION_VERSION = b"flowfile-secrets-v1" @@ -35,6 +45,9 @@ def derive_user_key(user_id: int) -> bytes: Returns: bytes: A 32-byte URL-safe base64-encoded key suitable for Fernet """ + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + master_key = get_master_key().encode() hkdf = HKDF( @@ -51,14 +64,14 @@ def derive_user_key(user_id: int) -> bytes: def _encrypt_with_master_key(secret_value: str) -> str: """Legacy encryption using master key directly (for backward compatibility).""" key = get_master_key().encode() - f = Fernet(key) + f = _fernet()(key) return f.encrypt(secret_value.encode()).decode() def _decrypt_with_master_key(encrypted_value: str) -> SecretStr: """Legacy decryption using master key directly (for backward compatibility).""" key = get_master_key().encode() - f = Fernet(key) + f = _fernet()(key) return SecretStr(f.decrypt(encrypted_value.encode()).decode()) @@ -79,7 +92,7 @@ def encrypt_secret(secret_value: str, user_id: int) -> str: str: The encrypted value with embedded user_id """ key = derive_user_key(user_id) - f = Fernet(key) + f = _fernet()(key) fernet_token = f.encrypt(secret_value.encode()).decode() return f"{SECRET_FORMAT_PREFIX}{user_id}${fernet_token}" @@ -110,13 +123,13 @@ def decrypt_secret(encrypted_value: str, user_id: int | None = None) -> SecretSt fernet_token = parts[1] key = derive_user_key(embedded_user_id) - f = Fernet(key) + f = _fernet()(key) return SecretStr(f.decrypt(fernet_token.encode()).decode()) # Legacy format - use provided user_id or fall back to master key if user_id is not None: key = derive_user_key(user_id) - f = Fernet(key) + f = _fernet()(key) return SecretStr(f.decrypt(encrypted_value.encode()).decode()) # Fall back to master key for legacy secrets without user context @@ -195,7 +208,7 @@ def delete_secret(db: Session, secret_name: str, user_id: int) -> None: ) if not db_secret: - raise HTTPException(status_code=404, detail="Secret not found") + raise FlowfileHTTPException(status_code=404, detail="Secret not found") sharing.delete_grants_for_resource(db, "secret", db_secret.id) db.delete(db_secret) diff --git a/flowfile_core/flowfile_core/utils/arrow_reader.py b/flowfile_core/flowfile_core/utils/arrow_reader.py index da560cb08..b62cb25fe 100644 --- a/flowfile_core/flowfile_core/utils/arrow_reader.py +++ b/flowfile_core/flowfile_core/utils/arrow_reader.py @@ -1,6 +1,12 @@ +from __future__ import annotations + from collections.abc import Callable, Iterator +from typing import TYPE_CHECKING -import pyarrow as pa +if TYPE_CHECKING: + # pyarrow is expensive to import; every public helper here imports it on + # first call instead, keeping it off the `import flowfile_frame` path. + import pyarrow as pa from flowfile_core.configs import logger @@ -30,6 +36,8 @@ def open_validated_file(file_path: str, n: int) -> pa.OSFile: >>> # Use the file object >>> file.close() """ + import pyarrow as pa + logger.debug(f"Attempting to open file: {file_path} with n={n}") if n < 0: logger.error(f"Invalid negative row count requested: {n}") @@ -67,6 +75,8 @@ def create_reader(source: pa.OSFile) -> pa.ipc.RecordBatchFileReader: ... reader = create_reader(source) ... # Use the reader """ + import pyarrow as pa + try: reader = pa.ipc.open_file(source) logger.debug(f"Created reader with {reader.num_record_batches} batches") @@ -176,6 +186,8 @@ def read(file_path: str) -> pa.Table: >>> print(f"Read {len(table)} rows") >>> print(f"Columns: {table.column_names}") """ + import pyarrow as pa + logger.info(f"Reading entire file: {file_path}") with open_validated_file(file_path, 0) as source: reader = create_reader(source) @@ -212,6 +224,8 @@ def read_top_n(file_path: str, n: int = 1000, strict: bool = False) -> pa.Table: >>> # Read exactly 500 rows with strict checking >>> table = read_top_n("data.arrow", n=500, strict=True) """ + import pyarrow as pa + logger.info(f"Reading top {n} rows from {file_path} (strict={strict})") with open_validated_file(file_path, n) as source: reader = create_reader(source) diff --git a/flowfile_core/tests/conftest.py b/flowfile_core/tests/conftest.py index f047d9855..c04c89813 100644 --- a/flowfile_core/tests/conftest.py +++ b/flowfile_core/tests/conftest.py @@ -61,11 +61,10 @@ def is_port_in_use(port, host='localhost'): @pytest.fixture(scope="session", autouse=True) def setup_test_db(): """Setup the test database and clean up after tests""" - from flowfile_core.database.connection import engine, get_database_url - from flowfile_core.database.init_db import init_db + from flowfile_core.database.connection import engine, ensure_db_initialized, get_database_url from flowfile_core.database.models import Base - init_db() + ensure_db_initialized() yield diff --git a/flowfile_core/tests/flowfile/test_catalog_flow_graph.py b/flowfile_core/tests/flowfile/test_catalog_flow_graph.py index 12d8d70d0..a120929de 100644 --- a/flowfile_core/tests/flowfile/test_catalog_flow_graph.py +++ b/flowfile_core/tests/flowfile/test_catalog_flow_graph.py @@ -884,7 +884,7 @@ def test_resolve_non_optimized_virtual_table(self): mock_ctx.return_value.__enter__ = MagicMock(return_value=mock_db) mock_ctx.return_value.__exit__ = MagicMock(return_value=False) - with patch("flowfile_core.flowfile.flow_graph.CatalogService") as MockSvc: + with patch("flowfile_core.catalog.CatalogService") as MockSvc: mock_svc_instance = MagicMock() mock_svc_instance.resolve_virtual_flow_table.return_value = expected_lf MockSvc.return_value = mock_svc_instance @@ -905,7 +905,7 @@ def test_resolve_optimized_without_serialized_lf_falls_back(self): mock_ctx.return_value.__enter__ = MagicMock(return_value=mock_db) mock_ctx.return_value.__exit__ = MagicMock(return_value=False) - with patch("flowfile_core.flowfile.flow_graph.CatalogService") as MockSvc: + with patch("flowfile_core.catalog.CatalogService") as MockSvc: mock_svc_instance = MagicMock() mock_svc_instance.resolve_virtual_flow_table.return_value = expected_lf MockSvc.return_value = mock_svc_instance diff --git a/flowfile_core/tests/flowfile/test_flow_graph_utils.py b/flowfile_core/tests/flowfile/test_flow_graph_utils.py index 60a11333f..37149adff 100644 --- a/flowfile_core/tests/flowfile/test_flow_graph_utils.py +++ b/flowfile_core/tests/flowfile/test_flow_graph_utils.py @@ -1,7 +1,7 @@ import pytest -from fastapi import HTTPException +from flowfile_core.exceptions import FlowfileHTTPException from flowfile_core.flowfile.flow_graph import FlowGraph, add_connection, validate_connection from flowfile_core.flowfile.flow_graph_utils import _create_node_id_mapping, _validate_input, combine_flow_graphs @@ -456,7 +456,7 @@ def _build_chain_for_cycle_tests() -> FlowGraph: def test_add_connection_rejects_self_loop(): graph = _build_chain_for_cycle_tests() - with pytest.raises(HTTPException) as exc: + with pytest.raises(FlowfileHTTPException) as exc: add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 2)) assert exc.value.status_code == 422 assert "cycle" in exc.value.detail.lower() @@ -465,7 +465,7 @@ def test_add_connection_rejects_self_loop(): def test_add_connection_rejects_direct_back_edge(): graph = _build_chain_for_cycle_tests() # 1 -> 2 exists; adding 2 -> 1 closes a 2-cycle. - with pytest.raises(HTTPException) as exc: + with pytest.raises(FlowfileHTTPException) as exc: add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 1)) assert exc.value.status_code == 422 @@ -473,7 +473,7 @@ def test_add_connection_rejects_direct_back_edge(): def test_add_connection_rejects_transitive_back_edge(): graph = _build_chain_for_cycle_tests() # Chain is 1 -> 2 -> 3; closing 3 -> 1 would form a 3-cycle. - with pytest.raises(HTTPException) as exc: + with pytest.raises(FlowfileHTTPException) as exc: add_connection(graph, input_schema.NodeConnection.create_from_simple_input(3, 1)) assert exc.value.status_code == 422 @@ -493,7 +493,7 @@ def test_add_connection_rejects_wiring_into_source_node(): graph = create_graph(flow_id=1) add_manual_input(graph, [{"name": "john"}], node_id=1) add_manual_input(graph, [{"name": "jane"}], node_id=2) # second source - with pytest.raises(HTTPException) as exc: + with pytest.raises(FlowfileHTTPException) as exc: add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) assert exc.value.status_code == 422 assert "source node" in exc.value.detail @@ -529,7 +529,7 @@ def test_add_connection_cycle_into_transform_keeps_message(): graph = _build_chain_for_cycle_tests() # 1(source) -> 2 -> 3 # 3 -> 2 closes a cycle; node 2 is a record_id transform, not a source, so the # source-target check passes and the cycle check is what rejects it. - with pytest.raises(HTTPException) as exc: + with pytest.raises(FlowfileHTTPException) as exc: add_connection(graph, input_schema.NodeConnection.create_from_simple_input(3, 2)) assert exc.value.status_code == 422 assert "would create a cycle" in exc.value.detail diff --git a/flowfile_core/tests/project/test_path_security.py b/flowfile_core/tests/project/test_path_security.py index 80435c9c4..9457d191a 100644 --- a/flowfile_core/tests/project/test_path_security.py +++ b/flowfile_core/tests/project/test_path_security.py @@ -111,7 +111,7 @@ class TestValidatePathDockerMode: def test_dotdot_in_docker_mode_raises_403(self, monkeypatch): # Finding I1: docker branch had no explicit '..' guard pre-fix; post-fix rejects it. from flowfile_core.configs import settings - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException monkeypatch.setattr(settings, "FLOWFILE_MODE", "docker") with pytest.raises(HTTPException) as exc_info: @@ -121,7 +121,7 @@ def test_dotdot_in_docker_mode_raises_403(self, monkeypatch): def test_dotdot_in_package_mode_raises_403(self, monkeypatch): # Finding I1: same guard applies in package mode. from flowfile_core.configs import settings - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException monkeypatch.setattr(settings, "FLOWFILE_MODE", "package") with pytest.raises(HTTPException) as exc_info: @@ -134,7 +134,7 @@ def test_sibling_prefix_in_docker_mode_raises_403(self, tmp_path, monkeypatch): # but settings.FLOWFILE_MODE is bound at import time). from flowfile_core.configs import settings from shared.storage_config import storage - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException monkeypatch.setattr(settings, "FLOWFILE_MODE", "docker") # Construct a sibling of the user_data_directory. @@ -165,7 +165,7 @@ class TestValidatePathElectronMode: def test_dotdot_in_electron_mode_raises_403(self, monkeypatch): # Non-regression: electron mode blocks '..' traversal too (added in the same fix). from flowfile_core.configs import settings - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException monkeypatch.setattr(settings, "FLOWFILE_MODE", "electron") with pytest.raises(HTTPException) as exc_info: diff --git a/flowfile_core/tests/sharing/test_project_tenant_isolation.py b/flowfile_core/tests/sharing/test_project_tenant_isolation.py index 453045e9a..67d0dd846 100644 --- a/flowfile_core/tests/sharing/test_project_tenant_isolation.py +++ b/flowfile_core/tests/sharing/test_project_tenant_isolation.py @@ -1084,7 +1084,7 @@ def test_dotdot_in_docker_mode_raises_403(self, tmp_path, monkeypatch): # Finding I1: docker/package branch must reject .. sequences. monkeypatch.setenv("FLOWFILE_MODE", "docker") from flowfile_core.fileExplorer.funcs import validate_path_under_cwd - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException with pytest.raises(HTTPException) as exc_info: validate_path_under_cwd("some/../../etc/passwd") @@ -1094,7 +1094,7 @@ def test_dotdot_in_package_mode_raises_403(self, tmp_path, monkeypatch): # Finding I1: same check applies in package mode. monkeypatch.setenv("FLOWFILE_MODE", "package") from flowfile_core.fileExplorer.funcs import validate_path_under_cwd - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException with pytest.raises(HTTPException) as exc_info: validate_path_under_cwd("../secret") diff --git a/flowfile_core/tests/test_catalog_cloud_virtual.py b/flowfile_core/tests/test_catalog_cloud_virtual.py index 32a03855c..bb7905760 100644 --- a/flowfile_core/tests/test_catalog_cloud_virtual.py +++ b/flowfile_core/tests/test_catalog_cloud_virtual.py @@ -161,7 +161,7 @@ def test_resolve_virtual_table_skips_cloud_blob(self): mock_db = MagicMock() mock_ctx.return_value.__enter__ = MagicMock(return_value=mock_db) mock_ctx.return_value.__exit__ = MagicMock(return_value=False) - with patch("flowfile_core.flowfile.flow_graph.CatalogService") as MockSvc: + with patch("flowfile_core.catalog.CatalogService") as MockSvc: inst = MagicMock() inst.resolve_virtual_flow_table.return_value = expected MockSvc.return_value = inst diff --git a/flowfile_core/tests/test_endpoints.py b/flowfile_core/tests/test_endpoints.py index d9a9f7e2c..d860079ea 100644 --- a/flowfile_core/tests/test_endpoints.py +++ b/flowfile_core/tests/test_endpoints.py @@ -2337,7 +2337,7 @@ def test_validate_path_under_cwd_electron_allows_any_local_path(monkeypatch): def test_validate_path_under_cwd_electron_blocks_dot_dot(monkeypatch): """In electron mode, .. traversal must still be rejected with a 403 (the blocklist is kept on top of the normpath+startswith barrier).""" - from fastapi import HTTPException + from flowfile_core.exceptions import FlowfileHTTPException as HTTPException from flowfile_core.configs import settings from flowfile_core.fileExplorer.funcs import validate_path_under_cwd diff --git a/flowfile_core/tests/test_lazy_imports.py b/flowfile_core/tests/test_lazy_imports.py new file mode 100644 index 000000000..cae6aad3a --- /dev/null +++ b/flowfile_core/tests/test_lazy_imports.py @@ -0,0 +1,104 @@ +"""Import-weight contract for the programmatic API. + +`import flowfile_frame` / `import flowfile` build ETL graphs in-process and must +stay lightweight: they must NOT drag in the FastAPI server stack, Alembic, the +cloud/data heavyweights (boto3, deltalake, gcsfs, pyarrow), worker/HTTP clients +(requests, httpx, websockets), Docker/Kafka/Excel/YAML/crypto libraries, or +faker, and importing them must NOT create or migrate the catalog DB on disk +(that is deferred to first actual DB access / server startup — see +database/connection.ensure_db_initialized). + +Each check runs in a fresh subprocess because sys.modules is process-global: by +the time the pytest session reaches this file, other test modules have already +imported FastAPI, so an in-process assertion would be meaningless. +""" + +from __future__ import annotations + +import subprocess +import sys +import tempfile +from pathlib import Path + +# Server/heavy deps that must stay off the dataframe-API import path. +BANNED = [ + # server stack + "fastapi", + "uvicorn", + "alembic", + # cloud/data heavyweights + "boto3", + "botocore", + "deltalake", + "gcsfs", + "pyarrow", + # worker/HTTP clients (only needed when talking to worker/kernel/web) + "requests", + "httpx", + "websockets", + "docker", + # format/source specifics (load on first use of the matching node) + "openpyxl", + "fastexcel", + "confluent_kafka", + "yaml", + # secrets crypto (loads on first encrypt/decrypt) + "cryptography", + "passlib", + # sample-data generator + "faker", +] + + +def _run(script: str) -> str: + """Run a snippet in a fresh interpreter with a throwaway catalog DB path. + + PYTHONPATH is seeded from the parent's ``sys.path`` so the subprocess imports + the exact same packages this test session does (the package-under-test), not + whatever a bare interpreter's site config happens to resolve. + """ + import os + + with tempfile.TemporaryDirectory() as tmp: + env = { + **os.environ, + "FLOWFILE_DB_PATH": str(Path(tmp) / "catalog.db"), + "PYTHONPATH": os.pathsep.join(p for p in sys.path if p), + } + proc = subprocess.run( + [sys.executable, "-c", script], + capture_output=True, + text=True, + env=env, + ) + assert proc.returncode == 0, f"subprocess failed:\nstdout={proc.stdout}\nstderr={proc.stderr}" + return proc.stdout + + +def test_import_flowfile_frame_is_lightweight() -> None: + out = _run( + "import os, sys\n" + f"banned = {BANNED!r}\n" + "import flowfile_frame\n" + "leaked = [m for m in banned if m in sys.modules]\n" + "assert not leaked, f'import flowfile_frame leaked heavy modules: {leaked}'\n" + "assert not os.path.exists(os.environ['FLOWFILE_DB_PATH']), 'import created the catalog DB on disk'\n" + "print('ok')\n" + ) + assert "ok" in out + + +def test_import_flowfile_is_lightweight() -> None: + out = _run( + "import os, sys\n" + f"banned = {BANNED!r}\n" + "import flowfile\n" + "leaked = [m for m in banned if m in sys.modules]\n" + "assert not leaked, f'import flowfile leaked heavy modules: {leaked}'\n" + "assert not os.path.exists(os.environ['FLOWFILE_DB_PATH']), 'import created the catalog DB on disk'\n" + # The web-UI entry points stay resolvable (lazily) without being loaded at import. + "assert callable(flowfile.start_web_ui)\n" + "assert callable(flowfile.open_graph_in_editor)\n" + "print('ok')\n" + ) + assert "ok" in out diff --git a/flowfile_frame/flowfile_frame/catalog_reference.py b/flowfile_frame/flowfile_frame/catalog_reference.py index 3297f8299..3e8e3d110 100644 --- a/flowfile_frame/flowfile_frame/catalog_reference.py +++ b/flowfile_frame/flowfile_frame/catalog_reference.py @@ -25,7 +25,6 @@ from typing import TYPE_CHECKING, Literal, TypeAlias from flowfile_core.catalog import ( - CatalogService, NamespaceExistsError, SQLAlchemyCatalogRepository, ) @@ -34,6 +33,7 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session + from flowfile_core.catalog import CatalogService from flowfile_core.database.models import CatalogNamespace from flowfile_core.flowfile.flow_graph import FlowGraph from flowfile_core.schemas.catalog_schema import CatalogTableOut @@ -48,6 +48,10 @@ def _get_current_user_id() -> int: def _get_service(db: Session) -> CatalogService: + # Deferred: CatalogService pulls the catalog schema/serializer stack, which + # is too heavy for the `import flowfile_frame` path. + from flowfile_core.catalog import CatalogService + return CatalogService(SQLAlchemyCatalogRepository(db)) diff --git a/flowfile_frame/flowfile_frame/cloud_storage/secret_manager.py b/flowfile_frame/flowfile_frame/cloud_storage/secret_manager.py index 0773174fb..1b6455707 100644 --- a/flowfile_frame/flowfile_frame/cloud_storage/secret_manager.py +++ b/flowfile_frame/flowfile_frame/cloud_storage/secret_manager.py @@ -1,4 +1,3 @@ -from flowfile_core.auth.jwt import create_access_token, get_current_user_sync from flowfile_core.database.connection import get_db_context from flowfile_core.flowfile.database_connection_manager.db_connections import ( delete_cloud_connection, @@ -9,6 +8,10 @@ def get_current_user_id() -> int | None: + # Imported lazily: flowfile_core.auth.jwt is a FastAPI router module, so a + # top-level import would drag FastAPI into `import flowfile_frame`. + from flowfile_core.auth.jwt import create_access_token, get_current_user_sync + access_token = create_access_token(data={"sub": "local_user"}) with get_db_context() as db: current_user_id = get_current_user_sync(access_token, db).id @@ -25,6 +28,8 @@ def create_cloud_storage_connection(connection: FullCloudStorageConnection) -> N Returns: None """ + from flowfile_core.auth.jwt import create_access_token, get_current_user_sync + access_token = create_access_token(data={"sub": "local_user"}) with get_db_context() as db: diff --git a/shared/cloud_storage/directory.py b/shared/cloud_storage/directory.py index c4907708a..752990f40 100644 --- a/shared/cloud_storage/directory.py +++ b/shared/cloud_storage/directory.py @@ -8,9 +8,6 @@ from typing import Any -import boto3 -from botocore.exceptions import ClientError - def get_first_file_from_cloud_dir(source: str, storage_options: dict[str, Any] | None = None) -> str: """Get the first file matching the extension from a cloud storage directory. @@ -145,6 +142,8 @@ def _remove_wildcards_from_prefix(prefix: str) -> str: def _create_s3_client(storage_options: dict[str, Any] | None): """Create boto3 S3 client with optional credentials.""" + import boto3 + if storage_options is None: return boto3.client("s3") @@ -158,6 +157,8 @@ def _create_s3_client(storage_options: dict[str, Any] | None): def _get_first_file(s3_client, bucket_name: str, base_prefix: str, file_extension: str) -> dict[Any, Any]: """List objects and return the first file matching the extension.""" + from botocore.exceptions import ClientError + try: paginator = s3_client.get_paginator("list_objects_v2") pages = paginator.paginate(Bucket=bucket_name, Prefix=base_prefix) diff --git a/shared/cloud_storage/gcs.py b/shared/cloud_storage/gcs.py index 58f7d4c15..e318979bc 100644 --- a/shared/cloud_storage/gcs.py +++ b/shared/cloud_storage/gcs.py @@ -12,9 +12,7 @@ from typing import Any, Literal from urllib.parse import urlparse -import gcsfs import polars as pl -from deltalake import DeltaTable, write_deltalake def use_pyarrow_for_gcs(storage_type: str, endpoint_url: str | None) -> bool: @@ -80,6 +78,7 @@ def get_lazy_frame_from_gcs_pyarrow_dataset( is_directory If True, glob/wildcard patterns are stripped to get the base directory path. """ + import gcsfs import pyarrow.dataset as ds clean_path = ( @@ -117,6 +116,7 @@ def sink_to_gcs( **kwargs Additional arguments passed to the underlying writer. """ + import gcsfs import pyarrow.parquet as pq fs = gcsfs.GCSFileSystem(**storage_options) @@ -161,6 +161,9 @@ def write_delta_to_gcs( partition_by Delta partition columns (applied at table creation). """ + import gcsfs + from deltalake import write_deltalake + clean_path = get_path_without_scheme(path) fs = gcsfs.GCSFileSystem(**storage_options) @@ -205,6 +208,9 @@ def scan_delta_from_gcs( pl.LazyFrame A lazy frame backed by the Delta table's PyArrow dataset. """ + import gcsfs + from deltalake import DeltaTable + fs = gcsfs.GCSFileSystem(**storage_options) clean_path = get_path_without_scheme(resource_path) diff --git a/shared/cloud_storage/utils.py b/shared/cloud_storage/utils.py index 1e6f058df..ddd9b3a3b 100644 --- a/shared/cloud_storage/utils.py +++ b/shared/cloud_storage/utils.py @@ -7,8 +7,6 @@ from typing import Any, Literal -import boto3 - def normalize_delta_path(resource_path: str) -> str: """Normalize az:// paths to abfss:// for delta-rs compatibility. @@ -42,6 +40,8 @@ def create_storage_options_from_boto_credentials( dict[str, Any] A storage options dictionary for Polars with explicit credentials. """ + import boto3 + session = boto3.Session(profile_name=profile_name, region_name=region_name) credentials = session.get_credentials() frozen_creds = credentials.get_frozen_credentials() diff --git a/shared/kafka/__init__.py b/shared/kafka/__init__.py index d69ec0f99..8c2cff588 100644 --- a/shared/kafka/__init__.py +++ b/shared/kafka/__init__.py @@ -1,6 +1,5 @@ """Shared Kafka consumer logic used by both flowfile_core and flowfile_worker.""" -from shared.kafka.consumer import commit_offsets, infer_topic_schema, make_kafka_commit_callback, read_kafka_source from shared.kafka.deserializers import DESERIALIZERS, JsonDeserializer, KafkaDeserializer from shared.kafka.models import DeferredKafkaCommit, KafkaReadSettings @@ -15,3 +14,16 @@ "make_kafka_commit_callback", "read_kafka_source", ] + +_CONSUMER_EXPORTS = ("commit_offsets", "infer_topic_schema", "make_kafka_commit_callback", "read_kafka_source") + + +def __getattr__(name: str): + # consumer.py imports confluent_kafka + pyarrow.ipc at module level; loading + # it eagerly would tax every `import shared.kafka.models` (e.g. the + # `import flowfile_frame` path). Resolve the consumer surface on first use. + if name in _CONSUMER_EXPORTS: + from shared.kafka import consumer + + return getattr(consumer, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}")