From 318c361ec0988ae2c81497bcbfcb6302df81c4c3 Mon Sep 17 00:00:00 2001 From: 0xprathamesh Date: Thu, 19 Mar 2026 20:38:28 +0530 Subject: [PATCH] feat:listjobs & queries --- src/modules/job/job.controller.ts | 137 +++++++++++++++++++++++------- src/modules/job/job.repository.ts | 122 +++++++++++++++++--------- src/modules/job/job.routes.ts | 1 + src/modules/job/job.service.ts | 19 +++++ 4 files changed, 207 insertions(+), 72 deletions(-) diff --git a/src/modules/job/job.controller.ts b/src/modules/job/job.controller.ts index 03186b9..c413c6a 100644 --- a/src/modules/job/job.controller.ts +++ b/src/modules/job/job.controller.ts @@ -1,64 +1,135 @@ -import type { Request, Response } from "express" -import { jobService } from "./job.service" -import type { CreateJobInput } from "./job.types" -import { JobType } from "../../generated/prisma/enums" -import { logger } from "../../utils/logger" +import type { Request, Response } from "express"; +import { jobService } from "./job.service"; +import type { CreateJobInput } from "./job.types"; +import { JobType } from "../../generated/prisma/enums"; +import { logger } from "../../utils/logger"; +import type { JobStatus } from "../../generated/prisma/enums"; function isJobType(value: string): value is CreateJobInput["type"] { - return Object.values(JobType).includes(value as CreateJobInput["type"]) + return Object.values(JobType).includes(value as CreateJobInput["type"]); } function parseCreateBody(body: unknown): CreateJobInput | { error: string } { if (typeof body !== "object" || body === null) { - return { error: "Body must be an object" } + return { error: "Body must be an object" }; } - const o = body as Record - const type = o.type - const payload = o.payload + const o = body as Record; + const type = o.type; + const payload = o.payload; if (typeof type !== "string" || !isJobType(type)) { - return { error: `type must be one of: ${Object.values(JobType).join(", ")}` } + return { + error: `type must be one of: ${Object.values(JobType).join(", ")}`, + }; } if (typeof payload !== "object" || payload === null) { - return { error: "payload must be an object" } + return { error: "payload must be an object" }; } - const workflowId = typeof o.workflowId === "string" ? o.workflowId : undefined + const workflowId = + typeof o.workflowId === "string" ? o.workflowId : undefined; const maxRetries = - typeof o.maxRetries === "number" && Number.isInteger(o.maxRetries) && o.maxRetries >= 0 + typeof o.maxRetries === "number" && + Number.isInteger(o.maxRetries) && + o.maxRetries >= 0 ? o.maxRetries - : undefined - return { type: type as CreateJobInput["type"], payload: payload as CreateJobInput["payload"], workflowId, maxRetries } + : undefined; + return { + type: type as CreateJobInput["type"], + payload: payload as CreateJobInput["payload"], + workflowId, + maxRetries, + }; } const jobController = { async createJob(req: Request, res: Response): Promise { - const parsed = parseCreateBody(req.body) + const parsed = parseCreateBody(req.body); if ("error" in parsed) { - res.status(400).json({ message: parsed.error }) - return + res.status(400).json({ message: parsed.error }); + return; } try { - const job = await jobService.createJob(parsed) - res.status(201).json(job) + const job = await jobService.createJob(parsed); + res.status(201).json(job); } catch (err) { - logger.error(err, "createJob failed") - res.status(500).json({ message: "Failed to create job" }) + logger.error(err, "createJob failed"); + res.status(500).json({ message: "Failed to create job" }); } }, async getJob(req: Request, res: Response): Promise { - const rawId = req.params.id - const id = typeof rawId === "string" ? rawId : Array.isArray(rawId) ? rawId[0] : "" + const rawId = req.params.id; + const id = + typeof rawId === "string" ? rawId : Array.isArray(rawId) ? rawId[0] : ""; if (!id) { - res.status(400).json({ message: "Missing job id" }) - return + res.status(400).json({ message: "Missing job id" }); + return; } - const job = await jobService.getJobById(id) + const job = await jobService.getJobById(id); if (!job) { - res.status(404).json({ message: "Job not found" }) - return + res.status(404).json({ message: "Job not found" }); + return; } - res.json(job) + res.json(job); }, -} + async getAllJobs(req: Request, res: Response): Promise { + const { + status, + type, + workflowId, + queueJobId, + startedAt, + completedAt, + limit, + offset, + } = req.query; + + const filters: { + status?: JobStatus; + type?: JobType; + workflowId?: string; + queueJobId?: string; + startedAt?: Date; + completedAt?: Date; + limit?: number; + offset?: number; + } = (() => { + const limitValue = Array.isArray(limit) ? limit[0] : limit; + const offsetValue = Array.isArray(offset) ? offset[0] : offset; + const limitParsed = + typeof limitValue === "string" ? Number.parseInt(limitValue, 10) : NaN; + const offsetParsed = + typeof offsetValue === "string" + ? Number.parseInt(offsetValue, 10) + : NaN; + + const safeLimit = + Number.isFinite(limitParsed) && limitParsed >= 0 ? limitParsed : 10; + const safeOffset = + Number.isFinite(offsetParsed) && offsetParsed >= 0 ? offsetParsed : 0; + + return { limit: safeLimit, offset: safeOffset }; + })(); + if (status) { + filters.status = status as JobStatus; + } + if (type) { + filters.type = type as JobType; + } + if (workflowId) { + filters.workflowId = workflowId as string; + } + if (queueJobId) { + filters.queueJobId = queueJobId as string; + } + if (startedAt) { + filters.startedAt = new Date(startedAt as string); + } + if (completedAt) { + filters.completedAt = new Date(completedAt as string); + } + const { jobs, total } = await jobService.getJobs(filters); + res.json({ jobs, total }); + }, +}; -export { jobController } +export { jobController }; diff --git a/src/modules/job/job.repository.ts b/src/modules/job/job.repository.ts index f21ca18..05adbe6 100644 --- a/src/modules/job/job.repository.ts +++ b/src/modules/job/job.repository.ts @@ -1,6 +1,6 @@ -import type { JobStatus, JobType } from "../../generated/prisma/enums" -import type { JobRecord, JobResponse } from "./job.types" -import { prisma } from "../../infra/db/prisma" +import type { JobStatus, JobType } from "../../generated/prisma/enums"; +import type { JobRecord, JobResponse } from "./job.types"; +import { prisma } from "../../infra/db/prisma"; function toResponse(job: JobRecord): JobResponse { return { @@ -18,15 +18,58 @@ function toResponse(job: JobRecord): JobResponse { completedAt: job.completedAt ?? null, createdAt: job.createdAt, updatedAt: job.updatedAt, - } + }; } export const jobRepository = { + async findAll(filters: { + status?: JobStatus; + type?: JobType; + workflowId?: string; + queueJobId?: string; + startedAt?: Date; + completedAt?: Date; + limit?: number; + offset?: number; + }): Promise { + const { limit, offset, ...whereFilters } = filters; + const take = + limit === undefined ? undefined : Math.max(0, Math.min(limit, 100)); + const skip = offset === undefined ? undefined : Math.max(0, offset); + + const jobs = await prisma.job.findMany({ + where: whereFilters, + take, + skip, + orderBy: { + createdAt: "desc", + }, + }); + return jobs as JobRecord[]; + }, + async count(filters: { + status?: JobStatus; + type?: JobType; + workflowId?: string; + queueJobId?: string; + startedAt?: Date; + completedAt?: Date; + }): Promise { + const { + limit: _limit, + offset: _offset, + ...whereFilters + } = filters as typeof filters & { + limit?: number; + offset?: number; + }; + return await prisma.job.count({ where: whereFilters }); + }, async create(params: { - type: JobType - payload: import("../../generated/prisma/client").Prisma.InputJsonValue - workflowId?: string - maxRetries?: number + type: JobType; + payload: import("../../generated/prisma/client").Prisma.InputJsonValue; + workflowId?: string; + maxRetries?: number; }): Promise { const job = await prisma.job.create({ data: { @@ -35,101 +78,102 @@ export const jobRepository = { workflowId: params.workflowId ?? undefined, maxRetries: params.maxRetries ?? 3, }, - }) - return job as JobRecord + }); + return job as JobRecord; }, async findById(id: string): Promise { const job = await prisma.job.findUnique({ where: { id }, - }) - return job ? (job as JobRecord) : undefined + }); + return job ? (job as JobRecord) : undefined; }, async getById(id: string): Promise { - const job = await this.findById(id) - return job ? toResponse(job) : undefined + const job = await this.findById(id); + return job ? toResponse(job) : undefined; }, async setQueued(id: string, queueJobId: string): Promise { const job = await prisma.job.update({ where: { id }, data: { status: "QUEUED", queueJobId }, - }) - return job as JobRecord + }); + return job as JobRecord; }, async setRunning(id: string): Promise { const job = await prisma.job.update({ where: { id }, data: { status: "RUNNING", startedAt: new Date() }, - }) - return job as JobRecord + }); + return job as JobRecord; }, - async setRunningIfQueued(id: string): Promise { const result = await prisma.job.updateMany({ where: { id, status: "QUEUED" }, data: { status: "RUNNING", startedAt: new Date() }, - }) - return result.count === 1 + }); + return result.count === 1; }, - async setSuccess(id: string, result: JobRecord["result"]): Promise { + async setSuccess( + id: string, + result: JobRecord["result"], + ): Promise { const job = await prisma.job.update({ where: { id }, data: { status: "SUCCESS", - result: result as import("../../generated/prisma/client").Prisma.InputJsonValue, + result: + result as import("../../generated/prisma/client").Prisma.InputJsonValue, completedAt: new Date(), }, - }) - return job as JobRecord + }); + return job as JobRecord; }, async setFailed( id: string, errorMessage: string, result?: JobRecord["result"], - retries?: number + retries?: number, ): Promise { const data: Parameters[0]["data"] = { status: "FAILED", errorMessage, result: result ?? undefined, completedAt: new Date(), - } - if (retries !== undefined) data.retries = retries + }; + if (retries !== undefined) data.retries = retries; const job = await prisma.job.update({ where: { id }, data, - }) - return job as JobRecord + }); + return job as JobRecord; }, async incrementRetries(id: string): Promise { const job = await prisma.job.update({ where: { id }, data: { retries: { increment: 1 } }, - }) - return job as JobRecord + }); + return job as JobRecord; }, - async findStuckRunning(olderThan: Date): Promise { const jobs = await prisma.job.findMany({ where: { status: "RUNNING", startedAt: { lt: olderThan } }, - }) - return jobs as JobRecord[] + }); + return jobs as JobRecord[]; }, - async requeueStuck(id: string): Promise { const job = await prisma.job.update({ where: { id }, data: { status: "QUEUED", startedAt: null }, - }) - return job as JobRecord + }); + return job as JobRecord; }, -} +}; diff --git a/src/modules/job/job.routes.ts b/src/modules/job/job.routes.ts index bb8c04f..3ed104d 100644 --- a/src/modules/job/job.routes.ts +++ b/src/modules/job/job.routes.ts @@ -5,5 +5,6 @@ const router = Router() router.post("/", (req, res) => void jobController.createJob(req, res)) router.get("/:id", (req, res) => void jobController.getJob(req, res)) +router.get("/", (req, res) => void jobController.getAllJobs(req, res)) export default router diff --git a/src/modules/job/job.service.ts b/src/modules/job/job.service.ts index a9d917e..51b4108 100644 --- a/src/modules/job/job.service.ts +++ b/src/modules/job/job.service.ts @@ -3,10 +3,29 @@ import { jobRepository } from "./job.repository" import type { CreateJobInput, JobResponse, QueueJobData } from "./job.types" import type { JobType } from "../../generated/prisma/enums" import { ENV } from "../../config/env" +import type { JobStatus } from "../../generated/prisma/enums" const JOB_NAME = "process" export const jobService = { + async getJobs(filters: { + status?: JobStatus + type?: JobType + workflowId?: string + queueJobId?: string + startedAt?: Date + completedAt?: Date + limit?: number + offset?: number + }): Promise<{ jobs: JobResponse[], total: number }> { +const [jobs,total] = await Promise.all([ + jobRepository.findAll(filters), + jobRepository.count(filters), +]) + + return { jobs, total } + }, + async createJob(input: CreateJobInput): Promise { const job = await jobRepository.create({ type: input.type,