Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions api/controllers/console/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,21 @@ def get(self, provider: str):
account=account,
open_id=user_info.id,
token_response=token_response,
is_new_user=oauth_new_user,
)

if oauth_new_user:
from tasks.import_acedatacloud_workflow_templates_task import (
import_acedatacloud_workflow_templates_task,
)

tenants = TenantService.get_join_tenants(account)
tenant_id = account.current_tenant_id or (tenants[0].id if tenants else None)
if tenant_id:
import_acedatacloud_workflow_templates_task.delay(
tenant_id=str(tenant_id),
account_id=str(account.id),
)

token_pair = AccountService.login(
account=account,
ip_address=extract_remote_ip(request),
Expand Down Expand Up @@ -294,7 +306,7 @@ def get(self):


def _persist_acedatacloud_token(
*, account: Account, open_id: str, token_response: dict, is_new_user: bool = False
*, account: Account, open_id: str, token_response: dict
) -> None:
"""
Persist AceDataCloud access/refresh tokens for later use.
Expand Down Expand Up @@ -349,16 +361,6 @@ def _persist_acedatacloud_token(
acedatacloud_user_id=str(open_id),
acedatacloud_access_token=str(access_token),
)

from tasks.import_acedatacloud_workflow_templates_task import (
import_acedatacloud_workflow_templates_task,
)

import_acedatacloud_workflow_templates_task.delay(
tenant_id=str(tenant_id),
account_id=str(account.id),
is_new_user=is_new_user,
)
except Exception:
logger.exception("Failed to persist AceDataCloud token for account %s", account.id)

Expand Down
208 changes: 80 additions & 128 deletions api/tasks/import_acedatacloud_workflow_templates_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def _get_workflow_files() -> list[Path]:
return sorted(WORKFLOWS_DIR.glob("*.yml"))


def _parse_workflow_yaml(wf_file: Path) -> dict:
"""Parse a workflow YAML file and return the parsed dict, or empty dict on failure."""
try:
parsed = yaml.safe_load(wf_file.read_text(encoding="utf-8"))
return parsed if isinstance(parsed, dict) else {}
except Exception:
return {}


def _already_imported(*, tenant_id: str, template_name: str) -> bool:
key = f"{REDIS_KEY_PREFIX}{tenant_id}:{template_name}"
return bool(redis_client.exists(key))
Expand All @@ -38,19 +47,6 @@ def _mark_imported(*, tenant_id: str, template_name: str, app_id: str) -> None:
redis_client.setex(key, REDIS_EXPIRY, app_id)


def _check_imported_by_name(*, session: Session, tenant_id: str, app_name: str) -> bool:
"""Fallback check: see if an app with this exact name already exists in the tenant."""
stmt = (
select(App.id)
.where(
App.tenant_id == tenant_id,
App.name == app_name,
)
.limit(1)
)
return session.execute(stmt).first() is not None


def _import_single_workflow(
*,
session: Session,
Expand All @@ -73,15 +69,17 @@ def _import_single_workflow(
if not yaml_content.strip():
return None

try:
parsed = yaml.safe_load(yaml_content)
app_name = parsed.get("app", {}).get("name", "") if isinstance(parsed, dict) else ""
except Exception:
app_name = ""
parsed = _parse_workflow_yaml(wf_file)
app_name = parsed.get("app", {}).get("name", "")

if app_name and _check_imported_by_name(session=session, tenant_id=tenant_id, app_name=app_name):
_mark_imported(tenant_id=tenant_id, template_name=template_name, app_id="exists")
return None
# Check if an app with the same name already exists in this tenant
if app_name:
existing = session.execute(
select(App.id).where(App.tenant_id == tenant_id, App.name == app_name).limit(1)
).scalar_one_or_none()
if existing:
_mark_imported(tenant_id=tenant_id, template_name=template_name, app_id=str(existing))
return None

account.current_tenant_id = tenant_id
dsl_service = AppDslService(session)
Expand All @@ -95,7 +93,7 @@ def _import_single_workflow(
if result.status in (ImportStatus.COMPLETED, ImportStatus.COMPLETED_WITH_WARNINGS):
session.commit()
_mark_imported(tenant_id=tenant_id, template_name=template_name, app_id=str(result.app_id or ""))
logger.info("AceDataCloud workflows: imported %s app_id=%s tenant=%s", template_name, result.app_id, tenant_id)
logger.info("AceDataCloud: imported %s app_id=%s tenant=%s", template_name, result.app_id, tenant_id)
return str(result.app_id) if result.app_id else None

if result.status == ImportStatus.PENDING:
Expand All @@ -105,97 +103,71 @@ def _import_single_workflow(
_mark_imported(
tenant_id=tenant_id, template_name=template_name, app_id=str(confirm_result.app_id or "")
)
logger.info(
"AceDataCloud workflows: imported (confirmed) %s app_id=%s tenant=%s",
template_name,
confirm_result.app_id,
tenant_id,
)
logger.info("AceDataCloud: imported (confirmed) %s app_id=%s", template_name, confirm_result.app_id)
return str(confirm_result.app_id) if confirm_result.app_id else None
session.rollback()
logger.warning("AceDataCloud workflows: confirm failed %s status=%s", template_name, confirm_result.status)
logger.warning("AceDataCloud: confirm failed %s status=%s", template_name, confirm_result.status)
return None

session.rollback()
logger.warning("AceDataCloud workflows: import failed %s status=%s error=%s", template_name, result.status, result.error)
logger.warning("AceDataCloud: import failed %s status=%s error=%s", template_name, result.status, result.error)
return None


def _ensure_explore_apps(*, session: Session, account: Account, tenant_id: str, workflow_files: list[Path]) -> None:
"""Import workflows into the given tenant and register them in Explore.
def _register_explore_apps(*, session: Session, tenant_id: str, workflow_files: list[Path]) -> None:
"""Register already-imported workflows in Explore. Does NOT import anything.

Idempotent: uses a Redis key to avoid re-running on every login.
Idempotent: uses a Redis key so this only runs once.
"""
if redis_client.exists(REDIS_EXPLORE_KEY):
return

logger.info("AceDataCloud workflows: setting up Explore apps in tenant=%s", tenant_id)
logger.info("AceDataCloud: registering Explore apps from tenant=%s", tenant_id)

for position, wf_file in enumerate(workflow_files):
template_name = wf_file.stem

# Try to import; returns app_id if newly created, None if already exists
app_id = _import_single_workflow(
session=session,
account=account,
tenant_id=tenant_id,
wf_file=wf_file,
)
parsed = _parse_workflow_yaml(wf_file)
app_name = parsed.get("app", {}).get("name", "")
if not app_name:
continue

# If we didn't get an app_id from import, look up the existing one
# Find the app that was already imported into this tenant
app_id = session.execute(
select(App.id).where(App.tenant_id == tenant_id, App.name == app_name).limit(1)
).scalar_one_or_none()
if not app_id:
try:
parsed = yaml.safe_load(wf_file.read_text(encoding="utf-8"))
app_name = parsed.get("app", {}).get("name", "") if isinstance(parsed, dict) else ""
except Exception:
app_name = ""

if app_name:
existing = session.execute(
select(App.id).where(App.tenant_id == tenant_id, App.name == app_name).limit(1)
).scalar_one_or_none()
if existing:
app_id = str(existing)
logger.warning("AceDataCloud: app not found for Explore: %s", wf_file.stem)
continue

if not app_id:
logger.warning("AceDataCloud workflows: could not get app_id for %s, skipping Explore", template_name)
app_id_str = str(app_id)

# Skip if already registered
if session.execute(
select(RecommendedApp.id).where(RecommendedApp.app_id == app_id_str).limit(1)
).first():
continue

# Check if already in Explore
already_recommended = session.execute(
select(RecommendedApp.id).where(RecommendedApp.app_id == app_id).limit(1)
).first()
if already_recommended:
app = session.get(App, app_id_str)
if not app:
continue

# Mark app as public and add RecommendedApp
app = session.get(App, app_id)
if app:
app.is_public = True
try:
parsed = yaml.safe_load(wf_file.read_text(encoding="utf-8"))
description = parsed.get("app", {}).get("description", "") if isinstance(parsed, dict) else ""
except Exception:
description = ""

recommended = RecommendedApp(
app_id=app_id,
description={"text": description},
copyright="AceDataCloud",
privacy_policy="https://acedata.cloud/privacy",
custom_disclaimer="",
language=EXPLORE_LANGUAGE,
category=EXPLORE_CATEGORY,
position=position,
is_listed=True,
)
session.add(recommended)
session.commit()
logger.info("AceDataCloud workflows: added %s to Explore (app_id=%s)", template_name, app_id)
app.is_public = True
recommended = RecommendedApp(
app_id=app_id_str,
description={"text": parsed.get("app", {}).get("description", "")},
copyright="AceDataCloud",
privacy_policy="https://acedata.cloud/privacy",
custom_disclaimer="",
language=EXPLORE_LANGUAGE,
category=EXPLORE_CATEGORY,
position=position,
is_listed=True,
)
session.add(recommended)
session.commit()
logger.info("AceDataCloud: added %s to Explore (app_id=%s)", wf_file.stem, app_id_str)

# Mark setup as done so we don't repeat on next login
redis_client.setex(REDIS_EXPLORE_KEY, REDIS_EXPIRY, "1")
logger.info("AceDataCloud workflows: Explore setup complete")
logger.info("AceDataCloud: Explore setup complete")


@shared_task(
Expand All @@ -211,50 +183,30 @@ def import_acedatacloud_workflow_templates_task(
*,
tenant_id: str,
account_id: str,
is_new_user: bool = False,
) -> None:
"""Import workflow templates for a new user and register them in Explore.

Only called once per new user from the OAuth callback.
"""
workflow_files = _get_workflow_files()
if not workflow_files:
logger.info("AceDataCloud workflows: no files found in %s", WORKFLOWS_DIR)
logger.info("AceDataCloud: no workflow files found in %s", WORKFLOWS_DIR)
return

logger.info(
"AceDataCloud workflows: found %d files. tenant=%s is_new_user=%s",
len(workflow_files),
tenant_id,
is_new_user,
)
logger.info("AceDataCloud: importing %d workflows for tenant=%s", len(workflow_files), tenant_id)

try:
with Session(db.engine) as session:
account: Account | None = session.get(Account, account_id)
if not account:
logger.error("AceDataCloud workflows: account not found. account_id=%s", account_id)
return

# 1) Always ensure Explore apps exist (idempotent, uses Redis guard)
_ensure_explore_apps(
session=session,
account=account,
tenant_id=tenant_id,
workflow_files=workflow_files,
)
with Session(db.engine) as session:
account: Account | None = session.get(Account, account_id)
if not account:
logger.error("AceDataCloud: account not found: %s", account_id)
return

# 2) For new users only: import workflows into their personal workspace
if is_new_user:
imported = 0
for wf_file in workflow_files:
app_id = _import_single_workflow(
session=session,
account=account,
tenant_id=tenant_id,
wf_file=wf_file,
)
if app_id:
imported += 1
logger.info("AceDataCloud workflows: new user import done. tenant=%s imported=%d", tenant_id, imported)
else:
logger.info("AceDataCloud workflows: existing user, skip workspace import. tenant=%s", tenant_id)
# 1) Import all workflows into user's workspace
imported = 0
for wf_file in workflow_files:
if _import_single_workflow(session=session, account=account, tenant_id=tenant_id, wf_file=wf_file):
imported += 1
logger.info("AceDataCloud: imported %d workflows for tenant=%s", imported, tenant_id)

except Exception:
logger.exception("AceDataCloud workflows: task failed. tenant=%s", tenant_id)
# 2) Register in Explore if not done yet (idempotent, only runs once)
_register_explore_apps(session=session, tenant_id=tenant_id, workflow_files=workflow_files)
Loading