Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 42 additions & 35 deletions packages/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,30 @@ A [Fastify](https://github.com/fastify/fastify) plugin for managing queue proces
## Features

- **Cron Jobs**: Schedule recurring tasks using standard cron expressions
- **Queue System**: Basic queue management with support for BullMQ and AWS SQS
- **Queue System**: Queue management with support for BullMQ and AWS SQS
- **BullMQ Integration**: Redis-based message queues for high-performance background processing
- **AWS SQS Integration**: Support for Amazon Simple Queue Service

## Requirements

- [@prefabs.tech/fastify-config](https://www.npmjs.com/package/@prefabs.tech/fastify-config)

## Usage

### Register Plugin
### Fastify Plugin

Register the worker plugin with your Fastify instance:

```typescript
import workerPlugin from "@prefabs.tech/fastify-worker";
import configPlugin from "@prefabs.tech/fastify-config";
import Fastify from "fastify";

import config from "./config";

const start = async () => {
// Create fastify instance
const fastify = Fastify({
logger: config.logger,
});

// Register fastify-config plugin
await fastify.register(configPlugin, { config });

// Register worker plugin
await fastify.register(workerPlugin);

await fastify.listen({
Expand All @@ -47,6 +40,44 @@ const start = async () => {
start();
```

### Pushing to the queue

The `AdapterRegistry` is a singleton. Once the plugin initializes the worker, any service can access the same registry directly — no instance passing required:

```typescript
await fastify.register(workerPlugin);
```

```typescript
import { Worker } from "@prefabs.tech/fastify-worker";

const queue = Worker.adapters.get("queue-name")

if (queue) {
queue.push({ message: 'Hello world!' })
}
```

The plugin creates the `Worker` instance, which populates `Worker.adapters` on `start()`. Services import `Worker` and access the static registry directly. On fastify close, `worker.shutdown()` drains all adapters.

### Standalone

Use the `Worker` class directly without Fastify:

```typescript
import { Worker } from "@prefabs.tech/fastify-worker";

const worker = new Worker({
cronJobs: [...],
queues: [...],
});

await worker.start();

// later...
await worker.shutdown();
```

## Configuration

Add worker configuration to your config:
Expand All @@ -60,7 +91,7 @@ const config: ApiConfig = {
worker: {
cronJobs: [
{
expression: "0 0 * * *", // Run daily at midnight
expression: "0 0 * * *",
task: async () => {
console.log("Running daily cleanup...");
},
Expand Down Expand Up @@ -98,7 +129,7 @@ const config: ApiConfig = {
endpoint: "",
region: "",
},
handler: async (message: any) => {
handler: async (message) => {
//
},
queueUrl: "",
Expand All @@ -108,27 +139,3 @@ const config: ApiConfig = {
},
};
```

## Adding Jobs to Queues

To add jobs to a registered queue, use the `QueueProcessorRegistry` to access the queue client:

```typescript
import { QueueProcessorRegistry } from "@prefabs.tech/fastify-worker";

// Get the processor for a specific queue
const processor = QueueProcessorRegistry.get("email-queue");

if (processor) {
// Add a job to the queue
await processor.getQueueClient().push({
to: "user@example.com",
subject: "Welcome!",
body: "Hello from Fastify Worker",
});

console.log("Job added to email-queue");
} else {
console.error("Queue not found");
}
```
2 changes: 1 addition & 1 deletion packages/worker/src/cron/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { default as setupCronJobs } from "./setup";
export { default as CronScheduler } from "./scheduler";
23 changes: 23 additions & 0 deletions packages/worker/src/cron/scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import cron, { ScheduledTask } from "node-cron";

import { CronJob } from "../types";

class CronScheduler {
private tasks: ScheduledTask[] = [];

schedule(job: CronJob): void {
const task = cron.schedule(job.expression, job.task, job.options);

this.tasks.push(task);
}

stopAll(): void {
for (const task of this.tasks) {
task.stop();
}

this.tasks = [];
}
}

export default CronScheduler;
15 changes: 0 additions & 15 deletions packages/worker/src/cron/setup.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ export { SQSClient } from "@aws-sdk/client-sqs";
export { Job, Queue } from "bullmq";

export { default } from "./plugin";
export { default as Worker } from "./worker";

export * from "./enum";
export * from "./lib";
export * from "./queue";
export * from "./types";
1 change: 0 additions & 1 deletion packages/worker/src/lib/index.ts

This file was deleted.

17 changes: 0 additions & 17 deletions packages/worker/src/lib/initialize.ts

This file was deleted.

17 changes: 11 additions & 6 deletions packages/worker/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { FastifyInstance } from "fastify";
import FastifyPlugin from "fastify-plugin";

import {
initializeCronJobs,
initializeQueueProcessors,
} from "./lib/initialize";
import Worker from "./worker";

const plugin = async (fastify: FastifyInstance) => {
const { config, log } = fastify;
Expand All @@ -17,8 +14,16 @@ const plugin = async (fastify: FastifyInstance) => {

log.info("Registering worker plugin");

initializeCronJobs(config.worker.cronJobs);
initializeQueueProcessors(config.worker.queues);
const worker = new Worker(config.worker);

await worker.start();

fastify.decorate("worker", worker);

fastify.addHook("onClose", async () => {
log.info("Shutting down worker");
await worker.shutdown();
});
};

export default FastifyPlugin(plugin);
35 changes: 35 additions & 0 deletions packages/worker/src/queue/adapterRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import QueueAdapter from "./adapters/base";

class AdapterRegistry {
private adapters = new Map<string, QueueAdapter>();

add(adapter: QueueAdapter): void {
this.adapters.set(adapter.queueName, adapter);
}

get(name: string): QueueAdapter | undefined {
return this.adapters.get(name);
}

getAll(): QueueAdapter[] {
return [...this.adapters.values()];
}

has(name: string): boolean {
return this.adapters.has(name);
}

remove(name: string): void {
this.adapters.delete(name);
}

async shutdownAll(): Promise<void> {
for (const adapter of this.adapters.values()) {
await adapter.shutdown();
}

this.adapters.clear();
}
}

export default AdapterRegistry;
17 changes: 17 additions & 0 deletions packages/worker/src/queue/adapters/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
abstract class QueueAdapter<Payload = unknown> {
public queueName: string;

constructor(name: string) {
this.queueName = name;
}

abstract start(): Promise<void>;
abstract shutdown(): Promise<void>;
abstract getClient(): unknown;
abstract push(
data: Payload,
options?: Record<string, unknown>,
): Promise<string>;
}

export default QueueAdapter;
83 changes: 83 additions & 0 deletions packages/worker/src/queue/adapters/bullmq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import {
Queue as BullQueue,
Worker,
Job,
QueueOptions,
WorkerOptions,
JobsOptions,
} from "bullmq";

import QueueAdapter from "./base";

export interface BullMQAdapterConfig {
queueOptions: QueueOptions;
workerOptions?: WorkerOptions;
handler: (job: Job) => Promise<void>;
onError?: (error: Error) => void;
onFailed?: (job: Job, error: Error) => void;
}

class BullMQAdapter<Payload> extends QueueAdapter {
public queue?: BullQueue;
public worker?: Worker;
private config: BullMQAdapterConfig;
private queueOptions: QueueOptions;
private workerOptions: WorkerOptions;

constructor(name: string, config: BullMQAdapterConfig) {
super(name);

this.config = config;
this.queueOptions = config.queueOptions;
this.workerOptions = {
connection: config.queueOptions.connection,
...config.workerOptions,
};
}

async start(): Promise<void> {
this.queue = new BullQueue(this.queueName, this.queueOptions);
this.worker = new Worker(
this.queueName,
async (job: Job<Payload>) => {
await this.config.handler(job);
},
this.workerOptions,
);

this.worker.on("error", (error) => {
if (this.config.onError) {
this.config.onError(error);
}
});

this.worker.on("failed", (job, error) => {
if (this.config.onFailed) {
this.config.onFailed(job as Job<Payload>, error);
}
});
}

async shutdown(): Promise<void> {
await this.worker?.close();
await this.queue?.close();
}

getClient(): BullQueue {
return this.queue!;
}

async push(data: Payload, options?: JobsOptions): Promise<string> {
try {
const job = await this.queue!.add(this.queueName, data, options);

return job.id!;
} catch (error) {
throw new Error(
`Failed to push job to BullMQ queue: ${this.queueName}. Error: ${(error as Error).message}`,
);
}
}
}

export default BullMQAdapter;
6 changes: 6 additions & 0 deletions packages/worker/src/queue/adapters/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export { default as QueueAdapter } from "./base";
export { default as BullMQAdapter } from "./bullmq";
export { default as SQSAdapter } from "./sqs";

export type { BullMQAdapterConfig } from "./bullmq";
export type { SQSAdapterConfig } from "./sqs";
Loading