diff --git a/docs/designs/production-units.md b/docs/designs/production-units.md
index fb91fdd..11c52a4 100644
--- a/docs/designs/production-units.md
+++ b/docs/designs/production-units.md
@@ -162,7 +162,6 @@ This document does not restate full table schemas; it only defines key field sem
|------|------|------|
| GET | `/tasks` | List (filters: `workstation_id/status/limit/offset`) |
| GET | `/tasks/:id` | Detail (includes `episode` if linked) |
-| PUT | `/tasks/:id` | Status update (restricted transitions; see §6.2) |
| GET | `/tasks/:id/config` | Generate recorder config (requires workstation robot + collector bindings) |
---
@@ -184,12 +183,14 @@ This document does not restate full table schemas; it only defines key field sem
### 6.2 Task states
-- **State set**: `pending` | `ready` | `in_progress` | `completed` | `failed` | `cancelled`
-- **Prepare (pending→ready)**: triggered by UI/scheduler (currently via `PUT /tasks/:id`).
-- **Run (ready→in_progress)**: triggered by UI/device workflow (currently via `PUT /tasks/:id`).
+- **State set**: `pending` | `ready` | `in_progress` | `uploading` | `completed` | `failed` | `cancelled`
+- **Prepare (pending→ready)**: triggered by recorder config application (`config_applied` / ready state snapshot).
+- **Run (ready→in_progress)**: triggered by recorder start callback or recording state snapshot.
+- **Finish (in_progress→uploading)**: triggered by recorder finish callback; Keystone sends `upload_request` to Transfer when connected.
- **Transfer ACK**:
- - On verified upload ACK, Keystone marks task `in_progress -> completed` (only if currently `in_progress`).
- - On `upload_failed`, Keystone marks task `in_progress -> failed`.
+ - On verified upload ACK, Keystone sends `upload_ack`, then marks task `pending/ready/in_progress/uploading -> completed`.
+ - Duplicate ACKs for already `completed` tasks are allowed but must not re-advance batch/order state.
+ - On `upload_failed`, Keystone marks task `in_progress/uploading -> failed`.
- **Revert to pending (ready/in_progress→pending)**: used for recovery when Transfer disconnects (to avoid stuck runnable tasks).
### 6.3 Batch states
@@ -227,15 +228,14 @@ When the device reports `upload_complete`, Keystone runs the Verified ACK flow:
- **Idempotent**: if an Episode already exists for this `task_id`, do not insert again
- Insert into `episodes` (persist denormalized fields such as `batch_id/order_id/scene_id/...`)
- `batches.episode_count += 1` (only when a new Episode is inserted)
- - Update `tasks.status` to **`completed`** (and set `completed_at`) **only when current status is `in_progress`**
3. **Send `upload_ack`** to the device
+4. **Mark task completed**: update `tasks.status` to **`completed`** (and set `completed_at`) when the current status is `pending/ready/in_progress/uploading`; a task that is already `completed` is left unchanged, so the step is idempotent
---
## 8. Known gaps and evolution
-- **In-recording state**: `callbacks/start` does not persist state today; `ready -> in_progress` validation/persistence is not implemented yet.
-- **Failure path**: an end-to-end `failed` terminal state and error attribution are not fully implemented (callbacks/transfer need to be extended).
+- **Failure path**: terminal `failed` handling exists for transfer failures; recorder callback failure attribution still needs tighter end-to-end coverage.
- **Quota consistency**:
- Dispatch quota is based on non-deleted task rows, not completed rows.
- New/bulk batch creation uses `remaining_assignable = target_count - order_task_count`.
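The Transfer ACK rules in §6.2 and step 4 of the Verified ACK flow in this file can be read as a pure decision over the task's current status. The Go sketch below is illustrative only. `ackOutcome` and `decideVerifiedACK` are hypothetical names, not Keystone functions. The handling of `failed`, `cancelled`, and unknown statuses (no change, no ACK) is an assumption taken from the uploading-status design note in this change, not from this document.

```go
package main

import "fmt"

// ackOutcome describes what a verified upload_complete should do for a task.
// Hypothetical type for illustration; not part of Keystone.
type ackOutcome struct {
	NextStatus   string // task status after handling the event
	SendACK      bool   // whether upload_ack is sent to the device
	AdvanceBatch bool   // whether batch/order state may advance
}

// decideVerifiedACK maps the current task status to an outcome.
func decideVerifiedACK(current string) ackOutcome {
	switch current {
	case "pending", "ready", "in_progress", "uploading":
		// First verified ACK: complete the task and let batch/order advance.
		return ackOutcome{NextStatus: "completed", SendACK: true, AdvanceBatch: true}
	case "completed":
		// Duplicate ACK: acknowledge again, but never re-advance batch/order.
		return ackOutcome{NextStatus: "completed", SendACK: true, AdvanceBatch: false}
	default:
		// failed, cancelled, or unknown (assumption): leave the task alone, no ACK.
		return ackOutcome{NextStatus: current, SendACK: false, AdvanceBatch: false}
	}
}

func main() {
	for _, s := range []string{"uploading", "completed", "failed"} {
		fmt.Printf("%s -> %+v\n", s, decideVerifiedACK(s))
	}
}
```

Keeping the decision separate from the database update makes the duplicate-ACK case easy to test without a device connection.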
diff --git a/docs/designs/task-manage.md b/docs/designs/task-manage.md
index bf48e63..67321b6 100644
--- a/docs/designs/task-manage.md
+++ b/docs/designs/task-manage.md
@@ -50,10 +50,11 @@ The Task state machine defines the complete lifecycle of a Task, with state tran
| State | Description | Valid Previous States | Valid Next States |
|-------|-------------|----------------------|-------------------|
| `pending` | Task created, awaiting workstation preparation | `ready`(cancel), `*`(new) | `ready`, `cancelled` |
-| `ready` | Data collector clicked "Make Ready", awaiting recording start | `pending` | `in_progress`, `pending`, `cancelled` |
-| `in_progress` | Recording in progress | `ready` | `completed`, `failed` |
-| `completed` | Recording completed successfully | `in_progress` | *(terminal)* |
-| `failed` | Recording failed | `in_progress` | *(terminal)* |
+| `ready` | Recorder applied TaskConfig, awaiting recording start | `pending` | `in_progress`, `pending`, `cancelled` |
+| `in_progress` | Recording in progress | `pending`, `ready` | `uploading`, `failed` |
+| `uploading` | Recording finished, waiting for Transfer upload ACK | `pending`, `ready`, `in_progress` | `completed`, `failed` |
+| `completed` | Recording uploaded and acknowledged successfully | `pending`, `ready`, `in_progress`, `uploading` | *(terminal)* |
+| `failed` | Recording or upload failed | `in_progress`, `uploading` | *(terminal)* |
| `cancelled` | Task cancelled | `pending`, `ready` | *(terminal)* |
### 2.2 State Transition Diagram
@@ -61,13 +62,14 @@ The Task state machine defines the complete lifecycle of a Task, with state tran
```mermaid
stateDiagram-v2
[*] --> pending: Order created/Task generated
- pending --> ready: Data Collector clicks "Make Ready"
+ pending --> ready: Recorder config_applied
pending --> cancelled: Task cancelled
ready --> in_progress: Axon POST /callbacks/start
- ready --> pending: Data Collector clicks "Unready"
+ ready --> pending: Transfer disconnect recovery
ready --> cancelled: Task cancelled
- in_progress --> completed: Axon POST /callbacks/finish (success)
- in_progress --> failed: Axon POST /callbacks/finish (failure)
+ in_progress --> uploading: Axon POST /callbacks/finish
+ uploading --> completed: Transfer upload_complete + upload_ack
+ uploading --> failed: Transfer upload_failed
completed --> [*]
failed --> [*]
cancelled --> [*]
@@ -77,7 +79,7 @@ stateDiagram-v2
#### pending → ready
-- **Trigger**: Data collector clicks "Make Ready" in Synapse UI
+- **Trigger**: Recorder confirms TaskConfig application (`config_applied` or ready state snapshot)
- **Validation**:
- Task status is `pending`
- Task is assigned to a Workstation
@@ -96,12 +98,21 @@ stateDiagram-v2
- Record `started_at` timestamp
- Record active ROS Topics
-#### in_progress → completed
+#### in_progress → uploading
- **Trigger**: Axon calls [`POST /callbacks/finish`](implementation/axon_teleoperation.md:1107) with `error == null`
- **Validation**: Task status is `in_progress`
+- **Side Effects**:
+ - Mark Task `uploading`
+ - Trigger Transfer `upload_request` when the device is connected
+
+#### uploading → completed
+
+- **Trigger**: Transfer reports `upload_complete`, Keystone verifies S3 objects, then sends `upload_ack`
+- **Validation**: Task status is `pending`, `ready`, `in_progress`, or `uploading`
- **Side Effects**:
- Record `completed_at` timestamp
+ - Clear upload error message
- Create Episode (`qa_status: pending_qa`)
- Trigger Edge Dagster QA Job
@@ -147,13 +158,13 @@ Batch-scoped Tasks are created for selected Workstations (status: pending)
### Phase 1: Task Preparation and TaskConfig Distribution
```
-Data Collector → Synapse UI clicks "Make Ready"
+Keystone → Dispatch TaskConfig to recorder
↓
- Synapse → PATCH /tasks/{id} {status: "ready"}
+ Axon recorder applies config
↓
Keystone → Task: pending → ready
↓
- Trigger notification for Axon to pull config
+ Axon keeps TaskConfig locally
↓
Axon → GET /tasks/{id}/config (Device token)
↓
@@ -253,7 +264,6 @@ Four weighted checks:
| `GET` | `/tasks` | List tasks (with filtering) | Synapse UI |
| `GET` | `/tasks/{id}` | Get task details | Synapse UI |
| `GET` | `/tasks/{id}/config` | Get task config (Axon pull) | Axon |
-| `PATCH` | `/tasks/{id}` | Update task status | Synapse UI |
| `POST` | `/tasks` | Create task (internal use) | Order Manager |
### 4.2 Callback Interfaces
@@ -388,8 +398,8 @@ Response 500:
**Operations**:
-- "Make Ready": `pending` → `ready`
-- "Unready": `ready` → `pending` (timeout or reset)
+- Synapse displays task status and creates tasks.
+- Task status transitions are driven by recorder callbacks/state snapshots and Transfer upload events.
### 5.3 Task and Dagster QA Interaction
@@ -444,7 +454,7 @@ CREATE TABLE tasks (
sop_id BIGINT NOT NULL,
-- Status
- status VARCHAR(32) NOT NULL DEFAULT 'pending' COMMENT 'pending|ready|in_progress|completed|failed|cancelled',
+ status VARCHAR(32) NOT NULL DEFAULT 'pending' COMMENT 'pending|ready|in_progress|uploading|completed|failed|cancelled',
-- Timestamps
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
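The state table in §2.1 of task-manage.md can be encoded as a lookup that rejects transitions the table does not list. The Go sketch below is a hedged illustration, not Keystone code: `validPrevious` and `canTransition` are hypothetical names. The map is built from the "Valid Previous States" column only, because the two columns of the table do not fully agree (the "Valid Next States" column is narrower).

```go
package main

import "fmt"

// validPrevious maps a target status to the statuses it may be entered from,
// copied from the "Valid Previous States" column of the §2.1 table.
var validPrevious = map[string][]string{
	"pending":     {"ready"},
	"ready":       {"pending"},
	"in_progress": {"pending", "ready"},
	"uploading":   {"pending", "ready", "in_progress"},
	"completed":   {"pending", "ready", "in_progress", "uploading"},
	"failed":      {"in_progress", "uploading"},
	"cancelled":   {"pending", "ready"},
}

// canTransition reports whether the table allows moving from one status to another.
func canTransition(from, to string) bool {
	for _, prev := range validPrevious[to] {
		if prev == from {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("in_progress", "uploading")) // allowed by the table
	fmt.Println(canTransition("uploading", "pending"))     // not listed, so rejected
}
```

Terminal states never appear as a source in the map, so a check such as `canTransition("completed", "failed")` is rejected without a special case.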
diff --git a/docs/designs/task-uploading-status.zh.html b/docs/designs/task-uploading-status.zh.html
new file mode 100644
index 0000000..37ef4b3
--- /dev/null
+++ b/docs/designs/task-uploading-status.zh.html
@@ -0,0 +1,821 @@
+
+
+
+
+
+
+ Task Uploading Status Design
+
+
+
+
+
+
+
+ Goals and Boundaries
+
+
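+ Goal
+ Protect tasks whose recording has finished
+ Once a valid finish callback is received, the task enters uploading immediately, and a later Recorder disconnect no longer rolls it back.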
+
+
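+ Non-goal
+ No upload scheduler
+ The first version has no automatic retries and no upload timeout, does not persist upload progress, and does not add pending_upload or upload_stalled.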
+
+
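+ Constraint
+ Separate production from post-processing
+ completed_at still only marks the task reaching a terminal state; an order/batch may be completed before some uploading tasks finish post-processing.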
+
+
+
+
+
+ State semantics
+
+
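+ pending: not yet dispatched; waiting for configuration or execution.
+ ready: the Recorder is configured; waiting for recording to start.
+ in_progress: the Recorder is recording; the task is still in the Recorder phase.
+ uploading: recording has finished; upload post-processing has taken over or is about to.
+ completed: S3 verification, episode insert, and upload_ack are all done.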
+
+
+
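+ Key definition: uploading does not strictly mean "bytes have started uploading". It means "the Recorder no longer owns the task, and the task has entered the upload post-processing phase". It is not counted as in_progress and does not occupy the workstation's production state.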
+
+
+
+
+
+ | State | Terminal? | Rollback to pending allowed? | Primary owner |
+ |------|------|------|------|
+ | pending | No | N/A | Keystone |
+ | ready | No | Yes, via clear, cancel, or disconnect recovery | Recorder |
+ | in_progress | No | Yes, via cancel or disconnect recovery | Recorder |
+ | uploading | No | Forbidden | Transfer / Keystone |
+ | completed / failed / cancelled | Yes | Forbidden | Keystone |
+
+
+
+
+
+
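+ Device ownership check
+ For every device event from the Recorder or Transfer, Keystone must confirm that the event comes from the device that owns the task before it changes task status, writes error_message, writes an episode, or sends an ACK.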
+ tasks.task_id = message.task_id
+tasks.workstation_id = workstations.id
+workstations.robot_id = robots.id
+robots.device_id = callback.device_id, or the device_id of the current Transfer WebSocket
+
+
+
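+ | Entry point | On check failure |
+ |------|------|
+ | finish callback / Recorder state reconcile | Do not change status; only log a warning/error. The finish callback response may state that the task and device do not match. |
+ | upload_started / upload_failed / upload_not_found | Do not change status and do not write an error to the task; only log the source device and task_id. |
+ | upload_complete | Do not write an episode, do not change the task, and do not send upload_ack. |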
+
+
+
+
+
+
+ Event rules
+
+
+
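+ | Event | State change | Fields and response |
+ |------|------|------|
+ | Valid finish callback | pending/ready/in_progress/uploading -> uploading | After the ownership check passes, update the status immediately; do not write completed_at; then try to send upload_request. |
+ | upload_request sent successfully | Stays uploading | Clear the upload-related error_message; the finish callback returns 200 with upload_request_sent: true. |
+ | upload_request failed to send | Stays uploading | Write error_message; a valid finish callback still returns 200, with upload_request_sent: false. |
+ | upload_started | pending/ready/in_progress/uploading -> uploading | After the ownership check passes, idempotently confirm the upload phase; clear the upload-related error_message; never touch terminal states. |
+ | upload_complete with successful ACK | pending/ready/in_progress/uploading -> completed | After the ownership check passes, write the episode; send upload_ack; write completed_at and clear error_message. |
+ | Duplicate upload_complete for an already completed task | Stays completed | upload_ack may be sent again idempotently; do not write the episode again, do not change completed_at, and do not re-advance batch/order. |
+ | upload_failed | in_progress/uploading -> failed | After the ownership check passes, write completed_at and error_message; do not roll back an already completed order/batch. |
+ | upload_not_found | Stays uploading | After the ownership check passes, only write error_message; do not fail automatically; do not trigger batch/order advancement. |
+ | Duplicate finish callback | Stays uploading | Handle idempotently; do not write completed_at; retry upload_request; clear or update the error according to the send result. |
+ | Late event in a terminal state | failed/cancelled are not restored | When a failed/cancelled task receives upload_complete, do not change status and do not send an ACK; only log. |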
+
+
+
+
+ State machine constraints
+ pending -> ready -> in_progress -> uploading -> completed
+                          |             |
+                          |             -> failed
+                          -> failed
+
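+ready/in_progress -> pending: rollback is allowed only during the Recorder phase
+uploading -> pending: forbidden
+failed -> uploading/completed: automatic recovery is forbidden in the first version
+
+ The first version allows a task to stay in uploading for a long time. That state represents leftover upload post-processing and does not block production-side completion of the order/batch; if convergence is needed later, an automatic-timeout design will handle it.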
+
+
+
+
+ Disconnect and rollback rules
+
+
+
+ | Trigger | Allowed rollback | Must not affect |
+ |------|------|------|
+ | Recorder normal disconnect | ready/in_progress -> pending | uploading, terminal states |
+ | Recorder ping timeout | ready/in_progress -> pending | uploading, terminal states |
+ | Transfer disconnect | ready/in_progress -> pending; may also notify the Recorder to clear/cancel | uploading, terminal states |
+ | Recorder state reconcile | pending -> ready, or pending/ready -> in_progress | Must not pull uploading back to ready or in_progress |
+ | Recorder clear/cancel RPC | ready/in_progress -> pending | uploading, terminal states |
+
+
+
+
+
+
+ Interface changes
+
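+ - Remove the general task status update endpoint PUT /tasks/:id; task status is driven by system events and batch cancel/recall.
+ - Remove the manual ACK endpoint POST /transfer/{device_id}/upload_ack; the ACK is sent automatically, and only from the upload_complete verification path.
+ - The manual /transfer/{device_id}/upload_request endpoint is not responsible for advancing a non-uploading task to uploading.
+ - If the task is already uploading, clear error_message after a manual upload_request is sent successfully.
+ - The first version has no automatic uploading timeout and does not automatically mark a stuck upload as failed.
+ - The first version adds no admin endpoint for failing a task manually; uploading -> failed comes only from Transfer's upload_failed.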
+
+
+
+
+ Batches and orders
+
+
+
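+ | Rule | Conclusion |
+ |------|------|
+ | Is uploading a terminal state? | No, but it does not permanently block production-side convergence of the order/batch. |
+ | Normal batch active -> completed | Still requires every task to reach completed/failed/cancelled. |
+ | Order wrap-up on reaching target | The order may become completed; open batches may become completed; uploading tasks are kept and continue post-processing. |
+ | batch cancel / recall | Cancels only pending/ready/in_progress; does not cancel uploading. |
+ | Post-processing under a completed order/batch | uploading -> completed/failed updates only the task, episode, and statistics; it does not roll back the order/batch. |
+ | Auto-preparing the next task | uploading does not block preparing the next task on the same workstation, but the Recorder must be freshly idle and Transfer must be online. |
+ | Workstation occupancy | uploading does not change workstation production-occupancy semantics; workstation availability still depends on Recorder state and device connectivity. |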
+
+
+
+
+
+
+ Database and migration
+
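+ - tasks.status gains the enum value uploading.
+ - Schema migration only; historical in_progress tasks are not backfilled in bulk.
+ - No upload_started_at column is added.
+ - completed_at is not written when a finish callback moves the task to uploading.
+ - completed_at is written when upload_complete + upload_ack or upload_failed moves the task to a terminal state.
+ - GET /tasks and GET /tasks/:id return error_message, which is used to surface upload scheduling errors.
+ - uploading and completed tasks cannot be deleted; long-lived uploading tasks will later be converged by the automatic-timeout design.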
+
+
+
+
+ Frontend and dashboard
+
+
+
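+ | Location | Change |
+ |------|------|
+ | Task list / detail | uploading is shown as "上传中" ("Uploading"), reusing the blue palette used for in-progress or syncing; both the detail and list views can show error_message. |
+ | Production dashboard lifecycle track | Changed to 5 nodes: pending -> ready -> in_progress -> uploading -> completed. |
+ | Dashboard backend statistics | dashboardTaskCounts, summary, trend, batch task summary, and status distribution all handle uploading separately. |
+ | Trends and stat cards | uploading is counted in neither pending nor in_progress, to avoid double counting or misreading. |
+ | recent tasks | status_text returns "上传中" ("Uploading") for uploading. |
+ | Production semantics | uploading tasks may still appear under a completed order/batch; this is an allowed upload post-processing state. |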
+
+
+
+
+
+
+ Test checklist
+
+
+
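+ | Category | Behavior that must be covered |
+ |------|------|
+ | finish callback | A valid callback that passes the ownership check immediately changes pending/ready/in_progress/uploading to uploading (or keeps it there) and returns 200. |
+ | Upload request failure | When Transfer is offline or the write fails, the task stays uploading, error_message is written, and the response carries upload_request_sent: false. |
+ | Device ownership | A device mismatch on callback, upload_started, upload_complete, upload_failed, or upload_not_found must never change the task. |
+ | Disconnect recovery | Recorder disconnect, Recorder ping timeout, and Transfer disconnect must never roll uploading back to pending. |
+ | Upload complete | upload_complete + upload_ack changes uploading to completed; a duplicate complete for an already completed task can be ACKed idempotently. |
+ | Upload failure | upload_failed changes uploading to failed and writes the terminal timestamp and error; a late complete does not restore a failed task. |
+ | Upload not found | upload_not_found keeps the task uploading and only updates error_message. |
+ | Endpoint removal | PUT /tasks/:id and POST /transfer/{device_id}/upload_ack are no longer available. |
+ | Deletion and API display | uploading tasks cannot be deleted; the task API returns error_message. |
+ | Batches and orders | An order that reaches its target may become completed; open batches may become completed; uploading tasks keep post-processing and do not affect workstation occupancy. |
+ | Dashboard statistics | Status distribution, summary, trend, batch summary, recent tasks, and the 5-node lifecycle track all display uploading correctly. |
+ | Long-lived uploading | The first version allows a task to stay in uploading for a long time without blocking production-side completion of the order/batch. |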
+
+
+
+
+
+
+
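The device ownership check in the design note above follows the chain tasks -> workstations -> robots and compares the resulting device_id with the device that sent the event. The Go sketch below shows that chain over in-memory maps. It is an illustration under stated assumptions: the `store` type, its fields, and `taskOwnedByDevice` are hypothetical, and a real implementation would presumably run a joined query against the database instead.

```go
package main

import "fmt"

// Minimal stand-ins for the three tables named in the design note.
// Hypothetical types for illustration; not Keystone's models.
type task struct {
	TaskID        string
	WorkstationID int64
}
type workstation struct{ ID, RobotID int64 }
type robot struct {
	ID       int64
	DeviceID string
}

type store struct {
	tasks        map[string]task
	workstations map[int64]workstation
	robots       map[int64]robot
}

// taskOwnedByDevice walks task -> workstation -> robot and reports whether
// the robot's device_id equals the device that sent the event.
// Any missing link in the chain counts as "not owned".
func (s store) taskOwnedByDevice(taskID, deviceID string) bool {
	t, ok := s.tasks[taskID]
	if !ok {
		return false
	}
	ws, ok := s.workstations[t.WorkstationID]
	if !ok {
		return false
	}
	r, ok := s.robots[ws.RobotID]
	if !ok {
		return false
	}
	return deviceID != "" && r.DeviceID == deviceID
}

func main() {
	s := store{
		tasks:        map[string]task{"task_1": {TaskID: "task_1", WorkstationID: 10}},
		workstations: map[int64]workstation{10: {ID: 10, RobotID: 20}},
		robots:       map[int64]robot{20: {ID: 20, DeviceID: "dev_a"}},
	}
	fmt.Println(s.taskOwnedByDevice("task_1", "dev_a")) // owning device
	fmt.Println(s.taskOwnedByDevice("task_1", "dev_b")) // mismatched device
}
```

A caller would run this check first and, on a mismatch, skip every state change, episode write, and ACK, as the failure table in the note requires.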
diff --git a/docs/designs/upload-service-design.md b/docs/designs/upload-service-design.md
index ac3142f..6ab9b52 100644
--- a/docs/designs/upload-service-design.md
+++ b/docs/designs/upload-service-design.md
@@ -311,10 +311,7 @@ POST /api/v1/transfer/{device_id}/status_query
#### Manual ACK
-```
-POST /api/v1/transfer/{device_id}/upload_ack
-{"task_id": "task_abc123"}
-```
+Removed. Keystone now sends `upload_ack` only from the verified `upload_complete` path.
## Security Considerations
diff --git a/internal/api/handlers/axon_rpc.go b/internal/api/handlers/axon_rpc.go
index 68f34fe..92a4bd5 100644
--- a/internal/api/handlers/axon_rpc.go
+++ b/internal/api/handlers/axon_rpc.go
@@ -152,14 +152,14 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
"SELECT COUNT(1) FROM robots WHERE device_id = ? AND deleted_at IS NULL", deviceID,
); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(queryCtx.Err(), context.DeadlineExceeded) {
- logger.Printf("[RECORDER] Device %s: DB query timeout after %s (timeout_ms=%d): %v", deviceID, timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err)
+ logger.Printf("%s DB query timeout after %s (timeout_ms=%d): %v", recorderLogPrefix(deviceID), timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err)
} else {
- logger.Printf("[RECORDER] Device %s: DB query error: %v", deviceID, err)
+ logger.Printf("%s DB query error: %v", recorderLogPrefix(deviceID), err)
}
}
// Check count regardless of DB error (count defaults to 0 on error)
if count == 0 {
- logger.Printf("[RECORDER] Device %s: robot not found in database", deviceID)
+ logger.Printf("%s robot not found in database", recorderLogPrefix(deviceID))
w.WriteHeader(http.StatusNotFound)
return
}
@@ -168,7 +168,7 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
// Allow any origin in dev; tighten in production
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true})
if err != nil {
- logger.Printf("[RECORDER] Device %s: WebSocket accept error: %v", deviceID, err)
+ logger.Printf("%s WebSocket accept error: %v", recorderLogPrefix(deviceID), err)
return
}
@@ -181,7 +181,7 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
defer func() {
if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil {
if !isExpectedWebSocketCloseError(err) {
- logger.Printf("[RECORDER] Device %s: WebSocket close error: %v", deviceID, err)
+ logger.Printf("%s WebSocket close error: %v", recorderLogPrefix(deviceID), err)
}
}
}()
@@ -196,14 +196,14 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
go h.pingLoop(ctx, rc)
// #nosec G706 -- Set aside for now
- logger.Printf("[RECORDER] Recorder %s connected from %s", deviceID, remoteIP)
+ logger.Printf("%s connected from %s", recorderLogPrefix(deviceID), remoteIP)
go h.syncRecorderStateFromDevice(ctx, rc)
for {
_, raw, err := conn.Read(ctx)
if err != nil {
if !isExpectedWebSocketCloseError(err) {
- logger.Printf("[RECORDER] Recorder %s disconnected: %v", deviceID, err)
+ logger.Printf("%s disconnected: %v", recorderLogPrefix(deviceID), err)
}
return
}
@@ -212,7 +212,7 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
var msg map[string]interface{}
if err := json.Unmarshal(raw, &msg); err != nil {
- logger.Printf("[RECORDER] Recorder %s invalid JSON: %v", deviceID, err)
+ logger.Printf("%s invalid JSON: %v", recorderLogPrefix(deviceID), err)
continue
}
@@ -294,12 +294,12 @@ func (h *RecorderHandler) Begin(c *gin.Context) {
// pending is allowed because recorder may preserve ready state across a transient
// WebSocket disconnect while Keystone has already rolled the task back.
if taskID != "" && h.db != nil {
- res, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID)
+ rowsAffected, _, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID)
if err != nil {
- logger.Printf("[RECORDER] Device %s: failed to advance task pending/ready->in_progress after begin: task=%s err=%v", c.Param("device_id"), taskID, err)
+ logger.Printf("%s failed to advance task pending/ready->in_progress after begin: err=%v", recorderTaskLogPrefix(c.Param("device_id"), taskID), err)
return
}
- if n, _ := res.RowsAffected(); n == 0 {
+ if rowsAffected == 0 {
h.logBeginTransitionNoop(c.Param("device_id"), taskID)
}
}
@@ -406,12 +406,12 @@ func (h *RecorderHandler) Cancel(c *gin.Context) {
now, taskID,
)
if err != nil {
- logger.Printf("[RECORDER] Device %s: failed to revert task after cancel RPC: task=%s err=%v", deviceID, taskID, err)
+ logger.Printf("%s failed to revert task after cancel RPC: err=%v", recorderTaskLogPrefix(deviceID, taskID), err)
return
}
n, _ := res.RowsAffected()
if n == 0 {
- logger.Printf("[RECORDER] Device %s: task revert skipped after cancel RPC (not found or not ready/in_progress): task=%s", deviceID, taskID)
+ logger.Printf("%s task revert skipped after cancel RPC (not found or not ready/in_progress)", recorderTaskLogPrefix(deviceID, taskID))
}
}
}
@@ -461,11 +461,11 @@ func (h *RecorderHandler) Clear(c *gin.Context) {
now, taskID,
)
if err != nil {
- logger.Printf("[RECORDER] Device %s: failed to revert task ready->pending after clear: task=%s err=%v", c.Param("device_id"), taskID, err)
+ logger.Printf("%s failed to revert task ready->pending after clear: err=%v", recorderTaskLogPrefix(c.Param("device_id"), taskID), err)
return
}
if n, _ := res.RowsAffected(); n == 0 {
- logger.Printf("[RECORDER] Device %s: task ready->pending skipped after clear (not found or not ready): task=%s", c.Param("device_id"), taskID)
+ logger.Printf("%s task ready->pending skipped after clear (not found or not ready)", recorderTaskLogPrefix(c.Param("device_id"), taskID))
}
}
}
@@ -957,7 +957,7 @@ func recorderPreviousStateFromRaw(raw map[string]interface{}) string {
func (h *RecorderHandler) handleMessage(deviceID string, rc *services.RecorderConn, msg map[string]interface{}) {
if h.hub.Get(deviceID) != rc {
- logger.Printf("[RECORDER] Recorder %s ignored message from replaced connection", deviceID)
+ logger.Printf("%s ignored message from replaced connection", recorderLogPrefix(deviceID))
return
}
@@ -969,16 +969,16 @@ func (h *RecorderHandler) handleMessage(deviceID string, rc *services.RecorderCo
h.handleStateUpdate(rc, msg)
case "connected":
// #nosec G706 -- Set aside for now
- logger.Printf("[RECORDER] Recorder %s sent connected event", deviceID)
+ logger.Printf("%s sent connected event", recorderLogPrefix(deviceID))
case "config_applied":
data := mapValue(msg, "data")
taskID := stringValue(data, "task_id")
// #nosec G706 -- Set aside for now
- logger.Printf("[RECORDER] Recorder %s config applied task=%s", deviceID, taskID)
+ logger.Printf("%s config applied", recorderTaskLogPrefix(deviceID, taskID))
advanceTaskPendingToReady(h.db, deviceID, taskID, "config_applied")
default:
// #nosec G706 -- Set aside for now
- logger.Printf("[RECORDER] Recorder %s unknown message type %q", deviceID, msgType)
+ logger.Printf("%s unknown message type %q", recorderLogPrefix(deviceID), msgType)
}
}
@@ -991,7 +991,7 @@ func (h *RecorderHandler) handleRPCResponse(deviceID string, msg map[string]inte
Data: mapValue(msg, "data"),
}
if !h.hub.HandleRPCResponse(deviceID, response) {
- logger.Printf("[RECORDER] Recorder %s unmatched response request_id=%s", deviceID, response.RequestID)
+ logger.Printf("%s unmatched response request_id=%s", recorderLogPrefix(deviceID), response.RequestID)
}
}
@@ -1015,7 +1015,7 @@ func (h *RecorderHandler) syncRecorderStateFromDevice(ctx context.Context, rc *s
if _, _, err := h.refreshRecorderState(ctx, rc.DeviceID, rc, -1); err != nil {
if !errors.Is(err, services.ErrRecorderNotConnected) && !errors.Is(err, context.Canceled) {
- logger.Printf("[RECORDER] Recorder %s get_state after connect failed: %v", rc.DeviceID, err)
+ logger.Printf("%s get_state after connect failed: %v", recorderLogPrefix(rc.DeviceID), err)
}
}
}
@@ -1024,6 +1024,7 @@ func (h *RecorderHandler) applyRecorderStateSnapshot(rc *services.RecorderConn,
if rc == nil {
return nil
}
+ previous := rc.GetState()
state.Source = strings.TrimSpace(source)
state.TaskID = strings.TrimSpace(state.TaskID)
if !recorderStateKeepsTaskID(state.CurrentState) {
@@ -1035,22 +1036,115 @@ func (h *RecorderHandler) applyRecorderStateSnapshot(rc *services.RecorderConn,
if state.TaskID == "" && recorderStateRequiresTaskID(state.CurrentState, state.Source) {
err := fmt.Errorf("recorder %s snapshot missing task_id for state=%s", state.Source, strings.TrimSpace(state.CurrentState))
h.markRecorderSyncing(rc, state.Source, err)
- logger.Printf("[RECORDER] Recorder %s ignored %s state=%s without task_id", rc.DeviceID, state.Source, state.CurrentState)
+ logger.Printf("%s ignored %s state=%s without task_id", recorderLogPrefix(rc.DeviceID), state.Source, state.CurrentState)
return err
}
rc.UpdateState(state)
st := rc.GetState()
h.reconcileRecorderTaskState(rc.DeviceID, st, source)
h.publishRecorderStateSnapshot(rc, st, source, true)
- if source == "state_update" {
- // #nosec G706 -- Set aside for now
- logger.Printf("[RECORDER] Recorder %s state=%s task=%s", rc.DeviceID, st.CurrentState, st.TaskID)
- return nil
+ if recorderStateSnapshotChanged(previous, st) {
+ logRecorderStateChange(rc.DeviceID, previous, st, source)
}
- logger.Printf("[RECORDER] Recorder %s state=%s task=%s source=%s", rc.DeviceID, st.CurrentState, st.TaskID, source)
return nil
}
+func recorderStateSnapshotChanged(previous services.RecorderState, next services.RecorderState) bool {
+ return !strings.EqualFold(strings.TrimSpace(previous.CurrentState), strings.TrimSpace(next.CurrentState)) ||
+ strings.TrimSpace(previous.TaskID) != strings.TrimSpace(next.TaskID)
+}
+
+func logRecorderStateChange(deviceID string, previous services.RecorderState, next services.RecorderState, source string) {
+ prefix := recorderStateLogPrefix(deviceID, previous.TaskID, next.TaskID)
+ previousState := recorderLogState(previous.CurrentState)
+ nextState := recorderLogState(next.CurrentState)
+ logSource := recorderLogSource(source)
+ if !strings.EqualFold(strings.TrimSpace(previous.CurrentState), strings.TrimSpace(next.CurrentState)) {
+ logger.Printf("%s state changed: %s -> %s source=%s", prefix, previousState, nextState, logSource)
+ return
+ }
+ logger.Printf("%s task changed: %s -> %s state=%s source=%s",
+ prefix,
+ recorderLogTaskID(previous.TaskID),
+ recorderLogTaskID(next.TaskID),
+ nextState,
+ logSource,
+ )
+}
+
+func recorderLogState(state string) string {
+ state = strings.TrimSpace(state)
+ if state == "" {
+ return "-"
+ }
+ return state
+}
+
+func recorderLogTaskID(taskID string) string {
+ taskID = strings.TrimSpace(taskID)
+ if taskID == "" {
+ return "-"
+ }
+ return taskID
+}
+
+func recorderLogSource(source string) string {
+ source = strings.TrimSpace(source)
+ if source == "" {
+ return "state_update"
+ }
+ return source
+}
+
+func recorderLogPrefix(deviceID string) string {
+ return logPrefixWithTask("RECORDER", deviceID, "")
+}
+
+func recorderTaskLogPrefix(deviceID, taskID string) string {
+ return logPrefixWithTask("RECORDER", deviceID, taskID)
+}
+
+func recorderStateLogPrefix(deviceID, previousTaskID, nextTaskID string) string {
+ taskID := strings.TrimSpace(nextTaskID)
+ if taskID == "" {
+ taskID = strings.TrimSpace(previousTaskID)
+ }
+ return recorderTaskLogPrefix(deviceID, taskID)
+}
+
+func transferLogPrefix(deviceID string) string {
+ return logPrefixWithTask("TRANSFER", deviceID, "")
+}
+
+func transferTaskLogPrefix(deviceID, taskID string) string {
+ return logPrefixWithTask("TRANSFER", deviceID, taskID)
+}
+
+func deviceLogPrefix(deviceID string) string {
+ return logPrefixWithTask("DEVICE", deviceID, "")
+}
+
+func deviceTaskLogPrefix(deviceID, taskID string) string {
+ return logPrefixWithTask("DEVICE", deviceID, taskID)
+}
+
+func logPrefixWithTask(component, deviceID, taskID string) string {
+ component = strings.TrimSpace(component)
+ if component == "" {
+ component = "DEVICE"
+ }
+ deviceID = strings.TrimSpace(deviceID)
+ taskID = strings.TrimSpace(taskID)
+ prefix := "[" + component + "]"
+ if deviceID != "" {
+ prefix += "[" + deviceID + "]"
+ }
+ if taskID != "" {
+ prefix += "[" + taskID + "]"
+ }
+ return prefix
+}
+
func recorderStateFromRPCData(data map[string]interface{}) services.RecorderState {
state := services.RecorderState{
CurrentState: firstNonEmptyString(data, "state", "current", "current_state"),
@@ -1116,15 +1210,15 @@ func (h *RecorderHandler) reconcileRecorderTaskState(deviceID string, state serv
currentState := strings.ToLower(strings.TrimSpace(state.CurrentState))
switch currentState {
case "ready":
- advanceTaskPendingToReady(h.db, deviceID, taskID, source+" ready")
+ advanceTaskPendingToReady(h.db, deviceID, taskID, source+"_ready")
case "recording", "paused":
- res, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID)
+ rowsAffected, previousStatus, err := advanceTaskPendingOrReadyToInProgress(h.db, taskID)
if err != nil {
- logger.Printf("[RECORDER] Recorder %s: failed to advance task pending/ready->in_progress after %s %s: task=%s err=%v", deviceID, source, currentState, taskID, err)
+ logger.Printf("%s failed to advance task pending/ready->in_progress after %s %s: err=%v", recorderTaskLogPrefix(deviceID, taskID), source, currentState, err)
return
}
- if n, _ := res.RowsAffected(); n > 0 {
- logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=%s_%s status=in_progress", deviceID, taskID, source, currentState)
+ if rowsAffected > 0 {
+ logger.Printf("%s task status updated: %s -> in_progress reason=%s_%s", recorderTaskLogPrefix(deviceID, taskID), taskStatusLogValue(previousStatus, "unknown"), source, currentState)
}
}
}
@@ -1134,6 +1228,7 @@ func advanceTaskPendingToReady(db *sqlx.DB, deviceID, taskID, source string) {
if db == nil || taskID == "" {
return
}
+ previousStatus, _, _ := currentTaskStatus(db, taskID)
now := time.Now().UTC()
res, err := db.Exec(
`UPDATE tasks
@@ -1145,44 +1240,51 @@ func advanceTaskPendingToReady(db *sqlx.DB, deviceID, taskID, source string) {
now, now, taskID,
)
if err != nil {
- logger.Printf("[RECORDER] Device %s: failed to advance task pending->ready after %s: task=%s err=%v", deviceID, source, taskID, err)
+ logger.Printf("%s failed to advance task pending->ready after %s: err=%v", recorderTaskLogPrefix(deviceID, taskID), source, err)
return
}
if n, _ := res.RowsAffected(); n > 0 {
- logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=%s status=ready", deviceID, taskID, source)
+ logger.Printf("%s task status updated: %s -> ready reason=%s", recorderTaskLogPrefix(deviceID, taskID), taskStatusLogValue(previousStatus, "unknown"), source)
}
}
-func advanceTaskPendingOrReadyToInProgress(db *sqlx.DB, taskID string) (sql.Result, error) {
+func advanceTaskPendingOrReadyToInProgress(db *sqlx.DB, taskID string) (int64, string, error) {
if db == nil {
- return nil, nil
+ return 0, "", nil
}
+ taskID = strings.TrimSpace(taskID)
+ previousStatus, _, _ := currentTaskStatus(db, taskID)
now := time.Now().UTC()
- return db.Exec(
+ res, err := db.Exec(
`UPDATE tasks
SET
status = 'in_progress',
started_at = CASE WHEN started_at IS NULL THEN ? ELSE started_at END,
updated_at = ?
WHERE task_id = ? AND status IN ('pending', 'ready') AND deleted_at IS NULL`,
- now, now, strings.TrimSpace(taskID),
+ now, now, taskID,
)
+ if err != nil {
+ return 0, previousStatus, err
+ }
+ rowsAffected, _ := res.RowsAffected()
+ return rowsAffected, previousStatus, nil
}
func (h *RecorderHandler) logBeginTransitionNoop(deviceID, taskID string) {
status, ok, err := currentTaskStatus(h.db, taskID)
if err != nil {
- logger.Printf("[RECORDER] Device %s: task status lookup failed after begin: task=%s err=%v", deviceID, taskID, err)
+ logger.Printf("%s task status lookup failed after begin: err=%v", recorderTaskLogPrefix(deviceID, taskID), err)
return
}
if ok && (status == "in_progress" || status == "completed") {
return
}
if !ok {
- logger.Printf("[RECORDER] Device %s: task pending/ready->in_progress skipped after begin (task not found): task=%s", deviceID, taskID)
+ logger.Printf("%s task pending/ready->in_progress skipped after begin (task not found)", recorderTaskLogPrefix(deviceID, taskID))
return
}
- logger.Printf("[RECORDER] Device %s: task pending/ready->in_progress skipped after begin (current_status=%s): task=%s", deviceID, status, taskID)
+ logger.Printf("%s task pending/ready->in_progress skipped after begin (current_status=%s)", recorderTaskLogPrefix(deviceID, taskID), status)
}
func currentTaskStatus(db *sqlx.DB, taskID string) (string, bool, error) {
@@ -1221,10 +1323,10 @@ func (h *RecorderHandler) pingLoop(ctx context.Context, rc *services.RecorderCon
cancel()
if err != nil {
if ctx.Err() == nil {
- logWebSocketPingFailure("RECORDER", "Recorder", rc.DeviceID, timeout, timedOut, err)
+ logWebSocketPingFailure("RECORDER", rc.DeviceID, timeout, timedOut, err)
if closeErr := rc.Conn.CloseNow(); closeErr != nil {
if !isExpectedWebSocketCloseError(closeErr) {
- logger.Printf("[RECORDER] Recorder %s close after ping failure: %v", rc.DeviceID, closeErr)
+ logger.Printf("%s close after ping failure: %v", recorderLogPrefix(rc.DeviceID), closeErr)
}
}
}
@@ -1270,21 +1372,21 @@ func logRecorderRPCTimeout(deviceID, action, taskID, source string, timeout time
}
taskID = strings.TrimSpace(taskID)
if taskID != "" {
- logger.Printf("[RECORDER] Recorder %s RPC timeout after %s (timeout_ms=%d): action=%s task=%s source=%s err=%v", deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, taskID, source, err)
+ logger.Printf("%s RPC timeout after %s (timeout_ms=%d): action=%s source=%s err=%v", recorderTaskLogPrefix(deviceID, taskID), timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, source, err)
return
}
- logger.Printf("[RECORDER] Recorder %s RPC timeout after %s (timeout_ms=%d): action=%s source=%s err=%v", deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, source, err)
+ logger.Printf("%s RPC timeout after %s (timeout_ms=%d): action=%s source=%s err=%v", recorderLogPrefix(deviceID), timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), action, source, err)
}
-func logWebSocketPingFailure(component, label, deviceID string, timeout time.Duration, timedOut bool, err error) {
+func logWebSocketPingFailure(component, deviceID string, timeout time.Duration, timedOut bool, err error) {
component = strings.TrimSpace(component)
- label = strings.TrimSpace(label)
deviceID = strings.TrimSpace(deviceID)
+ prefix := logPrefixWithTask(component, deviceID, "")
if timedOut {
- logger.Printf("[%s] %s %s ping timeout after %s (timeout_ms=%d): %v", component, label, deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err)
+ logger.Printf("%s ping timeout after %s (timeout_ms=%d): %v", prefix, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err)
return
}
- logger.Printf("[%s] %s %s ping failed (timeout=%s timeout_ms=%d): %v", component, label, deviceID, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err)
+ logger.Printf("%s ping failed (timeout=%s timeout_ms=%d): %v", prefix, timeoutLogValue(timeout), timeoutLogMilliseconds(timeout), err)
}
func recorderPingInterval(cfg *config.RecorderConfig) time.Duration {
@@ -1305,10 +1407,10 @@ func closeReplacedRecorderConn(deviceID string, rc *services.RecorderConn) {
if rc == nil || rc.Conn == nil {
return
}
- logger.Printf("[RECORDER] Device %s: closing replaced WebSocket connection", deviceID)
+ logger.Printf("%s closing replaced WebSocket connection", recorderLogPrefix(deviceID))
if err := rc.Conn.CloseNow(); err != nil {
if !isExpectedWebSocketCloseError(err) {
- logger.Printf("[RECORDER] Device %s: replaced WebSocket close error: %v", deviceID, err)
+ logger.Printf("%s replaced WebSocket close error: %v", recorderLogPrefix(deviceID), err)
}
}
}
diff --git a/internal/api/handlers/batch.go b/internal/api/handlers/batch.go
index 39c95b2..74b399f 100644
--- a/internal/api/handlers/batch.go
+++ b/internal/api/handlers/batch.go
@@ -100,14 +100,6 @@ func (h *BatchHandler) RegisterCollectorRoutes(apiV1 gin.IRoutes) {
// recalledEpisodeLabel is appended to episodes.labels (JSON string array) when a batch is recalled (see RecallBatch).
const recalledEpisodeLabel = "recalled_batch"
-// batchAdvanceTriggerStatuses lists task statuses that trigger tryAdvanceBatchStatus after a task
-// update via PUT /tasks. Tasks become cancelled only via PATCH batch cancel, which does not invoke
-// that hook; transfer completion sets completed and also calls tryAdvanceBatchStatus.
-var batchAdvanceTriggerStatuses = map[string]struct{}{
- "completed": {},
- "failed": {},
-}
-
var validBatchStatuses = map[string]struct{}{
"pending": {},
"active": {},
diff --git a/internal/api/handlers/production_dashboard.go b/internal/api/handlers/production_dashboard.go
index 4e9d0aa..1a60a07 100644
--- a/internal/api/handlers/production_dashboard.go
+++ b/internal/api/handlers/production_dashboard.go
@@ -114,6 +114,7 @@ type dashboardOverviewSummary struct {
TodayEpisodes int64 `json:"today_episodes"`
CompletedTasks int64 `json:"completed_tasks"`
InProgressTasks int64 `json:"in_progress_tasks"`
+ UploadingTasks int64 `json:"uploading_tasks"`
PendingTasks int64 `json:"pending_tasks"`
FailedTasks int64 `json:"failed_tasks"`
CancelledTasks int64 `json:"cancelled_tasks"`
@@ -132,6 +133,7 @@ type dashboardTaskCounts struct {
Total int64 `json:"total" db:"total"`
Completed int64 `json:"completed" db:"completed"`
InProgress int64 `json:"in_progress" db:"in_progress"`
+ Uploading int64 `json:"uploading" db:"uploading"`
Pending int64 `json:"pending" db:"pending"`
Ready int64 `json:"ready" db:"ready"`
Failed int64 `json:"failed" db:"failed"`
@@ -143,6 +145,7 @@ type dashboardTrendItem struct {
Total int64 `json:"total" db:"total"`
Completed int64 `json:"completed" db:"completed"`
InProgress int64 `json:"in_progress" db:"in_progress"`
+ Uploading int64 `json:"uploading" db:"uploading"`
Pending int64 `json:"pending" db:"pending"`
Failed int64 `json:"failed" db:"failed"`
}
@@ -358,6 +361,7 @@ type dashboardBatchTaskSummaryItem struct {
Pending int64 `json:"-" db:"pending"`
Ready int64 `json:"-" db:"ready"`
InProgress int64 `json:"-" db:"in_progress"`
+ Uploading int64 `json:"-" db:"uploading"`
Completed int64 `json:"-" db:"completed"`
Failed int64 `json:"-" db:"failed"`
Cancelled int64 `json:"-" db:"cancelled"`
@@ -684,6 +688,7 @@ func (h *ProductionDashboardHandler) GetBatchTaskSummary(c *gin.Context) {
COALESCE(SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END), 0) AS pending,
COALESCE(SUM(CASE WHEN t.status = 'ready' THEN 1 ELSE 0 END), 0) AS ready,
COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress,
+ COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading,
COALESCE(SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
COALESCE(SUM(CASE WHEN t.status = 'cancelled' THEN 1 ELSE 0 END), 0) AS cancelled
@@ -704,6 +709,7 @@ func (h *ProductionDashboardHandler) GetBatchTaskSummary(c *gin.Context) {
"pending": items[i].Pending,
"ready": items[i].Ready,
"in_progress": items[i].InProgress,
+ "uploading": items[i].Uploading,
"completed": items[i].Completed,
"failed": items[i].Failed,
"cancelled": items[i].Cancelled,
@@ -872,6 +878,7 @@ func (h *ProductionDashboardHandler) dashboardTaskCounts(db dashboardDB, scope p
COUNT(1) AS total,
COALESCE(SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress,
+ COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading,
COALESCE(SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END), 0) AS pending,
COALESCE(SUM(CASE WHEN t.status = 'ready' THEN 1 ELSE 0 END), 0) AS ready,
COALESCE(SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
@@ -899,6 +906,7 @@ func (h *ProductionDashboardHandler) dashboardOverviewTaskMetrics(db dashboardDB
COALESCE(SUM(CASE WHEN t.assigned_at >= ? AND t.assigned_at < ? THEN 1 ELSE 0 END), 0) AS total,
COALESCE(SUM(CASE WHEN t.status = 'completed' AND t.completed_at >= ? AND t.completed_at < ? THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress,
+ COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading,
COALESCE(SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END), 0) AS pending,
0 AS ready,
COALESCE(SUM(CASE WHEN t.status = 'failed' AND t.completed_at >= ? AND t.completed_at < ? THEN 1 ELSE 0 END), 0) AS failed,
@@ -962,6 +970,7 @@ func (h *ProductionDashboardHandler) dashboardTaskTrend(db dashboardDB, scope pr
DATE_FORMAT(` + localEventExpr + `, '%m-%d') AS date,
COALESCE(SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END), 0) AS in_progress,
+ COALESCE(SUM(CASE WHEN t.status = 'uploading' THEN 1 ELSE 0 END), 0) AS uploading,
COALESCE(SUM(CASE WHEN t.status IN ('pending', 'ready') THEN 1 ELSE 0 END), 0) AS pending,
COALESCE(SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
FROM tasks t
@@ -976,7 +985,7 @@ func (h *ProductionDashboardHandler) dashboardTaskTrend(db dashboardDB, scope pr
byDate := make(map[string]dashboardTrendItem, len(rows))
for _, row := range rows {
- row.Total = row.Completed + row.InProgress + row.Pending + row.Failed
+ row.Total = row.Completed + row.InProgress + row.Uploading + row.Pending + row.Failed
byDate[row.Date] = row
}
items := make([]dashboardTrendItem, 0, q.TrendDays)
@@ -1423,6 +1432,7 @@ func buildOverviewSummary(
TodayEpisodes: todayProductionTotals.TotalEpisodes,
CompletedTasks: overviewTaskMetrics.Completed,
InProgressTasks: overviewTaskMetrics.InProgress,
+ UploadingTasks: overviewTaskMetrics.Uploading,
PendingTasks: overviewTaskMetrics.Pending,
FailedTasks: overviewTaskMetrics.Failed,
CancelledTasks: tasks.Cancelled,
@@ -1443,6 +1453,7 @@ func buildTaskStatusDistribution(tasks dashboardTaskCounts) []dashboardStatusDis
{Status: "pending", Label: dashboardTaskStatusText("pending"), Value: tasks.Pending},
{Status: "ready", Label: dashboardTaskStatusText("ready"), Value: tasks.Ready},
{Status: "in_progress", Label: dashboardTaskStatusText("in_progress"), Value: tasks.InProgress},
+ {Status: "uploading", Label: dashboardTaskStatusText("uploading"), Value: tasks.Uploading},
{Status: "completed", Label: dashboardTaskStatusText("completed"), Value: tasks.Completed},
{Status: "failed", Label: dashboardTaskStatusText("failed"), Value: tasks.Failed},
{Status: "cancelled", Label: dashboardTaskStatusText("cancelled"), Value: tasks.Cancelled},
@@ -1671,6 +1682,8 @@ func dashboardTaskStatusText(status string) string {
return "就绪"
case "in_progress":
return "进行中"
+ case "uploading":
+ return "上传中"
case "completed":
return "已完成"
case "failed":
diff --git a/internal/api/handlers/recorder_axon_interaction_test.go b/internal/api/handlers/recorder_axon_interaction_test.go
index 0d6cbfd..b9617a7 100644
--- a/internal/api/handlers/recorder_axon_interaction_test.go
+++ b/internal/api/handlers/recorder_axon_interaction_test.go
@@ -8,6 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
+ "log"
"net/http"
"net/http/httptest"
"strings"
@@ -15,6 +16,7 @@ import (
"time"
"archebase.com/keystone-edge/internal/config"
+ "archebase.com/keystone-edge/internal/logger"
"archebase.com/keystone-edge/internal/services"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
@@ -1308,6 +1310,44 @@ func TestRecorderWebSocketStateUpdateBeforeGetStateResponseIsIdempotent(t *testi
}
}
+func TestRecorderStateSnapshotLogsOnlyStateChanges(t *testing.T) {
+ var buf bytes.Buffer
+ previousLogger := logger.Get()
+ logger.Set(log.New(&buf, "", 0))
+ defer logger.Set(previousLogger)
+
+ hub := services.NewRecorderHub()
+ handler := NewRecorderHandler(hub, &config.RecorderConfig{}, nil)
+ rc := hub.NewRecorderConn(nil, "robot-001", "127.0.0.1")
+
+ if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "idle"}, "get_state"); err != nil {
+ t.Fatalf("apply idle snapshot: %v", err)
+ }
+ if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "idle"}, "state_update"); err != nil {
+ t.Fatalf("apply duplicate idle snapshot: %v", err)
+ }
+ if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "ready", TaskID: "task-ready"}, "state_update"); err != nil {
+ t.Fatalf("apply ready snapshot: %v", err)
+ }
+ if err := handler.applyRecorderStateSnapshot(rc, services.RecorderState{CurrentState: "ready", TaskID: "task-ready"}, "rpc_response:config"); err != nil {
+ t.Fatalf("apply duplicate ready snapshot: %v", err)
+ }
+
+ output := buf.String()
+ if got := strings.Count(output, "state changed:"); got != 2 {
+ t.Fatalf("state change log count=%d want=2 output=%q", got, output)
+ }
+ if !strings.Contains(output, "[RECORDER][robot-001] state changed: unknown -> idle source=get_state") {
+ t.Fatalf("initial state change log missing: %q", output)
+ }
+ if !strings.Contains(output, "[RECORDER][robot-001][task-ready] state changed: idle -> ready source=state_update") {
+ t.Fatalf("ready state change log missing: %q", output)
+ }
+ if strings.Contains(output, "rpc_response:config") {
+ t.Fatalf("duplicate rpc_response snapshot should not be logged: %q", output)
+ }
+}
+
func TestRecorderWebSocketGetStateReadyRejectsConfigWithoutSendingRPC(t *testing.T) {
db := newRecorderInteractionDB(t)
seedRecorderInteractionDevice(t, db, "robot-001", 1, 101)
diff --git a/internal/api/handlers/task.go b/internal/api/handlers/task.go
index 4b01d47..584a3bc 100644
--- a/internal/api/handlers/task.go
+++ b/internal/api/handlers/task.go
@@ -118,7 +118,6 @@ func (h *TaskHandler) RegisterRoutes(apiV1 *gin.RouterGroup) {
apiV1.POST("/tasks", h.CreateTask)
apiV1.GET("/tasks", h.ListTasks)
apiV1.GET("/tasks/:id", h.GetTask)
- apiV1.PUT("/tasks/:id", h.UpdateTask)
apiV1.DELETE("/tasks/:id", h.DeleteTask)
apiV1.GET("/tasks/:id/config", h.GetTaskConfig)
}
@@ -127,6 +126,7 @@ var validTaskStatuses = map[string]struct{}{
"pending": {},
"ready": {},
"in_progress": {},
+ "uploading": {},
"completed": {},
"failed": {},
"cancelled": {},
@@ -147,6 +147,7 @@ type TaskListItem struct {
SubsceneID string `json:"subscene_id" db:"subscene_id"`
SubsceneName string `json:"subscene_name" db:"subscene_name"`
Status string `json:"status" db:"status"`
+ ErrorMessage *string `json:"error_message" db:"error_message"`
AssignedAt *string `json:"assigned_at" db:"assigned_at"`
}
@@ -183,6 +184,7 @@ type TaskDetailResponse struct {
FactoryID *string `json:"factory_id" db:"factory_id"`
OrganizationID *int64 `json:"organization_id" db:"organization_id"`
Status string `json:"status" db:"status"`
+ ErrorMessage *string `json:"error_message" db:"error_message"`
CreatedAt *string `json:"created_at" db:"created_at"`
AssignedAt *string `json:"assigned_at" db:"assigned_at"`
StartedAt *string `json:"started_at" db:"started_at"`
@@ -192,37 +194,6 @@ type TaskDetailResponse struct {
EpisodePublicID sql.NullString `db:"episode_public_id" json:"-"`
}
-// UpdateTaskRequest represents the request body for updating a task status.
-type UpdateTaskRequest struct {
- Status string `json:"status"`
- UpdatedBy string `json:"updated_by"`
-}
-
-// UpdateTaskResponse represents the response body for updating a task status.
-type UpdateTaskResponse struct {
- ID string `json:"id"`
- Status string `json:"status"`
- UpdatedAt string `json:"updated_at"`
-}
-
-var validTaskStatusTransitions = map[string]map[string]struct{}{
- "pending": {
- "ready": {},
- },
- "ready": {
- "in_progress": {},
- "pending": {},
- },
- "in_progress": {
- "pending": {},
- "completed": {},
- "failed": {},
- },
- "failed": {},
- "completed": {},
- "cancelled": {},
-}
-
// ListTasks handles task listing requests with optional filtering.
//
// @Summary List tasks
@@ -315,6 +286,7 @@ func (h *TaskHandler) ListTasks(c *gin.Context) {
CAST(tasks.subscene_id AS CHAR) AS subscene_id,
COALESCE(tasks.subscene_name, '') AS subscene_name,
tasks.status,
+ tasks.error_message,
CASE WHEN tasks.assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(tasks.assigned_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS assigned_at
FROM tasks
LEFT JOIN workstations ws ON ws.id = tasks.workstation_id AND ws.deleted_at IS NULL
@@ -377,6 +349,7 @@ func (h *TaskHandler) GetTask(c *gin.Context) {
CASE WHEN t.factory_id IS NULL THEN NULL ELSE CAST(t.factory_id AS CHAR) END AS factory_id,
t.organization_id AS organization_id,
t.status,
+ t.error_message,
CASE WHEN t.created_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.created_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS created_at,
CASE WHEN t.assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.assigned_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS assigned_at,
CASE WHEN t.started_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.started_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS started_at,
@@ -412,161 +385,6 @@ func (h *TaskHandler) GetTask(c *gin.Context) {
c.JSON(http.StatusOK, task)
}
-// UpdateTask handles task status update requests.
-//
-// @Summary Update task
-// @Description Updates task status with restricted state transitions. Setting status to cancelled is not allowed; cancel the parent batch instead. Rejected when the parent batch status is cancelled or recalled.
-// @Tags tasks
-// @Accept json
-// @Produce json
-// @Param id path string true "Task ID"
-// @Param body body UpdateTaskRequest true "Task update payload"
-// @Success 200 {object} UpdateTaskResponse
-// @Failure 400 {object} map[string]string
-// @Failure 404 {object} map[string]string
-// @Failure 409 {object} map[string]string "Conflict (invalid transition or batch is cancelled/recalled)"
-// @Failure 500 {object} map[string]string
-// @Router /tasks/{id} [put]
-func (h *TaskHandler) UpdateTask(c *gin.Context) {
- idStr := strings.TrimSpace(c.Param("id"))
- if idStr == "" {
- c.JSON(http.StatusBadRequest, gin.H{"error_msg": "task id is required"})
- return
- }
- id, err := strconv.ParseInt(idStr, 10, 64)
- if err != nil || id <= 0 {
- c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid task id"})
- return
- }
-
- var req UpdateTaskRequest
- if err := c.ShouldBindJSON(&req); err != nil {
- c.JSON(http.StatusBadRequest, gin.H{
- "error_msg": "Invalid request body: " + err.Error(),
- })
- return
- }
-
- req.Status = strings.TrimSpace(req.Status)
- req.UpdatedBy = strings.TrimSpace(req.UpdatedBy)
-
- if req.Status == "" {
- c.JSON(http.StatusBadRequest, gin.H{"error_msg": "status is required"})
- return
- }
-
- if _, ok := validTaskStatuses[req.Status]; !ok {
- c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid status"})
- return
- }
-
- if req.Status == "cancelled" {
- c.JSON(http.StatusBadRequest, gin.H{
- "error_msg": "setting status to 'cancelled' is not allowed via PUT; cancel the parent batch (PATCH /batches/:id) instead",
- })
- return
- }
-
- if req.UpdatedBy == "" {
- c.JSON(http.StatusBadRequest, gin.H{"error_msg": "updated_by is required"})
- return
- }
-
- var taskRow struct {
- Status string `db:"status"`
- OrderID int64 `db:"order_id"`
- BatchStatus sql.NullString `db:"batch_status"`
- }
- err = h.db.Get(&taskRow, `
- SELECT t.status, t.order_id, b.status AS batch_status
- FROM tasks t
- LEFT JOIN batches b ON b.id = t.batch_id AND b.deleted_at IS NULL
- WHERE t.id = ? AND t.deleted_at IS NULL`, id)
- if err == sql.ErrNoRows {
- c.JSON(http.StatusNotFound, gin.H{"error_msg": "Task not found: " + idStr})
- return
- }
- if err != nil {
- logger.Printf("[TASK] Failed to query task: %v", err)
- c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query task"})
- return
- }
-
- if taskRow.BatchStatus.Valid {
- bs := taskRow.BatchStatus.String
- if bs == "cancelled" || bs == "recalled" {
- c.JSON(http.StatusConflict, gin.H{
- "error_msg": fmt.Sprintf("cannot update task while parent batch status is %q", bs),
- "batch_status": bs,
- "requested_status": req.Status,
- })
- return
- }
- }
-
- if _, ok := validTaskStatusTransitions[taskRow.Status][req.Status]; !ok {
- c.JSON(http.StatusConflict, gin.H{
- "error_msg": fmt.Sprintf("Cannot transition from '%s' to '%s'", taskRow.Status, req.Status),
- "current_status": taskRow.Status,
- "requested_status": req.Status,
- })
- return
- }
-
- now := time.Now().UTC()
-
- // Fetch batch_id for post-update batch state advancement
- var batchIDForAdvance int64
- _ = h.db.Get(&batchIDForAdvance, "SELECT batch_id FROM tasks WHERE id = ? AND deleted_at IS NULL LIMIT 1", id)
- orderIDForAdvance := taskRow.OrderID
-
- result, err := h.db.Exec(
- "UPDATE tasks SET status = ?, updated_at = ?, ready_at = CASE WHEN ? = 'ready' THEN ? ELSE ready_at END WHERE id = ? AND status = ? AND deleted_at IS NULL",
- req.Status,
- now,
- req.Status,
- now,
- id,
- taskRow.Status,
- )
- if err != nil {
- logger.Printf("[TASK] Failed to update task: %v", err)
- c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to update task"})
- return
- }
-
- rowsAffected, err := result.RowsAffected()
- if err != nil {
- logger.Printf("[TASK] Failed to get rows affected: %v", err)
- c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to verify update"})
- return
- }
-
- if rowsAffected == 0 {
- c.JSON(http.StatusConflict, gin.H{
- "error_msg": fmt.Sprintf("Cannot transition from '%s' to '%s'", taskRow.Status, req.Status),
- "current_status": taskRow.Status,
- "requested_status": req.Status,
- })
- return
- }
-
- // After completed/failed, try to advance the batch status (pending->active, active->completed).
- if _, ok := batchAdvanceTriggerStatuses[req.Status]; ok && batchIDForAdvance > 0 {
- go tryAdvanceBatchStatus(h.db, batchIDForAdvance)
- }
- // After completed, try to advance the order status (created->in_progress, in_progress->completed).
- if req.Status == "completed" && orderIDForAdvance > 0 {
- go tryAdvanceOrderStatus(h.db, orderIDForAdvance, h.recorderHub, h.recorderRPCTimeout)
- }
-
- c.JSON(http.StatusOK, UpdateTaskResponse{
- ID: idStr,
- Status: req.Status,
- UpdatedAt: now.Format(time.RFC3339),
- })
-}
-
// DeleteTask handles soft deletion of a task.
// Tasks with status "completed" cannot be deleted.
//
@@ -600,9 +418,9 @@ func (h *TaskHandler) DeleteTask(c *gin.Context) {
return
}
- // Completed tasks cannot be deleted (they form part of the audit trail)
- if taskStatus == "completed" {
- c.JSON(http.StatusConflict, gin.H{"error_msg": "cannot delete a completed task; completed tasks form part of the audit trail"})
+ // Completed/uploading tasks cannot be deleted because their files or audit trail may still be in use.
+ if taskStatus == "completed" || taskStatus == "uploading" {
+ c.JSON(http.StatusConflict, gin.H{"error_msg": "cannot delete a completed or uploading task"})
return
}
@@ -973,11 +791,11 @@ func (h *TaskHandler) OnRecordingStart(c *gin.Context) {
return
}
- logger.Printf("[RECORDER] Device %s: received start callback for task=%s", callback.DeviceID, callback.TaskID)
+ logger.Printf("%s received start callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID))
// Validate required fields
if callback.TaskID == "" {
- logger.Printf("[RECORDER] Device %s: Missing task_id in callback", callback.DeviceID)
+ logger.Printf("%s missing task_id in callback", recorderLogPrefix(callback.DeviceID))
c.JSON(http.StatusBadRequest, gin.H{
"error_msg": "Missing required field: task_id",
})
@@ -986,12 +804,12 @@ func (h *TaskHandler) OnRecordingStart(c *gin.Context) {
taskStatus := "unknown"
if h.db != nil {
- res, err := advanceTaskPendingOrReadyToInProgress(h.db, callback.TaskID)
+ rowsAffected, previousStatus, err := advanceTaskPendingOrReadyToInProgress(h.db, callback.TaskID)
if err != nil {
- logger.Printf("[RECORDER] Device %s: failed to advance task pending/ready->in_progress after start callback: task=%s err=%v", callback.DeviceID, callback.TaskID, err)
- } else if n, _ := res.RowsAffected(); n > 0 {
+ logger.Printf("%s failed to advance task pending/ready->in_progress after start callback: err=%v", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID), err)
+ } else if rowsAffected > 0 {
taskStatus = "in_progress"
- logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=start_callback status=in_progress", callback.DeviceID, callback.TaskID)
+ logger.Printf("%s task status updated: %s -> in_progress reason=start_callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID), taskStatusLogValue(previousStatus, "unknown"))
}
}
@@ -1036,7 +854,7 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) {
}
if callback.OutputPath == "" {
- logger.Printf("[RECORDER] Failed to parse callback: missing output_path for task_id=%s", callback.TaskID)
+ logger.Printf("%s failed to parse callback: missing output_path", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID))
c.JSON(http.StatusBadRequest, gin.H{
"error_msg": "Missing required field: output_path",
})
@@ -1052,23 +870,43 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) {
return
}
- logger.Printf("[RECORDER] Device %s: received finish callback for task=%s", callback.DeviceID, callback.TaskID)
+ logger.Printf("%s received finish callback", recorderTaskLogPrefix(callback.DeviceID, callback.TaskID))
if h.db != nil {
- res, err := advanceTaskPendingOrReadyToInProgress(h.db, callback.TaskID)
+ previousStatus, _, _ := currentOwnedTaskStatus(c.Request.Context(), h.db, deviceID, callback.TaskID)
+ res, err := markOwnedTaskUploading(c.Request.Context(), h.db, deviceID, callback.TaskID)
if err != nil {
- logger.Printf("[RECORDER] Device %s: failed to restore task pending/ready->in_progress after finish callback: task=%s err=%v", deviceID, callback.TaskID, err)
+ logger.Printf("%s failed to mark task uploading after finish callback: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to update task status"})
+ return
} else if n, _ := res.RowsAffected(); n > 0 {
- logger.Printf("[RECORDER] Device %s: task status reconciled: task=%s source=finish_callback status=in_progress", deviceID, callback.TaskID)
+ logger.Printf("%s task status updated: %s -> uploading reason=finish_callback", recorderTaskLogPrefix(deviceID, callback.TaskID), taskStatusLogValue(previousStatus, "unknown"))
+ } else {
+ logger.Printf("%s task uploading transition skipped after finish callback", recorderTaskLogPrefix(deviceID, callback.TaskID))
+ c.JSON(http.StatusConflict, gin.H{
+ "error_msg": "task is not owned by device or is not uploadable",
+ })
+ return
}
}
- dc := h.hub.Get(deviceID)
+ var dc *services.TransferConn
+ if h.hub != nil {
+ dc = h.hub.Get(deviceID)
+ }
if dc == nil {
- // TODO: add status pending_upload, when device reconnects, check for any pending_upload tasks and trigger upload then
- logger.Printf("[RECORDER] Device %s: Not found in hub for task=%s, cannot trigger upload", deviceID, callback.TaskID)
- c.JSON(http.StatusConflict, gin.H{
- "error_msg": "Recording finished, device not connected for auto-upload",
+ errorMessage := "transfer disconnected; upload_request not sent"
+ if h.db != nil {
+ if _, err := writeOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, callback.TaskID, errorMessage); err != nil {
+ logger.Printf("%s failed to write upload_request error: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err)
+ }
+ }
+ logger.Printf("%s transfer connection not found in hub; upload_request not sent", recorderTaskLogPrefix(deviceID, callback.TaskID))
+ c.JSON(http.StatusOK, gin.H{
+ "success": true,
+ "message": "Recording finished; upload_request not sent because transfer is disconnected",
+ "task_status": "uploading",
+ "upload_request_sent": false,
})
return
}
@@ -1081,24 +919,39 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) {
writeTimeout := h.axonTransferWriteTimeout()
if err := h.hub.SendToDeviceWithTimeout(c.Request.Context(), deviceID, uploadRequest, writeTimeout); err != nil {
- status := http.StatusInternalServerError
if errors.Is(err, services.ErrTransferWriteTimeout) {
- status = http.StatusGatewayTimeout
- logger.Printf("[TRANSFER] Device %s: auto upload_request timed out after %s for task=%s: %v", deviceID, timeoutLogValue(writeTimeout), callback.TaskID, err)
+ logger.Printf("%s auto upload_request timed out after %s: %v", recorderTaskLogPrefix(deviceID, callback.TaskID), timeoutLogValue(writeTimeout), err)
} else {
- logger.Printf("[RECORDER] Failed to send upload_request to device %s: %v", deviceID, err)
+ logger.Printf("%s failed to send upload_request: %v", recorderTaskLogPrefix(deviceID, callback.TaskID), err)
+ }
+ errorMessage := "upload_request failed: " + err.Error()
+ if h.db != nil {
+ if _, writeErr := writeOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, callback.TaskID, errorMessage); writeErr != nil {
+ logger.Printf("%s failed to write upload_request error: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), writeErr)
+ }
}
- c.JSON(status, gin.H{
- "error_msg": "Failed to trigger upload: " + err.Error(),
+ c.JSON(http.StatusOK, gin.H{
+ "success": true,
+ "message": "Recording finished; upload_request not sent",
+ "task_status": "uploading",
+ "upload_request_sent": false,
+ "upload_request_error": err.Error(),
})
return
}
- logger.Printf("[RECORDER] Device %s: successfully triggered upload for task_id=%s", deviceID, callback.TaskID)
+ if h.db != nil {
+ if _, err := clearOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, callback.TaskID); err != nil {
+ logger.Printf("%s failed to clear upload_request error: err=%v", recorderTaskLogPrefix(deviceID, callback.TaskID), err)
+ }
+ }
+ logger.Printf("%s successfully triggered upload", recorderTaskLogPrefix(deviceID, callback.TaskID))
c.JSON(http.StatusOK, gin.H{
- "success": true,
- "message": "Upload triggered successfully",
+ "success": true,
+ "message": "Upload triggered successfully",
+ "task_status": "uploading",
+ "upload_request_sent": true,
})
}
diff --git a/internal/api/handlers/task_state_recovery_test.go b/internal/api/handlers/task_state_recovery_test.go
index 95c1338..d57e017 100644
--- a/internal/api/handlers/task_state_recovery_test.go
+++ b/internal/api/handlers/task_state_recovery_test.go
@@ -7,6 +7,7 @@ package handlers
import (
"bytes"
"context"
+ "database/sql"
"encoding/json"
"fmt"
"net/http"
@@ -456,8 +457,8 @@ func TestRecordingFinishAutoUploadUsesConfiguredTransferWriteTimeout(t *testing.
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
- if w.Code != http.StatusGatewayTimeout {
- t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusGatewayTimeout, w.Body.String())
+ if w.Code != http.StatusOK {
+ t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
}
if hub.getDeviceID != "robot-001" {
t.Fatalf("Get device=%q want=%q", hub.getDeviceID, "robot-001")
@@ -474,13 +475,58 @@ func TestRecordingFinishAutoUploadUsesConfiguredTransferWriteTimeout(t *testing.
if got := fmt.Sprint(hub.msg["task_id"]); got != "task-finish" {
t.Fatalf("message task_id=%q want=%q", got, "task-finish")
}
- assertTaskStateRecoveryStatus(t, db, "task-finish", "in_progress")
- assertTaskStateRecoveryTimestampSet(t, db, "task-finish", "started_at")
+ assertTaskStateRecoveryStatus(t, db, "task-finish", "uploading")
+ assertTaskStateRecoveryErrorContains(t, db, "task-finish", "upload_request failed")
+ assertTaskStateRecoveryErrorContains(t, db, "task-finish", "fake blocked transfer write")
if !strings.Contains(w.Body.String(), custom.String()) {
t.Fatalf("response body %q does not mention custom timeout %s", w.Body.String(), custom)
}
}
+func TestRecordingFinishDisconnectedTransferRecordsUploadRequestError(t *testing.T) {
+ db := newTaskStateRecoveryDB(t)
+ defer db.Close()
+ seedTaskStateRecoveryTask(t, db, "task-finish-disconnected", "pending")
+
+ hub := &recordingFinishTransferHub{}
+ handler := &TaskHandler{db: db, hub: hub}
+
+ gin.SetMode(gin.TestMode)
+ router := gin.New()
+ handler.RegisterCallbackRoutes(router.Group("/callbacks"))
+
+ body, err := json.Marshal(RecordingFinishCallback{
+ TaskID: "task-finish-disconnected",
+ DeviceID: "robot-001",
+ Status: "finished",
+ FinishedAt: time.Now().UTC().Format(time.RFC3339),
+ OutputPath: "/data/task-finish-disconnected.mcap",
+ })
+ if err != nil {
+ t.Fatalf("marshal callback: %v", err)
+ }
+
+ req := httptest.NewRequest(http.MethodPost, "/callbacks/finish", bytes.NewReader(body))
+ req.Header.Set("Content-Type", "application/json")
+ w := httptest.NewRecorder()
+ router.ServeHTTP(w, req)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("status=%d want=%d body=%s", w.Code, http.StatusOK, w.Body.String())
+ }
+ if hub.getDeviceID != "robot-001" {
+ t.Fatalf("Get device=%q want=%q", hub.getDeviceID, "robot-001")
+ }
+ if hub.sendDeviceID != "" {
+ t.Fatalf("Send device=%q want empty for disconnected transfer", hub.sendDeviceID)
+ }
+ assertTaskStateRecoveryStatus(t, db, "task-finish-disconnected", "uploading")
+ assertTaskStateRecoveryErrorContains(t, db, "task-finish-disconnected", "transfer disconnected")
+ if !strings.Contains(w.Body.String(), `"upload_request_sent":false`) {
+ t.Fatalf("response body %q does not report unsent upload request", w.Body.String())
+ }
+}
+
type recordingFinishTransferHub struct {
conn *services.TransferConn
getDeviceID string
@@ -510,16 +556,38 @@ func newTaskStateRecoveryDB(t *testing.T) *sqlx.DB {
if _, err := db.Exec(`CREATE TABLE tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
+ workstation_id INTEGER NULL,
status TEXT NOT NULL,
ready_at TIMESTAMP NULL,
started_at TIMESTAMP NULL,
completed_at TIMESTAMP NULL,
+ error_message TEXT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
deleted_at TIMESTAMP NULL
)`); err != nil {
t.Fatalf("create tasks schema: %v", err)
}
+ if _, err := db.Exec(`CREATE TABLE robots (
+ id INTEGER PRIMARY KEY,
+ device_id TEXT NOT NULL,
+ deleted_at TIMESTAMP NULL
+ )`); err != nil {
+ t.Fatalf("create robots schema: %v", err)
+ }
+ if _, err := db.Exec(`CREATE TABLE workstations (
+ id INTEGER PRIMARY KEY,
+ robot_id INTEGER NOT NULL,
+ deleted_at TIMESTAMP NULL
+ )`); err != nil {
+ t.Fatalf("create workstations schema: %v", err)
+ }
+ if _, err := db.Exec(`INSERT INTO robots (id, device_id) VALUES (1, 'robot-001')`); err != nil {
+ t.Fatalf("seed robot: %v", err)
+ }
+ if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil {
+ t.Fatalf("seed workstation: %v", err)
+ }
return db
}
@@ -527,7 +595,7 @@ func seedTaskStateRecoveryTask(t *testing.T, db *sqlx.DB, taskID string, status
t.Helper()
now := time.Now().UTC()
if _, err := db.Exec(
- `INSERT INTO tasks (task_id, status, created_at, updated_at) VALUES (?, ?, ?, ?)`,
+ `INSERT INTO tasks (task_id, workstation_id, status, created_at, updated_at) VALUES (?, 10, ?, ?, ?)`,
taskID,
status,
now,
@@ -548,6 +616,20 @@ func assertTaskStateRecoveryStatus(t *testing.T, db *sqlx.DB, taskID string, wan
}
}
+func assertTaskStateRecoveryErrorContains(t *testing.T, db *sqlx.DB, taskID string, want string) {
+ t.Helper()
+ var got sql.NullString
+ if err := db.Get(&got, `SELECT error_message FROM tasks WHERE task_id = ?`, taskID); err != nil {
+ t.Fatalf("query task error_message: %v", err)
+ }
+ if !got.Valid {
+ t.Fatalf("task error_message is NULL, want substring %q", want)
+ }
+ if !strings.Contains(got.String, want) {
+ t.Fatalf("task error_message=%q want substring %q", got.String, want)
+ }
+}
+
func assertTaskStateRecoveryTimestampSet(t *testing.T, db *sqlx.DB, taskID string, column string) {
t.Helper()
if column != "ready_at" && column != "started_at" {
diff --git a/internal/api/handlers/task_uploading.go b/internal/api/handlers/task_uploading.go
new file mode 100644
index 0000000..61ccff7
--- /dev/null
+++ b/internal/api/handlers/task_uploading.go
@@ -0,0 +1,151 @@
+// SPDX-FileCopyrightText: 2026 ArcheBase
+//
+// SPDX-License-Identifier: MulanPSL-2.0
+
+package handlers
+
+import (
+ "context"
+ "database/sql"
+ "strings"
+ "time"
+)
+
+type taskStateExecutor interface {
+ ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
+}
+
+type taskStateQuerier interface {
+ GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
+}
+
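+// currentOwnedTaskStatus returns the trimmed status of the non-deleted task taskID when it is
+// bound, via its workstation, to the robot with deviceID. The bool reports whether a row was found.
+func currentOwnedTaskStatus(ctx context.Context, querier taskStateQuerier, deviceID, taskID string) (string, bool, error) {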
+ if querier == nil {
+ return "", false, nil
+ }
+ var status string
+ err := querier.GetContext(ctx, &status, `
+ SELECT t.status
+ FROM tasks t
+ JOIN workstations ws ON ws.id = t.workstation_id AND ws.deleted_at IS NULL
+ JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL
+ WHERE t.task_id = ?
+ AND t.deleted_at IS NULL
+ AND r.device_id = ?
+ LIMIT 1
+ `, strings.TrimSpace(taskID), strings.TrimSpace(deviceID))
+ if err == sql.ErrNoRows {
+ return "", false, nil
+ }
+ if err != nil {
+ return "", false, err
+ }
+ return strings.TrimSpace(status), true, nil
+}
+
+// taskStatusLogValue returns the trimmed status for logging, or fallback when status is blank.
+func taskStatusLogValue(status, fallback string) string {
+ status = strings.TrimSpace(status)
+ if status == "" {
+ return fallback
+ }
+ return status
+}
+
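+// markOwnedTaskUploading moves a task owned by deviceID to 'uploading' from pending, ready,
+// in_progress, or uploading, and clears error_message. A nil exec is a no-op returning (nil, nil).
+func markOwnedTaskUploading(ctx context.Context, exec taskStateExecutor, deviceID, taskID string) (sql.Result, error) {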
+ if exec == nil {
+ return nil, nil
+ }
+ now := time.Now().UTC()
+ return exec.ExecContext(ctx, `
+ UPDATE tasks
+ SET
+ status = 'uploading',
+ updated_at = ?,
+ error_message = NULL
+ WHERE task_id = ?
+ AND status IN ('pending', 'ready', 'in_progress', 'uploading')
+ AND deleted_at IS NULL
+ AND EXISTS (
+ SELECT 1
+ FROM workstations ws
+ JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL
+ WHERE ws.id = tasks.workstation_id
+ AND ws.deleted_at IS NULL
+ AND r.device_id = ?
+ )
+ `, now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID))
+}
+
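+// failOwnedUploadingTask marks a task owned by deviceID as 'failed' from in_progress or uploading,
+// stores reason in error_message, and sets completed_at only if it is still NULL.
+func failOwnedUploadingTask(ctx context.Context, exec taskStateExecutor, deviceID, taskID, reason string) (sql.Result, error) {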
+ if exec == nil {
+ return nil, nil
+ }
+ now := time.Now().UTC()
+ return exec.ExecContext(ctx, `
+ UPDATE tasks
+ SET
+ status = 'failed',
+ completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END,
+ error_message = ?,
+ updated_at = ?
+ WHERE task_id = ?
+ AND status IN ('in_progress', 'uploading')
+ AND deleted_at IS NULL
+ AND EXISTS (
+ SELECT 1
+ FROM workstations ws
+ JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL
+ WHERE ws.id = tasks.workstation_id
+ AND ws.deleted_at IS NULL
+ AND r.device_id = ?
+ )
+ `, now, strings.TrimSpace(reason), now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID))
+}
+
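+// writeOwnedUploadingTaskError stores message in error_message without changing status.
+// It only matches tasks owned by deviceID that are currently 'uploading'.
+func writeOwnedUploadingTaskError(ctx context.Context, exec taskStateExecutor, deviceID, taskID, message string) (sql.Result, error) {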
+ if exec == nil {
+ return nil, nil
+ }
+ now := time.Now().UTC()
+ return exec.ExecContext(ctx, `
+ UPDATE tasks
+ SET
+ error_message = ?,
+ updated_at = ?
+ WHERE task_id = ?
+ AND status = 'uploading'
+ AND deleted_at IS NULL
+ AND EXISTS (
+ SELECT 1
+ FROM workstations ws
+ JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL
+ WHERE ws.id = tasks.workstation_id
+ AND ws.deleted_at IS NULL
+ AND r.device_id = ?
+ )
+ `, strings.TrimSpace(message), now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID))
+}
+
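+// clearOwnedUploadingTaskError resets error_message to NULL without changing status.
+// It only matches tasks owned by deviceID that are currently 'uploading'.
+func clearOwnedUploadingTaskError(ctx context.Context, exec taskStateExecutor, deviceID, taskID string) (sql.Result, error) {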
+ if exec == nil {
+ return nil, nil
+ }
+ now := time.Now().UTC()
+ return exec.ExecContext(ctx, `
+ UPDATE tasks
+ SET
+ error_message = NULL,
+ updated_at = ?
+ WHERE task_id = ?
+ AND status = 'uploading'
+ AND deleted_at IS NULL
+ AND EXISTS (
+ SELECT 1
+ FROM workstations ws
+ JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL
+ WHERE ws.id = tasks.workstation_id
+ AND ws.deleted_at IS NULL
+ AND r.device_id = ?
+ )
+ `, now, strings.TrimSpace(taskID), strings.TrimSpace(deviceID))
+}
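The helpers in `task_uploading.go` enforce their state transitions through the `status IN (...)` guard of each `UPDATE`. The sketch below restates those two guards as a plain lookup table so the permitted source states are easy to see. It is an illustration under assumptions: `allowedFrom` and `canTransition` are hypothetical names, and the ownership check (the `EXISTS` subquery on workstations and robots) is deliberately left out.

```go
package main

import "fmt"

// allowedFrom mirrors the status guards in the SQL above. It is illustrative
// only; in the actual change the rules live in the WHERE clauses.
var allowedFrom = map[string][]string{
	"uploading": {"pending", "ready", "in_progress", "uploading"}, // markOwnedTaskUploading
	"failed":    {"in_progress", "uploading"},                     // failOwnedUploadingTask
}

// canTransition (hypothetical) reports whether a task currently in status
// from would be matched by the UPDATE that moves it to status to.
func canTransition(from, to string) bool {
	for _, s := range allowedFrom[to] {
		if s == from {
			return true
		}
	}
	return false
}

func main() {
	for _, from := range []string{"pending", "in_progress", "completed", "cancelled"} {
		fmt.Printf("%s: to uploading=%v, to failed=%v\n",
			from, canTransition(from, "uploading"), canTransition(from, "failed"))
	}
}
```

Because the guard is part of the `UPDATE` itself, a task that is already `completed`, `failed`, or `cancelled` matches no row, and callers can detect that through `RowsAffected`.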
diff --git a/internal/api/handlers/transfer.go b/internal/api/handlers/transfer.go
index 3ebb650..ac9112a 100644
--- a/internal/api/handlers/transfer.go
+++ b/internal/api/handlers/transfer.go
@@ -86,7 +86,6 @@ func (h *TransferHandler) RegisterRoutes(apiV1 *gin.RouterGroup) {
transfer.POST("/upload_all", h.UploadAll)
transfer.POST("/status_query", h.StatusQuery)
transfer.POST("/cancel", h.CancelUpload)
- transfer.POST("/upload_ack", h.ManualACK)
}
}
@@ -106,14 +105,14 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
"SELECT COUNT(1) FROM robots WHERE device_id = ? AND deleted_at IS NULL", deviceID,
); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(queryCtx.Err(), context.DeadlineExceeded) {
- logger.Printf("[TRANSFER] Device %s: DB query timeout after %s (timeout_ms=%d): %v", deviceID, timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err)
+ logger.Printf("%s DB query timeout after %s (timeout_ms=%d): %v", transferLogPrefix(deviceID), timeoutLogValue(queryTimeout), timeoutLogMilliseconds(queryTimeout), err)
} else {
- logger.Printf("[TRANSFER] Device %s: DB query error: %v", deviceID, err)
+ logger.Printf("%s DB query error: %v", transferLogPrefix(deviceID), err)
}
}
// Check count regardless of DB error (count defaults to 0 on error)
if count == 0 {
- logger.Printf("[TRANSFER] Device %s: robot not found in database", deviceID)
+ logger.Printf("%s robot not found in database", transferLogPrefix(deviceID))
w.WriteHeader(http.StatusNotFound)
return
}
@@ -123,7 +122,7 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
InsecureSkipVerify: true, // allow any origin in dev; tighten in production
})
if err != nil {
- logger.Printf("[TRANSFER] Device %s: WebSocket accept error: %v", deviceID, err)
+ logger.Printf("%s WebSocket accept error: %v", transferLogPrefix(deviceID), err)
return
}
@@ -136,7 +135,7 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
defer func() {
if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil {
if !isExpectedWebSocketCloseError(err) {
- logger.Printf("[TRANSFER] WebSocket close error for device %s: %v", deviceID, err)
+ logger.Printf("%s WebSocket close error: %v", transferLogPrefix(deviceID), err)
}
}
}()
@@ -153,7 +152,7 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
go h.pingLoop(ctx, dc)
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Transfer %s connected from %s", deviceID, remoteIP)
+ logger.Printf("%s connected from %s", transferLogPrefix(deviceID), remoteIP)
// Read loop: use ctx directly for infinite wait.
// context.WithTimeout(ctx, 0) would set deadline=now and cause immediate timeout,
@@ -163,14 +162,14 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
_, raw, err := conn.Read(ctx)
if err != nil {
if !isExpectedWebSocketCloseError(err) {
- logger.Printf("[TRANSFER] Device %s disconnected: %v", deviceID, err)
+ logger.Printf("%s disconnected: %v", transferLogPrefix(deviceID), err)
}
break
}
var msg map[string]interface{}
if jsonErr := json.Unmarshal(raw, &msg); jsonErr != nil {
- logger.Printf("[TRANSFER] Device %s: invalid JSON: %v", deviceID, jsonErr)
+ logger.Printf("%s invalid JSON: %v", transferLogPrefix(deviceID), jsonErr)
continue
}
@@ -203,10 +202,10 @@ func (h *TransferHandler) pingLoop(ctx context.Context, dc *services.TransferCon
cancel()
if err != nil {
if ctx.Err() == nil {
- logWebSocketPingFailure("TRANSFER", "Device", dc.DeviceID, timeout, timedOut, err)
+ logWebSocketPingFailure("TRANSFER", dc.DeviceID, timeout, timedOut, err)
if closeErr := dc.Conn.CloseNow(); closeErr != nil {
if !isExpectedWebSocketCloseError(closeErr) {
- logger.Printf("[TRANSFER] Device %s close after ping failure: %v", dc.DeviceID, closeErr)
+ logger.Printf("%s close after ping failure: %v", transferLogPrefix(dc.DeviceID), closeErr)
}
}
}
@@ -250,10 +249,10 @@ func transferMessageType(msg map[string]interface{}) string {
func logTransferSendFailure(deviceID string, msgType string, timeout time.Duration, err error) {
if errors.Is(err, services.ErrTransferWriteTimeout) {
- logger.Printf("[TRANSFER] Device %s: send %s timed out after %s: %v", deviceID, msgType, timeoutLogValue(timeout), err)
+ logger.Printf("%s send %s timed out after %s: %v", transferLogPrefix(deviceID), msgType, timeoutLogValue(timeout), err)
return
}
- logger.Printf("[TRANSFER] Device %s: failed to send %s: %v", deviceID, msgType, err)
+ logger.Printf("%s failed to send %s: %v", transferLogPrefix(deviceID), msgType, err)
}
func (h *TransferHandler) sendToDevice(c *gin.Context, deviceID string, msg map[string]interface{}) bool {
@@ -274,10 +273,10 @@ func closeReplacedTransferConn(deviceID string, dc *services.TransferConn) {
if dc == nil || dc.Conn == nil {
return
}
- logger.Printf("[TRANSFER] Device %s: closing replaced WebSocket connection", deviceID)
+ logger.Printf("%s closing replaced WebSocket connection", transferLogPrefix(deviceID))
if err := dc.Conn.CloseNow(); err != nil {
if !isExpectedWebSocketCloseError(err) {
- logger.Printf("[TRANSFER] Device %s: replaced WebSocket close error: %v", deviceID, err)
+ logger.Printf("%s replaced WebSocket close error: %v", transferLogPrefix(deviceID), err)
}
}
}
@@ -285,7 +284,7 @@ func closeReplacedTransferConn(deviceID string, dc *services.TransferConn) {
// handleMessage dispatches an inbound WebSocket message to the appropriate handler
func (h *TransferHandler) handleMessage(ctx context.Context, dc *services.TransferConn, msg map[string]interface{}) {
if h.hub.Get(dc.DeviceID) != dc {
- logger.Printf("[TRANSFER] Device %s ignored message from replaced connection", dc.DeviceID)
+ logger.Printf("%s ignored message from replaced connection", transferLogPrefix(dc.DeviceID))
return
}
@@ -295,7 +294,7 @@ func (h *TransferHandler) handleMessage(ctx context.Context, dc *services.Transf
case "connected":
h.onConnected(dc, msg)
case "upload_started":
- h.onUploadStarted(dc, msg)
+ h.onUploadStarted(ctx, dc, msg)
case "upload_progress":
h.onUploadProgress(dc, msg)
case "upload_complete":
@@ -308,7 +307,7 @@ func (h *TransferHandler) handleMessage(ctx context.Context, dc *services.Transf
h.onStatus(dc, msg)
default:
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: unknown message type %q", dc.DeviceID, msgType)
+ logger.Printf("%s unknown message type %q", transferLogPrefix(dc.DeviceID), msgType)
}
}
@@ -327,20 +326,29 @@ func (h *TransferHandler) onConnected(dc *services.TransferConn, msg map[string]
}
dc.UpdateStatus(s)
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Transfer %s connected: version=%s pending=%d uploading=%d failed=%d",
- dc.DeviceID, s.Version, s.PendingCount, s.UploadingCount, s.FailedCount)
+ logger.Printf("%s connected: version=%s pending=%d uploading=%d failed=%d",
+ transferLogPrefix(dc.DeviceID), s.Version, s.PendingCount, s.UploadingCount, s.FailedCount)
}
// onUploadStarted handles "upload_started" message
-func (h *TransferHandler) onUploadStarted(dc *services.TransferConn, msg map[string]interface{}) {
+func (h *TransferHandler) onUploadStarted(ctx context.Context, dc *services.TransferConn, msg map[string]interface{}) {
data, _ := msg["data"].(map[string]interface{})
if data == nil {
return
}
taskID := stringVal(data, "task_id")
+ if h.db != nil && taskID != "" {
+ previousStatus, _, _ := currentOwnedTaskStatus(ctx, h.db, dc.DeviceID, taskID)
+ res, err := markOwnedTaskUploading(ctx, h.db, dc.DeviceID, taskID)
+ if err != nil {
+ logger.Printf("%s failed to mark task uploading after upload_started: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
+ } else if n, _ := res.RowsAffected(); n > 0 {
+ logger.Printf("%s task status updated: %s -> uploading reason=upload_started", transferTaskLogPrefix(dc.DeviceID, taskID), taskStatusLogValue(previousStatus, "unknown"))
+ }
+ }
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload started task=%s total_bytes=%d",
- dc.DeviceID, taskID, int64Val(data, "total_bytes"))
+ logger.Printf("%s upload started total_bytes=%d",
+ transferTaskLogPrefix(dc.DeviceID, taskID), int64Val(data, "total_bytes"))
}
// onUploadProgress handles "upload_progress" message
@@ -352,7 +360,7 @@ func (h *TransferHandler) onUploadProgress(dc *services.TransferConn, msg map[st
taskID := stringVal(data, "task_id")
percent := intVal(data, "percent")
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload progress task=%s %d%%", dc.DeviceID, taskID, percent)
+ logger.Printf("%s upload progress %d%%", transferTaskLogPrefix(dc.DeviceID, taskID), percent)
}
// sidecarRecording is the subset of the sidecar JSON "recording" block we care about.
@@ -434,26 +442,50 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
data, _ := msg["data"].(map[string]interface{})
if data == nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload complete data is nil", dc.DeviceID)
+ logger.Printf("%s upload complete data is nil", transferLogPrefix(dc.DeviceID))
return
}
taskID := stringVal(data, "task_id")
if taskID == "" {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload complete taskID is empty", dc.DeviceID)
+ logger.Printf("%s upload complete taskID is empty", transferLogPrefix(dc.DeviceID))
return
}
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload complete for task=%s", dc.DeviceID, taskID)
+ logger.Printf("%s upload complete", transferTaskLogPrefix(dc.DeviceID, taskID))
if h.s3 == nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: S3 not configured, skipping upload_complete for task=%s", dc.DeviceID, taskID)
+ logger.Printf("%s S3 not configured, skipping upload_complete", transferTaskLogPrefix(dc.DeviceID, taskID))
return
}
if h.db == nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB not configured, skipping upload_complete for task=%s", dc.DeviceID, taskID)
+ logger.Printf("%s DB not configured, skipping upload_complete", transferTaskLogPrefix(dc.DeviceID, taskID))
+ return
+ }
+
+ var ownedTask struct {
+ ID int64 `db:"id"`
+ Status string `db:"status"`
+ }
+ if err := h.db.GetContext(ctx, &ownedTask, `
+ SELECT t.id, t.status
+ FROM tasks t
+ JOIN workstations ws ON ws.id = t.workstation_id AND ws.deleted_at IS NULL
+ JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL
+ WHERE t.task_id = ? AND r.device_id = ? AND t.deleted_at IS NULL
+ LIMIT 1
+ `, taskID, dc.DeviceID); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ logger.Printf("%s upload_complete ignored because task ownership check failed", transferTaskLogPrefix(dc.DeviceID, taskID))
+ } else {
+ logger.Printf("%s upload_complete ownership lookup failed: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
+ }
+ return
+ }
+ if ownedTask.Status == "failed" || ownedTask.Status == "cancelled" {
+ logger.Printf("%s upload_complete ignored for terminal status=%s", transferTaskLogPrefix(dc.DeviceID, taskID), ownedTask.Status)
return
}
@@ -461,14 +493,14 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
mcapKey := uploadCompleteS3Key(data)
if mcapKey == "" {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload_complete for task=%s missing s3_key, skipping ACK", dc.DeviceID, taskID)
+ logger.Printf("%s upload_complete missing s3_key, skipping ACK", transferTaskLogPrefix(dc.DeviceID, taskID))
return
}
jsonKey, ok := uploadCompleteSidecarS3Key(mcapKey)
if !ok {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload_complete for task=%s has invalid MCAP s3_key=%q, cannot derive sidecar key, skipping ACK", dc.DeviceID, taskID, mcapKey)
+ logger.Printf("%s upload_complete has invalid MCAP s3_key=%q, cannot derive sidecar key, skipping ACK", transferTaskLogPrefix(dc.DeviceID, taskID), mcapKey)
return
}
@@ -496,14 +528,14 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
if mcapErr != nil || jsonErr != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: S3 HeadObject error", dc.DeviceID)
+ logger.Printf("%s S3 HeadObject error", transferTaskLogPrefix(dc.DeviceID, taskID))
return
}
if !mcapExists || !jsonExists {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: S3 files not found for task=%s, skipping ACK",
- dc.DeviceID, taskID)
+ logger.Printf("%s S3 files not found, skipping ACK",
+ transferTaskLogPrefix(dc.DeviceID, taskID))
return
}
@@ -514,7 +546,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
tx, err := h.db.BeginTx(ctx, nil)
if err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB begin transaction error for task=%s: %v", dc.DeviceID, taskID, err)
+ logger.Printf("%s DB begin transaction error: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
return
}
defer func() {
@@ -527,7 +559,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
var taskPK int64
if err := tx.QueryRowContext(ctx, "SELECT id FROM tasks WHERE task_id = ? AND deleted_at IS NULL", taskID).Scan(&taskPK); err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: failed to resolve task id for task=%s: %v", dc.DeviceID, taskID, err)
+ logger.Printf("%s failed to resolve task id: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
return
}
@@ -536,7 +568,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
var batchIDForAdvance int64
if err := tx.QueryRowContext(ctx, "SELECT batch_id FROM tasks WHERE id = ? AND deleted_at IS NULL", taskPK).Scan(&batchIDForAdvance); err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: failed to resolve batch id for task=%s (task_pk=%d): %v", dc.DeviceID, taskID, taskPK, err)
+ logger.Printf("%s failed to resolve batch id (task_pk=%d): %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskPK, err)
batchIDForAdvance = 0
}
@@ -545,7 +577,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
var orderIDForAdvance int64
if err := tx.QueryRowContext(ctx, "SELECT order_id FROM tasks WHERE id = ? AND deleted_at IS NULL", taskPK).Scan(&orderIDForAdvance); err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: failed to resolve order id for task=%s (task_pk=%d): %v", dc.DeviceID, taskID, taskPK, err)
+ logger.Printf("%s failed to resolve order id (task_pk=%d): %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskPK, err)
orderIDForAdvance = 0
}
@@ -557,12 +589,12 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
).Scan(&count)
if err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB query error for task=%s: %v", dc.DeviceID, taskID, err)
+ logger.Printf("%s DB query error: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
return
}
if count > 0 {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: task=%s already exists in DB (by mcap_path or sidecar_path), skipping insert", dc.DeviceID, taskID)
+ logger.Printf("%s already exists in DB (by mcap_path or sidecar_path), skipping insert", transferTaskLogPrefix(dc.DeviceID, taskID))
} else {
var taskRow struct {
ID int64 `db:"id"`
@@ -614,12 +646,12 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
if err == nil && existingEpisodeID == "" {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: data corruption: empty episode_id found for task_pk=%d task=%s", dc.DeviceID, taskRow.ID, taskID)
+ logger.Printf("%s data corruption: empty episode_id found for task_pk=%d", transferTaskLogPrefix(dc.DeviceID, taskID), taskRow.ID)
return
}
if err != nil && !errors.Is(err, sql.ErrNoRows) {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB query failed for existing episode check task_pk=%d task=%s: %v", dc.DeviceID, taskRow.ID, taskID, err)
+ logger.Printf("%s DB query failed for existing episode check task_pk=%d: %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskRow.ID, err)
return
}
@@ -683,7 +715,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
)
if dbErr != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB insert failed for task=%s: %v", dc.DeviceID, taskID, dbErr)
+ logger.Printf("%s DB insert failed: %v", transferTaskLogPrefix(dc.DeviceID, taskID), dbErr)
return
}
@@ -694,7 +726,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
WHERE id = ? AND deleted_at IS NULL
`, taskRow.BatchID); dbErr != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB update failed for batch=%d task=%s: %v", dc.DeviceID, taskRow.BatchID, taskID, dbErr)
+ logger.Printf("%s DB update failed for batch=%d: %v", transferTaskLogPrefix(dc.DeviceID, taskID), taskRow.BatchID, dbErr)
return
}
}
@@ -703,7 +735,7 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
// Commit transaction
if err := tx.Commit(); err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: DB commit error for task=%s: %v", dc.DeviceID, taskID, err)
+ logger.Printf("%s DB commit error: %v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
return
}
@@ -718,29 +750,35 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra
return
}
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: upload_ack sent for task=%s", dc.DeviceID, taskID)
+ logger.Printf("%s upload_ack sent", transferTaskLogPrefix(dc.DeviceID, taskID))
- // After upload_ack is sent, mark task as completed (pending, ready, or in_progress -> completed).
- // pending is allowed when Keystone rolled the task back during a transient recorder disconnect,
- // while the device kept recording and successfully uploaded the episode.
+ // After upload_ack is sent, mark the task completed. uploading is the normal post-recording path;
+ // pending/ready/in_progress stay accepted for legacy weak-network recovery; completed matches duplicate ACKs.
// Best-effort: do not affect the already-sent acknowledgement.
now := time.Now().UTC()
- if _, err := h.db.ExecContext(ctx, `
+ res, err := h.db.ExecContext(ctx, `
UPDATE tasks
SET
status = 'completed',
completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END,
+ error_message = NULL,
updated_at = ?
- WHERE id = ? AND status IN ('pending', 'in_progress', 'ready') AND deleted_at IS NULL
- `, now, now, taskPK); err != nil {
+ WHERE id = ? AND status IN ('pending', 'ready', 'in_progress', 'uploading', 'completed') AND deleted_at IS NULL
+ `, now, now, taskPK)
+ if err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: failed to mark task pending/ready/in_progress->completed after upload_ack: task=%s err=%v", dc.DeviceID, taskID, err)
+ logger.Printf("%s failed to mark task completed after upload_ack: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
} else {
- if batchIDForAdvance > 0 {
+ rowsAffected, _ := res.RowsAffected()
+ shouldAdvance := ownedTask.Status != "completed" && rowsAffected > 0
+ if shouldAdvance {
+ logger.Printf("%s task status updated: %s -> completed reason=upload_ack", transferTaskLogPrefix(dc.DeviceID, taskID), taskStatusLogValue(ownedTask.Status, "unknown"))
+ }
+ if shouldAdvance && batchIDForAdvance > 0 {
// Must run after the task row is terminal: tryAdvanceBatchStatus counts tasks in DB.
go tryAdvanceBatchStatus(h.db, batchIDForAdvance)
}
- if orderIDForAdvance > 0 {
+ if shouldAdvance && orderIDForAdvance > 0 {
go tryAdvanceOrderStatus(h.db, orderIDForAdvance, h.recorderHub, h.recorderRPCTimeout)
}
}
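The hunk above lets the `UPDATE` match tasks that are already `completed`, so a duplicate ACK still succeeds, but it advances batch and order state only on the first completion. A minimal sketch of that decision, assuming the status read before the update and the affected-row count are the only inputs; `shouldAdvance` as a standalone function is a hypothetical restatement, not code from this change.

```go
package main

import "fmt"

// shouldAdvance (hypothetical standalone form) mirrors the condition in the
// hunk above: advance batch/order state only if the UPDATE matched a row and
// the task was not already completed before this ACK was processed.
func shouldAdvance(previousStatus string, rowsAffected int64) bool {
	return previousStatus != "completed" && rowsAffected > 0
}

func main() {
	fmt.Println(shouldAdvance("uploading", 1)) // first ACK on the normal path
	fmt.Println(shouldAdvance("completed", 1)) // duplicate ACK
	fmt.Println(shouldAdvance("uploading", 0)) // UPDATE matched nothing
}
```

Keeping the check on the previously read status means a repeated `upload_complete` cannot trigger `tryAdvanceBatchStatus` or `tryAdvanceOrderStatus` a second time.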
@@ -758,17 +796,17 @@ func (h *TransferHandler) onUploadFailed(ctx context.Context, dc *services.Trans
// Log full message for debugging
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Received from device %s: full message=%+v", dc.DeviceID, msg)
+ logger.Printf("%s received upload_failed full message=%+v", transferTaskLogPrefix(dc.DeviceID, taskID), msg)
// Try to extract bucket info if present
if bucket, ok := data["bucket"].(string); ok {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: task=%s bucket=%s reason=%q retries=%d",
- dc.DeviceID, taskID, bucket, reason, retryCount)
+ logger.Printf("%s upload_failed bucket=%s reason=%q retries=%d",
+ transferTaskLogPrefix(dc.DeviceID, taskID), bucket, reason, retryCount)
} else {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: task=%s reason=%q retries=%d",
- dc.DeviceID, taskID, reason, retryCount)
+ logger.Printf("%s upload_failed reason=%q retries=%d",
+ transferTaskLogPrefix(dc.DeviceID, taskID), reason, retryCount)
}
// Log configured S3 bucket for comparison
@@ -776,27 +814,23 @@ func (h *TransferHandler) onUploadFailed(ctx context.Context, dc *services.Trans
logger.Printf("[TRANSFER] Keystone configured bucket: %s", h.s3.Bucket())
}
- // Mark task as failed when upload_failed is received and task is in_progress.
+ // Mark task as failed when upload_failed is received and this transfer owns the task.
if h.db == nil || taskID == "" {
return
}
- now := time.Now().UTC()
- result, err := h.db.ExecContext(ctx, `
- UPDATE tasks
- SET
- status = 'failed',
- completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END,
- updated_at = ?
- WHERE task_id = ? AND status = 'in_progress' AND deleted_at IS NULL
- `, now, now, taskID)
+ message := strings.TrimSpace(reason)
+ if message == "" {
+ message = "upload failed"
+ }
+ result, err := failOwnedUploadingTask(ctx, h.db, dc.DeviceID, taskID, message)
if err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: failed to mark task failed on upload_failed: task=%s err=%v", dc.DeviceID, taskID, err)
+ logger.Printf("%s failed to mark task failed on upload_failed: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
return
}
if rows, _ := result.RowsAffected(); rows > 0 {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: task=%s marked as failed due to upload_failed", dc.DeviceID, taskID)
+ logger.Printf("%s marked as failed due to upload_failed", transferTaskLogPrefix(dc.DeviceID, taskID))
// Trigger batch status advancement since the task reached a terminal state.
var batchID int64
if err := h.db.QueryRowContext(ctx,
@@ -845,12 +879,12 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde
`, deviceID)
if err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[DEVICE] Device %s: failed to query runnable tasks on disconnect: %v", deviceID, err)
+ logger.Printf("%s failed to query runnable tasks on disconnect: %v", deviceLogPrefix(deviceID), err)
return
}
defer func() {
if cerr := rows.Close(); cerr != nil {
- logger.Printf("[DEVICE] Device %s: close rows after disconnect task query: %v", deviceID, cerr)
+ logger.Printf("%s close rows after disconnect task query: %v", deviceLogPrefix(deviceID), cerr)
}
}()
@@ -864,13 +898,13 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde
for rows.Next() {
var ref taskRef
if err := rows.Scan(&ref.id, &ref.taskID, &ref.batchID, &ref.status); err != nil {
- logger.Printf("[DEVICE] Device %s: scan error during disconnect task query: %v", deviceID, err)
+ logger.Printf("%s scan error during disconnect task query: %v", deviceLogPrefix(deviceID), err)
continue
}
toRevert = append(toRevert, ref)
}
if err := rows.Err(); err != nil {
- logger.Printf("[DEVICE] Device %s: rows error during disconnect task query: %v", deviceID, err)
+ logger.Printf("%s rows error during disconnect task query: %v", deviceLogPrefix(deviceID), err)
}
if notifyRecorder && recorderHub != nil {
@@ -887,7 +921,7 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde
if errors.Is(err, services.ErrRecorderRPCTimeout) {
logRecorderRPCTimeout(deviceID, "clear", tid, "transfer_disconnect", timeout, err)
} else {
- logger.Printf("[DEVICE] Device %s: recorder clear after transfer disconnect failed (task=%s): %v", deviceID, tid, err)
+ logger.Printf("%s recorder clear after transfer disconnect failed: %v", deviceTaskLogPrefix(deviceID, tid), err)
}
}
case "in_progress":
@@ -895,7 +929,7 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde
if errors.Is(err, services.ErrRecorderRPCTimeout) {
logRecorderRPCTimeout(deviceID, "cancel", tid, "transfer_disconnect", timeout, err)
} else {
- logger.Printf("[DEVICE] Device %s: recorder cancel after transfer disconnect failed (task=%s): %v", deviceID, tid, err)
+ logger.Printf("%s recorder cancel after transfer disconnect failed: %v", deviceTaskLogPrefix(deviceID, tid), err)
}
}
}
@@ -916,12 +950,12 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde
`, now, ref.id)
if err != nil {
// #nosec G706 -- Set aside for now
- logger.Printf("[DEVICE] Device %s: failed to revert task=%s to pending on disconnect: %v", deviceID, ref.taskID, err)
+ logger.Printf("%s failed to revert to pending on disconnect: %v", deviceTaskLogPrefix(deviceID, ref.taskID), err)
continue
}
if affected, _ := result.RowsAffected(); affected > 0 {
// #nosec G706 -- Set aside for now
- logger.Printf("[DEVICE] Device %s: task=%s reverted to pending due to device disconnect", deviceID, ref.taskID)
+ logger.Printf("%s reverted to pending due to device disconnect", deviceTaskLogPrefix(deviceID, ref.taskID))
}
}
}
@@ -932,22 +966,31 @@ func revertRunnableTasksOnDeviceDisconnect(db *sqlx.DB, deviceID string, recorde
func (h *TransferHandler) onUploadNotFound(dc *services.TransferConn, msg map[string]interface{}) {
data, _ := msg["data"].(map[string]interface{})
if data == nil {
- logger.Printf("[TRANSFER] ERROR: Device %s reported upload_not_found with empty data", dc.DeviceID)
+ logger.Printf("%s ERROR: reported upload_not_found with empty data", transferLogPrefix(dc.DeviceID))
return
}
taskID := strings.TrimSpace(stringVal(data, "task_id"))
detail := strings.TrimSpace(stringVal(data, "detail"))
if taskID == "" {
- logger.Printf("[TRANSFER] ERROR: Device %s reported upload_not_found without task_id detail=%q", dc.DeviceID, detail)
+ logger.Printf("%s ERROR: reported upload_not_found without task_id detail=%q", transferLogPrefix(dc.DeviceID), detail)
return
}
- logger.Printf("[TRANSFER] ERROR: Device %s reported upload_not_found task=%s detail=%q", dc.DeviceID, taskID, detail)
+ message := "upload file not found"
+ if detail != "" {
+ message = detail
+ }
+ if h.db != nil {
+ if _, err := writeOwnedUploadingTaskError(context.Background(), h.db, dc.DeviceID, taskID, message); err != nil {
+ logger.Printf("%s ERROR: failed to write upload_not_found error: err=%v", transferTaskLogPrefix(dc.DeviceID, taskID), err)
+ }
+ }
+ logger.Printf("%s ERROR: reported upload_not_found detail=%q", transferTaskLogPrefix(dc.DeviceID, taskID), detail)
}
// onStatus handles "status" message and updates the device status snapshot
func (h *TransferHandler) onStatus(dc *services.TransferConn, msg map[string]interface{}) {
// #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: received status update", dc.DeviceID)
+ logger.Printf("%s received status update", transferLogPrefix(dc.DeviceID))
data, _ := msg["data"].(map[string]interface{})
if data == nil {
return
@@ -1020,6 +1063,11 @@ func (h *TransferHandler) UploadRequest(c *gin.Context) {
if !h.sendToDevice(c, deviceID, msg) {
return
}
+ if h.db != nil {
+ if _, err := clearOwnedUploadingTaskError(c.Request.Context(), h.db, deviceID, body.TaskID); err != nil {
+ logger.Printf("%s failed to clear upload_request error: err=%v", transferTaskLogPrefix(deviceID, body.TaskID), err)
+ }
+ }
c.JSON(http.StatusOK, gin.H{"status": "sent"})
}
@@ -1035,29 +1083,29 @@ func (h *TransferHandler) UploadRequest(c *gin.Context) {
func (h *TransferHandler) UploadAll(c *gin.Context) {
deviceID := c.Param("device_id")
- logger.Printf("[TRANSFER] Device %s: received upload_all request", deviceID)
+ logger.Printf("%s received upload_all request", transferLogPrefix(deviceID))
// Check if device is connected
dc := h.hub.Get(deviceID)
if dc == nil {
- logger.Printf("[TRANSFER] Device %s: not connected", deviceID)
+ logger.Printf("%s not connected", transferLogPrefix(deviceID))
c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("device %s not connected", deviceID)})
return
}
- logger.Printf("[TRANSFER] Device %s: connected, remote_ip=%s", deviceID, dc.RemoteIP)
+ logger.Printf("%s connected, remote_ip=%s", transferLogPrefix(deviceID), dc.RemoteIP)
status := dc.GetStatus()
- logger.Printf("[TRANSFER] Device %s: current status is pending=%d uploading=%d failed=%d waiting_ack=%d",
- deviceID, status.PendingCount, status.UploadingCount, status.FailedCount, status.WaitingACKCount)
+ logger.Printf("%s current status is pending=%d uploading=%d failed=%d waiting_ack=%d",
+ transferLogPrefix(deviceID), status.PendingCount, status.UploadingCount, status.FailedCount, status.WaitingACKCount)
msg := map[string]interface{}{"type": "upload_all"}
- logger.Printf("[TRANSFER] Sending message to device %s: %+v", deviceID, msg)
+ logger.Printf("%s sending message: %+v", transferLogPrefix(deviceID), msg)
if !h.sendToDevice(c, deviceID, msg) {
return
}
- logger.Printf("[TRANSFER] Message sent successfully to device %s", deviceID)
+ logger.Printf("%s message sent successfully", transferLogPrefix(deviceID))
c.JSON(http.StatusOK, gin.H{"status": "sent"})
}
@@ -1119,65 +1167,6 @@ func (h *TransferHandler) StatusQuery(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "sent"})
}
-// ManualACK sends an upload_ack message to the device.
-//
-// @Summary Manually acknowledge an upload
-// @Tags transfer
-// @Accept json
-// @Produce json
-// @Param device_id path string true "Device ID"
-// @Param body body object true "task_id to acknowledge"
-// @Success 200 {object} map[string]interface{}
-// @Failure 404 {object} map[string]interface{}
-// @Router /transfer/{device_id}/upload_ack [post]
-func (h *TransferHandler) ManualACK(c *gin.Context) {
- deviceID := c.Param("device_id")
-
- var body struct {
- TaskID string `json:"task_id" binding:"required"`
- }
- if err := c.ShouldBindJSON(&body); err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
- return
- }
-
- msg := map[string]interface{}{
- "type": "upload_ack",
- "task_id": body.TaskID,
- }
- if !h.sendToDevice(c, deviceID, msg) {
- return
- }
-
- // After upload_ack is sent, mark task as completed (ready or in_progress -> completed).
- // Best-effort: do not fail the acknowledgement response.
- if h.db != nil {
- now := time.Now().UTC()
- if _, err := h.db.Exec(
- `UPDATE tasks
- SET
- status = 'completed',
- completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END,
- updated_at = ?
- WHERE task_id = ? AND status IN ('in_progress', 'ready') AND deleted_at IS NULL`,
- now, now, body.TaskID,
- ); err != nil {
- // #nosec G706 -- Set aside for now
- logger.Printf("[TRANSFER] Device %s: failed to mark task ready/in_progress->completed after manual upload_ack: task=%s err=%v", deviceID, body.TaskID, err)
- } else {
- var batchID int64
- if err := h.db.Get(&batchID, "SELECT batch_id FROM tasks WHERE task_id = ? AND deleted_at IS NULL LIMIT 1", body.TaskID); err == nil && batchID > 0 {
- go tryAdvanceBatchStatus(h.db, batchID)
- }
- var orderID int64
- if err := h.db.Get(&orderID, "SELECT order_id FROM tasks WHERE task_id = ? AND deleted_at IS NULL LIMIT 1", body.TaskID); err == nil && orderID > 0 {
- go tryAdvanceOrderStatus(h.db, orderID, h.recorderHub, h.recorderRPCTimeout)
- }
- }
- }
- c.JSON(http.StatusOK, gin.H{"status": "sent"})
-}
-
// extractIP extracts the IP address from a RemoteAddr string (host:port)
func extractIP(remoteAddr string) string {
host, _, err := net.SplitHostPort(remoteAddr)
diff --git a/internal/api/handlers/transfer_connection_takeover_test.go b/internal/api/handlers/transfer_connection_takeover_test.go
index aa553f0..caf44ee 100644
--- a/internal/api/handlers/transfer_connection_takeover_test.go
+++ b/internal/api/handlers/transfer_connection_takeover_test.go
@@ -74,14 +74,36 @@ func newTransferTakeoverDB(t *testing.T) *sqlx.DB {
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
batch_id INTEGER NOT NULL DEFAULT 0,
+ workstation_id INTEGER NULL,
status TEXT NOT NULL,
completed_at TIMESTAMP NULL,
+ error_message TEXT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
deleted_at TIMESTAMP NULL
)`); err != nil {
t.Fatalf("create tasks schema: %v", err)
}
+ if _, err := db.Exec(`CREATE TABLE robots (
+ id INTEGER PRIMARY KEY,
+ device_id TEXT NOT NULL,
+ deleted_at TIMESTAMP NULL
+ )`); err != nil {
+ t.Fatalf("create robots schema: %v", err)
+ }
+ if _, err := db.Exec(`CREATE TABLE workstations (
+ id INTEGER PRIMARY KEY,
+ robot_id INTEGER NOT NULL,
+ deleted_at TIMESTAMP NULL
+ )`); err != nil {
+ t.Fatalf("create workstations schema: %v", err)
+ }
+ if _, err := db.Exec(`INSERT INTO robots (id, device_id) VALUES (1, 'robot-001')`); err != nil {
+ t.Fatalf("seed robot: %v", err)
+ }
+ if _, err := db.Exec(`INSERT INTO workstations (id, robot_id) VALUES (10, 1)`); err != nil {
+ t.Fatalf("seed workstation: %v", err)
+ }
return db
}
@@ -89,7 +111,7 @@ func seedTransferTakeoverTask(t *testing.T, db *sqlx.DB, taskID string, status s
t.Helper()
now := time.Now().UTC()
if _, err := db.Exec(
- `INSERT INTO tasks (task_id, status, created_at, updated_at) VALUES (?, ?, ?, ?)`,
+ `INSERT INTO tasks (task_id, workstation_id, status, created_at, updated_at) VALUES (?, 10, ?, ?, ?)`,
taskID,
status,
now,
diff --git a/internal/storage/database/migrations/000001_initial_schema.up.sql b/internal/storage/database/migrations/000001_initial_schema.up.sql
index 9c6a889..667a6ce 100644
--- a/internal/storage/database/migrations/000001_initial_schema.up.sql
+++ b/internal/storage/database/migrations/000001_initial_schema.up.sql
@@ -290,7 +290,7 @@ CREATE TABLE IF NOT EXISTS tasks (
factory_id BIGINT COMMENT 'Denormalized: from workstation.factory_id for filtering',
organization_id BIGINT COMMENT 'Denormalized: from factory.organization_id for filtering',
initial_scene_layout TEXT,
- status ENUM('pending', 'ready', 'in_progress', 'completed', 'failed', 'cancelled') DEFAULT 'pending',
+ status ENUM('pending', 'ready', 'in_progress', 'uploading', 'completed', 'failed', 'cancelled') DEFAULT 'pending',
version INT DEFAULT 0 COMMENT 'Optimistic locking version',
assigned_at TIMESTAMP NULL,
ready_at TIMESTAMP NULL,
diff --git a/internal/storage/database/migrations/000005_task_uploading_status.down.sql b/internal/storage/database/migrations/000005_task_uploading_status.down.sql
new file mode 100644
index 0000000..7d4e11b
--- /dev/null
+++ b/internal/storage/database/migrations/000005_task_uploading_status.down.sql
@@ -0,0 +1,10 @@
+-- SPDX-FileCopyrightText: 2026 ArcheBase
+--
+-- SPDX-License-Identifier: MulanPSL-2.0
+
+UPDATE tasks
+SET status = 'in_progress'
+WHERE status = 'uploading';
+
+ALTER TABLE tasks
+ MODIFY COLUMN status ENUM('pending', 'ready', 'in_progress', 'completed', 'failed', 'cancelled') DEFAULT 'pending';
diff --git a/internal/storage/database/migrations/000005_task_uploading_status.up.sql b/internal/storage/database/migrations/000005_task_uploading_status.up.sql
new file mode 100644
index 0000000..7721a66
--- /dev/null
+++ b/internal/storage/database/migrations/000005_task_uploading_status.up.sql
@@ -0,0 +1,6 @@
+-- SPDX-FileCopyrightText: 2026 ArcheBase
+--
+-- SPDX-License-Identifier: MulanPSL-2.0
+
+ALTER TABLE tasks
+ MODIFY COLUMN status ENUM('pending', 'ready', 'in_progress', 'uploading', 'completed', 'failed', 'cancelled') DEFAULT 'pending';