Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 19 additions & 24 deletions workers/api-deployment/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ def _unified_api_execution(
Returns:
Execution result dictionary
"""
api_client = None
try:
# Set up execution context using shared utilities
organization_id = schema_name
config, api_client = WorkerExecutionContext.setup_execution_context(
_, api_client = WorkerExecutionContext.setup_execution_context(
organization_id, execution_id, workflow_id
)

Expand Down Expand Up @@ -233,22 +234,13 @@ def _unified_api_execution(
f"files_processed={len(converted_files)}",
)

# CRITICAL: Clean up StateStore to prevent data leaks between tasks
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
logger.debug("🧹 Cleaned up StateStore context to prevent data leaks")
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")

return result

except Exception as e:
logger.error(f"API execution failed: {e}")

# Handle execution error with standardized pattern
if "api_client" in locals():
if api_client is not None:
WorkerExecutionContext.handle_execution_error(
api_client, execution_id, e, logger, f"api_execution_{task_type}"
)
Expand All @@ -261,26 +253,29 @@ def _unified_api_execution(
f"error={str(e)}",
)

# CRITICAL: Clean up StateStore to prevent data leaks between tasks (error path)
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
logger.debug(
"🧹 Cleaned up StateStore context to prevent data leaks (error path)"
)
except Exception as cleanup_error:
logger.warning(
f"Failed to cleanup StateStore context on error: {cleanup_error}"
)

return {
"execution_id": execution_id,
"status": "ERROR",
"error": str(e),
"files_processed": 0,
}

finally:
# Clean up API client session to prevent socket FD leaks
if api_client is not None:
try:
api_client.close()
except Exception as e:
logger.debug("api_client.close() failed during cleanup: %s", e)

# Clean up StateStore to prevent data leaks between tasks
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")


@app.task(
bind=True,
Expand Down
Loading