Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,27 @@ services:
- ./workflow_data:/data
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config

# Celery worker for structure extraction.
# Consumes only the "structure_extraction" queue (matching QueueName.STRUCTURE
# in the workers enums); autoscale bounds come from WORKER_STRUCTURE_AUTOSCALE
# ("max,min") defined in docker/sample.env.
worker-structure:
  image: unstract/backend:${VERSION}
  container_name: unstract-worker-structure
  restart: unless-stopped
  # Run celery from the project venv; the task module lives in the backend image.
  entrypoint: .venv/bin/celery
  command: "-A backend.workers.structure worker --loglevel=info -Q structure_extraction --autoscale=${WORKER_STRUCTURE_AUTOSCALE}"
  env_file:
    - ../backend/.env
  # NOTE(review): plain depends_on only orders startup — it does not wait for
  # rabbitmq/db to be healthy; confirm this matches the other worker services.
  depends_on:
    - rabbitmq
    - db
  environment:
    - ENVIRONMENT=development
    - APPLICATION_NAME=unstract-worker-structure
  labels:
    # Not an HTTP-facing service, so keep it out of the Traefik router.
    - traefik.enable=false
  volumes:
    # Shared workflow data and tool registry config, mounted like the sibling workers.
    - ./workflow_data:/data
    - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config

# Celery Flower
celery-flower:
image: unstract/backend:${VERSION}
Expand Down
5 changes: 5 additions & 0 deletions docker/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ WORKER_FILE_PROCESSING_NEW_AUTOSCALE=8,2 # File processing unified worker aut
WORKER_NOTIFICATION_AUTOSCALE=4,1 # Notification worker autoscale
WORKER_LOG_CONSUMER_AUTOSCALE=2,1 # Log consumer worker autoscale
WORKER_SCHEDULER_AUTOSCALE=2,1 # Scheduler worker autoscale
WORKER_STRUCTURE_AUTOSCALE=4,1 # Structure tool worker autoscale

# Worker-specific configurations
API_DEPLOYMENT_WORKER_NAME=api-deployment-worker
Expand All @@ -38,6 +39,10 @@ GENERAL_WORKER_NAME=general-worker
GENERAL_HEALTH_PORT=8081
GENERAL_MAX_CONCURRENT_TASKS=10

STRUCTURE_WORKER_NAME=structure-worker
STRUCTURE_HEALTH_PORT=8088
STRUCTURE_MAX_CONCURRENT_TASKS=4

# =============================================================================
# HIERARCHICAL CELERY CONFIGURATION SYSTEM
# =============================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ToolExecution:
# Offset for step adjustment: Converts zero-based indexing to one-based
# for readability
STEP_ADJUSTMENT_OFFSET: int = 1
STRUCTURE_TOOL_IMAGE_IDENTIFIER = "tool-structure"


class ToolRuntimeVariable:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,39 @@ def run_tool(
file_execution_id: str,
tool_sandbox: ToolSandbox,
) -> RunnerContainerRunResponse | None:
return self.run_tool_with_retry(file_execution_id, tool_sandbox)
# Check if this is a structure tool and feature flag is enabled
is_structure_tool = (
ToolExecution.STRUCTURE_TOOL_IMAGE_IDENTIFIER in tool_sandbox.image_name
)
use_celery = False

if is_structure_tool:
# Check feature flag for structure tool execution
use_celery = self._should_use_celery_for_structure()

if use_celery:
# Celery execution path - bypass Runner entirely
logger.info(
f"Routing structure tool to Celery worker for file_execution_id={file_execution_id}"
)
return self._dispatch_celery_task(
task_name="structure",
file_execution_id=file_execution_id,
tool_sandbox=tool_sandbox,
)
else:
# Docker execution path (original)
return self.run_tool_with_retry(file_execution_id, tool_sandbox)

def _should_use_celery_for_structure(self) -> bool:
    """Decide whether the structure tool should execute via Celery.

    Consults the ``use_structure_celery_task`` feature flag; when the flag
    is enabled, callers route execution to the Celery worker instead of
    the Docker runner.

    Returns:
        bool: True if the Celery path should be used, False for Docker.
    """
    # Imported lazily so the flags package is only required on this path.
    from unstract.flags.feature_flag import check_feature_flag_status

    flag_enabled = check_feature_flag_status("use_structure_celery_task")
    return flag_enabled

def run_tool_with_retry(
self,
Expand Down Expand Up @@ -207,6 +239,136 @@ def run_tool_with_retry(
raise e
return None

def _dispatch_celery_task(
    self,
    task_name: str,
    file_execution_id: str,
    tool_sandbox: ToolSandbox,
) -> RunnerContainerRunResponse:
    """Dispatch execution to a Celery task instead of the Docker runner.

    Resolves the short ``task_name`` to a registered Celery task, gathers
    execution context (metadata, source file path, output directory) via
    an ``ExecutionFileHandler``, sends the task to the appropriate queue,
    and blocks until the task completes or times out.

    Args:
        task_name: Name of the task (e.g., "structure")
        file_execution_id: File execution ID
        tool_sandbox: ToolSandbox instance containing settings and context

    Returns:
        RunnerContainerRunResponse with task result; on task failure the
        response carries ``status=ERROR`` and the stringified exception
        instead of raising.

    Raises:
        ValueError: If ``task_name`` has no registered Celery task mapping.
    """
    # Lazy imports: Celery and the DTO are only needed on this code path.
    from celery import current_app

    from unstract.tool_sandbox.dto import RunnerContainerRunStatus

    # Map task_name to full Celery task path
    task_map = {
        "structure": "structure.execute_extraction",
        # Future: Add other migrated tasks here
    }

    full_task_name = task_map.get(task_name)
    if not full_task_name:
        raise ValueError(f"Unknown Celery task: {task_name}")

    # Get execution context from file handler
    from unstract.workflow_execution.execution_file_handler import (
        ExecutionFileHandler,
    )

    file_handler = ExecutionFileHandler(
        workflow_id=self.workflow_id,
        execution_id=tool_sandbox.helper.execution_id,
        organization_id=self.organization_id,
        file_execution_id=file_execution_id,
    )

    # Get metadata to extract execution context
    try:
        metadata = file_handler.get_workflow_metadata()
    # NOTE(review): broad catch — any failure (not just "missing metadata")
    # silently falls back to the minimal defaults below; confirm intended.
    except Exception:
        # If metadata doesn't exist yet, create minimal metadata
        metadata = {
            "source_name": "INFILE",
            "source_hash": "",
            "tags": [],
            "llm_profile_id": None,
            "custom_data": {},
        }

    # Get source file path and output directory
    source_file_path = file_handler.infile
    # Fall back to the "INFILE" placeholder name when no source path exists.
    _, source_file_name = (
        os.path.split(source_file_path) if source_file_path else ("", "INFILE")
    )

    # Determine output directory based on tool instance
    tool_instance_id = tool_sandbox.get_tool_instance_id()
    if tool_instance_id:
        output_dir = f"{file_handler.file_execution_dir}/{tool_instance_id}"
    else:
        output_dir = file_handler.file_execution_dir

    # Build task parameters
    task_kwargs = {
        "settings": tool_sandbox.get_tool_instance_settings(),
        "file_execution_id": file_execution_id,
        "organization_id": self.organization_id,
        "workflow_id": self.workflow_id,
        "tool_instance_id": tool_instance_id or "",
        "platform_api_key": self.platform_service_api_key,
        "execution_id": tool_sandbox.helper.execution_id,
        "source_file_name": source_file_name,
        "input_file_path": source_file_path or "",
        "output_dir": output_dir or "",
        "exec_metadata": {
            "source_hash": metadata.get("source_hash", ""),
            "llm_profile_id": metadata.get("llm_profile_id"),
            "custom_data": metadata.get("custom_data", {}),
        },
        "tags": metadata.get("tags", []),
    }

    # Send task to Celery
    logger.info(
        f"Dispatching Celery task {full_task_name} for file_execution_id={file_execution_id}"
    )

    # Determine queue name based on task
    queue_name = "structure_extraction" if task_name == "structure" else "celery"

    task = current_app.send_task(
        full_task_name,
        kwargs=task_kwargs,
        queue=queue_name,
    )

    # Wait for result (blocking)
    # NOTE(review): task.get() blocks this caller for up to 2 hours; if this
    # method itself runs inside a Celery worker, waiting on a subtask can
    # deadlock the pool — confirm the calling context is not a Celery task.
    try:
        result = task.get(timeout=7200)  # 2 hour timeout

        # Convert Celery result to RunnerContainerRunResponse format
        logger.info(
            f"Celery task {full_task_name} completed successfully for "
            f"file_execution_id={file_execution_id}"
        )

        return RunnerContainerRunResponse(
            type="RESULT",
            status=RunnerContainerRunStatus.SUCCESS,
            result=result,
            error=None,
        )
    # Timeouts and remote task exceptions both land here; the error is
    # reported in the response rather than re-raised.
    except Exception as e:
        logger.error(
            f"Celery task {full_task_name} failed for "
            f"file_execution_id={file_execution_id}: {e}"
        )
        return RunnerContainerRunResponse(
            type="RESULT",
            status=RunnerContainerRunStatus.ERROR,
            result=None,
            error=str(e),
        )

def get_tool_environment_variables(self) -> dict[str, Any]:
"""Obtain a dictionary of env variables required by a tool.

Expand Down
5 changes: 5 additions & 0 deletions workers/shared/enums/worker_enums_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class WorkerType(str, Enum):
NOTIFICATION = "notification"
LOG_CONSUMER = "log_consumer"
SCHEDULER = "scheduler"
STRUCTURE = "structure"

@classmethod
def from_directory_name(cls, name: str) -> "WorkerType":
Expand Down Expand Up @@ -110,6 +111,7 @@ def to_health_port(self) -> int:
WorkerType.NOTIFICATION: 8085,
WorkerType.LOG_CONSUMER: 8086,
WorkerType.SCHEDULER: 8087,
WorkerType.STRUCTURE: 8088,
}
return port_mapping.get(self, 8080)

Expand Down Expand Up @@ -147,6 +149,9 @@ class QueueName(str, Enum):
# Scheduler queue
SCHEDULER = "scheduler"

# Structure extraction queue
STRUCTURE = "structure_extraction"

def to_env_var_name(self) -> str:
"""Convert queue name to environment variable name.

Expand Down
5 changes: 5 additions & 0 deletions workers/structure/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Structure extraction worker module."""

from worker import app
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use explicit relative import for worker module.

The import from worker import app uses an implicit import which may fail in Python 3 if worker is not in sys.path. Since worker.py is in the same package (workers/structure/), use an explicit relative import.

🐛 Proposed fix
-from worker import app
+from .worker import app
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from worker import app
from .worker import app
🤖 Prompt for AI Agents
In `@workers/structure/__init__.py` at line 3, Replace the implicit top-level
import "from worker import app" with an explicit relative import so the
package-local module is resolved correctly; change the import in
workers.structure.__init__.py to import the app symbol from the sibling module
named worker (i.e., use a relative import of worker and import app).


__all__ = ["app"]
108 changes: 108 additions & 0 deletions workers/structure/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
class SettingsKeys:
    """String constants for tool settings, prompt-studio payload keys, and
    response fields used by the structure extraction worker.

    Fix: the original defined ``OUTPUTS``, ``TOOL_ID``, and ``NAME`` twice
    (with identical values); the duplicate assignments and a commented-out
    constant have been removed. All names and values are unchanged.
    """

    TOOL_INSTANCE_ID = "tool_instance_id"
    PROMPT_REGISTRY_ID = "prompt_registry_id"
    PROMPT_HOST = "PROMPT_HOST"
    PROMPT_PORT = "PROMPT_PORT"
    TOOL_METADATA = "tool_metadata"
    TOOL_ID = "tool_id"
    OUTPUTS = "outputs"
    NAME = "name"
    ACTIVE = "active"
    PROMPT = "prompt"
    CHUNK_SIZE = "chunk-size"
    VECTOR_DB = "vector-db"
    EMBEDDING = "embedding"
    X2TEXT_ADAPTER = "x2text_adapter"
    CHUNK_OVERLAP = "chunk-overlap"
    LLM = "llm"
    RETRIEVAL_STRATEGY = "retrieval-strategy"
    SIMPLE = "simple"
    TYPE = "type"
    NUMBER = "number"
    EMAIL = "email"
    DATE = "date"
    BOOLEAN = "boolean"
    JSON = "json"
    PREAMBLE = "preamble"
    SIMILARITY_TOP_K = "similarity-top-k"
    PROMPT_TOKENS = "prompt_tokens"
    COMPLETION_TOKENS = "completion_tokens"
    TOTAL_TOKENS = "total_tokens"
    RESPONSE = "response"
    POSTAMBLE = "postamble"
    GRAMMAR = "grammar"
    WORD = "word"
    SYNONYMS = "synonyms"
    SECTION = "section"
    DEFAULT = "default"
    AUTHOR = "author"
    ICON = "icon"
    REINDEX = "reindex"
    STRUCTURE_OUTPUT = "structure_output"
    TOOL_SETTINGS = "tool_settings"
    ENABLE_SINGLE_PASS_EXTRACTION = "enable_single_pass_extraction"
    CHALLENGE_LLM = "challenge_llm"
    ENABLE_CHALLENGE = "enable_challenge"
    SINGLE_PASS_EXTRACTION_MODE = "single_pass_extraction_mode"
    CHALLENGE_LLM_ADAPTER_ID = "challenge_llm_adapter_id"
    SUMMARIZE_AS_SOURCE = "summarize_as_source"
    SUMMARIZE_PROMPT = "summarize_prompt"
    CONTEXT = "context"
    ERROR = "error"
    LLM_ADAPTER_INSTANCE_ID = "llm_adapter_instance_id"
    RUN_ID = "run_id"
    PROMPT_KEYS = "prompt_keys"
    DATA = "data"
    EXTRACT = "EXTRACT"
    SUMMARIZE = "SUMMARIZE"
    STATUS = "status"
    OK = "OK"
    FILE_NAME = "file_name"
    FILE_HASH = "file_hash"
    ENABLE_HIGHLIGHT = "enable_highlight"
    ENABLE_WORD_CONFIDENCE = "enable_word_confidence"
    INCLUDE_METADATA = "include_metadata"
    TABLE_SETTINGS = "table_settings"
    INPUT_FILE = "input_file"
    METADATA = "metadata"
    EPILOGUE = "epilogue"
    HIGHLIGHT_DATA = "highlight_data"
    CONFIDENCE_DATA = "confidence_data"
    FILE_PATH = "file_path"
    EXECUTION_SOURCE = "execution_source"
    TOOL = "tool"
    METRICS = "metrics"
    INDEXING = "indexing"
    EXECUTION_ID = "execution_id"
    IS_DIRECTORY_MODE = "is_directory_mode"
    LLM_PROFILE_ID = "llm_profile_id"
    CUSTOM_DATA = "custom_data"
    OUTPUT = "output"  # For API deployment response format compatibility


class IndexingConstants:
    """String constants for indexing request/response payload keys."""

    TOOL_ID = "tool_id"
    EMBEDDING_INSTANCE_ID = "embedding_instance_id"
    VECTOR_DB_INSTANCE_ID = "vector_db_instance_id"
    X2TEXT_INSTANCE_ID = "x2text_instance_id"
    FILE_PATH = "file_path"
    CHUNK_SIZE = "chunk_size"
    CHUNK_OVERLAP = "chunk_overlap"
    REINDEX = "reindex"
    FILE_HASH = "file_hash"
    OUTPUT_FILE_PATH = "output_file_path"
    ENABLE_HIGHLIGHT = "enable_highlight"
    USAGE_KWARGS = "usage_kwargs"
    PROCESS_TEXT = "process_text"
    EXTRACTED_TEXT = "extracted_text"
    TAGS = "tags"
    EXECUTION_SOURCE = "execution_source"
    DOC_ID = "doc_id"
    # Correctly spelled name; prefer this in new code.
    TOOL_EXECUTION_METADATA = "tool_execution_metadata"
    # Misspelled original ("METATADA") kept as an alias for backward
    # compatibility with existing callers.
    TOOL_EXECUTION_METATADA = "tool_execution_metadata"
    EXECUTION_DATA_DIR = "execution_data_dir"
    RUN_ID = "run_id"
    EXECUTION_ID = "execution_id"
Loading