
Add PostgreSQL as an alternative persistence backend#57

Open
yarlson wants to merge 5 commits into kibertoad:main from yarlson:feat/postgresql-persistence

Conversation


@yarlson yarlson commented Mar 27, 2026

Summary

fauxqs persistence is currently SQLite-only, which works well for single-instance local dev but limits use cases where shared persistent state across instances or integration with existing Postgres infrastructure is needed.

This adds PostgreSQL as an opt-in alternative backend — SQLite remains the default, no breaking changes to the env var surface or programmatic API.

Motivation

  • Enables shared state across multiple fauxqs instances (e.g. in CI or staging environments)
  • Lets teams reuse existing PostgreSQL infrastructure instead of managing SQLite files
  • Opens the door for future backends by introducing a PersistenceProvider interface

New configuration

Variable                     Default   Description
FAUXQS_PERSISTENCE_BACKEND   sqlite    sqlite or postgresql
FAUXQS_POSTGRESQL_URL        (none)    PostgreSQL connection string

Programmatic: startFauxqs({ persistenceBackend: 'postgresql', postgresqlUrl: '...' })

Test plan

  • All 797 existing tests pass (zero behavior change for SQLite path)
  • New PG integration tests use testcontainers to spin up a real PostgreSQL instance and verify SQS/SNS/S3 state survives server restart

Summary by CodeRabbit

  • New Features

    • Added PostgreSQL as an alternative persistence backend and an option to choose persistence backend at startup.
    • Programmatic server APIs for queues, topics, messages, and S3 now operate asynchronously (return promises).
  • Chores

    • Added integration tests validating PostgreSQL persistence across restarts and updated tests for async API behavior.

yarlson added 4 commits March 27, 2026 11:42
  • Move persistence.ts to persistence/sqlitePersistence.ts and extract a PersistenceProvider interface that covers SQS, SNS, and S3 persistence. Store property types now reference the interface instead of the concrete SqlitePersistence class, enabling alternative backends.

  • All persistence interface methods now return void | Promise<void> (or Buffer | Promise<Buffer> for readBody). Store methods and action handlers that call persistence are now async with await. This enables asynchronous backends (e.g. PostgreSQL) while keeping SQLite unchanged, since await on a synchronous void return is a no-op.

  • Implement PgPersistence as an alternative to SqlitePersistence, using the pg library with Pool for async connection management. Activated via the FAUXQS_PERSISTENCE_BACKEND=postgresql and FAUXQS_POSTGRESQL_URL env vars, or the programmatic API options persistenceBackend and postgresqlUrl. SQLite remains the default. Includes a createPersistence() factory function and skippable PG integration tests.

  • Replace env-var-gated test skipping with testcontainers to automatically spin up a PostgreSQL container for integration tests.
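The commit above relies on awaiting a possibly-synchronous persistence method. A minimal sketch of that shape (interface and class names here are illustrative, not fauxqs's actual types):

```typescript
// One call site serves both a synchronous SQLite-style backend and an
// async Postgres-style backend, because `await` on a non-promise value
// is a no-op.
interface PersistenceLike {
  insertQueue(name: string): void | Promise<void>;
}

class SyncBackend implements PersistenceLike {
  names: string[] = [];
  insertQueue(name: string): void {
    this.names.push(name); // synchronous write
  }
}

class AsyncBackend implements PersistenceLike {
  names: string[] = [];
  async insertQueue(name: string): Promise<void> {
    this.names.push(name); // asynchronous write
  }
}

// Shared call site: works unchanged for either backend.
async function createQueue(p: PersistenceLike, name: string): Promise<void> {
  await p.insertQueue(name);
}
```

This is why the SQLite path needs no behavior change despite the async signatures.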

coderabbitai bot commented Mar 27, 2026

📝 Walkthrough

Adds PostgreSQL persistence backend and test support; converts core server API, stores, and action handlers for SQS/SNS/S3 to async/Promise-based implementations; introduces a unified PersistenceProvider and a PgPersistence implementation; updates startup/config to select backend and adds integration tests for Postgres persistence.

Changes

  • Dependencies (package.json): Added runtime dependency pg and dev dependencies @testcontainers/postgresql, @types/pg, and testcontainers for PostgreSQL support and integration tests.

  • Core API & Server Setup (src/app.ts, src/initConfig.ts, src/server.ts): Made FauxqsServer APIs and init/config functions async (returning Promises). Added persistenceBackend and postgresqlUrl options and runtime validation to select SQLite vs PostgreSQL.

  • Persistence Layer (src/persistence/index.ts, src/persistence/persistenceProvider.ts, src/persistence/pgPersistence.ts, src/persistence/sqlitePersistence.ts): Added the PersistenceProvider interface and the createPersistence() entry point. Implemented PgPersistence with a full SQL schema, transactional operations, and load/purge/close methods. Renamed the SQLite persistence module to implement PersistenceProvider and made load async.

  • S3 subsystem (src/s3/s3Persistence.ts, src/s3/s3Store.ts, src/s3/s3Router.ts, src/s3/actions/*.ts): Made the S3 persistence interface accept async returns. Converted S3 store methods and all S3 action handlers to async and updated the router to await handlers. Persistence reads may be awaited on demand.

  • SQS subsystem (src/sqs/sqsStore.ts, src/sqs/actions/*.ts): Converted SqsStore and SqsQueue methods and all SQS action handlers to async (enqueue/dequeue/delete/changeVisibility/sequence counters), and updated DLQ and persistence interactions to await persistence operations.

  • SNS subsystem (src/sns/snsStore.ts, src/sns/actions/*.ts): Converted the SNS store and action handlers (create/delete topic, subscribe/unsubscribe, publish/fan-out, attributes/tags) to async and awaited persistence updates; fan-out now awaits queue interactions.

  • Tests (test/persistence/pg-persistence.test.ts, test/programmatic-api.test.ts, test/...): Added Postgres integration tests using testcontainers; updated programmatic and persistence tests to await async server APIs and lifecycle operations (setup, reset, create/send/publish/delete).

Sequence Diagram(s)

sequenceDiagram
  participant Client as Client/Test
  participant Server as Fauxqs Server
  participant Factory as Persistence Factory
  participant DB as PostgreSQL

  Client->>Server: startFauxqs({ persistenceBackend: "postgresql", postgresqlUrl })
  Server->>Factory: createPersistence({ type: "postgresql", connectionString })
  Factory->>DB: pg.Pool(connect & run migrations) 
  DB-->>Factory: pool instance
  Factory-->>Server: PgPersistence
  Server->>Server: await setup(config)
  Server->>PgPersistence: await load(sqsStore,snsStore,s3Store)
  PgPersistence->>DB: SELECT/LOAD persisted rows
  DB-->>PgPersistence: rows
  PgPersistence-->>Server: restored state
  Client->>Server: SQS/SNS/S3 requests (send/publish/put)
  Server->>PgPersistence: await write-through operations (insertMessage/insertTopic/upsertObject)
  PgPersistence->>DB: INSERT/UPDATE/TRANSACTION
  DB-->>PgPersistence: OK
  PgPersistence-->>Server: ack
  Server-->>Client: response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • add bucket lifecycle #55: Modifies S3 persistence and bucket lifecycle handling — closely related to the S3 persistence/store changes in this PR.

Suggested labels

minor

Poem

🐰 I hopped through async fields so green,
Promises stitched where callbacks had been,
Postgres planted, SQLite hums along,
Queues and buckets sing a transactional song,
A rabbit's cheer — persistence now strong! 🥕✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 1.92%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): the title accurately and concisely describes the main change, adding PostgreSQL as a persistence backend option.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
src/sqs/actions/tagQueue.ts (1)

15-20: ⚠️ Potential issue | 🟠 Major

Rollback tag mutations if the persistence write fails.

The queue's Map is mutated before the awaited write on Line 20. If that persistence call rejects, TagQueue returns an error after the tags were already changed in memory.

💡 Suggested change
+  const previousTags = new Map(queue.tags);
   const tags = (body.Tags as Record<string, string>) ?? {};
   for (const [key, value] of Object.entries(tags)) {
     queue.tags.set(key, value);
   }

-  await queue.persistence?.insertQueue(queue);
+  try {
+    await queue.persistence?.insertQueue(queue);
+  } catch (error) {
+    queue.tags = previousTags;
+    throw error;
+  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/actions/tagQueue.ts` around lines 15 - 20, The code mutates
queue.tags (a Map) before awaiting queue.persistence?.insertQueue(queue), so if
insertQueue rejects the in-memory tags remain changed; to fix, capture the
current state (e.g., clone entries from queue.tags) before applying the new
tags, apply the new tags as now, await queue.persistence?.insertQueue(queue),
and if the promise throws restore queue.tags to the saved state (or clear and
re-set entries) and re-throw the error; reference the tags variable, queue.tags
Map, and queue.persistence?.insertQueue(queue) (inside TagQueue) when making the
change.
src/sqs/sqsStore.ts (1)

244-250: ⚠️ Potential issue | 🔴 Critical

Rollback the ready→inflight transition if persistence rejects.

By the time these writes run, the message has already been removed from the ready queue and inserted into inflightMessages. If updateMessageInflight() rejects, ReceiveMessage fails after local state changed; in the FIFO path the group is still unlocked, so later messages can leapfrog the failed one. Please wrap this transition in a try/catch and restore the message to the head of the source queue/group before rethrowing.

Also applies to: 344-350

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 244 - 250, Wrap the call to
this.persistence.updateMessageInflight(...) inside a try/catch in the
ReceiveMessage code path: if updateMessageInflight rejects, catch the error,
remove the message from inflightMessages and reinsert it at the head of the
original ready queue or group (preserving FIFO group locking semantics), then
rethrow the error; apply the same pattern to the other occurrence around lines
344-350 so both updateMessageInflight usages restore local state on failure.
Ensure you reference and mutate the same structures used elsewhere
(inflightMessages and the ready queue/group) and keep
receiptHandle/visibilityDeadline arguments unchanged when rethrowing.
src/sqs/actions/createQueue.ts (1)

73-79: ⚠️ Potential issue | 🔴 Critical

Merged attributes on existing queue are not persisted.

When new attributes are merged into an existing queue (lines 73–78), the changes remain in-memory only and are never persisted. This contrasts with how state mutations are handled elsewhere (e.g., tagQueue.ts and untagQueue.ts explicitly call queue.persistence?.insertQueue(queue) after state changes). If the server restarts, merged attributes will be lost.

Add a persistence call after the merge to ensure consistency:

await existing.persistence?.insertQueue(existing);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/actions/createQueue.ts` around lines 73 - 79, The merged attributes
are only kept in memory inside the createQueue logic (the for-loop that updates
existing.attributes) and must be persisted like other state changes; after the
loop that mutates existing.attributes in createQueue (the variable named
existing in src/sqs/actions/createQueue.ts), call and await
existing.persistence?.insertQueue(existing) so the updated attributes are saved
to durable storage.
src/sns/actions/tagResource.ts (1)

20-30: ⚠️ Potential issue | 🟠 Major

Rollback in-memory tag mutations when persistence fails.

Both actions update topic.tags before persisting. If insertTopic rejects, the request fails but the in-memory state stays changed, causing divergence from persisted state.

💡 Suggested fix (snapshot + rollback)
 export async function tagResource(
   params: Record<string, string>,
   snsStore: SnsStore,
 ): Promise<string> {
@@
   const topic = snsStore.getTopic(resourceArn);
@@
+  const previousTags = new Map(topic.tags);
+
   // Parse Tags.member.N.Key/Value
   for (const [key, value] of Object.entries(params)) {
@@
       topic.tags.set(value, tagValue);
     }
   }
 
-  await snsStore.persistence?.insertTopic(topic);
+  try {
+    await snsStore.persistence?.insertTopic(topic);
+  } catch (err) {
+    topic.tags.clear();
+    for (const [k, v] of previousTags) topic.tags.set(k, v);
+    throw err;
+  }
@@
 export async function untagResource(
   params: Record<string, string>,
   snsStore: SnsStore,
 ): Promise<string> {
@@
   const topic = snsStore.getTopic(resourceArn);
@@
+  const previousTags = new Map(topic.tags);
+
   // Parse TagKeys.member.N
   for (const [key, value] of Object.entries(params)) {
@@
       topic.tags.delete(value);
     }
   }
 
-  await snsStore.persistence?.insertTopic(topic);
+  try {
+    await snsStore.persistence?.insertTopic(topic);
+  } catch (err) {
+    topic.tags.clear();
+    for (const [k, v] of previousTags) topic.tags.set(k, v);
+    throw err;
+  }

Also applies to: 49-58

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sns/actions/tagResource.ts` around lines 20 - 30, The current loop
mutates topic.tags directly before calling
snsStore.persistence?.insertTopic(topic), so if insertTopic rejects the
in-memory topic stays changed; to fix, take a snapshot of the existing tags
(e.g., Array.from(topic.tags) or a cloned Map) before applying changes in
tagResource, apply the new tags to topic.tags, then await
snsStore.persistence?.insertTopic(topic) inside a try/catch and on any error
restore topic.tags from the snapshot (or replace with the cloned Map) and
rethrow the error; do the same snapshot/restore pattern for the other similar
mutation block referenced (lines 49-58) to ensure in-memory rollback on
persistence failure, and refer to topic.tags and
snsStore.persistence.insertTopic when locating the code.
src/sns/actions/setSubscriptionAttributes.ts (1)

42-50: ⚠️ Potential issue | 🟠 Major

Avoid mutating in-memory subscription state before awaited persistence.
At Line 42, state is updated first; if the awaited write at Lines 47-50 fails, memory and persisted state diverge.

Proposed fix
-    subscription.attributes[attributeName] = attributeValue;
+    const nextAttributes = {
+      ...subscription.attributes,
+      [attributeName]: attributeValue,
+    };

     // Invalidate cached parsed filter policy when relevant attributes change
     if (attributeName === "FilterPolicy" || attributeName === "FilterPolicyScope") {
       subscription.parsedFilterPolicy = undefined;
     }
-    await snsStore.persistence?.updateSubscriptionAttributes(
-      subscriptionArn,
-      subscription.attributes,
-    );
+    await snsStore.persistence?.updateSubscriptionAttributes(subscriptionArn, nextAttributes);
+    subscription.attributes = nextAttributes;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sns/actions/setSubscriptionAttributes.ts` around lines 42 - 50, The code
mutates subscription.attributes and subscription.parsedFilterPolicy before
awaiting persistence, risking divergence if updateSubscriptionAttributes fails;
change the flow to first create an updatedAttributes object (copy
subscription.attributes with attributeName set to attributeValue and derive
parsedFilterPolicy reset logic), call await
snsStore.persistence?.updateSubscriptionAttributes(subscriptionArn,
updatedAttributes), and only after successful persistence assign
subscription.attributes = updatedAttributes and update
subscription.parsedFilterPolicy as needed; use the existing symbols
subscription.attributes, parsedFilterPolicy, updateSubscriptionAttributes,
subscriptionArn, attributeName, and attributeValue to locate and apply this
change.
src/initConfig.ts (1)

206-213: ⚠️ Potential issue | 🟠 Major

Await S3 bucket initialization calls inside async init flow.
At Lines 206-213, bucket creation/lifecycle writes are not awaited. applyInitConfig can complete before S3 setup finishes, and async failures may escape init error handling.

Proposed fix
       if (typeof entry === "string") {
-        s3Store.createBucket(entry);
+        await s3Store.createBucket(entry);
       } else {
-        s3Store.createBucket(entry.name, entry.type);
+        await s3Store.createBucket(entry.name, entry.type);
         if (entry.lifecycleConfiguration) {
-          s3Store.putBucketLifecycleConfiguration(
+          await s3Store.putBucketLifecycleConfiguration(
             entry.name,
             lifecycleConfigToXml(entry.lifecycleConfiguration),
           );
         }
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/initConfig.ts` around lines 206 - 213, The S3 bucket creation and
lifecycle writes in applyInitConfig are fired without awaiting, so change the
calls to await s3Store.createBucket(...) and await
s3Store.putBucketLifecycleConfiguration(...) (or await the promise-returning
wrapper used by s3Store) and ensure applyInitConfig (or its caller) correctly
propagates errors; specifically update the branches where
s3Store.createBucket(entry) and s3Store.createBucket(entry.name, entry.type)
plus s3Store.putBucketLifecycleConfiguration(entry.name,
lifecycleConfigToXml(...)) are invoked to await their returned promises and let
failures bubble up to the async init flow.
src/sns/actions/publish.ts (1)

223-237: ⚠️ Potential issue | 🟠 Major

Catch per-entry fan-out failures inside publishBatch.

fanOutToSubscriptions() can now reject on async queue/persistence errors. One rejection aborts the whole batch after earlier entries may already have been delivered, and the failing entry never gets a <Failed> member. Handle that await per entry so the batch response matches the partial work already done.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sns/actions/publish.ts` around lines 223 - 237, In publishBatch, don’t
await fanOutToSubscriptions() for the whole batch at once; instead wrap the
await call for each entry in a try/catch inside the per-entry loop (the code
that currently calls fanOutToSubscriptions with topicArn, topic, messageId,
entry.message, entry.messageAttributes, subject, messageGroupId,
messageDeduplicationId, snsStore, sqsStore) so that when fanOutToSubscriptions
rejects you push a corresponding
<member><Id>...</Id><Code>...</Code><Message>...</Message></member> into the
failed list (the same structure used elsewhere) and continue processing
remaining entries; on success continue to push the successfulXml entry as
before. Ensure you include the entry.id and messageId in both success and
failure entries and do not let one rejection abort the entire publishBatch.
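The per-entry handling can be sketched as follows; the entry and result shapes are illustrative stand-ins for the XML members the real handler builds:

```typescript
// Catch each fan-out failure individually so one bad entry becomes a
// failed result instead of aborting the whole batch.
type BatchEntry = { id: string; message: string };
type BatchResult = { id: string; ok: boolean; error?: string };

async function publishBatch(
  entries: BatchEntry[],
  fanOut: (e: BatchEntry) => Promise<void>, // stand-in for fanOutToSubscriptions
): Promise<BatchResult[]> {
  const results: BatchResult[] = [];
  for (const entry of entries) {
    try {
      await fanOut(entry); // per-entry await, not batch-wide
      results.push({ id: entry.id, ok: true });
    } catch (err) {
      results.push({ id: entry.id, ok: false, error: String(err) });
    }
  }
  return results;
}
```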
🧹 Nitpick comments (6)
test/persistence/pg-persistence.test.ts (1)

71-104: Close fauxqs servers in a finally path.

Each test stops its servers only on the happy path. If an SDK call or assertion throws earlier, the listener and DB connections stay open for the rest of the suite. Wrap each server lifetime in try/finally so failures clean up reliably.

Also applies to: 108-145, 149-184

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/persistence/pg-persistence.test.ts` around lines 71 - 104, The test
starts fauxqs servers via startFauxqs (server1, server2) and calls
serverX.stop() only on the happy path; wrap each server's lifecycle in
try/finally so stop() is always invoked. Specifically, for the blocks that
create server1/server2 and use
makeSqsClient/SendMessageCommand/GetQueueUrlCommand/ReceiveMessageCommand, move
the test logic into a try and call await serverX.stop() in the corresponding
finally; do the same for the other similar test ranges (lines referenced: the
other startFauxqs usages) to ensure listeners and DB connections are cleaned up
on failures.
src/persistence/pgPersistence.ts (4)

687-703: Queue restoration triggers unnecessary re-persistence.

When loading queues, sqsStore.createQueue() is called which will persist the queue again (via insertQueue). The comment acknowledges this but suggests overwriting timestamps in memory. A cleaner approach would be to use a separate restore method that doesn't trigger persistence.

This pattern is acceptable for now since ON CONFLICT DO UPDATE handles idempotency, but consider adding a restoreQueue method to SqsStore that bypasses persistence for cleaner separation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/pgPersistence.ts` around lines 687 - 703, loadSqsQueues
currently calls SqsStore.createQueue which re-persists queues during
restoration; add a non-persisting restore method on SqsStore (e.g.,
restoreQueue) and call it from loadSqsQueues instead of createQueue so
restoration populates in-memory queue objects without triggering insertQueue/DB
writes; ensure restoreQueue accepts the same params (name, url, arn, attributes,
tags) and sets createdTimestamp, lastModifiedTimestamp and sequenceCounter
directly without calling persistence code, then update loadSqsQueues to use
restoreQueue and keep the subsequent loadSqsMessages call.

870-908: N+1 query pattern in multipart upload loading.

For each multipart upload, a separate query fetches its parts. With many uploads, this becomes inefficient. Consider using a JOIN or fetching all parts in one query grouped by upload_id.

♻️ Suggested optimization to fetch parts in bulk
   private async loadS3MultipartUploads(s3Store: S3Store): Promise<void> {
     const uploadResult = await this.pool.query("SELECT * FROM s3_multipart_uploads");
+    const partResult = await this.pool.query("SELECT * FROM s3_multipart_parts");
+    
+    // Group parts by upload_id
+    const partsByUpload = new Map<string, typeof partResult.rows>();
+    for (const pRow of partResult.rows) {
+      const parts = partsByUpload.get(pRow.upload_id) ?? [];
+      parts.push(pRow);
+      partsByUpload.set(pRow.upload_id, parts);
+    }

     for (const row of uploadResult.rows) {
       const upload: MultipartUpload = {
         // ... existing fields ...
       };

-      const partResult = await this.pool.query(
-        "SELECT * FROM s3_multipart_parts WHERE upload_id = $1",
-        [row.upload_id],
-      );
-
-      for (const pRow of partResult.rows) {
+      for (const pRow of partsByUpload.get(row.upload_id) ?? []) {
         upload.parts.set(pRow.part_number, {
           // ... existing mapping ...
         });
       }

       s3Store.restoreMultipartUpload(upload);
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/pgPersistence.ts` around lines 870 - 908, In
loadS3MultipartUploads, avoid the N+1 pattern by replacing the per-upload query
of s3_multipart_parts with a single bulk query that selects all parts (e.g.,
SELECT * FROM s3_multipart_parts WHERE upload_id = ANY($1)) or a JOIN of
s3_multipart_uploads and s3_multipart_parts; group the returned part rows by
upload_id into a Map<number, Part[]> (or Map<string, Map<partNumber, Part>>),
then when iterating uploadResult.rows build each MultipartUpload and populate
upload.parts from the grouped parts map before calling
s3Store.restoreMultipartUpload(upload); keep JSON.parse(row.metadata) and the
field mappings intact and reference loadS3MultipartUploads,
s3_multipart_uploads, s3_multipart_parts, upload.parts, and
s3Store.restoreMultipartUpload when making the change.

746-760: Inline SQL duplicates updateMessageReady logic.

This inline query overlaps with updateMessageReady: it preserves the receive count correctly here, but the method at lines 234-240 does not. It also duplicates SQL that could be factored into that existing method.

♻️ Consider reusing updateMessageReady after fixing it

After fixing updateMessageReady to preserve receive count:

         // Visibility expired — clear inflight state in DB
-        await this.pool.query(
-          `UPDATE sqs_messages SET receipt_handle = $1, visibility_deadline = $2, approximate_receive_count = $3, approximate_first_receive_timestamp = $4
-           WHERE message_id = $5`,
-          [
-            null,
-            null,
-            Number(row.approximate_receive_count),
-            row.approximate_first_receive_timestamp != null
-              ? Number(row.approximate_first_receive_timestamp)
-              : null,
-            row.message_id,
-          ],
-        );
+        await this.updateMessageReady(row.message_id);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/pgPersistence.ts` around lines 746 - 760, The inline UPDATE
duplicates logic from updateMessageReady and incorrectly duplicates behavior
instead of reusing a single method; first modify updateMessageReady to preserve
approximate_receive_count and approximate_first_receive_timestamp when clearing
visibility (accepting parameters or reading from the row) so it updates
receipt_handle and visibility_deadline while keeping receive counts intact, then
replace this.inline pool.query block with a call to
updateMessageReady(row.message_id, { receipt_handle: null, visibility_deadline:
null, approximate_receive_count: Number(row.approximate_receive_count),
approximate_first_receive_timestamp: row.approximate_first_receive_timestamp !=
null ? Number(row.approximate_first_receive_timestamp) : null }); reference
updateMessageReady and the current pool.query usage that reads row.message_id,
row.approximate_receive_count, and row.approximate_first_receive_timestamp to
locate the code to change.

11-105: Consider adding indexes for common query patterns.

The schema creates tables with primary keys but lacks indexes on foreign key columns and commonly queried fields. This could impact query performance as data grows.

📊 Suggested indexes to add
 const SCHEMA_STATEMENTS = [
   // ... existing table definitions ...
+  `CREATE INDEX IF NOT EXISTS idx_sqs_messages_queue_name ON sqs_messages(queue_name)`,
+  `CREATE INDEX IF NOT EXISTS idx_sns_subscriptions_topic_arn ON sns_subscriptions(topic_arn)`,
+  `CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket ON s3_objects(bucket)`,
 ];
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/pgPersistence.ts` around lines 11 - 105, SCHEMA_STATEMENTS
currently defines table DDLs but lacks indexes on FK and high-cardinality query
columns; add CREATE INDEX statements to the SCHEMA_STATEMENTS array for columns
such as sqs_messages.queue_name, sqs_messages.sent_timestamp (or
visibility_deadline), sns_subscriptions.topic_arn, s3_objects.bucket (and
optionally s3_objects.last_modified or etag if queried),
s3_multipart_parts.upload_id and s3_multipart_parts.part_number to improve
lookup and join performance; update the SCHEMA_STATEMENTS constant to include
those CREATE INDEX IF NOT EXISTS ... ON ... statements so migrations create
indexes alongside tables.
src/s3/s3Store.ts (1)

380-387: Consider using Promise.all for parallel deletion.

The current implementation uses sequential await in a loop, which can be slow for bulk deletions. Since each deleteObject call is independent, these can be parallelized.

However, this may be intentional to avoid overwhelming the database with concurrent requests, so marking as optional.

♻️ Optional refactor for parallel deletion
   async deleteObjects(bucket: string, keys: string[]): Promise<string[]> {
-    const deleted: string[] = [];
-    for (const key of keys) {
-      await this.deleteObject(bucket, key);
-      deleted.push(key);
-    }
-    return deleted;
+    await Promise.all(keys.map((key) => this.deleteObject(bucket, key)));
+    return keys;
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/s3/s3Store.ts` around lines 380 - 387, The deleteObjects implementation
in deleteObjects currently awaits deleteObject sequentially, causing slow bulk
deletions; change it to run deletions in parallel by mapping keys to promises
from this.deleteObject(bucket, key) and awaiting Promise.all to collect results,
ensuring you still return the array of deleted keys (preserve order by awaiting
Promise.all on the mapped array); if you need to limit concurrency to avoid
overwhelming services, use a concurrency-controlled mapper (e.g., p-map or a
simple batching loop) instead of firing all promises at once.
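If concurrency should be capped rather than unbounded, a simple batching loop works without extra dependencies (the batch size and helper name are illustrative):

```typescript
// Delete in fixed-size batches: each batch runs in parallel, batches
// run sequentially, bounding concurrent DB requests.
async function deleteInBatches<T>(
  items: T[],
  batchSize: number,
  fn: (item: T) => Promise<void>, // e.g. (key) => this.deleteObject(bucket, key)
): Promise<T[]> {
  for (let i = 0; i < items.length; i += batchSize) {
    await Promise.all(items.slice(i, i + batchSize).map(fn));
  }
  return items; // order preserved, mirroring deleteObjects' return shape
}
```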
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/app.ts`:
- Around line 406-418: The code may silently start without persistence when
persistenceBackend is "postgresql" but options.postgresqlUrl is missing and it
doesn't honor environment variables; update the startup logic to read
FAUXQS_PERSISTENCE_BACKEND and FAUXQS_POSTGRESQL_URL (falling back to
options.persistenceBackend and options.postgresqlUrl), use that resolved backend
value for the existing branch logic, and add validation in the branch handling
postgresql (where createPersistence is invoked) to throw/exit with a clear error
if no postgresql URL is available; keep existing SqlitePersistence instantiation
for sqlite fallback and reference the symbols
persistenceBackend/options.postgresqlUrl/createPersistence/SqlitePersistence/persistenceManager
when making changes.

In `@src/persistence/pgPersistence.ts`:
- Around line 804-815: The boolean conversion for `confirmed` is fragile because
`row.confirmed === 1` only handles integer representations; update the
assignment in the loop that builds the `SnsSubscription` (the `for (const row of
result.rows)` block) to robustly handle both PostgreSQL boolean and integer
types—for example, set `confirmed` using a normalized check like
`Boolean(row.confirmed)` (or `row.confirmed === true || row.confirmed === 1`)
when constructing the `sub` object so `snsStore.subscriptions.set(row.arn, sub)`
always receives a proper boolean.
- Around line 114-120: The schema initialization in PgPersistence.create
currently runs SCHEMA_STATEMENTS directly against a new pg.Pool and can leave a
partial schema on failure; update PgPersistence.create to begin a transaction on
a client from the pool, execute each statement (referencing SCHEMA_STATEMENTS)
within that transaction, then commit if all succeed or rollback on any error
(and rethrow), ensuring the pool/client are properly released; return new
PgPersistence(pool) only after a successful commit.
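The transactional setup suggested above can be sketched roughly like this; the PgClient interface is a hypothetical stand-in for a pg PoolClient, not fauxqs's actual code:

```typescript
// Run all schema statements in one transaction so a failure leaves no
// partial schema behind.
interface PgClient {
  query(sql: string): Promise<void>;
  release(): void;
}

async function initSchema(client: PgClient, statements: string[]): Promise<void> {
  await client.query("BEGIN");
  try {
    for (const stmt of statements) {
      await client.query(stmt);
    }
    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK"); // undo any partially created schema
    throw err;
  } finally {
    client.release(); // always return the client to the pool
  }
}
```

The constructor would then only run after initSchema resolves.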
- Around line 234-240: updateMessageReady currently writes NULL into
approximate_receive_count (and likely approximate_first_receive_timestamp),
which loses receive-count data; change the SQL in updateMessageReady so it only
clears receipt_handle and visibility_deadline (set them to NULL) and does NOT
overwrite approximate_receive_count (and leave
approximate_first_receive_timestamp untouched) — update the pool.query call in
updateMessageReady to remove approximate_receive_count from the SET clause and
adjust the parameter list accordingly so messageId remains the locator.
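
The transactional schema initialization requested above can be sketched as follows. This is a minimal illustration, assuming a client with pg's `query()`/`release()` shape; the real `PgPersistence.create` would check a client out of its `pg.Pool` before calling something like this.

```typescript
// Hypothetical DbClient mirrors the subset of pg's ClientBase used here.
interface DbClient {
  query(sql: string): Promise<unknown>;
  release(): void;
}

async function initSchema(client: DbClient, statements: string[]): Promise<void> {
  await client.query("BEGIN");
  try {
    for (const ddl of statements) {
      await client.query(ddl); // each CREATE TABLE runs inside the transaction
    }
    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK"); // no partial schema is left behind
    throw err;
  } finally {
    client.release(); // always return the client to the pool
  }
}
```

Only after a successful commit would the constructor be invoked and the instance returned.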

In `@src/server.ts`:
- Around line 5-20: Validate FAUXQS_PERSISTENCE_BACKEND, FAUXQS_PERSISTENCE
(persistenceEnv), and required backend-specific vars before calling startFauxqs:
ensure persistenceBackend is one of the allowed values ("sqlite" or
"postgresql"), require persistenceEnv === "true" to enable persistence, and if
persistenceBackend === "sqlite" assert dataDir is present, or if "postgresql"
assert postgresqlUrl is present (fail fast with a thrown error or
process.exit(1) and a clear logger.error message). Use the existing symbols
(persistenceBackend, persistenceEnv, dataDir, postgresqlUrl, s3StorageDir if
applicable, enablePersistence, startFauxqs) to gate constructing the options and
prevent silently falling back to in-memory behavior.
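
A fail-fast validation along these lines could look like the sketch below. The env var names come from the PR description; the exact option wiring in `src/server.ts` may differ.

```typescript
type PersistenceBackend = "sqlite" | "postgresql";

function resolvePersistenceBackend(
  env: Record<string, string | undefined>,
): PersistenceBackend {
  const backend = env.FAUXQS_PERSISTENCE_BACKEND ?? "sqlite";
  if (backend !== "sqlite" && backend !== "postgresql") {
    throw new Error(
      `FAUXQS_PERSISTENCE_BACKEND must be "sqlite" or "postgresql", got "${backend}"`,
    );
  }
  if (backend === "postgresql" && !env.FAUXQS_POSTGRESQL_URL) {
    throw new Error("FAUXQS_POSTGRESQL_URL is required for the postgresql backend");
  }
  // cast is safe: invalid values were rejected above
  return backend as PersistenceBackend;
}
```

Calling this before `startFauxqs` surfaces a misconfiguration immediately instead of silently falling back to in-memory behavior.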

In `@src/sns/actions/publish.ts`:
- Around line 337-345: The FIFO dedup sequence is not atomic: replace the
separate calls to queue.checkDeduplication(), queue.nextSequenceNumber(), and
queue.recordDeduplication() with a single queue-level async critical operation
(e.g., add a method like queue.reserveDeduplication(messageDeduplicationId,
messageId) or queue.beginFifoPublish(messageDeduplicationId, messageId)) that
atomically checks for duplicates and records the dedup entry while returning the
assigned sequenceNumber; update the caller inside the if (queue.isFifo() &&
messageDeduplicationId) block to call that single async method and set
sqsMsg.sequenceNumber from its result; also ensure there is a clear rollback API
(e.g., queue.releaseDeduplication(messageDeduplicationId) or a commit/release
handle returned by the reserve method) and call it if await
queue.enqueue(sqsMsg) fails so the dedup reservation is not left orphaned.
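
A single-method reservation along the lines suggested could be sketched as below. `reserveDeduplication`/`releaseDeduplication` are proposed names, not existing fauxqs API; because Node runs JS callbacks on one thread, doing the check, record, and sequence assignment in one synchronous method means no other publish can interleave between them.

```typescript
class FifoDedupState {
  private dedup = new Map<string, string>(); // dedupId -> messageId
  private sequence = 0;

  reserveDeduplication(
    dedupId: string,
    messageId: string,
  ): { duplicateOf?: string; sequenceNumber?: string } {
    const existing = this.dedup.get(dedupId);
    if (existing !== undefined) return { duplicateOf: existing };
    this.dedup.set(dedupId, messageId); // record before any await can run
    this.sequence += 1;
    return { sequenceNumber: String(this.sequence) };
  }

  // Rollback hook for a failed enqueue, so the reservation is not orphaned.
  releaseDeduplication(dedupId: string): void {
    this.dedup.delete(dedupId);
  }
}
```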

In `@src/sns/actions/setTopicAttributes.ts`:
- Around line 36-37: The code mutates topic.attributes
(topic.attributes[attributeName] = attributeValue) before awaiting persistence
(snsStore.persistence?.updateTopicAttributes), which can leave in-memory state
inconsistent if the write fails; instead create a newAttributes snapshot (e.g.,
shallow copy of topic.attributes with attributeName set to attributeValue) and
call snsStore.persistence?.updateTopicAttributes(topicArn, newAttributes) first,
then assign topic.attributes = newAttributes only after the await succeeds, or
alternatively perform the in-memory mutation but wrap the await in a try/catch
and revert topic.attributes back to its original value on error; reference
topic.attributes, attributeName, attributeValue, topicArn and
snsStore.persistence.updateTopicAttributes (SetTopicAttributes flow) when making
the change.

In `@src/sns/snsStore.ts`:
- Around line 67-69: The in-memory maps are being mutated before awaiting
persistence (e.g., calls that do this.topics.set(arn, topic) then await
this.persistence?.insertTopic(...) and similarly for this.subscriptions.set(...)
with insertSubscription), which can expose half-applied state or leave
memory/persistence inconsistent on failure; change the flow to await the
persistence write first (await
this.persistence?.insertTopic/insertSubscription(...)) and only on success
update the in-memory maps, or wrap the current order in try/catch and on
persistence failure remove/rollback the in-memory entry (e.g., remove the key
from topics/subscriptions in the catch and rethrow) so memory and DB cannot get
out of sync.
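
A minimal persist-first sketch of this flow: the map is only touched after the awaited insert resolves, so a rejected write cannot leave memory and the database out of sync. The `TopicPersistence` interface here is a stand-in for the real provider.

```typescript
interface TopicPersistence {
  insertTopic(topic: { arn: string }): Promise<void>;
}

class TopicStore {
  readonly topics = new Map<string, { arn: string }>();
  constructor(private readonly persistence?: TopicPersistence) {}

  async createTopic(arn: string): Promise<{ arn: string }> {
    const topic = { arn };
    await this.persistence?.insertTopic(topic); // durable write first
    this.topics.set(arn, topic); // commit to memory only on success
    return topic;
  }
}
```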

In `@src/sqs/sqsStore.ts`:
- Around line 229-230: The delete-then-enqueue sequence (calls to
persistence.deleteMessage and dlq.enqueue) must be made atomic: either add a
single transactional persistence operation (e.g., implement and call
persistence.transferToDlq(msg) that enqueues to DLQ and deletes from source in
one DB transaction) or, if a DB transaction is not possible, perform the enqueue
first and only delete on success, and on enqueue failure perform an explicit
rollback by re-adding the message to the source (e.g., persistence.requeue(msg))
with retries and error handling; update both occurrences (the calls around
deleteMessage/dlq.enqueue and the similar block at the other spot) to use the
new transfer or the enqueue-then-conditional-delete-with-rollback pattern and
surface any errors so failures don’t cause message loss.
- Around line 558-566: The purge() implementation must not clear in-memory
state until the persistence delete succeeds; update purge() to either 1) await
this.persistence?.deleteQueueMessages(this.name) first and clear this.messages,
this.delayedMessages, this.inflightMessages, this.fifoMessages,
this.fifoDelayed, and this.fifoLockedGroups only after it resolves, or 2)
snapshot those structures, clear them, and on rejection restore the snapshots
and rethrow the error. Also
- Around line 121-132: The current setAttributes mutates this.attributes and
this.lastModifiedTimestamp before awaiting persistence.updateQueueAttributes,
risking in-memory state changes on failure and concurrent mutation leaks;
instead, create a staged copy (e.g., const stagedAttrs = { ...this.attributes,
...attrs }), apply the RedrivePolicy empty check to stagedAttrs, compute a local
stagedTimestamp, call await this.persistence.updateQueueAttributes(this.name,
stagedAttrs, stagedTimestamp), and only after that assignment succeeds assign
this.attributes = stagedAttrs and this.lastModifiedTimestamp = stagedTimestamp
so the in-memory state is updated atomically on successful persistence in
setAttributes.
- Around line 422-425: The inflight entry is being deleted before the awaited
persistence write, causing loss of state if updateMessageReady fails; change the
logic in the timeout branch so you do not call
this.inflightMessages.delete(receiptHandle) until after await
this.persistence?.updateMessageReady(entry.message.messageId) completes
successfully (or, if you must delete first, re-add it on failure). Ensure any
errors from updateMessageReady are propagated/handled (try/catch) so FIFO locks
controlled via isFifo() and entry.message.messageGroupId remain consistent and
are only released when persistence has reliably marked the message ready.

---

Outside diff comments:
In `@src/initConfig.ts`:
- Around line 206-213: The S3 bucket creation and lifecycle writes in
applyInitConfig are fired without awaiting, so change the calls to await
s3Store.createBucket(...) and await s3Store.putBucketLifecycleConfiguration(...)
(or await the promise-returning wrapper used by s3Store) and ensure
applyInitConfig (or its caller) correctly propagates errors; specifically update
the branches where s3Store.createBucket(entry) and
s3Store.createBucket(entry.name, entry.type) plus
s3Store.putBucketLifecycleConfiguration(entry.name, lifecycleConfigToXml(...))
are invoked to await their returned promises and let failures bubble up to the
async init flow.

In `@src/sns/actions/publish.ts`:
- Around line 223-237: In publishBatch, don’t await fanOutToSubscriptions() for
the whole batch at once; instead wrap the await call for each entry in a
try/catch inside the per-entry loop (the code that currently calls
fanOutToSubscriptions with topicArn, topic, messageId, entry.message,
entry.messageAttributes, subject, messageGroupId, messageDeduplicationId,
snsStore, sqsStore) so that when fanOutToSubscriptions rejects you push a
corresponding
<member><Id>...</Id><Code>...</Code><Message>...</Message></member> into the
failed list (the same structure used elsewhere) and continue processing
remaining entries; on success continue to push the successfulXml entry as
before. Ensure you include the entry.id and messageId in both success and
failure entries and do not let one rejection abort the entire publishBatch.
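
The per-entry isolation described above can be sketched like this: each fan-out is awaited inside its own try/catch, so one rejection produces one failed entry instead of aborting the batch. `fanOut` stands in for `fanOutToSubscriptions`, and the string results stand in for the XML `<member>` entries built in publish.ts.

```typescript
interface BatchEntry { id: string; message: string }

async function publishBatchEntries(
  entries: BatchEntry[],
  fanOut: (entry: BatchEntry) => Promise<string>, // resolves to a messageId
): Promise<{ successful: string[]; failed: string[] }> {
  const successful: string[] = [];
  const failed: string[] = [];
  for (const entry of entries) {
    try {
      const messageId = await fanOut(entry);
      successful.push(`${entry.id}:${messageId}`);
    } catch (err) {
      // record the failure and keep processing the remaining entries
      failed.push(`${entry.id}:${(err as Error).message}`);
    }
  }
  return { successful, failed };
}
```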

In `@src/sns/actions/setSubscriptionAttributes.ts`:
- Around line 42-50: The code mutates subscription.attributes and
subscription.parsedFilterPolicy before awaiting persistence, risking divergence
if updateSubscriptionAttributes fails; change the flow to first create an
updatedAttributes object (copy subscription.attributes with attributeName set to
attributeValue and derive parsedFilterPolicy reset logic), call await
snsStore.persistence?.updateSubscriptionAttributes(subscriptionArn,
updatedAttributes), and only after successful persistence assign
subscription.attributes = updatedAttributes and update
subscription.parsedFilterPolicy as needed; use the existing symbols
subscription.attributes, parsedFilterPolicy, updateSubscriptionAttributes,
subscriptionArn, attributeName, and attributeValue to locate and apply this
change.

In `@src/sns/actions/tagResource.ts`:
- Around line 20-30: The current loop mutates topic.tags directly before calling
snsStore.persistence?.insertTopic(topic), so if insertTopic rejects the
in-memory topic stays changed; to fix, take a snapshot of the existing tags
(e.g., Array.from(topic.tags) or a cloned Map) before applying changes in
tagResource, apply the new tags to topic.tags, then await
snsStore.persistence?.insertTopic(topic) inside a try/catch and on any error
restore topic.tags from the snapshot (or replace with the cloned Map) and
rethrow the error; do the same snapshot/restore pattern for the other similar
mutation block referenced (lines 49-58) to ensure in-memory rollback on
persistence failure, and refer to topic.tags and
snsStore.persistence.insertTopic when locating the code.
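
The snapshot/restore pattern for the tag mutation can be sketched as below: clone the Map before applying new tags, and put the clone back if the persistence write rejects. `insertTopic` mirrors the provider call named in the comment; the function shape is illustrative only.

```typescript
interface TaggedTopic { tags: Map<string, string> }

async function applyTags(
  topic: TaggedTopic,
  newTags: Array<[string, string]>,
  insertTopic: (t: TaggedTopic) => Promise<void>,
): Promise<void> {
  const snapshot = new Map(topic.tags); // shallow clone of the current entries
  for (const [key, value] of newTags) topic.tags.set(key, value);
  try {
    await insertTopic(topic);
  } catch (err) {
    topic.tags = snapshot; // roll the in-memory state back before rethrowing
    throw err;
  }
}
```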

In `@src/sqs/actions/createQueue.ts`:
- Around line 73-79: The merged attributes are only kept in memory inside the
createQueue logic (the for-loop that updates existing.attributes) and must be
persisted like other state changes; after the loop that mutates
existing.attributes in createQueue (the variable named existing in
src/sqs/actions/createQueue.ts), call and await
existing.persistence?.insertQueue(existing) so the updated attributes are saved
to durable storage.

In `@src/sqs/actions/tagQueue.ts`:
- Around line 15-20: The code mutates queue.tags (a Map) before awaiting
queue.persistence?.insertQueue(queue), so if insertQueue rejects the in-memory
tags remain changed; to fix, capture the current state (e.g., clone entries from
queue.tags) before applying the new tags, apply the new tags as now, await
queue.persistence?.insertQueue(queue), and if the promise throws restore
queue.tags to the saved state (or clear and re-set entries) and re-throw the
error; reference the tags variable, queue.tags Map, and
queue.persistence?.insertQueue(queue) (inside TagQueue) when making the change.

In `@src/sqs/sqsStore.ts`:
- Around line 244-250: Wrap the call to
this.persistence.updateMessageInflight(...) inside a try/catch in the
ReceiveMessage code path: if updateMessageInflight rejects, catch the error,
remove the message from inflightMessages and reinsert it at the head of the
original ready queue or group (preserving FIFO group locking semantics), then
rethrow the error; apply the same pattern to the other occurrence around lines
344-350 so both updateMessageInflight usages restore local state on failure.
Ensure you reference and mutate the same structures used elsewhere
(inflightMessages and the ready queue/group) and keep
receiptHandle/visibilityDeadline arguments unchanged when rethrowing.

---

Nitpick comments:
In `@src/persistence/pgPersistence.ts`:
- Around line 687-703: loadSqsQueues currently calls SqsStore.createQueue which
re-persists queues during restoration; add a non-persisting restore method on
SqsStore (e.g., restoreQueue) and call it from loadSqsQueues instead of
createQueue so restoration populates in-memory queue objects without triggering
insertQueue/DB writes; ensure restoreQueue accepts the same params (name, url,
arn, attributes, tags) and sets createdTimestamp, lastModifiedTimestamp and
sequenceCounter directly without calling persistence code, then update
loadSqsQueues to use restoreQueue and keep the subsequent loadSqsMessages call.
- Around line 870-908: In loadS3MultipartUploads, avoid the N+1 pattern by
replacing the per-upload query of s3_multipart_parts with a single bulk query
that selects all parts (e.g., SELECT * FROM s3_multipart_parts WHERE upload_id =
ANY($1)) or a JOIN of s3_multipart_uploads and s3_multipart_parts; group the
returned part rows by upload_id into a Map<number, Part[]> (or Map<string,
Map<partNumber, Part>>), then when iterating uploadResult.rows build each
MultipartUpload and populate upload.parts from the grouped parts map before
calling s3Store.restoreMultipartUpload(upload); keep JSON.parse(row.metadata)
and the field mappings intact and reference loadS3MultipartUploads,
s3_multipart_uploads, s3_multipart_parts, upload.parts, and
s3Store.restoreMultipartUpload when making the change.
- Around line 746-760: The inline UPDATE duplicates the logic of
updateMessageReady instead of reusing a single method; first modify
updateMessageReady to preserve approximate_receive_count and
approximate_first_receive_timestamp when clearing visibility (accepting
parameters or reading from the row) so it updates receipt_handle and
visibility_deadline while keeping receive counts intact, then replace this
inline pool.query block with a call to
updateMessageReady(row.message_id, { receipt_handle: null, visibility_deadline:
null, approximate_receive_count: Number(row.approximate_receive_count),
approximate_first_receive_timestamp: row.approximate_first_receive_timestamp !=
null ? Number(row.approximate_first_receive_timestamp) : null }); reference
updateMessageReady and the current pool.query usage that reads row.message_id,
row.approximate_receive_count, and row.approximate_first_receive_timestamp to
locate the code to change.
- Around line 11-105: SCHEMA_STATEMENTS currently defines table DDLs but lacks
indexes on FK and high-cardinality query columns; add CREATE INDEX statements to
the SCHEMA_STATEMENTS array for columns such as sqs_messages.queue_name,
sqs_messages.sent_timestamp (or visibility_deadline),
sns_subscriptions.topic_arn, s3_objects.bucket (and optionally
s3_objects.last_modified or etag if queried), s3_multipart_parts.upload_id and
s3_multipart_parts.part_number to improve lookup and join performance; update
the SCHEMA_STATEMENTS constant to include those CREATE INDEX IF NOT EXISTS ...
ON ... statements so migrations create indexes alongside tables.
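
Candidate statements to append to `SCHEMA_STATEMENTS` might look like the following. The index and column names here are taken from the review comment and should be checked against the actual DDL before use; `CREATE INDEX IF NOT EXISTS` keeps the schema init idempotent.

```typescript
const INDEX_STATEMENTS: string[] = [
  "CREATE INDEX IF NOT EXISTS idx_sqs_messages_queue_name ON sqs_messages (queue_name)",
  "CREATE INDEX IF NOT EXISTS idx_sqs_messages_visibility ON sqs_messages (visibility_deadline)",
  "CREATE INDEX IF NOT EXISTS idx_sns_subscriptions_topic_arn ON sns_subscriptions (topic_arn)",
  "CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket ON s3_objects (bucket)",
  "CREATE INDEX IF NOT EXISTS idx_s3_multipart_parts_upload ON s3_multipart_parts (upload_id, part_number)",
];
```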

In `@src/s3/s3Store.ts`:
- Around line 380-387: The deleteObjects implementation in deleteObjects
currently awaits deleteObject sequentially, causing slow bulk deletions; change
it to run deletions in parallel by mapping keys to promises from
this.deleteObject(bucket, key) and awaiting Promise.all to collect results,
ensuring you still return the array of deleted keys (preserve order by awaiting
Promise.all on the mapped array); if you need to limit concurrency to avoid
overwhelming services, use a concurrency-controlled mapper (e.g., p-map or a
simple batching loop) instead of firing all promises at once.
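
A parallel-deletion sketch of this suggestion: `Promise.all` preserves input order in its result, so the returned keys still match the request order even though the deletes overlap. `deleteObject` stands in for S3Store's per-key delete; a bounded mapper (e.g. p-map) can cap concurrency if needed.

```typescript
async function deleteObjectsParallel(
  keys: string[],
  deleteObject: (key: string) => Promise<void>,
): Promise<string[]> {
  // fire all deletions concurrently and wait for every one to settle
  await Promise.all(keys.map((key) => deleteObject(key)));
  return keys; // every delete resolved, so all keys were removed
}
```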

In `@test/persistence/pg-persistence.test.ts`:
- Around line 71-104: The test starts fauxqs servers via startFauxqs (server1,
server2) and calls serverX.stop() only on the happy path; wrap each server's
lifecycle in try/finally so stop() is always invoked. Specifically, for the
blocks that create server1/server2 and use
makeSqsClient/SendMessageCommand/GetQueueUrlCommand/ReceiveMessageCommand, move
the test logic into a try and call await serverX.stop() in the corresponding
finally; do the same for the other similar test ranges (lines referenced: the
other startFauxqs usages) to ensure listeners and DB connections are cleaned up
on failures.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c9988cbf-c860-45da-a305-9ba5495720e6

📥 Commits

Reviewing files that changed from the base of the PR and between 13d76c6 and 95a63b5.

⛔ Files ignored due to path filters (1)
  • package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (51)
  • package.json
  • src/app.ts
  • src/initConfig.ts
  • src/persistence/index.ts
  • src/persistence/persistenceProvider.ts
  • src/persistence/pgPersistence.ts
  • src/persistence/sqlitePersistence.ts
  • src/s3/actions/abortMultipartUpload.ts
  • src/s3/actions/completeMultipartUpload.ts
  • src/s3/actions/createBucket.ts
  • src/s3/actions/createMultipartUpload.ts
  • src/s3/actions/deleteBucket.ts
  • src/s3/actions/deleteBucketLifecycleConfiguration.ts
  • src/s3/actions/deleteObject.ts
  • src/s3/actions/deleteObjects.ts
  • src/s3/actions/getObject.ts
  • src/s3/actions/postObject.ts
  • src/s3/actions/putBucketLifecycleConfiguration.ts
  • src/s3/actions/putObject.ts
  • src/s3/actions/renameObject.ts
  • src/s3/actions/uploadPart.ts
  • src/s3/s3Persistence.ts
  • src/s3/s3Router.ts
  • src/s3/s3Store.ts
  • src/server.ts
  • src/sns/actions/confirmSubscription.ts
  • src/sns/actions/createTopic.ts
  • src/sns/actions/deleteTopic.ts
  • src/sns/actions/publish.ts
  • src/sns/actions/setSubscriptionAttributes.ts
  • src/sns/actions/setTopicAttributes.ts
  • src/sns/actions/subscribe.ts
  • src/sns/actions/tagResource.ts
  • src/sns/actions/unsubscribe.ts
  • src/sns/snsStore.ts
  • src/sqs/actions/changeMessageVisibility.ts
  • src/sqs/actions/changeMessageVisibilityBatch.ts
  • src/sqs/actions/createQueue.ts
  • src/sqs/actions/deleteMessage.ts
  • src/sqs/actions/deleteMessageBatch.ts
  • src/sqs/actions/deleteQueue.ts
  • src/sqs/actions/purgeQueue.ts
  • src/sqs/actions/receiveMessage.ts
  • src/sqs/actions/sendMessage.ts
  • src/sqs/actions/sendMessageBatch.ts
  • src/sqs/actions/setQueueAttributes.ts
  • src/sqs/actions/tagQueue.ts
  • src/sqs/actions/untagQueue.ts
  • src/sqs/sqsStore.ts
  • test/persistence/pg-persistence.test.ts
  • test/programmatic-api.test.ts

Comment thread src/app.ts
Comment thread src/persistence/pgPersistence.ts
Comment thread src/persistence/pgPersistence.ts
Comment thread src/persistence/pgPersistence.ts
Comment thread src/server.ts Outdated
Comment thread src/sns/snsStore.ts
Comment on lines 67 to 69
    this.topics.set(arn, topic);
-   this.persistence?.insertTopic(topic);
+   await this.persistence?.insertTopic(topic);
    return topic;

⚠️ Potential issue | 🔴 Critical

Don't expose half-applied SNS mutations across awaited persistence writes.

These methods mutate topics/subscriptions before the awaited provider calls finish. With PostgreSQL, concurrent requests can observe entities that are not committed yet, and a rejection leaves memory and persistence out of sync until restart. Persist first, or add rollback/transaction handling around the in-memory mutation.

Also applies to: 77-84, 150-154, 161-169

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sns/snsStore.ts` around lines 67 - 69, The in-memory maps are being
mutated before awaiting persistence (e.g., calls that do this.topics.set(arn,
topic) then await this.persistence?.insertTopic(...) and similarly for
this.subscriptions.set(...) with insertSubscription), which can expose
half-applied state or leave memory/persistence inconsistent on failure; change
the flow to await the persistence write first (await
this.persistence?.insertTopic/insertSubscription(...)) and only on success
update the in-memory maps, or wrap the current order in try/catch and on
persistence failure remove/rollback the in-memory entry (e.g., remove the key
from topics/subscriptions in the catch and rethrow) so memory and DB cannot get
out of sync.

Comment thread src/sqs/sqsStore.ts
Comment on lines +121 to +132
  async setAttributes(attrs: Record<string, string>): Promise<void> {
    Object.assign(this.attributes, attrs);
    // Empty RedrivePolicy clears the DLQ association
    if (attrs.RedrivePolicy === "") {
      delete this.attributes.RedrivePolicy;
    }
    this.lastModifiedTimestamp = Math.floor(Date.now() / 1000);
-   this.persistence?.updateQueueAttributes(this.name, this.attributes, this.lastModifiedTimestamp);
+   await this.persistence?.updateQueueAttributes(
+     this.name,
+     this.attributes,
+     this.lastModifiedTimestamp,
+   );

⚠️ Potential issue | 🟠 Major

Stage queue attribute changes until the persistence write succeeds.

this.attributes and lastModifiedTimestamp are mutated before the awaited write. If persistence fails here, SetQueueAttributes returns an error after the queue was already changed in memory, and the shared this.attributes object can leak concurrent mutations into the persisted payload.

💡 Suggested change
-    Object.assign(this.attributes, attrs);
-    // Empty RedrivePolicy clears the DLQ association
-    if (attrs.RedrivePolicy === "") {
-      delete this.attributes.RedrivePolicy;
-    }
-    this.lastModifiedTimestamp = Math.floor(Date.now() / 1000);
-    await this.persistence?.updateQueueAttributes(
-      this.name,
-      this.attributes,
-      this.lastModifiedTimestamp,
-    );
+    const nextAttributes = { ...this.attributes, ...attrs };
+    if (attrs.RedrivePolicy === "") {
+      delete nextAttributes.RedrivePolicy;
+    }
+    const nextLastModifiedTimestamp = Math.floor(Date.now() / 1000);
+    await this.persistence?.updateQueueAttributes(
+      this.name,
+      nextAttributes,
+      nextLastModifiedTimestamp,
+    );
+    this.attributes = nextAttributes;
+    this.lastModifiedTimestamp = nextLastModifiedTimestamp;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 121 - 132, The current setAttributes
mutates this.attributes and this.lastModifiedTimestamp before awaiting
persistence.updateQueueAttributes, risking in-memory state changes on failure
and concurrent mutation leaks; instead, create a staged copy (e.g., const
stagedAttrs = { ...this.attributes, ...attrs }), apply the RedrivePolicy empty
check to stagedAttrs, compute a local stagedTimestamp, call await
this.persistence.updateQueueAttributes(this.name, stagedAttrs, stagedTimestamp),
and only after that assignment succeeds assign this.attributes = stagedAttrs and
this.lastModifiedTimestamp = stagedTimestamp so the in-memory state is updated
atomically on successful persistence in setAttributes.

Comment thread src/sqs/sqsStore.ts
Comment on lines +229 to +230
await this.persistence?.deleteMessage(msg.messageId);
await dlq.enqueue(msg);

⚠️ Potential issue | 🔴 Critical

Make the DLQ handoff atomic.

The message has already been shifted out of the source queue when these awaits run. If the source delete succeeds and dlq.enqueue(msg) fails — or if the delete rejects after the local pop — the message is no longer in the source queue and never reaches the DLQ. This needs a single transactional persistence operation, or explicit rollback to the source queue on failure.

Also applies to: 329-330

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 229 - 230, The delete-then-enqueue sequence
(calls to persistence.deleteMessage and dlq.enqueue) must be made atomic: either
add a single transactional persistence operation (e.g., implement and call
persistence.transferToDlq(msg) that enqueues to DLQ and deletes from source in
one DB transaction) or, if a DB transaction is not possible, perform the enqueue
first and only delete on success, and on enqueue failure perform an explicit
rollback by re-adding the message to the source (e.g., persistence.requeue(msg))
with retries and error handling; update both occurrences (the calls around
deleteMessage/dlq.enqueue and the similar block at the other spot) to use the
new transfer or the enqueue-then-conditional-delete-with-rollback pattern and
surface any errors so failures don’t cause message loss.
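
An enqueue-first ordering sketch for the DLQ handoff: the message reaches the DLQ before the source row is deleted, so a failure at either step cannot lose it (at worst it briefly exists in both, which redrive semantics can tolerate). `transferToDlq` is the reviewer's suggested name; a real provider could instead run both statements in one DB transaction.

```typescript
interface DlqMsg { messageId: string }
interface DlqOps {
  enqueueDlq(msg: DlqMsg): Promise<void>;
  deleteFromSource(messageId: string): Promise<void>;
}

async function transferToDlq(ops: DlqOps, msg: DlqMsg): Promise<void> {
  await ops.enqueueDlq(msg); // copy to the DLQ first
  await ops.deleteFromSource(msg.messageId); // delete only after the copy landed
}
```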

Comment thread src/sqs/sqsStore.ts
Comment on lines 422 to 425
if (timeoutSeconds === 0) {
this.inflightMessages.delete(receiptHandle);
this.persistence?.updateMessageReady(entry.message.messageId);
await this.persistence?.updateMessageReady(entry.message.messageId);
if (this.isFifo() && entry.message.messageGroupId) {

⚠️ Potential issue | 🔴 Critical

Don't drop the inflight entry before updateMessageReady() succeeds.

Line 423 removes the only local handle to the message before the awaited persistence write on Line 424. If that write fails, the message is neither inflight nor ready, and FIFO group locks never get released.

💡 Suggested change
       this.inflightMessages.delete(receiptHandle);
-      await this.persistence?.updateMessageReady(entry.message.messageId);
+      try {
+        await this.persistence?.updateMessageReady(entry.message.messageId);
+      } catch (error) {
+        this.inflightMessages.set(receiptHandle, entry);
+        throw error;
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 422 - 425, The inflight entry is being
deleted before the awaited persistence write, causing loss of state if
updateMessageReady fails; change the logic in the timeout branch so you do not
call this.inflightMessages.delete(receiptHandle) until after await
this.persistence?.updateMessageReady(entry.message.messageId) completes
successfully (or, if you must delete first, re-add it on failure). Ensure any
errors from updateMessageReady are propagated/handled (try/catch) so FIFO locks
controlled via isFifo() and entry.message.messageGroupId remain consistent and
are only released when persistence has reliably marked the message ready.

Comment thread src/sqs/sqsStore.ts
Comment on lines +558 to 566
  async purge(): Promise<void> {
    this.messages = [];
    this.delayedMessages = [];
    this.inflightMessages.clear();
    this.fifoMessages.clear();
    this.fifoDelayed.clear();
    this.fifoLockedGroups.clear();
-   this.persistence?.deleteQueueMessages(this.name);
+   await this.persistence?.deleteQueueMessages(this.name);
  }
}

⚠️ Potential issue | 🔴 Critical

purge() now needs rollback semantics and an awaited caller.

This clears every in-memory message structure before the awaited delete on Line 565. If that call rejects, PurgeQueue fails after the live queue was already emptied. Also, SqsStore.clearMessages() still fire-and-forgets this promise, so failures can go unhandled and callers can observe stale persisted data. Please snapshot/restore on error and make clearMessages() async so it awaits each purge.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 558 - 566, The purge() implementation must
snapshot current in-memory structures, call await
this.persistence.deleteQueueMessages(this.name) before committing the in-memory
clears, and restore the snapshots if the delete call rejects; update purge() to
perform: 1) create deep copies of this.messages, this.delayedMessages,
this.inflightMessages, this.fifoMessages, this.fifoDelayed,
this.fifoLockedGroups, 2) attempt await
this.persistence?.deleteQueueMessages(this.name), 3) on success clear the
originals, and 4) on failure restore the snapshots and rethrow the error. Also
make SqsStore.clearMessages() async and have it await this.purge() (instead of
fire-and-forget) so callers observe and handle failures from purge().
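
The snapshot/restore variant for `purge()` can be sketched as below: the collection is captured before clearing, and put back if the persistence delete rejects, so a failed PurgeQueue does not silently empty the live queue. `deleteQueueMessages` mirrors the provider method named above; only one of the cleared collections is shown for brevity.

```typescript
class PurgeableQueue {
  messages: string[] = [];
  constructor(private readonly deleteQueueMessages: () => Promise<void>) {}

  async purge(): Promise<void> {
    const snapshot = this.messages;
    this.messages = [];
    try {
      await this.deleteQueueMessages();
    } catch (err) {
      this.messages = snapshot; // restore before surfacing the failure
      throw err;
    }
  }
}
```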

… bug

- Fail fast on invalid/missing persistence config in server.ts and app.ts
- Wrap PG schema creation in a transaction for atomic DDL
- Fix updateMessageReady zeroing out receive count and first-receive timestamp
- Use Boolean() for confirmed column comparison in PG persistence
- Make clearMessages() and reset() async to properly await purge calls

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/app.ts (1)

293-325: ⚠️ Potential issue | 🟠 Major

This changes the public programmatic API in a breaking way.

These methods used to be synchronous and now return Promises. Existing callers that read the returned object synchronously or rely on sync sequencing will now get a pending promise unless they update every call site to await, which conflicts with the PR's non-breaking goal.

Also applies to: 331-369

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app.ts` around lines 293 - 325, The public API was accidentally changed
to async: restore the original synchronous signatures for inspectQueue,
createTopic, subscribe (and the other methods flagged around 331-369) so callers
aren’t broken; either revert the return types from Promise<T> back to their
original synchronous types and implement them synchronously, or else add new
explicitly-named async variants (e.g.,
inspectQueueAsync/createTopicAsync/subscribeAsync) and keep the original
inspectQueue/createTopic/subscribe signatures intact. Ensure the implementations
and exported types for inspectQueue, createTopic, and subscribe match their
previous sync contracts and update any internal callers accordingly.
src/sqs/sqsStore.ts (1)

244-250: ⚠️ Potential issue | 🔴 Critical

Rollback the local dequeue state if updateMessageInflight() fails.

Both dequeue paths have already removed the message from the ready collection and incremented receive metadata before this awaited write. A rejection here leaves standard messages hidden, overstates receive counts, and in the FIFO path the group lock is still not acquired yet, so later same-group messages can leak out of order.

Also applies to: 344-350

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 244 - 250, The awaited call to
persistence.updateMessageInflight (and the same block around lines 344-350) can
throw after we've already removed the message from the ready collection and
bumped receive metadata; on failure we must rollback local dequeue state. Wrap
the updateMessageInflight call in a try/catch, and on catch: re-insert the
message back into the ready collection exactly as it was before removal,
restore/decrement the receive metadata fields you incremented
(approximateReceiveCount and approximateFirstReceiveTimestamp as applicable),
ensure any FIFO group state remains unmodified (don't mark the group as dequeued
or acquire the group lock), then rethrow the error so the caller knows the
persistence write failed. Use the existing symbols msg, receiptHandle,
visibilityDeadline, updateMessageInflight and the local ready collection /
receive-metadata-updating logic to implement the rollback.
♻️ Duplicate comments (3)
src/sqs/sqsStore.ts (3)

558-565: ⚠️ Potential issue | 🔴 Critical

Add rollback semantics to purge().

purge() clears every in-memory collection before the awaited deleteQueueMessages(). If persistence rejects, PurgeQueue fails after the running queue has already been emptied. Snapshot/restore the collections, or delete persisted rows first and only then commit the in-memory clear.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 558 - 565, The purge() implementation
clears all in-memory collections (messages, delayedMessages, inflightMessages,
fifoMessages, fifoDelayed, fifoLockedGroups) before awaiting
persistence.deleteQueueMessages(this.name), which can leave the store empty if
the persistence call fails; change purge() to either (A) call and await
this.persistence?.deleteQueueMessages(this.name) first and only clear the
in-memory collections after it resolves, or (B) snapshot the current collections
into local variables, clear them, await deleteQueueMessages, and if the await
rejects restore the snapshots back into messages, delayedMessages,
inflightMessages, fifoMessages, fifoDelayed and fifoLockedGroups so the method
has proper rollback semantics.

423-424: ⚠️ Potential issue | 🔴 Critical

Delay visibility-state mutations until persistence succeeds.

The zero-timeout branch drops the inflight entry before updateMessageReady(), and the positive-timeout branch overwrites entry.visibilityDeadline before updateMessageInflight(). If either write fails, the request returns an error after live state has already diverged; the zero-timeout path can even leave the message in neither collection.

Also applies to: 442-449

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 423 - 424, The in-memory
visibility/inflight mutations (inflightMessages.delete, entry.visibilityDeadline
update) are happening before the persistence calls (updateMessageReady,
updateMessageInflight), which can leave in-memory state inconsistent if the
persistence write fails; change the logic so that you first perform the
persistence update using immutable/local values (e.g., compute
newVisibilityDeadline locally), await updateMessageReady or
updateMessageInflight on persistence, and only after the await succeeds mutate
in-memory state (call inflightMessages.delete or set entry.visibilityDeadline
and update the inflight map). Also ensure you use try/catch around the
persistence call to return the error without changing live state if the write
fails.
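A sketch of the persist-then-mutate ordering for the visibility change. The entry shape, map keying, and persistence signature are illustrative assumptions:

```typescript
// Illustrative only: compute the new deadline locally, persist it, and
// mutate the in-memory entry only after the write succeeds.
interface InflightEntry {
  messageId: string;
  visibilityDeadline: number;
}

async function changeVisibility(
  inflight: Map<string, InflightEntry>,
  receiptHandle: string,
  timeoutMs: number,
  persistence: { updateMessageInflight(id: string, deadline: number): Promise<void> },
  now = Date.now(),
): Promise<void> {
  const entry = inflight.get(receiptHandle);
  if (!entry) throw new Error("ReceiptHandleIsInvalid");
  const newDeadline = now + timeoutMs; // computed locally, not yet applied
  await persistence.updateMessageInflight(entry.messageId, newDeadline);
  entry.visibilityDeadline = newDeadline; // mutate only after the write succeeds
}
```

The zero-timeout branch needs the same ordering: persist the ready-state transition first, and only then move the entry between collections.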

229-230: ⚠️ Potential issue | 🔴 Critical

Make the DLQ transfer atomic.

The message has already been removed from the live queue before these awaits run. If the source delete succeeds and dlq.enqueue() then fails, the message disappears from both queues. Use a single transactional transfer, or enqueue into the DLQ first and delete from the source only on success, with an explicit rollback path.

Also applies to: 329-330

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sqs/sqsStore.ts` around lines 229 - 230, The transfer to the DLQ is not
atomic because the code calls this.persistence?.deleteMessage(...) before
dlq.enqueue(...); change the flow to call dlq.enqueue(msg) first and only call
this.persistence.deleteMessage(msg.messageId) after enqueue resolves
successfully, and if the subsequent delete fails implement a rollback that
removes the message from the DLQ (or retries the delete with backoff and logs a
fatal/alert) so the message is not lost; apply the same change to the other
occurrence that uses this.persistence?.deleteMessage and dlq.enqueue(msg) as
well.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/app.ts`:
- Around line 621-623: The deduplication record is written before confirming the
message persisted, causing failed enqueues to be treated as duplicates; change
the order so you call queue.nextSequenceNumber() and set msg.sequenceNumber as
before, but await queue.enqueue(msg) first and only call
queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber) after
enqueue resolves successfully (ensure any enqueue rejection prevents the dedup
write). Use the existing symbols queue.nextSequenceNumber(), queue.enqueue(),
and queue.recordDeduplication(...) to locate and reorder the calls accordingly.

In `@src/persistence/pgPersistence.ts`:
- Around line 717-719: The SELECT that loads messages for a queue omits ORDER
BY, which can reshuffle FIFO groups on restart; update the query in the method
that calls this.pool.query to include "ORDER BY sequence_number" (or "ORDER BY
sequence_number ASC") so rows are returned in FIFO sequence before you rebuild
fifoMessages and fifoDelayed, and ensure you append rows in the returned order
to those arrays (references: the this.pool.query call and the fifoMessages /
fifoDelayed rebuild logic).

---

Outside diff comments:
In `@src/app.ts`:
- Around line 293-325: The public API was accidentally changed to async: restore
the original synchronous signatures for inspectQueue, createTopic, subscribe
(and the other methods flagged around 331-369) so callers aren’t broken; either
revert the return types from Promise<T> back to their original synchronous types
and implement them synchronously, or else add new explicitly-named async
variants (e.g., inspectQueueAsync/createTopicAsync/subscribeAsync) and keep the
original inspectQueue/createTopic/subscribe signatures intact. Ensure the
implementations and exported types for inspectQueue, createTopic, and subscribe
match their previous sync contracts and update any internal callers accordingly.

In `@src/sqs/sqsStore.ts`:
- Around line 244-250: The awaited call to persistence.updateMessageInflight
(and the same block around lines 344-350) can throw after we've already removed
the message from the ready collection and bumped receive metadata; on failure we
must rollback local dequeue state. Wrap the updateMessageInflight call in a
try/catch, and on catch: re-insert the message back into the ready collection
exactly as it was before removal, restore/decrement the receive metadata fields
you incremented (approximateReceiveCount and approximateFirstReceiveTimestamp as
applicable), ensure any FIFO group state remains unmodified (don't mark the
group as dequeued or acquire the group lock), then rethrow the error so the
caller knows the persistence write failed. Use the existing symbols msg,
receiptHandle, visibilityDeadline, updateMessageInflight and the local ready
collection / receive-metadata-updating logic to implement the rollback.

---

Duplicate comments:
In `@src/sqs/sqsStore.ts`:
- Around line 558-565: The purge() implementation clears all in-memory
collections (messages, delayedMessages, inflightMessages, fifoMessages,
fifoDelayed, fifoLockedGroups) before awaiting
persistence.deleteQueueMessages(this.name), which can leave the store empty if
the persistence call fails; change purge() to either (A) call and await
this.persistence?.deleteQueueMessages(this.name) first and only clear the
in-memory collections after it resolves, or (B) snapshot the current collections
into local variables, clear them, await deleteQueueMessages, and if the await
rejects restore the snapshots back into messages, delayedMessages,
inflightMessages, fifoMessages, fifoDelayed and fifoLockedGroups so the method
has proper rollback semantics.
- Around line 423-424: The in-memory visibility/inflight mutations
(inflightMessages.delete, entry.visibilityDeadline update) are happening before
the persistence calls (updateMessageReady, updateMessageInflight), which can
leave in-memory state inconsistent if the persistence write fails; change the
logic so that you first perform the persistence update using immutable/local
values (e.g., compute newVisibilityDeadline locally), await updateMessageReady
or updateMessageInflight on persistence, and only after the await succeeds
mutate in-memory state (call inflightMessages.delete or set
entry.visibilityDeadline and update the inflight map). Also ensure you use
try/catch around the persistence call to return the error without changing live
state if the write fails.
- Around line 229-230: The transfer to the DLQ is not atomic because the code
calls this.persistence?.deleteMessage(...) before dlq.enqueue(...); change the
flow to call dlq.enqueue(msg) first and only call
this.persistence.deleteMessage(msg.messageId) after enqueue resolves
successfully, and if the subsequent delete fails implement a rollback that
removes the message from the DLQ (or retries the delete with backoff and logs a
fatal/alert) so the message is not lost; apply the same change to the other
occurrence that uses this.persistence?.deleteMessage and dlq.enqueue(msg) as
well.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7a5cd90e-9b7f-49ea-bd7e-7ed496b1a312

📥 Commits

Reviewing files that changed from the base of the PR and between 95a63b5 and 658865d.

📒 Files selected for processing (8)
  • src/app.ts
  • src/persistence/pgPersistence.ts
  • src/server.ts
  • src/sqs/sqsStore.ts
  • test/persistence/file-s3-persistence.test.ts
  • test/persistence/persistence.test.ts
  • test/programmatic-api.test.ts
  • test/s3/rename-object.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/server.ts
  • test/programmatic-api.test.ts

Comment thread src/app.ts
Comment on lines +621 to +623
  msg.sequenceNumber = await queue.nextSequenceNumber();
  queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
- queue.enqueue(msg);
+ await queue.enqueue(msg);

⚠️ Potential issue | 🔴 Critical

Record FIFO deduplication only after the enqueue succeeds.

If queue.enqueue() rejects after the cache write, retries with the same dedup ID are treated as duplicates even though no message was persisted, so the send is effectively lost for the full dedup window.

💡 Suggested fix
         msg.sequenceNumber = await queue.nextSequenceNumber();
-        queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
         await queue.enqueue(msg);
+        queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- msg.sequenceNumber = await queue.nextSequenceNumber();
- queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
- await queue.enqueue(msg);
+ msg.sequenceNumber = await queue.nextSequenceNumber();
+ await queue.enqueue(msg);
+ queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app.ts` around lines 621 - 623, The deduplication record is written
before confirming the message persisted, causing failed enqueues to be treated
as duplicates; change the order so you call queue.nextSequenceNumber() and set
msg.sequenceNumber as before, but await queue.enqueue(msg) first and only call
queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber) after
enqueue resolves successfully (ensure any enqueue rejection prevents the dedup
write). Use the existing symbols queue.nextSequenceNumber(), queue.enqueue(),
and queue.recordDeduplication(...) to locate and reorder the calls accordingly.

Comment on lines +717 to +719
const result = await this.pool.query("SELECT * FROM sqs_messages WHERE queue_name = $1", [
queue.name,
]);

⚠️ Potential issue | 🔴 Critical

Preserve FIFO order when reloading messages from PostgreSQL.

This query has no ORDER BY, so restart can reshuffle rows before they are appended back into each FIFO group's array. That breaks per-group ordering guarantees. Load FIFO queues ordered by sequence_number before rebuilding fifoMessages and fifoDelayed.

💡 Suggested fix
-    const result = await this.pool.query("SELECT * FROM sqs_messages WHERE queue_name = $1", [
-      queue.name,
-    ]);
+    const result = await this.pool.query(
+      queue.isFifo()
+        ? "SELECT * FROM sqs_messages WHERE queue_name = $1 ORDER BY sequence_number ASC"
+        : "SELECT * FROM sqs_messages WHERE queue_name = $1",
+      [queue.name],
+    );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/persistence/pgPersistence.ts` around lines 717 - 719, The SELECT that
loads messages for a queue omits ORDER BY, which can reshuffle FIFO groups on
restart; update the query in the method that calls this.pool.query to include
"ORDER BY sequence_number" (or "ORDER BY sequence_number ASC") so rows are
returned in FIFO sequence before you rebuild fifoMessages and fifoDelayed, and
ensure you append rows in the returned order to those arrays (references: the
this.pool.query call and the fifoMessages / fifoDelayed rebuild logic).

@kibertoad
Owner

@yarlson can you resolve the conflicts and address coderabbit comments? I've been going back and forth over introducing a new dependency, but 90 kbs ain't that much, and I can see how this can be useful.
