Add PostgreSQL as an alternative persistence backend #57

yarlson wants to merge 5 commits into kibertoad:main
Conversation
Move persistence.ts to persistence/sqlitePersistence.ts and extract a PersistenceProvider interface that covers SQS, SNS, and S3 persistence. Store property types now reference the interface instead of the concrete SqlitePersistence class, enabling alternative backends.
All persistence interface methods now return void | Promise<void> (or Buffer | Promise<Buffer> for readBody). Store methods and action handlers that call persistence are now async with await. This enables asynchronous backends (e.g. PostgreSQL) while keeping SQLite unchanged since await on synchronous void is a no-op.
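The widened signatures can be sketched as follows; the interface and class names here are illustrative stand-ins, not the actual fauxqs symbols:

```typescript
// Sketch of the widened return types: each method may be synchronous
// (the SQLite backend) or asynchronous (the PostgreSQL backend).
interface PersistenceProviderSketch {
  insertQueue(name: string): void | Promise<void>;
  readBody(id: string): Buffer | Promise<Buffer>;
}

class SyncBackend implements PersistenceProviderSketch {
  insertQueue(_name: string): void {} // synchronous, returns undefined
  readBody(_id: string): Buffer {
    return Buffer.from("sync");
  }
}

class AsyncBackend implements PersistenceProviderSketch {
  async insertQueue(_name: string): Promise<void> {} // asynchronous
  async readBody(_id: string): Promise<Buffer> {
    return Buffer.from("async");
  }
}

// Callers simply await either variant: `await undefined` is a no-op,
// so the synchronous backend needs no special-casing.
async function demo(p: PersistenceProviderSketch): Promise<string> {
  await p.insertQueue("q1");
  const body = await p.readBody("m1");
  return body.toString();
}
```

Because `await` on a non-Promise value resolves immediately, the SQLite path keeps its existing synchronous behavior unchanged.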
Implement PgPersistence as an alternative to SqlitePersistence, using the pg library with Pool for async connection management. Activated via FAUXQS_PERSISTENCE_BACKEND=postgresql and FAUXQS_POSTGRESQL_URL env vars, or the programmatic API options persistenceBackend and postgresqlUrl. SQLite remains the default. Includes a factory function createPersistence() and skippable PG integration tests.
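A minimal sketch of the selection logic described above, with stub classes standing in for the real SqlitePersistence and PgPersistence (the option and env-var names follow the PR description; the validation error is an assumption, not confirmed behavior):

```typescript
// Hypothetical factory sketch: pick a persistence backend from options,
// falling back to environment variables, with SQLite as the default.
type BackendType = "sqlite" | "postgresql";

interface PersistenceOptions {
  type: BackendType;
  connectionString?: string;
}

class SqlitePersistenceStub {
  readonly kind = "sqlite";
}

class PgPersistenceStub {
  readonly kind = "postgresql";
  constructor(readonly connectionString: string) {}
}

function createPersistenceSketch(opts: PersistenceOptions) {
  if (opts.type === "postgresql") {
    if (!opts.connectionString) {
      // Assumed validation: fail fast instead of silently falling back.
      throw new Error("postgresqlUrl (or FAUXQS_POSTGRESQL_URL) is required for the postgresql backend");
    }
    return new PgPersistenceStub(opts.connectionString);
  }
  return new SqlitePersistenceStub(); // SQLite stays the default
}

// Programmatic options take precedence over environment variables.
function resolveBackend(options: { persistenceBackend?: BackendType; postgresqlUrl?: string }) {
  const type =
    options.persistenceBackend ??
    (process.env.FAUXQS_PERSISTENCE_BACKEND as BackendType | undefined) ??
    "sqlite";
  const connectionString = options.postgresqlUrl ?? process.env.FAUXQS_POSTGRESQL_URL;
  return createPersistenceSketch({ type, connectionString });
}
```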
Replace env-var-gated test skipping with testcontainers to automatically spin up a PostgreSQL container for integration tests.
📝 Walkthrough

Adds a PostgreSQL persistence backend and test support; converts the core server API, stores, and action handlers for SQS/SNS/S3 to async/Promise-based implementations; introduces a unified PersistenceProvider and a PgPersistence implementation; updates startup/config to select the backend and adds integration tests for Postgres persistence.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client/Test
    participant Server as Fauxqs Server
    participant Factory as Persistence Factory
    participant DB as PostgreSQL

    Client->>Server: startFauxqs({ persistenceBackend: "postgresql", postgresqlUrl })
    Server->>Factory: createPersistence({ type: "postgresql", connectionString })
    Factory->>DB: pg.Pool (connect & run migrations)
    DB-->>Factory: pool instance
    Factory-->>Server: PgPersistence
    Server->>Server: await setup(config)
    Server->>PgPersistence: await load(sqsStore, snsStore, s3Store)
    PgPersistence->>DB: SELECT/LOAD persisted rows
    DB-->>PgPersistence: rows
    PgPersistence-->>Server: restored state
    Client->>Server: SQS/SNS/S3 requests (send/publish/put)
    Server->>PgPersistence: await write-through operations (insertMessage/insertTopic/upsertObject)
    PgPersistence->>DB: INSERT/UPDATE/TRANSACTION
    DB-->>PgPersistence: OK
    PgPersistence-->>Server: ack
    Server-->>Client: response
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
src/sqs/actions/tagQueue.ts (1)
15-20: ⚠️ Potential issue | 🟠 Major

Rollback tag mutations if the persistence write fails.

The queue's tags Map is mutated before the awaited write on Line 20. If that persistence call rejects, TagQueue returns an error after the tags were already changed in memory.

💡 Suggested change

```diff
+ const previousTags = new Map(queue.tags);
  const tags = (body.Tags as Record<string, string>) ?? {};
  for (const [key, value] of Object.entries(tags)) {
    queue.tags.set(key, value);
  }
- await queue.persistence?.insertQueue(queue);
+ try {
+   await queue.persistence?.insertQueue(queue);
+ } catch (error) {
+   queue.tags = previousTags;
+   throw error;
+ }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/actions/tagQueue.ts` around lines 15 - 20, The code mutates queue.tags (a Map) before awaiting queue.persistence?.insertQueue(queue), so if insertQueue rejects the in-memory tags remain changed; to fix, capture the current state (e.g., clone entries from queue.tags) before applying the new tags, apply the new tags as now, await queue.persistence?.insertQueue(queue), and if the promise throws restore queue.tags to the saved state (or clear and re-set entries) and re-throw the error; reference the tags variable, queue.tags Map, and queue.persistence?.insertQueue(queue) (inside TagQueue) when making the change.

src/sqs/sqsStore.ts (1)
244-250: ⚠️ Potential issue | 🔴 Critical

Rollback the ready→inflight transition if persistence rejects.

By the time these writes run, the message has already been removed from the ready queue and inserted into inflightMessages. If updateMessageInflight() rejects, ReceiveMessage fails after local state changed; in the FIFO path the group is still unlocked, so later messages can leapfrog the failed one. Please wrap this transition in a try/catch and restore the message to the head of the source queue/group before rethrowing.

Also applies to: 344-350
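The requested rollback can be sketched like this; the queue internals are heavily simplified and all names are hypothetical stand-ins for the sqsStore structures:

```typescript
// Sketch of the rollback pattern: undo the ready->inflight transition
// and restore the message to the head of the queue if persistence rejects.
interface Msg {
  messageId: string;
}

class QueueSketch {
  ready: Msg[] = [];
  inflight = new Map<string, Msg>();

  constructor(private persist: (messageId: string) => Promise<void>) {}

  async receive(receiptHandle: string): Promise<Msg | undefined> {
    const msg = this.ready.shift();
    if (!msg) return undefined;
    this.inflight.set(receiptHandle, msg);
    try {
      await this.persist(msg.messageId);
    } catch (err) {
      // Undo the in-memory transition before rethrowing, so the
      // message cannot be leapfrogged by later ones.
      this.inflight.delete(receiptHandle);
      this.ready.unshift(msg);
      throw err;
    }
    return msg;
  }
}
```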
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/sqsStore.ts` around lines 244 - 250, Wrap the call to this.persistence.updateMessageInflight(...) inside a try/catch in the ReceiveMessage code path: if updateMessageInflight rejects, catch the error, remove the message from inflightMessages and reinsert it at the head of the original ready queue or group (preserving FIFO group locking semantics), then rethrow the error; apply the same pattern to the other occurrence around lines 344-350 so both updateMessageInflight usages restore local state on failure. Ensure you reference and mutate the same structures used elsewhere (inflightMessages and the ready queue/group) and keep receiptHandle/visibilityDeadline arguments unchanged when rethrowing.

src/sqs/actions/createQueue.ts (1)
73-79: ⚠️ Potential issue | 🔴 Critical

Merged attributes on existing queue are not persisted.

When new attributes are merged into an existing queue (lines 73–78), the changes remain in-memory only and are never persisted. This contrasts with how state mutations are handled elsewhere (e.g., tagQueue.ts and untagQueue.ts explicitly call queue.persistence?.insertQueue(queue) after state changes). If the server restarts, merged attributes will be lost.

Add a persistence call after the merge to ensure consistency:

```typescript
await existing.persistence?.insertQueue(existing);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/actions/createQueue.ts` around lines 73 - 79, The merged attributes are only kept in memory inside the createQueue logic (the for-loop that updates existing.attributes) and must be persisted like other state changes; after the loop that mutates existing.attributes in createQueue (the variable named existing in src/sqs/actions/createQueue.ts), call and await existing.persistence?.insertQueue(existing) so the updated attributes are saved to durable storage.

src/sns/actions/tagResource.ts (1)
20-30: ⚠️ Potential issue | 🟠 Major

Rollback in-memory tag mutations when persistence fails.

Both actions update topic.tags before persisting. If insertTopic rejects, the request fails but the in-memory state stays changed, causing divergence from persisted state.

💡 Suggested fix (snapshot + rollback)

```diff
 export async function tagResource(
   params: Record<string, string>,
   snsStore: SnsStore,
 ): Promise<string> {
 @@
   const topic = snsStore.getTopic(resourceArn);
 @@
+  const previousTags = new Map(topic.tags);
+
   // Parse Tags.member.N.Key/Value
   for (const [key, value] of Object.entries(params)) {
 @@
       topic.tags.set(value, tagValue);
     }
   }
-  await snsStore.persistence?.insertTopic(topic);
+  try {
+    await snsStore.persistence?.insertTopic(topic);
+  } catch (err) {
+    topic.tags.clear();
+    for (const [k, v] of previousTags) topic.tags.set(k, v);
+    throw err;
+  }
 @@
 export async function untagResource(
   params: Record<string, string>,
   snsStore: SnsStore,
 ): Promise<string> {
 @@
   const topic = snsStore.getTopic(resourceArn);
 @@
+  const previousTags = new Map(topic.tags);
+
   // Parse TagKeys.member.N
   for (const [key, value] of Object.entries(params)) {
 @@
       topic.tags.delete(value);
     }
   }
-  await snsStore.persistence?.insertTopic(topic);
+  try {
+    await snsStore.persistence?.insertTopic(topic);
+  } catch (err) {
+    topic.tags.clear();
+    for (const [k, v] of previousTags) topic.tags.set(k, v);
+    throw err;
+  }
```

Also applies to: 49-58
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sns/actions/tagResource.ts` around lines 20 - 30, The current loop mutates topic.tags directly before calling snsStore.persistence?.insertTopic(topic), so if insertTopic rejects the in-memory topic stays changed; to fix, take a snapshot of the existing tags (e.g., Array.from(topic.tags) or a cloned Map) before applying changes in tagResource, apply the new tags to topic.tags, then await snsStore.persistence?.insertTopic(topic) inside a try/catch and on any error restore topic.tags from the snapshot (or replace with the cloned Map) and rethrow the error; do the same snapshot/restore pattern for the other similar mutation block referenced (lines 49-58) to ensure in-memory rollback on persistence failure, and refer to topic.tags and snsStore.persistence.insertTopic when locating the code.

src/sns/actions/setSubscriptionAttributes.ts (1)
42-50: ⚠️ Potential issue | 🟠 Major

Avoid mutating in-memory subscription state before awaited persistence.

At Line 42, state is updated first; if the awaited write at Lines 47-50 fails, memory and persisted state diverge.

Proposed fix

```diff
-  subscription.attributes[attributeName] = attributeValue;
+  const nextAttributes = {
+    ...subscription.attributes,
+    [attributeName]: attributeValue,
+  };

   // Invalidate cached parsed filter policy when relevant attributes change
   if (attributeName === "FilterPolicy" || attributeName === "FilterPolicyScope") {
     subscription.parsedFilterPolicy = undefined;
   }

-  await snsStore.persistence?.updateSubscriptionAttributes(
-    subscriptionArn,
-    subscription.attributes,
-  );
+  await snsStore.persistence?.updateSubscriptionAttributes(subscriptionArn, nextAttributes);
+  subscription.attributes = nextAttributes;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sns/actions/setSubscriptionAttributes.ts` around lines 42 - 50, The code mutates subscription.attributes and subscription.parsedFilterPolicy before awaiting persistence, risking divergence if updateSubscriptionAttributes fails; change the flow to first create an updatedAttributes object (copy subscription.attributes with attributeName set to attributeValue and derive parsedFilterPolicy reset logic), call await snsStore.persistence?.updateSubscriptionAttributes(subscriptionArn, updatedAttributes), and only after successful persistence assign subscription.attributes = updatedAttributes and update subscription.parsedFilterPolicy as needed; use the existing symbols subscription.attributes, parsedFilterPolicy, updateSubscriptionAttributes, subscriptionArn, attributeName, and attributeValue to locate and apply this change.

src/initConfig.ts (1)
206-213: ⚠️ Potential issue | 🟠 Major

Await S3 bucket initialization calls inside the async init flow.

At Lines 206-213, bucket creation/lifecycle writes are not awaited. applyInitConfig can complete before S3 setup finishes, and async failures may escape init error handling.

Proposed fix

```diff
 if (typeof entry === "string") {
-  s3Store.createBucket(entry);
+  await s3Store.createBucket(entry);
 } else {
-  s3Store.createBucket(entry.name, entry.type);
+  await s3Store.createBucket(entry.name, entry.type);
   if (entry.lifecycleConfiguration) {
-    s3Store.putBucketLifecycleConfiguration(
+    await s3Store.putBucketLifecycleConfiguration(
       entry.name,
       lifecycleConfigToXml(entry.lifecycleConfiguration),
     );
   }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/initConfig.ts` around lines 206 - 213, The S3 bucket creation and lifecycle writes in applyInitConfig are fired without awaiting, so change the calls to await s3Store.createBucket(...) and await s3Store.putBucketLifecycleConfiguration(...) (or await the promise-returning wrapper used by s3Store) and ensure applyInitConfig (or its caller) correctly propagates errors; specifically update the branches where s3Store.createBucket(entry) and s3Store.createBucket(entry.name, entry.type) plus s3Store.putBucketLifecycleConfiguration(entry.name, lifecycleConfigToXml(...)) are invoked to await their returned promises and let failures bubble up to the async init flow.

src/sns/actions/publish.ts (1)
223-237: ⚠️ Potential issue | 🟠 Major

Catch per-entry fan-out failures inside publishBatch.

fanOutToSubscriptions() can now reject on async queue/persistence errors. One rejection aborts the whole batch after earlier entries may already have been delivered, and the failing entry never gets a `<Failed>` member. Handle that await per entry so the batch response matches the partial work already done.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sns/actions/publish.ts` around lines 223 - 237, In publishBatch, don’t await fanOutToSubscriptions() for the whole batch at once; instead wrap the await call for each entry in a try/catch inside the per-entry loop (the code that currently calls fanOutToSubscriptions with topicArn, topic, messageId, entry.message, entry.messageAttributes, subject, messageGroupId, messageDeduplicationId, snsStore, sqsStore) so that when fanOutToSubscriptions rejects you push a corresponding <member><Id>...</Id><Code>...</Code><Message>...</Message></member> into the failed list (the same structure used elsewhere) and continue processing remaining entries; on success continue to push the successfulXml entry as before. Ensure you include the entry.id and messageId in both success and failure entries and do not let one rejection abort the entire publishBatch.
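The per-entry isolation this prompt asks for can be sketched as follows; the types and the error code are illustrative, not the real publish.ts shapes:

```typescript
// Sketch of per-entry error isolation in a batch fan-out: one rejection
// produces a Failed entry instead of aborting the whole batch.
interface BatchEntry {
  id: string;
  message: string;
}

interface BatchResult {
  successful: { id: string; messageId: string }[];
  failed: { id: string; code: string; message: string }[];
}

async function publishBatchSketch(
  entries: BatchEntry[],
  fanOut: (message: string) => Promise<string>, // resolves to a messageId
): Promise<BatchResult> {
  const result: BatchResult = { successful: [], failed: [] };
  for (const entry of entries) {
    try {
      const messageId = await fanOut(entry.message);
      result.successful.push({ id: entry.id, messageId });
    } catch (err) {
      // A rejected fan-out becomes a Failed member; later entries still run.
      result.failed.push({
        id: entry.id,
        code: "InternalFailure", // placeholder code, not the real one
        message: err instanceof Error ? err.message : String(err),
      });
    }
  }
  return result;
}
```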
🧹 Nitpick comments (6)
test/persistence/pg-persistence.test.ts (1)
71-104: Close fauxqs servers in a finally path.

Each test stops its servers only on the happy path. If an SDK call or assertion throws earlier, the listener and DB connections stay open for the rest of the suite. Wrap each server lifetime in try/finally so failures clean up reliably.

Also applies to: 108-145, 149-184
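One way to sketch this cleanup, with a hypothetical `withServer` helper standing in for the fauxqs test setup:

```typescript
// Sketch of a try/finally guard for per-test server lifetimes.
interface Stoppable {
  stop(): Promise<void>;
}

async function withServer<T>(
  startServer: () => Promise<Stoppable>,
  body: (server: Stoppable) => Promise<T>,
): Promise<T> {
  const server = await startServer();
  try {
    return await body(server);
  } finally {
    await server.stop(); // runs even when the body throws
  }
}
```

Each test then runs its SDK calls and assertions inside the body callback, and the server stops whether or not they succeed.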
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/persistence/pg-persistence.test.ts` around lines 71 - 104, The test starts fauxqs servers via startFauxqs (server1, server2) and calls serverX.stop() only on the happy path; wrap each server's lifecycle in try/finally so stop() is always invoked. Specifically, for the blocks that create server1/server2 and use makeSqsClient/SendMessageCommand/GetQueueUrlCommand/ReceiveMessageCommand, move the test logic into a try and call await serverX.stop() in the corresponding finally; do the same for the other similar test ranges (lines referenced: the other startFauxqs usages) to ensure listeners and DB connections are cleaned up on failures.

src/persistence/pgPersistence.ts (4)
687-703: Queue restoration triggers unnecessary re-persistence.

When loading queues, sqsStore.createQueue() is called, which will persist the queue again (via insertQueue). The comment acknowledges this but suggests overwriting timestamps in memory. A cleaner approach would be a separate restore method that doesn't trigger persistence.

This pattern is acceptable for now since ON CONFLICT DO UPDATE handles idempotency, but consider adding a restoreQueue method to SqsStore that bypasses persistence for cleaner separation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/pgPersistence.ts` around lines 687 - 703, loadSqsQueues currently calls SqsStore.createQueue which re-persists queues during restoration; add a non-persisting restore method on SqsStore (e.g., restoreQueue) and call it from loadSqsQueues instead of createQueue so restoration populates in-memory queue objects without triggering insertQueue/DB writes; ensure restoreQueue accepts the same params (name, url, arn, attributes, tags) and sets createdTimestamp, lastModifiedTimestamp and sequenceCounter directly without calling persistence code, then update loadSqsQueues to use restoreQueue and keep the subsequent loadSqsMessages call.
870-908: N+1 query pattern in multipart upload loading.

For each multipart upload, a separate query fetches its parts. With many uploads, this becomes inefficient. Consider using a JOIN or fetching all parts in one query grouped by upload_id.

♻️ Suggested optimization to fetch parts in bulk

```diff
 private async loadS3MultipartUploads(s3Store: S3Store): Promise<void> {
   const uploadResult = await this.pool.query("SELECT * FROM s3_multipart_uploads");
+  const partResult = await this.pool.query("SELECT * FROM s3_multipart_parts");
+
+  // Group parts by upload_id
+  const partsByUpload = new Map<string, typeof partResult.rows>();
+  for (const pRow of partResult.rows) {
+    const parts = partsByUpload.get(pRow.upload_id) ?? [];
+    parts.push(pRow);
+    partsByUpload.set(pRow.upload_id, parts);
+  }

   for (const row of uploadResult.rows) {
     const upload: MultipartUpload = {
       // ... existing fields ...
     };

-    const partResult = await this.pool.query(
-      "SELECT * FROM s3_multipart_parts WHERE upload_id = $1",
-      [row.upload_id],
-    );
-
-    for (const pRow of partResult.rows) {
+    for (const pRow of partsByUpload.get(row.upload_id) ?? []) {
       upload.parts.set(pRow.part_number, {
         // ... existing mapping ...
       });
     }

     s3Store.restoreMultipartUpload(upload);
   }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/pgPersistence.ts` around lines 870 - 908, In loadS3MultipartUploads, avoid the N+1 pattern by replacing the per-upload query of s3_multipart_parts with a single bulk query that selects all parts (e.g., SELECT * FROM s3_multipart_parts WHERE upload_id = ANY($1)) or a JOIN of s3_multipart_uploads and s3_multipart_parts; group the returned part rows by upload_id into a Map<number, Part[]> (or Map<string, Map<partNumber, Part>>), then when iterating uploadResult.rows build each MultipartUpload and populate upload.parts from the grouped parts map before calling s3Store.restoreMultipartUpload(upload); keep JSON.parse(row.metadata) and the field mappings intact and reference loadS3MultipartUploads, s3_multipart_uploads, s3_multipart_parts, upload.parts, and s3Store.restoreMultipartUpload when making the change.
746-760: Inline SQL duplicates updateMessageReady logic.

This inline query has the same issue as updateMessageReady: it preserves the receive count correctly here, but the method on lines 234-240 doesn't. Additionally, this duplicates SQL that could call the existing method.

♻️ Consider reusing updateMessageReady after fixing it

After fixing updateMessageReady to preserve the receive count:

```diff
 // Visibility expired — clear inflight state in DB
-await this.pool.query(
-  `UPDATE sqs_messages SET receipt_handle = $1, visibility_deadline = $2, approximate_receive_count = $3, approximate_first_receive_timestamp = $4
-   WHERE message_id = $5`,
-  [
-    null,
-    null,
-    Number(row.approximate_receive_count),
-    row.approximate_first_receive_timestamp != null
-      ? Number(row.approximate_first_receive_timestamp)
-      : null,
-    row.message_id,
-  ],
-);
+await this.updateMessageReady(row.message_id);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/pgPersistence.ts` around lines 746 - 760, The inline UPDATE duplicates logic from updateMessageReady and incorrectly duplicates behavior instead of reusing a single method; first modify updateMessageReady to preserve approximate_receive_count and approximate_first_receive_timestamp when clearing visibility (accepting parameters or reading from the row) so it updates receipt_handle and visibility_deadline while keeping receive counts intact, then replace this.inline pool.query block with a call to updateMessageReady(row.message_id, { receipt_handle: null, visibility_deadline: null, approximate_receive_count: Number(row.approximate_receive_count), approximate_first_receive_timestamp: row.approximate_first_receive_timestamp != null ? Number(row.approximate_first_receive_timestamp) : null }); reference updateMessageReady and the current pool.query usage that reads row.message_id, row.approximate_receive_count, and row.approximate_first_receive_timestamp to locate the code to change.
11-105: Consider adding indexes for common query patterns.

The schema creates tables with primary keys but lacks indexes on foreign key columns and commonly queried fields. This could impact query performance as data grows.

📊 Suggested indexes to add

```diff
 const SCHEMA_STATEMENTS = [
   // ... existing table definitions ...
+  `CREATE INDEX IF NOT EXISTS idx_sqs_messages_queue_name ON sqs_messages(queue_name)`,
+  `CREATE INDEX IF NOT EXISTS idx_sns_subscriptions_topic_arn ON sns_subscriptions(topic_arn)`,
+  `CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket ON s3_objects(bucket)`,
 ];
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/pgPersistence.ts` around lines 11 - 105, SCHEMA_STATEMENTS currently defines table DDLs but lacks indexes on FK and high-cardinality query columns; add CREATE INDEX statements to the SCHEMA_STATEMENTS array for columns such as sqs_messages.queue_name, sqs_messages.sent_timestamp (or visibility_deadline), sns_subscriptions.topic_arn, s3_objects.bucket (and optionally s3_objects.last_modified or etag if queried), s3_multipart_parts.upload_id and s3_multipart_parts.part_number to improve lookup and join performance; update the SCHEMA_STATEMENTS constant to include those CREATE INDEX IF NOT EXISTS ... ON ... statements so migrations create indexes alongside tables.

src/s3/s3Store.ts (1)
380-387: Consider using Promise.all for parallel deletion.

The current implementation uses sequential await in a loop, which can be slow for bulk deletions. Since each deleteObject call is independent, these can be parallelized.

However, this may be intentional to avoid overwhelming the database with concurrent requests, so marking as optional.

♻️ Optional refactor for parallel deletion

```diff
 async deleteObjects(bucket: string, keys: string[]): Promise<string[]> {
-  const deleted: string[] = [];
-  for (const key of keys) {
-    await this.deleteObject(bucket, key);
-    deleted.push(key);
-  }
-  return deleted;
+  await Promise.all(keys.map((key) => this.deleteObject(bucket, key)));
+  return keys;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/s3/s3Store.ts` around lines 380 - 387, The deleteObjects implementation in deleteObjects currently awaits deleteObject sequentially, causing slow bulk deletions; change it to run deletions in parallel by mapping keys to promises from this.deleteObject(bucket, key) and awaiting Promise.all to collect results, ensuring you still return the array of deleted keys (preserve order by awaiting Promise.all on the mapped array); if you need to limit concurrency to avoid overwhelming services, use a concurrency-controlled mapper (e.g., p-map or a simple batching loop) instead of firing all promises at once.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/app.ts`:
- Around line 406-418: The code may silently start without persistence when
persistenceBackend is "postgresql" but options.postgresqlUrl is missing and it
doesn't honor environment variables; update the startup logic to read
FAUXQS_PERSISTENCE_BACKEND and FAUXQS_POSTGRESQL_URL (falling back to
options.persistenceBackend and options.postgresqlUrl), use that resolved backend
value for the existing branch logic, and add validation in the branch handling
postgresql (where createPersistence is invoked) to throw/exit with a clear error
if no postgresql URL is available; keep existing SqlitePersistence instantiation
for sqlite fallback and reference the symbols
persistenceBackend/options.postgresqlUrl/createPersistence/SqlitePersistence/persistenceManager
when making changes.
In `@src/persistence/pgPersistence.ts`:
- Around line 804-815: The boolean conversion for `confirmed` is fragile because
`row.confirmed === 1` only handles integer representations; update the
assignment in the loop that builds the `SnsSubscription` (the `for (const row of
result.rows)` block) to robustly handle both PostgreSQL boolean and integer
types—for example, set `confirmed` using a normalized check like
`Boolean(row.confirmed)` (or `row.confirmed === true || row.confirmed === 1`)
when constructing the `sub` object so `snsStore.subscriptions.set(row.arn, sub)`
always receives a proper boolean.
- Around line 114-120: The schema initialization in PgPersistence.create
currently runs SCHEMA_STATEMENTS directly against a new pg.Pool and can leave a
partial schema on failure; update PgPersistence.create to begin a transaction on
a client from the pool, execute each statement (referencing SCHEMA_STATEMENTS)
within that transaction, then commit if all succeed or rollback on any error
(and rethrow), ensuring the pool/client are properly released; return new
PgPersistence(pool) only after a successful commit.
- Around line 234-240: updateMessageReady currently writes NULL into
approximate_receive_count (and likely approximate_first_receive_timestamp),
which loses receive-count data; change the SQL in updateMessageReady so it only
clears receipt_handle and visibility_deadline (set them to NULL) and does NOT
overwrite approximate_receive_count (and leave
approximate_first_receive_timestamp untouched) — update the pool.query call in
updateMessageReady to remove approximate_receive_count from the SET clause and
adjust the parameter list accordingly so messageId remains the locator.
In `@src/server.ts`:
- Around line 5-20: Validate FAUXQS_PERSISTENCE_BACKEND, FAUXQS_PERSISTENCE
(persistenceEnv), and required backend-specific vars before calling startFauxqs:
ensure persistenceBackend is one of the allowed values ("sqlite" or
"postgresql"), require persistenceEnv === "true" to enable persistence, and if
persistenceBackend === "sqlite" assert dataDir is present, or if "postgresql"
assert postgresqlUrl is present (fail fast with a thrown error or
process.exit(1) and a clear logger.error message). Use the existing symbols
(persistenceBackend, persistenceEnv, dataDir, postgresqlUrl, s3StorageDir if
applicable, enablePersistence, startFauxqs) to gate constructing the options and
prevent silently falling back to in-memory behavior.
In `@src/sns/actions/publish.ts`:
- Around line 337-345: The FIFO dedup sequence is not atomic: replace the
separate calls to queue.checkDeduplication(), queue.nextSequenceNumber(), and
queue.recordDeduplication() with a single queue-level async critical operation
(e.g., add a method like queue.reserveDeduplication(messageDeduplicationId,
messageId) or queue.beginFifoPublish(messageDeduplicationId, messageId)) that
atomically checks for duplicates and records the dedup entry while returning the
assigned sequenceNumber; update the caller inside the if (queue.isFifo() &&
messageDeduplicationId) block to call that single async method and set
sqsMsg.sequenceNumber from its result; also ensure there is a clear rollback API
(e.g., queue.releaseDeduplication(messageDeduplicationId) or a commit/release
handle returned by the reserve method) and call it if await
queue.enqueue(sqsMsg) fails so the dedup reservation is not left orphaned.
In `@src/sns/actions/setTopicAttributes.ts`:
- Around line 36-37: The code mutates topic.attributes
(topic.attributes[attributeName] = attributeValue) before awaiting persistence
(snsStore.persistence?.updateTopicAttributes), which can leave in-memory state
inconsistent if the write fails; instead create a newAttributes snapshot (e.g.,
shallow copy of topic.attributes with attributeName set to attributeValue) and
call snsStore.persistence?.updateTopicAttributes(topicArn, newAttributes) first,
then assign topic.attributes = newAttributes only after the await succeeds, or
alternatively perform the in-memory mutation but wrap the await in a try/catch
and revert topic.attributes back to its original value on error; reference
topic.attributes, attributeName, attributeValue, topicArn and
snsStore.persistence.updateTopicAttributes (SetTopicAttributes flow) when making
the change.
In `@src/sns/snsStore.ts`:
- Around line 67-69: The in-memory maps are being mutated before awaiting
persistence (e.g., calls that do this.topics.set(arn, topic) then await
this.persistence?.insertTopic(...) and similarly for this.subscriptions.set(...)
with insertSubscription), which can expose half-applied state or leave
memory/persistence inconsistent on failure; change the flow to await the
persistence write first (await
this.persistence?.insertTopic/insertSubscription(...)) and only on success
update the in-memory maps, or wrap the current order in try/catch and on
persistence failure remove/rollback the in-memory entry (e.g., remove the key
from topics/subscriptions in the catch and rethrow) so memory and DB cannot get
out of sync.
In `@src/sqs/sqsStore.ts`:
- Around line 229-230: The delete-then-enqueue sequence (calls to
persistence.deleteMessage and dlq.enqueue) must be made atomic: either add a
single transactional persistence operation (e.g., implement and call
persistence.transferToDlq(msg) that enqueues to DLQ and deletes from source in
one DB transaction) or, if a DB transaction is not possible, perform the enqueue
first and only delete on success, and on enqueue failure perform an explicit
rollback by re-adding the message to the source (e.g., persistence.requeue(msg))
with retries and error handling; update both occurrences (the calls around
deleteMessage/dlq.enqueue and the similar block at the other spot) to use the
new transfer or the enqueue-then-conditional-delete-with-rollback pattern and
surface any errors so failures don’t cause message loss.
- Around line 558-566: The purge() implementation must snapshot current
in-memory structures, call await this.persistence.deleteQueueMessages(this.name)
before committing the in-memory clears, and restore the snapshots if the delete
call rejects; update purge() to perform: 1) create deep copies of this.messages,
this.delayedMessages, this.inflightMessages, this.fifoMessages,
this.fifoDelayed, this.fifoLockedGroups, 2) attempt await
this.persistence?.deleteQueueMessages(this.name), 3) on success clear the
originals, and 4) on failure restore the snapshots and rethrow the error. Also
make SqsStore.clearMessages() async and have it await this.purge() (instead of
fire-and-forget) so callers observe and handle failures from purge().
- Around line 121-132: The current setAttributes mutates this.attributes and
this.lastModifiedTimestamp before awaiting persistence.updateQueueAttributes,
risking in-memory state changes on failure and concurrent mutation leaks;
instead, create a staged copy (e.g., const stagedAttrs = { ...this.attributes,
...attrs }), apply the RedrivePolicy empty check to stagedAttrs, compute a local
stagedTimestamp, call await this.persistence.updateQueueAttributes(this.name,
stagedAttrs, stagedTimestamp), and only after that assignment succeeds assign
this.attributes = stagedAttrs and this.lastModifiedTimestamp = stagedTimestamp
so the in-memory state is updated atomically on successful persistence in
setAttributes.
- Around line 422-425: The inflight entry is being deleted before the awaited
persistence write, causing loss of state if updateMessageReady fails; change the
logic in the timeout branch so you do not call
this.inflightMessages.delete(receiptHandle) until after await
this.persistence?.updateMessageReady(entry.message.messageId) completes
successfully (or, if you must delete first, re-add it on failure). Ensure any
errors from updateMessageReady are propagated/handled (try/catch) so FIFO locks
controlled via isFifo() and entry.message.messageGroupId remain consistent and
are only released when persistence has reliably marked the message ready.
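The delete-then-restore-on-failure variant can be sketched as below; `InflightEntry` and the injected `updateMessageReady` are illustrative shapes, not the store's real types:

```typescript
// Sketch of keeping the inflight entry recoverable until updateMessageReady
// succeeds, so the message is never in neither collection.
interface InflightEntry { messageId: string }

async function makeReady(
  inflight: Map<string, InflightEntry>,
  receiptHandle: string,
  updateMessageReady: (messageId: string) => Promise<void>,
): Promise<void> {
  const entry = inflight.get(receiptHandle);
  if (!entry) return;
  inflight.delete(receiptHandle);
  try {
    await updateMessageReady(entry.messageId);
  } catch (err) {
    // Re-add the entry so the message is neither lost nor stuck half-released.
    inflight.set(receiptHandle, entry);
    throw err;
  }
}
```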
---
Outside diff comments:
In `@src/initConfig.ts`:
- Around line 206-213: The S3 bucket creation and lifecycle writes in
applyInitConfig are fired without awaiting, so change the calls to await
s3Store.createBucket(...) and await s3Store.putBucketLifecycleConfiguration(...)
(or await the promise-returning wrapper used by s3Store) and ensure
applyInitConfig (or its caller) correctly propagates errors; specifically update
the branches where s3Store.createBucket(entry) and
s3Store.createBucket(entry.name, entry.type) plus
s3Store.putBucketLifecycleConfiguration(entry.name, lifecycleConfigToXml(...))
are invoked to await their returned promises and let failures bubble up to the
async init flow.
In `@src/sns/actions/publish.ts`:
- Around line 223-237: In publishBatch, don’t await fanOutToSubscriptions() for
the whole batch at once; instead wrap the await call for each entry in a
try/catch inside the per-entry loop (the code that currently calls
fanOutToSubscriptions with topicArn, topic, messageId, entry.message,
entry.messageAttributes, subject, messageGroupId, messageDeduplicationId,
snsStore, sqsStore) so that when fanOutToSubscriptions rejects you push a
corresponding
<member><Id>...</Id><Code>...</Code><Message>...</Message></member> into the
failed list (the same structure used elsewhere) and continue processing
remaining entries; on success continue to push the successfulXml entry as
before. Ensure you include the entry.id and messageId in both success and
failure entries and do not let one rejection abort the entire publishBatch.
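The per-entry isolation can be sketched as follows; the entry shape, XML member fields, and `fanOut` callback are illustrative stand-ins for the handler's real types:

```typescript
// Sketch of per-entry error isolation in a batch publish: one rejected
// fan-out becomes a failed member instead of aborting the whole batch.
interface BatchEntry { id: string; message: string }

async function publishBatch(
  entries: BatchEntry[],
  fanOut: (message: string) => Promise<string>, // resolves to a messageId
): Promise<{ successful: string[]; failed: string[] }> {
  const successful: string[] = [];
  const failed: string[] = [];
  for (const entry of entries) {
    try {
      const messageId = await fanOut(entry.message);
      successful.push(
        `<member><Id>${entry.id}</Id><MessageId>${messageId}</MessageId></member>`,
      );
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      failed.push(
        `<member><Id>${entry.id}</Id><Code>InternalError</Code><Message>${message}</Message></member>`,
      );
    }
  }
  return { successful, failed };
}
```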
In `@src/sns/actions/setSubscriptionAttributes.ts`:
- Around line 42-50: The code mutates subscription.attributes and
subscription.parsedFilterPolicy before awaiting persistence, risking divergence
if updateSubscriptionAttributes fails; change the flow to first create an
updatedAttributes object (copy subscription.attributes with attributeName set to
attributeValue and derive parsedFilterPolicy reset logic), call await
snsStore.persistence?.updateSubscriptionAttributes(subscriptionArn,
updatedAttributes), and only after successful persistence assign
subscription.attributes = updatedAttributes and update
subscription.parsedFilterPolicy as needed; use the existing symbols
subscription.attributes, parsedFilterPolicy, updateSubscriptionAttributes,
subscriptionArn, attributeName, and attributeValue to locate and apply this
change.
In `@src/sns/actions/tagResource.ts`:
- Around line 20-30: The current loop mutates topic.tags directly before calling
snsStore.persistence?.insertTopic(topic), so if insertTopic rejects the
in-memory topic stays changed; to fix, take a snapshot of the existing tags
(e.g., Array.from(topic.tags) or a cloned Map) before applying changes in
tagResource, apply the new tags to topic.tags, then await
snsStore.persistence?.insertTopic(topic) inside a try/catch and on any error
restore topic.tags from the snapshot (or replace with the cloned Map) and
rethrow the error; do the same snapshot/restore pattern for the other similar
mutation block referenced (lines 49-58) to ensure in-memory rollback on
persistence failure, and refer to topic.tags and
snsStore.persistence.insertTopic when locating the code.
In `@src/sqs/actions/createQueue.ts`:
- Around line 73-79: The merged attributes are only kept in memory inside the
createQueue logic (the for-loop that updates existing.attributes) and must be
persisted like other state changes; after the loop that mutates
existing.attributes in createQueue (the variable named existing in
src/sqs/actions/createQueue.ts), call and await
existing.persistence?.insertQueue(existing) so the updated attributes are saved
to durable storage.
In `@src/sqs/actions/tagQueue.ts`:
- Around line 15-20: The code mutates queue.tags (a Map) before awaiting
queue.persistence?.insertQueue(queue), so if insertQueue rejects the in-memory
tags remain changed; to fix, capture the current state (e.g., clone entries from
queue.tags) before applying the new tags, apply the new tags as now, await
queue.persistence?.insertQueue(queue), and if the promise throws restore
queue.tags to the saved state (or clear and re-set entries) and re-throw the
error; reference the tags variable, queue.tags Map, and
queue.persistence?.insertQueue(queue) (inside TagQueue) when making the change.
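The Map snapshot/restore pattern for tag mutations can be sketched like this; `TaggableQueue` and the `persist` method are illustrative, not the project's real queue type:

```typescript
// Sketch of snapshotting a tags Map before mutation so a failed persistence
// write can be rolled back.
interface TaggableQueue {
  tags: Map<string, string>;
  persist(): Promise<void>; // stand-in for queue.persistence?.insertQueue(queue)
}

async function tagQueue(
  queue: TaggableQueue,
  newTags: Record<string, string>,
): Promise<void> {
  const snapshot = new Map(queue.tags); // clone entries before mutating
  for (const [key, value] of Object.entries(newTags)) {
    queue.tags.set(key, value);
  }
  try {
    await queue.persist();
  } catch (err) {
    queue.tags = snapshot; // restore on persistence failure, then re-throw
    throw err;
  }
}
```

The same pattern applies to the `topic.tags` mutation in `tagResource.ts`.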
In `@src/sqs/sqsStore.ts`:
- Around line 244-250: Wrap the call to
this.persistence.updateMessageInflight(...) inside a try/catch in the
ReceiveMessage code path: if updateMessageInflight rejects, catch the error,
remove the message from inflightMessages and reinsert it at the head of the
original ready queue or group (preserving FIFO group locking semantics), then
rethrow the error; apply the same pattern to the other occurrence around lines
344-350 so both updateMessageInflight usages restore local state on failure.
Ensure you reference and mutate the same structures used elsewhere
(inflightMessages and the ready queue/group) and keep
receiptHandle/visibilityDeadline arguments unchanged when rethrowing.
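The reinsert-at-head rollback can be sketched as below; the message shape and injected `updateMessageInflight` are simplified stand-ins for the store's real structures:

```typescript
// Sketch of restoring a dequeued message to the head of the ready queue when
// the inflight persistence write rejects, preserving ordering.
interface ReadyMsg { messageId: string }

async function receiveOne(
  ready: ReadyMsg[],
  inflight: Map<string, ReadyMsg>,
  receiptHandle: string,
  updateMessageInflight: (messageId: string) => Promise<void>,
): Promise<ReadyMsg | undefined> {
  const msg = ready.shift();
  if (!msg) return undefined;
  inflight.set(receiptHandle, msg);
  try {
    await updateMessageInflight(msg.messageId);
  } catch (err) {
    // Undo the local dequeue: drop the inflight entry and reinsert at the
    // head so ordering is preserved, then rethrow.
    inflight.delete(receiptHandle);
    ready.unshift(msg);
    throw err;
  }
  return msg;
}
```

In the real FIFO path the rollback would also need to leave the group lock unacquired and restore receive metadata.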
---
Nitpick comments:
In `@src/persistence/pgPersistence.ts`:
- Around line 687-703: loadSqsQueues currently calls SqsStore.createQueue which
re-persists queues during restoration; add a non-persisting restore method on
SqsStore (e.g., restoreQueue) and call it from loadSqsQueues instead of
createQueue so restoration populates in-memory queue objects without triggering
insertQueue/DB writes; ensure restoreQueue accepts the same params (name, url,
arn, attributes, tags) and sets createdTimestamp, lastModifiedTimestamp and
sequenceCounter directly without calling persistence code, then update
loadSqsQueues to use restoreQueue and keep the subsequent loadSqsMessages call.
- Around line 870-908: In loadS3MultipartUploads, avoid the N+1 pattern by
replacing the per-upload query of s3_multipart_parts with a single bulk query
that selects all parts (e.g., SELECT * FROM s3_multipart_parts WHERE upload_id =
ANY($1)) or a JOIN of s3_multipart_uploads and s3_multipart_parts; group the
returned part rows by upload_id into a Map<number, Part[]> (or Map<string,
Map<partNumber, Part>>), then when iterating uploadResult.rows build each
MultipartUpload and populate upload.parts from the grouped parts map before
calling s3Store.restoreMultipartUpload(upload); keep JSON.parse(row.metadata)
and the field mappings intact and reference loadS3MultipartUploads,
s3_multipart_uploads, s3_multipart_parts, upload.parts, and
s3Store.restoreMultipartUpload when making the change.
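The in-memory grouping step of that bulk-fetch rewrite can be sketched as below; the row shape mirrors the described `s3_multipart_parts` columns but is an assumption, as is the `WHERE upload_id = ANY($1)` SQL it would pair with:

```typescript
// Sketch of grouping bulk-fetched part rows by upload_id, replacing the
// per-upload N+1 queries with one pass over a single result set.
interface PartRow { upload_id: string; part_number: number; etag: string }

function groupPartsByUpload(rows: PartRow[]): Map<string, PartRow[]> {
  const byUpload = new Map<string, PartRow[]>();
  for (const row of rows) {
    const parts = byUpload.get(row.upload_id) ?? [];
    parts.push(row);
    byUpload.set(row.upload_id, parts);
  }
  return byUpload;
}
```

Each upload's `parts` can then be populated from this map before calling `s3Store.restoreMultipartUpload(upload)`.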
- Around line 746-760: The inline UPDATE duplicates logic from
updateMessageReady and incorrectly duplicates behavior instead of reusing a
single method; first modify updateMessageReady to preserve
approximate_receive_count and approximate_first_receive_timestamp when clearing
visibility (accepting parameters or reading from the row) so it updates
receipt_handle and visibility_deadline while keeping receive counts intact, then
replace this.inline pool.query block with a call to
updateMessageReady(row.message_id, { receipt_handle: null, visibility_deadline:
null, approximate_receive_count: Number(row.approximate_receive_count),
approximate_first_receive_timestamp: row.approximate_first_receive_timestamp !=
null ? Number(row.approximate_first_receive_timestamp) : null }); reference
updateMessageReady and the current pool.query usage that reads row.message_id,
row.approximate_receive_count, and row.approximate_first_receive_timestamp to
locate the code to change.
- Around line 11-105: SCHEMA_STATEMENTS currently defines table DDLs but lacks
indexes on FK and high-cardinality query columns; add CREATE INDEX statements to
the SCHEMA_STATEMENTS array for columns such as sqs_messages.queue_name,
sqs_messages.sent_timestamp (or visibility_deadline),
sns_subscriptions.topic_arn, s3_objects.bucket (and optionally
s3_objects.last_modified or etag if queried), s3_multipart_parts.upload_id and
s3_multipart_parts.part_number to improve lookup and join performance; update
the SCHEMA_STATEMENTS constant to include those CREATE INDEX IF NOT EXISTS ...
ON ... statements so migrations create indexes alongside tables.
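The extra statements could look roughly like the following; the index and column names are hypothetical and should be checked against the real schema before adoption:

```typescript
// Hypothetical index DDL to append to SCHEMA_STATEMENTS. Column names follow
// the tables described above but are assumptions, not the verified schema.
const INDEX_STATEMENTS: string[] = [
  `CREATE INDEX IF NOT EXISTS idx_sqs_messages_queue_name
     ON sqs_messages (queue_name)`,
  `CREATE INDEX IF NOT EXISTS idx_sqs_messages_visibility_deadline
     ON sqs_messages (visibility_deadline)`,
  `CREATE INDEX IF NOT EXISTS idx_sns_subscriptions_topic_arn
     ON sns_subscriptions (topic_arn)`,
  `CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket
     ON s3_objects (bucket)`,
  `CREATE INDEX IF NOT EXISTS idx_s3_multipart_parts_upload
     ON s3_multipart_parts (upload_id, part_number)`,
];
```

Since the schema creation now runs in a transaction, these would be created atomically alongside the tables.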
In `@src/s3/s3Store.ts`:
- Around line 380-387: The deleteObjects implementation in deleteObjects
currently awaits deleteObject sequentially, causing slow bulk deletions; change
it to run deletions in parallel by mapping keys to promises from
this.deleteObject(bucket, key) and awaiting Promise.all to collect results,
ensuring you still return the array of deleted keys (preserve order by awaiting
Promise.all on the mapped array); if you need to limit concurrency to avoid
overwhelming services, use a concurrency-controlled mapper (e.g., p-map or a
simple batching loop) instead of firing all promises at once.
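The simple batching-loop variant can be sketched as follows; `deleteObject` is injected here as a stand-in for `this.deleteObject`:

```typescript
// Sketch of concurrency-limited parallel bulk deletion. Promise.all on each
// slice runs up to `concurrency` deletions at once, and the returned array
// preserves the input key order.
async function deleteObjects(
  bucket: string,
  keys: string[],
  deleteObject: (bucket: string, key: string) => Promise<void>,
  concurrency = 16,
): Promise<string[]> {
  const deleted: string[] = [];
  for (let i = 0; i < keys.length; i += concurrency) {
    const batch = keys.slice(i, i + concurrency);
    await Promise.all(batch.map((key) => deleteObject(bucket, key)));
    deleted.push(...batch);
  }
  return deleted;
}
```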
In `@test/persistence/pg-persistence.test.ts`:
- Around line 71-104: The test starts fauxqs servers via startFauxqs (server1,
server2) and calls serverX.stop() only on the happy path; wrap each server's
lifecycle in try/finally so stop() is always invoked. Specifically, for the
blocks that create server1/server2 and use
makeSqsClient/SendMessageCommand/GetQueueUrlCommand/ReceiveMessageCommand, move
the test logic into a try and call await serverX.stop() in the corresponding
finally; do the same for the other similar test ranges (lines referenced: the
other startFauxqs usages) to ensure listeners and DB connections are cleaned up
on failures.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c9988cbf-c860-45da-a305-9ba5495720e6
⛔ Files ignored due to path filters (1)
package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (51)
package.json, src/app.ts, src/initConfig.ts, src/persistence/index.ts, src/persistence/persistenceProvider.ts, src/persistence/pgPersistence.ts, src/persistence/sqlitePersistence.ts, src/s3/actions/abortMultipartUpload.ts, src/s3/actions/completeMultipartUpload.ts, src/s3/actions/createBucket.ts, src/s3/actions/createMultipartUpload.ts, src/s3/actions/deleteBucket.ts, src/s3/actions/deleteBucketLifecycleConfiguration.ts, src/s3/actions/deleteObject.ts, src/s3/actions/deleteObjects.ts, src/s3/actions/getObject.ts, src/s3/actions/postObject.ts, src/s3/actions/putBucketLifecycleConfiguration.ts, src/s3/actions/putObject.ts, src/s3/actions/renameObject.ts, src/s3/actions/uploadPart.ts, src/s3/s3Persistence.ts, src/s3/s3Router.ts, src/s3/s3Store.ts, src/server.ts, src/sns/actions/confirmSubscription.ts, src/sns/actions/createTopic.ts, src/sns/actions/deleteTopic.ts, src/sns/actions/publish.ts, src/sns/actions/setSubscriptionAttributes.ts, src/sns/actions/setTopicAttributes.ts, src/sns/actions/subscribe.ts, src/sns/actions/tagResource.ts, src/sns/actions/unsubscribe.ts, src/sns/snsStore.ts, src/sqs/actions/changeMessageVisibility.ts, src/sqs/actions/changeMessageVisibilityBatch.ts, src/sqs/actions/createQueue.ts, src/sqs/actions/deleteMessage.ts, src/sqs/actions/deleteMessageBatch.ts, src/sqs/actions/deleteQueue.ts, src/sqs/actions/purgeQueue.ts, src/sqs/actions/receiveMessage.ts, src/sqs/actions/sendMessage.ts, src/sqs/actions/sendMessageBatch.ts, src/sqs/actions/setQueueAttributes.ts, src/sqs/actions/tagQueue.ts, src/sqs/actions/untagQueue.ts, src/sqs/sqsStore.ts, test/persistence/pg-persistence.test.ts, test/programmatic-api.test.ts
  this.topics.set(arn, topic);
- this.persistence?.insertTopic(topic);
+ await this.persistence?.insertTopic(topic);
  return topic;
Don't expose half-applied SNS mutations across awaited persistence writes.
These methods mutate topics/subscriptions before the awaited provider calls finish. With PostgreSQL, concurrent requests can observe entities that are not committed yet, and a rejection leaves memory and persistence out of sync until restart. Persist first, or add rollback/transaction handling around the in-memory mutation.
Also applies to: 77-84, 150-154, 161-169
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/sns/snsStore.ts` around lines 67 - 69, The in-memory maps are being
mutated before awaiting persistence (e.g., calls that do this.topics.set(arn,
topic) then await this.persistence?.insertTopic(...) and similarly for
this.subscriptions.set(...) with insertSubscription), which can expose
half-applied state or leave memory/persistence inconsistent on failure; change
the flow to await the persistence write first (await
this.persistence?.insertTopic/insertSubscription(...)) and only on success
update the in-memory maps, or wrap the current order in try/catch and on
persistence failure remove/rollback the in-memory entry (e.g., remove the key
from topics/subscriptions in the catch and rethrow) so memory and DB cannot get
out of sync.
  async setAttributes(attrs: Record<string, string>): Promise<void> {
    Object.assign(this.attributes, attrs);
    // Empty RedrivePolicy clears the DLQ association
    if (attrs.RedrivePolicy === "") {
      delete this.attributes.RedrivePolicy;
    }
    this.lastModifiedTimestamp = Math.floor(Date.now() / 1000);
-   this.persistence?.updateQueueAttributes(this.name, this.attributes, this.lastModifiedTimestamp);
+   await this.persistence?.updateQueueAttributes(
+     this.name,
+     this.attributes,
+     this.lastModifiedTimestamp,
+   );
Stage queue attribute changes until the persistence write succeeds.
this.attributes and lastModifiedTimestamp are mutated before the awaited write. If persistence fails here, SetQueueAttributes returns an error after the queue was already changed in memory, and the shared this.attributes object can leak concurrent mutations into the persisted payload.
💡 Suggested change
- Object.assign(this.attributes, attrs);
- // Empty RedrivePolicy clears the DLQ association
- if (attrs.RedrivePolicy === "") {
- delete this.attributes.RedrivePolicy;
- }
- this.lastModifiedTimestamp = Math.floor(Date.now() / 1000);
- await this.persistence?.updateQueueAttributes(
- this.name,
- this.attributes,
- this.lastModifiedTimestamp,
- );
+ const nextAttributes = { ...this.attributes, ...attrs };
+ if (attrs.RedrivePolicy === "") {
+ delete nextAttributes.RedrivePolicy;
+ }
+ const nextLastModifiedTimestamp = Math.floor(Date.now() / 1000);
+ await this.persistence?.updateQueueAttributes(
+ this.name,
+ nextAttributes,
+ nextLastModifiedTimestamp,
+ );
+ this.attributes = nextAttributes;
+ this.lastModifiedTimestamp = nextLastModifiedTimestamp;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/sqs/sqsStore.ts` around lines 121 - 132, The current setAttributes
mutates this.attributes and this.lastModifiedTimestamp before awaiting
persistence.updateQueueAttributes, risking in-memory state changes on failure
and concurrent mutation leaks; instead, create a staged copy (e.g., const
stagedAttrs = { ...this.attributes, ...attrs }), apply the RedrivePolicy empty
check to stagedAttrs, compute a local stagedTimestamp, call await
this.persistence.updateQueueAttributes(this.name, stagedAttrs, stagedTimestamp),
and only after that assignment succeeds assign this.attributes = stagedAttrs and
this.lastModifiedTimestamp = stagedTimestamp so the in-memory state is updated
atomically on successful persistence in setAttributes.
  await this.persistence?.deleteMessage(msg.messageId);
  await dlq.enqueue(msg);
Make the DLQ handoff atomic.
The message has already been shifted out of the source queue when these awaits run. If the source delete succeeds and dlq.enqueue(msg) fails — or if the delete rejects after the local pop — the message is no longer in the source queue and never reaches the DLQ. This needs a single transactional persistence operation, or explicit rollback to the source queue on failure.
Also applies to: 329-330
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/sqs/sqsStore.ts` around lines 229 - 230, The delete-then-enqueue sequence
(calls to persistence.deleteMessage and dlq.enqueue) must be made atomic: either
add a single transactional persistence operation (e.g., implement and call
persistence.transferToDlq(msg) that enqueues to DLQ and deletes from source in
one DB transaction) or, if a DB transaction is not possible, perform the enqueue
first and only delete on success, and on enqueue failure perform an explicit
rollback by re-adding the message to the source (e.g., persistence.requeue(msg))
with retries and error handling; update both occurrences (the calls around
deleteMessage/dlq.enqueue and the similar block at the other spot) to use the
new transfer or the enqueue-then-conditional-delete-with-rollback pattern and
surface any errors so failures don’t cause message loss.
  if (timeoutSeconds === 0) {
    this.inflightMessages.delete(receiptHandle);
-   this.persistence?.updateMessageReady(entry.message.messageId);
+   await this.persistence?.updateMessageReady(entry.message.messageId);
    if (this.isFifo() && entry.message.messageGroupId) {
Don't drop the inflight entry before updateMessageReady() succeeds.
Line 423 removes the only local handle to the message before the awaited persistence write on Line 424. If that write fails, the message is neither inflight nor ready, and FIFO group locks never get released.
💡 Suggested change
this.inflightMessages.delete(receiptHandle);
- await this.persistence?.updateMessageReady(entry.message.messageId);
+ try {
+ await this.persistence?.updateMessageReady(entry.message.messageId);
+ } catch (error) {
+ this.inflightMessages.set(receiptHandle, entry);
+ throw error;
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/sqs/sqsStore.ts` around lines 422 - 425, The inflight entry is being
deleted before the awaited persistence write, causing loss of state if
updateMessageReady fails; change the logic in the timeout branch so you do not
call this.inflightMessages.delete(receiptHandle) until after await
this.persistence?.updateMessageReady(entry.message.messageId) completes
successfully (or, if you must delete first, re-add it on failure). Ensure any
errors from updateMessageReady are propagated/handled (try/catch) so FIFO locks
controlled via isFifo() and entry.message.messageGroupId remain consistent and
are only released when persistence has reliably marked the message ready.
  async purge(): Promise<void> {
    this.messages = [];
    this.delayedMessages = [];
    this.inflightMessages.clear();
    this.fifoMessages.clear();
    this.fifoDelayed.clear();
    this.fifoLockedGroups.clear();
-   this.persistence?.deleteQueueMessages(this.name);
+   await this.persistence?.deleteQueueMessages(this.name);
  }
purge() now needs rollback semantics and an awaited caller.
This clears every in-memory message structure before the awaited delete on Line 565. If that call rejects, PurgeQueue fails after the live queue was already emptied. Also, SqsStore.clearMessages() still fire-and-forgets this promise, so failures can go unhandled and callers can observe stale persisted data. Please snapshot/restore on error and make clearMessages() async so it awaits each purge.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/sqs/sqsStore.ts` around lines 558 - 566, The purge() implementation must
snapshot current in-memory structures, call await
this.persistence.deleteQueueMessages(this.name) before committing the in-memory
clears, and restore the snapshots if the delete call rejects; update purge() to
perform: 1) create deep copies of this.messages, this.delayedMessages,
this.inflightMessages, this.fifoMessages, this.fifoDelayed,
this.fifoLockedGroups, 2) attempt await
this.persistence?.deleteQueueMessages(this.name), 3) on success clear the
originals, and 4) on failure restore the snapshots and rethrow the error. Also
make SqsStore.clearMessages() async and have it await this.purge() (instead of
fire-and-forget) so callers observe and handle failures from purge().
… bug
- Fail fast on invalid/missing persistence config in server.ts and app.ts
- Wrap PG schema creation in a transaction for atomic DDL
- Fix updateMessageReady zeroing out receive count and first-receive timestamp
- Use Boolean() for confirmed column comparison in PG persistence
- Make clearMessages() and reset() async to properly await purge calls
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/app.ts (1)
293-325: ⚠️ Potential issue | 🟠 Major
This changes the public programmatic API in a breaking way.
These methods used to be synchronous and now return Promises. Existing callers that read the returned object synchronously or rely on sync sequencing will now get a pending promise unless they update every call site to await, which conflicts with the PR's non-breaking goal.
Also applies to: 331-369
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/app.ts` around lines 293 - 325, The public API was accidentally changed to async: restore the original synchronous signatures for inspectQueue, createTopic, subscribe (and the other methods flagged around 331-369) so callers aren’t broken; either revert the return types from Promise<T> back to their original synchronous types and implement them synchronously, or else add new explicitly-named async variants (e.g., inspectQueueAsync/createTopicAsync/subscribeAsync) and keep the original inspectQueue/createTopic/subscribe signatures intact. Ensure the implementations and exported types for inspectQueue, createTopic, and subscribe match their previous sync contracts and update any internal callers accordingly.
src/sqs/sqsStore.ts (1)
244-250: ⚠️ Potential issue | 🔴 Critical
Rollback the local dequeue state if updateMessageInflight() fails.
Both dequeue paths have already removed the message from the ready collection and incremented receive metadata before this awaited write. A rejection here leaves standard messages hidden, overstates receive counts, and in the FIFO path the group lock is still not acquired yet, so later same-group messages can leak out of order.
Also applies to: 344-350
Also applies to: 344-350
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/sqsStore.ts` around lines 244 - 250, The awaited call to persistence.updateMessageInflight (and the same block around lines 344-350) can throw after we've already removed the message from the ready collection and bumped receive metadata; on failure we must rollback local dequeue state. Wrap the updateMessageInflight call in a try/catch, and on catch: re-insert the message back into the ready collection exactly as it was before removal, restore/decrement the receive metadata fields you incremented (approximateReceiveCount and approximateFirstReceiveTimestamp as applicable), ensure any FIFO group state remains unmodified (don't mark the group as dequeued or acquire the group lock), then rethrow the error so the caller knows the persistence write failed. Use the existing symbols msg, receiptHandle, visibilityDeadline, updateMessageInflight and the local ready collection / receive-metadata-updating logic to implement the rollback.
♻️ Duplicate comments (3)
src/sqs/sqsStore.ts (3)
558-565: ⚠️ Potential issue | 🔴 Critical
Add rollback semantics to purge().
purge() clears every in-memory collection before the awaited deleteQueueMessages(). If persistence rejects, PurgeQueue fails after the running queue has already been emptied. Snapshot/restore the collections, or delete persisted rows first and only then commit the in-memory clear.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/sqsStore.ts` around lines 558 - 565, The purge() implementation clears all in-memory collections (messages, delayedMessages, inflightMessages, fifoMessages, fifoDelayed, fifoLockedGroups) before awaiting persistence.deleteQueueMessages(this.name), which can leave the store empty if the persistence call fails; change purge() to either (A) call and await this.persistence?.deleteQueueMessages(this.name) first and only clear the in-memory collections after it resolves, or (B) snapshot the current collections into local variables, clear them, await deleteQueueMessages, and if the await rejects restore the snapshots back into messages, delayedMessages, inflightMessages, fifoMessages, fifoDelayed and fifoLockedGroups so the method has proper rollback semantics.
423-424: ⚠️ Potential issue | 🔴 Critical
Delay visibility-state mutations until persistence succeeds.
The zero-timeout branch drops the inflight entry before updateMessageReady(), and the positive-timeout branch overwrites entry.visibilityDeadline before updateMessageInflight(). If either write fails, the request returns an error after live state has already diverged; the zero-timeout path can even leave the message in neither collection.
Also applies to: 442-449
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/sqsStore.ts` around lines 423 - 424, The in-memory visibility/inflight mutations (inflightMessages.delete, entry.visibilityDeadline update) are happening before the persistence calls (updateMessageReady, updateMessageInflight), which can leave in-memory state inconsistent if the persistence write fails; change the logic so that you first perform the persistence update using immutable/local values (e.g., compute newVisibilityDeadline locally), await updateMessageReady or updateMessageInflight on persistence, and only after the await succeeds mutate in-memory state (call inflightMessages.delete or set entry.visibilityDeadline and update the inflight map). Also ensure you use try/catch around the persistence call to return the error without changing live state if the write fails.
229-230: ⚠️ Potential issue | 🔴 Critical
Make the DLQ transfer atomic.
The message has already been removed from the live queue before these awaits run. If the source delete succeeds and dlq.enqueue() fails, the message disappears from both queues. Use a single transactional transfer, or enqueue first and only delete on success with an explicit rollback path.
Also applies to: 329-330
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sqs/sqsStore.ts` around lines 229 - 230, The transfer to the DLQ is not atomic because the code calls this.persistence?.deleteMessage(...) before dlq.enqueue(...); change the flow to call dlq.enqueue(msg) first and only call this.persistence.deleteMessage(msg.messageId) after enqueue resolves successfully, and if the subsequent delete fails implement a rollback that removes the message from the DLQ (or retries the delete with backoff and logs a fatal/alert) so the message is not lost; apply the same change to the other occurrence that uses this.persistence?.deleteMessage and dlq.enqueue(msg) as well.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/app.ts`:
- Around line 621-623: The deduplication record is written before confirming the
message persisted, causing failed enqueues to be treated as duplicates; change
the order so you call queue.nextSequenceNumber() and set msg.sequenceNumber as
before, but await queue.enqueue(msg) first and only call
queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber) after
enqueue resolves successfully (ensure any enqueue rejection prevents the dedup
write). Use the existing symbols queue.nextSequenceNumber(), queue.enqueue(),
and queue.recordDeduplication(...) to locate and reorder the calls accordingly.
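The corrected ordering can be sketched as below; the `FifoQueue` interface is an illustrative reduction of the real queue, not its actual shape:

```typescript
// Sketch of recording the deduplication entry only after enqueue resolves,
// so a failed send is never later mistaken for a duplicate.
interface FifoQueue {
  enqueue(msg: { messageId: string }): Promise<void>;
  recordDeduplication(dedupId: string, messageId: string): void;
  isDuplicate(dedupId: string): boolean;
}

async function sendFifo(
  queue: FifoQueue,
  dedupId: string,
  messageId: string,
): Promise<void> {
  if (queue.isDuplicate(dedupId)) return; // short-circuit real duplicates
  await queue.enqueue({ messageId }); // must succeed first...
  queue.recordDeduplication(dedupId, messageId); // ...before dedup is recorded
}
```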
In `@src/persistence/pgPersistence.ts`:
- Around line 717-719: The SELECT that loads messages for a queue omits ORDER
BY, which can reshuffle FIFO groups on restart; update the query in the method
that calls this.pool.query to include "ORDER BY sequence_number" (or "ORDER BY
sequence_number ASC") so rows are returned in FIFO sequence before you rebuild
fifoMessages and fifoDelayed, and ensure you append rows in the returned order
to those arrays (references: the this.pool.query call and the fifoMessages /
fifoDelayed rebuild logic).
---
Outside diff comments:
In `@src/app.ts`:
- Around line 293-325: The public API was accidentally changed to async: restore
the original synchronous signatures for inspectQueue, createTopic, subscribe
(and the other methods flagged around 331-369) so callers aren’t broken; either
revert the return types from Promise<T> back to their original synchronous types
and implement them synchronously, or else add new explicitly-named async
variants (e.g., inspectQueueAsync/createTopicAsync/subscribeAsync) and keep the
original inspectQueue/createTopic/subscribe signatures intact. Ensure the
implementations and exported types for inspectQueue, createTopic, and subscribe
match their previous sync contracts and update any internal callers accordingly.
In `@src/sqs/sqsStore.ts`:
- Around line 244-250: The awaited call to persistence.updateMessageInflight
(and the same block around lines 344-350) can throw after we've already removed
the message from the ready collection and bumped receive metadata; on failure we
must rollback local dequeue state. Wrap the updateMessageInflight call in a
try/catch, and on catch: re-insert the message back into the ready collection
exactly as it was before removal, restore/decrement the receive metadata fields
you incremented (approximateReceiveCount and approximateFirstReceiveTimestamp as
applicable), ensure any FIFO group state remains unmodified (don't mark the
group as dequeued or acquire the group lock), then rethrow the error so the
caller knows the persistence write failed. Use the existing symbols msg,
receiptHandle, visibilityDeadline, updateMessageInflight and the local ready
collection / receive-metadata-updating logic to implement the rollback.
---
Duplicate comments:
In `@src/sqs/sqsStore.ts`:
- Around line 558-565: The purge() implementation clears all in-memory
collections (messages, delayedMessages, inflightMessages, fifoMessages,
fifoDelayed, fifoLockedGroups) before awaiting
persistence.deleteQueueMessages(this.name), which can leave the store empty if
the persistence call fails; change purge() to either (A) call and await
this.persistence?.deleteQueueMessages(this.name) first and only clear the
in-memory collections after it resolves, or (B) snapshot the current collections
into local variables, clear them, await deleteQueueMessages, and if the await
rejects restore the snapshots back into messages, delayedMessages,
inflightMessages, fifoMessages, fifoDelayed and fifoLockedGroups so the method
has proper rollback semantics.
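Option (A) above is the simpler of the two; a minimal sketch, with `QueueStore` and its fields as hypothetical stand-ins for the real store:

```typescript
// Hypothetical sketch of option (A): persist the delete first, clear
// in-memory collections only after it resolves.
class QueueStore {
  messages: string[] = ["a", "b"];
  delayedMessages: string[] = ["c"];

  constructor(
    private name: string,
    private persistence?: { deleteQueueMessages(queue: string): Promise<void> },
  ) {}

  async purge(): Promise<void> {
    // If this rejects, the method throws with in-memory state untouched.
    await this.persistence?.deleteQueueMessages(this.name);
    this.messages.length = 0;
    this.delayedMessages.length = 0;
  }
}
```

Option (B)'s snapshot-and-restore is only needed if clearing memory before the write is required for some other invariant.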
- Around line 423-424: The in-memory visibility/inflight mutations
(inflightMessages.delete, entry.visibilityDeadline update) are happening before
the persistence calls (updateMessageReady, updateMessageInflight), which can
leave in-memory state inconsistent if the persistence write fails; change the
logic so that you first perform the persistence update using immutable/local
values (e.g., compute newVisibilityDeadline locally), await updateMessageReady
or updateMessageInflight on persistence, and only after the await succeeds
mutate in-memory state (call inflightMessages.delete or set
entry.visibilityDeadline and update the inflight map). Also ensure you use
try/catch around the persistence call to return the error without changing live
state if the write fails.
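The compute-locally, persist, then mutate ordering can be sketched like this; `VisibilityStore` and its shapes are hypothetical stand-ins for the real inflight-tracking code:

```typescript
// Hypothetical sketch: compute the new deadline locally, persist it, and
// only mutate live state after the await succeeds.
interface InflightEntry {
  visibilityDeadline: number;
}

class VisibilityStore {
  inflightMessages = new Map<string, InflightEntry>([
    ["m1", { visibilityDeadline: 1000 }],
  ]);

  constructor(
    private persistence: {
      updateMessageInflight(id: string, deadline: number): Promise<void>;
    },
  ) {}

  async changeVisibility(id: string, timeoutMs: number, now: number): Promise<void> {
    const entry = this.inflightMessages.get(id);
    if (!entry) return;
    // 1. Compute the new deadline without touching live state.
    const newVisibilityDeadline = now + timeoutMs;
    // 2. Persist first; a rejection propagates with state unchanged.
    await this.persistence.updateMessageInflight(id, newVisibilityDeadline);
    // 3. Only after persistence succeeds, mutate in-memory state.
    entry.visibilityDeadline = newVisibilityDeadline;
  }
}
```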
- Around line 229-230: The transfer to the DLQ is not atomic because the code
calls this.persistence?.deleteMessage(...) before dlq.enqueue(...); change the
flow to call dlq.enqueue(msg) first and only call
this.persistence.deleteMessage(msg.messageId) after enqueue resolves
successfully, and if the subsequent delete fails implement a rollback that
removes the message from the DLQ (or retries the delete with backoff and logs a
fatal/alert) so the message is not lost; apply the same change to the other
occurrence that uses this.persistence?.deleteMessage and dlq.enqueue(msg) as
well.
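The enqueue-first ordering with the rollback variant can be sketched as below; `moveToDlq` and the `dlq.remove` helper are hypothetical, and the comment's retry-with-backoff alternative is not shown:

```typescript
// Hypothetical sketch: enqueue to the DLQ first, delete from persistence
// second, and roll the DLQ entry back if the delete fails. A brief
// duplicate is preferable to losing the message.
interface DlqMsg {
  messageId: string;
}

async function moveToDlq(
  msg: DlqMsg,
  dlq: { enqueue(m: DlqMsg): Promise<void>; remove(id: string): Promise<void> },
  persistence: { deleteMessage(id: string): Promise<void> },
): Promise<void> {
  await dlq.enqueue(msg); // 1. the message now exists in both places
  try {
    await persistence.deleteMessage(msg.messageId); // 2. drop the source copy
  } catch (err) {
    await dlq.remove(msg.messageId); // rollback the DLQ entry
    throw err;
  }
}
```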
📒 Files selected for processing (8)
- src/app.ts
- src/persistence/pgPersistence.ts
- src/server.ts
- src/sqs/sqsStore.ts
- test/persistence/file-s3-persistence.test.ts
- test/persistence/persistence.test.ts
- test/programmatic-api.test.ts
- test/s3/rename-object.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- src/server.ts
- test/programmatic-api.test.ts
```diff
  msg.sequenceNumber = await queue.nextSequenceNumber();
  queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
- queue.enqueue(msg);
+ await queue.enqueue(msg);
```
Record FIFO deduplication only after the enqueue succeeds.
If queue.enqueue() rejects after the cache write, retries with the same dedup ID are treated as duplicates even though no message was persisted, so the send is effectively lost for the full dedup window.
💡 Suggested fix

```diff
  msg.sequenceNumber = await queue.nextSequenceNumber();
- queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
  await queue.enqueue(msg);
+ queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```typescript
msg.sequenceNumber = await queue.nextSequenceNumber();
await queue.enqueue(msg);
queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/app.ts` around lines 621 - 623, The deduplication record is written
before confirming the message persisted, causing failed enqueues to be treated
as duplicates; change the order so you call queue.nextSequenceNumber() and set
msg.sequenceNumber as before, but await queue.enqueue(msg) first and only call
queue.recordDeduplication(dedupId, msg.messageId, msg.sequenceNumber) after
enqueue resolves successfully (ensure any enqueue rejection prevents the dedup
write). Use the existing symbols queue.nextSequenceNumber(), queue.enqueue(),
and queue.recordDeduplication(...) to locate and reorder the calls accordingly.
```typescript
const result = await this.pool.query("SELECT * FROM sqs_messages WHERE queue_name = $1", [
  queue.name,
]);
```
Preserve FIFO order when reloading messages from PostgreSQL.
This query has no ORDER BY, so restart can reshuffle rows before they are appended back into each FIFO group's array. That breaks per-group ordering guarantees. Load FIFO queues ordered by sequence_number before rebuilding fifoMessages and fifoDelayed.
💡 Suggested fix

```diff
- const result = await this.pool.query("SELECT * FROM sqs_messages WHERE queue_name = $1", [
-   queue.name,
- ]);
+ const result = await this.pool.query(
+   queue.isFifo()
+     ? "SELECT * FROM sqs_messages WHERE queue_name = $1 ORDER BY sequence_number ASC"
+     : "SELECT * FROM sqs_messages WHERE queue_name = $1",
+   [queue.name],
+ );
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/persistence/pgPersistence.ts` around lines 717 - 719, The SELECT that
loads messages for a queue omits ORDER BY, which can reshuffle FIFO groups on
restart; update the query in the method that calls this.pool.query to include
"ORDER BY sequence_number" (or "ORDER BY sequence_number ASC") so rows are
returned in FIFO sequence before you rebuild fifoMessages and fifoDelayed, and
ensure you append rows in the returned order to those arrays (references: the
this.pool.query call and the fifoMessages / fifoDelayed rebuild logic).
@yarlson can you resolve the conflicts and address the CodeRabbit comments? I've been going back and forth over introducing a new dependency, but 90 kB ain't that much, and I can see how this can be useful.
Summary
fauxqs persistence is currently SQLite-only, which works well for single-instance local dev but limits use cases where shared persistent state across instances or integration with existing Postgres infrastructure is needed.
This adds PostgreSQL as an opt-in alternative backend — SQLite remains the default, no breaking changes to the env var surface or programmatic API.
Motivation

- Alternative backends plug in via the extracted `PersistenceProvider` interface.

New configuration

| Variable | Default | Values |
| --- | --- | --- |
| `FAUXQS_PERSISTENCE_BACKEND` | `sqlite` | `sqlite` or `postgresql` |
| `FAUXQS_POSTGRESQL_URL` | (unset) | PostgreSQL connection URL |

Programmatic:

`startFauxqs({ persistenceBackend: 'postgresql', postgresqlUrl: '...' })`

Test plan
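The backend selection described above can be sketched as a small factory; the option shapes and validation here are hypothetical, only the `createPersistence` name and the `sqlite`/`postgresql` values come from the PR:

```typescript
// Hypothetical sketch of a createPersistence() factory: SQLite by default,
// PostgreSQL opt-in, with the connection URL required for the latter.
type Backend = "sqlite" | "postgresql";

interface PersistenceOptions {
  type: Backend;
  connectionString?: string;
}

function createPersistence(opts: PersistenceOptions): { kind: Backend } {
  switch (opts.type) {
    case "sqlite":
      return { kind: "sqlite" }; // default backend, unchanged behavior
    case "postgresql":
      if (!opts.connectionString) {
        throw new Error("FAUXQS_POSTGRESQL_URL is required for the postgresql backend");
      }
      return { kind: "postgresql" }; // a real impl would construct a pg Pool here
  }
}

console.log(createPersistence({ type: "sqlite" }).kind); // prints "sqlite"
```

Failing fast on a missing URL keeps misconfiguration visible at startup rather than at first query.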
Summary by CodeRabbit
New Features
Chores