From 39eccdf70a2298cd6275a6420d9ef6bfd18457b8 Mon Sep 17 00:00:00 2001 From: sudeep Date: Fri, 20 Feb 2026 18:01:02 +0545 Subject: [PATCH 1/2] refactor: update queue terminologies and flattern classes --- packages/worker/src/cron/index.ts | 2 +- packages/worker/src/cron/scheduler.ts | 23 ++++ packages/worker/src/cron/setup.ts | 15 --- packages/worker/src/index.ts | 2 +- packages/worker/src/lib/index.ts | 1 - packages/worker/src/lib/initialize.ts | 17 --- packages/worker/src/plugin.ts | 17 ++- packages/worker/src/queue/adapterRegistry.ts | 35 ++++++ packages/worker/src/queue/adapters/base.ts | 17 +++ packages/worker/src/queue/adapters/bullmq.ts | 83 +++++++++++++ packages/worker/src/queue/adapters/index.ts | 6 + packages/worker/src/queue/adapters/sqs.ts | 124 +++++++++++++++++++ packages/worker/src/queue/clients/base.ts | 21 ---- packages/worker/src/queue/clients/bull.ts | 89 ------------- packages/worker/src/queue/clients/index.ts | 3 - packages/worker/src/queue/clients/sqs.ts | 118 ------------------ packages/worker/src/queue/factory.ts | 33 +++++ packages/worker/src/queue/index.ts | 7 +- packages/worker/src/queue/processor.ts | 56 --------- packages/worker/src/queue/registry.ts | 30 ----- packages/worker/src/queue/setup.ts | 17 --- packages/worker/src/types/queue.ts | 8 +- packages/worker/src/worker.ts | 39 ++++++ 23 files changed, 380 insertions(+), 383 deletions(-) create mode 100644 packages/worker/src/cron/scheduler.ts delete mode 100644 packages/worker/src/cron/setup.ts delete mode 100644 packages/worker/src/lib/index.ts delete mode 100644 packages/worker/src/lib/initialize.ts create mode 100644 packages/worker/src/queue/adapterRegistry.ts create mode 100644 packages/worker/src/queue/adapters/base.ts create mode 100644 packages/worker/src/queue/adapters/bullmq.ts create mode 100644 packages/worker/src/queue/adapters/index.ts create mode 100644 packages/worker/src/queue/adapters/sqs.ts delete mode 100644 packages/worker/src/queue/clients/base.ts delete mode 100644 packages/worker/src/queue/clients/bull.ts delete mode 100644 packages/worker/src/queue/clients/index.ts delete mode 100644 packages/worker/src/queue/clients/sqs.ts create mode 100644 packages/worker/src/queue/factory.ts delete mode 100644 packages/worker/src/queue/processor.ts delete mode 100644 packages/worker/src/queue/registry.ts delete mode 100644 packages/worker/src/queue/setup.ts create mode 100644 packages/worker/src/worker.ts diff --git a/packages/worker/src/cron/index.ts b/packages/worker/src/cron/index.ts index 503f327dc..d11e20550 100644 --- a/packages/worker/src/cron/index.ts +++ b/packages/worker/src/cron/index.ts @@ -1 +1 @@ -export { default as setupCronJobs } from "./setup"; +export { default as CronScheduler } from "./scheduler"; diff --git a/packages/worker/src/cron/scheduler.ts b/packages/worker/src/cron/scheduler.ts new file mode 100644 index 000000000..9fd3d13ec --- /dev/null +++ b/packages/worker/src/cron/scheduler.ts @@ -0,0 +1,23 @@ +import cron, { ScheduledTask } from "node-cron"; + +import { CronJob } from "../types"; + +class CronScheduler { + private tasks: ScheduledTask[] = []; + + schedule(job: CronJob): void { + const task = cron.schedule(job.expression, job.task, job.options); + + this.tasks.push(task); + } + + stopAll(): void { + for (const task of this.tasks) { + task.stop(); + } + + this.tasks = []; + } +} + +export default CronScheduler; diff --git a/packages/worker/src/cron/setup.ts b/packages/worker/src/cron/setup.ts deleted file mode 100644 index e1199497f..000000000 --- a/packages/worker/src/cron/setup.ts +++ /dev/null @@ -1,15 +0,0 @@ -import cron from "node-cron"; - -import { CronJob } from "src/types"; - -const setupCronJobs = (cronJobs: CronJob[]) => { - if (cronJobs.length === 0) { - return; - } - - for (const job of cronJobs) { - cron.schedule(job.expression, job.task, job.options); - } -}; - -export default setupCronJobs; diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index d287ce467..eda4a09f4 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -12,8 +12,8 @@ export { SQSClient } from "@aws-sdk/client-sqs"; export { Job, Queue } from "bullmq"; export { default } from "./plugin"; +export { default as Worker } from "./worker"; export * from "./enum"; -export * from "./lib"; export * from "./queue"; export * from "./types"; diff --git a/packages/worker/src/lib/index.ts b/packages/worker/src/lib/index.ts deleted file mode 100644 index 048cfe5bb..000000000 --- a/packages/worker/src/lib/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "./initialize"; diff --git a/packages/worker/src/lib/initialize.ts b/packages/worker/src/lib/initialize.ts deleted file mode 100644 index 36ade4bd8..000000000 --- a/packages/worker/src/lib/initialize.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { setupCronJobs } from "../cron"; -import { setupQueueProcessors } from "../queue"; -import { CronJob, QueueConfig } from "../types"; - -const initializeCronJobs = async (cronConfigs?: CronJob[]) => { - if (!cronConfigs) return; - - setupCronJobs(cronConfigs); -}; - -const initializeQueueProcessors = async (queueConfigs?: QueueConfig[]) => { - if (!queueConfigs) return; - - setupQueueProcessors(queueConfigs); -}; - -export { initializeCronJobs, initializeQueueProcessors }; diff --git a/packages/worker/src/plugin.ts b/packages/worker/src/plugin.ts index 8e20d7333..b6cd2b059 100644 --- a/packages/worker/src/plugin.ts +++ b/packages/worker/src/plugin.ts @@ -1,10 +1,7 @@ import { FastifyInstance } from "fastify"; import FastifyPlugin from "fastify-plugin"; -import { - initializeCronJobs, - initializeQueueProcessors, -} from "./lib/initialize"; +import Worker from "./worker"; const plugin = async (fastify: FastifyInstance) => { const { config, log } = fastify; @@ -17,8 +14,16 @@ const plugin = async (fastify: FastifyInstance) => { log.info("Registering worker plugin"); - initializeCronJobs(config.worker.cronJobs); - initializeQueueProcessors(config.worker.queues); + const worker = new Worker(config.worker); + + await worker.start(); + + fastify.decorate("worker", worker); + + fastify.addHook("onClose", async () => { + log.info("Shutting down worker"); + await worker.shutdown(); + }); }; export default FastifyPlugin(plugin); diff --git a/packages/worker/src/queue/adapterRegistry.ts b/packages/worker/src/queue/adapterRegistry.ts new file mode 100644 index 000000000..5ab83412e --- /dev/null +++ b/packages/worker/src/queue/adapterRegistry.ts @@ -0,0 +1,35 @@ +import QueueAdapter from "./adapters/base"; + +class AdapterRegistry { + private adapters = new Map(); + + add(adapter: QueueAdapter): void { + this.adapters.set(adapter.queueName, adapter); + } + + get(name: string): QueueAdapter | undefined { + return this.adapters.get(name); + } + + getAll(): QueueAdapter[] { + return [...this.adapters.values()]; + } + + has(name: string): boolean { + return this.adapters.has(name); + } + + remove(name: string): void { + this.adapters.delete(name); + } + + async shutdownAll(): Promise { + for (const adapter of this.adapters.values()) { + await adapter.shutdown(); + } + + this.adapters.clear(); + } +} + +export default AdapterRegistry; diff --git a/packages/worker/src/queue/adapters/base.ts b/packages/worker/src/queue/adapters/base.ts new file mode 100644 index 000000000..845e7d9cd --- /dev/null +++ b/packages/worker/src/queue/adapters/base.ts @@ -0,0 +1,17 @@ +abstract class QueueAdapter { + public queueName: string; + + constructor(name: string) { + this.queueName = name; + } + + abstract start(): Promise; + abstract shutdown(): Promise; + abstract getClient(): unknown; + abstract push( + data: Payload, + options?: Record, + ): Promise; +} + +export default QueueAdapter; diff --git a/packages/worker/src/queue/adapters/bullmq.ts b/packages/worker/src/queue/adapters/bullmq.ts new file mode 100644 index 000000000..6dd3a3192 --- /dev/null +++ b/packages/worker/src/queue/adapters/bullmq.ts @@ -0,0 +1,83 @@ +import { + Queue as BullQueue, + Worker, + Job, + QueueOptions, + WorkerOptions, + JobsOptions, +} from "bullmq"; + +import QueueAdapter from "./base"; + +export interface BullMQAdapterConfig { + queueOptions: QueueOptions; + workerOptions?: WorkerOptions; + handler: (job: Job) => Promise; + onError?: (error: Error) => void; + onFailed?: (job: Job, error: Error) => void; +} + +class BullMQAdapter extends QueueAdapter { + public queue?: BullQueue; + public worker?: Worker; + private config: BullMQAdapterConfig; + private queueOptions: QueueOptions; + private workerOptions: WorkerOptions; + + constructor(name: string, config: BullMQAdapterConfig) { + super(name); + + this.config = config; + this.queueOptions = config.queueOptions; + this.workerOptions = { + connection: config.queueOptions.connection, + ...config.workerOptions, + }; + } + + async start(): Promise { + this.queue = new BullQueue(this.queueName, this.queueOptions); + this.worker = new Worker( + this.queueName, + async (job: Job) => { + await this.config.handler(job); + }, + this.workerOptions, + ); + + this.worker.on("error", (error) => { + if (this.config.onError) { + this.config.onError(error); + } + }); + + this.worker.on("failed", (job, error) => { + if (this.config.onFailed) { + this.config.onFailed(job as Job, error); + } + }); + } + + async shutdown(): Promise { + await this.worker?.close(); + await this.queue?.close(); + } + + getClient(): BullQueue { + return this.queue!; + } + + async push(data: Payload, options?: JobsOptions): Promise { + try { + const job = await this.queue!.add(this.queueName, data, options); + + return job.id!; + } catch (error) { + throw new Error( + `Failed to push job to BullMQ queue: ${this.queueName}. Error: ${(error as Error).message}`, + ); + } + } +} + +export default BullMQAdapter; diff --git a/packages/worker/src/queue/adapters/index.ts b/packages/worker/src/queue/adapters/index.ts new file mode 100644 index 000000000..266871156 --- /dev/null +++ b/packages/worker/src/queue/adapters/index.ts @@ -0,0 +1,6 @@ +export { default as QueueAdapter } from "./base"; +export { default as BullMQAdapter } from "./bullmq"; +export { default as SQSAdapter } from "./sqs"; + +export type { BullMQAdapterConfig } from "./bullmq"; +export type { SQSAdapterConfig } from "./sqs"; diff --git a/packages/worker/src/queue/adapters/sqs.ts b/packages/worker/src/queue/adapters/sqs.ts new file mode 100644 index 000000000..4a58a5e06 --- /dev/null +++ b/packages/worker/src/queue/adapters/sqs.ts @@ -0,0 +1,124 @@ +import { + DeleteMessageCommand, + Message, + ReceiveMessageCommand, + ReceiveMessageCommandInput, + SendMessageCommand, + SQSClient, + SQSClientConfig, +} from "@aws-sdk/client-sqs"; + +import QueueAdapter from "./base"; + +export interface SQSAdapterConfig { + clientConfig: SQSClientConfig; + handler: (data: unknown) => Promise; + onError?: (error: Error, message?: Message) => void; + queueUrl: string; + receiveMessageOptions?: ReceiveMessageCommandInput; +} + +class SQSAdapter extends QueueAdapter { + private config: SQSAdapterConfig; + public client?: SQSClient; + private queueUrl: string; + private isPolling: boolean = false; + + constructor(name: string, config: SQSAdapterConfig) { + super(name); + + this.config = config; + this.queueUrl = config.queueUrl; + } + + async start(): Promise { + this.client = new SQSClient(this.config.clientConfig); + this.startPolling(); + } + + async shutdown(): Promise { + this.isPolling = false; + this.client?.destroy(); + } + + getClient(): SQSClient { + return this.client!; + } + + private startPolling(): void { + if (this.isPolling) { + return; + } + + this.isPolling = true; + this.poll(); + } + + private async poll(): Promise { + while (this.isPolling) { + try { + const command = new ReceiveMessageCommand({ + QueueUrl: this.queueUrl, + ...this.config.receiveMessageOptions, + }); + + const response = await this.client!.send(command); + + if (response.Messages && response.Messages.length > 0) { + await Promise.all( + response.Messages.map(async (message: Message) => { + try { + const data = JSON.parse(message.Body ?? "{}") as Payload; + + await this.config.handler(data); + + await this.client!.send( + new DeleteMessageCommand({ + QueueUrl: this.queueUrl, + ReceiptHandle: message.ReceiptHandle, + }), + ); + } catch (error) { + if (this.config.onError) { + this.config.onError( + error instanceof Error ? error : new Error(String(error)), + message, + ); + } + } + }), + ); + } + } catch (error) { + if (this.config.onError) { + this.config.onError( + error instanceof Error ? error : new Error(String(error)), + ); + } + } + } + } + + async push( + data: Payload, + options?: Record, + ): Promise { + try { + const command = new SendMessageCommand({ + QueueUrl: this.queueUrl, + MessageBody: JSON.stringify(data), + ...options, + }); + + const response = await this.client!.send(command); + + return response.MessageId!; + } catch (error) { + throw new Error( + `Failed to push job to SQS queue: ${this.queueName}. Error: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } +} + +export default SQSAdapter; diff --git a/packages/worker/src/queue/clients/base.ts b/packages/worker/src/queue/clients/base.ts deleted file mode 100644 index aa725d966..000000000 --- a/packages/worker/src/queue/clients/base.ts +++ /dev/null @@ -1,21 +0,0 @@ -abstract class BaseQueueClient { - public queueName: string; - - constructor(name: string) { - this.queueName = name; - } - - abstract getClient(): Payload; - - abstract process( - handler: (data: Payload) => Promise, - concurrency?: number, - ): void; - - abstract push( - data: Payload, - options?: Record, - ): Promise; -} - -export default BaseQueueClient; diff --git a/packages/worker/src/queue/clients/bull.ts b/packages/worker/src/queue/clients/bull.ts deleted file mode 100644 index a62c4290d..000000000 --- a/packages/worker/src/queue/clients/bull.ts +++ /dev/null @@ -1,89 +0,0 @@ -import { - Queue as BullQueue, - Worker, - Job, - QueueOptions, - WorkerOptions, - JobsOptions, -} from "bullmq"; - -import BaseQueueClient from "./base"; - -export interface BullMQClientConfig { - queueOptions: QueueOptions; - workerOptions?: WorkerOptions; - handler: (job: Job) => Promise; - onError?: (error: Error) => void; - onFailed?: (job: Job, error: Error) => void; -} - -class BullMqClient extends BaseQueueClient { - public queue: BullQueue; - public worker?: Worker; - private handler: (job: Job) => Promise; - private onError?: (error: Error) => void; - private onFailed?: (job: Job, error: Error) => void; - private queueOptions: QueueOptions; - private workerOptions?: WorkerOptions; - - constructor(name: string, config: BullMQClientConfig) { - super(name); - - this.queueOptions = config.queueOptions; - this.workerOptions = { - connection: config.queueOptions.connection, - ...config.workerOptions, - }; - this.handler = config.handler; - this.onError = config.onError; - this.onFailed = config.onFailed; - this.queue = new BullQueue(this.queueName, this.queueOptions); - this.process(); - } - - getClient(): BullQueue { - return this.queue; - } - - async push(data: Payload, options?: JobsOptions): Promise { - try { - const job = await this.queue.add(this.queueName, data, options); - - return job.id!; - } catch (error) { - throw new Error( - `Failed to push job to BullMQ queue: ${this.queueName}. Error: ${(error as Error).message}`, - ); - } - } - - process(): void { - try { - this.worker = new Worker( - this.queueName, - async (job: Job) => { - await this.handler(job); - }, - this.workerOptions, - ); - - this.worker.on("error", (error) => { - if (this.onError) { - this.onError(error); - } - }); - - this.worker.on("failed", (job, error) => { - if (this.onFailed) { - this.onFailed(job as Job, error); - } - }); - } catch (error) { - throw new Error( - `Failed to process jobs from BullMQ queue: ${this.queueName}. Error: ${(error as Error).message}`, - ); - } - } -} - -export default BullMqClient; diff --git a/packages/worker/src/queue/clients/index.ts b/packages/worker/src/queue/clients/index.ts deleted file mode 100644 index 7ccf0cd53..000000000 --- a/packages/worker/src/queue/clients/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export { default as BaseQueueClient } from "./base"; -export { default as BullMQQueueClient } from "./bull"; -export { default as SQSQueueClient } from "./sqs"; diff --git a/packages/worker/src/queue/clients/sqs.ts b/packages/worker/src/queue/clients/sqs.ts deleted file mode 100644 index 6268f1cf3..000000000 --- a/packages/worker/src/queue/clients/sqs.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { - DeleteMessageCommand, - Message, - ReceiveMessageCommand, - ReceiveMessageCommandInput, - SendMessageCommand, - SQSClient, - SQSClientConfig, -} from "@aws-sdk/client-sqs"; - -import BaseQueueClient from "./base"; - -export interface SQSQueueClientConfig { - clientConfig: SQSClientConfig; - handler: (data: unknown) => Promise; - onError?: (error: Error, message?: Message) => void; - queueUrl: string; - receiveMessageOptions?: ReceiveMessageCommandInput; -} - -class SQSQueueClient extends BaseQueueClient { - private config: SQSQueueClientConfig; - public client: SQSClient; - private queueUrl: string; - private isPooling: boolean = false; - - constructor(name: string, config: SQSQueueClientConfig) { - super(name); - - this.config = config; - this.client = new SQSClient(config.clientConfig); - this.queueUrl = config.queueUrl; - - this.process(config.handler); - } - - getClient(): SQSClient { - return this.client; - } - - async process(handler: (data: Payload) => Promise): Promise { - if (this.isPooling) { - return; - } - - this.isPooling = true; - - const pool = async () => { - while (this.isPooling) { - try { - const command = new ReceiveMessageCommand({ - QueueUrl: this.queueUrl, - ...this.config.receiveMessageOptions, - }); - - const response = await this.client.send(command); - - if (response.Messages && response.Messages.length > 0) { - await Promise.all( - response.Messages.map(async (message: Message) => { - try { - const data = JSON.parse(message.Body ?? "{}") as Payload; - - await handler(data); - - await this.client.send( - new DeleteMessageCommand({ - QueueUrl: this.queueUrl, - ReceiptHandle: message.ReceiptHandle, - }), - ); - } catch (error) { - if (this.config.onError) { - this.config.onError( - error instanceof Error ? error : new Error(String(error)), - message, - ); - } - } - }), - ); - } - } catch (error) { - if (this.config.onError) { - this.config.onError( - error instanceof Error ? error : new Error(String(error)), - ); - } - } - } - }; - - pool(); - } - - async push( - data: Payload, - options?: Record, - ): Promise { - try { - const command = new SendMessageCommand({ - QueueUrl: this.queueUrl, - MessageBody: JSON.stringify(data), - ...options, - }); - - const response = await this.client.send(command); - - return response.MessageId!; - } catch (error) { - throw new Error( - `Failed to push job to SQS queue: ${this.queueName}. Error: ${error instanceof Error ? error.message : String(error)}`, - ); - } - } -} - -export default SQSQueueClient; diff --git a/packages/worker/src/queue/factory.ts b/packages/worker/src/queue/factory.ts new file mode 100644 index 000000000..517d017c0 --- /dev/null +++ b/packages/worker/src/queue/factory.ts @@ -0,0 +1,33 @@ +import { QueueProvider } from "../enum"; +import { QueueConfig } from "../types"; +import { QueueAdapter, BullMQAdapter, SQSAdapter } from "./adapters"; + +const createQueueAdapter = (config: QueueConfig): QueueAdapter => { + switch (config.provider) { + case QueueProvider.BULLMQ: { + if (!config.bullmqConfig) { + throw new Error( + `BullMQ configuration is required for queue: ${config.name}`, + ); + } + + return new BullMQAdapter(config.name, config.bullmqConfig); + } + + case QueueProvider.SQS: { + if (!config.sqsConfig) { + throw new Error( + `SQS configuration is required for queue: ${config.name}`, + ); + } + + return new SQSAdapter(config.name, config.sqsConfig); + } + + default: { + throw new Error(`Unsupported queue provider: ${config.provider}`); + } + } +}; + +export default createQueueAdapter; diff --git a/packages/worker/src/queue/index.ts b/packages/worker/src/queue/index.ts index 4b887b5e2..241050086 100644 --- a/packages/worker/src/queue/index.ts +++ b/packages/worker/src/queue/index.ts @@ -1,5 +1,4 @@ -export * from "./clients"; +export * from "./adapters"; -export { default as setupQueueProcessors } from "./setup"; -export { default as QueueProcessor } from "./processor"; -export { default as QueueProcessorRegistry } from "./registry"; +export { default as AdapterRegistry } from "./adapterRegistry"; +export { default as createQueueAdapter } from "./factory"; diff --git a/packages/worker/src/queue/processor.ts b/packages/worker/src/queue/processor.ts deleted file mode 100644 index 0da9efb16..000000000 --- a/packages/worker/src/queue/processor.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { QueueProvider } from "../enum"; -import { BaseQueueClient, BullMQQueueClient, SQSQueueClient } from "./clients"; -import { QueueConfig } from "../types/queue"; - -class QueueProcessor { - private queueClient: BaseQueueClient; - - constructor(config: QueueConfig) { - this.queueClient = this.initializeQueueClient(config); - } - - protected initializeQueueClient(config: QueueConfig) { - let queueClient: BaseQueueClient; - - switch (config.provider) { - case QueueProvider.BULLMQ: { - if (!config.bullmqConfig) { - throw new Error( - `BullMQ configuration is required for queue: ${config.name}`, - ); - } - - queueClient = new BullMQQueueClient(config.name, config.bullmqConfig); - - break; - } - - case QueueProvider.SQS: { - if (!config.sqsConfig) { - throw new Error( - `SQS configuration is required for queue: ${config.name}`, - ); - } - - queueClient = new SQSQueueClient(config.name, config.sqsConfig); - - break; - } - default: { - throw new Error(`Unsupported queue provider: ${config.provider}`); - } - } - - return queueClient; - } - - public getQueueClient() { - return this.queueClient; - } - - public getName() { - return this.queueClient.queueName; - } -} - -export default QueueProcessor; diff --git a/packages/worker/src/queue/registry.ts b/packages/worker/src/queue/registry.ts deleted file mode 100644 index acc150bf2..000000000 --- a/packages/worker/src/queue/registry.ts +++ /dev/null @@ -1,30 +0,0 @@ -import QueueProcessor from "./processor"; - -class QueueProcessorRegistry { - public static queueProcessors: Map = new Map(); - - public static add(queueProcessor: QueueProcessor) { - QueueProcessorRegistry.queueProcessors.set( - queueProcessor.getName(), - queueProcessor, - ); - } - - public static get(queueName: string): QueueProcessor | undefined { - return QueueProcessorRegistry.queueProcessors.get(queueName); - } - - public static getAll(): QueueProcessor[] { - return [...QueueProcessorRegistry.queueProcessors.values()]; - } - - public static has(queueName: string): boolean { - return QueueProcessorRegistry.queueProcessors.has(queueName); - } - - public static remove(queueName: string): void { - QueueProcessorRegistry.queueProcessors.delete(queueName); - } -} - -export default QueueProcessorRegistry; diff --git a/packages/worker/src/queue/setup.ts b/packages/worker/src/queue/setup.ts deleted file mode 100644 index ae7cdabb0..000000000 --- a/packages/worker/src/queue/setup.ts +++ /dev/null @@ -1,17 +0,0 @@ -import QueueProcessor from "./processor"; -import QueueProcessorRegistry from "./registry"; -import { QueueConfig } from "../types"; - -const setupQueueProcessors = (queueConfigs: QueueConfig[]) => { - if (queueConfigs.length === 0) { - return; - } - - for (const queueConfig of queueConfigs) { - const queueProcessor = new QueueProcessor(queueConfig); - - QueueProcessorRegistry.add(queueProcessor); - } -}; - -export default setupQueueProcessors; diff --git a/packages/worker/src/types/queue.ts b/packages/worker/src/types/queue.ts index 623bf27e8..fc3e1e0dd 100644 --- a/packages/worker/src/types/queue.ts +++ b/packages/worker/src/types/queue.ts @@ -1,10 +1,10 @@ import { QueueProvider } from "../enum"; -import { BullMQClientConfig } from "../queue/clients/bull"; -import { SQSQueueClientConfig } from "../queue/clients/sqs"; +import { BullMQAdapterConfig } from "../queue/adapters/bullmq"; +import { SQSAdapterConfig } from "../queue/adapters/sqs"; export interface QueueConfig { - bullmqConfig?: BullMQClientConfig; + bullmqConfig?: BullMQAdapterConfig; name: string; provider: QueueProvider; - sqsConfig?: SQSQueueClientConfig; + sqsConfig?: SQSAdapterConfig; } diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts new file mode 100644 index 000000000..31e40d87d --- /dev/null +++ b/packages/worker/src/worker.ts @@ -0,0 +1,39 @@ +import { CronScheduler } from "./cron"; +import { AdapterRegistry, createQueueAdapter } from "./queue"; +import { WorkerConfig } from "./types"; + +class Worker { + public readonly cron: CronScheduler; + public readonly adapters: AdapterRegistry; + private config: WorkerConfig; + + constructor(config: WorkerConfig) { + this.config = config; + this.cron = new CronScheduler(); + this.adapters = new AdapterRegistry(); + } + + async start(): Promise { + if (this.config.cronJobs) { + for (const job of this.config.cronJobs) { + this.cron.schedule(job); + } + } + + if (this.config.queues) { + for (const queueConfig of this.config.queues) { + const adapter = createQueueAdapter(queueConfig); + + await adapter.start(); + this.adapters.add(adapter); + } + } + } + + async shutdown(): Promise { + this.cron.stopAll(); + await this.adapters.shutdownAll(); + } +} + +export default Worker; From 7e99a5d88033e0ed7ab7f13a36989e4a8a8564fa Mon Sep 17 00:00:00 2001 From: sudeep Date: Mon, 23 Feb 2026 14:26:40 +0545 Subject: [PATCH 2/2] feat: make adapter registery static in worker class --- packages/worker/README.md | 77 +++++++++++++++++++---------------- packages/worker/src/worker.ts | 7 ++-- 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/packages/worker/README.md b/packages/worker/README.md index 70c19e983..618384998 100644 --- a/packages/worker/README.md +++ b/packages/worker/README.md @@ -5,37 +5,30 @@ A [Fastify](https://github.com/fastify/fastify) plugin for managing queue proces ## Features - **Cron Jobs**: Schedule recurring tasks using standard cron expressions -- **Queue System**: Basic queue management with support for BullMQ and AWS SQS +- **Queue System**: Queue management with support for BullMQ and AWS SQS - **BullMQ Integration**: Redis-based message queues for high-performance background processing - **AWS SQS Integration**: Support for Amazon Simple Queue Service ## Requirements - - [@prefabs.tech/fastify-config](https://www.npmjs.com/package/@prefabs.tech/fastify-config) ## Usage -### Register Plugin +### Fastify Plugin Register the worker plugin with your Fastify instance: ```typescript import workerPlugin from "@prefabs.tech/fastify-worker"; -import configPlugin from "@prefabs.tech/fastify-config"; import Fastify from "fastify"; import config from "./config"; const start = async () => { - // Create fastify instance const fastify = Fastify({ logger: config.logger, }); - // Register fastify-config plugin - await fastify.register(configPlugin, { config }); - - // Register worker plugin await fastify.register(workerPlugin); await fastify.listen({ @@ -47,6 +40,44 @@ const start = async () => { start(); ``` +### Pushing to the queue + +The `AdapterRegistry` is a singleton. Once the plugin initializes the worker, any service can access the same registry directly — no instance passing required: + +```typescript +await fastify.register(workerPlugin); +``` + +```typescript +import { Worker } from "@prefabs.tech/fastify-worker"; + +const queue = Worker.adapters.get("queue-name") + +if (queue) { + queue.push({ message: 'Hello world!' }) +} +``` + +The plugin creates the `Worker` instance, which populates `Worker.adapters` on `start()`. Services import `Worker` and access the static registry directly. On fastify close, `worker.shutdown()` drains all adapters. + +### Standalone + +Use the `Worker` class directly without Fastify: + +```typescript +import { Worker } from "@prefabs.tech/fastify-worker"; + +const worker = new Worker({ + cronJobs: [...], + queues: [...], +}); + +await worker.start(); + +// later... +await worker.shutdown(); +``` + ## Configuration Add worker configuration to your config: @@ -60,7 +91,7 @@ const config: ApiConfig = { worker: { cronJobs: [ { - expression: "0 0 * * *", // Run daily at midnight + expression: "0 0 * * *", task: async () => { console.log("Running daily cleanup..."); }, @@ -98,7 +129,7 @@ const config: ApiConfig = { endpoint: "", region: "", }, - handler: async (message: any) => { + handler: async (message) => { // }, queueUrl: "", @@ -108,27 +139,3 @@ const config: ApiConfig = { }, }; ``` - -## Adding Jobs to Queues - -To add jobs to a registered queue, use the `QueueProcessorRegistry` to access the queue client: - -```typescript -import { QueueProcessorRegistry } from "@prefabs.tech/fastify-worker"; - -// Get the processor for a specific queue -const processor = QueueProcessorRegistry.get("email-queue"); - -if (processor) { - // Add a job to the queue - await processor.getQueueClient().push({ - to: "user@example.com", - subject: "Welcome!", - body: "Hello from Fastify Worker", - }); - - console.log("Job added to email-queue"); -} else { - console.error("Queue not found"); -} -``` diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 31e40d87d..207a57edf 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -3,14 +3,13 @@ import { AdapterRegistry, createQueueAdapter } from "./queue"; import { WorkerConfig } from "./types"; class Worker { + public static readonly adapters = new AdapterRegistry(); public readonly cron: CronScheduler; - public readonly adapters: AdapterRegistry; private config: WorkerConfig; constructor(config: WorkerConfig) { this.config = config; this.cron = new CronScheduler(); - this.adapters = new AdapterRegistry(); } async start(): Promise { @@ -25,14 +24,14 @@ class Worker { const adapter = createQueueAdapter(queueConfig); await adapter.start(); - this.adapters.add(adapter); + Worker.adapters.add(adapter); } } } async shutdown(): Promise { this.cron.stopAll(); - await this.adapters.shutdownAll(); + await Worker.adapters.shutdownAll(); } }