Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions flowfile/flowfile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
Utf8,
)

from flowfile.api import open_graph_in_editor
from flowfile.web import start_server as start_web_ui
from flowfile_core.flowfile import node_designer
from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine
from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn
Expand Down Expand Up @@ -249,4 +247,21 @@
"Field",
"start_web_ui",
]


def __getattr__(name: str):
    """Resolve the web UI / editor entry points on first attribute access.

    Module-level ``__getattr__`` hook: it is consulted only when normal
    attribute lookup on this module fails. ``open_graph_in_editor`` and
    ``start_web_ui`` are imported here rather than at module import time
    because they pull in the FastAPI server stack (uvicorn, fastapi) and
    ``requests``; deferring them keeps a plain ``import flowfile`` for the
    dataframe API light.

    Args:
        name: The attribute being looked up on the module.

    Returns:
        The requested entry-point callable.

    Raises:
        AttributeError: If ``name`` is not one of the lazily provided names.
    """
    # Reject unknown names first so the lazy imports below only run for the
    # two supported entry points.
    if name not in ("open_graph_in_editor", "start_web_ui"):
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    if name == "start_web_ui":
        from flowfile.web import start_server

        return start_server

    from flowfile.api import open_graph_in_editor as editor_entry_point

    return editor_entry_point


logging.getLogger("PipelineHandler").setLevel(logging.WARNING)
11 changes: 8 additions & 3 deletions flowfile/flowfile/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ def run_flow(flow_path: str, param_overrides: list[str] | None = None, run_id: i

OFFLOAD_TO_WORKER.set(False)

# DB init is no longer an import side effect; get_run_user_id below reads the
# catalog DB via shared models (bypassing core's get_db guard), so init here.
from flowfile_core.database.connection import ensure_db_initialized

ensure_db_initialized()

from flowfile_core.flowfile.manage.io_flowfile import open_flow

path = Path(flow_path)
Expand Down Expand Up @@ -150,9 +156,8 @@ def _complete_run_if_needed(

def _run_project_command(action: str | None, arg: str | None) -> None:
"""Headless project init/open/save — the proof that build-from-scratch works without the UI."""
from fastapi import HTTPException

from flowfile_core.auth.utils import get_local_user_id
from flowfile_core.exceptions import FlowfileHTTPException
from flowfile_core.fileExplorer.funcs import validate_path_under_cwd
from flowfile_core.project import project_sync

Expand All @@ -162,7 +167,7 @@ def _validated_folder(usage: str) -> str:
sys.exit(1)
try:
return validate_path_under_cwd(arg)
except HTTPException as e:
except FlowfileHTTPException as e:
print(f"Invalid project path: {e.detail}", file=sys.stderr)
sys.exit(1)

Expand Down
3 changes: 0 additions & 3 deletions flowfile_core/flowfile_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@
from shared._version import get_version

validate_setup()
from flowfile_core.database.init_db import init_db
from flowfile_core.flowfile.handler import FlowfileHandler

if "FLOWFILE_MODE" not in os.environ:
os.environ["FLOWFILE_MODE"] = "electron"

init_db()


class ServerRun:
exit: bool = False
Expand Down
5 changes: 4 additions & 1 deletion flowfile_core/flowfile_core/ai/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from sqlalchemy.orm import Session

from flowfile_core.database.connection import SessionLocal
from flowfile_core.database.connection import SessionLocal, ensure_db_initialized
from flowfile_core.database.models import AiAuditEvent

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -101,6 +101,7 @@ def record_event(event: AuditEvent, db: Session | None = None) -> AiAuditEvent:
db.add(row)
db.flush()
else:
ensure_db_initialized()
with SessionLocal() as session:
session.add(row)
session.commit()
Expand Down Expand Up @@ -140,6 +141,7 @@ def _apply(session: Session) -> AiAuditEvent | None:

if db is not None:
return _apply(db)
ensure_db_initialized()
with SessionLocal() as session:
row = _apply(session)
if row is None:
Expand Down Expand Up @@ -184,6 +186,7 @@ def _query(session: Session) -> list[AiAuditEvent]:

if db is not None:
return _query(db)
ensure_db_initialized()
with SessionLocal() as session:
return _query(session)

Expand Down
5 changes: 2 additions & 3 deletions flowfile_core/flowfile_core/ai/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,7 @@ def apply_diff(flow, diff: GraphDiff) -> ApplyResult:
# AI modules (executor, audit, providers) deliberately keep out of
# their import-time graph; this ``diff`` module imports lightly so
# tests can stub ``flow_graph`` symbols where needed.
from fastapi import HTTPException

from flowfile_core.exceptions import FlowfileHTTPException
from flowfile_core.flowfile.flow_graph import add_connection, delete_connection

for add in diff.additions:
Expand Down Expand Up @@ -670,7 +669,7 @@ def apply_diff(flow, diff: GraphDiff) -> ApplyResult:
connection = input_schema.NodeConnection.model_validate(c.connection)
try:
delete_connection(flow, connection)
except HTTPException as exc:
except FlowfileHTTPException as exc:
# Narrow catch: only the "Connection does not exist" 422 gets
# swallowed. ``add_connection``'s cycle-detection 422 lives in
# the other loop and is unaffected; any other HTTPException
Expand Down
3 changes: 2 additions & 1 deletion flowfile_core/flowfile_core/ai/diff_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from flowfile_core import flow_file_handler
from flowfile_core.ai import audit, diff
from flowfile_core.auth.jwt import get_current_active_user
from flowfile_core.database.connection import SessionLocal, get_db
from flowfile_core.database.connection import SessionLocal, ensure_db_initialized, get_db

router = APIRouter()

Expand Down Expand Up @@ -165,6 +165,7 @@ def _flip_audit_actions(audit_ids: list[int], action: audit.DiffAction) -> list[
if not audit_ids:
return []
updated: list[int] = []
ensure_db_initialized()
with SessionLocal() as session:
for aid in audit_ids:
row = audit.update_diff_action(aid, action, db=session)
Expand Down
3 changes: 2 additions & 1 deletion flowfile_core/flowfile_core/ai/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from sqlalchemy.orm import Session

from flowfile_core.ai.audit import query_events
from flowfile_core.database.connection import SessionLocal
from flowfile_core.database.connection import SessionLocal, ensure_db_initialized
from flowfile_core.database.models import AiAuditEvent

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -84,6 +84,7 @@ def _aggregate(session: Session) -> dict[str, float | int]:

if db is not None:
return _aggregate(db)
ensure_db_initialized()
with SessionLocal() as session:
return _aggregate(session)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,11 @@ def _handle_delete_connection(
staged_node_payload={"delete_connection": connection.model_dump()},
)

from fastapi import HTTPException
from flowfile_core.exceptions import FlowfileHTTPException

try:
delete_connection(flow, connection)
except HTTPException as exc:
except FlowfileHTTPException as exc:
# LLM-redundant-op tolerance: swallow only the "Connection does not
# exist" 422 (an ``update_node_settings`` in the same diff already
# implicitly rewired, so this delete targets a wire that's already
Expand Down
27 changes: 19 additions & 8 deletions flowfile_core/flowfile_core/auth/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from cryptography.fernet import Fernet


def _fernet() -> "type[Fernet]":
    """Return the ``Fernet`` class, importing ``cryptography`` on first use.

    The import is deferred to call time because cryptography's Rust bindings
    are costly to import and this module sits on the ``import flowfile_frame``
    path (via secret_manager).

    Returns:
        The ``cryptography.fernet.Fernet`` class itself (not an instance), so
        callers can write ``_fernet()(key)`` or ``_fernet().generate_key()``.
    """
    from cryptography.fernet import Fernet as fernet_cls

    return fernet_cls

from cryptography.fernet import Fernet

logger = logging.getLogger(__name__)

Expand All @@ -33,7 +44,7 @@ def __init__(self):
self.key_path = self.storage_path / ".secret_key"
if not self.key_path.exists():
with open(self.key_path, "wb") as f:
f.write(Fernet.generate_key())
f.write(_fernet().generate_key())
try:
os.chmod(self.key_path, 0o600)
except Exception as e:
Expand All @@ -55,7 +66,7 @@ def _read_store(self, service_name):
with open(path, "rb") as f:
data = f.read()

return json.loads(Fernet(key).decrypt(data).decode())
return json.loads(_fernet()(key).decrypt(data).decode())
except Exception as e:
logger.debug(f"Error reading from encrypted store: {e}")
return {}
Expand All @@ -66,7 +77,7 @@ def _write_store(self, service_name, data):
with open(self.key_path, "rb") as f:
key = f.read()

encrypted = Fernet(key).encrypt(json.dumps(data).encode())
encrypted = _fernet()(key).encrypt(json.dumps(data).encode())
path = self._get_store_path(service_name)

with open(path, "wb") as f:
Expand Down Expand Up @@ -151,7 +162,7 @@ def get_docker_secret_key() -> str | None:
if env_key:
env_key = env_key.strip().strip('"').strip("'")
try:
Fernet(env_key.encode())
_fernet()(env_key.encode())
return env_key
except Exception:
raise RuntimeError("FLOWFILE_MASTER_KEY is not a valid Fernet key") from None
Expand All @@ -161,7 +172,7 @@ def get_docker_secret_key() -> str | None:
try:
with open(secret_path) as f:
key = f.read().strip()
Fernet(key.encode())
_fernet()(key.encode())
return key
except Exception:
raise RuntimeError("Failed to read master key from Docker secret") from None
Expand Down Expand Up @@ -191,7 +202,7 @@ def generate_master_key() -> str:
Returns:
str: A new valid Fernet encryption key.
"""
return Fernet.generate_key().decode()
return _fernet().generate_key().decode()


def get_master_key():
Expand All @@ -218,6 +229,6 @@ def get_master_key():

key = get_password("flowfile", "master_key")
if not key:
key = Fernet.generate_key().decode()
key = _fernet().generate_key().decode()
set_password("flowfile", "master_key", key)
return key
3 changes: 2 additions & 1 deletion flowfile_core/flowfile_core/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ def get_local_user_id() -> int:
"""Resolve the local_user's ID from the database for CLI execution."""
try:
from flowfile_core.database import models as db_models
from flowfile_core.database.connection import SessionLocal
from flowfile_core.database.connection import SessionLocal, ensure_db_initialized

ensure_db_initialized()
db = SessionLocal()
try:
local_user = db.query(db_models.User).filter(db_models.User.username == "local_user").first()
Expand Down
19 changes: 18 additions & 1 deletion flowfile_core/flowfile_core/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
* Domain exceptions (``CatalogError`` hierarchy)
"""

from typing import TYPE_CHECKING

from .exceptions import (
AmbiguousTableError,
CatalogError,
Expand Down Expand Up @@ -39,7 +41,22 @@
VisualizationNotFoundError,
)
from .repository import CatalogRepository, SQLAlchemyCatalogRepository
from .service import CatalogService

if TYPE_CHECKING:
from .service import CatalogService


def __getattr__(name: str):
    """Provide ``CatalogService`` lazily on first attribute access.

    Module-level ``__getattr__`` hook, consulted only when normal attribute
    lookup on this package fails. ``.service`` transitively builds the whole
    catalog schema/serializer stack (~150ms of pydantic model construction),
    and flow_graph — and through it ``import flowfile_frame`` — imports this
    package eagerly, so the service class is resolved only when requested.

    Args:
        name: The attribute being looked up on the package.

    Returns:
        The ``CatalogService`` class when ``name == "CatalogService"``.

    Raises:
        AttributeError: For any other attribute name.
    """
    if name != "CatalogService":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    from .service import CatalogService as service_cls

    return service_cls


__all__ = [
"CatalogService",
Expand Down
9 changes: 7 additions & 2 deletions flowfile_core/flowfile_core/catalog/delta_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import logging
import shutil
from pathlib import Path
from typing import TYPE_CHECKING

import pyarrow as pa
from deltalake import DeltaTable
if TYPE_CHECKING:
import pyarrow as pa

from shared.delta_models import SourceTableVersion
from shared.delta_utils import get_delta_size_bytes
Expand Down Expand Up @@ -40,6 +41,8 @@ def check_source_versions_current(source_table_versions_json: str | None) -> boo
logger.warning("Could not parse source_table_versions JSON, treating as stale")
return False

from deltalake import DeltaTable

for sv in versions:
try:
current_version = DeltaTable(sv.file_path, without_files=True).version()
Expand Down Expand Up @@ -85,6 +88,8 @@ def get_delta_table_size_bytes(path: str | Path) -> int:

def read_delta_preview(path: str, n_rows: int = 100) -> pa.Table:
"""Read the first N rows from a Delta table using PyArrow."""
from deltalake import DeltaTable

dt = DeltaTable(str(path))

dataset = dt.to_pyarrow_dataset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import polars as pl

from flowfile_core.catalog.delta_utils import is_delta_table
from flowfile_core.database.connection import SessionLocal
from flowfile_core.database.connection import SessionLocal, ensure_db_initialized
from flowfile_core.database.models import CatalogTable

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -88,6 +88,7 @@ def migrate_table(table: CatalogTable, *, dry_run: bool = False) -> bool:


def main(dry_run: bool = False) -> int:
ensure_db_initialized()
db = SessionLocal()
try:
tables = db.query(CatalogTable).filter(CatalogTable.storage_format == "parquet").all()
Expand Down
Loading
Loading