UN-3085 Structure tool made into a celery task #1779
Deepak-Kesavan wants to merge 1 commit into main from
Conversation
Summary by CodeRabbit
Release Notes
Walkthrough
This pull request introduces a new Celery-based structure extraction worker service to the system. The changes add a worker-structure service to Docker Compose, extend the workflow execution layer to route structure tools through Celery based on a feature flag, and implement a comprehensive structure extraction worker with task orchestration, configuration helpers, and health checks.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant WE as Workflow<br/>Execution
    participant Tools as ToolsUtils
    participant Celery as Celery<br/>Broker
    participant SW as Structure<br/>Worker
    participant Storage as File<br/>Storage
    participant PT as PromptTool<br/>Service

    WE->>Tools: run_tool(structure_tool)
    Tools->>Tools: _should_use_celery_for_structure()
    alt Feature Flag Enabled
        Tools->>Tools: _dispatch_celery_task()
        Tools->>Storage: Gather execution context<br/>& metadata
        Tools->>Celery: Send task with kwargs
        Note over Celery: structure_extraction queue
        Celery->>SW: Dispatch to worker
        SW->>Storage: Read settings & inputs
        SW->>PT: Create PromptTool client
        alt Agentic Tool
            SW->>PT: _run_agentic_extraction()
        else Prompt-Studio Tool
            SW->>Storage: dynamic_extraction()
            PT->>PT: Execute extraction
            SW->>Storage: dynamic_indexing()
        end
        SW->>Storage: Write structured_output
        SW->>Celery: Return result
        Celery->>Tools: Task result
        Tools->>WE: RunnerContainerRunResponse
    else Feature Flag Disabled
        Tools->>WE: run_tool_with_retry()<br/>(Docker path)
    end
```
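To make the flag-gated routing concrete, here is a minimal sketch of the dispatch shape described in the walkthrough and diagram. The task path "structure.execute_extraction" and queue "structure_extraction" come from this PR; the broker URLs, the boolean flag argument, and the run_tool_with_retry stub are assumptions for illustration, not the PR's actual code.

```python
# Minimal sketch of flag-gated dispatch; not the PR's exact implementation.
from celery import Celery

# Broker/backend URLs are placeholders for illustration.
celery_app = Celery(
    "unstract",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)


def run_structure_tool(tool_kwargs: dict, celery_enabled: bool) -> dict:
    """Route a structure tool run through Celery or the Docker path."""
    if celery_enabled:
        # Queue name and task path as introduced by this PR.
        async_result = celery_app.send_task(
            "structure.execute_extraction",
            kwargs=tool_kwargs,
            queue="structure_extraction",
        )
        # Wait synchronously so callers keep the same contract as the
        # Docker runner path.
        return async_result.get(timeout=600)
    return run_tool_with_retry(tool_kwargs)


def run_tool_with_retry(tool_kwargs: dict) -> dict:
    # Stand-in for the existing Docker runner path in the real codebase.
    raise NotImplementedError
```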
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Test Results
Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@workers/structure/__init__.py`:
- Line 3: Replace the implicit top-level import "from worker import app" with an
explicit relative import so the package-local module is resolved correctly;
change the import in workers.structure.__init__.py to import the app symbol from
the sibling module named worker (i.e., use a relative import of worker and
import app).
In `@workers/structure/tasks.py`:
- Around line 269-280: The code currently reads PROMPT_HOST and PROMPT_PORT into
local variables (prompt_host, prompt_port) then constructs PromptTool, but
STHelper.dynamic_extraction and STHelper.dynamic_indexing later call
tool.get_env_or_die which reads os.environ directly and will fail if the keys
are not present; before creating PromptTool (or before any STHelper calls) call
os.environ.setdefault(SettingsKeys.PROMPT_HOST, prompt_host) and
os.environ.setdefault(SettingsKeys.PROMPT_PORT, prompt_port) so the defaults
persist in the environment, ensuring tool.get_env_or_die (and
STHelper.dynamic_extraction/dynamic_indexing) can find them.
- Around line 296-326: Replace the broad except around get_prompt_studio_tool
with a specific catch for RequestException and only fall back to
get_agentic_studio_tool when the caught RequestException indicates a 404 from
the service; for any other RequestException (no response or non-404 status)
re-raise it so auth/network errors propagate. Concretely, change the try/except
that calls get_prompt_studio_tool (and references prompt_registry_id,
exported_tool, SettingsKeys.TOOL_METADATA, logger) to except RequestException as
e: check e.response exists and e.response.status_code == 404 to proceed to the
agentic lookup, otherwise raise e; keep the existing SdkError handling for
agentic lookup failures.
In `@workers/structure/utils.py`:
- Around line 1-3: The workers package is missing the json_repair dependency
used by repair_json_with_best_structure() in workers/structure/utils.py; add
"json_repair" to the dependencies list in workers/pyproject.toml (under
[tool.poetry.dependencies] or the equivalent dependencies section) so the import
from json_repair and usage in repair_json_with_best_structure() will not fail at
runtime.
In `@workers/structure/worker.py`:
- Around line 61-69: Replace the hardcoded "healthy" status in the Celery task
healthcheck with the real result from check_structure_health(): call
check_structure_health() inside the healthcheck(self) task, use its returned
status (or entire result) for the "status" field (and include any diagnostic
info it returns) while preserving existing fields like "worker_type", "task_id"
(self.request.id) and "worker_name" (config.worker_name fallback); update
references to the function name check_structure_health and the healthcheck task
to ensure the task reflects DEGRADED/unhealthy states from the API client.
🧹 Nitpick comments (7)
unstract/workflow-execution/src/unstract/workflow_execution/tools_utils.py (3)
263-266: Consider using constants for queue and task names.
The queue name "structure_extraction" and task path "structure.execute_extraction" are hardcoded. For consistency and maintainability, consider using the QueueName.STRUCTURE constant from workers/shared/enums/worker_enums_base.py.
♻️ Suggested refactor
```diff
+from workers.shared.enums.worker_enums_base import QueueName
+
 # Map task_name to full Celery task path
 task_map = {
-    "structure": "structure.execute_extraction",
+    "structure": "structure.tasks.execute_structure_extraction",
     # Future: Add other migrated tasks here
 }
 ...
 # Determine queue name based on task
-queue_name = "structure_extraction" if task_name == "structure" else "celery"
+queue_name = QueueName.STRUCTURE.value if task_name == "structure" else QueueName.GENERAL.value
```
Also applies to: 335-336
360-370: Use logger.exception to preserve stack trace on task failure.
When logging the Celery task failure, using logger.exception instead of logger.error will automatically include the full stack trace, which aids debugging.
♻️ Proposed fix
```diff
 except Exception as e:
-    logger.error(
+    logger.exception(
         f"Celery task {full_task_name} failed for "
         f"file_execution_id={file_execution_id}: {e}"
     )
```
285-295: Metadata fallback catches overly broad exception.
Catching a bare Exception when loading metadata may mask unexpected errors. Consider catching the specific FileMetadataJsonNotFound exception (referenced in the ExecutionFileHandler.get_workflow_metadata snippet) or at minimum log the exception for debugging.
♻️ Proposed improvement
```diff
+from unstract.workflow_execution.exceptions import FileMetadataJsonNotFound
+
 # Get metadata to extract execution context
 try:
     metadata = file_handler.get_workflow_metadata()
-except Exception:
+except FileMetadataJsonNotFound:
     # If metadata doesn't exist yet, create minimal metadata
+    logger.debug("Metadata file not found, using minimal metadata")
     metadata = {
         "source_name": "INFILE",
         "source_hash": "",
         "tags": [],
         "llm_profile_id": None,
         "custom_data": {},
     }
```
workers/structure/utils.py (1)
22-27: Naive plural handling may produce incorrect labels.
The logic that strips a trailing 's' to singularize labels works for simple cases like "items" → "item" but will produce incorrect results for words like "status" → "statu", "address" → "addres", or "analysis" → "analysi".
Given that the TODO comment acknowledges this limitation, consider whether this is acceptable for the expected data or whether a more robust approach (e.g., using a library like inflect, sketched below) would be worthwhile.
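For comparison, a minimal sketch of the inflect-based approach mentioned above; inflect is an assumption here, not a dependency of this PR.

```python
# Sketch: library-backed singularization instead of stripping a trailing "s".
# Requires `pip install inflect`; shown for illustration only.
import inflect

_engine = inflect.engine()


def singular_label(label: str) -> str:
    # singular_noun() returns the singular form, or False when the
    # word is not a recognized plural.
    singular = _engine.singular_noun(label)
    return singular if singular else label


for word in ("items", "statuses", "addresses", "analyses"):
    print(word, "->", singular_label(word))
# Expected: item, status, address, analysis
```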
workers/structure/constants.py (2)
8-41: Remove duplicate SettingsKeys entries.
OUTPUTS, TOOL_ID, and NAME are defined twice; the later assignments override earlier ones and add noise.
♻️ Suggested cleanup
```diff
@@
-    OUTPUTS = "outputs"
@@
-    TOOL_ID = "tool_id"
@@
-    NAME = "name"
```
Also applies to: 67-67
103-106: Fix typo in TOOL_EXECUTION_METADATA constant name.
TOOL_EXECUTION_METATADA is misspelled; correcting now avoids spreading the typo.
✏️ Rename constant
```diff
-    TOOL_EXECUTION_METATADA = "tool_execution_metadata"
+    TOOL_EXECUTION_METADATA = "tool_execution_metadata"
```
🔧 Update usages (outside this file)
```diff
--- a/workers/structure/helpers.py
+++ b/workers/structure/helpers.py
@@
-    IKeys.TOOL_EXECUTION_METATADA: tool.get_exec_metadata,
+    IKeys.TOOL_EXECUTION_METADATA: tool.get_exec_metadata,
@@
-    IKeys.TOOL_EXECUTION_METATADA: tool.get_exec_metadata,
+    IKeys.TOOL_EXECUTION_METADATA: tool.get_exec_metadata,
```
workers/structure/tasks.py (1)
400-402: Use a randomized temp dir to avoid predictable /tmp paths.
Predictable temp paths can collide across retries and are flagged as unsafe; prefer tempfile with a prefix that includes the execution id.
♻️ Suggested fix
```diff
@@
-import os
+import os
+import tempfile
@@
-    tool_data_dir = Path(f"/tmp/structure_{file_execution_id}")
+    tool_data_dir = Path(
+        tempfile.mkdtemp(prefix=f"structure_{file_execution_id}_")
+    )
```
```diff
@@ -0,0 +1,5 @@
+"""Structure extraction worker module."""
+
+from worker import app
```
Use explicit relative import for worker module.
The import from worker import app uses an implicit import which may fail in Python 3 if worker is not in sys.path. Since worker.py is in the same package (workers/structure/), use an explicit relative import.
🐛 Proposed fix
```diff
-from worker import app
+from .worker import app
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
from .worker import app
```
🤖 Prompt for AI Agents
In `@workers/structure/__init__.py` at line 3, Replace the implicit top-level
import "from worker import app" with an explicit relative import so the
package-local module is resolved correctly; change the import in
workers.structure.__init__.py to import the app symbol from the sibling module
named worker (i.e., use a relative import of worker and import app).
```python
# Create SDK1 service clients using WorkerToolContext
prompt_host = os.getenv(SettingsKeys.PROMPT_HOST, "prompt-service")
prompt_port = os.getenv(SettingsKeys.PROMPT_PORT, "3003")
platform_host = os.getenv(ToolEnv.PLATFORM_HOST, "backend")
platform_port = os.getenv(ToolEnv.PLATFORM_PORT, "8000")

responder: PromptTool = PromptTool(
    tool=tool_context,
    prompt_host=prompt_host,
    prompt_port=prompt_port,
    request_id=file_execution_id,
)
```
🧩 Analysis chain

🏁 Script executed:
```bash
#!/bin/bash
# First, let's check what the provided search finds
rg -n "PROMPT_HOST|PROMPT_PORT" -g "*.yml" -g "*.yaml" -g "*.env*" -g "*.py" | head -50
```
Repository: Zipstack/unstract
Length of output: 3918

🏁 Script executed:
```bash
#!/bin/bash
# Let's also find the StructureToolHelper and understand how it uses get_env_or_die
rg -n "StructureToolHelper|get_env_or_die" --type py -A 3 -B 1 | head -100
```
Repository: Zipstack/unstract
Length of output: 6937

🏁 Script executed:
```bash
#!/bin/bash
# Let's look at the actual file in question to understand the context better
wc -l workers/structure/tasks.py
```
Repository: Zipstack/unstract
Length of output: 91

🏁 Script executed:
```bash
#!/bin/bash
# Read the full context around lines 269-280 in tasks.py
sed -n '260,310p' workers/structure/tasks.py
```
Repository: Zipstack/unstract
Length of output: 2069

🏁 Script executed:
```bash
#!/bin/bash
# Also check where STHelper is called to understand the code flow
rg -n "STHelper|StructureToolHelper" workers/structure/tasks.py -A 5 -B 2
```
Repository: Zipstack/unstract
Length of output: 1119

🏁 Script executed:
```bash
#!/bin/bash
# Check the get_env_or_die implementation in the tool/stream.py
sed -n '130,160p' unstract/sdk1/src/unstract/sdk1/tool/stream.py
```
Repository: Zipstack/unstract
Length of output: 917
Set PROMPT_HOST/PROMPT_PORT defaults into os.environ before STHelper calls.
Defaults are obtained locally via os.getenv (lines 270–271), but STHelper.dynamic_extraction (line 438) and STHelper.dynamic_indexing (line 492) later call tool.get_env_or_die for the same keys, which checks os.environ directly. If these environment variables aren't set at runtime, get_env_or_die will raise an error, even though local defaults exist. Use os.environ.setdefault() to persist the defaults.
Suggested fix
```diff
 # Create SDK1 service clients using WorkerToolContext
 prompt_host = os.getenv(SettingsKeys.PROMPT_HOST, "prompt-service")
 prompt_port = os.getenv(SettingsKeys.PROMPT_PORT, "3003")
+os.environ.setdefault(SettingsKeys.PROMPT_HOST, prompt_host)
+os.environ.setdefault(SettingsKeys.PROMPT_PORT, prompt_port)
 platform_host = os.getenv(ToolEnv.PLATFORM_HOST, "backend")
 platform_port = os.getenv(ToolEnv.PLATFORM_PORT, "8000")
```
🤖 Prompt for AI Agents
In `@workers/structure/tasks.py` around lines 269 - 280, The code currently reads
PROMPT_HOST and PROMPT_PORT into local variables (prompt_host, prompt_port) then
constructs PromptTool, but STHelper.dynamic_extraction and
STHelper.dynamic_indexing later call tool.get_env_or_die which reads os.environ
directly and will fail if the keys are not present; before creating PromptTool
(or before any STHelper calls) call
os.environ.setdefault(SettingsKeys.PROMPT_HOST, prompt_host) and
os.environ.setdefault(SettingsKeys.PROMPT_PORT, prompt_port) so the defaults
persist in the environment, ensuring tool.get_env_or_die (and
STHelper.dynamic_extraction/dynamic_indexing) can find them.
```python
try:
    exported_tool = platform_helper.get_prompt_studio_tool(
        prompt_registry_id=prompt_registry_id
    )
except Exception as e:
    logger.info(f"Not found as prompt studio project, trying agentic registry: {e}")

if exported_tool and SettingsKeys.TOOL_METADATA in exported_tool:
    tool_metadata = exported_tool[SettingsKeys.TOOL_METADATA]
    is_agentic = False
    tool_metadata["is_agentic"] = False
else:
    # Try agentic registry as fallback
    try:
        agentic_tool = platform_helper.get_agentic_studio_tool(
            agentic_registry_id=prompt_registry_id
        )
        if not agentic_tool or SettingsKeys.TOOL_METADATA not in agentic_tool:
            raise SdkError(
                f"Error fetching project: Registry returned empty response for {prompt_registry_id}"
            )
        tool_metadata = agentic_tool[SettingsKeys.TOOL_METADATA]
        is_agentic = True
        tool_metadata["is_agentic"] = True
        logger.info(
            f"Retrieved agentic project: {tool_metadata.get('name', prompt_registry_id)}"
        )
    except Exception as agentic_error:
        raise SdkError(
            f"Error fetching project from both registries for ID '{prompt_registry_id}': {agentic_error}"
        )
```
🧩 Analysis chain

🏁 Script executed:
```bash
#!/bin/bash
# First, let's find the relevant files and understand the exception handling
rg -l "get_prompt_studio_tool" --type py | head -5
```
Repository: Zipstack/unstract
Length of output: 236

🏁 Script executed:
```bash
#!/bin/bash
# Look for the actual implementation of get_prompt_studio_tool
rg -n "def get_prompt_studio_tool" --type py -A 15
```
Repository: Zipstack/unstract
Length of output: 2654

🏁 Script executed:
```bash
#!/bin/bash
# Check for _call_service and how it handles errors
rg -n "def _call_service" --type py -A 20
```
Repository: Zipstack/unstract
Length of output: 3706

🏁 Script executed:
```bash
#!/bin/bash
# Look for SdkError definition and how status_code is set
rg -n "class SdkError" --type py -A 10
```
Repository: Zipstack/unstract
Length of output: 918

🏁 Script executed:
```bash
#!/bin/bash
# Get the full implementation of _call_service in platform.py
sed -n '218,280p' unstract/sdk1/src/unstract/sdk1/platform.py
```
Repository: Zipstack/unstract
Length of output: 2719

🏁 Script executed:
```bash
#!/bin/bash
# Check if there are more lines in the _call_service implementation
sed -n '218,350p' unstract/sdk1/src/unstract/sdk1/platform.py
```
Repository: Zipstack/unstract
Length of output: 5374

🏁 Script executed:
```bash
#!/bin/bash
# Check what exception types are actually raised and how they're caught in the codebase
rg -n "RequestException" unstract/sdk1/src/unstract/sdk1/platform.py -A 2 -B 2
```
Repository: Zipstack/unstract
Length of output: 727

🏁 Script executed:
```bash
#!/bin/bash
# Check if there's a wrapper or custom exception that carries status_code
rg -n "raise.*RequestException\|raise.*SdkError" unstract/sdk1/src/unstract/sdk1/platform.py
```
Repository: Zipstack/unstract
Length of output: 43

🏁 Script executed:
```bash
#!/bin/bash
# Look at the actual tasks.py code to see what happens when exception is caught
sed -n '296,330p' workers/structure/tasks.py
```
Repository: Zipstack/unstract
Length of output: 1537

🏁 Script executed:
```bash
#!/bin/bash
# Check what stream_error_and_exit does - does it raise or exit?
rg -n "def stream_error_and_exit" unstract/sdk1/src/unstract/sdk1 -A 5
```
Repository: Zipstack/unstract
Length of output: 609

🏁 Script executed:
```bash
#!/bin/bash
# Check the imports in tasks.py to understand what exceptions should be caught
head -50 workers/structure/tasks.py | grep -E "^import|^from"
```
Repository: Zipstack/unstract
Length of output: 720

🏁 Script executed:
```bash
#!/bin/bash
# Let's check if RequestException carries status_code attribute
python3 << 'EOF'
from requests.exceptions import RequestException
import inspect

# Check RequestException attributes
print("RequestException attributes:")
print([attr for attr in dir(RequestException('test')) if not attr.startswith('_')])

# Check the signature
e = RequestException('test')
print("\nHas status_code:", hasattr(e, 'status_code'))
EOF
```
Repository: Zipstack/unstract
Length of output: 243

🏁 Script executed:
```bash
#!/bin/bash
# Check what happens in stream_error_and_exit - does it raise or call sys.exit?
sed -n '118,135p' unstract/sdk1/src/unstract/sdk1/tool/stream.py
```
Repository: Zipstack/unstract
Length of output: 646
Avoid falling back to agentic on non-404 errors.
Catching all exceptions treats auth/network failures as "not found," which can route to agentic incorrectly. However, the suggested fix is incorrect: _call_service raises RequestException (not SdkError) for 404 errors on tool lookups, and RequestException does not have a status_code attribute.
The correct approach is to catch RequestException specifically instead of all exceptions, since that's what _call_service raises for 404 on tool lookups:
✅ Corrected fix
```diff
 try:
     exported_tool = platform_helper.get_prompt_studio_tool(
         prompt_registry_id=prompt_registry_id
     )
-except Exception as e:
+except RequestException as e:
     logger.info(f"Not found as prompt studio project, trying agentic registry: {e}")
```
This narrows the fallback to request-level failures from the tool lookup; combined with an explicit 404 check (as the AI-agent prompt below describes), auth errors, timeouts, and other non-404 failures propagate properly instead of silently routing to the agentic registry.
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 300-300: Do not catch blind exception: Exception
(BLE001)
[warning] 314-316: Abstract raise to an inner function
(TRY301)
[warning] 314-316: Avoid specifying long messages outside the exception class
(TRY003)
[warning] 323-323: Do not catch blind exception: Exception
(BLE001)
[warning] 324-326: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
[warning] 324-326: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In `@workers/structure/tasks.py` around lines 296 - 326, Replace the broad except
around get_prompt_studio_tool with a specific catch for RequestException and
only fall back to get_agentic_studio_tool when the caught RequestException
indicates a 404 from the service; for any other RequestException (no response or
non-404 status) re-raise it so auth/network errors propagate. Concretely, change
the try/except that calls get_prompt_studio_tool (and references
prompt_registry_id, exported_tool, SettingsKeys.TOOL_METADATA, logger) to except
RequestException as e: check e.response exists and e.response.status_code == 404
to proceed to the agentic lookup, otherwise raise e; keep the existing SdkError
handling for agentic lookup failures.
```python
from typing import Any

from json_repair import repair_json
```
🧩 Analysis chain

🏁 Script executed:
```bash
#!/bin/bash
# Check for json_repair in various dependency files
echo "=== Searching for json_repair in dependency files ==="

# List common dependency files in the repo
echo "=== Looking for dependency declaration files ==="
fd -t f '(requirements|setup|pyproject|poetry|Pipfile)'

echo ""
echo "=== Searching for json_repair in all files ==="
rg 'json.repair|json_repair' --type py --type txt --type toml --type cfg --type yaml
```
Repository: Zipstack/unstract
Length of output: 1958

🏁 Script executed:
```bash
#!/bin/bash
echo "=== Content of workers/pyproject.toml ==="
cat workers/pyproject.toml
echo ""
echo "=== Content of root pyproject.toml ==="
cat pyproject.toml
```
Repository: Zipstack/unstract
Length of output: 13247

🏁 Script executed:
```bash
#!/bin/bash
echo "=== Searching for json_repair in unstract packages ==="
find unstract -name "pyproject.toml" -exec grep -l "json.repair\|json_repair" {} \;
echo ""
echo "=== Checking where repair_json is imported/used ==="
rg "from json_repair import|import json_repair|repair_json" --type py
echo ""
echo "=== Check if repair_json is actually used in workers/structure/utils.py ==="
cat workers/structure/utils.py
```
Repository: Zipstack/unstract
Length of output: 7142
Add json_repair to workers dependencies.
The json_repair library is imported in workers/structure/utils.py and actively used in repair_json_with_best_structure(), but it is not declared as a dependency in workers/pyproject.toml. Add it to the dependencies list to prevent runtime import failures.
🤖 Prompt for AI Agents
In `@workers/structure/utils.py` around lines 1 - 3, The workers package is
missing the json_repair dependency used by repair_json_with_best_structure() in
workers/structure/utils.py; add "json_repair" to the dependencies list in
workers/pyproject.toml (under [tool.poetry.dependencies] or the equivalent
dependencies section) so the import from json_repair and usage in
repair_json_with_best_structure() will not fail at runtime.
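To illustrate why the missing declaration only surfaces at runtime, a minimal usage sketch of the import in question (the sample input is hypothetical):

```python
# Raises ModuleNotFoundError at import time if json_repair is not installed
# in the worker image, i.e. not declared in workers/pyproject.toml.
from json_repair import repair_json

# repair_json fixes common defects in LLM-produced JSON, such as
# trailing commas, and returns a valid JSON string.
broken = '{"fields": [{"name": "total", "value": 42,},]}'
print(repair_json(broken))  # expected: {"fields": [{"name": "total", "value": 42}]}
```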
```python
@app.task(bind=True)
def healthcheck(self):
    """Health check task for monitoring systems."""
    return {
        "status": "healthy",
        "worker_type": "structure",
        "task_id": self.request.id,
        "worker_name": config.worker_name if config else "structure-worker",
    }
```
🧩 Analysis chain

🏁 Script executed:
```bash
#!/bin/bash
# First, let's find the healthcheck task and understand its context
cd workers/structure || exit
cat -n worker.py | head -80
```
Repository: Zipstack/unstract
Length of output: 2759

🏁 Script executed:
```bash
#!/bin/bash
# Search for check_structure_health function
rg -n "check_structure_health" --type py
```
Repository: Zipstack/unstract
Length of output: 221

🏁 Script executed:
```bash
#!/bin/bash
# Look for healthcheck references in the codebase
rg -n "healthcheck" --type py
```
Repository: Zipstack/unstract
Length of output: 506

🏁 Script executed:
```bash
#!/bin/bash
# Check the overall structure of the workers directory
find . -name "*.py" -type f | grep -E "(worker|health)" | head -20
```
Repository: Zipstack/unstract
Length of output: 1054

🏁 Script executed:
```bash
#!/bin/bash
# Find HealthCheckResult class definition
rg -n "class HealthCheckResult" --type py
```
Repository: Zipstack/unstract
Length of output: 139

🏁 Script executed:
```bash
#!/bin/bash
# Check the HealthStatus enum
rg -n "class HealthStatus" --type py
```
Repository: Zipstack/unstract
Length of output: 140

🏁 Script executed:
```bash
#!/bin/bash
# Look at one of the other worker files to see if they have similar pattern
cat -n workers/general/worker.py | sed -n '50,80p'
```
Repository: Zipstack/unstract
Length of output: 740

🏁 Script executed:
```bash
#!/bin/bash
# Read the HealthCheckResult and HealthStatus definitions
cat -n workers/shared/infrastructure/monitoring/health.py
```
Repository: Zipstack/unstract
Length of output: 21437
Use actual health check result in healthcheck task instead of hardcoding "healthy".
The healthcheck task currently always reports "status": "healthy" regardless of actual worker state. The check_structure_health() function can return DEGRADED status when the API client is unhealthy, but this is masked by the Celery task. If this task is used by liveness/readiness probes, it will incorrectly report the worker as healthy even during degradation.
✅ Suggested fix
```diff
 @app.task(bind=True)
 def healthcheck(self):
     """Health check task for monitoring systems."""
+    result = check_structure_health()
     return {
-        "status": "healthy",
+        "status": result.status.value,
         "worker_type": "structure",
         "task_id": self.request.id,
         "worker_name": config.worker_name if config else "structure-worker",
+        "details": result.details,
+        "message": result.message,
     }
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
@app.task(bind=True)
def healthcheck(self):
    """Health check task for monitoring systems."""
    result = check_structure_health()
    return {
        "status": result.status.value,
        "worker_type": "structure",
        "task_id": self.request.id,
        "worker_name": config.worker_name if config else "structure-worker",
        "details": result.details,
        "message": result.message,
    }
```
🤖 Prompt for AI Agents
In `@workers/structure/worker.py` around lines 61 - 69, Replace the hardcoded
"healthy" status in the Celery task healthcheck with the real result from
check_structure_health(): call check_structure_health() inside the
healthcheck(self) task, use its returned status (or entire result) for the
"status" field (and include any diagnostic info it returns) while preserving
existing fields like "worker_type", "task_id" (self.request.id) and
"worker_name" (config.worker_name fallback); update references to the function
name check_structure_health and the healthcheck task to ensure the task reflects
DEGRADED/unhealthy states from the API client.
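Once the task reports real status, it can back a monitoring probe. A hypothetical probe script for illustration — the registered task name, queue, and broker URLs are assumptions, not values confirmed by this PR:

```python
# Hypothetical probe: dispatch the healthcheck task and fail unless the
# worker itself reports a healthy status.
import sys

from celery import Celery

app = Celery(
    "probe",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

result = app.send_task(
    "structure.worker.healthcheck",  # assumed registered task name
    queue="structure_extraction",    # queue introduced in this PR
)
payload = result.get(timeout=10)  # raises if the task fails or times out

if payload.get("status") != "healthy":
    print(f"Structure worker degraded: {payload}")
    sys.exit(1)
print(f"Structure worker OK: {payload.get('worker_name')}")
```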


What
Why
How
Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
Screenshots
Checklist
I have read and understood the Contribution Guidelines.