Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 104 additions & 33 deletions src/modules/job/job.controller.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,135 @@
import type { Request, Response } from "express"
import { jobService } from "./job.service"
import type { CreateJobInput } from "./job.types"
import { JobType } from "../../generated/prisma/enums"
import { logger } from "../../utils/logger"
import type { Request, Response } from "express";
import { jobService } from "./job.service";
import type { CreateJobInput } from "./job.types";
import { JobType } from "../../generated/prisma/enums";
import { logger } from "../../utils/logger";
import type { JobStatus } from "../../generated/prisma/enums";

function isJobType(value: string): value is CreateJobInput["type"] {
return Object.values(JobType).includes(value as CreateJobInput["type"])
return Object.values(JobType).includes(value as CreateJobInput["type"]);
}

function parseCreateBody(body: unknown): CreateJobInput | { error: string } {
if (typeof body !== "object" || body === null) {
return { error: "Body must be an object" }
return { error: "Body must be an object" };
}
const o = body as Record<string, unknown>
const type = o.type
const payload = o.payload
const o = body as Record<string, unknown>;
const type = o.type;
const payload = o.payload;
if (typeof type !== "string" || !isJobType(type)) {
return { error: `type must be one of: ${Object.values(JobType).join(", ")}` }
return {
error: `type must be one of: ${Object.values(JobType).join(", ")}`,
};
}
if (typeof payload !== "object" || payload === null) {
return { error: "payload must be an object" }
return { error: "payload must be an object" };
}
const workflowId = typeof o.workflowId === "string" ? o.workflowId : undefined
const workflowId =
typeof o.workflowId === "string" ? o.workflowId : undefined;
const maxRetries =
typeof o.maxRetries === "number" && Number.isInteger(o.maxRetries) && o.maxRetries >= 0
typeof o.maxRetries === "number" &&
Number.isInteger(o.maxRetries) &&
o.maxRetries >= 0
? o.maxRetries
: undefined
return { type: type as CreateJobInput["type"], payload: payload as CreateJobInput["payload"], workflowId, maxRetries }
: undefined;
return {
type: type as CreateJobInput["type"],
payload: payload as CreateJobInput["payload"],
workflowId,
maxRetries,
};
}

const jobController = {
async createJob(req: Request, res: Response): Promise<void> {
const parsed = parseCreateBody(req.body)
const parsed = parseCreateBody(req.body);
if ("error" in parsed) {
res.status(400).json({ message: parsed.error })
return
res.status(400).json({ message: parsed.error });
return;
}
try {
const job = await jobService.createJob(parsed)
res.status(201).json(job)
const job = await jobService.createJob(parsed);
res.status(201).json(job);
} catch (err) {
logger.error(err, "createJob failed")
res.status(500).json({ message: "Failed to create job" })
logger.error(err, "createJob failed");
res.status(500).json({ message: "Failed to create job" });
}
},

async getJob(req: Request, res: Response): Promise<void> {
const rawId = req.params.id
const id = typeof rawId === "string" ? rawId : Array.isArray(rawId) ? rawId[0] : ""
const rawId = req.params.id;
const id =
typeof rawId === "string" ? rawId : Array.isArray(rawId) ? rawId[0] : "";
if (!id) {
res.status(400).json({ message: "Missing job id" })
return
res.status(400).json({ message: "Missing job id" });
return;
}
const job = await jobService.getJobById(id)
const job = await jobService.getJobById(id);
if (!job) {
res.status(404).json({ message: "Job not found" })
return
res.status(404).json({ message: "Job not found" });
return;
}
res.json(job)
res.json(job);
},
}
async getAllJobs(req: Request, res: Response): Promise<void> {
const {
status,
type,
workflowId,
queueJobId,
startedAt,
completedAt,
limit,
offset,
} = req.query;

const filters: {
status?: JobStatus;
type?: JobType;
workflowId?: string;
queueJobId?: string;
startedAt?: Date;
completedAt?: Date;
limit?: number;
offset?: number;
} = (() => {
const limitValue = Array.isArray(limit) ? limit[0] : limit;
const offsetValue = Array.isArray(offset) ? offset[0] : offset;
const limitParsed =
typeof limitValue === "string" ? Number.parseInt(limitValue, 10) : NaN;
const offsetParsed =
typeof offsetValue === "string"
? Number.parseInt(offsetValue, 10)
: NaN;

const safeLimit =
Number.isFinite(limitParsed) && limitParsed >= 0 ? limitParsed : 10;
const safeOffset =
Number.isFinite(offsetParsed) && offsetParsed >= 0 ? offsetParsed : 0;

return { limit: safeLimit, offset: safeOffset };
})();
if (status) {
filters.status = status as JobStatus;
}
if (type) {
filters.type = type as JobType;
}
if (workflowId) {
filters.workflowId = workflowId as string;
}
if (queueJobId) {
filters.queueJobId = queueJobId as string;
}
if (startedAt) {
filters.startedAt = new Date(startedAt as string);
}
if (completedAt) {
filters.completedAt = new Date(completedAt as string);
}
const { jobs, total } = await jobService.getJobs(filters);
res.json({ jobs, total });
},
};

export { jobController }
export { jobController };
122 changes: 83 additions & 39 deletions src/modules/job/job.repository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { JobStatus, JobType } from "../../generated/prisma/enums"
import type { JobRecord, JobResponse } from "./job.types"
import { prisma } from "../../infra/db/prisma"
import type { JobStatus, JobType } from "../../generated/prisma/enums";
import type { JobRecord, JobResponse } from "./job.types";
import { prisma } from "../../infra/db/prisma";

function toResponse(job: JobRecord): JobResponse {
return {
Expand All @@ -18,15 +18,58 @@ function toResponse(job: JobRecord): JobResponse {
completedAt: job.completedAt ?? null,
createdAt: job.createdAt,
updatedAt: job.updatedAt,
}
};
}

export const jobRepository = {
async findAll(filters: {
status?: JobStatus;
type?: JobType;
workflowId?: string;
queueJobId?: string;
startedAt?: Date;
completedAt?: Date;
limit?: number;
offset?: number;
}): Promise<JobRecord[]> {
const { limit, offset, ...whereFilters } = filters;
const take =
limit === undefined ? undefined : Math.max(0, Math.min(limit, 100));
const skip = offset === undefined ? undefined : Math.max(0, offset);

const jobs = await prisma.job.findMany({
where: whereFilters,
take,
skip,
orderBy: {
createdAt: "desc",
},
});
return jobs as JobRecord[];
},
async count(filters: {
status?: JobStatus;
type?: JobType;
workflowId?: string;
queueJobId?: string;
startedAt?: Date;
completedAt?: Date;
}): Promise<number> {
const {
limit: _limit,
offset: _offset,
...whereFilters
} = filters as typeof filters & {
limit?: number;
offset?: number;
};
return await prisma.job.count({ where: whereFilters });
},
async create(params: {
type: JobType
payload: import("../../generated/prisma/client").Prisma.InputJsonValue
workflowId?: string
maxRetries?: number
type: JobType;
payload: import("../../generated/prisma/client").Prisma.InputJsonValue;
workflowId?: string;
maxRetries?: number;
}): Promise<JobRecord> {
const job = await prisma.job.create({
data: {
Expand All @@ -35,101 +78,102 @@ export const jobRepository = {
workflowId: params.workflowId ?? undefined,
maxRetries: params.maxRetries ?? 3,
},
})
return job as JobRecord
});
return job as JobRecord;
},

async findById(id: string): Promise<JobRecord | undefined> {
const job = await prisma.job.findUnique({
where: { id },
})
return job ? (job as JobRecord) : undefined
});
return job ? (job as JobRecord) : undefined;
},

async getById(id: string): Promise<JobResponse | undefined> {
const job = await this.findById(id)
return job ? toResponse(job) : undefined
const job = await this.findById(id);
return job ? toResponse(job) : undefined;
},

async setQueued(id: string, queueJobId: string): Promise<JobRecord> {
const job = await prisma.job.update({
where: { id },
data: { status: "QUEUED", queueJobId },
})
return job as JobRecord
});
return job as JobRecord;
},

async setRunning(id: string): Promise<JobRecord> {
const job = await prisma.job.update({
where: { id },
data: { status: "RUNNING", startedAt: new Date() },
})
return job as JobRecord
});
return job as JobRecord;
},


async setRunningIfQueued(id: string): Promise<boolean> {
const result = await prisma.job.updateMany({
where: { id, status: "QUEUED" },
data: { status: "RUNNING", startedAt: new Date() },
})
return result.count === 1
});
return result.count === 1;
},

async setSuccess(id: string, result: JobRecord["result"]): Promise<JobRecord> {
async setSuccess(
id: string,
result: JobRecord["result"],
): Promise<JobRecord> {
const job = await prisma.job.update({
where: { id },
data: {
status: "SUCCESS",
result: result as import("../../generated/prisma/client").Prisma.InputJsonValue,
result:
result as import("../../generated/prisma/client").Prisma.InputJsonValue,
completedAt: new Date(),
},
})
return job as JobRecord
});
return job as JobRecord;
},

async setFailed(
id: string,
errorMessage: string,
result?: JobRecord["result"],
retries?: number
retries?: number,
): Promise<JobRecord> {
const data: Parameters<typeof prisma.job.update>[0]["data"] = {
status: "FAILED",
errorMessage,
result: result ?? undefined,
completedAt: new Date(),
}
if (retries !== undefined) data.retries = retries
};
if (retries !== undefined) data.retries = retries;
const job = await prisma.job.update({
where: { id },
data,
})
return job as JobRecord
});
return job as JobRecord;
},

async incrementRetries(id: string): Promise<JobRecord> {
const job = await prisma.job.update({
where: { id },
data: { retries: { increment: 1 } },
})
return job as JobRecord
});
return job as JobRecord;
},


async findStuckRunning(olderThan: Date): Promise<JobRecord[]> {
const jobs = await prisma.job.findMany({
where: { status: "RUNNING", startedAt: { lt: olderThan } },
})
return jobs as JobRecord[]
});
return jobs as JobRecord[];
},


async requeueStuck(id: string): Promise<JobRecord> {
const job = await prisma.job.update({
where: { id },
data: { status: "QUEUED", startedAt: null },
})
return job as JobRecord
});
return job as JobRecord;
},
}
};
1 change: 1 addition & 0 deletions src/modules/job/job.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ const router = Router()

router.post("/", (req, res) => void jobController.createJob(req, res))
router.get("/:id", (req, res) => void jobController.getJob(req, res))
router.get("/", (req, res) => void jobController.getAllJobs(req, res))

export default router
Loading
Loading