diff --git a/.gitignore b/.gitignore index 5a79e426..696a7cc4 100644 --- a/.gitignore +++ b/.gitignore @@ -113,7 +113,11 @@ jest_html_reporters.html reports # generated files -src/db/prisma/generated +src/db/prisma/generated/* +# But keep TypedSQL types for Docker builds +!src/db/prisma/generated/client/ +src/db/prisma/generated/client/* +!src/db/prisma/generated/client/sql/ scripts/generateVersionedOpenApi.mjs # local values diff --git a/Dockerfile b/Dockerfile index 7072fc32..59c5aa4a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,5 @@ FROM node:24 AS build - WORKDIR /tmp/buildApp COPY ./package*.json ./ diff --git a/helm/values.yaml b/helm/values.yaml index 0d8d105e..1100dfaa 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -22,14 +22,14 @@ fullnameOverride: "" livenessProbe: enabled: true initialDelaySeconds: 30 - timeoutSeconds: 5 + timeoutSeconds: 60 failureThreshold: 6 path: /liveness readinessProbe: enabled: true initialDelaySeconds: 30 - timeoutSeconds: 5 + timeoutSeconds: 60 failureThreshold: 6 path: /liveness diff --git a/openapi3.yaml b/openapi3.yaml index 5a1b7953..58c6bf4d 100644 --- a/openapi3.yaml +++ b/openapi3.yaml @@ -5,7 +5,7 @@ info: version: 0.2.0 license: name: MIT - url: https://opensource.org/licenses/MIT + url: 'https://opensource.org/licenses/MIT' security: [] paths: /v1/jobs: @@ -109,7 +109,7 @@ paths: input_path: /data/traced/batch_001 output_path: /data/output/batch_001 traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 - tracestate: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE + tracestate: 'rojo=00f067aa0ba902b7,congo=t61rcWkgMzE' responses: '201': description: Job created successfully @@ -172,9 +172,9 @@ paths: deadline: '2025-08-01T00:00:00.000Z' cost_center: CC-12345 traceparent: 00-660f9511f3ac52e5b827557766551111-22b378cc5ca902b7-01 - tracestate: analytics=dept123,priority=very_high + tracestate: 'analytics=dept123,priority=very_high' '400': - description: Invalid request, could not create job + description: 'Invalid request, could not create job' content: application/json: schema: @@ -209,7 +209,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/jobs/{jobId}: + '/v1/jobs/{jobId}': parameters: - $ref: '#/components/parameters/jobId' get: @@ -348,14 +348,14 @@ paths: traceparent: 00-660f9511f3ac52e5b827557766551111-22b378cc5ca902b7-01 data: source_path: /data/production - destination: s3://backup-bucket/2025-07-26 + destination: 's3://backup-bucket/2025-07-26' compression: gzip encryption: true userMetadata: backup_type: daily retention_days: 30 '400': - description: Invalid request, could not get job + description: 'Invalid request, could not get job' content: application/json: schema: @@ -374,7 +374,7 @@ paths: $ref: '#/components/schemas/internalErrorsResponse' delete: operationId: deleteJobV1 - summary: Delete a job and all its associated resources (stages, tasks) + summary: 'Delete a job and all its associated resources (stages, tasks)' description: > Permanently removes a job and all its associated stages and tasks from the system. @@ -438,7 +438,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/jobs/{jobId}/user-metadata: + '/v1/jobs/{jobId}/user-metadata': patch: operationId: updateUserMetadataV1 parameters: @@ -492,7 +492,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/jobs/{jobId}/priority: + '/v1/jobs/{jobId}/priority': parameters: - $ref: '#/components/parameters/jobId' patch: @@ -560,7 +560,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/jobs/{jobId}/status: + '/v1/jobs/{jobId}/status': parameters: - $ref: '#/components/parameters/jobId' put: @@ -645,7 +645,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/jobs/{jobId}/stages: + '/v1/jobs/{jobId}/stages': parameters: - $ref: '#/components/parameters/jobId' get: @@ -704,7 +704,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/jobs/{jobId}/stage: + '/v1/jobs/{jobId}/stage': post: operationId: addStageV1 summary: Add a new stage as the last stage in the job workflow @@ -781,7 +781,7 @@ paths: validation_type: schema_check strict_mode: true traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 - tracestate: vendor=trace123,service=validation + tracestate: 'vendor=trace123,service=validation' responses: '201': description: Stage successfully created and added to the job @@ -819,7 +819,7 @@ paths: retried: 0 total: 0 traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-33c489dd6db902b7-01 - tracestate: processing=batch001,stage=data_proc + tracestate: 'processing=batch001,stage=data_proc' waiting_stage_response: summary: Response for manual approval stage creation value: @@ -924,7 +924,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/stages/{stageId}: + '/v1/stages/{stageId}': parameters: - $ref: '#/components/parameters/stageId' get: @@ -971,7 +971,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/stages/{stageId}/summary: + '/v1/stages/{stageId}/summary': get: operationId: getStageSummaryV1 parameters: @@ -1017,7 +1017,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/stages/{stageId}/user-metadata: + '/v1/stages/{stageId}/user-metadata': patch: operationId: updateStageUserMetadataV1 parameters: @@ -1071,7 +1071,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/stages/{stageId}/status: + '/v1/stages/{stageId}/status': put: operationId: updateStageStatusV1 parameters: @@ -1179,7 +1179,7 @@ paths: - UNKNOWN_ERROR - ILLEGAL_JOB_STATUS_TRANSITION - JOB_NOT_FOUND - /v1/stages/{stageId}/tasks: + '/v1/stages/{stageId}/tasks': parameters: - $ref: '#/components/parameters/stageId' get: @@ -1282,10 +1282,10 @@ paths: summary: API integration tasks with different configurations value: - data: - endpoint: https://api.external.com/customers + endpoint: 'https://api.external.com/customers' method: GET headers: - Authorization: Bearer ${API_TOKEN} + Authorization: 'Bearer ${API_TOKEN}' pagination: page_size: 100 max_pages: 50 @@ -1293,10 +1293,10 @@ paths: source: customer_system priority: high - data: - endpoint: https://api.external.com/orders + endpoint: 'https://api.external.com/orders' method: GET headers: - Authorization: Bearer ${API_TOKEN} + Authorization: 'Bearer ${API_TOKEN}' query_params: start_date: '2025-01-01T00:00:00.000Z' end_date: '2025-07-27T00:00:00.000Z' @@ -1314,7 +1314,7 @@ paths: - demographic_api - credit_api traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 - tracestate: customer=enrich123,priority=high + tracestate: 'customer=enrich123,priority=high' maxAttempts: 2 - data: process_type: data_validation @@ -1351,7 +1351,7 @@ paths: creationTime: '2025-07-27T10:30:00.000Z' updateTime: '2025-07-27T10:30:00.000Z' traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-55e6abbf8fd902b7-01 - tracestate: file=001,processing=csv_parquet + tracestate: 'file=001,processing=csv_parquet' - id: 469e8a0c-9232-4144-920f-fb5cef356ff7 data: file_path: /data/input/file_002.csv @@ -1367,7 +1367,7 @@ paths: creationTime: '2025-07-27T10:30:00.000Z' updateTime: '2025-07-27T10:30:00.000Z' traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-66f7bcc090e902b7-01 - tracestate: file=002,processing=csv_parquet + tracestate: 'file=002,processing=csv_parquet' - id: 6a7ecf04-4455-41d1-aa04-8786178cb4a3 data: file_path: /data/input/file_003.csv @@ -1383,16 +1383,16 @@ paths: creationTime: '2025-07-27T10:30:00.000Z' updateTime: '2025-07-27T10:30:00.000Z' traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-77081dd1a1f902b7-01 - tracestate: file=003,processing=csv_parquet + tracestate: 'file=003,processing=csv_parquet' api_integration_tasks_response: summary: Response for API integration tasks creation value: - id: ddfad658-da33-4573-b5a3-d70a204a3e0b data: - endpoint: https://api.external.com/customers + endpoint: 'https://api.external.com/customers' method: GET headers: - Authorization: Bearer ${API_TOKEN} + Authorization: 'Bearer ${API_TOKEN}' pagination: page_size: 100 max_pages: 50 @@ -1406,13 +1406,13 @@ paths: creationTime: '2025-07-27T10:30:00.000Z' updateTime: '2025-07-27T10:30:00.000Z' traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-88192ee2b20902b7-01 - tracestate: api=customers,source=external + tracestate: 'api=customers,source=external' - id: df59d74e-f442-4e66-b8c9-b312ad948c47 data: - endpoint: https://api.external.com/orders + endpoint: 'https://api.external.com/orders' method: GET headers: - Authorization: Bearer ${API_TOKEN} + Authorization: 'Bearer ${API_TOKEN}' query_params: start_date: '2025-01-01T00:00:00.000Z' end_date: '2025-07-27T00:00:00.000Z' @@ -1426,7 +1426,7 @@ paths: creationTime: '2025-07-27T10:30:00.000Z' updateTime: '2025-07-27T10:30:00.000Z' traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-99203ff3c31902b7-01 - tracestate: api=orders,source=external + tracestate: 'api=orders,source=external' tasks_with_lifecycle_states: summary: >- Tasks showing different lifecycle states with timing @@ -1526,7 +1526,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/stages/{stageType}/tasks/dequeue: + '/v1/stages/{stageType}/tasks/dequeue': parameters: - $ref: '#/components/parameters/stageType' patch: @@ -1572,6 +1572,21 @@ paths: application/json: schema: $ref: '#/components/schemas/taskNotFoundResponse' + '409': + description: >- + task was claimed by another worker. This occurs when multiple + workers attempt to dequeue the same task simultaneously. The client + should retry the dequeue operation to get a different task. + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/baseErrorResponse' + - type: object + properties: + code: + enum: + - TASK_STATUS_UPDATE_FAILED '500': description: Internal server error or invalid state transition content: @@ -1642,7 +1657,7 @@ paths: $ref: '#/components/schemas/internalErrorsResponse' tags: - tasks - /v1/tasks/{taskId}: + '/v1/tasks/{taskId}': parameters: - $ref: '#/components/parameters/taskId' get: @@ -1680,7 +1695,7 @@ paths: $ref: '#/components/schemas/internalErrorsResponse' tags: - tasks - /v1/tasks/{taskId}/user-metadata: + '/v1/tasks/{taskId}/user-metadata': patch: operationId: updateTaskUserMetadataV1 parameters: @@ -1734,7 +1749,7 @@ paths: application/json: schema: $ref: '#/components/schemas/internalErrorsResponse' - /v1/tasks/{taskId}/status: + '/v1/tasks/{taskId}/status': parameters: - $ref: '#/components/parameters/taskId' put: @@ -1820,6 +1835,21 @@ paths: application/json: schema: $ref: '#/components/schemas/taskNotFoundResponse' + '409': + description: >- + task status was modified by another request. This occurs when + multiple workers attempt to update the same task simultaneously. The + current state of the task has changed since it was retrieved. + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/baseErrorResponse' + - type: object + properties: + code: + enum: + - TASK_STATUS_UPDATE_FAILED '500': description: Internal server error content: @@ -1879,7 +1909,7 @@ components: type: integer minimum: 0 maximum: 100 - description: Completion percentage of a job, stage, or task (0-100) + description: 'Completion percentage of a job, stage, or task (0-100)' attempts: type: integer minimum: 0 @@ -1918,7 +1948,7 @@ components: description: | W3C traceparent for distributed tracing. Auto-injected if not provided. See [W3C Trace Context](https://www.w3.org/TR/trace-context/). - pattern: ^[\da-f]{2}-[\da-f]{32}-[\da-f]{16}-[\da-f]{2}$ + pattern: '^[\da-f]{2}-[\da-f]{32}-[\da-f]{16}-[\da-f]{2}$' example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 tracestate: type: string @@ -1927,7 +1957,7 @@ components: available. pattern: >- ^[a-z0-9][a-z0-9_\\-\\*\\/]*=[^,=]+(?:,[a-z0-9][a-z0-9_\\-\\*\\/]*=[^,=]+)*$ - example: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE + example: 'rojo=00f067aa0ba902b7,congo=t61rcWkgMzE' successMessages: type: string enum: @@ -2315,7 +2345,7 @@ components: taskId: type: string format: uuid - description: Unique identifier for a task, generated by the system upon task creation + description: 'Unique identifier for a task, generated by the system upon task creation' taskPayload: type: object additionalProperties: true @@ -2563,7 +2593,7 @@ components: fromDate: in: query name: from_date - description: Filter results by update time, starting from this date/time + description: 'Filter results by update time, starting from this date/time' required: false schema: type: string @@ -2571,7 +2601,7 @@ components: endDate: in: query name: end_date - description: Filter results by update time, ending at this date/time + description: 'Filter results by update time, ending at this date/time' required: false schema: type: string @@ -2579,14 +2609,14 @@ components: includeStages: in: query name: should_return_stages - description: When true, includes stage data in the response + description: 'When true, includes stage data in the response' required: false schema: $ref: '#/components/schemas/returnStage' includeTasks: in: query name: should_return_tasks - description: When true, includes task data in the response + description: 'When true, includes task data in the response' required: false schema: $ref: '#/components/schemas/returnTask' @@ -2607,7 +2637,7 @@ components: paramStageType: in: query name: stage_type - description: Filter results by stage type (e.g., processing, validation) + description: 'Filter results by stage type (e.g., processing, validation)' required: false schema: $ref: '#/components/schemas/stageType' diff --git a/openapi_v1.yaml b/openapi_v1.yaml index a6cea83a..bb68de0f 100644 --- a/openapi_v1.yaml +++ b/openapi_v1.yaml @@ -2,7 +2,7 @@ openapi: 3.0.1 info: title: Job Manager Service - API v1 description: Job Manager Service API version 1 - version: 0.1.0 + version: 0.2.0 license: name: MIT url: https://opensource.org/licenses/MIT @@ -1571,6 +1571,21 @@ paths: application/json: schema: $ref: '#/components/schemas/taskNotFoundResponse' + '409': + description: >- + task was claimed by another worker. + This occurs when multiple workers attempt to dequeue the same task simultaneously. + The client should retry the dequeue operation to get a different task. + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/baseErrorResponse' + - type: object + properties: + code: + enum: + - TASK_STATUS_UPDATE_FAILED '500': description: Internal server error or invalid state transition content: @@ -1819,6 +1834,21 @@ paths: application/json: schema: $ref: '#/components/schemas/taskNotFoundResponse' + '409': + description: >- + task status was modified by another request. + This occurs when multiple workers attempt to update the same task simultaneously. + The current state of the task has changed since it was retrieved. + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/baseErrorResponse' + - type: object + properties: + code: + enum: + - TASK_STATUS_UPDATE_FAILED '500': description: Internal server error content: diff --git a/package.json b/package.json index a478b854..19d7d101 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "lint:fix": "eslint --fix .", "prebuild": "npm run clean && npm run migration:generate-types", "build": "tsc --project tsconfig.build.json && npm run prisma:copy && tsc-alias -p tsconfig.build.json && npm run assets:copy", + "build:docker": "npx prisma generate --schema ./src/db/prisma/schema.prisma || true && npm run clean && tsc --project tsconfig.build.json && npm run prisma:copy && tsc-alias -p tsconfig.build.json && npm run assets:copy", "start": "npm run build && cd dist && node --import ./instrumentation.mjs ./index.js", "start:dev": "npm run build && cd dist && cross-env CONFIG_OFFLINE_MODE=true node --enable-source-maps --import ./instrumentation.mjs ./index.js", "assets:copy": "copyfiles -f ./config/* ./dist/config && copyfiles -f ./openapi3.yaml ./dist/ && copyfiles ./package.json dist && copyfiles -f ./src/db/prisma/migrations/** ./dist/db/prisma/migrations", @@ -32,6 +33,7 @@ "migration:dev": "npx prisma migrate dev --schema ./src/db/prisma/schema.prisma", "migration:generate": "npx prisma migrate dev --schema ./src/db/prisma/schema.prisma --name ", "migration:generate-types": "npx prisma generate --schema ./src/db/prisma/schema.prisma", + "migration:generate-sql-types": "npx prisma generate --sql --schema ./src/db/prisma/schema.prisma", "migration:deploy": "npx prisma migrate deploy --schema ./src/db/prisma/schema.prisma", "openapi:build": "node scripts/generateVersionedOpenApi.mts" }, diff --git a/src/api/v1/tasks/controller.ts b/src/api/v1/tasks/controller.ts index c170808f..17e5a933 100644 --- a/src/api/v1/tasks/controller.ts +++ b/src/api/v1/tasks/controller.ts @@ -117,9 +117,11 @@ export class TaskControllerV1 { } catch (err) { if (err instanceof TaskNotFoundError) { (err as HttpError).status = httpStatus.NOT_FOUND; + } else if (err instanceof TaskStatusUpdateFailedError) { + // Race condition: resource was modified by another request + (err as HttpError).status = httpStatus.CONFLICT; } else if (badRequestErrors.some((e) => err instanceof e)) { (err as HttpError).status = httpStatus.BAD_REQUEST; - this.logger.error({ msg: `Task status update failed: invalid status transition`, status: req.body.status, err }); } return next(err); @@ -134,6 +136,9 @@ export class TaskControllerV1 { } catch (err) { if (err instanceof TaskNotFoundError) { (err as HttpError).status = httpStatus.NOT_FOUND; + } else if (err instanceof TaskStatusUpdateFailedError) { + // Race condition: another worker already dequeued this task + (err as HttpError).status = httpStatus.CONFLICT; } else if (internalErrors.some((e) => err instanceof e)) { (err as HttpError).status = httpStatus.INTERNAL_SERVER_ERROR; } diff --git a/src/common/constants.ts b/src/common/constants.ts index 0af9d8f5..b9837482 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -9,7 +9,8 @@ type SuccessMessagesObj = { export const SERVICE_NAME = readPackageJsonSync().name ?? 'unknown_service'; export const DEFAULT_SERVER_PORT = 80; -export const DB_CONNECTION_TIMEOUT = 5000; +export const DB_CONNECTION_TIMEOUT = 20000; +export const TX_TIMEOUT_MS = 30000; export const NODE_VERSION = process.versions.node; export const IGNORED_OUTGOING_TRACE_ROUTES = [/^.*\/v1\/metrics.*$/]; diff --git a/src/db/createConnection.ts b/src/db/createConnection.ts index 7f06ceb8..21cce3d0 100644 --- a/src/db/createConnection.ts +++ b/src/db/createConnection.ts @@ -3,6 +3,7 @@ import { hostname } from 'node:os'; import { commonDbFullV1Type } from '@map-colonies/schemas'; import type { PoolConfig } from 'pg'; import { PrismaPg } from '@prisma/adapter-pg'; +import { TX_TIMEOUT_MS } from '@src/common/constants'; import { PrismaClient } from '../db/prisma/generated/client'; interface SchemaExistsResult { @@ -37,7 +38,12 @@ export const createConnectionOptions = (dbConfig: DbConfig): PoolConfig => { // eslint-disable-next-line @typescript-eslint/explicit-function-return-type export function createPrismaClient(poolConfig: PoolConfig, schema: string) { const adapter = new PrismaPg(poolConfig, { schema }); - const prisma = new PrismaClient({ adapter }).$extends({ + const prisma = new PrismaClient({ + adapter, + transactionOptions: { + timeout: TX_TIMEOUT_MS, + }, + }).$extends({ query: { // eslint-disable-next-line @typescript-eslint/promise-function-async $allOperations({ args, query }) { diff --git a/src/db/prisma/generated/client/sql/$DbEnums.d.ts b/src/db/prisma/generated/client/sql/$DbEnums.d.ts new file mode 100644 index 00000000..430192aa --- /dev/null +++ b/src/db/prisma/generated/client/sql/$DbEnums.d.ts @@ -0,0 +1,8 @@ +export interface $DbEnums {} + +export namespace $DbEnums { + type job_operation_status_enum = 'Pending' | 'In-Progress' | 'Completed' | 'Failed' | 'Aborted' | 'Created' | 'Paused'; + type stage_operation_status_enum = 'Pending' | 'In-Progress' | 'Completed' | 'Failed' | 'Aborted' | 'Waiting' | 'Created'; + type priority_enum = 'Very-High' | 'High' | 'Medium' | 'Low' | 'Very-Low'; + type task_operation_status_enum = 'Pending' | 'In-Progress' | 'Completed' | 'Failed' | 'Created' | 'Retried'; +} diff --git a/src/db/prisma/generated/client/sql/findAndLockTask.d.ts b/src/db/prisma/generated/client/sql/findAndLockTask.d.ts new file mode 100644 index 00000000..d5ef57f3 --- /dev/null +++ b/src/db/prisma/generated/client/sql/findAndLockTask.d.ts @@ -0,0 +1,27 @@ +import * as $runtime from '../runtime/library'; +import { $DbEnums } from './$DbEnums'; + +/** + * @param text + */ +export const findAndLockTask: (text: string) => $runtime.TypedSql; + +export namespace findAndLockTask { + export type Parameters = [text: string]; + export type Result = { + id: string; + data: $runtime.JsonValue; + status: $DbEnums.task_operation_status_enum; + xstate: $runtime.JsonValue; + creation_time: Date; + update_time: Date; + user_metadata: $runtime.JsonValue; + stage_id: string; + attempts: number; + max_attempts: number; + traceparent: string; + tracestate: string | null; + end_time: Date | null; + start_time: Date | null; + }; +} diff --git a/src/db/prisma/generated/client/sql/findAndLockTask.edge.js b/src/db/prisma/generated/client/sql/findAndLockTask.edge.js new file mode 100644 index 00000000..031e1cc3 --- /dev/null +++ b/src/db/prisma/generated/client/sql/findAndLockTask.edge.js @@ -0,0 +1,8 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +'use strict'; +const { makeTypedQueryFactory: $mkFactory } = require('../runtime/edge.js'); +exports.findAndLockTask = /*#__PURE__*/ $mkFactory( + 'SELECT t.*\nFROM "job_manager"."task" t\nINNER JOIN "job_manager"."stage" s ON t."stage_id" = s.id\nINNER JOIN "job_manager"."job" j ON s."job_id" = j.id\nWHERE s.type = $1\nAND t.status IN (\'Pending\', \'Retried\')\nAND s.status IN (\'Pending\', \'In-Progress\')\nAND j.status IN (\'Pending\', \'In-Progress\')\nORDER BY j.priority ASC\nLIMIT 1\nFOR UPDATE OF t SKIP LOCKED;' +); diff --git a/src/db/prisma/generated/client/sql/findAndLockTask.edge.mjs b/src/db/prisma/generated/client/sql/findAndLockTask.edge.mjs new file mode 100644 index 00000000..a9a44bdc --- /dev/null +++ b/src/db/prisma/generated/client/sql/findAndLockTask.edge.mjs @@ -0,0 +1,7 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +import { makeTypedQueryFactory as $mkFactory } from '../runtime/edge.js'; +export const findAndLockTask = /*#__PURE__*/ $mkFactory( + 'SELECT t.*\nFROM "job_manager"."task" t\nINNER JOIN "job_manager"."stage" s ON t."stage_id" = s.id\nINNER JOIN "job_manager"."job" j ON s."job_id" = j.id\nWHERE s.type = $1\nAND t.status IN (\'Pending\', \'Retried\')\nAND s.status IN (\'Pending\', \'In-Progress\')\nAND j.status IN (\'Pending\', \'In-Progress\')\nORDER BY j.priority ASC\nLIMIT 1\nFOR UPDATE OF t SKIP LOCKED;' +); diff --git a/src/db/prisma/generated/client/sql/findAndLockTask.js b/src/db/prisma/generated/client/sql/findAndLockTask.js new file mode 100644 index 00000000..5ae9e664 --- /dev/null +++ b/src/db/prisma/generated/client/sql/findAndLockTask.js @@ -0,0 +1,8 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +'use strict'; +const { makeTypedQueryFactory: $mkFactory } = require('../runtime/library'); +exports.findAndLockTask = /*#__PURE__*/ $mkFactory( + 'SELECT t.*\nFROM "job_manager"."task" t\nINNER JOIN "job_manager"."stage" s ON t."stage_id" = s.id\nINNER JOIN "job_manager"."job" j ON s."job_id" = j.id\nWHERE s.type = $1\nAND t.status IN (\'Pending\', \'Retried\')\nAND s.status IN (\'Pending\', \'In-Progress\')\nAND j.status IN (\'Pending\', \'In-Progress\')\nORDER BY j.priority ASC\nLIMIT 1\nFOR UPDATE OF t SKIP LOCKED;' +); diff --git a/src/db/prisma/generated/client/sql/findAndLockTask.mjs b/src/db/prisma/generated/client/sql/findAndLockTask.mjs new file mode 100644 index 00000000..089ab263 --- /dev/null +++ b/src/db/prisma/generated/client/sql/findAndLockTask.mjs @@ -0,0 +1,7 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +import { makeTypedQueryFactory as $mkFactory } from '../runtime/library'; +export const findAndLockTask = /*#__PURE__*/ $mkFactory( + 'SELECT t.*\nFROM "job_manager"."task" t\nINNER JOIN "job_manager"."stage" s ON t."stage_id" = s.id\nINNER JOIN "job_manager"."job" j ON s."job_id" = j.id\nWHERE s.type = $1\nAND t.status IN (\'Pending\', \'Retried\')\nAND s.status IN (\'Pending\', \'In-Progress\')\nAND j.status IN (\'Pending\', \'In-Progress\')\nORDER BY j.priority ASC\nLIMIT 1\nFOR UPDATE OF t SKIP LOCKED;' +); diff --git a/src/db/prisma/generated/client/sql/index.d.ts b/src/db/prisma/generated/client/sql/index.d.ts new file mode 100644 index 00000000..a0e26f25 --- /dev/null +++ b/src/db/prisma/generated/client/sql/index.d.ts @@ -0,0 +1,3 @@ +export { $DbEnums } from './$DbEnums'; + +export * from './findAndLockTask'; diff --git a/src/db/prisma/generated/client/sql/index.edge.js b/src/db/prisma/generated/client/sql/index.edge.js new file mode 100644 index 00000000..12fd3c02 --- /dev/null +++ b/src/db/prisma/generated/client/sql/index.edge.js @@ -0,0 +1,5 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +'use strict'; +exports.findAndLockTask = require('./findAndLockTask.edge.js').findAndLockTask; diff --git a/src/db/prisma/generated/client/sql/index.edge.mjs b/src/db/prisma/generated/client/sql/index.edge.mjs new file mode 100644 index 00000000..61e236d0 --- /dev/null +++ b/src/db/prisma/generated/client/sql/index.edge.mjs @@ -0,0 +1,4 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +export * from './findAndLockTask.edge.mjs'; diff --git a/src/db/prisma/generated/client/sql/index.js b/src/db/prisma/generated/client/sql/index.js new file mode 100644 index 00000000..4e695b7e --- /dev/null +++ b/src/db/prisma/generated/client/sql/index.js @@ -0,0 +1,5 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +'use strict'; +exports.findAndLockTask = require('./findAndLockTask.js').findAndLockTask; diff --git a/src/db/prisma/generated/client/sql/index.mjs b/src/db/prisma/generated/client/sql/index.mjs new file mode 100644 index 00000000..926105e7 --- /dev/null +++ b/src/db/prisma/generated/client/sql/index.mjs @@ -0,0 +1,4 @@ +/* !!! This is code generated by Prisma. Do not edit directly. !!! +/* eslint-disable */ +// biome-ignore-all lint: generated file +export * from './findAndLockTask.mjs'; diff --git a/src/db/prisma/schema.prisma b/src/db/prisma/schema.prisma index 981f7fbc..4c8ba9f6 100644 --- a/src/db/prisma/schema.prisma +++ b/src/db/prisma/schema.prisma @@ -1,7 +1,8 @@ generator client { - provider = "prisma-client-js" - output = "./generated/client" - binaryTargets = ["native", "linux-musl-openssl-3.0.x"] + provider = "prisma-client-js" + output = "./generated/client" + binaryTargets = ["native", "linux-musl-openssl-3.0.x"] + previewFeatures = ["typedSql"] } generator json { diff --git a/src/db/prisma/sql/findAndLockTask.sql b/src/db/prisma/sql/findAndLockTask.sql new file mode 100644 index 00000000..39f0f0e6 --- /dev/null +++ b/src/db/prisma/sql/findAndLockTask.sql @@ -0,0 +1,11 @@ +SELECT t.* +FROM "job_manager"."task" t +INNER JOIN "job_manager"."stage" s ON t."stage_id" = s.id +INNER JOIN "job_manager"."job" j ON s."job_id" = j.id +WHERE s.type = $1 + AND t.status IN ('Pending', 'Retried') + AND s.status IN ('Pending', 'In-Progress') + AND j.status IN ('Pending', 'In-Progress') +ORDER BY j.priority ASC +LIMIT 1 +FOR UPDATE OF t SKIP LOCKED; diff --git a/src/openapi.d.ts b/src/openapi.d.ts index afe23ccc..d881c319 100644 --- a/src/openapi.d.ts +++ b/src/openapi.d.ts @@ -1839,6 +1839,18 @@ export interface operations { 'application/json': components['schemas']['taskNotFoundResponse']; }; }; + /** @description Race condition detected: task was claimed by another worker. This occurs when multiple workers attempt to dequeue the same task simultaneously. The client should retry the dequeue operation to get a different task. */ + 409: { + headers: { + [name: string]: unknown; + }; + content: { + 'application/json': components['schemas']['baseErrorResponse'] & { + /** @enum {unknown} */ + code?: 'TASK_STATUS_UPDATE_FAILED'; + }; + }; + }; /** @description Internal server error or invalid state transition */ 500: { headers: { @@ -2065,6 +2077,18 @@ export interface operations { 'application/json': components['schemas']['taskNotFoundResponse']; }; }; + /** @description Race condition detected: task status was modified by another request. This occurs when multiple workers attempt to update the same task simultaneously. The current state of the task has changed since it was retrieved. */ + 409: { + headers: { + [name: string]: unknown; + }; + content: { + 'application/json': components['schemas']['baseErrorResponse'] & { + /** @enum {unknown} */ + code?: 'TASK_STATUS_UPDATE_FAILED'; + }; + }; + }; /** @description Internal server error */ 500: { headers: { diff --git a/src/stages/models/manager.ts b/src/stages/models/manager.ts index 7a2a3aac..8defa689 100644 --- a/src/stages/models/manager.ts +++ b/src/stages/models/manager.ts @@ -7,7 +7,7 @@ import { INFRA_CONVENTIONS } from '@map-colonies/semantic-conventions'; import type { PrismaClient } from '@prismaClient'; import { JobOperationStatus, Prisma, StageOperationStatus } from '@prismaClient'; import { JobManager } from '@src/jobs/models/manager'; -import { SERVICES, XSTATE_DONE_STATE } from '@common/constants'; +import { SERVICES, TX_TIMEOUT_MS, XSTATE_DONE_STATE } from '@common/constants'; import { resolveTraceContext } from '@src/common/utils/tracingHelpers'; import { jobStateMachine } from '@src/jobs/models/jobStateMachine'; import { illegalStatusTransitionErrorMessage, prismaKnownErrors } from '@src/common/errors'; @@ -242,9 +242,12 @@ export class StageManager { }); if (!tx) { - return this.prisma.$transaction(async (newTx) => { - await this.executeUpdateStatus(stageId, status, newTx); - }); + return this.prisma.$transaction( + async (newTx) => { + await this.executeUpdateStatus(stageId, status, newTx); + }, + { timeout: TX_TIMEOUT_MS } + ); } await this.executeUpdateStatus(stageId, status, tx); @@ -304,6 +307,8 @@ export class StageManager { // update stage status if it was initialized by first task // and the stage is not already in progress + // Race condition protection: Only transition if stage is PENDING + // Multiple concurrent tasks may trigger this check simultaneously if (updatedSummary.inProgress > 0 && stage.status === StageOperationStatus.PENDING) { await this.updateStatus(stageId, StageOperationStatus.IN_PROGRESS, tx); trace.getActiveSpan()?.addEvent('Stage set to IN_PROGRESS because first task started', { stageId }); @@ -311,17 +316,31 @@ export class StageManager { } @withSpanAsyncV4 - private async executeUpdateStatus(stageId: string, status: StageOperationStatus, tx: PrismaTransaction): Promise { + private async executeUpdateStatus(stageId: string, targetStatus: StageOperationStatus, tx: PrismaTransaction): Promise { const stage = await this.getStageEntityById(stageId, { includeJob: true, tx }); if (!stage) { throw new StageNotFoundError(stagesErrorMessages.stageNotFound); } + + // Idempotent status update: if already in target status, no-op + // This prevents errors during race conditions where multiple workers + // try to set the same status (e.g., multiple tasks setting stage to IN_PROGRESS) + /* v8 ignore next 4 -- @preserve */ + if (stage.status === targetStatus) { + this.logger.debug({ + msg: 'Stage already in target status, skipping transition', + stageId, + targetStatus, + }); + return; + } + //#region validate status transition rules const previousStageOrder = stage.order - 1; // can't move to PENDING if previous stage is not COMPLETED - if (status === StageOperationStatus.PENDING && previousStageOrder > 0) { + if (targetStatus === StageOperationStatus.PENDING && previousStageOrder > 0) { const previousStage = await tx.stage.findFirst({ where: { jobId: stage.jobId, @@ -334,12 +353,12 @@ export class StageManager { } } - const nextStatusChange = OperationStatusMapper[status]; + const nextStatusChange = OperationStatusMapper[targetStatus]; const updateActor = createActor(stageStateMachine, { snapshot: stage.xstate }).start(); const isValidStatus = updateActor.getSnapshot().can({ type: nextStatusChange }); if (!isValidStatus) { - throw new IllegalStageStatusTransitionError(illegalStatusTransitionErrorMessage(stage.status, status)); + throw new IllegalStageStatusTransitionError(illegalStatusTransitionErrorMessage(stage.status, targetStatus)); } //#endregion updateActor.send({ type: nextStatusChange }); @@ -350,7 +369,7 @@ export class StageManager { id: stageId, }, data: { - status, + status: targetStatus, xstate: newPersistedSnapshot, }, }; @@ -360,7 +379,7 @@ export class StageManager { //#region update related entities // Update job completion when a stage is completed // If the stage is marked as completed, and there is a next stage in the job, update the next stage status to PENDING - if (status === StageOperationStatus.COMPLETED) { + if (targetStatus === StageOperationStatus.COMPLETED) { const nextStageOrder = stage.order + 1; const nextStage = await tx.stage.findFirst({ where: { @@ -386,11 +405,11 @@ export class StageManager { } } - if (status === StageOperationStatus.IN_PROGRESS && stage.job.status === JobOperationStatus.PENDING) { + if (targetStatus === StageOperationStatus.IN_PROGRESS && stage.job.status === JobOperationStatus.PENDING) { // Update job status to IN_PROGRESS await this.jobManager.updateStatus(stage.job.id, JobOperationStatus.IN_PROGRESS, tx); trace.getActiveSpan()?.addEvent('Job status set to IN_PROGRESS because first stage is being processed', { jobId: stage.jobId }); - } else if (status === StageOperationStatus.FAILED) { + } else if (targetStatus === StageOperationStatus.FAILED) { // Update job status to FAILED await this.jobManager.updateStatus(stage.jobId, JobOperationStatus.FAILED, tx); trace.getActiveSpan()?.addEvent('Job set to FAILED because its stage failed', { jobId: stage.jobId }); diff --git a/src/tasks/DAL/taskRepository.ts b/src/tasks/DAL/taskRepository.ts new file mode 100644 index 00000000..4e353a1e --- /dev/null +++ b/src/tasks/DAL/taskRepository.ts @@ -0,0 +1,35 @@ +import { inject, Lifecycle, scoped } from 'tsyringe'; +import { type Logger } from '@map-colonies/js-logger'; +import { PrismaClient } from '@prismaClient'; +import { SERVICES } from '@src/common/constants'; +import type { PrismaTransaction } from '@src/db/types'; +import { findAndLockTask } from '@src/db/prisma/generated/client/sql'; +import type { TaskPrismaObject } from '../models/models'; +import { convertRawToTaskModel } from '../models/helper'; + +@scoped(Lifecycle.ContainerScoped) +export class TaskRepository { + public constructor( + @inject(SERVICES.LOGGER) private readonly logger: Logger, + @inject(SERVICES.PRISMA) private readonly prisma: PrismaClient + ) {} + + /** + * Finds and locks the next available high-priority task for processing. + * * Uses a row-level lock with `SKIP LOCKED` to allow multiple concurrent + * workers to claim different tasks without blocking each other. + * * @param stageType - The stage category to pull tasks from. + * @param tx - The current database transaction. + * @returns The locked task or null if no eligible tasks are found. + */ + public async findAndLockTaskForDequeue(stageType: string, tx: PrismaTransaction): Promise { + const tasks = await tx.$queryRawTyped(findAndLockTask(stageType)); + + if (tasks.length === 0 || tasks[0] === undefined) { + return null; + } + + const task = convertRawToTaskModel(tasks[0]); + return task; + } +} diff --git a/src/tasks/models/helper.ts b/src/tasks/models/helper.ts index fe37913b..6c92f649 100644 --- a/src/tasks/models/helper.ts +++ b/src/tasks/models/helper.ts @@ -1,5 +1,7 @@ +import { Snapshot } from 'xstate'; import { Prisma } from '@prismaClient'; -import { TaskModel } from './models'; +import { findAndLockTask } from '@src/db/prisma/generated/client/sql'; +import { TaskModel, TaskPrismaObject } from './models'; /** * This function converts a Prisma stage object to a TaskModel API object. @@ -30,3 +32,19 @@ export function convertPrismaToTaskResponse(prismaObjects: Prisma.TaskGetPayload export function convertArrayPrismaTaskToTaskResponse(prismaObjects: Prisma.TaskGetPayload>[]): TaskModel[] { return prismaObjects.map((task) => convertPrismaToTaskResponse(task)); } + +/** + * This function converts a Prisma stage object to a TaskModel API object. + * @param prismaObjects db entity + * @returns TaskModel + */ +export function convertRawToTaskModel(raw: findAndLockTask.Result): TaskPrismaObject { + return { + ...raw, + stageId: raw.stage_id, // Handle camelCase conversion + status: raw.status.toUpperCase() as TaskPrismaObject['status'], + data: raw.data as Record, + userMetadata: (raw.user_metadata ?? {}) as Record, + xstate: raw.xstate as unknown as Snapshot, + } as unknown as TaskPrismaObject; +} diff --git a/src/tasks/models/manager.ts b/src/tasks/models/manager.ts index a4b78fa9..381ea00a 100644 --- a/src/tasks/models/manager.ts +++ b/src/tasks/models/manager.ts @@ -5,7 +5,7 @@ import { trace, type Tracer } from '@opentelemetry/api'; import { withSpanAsyncV4 } from '@map-colonies/tracing-utils'; import { subMinutes } from 'date-fns'; import { INFRA_CONVENTIONS } from '@map-colonies/semantic-conventions'; -import { JobOperationStatus, Prisma, StageOperationStatus, Task, TaskOperationStatus, type PrismaClient } from '@prismaClient'; +import { Prisma, StageOperationStatus, Task, TaskOperationStatus, type PrismaClient } from '@prismaClient'; import { SERVICES, XSTATE_DONE_STATE } from '@common/constants'; import { resolveTraceContext } from '@src/common/utils/tracingHelpers'; import { StageManager } from '@src/stages/models/manager'; @@ -24,59 +24,11 @@ import { TaskStatusUpdateFailedError, } from '@src/common/generated/errors'; import { ATTR_MESSAGING_DESTINATION_NAME, ATTR_MESSAGING_MESSAGE_ID } from '@src/common/semconv'; +import { TaskRepository } from '../DAL/taskRepository'; import type { TasksFindCriteriaArg, TaskModel, TaskPrismaObject, TaskCreateModel } from './models'; import { errorMessages as tasksErrorMessages } from './errors'; import { convertArrayPrismaTaskToTaskResponse, convertPrismaToTaskResponse } from './helper'; -// eslint-disable-next-line @typescript-eslint/explicit-function-return-type -function generatePrioritizedTaskQuery(stageType: string) { - // Define valid states for filtering - const validTaskStatuses = [TaskOperationStatus.PENDING, TaskOperationStatus.RETRIED]; - const validStageStatuses = [StageOperationStatus.PENDING, StageOperationStatus.IN_PROGRESS]; - const validJobStatuses = [JobOperationStatus.PENDING, JobOperationStatus.IN_PROGRESS]; - - const queryBody = { - where: { - stage: { - type: stageType, - status: { - in: validStageStatuses, - }, - job: { - status: { - in: validJobStatuses, - }, - }, - }, - status: { - in: validTaskStatuses, - }, - }, - include: { - stage: { - include: { - job: { - select: { - priority: true, - id: true, - status: true, - }, - }, - }, - }, - }, - orderBy: { - stage: { - job: { - priority: Prisma.SortOrder.asc, - }, - }, - }, - } satisfies Prisma.TaskFindFirstArgs; - - return queryBody; -} - @injectable() export class TaskManager { public constructor( @@ -84,7 +36,8 @@ export class TaskManager { @inject(SERVICES.PRISMA) private readonly prisma: PrismaClient, @inject(SERVICES.TRACER) public readonly tracer: Tracer, @inject(StageManager) private readonly stageManager: StageManager, - @inject(SERVICES.CONFIG) private readonly config: ConfigType + @inject(SERVICES.CONFIG) private readonly config: ConfigType, + @inject(TaskRepository) private readonly taskRepository: TaskRepository ) {} @withSpanAsyncV4 @@ -253,7 +206,7 @@ export class TaskManager { [INFRA_CONVENTIONS.infra.jobnik.stage.status]: status, }); - /* v8 ignore next 6 -- @preserve */ + /* v8 ignore next 8 -- @preserve */ if (!tx) { return this.prisma.$transaction(async (newTx) => { return this.executeUpdateStatus(taskId, status, newTx); @@ -276,7 +229,7 @@ export class TaskManager { [ATTR_MESSAGING_DESTINATION_NAME]: stageType, }); - /* v8 ignore next 5 -- @preserve */ + /* v8 ignore next 7 -- @preserve */ if (tx === undefined) { return this.prisma.$transaction(async (newTx) => { return this.executeDequeue(stageType, newTx); @@ -365,6 +318,8 @@ export class TaskManager { /** * Executes the dequeue operation within a transaction. + * Uses SELECT FOR UPDATE to lock the task row, preventing race conditions + * when multiple workers try to dequeue simultaneously. * @param stageType - The type of stage to dequeue a task from * @param tx - The transaction object * @returns The dequeued task @@ -373,11 +328,9 @@ export class TaskManager { private async executeDequeue(stageType: string, tx: PrismaTransaction): Promise { const spanActive = trace.getActiveSpan(); - const queryBody = generatePrioritizedTaskQuery(stageType); + const task = await this.taskRepository.findAndLockTaskForDequeue(stageType, tx); - const task = await tx.task.findFirst(queryBody); - - if (task === null) { + if (!task) { throw new TaskNotFoundError(tasksErrorMessages.taskNotFound); } @@ -426,7 +379,7 @@ export class TaskManager { [INFRA_CONVENTIONS.infra.jobnik.stage.id]: task.stageId, }); - /* v8 ignore next 5 -- @preserve */ + /* v8 ignore next 7 -- @preserve */ if (!tx) { return this.prisma.$transaction(async (newTx) => { return this.executeUpdateAndValidateStatus(task, status, newTx); @@ -453,6 +406,15 @@ export class TaskManager { const previousStatus = task.status; const { nextStatus, taskDataToUpdate } = this.determineNextStatus(task, status); + this.logger.debug({ + msg: 'Attempting task status update', + taskId: task.id, + stageId: task.stageId, + currentStatus: previousStatus, + requestedStatus: status, + nextStatus, + }); + const newPersistedSnapshot = updateTaskMachineState(nextStatus, task.xstate); const startTime: Date | undefined = nextStatus === TaskOperationStatus.IN_PROGRESS ? new Date() : undefined; @@ -461,12 +423,21 @@ export class TaskManager { // Create update query with race condition protection for IN_PROGRESS const updateQueryBody = { - where: this.createUpdateWhereClause(task.id, nextStatus, previousStatus), + where: this.createUpdateWhereClause(task.id, previousStatus), data: { ...taskDataToUpdate, status: nextStatus, xstate: newPersistedSnapshot, startTime, endTime }, }; const updatedTasks = await tx.task.updateManyAndReturn(updateQueryBody); if (updatedTasks[0] === undefined) { + // Race condition detected: another process already modified this task + this.logger.warn({ + msg: 'Task status update failed - race condition detected', + taskId: task.id, + stageId: task.stageId, + attemptedTransition: `${previousStatus} -> ${nextStatus}`, + expectedStatus: previousStatus, + reason: 'Task status was changed by another worker before this update could complete', + }); throw new TaskStatusUpdateFailedError(tasksErrorMessages.taskStatusUpdateFailed); } @@ -528,28 +499,18 @@ export class TaskManager { } /** - * Creates the where clause for task updates, with race condition protection. - * @param taskId - The ID of the task to update - * @param nextStatus - The target status - * @param previousStatus - The current status - * @returns The where clause object for the update query + * Generates the query filter for task updates using optimistic locking. + * * By including `previousStatus` in the WHERE clause, we ensure that state + * transitions (e.g., PENDING → IN_PROGRESS) only occur if no other worker + * has modified the task in the interim. + * + * @param taskId - The ID of the task to update. + * @param nextStatus - The target status. + * @param previousStatus - The expected current status to prevent race conditions. + * @returns The filter object for the update query. */ - private createUpdateWhereClause( - taskId: string, - nextStatus: TaskOperationStatus, - previousStatus: TaskOperationStatus - ): { - id: string; - status?: TaskOperationStatus; - } { - const whereClause = { id: taskId }; - - // Add status check to prevent race conditions when setting to IN_PROGRESS - if (nextStatus === TaskOperationStatus.IN_PROGRESS) { - return { ...whereClause, status: previousStatus }; - } - - return whereClause; + private createUpdateWhereClause(taskId: string, previousStatus: TaskOperationStatus): { id: string; status: TaskOperationStatus } { + return { id: taskId, status: previousStatus }; } private async updateStageSummary( diff --git a/tests/integration/tasks/tasks.spec.ts b/tests/integration/tasks/tasks.spec.ts index 9e884ac4..388b8724 100644 --- a/tests/integration/tasks/tasks.spec.ts +++ b/tests/integration/tasks/tasks.spec.ts @@ -1455,9 +1455,7 @@ describe('task', function () { const transactionSpy = createProxyMock(prisma, '$transaction'); transactionSpy.mockImplementationOnce(async (callback: (tx: PrismaTransaction) => Promise): Promise => { const mockTx = { - task: { - findFirst: vi.fn().mockRejectedValueOnce(error), - }, + $queryRawTyped: vi.fn().mockRejectedValueOnce(error), } as unknown as PrismaTransaction; await callback(mockTx); @@ -1480,9 +1478,7 @@ describe('task', function () { const transactionSpy = createProxyMock(prisma, '$transaction'); transactionSpy.mockImplementationOnce(async (callback: (tx: PrismaTransaction) => Promise): Promise => { const mockTx = { - task: { - findFirst: vi.fn().mockRejectedValueOnce(error), - }, + $queryRawTyped: vi.fn().mockRejectedValueOnce(error), } as unknown as PrismaTransaction; await callback(mockTx); @@ -1536,7 +1532,7 @@ describe('task', function () { expect(getJobResponse.body).toHaveProperty('status', JobOperationStatus.PENDING); }); - it('should return 500 and prevent multiple dequeue of the same task', async function () { + it('should prevent multiple dequeue of the same task using database-level locking', async function () { expect.assertions(4); const initialSummary = { ...defaultStatusCounts, pending: 1, total: 1 }; @@ -1555,52 +1551,162 @@ describe('task', function () { const stageId = stage.id; const taskId = tasks[0]!.id; - let continueUpdateFirstTask: (value?: unknown) => void; - let continueUpdateSecondTask: (value?: unknown) => void; - const updateTaskHolderFirst = new Promise((resolve) => { - continueUpdateFirstTask = resolve; + // With FOR UPDATE SKIP LOCKED, concurrent dequeues are handled at database level + // The first transaction locks the row, second transaction skips it and finds no tasks + const dequeueFirstPromise = requestSender.dequeueTaskV1({ + pathParams: { stageType: 'SOME_TEST_TYPE_PREVENT_MULTIPLE_DEQUEUE' }, }); - const updateTaskHolderSecond = new Promise((resolve) => { - continueUpdateSecondTask = resolve; + const dequeueSecondPromise = requestSender.dequeueTaskV1({ + pathParams: { stageType: 'SOME_TEST_TYPE_PREVENT_MULTIPLE_DEQUEUE' }, }); - const original = prisma.task.findFirst.bind(prisma.task); - const spy = createProxyMock(prisma.task, 'findFirst'); - spy.mockImplementationOnce(async (...args: Parameters) => { - const res = await original(...args); - await updateTaskHolderFirst; // prevent updating the task until the second dequeue is called + + const [firstResponse, secondResponse] = await Promise.all([dequeueFirstPromise, dequeueSecondPromise]); + + // First call will success and pull task + expect(firstResponse).toSatisfyApiSpec(); + expect(firstResponse).toMatchObject({ + status: StatusCodes.OK, + body: { + id: taskId, + status: TaskOperationStatus.IN_PROGRESS, + stageId: stageId, + }, + }); + + // Second call will fail with 404 status code because task was locked by first transaction + expect(secondResponse).toSatisfyApiSpec(); + expect(secondResponse).toMatchObject({ + status: StatusCodes.NOT_FOUND, + body: { + message: tasksErrorMessages.taskNotFound, + code: 'TASK_NOT_FOUND', + }, + }); + }); + + it('should return 409 CONFLICT when dequeue encounters race condition during task update', async function () { + const initialSummary = { ...defaultStatusCounts, pending: 1, total: 1 }; + + await createJobnikTree( + prisma, + { status: JobOperationStatus.IN_PROGRESS, xstate: inProgressStageXstatePersistentSnapshot, traceparent: DEFAULT_TRACEPARENT }, + { + status: StageOperationStatus.IN_PROGRESS, + xstate: inProgressStageXstatePersistentSnapshot, + summary: initialSummary, + type: 'SOME_TEST_TYPE_DEQUEUE_RACE_CONFLICT', + }, + [{ status: TaskOperationStatus.PENDING, xstate: pendingStageXstatePersistentSnapshot }] + ); + + // Mock updateManyAndReturn to simulate race condition where task was modified between lock and update + const transactionSpy = createProxyMock(prisma, '$transaction'); + transactionSpy.mockImplementationOnce(async (callback: (tx: PrismaTransaction) => Promise): Promise => { + const mockTx = { + ...prisma, + $queryRawTyped: prisma.$queryRawTyped.bind(prisma), + task: { + ...prisma.task, + findUnique: prisma.task.findUnique.bind(prisma.task), + updateManyAndReturn: vi.fn().mockResolvedValue([]), // Simulate race condition - no rows updated + }, + } as unknown as PrismaTransaction; + + return callback(mockTx); + }); + + const response = await requestSender.dequeueTaskV1({ + pathParams: { stageType: 'SOME_TEST_TYPE_DEQUEUE_RACE_CONFLICT' }, + }); + + expect(response).toSatisfyApiSpec(); + expect(response).toMatchObject({ + status: StatusCodes.CONFLICT, + body: { + message: tasksErrorMessages.taskStatusUpdateFailed, + code: 'TASK_STATUS_UPDATE_FAILED', + }, + }); + }); + + it('should handle multiple concurrent updateStatus operations with race condition protection', async function () { + expect.assertions(4); + const initialSummary = { ...defaultStatusCounts, inProgress: 1, total: 1 }; + + const { tasks } = await createJobnikTree( + prisma, + { status: JobOperationStatus.IN_PROGRESS, xstate: inProgressStageXstatePersistentSnapshot, traceparent: DEFAULT_TRACEPARENT }, + { + status: StageOperationStatus.IN_PROGRESS, + xstate: inProgressStageXstatePersistentSnapshot, + summary: initialSummary, + type: 'SOME_TEST_TYPE_MULTIPLE_UPDATE_RACE', + }, + [{ status: TaskOperationStatus.IN_PROGRESS, xstate: inProgressStageXstatePersistentSnapshot }] + ); + + const taskId = tasks[0]!.id; + let continueFirstUpdate: (value?: unknown) => void; + let continueSecondUpdate: (value?: unknown) => void; + const firstUpdateHolder = new Promise((resolve) => { + continueFirstUpdate = resolve; + }); + const secondUpdateHolder = new Promise((resolve) => { + continueSecondUpdate = resolve; + }); + + // Mock task.findUnique for both update calls + const originalFindUnique = prisma.task.findUnique.bind(prisma.task); + const findUniqueSpy = createProxyMock(prisma.task, 'findUnique'); + + // First update call - pause before updating + findUniqueSpy.mockImplementationOnce(async (...args: Parameters) => { + const res = await originalFindUnique(...args); + await firstUpdateHolder; // Pause first update return res; }); - spy.mockImplementationOnce(async (...args: Parameters) => { - const res = await original(...args); - continueUpdateFirstTask(); // release the first dequeue update process - await updateTaskHolderSecond; // prevent updating the task until first dequeue release it (after his updating) + // Second update call - pause before updating + findUniqueSpy.mockImplementationOnce(async (...args: Parameters) => { + const res = await originalFindUnique(...args); + continueFirstUpdate(); // Allow first update to proceed + await secondUpdateHolder; // Pause second update return res; }); - const dequeueFirstPromise = requestSender.dequeueTaskV1({ - pathParams: { stageType: 'SOME_TEST_TYPE_PREVENT_MULTIPLE_DEQUEUE' }, + + // Start both update operations concurrently (simulating 2 workers completing the same task) + const firstUpdatePromise = requestSender.updateTaskStatusV1({ + pathParams: { taskId }, + requestBody: { status: TaskOperationStatus.COMPLETED }, }); - const dequeueSecondPromise = requestSender.dequeueTaskV1({ - pathParams: { stageType: 'SOME_TEST_TYPE_PREVENT_MULTIPLE_DEQUEUE' }, + const secondUpdatePromise = requestSender.updateTaskStatusV1({ + pathParams: { taskId }, + requestBody: { status: TaskOperationStatus.COMPLETED }, }); - const firstResponse = await dequeueFirstPromise; + + // Wait for first update to complete + const firstResponse = await firstUpdatePromise; + + // Allow second update to proceed // @ts-expect-error not recognized initialization - continueUpdateSecondTask(); //release to update second call - const secondResponse = await dequeueSecondPromise; - // first call will success and pull task + continueSecondUpdate(); + const secondResponse = await secondUpdatePromise; + + // First update should succeed - task transitioned from IN_PROGRESS to COMPLETED expect(firstResponse).toSatisfyApiSpec(); expect(firstResponse).toMatchObject({ status: StatusCodes.OK, body: { id: taskId, - status: TaskOperationStatus.IN_PROGRESS, - stageId: stageId, + status: TaskOperationStatus.COMPLETED, }, }); - //second call will fail with 500 status code due to race condition protection + + // Second update should fail because task is no longer IN_PROGRESS + // The optimistic locking prevents duplicate completion expect(secondResponse).toSatisfyApiSpec(); expect(secondResponse).toMatchObject({ - status: StatusCodes.INTERNAL_SERVER_ERROR, + status: StatusCodes.CONFLICT, body: { message: tasksErrorMessages.taskStatusUpdateFailed, code: 'TASK_STATUS_UPDATE_FAILED', diff --git a/tests/unit/generator.ts b/tests/unit/generator.ts index cefc9b73..aa481111 100644 --- a/tests/unit/generator.ts +++ b/tests/unit/generator.ts @@ -1,6 +1,7 @@ import { faker } from '@faker-js/faker'; import { createActor } from 'xstate'; import { JobOperationStatus, Priority, Prisma, Stage, StageOperationStatus, Task, TaskOperationStatus } from '@prismaClient'; +import type { findAndLockTask } from '@src/db/prisma/generated/client/sql'; import { jobStateMachine } from '@src/jobs/models/jobStateMachine'; import { JobCreateModel } from '@src/jobs/models/models'; import { stageStateMachine } from '@src/stages/models/stageStateMachine'; @@ -83,3 +84,28 @@ export const createTaskEntity = (override: Partial): TaskPrism } satisfies TaskPrismaObject; return { ...taskEntity, ...override }; }; + +/** + * Creates raw task entity with snake_case field names for database layer testing + */ +/* eslint-disable @typescript-eslint/naming-convention */ +export const createRawTaskEntity = (override?: Partial): findAndLockTask.Result => { + const rawTaskEntity: findAndLockTask.Result = { + id: faker.string.uuid(), + stage_id: faker.string.uuid(), + status: 'Created', + attempts: 0, + max_attempts: 3, + data: {}, + user_metadata: {}, + xstate: taskInitializedPersistedSnapshot as Prisma.JsonValue, + creation_time: new Date(), + update_time: new Date(), + start_time: null, + end_time: null, + traceparent: DEFAULT_TRACEPARENT, + tracestate: null, + }; + return { ...rawTaskEntity, ...override }; +}; +/* eslint-enable @typescript-eslint/naming-convention */ diff --git a/tests/unit/jobs/jobs.spec.ts b/tests/unit/jobs/jobs.spec.ts index f7f609d1..95fff7fd 100644 --- a/tests/unit/jobs/jobs.spec.ts +++ b/tests/unit/jobs/jobs.spec.ts @@ -1,5 +1,5 @@ -import { describe, beforeEach, afterEach, it, expect, vi } from 'vitest'; -import { jsLogger } from '@map-colonies/js-logger'; +import { describe, beforeEach, afterEach, it, expect, vi, beforeAll } from 'vitest'; +import { jsLogger, Logger } from '@map-colonies/js-logger'; import { trace } from '@opentelemetry/api'; import { mockDeep, type DeepMockProxy } from 'vitest-mock-extended'; import type { PrismaClient } from '@prismaClient'; @@ -18,9 +18,15 @@ const tracer = trace.getTracer(SERVICE_NAME); const jobNotFoundError = new Prisma.PrismaClientKnownRequestError('RECORD_NOT_FOUND', { code: prismaKnownErrors.recordNotFound, clientVersion: '1' }); describe('JobManager', () => { + let logger: Logger; + + beforeAll(function () { + logger = jsLogger({ enabled: false }); + }); + beforeEach(function () { prisma = mockDeep(); - jobManager = new JobManager(jsLogger({ enabled: false }), prisma, tracer); + jobManager = new JobManager(logger, prisma, tracer); }); afterEach(function () { diff --git a/tests/unit/stages/stages.spec.ts b/tests/unit/stages/stages.spec.ts index 051d3f08..ac10923d 100644 --- a/tests/unit/stages/stages.spec.ts +++ b/tests/unit/stages/stages.spec.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import { describe, beforeEach, afterEach, it, expect, vi } from 'vitest'; -import { jsLogger } from '@map-colonies/js-logger'; +import { describe, beforeEach, afterEach, it, expect, vi, beforeAll } from 'vitest'; +import { jsLogger, Logger } from '@map-colonies/js-logger'; import { faker } from '@faker-js/faker'; import { trace } from '@opentelemetry/api'; import { mockDeep, type DeepMockProxy } from 'vitest-mock-extended'; @@ -38,11 +38,17 @@ type StageAggregateResult = Prisma.GetStageAggregateType { + let logger: Logger; + + beforeAll(function () { + logger = jsLogger({ enabled: false }); + }); + beforeEach(function () { prisma = mockDeep(); - jobManager = new JobManager(jsLogger({ enabled: false }), prisma, tracer); - stageRepository = new StageRepository(jsLogger({ enabled: false }), prisma); - stageManager = new StageManager(jsLogger({ enabled: false }), prisma, tracer, stageRepository, jobManager); + jobManager = new JobManager(logger, prisma, tracer); + stageRepository = new StageRepository(logger, prisma); + stageManager = new StageManager(logger, prisma, tracer, stageRepository, jobManager); }); afterEach(function () { diff --git a/tests/unit/tasks/taskRepository.spec.ts b/tests/unit/tasks/taskRepository.spec.ts new file mode 100644 index 00000000..07425405 --- /dev/null +++ b/tests/unit/tasks/taskRepository.spec.ts @@ -0,0 +1,110 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { describe, beforeEach, it, expect, vi } from 'vitest'; +import { jsLogger } from '@map-colonies/js-logger'; +import { faker } from '@faker-js/faker'; +import { mockDeep, type DeepMockProxy } from 'vitest-mock-extended'; +import type { PrismaClient } from '@prismaClient'; +import { TaskOperationStatus } from '@prismaClient'; +import { TaskRepository } from '@src/tasks/DAL/taskRepository'; +import { createRawTaskEntity } from '../generator'; + +let taskRepository: TaskRepository; +let prisma: DeepMockProxy; + +describe('TaskRepository', () => { + beforeEach(function () { + prisma = mockDeep(); + taskRepository = new TaskRepository(jsLogger({ enabled: false }), prisma); + }); + + describe('#findAndLockTaskForDequeue', () => { + describe('#HappyPath', () => { + it('should find and lock a task for dequeue', async function () { + const stageType = 'SOME_STAGE_TYPE'; + const taskId = faker.string.uuid(); + const stageId = faker.string.uuid(); + + const rawTaskEntity = createRawTaskEntity({ + id: taskId, + stage_id: stageId, + status: 'Pending', + }); + + const mockTx = { + $queryRawTyped: vi.fn().mockResolvedValue([rawTaskEntity]), + } as unknown as Parameters[1]; + + const result = await taskRepository.findAndLockTaskForDequeue(stageType, mockTx); + expect(result).toMatchObject({ stageId: stageId, status: TaskOperationStatus.PENDING, id: taskId }); + expect(mockTx.$queryRawTyped).toHaveBeenCalledOnce(); + }); + + it('should handle null data and userMetadata fields', async function () { + const stageType = 'SOME_STAGE_TYPE'; + const taskId = faker.string.uuid(); + const stageId = faker.string.uuid(); + + const rawTaskEntity = createRawTaskEntity({ + id: taskId, + stage_id: stageId, + status: 'Pending', + data: null, + user_metadata: null, + }); + + const mockTx = { + $queryRawTyped: vi.fn().mockResolvedValue([rawTaskEntity]), + } as unknown as Parameters[1]; + + const result = await taskRepository.findAndLockTaskForDequeue(stageType, mockTx); + + expect(result).toMatchObject({ + stageId: stageId, + status: TaskOperationStatus.PENDING, + id: taskId, + data: {}, + userMetadata: {}, + }); + expect(mockTx.$queryRawTyped).toHaveBeenCalledOnce(); + }); + + it('should return null when no tasks are available', async function () { + const stageType = 'SOME_STAGE_TYPE'; + + const mockTx = { + $queryRawTyped: vi.fn().mockResolvedValue([]), + } as unknown as Parameters[1]; + + const result = await taskRepository.findAndLockTaskForDequeue(stageType, mockTx); + + expect(result).toBeNull(); + expect(mockTx.$queryRawTyped).toHaveBeenCalledOnce(); + }); + }); + + describe('#SadPath', () => { + it('should throw error when database query fails', async function () { + const stageType = 'SOME_STAGE_TYPE'; + const error = new Error('Database connection error'); + + const mockTx = { + $queryRawTyped: vi.fn().mockRejectedValue(error), + } as unknown as Parameters[1]; + + await expect(taskRepository.findAndLockTaskForDequeue(stageType, mockTx)).rejects.toThrow('Database connection error'); + }); + + it('should throw error when findUnique fails', async function () { + const stageType = 'SOME_STAGE_TYPE'; + + const error = new Error('Database connection error'); + + const mockTx = { + $queryRawTyped: vi.fn().mockRejectedValue(error), + } as unknown as Parameters[1]; + + await expect(taskRepository.findAndLockTaskForDequeue(stageType, mockTx)).rejects.toThrow('Database connection error'); + }); + }); + }); +}); diff --git a/tests/unit/tasks/tasks.spec.ts b/tests/unit/tasks/tasks.spec.ts index 5fb73552..7bcb7274 100644 --- a/tests/unit/tasks/tasks.spec.ts +++ b/tests/unit/tasks/tasks.spec.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/naming-convention */ import { describe, beforeEach, afterEach, it, expect, beforeAll, vi } from 'vitest'; -import { jsLogger } from '@map-colonies/js-logger'; +import { jsLogger, type Logger } from '@map-colonies/js-logger'; import { faker } from '@faker-js/faker'; import { trace } from '@opentelemetry/api'; import { subHours, subMinutes } from 'date-fns'; @@ -15,6 +15,7 @@ import { TaskManager } from '@src/tasks/models/manager'; import { prismaKnownErrors } from '@src/common/errors'; import { TaskCreateModel } from '@src/tasks/models/models'; import { StageRepository } from '@src/stages/DAL/stageRepository'; +import { TaskRepository } from '@src/tasks/DAL/taskRepository'; import { SERVICE_NAME } from '@src/common/constants'; import { IllegalTaskStatusTransitionError, NotAllowedToAddTasksToInProgressStageError, StageInFiniteStateError } from '@src/common/generated/errors'; import { getConfig, initConfig } from '@src/common/config'; @@ -41,6 +42,7 @@ let jobManager: JobManager; let stageManager: StageManager; let taskManager: TaskManager; let stageRepository: StageRepository; +let taskRepository: TaskRepository; let prisma: DeepMockProxy; const tracer = trace.getTracer(SERVICE_NAME); @@ -50,17 +52,21 @@ let config: ReturnType; const notFoundError = new Prisma.PrismaClientKnownRequestError('RECORD_NOT_FOUND', { code: prismaKnownErrors.recordNotFound, clientVersion: '1' }); describe('JobManager', () => { + let logger: Logger; + beforeAll(async function () { + logger = jsLogger({ enabled: false }); await initConfig(true); }); beforeEach(function () { config = getConfig(); prisma = mockDeep(); - jobManager = new JobManager(jsLogger({ enabled: false }), prisma, tracer); - stageRepository = new StageRepository(jsLogger({ enabled: false }), prisma); - stageManager = new StageManager(jsLogger({ enabled: false }), prisma, tracer, stageRepository, jobManager); - taskManager = new TaskManager(jsLogger({ enabled: false }), prisma, tracer, stageManager, config); + jobManager = new JobManager(logger, prisma, tracer); + stageRepository = new StageRepository(logger, prisma); + taskRepository = new TaskRepository(logger, prisma); + stageManager = new StageManager(logger, prisma, tracer, stageRepository, jobManager); + taskManager = new TaskManager(logger, prisma, tracer, stageManager, config, taskRepository); }); afterEach(function () { @@ -586,10 +592,12 @@ describe('JobManager', () => { xstate: pendingStageXstatePersistentSnapshot, }); + vi.spyOn(taskRepository, 'findAndLockTaskForDequeue').mockResolvedValue(taskEntity); + prisma.$transaction.mockImplementationOnce(async (callback) => { const mockTx = { task: { - findFirst: vi.fn().mockResolvedValue(taskEntity), + findUnique: vi.fn().mockResolvedValue(taskEntity), updateManyAndReturn: vi.fn().mockResolvedValue([taskEntity]), }, stage: { @@ -608,12 +616,10 @@ describe('JobManager', () => { describe('#BadPath', () => { it('should get code 404 not found for no available tasks to dequeue', async function () { + vi.spyOn(taskRepository, 'findAndLockTaskForDequeue').mockResolvedValue(null); + prisma.$transaction.mockImplementationOnce(async (callback) => { - const mockTx = { - task: { - findFirst: vi.fn().mockResolvedValue(null), - }, - } as unknown as Omit; + const mockTx = {} as unknown as Omit; return callback(mockTx); }); @@ -624,12 +630,10 @@ describe('JobManager', () => { describe('#SadPath', () => { it('should fail with a database error when adding tasks', async function () { + vi.spyOn(taskRepository, 'findAndLockTaskForDequeue').mockRejectedValue(new Error('db connection error')); + prisma.$transaction.mockImplementationOnce(async (callback) => { - const mockTx = { - task: { - findFirst: vi.fn().mockRejectedValue(new Error('db connection error')), - }, - } as unknown as Omit; + const mockTx = {} as unknown as Omit; return callback(mockTx); }); @@ -651,10 +655,12 @@ describe('JobManager', () => { xstate: pendingStageXstatePersistentSnapshot, }); + vi.spyOn(taskRepository, 'findAndLockTaskForDequeue').mockResolvedValue(taskEntity); + prisma.$transaction.mockImplementationOnce(async (callback) => { const mockTx = { task: { - findFirst: vi.fn().mockResolvedValue(taskEntity), + findUnique: vi.fn().mockResolvedValue(taskEntity), updateManyAndReturn: vi.fn().mockResolvedValue([]), }, } as unknown as Omit; diff --git a/vitest.config.mts b/vitest.config.mts index b74ddba0..64eccc5a 100644 --- a/vitest.config.mts +++ b/vitest.config.mts @@ -28,6 +28,11 @@ export default defineConfig({ setupFiles: ['./tests/configurations/initJestOpenapi.setup.ts', './tests/configurations/vite.setup.ts'], include: ['tests/unit/**/*.spec.ts'], environment: 'node', + server: { + deps: { + external: ['node-cron'], + }, + }, }, resolve: { alias: pathAlias, @@ -40,6 +45,11 @@ export default defineConfig({ setupFiles: ['./tests/configurations/initJestOpenapi.setup.ts', './tests/configurations/vite.setup.ts'], include: ['tests/integration/**/*.spec.ts'], environment: 'node', + server: { + deps: { + external: ['node-cron'], + }, + }, }, resolve: { alias: pathAlias,