From b3d89cfa026208d8e3b1fafe3083c8249c6976aa Mon Sep 17 00:00:00 2001 From: erick-gege Date: Tue, 5 May 2026 12:12:47 -0500 Subject: [PATCH 1/2] feat: add error logs for spiders and deploys --- estela-api/api/serializers/deploy.py | 21 +- estela-api/api/serializers/job.py | 19 ++ estela-api/api/views/deploy.py | 16 +- estela-api/api/views/job.py | 19 +- estela-api/config/celery.py | 4 + estela-api/core/error_logs.py | 195 ++++++++++++++++++ estela-api/core/tasks.py | 71 ++++++- estela-api/engines/kubernetes.py | 73 ++++++- estela-web/src/pages/DeployListPage/index.tsx | 67 +++++- .../src/pages/DeployListPage/styles.scss | 41 +++- estela-web/src/pages/JobDetailPage/index.tsx | 136 +++++++++++- .../templates/API/api-serviceaccount.yaml | 2 +- 12 files changed, 647 insertions(+), 17 deletions(-) create mode 100644 estela-api/core/error_logs.py diff --git a/estela-api/api/serializers/deploy.py b/estela-api/api/serializers/deploy.py index 58b16315..3c5899fb 100644 --- a/estela-api/api/serializers/deploy.py +++ b/estela-api/api/serializers/deploy.py @@ -1,8 +1,11 @@ +from datetime import datetime + from rest_framework import serializers from django.conf import settings from api.serializers.project import UserDetailSerializer from api.serializers.spider import SpiderSerializer +from config.job_manager import spiderdata_db_client from core.models import Deploy, Spider from engines.kubernetes import KubernetesEngine @@ -48,14 +51,23 @@ class DeployUpdateSerializer(serializers.ModelSerializer): required=False, help_text="Spider names in this deploy.", ) + error_reason = serializers.CharField( + write_only=True, + required=False, + allow_null=True, + allow_blank=True, + max_length=200_000, + help_text="Error logs to persist in deploy_logs (Mongo) on failure.", + ) class Meta: model = Deploy - fields = ["did", "status", "spiders_names"] + fields = ["did", "status", "spiders_names", "error_reason"] def update(self, instance, validated_data): status = validated_data.get("status", instance.status) spiders_names = validated_data.get("spiders_names", []) + error_reason = validated_data.pop("error_reason", None) project = instance.project if status != instance.status: if instance.status != Deploy.BUILDING_STATUS: @@ -66,6 +78,13 @@ def update(self, instance, validated_data): ) else: instance.status = status + if status == Deploy.FAILURE_STATUS and error_reason and spiderdata_db_client.get_connection(): + db = str(project.pid) + spiderdata_db_client.client[db]["deploy_logs"].insert_one({ + "deploy_id": instance.did, + "logs": f"=== Deploy ===\n{error_reason}", + "created": datetime.utcnow(), + }) if status == Deploy.SUCCESS_STATUS and spiders_names: project.spiders.filter(name__in=spiders_names, deleted=True).update( diff --git a/estela-api/api/serializers/job.py b/estela-api/api/serializers/job.py index d71abdcd..d42156ce 100644 --- a/estela-api/api/serializers/job.py +++ b/estela-api/api/serializers/job.py @@ -1,3 +1,5 @@ +from datetime import datetime + from django.shortcuts import get_object_or_404 from rest_framework import serializers @@ -202,6 +204,14 @@ class SpiderJobUpdateSerializer(serializers.ModelSerializer): required=False, help_text="Job data expiry days.", ) + error_reason = serializers.CharField( + write_only=True, + required=False, + allow_null=True, + allow_blank=True, + max_length=200_000, + help_text="Error logs to persist in job_logs (Mongo) on failure.", + ) allowed_status_to_stop = [ SpiderJob.WAITING_STATUS, @@ -228,12 +238,14 @@ class Meta: "data_status", "data_expiry_days", "proxy_usage_data", + "error_reason", ) def update(self, instance, validated_data): status = validated_data.get("status", instance.status) data_status = validated_data.get("data_status", "") data_expiry_days = int(validated_data.get("data_expiry_days", 1)) + error_reason = validated_data.pop("error_reason", None) if status != instance.status: if instance.status == SpiderJob.STOPPED_STATUS: @@ -258,6 +270,13 @@ def update(self, instance, validated_data): pass instance.save() job_manager.delete_job(instance.name) + if status == SpiderJob.ERROR_STATUS and error_reason and spiderdata_db_client.get_connection(): + db = str(instance.spider.project.pid) + spiderdata_db_client.client[db]["job_logs"].insert_one({ + "job_id": instance.jid, + "logs": error_reason, + "created": datetime.utcnow(), + }) instance.status = status for field in self.job_fields: diff --git a/estela-api/api/views/deploy.py b/estela-api/api/views/deploy.py index f0d1c3e3..2c5f07ec 100644 --- a/estela-api/api/views/deploy.py +++ b/estela-api/api/views/deploy.py @@ -1,6 +1,7 @@ from django.shortcuts import get_object_or_404 from drf_yasg.utils import swagger_auto_schema from rest_framework import status, viewsets +from rest_framework.decorators import action from rest_framework.exceptions import APIException, ParseError, PermissionDenied from rest_framework.response import Response @@ -11,7 +12,7 @@ DeploySerializer, DeployUpdateSerializer, ) -from config.job_manager import credentials +from config.job_manager import credentials, spiderdata_db_client from core.models import Deploy, Project from core.views import launch_deploy_job @@ -102,3 +103,16 @@ def update(self, request, *args, **kwargs): headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_200_OK, headers=headers) + + @action(detail=True, methods=["get"], url_path="logs") + def logs(self, request, *args, **kwargs): + deploy = self.get_object() + if not spiderdata_db_client.get_connection(): + return Response({"error": "Could not connect to database."}, status=status.HTTP_503_SERVICE_UNAVAILABLE) + db = str(deploy.project.pid) + record = spiderdata_db_client.client[db]["deploy_logs"].find_one( + {"deploy_id": deploy.did}, sort=[("created", -1)] + ) + if not record: + return Response({"logs": None}) + return Response({"logs": record.get("logs")}) diff --git a/estela-api/api/views/job.py b/estela-api/api/views/job.py index 6adb2d67..fbaf91ad 100644 --- a/estela-api/api/views/job.py +++ b/estela-api/api/views/job.py @@ -3,6 +3,7 @@ from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from rest_framework import mixins, status +from rest_framework.decorators import action from rest_framework.exceptions import ParseError from rest_framework.response import Response @@ -14,7 +15,7 @@ SpiderJobUpdateSerializer, ) from api.utils import get_proxy_provider_envs, update_stats_from_redis -from config.job_manager import job_manager +from config.job_manager import job_manager, spiderdata_db_client from core.models import DataStatus, Project, ProxyProvider, Spider, SpiderJob from core.tiers import DEFAULT_TIER @@ -199,3 +200,19 @@ def retrieve(self, request, *args, jid=None, **kwargs): headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_200_OK, headers=headers) + + @action(detail=True, methods=["get"], url_path="error_logs") + def error_logs(self, request, *args, **kwargs): + job = self.get_object() + if not spiderdata_db_client.get_connection(): + return Response( + {"error": "Could not connect to database."}, + status=status.HTTP_503_SERVICE_UNAVAILABLE, + ) + db = str(job.spider.project.pid) + record = spiderdata_db_client.client[db]["job_logs"].find_one( + {"job_id": job.jid}, sort=[("created", -1)] + ) + if not record: + return Response({"logs": None}) + return Response({"logs": record.get("logs")}) diff --git a/estela-api/config/celery.py b/estela-api/config/celery.py index f4fc9d91..5c17f5fc 100644 --- a/estela-api/config/celery.py +++ b/estela-api/config/celery.py @@ -17,6 +17,10 @@ }, "check-and-update-job-status-errors": { "task": "core.tasks.check_and_update_job_status_errors", + "schedule": 30, + }, + "check-and-update-deploy-status-errors": { + "task": "core.tasks.check_and_update_deploy_status_errors", "schedule": 60, }, "delete-expired-jobs-data": { diff --git a/estela-api/core/error_logs.py b/estela-api/core/error_logs.py new file mode 100644 index 00000000..9b43b5a7 --- /dev/null +++ b/estela-api/core/error_logs.py @@ -0,0 +1,195 @@ +"""Capture and format failure information for spider jobs and deploys. + +The functions in this module are best-effort log-scrapers that turn raw +Kubernetes pod output into something a non-developer end user can read: + +* a friendly one-line description of *why* the job stopped, derived from + the pod's termination reason (OOMKilled, Evicted, exit code, etc.). +* the last user-relevant Python traceback in the pod logs, with infra + side-effect tracebacks (logging handlers crashing because Kafka + closed, async cleanup failures, etc.) stripped out. + +Anything outside a traceback block is intentionally discarded — that's +the contract that keeps Kafka/Redis/Twisted infrastructure noise out of +what the user sees. +""" + +from datetime import datetime + +from config.job_manager import job_manager, spiderdata_db_client + + +_TRACEBACK_HEADER = "Traceback (most recent call last):" +_CHAIN_MARKERS = ( + "During handling of the above exception, another exception occurred:", + "The above exception was the direct cause of the following exception:", +) +# Markers that signal the next traceback is a downstream side-effect of the +# real error (logging handler crashed while reporting it, async cleanup +# failed after the spider already died, etc.). These are stable strings +# emitted by the Python stdlib `logging` module and Twisted, not random +# substrings — they're how those modules officially announce a meta-error. +_INFRA_MARKERS = ( + "--- Logging error ---", + "--- ---", + "[Failure instance:", +) + + +def format_friendly_reason(termination): + """Translate a k8s container-termination dict into one user-friendly line. + + Returns None when no termination data is available. Best-effort: in + local clusters kubelet often does not populate `reason`, so several + paths fall through to a generic message; managed k8s (EKS/GKE) + populates these reliably. + """ + if not termination: + return None + reason = termination.get("reason") + exit_code = termination.get("exit_code") + init_container = termination.get("init_container") + if init_container: + return "The spider could not be prepared and was stopped before starting." + if reason == "OOMKilled": + return "The spider used too much memory and was stopped." + if reason == "DeadlineExceeded": + return "The spider ran for too long and was stopped." + if reason == "Evicted": + return "The spider was stopped because the system ran out of resources." + if exit_code == 137: + return "The spider was stopped unexpectedly." + if reason and reason != "Error": + return f"The spider stopped unexpectedly ({reason})." + return "The spider stopped due to an unexpected error." + + +def _parse_traceback_block(lines, start): + """Parse a single traceback block starting at lines[start]. + Returns (end_index_exclusive, block_lines) or (None, None) if invalid.""" + block = [lines[start]] + i = start + 1 + saw_exception = False + while i < len(lines): + ln = lines[i] + if not ln.strip(): + break + if ln.startswith((" ", "\t")): + block.append(ln) + elif not saw_exception: + block.append(ln) + saw_exception = True + else: + break + i += 1 + if not saw_exception: + return None, None + return i, block + + +def extract_last_traceback(raw): + """Return the last user-relevant Python traceback chain in the logs. + + A chain is one traceback or several connected by Python's chain markers + ("During handling of the above exception..." or "The above exception + was the direct cause of..."). All chained blocks plus their connecting + lines are returned so the user can see the original cause. + + Tracebacks whose preceding region contains an infra marker + (`_INFRA_MARKERS`) are skipped — they're side-effects of the error + machinery itself (e.g. log handler failed because Kafka already + closed), not the user's problem. + + Free-form log lines outside a traceback block or chain marker are + discarded — that's the contract that keeps infra noise out. + """ + if not raw: + return None + lines = raw.splitlines() + raw_starts = [i for i, ln in enumerate(lines) if _TRACEBACK_HEADER in ln] + if not raw_starts: + return None + + blocks = [] # list of (start_idx, end_idx_exclusive, block_lines) + last_end = 0 + for s in raw_starts: + between = lines[last_end:s] + is_infra = any( + marker in ln for ln in between for marker in _INFRA_MARKERS + ) + end, block = _parse_traceback_block(lines, s) + if block: + if not is_infra: + blocks.append((s, end, block)) + last_end = end + else: + last_end = s + 1 + if not blocks: + return None + + # Walk backwards from the last block, including predecessors connected + # to it by a chain marker. + chain = [len(blocks) - 1] + for i in range(len(blocks) - 1, 0, -1): + cur_start = blocks[i][0] + prev_end = blocks[i - 1][1] + between = lines[prev_end:cur_start] + if any(marker in ln for ln in between for marker in _CHAIN_MARKERS): + chain.insert(0, i - 1) + else: + break + + out = [] + for pos, block_idx in enumerate(chain): + _, end, block = blocks[block_idx] + out.extend(block) + if pos < len(chain) - 1: + next_start = blocks[chain[pos + 1]][0] + for ln in lines[end:next_start]: + stripped = ln.strip() + if stripped and any(m in stripped for m in _CHAIN_MARKERS): + out.append("") + out.append(stripped) + out.append("") + return "\n".join(out) + + +def strip_blanks(raw): + """Return the log with blank lines removed, or None. Used for build + containers (downloader/kaniko/spider-status) which don't run the + spider's runtime stack, so they don't contain infra-level noise.""" + if not raw: + return None + lines = [ln for ln in raw.splitlines() if ln.strip()] + return "\n".join(lines) if lines else None + + +def capture_job_error_reason(job): + """Build the failure summary stored in Mongo for a failed spider job. + + Combines the friendly termination reason (when k8s provides one) and + the last user-relevant Python traceback (when one is in the pod log). + Returns None if neither piece of info is available. + """ + termination = job_manager.read_pod_termination_reason(job.name) + logs = job_manager.read_pod_logs(job.name, tail=2000) + parts = [] + reason = format_friendly_reason(termination) + if reason: + parts.append(reason) + traceback = extract_last_traceback(logs) + if traceback: + parts.append(f"Error details:\n{traceback}") + return "\n\n".join(parts) or None + + +def write_job_logs_to_mongo(job, logs): + """Insert a job_logs record into the project's Mongo database.""" + if not logs or not spiderdata_db_client.get_connection(): + return + db = str(job.spider.project.pid) + spiderdata_db_client.client[db]["job_logs"].insert_one({ + "job_id": job.jid, + "logs": logs, + "created": datetime.utcnow(), + }) diff --git a/estela-api/core/tasks.py b/estela-api/core/tasks.py index d930d47b..545c45dd 100644 --- a/estela-api/core/tasks.py +++ b/estela-api/core/tasks.py @@ -21,12 +21,18 @@ from core.models import ( DataStatus, Permission, + Deploy, Project, ProxyProvider, Spider, SpiderJob, UsageRecord, ) +from core.error_logs import ( + capture_job_error_reason, + strip_blanks, + write_job_logs_to_mongo, +) from core.tiers import get_tier_resources from core.utils import parse_k8s_resource, parse_memory_to_mi @@ -283,9 +289,11 @@ def launch_job(sid_, data_, data_expiry_days=None, token=None): @celery_app.task(name="core.tasks.check_and_update_job_status_errors") def check_and_update_job_status_errors(): - jobs = SpiderJob.objects.filter( - status__in=[SpiderJob.WAITING_STATUS, SpiderJob.RUNNING_STATUS] - )[: settings.CHECK_JOB_ERRORS_BATCH_SIZE] + candidate_statuses = [SpiderJob.WAITING_STATUS, SpiderJob.RUNNING_STATUS] + jobs = SpiderJob.objects.filter(status__in=candidate_statuses)[ + : settings.CHECK_JOB_ERRORS_BATCH_SIZE + ] + for job in jobs: job_status = job_manager.read_job_status(job.name) is_waiting = job.status == SpiderJob.WAITING_STATUS @@ -298,10 +306,65 @@ def check_and_update_job_status_errors(): try: update_stats_from_redis(job, save_to_database=True) delete_stats_from_redis(job) - except: + except Exception: pass + error_logs = capture_job_error_reason(job) job.status = SpiderJob.ERROR_STATUS job.save() + write_job_logs_to_mongo(job, error_logs) + + # Fallback: jobs already in ERROR but with no entry in job_logs yet. + # Covers paths that mark ERROR without writing logs (e.g. the + # SpiderJob.job_status property mutation, manual API marks). + if not spiderdata_db_client.get_connection(): + return + error_jobs = SpiderJob.objects.filter( + status=SpiderJob.ERROR_STATUS + ).order_by("-created")[: settings.CHECK_JOB_ERRORS_BATCH_SIZE] + for job in error_jobs: + db = str(job.spider.project.pid) + if spiderdata_db_client.client[db]["job_logs"].find_one( + {"job_id": job.jid}, projection={"_id": 1} + ): + continue + error_logs = capture_job_error_reason(job) + write_job_logs_to_mongo(job, error_logs) + + +@celery_app.task(name="core.tasks.check_and_update_deploy_status_errors") +def check_and_update_deploy_status_errors(): + deploys = Deploy.objects.filter(status=Deploy.BUILDING_STATUS)[ + : settings.CHECK_JOB_ERRORS_BATCH_SIZE + ] + + for deploy in deploys: + job_name = f"deploy-project-{deploy.did}" + job_status = job_manager.read_job_status(job_name) + if job_status is None or ( + job_status.active is None and job_status.succeeded is None + ): + build_logs = job_manager.read_build_logs(job_name) or {} + if build_logs and spiderdata_db_client.get_connection(): + parts = [] + for label, key in ( + ("Downloader", "project-downloader"), + ("Build", "kaniko-builder"), + ("Deploy", "spider-status"), + ): + log = build_logs.get(key) + excerpt = strip_blanks(log) + if excerpt: + parts.append(f"=== {label} ===\n{excerpt}") + combined = "\n\n".join(parts) + if combined: + db = str(deploy.project.pid) + spiderdata_db_client.client[db]["deploy_logs"].insert_one({ + "deploy_id": deploy.did, + "logs": combined, + "created": datetime.utcnow(), + }) + deploy.status = Deploy.FAILURE_STATUS + deploy.save() @celery_app.task( diff --git a/estela-api/engines/kubernetes.py b/estela-api/engines/kubernetes.py index c12d6f53..6a429df8 100644 --- a/estela-api/engines/kubernetes.py +++ b/estela-api/engines/kubernetes.py @@ -230,13 +230,17 @@ def read_job_status(self, name, namespace="default", api_instance=None): return self.Status(api_response.status) + def _get_job_pods(self, job_name, namespace="default"): + config.load_incluster_config() + v1 = client.CoreV1Api() + return v1.list_namespaced_pod(namespace, label_selector=f"job-name={job_name}") + def get_deploy_stage(self, did, namespace="default"): try: - core_api = client.CoreV1Api() batch_api = self.get_api_instance() job_name = f"deploy-project-{did}" batch_api.read_namespaced_job(job_name, namespace) - pods = core_api.list_namespaced_pod(namespace, label_selector=f"job-name={job_name}") + pods = self._get_job_pods(job_name, namespace) if not pods.items: return None init_statuses = pods.items[0].status.init_container_statuses or [] @@ -247,6 +251,71 @@ def get_deploy_stage(self, did, namespace="default"): pass return None + def read_pod_termination_reason(self, job_name, namespace="default"): + try: + pods = self._get_job_pods(job_name, namespace) + for pod in pods.items: + for cs in pod.status.init_container_statuses or []: + t = cs.state.terminated + if t and t.exit_code and t.exit_code != 0: + return { + "reason": t.reason or "Error", + "exit_code": t.exit_code, + "init_container": cs.name, + } + for cs in pod.status.container_statuses or []: + t = cs.state.terminated + if t and t.exit_code and t.exit_code != 0: + return { + "reason": t.reason or "Error", + "exit_code": t.exit_code, + "init_container": None, + } + except ApiException: + pass + return None + + def read_pod_logs(self, job_name, namespace="default", tail=None): + try: + v1 = client.CoreV1Api() + pods = self._get_job_pods(job_name, namespace) + for pod in pods.items: + try: + return v1.read_namespaced_pod_log( + pod.metadata.name, + namespace, + tail_lines=tail, + ) + except ApiException: + continue + except ApiException: + pass + return None + + def read_build_logs(self, job_name, namespace="default"): + logs = {} + try: + v1 = client.CoreV1Api() + pods = self._get_job_pods(job_name, namespace) + for pod in pods.items: + for container_name in ("project-downloader", "kaniko-builder"): + try: + logs[container_name] = v1.read_namespaced_pod_log( + pod.metadata.name, namespace, container=container_name + ) + except ApiException: + logs[container_name] = None + try: + logs["spider-status"] = v1.read_namespaced_pod_log( + pod.metadata.name, namespace, container="spider-status" + ) + except ApiException: + logs["spider-status"] = None + break + except ApiException: + pass + return logs + def _create_build_volumes(self): """Create shared volume for build containers""" return [ diff --git a/estela-web/src/pages/DeployListPage/index.tsx b/estela-web/src/pages/DeployListPage/index.tsx index d1ea53aa..bc7ab8c3 100644 --- a/estela-web/src/pages/DeployListPage/index.tsx +++ b/estela-web/src/pages/DeployListPage/index.tsx @@ -1,5 +1,6 @@ import React, { Component, ReactElement } from "react"; import { Layout, Pagination, Row, Table, Button, Tag, Col, Typography, Modal, Tooltip as TooltipAnt } from "antd"; +import { ExclamationCircleOutlined } from "@ant-design/icons"; import { RouteComponentProps } from "react-router-dom"; import Copy from "../../assets/icons/copy.svg"; import Info from "../../assets/icons/info.svg"; @@ -21,6 +22,10 @@ import { resourceNotAllowedNotification, Spin, PaginationItem } from "../../shar import { convertDateToString } from "../../utils"; import { TourStore } from "../../tour"; +interface DeployLogs { + logs: string | null; +} + const { Content } = Layout; const { Text, Paragraph } = Typography; @@ -30,6 +35,7 @@ interface DeployListPageState { count: number; loaded: boolean; modalIsOpen: boolean; + logsModal: { visible: boolean; loading: boolean; logs: DeployLogs | null }; } interface RouteParams { @@ -90,10 +96,24 @@ export class DeployListPage extends Component, current: 0, loaded: false, modalIsOpen: false, + logsModal: { visible: false, loading: false, logs: null }, }; apiService = ApiService(); projectId: string = this.props.match.params.projectId; + fetchDeployLogs = async (deployId: number): Promise => { + this.setState({ logsModal: { visible: true, loading: true, logs: null } }); + try { + const response = await fetch(`${API_BASE_URL}/api/projects/${this.projectId}/deploys/${deployId}/logs`, { + headers: AuthService.getDefaultAuthHeaders(), + }); + const logs: DeployLogs = await response.json(); + this.setState({ logsModal: { visible: true, loading: false, logs } }); + } catch (e) { + this.setState({ logsModal: { visible: true, loading: false, logs: null } }); + } + }; + columns = [ { title: "DEPLOY ID", @@ -120,6 +140,17 @@ export class DeployListPage extends Component, dataIndex: "spidersCount", render: (count: number): ReactElement => (count === 0 ? -/- : {count}), }, + { + title: "", + key: "logsHint", + width: 110, + render: (_: unknown, deploy: Deploy): ReactElement => + deploy.status === "FAILURE" ? ( + View logs › + ) : ( + + ), + }, { title: ( @@ -134,14 +165,21 @@ export class DeployListPage extends Component, ), key: "status", dataIndex: "status", - render: (state: string): ReactElement => ( + render: (state: string, deploy: Deploy): ReactElement => ( {ACTIVE_STAGES.includes(state as DeployStatusEnum) ? ( ) : state === "SUCCESS" ? ( Completed ) : ( - + } + className="deploy-failure-tag border-0 text-s bg-estela-blue-low rounded-md text-estela-red-full cursor-pointer flex items-center" + onClick={(e) => { + e.stopPropagation(); + if (deploy.did) this.fetchDeployLogs(deploy.did); + }} + > Failure )} @@ -213,9 +251,24 @@ export class DeployListPage extends Component, }; render(): JSX.Element { - const { loaded, deploys, count, current, modalIsOpen } = this.state; + const { loaded, deploys, count, current, modalIsOpen, logsModal } = this.state; return ( + this.setState({ logsModal: { visible: false, loading: false, logs: null } })} + footer={null} + width={900} + > + {logsModal.loading ? ( + + ) : ( +
+                            {(logsModal.logs && logsModal.logs.logs) || "No logs available."}
+                        
+ )} +
{loaded ? ( @@ -381,6 +434,14 @@ export class DeployListPage extends Component, size="middle" className="my-4" locale={{ emptyText: "No jobs yet." }} + onRow={(deploy: Deploy) => + deploy.status === "FAILURE" + ? { + className: "deploy-row-failure cursor-pointer", + onClick: () => deploy.did && this.fetchDeployLogs(deploy.did), + } + : {} + } />
td:first-child { + box-shadow: inset 4px 0 0 $estela-red-full; + padding-left: 24px; // push the deploy ID away from the bar + } + + .deploy-view-logs-hint { + visibility: hidden; + } + + &:hover { + > td { + background-color: $estela-red-low !important; + } + + .deploy-view-logs-hint { + visibility: visible; + } + + .deploy-failure-tag, + .deploy-failure-tag .anticon { + background-color: $estela-red-full !important; + color: #ffffff !important; + } + } +} diff --git a/estela-web/src/pages/JobDetailPage/index.tsx b/estela-web/src/pages/JobDetailPage/index.tsx index f590c94d..af83b8bb 100644 --- a/estela-web/src/pages/JobDetailPage/index.tsx +++ b/estela-web/src/pages/JobDetailPage/index.tsx @@ -18,10 +18,15 @@ import { import type { RangePickerProps } from "antd/es/date-picker"; import { Link, RouteComponentProps } from "react-router-dom"; +import { ExclamationCircleOutlined } from "@ant-design/icons"; + import "./styles.scss"; import JobCreateModal from "../JobCreateModal"; import { ApiService } from "../../services"; import { parseDuration, durationToString, formatBytes, getFilteredEnvVars } from "../../utils"; +import { ApiService, AuthService } from "../../services"; +import { API_BASE_URL } from "../../constants"; +import { BytesMetric, parseDuration, durationToString, formatBytes, getFilteredEnvVars } from "../../utils"; import Copy from "../../assets/icons/copy.svg"; import Pause from "../../assets/icons/pause.svg"; import Export from "../../assets/icons/export.svg"; @@ -73,6 +78,9 @@ interface JobDetailPageState { activeKey: string; created: string | undefined; status: string | undefined; + errorLogsModal: { visible: boolean; loading: boolean; logs: string | null }; + errorPreview: string | null; + errorLogsState: "idle" | "loading" | "ready" | "missing"; cronjob: number | undefined | null; items: Dictionary[]; loadedItems: boolean; @@ -205,6 +213,9 @@ export class JobDetailPage extends Component, J activeKey: "1", created: "", status: "", + errorLogsModal: { visible: false, loading: false, logs: null }, + errorPreview: null, + errorLogsState: "idle", cronjob: null, items: [], loadedItems: false, @@ -270,6 +281,9 @@ export class JobDetailPage extends Component, J resourceTier: response.resourceTier || DEFAULT_RESOURCE_TIER, peakMemory: response.peakMemory, }); + if (response.jobStatus === "ERROR") { + this.loadErrorLogs(); + } }, (error: unknown) => { error; @@ -283,6 +297,34 @@ export class JobDetailPage extends Component, J }, 500); } + loadErrorLogs = async (): Promise => { + this.setState({ errorLogsState: "loading" }); + try { + const url = `${API_BASE_URL}/api/projects/${this.projectId}/spiders/${this.spiderId}/jobs/${this.jobId}/error_logs`; + const response = await fetch(url, { headers: AuthService.getDefaultAuthHeaders() }); + const data = await response.json(); + const logs: string | null = data?.logs ?? null; + const trimmed = logs ? logs.trim() : ""; + const preview = trimmed + ? (trimmed.split("\n").find((ln) => ln.trim().length > 0) || "").slice(0, 140) + : null; + this.setState({ + errorLogsState: trimmed ? "ready" : "missing", + errorPreview: preview, + errorLogsModal: { ...this.state.errorLogsModal, logs }, + }); + } catch { + this.setState({ errorLogsState: "missing", errorPreview: null }); + } + }; + + openErrorLogsModal = (): void => { + if (this.state.errorLogsState !== "ready") return; + this.setState({ + errorLogsModal: { visible: true, loading: false, logs: this.state.errorLogsModal.logs }, + }); + }; + getProjectSpiders = async (): Promise => { const requestParams: ApiProjectsSpidersListRequest = { pid: this.projectId }; this.apiService.apiProjectsSpidersList(requestParams).then( @@ -457,6 +499,71 @@ export class JobDetailPage extends Component, J ); }; + chartConfigs = (bytes: BytesMetric): [number[], string[]] => { + const dataChartProportions = [1, 0]; + const colorChartArray = ["#7DC932", "#F1F1F1"]; + if (bytes.type === "Bytes") { + dataChartProportions[0] = 0.05 * (bytes.quantity / 1024); + dataChartProportions[1] = 1 - dataChartProportions[0]; + } else if (bytes.type === "KB") { + dataChartProportions[0] = 0.1 * (bytes.quantity / 1024); + dataChartProportions[1] = 1 - dataChartProportions[0]; + } else if (bytes.type === "MB") { + dataChartProportions[0] = bytes.quantity / 1024; + dataChartProportions[1] = 1 - dataChartProportions[0]; + if (dataChartProportions[0] > 0.75) { + colorChartArray[0] = "#FFC002"; + } + if (dataChartProportions[0] > 0.9) { + colorChartArray[0] = "#E34A46"; + } + } else { + dataChartProportions[0] = bytes.quantity; + colorChartArray[0] = "#E34A46"; + } + return [dataChartProportions, colorChartArray]; + }; + + renderErrorBanner = (): React.ReactNode => { + if (this.state.status !== "ERROR") return null; + const { errorLogsState, errorPreview } = this.state; + if (errorLogsState === "loading" || errorLogsState === "idle") { + return ( +
+
+ + This job ended with an error — loading details… +
+
+ ); + } + if (errorLogsState === "ready") { + return ( +
+
+ +
+
This job ended with an error
+ {errorPreview && ( + {errorPreview} + )} +
+
+ See full error logs › +
+ ); + } + return ( +
+ + This job ended with an error — no logs were captured for this run. +
+ ); + }; + overview = (): React.ReactNode => { const { cronjob, @@ -641,6 +748,8 @@ export class JobDetailPage extends Component, J + {this.renderErrorBanner()} + , J )} {status == "ERROR" && ( - + {status} @@ -945,6 +1059,7 @@ export class JobDetailPage extends Component, J const { loaded, status, + errorLogsModal, modalStop, modalClone, spiders, @@ -1032,6 +1147,25 @@ export class JobDetailPage extends Component, J )} + + this.setState({ + errorLogsModal: { visible: false, loading: false, logs: null }, + }) + } + footer={null} + width={800} + > + {errorLogsModal.loading ? ( + + ) : ( +
+                                                {errorLogsModal.logs || "No error details available."}
+                                            
+ )} +
{modalClone && ( Date: Thu, 28 May 2026 12:06:32 -0500 Subject: [PATCH 2/2] fix: job detail build errors --- estela-api/api/serializers/deploy.py | 13 +--- estela-api/api/serializers/job.py | 12 +--- estela-api/config/celery.py | 2 +- estela-api/core/error_logs.py | 41 ++++++++++-- estela-api/core/models.py | 13 ---- estela-api/core/tasks.py | 67 ++++++++----------- .../src/pages/DeployListPage/styles.scss | 6 ++ estela-web/src/pages/JobDetailPage/index.tsx | 5 +- 8 files changed, 76 insertions(+), 83 deletions(-) diff --git a/estela-api/api/serializers/deploy.py b/estela-api/api/serializers/deploy.py index 3c5899fb..12d1bdc7 100644 --- a/estela-api/api/serializers/deploy.py +++ b/estela-api/api/serializers/deploy.py @@ -1,11 +1,9 @@ -from datetime import datetime - from rest_framework import serializers from django.conf import settings from api.serializers.project import UserDetailSerializer from api.serializers.spider import SpiderSerializer -from config.job_manager import spiderdata_db_client +from core.error_logs import write_deploy_logs_to_mongo from core.models import Deploy, Spider from engines.kubernetes import KubernetesEngine @@ -78,13 +76,8 @@ def update(self, instance, validated_data): ) else: instance.status = status - if status == Deploy.FAILURE_STATUS and error_reason and spiderdata_db_client.get_connection(): - db = str(project.pid) - spiderdata_db_client.client[db]["deploy_logs"].insert_one({ - "deploy_id": instance.did, - "logs": f"=== Deploy ===\n{error_reason}", - "created": datetime.utcnow(), - }) + if status == Deploy.FAILURE_STATUS and error_reason: + write_deploy_logs_to_mongo(instance, f"=== Deploy ===\n{error_reason}") if status == Deploy.SUCCESS_STATUS and spiders_names: project.spiders.filter(name__in=spiders_names, deleted=True).update( diff --git a/estela-api/api/serializers/job.py b/estela-api/api/serializers/job.py index d42156ce..4d1e69e7 100644 --- a/estela-api/api/serializers/job.py +++ b/estela-api/api/serializers/job.py @@ -1,5 +1,3 @@ -from datetime import datetime - from django.shortcuts import get_object_or_404 from rest_framework import serializers @@ -18,6 +16,7 @@ get_collection_name, ) from config.job_manager import job_manager, spiderdata_db_client +from core.error_logs import write_job_logs_to_mongo from core.models import ( DataStatus, SpiderJob, @@ -270,13 +269,8 @@ def update(self, instance, validated_data): pass instance.save() job_manager.delete_job(instance.name) - if status == SpiderJob.ERROR_STATUS and error_reason and spiderdata_db_client.get_connection(): - db = str(instance.spider.project.pid) - spiderdata_db_client.client[db]["job_logs"].insert_one({ - "job_id": instance.jid, - "logs": error_reason, - "created": datetime.utcnow(), - }) + if status == SpiderJob.ERROR_STATUS and error_reason: + write_job_logs_to_mongo(instance, error_reason) instance.status = status for field in self.job_fields: diff --git a/estela-api/config/celery.py b/estela-api/config/celery.py index 5c17f5fc..e0e7d408 100644 --- a/estela-api/config/celery.py +++ b/estela-api/config/celery.py @@ -17,7 +17,7 @@ }, "check-and-update-job-status-errors": { "task": "core.tasks.check_and_update_job_status_errors", - "schedule": 30, + "schedule": 60, }, "check-and-update-deploy-status-errors": { "task": "core.tasks.check_and_update_deploy_status_errors", diff --git a/estela-api/core/error_logs.py b/estela-api/core/error_logs.py index 9b43b5a7..5d1d9198 100644 --- a/estela-api/core/error_logs.py +++ b/estela-api/core/error_logs.py @@ -184,12 +184,39 @@ def capture_job_error_reason(job): def write_job_logs_to_mongo(job, logs): - """Insert a job_logs record into the project's Mongo database.""" - if not logs or not spiderdata_db_client.get_connection(): + """Upsert a job_logs record into the project's Mongo database. + + Uses $setOnInsert so concurrent writes for the same job_id are idempotent — + the first writer wins and subsequent calls are no-ops. + """ + if not spiderdata_db_client.get_connection(): return db = str(job.spider.project.pid) - spiderdata_db_client.client[db]["job_logs"].insert_one({ - "job_id": job.jid, - "logs": logs, - "created": datetime.utcnow(), - }) + spiderdata_db_client.client[db]["job_logs"].update_one( + {"job_id": job.jid}, + {"$setOnInsert": { + "job_id": job.jid, + "logs": logs, + "created": datetime.utcnow(), + }}, + upsert=True, + ) + + +def write_deploy_logs_to_mongo(deploy, logs): + """Upsert a deploy_logs record into the project's Mongo database. + + Uses $setOnInsert so concurrent writes for the same deploy_id are idempotent. + """ + if not spiderdata_db_client.get_connection(): + return + db = str(deploy.project.pid) + spiderdata_db_client.client[db]["deploy_logs"].update_one( + {"deploy_id": deploy.did}, + {"$setOnInsert": { + "deploy_id": deploy.did, + "logs": logs, + "created": datetime.utcnow(), + }}, + upsert=True, + ) diff --git a/estela-api/core/models.py b/estela-api/core/models.py index aac8b036..69cab0b9 100644 --- a/estela-api/core/models.py +++ b/estela-api/core/models.py @@ -400,19 +400,6 @@ def key(self): @property def job_status(self): - if ( - self.status == self.WAITING_STATUS - and timezone.now() - timedelta(seconds=job_manager.JOB_TIME_CREATION) - > self.created - ): - job_status = job_manager.read_job_status(self.name) - if job_status is None: - self.status = self.ERROR_STATUS - self.save() - elif job_status.active is None: - if job_status.succeeded is None: - self.status = self.ERROR_STATUS - self.save() return self.status diff --git a/estela-api/core/tasks.py b/estela-api/core/tasks.py index 545c45dd..c905dc2e 100644 --- a/estela-api/core/tasks.py +++ b/estela-api/core/tasks.py @@ -1,6 +1,6 @@ import json from collections import defaultdict -from datetime import timedelta, datetime +from datetime import timedelta from typing import List import logging @@ -32,6 +32,7 @@ capture_job_error_reason, strip_blanks, write_job_logs_to_mongo, + write_deploy_logs_to_mongo, ) from core.tiers import get_tier_resources from core.utils import parse_k8s_resource, parse_memory_to_mi @@ -287,7 +288,7 @@ def launch_job(sid_, data_, data_expiry_days=None, token=None): serializer.save(**save_kwargs) -@celery_app.task(name="core.tasks.check_and_update_job_status_errors") +@celery_app.task(name="core.tasks.check_and_update_job_status_errors", soft_time_limit=50) def check_and_update_job_status_errors(): candidate_statuses = [SpiderJob.WAITING_STATUS, SpiderJob.RUNNING_STATUS] jobs = SpiderJob.objects.filter(status__in=candidate_statuses)[ @@ -313,58 +314,46 @@ def check_and_update_job_status_errors(): job.save() write_job_logs_to_mongo(job, error_logs) - # Fallback: jobs already in ERROR but with no entry in job_logs yet. - # Covers paths that mark ERROR without writing logs (e.g. the - # SpiderJob.job_status property mutation, manual API marks). - if not spiderdata_db_client.get_connection(): - return - error_jobs = SpiderJob.objects.filter( - status=SpiderJob.ERROR_STATUS - ).order_by("-created")[: settings.CHECK_JOB_ERRORS_BATCH_SIZE] - for job in error_jobs: - db = str(job.spider.project.pid) - if spiderdata_db_client.client[db]["job_logs"].find_one( - {"job_id": job.jid}, projection={"_id": 1} - ): - continue - error_logs = capture_job_error_reason(job) - write_job_logs_to_mongo(job, error_logs) -@celery_app.task(name="core.tasks.check_and_update_deploy_status_errors") +@celery_app.task(name="core.tasks.check_and_update_deploy_status_errors", soft_time_limit=50) def check_and_update_deploy_status_errors(): - deploys = Deploy.objects.filter(status=Deploy.BUILDING_STATUS)[ - : settings.CHECK_JOB_ERRORS_BATCH_SIZE - ] + recent_cutoff = timezone.now() - timedelta(seconds=120) + deploys = Deploy.objects.filter( + status=Deploy.BUILDING_STATUS, + created__lt=recent_cutoff, + )[: settings.CHECK_JOB_ERRORS_BATCH_SIZE] + + has_mongo = spiderdata_db_client.get_connection() for deploy in deploys: - job_name = f"deploy-project-{deploy.did}" - job_status = job_manager.read_job_status(job_name) - if job_status is None or ( - job_status.active is None and job_status.succeeded is None - ): - build_logs = job_manager.read_build_logs(job_name) or {} - if build_logs and spiderdata_db_client.get_connection(): + try: + job_name = f"deploy-project-{deploy.did}" + job_status = job_manager.read_job_status(job_name) + dead = job_status is None or ( + job_status.active is None + and job_status.succeeded is None + and job_status.failed is not None + ) + if not dead: + continue + if has_mongo: + build_logs = job_manager.read_build_logs(job_name) or {} parts = [] for label, key in ( ("Downloader", "project-downloader"), ("Build", "kaniko-builder"), ("Deploy", "spider-status"), ): - log = build_logs.get(key) - excerpt = strip_blanks(log) + excerpt = strip_blanks(build_logs.get(key)) if excerpt: parts.append(f"=== {label} ===\n{excerpt}") - combined = "\n\n".join(parts) - if combined: - db = str(deploy.project.pid) - spiderdata_db_client.client[db]["deploy_logs"].insert_one({ - "deploy_id": deploy.did, - "logs": combined, - "created": datetime.utcnow(), - }) + combined = "\n\n".join(parts) or None + write_deploy_logs_to_mongo(deploy, combined) deploy.status = Deploy.FAILURE_STATUS deploy.save() + except Exception as e: + logging.error("check_and_update_deploy_status_errors: failed for deploy %s: %s", deploy.did, e) @celery_app.task( diff --git a/estela-web/src/pages/DeployListPage/styles.scss b/estela-web/src/pages/DeployListPage/styles.scss index 50a44fbd..ab588609 100644 --- a/estela-web/src/pages/DeployListPage/styles.scss +++ b/estela-web/src/pages/DeployListPage/styles.scss @@ -59,4 +59,10 @@ $estela-red-low: #fff5f2; color: #ffffff !important; } } + + .deploy-failure-tag, + .deploy-failure-tag .anticon { + pointer-events: none; + transition: none !important; + } } diff --git a/estela-web/src/pages/JobDetailPage/index.tsx b/estela-web/src/pages/JobDetailPage/index.tsx index af83b8bb..1b58346a 100644 --- a/estela-web/src/pages/JobDetailPage/index.tsx +++ b/estela-web/src/pages/JobDetailPage/index.tsx @@ -22,8 +22,6 @@ import { ExclamationCircleOutlined } from "@ant-design/icons"; import "./styles.scss"; import JobCreateModal from "../JobCreateModal"; -import { ApiService } from "../../services"; -import { parseDuration, durationToString, formatBytes, getFilteredEnvVars } from "../../utils"; import { ApiService, AuthService } from "../../services"; import { API_BASE_URL } from "../../constants"; import { BytesMetric, parseDuration, durationToString, formatBytes, getFilteredEnvVars } from "../../utils"; @@ -615,6 +613,7 @@ export class JobDetailPage extends Component, J return ( <> + {this.renderErrorBanner()} , J
- - {this.renderErrorBanner()}