Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions BackendAcademy/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { SocialModule } from './social/social.module';
import { OnboardingModule } from './onboarding/onboarding.module';
import { LessonModule } from './lessons/lesson.module';
import { TaskModule } from './tasks/task.module';
import { JobsModule } from './jobs/jobs.module';
import { LoggingModule } from './logging/logging.module';
import { ProgressModule } from './courses/progress/progress.module';
import { AppConfigModule } from './config/config.module';
Expand Down Expand Up @@ -56,6 +57,7 @@ import { SessionsModule } from './sessions/sessions.module';
OnboardingModule,
LessonModule,
TaskModule,
JobsModule,
LoggingModule,
PathfindingModule,
MonitoringModule,
Expand Down
41 changes: 41 additions & 0 deletions BackendAcademy/src/jobs/entities/grading-job.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, UpdateDateColumn } from 'typeorm';

/**
 * Lifecycle states of a grading retry job, stored as plain strings in the
 * `status` column of `grading_jobs`.
 */
export enum GradingJobStatus {
// Waiting to be (re)tried; this is the state a freshly enqueued job starts in.
PENDING = 'PENDING',
// Currently being replayed by the retry worker.
IN_PROGRESS = 'IN_PROGRESS',
// Gave up: the attempt counter reached `maxAttempts`.
FAILED = 'FAILED',
// The replay succeeded; no further retries are scheduled.
COMPLETED = 'COMPLETED',
}

/**
 * Row of the `grading_jobs` table: one grading result that could not be saved
 * and is queued for replay, together with its retry bookkeeping.
 */
@Entity({ name: 'grading_jobs' })
export class GradingJobEntity {
// Surrogate primary key, generated as a UUID.
@PrimaryGeneratedColumn('uuid')
id: string;

// Identifier of the submission whose grading result is being retried.
// NOTE(review): stored as a bare UUID column, no relation/foreign key is declared here.
@Column('uuid')
submissionId: string;

// Opaque data handed back to `GradingResultService.saveResult` on replay.
@Column({ type: 'jsonb', nullable: true })
payload: any;

// Number of failed replay attempts so far (starts at 0).
@Column({ type: 'int', default: 0 })
attempts: number;

// Attempt budget; once `attempts` reaches this value the job is marked FAILED.
@Column({ type: 'int', default: 5 })
maxAttempts: number;

// Message of the most recent failure; null when there was none or after success.
@Column({ type: 'text', nullable: true })
lastError?: string | null;

// Earliest time the job may be retried; set to null when the job fails permanently.
@Column({ type: 'timestamptz', nullable: true })
nextRetryAt?: Date | null;

// Current lifecycle state, persisted as the enum's string value.
@Column({ type: 'varchar', default: GradingJobStatus.PENDING })
status: GradingJobStatus;

// Maintained automatically by TypeORM on insert.
@CreateDateColumn({ type: 'timestamptz' })
createdAt: Date;

// Maintained automatically by TypeORM on every save.
@UpdateDateColumn({ type: 'timestamptz' })
updatedAt: Date;
}
62 changes: 62 additions & 0 deletions BackendAcademy/src/jobs/grading-job.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { GradingJobService } from './grading-job.service';
import { GradingJobEntity, GradingJobStatus } from './entities/grading-job.entity';

/**
 * Unit test for the retry loop of GradingJobService.
 *
 * The repository and the grading-result service are replaced by hand-rolled
 * mocks. Because the service mutates and re-saves the SAME job object,
 * `mockRepo.save.mock.calls` only ever holds references to that one object;
 * every save is therefore copied into `savedSnapshots` so each assertion sees
 * the state that was actually persisted at that point in time.
 */
describe('GradingJobService retries', () => {
  let service: GradingJobService;
  let mockRepo: any;
  let mockGradingResultService: any;
  let savedSnapshots: GradingJobEntity[];

  beforeEach(() => {
    savedSnapshots = [];

    mockRepo = {
      create: jest.fn((v) => v),
      save: jest.fn(async (v: GradingJobEntity) => {
        // Shallow copy: freeze the field values as they were when save() ran.
        savedSnapshots.push({ ...v });
        return v;
      }),
      find: jest.fn(async () => []),
    };

    // First call fails with a transient error, every later call succeeds.
    let call = 0;
    mockGradingResultService = {
      saveResult: jest.fn(async () => {
        call += 1;
        if (call === 1) throw new Error('transient');
        return {};
      }),
    };

    service = new GradingJobService(mockRepo, mockGradingResultService);
  });

  it('retries a failed job and marks completed on success', async () => {
    const job: GradingJobEntity = {
      id: '1',
      submissionId: 'sub-1',
      payload: { foo: 'bar' },
      attempts: 0,
      maxAttempts: 3,
      lastError: null,
      nextRetryAt: new Date(Date.now() - 1000),
      status: GradingJobStatus.PENDING,
      createdAt: new Date(),
      updatedAt: new Date(),
    } as any;

    mockRepo.find.mockResolvedValue([job]);

    await service.processOnce();

    // First pass: claimed (IN_PROGRESS), then rescheduled (PENDING) after the failure.
    expect(mockRepo.save).toHaveBeenCalled();
    expect(savedSnapshots.map((s) => s.status)).toEqual([
      GradingJobStatus.IN_PROGRESS,
      GradingJobStatus.PENDING,
    ]);
    const savedAfterFirst = savedSnapshots[1];
    expect(savedAfterFirst.attempts).toBe(1);
    expect(savedAfterFirst.lastError).toBe('transient');
    // Backoff must push the next retry into the future.
    expect(savedAfterFirst.nextRetryAt?.getTime()).toBeGreaterThan(Date.now());

    // Simulate that find returns the job again for the second pass;
    // move nextRetryAt into the past so the retry executes immediately.
    job.nextRetryAt = new Date(Date.now() - 1000);
    mockRepo.find.mockResolvedValue([job]);
    await service.processOnce();

    // Second pass: claimed again, then COMPLETED with the error cleared.
    expect(savedSnapshots.map((s) => s.status)).toEqual([
      GradingJobStatus.IN_PROGRESS,
      GradingJobStatus.PENDING,
      GradingJobStatus.IN_PROGRESS,
      GradingJobStatus.COMPLETED,
    ]);
    const finalSaved = savedSnapshots[savedSnapshots.length - 1];
    expect(finalSaved.status).toBe(GradingJobStatus.COMPLETED);
    expect(finalSaved.lastError).toBeNull();
    expect(finalSaved.attempts).toBe(1);
    expect(mockGradingResultService.saveResult).toHaveBeenCalledTimes(2);
  });
});
95 changes: 95 additions & 0 deletions BackendAcademy/src/jobs/grading-job.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger, Inject, forwardRef } from '@nestjs/common';
import { IsNull, LessThanOrEqual, Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';
import { GradingJobEntity, GradingJobStatus } from './entities/grading-job.entity';
import { GradingResultService } from '../submissions/grading-result.service';

/**
 * DB-backed retry worker for grading results that could not be saved.
 *
 * Failed saves are stored as rows of `grading_jobs` via {@link enqueueFailedJob}.
 * A timer started in {@link onModuleInit} periodically loads the due PENDING
 * jobs and replays each one through `GradingResultService.saveResult`. A job
 * that fails again is rescheduled with exponential backoff plus jitter until
 * its attempt budget is used up, at which point it is marked FAILED.
 *
 * NOTE(review): a job is claimed with a plain `save()`, not a conditional
 * update, so two application instances polling the same table could pick up
 * the same job. Likewise a job left IN_PROGRESS by a crash is never picked up
 * again by this service. Confirm whether either case matters for the deployment.
 */
@Injectable()
export class GradingJobService implements OnModuleInit, OnModuleDestroy {
  /** Interval between two polling passes. */
  private static readonly POLL_INTERVAL_MS = 10_000;
  /** Maximum number of jobs loaded per polling pass. */
  private static readonly BATCH_SIZE = 10;
  /** Delay before the first retry; doubled after every further failure. */
  private static readonly BASE_DELAY_SECONDS = 2;
  /** Upper bound (exclusive) of the random jitter added to each delay. */
  private static readonly MAX_JITTER_MS = 1000;

  private readonly logger = new Logger(GradingJobService.name);
  private intervalHandle?: NodeJS.Timeout;
  /** True while a pass is running; keeps timer ticks from overlapping. */
  private isProcessing = false;

  constructor(
    @InjectRepository(GradingJobEntity)
    private readonly repo: Repository<GradingJobEntity>,
    @Inject(forwardRef(() => GradingResultService))
    private readonly gradingResultService: GradingResultService,
  ) {}

  /** Starts the polling loop. Calling it again while the loop runs is a no-op. */
  async onModuleInit() {
    if (this.intervalHandle) return;
    this.intervalHandle = setInterval(
      () => this.processPendingJobs().catch((err) => this.logger.error(err)),
      GradingJobService.POLL_INTERVAL_MS,
    );
  }

  /**
   * Stops the polling loop when Nest tears the module down, so the timer does
   * not keep the process (or a test run) alive after `app.close()`.
   */
  async onModuleDestroy() {
    await this.shutdown();
  }

  /**
   * Persists a new retry job that is due immediately.
   *
   * @param submissionId submission the grading result belongs to
   * @param payload data passed unchanged to `saveResult` on replay
   * @param maxAttempts number of failed attempts after which the job is given up
   * @returns the saved job entity
   */
  async enqueueFailedJob(submissionId: string, payload: any, maxAttempts = 5) {
    const job = this.repo.create({
      submissionId,
      payload,
      attempts: 0,
      maxAttempts,
      status: GradingJobStatus.PENDING,
      nextRetryAt: new Date(),
    });
    return this.repo.save(job);
  }

  /**
   * Runs a single polling pass; exposed so tests can drive the worker
   * without waiting for the timer. Resolves without doing anything if a
   * pass is already in flight.
   */
  async processOnce() {
    return this.processPendingJobs();
  }

  /** Loads the oldest due PENDING jobs and processes them one after another. */
  private async processPendingJobs(): Promise<void> {
    // A slow pass must not be overtaken by the next timer tick, otherwise
    // both passes would load and replay the same jobs.
    if (this.isProcessing) return;
    this.isProcessing = true;

    try {
      const now = new Date();
      // Filter on nextRetryAt in the query itself: if only `status` were
      // filtered, the batch could fill up with jobs still in backoff and
      // newer jobs that are already due would never be reached.
      const jobs = await this.repo.find({
        where: [
          { status: GradingJobStatus.PENDING, nextRetryAt: LessThanOrEqual(now) },
          { status: GradingJobStatus.PENDING, nextRetryAt: IsNull() },
        ],
        order: { createdAt: 'ASC' },
        take: GradingJobService.BATCH_SIZE,
      });

      for (const job of jobs) {
        // Defensive re-check in case the repository returned a job that is not due yet.
        if (job.nextRetryAt && job.nextRetryAt > now) continue;
        await this.processJob(job);
      }
    } finally {
      this.isProcessing = false;
    }
  }

  /** Replays one job and persists the outcome (COMPLETED, rescheduled, or FAILED). */
  private async processJob(job: GradingJobEntity): Promise<void> {
    // Mark in progress so later passes do not load the job while it is being replayed.
    job.status = GradingJobStatus.IN_PROGRESS;
    await this.repo.save(job);

    try {
      // Attempt to replay the grading result using the saved payload.
      await this.gradingResultService.saveResult(job.submissionId, job.payload);

      job.status = GradingJobStatus.COMPLETED;
      job.lastError = null;
      await this.repo.save(job);
      this.logger.debug(`Grading job ${job.id} completed`);
    } catch (err: unknown) {
      this.recordFailure(job, err);
      await this.repo.save(job);
    }
  }

  /** Updates the job in memory after a failed attempt: give up or schedule the next retry. */
  private recordFailure(job: GradingJobEntity, err: unknown): void {
    job.attempts = (job.attempts || 0) + 1;
    job.lastError = GradingJobService.describeError(err);

    if (job.attempts >= (job.maxAttempts ?? 5)) {
      job.status = GradingJobStatus.FAILED;
      job.nextRetryAt = null;
      this.logger.warn(`Grading job ${job.id} failed permanently: ${job.lastError}`);
      return;
    }

    job.status = GradingJobStatus.PENDING;
    // Exponential backoff (2s, 4s, 8s, ...) with up to one second of jitter.
    const delaySeconds = GradingJobService.BASE_DELAY_SECONDS * Math.pow(2, job.attempts - 1);
    const jitterMs = Math.floor(Math.random() * GradingJobService.MAX_JITTER_MS);
    job.nextRetryAt = new Date(Date.now() + delaySeconds * 1000 + jitterMs);
    this.logger.debug(`Grading job ${job.id} will retry in ${delaySeconds}s (attempt ${job.attempts})`);
  }

  /** Extracts a printable message from whatever value was thrown. */
  private static describeError(err: unknown): string {
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const message = (err as { message?: unknown }).message;
      if (message !== undefined && message !== null) return String(message);
    }
    return String(err);
  }

  /** Stops the polling loop; safe to call more than once. */
  async shutdown() {
    if (this.intervalHandle) {
      clearInterval(this.intervalHandle);
      this.intervalHandle = undefined;
    }
  }
}
12 changes: 12 additions & 0 deletions BackendAcademy/src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Module, forwardRef } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { GradingJobEntity } from './entities/grading-job.entity';
import { GradingJobService } from './grading-job.service';
import { SubmissionModule } from '../submissions/submission.module';

/**
 * Nest module for the grading-job retry worker.
 *
 * Registers the `GradingJobEntity` repository, provides `GradingJobService`,
 * and exports it so other modules can enqueue failed grading jobs.
 */
@Module({
// SubmissionModule is wrapped in forwardRef — presumably because the two modules
// import each other; confirm against SubmissionModule.
imports: [TypeOrmModule.forFeature([GradingJobEntity]), forwardRef(() => SubmissionModule)],
providers: [GradingJobService],
exports: [GradingJobService],
})
export class JobsModule {}
23 changes: 23 additions & 0 deletions Retry_Logic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Grading jobs retry system
=========================

This module provides a DB-backed retry mechanism for failed grading jobs.

How it works
- A `grading_jobs` table stores failed job payloads and metadata (`attempts`, `nextRetryAt`, etc.).
- `GradingJobService` polls the table every 10 seconds and replays due `PENDING` jobs (at most 10 per pass, oldest first) by calling `GradingResultService.saveResult(submissionId, payload)`.
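- Exponential backoff with jitter is used between attempts (2 s, 4 s, 8 s, … plus up to 1 s of random jitter). After `maxAttempts` failed attempts (default 5) the job is marked `FAILED` and is no longer retried.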

Configuration & running
- No external dependencies (Redis) required — the system uses the primary database (TypeORM).
- The module is auto-registered in `AppModule` and will start polling when the application boots.

Usage
- To enqueue a failed grading job from other code, inject `GradingJobService` and call `enqueueFailedJob(submissionId, payload, maxAttempts?)`.

Testing
- The service exposes `processOnce()` for tests to trigger a single retry pass.

Notes
- Ensure `autoLoadEntities` or explicit entity registration is enabled so TypeORM picks up `GradingJobEntity`.
- Consider adding retention/cleanup for old failed jobs and hooks for alerting on repeated failures.