diff --git a/docs/designs/production-units.md b/docs/designs/production-units.md index fb91fdd..11c52a4 100644 --- a/docs/designs/production-units.md +++ b/docs/designs/production-units.md @@ -162,7 +162,6 @@ This document does not restate full table schemas; it only defines key field sem |------|------|------| | GET | `/tasks` | List (filters: `workstation_id/status/limit/offset`) | | GET | `/tasks/:id` | Detail (includes `episode` if linked) | -| PUT | `/tasks/:id` | Status update (restricted transitions; see §6.2) | | GET | `/tasks/:id/config` | Generate recorder config (requires workstation robot + collector bindings) | --- @@ -184,12 +183,14 @@ This document does not restate full table schemas; it only defines key field sem ### 6.2 Task states -- **State set**: `pending` | `ready` | `in_progress` | `completed` | `failed` | `cancelled` -- **Prepare (pending→ready)**: triggered by UI/scheduler (currently via `PUT /tasks/:id`). -- **Run (ready→in_progress)**: triggered by UI/device workflow (currently via `PUT /tasks/:id`). +- **State set**: `pending` | `ready` | `in_progress` | `uploading` | `completed` | `failed` | `cancelled` +- **Prepare (pending→ready)**: triggered by recorder config application (`config_applied` / ready state snapshot). +- **Run (ready→in_progress)**: triggered by recorder start callback or recording state snapshot. +- **Finish (in_progress→uploading)**: triggered by recorder finish callback; Keystone sends `upload_request` to Transfer when connected. - **Transfer ACK**: - - On verified upload ACK, Keystone marks task `in_progress -> completed` (only if currently `in_progress`). - - On `upload_failed`, Keystone marks task `in_progress -> failed`. + - On verified upload ACK, Keystone sends `upload_ack`, then marks task `pending/ready/in_progress/uploading -> completed`. + - Duplicate ACKs for already `completed` tasks are allowed but must not re-advance batch/order state. + - On `upload_failed`, Keystone marks task `in_progress/uploading -> failed`. - **Revert to pending (ready/in_progress→pending)**: used for recovery when Transfer disconnects (to avoid stuck runnable tasks). ### 6.3 Batch states @@ -227,15 +228,14 @@ When the device reports `upload_complete`, Keystone runs the Verified ACK flow: - **Idempotent**: if an Episode already exists for this `task_id`, do not insert again - Insert into `episodes` (persist denormalized fields such as `batch_id/order_id/scene_id/...`) - `batches.episode_count += 1` (only when a new Episode is inserted) - - Update `tasks.status` to **`completed`** (and set `completed_at`) **only when current status is `in_progress`** 3. **Send `upload_ack`** to the device +4. **Mark task completed**: update `tasks.status` to **`completed`** (and set `completed_at`) when current status is `pending/ready/in_progress/uploading`; already completed tasks remain idempotent --- ## 8. Known gaps and evolution -- **In-recording state**: `callbacks/start` does not persist state today; `ready -> in_progress` validation/persistence is not implemented yet. -- **Failure path**: an end-to-end `failed` terminal state and error attribution are not fully implemented (callbacks/transfer need to be extended). +- **Failure path**: terminal `failed` handling exists for transfer failures; recorder callback failure attribution still needs tighter end-to-end coverage. - **Quota consistency**: - Dispatch quota is based on non-deleted task rows, not completed rows. - New/bulk batch creation uses `remaining_assignable = target_count - order_task_count`. diff --git a/docs/designs/task-manage.md b/docs/designs/task-manage.md index bf48e63..67321b6 100644 --- a/docs/designs/task-manage.md +++ b/docs/designs/task-manage.md @@ -50,10 +50,11 @@ The Task state machine defines the complete lifecycle of a Task, with state tran | State | Description | Valid Previous States | Valid Next States | |-------|-------------|----------------------|-------------------| | `pending` | Task created, awaiting workstation preparation | `ready`(cancel), `*`(new) | `ready`, `cancelled` | -| `ready` | Data collector clicked "Make Ready", awaiting recording start | `pending` | `in_progress`, `pending`, `cancelled` | -| `in_progress` | Recording in progress | `ready` | `completed`, `failed` | -| `completed` | Recording completed successfully | `in_progress` | *(terminal)* | -| `failed` | Recording failed | `in_progress` | *(terminal)* | +| `ready` | Recorder applied TaskConfig, awaiting recording start | `pending` | `in_progress`, `pending`, `cancelled` | +| `in_progress` | Recording in progress | `pending`, `ready` | `uploading`, `failed` | +| `uploading` | Recording finished, waiting for Transfer upload ACK | `pending`, `ready`, `in_progress` | `completed`, `failed` | +| `completed` | Recording uploaded and acknowledged successfully | `pending`, `ready`, `in_progress`, `uploading` | *(terminal)* | +| `failed` | Recording or upload failed | `in_progress`, `uploading` | *(terminal)* | | `cancelled` | Task cancelled | `pending`, `ready` | *(terminal)* | ### 2.2 State Transition Diagram @@ -61,13 +62,14 @@ The Task state machine defines the complete lifecycle of a Task, with state tran ```mermaid stateDiagram-v2 [*] --> pending: Order created/Task generated - pending --> ready: Data Collector clicks "Make Ready" + pending --> ready: Recorder config_applied pending --> cancelled: Task cancelled ready --> in_progress: Axon POST /callbacks/start - ready --> pending: Data Collector clicks "Unready" + ready --> pending: Transfer disconnect recovery ready --> cancelled: Task cancelled - in_progress --> completed: Axon POST /callbacks/finish (success) - in_progress --> failed: Axon POST /callbacks/finish (failure) + in_progress --> uploading: Axon POST /callbacks/finish + uploading --> completed: Transfer upload_complete + upload_ack + uploading --> failed: Transfer upload_failed completed --> [*] failed --> [*] cancelled --> [*] @@ -77,7 +79,7 @@ stateDiagram-v2 #### pending → ready -- **Trigger**: Data collector clicks "Make Ready" in Synapse UI +- **Trigger**: Recorder confirms TaskConfig application (`config_applied` or ready state snapshot) - **Validation**: - Task status is `pending` - Task is assigned to a Workstation @@ -96,12 +98,21 @@ stateDiagram-v2 - Record `started_at` timestamp - Record active ROS Topics -#### in_progress → completed +#### in_progress → uploading - **Trigger**: Axon calls [`POST /callbacks/finish`](implementation/axon_teleoperation.md:1107) with `error == null` - **Validation**: Task status is `in_progress` +- **Side Effects**: + - Mark Task `uploading` + - Trigger Transfer `upload_request` when the device is connected + +#### uploading → completed + +- **Trigger**: Transfer reports `upload_complete`, Keystone verifies S3 objects, then sends `upload_ack` +- **Validation**: Task status is `pending`, `ready`, `in_progress`, or `uploading` - **Side Effects**: - Record `completed_at` timestamp + - Clear upload error message - Create Episode (`qa_status: pending_qa`) - Trigger Edge Dagster QA Job @@ -147,13 +158,13 @@ Batch-scoped Tasks are created for selected Workstations (status: pending) ### Phase 1: Task Preparation and TaskConfig Distribution ``` -Data Collector → Synapse UI clicks "Make Ready" +Keystone → Dispatch TaskConfig to recorder ↓ - Synapse → PATCH /tasks/{id} {status: "ready"} + Axon recorder applies config ↓ Keystone → Task: pending → ready ↓ - Trigger notification for Axon to pull config + Axon keeps TaskConfig locally ↓ Axon → GET /tasks/{id}/config (Device token) ↓ @@ -253,7 +264,6 @@ Four weighted checks: | `GET` | `/tasks` | List tasks (with filtering) | Synapse UI | | `GET` | `/tasks/{id}` | Get task details | Synapse UI | | `GET` | `/tasks/{id}/config` | Get task config (Axon pull) | Axon | -| `PATCH` | `/tasks/{id}` | Update task status | Synapse UI | | `POST` | `/tasks` | Create task (internal use) | Order Manager | ### 4.2 Callback Interfaces @@ -388,8 +398,8 @@ Response 500: **Operations**: -- "Make Ready": `pending` → `ready` -- "Unready": `ready` → `pending` (timeout or reset) +- Synapse displays task status and creates tasks. +- Task status transitions are driven by recorder callbacks/state snapshots and Transfer upload events. ### 5.3 Task and Dagster QA Interaction @@ -444,7 +454,7 @@ CREATE TABLE tasks ( sop_id BIGINT NOT NULL, -- Status - status VARCHAR(32) NOT NULL DEFAULT 'pending' COMMENT 'pending|ready|in_progress|completed|failed|cancelled', + status VARCHAR(32) NOT NULL DEFAULT 'pending' COMMENT 'pending|ready|in_progress|uploading|completed|failed|cancelled', -- Timestamps created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/docs/designs/task-uploading-status.zh.html b/docs/designs/task-uploading-status.zh.html new file mode 100644 index 0000000..37ef4b3 --- /dev/null +++ b/docs/designs/task-uploading-status.zh.html @@ -0,0 +1,821 @@ + + + + + + + Task Uploading 状态设计 + + + +
+
+
+

Keystone task state design

+

Task uploading 状态方案

+

+ 本方案为 task 新增 uploading 状态,用于表达“录制已结束,任务进入上传后处理阶段”。核心目标是让 Recorder 断连、ping timeout 或重连状态同步不再把已完成录制的任务回退到 pending,同时允许订单和批次的生产侧先收敛。 +

+ +
+ +
+ +
+

目标与边界

+
+
+ 目标 + 保护已结束录制的任务 +

收到合法 finish callback 后,task 立即进入 uploading,后续 Recorder 断连不再回退。

+
+
+ 非目标 + 不设计上传调度器 +

第一版不做自动重试、不做上传超时、不持久化上传进度、不新增 pending_uploadupload_stalled

+
+
+ 约束 + 区分生产与后处理 +

completed_at 仍只表示 task 到达终态;order/batch completed 可以早于部分 uploading 后处理完成。

+
+
+
+ +
+

状态语义

+
+
+ pending +

未下发,等待配置或执行。

+
+
+ ready +

Recorder 已配置,等待开始录制。

+
+
+ in_progress +

Recorder 正在录制,仍属于 Recorder 阶段。

+
+
+ uploading +

录制已结束,上传后处理已接管或待接管。

+
+
+ completed +

S3 验证、episode 写入、upload_ack 已完成。

+
+
+
+ 关键定义:uploading 不严格表示“字节已经开始上传”,而表示“Recorder 不再拥有该 task,任务进入上传后处理阶段”。它不计入 in_progress,也不占用工位生产状态。 +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
状态是否终态是否允许回退到 pending主要拥有方
pending不适用Keystone
ready允许,来自 clear、cancel、断连恢复Recorder
in_progress允许,来自 cancel、断连恢复Recorder
uploading禁止Transfer / Keystone
completed / failed / cancelled禁止Keystone
+
+ +
+

设备归属校验

+

所有来自 Recorder 或 Transfer 的设备事件,在修改 task 状态、写 error_message、写 episode 或发送 ACK 前,都必须确认事件来自拥有该 task 的设备。

+
tasks.task_id = message.task_id
+tasks.workstation_id = workstations.id
+workstations.robot_id = robots.id
+robots.device_id = callback.device_id 或当前 Transfer WebSocket 的 device_id
+ + + + + + + + + + + + + + + + + + + + + +
入口校验失败时
finish callback / Recorder state reconcile不改状态,只记录 warning/error;finish callback 返回可说明 task/device 不匹配。
upload_started / upload_failed / upload_not_found不改状态,不写错误到该 task,只记录来源设备和 task_id。
upload_complete不写 episode,不改 task,不发 upload_ack
+
+ +
+

事件规则

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
事件状态变更字段与响应
合法 finish callbackpending/ready/in_progress/uploading -> uploading归属校验通过后立即更新状态;不写 completed_at;之后尝试发送 upload_request
upload_request 发送成功保持 uploading清空 upload 相关 error_message;finish callback 返回 200,并带 upload_request_sent: true
upload_request 发送失败保持 uploading写入 error_message;合法 finish callback 仍返回 200,并带 upload_request_sent: false
upload_startedpending/ready/in_progress/uploading -> uploading归属校验通过后幂等确认上传阶段;清空 upload 相关 error_message;不碰终态。
upload_complete 且 ACK 成功pending/ready/in_progress/uploading -> completed归属校验通过后写 episode;发送 upload_ack;写 completed_at 并清空 error_message
重复 upload_complete 且 task 已 completed保持 completed允许幂等发送 upload_ack;不重复写 episode、不改 completed_at、不重复推进 batch/order。
upload_failedin_progress/uploading -> failed归属校验通过后写 completed_aterror_message;不回滚已 completed 的 order/batch。
upload_not_found保持 uploading归属校验通过后只写 error_message;不自动失败;不触发 batch/order 推进。
重复 finish callback保持 uploading幂等处理;不写 completed_at;重新尝试 upload_request;根据发送结果清空或更新错误。
终态收到迟到事件failed/cancelled 不恢复failed/cancelled 收到 upload_complete 时不改状态、不发 ACK;只记录日志。
+ +

状态机约束

+
pending -> ready -> in_progress -> uploading -> completed
+                                  |             |
+                                  |             -> failed
+                                  -> failed
+
+ready/in_progress -> pending       仅限 Recorder 阶段回退
+uploading -> pending               禁止
+failed -> uploading/completed      第一版禁止自动恢复
+
+ 第一版允许 task 长期停留在 uploading。该状态表示上传后处理残留,不阻塞 order/batch 的生产侧完成;后续如需收敛,由自动超时方案处理。 +
+
+ +
+

断连与回退规则

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
触发源允许回退禁止影响
Recorder 普通 disconnectready/in_progress -> pendinguploading、终态
Recorder ping timeoutready/in_progress -> pendinguploading、终态
Transfer disconnectready/in_progress -> pending,并可通知 Recorder clear/canceluploading、终态
Recorder state reconcilepending -> readypending/ready -> in_progress不能把 uploading 拉回 readyin_progress
Recorder clear/cancel RPCready/in_progress -> pendinguploading、终态
+
+ +
+

接口调整

+ +
+ +
+

批次与订单

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
规则结论
uploading 是否终态不是终态,但不永久阻塞 order/batch 的生产侧收敛。
正常 batch active -> completed仍要求所有 task 进入 completed/failed/cancelled
order 达标收尾order 可以 completed;open batch 可 completed;保留 uploading task 继续后处理。
batch cancel / recall只取消 pending/ready/in_progress;不取消 uploading
completed order/batch 下的后处理uploading -> completed/failed 只更新 task、episode 和统计;不回滚 order/batch。
自动准备下一条任务uploading 不阻塞同工位准备下一条;但需要 Recorder fresh idle 且 Transfer 在线。
工位占用uploading 不改变工位生产占用语义,工位可用性仍看 Recorder 状态和设备连接。
+
+ +
+

数据库与迁移

+ +
+ +
+

前端与大屏

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
位置调整
任务列表 / 详情uploading 显示为“上传中”,颜色复用进行中或同步中的蓝色系;详情和列表可展示 error_message
生产大屏生命周期轨改为 5 节点:pending -> ready -> in_progress -> uploading -> completed
大屏后端统计dashboardTaskCounts、summary、trend、batch task summary、status distribution 都单独处理 uploading
趋势与统计卡uploading 不计入 pendingin_progress,避免双算或误读。
recent tasksstatus_textuploading 返回“上传中”。
生产语义order/batch completed 下仍可显示 uploading task;这是允许的上传后处理状态。
+
+ +
+

测试清单

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
类别必须覆盖的行为
finish callback归属校验通过的合法 callback 立即把 pending/ready/in_progress/uploading 改为或保持 uploading,并返回 200
上传请求失败Transfer 不在线或写失败时保持 uploading,写 error_message,响应 upload_request_sent: false
设备归属callback、upload_startedupload_completeupload_failedupload_not_found 的 device mismatch 都不能改 task。
断连恢复Recorder disconnect、Recorder ping timeout、Transfer disconnect 都不能把 uploading 回退到 pending
上传完成upload_complete + upload_ackuploading 改为 completed;已 completed 的重复 complete 可幂等 ACK。
上传失败upload_faileduploading 改为 failed,并写终态时间和错误;迟到 complete 不恢复 failed。
未找到上传upload_not_found 保持 uploading,只更新 error_message
接口移除PUT /tasks/:idPOST /transfer/{device_id}/upload_ack 不再可用。
删除与 API 展示uploading 禁止删除;任务 API 返回 error_message
批次与订单order 达标可 completed;open batch 可 completed;uploading 保留后处理且不影响工位占用。
大屏统计状态分布、summary、trend、batch summary、recent tasks、5 节点生命周期轨都能正确展示 uploading
长期上传中第一版允许 task 长期停留 uploading,不阻塞 order/batch 生产侧完成。
+
+
+ + diff --git a/docs/designs/upload-service-design.md b/docs/designs/upload-service-design.md index ac3142f..6ab9b52 100644 --- a/docs/designs/upload-service-design.md +++ b/docs/designs/upload-service-design.md @@ -311,10 +311,7 @@ POST /api/v1/transfer/{device_id}/status_query #### Manual ACK -``` -POST /api/v1/transfer/{device_id}/upload_ack -{"task_id": "task_abc123"} -``` +Removed. Keystone now sends `upload_ack` only from the verified `upload_complete` path. ## Security Considerations diff --git a/internal/api/handlers/axon_rpc.go b/internal/api/handlers/axon_rpc.go index 68f34fe..92a4bd5 100644 --- a/internal/api/handlers/axon_rpc.go +++ b/internal/api/handlers/axon_rpc.go @@ -152,14 +152,14 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request "SELECT COUNT(1) FROM robots WHERE device_id = ? AND deleted_at IS NULL", deviceID, ); err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(queryCtx.Err(), context.DeadlineExceeded) { - logger.Printf("[RECORDER] Device %s: DB query timeout after %s (timeout_ms=%d): %v", deviceID, timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err) + logger.Printf("%s DB query timeout after %s (timeout_ms=%d): %v", recorderLogPrefix(deviceID), timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err) } else { - logger.Printf("[RECORDER] Device %s: DB query error: %v", deviceID, err) + logger.Printf("%s DB query error: %v", recorderLogPrefix(deviceID), err) } } // Check count regardless of DB error (count defaults to 0 on error) if count == 0 { - logger.Printf("[RECORDER] Device %s: robot not found in database", deviceID) + logger.Printf("%s robot not found in database", recorderLogPrefix(deviceID)) w.WriteHeader(http.StatusNotFound) return } @@ -168,7 +168,7 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request // Allow any origin in dev; tighten in production conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true}) if err != nil { - logger.Printf("[RECORDER] Device %s: WebSocket accept error: %v", deviceID, err) + logger.Printf("%s WebSocket accept error: %v", recorderLogPrefix(deviceID), err) return } @@ -181,7 +181,7 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request defer func() { if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil { if !isExpectedWebSocketCloseError(err) { - logger.Printf("[RECORDER] Device %s: WebSocket close error: %v", deviceID, err) + logger.Printf("%s WebSocket close error: %v", recorderLogPrefix(deviceID), err) } } }() @@ -196,14 +196,14 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request go h.pingLoop(ctx, rc) // #nosec G706 -- Set aside for now - logger.Printf("[RECORDER] Recorder %s connected from %s", deviceID, remoteIP) + logger.Printf("%s connected from %s", recorderLogPrefix(deviceID), remoteIP) go h.syncRecorderStateFromDevice(ctx, rc) for { _, raw, err := conn.Read(ctx) if err != nil { if !isExpectedWebSocketCloseError(err) { - logger.Printf("[RECORDER] Recorder %s disconnected: %v", deviceID, err) + logger.Printf("%s disconnected: %v", recorderLogPrefix(deviceID), err) } return } @@ -212,7 +212,7 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request var msg map[string]interface{} if err := json.Unmarshal(raw, &msg); err != nil { - logger.Printf("[RECORDER] Recorder %s invalid JSON: %v", deviceID, err) + logger.Printf("%s invalid JSON: %v", recorderLogPrefix(deviceID), err) continue } @@ -294,12 +294,12 @@ func (h *RecorderHandler) Begin(c *gin.Context) { // pending is allowed because recorder may preserve ready state across a transient // WebSocket disconnect while Keystone has already rolled the task back. if taskID != "" && h.db != nil { - res, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID) + rowsAffected, _, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID) if err != nil { - logger.Printf("[RECORDER] Device %s: failed to advance task pending/ready->in_progress after begin: task=%s err=%v", c.Param("device_id"), taskID, err) + logger.Printf("%s failed to advance task pending/ready->in_progress after begin: err=%v", recorderTaskLogPrefix(c.Param("device_id"), taskID), err) return } - if n, _ := res.RowsAffected(); n == 0 { + if rowsAffected == 0 { h.logBeginTransitionNoop(c.Param("device_id"), taskID) } } @@ -406,12 +406,12 @@ func (h *RecorderHandler) Cancel(c *gin.Context) { now, taskID, ) if err != nil { - logger.Printf("[RECORDER] Device %s: failed to revert task after cancel RPC: task=%s err=%v", deviceID, taskID, err) + logger.Printf("%s failed to revert task after cancel RPC: err=%v", recorderTaskLogPrefix(deviceID, taskID), err) return } n, _ := res.RowsAffected() if n == 0 { - logger.Printf("[RECORDER] Device %s: task revert skipped after cancel RPC (not found or not ready/in_progress): task=%s", deviceID, taskID) + logger.Printf("%s task revert skipped after cancel RPC (not found or not ready/in_progress)", recorderTaskLogPrefix(deviceID, taskID)) } } } @@ -461,11 +461,11 @@ func (h *RecorderHandler) Clear(c *gin.Context) { now, taskID, ) if err != nil { - logger.Printf("[RECORDER] Device %s: failed to revert task ready->pending after clear: task=%s err=%v", c.Param("device_id"), taskID, err) + logger.Printf("%s failed to revert task ready->pending after clear: err=%v", recorderTaskLogPrefix(c.Param("device_id"), taskID), err) return } if n, _ := res.RowsAffected(); n == 0 { - logger.Printf("[RECORDER] Device %s: task ready->pending skipped after clear (not found or not ready): task=%s", c.Param("device_id"), taskID) + logger.Printf("%s task ready->pending skipped after clear (not found or not ready)", recorderTaskLogPrefix(c.Param("device_id"), taskID)) } } } @@ -957,7 +957,7 @@ func recorderPreviousStateFromRaw(raw map[string]interface{}) string { func (h *RecorderHandler) handleMessage(deviceID string, rc *services.RecorderConn, msg map[string]interface{}) { if h.hub.Get(deviceID) != rc { - logger.Printf("[RECORDER] Recorder %s ignored message from replaced connection", deviceID) + logger.Printf("%s ignored message from replaced connection", recorderLogPrefix(deviceID)) return } @@ -969,16 +969,16 @@ func (h *RecorderHandler) handleMessage(deviceID string, rc *services.RecorderCo h.handleStateUpdate(rc, msg) case "connected": // #nosec G706 -- Set aside for now - logger.Printf("[RECORDER] Recorder %s sent connected event", deviceID) + logger.Printf("%s sent connected event", recorderLogPrefix(deviceID)) case "config_applied": data := mapValue(msg, "data") taskID := stringValue(data, "task_id") // #nosec G706 -- Set aside for now - logger.Printf("[RECORDER] Recorder %s config applied task=%s", deviceID, taskID) + logger.Printf("%s config applied", recorderTaskLogPrefix(deviceID, taskID)) advanceTaskPendingToReady(h.db, deviceID, taskID, "config_applied") default: // #nosec G706 -- Set aside for now - logger.Printf("[RECORDER] Recorder %s unknown message type %q", deviceID, msgType) + logger.Printf("%s unknown message type %q", recorderLogPrefix(deviceID), msgType) } } @@ -991,7 +991,7 @@ func (h *RecorderHandler) handleRPCResponse(deviceID string, msg map[string]inte Data: mapValue(msg, "data"), } if !h.hub.HandleRPCResponse(deviceID, response) { - logger.Printf("[RECORDER] Recorder %s unmatched response request_id=%s", deviceID, response.RequestID) + logger.Printf("%s unmatched response request_id=%s", recorderLogPrefix(deviceID), response.RequestID) } } @@ -1015,7 +1015,7 @@ func (h *RecorderHandler) syncRecorderStateFromDevice(ctx context.Context, rc *s if _, _, err := h.refreshRecorderState(ctx, rc.DeviceID, rc, -1); err != nil { if !errors.Is(err, services.ErrRecorderNotConnected) && !errors.Is(err, context.Canceled) { - logger.Printf("[RECORDER] Recorder %s get_state after connect failed: %v", rc.DeviceID, err) + logger.Printf("%s get_state after connect failed: %v", recorderLogPrefix(rc.DeviceID), err) } } } @@ -1024,6 +1024,7 @@ func (h *RecorderHandler) applyRecorderStateSnapshot(rc *services.RecorderConn, if rc == nil { return nil } + previous := rc.GetState() state.Source = strings.TrimSpace(source) state.TaskID = strings.TrimSpace(state.TaskID) if !recorderStateKeepsTaskID(state.CurrentState) { @@ -1035,22 +1036,115 @@ func (h *RecorderHandler) applyRecorderStateSnapshot(rc *services.RecorderConn, if state.TaskID == "" && recorderStateRequiresTaskID(state.CurrentState, state.Source) { err := fmt.Errorf("recorder %s snapshot missing task_id for state=%s", state.Source, strings.TrimSpace(state.CurrentState)) h.markRecorderSyncing(rc, state.Source, err) - logger.Printf("[RECORDER] Recorder %s ignored %s state=%s without task_id", rc.DeviceID, state.Source, state.CurrentState) + logger.Printf("%s ignored %s state=%s without task_id", recorderLogPrefix(rc.DeviceID), state.Source, state.CurrentState) return err } rc.UpdateState(state) st := rc.GetState() h.reconcileRecorderTaskState(rc.DeviceID, st, source) h.publishRecorderStateSnapshot(rc, st, source, true) - if source == "state_update" { - // #nosec G706 -- Set aside for now - logger.Printf("[RECORDER] Recorder %s state=%s task=%s", rc.DeviceID, st.CurrentState, st.TaskID) - return nil + if recorderStateSnapshotChanged(previous, st) { + logRecorderStateChange(rc.DeviceID, previous, st, source) } - logger.Printf("[RECORDER] Recorder %s state=%s task=%s source=%s", rc.DeviceID, st.CurrentState, st.TaskID, source) return nil } +func recorderStateSnapshotChanged(previous services.RecorderState, next services.RecorderState) bool { + return !strings.EqualFold(strings.TrimSpace(previous.CurrentState), strings.TrimSpace(next.CurrentState)) || + strings.TrimSpace(previous.TaskID) != strings.TrimSpace(next.TaskID) +} + +func logRecorderStateChange(deviceID string, previous services.RecorderState, next services.RecorderState, source string) { + prefix := recorderStateLogPrefix(deviceID, previous.TaskID, next.TaskID) + previousState := recorderLogState(previous.CurrentState) + nextState := recorderLogState(next.CurrentState) + logSource := recorderLogSource(source) + if !strings.EqualFold(strings.TrimSpace(previous.CurrentState), strings.TrimSpace(next.CurrentState)) { + logger.Printf("%s state changed: %s -> %s source=%s", prefix, previousState, nextState, logSource) + return + } + logger.Printf("%s task changed: %s -> %s state=%s source=%s", + prefix, + recorderLogTaskID(previous.TaskID), + recorderLogTaskID(next.TaskID), + nextState, + logSource, + ) +} + +func recorderLogState(state string) string { + state = strings.TrimSpace(state) + if state == "" { + return "-" + } + return state +} + +func recorderLogTaskID(taskID string) string { + taskID = strings.TrimSpace(taskID) + if taskID == "" { + return "-" + } + return taskID +} + +func recorderLogSource(source string) string { + source = strings.TrimSpace(source) + if source == "" { + return "state_update" + } + return source +} + +func recorderLogPrefix(deviceID string) string { + return logPrefixWithTask("RECORDER", deviceID, "") +} + +func recorderTaskLogPrefix(deviceID, taskID string) string { + return logPrefixWithTask("RECORDER", deviceID, taskID) +} + +func recorderStateLogPrefix(deviceID, previousTaskID, nextTaskID string) string { + taskID := strings.TrimSpace(nextTaskID) + if taskID == "" { + taskID = strings.TrimSpace(previousTaskID) + } + return recorderTaskLogPrefix(deviceID, taskID) +} + +func transferLogPrefix(deviceID string) string { + return logPrefixWithTask("TRANSFER", deviceID, "") +} + +func transferTaskLogPrefix(deviceID, taskID string) string { + return logPrefixWithTask("TRANSFER", deviceID, taskID) +} + +func deviceLogPrefix(deviceID string) string { + return logPrefixWithTask("DEVICE", deviceID, "") +} + +func deviceTaskLogPrefix(deviceID, taskID string) string { + return logPrefixWithTask("DEVICE", deviceID, taskID) +} + +func logPrefixWithTask(component, deviceID, taskID string) string { + component = strings.TrimSpace(component) + if component == "" { + component = "DEVICE" + } + deviceID = strings.TrimSpace(deviceID) + taskID = strings.TrimSpace(taskID) + prefix := "[" + component + "]" + if deviceID != "" { + prefix += "[" + deviceID + "]" + } + if taskID != "" { + prefix += "[" + taskID + "]" + } + return prefix +} + func recorderStateFromRPCData(data map[string]interface{}) services.RecorderState { state := services.RecorderState{ CurrentState: firstNonEmptyString(data, "state", "current", "current_state"), @@ -1116,15 +1210,15 @@ func (h *RecorderHandler) reconcileRecorderTaskState(deviceID string, state serv currentState := strings.ToLower(strings.TrimSpace(state.CurrentState)) switch currentState { case "ready": - advanceTaskPendingToReady(h.db, deviceID, taskID, source+" ready") + advanceTaskPendingToReady(h.db, deviceID, taskID, source+"_ready") case "recording", "paused": - res, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID) + rowsAffected, previousStatus, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID) if err != nil { - logger.Printf("[RECORDER] Recorder %s: failed to advance task pending/ready->in_progress after %s %s: task=%s err=%v", deviceID, source, currentState, taskID, err) + logger.Printf("%s failed to advance task pending/ready->in_progress after %s %s: err=%v", recorderTaskLogPrefix(deviceID, taskID), source, currentState, err) return } - if n, _ := res.RowsAffected(); n > 0 { - logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=%s_%s status=in_progress", deviceID, taskID, source, currentState) + if rowsAffected > 0 { + logger.Printf("%s task status updated: %s -> in_progress reason=%s_%s", recorderTaskLogPrefix(deviceID, taskID), taskStatusLogValue(previousStatus, "unknown"), source, currentState) } } } @@ -1134,6 +1228,7 @@ func advanceTaskPendingToReady(db *sqlx.DB, deviceID, taskID, source string) { if db == nil || taskID == "" { return } + previousStatus, _, _ := currentTaskStatus(db, taskID) now := time.Now().UTC() res, err := db.Exec( `UPDATE tasks @@ -1145,44 +1240,51 @@ func advanceTaskPendingToReady(db *sqlx.DB, deviceID, taskID, source string) { now, now, taskID, ) if err != nil { - logger.Printf("[RECORDER] Device %s: failed to advance task pending->ready after %s: task=%s err=%v", deviceID, source, taskID, err) + logger.Printf("%s failed to advance task pending->ready after %s: err=%v", recorderTaskLogPrefix(deviceID, taskID), source, err) return } if n, _ := res.RowsAffected(); n > 0 { - logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=%s status=ready", deviceID, taskID, source) + logger.Printf("%s task status updated: %s -> ready reason=%s", recorderTaskLogPrefix(deviceID, taskID), taskStatusLogValue(previousStatus, "unknown"), source) } } -func advanceTaskPendingOrReadyToInProgress(db *sqlx.DB, taskID string) (sql.Result, error) { +func advanceTaskPendingOrReadyToInProgress(db *sqlx.DB, taskID string) (int64, string, error) { if db == nil { - return nil, nil + return 0, "", nil } + taskID = strings.TrimSpace(taskID) + previousStatus, _, _ := currentTaskStatus(db, taskID) now := time.Now().UTC() - return db.Exec( + res, err := db.Exec( `UPDATE tasks SET status = 'in_progress', started_at = CASE WHEN started_at IS NULL THEN ? ELSE started_at END, updated_at = ? WHERE task_id = ? AND status IN ('pending', 'ready') AND deleted_at IS NULL`, - now, now, strings.TrimSpace(taskID), + now, now, taskID, ) + if err != nil { + return 0, previousStatus, err + } + rowsAffected, _ := res.RowsAffected() + return rowsAffected, previousStatus, nil } func (h *RecorderHandler) logBeginTransitionNoop(deviceID, taskID string) { status, ok, err := currentTaskStatus(h.db, taskID) if err != nil { - logger.Printf("[RECORDER] Device %s: task status lookup failed after begin: task=%s err=%v", deviceID, taskID, err) + logger.Printf("%s task status lookup failed after begin: err=%v", recorderTaskLogPrefix(deviceID, taskID), err) return } if ok && (status == "in_progress" || status == "completed") { return } if !ok { - logger.Printf("[RECORDER] Device %s: task pending/ready->in_progress skipped after begin (task not found): task=%s", deviceID, taskID) + logger.Printf("%s task pending/ready->in_progress skipped after begin (task not found)", recorderTaskLogPrefix(deviceID, taskID)) return } - logger.Printf("[RECORDER] Device %s: task pending/ready->in_progress skipped after begin (current_status=%s): task=%s", deviceID, status, taskID) + logger.Printf("%s task pending/ready->in_progress skipped after begin (current_status=%s)", recorderTaskLogPrefix(deviceID, taskID), status) } func currentTaskStatus(db *sqlx.DB, taskID string) (string, bool, error) { @@ -1221,10 +1323,10 @@ func (h *RecorderHandler) pingLoop(ctx context.Context, rc *services.RecorderCon cancel() if err != nil { if ctx.Err() == nil { - logWebSocketPingFailure("RECORDER", "Recorder", rc.DeviceID, timeout, timedOut, err) + logWebSocketPingFailure("RECORDER", rc.DeviceID, timeout, timedOut, err) if closeErr := rc.Conn.CloseNow(); closeErr != nil { if !isExpectedWebSocketCloseError(closeErr) { - logger.Printf("[RECORDER] Recorder %s close after ping failure: %v", rc.DeviceID, closeErr) + logger.Printf("%s close after ping failure: %v", recorderLogPrefix(rc.DeviceID), closeErr) } } } @@ -1270,21 +1372,21 @@ func logRecorderRPCTimeout(deviceID, action, taskID, source string, timeout time } taskID = strings.TrimSpace(taskID) if taskID != "" { - logger.Printf("[RECORDER] Recorder %s RPC timeout after %s (timeout_ms=%d): action=%s task=%s source=%s err=%v", deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, taskID, source, err) + logger.Printf("%s RPC timeout after %s (timeout_ms=%d): action=%s source=%s err=%v", recorderTaskLogPrefix(deviceID, taskID), timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, source, err) return } - logger.Printf("[RECORDER] Recorder %s RPC timeout after %s (timeout_ms=%d): action=%s source=%s err=%v", deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, source, err) + logger.Printf("%s RPC timeout after %s (timeout_ms=%d): action=%s source=%s err=%v", recorderLogPrefix(deviceID), timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, source, err) } -func logWebSocketPingFailure(component, label, deviceID string, timeout time.Duration, timedOut bool, err error) { +func logWebSocketPingFailure(component, deviceID string, timeout time.Duration, timedOut bool, err error) { component = strings.TrimSpace(component) - label = strings.TrimSpace(label) deviceID = strings.TrimSpace(deviceID) + prefix := logPrefixWithTask(component, deviceID, "") if timedOut { - logger.Printf("[%s] %s %s ping timeout after %s (timeout_ms=%d): %v", component, label, deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err) + logger.Printf("%s ping timeout after %s (timeout_ms=%d): %v", prefix, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err) return } - logger.Printf("[%s] %s %s ping failed (timeout=%s timeout_ms=%d): %v", component, label, deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err) + logger.Printf("%s ping failed (timeout=%s timeout_ms=%d): %v", prefix, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err) } func recorderPingInterval(cfg *config.RecorderConfig) time.Duration { @@ -1305,10 +1407,10 @@ func closeReplacedRecorderConn(deviceID string, rc *services.RecorderConn) { if rc == nil || rc.Conn == nil { return } - logger.Printf("[RECORDER] Device %s: closing replaced WebSocket connection", deviceID) + logger.Printf("%s closing replaced WebSocket connection", recorderLogPrefix(deviceID)) if err := rc.Conn.CloseNow(); err != nil { if !isExpectedWebSocketCloseError(err) { - logger.Printf("[RECORDER] Device %s: replaced WebSocket close error: %v", deviceID, err) + logger.Printf("%s replaced WebSocket close error: %v", recorderLogPrefix(deviceID), err) } } } diff --git a/internal/api/handlers/batch.go b/internal/api/handlers/batch.go index 39c95b2..74b399f 100644 --- a/internal/api/handlers/batch.go +++ b/internal/api/handlers/batch.go @@ -100,14 +100,6 @@ func (h *BatchHandler) RegisterCollectorRoutes(apiV1 gin.IRoutes) { // recalledEpisodeLabel is appended to episodes.labels (JSON string array) when a batch is recalled (see RecallBatch). const recalledEpisodeLabel = "recalled_batch" -// batchAdvanceTriggerStatuses lists task statuses that trigger tryAdvanceBatchStatus after a task -// update via PUT /tasks. Tasks become cancelled only via PATCH batch cancel, which does not invoke -// that hook; transfer completion sets completed and also calls tryAdvanceBatchStatus. -var batchAdvanceTriggerStatuses = map[string]struct{}{ - "completed": {}, - "failed": {}, -} - var validBatchStatuses = map[string]struct{}{ "pending": {}, "active": {}, diff --git a/internal/api/handlers/production_dashboard.go b/internal/api/handlers/production_dashboard.go index 4e9d0aa..1a60a07 100644 --- a/internal/api/handlers/production_dashboard.go +++ b/internal/api/handlers/production_dashboard.go @@ -114,6 +114,7 @@ type dashboardOverviewSummary struct { TodayEpisodes int64 `json:"today_episodes"` CompletedTasks int64 `json:"completed_tasks"` InProgressTasks int64 `json:"in_progress_tasks"` + UploadingTasks int64 `json:"uploading_tasks"` PendingTasks int64 `json:"pending_tasks"` FailedTasks int64 `json:"failed_tasks"` CancelledTasks int64 `json:"cancelled_tasks"` @@ -132,6 +133,7 @@ type dashboardTaskCounts struct { Total int64 `json:"total" db:"total"` Completed int64 `json:"completed" db:"completed"` InProgress int64 `json:"in_progress" db:"in_progress"` + Uploading int64 `json:"uploading" db:"uploading"` Pending int64 `json:"pending" db:"pending"` Ready int64 `json:"ready" db:"ready"` Failed int64 `json:"failed" db:"failed"` @@ -143,6 +145,7 @@ type dashboardTrendItem struct { Total int64 `json:"total" db:"total"` Completed int64 `json:"completed" db:"completed"` InProgress int64 `json:"in_progress" db:"in_progress"` + Uploading int64 `json:"uploading" db:"uploading"` Pending int64 `json:"pending" db:"pending"` Failed int64 `json:"failed" db:"failed"` } @@ -358,6 +361,7 @@ type dashboardBatchTaskSummaryItem struct { Pending int64 `json:"-" db:"pending"` Ready int64 `json:"-" db:"ready"` InProgress int64 `json:"-" db:"in_progress"` + Uploading int64 `json:"-" db:"uploading"` Completed int64 `json:"-" db:"completed"` Failed int64 `json:"-" db:"failed"` Cancelled int64 `json:"-" db:"cancelled"` @@ -684,6 +688,7 @@ func (h *ProductionDashboardHandler) GetBatchTaskSummary(c *gin.Context) { COALESCE(SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END), 0) AS pending, COALESCE(SUM(CASE WHEN t.status = 'ready' THEN 1 ELSE 0 END), 0) AS ready, COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress, + COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading, COALESCE(SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END), 0) AS completed, COALESCE(SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END), 0) AS failed, COALESCE(SUM(CASE WHEN t.status = 'cancelled' THEN 1 ELSE 0 END), 0) AS cancelled @@ -704,6 +709,7 @@ func (h *ProductionDashboardHandler) GetBatchTaskSummary(c *gin.Context) { "pending": items[i].Pending, "ready": items[i].Ready, "in_progress": items[i].InProgress, + "uploading": items[i].Uploading, "completed": items[i].Completed, "failed": items[i].Failed, "cancelled": items[i].Cancelled, @@ -872,6 +878,7 @@ func (h *ProductionDashboardHandler) dashboardTaskCounts(db dashboardDB, scope p COUNT(1) AS total, COALESCE(SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END), 0) AS completed, COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress, + COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading, COALESCE(SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END), 0) AS pending, COALESCE(SUM(CASE WHEN t.status = 'ready' THEN 1 ELSE 0 END), 0) AS ready, COALESCE(SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END), 0) AS failed, @@ -899,6 +906,7 @@ func (h *ProductionDashboardHandler) dashboardOverviewTaskMetrics(db dashboardDB COALESCE(SUM(CASE WHEN t.assigned_at >= ? AND t.assigned_at < ? THEN 1 ELSE 0 END), 0) AS total, COALESCE(SUM(CASE WHEN t.status = 'completed' AND t.completed_at >= ? AND t.completed_at < ? THEN 1 ELSE 0 END), 0) AS completed, COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress, + COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading, COALESCE(SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END), 0) AS pending, 0 AS ready, COALESCE(SUM(CASE WHEN t.status = 'failed' AND t.completed_at >= ? AND t.completed_at < ? THEN 1 ELSE 0 END), 0) AS failed, @@ -962,6 +970,7 @@ func (h *ProductionDashboardHandler) dashboardTaskTrend(db dashboardDB, scope pr DATE_FORMAT(` + localEventExpr + `, '%m-%d') AS date, COALESCE(SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END), 0) AS completed, COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress, + COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading, COALESCE(SUM(CASE WHEN t.status IN ('pending', 'ready') THEN 1 ELSE 0 END), 0) AS pending, COALESCE(SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END), 0) AS failed FROM tasks t @@ -976,7 +985,7 @@ func (h *ProductionDashboardHandler) dashboardTaskTrend(db dashboardDB, scope pr byDate := make(map[string]dashboardTrendItem, len(rows)) for _, row := range rows { - row.Total = row.Completed + row.InProgress + row.Pending + row.Failed + row.Total = row.Completed + row.InProgress + row.Uploading + row.Pending + row.Failed byDate[row.Date] = row } items := make([]dashboardTrendItem, 0, q.TrendDays) @@ -1423,6 +1432,7 @@ func buildOverviewSummary( TodayEpisodes: todayProductionTotals.TotalEpisodes, CompletedTasks: overviewTaskMetrics.Completed, InProgressTasks: overviewTaskMetrics.InProgress, + UploadingTasks: overviewTaskMetrics.Uploading, PendingTasks: overviewTaskMetrics.Pending, FailedTasks: overviewTaskMetrics.Failed, CancelledTasks: tasks.Cancelled, @@ -1443,6 +1453,7 @@ func buildTaskStatusDistribution(tasks dashboardTaskCounts) []dashboardStatusDis {Status: "pending", Label: dashboardTaskStatusText("pending"), Value: tasks.Pending}, {Status: "ready", Label: dashboardTaskStatusText("ready"), Value: tasks.Ready}, {Status: "in_progress", Label: dashboardTaskStatusText("in_progress"), Value: tasks.InProgress}, + {Status: "uploading", Label: dashboardTaskStatusText("uploading"), Value: tasks.Uploading}, {Status: "completed", Label: dashboardTaskStatusText("completed"), Value: tasks.Completed}, {Status: "failed", Label: dashboardTaskStatusText("failed"), Value: tasks.Failed}, {Status: "cancelled", Label: dashboardTaskStatusText("cancelled"), Value: tasks.Cancelled}, @@ -1671,6 +1682,8 @@ func dashboardTaskStatusText(status string) string { return "就绪" case "in_progress": return "进行中" + case "uploading": + return "上传中" case "completed": return "已完成" case "failed": diff --git a/internal/api/handlers/recorder_axon_interaction_test.go b/internal/api/handlers/recorder_axon_interaction_test.go index 0d6cbfd..b9617a7 100644 --- a/internal/api/handlers/recorder_axon_interaction_test.go +++ b/internal/api/handlers/recorder_axon_interaction_test.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "encoding/json" + "log" "net/http" "net/http/httptest" "strings" @@ -15,6 +16,7 @@ import ( "time" "archebase.com/keystone-edge/internal/config" + "archebase.com/keystone-edge/internal/logger" "archebase.com/keystone-edge/internal/services" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" @@ -1308,6 +1310,44 @@ func TestRecorderWebSocketStateUpdateBeforeGetStateResponseIsIdempotent(t *testi } } +func TestRecorderStateSnapshotLogsOnlyStateChanges(t *testing.T) { + var buf bytes.Buffer + previousLogger := logger.Get() + logger.Set(log.New(&buf, "", 0)) + defer logger.Set(previousLogger) + + hub := services.NewRecorderHub() + handler := NewRecorderHandler(hub, &config.RecorderConfig{}, nil) + rc := hub.NewRecorderConn(nil, "robot-001", "127.0.0.1") + + if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "idle"}, "get_state"); err != nil { + t.Fatalf("apply idle snapshot: %v", err) + } + if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "idle"}, "state_update"); err != nil { + t.Fatalf("apply duplicate idle snapshot: %v", err) + } + if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "ready", TaskID: "task-ready"}, "state_update"); err != nil { + t.Fatalf("apply ready snapshot: %v", err) + } + if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "ready", TaskID: "task-ready"}, "rpc_response:config"); err != nil { + t.Fatalf("apply duplicate ready snapshot: %v", err) + } + + output := buf.String() + if got := strings.Count(output, "state changed:"); got != 2 { + t.Fatalf("state change log count=%d want=2 output=%q", got, output) + } + if !strings.Contains(output, "[RECORDER][robot-001] state changed: unknown -> idle source=get_state") { + t.Fatalf("initial state change log missing: %q", output) + } + if !strings.Contains(output, "[RECORDER][robot-001][task-ready] state changed: idle -> ready source=state_update") { + t.Fatalf("ready state change log missing: %q", output) + } + if strings.Contains(output, "rpc_response:config") { + t.Fatalf("duplicate rpc_response snapshot should not be logged: %q", output) + } +} + func TestRecorderWebSocketGetStateReadyRejectsConfigWithoutSendingRPC(t *testing.T) { db := newRecorderInteractionDB(t) seedRecorderInteractionDevice(t, db, "robot-001", 1, 101) diff --git a/internal/api/handlers/task.go b/internal/api/handlers/task.go index 4b01d47..584a3bc 100644 --- a/internal/api/handlers/task.go +++ b/internal/api/handlers/task.go @@ -118,7 +118,6 @@ func (h *TaskHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { apiV1.POST("/tasks", h.CreateTask) apiV1.GET("/tasks", h.ListTasks) apiV1.GET("/tasks/:id", h.GetTask) - apiV1.PUT("/tasks/:id", h.UpdateTask) apiV1.DELETE("/tasks/:id", h.DeleteTask) apiV1.GET("/tasks/:id/config", h.GetTaskConfig) } @@ -127,6 +126,7 @@ var validTaskStatuses = map[string]struct{}{ "pending": {}, "ready": {}, "in_progress": {}, + "uploading": {}, "completed": {}, "failed": {}, "cancelled": {}, @@ -147,6 +147,7 @@ type TaskListItem struct { SubsceneID string `json:"subscene_id" db:"subscene_id"` SubsceneName string `json:"subscene_name" db:"subscene_name"` Status string `json:"status" db:"status"` + ErrorMessage *string `json:"error_message" db:"error_message"` AssignedAt *string `json:"assigned_at" db:"assigned_at"` } @@ -183,6 +184,7 @@ type TaskDetailResponse struct { FactoryID *string `json:"factory_id" db:"factory_id"` OrganizationID *int64 `json:"organization_id" db:"organization_id"` Status string `json:"status" db:"status"` + ErrorMessage *string `json:"error_message" db:"error_message"` CreatedAt *string `json:"created_at" db:"created_at"` AssignedAt *string `json:"assigned_at" db:"assigned_at"` StartedAt *string `json:"started_at" db:"started_at"` @@ -192,37 +194,6 @@ type TaskDetailResponse struct { EpisodePublicID sql.NullString `db:"episode_public_id" json:"-"` } -// UpdateTaskRequest represents the request body for updating a task status. -type UpdateTaskRequest struct { - Status string `json:"status"` - UpdatedBy string `json:"updated_by"` -} - -// UpdateTaskResponse represents the response body for updating a task status. -type UpdateTaskResponse struct { - ID string `json:"id"` - Status string `json:"status"` - UpdatedAt string `json:"updated_at"` -} - -var validTaskStatusTransitions = map[string]map[string]struct{}{ - "pending": { - "ready": {}, - }, - "ready": { - "in_progress": {}, - "pending": {}, - }, - "in_progress": { - "pending": {}, - "completed": {}, - "failed": {}, - }, - "failed": {}, - "completed": {}, - "cancelled": {}, -} - // ListTasks handles task listing requests with optional filtering. // // @Summary List tasks @@ -315,6 +286,7 @@ func (h *TaskHandler) ListTasks(c *gin.Context) { CAST(tasks.subscene_id AS CHAR) AS subscene_id, COALESCE(tasks.subscene_name, '') AS subscene_name, tasks.status, + tasks.error_message, CASE WHEN tasks.assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(tasks.assigned_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS assigned_at FROM tasks LEFT JOIN workstations ws ON ws.id = tasks.workstation_id AND ws.deleted_at IS NULL @@ -377,6 +349,7 @@ func (h *TaskHandler) GetTask(c *gin.Context) { CASE WHEN t.factory_id IS NULL THEN NULL ELSE CAST(t.factory_id AS CHAR) END AS factory_id, t.organization_id AS organization_id, t.status, + t.error_message, CASE WHEN t.created_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.created_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS created_at, CASE WHEN t.assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.assigned_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS assigned_at, CASE WHEN t.started_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.started_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS started_at, @@ -412,161 +385,6 @@ func (h *TaskHandler) GetTask(c *gin.Context) { c.JSON(http.StatusOK, task) } -// UpdateTask handles task status update requests. -// -// @Summary Update task -// @Description Updates task status with restricted state transitions. Setting status to cancelled is not allowed; cancel the parent batch instead. Rejected when the parent batch status is cancelled or recalled. -// @Tags tasks -// @Accept json -// @Produce json -// @Param id path string true "Task ID" -// @Param body body UpdateTaskRequest true "Task update payload" -// @Success 200 {object} UpdateTaskResponse -// @Failure 400 {object} map[string]string -// @Failure 404 {object} map[string]string -// @Failure 409 {object} map[string]string "Conflict (invalid transition or batch is cancelled/recalled)" -// @Failure 500 {object} map[string]string -// @Router /tasks/{id} [put] -func (h *TaskHandler) UpdateTask(c *gin.Context) { - idStr := strings.TrimSpace(c.Param("id")) - if idStr == "" { - c.JSON(http.StatusBadRequest, gin.H{"error_msg": "task id is required"}) - return - } - id, err := strconv.ParseInt(idStr, 10, 64) - if err != nil || id <= 0 { - c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid task id"}) - return - } - - var req UpdateTaskRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{ - "error_msg": "Invalid request body: " + err.Error(), - }) - return - } - - req.Status = strings.TrimSpace(req.Status) - req.UpdatedBy = strings.TrimSpace(req.UpdatedBy) - - if req.Status == "" { - c.JSON(http.StatusBadRequest, gin.H{"error_msg": "status is required"}) - return - } - - if _, ok := validTaskStatuses[req.Status]; !ok { - c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid status"}) - return - } - - if req.Status == "cancelled" { - c.JSON(http.StatusBadRequest, gin.H{ - "error_msg": "setting status to 'cancelled' is not allowed via PUT; cancel the parent batch (PATCH /batches/:id) instead", - }) - return - } - - if req.UpdatedBy == "" { - c.JSON(http.StatusBadRequest, gin.H{"error_msg": "updated_by is required"}) - return - } - - var taskRow struct { - Status string `db:"status"` - OrderID int64 `db:"order_id"` - BatchStatus sql.NullString `db:"batch_status"` - } - err = h.db.Get(&taskRow, ` - SELECT t.status, t.order_id, b.status AS batch_status - FROM tasks t - LEFT JOIN batches b ON b.id = t.batch_id AND b.deleted_at IS NULL - WHERE t.id = ? AND t.deleted_at IS NULL`, id) - if err == sql.ErrNoRows { - c.JSON(http.StatusNotFound, gin.H{"error_msg": "Task not found: " + idStr}) - return - } - if err != nil { - logger.Printf("[TASK] Failed to query task: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query task"}) - return - } - - if taskRow.BatchStatus.Valid { - bs := taskRow.BatchStatus.String - if bs == "cancelled" || bs == "recalled" { - c.JSON(http.StatusConflict, gin.H{ - "error_msg": fmt.Sprintf("cannot update task while parent batch status is %q", bs), - "batch_status": bs, - "requested_status": req.Status, - }) - return - } - } - - if _, ok := validTaskStatusTransitions[taskRow.Status][req.Status]; !ok { - c.JSON(http.StatusConflict, gin.H{ - "error_msg": fmt.Sprintf("Cannot transition from '%s' to '%s'", taskRow.Status, req.Status), - "current_status": taskRow.Status, - "requested_status": req.Status, - }) - return - } - - now := time.Now().UTC() - - // Fetch batch_id for post-update batch state advancement - var batchIDForAdvance int64 - _ = h.db.Get(&batchIDForAdvance, "SELECT batch_id FROM tasks WHERE id = ? AND deleted_at IS NULL LIMIT 1", id) - orderIDForAdvance := taskRow.OrderID - - result, err := h.db.Exec( - "UPDATE tasks SET status = ?, updated_at = ?, ready_at = CASE WHEN ? = 'ready' THEN ? ELSE ready_at END WHERE id = ? AND status = ? AND deleted_at IS NULL", - req.Status, - now, - req.Status, - now, - id, - taskRow.Status, - ) - if err != nil { - logger.Printf("[TASK] Failed to update task: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to update task"}) - return - } - - rowsAffected, err := result.RowsAffected() - if err != nil { - logger.Printf("[TASK] Failed to get rows affected: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to verify update"}) - return - } - - if rowsAffected == 0 { - c.JSON(http.StatusConflict, gin.H{ - "error_msg": fmt.Sprintf("Cannot transition from '%s' to '%s'", taskRow.Status, req.Status), - "current_status": taskRow.Status, - "requested_status": req.Status, - }) - return - } - - // After completed/failed, try to advance the batch status (pending->active, active->completed). - if _, ok := batchAdvanceTriggerStatuses[req.Status]; ok && batchIDForAdvance > 0 { - go tryAdvanceBatchStatus(h.db, batchIDForAdvance) - } - // After completed, try to advance the order status (created->in_progress, in_progress->completed). - if req.Status == "completed" && orderIDForAdvance > 0 { - go tryAdvanceOrderStatus(h.db, orderIDForAdvance, h.recorderHub, h.recorderRPCTimeout) - } - - c.JSON(http.StatusOK, UpdateTaskResponse{ - ID: idStr, - Status: req.Status, - UpdatedAt: now.Format(time.RFC3339), - }) -} - // DeleteTask handles soft deletion of a task. // Tasks with status "completed" cannot be deleted. // @@ -600,9 +418,9 @@ func (h *TaskHandler) DeleteTask(c *gin.Context) { return } - // Completed tasks cannot be deleted (they form part of the audit trail) - if taskStatus == "completed" { - c.JSON(http.StatusConflict, gin.H{"error_msg": "cannot delete a completed task; completed tasks form part of the audit trail"}) + // Completed/uploading tasks cannot be deleted because their files or audit trail may still be in use. + if taskStatus == "completed" || taskStatus == "uploading" { + c.JSON(http.StatusConflict, gin.H{"error_msg": "cannot delete a completed or uploading task"}) return } @@ -973,11 +791,11 @@ func (h *TaskHandler) OnRecordingStart(c *gin.Context) { return } - logger.Printf("[RECORDER] Device %s: received start callback for task=%s", callback.DeviceID, callback.TaskID) + logger.Printf("%s received start callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID)) // Validate required fields if callback.TaskID == "" { - logger.Printf("[RECORDER] Device %s: Missing task_id in callback", callback.DeviceID) + logger.Printf("%s missing task_id in callback", recorderLogPrefix(callback.DeviceID)) c.JSON(http.StatusBadRequest, gin.H{ "error_msg": "Missing required field: task_id", }) @@ -986,12 +804,12 @@ func (h *TaskHandler) OnRecordingStart(c *gin.Context) { taskStatus := "unknown" if h.db != nil { - res, err := advanceTaskPendingOrReadyToInProgress(h.db, callback.TaskID) + rowsAffected, previousStatus, err := advanceTaskPendingOrReadyToInProgress(h.db, callback.TaskID) if err != nil { - logger.Printf("[RECORDER] Device %s: failed to advance task pending/ready->in_progress after start callback: task=%s err=%v", callback.DeviceID, callback.TaskID, err) - } else if n, _ := res.RowsAffected(); n > 0 { + logger.Printf("%s failed to advance task pending/ready->in_progress after start callback: err=%v", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID), err) + } else if rowsAffected > 0 { taskStatus = "in_progress" - logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=start_callback status=in_progress", callback.DeviceID, callback.TaskID) + logger.Printf("%s task status updated: %s -> in_progress reason=start_callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID), taskStatusLogValue(previousStatus, "unknown")) } } @@ -1036,7 +854,7 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) { } if callback.OutputPath == "" { - logger.Printf("[RECORDER] Failed to parse callback: missing output_path for task_id=%s", callback.TaskID) + logger.Printf("%s failed to parse callback: missing output_path", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID)) c.JSON(http.StatusBadRequest, gin.H{ "error_msg": "Missing required field: output_path", }) @@ -1052,23 +870,43 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) { return } - logger.Printf("[RECORDER] Device %s: received finish callback for task=%s", callback.DeviceID, callback.TaskID) + logger.Printf("%s received finish callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID)) if h.db != nil { - res, err := advanceTaskPendingOrReadyToInProgress(h.db, callback.TaskID) + previousStatus, _, _ := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, callback.TaskID) + res, err := markOwnedTaskUploading(c.Request.Context(), h.db, deviceID, callback.TaskID) if err != nil { - logger.Printf("[RECORDER] Device %s: failed to restore task pending/ready->in_progress after finish callback: task=%s err=%v", deviceID, callback.TaskID, err) + logger.Printf("%s failed to mark task uploading after finish callback: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to update task status"}) + return } else if n, _ := res.RowsAffected(); n > 0 { - logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=finish_callback status=in_progress", deviceID, callback.TaskID) + logger.Printf("%s task status updated: %s -> uploading reason=finish_callback", recorderTaskLogPrefix(deviceID, callback.TaskID), taskStatusLogValue(previousStatus, "unknown")) + } else { + logger.Printf("%s task uploading transition skipped after finish callback", recorderTaskLogPrefix(deviceID, callback.TaskID)) + c.JSON(http.StatusConflict, gin.H{ + "error_msg": "task is not owned by device or is not uploadable", + }) + return } } - dc := h.hub.Get(deviceID) + var dc *services.TransferConn + if h.hub != nil { + dc = h.hub.Get(deviceID) + } if dc == nil { - // TODO: add status pending_upload, when device reconnects, check for any pending_upload tasks and trigger upload then - logger.Printf("[RECORDER] Device %s: Not found in hub for task=%s, cannot trigger upload", deviceID, callback.TaskID) - c.JSON(http.StatusConflict, gin.H{ - "error_msg": "Recording finished, device not connected for auto-upload", + errorMessage := "transfer disconnected; upload_request not sent" + if h.db != nil { + if _, err := writeOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, callback.TaskID, errorMessage); err != nil { + logger.Printf("%s failed to write upload_request error: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err) + } + } + logger.Printf("%s not found in hub, upload_request not sent", recorderTaskLogPrefix(deviceID, callback.TaskID)) + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "Recording finished; upload_request not sent because transfer is disconnected", + "task_status": "uploading", + "upload_request_sent": false, }) return } @@ -1081,24 +919,39 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) { writeTimeout := h.axonTransferWriteTimeout() if err := h.hub.SendToDeviceWithTimeout(c.Request.Context(), deviceID, uploadRequest, writeTimeout); err != nil { - status := http.StatusInternalServerError if errors.Is(err, services.ErrTransferWriteTimeout) { - status = http.StatusGatewayTimeout - logger.Printf("[TRANSFER] Device %s: auto upload_request timed out after %s for task=%s: %v", deviceID, timeoutLogValue(writeTimeout), callback.TaskID, err) + logger.Printf("%s auto upload_request timed out after %s: %v", recorderTaskLogPrefix(deviceID, callback.TaskID), timeoutLogValue(writeTimeout), err) } else { - logger.Printf("[RECORDER] Failed to send upload_request to device %s: %v", deviceID, err) + logger.Printf("%s failed to send upload_request: %v", recorderTaskLogPrefix(deviceID, callback.TaskID), err) + } + errorMessage := "upload_request failed: " + err.Error() + if h.db != nil { + if _, writeErr := writeOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, callback.TaskID, errorMessage); writeErr != nil { + logger.Printf("%s failed to write upload_request error: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), writeErr) + } } - c.JSON(status, gin.H{ - "error_msg": "Failed to trigger upload: " + err.Error(), + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "Recording finished; upload_request not sent", + "task_status": "uploading", + "upload_request_sent": false, + "upload_request_error": err.Error(), }) return } - logger.Printf("[RECORDER] Device %s: successfully triggered upload for task_id=%s", deviceID, callback.TaskID) + if h.db != nil { + if _, err := clearOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, callback.TaskID); err != nil { + logger.Printf("%s failed to clear upload_request error: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err) + } + } + logger.Printf("%s successfully triggered upload", recorderTaskLogPrefix(deviceID, callback.TaskID)) c.JSON(http.StatusOK, gin.H{ - "success": true, - "message": "Upload triggered successfully", + "success": true, + "message": "Upload triggered successfully", + "task_status": "uploading", + "upload_request_sent": true, }) } diff --git a/internal/api/handlers/task_state_recovery_test.go b/internal/api/handlers/task_state_recovery_test.go index 95c1338..d57e017 100644 --- a/internal/api/handlers/task_state_recovery_test.go +++ b/internal/api/handlers/task_state_recovery_test.go @@ -7,6 +7,7 @@ package handlers import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" "net/http" @@ -456,8 +457,8 @@ func TestRecordingFinishAutoUploadUsesConfiguredTransferWriteTimeout(t *testing. w := httptest.NewRecorder() router.ServeHTTP(w, req) - if w.Code != http.StatusGatewayTimeout { - t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusGatewayTimeout, w.Body.String()) + if w.Code != http.StatusOK { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) } if hub.getDeviceID != "robot-001" { t.Fatalf("Get device=%q want=%q", hub.getDeviceID, "robot-001") @@ -474,13 +475,58 @@ func TestRecordingFinishAutoUploadUsesConfiguredTransferWriteTimeout(t *testing. if got := fmt.Sprint(hub.msg["task_id"]); got != "task-finish" { t.Fatalf("message task_id=%q want=%q", got, "task-finish") } - assertTaskStateRecoveryStatus(t, db, "task-finish", "in_progress") - assertTaskStateRecoveryTimestampSet(t, db, "task-finish", "started_at") + assertTaskStateRecoveryStatus(t, db, "task-finish", "uploading") + assertTaskStateRecoveryErrorContains(t, db, "task-finish", "upload_request failed") + assertTaskStateRecoveryErrorContains(t, db, "task-finish", "fake blocked transfer write") if !strings.Contains(w.Body.String(), custom.String()) { t.Fatalf("response body %q does not mention custom timeout %s", w.Body.String(), custom) } } +func TestRecordingFinishDisconnectedTransferRecordsUploadRequestError(t *testing.T) { + db := newTaskStateRecoveryDB(t) + defer db.Close() + seedTaskStateRecoveryTask(t, db, "task-finish-disconnected", "pending") + + hub := &recordingFinishTransferHub{} + handler := &TaskHandler{db: db, hub: hub} + + gin.SetMode(gin.TestMode) + router := gin.New() + handler.RegisterCallbackRoutes(router.Group("/callbacks")) + + body, err := json.Marshal(RecordingFinishCallback{ + TaskID: "task-finish-disconnected", + DeviceID: "robot-001", + Status: "finished", + FinishedAt: time.Now().UTC().Format(time.RFC3339), + OutputPath: "/data/task-finish-disconnected.mcap", + }) + if err != nil { + t.Fatalf("marshal callback: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/callbacks/finish", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String()) + } + if hub.getDeviceID != "robot-001" { + t.Fatalf("Get device=%q want=%q", hub.getDeviceID, "robot-001") + } + if hub.sendDeviceID != "" { + t.Fatalf("Send device=%q want empty for disconnected transfer", hub.sendDeviceID) + } + assertTaskStateRecoveryStatus(t, db, "task-finish-disconnected", "uploading") + assertTaskStateRecoveryErrorContains(t, db, "task-finish-disconnected", "transfer disconnected") + if !strings.Contains(w.Body.String(), `"upload_request_sent":false`) { + t.Fatalf("response body %q does not report unsent upload request", w.Body.String()) + } +} + type recordingFinishTransferHub struct { conn *services.TransferConn getDeviceID string @@ -510,16 +556,38 @@ func newTaskStateRecoveryDB(t *testing.T) *sqlx.DB { if _, err := db.Exec(`CREATE TABLE tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, + workstation_id INTEGER NULL, status TEXT NOT NULL, ready_at TIMESTAMP NULL, started_at TIMESTAMP NULL, completed_at TIMESTAMP NULL, + error_message TEXT NULL, created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL, deleted_at TIMESTAMP NULL )`); err != nil { t.Fatalf("create tasks schema: %v", err) } + if _, err := db.Exec(`CREATE TABLE robots ( + id INTEGER PRIMARY KEY, + device_id TEXT NOT NULL, + deleted_at TIMESTAMP NULL + )`); err != nil { + t.Fatalf("create robots schema: %v", err) + } + if _, err := db.Exec(`CREATE TABLE workstations ( + id INTEGER PRIMARY KEY, + robot_id INTEGER NOT NULL, + deleted_at TIMESTAMP NULL + )`); err != nil { + t.Fatalf("create workstations schema: %v", err) + } + if _, err := db.Exec(`INSERT INTO robots (id, device_id) VALUES (1, 'robot-001')`); err != nil { + t.Fatalf("seed robot: %v", err) + } + if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil { + t.Fatalf("seed workstation: %v", err) + } return db } @@ -527,7 +595,7 @@ func seedTaskStateRecoveryTask(t *testing.T, db *sqlx.DB, taskID string, status t.Helper() now := time.Now().UTC() if _, err := db.Exec( - `INSERT INTO tasks (task_id, status, created_at, updated_at) VALUES (?, ?, ?, ?)`, + `INSERT INTO tasks (task_id, workstation_id, status, created_at, updated_at) VALUES (?, 10, ?, ?, ?)`, taskID, status, now, @@ -548,6 +616,20 @@ func assertTaskStateRecoveryStatus(t *testing.T, db *sqlx.DB, taskID string, wan } } +func assertTaskStateRecoveryErrorContains(t *testing.T, db *sqlx.DB, taskID string, want string) { + t.Helper() + var got sql.NullString + if err := db.Get(&got, `SELECT error_message FROM tasks WHERE task_id = ?`, taskID); err != nil { + t.Fatalf("query task error_message: %v", err) + } + if !got.Valid { + t.Fatalf("task error_message is NULL, want substring %q", want) + } + if !strings.Contains(got.String, want) { + t.Fatalf("task error_message=%q want substring %q", got.String, want) + } +} + func assertTaskStateRecoveryTimestampSet(t *testing.T, db *sqlx.DB, taskID string, column string) { t.Helper() if column != "ready_at" && column != "started_at" { diff --git a/internal/api/handlers/task_uploading.go b/internal/api/handlers/task_uploading.go new file mode 100644 index 0000000..61ccff7 --- /dev/null +++ b/internal/api/handlers/task_uploading.go @@ -0,0 +1,151 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +package handlers + +import ( + "context" + "database/sql" + "strings" + "time" +) + +type taskStateExecutor interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) +} + +type taskStateQuerier interface { + GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error +} + +func currentOwnedTaskStatus(ctx context.Context, querier taskStateQuerier, deviceID, taskID string) (string, bool, error) { + if querier == nil { + return "", false, nil + } + var status string + err := querier.GetContext(ctx, &status, ` + SELECT t.status + FROM tasks t + JOIN workstations ws ON ws.id = t.workstation_id AND ws.deleted_at IS NULL + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE t.task_id = ? + AND t.deleted_at IS NULL + AND r.device_id = ? + LIMIT 1 + `, strings.TrimSpace(taskID), strings.TrimSpace(deviceID)) + if err == sql.ErrNoRows { + return "", false, nil + } + if err != nil { + return "", false, err + } + return strings.TrimSpace(status), true, nil +} + +func taskStatusLogValue(status, fallback string) string { + status = strings.TrimSpace(status) + if status == "" { + return fallback + } + return status +} + +func markOwnedTaskUploading(ctx context.Context, exec taskStateExecutor, deviceID, taskID string) (sql.Result, error) { + if exec == nil { + return nil, nil + } + now := time.Now().UTC() + return exec.ExecContext(ctx, ` + UPDATE tasks + SET + status = 'uploading', + updated_at = ?, + error_message = NULL + WHERE task_id = ? + AND status IN ('pending', 'ready', 'in_progress', 'uploading') + AND deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM workstations ws + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE ws.id = tasks.workstation_id + AND ws.deleted_at IS NULL + AND r.device_id = ? + ) + `, now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID)) +} + +func failOwnedUploadingTask(ctx context.Context, exec taskStateExecutor, deviceID, taskID, reason string) (sql.Result, error) { + if exec == nil { + return nil, nil + } + now := time.Now().UTC() + return exec.ExecContext(ctx, ` + UPDATE tasks + SET + status = 'failed', + completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END, + error_message = ?, + updated_at = ? + WHERE task_id = ? + AND status IN ('in_progress', 'uploading') + AND deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM workstations ws + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE ws.id = tasks.workstation_id + AND ws.deleted_at IS NULL + AND r.device_id = ? + ) + `, now, strings.TrimSpace(reason), now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID)) +} + +func writeOwnedUploadingTaskError(ctx context.Context, exec taskStateExecutor, deviceID, taskID, message string) (sql.Result, error) { + if exec == nil { + return nil, nil + } + now := time.Now().UTC() + return exec.ExecContext(ctx, ` + UPDATE tasks + SET + error_message = ?, + updated_at = ? + WHERE task_id = ? + AND status = 'uploading' + AND deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM workstations ws + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE ws.id = tasks.workstation_id + AND ws.deleted_at IS NULL + AND r.device_id = ? + ) + `, strings.TrimSpace(message), now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID)) +} + +func clearOwnedUploadingTaskError(ctx context.Context, exec taskStateExecutor, deviceID, taskID string) (sql.Result, error) { + if exec == nil { + return nil, nil + } + now := time.Now().UTC() + return exec.ExecContext(ctx, ` + UPDATE tasks + SET + error_message = NULL, + updated_at = ? + WHERE task_id = ? + AND status = 'uploading' + AND deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM workstations ws + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE ws.id = tasks.workstation_id + AND ws.deleted_at IS NULL + AND r.device_id = ? + ) + `, now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID)) +} diff --git a/internal/api/handlers/transfer.go b/internal/api/handlers/transfer.go index 3ebb650..ac9112a 100644 --- a/internal/api/handlers/transfer.go +++ b/internal/api/handlers/transfer.go @@ -86,7 +86,6 @@ func (h *TransferHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { transfer.POST("/upload_all", h.UploadAll) transfer.POST("/status_query", h.StatusQuery) transfer.POST("/cancel", h.CancelUpload) - transfer.POST("/upload_ack", h.ManualACK) } } @@ -106,14 +105,14 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request "SELECT COUNT(1) FROM robots WHERE device_id = ? AND deleted_at IS NULL", deviceID, ); err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(queryCtx.Err(), context.DeadlineExceeded) { - logger.Printf("[TRANSFER] Device %s: DB query timeout after %s (timeout_ms=%d): %v", deviceID, timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err) + logger.Printf("%s DB query timeout after %s (timeout_ms=%d): %v", transferLogPrefix(deviceID), timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err) } else { - logger.Printf("[TRANSFER] Device %s: DB query error: %v", deviceID, err) + logger.Printf("%s DB query error: %v", transferLogPrefix(deviceID), err) } } // Check count regardless of DB error (count defaults to 0 on error) if count == 0 { - logger.Printf("[TRANSFER] Device %s: robot not found in database", deviceID) + logger.Printf("%s robot not found in database", transferLogPrefix(deviceID)) w.WriteHeader(http.StatusNotFound) return } @@ -123,7 +122,7 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request InsecureSkipVerify: true, // allow any origin in dev; tighten in production }) if err != nil { - logger.Printf("[TRANSFER] Device %s: WebSocket accept error: %v", deviceID, err) + logger.Printf("%s WebSocket accept error: %v", transferLogPrefix(deviceID), err) return } @@ -136,7 +135,7 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request defer func() { if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil { if !isExpectedWebSocketCloseError(err) { - logger.Printf("[TRANSFER] WebSocket close error for device %s: %v", deviceID, err) + logger.Printf("%s WebSocket close error: %v", transferLogPrefix(deviceID), err) } } }() @@ -153,7 +152,7 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request go h.pingLoop(ctx, dc) // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Transfer %s connected from %s", deviceID, remoteIP) + logger.Printf("%s connected from %s", transferLogPrefix(deviceID), remoteIP) // Read loop: use ctx directly for infinite wait. // context.WithTimeout(ctx, 0) would set deadline=now and cause immediate timeout, @@ -163,14 +162,14 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request _, raw, err := conn.Read(ctx) if err != nil { if !isExpectedWebSocketCloseError(err) { - logger.Printf("[TRANSFER] Device %s disconnected: %v", deviceID, err) + logger.Printf("%s disconnected: %v", transferLogPrefix(deviceID), err) } break } var msg map[string]interface{} if jsonErr := json.Unmarshal(raw, &msg); jsonErr != nil { - logger.Printf("[TRANSFER] Device %s: invalid JSON: %v", deviceID, jsonErr) + logger.Printf("%s invalid JSON: %v", transferLogPrefix(deviceID), jsonErr) continue } @@ -203,10 +202,10 @@ func (h *TransferHandler) pingLoop(ctx context.Context, dc *services.TransferCon cancel() if err != nil { if ctx.Err() == nil { - logWebSocketPingFailure("TRANSFER", "Device", dc.DeviceID, timeout, timedOut, err) + logWebSocketPingFailure("TRANSFER", dc.DeviceID, timeout, timedOut, err) if closeErr := dc.Conn.CloseNow(); closeErr != nil { if !isExpectedWebSocketCloseError(closeErr) { - logger.Printf("[TRANSFER] Device %s close after ping failure: %v", dc.DeviceID, closeErr) + logger.Printf("%s close after ping failure: %v", transferLogPrefix(dc.DeviceID), closeErr) } } } @@ -250,10 +249,10 @@ func transferMessageType(msg map[string]interface{}) string { func logTransferSendFailure(deviceID string, msgType string, timeout time.Duration, err error) { if errors.Is(err, services.ErrTransferWriteTimeout) { - logger.Printf("[TRANSFER] Device %s: send %s timed out after %s: %v", deviceID, msgType, timeoutLogValue(timeout), err) + logger.Printf("%s send %s timed out after %s: %v", transferLogPrefix(deviceID), msgType, timeoutLogValue(timeout), err) return } - logger.Printf("[TRANSFER] Device %s: failed to send %s: %v", deviceID, msgType, err) + logger.Printf("%s failed to send %s: %v", transferLogPrefix(deviceID), msgType, err) } func (h *TransferHandler) sendToDevice(c *gin.Context, deviceID string, msg map[string]interface{}) bool { @@ -274,10 +273,10 @@ func closeReplacedTransferConn(deviceID string, dc *services.TransferConn) { if dc == nil || dc.Conn == nil { return } - logger.Printf("[TRANSFER] Device %s: closing replaced WebSocket connection", deviceID) + logger.Printf("%s closing replaced WebSocket connection", transferLogPrefix(deviceID)) if err := dc.Conn.CloseNow(); err != nil { if !isExpectedWebSocketCloseError(err) { - logger.Printf("[TRANSFER] Device %s: replaced WebSocket close error: %v", deviceID, err) + logger.Printf("%s replaced WebSocket close error: %v", transferLogPrefix(deviceID), err) } } } @@ -285,7 +284,7 @@ func closeReplacedTransferConn(deviceID string, dc *services.TransferConn) { // handleMessage dispatches an inbound WebSocket message to the appropriate handler func (h *TransferHandler) handleMessage(ctx context.Context, dc *services.TransferConn, msg map[string]interface{}) { if h.hub.Get(dc.DeviceID) != dc { - logger.Printf("[TRANSFER] Device %s ignored message from replaced connection", dc.DeviceID) + logger.Printf("%s ignored message from replaced connection", transferLogPrefix(dc.DeviceID)) return } @@ -295,7 +294,7 @@ func (h *TransferHandler) handleMessage(ctx context.Context, dc *services.Transf case "connected": h.onConnected(dc, msg) case "upload_started": - h.onUploadStarted(dc, msg) + h.onUploadStarted(ctx, dc, msg) case "upload_progress": h.onUploadProgress(dc, msg) case "upload_complete": @@ -308,7 +307,7 @@ func (h *TransferHandler) handleMessage(ctx context.Context, dc *services.Transf h.onStatus(dc, msg) default: // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: unknown message type %q", dc.DeviceID, msgType) + logger.Printf("%s unknown message type %q", transferLogPrefix(dc.DeviceID), msgType) } } @@ -327,20 +326,29 @@ func (h *TransferHandler) onConnected(dc *services.TransferConn, msg map[string] } dc.UpdateStatus(s) // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Transfer %s connected: version=%s pending=%d uploading=%d failed=%d", - dc.DeviceID, s.Version, s.PendingCount, s.UploadingCount, s.FailedCount) + logger.Printf("%s connected: version=%s pending=%d uploading=%d failed=%d", + transferLogPrefix(dc.DeviceID), s.Version, s.PendingCount, s.UploadingCount, s.FailedCount) } // onUploadStarted handles "upload_started" message -func (h *TransferHandler) onUploadStarted(dc *services.TransferConn, msg map[string]interface{}) { +func (h *TransferHandler) onUploadStarted(ctx context.Context, dc *services.TransferConn, msg map[string]interface{}) { data, _ := msg["data"].(map[string]interface{}) if data == nil { return } taskID := stringVal(data, "task_id") + if h.db != nil && taskID != "" { + previousStatus, _, _ := currentOwnedTaskStatus(ctx, h.db, dc.DeviceID, taskID) + res, err := markOwnedTaskUploading(ctx, h.db, dc.DeviceID, taskID) + if err != nil { + logger.Printf("%s failed to mark task uploading after upload_started: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err) + } else if n, _ := res.RowsAffected(); n > 0 { + logger.Printf("%s task status updated: %s -> uploading reason=upload_started", transferTaskLogPrefix(dc.DeviceID, taskID), taskStatusLogValue(previousStatus, "unknown")) + } + } // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload started task=%s total_bytes=%d", - dc.DeviceID, taskID, int64Val(data, "total_bytes")) + logger.Printf("%s upload started total_bytes=%d", + transferTaskLogPrefix(dc.DeviceID, taskID), int64Val(data, "total_bytes")) } // onUploadProgress handles "upload_progress" message @@ -352,7 +360,7 @@ func (h *TransferHandler) onUploadProgress(dc *services.TransferConn, msg map[st taskID := stringVal(data, "task_id") percent := intVal(data, "percent") // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload progress task=%s %d%%", dc.DeviceID, taskID, percent) + logger.Printf("%s upload progress %d%%", transferTaskLogPrefix(dc.DeviceID, taskID), percent) } // sidecarRecording is the subset of the sidecar JSON "recording" block we care about. @@ -434,26 +442,50 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra data, _ := msg["data"].(map[string]interface{}) if data == nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload complete data is nil", dc.DeviceID) + logger.Printf("%s upload complete data is nil", transferLogPrefix(dc.DeviceID)) return } taskID := stringVal(data, "task_id") if taskID == "" { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload complete taskID is empty", dc.DeviceID) + logger.Printf("%s upload complete taskID is empty", transferLogPrefix(dc.DeviceID)) return } // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload complete for task=%s", dc.DeviceID, taskID) + logger.Printf("%s upload complete", transferTaskLogPrefix(dc.DeviceID, taskID)) if h.s3 == nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: S3 not configured, skipping upload_complete for task=%s", dc.DeviceID, taskID) + logger.Printf("%s S3 not configured, skipping upload_complete", transferTaskLogPrefix(dc.DeviceID, taskID)) return } if h.db == nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB not configured, skipping upload_complete for task=%s", dc.DeviceID, taskID) + logger.Printf("%s DB not configured, skipping upload_complete", transferTaskLogPrefix(dc.DeviceID, taskID)) + return + } + + var ownedTask struct { + ID int64 `db:"id"` + Status string `db:"status"` + } + if err := h.db.GetContext(ctx, &ownedTask, ` + SELECT t.id, t.status + FROM tasks t + JOIN workstations ws ON ws.id = t.workstation_id AND ws.deleted_at IS NULL + JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + WHERE t.task_id = ? AND r.device_id = ? AND t.deleted_at IS NULL + LIMIT 1 + `, taskID, dc.DeviceID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + logger.Printf("%s upload_complete ignored because task ownership check failed", transferTaskLogPrefix(dc.DeviceID, taskID)) + } else { + logger.Printf("%s upload_complete ownership lookup failed: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err) + } + return + } + if ownedTask.Status == "failed" || ownedTask.Status == "cancelled" { + logger.Printf("%s upload_complete ignored for terminal status=%s", transferTaskLogPrefix(dc.DeviceID, taskID), ownedTask.Status) return } @@ -461,14 +493,14 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra mcapKey := uploadCompleteS3Key(data) if mcapKey == "" { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload_complete for task=%s missing s3_key, skipping ACK", dc.DeviceID, taskID) + logger.Printf("%s upload_complete missing s3_key, skipping ACK", transferTaskLogPrefix(dc.DeviceID, taskID)) return } jsonKey, ok := uploadCompleteSidecarS3Key(mcapKey) if !ok { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload_complete for task=%s has invalid MCAP s3_key=%q, cannot derive sidecar key, skipping ACK", dc.DeviceID, taskID, mcapKey) + logger.Printf("%s upload_complete has invalid MCAP s3_key=%q, cannot derive sidecar key, skipping ACK", transferTaskLogPrefix(dc.DeviceID, taskID), mcapKey) return } @@ -496,14 +528,14 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra if mcapErr != nil || jsonErr != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: S3 HeadObject error", dc.DeviceID) + logger.Printf("%s S3 HeadObject error", transferTaskLogPrefix(dc.DeviceID, taskID)) return } if !mcapExists || !jsonExists { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: S3 files not found for task=%s, skipping ACK", - dc.DeviceID, taskID) + logger.Printf("%s S3 files not found, skipping ACK", + transferTaskLogPrefix(dc.DeviceID, taskID)) return } @@ -514,7 +546,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra tx, err := h.db.BeginTx(ctx, nil) if err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB begin transaction error for task=%s: %v", dc.DeviceID, taskID, err) + logger.Printf("%s DB begin transaction error: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err) return } defer func() { @@ -527,7 +559,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra var taskPK int64 if err := tx.QueryRowContext(ctx, "SELECT id FROM tasks WHERE task_id = ? AND deleted_at IS NULL", taskID).Scan(&taskPK); err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: failed to resolve task id for task=%s: %v", dc.DeviceID, taskID, err) + logger.Printf("%s failed to resolve task id: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err) return } @@ -536,7 +568,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra var batchIDForAdvance int64 if err := tx.QueryRowContext(ctx, "SELECT batch_id FROM tasks WHERE id = ? AND deleted_at IS NULL", taskPK).Scan(&batchIDForAdvance); err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: failed to resolve batch id for task=%s (task_pk=%d): %v", dc.DeviceID, taskID, taskPK, err) + logger.Printf("%s failed to resolve batch id (task_pk=%d): %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskPK, err) batchIDForAdvance = 0 } @@ -545,7 +577,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra var orderIDForAdvance int64 if err := tx.QueryRowContext(ctx, "SELECT order_id FROM tasks WHERE id = ? AND deleted_at IS NULL", taskPK).Scan(&orderIDForAdvance); err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: failed to resolve order id for task=%s (task_pk=%d): %v", dc.DeviceID, taskID, taskPK, err) + logger.Printf("%s failed to resolve order id (task_pk=%d): %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskPK, err) orderIDForAdvance = 0 } @@ -557,12 +589,12 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra ).Scan(&count) if err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB query error for task=%s: %v", dc.DeviceID, taskID, err) + logger.Printf("%s DB query error: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err) return } if count > 0 { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: task=%s already exists in DB (by mcap_path or sidecar_path), skipping insert", dc.DeviceID, taskID) + logger.Printf("%s already exists in DB (by mcap_path or sidecar_path), skipping insert", transferTaskLogPrefix(dc.DeviceID, taskID)) } else { var taskRow struct { ID int64 `db:"id"` @@ -614,12 +646,12 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra if err == nil && existingEpisodeID == "" { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: data corruption: empty episode_id found for task_pk=%d task=%s", dc.DeviceID, taskRow.ID, taskID) + logger.Printf("%s data corruption: empty episode_id found for task_pk=%d", transferTaskLogPrefix(dc.DeviceID, taskID), taskRow.ID) return } if err != nil && !errors.Is(err, sql.ErrNoRows) { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB query failed for existing episode check task_pk=%d task=%s: %v", dc.DeviceID, taskRow.ID, taskID, err) + logger.Printf("%s DB query failed for existing episode check task_pk=%d: %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskRow.ID, err) return } @@ -683,7 +715,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra ) if dbErr != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB insert failed for task=%s: %v", dc.DeviceID, taskID, dbErr) + logger.Printf("%s DB insert failed: %v", transferTaskLogPrefix(dc.DeviceID, taskID), dbErr) return } @@ -694,7 +726,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra WHERE id = ? AND deleted_at IS NULL `, taskRow.BatchID); dbErr != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB update failed for batch=%d task=%s: %v", dc.DeviceID, taskRow.BatchID, taskID, dbErr) + logger.Printf("%s DB update failed for batch=%d: %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskRow.BatchID, dbErr) return } } @@ -703,7 +735,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra // Commit transaction if err := tx.Commit(); err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB commit error for task=%s: %v", dc.DeviceID, taskID, err) + logger.Printf("%s DB commit error: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err) return } @@ -718,29 +750,35 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra return } // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: upload_ack sent for task=%s", dc.DeviceID, taskID) + logger.Printf("%s upload_ack sent", transferTaskLogPrefix(dc.DeviceID, taskID)) - // After upload_ack is sent, mark task as completed (pending, ready, or in_progress -> completed). - // pending is allowed when Keystone rolled the task back during a transient recorder disconnect, - // while the device kept recording and successfully uploaded the episode. + // After upload_ack is sent, mark task as completed. pending/ready/in_progress remain accepted + // for legacy weak-network recovery; uploading is the normal post-recording path. // Best-effort: do not affect the already-sent acknowledgement. now := time.Now().UTC() - if _, err := h.db.ExecContext(ctx, ` + res, err := h.db.ExecContext(ctx, ` UPDATE tasks SET status = 'completed', completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END, + error_message = NULL, updated_at = ? - WHERE id = ? AND status IN ('pending', 'in_progress', 'ready') AND deleted_at IS NULL - `, now, now, taskPK); err != nil { + WHERE id = ? AND status IN ('pending', 'ready', 'in_progress', 'uploading', 'completed') AND deleted_at IS NULL + `, now, now, taskPK) + if err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: failed to mark task pending/ready/in_progress->completed after upload_ack: task=%s err=%v", dc.DeviceID, taskID, err) + logger.Printf("%s failed to mark task completed after upload_ack: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err) } else { - if batchIDForAdvance > 0 { + rowsAffected, _ := res.RowsAffected() + shouldAdvance := ownedTask.Status != "completed" && rowsAffected > 0 + if shouldAdvance { + logger.Printf("%s task status updated: %s -> completed reason=upload_ack", transferTaskLogPrefix(dc.DeviceID, taskID), taskStatusLogValue(ownedTask.Status, "unknown")) + } + if shouldAdvance && batchIDForAdvance > 0 { // Must run after the task row is terminal: tryAdvanceBatchStatus counts tasks in DB. go tryAdvanceBatchStatus(h.db, batchIDForAdvance) } - if orderIDForAdvance > 0 { + if shouldAdvance && orderIDForAdvance > 0 { go tryAdvanceOrderStatus(h.db, orderIDForAdvance, h.recorderHub, h.recorderRPCTimeout) } } @@ -758,17 +796,17 @@ func (h *TransferHandler) onUploadFailed(ctx context.Context, dc *services.Trans // Log full message for debugging // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Received from device %s: full message=%+v", dc.DeviceID, msg) + logger.Printf("%s received upload_failed full message=%+v", transferTaskLogPrefix(dc.DeviceID, taskID), msg) // Try to extract bucket info if present if bucket, ok := data["bucket"].(string); ok { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: task=%s bucket=%s reason=%q retries=%d", - dc.DeviceID, taskID, bucket, reason, retryCount) + logger.Printf("%s upload_failed bucket=%s reason=%q retries=%d", + transferTaskLogPrefix(dc.DeviceID, taskID), bucket, reason, retryCount) } else { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: task=%s reason=%q retries=%d", - dc.DeviceID, taskID, reason, retryCount) + logger.Printf("%s upload_failed reason=%q retries=%d", + transferTaskLogPrefix(dc.DeviceID, taskID), reason, retryCount) } // Log configured S3 bucket for comparison @@ -776,27 +814,23 @@ func (h *TransferHandler) onUploadFailed(ctx context.Context, dc *services.Trans logger.Printf("[TRANSFER] Keystone configured bucket: %s", h.s3.Bucket()) } - // Mark task as failed when upload_failed is received and task is in_progress. + // Mark task as failed when upload_failed is received and this transfer owns the task. if h.db == nil || taskID == "" { return } - now := time.Now().UTC() - result, err := h.db.ExecContext(ctx, ` - UPDATE tasks - SET - status = 'failed', - completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END, - updated_at = ? - WHERE task_id = ? AND status = 'in_progress' AND deleted_at IS NULL - `, now, now, taskID) + message := strings.TrimSpace(reason) + if message == "" { + message = "upload failed" + } + result, err := failOwnedUploadingTask(ctx, h.db, dc.DeviceID, taskID, message) if err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: failed to mark task failed on upload_failed: task=%s err=%v", dc.DeviceID, taskID, err) + logger.Printf("%s failed to mark task failed on upload_failed: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err) return } if rows, _ := result.RowsAffected(); rows > 0 { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: task=%s marked as failed due to upload_failed", dc.DeviceID, taskID) + logger.Printf("%s marked as failed due to upload_failed", transferTaskLogPrefix(dc.DeviceID, taskID)) // Trigger batch status advancement since the task reached a terminal state. var batchID int64 if err := h.db.QueryRowContext(ctx, @@ -845,12 +879,12 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde `, deviceID) if err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[DEVICE] Device %s: failed to query runnable tasks on disconnect: %v", deviceID, err) + logger.Printf("%s failed to query runnable tasks on disconnect: %v", deviceLogPrefix(deviceID), err) return } defer func() { if cerr := rows.Close(); cerr != nil { - logger.Printf("[DEVICE] Device %s: close rows after disconnect task query: %v", deviceID, cerr) + logger.Printf("%s close rows after disconnect task query: %v", deviceLogPrefix(deviceID), cerr) } }() @@ -864,13 +898,13 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde for rows.Next() { var ref taskRef if err := rows.Scan(&ref.id, &ref.taskID, &ref.batchID, &ref.status); err != nil { - logger.Printf("[DEVICE] Device %s: scan error during disconnect task query: %v", deviceID, err) + logger.Printf("%s scan error during disconnect task query: %v", deviceLogPrefix(deviceID), err) continue } toRevert = append(toRevert, ref) } if err := rows.Err(); err != nil { - logger.Printf("[DEVICE] Device %s: rows error during disconnect task query: %v", deviceID, err) + logger.Printf("%s rows error during disconnect task query: %v", deviceLogPrefix(deviceID), err) } if notifyRecorder && recorderHub != nil { @@ -887,7 +921,7 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde if errors.Is(err, services.ErrRecorderRPCTimeout) { logRecorderRPCTimeout(deviceID, "clear", tid, "transfer_disconnect", timeout, err) } else { - logger.Printf("[DEVICE] Device %s: recorder clear after transfer disconnect failed (task=%s): %v", deviceID, tid, err) + logger.Printf("%s recorder clear after transfer disconnect failed: %v", deviceTaskLogPrefix(deviceID, tid), err) } } case "in_progress": @@ -895,7 +929,7 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde if errors.Is(err, services.ErrRecorderRPCTimeout) { logRecorderRPCTimeout(deviceID, "cancel", tid, "transfer_disconnect", timeout, err) } else { - logger.Printf("[DEVICE] Device %s: recorder cancel after transfer disconnect failed (task=%s): %v", deviceID, tid, err) + logger.Printf("%s recorder cancel after transfer disconnect failed: %v", deviceTaskLogPrefix(deviceID, tid), err) } } } @@ -916,12 +950,12 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde `, now, ref.id) if err != nil { // #nosec G706 -- Set aside for now - logger.Printf("[DEVICE] Device %s: failed to revert task=%s to pending on disconnect: %v", deviceID, ref.taskID, err) + logger.Printf("%s failed to revert to pending on disconnect: %v", deviceTaskLogPrefix(deviceID, ref.taskID), err) continue } if affected, _ := result.RowsAffected(); affected > 0 { // #nosec G706 -- Set aside for now - logger.Printf("[DEVICE] Device %s: task=%s reverted to pending due to device disconnect", deviceID, ref.taskID) + logger.Printf("%s reverted to pending due to device disconnect", deviceTaskLogPrefix(deviceID, ref.taskID)) } } } @@ -932,22 +966,31 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde func (h *TransferHandler) onUploadNotFound(dc *services.TransferConn, msg map[string]interface{}) { data, _ := msg["data"].(map[string]interface{}) if data == nil { - logger.Printf("[TRANSFER] ERROR: Device %s reported upload_not_found with empty data", dc.DeviceID) + logger.Printf("%s ERROR: reported upload_not_found with empty data", transferLogPrefix(dc.DeviceID)) return } taskID := strings.TrimSpace(stringVal(data, "task_id")) detail := strings.TrimSpace(stringVal(data, "detail")) if taskID == "" { - logger.Printf("[TRANSFER] ERROR: Device %s reported upload_not_found without task_id detail=%q", dc.DeviceID, detail) + logger.Printf("%s ERROR: reported upload_not_found without task_id detail=%q", transferLogPrefix(dc.DeviceID), detail) return } - logger.Printf("[TRANSFER] ERROR: Device %s reported upload_not_found task=%s detail=%q", dc.DeviceID, taskID, detail) + message := "upload file not found" + if detail != "" { + message = detail + } + if h.db != nil { + if _, err := writeOwnedUploadingTaskError(context.Background(), h.db, dc.DeviceID, taskID, message); err != nil { + logger.Printf("%s ERROR: failed to write upload_not_found error: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err) + } + } + logger.Printf("%s ERROR: reported upload_not_found detail=%q", transferTaskLogPrefix(dc.DeviceID, taskID), detail) } // onStatus handles "status" message and updates the device status snapshot func (h *TransferHandler) onStatus(dc *services.TransferConn, msg map[string]interface{}) { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: received status update", dc.DeviceID) + logger.Printf("%s received status update", transferLogPrefix(dc.DeviceID)) data, _ := msg["data"].(map[string]interface{}) if data == nil { return @@ -1020,6 +1063,11 @@ func (h *TransferHandler) UploadRequest(c *gin.Context) { if !h.sendToDevice(c, deviceID, msg) { return } + if h.db != nil { + if _, err := clearOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, body.TaskID); err != nil { + logger.Printf("%s failed to clear upload_request error: err=%v", transferTaskLogPrefix(deviceID, body.TaskID), err) + } + } c.JSON(http.StatusOK, gin.H{"status": "sent"}) } @@ -1035,29 +1083,29 @@ func (h *TransferHandler) UploadRequest(c *gin.Context) { func (h *TransferHandler) UploadAll(c *gin.Context) { deviceID := c.Param("device_id") - logger.Printf("[TRANSFER] Device %s: received upload_all request", deviceID) + logger.Printf("%s received upload_all request", transferLogPrefix(deviceID)) // Check if device is connected dc := h.hub.Get(deviceID) if dc == nil { - logger.Printf("[TRANSFER] Device %s: not connected", deviceID) + logger.Printf("%s not connected", transferLogPrefix(deviceID)) c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("device %s not connected", deviceID)}) return } - logger.Printf("[TRANSFER] Device %s: connected, remote_ip=%s", deviceID, dc.RemoteIP) + logger.Printf("%s connected, remote_ip=%s", transferLogPrefix(deviceID), dc.RemoteIP) status := dc.GetStatus() - logger.Printf("[TRANSFER] Device %s: current status is pending=%d uploading=%d failed=%d waiting_ack=%d", - deviceID, status.PendingCount, status.UploadingCount, status.FailedCount, status.WaitingACKCount) + logger.Printf("%s current status is pending=%d uploading=%d failed=%d waiting_ack=%d", + transferLogPrefix(deviceID), status.PendingCount, status.UploadingCount, status.FailedCount, status.WaitingACKCount) msg := map[string]interface{}{"type": "upload_all"} - logger.Printf("[TRANSFER] Sending message to device %s: %+v", deviceID, msg) + logger.Printf("%s sending message: %+v", transferLogPrefix(deviceID), msg) if !h.sendToDevice(c, deviceID, msg) { return } - logger.Printf("[TRANSFER] Message sent successfully to device %s", deviceID) + logger.Printf("%s message sent successfully", transferLogPrefix(deviceID)) c.JSON(http.StatusOK, gin.H{"status": "sent"}) } @@ -1119,65 +1167,6 @@ func (h *TransferHandler) StatusQuery(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "sent"}) } -// ManualACK sends an upload_ack message to the device. -// -// @Summary Manually acknowledge an upload -// @Tags transfer -// @Accept json -// @Produce json -// @Param device_id path string true "Device ID" -// @Param body body object true "task_id to acknowledge" -// @Success 200 {object} map[string]interface{} -// @Failure 404 {object} map[string]interface{} -// @Router /transfer/{device_id}/upload_ack [post] -func (h *TransferHandler) ManualACK(c *gin.Context) { - deviceID := c.Param("device_id") - - var body struct { - TaskID string `json:"task_id" binding:"required"` - } - if err := c.ShouldBindJSON(&body); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - msg := map[string]interface{}{ - "type": "upload_ack", - "task_id": body.TaskID, - } - if !h.sendToDevice(c, deviceID, msg) { - return - } - - // After upload_ack is sent, mark task as completed (ready or in_progress -> completed). - // Best-effort: do not fail the acknowledgement response. - if h.db != nil { - now := time.Now().UTC() - if _, err := h.db.Exec( - `UPDATE tasks - SET - status = 'completed', - completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END, - updated_at = ? - WHERE task_id = ? AND status IN ('in_progress', 'ready') AND deleted_at IS NULL`, - now, now, body.TaskID, - ); err != nil { - // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: failed to mark task ready/in_progress->completed after manual upload_ack: task=%s err=%v", deviceID, body.TaskID, err) - } else { - var batchID int64 - if err := h.db.Get(&batchID, "SELECT batch_id FROM tasks WHERE task_id = ? AND deleted_at IS NULL LIMIT 1", body.TaskID); err == nil && batchID > 0 { - go tryAdvanceBatchStatus(h.db, batchID) - } - var orderID int64 - if err := h.db.Get(&orderID, "SELECT order_id FROM tasks WHERE task_id = ? AND deleted_at IS NULL LIMIT 1", body.TaskID); err == nil && orderID > 0 { - go tryAdvanceOrderStatus(h.db, orderID, h.recorderHub, h.recorderRPCTimeout) - } - } - } - c.JSON(http.StatusOK, gin.H{"status": "sent"}) -} - // extractIP extracts the IP address from a RemoteAddr string (host:port) func extractIP(remoteAddr string) string { host, _, err := net.SplitHostPort(remoteAddr) diff --git a/internal/api/handlers/transfer_connection_takeover_test.go b/internal/api/handlers/transfer_connection_takeover_test.go index aa553f0..caf44ee 100644 --- a/internal/api/handlers/transfer_connection_takeover_test.go +++ b/internal/api/handlers/transfer_connection_takeover_test.go @@ -74,14 +74,36 @@ func newTransferTakeoverDB(t *testing.T) *sqlx.DB { id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, batch_id INTEGER NOT NULL DEFAULT 0, + workstation_id INTEGER NULL, status TEXT NOT NULL, completed_at TIMESTAMP NULL, + error_message TEXT NULL, created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL, deleted_at TIMESTAMP NULL )`); err != nil { t.Fatalf("create tasks schema: %v", err) } + if _, err := db.Exec(`CREATE TABLE robots ( + id INTEGER PRIMARY KEY, + device_id TEXT NOT NULL, + deleted_at TIMESTAMP NULL + )`); err != nil { + t.Fatalf("create robots schema: %v", err) + } + if _, err := db.Exec(`CREATE TABLE workstations ( + id INTEGER PRIMARY KEY, + robot_id INTEGER NOT NULL, + deleted_at TIMESTAMP NULL + )`); err != nil { + t.Fatalf("create workstations schema: %v", err) + } + if _, err := db.Exec(`INSERT INTO robots (id, device_id) VALUES (1, 'robot-001')`); err != nil { + t.Fatalf("seed robot: %v", err) + } + if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil { + t.Fatalf("seed workstation: %v", err) + } return db } @@ -89,7 +111,7 @@ func seedTransferTakeoverTask(t *testing.T, db *sqlx.DB, taskID string, status s t.Helper() now := time.Now().UTC() if _, err := db.Exec( - `INSERT INTO tasks (task_id, status, created_at, updated_at) VALUES (?, ?, ?, ?)`, + `INSERT INTO tasks (task_id, workstation_id, status, created_at, updated_at) VALUES (?, 10, ?, ?, ?)`, taskID, status, now, diff --git a/internal/storage/database/migrations/000001_initial_schema.up.sql b/internal/storage/database/migrations/000001_initial_schema.up.sql index 9c6a889..667a6ce 100644 --- a/internal/storage/database/migrations/000001_initial_schema.up.sql +++ b/internal/storage/database/migrations/000001_initial_schema.up.sql @@ -290,7 +290,7 @@ CREATE TABLE IF NOT EXISTS tasks ( factory_id BIGINT COMMENT 'Denormalized: from workstation.factory_id for filtering', organization_id BIGINT COMMENT 'Denormalized: from factory.organization_id for filtering', initial_scene_layout TEXT, - status ENUM('pending', 'ready', 'in_progress', 'completed', 'failed', 'cancelled') DEFAULT 'pending', + status ENUM('pending', 'ready', 'in_progress', 'uploading', 'completed', 'failed', 'cancelled') DEFAULT 'pending', version INT DEFAULT 0 COMMENT 'Optimistic locking version', assigned_at TIMESTAMP NULL, ready_at TIMESTAMP NULL, diff --git a/internal/storage/database/migrations/000005_task_uploading_status.down.sql b/internal/storage/database/migrations/000005_task_uploading_status.down.sql new file mode 100644 index 0000000..7d4e11b --- /dev/null +++ b/internal/storage/database/migrations/000005_task_uploading_status.down.sql @@ -0,0 +1,10 @@ +-- SPDX-FileCopyrightText: 2026 ArcheBase +-- +-- SPDX-License-Identifier: MulanPSL-2.0 + +UPDATE tasks +SET status = 'in_progress' +WHERE status = 'uploading'; + +ALTER TABLE tasks + MODIFY COLUMN status ENUM('pending', 'ready', 'in_progress', 'completed', 'failed', 'cancelled') DEFAULT 'pending'; diff --git a/internal/storage/database/migrations/000005_task_uploading_status.up.sql b/internal/storage/database/migrations/000005_task_uploading_status.up.sql new file mode 100644 index 0000000..7721a66 --- /dev/null +++ b/internal/storage/database/migrations/000005_task_uploading_status.up.sql @@ -0,0 +1,6 @@ +-- SPDX-FileCopyrightText: 2026 ArcheBase +-- +-- SPDX-License-Identifier: MulanPSL-2.0 + +ALTER TABLE tasks + MODIFY COLUMN status ENUM('pending', 'ready', 'in_progress', 'uploading', 'completed', 'failed', 'cancelled') DEFAULT 'pending';