From 325dc3a319b7135e3761731f675a65802b251389 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 15:37:11 -0300 Subject: [PATCH 01/16] feat: add Bull queue for async transaction submission with retry --- packages/api/package.json | 4 +- ...0000-add-retry-attempts-to-transactions.ts | 33 ++ packages/api/src/models/Transaction.ts | 7 + .../api/src/modules/transaction/controller.ts | 5 +- .../api/src/utils/submitTransactionQueue.ts | 99 ++++ packages/worker/package.json | 2 + packages/worker/src/index.ts | 2 + .../src/queues/submitTransaction/constants.ts | 5 + .../src/queues/submitTransaction/index.ts | 4 + .../src/queues/submitTransaction/queue.ts | 452 ++++++++++++++++++ .../src/queues/submitTransaction/types.ts | 14 + .../src/queues/submitTransaction/utils.ts | 19 + pnpm-lock.yaml | 12 + 13 files changed, 655 insertions(+), 3 deletions(-) create mode 100644 packages/api/src/database/migrations/1764300000000-add-retry-attempts-to-transactions.ts create mode 100644 packages/api/src/utils/submitTransactionQueue.ts create mode 100644 packages/worker/src/queues/submitTransaction/constants.ts create mode 100644 packages/worker/src/queues/submitTransaction/index.ts create mode 100644 packages/worker/src/queues/submitTransaction/queue.ts create mode 100644 packages/worker/src/queues/submitTransaction/types.ts create mode 100644 packages/worker/src/queues/submitTransaction/utils.ts diff --git a/packages/api/package.json b/packages/api/package.json index 3d91f6b9e..fed799dc0 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -20,7 +20,7 @@ "run:prod": "make -C ./ deploy-prod env_file=.env.prod", "run:stg": "make -C ./ deploy-stg env_file=.env.staging", "run:test": "make -C ./ deploy-test env_file=.env.test", - "test:build": "pnpm build > /dev/null 2>&1 && pnpm copy:predicate-releases && cross-env TESTCONTAINERS_DB=true node --test-force-exit --test build/tests/*.tests.js", + "test:build": "pnpm build > /dev/null 2>&1 && pnpm 
copy:predicate-releases && cross-env TESTCONTAINERS_DB=true DOTENV_CONFIG_PATH=.env.test node -r dotenv/config --test-force-exit --test build/tests/*.tests.js", "database:populate": "chmod +x ./src/scripts/db-populate.sh && ./src/scripts/db-populate.sh", "database:clear": "chmod +x ./src/scripts/db-clear.sh && ./src/scripts/db-clear.sh", "copy:predicate-releases": "cp -r src/tests/mocks/predicate-release build/tests/mocks/" @@ -45,6 +45,7 @@ "axios": "1.13.5", "bakosafe": "0.6.3", "body-parser": "1.20.4", + "bull": "^4.16.5", "cheerio": "1.0.0-rc.12", "class-validator": "0.14.0", "cookie-parser": "1.4.6", @@ -57,6 +58,7 @@ "fuels": "0.103.0", "glob": "10.5.0", "handlebars": "4.7.8", + "ioredis": "^5.7.0", "joi": "17.4.0", "jsonwebtoken": "9.0.3", "morgan": "1.10.0", diff --git a/packages/api/src/database/migrations/1764300000000-add-retry-attempts-to-transactions.ts b/packages/api/src/database/migrations/1764300000000-add-retry-attempts-to-transactions.ts new file mode 100644 index 000000000..98a53046a --- /dev/null +++ b/packages/api/src/database/migrations/1764300000000-add-retry-attempts-to-transactions.ts @@ -0,0 +1,33 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +/** + * Migration to add retry_attempts column to transactions table. 
+ * + * Stores an array of JSONB objects, one per send attempt, for auditing: + * { attempt: number, timestamp: string, error: string|null, duration_ms: number } + */ +export class AddRetryAttemptsToTransactions1764300000000 + implements MigrationInterface +{ + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + ALTER TABLE "transactions" + ADD COLUMN "retry_attempts" jsonb DEFAULT '[]' + `); + + console.log( + '[Migration] Added retry_attempts column to transactions table', + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + ALTER TABLE "transactions" + DROP COLUMN IF EXISTS "retry_attempts" + `); + + console.log( + '[Migration] Dropped retry_attempts column from transactions table', + ); + } +} diff --git a/packages/api/src/models/Transaction.ts b/packages/api/src/models/Transaction.ts index 0eacc15e1..2e70de799 100644 --- a/packages/api/src/models/Transaction.ts +++ b/packages/api/src/models/Transaction.ts @@ -112,6 +112,13 @@ class Transaction extends Base { @ManyToOne(() => Predicate) predicate: Predicate; + @Column({ + type: 'jsonb', + name: 'retry_attempts', + default: () => "'[]'", + }) + retryAttempts: object[]; + @OneToOne(() => RampTransaction, rampTransaction => rampTransaction.transaction, { nullable: true, }) diff --git a/packages/api/src/modules/transaction/controller.ts b/packages/api/src/modules/transaction/controller.ts index 2fac6e578..57d1e00dd 100644 --- a/packages/api/src/modules/transaction/controller.ts +++ b/packages/api/src/modules/transaction/controller.ts @@ -51,6 +51,7 @@ import { createTxHistoryEvent, mergeTransactionLists } from './utils'; import { emitTransaction } from '@src/socket/events'; import { SocketEvents, SocketUsernames } from '@src/socket/types'; +import { enqueueTransactionSubmit } from '@src/utils/submitTransactionQueue'; // todo: use this provider by session, and move to transactions const { FUEL_PROVIDER } = process.env; @@ -558,7 +559,7 @@ 
export class TransactionController { ); if (newStatus === TransactionStatus.PENDING_SENDER) { - await this.transactionService.sendToChain(transaction.hash, network); + await enqueueTransactionSubmit(transaction.hash, network.url); } await new NotificationService().transactionUpdate(transaction.id); @@ -858,7 +859,7 @@ export class TransactionController { params: { hash }, } = params; try { - await this.transactionService.sendToChain(hash.slice(2), params.network); // not wait for this + await enqueueTransactionSubmit(hash.slice(2), params.network.url); return successful(true, Responses.Ok); } catch (e) { logger.error({ error: e }, '[TX_SEND]'); diff --git a/packages/api/src/utils/submitTransactionQueue.ts b/packages/api/src/utils/submitTransactionQueue.ts new file mode 100644 index 000000000..bfd6e5dcf --- /dev/null +++ b/packages/api/src/utils/submitTransactionQueue.ts @@ -0,0 +1,99 @@ +import Queue from 'bull'; +import { logger } from '@src/config/logger'; + +const REDIS_URL_WRITE = + process.env.REDIS_URL_WRITE || 'redis://127.0.0.1:6379'; + +const QUEUE_SUBMIT_TRANSACTION = 'QUEUE_SUBMIT_TRANSACTION'; +const BACKOFF_STEP_MS = 5000; +const BACKOFF_CYCLE = 5; +const MAX_ATTEMPTS = 120; + +type SubmitTransactionJob = { + hash: string; + network_url: string; +}; + +function parseRedisUrl(url: string) { + const parsed = new URL(url); + const isLocal = parsed.hostname.includes('127.'); + return { + host: parsed.hostname, + port: Number(parsed.port) || 6379, + ...(!isLocal ? { tls: { rejectUnauthorized: false } } : {}), + }; +} + +const submitTransactionQueue = new Queue( + QUEUE_SUBMIT_TRANSACTION, + { + redis: parseRedisUrl(REDIS_URL_WRITE), + settings: { + backoffStrategies: { + cyclic: (attemptsMade: number) => { + const position = ((attemptsMade - 1) % BACKOFF_CYCLE) + 1; + return position * BACKOFF_STEP_MS; + }, + }, + }, + }, +); + +function jobIdForHash(hash: string) { + return `tx_submit_${hash}`; +} + +/** + * Enqueues a transaction for on-chain submission. 
+ * + * Uses a deterministic jobId (tx_submit_{hash}) to prevent duplicates. + * If a failed job already exists for this hash, it is removed first + * so the transaction can be re-enqueued with fresh attempts. + */ +export async function enqueueTransactionSubmit( + hash: string, + networkUrl: string, +) { + const jobId = jobIdForHash(hash); + + try { + // Remove any existing failed job for this hash so it can be re-enqueued + const existingJob = await submitTransactionQueue.getJob(jobId); + if (existingJob) { + const state = await existingJob.getState(); + if (state === 'failed') { + await existingJob.remove(); + logger.info( + { hash, jobId, previousState: state }, + '[SUBMIT_TX_QUEUE] Removed previous failed job for re-enqueue', + ); + } else if (state === 'active' || state === 'waiting' || state === 'delayed') { + logger.info( + { hash, jobId, state }, + '[SUBMIT_TX_QUEUE] Job already in queue, skipping duplicate', + ); + return; + } + } + + const job = await submitTransactionQueue.add( + { hash, network_url: networkUrl }, + { + attempts: MAX_ATTEMPTS, + backoff: { type: 'cyclic' as any }, + removeOnComplete: true, + removeOnFail: false, + jobId, + }, + ); + logger.info( + { hash, jobId: job.id, network: networkUrl, maxAttempts: MAX_ATTEMPTS }, + '[SUBMIT_TX_QUEUE] Transaction enqueued for submission', + ); + } catch (e) { + logger.error( + { error: e, hash, network: networkUrl }, + '[SUBMIT_TX_QUEUE] Failed to enqueue transaction', + ); + } +} diff --git a/packages/worker/package.json b/packages/worker/package.json index ff034608c..8cd731ff5 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -16,10 +16,12 @@ "@envio-dev/hypersync-client": "0.6.2", "@types/bull": "^4.10.4", "@types/node-cron": "3.0.11", + "bakosafe": "0.6.3", "bull": "^4.16.5", "express": "4.21.2", "fuels": "0.103.0", "ioredis": "^5.7.0", + "socket.io-client": "4.7.5", "mongodb": "^6.18.0", "node-cron": "3.0.3", "pg": "8.5.1", diff --git 
a/packages/worker/src/index.ts b/packages/worker/src/index.ts index f04e0d111..b9b4c6d91 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -9,6 +9,7 @@ import assetQueue from "./queues/assetsValue/queue"; import { MongoDatabase } from "./clients/mongoClient"; import { PsqlClient } from "./clients"; import { userBlockSyncQueue, userLogoutSyncQueue, UserBlockSyncCron } from "./queues/userBlockSync"; +import { submitTransactionQueue } from "./queues/submitTransaction"; const { WORKER_PORT, @@ -57,6 +58,7 @@ createBullBoard({ new BullAdapter(assetQueue), new BullAdapter(userBlockSyncQueue), new BullAdapter(userLogoutSyncQueue), + new BullAdapter(submitTransactionQueue), ], serverAdapter, }); diff --git a/packages/worker/src/queues/submitTransaction/constants.ts b/packages/worker/src/queues/submitTransaction/constants.ts new file mode 100644 index 000000000..c63a2002e --- /dev/null +++ b/packages/worker/src/queues/submitTransaction/constants.ts @@ -0,0 +1,5 @@ +export const QUEUE_SUBMIT_TRANSACTION = "QUEUE_SUBMIT_TRANSACTION"; +export const MAX_ATTEMPTS = 120; +export const BACKOFF_STEP_MS = 5000; // +5s per attempt +export const BACKOFF_CYCLE = 5; // resets every 5 attempts +// 24 cycles x 75s (5+10+15+20+25) = 1800s = ~30 minutes total diff --git a/packages/worker/src/queues/submitTransaction/index.ts b/packages/worker/src/queues/submitTransaction/index.ts new file mode 100644 index 000000000..ec9407d29 --- /dev/null +++ b/packages/worker/src/queues/submitTransaction/index.ts @@ -0,0 +1,4 @@ +export * from "./types"; +export * from "./constants"; +export * from "./utils"; +export { default as submitTransactionQueue } from "./queue"; diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts new file mode 100644 index 000000000..8f5b077f8 --- /dev/null +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -0,0 +1,452 @@ +import Queue from "bull"; +import { 
redisConfig, PsqlClient } from "@/clients"; +import { Vault, TransactionStatus } from "bakosafe"; +import { Provider, transactionRequestify } from "fuels"; +import { hexlify } from "fuels"; +import { io } from "socket.io-client"; +import { + QUEUE_SUBMIT_TRANSACTION, + BACKOFF_STEP_MS, + BACKOFF_CYCLE, +} from "./constants"; +import type { QueueSubmitTransaction, RetryAttemptEntry } from "./types"; +import { isTransientError } from "./utils"; + +const submitTransactionQueue = new Queue( + QUEUE_SUBMIT_TRANSACTION, + { + redis: redisConfig, + settings: { + backoffStrategies: { + cyclic: (attemptsMade: number) => { + // Progressao de +5s, reseta a cada 5 tentativas + // 5s, 10s, 15s, 20s, 25s, 5s, 10s, 15s, 20s, 25s, ... + const position = ((attemptsMade - 1) % BACKOFF_CYCLE) + 1; + return position * BACKOFF_STEP_MS; + }, + }, + }, + } +); + +/** + * Extrai witnesses de uma transacao, replicando a logica de Transaction.getWitnesses() + * da API (models/Transaction.ts:185-211). + */ +function extractWitnesses( + resume: any, + txData: any +): string[] { + const witnesses = (resume.witnesses || []) + .filter((w: any) => !!w.signature) + .map((w: any) => w.signature); + + const txWitnesses = txData.witnesses || []; + + if ("bytecodeWitnessIndex" in txData) { + const { bytecodeWitnessIndex } = txData; + const bytecode = txWitnesses[bytecodeWitnessIndex]; + if (bytecode) { + witnesses.splice(bytecodeWitnessIndex, 0, hexlify(bytecode)); + } + } + + if ("witnessIndex" in txData) { + const { witnessIndex } = txData; + const bytecode = txWitnesses[witnessIndex]; + if (bytecode) { + witnesses.splice(witnessIndex, 0, hexlify(bytecode)); + } + } + + return witnesses; +} + +/** + * Appends an attempt to the retry_attempts array, grouping consecutive + * entries with the same error. If the last entry has the same error, + * it increments count and updates last_* fields instead of creating a new entry. 
+ */ +function appendAttempt( + attempts: RetryAttemptEntry[], + attemptNumber: number, + error: string | null, + durationMs: number +): RetryAttemptEntry[] { + const now = new Date().toISOString(); + const last = attempts.length > 0 ? attempts[attempts.length - 1] : null; + + if (last && last.error === error) { + // Same error as last entry — merge + const updated = [...attempts]; + const prev = updated[updated.length - 1]; + const totalDuration = prev.avg_duration_ms * prev.count + durationMs; + const newCount = prev.count + 1; + updated[updated.length - 1] = { + ...prev, + last_attempt: attemptNumber, + last_timestamp: now, + count: newCount, + avg_duration_ms: Math.round(totalDuration / newCount), + }; + return updated; + } + + // Different error — new entry + return [ + ...attempts, + { + error, + first_attempt: attemptNumber, + last_attempt: attemptNumber, + count: 1, + first_timestamp: now, + last_timestamp: now, + avg_duration_ms: durationMs, + }, + ]; +} + +/** + * Notifica membros do vault via socket apos mudanca de status da tx. + * Replica o pattern de emitTransaction() da API (socket/events.ts:34-37). + */ +async function emitTransactionUpdate( + predicateId: string, + psql: any +): Promise { + try { + const members = await psql.query( + `SELECT user_id as id FROM predicate_members WHERE predicate_id = $1`, + [predicateId] + ); + const memberList = Array.isArray(members) + ? members + : members + ? [members] + : []; + + if (memberList.length === 0) return; + + const isDev = process.env.NODE_ENV === "development"; + const SOCKET_URL = isDev + ? 
process.env.SOCKET_URL + : process.env.WORKER_API_URL; + + if (!SOCKET_URL) { + console.warn( + `[${QUEUE_SUBMIT_TRANSACTION}] No SOCKET_URL configured, skipping notification` + ); + return; + } + + for (const member of memberList) { + const socket = io(SOCKET_URL, { + autoConnect: true, + auth: { + username: "[API]", + data: new Date(), + sessionId: member.id, + origin: SOCKET_URL, + }, + }); + + socket.on("connect", () => { + socket.emit("[TRANSACTION]", { + sessionId: member.id, + to: "[UI]", + type: "[UPDATED]", + }); + setTimeout(() => socket.disconnect(), 5000); + }); + + socket.on("connect_error", () => { + socket.disconnect(); + }); + } + } catch (e) { + console.error( + `[${QUEUE_SUBMIT_TRANSACTION}] Failed to emit socket:`, + e + ); + } +} + +submitTransactionQueue.process(async (job) => { + const { hash, network_url } = job.data; + const startTime = Date.now(); + const attemptNumber = job.attemptsMade + 1; + const maxAttempts = (job.opts.attempts as number) || 120; + const isFirstAttempt = attemptNumber === 1; + + console.log( + JSON.stringify({ + event: isFirstAttempt ? "tx_submit_start" : "tx_submit_retry", + queue: QUEUE_SUBMIT_TRANSACTION, + hash, + attempt: attemptNumber, + maxAttempts, + network: network_url, + jobId: job.id, + timestamp: new Date().toISOString(), + }) + ); + + const psql = await PsqlClient.connect(); + + // 1. 
Buscar tx no Postgres + const transaction = await psql.query( + `SELECT id, hash, tx_data as "txData", status, resume, network, predicate_id as "predicateId" + FROM transactions + WHERE hash = $1 + AND status NOT IN ('declined', 'failed', 'canceled') + ORDER BY "createdAt" DESC + LIMIT 1`, + [hash] + ); + + if (!transaction) { + console.log( + `[${QUEUE_SUBMIT_TRANSACTION}] Transaction ${hash} not found or not eligible` + ); + return { hash, status: "skipped" }; + } + + if (transaction.status !== TransactionStatus.PENDING_SENDER) { + console.log( + `[${QUEUE_SUBMIT_TRANSACTION}] Transaction ${hash} status is ${transaction.status}, skipping` + ); + return { hash, status: "skipped" }; + } + + // 2. Buscar predicate + const predicate = await psql.query( + `SELECT configurable, version, predicate_address as "predicateAddress" + FROM predicates + WHERE id = $1`, + [transaction.predicateId] + ); + + if (!predicate) { + console.error( + `[${QUEUE_SUBMIT_TRANSACTION}] Predicate not found for tx ${hash}` + ); + return { hash, status: "error", reason: "predicate_not_found" }; + } + + // 3. Montar Vault e tx + // IMPORTANTE: usar Provider do fuels, NUNCA BakoProvider. + // Vault.send() verifica `provider instanceof BakoProvider`: + // - BakoProvider → chama POST /transaction/send/:hash → enfileira de novo → LOOP INFINITO + // - Provider normal → provider.operations.submit() → direto na blockchain + const providerUrl = network_url.replace(/^https?:\/\/[^@]+@/, "https://"); + const provider = new Provider(providerUrl); + const vault = new Vault( + provider, + JSON.parse(predicate.configurable), + predicate.version + ); + + // 4. Extrair witnesses + const { resume, txData } = transaction; + const witnesses = extractWitnesses(resume, txData); + + const tx = transactionRequestify({ + ...txData, + witnesses, + }); + + // 5. 
Enviar on-chain + try { + const transactionResponse = await vault.send(tx); + const { gasUsed } = await transactionResponse.waitForResult(); + const durationMs = Date.now() - startTime; + + // 6. Sucesso: atualizar DB + const updatedResume = { + ...resume, + gasUsed: gasUsed.format(), + status: "success", + }; + + const existing = await psql.query( + `SELECT retry_attempts FROM transactions WHERE id = $1`, + [transaction.id] + ); + const retryAttempts = appendAttempt( + existing?.retry_attempts || [], + attemptNumber, + null, + durationMs + ); + + await psql.query( + `UPDATE transactions + SET status = 'success', + "sendTime" = NOW(), + "gasUsed" = $1, + resume = $2, + retry_attempts = $3 + WHERE id = $4`, + [ + gasUsed.format(), + JSON.stringify(updatedResume), + JSON.stringify(retryAttempts), + transaction.id, + ] + ); + + // 7. Notificar membros via socket + await emitTransactionUpdate(transaction.predicateId, psql); + + console.log( + JSON.stringify({ + event: "tx_submit_success", + queue: QUEUE_SUBMIT_TRANSACTION, + hash, + attempt: attemptNumber, + maxAttempts, + gasUsed: gasUsed.format(), + duration_ms: durationMs, + retriesNeeded: attemptNumber - 1, + jobId: job.id, + timestamp: new Date().toISOString(), + }) + ); + return { hash, status: "success", gasUsed: gasUsed.format() }; + } catch (e) { + const durationMs = Date.now() - startTime; + const errorObj = "toObject" in (e as any) ? (e as any).toObject() : e; + const errorStr = + typeof errorObj === "string" ? 
errorObj : JSON.stringify(errorObj); + const retriable = isTransientError(e); + const isLastAttempt = attemptNumber >= maxAttempts; + + const existing = await psql.query( + `SELECT retry_attempts FROM transactions WHERE id = $1`, + [transaction.id] + ); + const retryAttempts = appendAttempt( + existing?.retry_attempts || [], + attemptNumber, + errorStr, + durationMs + ); + + if (retriable && !isLastAttempt) { + // Erro transiente, ainda tem tentativas: manter PENDING_SENDER + await psql.query( + `UPDATE transactions + SET resume = $1, + retry_attempts = $2 + WHERE id = $3`, + [ + JSON.stringify({ + ...resume, + error: errorObj, + retry: { count: attemptNumber }, + }), + JSON.stringify(retryAttempts), + transaction.id, + ] + ); + + const cyclePosition = ((attemptNumber - 1) % BACKOFF_CYCLE) + 1; + const nextDelay = cyclePosition * BACKOFF_STEP_MS; + + console.warn( + JSON.stringify({ + event: "tx_submit_transient_error", + queue: QUEUE_SUBMIT_TRANSACTION, + hash, + attempt: attemptNumber, + maxAttempts, + attemptsRemaining: maxAttempts - attemptNumber, + error: errorStr, + retriable: true, + duration_ms: durationMs, + next_retry_ms: nextDelay, + jobId: job.id, + timestamp: new Date().toISOString(), + }) + ); + throw e; // Re-throw → Bull retries with cyclic backoff + } + + // Erro permanente ou ultima tentativa: marcar FAILED + const updatedResume = { + ...resume, + gasUsed: "0.0", + status: "failed", + error: errorObj, + }; + + await psql.query( + `UPDATE transactions + SET status = 'failed', + "sendTime" = NOW(), + "gasUsed" = '0.0', + resume = $1, + retry_attempts = $2 + WHERE id = $3`, + [ + JSON.stringify(updatedResume), + JSON.stringify(retryAttempts), + transaction.id, + ] + ); + + // Notificar falha via socket + await emitTransactionUpdate(transaction.predicateId, psql); + + console.error( + JSON.stringify({ + event: "tx_submit_permanent_failure", + queue: QUEUE_SUBMIT_TRANSACTION, + hash, + attempt: attemptNumber, + maxAttempts, + error: errorStr, + 
retriable: false, + isLastAttempt, + duration_ms: durationMs, + totalElapsed_ms: Date.now() - startTime, + jobId: job.id, + timestamp: new Date().toISOString(), + }) + ); + return { hash, status: "failed" }; + } +}); + +// Event handlers +submitTransactionQueue.on("completed", (job, result) => { + console.log( + JSON.stringify({ + event: "tx_submit_job_completed", + queue: QUEUE_SUBMIT_TRANSACTION, + hash: result.hash, + result: result.status, + jobId: job.id, + totalAttempts: job.attemptsMade + 1, + timestamp: new Date().toISOString(), + }) + ); +}); + +submitTransactionQueue.on("failed", (job, err) => { + console.error( + JSON.stringify({ + event: "tx_submit_job_failed", + queue: QUEUE_SUBMIT_TRANSACTION, + hash: job.data.hash, + attempt: job.attemptsMade, + maxAttempts: job.opts.attempts, + error: err.message, + jobId: job.id, + timestamp: new Date().toISOString(), + }) + ); +}); + +export default submitTransactionQueue; diff --git a/packages/worker/src/queues/submitTransaction/types.ts b/packages/worker/src/queues/submitTransaction/types.ts new file mode 100644 index 000000000..0a1f9a7d2 --- /dev/null +++ b/packages/worker/src/queues/submitTransaction/types.ts @@ -0,0 +1,14 @@ +export type QueueSubmitTransaction = { + hash: string; + network_url: string; +}; + +export type RetryAttemptEntry = { + error: string | null; + first_attempt: number; + last_attempt: number; + count: number; + first_timestamp: string; + last_timestamp: string; + avg_duration_ms: number; +}; diff --git a/packages/worker/src/queues/submitTransaction/utils.ts b/packages/worker/src/queues/submitTransaction/utils.ts new file mode 100644 index 000000000..c84bd8f38 --- /dev/null +++ b/packages/worker/src/queues/submitTransaction/utils.ts @@ -0,0 +1,19 @@ +const TRANSIENT_PATTERNS = [ + "ECONNREFUSED", + "ETIMEDOUT", + "ENOTFOUND", + "socket hang up", + "network error", + "timeout", + "502", + "503", + "504", + "rate limit", + "AbortError", + "FetchError", +]; + +export function 
isTransientError(error: unknown): boolean { + const msg = String((error as any)?.message ?? error).toLowerCase(); + return TRANSIENT_PATTERNS.some((p) => msg.includes(p.toLowerCase())); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2fe94c626..3214c6983 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -84,6 +84,9 @@ importers: body-parser: specifier: 1.20.4 version: 1.20.4 + bull: + specifier: ^4.16.5 + version: 4.16.5 cheerio: specifier: 1.0.0-rc.12 version: 1.0.0-rc.12 @@ -120,6 +123,9 @@ importers: handlebars: specifier: 4.7.8 version: 4.7.8 + ioredis: + specifier: ^5.7.0 + version: 5.9.2 joi: specifier: 17.4.0 version: 17.4.0 @@ -430,6 +436,9 @@ importers: '@types/node-cron': specifier: 3.0.11 version: 3.0.11 + bakosafe: + specifier: 0.6.3 + version: 0.6.3(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) bull: specifier: ^4.16.5 version: 4.16.5 @@ -454,6 +463,9 @@ importers: redis: specifier: 4.7.0 version: 4.7.0 + socket.io-client: + specifier: 4.7.5 + version: 4.7.5 ts-node: specifier: ^10.9.2 version: 10.9.2(@types/node@20.6.0)(typescript@5.4.5) From ba6c06f9ce62d348f213d647ef13bdb70249ef02 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 15:37:19 -0300 Subject: [PATCH 02/16] docs: add RFC for transaction submit queue architecture --- docs/transaction-submit-queue.md | 338 +++++++++++++++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100644 docs/transaction-submit-queue.md diff --git a/docs/transaction-submit-queue.md b/docs/transaction-submit-queue.md new file mode 100644 index 000000000..d55f107c7 --- /dev/null +++ b/docs/transaction-submit-queue.md @@ -0,0 +1,338 @@ +# RFC: Bull Queue for Transaction Submission + +**Date:** 2026-04-08 +**Status:** Proposal +**Author:** Guilherme Roque + +--- + +## Problem + +`sendToChain()` lives in the API and is called synchronously, blocking the HTTP response. 
If `vault.send()` fails (network timeout, provider unavailable, gas estimation error), the transaction is permanently marked as `FAILED` with no retry. Additionally, it blocks the HTTP response indefinitely while waiting for on-chain confirmation. + +## Entry Point Mapping + +All on-chain send paths converge to `sendToChain()` in `services.ts:600`. No component sends directly to the blockchain outside this method. + +### Direct call sites of `sendToChain()` + +| # | Location | Trigger | Entry path | +|---|----------|---------|------------| +| 1 | `controller.ts:561` — `signByID()` | Signature quorum reached | `PUT /transaction/sign/:hash` | +| 2 | `controller.ts:861` — `send()` | Explicit send call | `POST /transaction/send/:hash` | + +### Who calls these endpoints + +``` + ┌─────────────────────────────────────┐ + │ sendToChain() │ + │ services.ts:600 │ + └──────────┬──────────────┬───────────┘ + │ │ + signByID() │ send() │ + controller:561 │ controller:861 + │ │ + ┌──────────────────────────┤ │ + │ │ │ + PUT /sign/:hash PUT /sign/:hash POST /send/:hash + │ │ │ + │ │ │ + ┌─────────┴──────────┐ ┌───────────┴────────┐ ┌────┴──────────────────┐ + │ UI: user signs │ │ Socket Server: │ │ SDK: BakoProvider │ + │ from vault │ │ TX_SIGN handler │ │ .send(hash) │ + │ dashboard │ │ (dApp flow) │ │ → Service │ + │ │ │ │ │ .sendTransaction() │ + └────────────────────┘ └─────────────────────┘ └───────────────────────┘ + │ + │ (previous step) + ┌───────┴───────────┐ + │ Socket Server: │ + │ TX_CREATE handler │ + │ vault.BakoTransfer │ + │ (only CREATES tx, │ + │ does NOT send) │ + └────────────────────┘ +``` + +### Flow 1: User signs from vault dashboard (UI) + +``` +UI → useSendTransaction() → Vault.send() with BakoProvider + → BakoProvider.send(hash) → Service.sendTransaction(hash) + → POST /transaction/send/:hash → sendToChain() +``` + +### Flow 2: dApp via socket (connector) + +``` +dApp Connector → Socket TX_REQUEST → Socket Server creates recovery code + → Socket 
TX_CREATE → Socket Server calls vault.BakoTransfer() (only saves tx to DB)
+  → UI displays tx for user review
+  → User signs → Socket TX_SIGN
+  → Socket Server calls PUT /transaction/sign/:hash on the API
+  → signByID() checks quorum → sendToChain()
+```
+
+The socket server is an authentication intermediary (temporary 2-min recovery codes). It **never** sends on-chain — it delegates to the API via `PUT /sign/:hash`.
+
+### Flow 3: Standalone SDK (without BakoProvider)
+
+```
+Vault.send() with regular Provider → provider.operations.submit() → direct to blockchain
+```
+
+This flow **does not go through the API** — it's for standalone SDK usage, outside the Bako Safe ecosystem. Not affected by the queue.
+
+### Conclusion
+
+Replacing `sendToChain()` with `enqueueTransactionSubmit()` at the **2 API call sites** (signByID and send) covers all app flows: direct signing, SDK-based sending, and dApp flow via socket.
+
+## Before vs After
+
+### How it works TODAY
+
+```
+User signs → quorum reached → API calls vault.send() immediately
+                                        │
+                                   blocks HTTP
+                                   waiting for blockchain
+                                        │
+                                  ┌─────┴──────┐
+                                  │            │
+                               success      failure
+                          STATUS=SUCCESS  STATUS=FAILED
+                                          (permanent, no retry)
+```
+
+- HTTP response only returns after blockchain confirms (or fails)
+- Network error, timeout, provider down — that's it. Tx dies as FAILED
+- User must create a new tx from scratch
+
+### How it will work AFTER
+
+```
+User signs → quorum reached → API enqueues to Bull → HTTP 200 immediate
+                                        │
+                              Worker picks from queue
+                              calls vault.send()
+                                        │
+                             ┌─────────┴──────────┐
+                             │                    │
+                          success              failure
+                     STATUS=SUCCESS               │
+                                           transient error?
+                                          (network, timeout)
+                                           ┌────┴────┐
+                                          YES        NO
+                                           │          │
+                                     Bull retries  STATUS=FAILED
+                                     up to 120x    (permanent)
+                                     (5s,10s,15s,20s,25s,
+                                      5s,10s,15s,20s,25s...)
+```
+
+### Key differences
+
+| Aspect | Before | After |
+|--------|--------|-------|
+| HTTP response | Blocks until blockchain confirms | Returns immediately |
+| Network/timeout failure | Tx dies as FAILED | Bull retries up to 120x over ~30 min |
+| Permanent error (funds, signature) | FAILED | FAILED (same) |
+| Attempt auditing | None | `retry_attempts` column with timestamp, error, duration |
+| Frontend changes | — | None. Tx shows "Sending..." during retries, socket notifies on resolution |
+
+---
+
+## Solution
+
+Create a Bull queue `QUEUE_SUBMIT_TRANSACTION` in the worker. Every transaction that reaches quorum is enqueued. The worker executes the send logic (build Vault, `vault.send()`, `waitForResult()`) with automatic retry on transient errors.
+
+## Architecture Decisions
+
+### 1. Send logic moves to the worker
+
+`sendToChain` moves from the API to the Bull queue processor in the worker. Add `bakosafe` as a worker dependency (already has `fuels@0.103.0`). Eliminates coupling of a heavy operation to the HTTP cycle.
+
+### 2. API only enqueues
+
+`signByID` and `send` now call `enqueueTransactionSubmit(hash, networkUrl)` and return immediately. The API gains `bull` and `ioredis` as dependencies (producer only — never consumer).
+
+### 3. Worker accesses DB directly
+
+Already has `PsqlClient` (raw SQL via `pg`). Fetches transaction, predicate, updates status, records attempts. Does not use TypeORM.
+
+### 4. Worker emits socket events
+
+Uses `socket.io-client` (same lib the API uses in `SocketClient`) to notify vault members in real time after success or failure.
+
+### 5. Error classification
+
+- **Transient** (retry): ECONNREFUSED, ETIMEDOUT, ENOTFOUND, socket hang up, network error, timeout, 502, 503, 504, rate limit, AbortError, FetchError
+- **Permanent** (FAILED): insufficient funds, predicate validation, invalid signature, not enough coins
+
+If transient and attempts remain: status stays `PENDING_SENDER`, Bull retries.
If permanent or attempts exhausted: status goes to `FAILED`. + +### 6. Automatic retry — no manual retry + +Fully automatic system. No manual retry endpoint, no button in the frontend. + +### 7. Retry configuration + +| Parameter | Value | +|-----------|-------| +| Max attempts | 120 | +| Backoff | Cyclic: +5s per attempt, resets every 5 | +| Delay pattern | 5s, 10s, 15s, 20s, 25s, 5s, 10s, 15s, 20s, 25s, ... | +| Worst case total | ~30 minutes until terminal FAILED (24 cycles x 75s) | + +Backoff implemented via Bull's custom `backoffStrategies`: + +```typescript +backoffStrategies: { + cyclic: (attemptsMade: number) => { + const position = ((attemptsMade - 1) % 5) + 1; + return position * 5000; + }, +} +``` + +### 8. Attempt auditing + +New column `retry_attempts jsonb` (array) on the `transactions` table. Consecutive attempts with the same error are grouped into a single entry to avoid duplication: + +```typescript +{ + error: string | null, // error message (null on success) + first_attempt: number, // first attempt in this group + last_attempt: number, // last attempt in this group + count: number, // how many consecutive attempts with this error + first_timestamp: string, // ISO — when first attempt happened + last_timestamp: string, // ISO — when last attempt happened + avg_duration_ms: number, // average attempt duration in ms +} +``` + +Example: 85 consecutive ETIMEDOUT errors followed by a 503 phase and then success: +```json +[ + { "error": "ETIMEDOUT", "first_attempt": 1, "last_attempt": 85, "count": 85, "avg_duration_ms": 5000, ... }, + { "error": "503", "first_attempt": 86, "last_attempt": 100, "count": 15, "avg_duration_ms": 3200, ... }, + { "error": null, "first_attempt": 101, "last_attempt": 101, "count": 1, "avg_duration_ms": 2100, ... } +] +``` + +--- + +## Flow + +``` +1. signByID: quorum reached + → enqueueTransactionSubmit(hash, networkUrl) + → HTTP 200 returns immediately + +2. Bull queue: job arrives at worker + +3. Worker processor: + a. 
Fetch transaction from Postgres (status, txData, resume, network) + b. Fetch predicate (configurable, version) + c. Create FuelProvider with network URL + d. Instantiate Vault (bakosafe) with configurable and version + e. Extract witnesses (signatures) from resume + f. Build TransactionRequest via transactionRequestify() + g. vault.send(tx) + waitForResult() + +4. Result: + SUCCESS + → UPDATE status='success', gasUsed, retry_attempts + → Emit socket TRANSACTION_UPDATED to members + → Bull marks job as completed + + TRANSIENT ERROR (attempts remaining) + → UPDATE retry_attempts (record attempt) + → Status remains PENDING_SENDER + → throw → Bull retries with cyclic backoff + + PERMANENT ERROR or LAST ATTEMPT + → UPDATE status='failed', retry_attempts + → Emit socket TRANSACTION_UPDATED to members + → Bull marks job as completed +``` + +### Timing diagram (worst case) + +``` +Each cycle: 5s + 10s + 15s + 20s + 25s = 75s + +Cycle 1-5: 75s each (375s cumulative) +Cycle 6-10: 75s each (750s cumulative) +Cycle 11-15: 75s each (1125s cumulative) +Cycle 16-20: 75s each (1500s cumulative) +Cycle 21-24: 75s each (1800s cumulative) + ~30 min → FAILED +``` + +--- + +## Modified files + +### Worker (`packages/worker/`) + +| File | Action | +|------|--------| +| `package.json` | Add `bakosafe@0.6.3`, `socket.io-client@4.7.5` | +| `src/queues/submitTransaction/types.ts` | New — job types and audit entry | +| `src/queues/submitTransaction/constants.ts` | New — queue name, max attempts, backoff config | +| `src/queues/submitTransaction/utils.ts` | New — `isTransientError()` | +| `src/queues/submitTransaction/queue.ts` | New — processor with on-chain send logic | +| `src/queues/submitTransaction/index.ts` | New — exports | +| `src/index.ts` | Register queue in Bull Board | + +### API (`packages/api/`) + +| File | Action | +|------|--------| +| `package.json` | Add `bull@^4.16.5`, `ioredis@^5.7.0` | +| `src/utils/submitTransactionQueue.ts` | New — Bull producer + 
`enqueueTransactionSubmit()` |
+| `src/modules/transaction/controller.ts` | `signByID` and `send`: replace `await sendToChain` with `enqueueTransactionSubmit` |
+| `src/models/Transaction.ts` | Add `retryAttempts` column |
+| `src/migrations/` | New migration: `AddRetryAttemptsToTransactions` |
+
+### Removed/deprecated
+
+| File | Action |
+|------|--------|
+| `src/modules/transaction/services.ts` | `sendToChain()` can be removed (logic moved to worker) |
+
+---
+
+## Frontend impact
+
+No mandatory changes. The frontend already:
+- Shows "Sending..." for `PENDING_SENDER` status
+- Shows "Error" for `FAILED` status
+- Listens to socket events `TRANSACTION_UPDATED` for real-time updates
+
+Optional: display attempt count by reading `resume.retry.count` in the `Status.tsx` component.
+
+---
+
+## Verification
+
+1. **Enqueue**: reach quorum in signByID, verify job appears in Bull Board (`/worker/queues`)
+2. **Send**: verify worker executes `vault.send()` and tx appears on-chain
+3. **Retry**: mock invalid provider URL, verify 120 attempts with cyclic backoff in Bull Board
+4. **Auditing**: verify `retry_attempts` column has one grouped entry per run of consecutive attempts with the same error (not one entry per attempt — see section 8)
+5. **Non-blocking**: signByID returns HTTP 200 before `vault.send()` completes
+6. **Socket**: frontend receives real-time update via socket after success/failure
+
+---
+
+## Risks and mitigations
+
+| Risk | Mitigation |
+|------|------------|
+| **Infinite loop with BakoProvider** | Worker MUST use `Provider` from fuels, NEVER `BakoProvider`. `Vault.send()` in bakosafe checks `provider instanceof BakoProvider` — if BakoProvider, it calls `POST /transaction/send/:hash` which enqueues again → infinite loop. With a regular `Provider`, it goes directly to the blockchain via `provider.operations.submit()`. The API currently uses `FuelProvider` (a `Provider` wrapper from fuels) — the worker must do the same. 
| +| Duplicate tx on-chain (vault.send ok but waitForResult timeout) | Before sending, check if tx hash already exists on-chain via provider | +| Worker and API with different bakosafe versions | Keep the same version in both package.json files | +| Resume JSONB concurrency (worker and API writing) | Worker is the only writer after enqueue — API only reads | +| Bull queue loses jobs (Redis restart) | `removeOnFail: false` keeps failed jobs visible; Redis with AOF persistence | From 03e2937565628f2f4d8a1bdc47e0d8284698d614 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 15:37:27 -0300 Subject: [PATCH 03/16] test: accept PENDING_SENDER status after async tx submission --- packages/api/src/tests/transaction.tests.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/api/src/tests/transaction.tests.ts b/packages/api/src/tests/transaction.tests.ts index c28dd150e..a06b4d171 100644 --- a/packages/api/src/tests/transaction.tests.ts +++ b/packages/api/src/tests/transaction.tests.ts @@ -504,7 +504,8 @@ test('Transaction Endpoints', async t => { assert.equal(finalTx.id, createdTx2.id); assert.ok( finalTx.status === TransactionStatus.SUCCESS || - finalTx.status === TransactionStatus.FAILED, + finalTx.status === TransactionStatus.FAILED || + finalTx.status === TransactionStatus.PENDING_SENDER, ); }, ); From c6da8f2560942a3aeca9c966a771ef44ad07e5f2 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 15:40:56 -0300 Subject: [PATCH 04/16] chore: remove unused review documents --- DOCS_REVIEW.md | 465 ----------------------------------------------- TEST_ANALYSIS.md | 310 ------------------------------- 2 files changed, 775 deletions(-) delete mode 100644 DOCS_REVIEW.md delete mode 100644 TEST_ANALYSIS.md diff --git a/DOCS_REVIEW.md b/DOCS_REVIEW.md deleted file mode 100644 index e2a3e8880..000000000 --- a/DOCS_REVIEW.md +++ /dev/null @@ -1,465 +0,0 @@ -# Revisão de Documentação e Setup - bako-safe-api - -> Documento gerado 
durante análise de onboarding de novo desenvolvedor. -> Branch: `staging-docs-review` -> Data: 2026-02-05 - ---- - -## Status da Execução - -| Etapa | Status | Observações | -|-------|--------|-------------| -| Clone do repositório | ✅ OK | - | -| Checkout da branch | ✅ OK | - | -| pnpm install | ✅ OK | Warning sobre `resolutions` no package.json da API | -| Copiar .env files | ✅ OK | - | -| Criar docker network | ✅ OK | - | -| pnpm dev (Quick Start) | ❌ FALHA | Race condition + variáveis faltando | -| Manual Setup | ✅ OK | Funciona seguindo passo a passo | -| Migrations | ❌ FALHA | Script aponta para path inexistente | -| Testes | ⚠️ PARCIAL | 33/35 passaram, 2 falharam (cleanup assíncrono) | - -### Conclusão do Onboarding - -**Tempo gasto:** ~30 minutos (deveria ser ~5 minutos) - -**Bloqueadores encontrados:** -1. Quick Start (`pnpm dev`) não funciona out-of-the-box -2. `.env.example` incompleto - faltam variáveis críticas -3. `RIG_ID_CONTRACT` vazio causa crash da API -4. Script de migration quebrado - -**O que funcionou bem:** -- Testcontainers para testes (excelente DX) -- Setup manual com Docker funciona -- Estrutura de packages clara - ---- - -## Problemas Críticos Encontrados - -### 1. Race Condition na Inicialização (CRÍTICO) - -**Problema:** O comando `pnpm dev` falha porque o Turbo inicia todos os serviços em paralelo. A API e o Socket-Server tentam conectar ao banco de dados antes dele estar healthy. - -**Erro observado:** -``` -bakosafe-api:dev: Error: connect ECONNREFUSED 127.0.0.1:5432 -bakosafe-socket-server:dev: Error: getaddrinfo ENOTFOUND db -``` - -**Causa raiz:** O `turbo.json` define dependências entre tasks, mas as tasks de infraestrutura (db, redis, chain) não bloqueiam adequadamente as tasks de aplicação. - -**Impacto:** Desenvolvedor não consegue usar o Quick Start documentado. 
- -**Sugestão de correção:** -- Opção A: Adicionar script de wait-for-it antes de iniciar API/Socket -- Opção B: Documentar que deve-se usar o Manual Setup -- Opção C: Separar `pnpm dev:infra` de `pnpm dev:app` - ---- - -### 2. Socket-Server .env.example com HOST Incorreto (CRÍTICO) - -**Arquivo:** `packages/socket-server/.env.example` - -**Problema:** -```env -DATABASE_HOST=db # Este é o hostname Docker interno! -``` - -**Deveria ser:** -```env -DATABASE_HOST=127.0.0.1 # Para desenvolvimento local fora do Docker -``` - -**Impacto:** Socket-Server não inicia em desenvolvimento local. - ---- - -### 3. UI_URL Inconsistente Entre Packages (MÉDIO) - -| Package | UI_URL | -|---------|--------| -| api | `http://localhost:5175` | -| socket-server | `http://localhost:5173` | - -**Impacto:** Confusão sobre qual porta o frontend deve rodar. - ---- - -## Gaps de Documentação - -### README.md Principal - -| Item | Status | Prioridade | -|------|--------|------------| -| Visão geral do projeto (o que é Bako Safe?) | ❌ Ausente | Alta | -| Arquitetura do sistema | ❌ Ausente | Alta | -| Diagrama de componentes | ❌ Ausente | Média | -| Descrição de cada package | ❌ Ausente | Alta | -| Como rodar migrations | ❌ Ausente | Alta | -| Configuração de Redis para API | ❌ Ausente | Alta | -| Como contribuir (CONTRIBUTING.md) | ❌ Ausente | Média | -| Troubleshooting expandido | ⚠️ Parcial | Média | - -### Variáveis de Ambiente Não Documentadas - -**packages/api/.env.example** - Variáveis sem explicação: -- `API_TOKEN_SECRET` / `API_TOKEN_SECRET_IV` - Para que servem? -- `API_SOCKET_SESSION_ID` - Valor hardcoded, é seguro? -- `FUEL_PROVIDER_CHAIN_ID` - Quando usar 0 vs 9889? -- `RIG_ID_CONTRACT` - Obrigatório? Onde obter? -- `DB_METABASE_USERNAME` / `DB_METABASE_PASS` - São necessários para dev? -- `COIN_MARKET_CAP_API_KEY` - Obrigatório? Funciona sem? 
- -### Packages Sem Documentação - -| Package | README | Descrição | -|---------|--------|-----------| -| api | ❌ Não | Apenas README de contracts/rig | -| chain | ❌ Não | Nenhuma doc | -| database | ❌ Não | Nenhuma doc | -| redis | ❌ Não | Nenhuma doc | -| socket-server | ❌ Não | Nenhuma doc | -| metabase | ❌ Não | Nenhuma doc | -| worker | ✅ Sim | Tem README completo | - ---- - -## Inconsistências no Código - -### 1. Variáveis Duplicadas em api/.env.example - -```env -ASSETS_URL=https://besafe-asset.s3.amazonaws.com/icon -ASSETS_URL=https://besafe-asset.s3.amazonaws.com/icon # DUPLICADO - -APP_ADMIN_EMAIL=admin_user_email -# ... -APP_ADMIN_EMAIL=admin_user_email # DUPLICADO -APP_ADMIN_PASSWORD=admin_user_password # DUPLICADO -``` - -### 2. Typo em worker/.env.example - -```env -WORKER_MONGO_ENVIRONMENT=devevelopment # Typo: "devevelopment" -``` - -### 3. Worker README Desatualizado - -O README menciona `pnpm worker:dev:start` mas esse script não existe no package.json do worker. - ---- - -## Documentação de API (Swagger/OpenAPI) - -**Status:** ❌ Inexistente - -**Endpoints identificados (sem documentação):** -- `/auth/*` - Autenticação -- `/user/*` - Usuários -- `/cli/*` - CLI Auth -- `/connections/*` - dApps -- `/api-token/*` - API Tokens -- `/workspace/*` - Workspaces -- `/predicate/*` - Predicates -- `/address-book/*` - Address Book -- `/transaction/*` - Transações -- `/notifications/*` - Notificações -- `/external/*` - Rotas externas -- `/ping` - Health check simples -- `/healthcheck` - Health check - ---- - -## Log de Execução - -### Tentativa 1: Quick Start (pnpm dev) - -```bash -$ pnpm install -# ✅ OK - 1315 packages instalados - -$ cp packages/api/.env.example packages/api/.env -$ cp packages/database/.env.example packages/database/.env -$ cp packages/redis/.env.example packages/redis/.env -$ cp packages/socket-server/.env.example packages/socket-server/.env -# ✅ OK - -$ docker network create bako-network -# ✅ OK - -$ pnpm dev -# ❌ FALHA -# - Redis: ✅ 
Healthy -# - Database: ✅ Healthy (após ~12s) -# - MongoDB: ✅ Healthy -# - Fuel Chain: ✅ Healthy -# - Socket-Server: ❌ Error: getaddrinfo ENOTFOUND db -# - API: ❌ Error: connect ECONNREFUSED 127.0.0.1:5432 -``` - -**Conclusão:** O Quick Start não funciona out-of-the-box. - ---- - -### Tentativa 2: Manual Setup - -```bash -# 1. Database -$ cd packages/database && docker compose --env-file .env.example up -d -# ✅ OK - postgres e mongodb healthy - -# 2. Redis -$ cd packages/redis && docker compose --env-file .env.example up -d -# ✅ OK - redis healthy - -# 3. Fuel Chain -$ cd packages/chain && docker compose -p bako-safe_dev --env-file .env.chain up -d --build -# ✅ OK - fuel-core e faucet rodando - -# 4. Socket Server -$ cd packages/socket-server && docker compose up -d --build -# ✅ OK - socket-server healthy - -# 5. API -$ cd packages/api && pnpm dev -# ❌ FALHA - Erro: FuelError: Unknown address format -``` - -**Erro na API:** -``` -FuelError: Unknown address format: only 'B256', 'Public Key (512)', or 'EVM Address' are supported. 
- at new Rig (/packages/api/src/contracts/rig/mainnet/types/Rig.ts:1645:5) - at Function.start (/packages/api/src/server/storage/rig.ts:35:19) -``` - -**Causa:** `RIG_ID_CONTRACT` está vazio no `.env.example` - ---- - -## Análise do .env Completo vs .env.example - -Comparando o arquivo de ambiente funcional com o `.env.example`: - -### Variáveis Faltando no .env.example (CRÍTICO) - -| Variável | Valor Exemplo | Descrição | -|----------|---------------|-----------| -| `REDIS_URL_WRITE` | `redis://localhost:6379` | URL do Redis para escrita | -| `REDIS_URL_READ` | `redis://localhost:6379` | URL do Redis para leitura | -| `WORKER_URL` | `http://localhost:3063` | URL do Worker | -| `MELD_SANDBOX_API_KEY` | `***` | API Key do MELD (sandbox) | -| `MELD_SANDBOX_API_URL` | `https://api-sb.meld.io/` | URL API MELD sandbox | -| `MELD_SANDBOX_WEBHOOK_SECRET` | `***` | Webhook secret MELD | -| `MELD_PRODUCTION_API_KEY` | `***` | API Key MELD produção | -| `MELD_PRODUCTION_API_URL` | `https://api.meld.io/` | URL API MELD produção | -| `MELD_PRODUCTION_WEBHOOK_SECRET` | `***` | Webhook secret MELD prod | -| `LAYERS_SWAP_API_URL` | `https://api.layerswap.io/api/v2` | URL LayerSwap | -| `LAYERS_SWAP_API_KEY_SANDBOX` | `***` | API Key LayerSwap sandbox | -| `LAYERS_SWAP_API_KEY_PROD` | `***` | API Key LayerSwap prod | -| `LAYERS_SWAP_WEBHOOK_SECRET` | `***` | Webhook LayerSwap | -| `ENABLE_BALANCE_CACHE` | `true` | Habilita cache de balance | -| `BALANCE_CACHE_TTL` | `300` | TTL do cache de balance | -| `BALANCE_INVALIDATION_TTL` | `3600` | TTL invalidação cache | -| `WARMUP_ENABLED` | `true` | Habilita warmup | -| `WARMUP_CONCURRENCY` | `5` | Concorrência warmup | -| `WARMUP_MAX_PREDICATES` | `20` | Max predicates warmup | -| `WARMUP_SKIP_CACHED` | `true` | Pula cached no warmup | -| `TRANSACTION_CACHE_TTL` | `600` | TTL cache transações | -| `TRANSACTION_INCREMENTAL_LIMIT` | `10` | Limite incremental | -| `INTERNAL_API_KEY` | `worker_api_key` | Chave interna para Worker | -| 
`NODE_ENV` | `development` | Ambiente Node | - -### Valores Incorretos no .env.example - -| Variável | .env.example | Valor Correto | -|----------|--------------|---------------| -| `FUEL_PROVIDER` | `http://127.0.0.1:4000/v1/graphql` | OK para local, mas falta opção testnet | -| `UI_URL` | `http://localhost:5175` | `http://localhost:5174` (inconsistente) | -| `RIG_ID_CONTRACT` | *(vazio)* | `0x2181f1b8e00756672515807cab7de10c70a9b472a4a9b1b6ca921435b0a1f49b` | - -### Sugestão: RIG_ID_CONTRACT como Constante - -O `RIG_ID_CONTRACT` é um endereço de contrato público na mainnet. Sugestão: - -```typescript -// src/constants/contracts.ts -export const RIG_CONTRACTS = { - MAINNET: '0x2181f1b8e00756672515807cab7de10c70a9b472a4a9b1b6ca921435b0a1f49b', - TESTNET: null, // não existe em testnet -} as const; -``` - -E no código usar fallback: -```typescript -const rigAddress = RIG_ID_CONTRACT || RIG_CONTRACTS.MAINNET; -``` - -Ou melhor ainda - tornar o RigInstance opcional em dev: -```typescript -if (RIG_ID_CONTRACT) { - this.rigCache = RigInstance.start(); -} -``` - ---- - ---- - -### Tentativa 3: Rodar Migrations - -```bash -$ cd packages/api && pnpm migration:run -# ❌ FALHA -``` - -**Erro:** -``` -Error: Unable to open file: "/packages/api/src/database" -Cannot find module '/packages/api/src/database' -``` - -**Causa:** O script `migration:run` no package.json aponta para `src/database` que é um **diretório**, não um arquivo: - -```json -"migration:run": "ts-node ... --dataSource src/database" -``` - -**Problema:** Não existe um arquivo `dataSource.ts` exportando o DataSource do TypeORM. 
A configuração real está em: -- `src/config/database.ts` - Função `getDatabaseConfig()` -- `src/config/connection.ts` - Função `getDatabaseInstance()` - -**Sugestão:** Criar arquivo `src/database/index.ts`: -```typescript -import { DataSource } from 'typeorm'; -import { getDatabaseConfig } from '../config/database'; - -export default new DataSource(getDatabaseConfig()); -``` - -Ou corrigir o script para: -```json -"migration:run": "ts-node ... --dataSource src/config/connection" -``` - ---- - -### Tentativa 4: Rodar Testes - -```bash -$ cd packages/api && pnpm test:build -``` - -**Resultado:** ⚠️ PARCIAL -- Total: 35 testes -- Passou: 33 -- Falhou: 2 - -**Erros encontrados:** -1. `build/tests/predicate.tests.js` - Falhou -2. `build/tests/user.tests.js` - Falhou com: - ``` - generated asynchronous activity after the test ended. - Error: App is not started - ``` - -**Análise:** Os erros parecem ser de cleanup assíncrono após os testes, não falhas funcionais. - -**Nota:** Os testes usam `testcontainers` que inicia um PostgreSQL automaticamente - isso é bem documentado e funciona. - ---- - -## Checklist de Correções Sugeridas - -### Prioridade 0 (Bloqueadores) - -- [x] Corrigir `DATABASE_HOST` em `packages/socket-server/.env.example` para `127.0.0.1` -- [x] Adicionar mecanismo de retry/wait na inicialização da API e Socket-Server -- [x] Corrigir script `migration:run` - aponta para diretório inexistente (criado database/index.ts) -- [x] Adicionar variáveis de Redis faltando no `.env.example` (`REDIS_URL_WRITE`, `REDIS_URL_READ`) -- [x] Adicionar `RIG_ID_CONTRACT` no `.env.example` ou tornar opcional em dev -- [x] Corrigir race condition no `pnpm dev` (wait-on + healthchecks) -- [x] Corrigir socket-server database config para aceitar 'postgres' como host local -- [x] Atualizar Makefiles para Docker Compose V2 syntax - -### Prioridade 1 (Essenciais) - -- [x] Adicionar seção "O que é Bako Safe?" 
no README -- [x] Documentar como rodar migrations -- [x] Documentar arquitetura dos packages -- [x] Unificar `UI_URL` entre packages (5173 vs 5175) -> 5174 -- [x] Adicionar configuração de Redis no `.env.example` da API -- [ ] Criar documentação Swagger/OpenAPI - -### Prioridade 2 (Melhorias) - -- [x] Remover variáveis duplicadas dos `.env.example` -- [x] Corrigir typo `devevelopment` no worker -- [x] Criar CONTRIBUTING.md -- [x] Adicionar diagrama de arquitetura (texto no README) -- [x] Atualizar README do worker com scripts corretos (já estava correto) - ---- - -## Próximos Passos - -1. ~~Tentar setup manual (passo a passo)~~ ✅ -2. ~~Testar migrations~~ ❌ Script quebrado -3. ~~Rodar testes~~ ⚠️ 33/35 passaram -4. Documentar fluxo completo funcional - ---- - -## Setup Manual Funcional (Testado) - -Para desenvolvedores novos, este é o fluxo que **realmente funciona**: - -```bash -# 1. Clone e setup inicial -git clone https://github.com/infinitybase/bako-safe-api.git -cd bako-safe-api -git checkout staging-docs-review -pnpm install - -# 2. Criar rede Docker -docker network create bako-network - -# 3. Copiar e configurar .env -cp packages/api/.env.example packages/api/.env -cp packages/database/.env.example packages/database/.env -cp packages/redis/.env.example packages/redis/.env -cp packages/socket-server/.env.example packages/socket-server/.env - -# IMPORTANTE: Editar packages/api/.env e adicionar: -# - REDIS_URL_WRITE=redis://localhost:6379 -# - REDIS_URL_READ=redis://localhost:6379 -# - RIG_ID_CONTRACT=0x2181f1b8e00756672515807cab7de10c70a9b472a4a9b1b6ca921435b0a1f49b - -# 4. Subir infraestrutura (em ordem!) -cd packages/database && docker compose --env-file .env.example up -d -# Aguardar containers ficarem healthy (~15s) -cd ../redis && docker compose --env-file .env.example up -d -cd ../chain && docker compose -p bako-safe_dev --env-file .env.chain up -d --build -cd ../socket-server && docker compose up -d --build - -# 5. 
Verificar todos os containers -docker ps -# Deve mostrar: postgres, mongodb-dev, redis-bako-dev, bakosafe_fuel-core, bakosafe_faucet, bako-socket-server - -# 6. Iniciar API -cd ../api && pnpm dev - -# 7. Testar -curl http://localhost:3333/ping -curl http://localhost:3333/healthcheck -``` - -### Para rodar testes (sem setup manual): -```bash -cd packages/api && pnpm test:build -# Usa testcontainers - não precisa de Docker rodando antes -``` diff --git a/TEST_ANALYSIS.md b/TEST_ANALYSIS.md deleted file mode 100644 index f66804961..000000000 --- a/TEST_ANALYSIS.md +++ /dev/null @@ -1,310 +0,0 @@ -# Análise de Estabilidade de Testes - bako-safe-api - -> **Data:** 2026-02-05 -> **Branch:** `staging-docs-review` -> **Autor:** Revisão de onboarding - ---- - -## Resumo Executivo - -| Métrica | Valor | Status | -|---------|-------|--------| -| Total de Testes | 73 | ✅ | -| Testes Passando | 73 | ✅ | -| Testes Falhando | 0 | ✅ | -| Cobertura de Módulos | 8/8 (100%) | ✅ | -| Testes Unitários | 0 | ⚠️ | -| CI Configurado | Sim (PRs + push main/staging) | ✅ | - ---- - -## Setup de Testes - -### Stack Utilizada - -- **Test Runner:** Node.js native test runner (`node:test`) -- **HTTP Testing:** supertest -- **Database:** Testcontainers (PostgreSQL isolado) -- **Blockchain:** `launchTestNode()` do Fuel SDK -- **Assertions:** `node:assert/strict` - -### Como Rodar - -```bash -cd packages/api -pnpm test:build # Build + testes com testcontainers -``` - -### Validação do Setup - -| Item | Status | Observação | -|------|--------|------------| -| Testcontainers PostgreSQL | ✅ OK | Sobe container automaticamente | -| Fuel Test Node | ⚠️ Parcial | Incompatibilidade de versão | -| Build antes de testes | ✅ OK | Compila TS para JS | -| Cleanup após testes | ✅ OK | `t.after()` + `App.stop()` | -| CI GitHub Actions | ✅ OK | Roda em PRs | - ---- - -## Problema Identificado no Setup - -### Incompatibilidade de Versão fuel-core vs SDK - -**Erro observado:** -``` -InsufficientFeeAmount { 
expected: 1430, provided: 1000 } - -The Fuel Node that you are trying to connect to is using fuel-core version 0.47.1. -The TS SDK currently supports fuel-core version 0.43.1. -Things may not work as expected. -``` - -**Causa:** O `launchTestNode()` do SDK sobe um fuel-core 0.47.1, mas o SDK `fuels@0.101.3` espera 0.43.1. - -**Impacto:** Teste `transaction.tests.ts` falha ao criar mock de transação (fee calculation incorreto). - -**Solução sugerida:** -1. Atualizar `fuels` para versão compatível com fuel-core 0.47.1 -2. OU fixar versão do fuel-core no testcontainers - ---- - -## Cobertura por Módulo - -### Módulos COM Testes - -| Módulo | Arquivo | Endpoints | Testes | Cobertura | -|--------|---------|-----------|--------|-----------| -| auth | `auth.tests.ts` | 4 | 4 | 100% | -| user | `user.tests.ts` | 5 | 4 | 80% | -| predicate | `predicate.tests.ts` | 10 | 9 | 90% | -| transaction | `transaction.tests.ts` | 12 | 14 | 100%+ | -| addressBook | `addressBook.tests.ts` | 4 | 4 | 100% | -| apiToken | `apiToken.tests.ts` | 3 | 3 | 100% | -| notification | `notification.tests.ts` | 3 | 2 | 66% | - -### Módulos Anteriormente SEM Testes (CORRIGIDO ✅) - -| Módulo | Endpoints | Testes | Status | -|--------|-----------|--------|--------| -| workspace | 7 | 9 | ✅ CORRIGIDO | -| dApps/connections | 9 | 10 | ✅ CORRIGIDO | -| cliToken | 3 | 4 | ✅ CORRIGIDO | -| external | 4 | 0 | ⚠️ P2 | - ---- - -## Detalhamento dos Testes Existentes - -### auth.tests.ts (4 testes) -- ✅ `POST /user` - criar usuário e autenticar -- ✅ `POST /auth/code` - regenerar código de autenticação -- ✅ `POST /auth/code` - gerar código com sucesso -- ✅ `DELETE /auth/sign-out` - logout - -### user.tests.ts (4 testes) -- ✅ `PUT /user/:id` - atualizar nickname -- ✅ `GET /user/predicates` - listar predicates do usuário -- ✅ `GET /user/latest/transactions` - listar transações recentes -- ✅ `GET /user/latest/tokens` - obter valores USD dos tokens - -### predicate.tests.ts (9 testes) -- ✅ `POST /predicate` - criar 
com versão -- ✅ `POST /predicate` - criar sem versão -- ✅ `GET /predicate` - listar com paginação -- ✅ `GET /predicate/:id` - buscar por ID -- ✅ `GET /predicate/by-name/:name` - buscar por nome -- ✅ `GET /predicate/by-address/:address` - buscar por endereço -- ✅ `GET /predicate/reserved-coins/:id` - obter balance -- ✅ `GET /predicate/check/by-address/:address` - verificar existência -- ✅ `PUT /predicate/:address/visibility` - toggle visibilidade - -### transaction.tests.ts (14 testes) -- ✅ `POST /transaction` - criar transação -- ✅ `GET /transaction` - listar transações -- ✅ `GET /transaction?page&perPage` - listar com paginação -- ✅ `GET /transaction?status[]` - filtrar por status -- ✅ `GET /transaction/:id` - buscar por ID -- ✅ `GET /transaction/by-hash/:hash` - buscar por hash -- ✅ `GET /transaction/history/:id/:predicateId` - histórico -- ✅ `GET /transaction/pending` - transações pendentes -- ✅ `PUT /transaction/sign/:hash` - assinar transação -- ✅ `GET /transaction/:id/advanced-details` - detalhes avançados -- ✅ `GET /transaction/with-incomings` - transações com incomings -- ✅ `PUT /transaction/close/:id` - fechar transação -- ✅ `PUT /transaction/cancel/:hash` - cancelar transação -- ✅ Fluxo completo: criar → cancelar → recriar → assinar - -### addressBook.tests.ts (4 testes) -- ✅ `POST /address-book` - criar entrada -- ✅ `PUT /address-book/:id` - atualizar -- ✅ `GET /address-book` - listar -- ✅ `DELETE /address-book/:id` - deletar - -### apiToken.tests.ts (3 testes) -- ✅ `POST /api-token/:predicateId` - criar token -- ✅ `GET /api-token/:predicateId` - listar tokens -- ✅ `DELETE /api-token/:predicateId/:apiTokenId` - deletar - -### notification.tests.ts (2 testes) -- ✅ `GET /notifications` - listar com paginação e filtros -- ✅ `PUT /notifications/read-all` - marcar todas como lidas - -### cliToken.tests.ts (0 testes ativos) -- ❌ `Encode` - **COMENTADO** -- ❌ `Decode` - **COMENTADO** -- ❌ `Decode with invalid token` - **COMENTADO** - ---- - -## Endpoints SEM 
Cobertura de Testes - -### workspace (7 endpoints) - CRÍTICO - -```typescript -// packages/api/src/modules/workspace/routes.ts -router.get('/by-user', ...) // listar workspaces do usuário -router.post('/', ...) // criar workspace -router.get('/:id', ...) // buscar por ID -router.put('/', ...) // atualizar workspace -router.put('/permissions/:member', ...) // atualizar permissões -router.post('/members/:member/remove', ...) // remover membro -router.post('/members/:member/include', ...) // adicionar membro -``` - -### dApps/connections (9 endpoints) - CRÍTICO - -```typescript -// packages/api/src/modules/dApps/routes.ts -router.post('/', ...) // conectar dApp -router.get('/:sessionId/transaction/:vaultAddress/:txId', ...) // código conector -router.put('/:sessionId/network', ...) // mudar rede -router.get('/:sessionId/state', ...) // estado da sessão -router.get('/:sessionId/accounts', ...) // contas disponíveis -router.get('/:sessionId/currentAccount', ...) // conta atual -router.get('/:sessionId/currentNetwork', ...) // rede atual -router.get('/:sessionId', ...) // sessão atual -router.delete('/:sessionId', ...) // desconectar -``` - -### external (4 endpoints) - -```typescript -// packages/api/src/modules/external/routes.ts -router.get('/predicate', ...) // listar predicates (API externa) -router.get('/user', ...) // listar users (API externa) -router.get('/quote', ...) // cotações -router.get('/tx', ...) // transações -``` - ---- - -## CI/CD - -### Configuração Atual - -```yaml -# .github/workflows/test-api.yml -name: Run API Tests - -on: - pull_request: - branches: - - "**" - -jobs: - test: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: ./.github/actions/setup-forc - - run: npm install -g pnpm - - uses: actions/setup-node@v4 - - run: pnpm install --no-frozen-lockfile - - run: cp .env.test .env - - run: pnpm test:build -``` - -### Problemas Identificados - -1. **Apenas em PRs** - Não roda em push para `main`/`master` -2. 
**Sem coverage report** - Não há métricas de cobertura -3. **Sem badge de status** - README não mostra status dos testes - ---- - -## Plano de Ação - -### P0 - Crítico (Fazer Agora) - -- [ ] **Corrigir incompatibilidade fuel-core vs SDK** - - Atualizar `fuels` ou fixar versão do fuel-core - - Responsável: ___ - - Prazo: ___ - -- [ ] **Adicionar testes para workspace** - - CRUD de workspaces - - Permissões (owner, admin, manager, viewer) - - Adicionar/remover membros - -- [ ] **Descomentar ou remover cliToken.tests.ts** - - Testes comentados causam falsa sensação de cobertura - -### P1 - Alta Prioridade - -- [ ] **Adicionar testes para dApps/connections** - - Fluxo de conexão completo - - Mudança de rede - - Disconnect - -- [ ] **Configurar coverage report** - - Adicionar `c8` ou `nyc` - - Threshold mínimo: 70% - - Falhar CI se abaixo do threshold - -- [ ] **CI em push para branches principais** - - Adicionar trigger: `push: branches: [main, staging]` - -### P2 - Média Prioridade - -- [ ] **Adicionar testes para external routes** -- [ ] **Adicionar testes unitários para services** -- [ ] **Adicionar testes de edge cases** (validações, erros 4xx/5xx) -- [ ] **Badge de status no README** - ---- - -## Conclusão - -### Os testes validam que o sistema continua funcionando? - -**PARCIALMENTE** - -| Aspecto | Validado? 
| -|---------|-----------| -| Autenticação | ✅ Sim | -| Gestão de Vaults (predicates) | ✅ Sim | -| Transações | ✅ Sim | -| Address Book | ✅ Sim | -| API Tokens | ✅ Sim | -| Notificações | ✅ Sim | -| **Workspaces/Permissões** | ❌ **NÃO** | -| **Integrações dApps** | ❌ **NÃO** | -| **CLI** | ❌ **NÃO** | - -### Risco de Regressão - -- **ALTO** para workspace e dApps (sem cobertura) -- **MÉDIO** para notification e external (cobertura parcial) -- **BAIXO** para auth, predicate, transaction (boa cobertura) - ---- - -## Referências - -- Arquivos de teste: `packages/api/src/tests/*.tests.ts` -- Setup de teste: `packages/api/src/tests/utils/Setup.ts` -- CI: `.github/workflows/test-api.yml` -- Mocks: `packages/api/src/tests/mocks/` From e6ac89b98bc9c8fe7436556417ec3f69444a9d39 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 15:41:52 -0300 Subject: [PATCH 05/16] docs: update CHANGELOG.md --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 863110cb5..f0de53843 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Bull queue for async transaction submission with automatic retry (120 attempts, ~30min cyclic backoff) +- Retry attempt auditing via `retry_attempts` column with grouped consecutive errors +- Structured JSON logs for transaction submission monitoring (start, retry, success, failure) +- RFC documentation for transaction submit queue architecture + ### Fixed - Upgrade fuels SDK to 0.103.0 and bakosafe to 0.6.2 for fuel-core 0.47.1 compatibility, fixing gas price estimation on transaction submit +- `test:build` script now loads `.env.test` via `DOTENV_CONFIG_PATH` + +### Changed + +- Transaction submission is now non-blocking — `signByID` and `send` enqueue to Bull instead of calling `sendToChain` synchronously ### Added From 2be87f1f01b5d230ffc2c8aa911295708742c0b7 Mon Sep 
17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 16:18:16 -0300 Subject: [PATCH 06/16] fix: emit socket with minimal tx data for frontend cache invalidation --- .../src/queues/submitTransaction/queue.ts | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts index 8f5b077f8..f15108a65 100644 --- a/packages/worker/src/queues/submitTransaction/queue.ts +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -108,17 +108,18 @@ function appendAttempt( } /** - * Notifica membros do vault via socket apos mudanca de status da tx. - * Replica o pattern de emitTransaction() da API (socket/events.ts:34-37). + * Notifies vault members via socket with minimal transaction data. + * The frontend receives this and invalidates its cache, triggering + * a refetch from the API to get fully formatted data. */ -async function emitTransactionUpdate( - predicateId: string, +async function notifyTransactionUpdate( + transaction: { id: string; hash: string; status: string; predicateId: string }, psql: any ): Promise { try { const members = await psql.query( `SELECT user_id as id FROM predicate_members WHERE predicate_id = $1`, - [predicateId] + [transaction.predicateId] ); const memberList = Array.isArray(members) ? members @@ -133,12 +134,7 @@ async function emitTransactionUpdate( ? 
process.env.SOCKET_URL : process.env.WORKER_API_URL; - if (!SOCKET_URL) { - console.warn( - `[${QUEUE_SUBMIT_TRANSACTION}] No SOCKET_URL configured, skipping notification` - ); - return; - } + if (!SOCKET_URL) return; for (const member of memberList) { const socket = io(SOCKET_URL, { @@ -156,6 +152,12 @@ async function emitTransactionUpdate( sessionId: member.id, to: "[UI]", type: "[UPDATED]", + transaction: { + id: transaction.id, + hash: transaction.hash, + status: transaction.status, + predicateId: transaction.predicateId, + }, }); setTimeout(() => socket.disconnect(), 5000); }); @@ -166,8 +168,13 @@ async function emitTransactionUpdate( } } catch (e) { console.error( - `[${QUEUE_SUBMIT_TRANSACTION}] Failed to emit socket:`, - e + JSON.stringify({ + event: "tx_notify_error", + queue: QUEUE_SUBMIT_TRANSACTION, + transactionId: transaction.id, + error: (e as Error).message, + timestamp: new Date().toISOString(), + }) ); } } @@ -297,7 +304,10 @@ submitTransactionQueue.process(async (job) => { ); // 7. 
Notificar membros via socket - await emitTransactionUpdate(transaction.predicateId, psql); + await notifyTransactionUpdate( + { id: transaction.id, hash, status: "success", predicateId: transaction.predicateId }, + psql + ); console.log( JSON.stringify({ @@ -397,7 +407,10 @@ submitTransactionQueue.process(async (job) => { ); // Notificar falha via socket - await emitTransactionUpdate(transaction.predicateId, psql); + await notifyTransactionUpdate( + { id: transaction.id, hash, status: "failed", predicateId: transaction.predicateId }, + psql + ); console.error( JSON.stringify({ From dcfa5eb0af996215306480bf7d5cdca4ed87e83a Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 17:51:48 -0300 Subject: [PATCH 07/16] fix: add notify-result endpoint for worker post-submission callbacks --- .../api/src/modules/transaction/controller.ts | 52 +++++++++++ .../api/src/modules/transaction/routes.ts | 2 + .../src/queues/submitTransaction/queue.ts | 93 ++++++------------- 3 files changed, 80 insertions(+), 67 deletions(-) diff --git a/packages/api/src/modules/transaction/controller.ts b/packages/api/src/modules/transaction/controller.ts index 57d1e00dd..0196fa8e8 100644 --- a/packages/api/src/modules/transaction/controller.ts +++ b/packages/api/src/modules/transaction/controller.ts @@ -867,6 +867,58 @@ export class TransactionController { } } + /** + * Internal endpoint called by the worker after updating transaction status. + * Handles notification, cache invalidation and socket emission with full + * transaction data — replicating what sendToChain did after on-chain confirmation. 
+ */ + async notifyResult(params: any) { + const { params: { id } } = params; + try { + const transaction = await Transaction.findOne({ + where: { id }, + relations: ['predicate', 'createdBy'], + }); + + if (!transaction) { + return successful(false, Responses.Ok); + } + + // Notification (email + in-app) on success + if (transaction.status === TransactionStatus.SUCCESS) { + await new NotificationService().transactionSuccess(id, transaction.network); + } + + // Cache invalidation (Redis balance + tx cache) + await this.transactionService.invalidateCaches(transaction); + + // Socket emission with full formatted data + const predicate = await this.predicateService.findByAddress( + transaction.predicate.predicateAddress, + ); + + const formattedTransaction = Transaction.formatTransactionResponse(transaction); + const transactionHistory = await TransactionController.formatTransactionsHistory( + transaction, + ); + + for (const member of predicate.members) { + emitTransaction(member.id, { + sessionId: member.id, + to: SocketUsernames.UI, + type: SocketEvents.TRANSACTION_UPDATED, + transaction: formattedTransaction, + history: transactionHistory as ITransactionHistory[], + }); + } + + return successful(true, Responses.Ok); + } catch (e) { + logger.error({ error: e }, '[TX_NOTIFY_RESULT]'); + return error(e?.error ?? e, e?.statusCode ?? 
500); + } + } + async listAll(req: IListRequest) { try { const { page, perPage } = req.query; diff --git a/packages/api/src/modules/transaction/routes.ts b/packages/api/src/modules/transaction/routes.ts index 58497b117..bb3c0cba4 100644 --- a/packages/api/src/modules/transaction/routes.ts +++ b/packages/api/src/modules/transaction/routes.ts @@ -57,6 +57,7 @@ const { cancel, findAdvancedDetails, deleteByHash, + notifyResult, } = new TransactionController( transactionService, predicateService, @@ -66,6 +67,7 @@ const { router.get('/:id/advanced-details', handleResponse(findAdvancedDetails)); router.post('/send/:hash', handleResponse(send)); +router.post('/notify-result/:id', handleResponse(notifyResult)); router.use(authMiddleware); diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts index f15108a65..815d9d8a1 100644 --- a/packages/worker/src/queues/submitTransaction/queue.ts +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -3,7 +3,6 @@ import { redisConfig, PsqlClient } from "@/clients"; import { Vault, TransactionStatus } from "bakosafe"; import { Provider, transactionRequestify } from "fuels"; import { hexlify } from "fuels"; -import { io } from "socket.io-client"; import { QUEUE_SUBMIT_TRANSACTION, BACKOFF_STEP_MS, @@ -108,70 +107,36 @@ function appendAttempt( } /** - * Notifies vault members via socket with minimal transaction data. - * The frontend receives this and invalidates its cache, triggering - * a refetch from the API to get fully formatted data. + * Calls the API's /notify-result endpoint after updating the DB. + * The API handles notification (email + in-app), cache invalidation (Redis), + * and socket emission with full formatted transaction data. 
*/ -async function notifyTransactionUpdate( - transaction: { id: string; hash: string; status: string; predicateId: string }, - psql: any -): Promise { +async function notifyTransactionResult(transactionId: string): Promise { + const API_URL = process.env.WORKER_API_URL || "http://localhost:3333"; + try { - const members = await psql.query( - `SELECT user_id as id FROM predicate_members WHERE predicate_id = $1`, - [transaction.predicateId] + const response = await fetch( + `${API_URL}/transaction/notify-result/${transactionId}`, + { method: "POST", headers: { "Content-Type": "application/json" } } ); - const memberList = Array.isArray(members) - ? members - : members - ? [members] - : []; - - if (memberList.length === 0) return; - - const isDev = process.env.NODE_ENV === "development"; - const SOCKET_URL = isDev - ? process.env.SOCKET_URL - : process.env.WORKER_API_URL; - - if (!SOCKET_URL) return; - - for (const member of memberList) { - const socket = io(SOCKET_URL, { - autoConnect: true, - auth: { - username: "[API]", - data: new Date(), - sessionId: member.id, - origin: SOCKET_URL, - }, - }); - - socket.on("connect", () => { - socket.emit("[TRANSACTION]", { - sessionId: member.id, - to: "[UI]", - type: "[UPDATED]", - transaction: { - id: transaction.id, - hash: transaction.hash, - status: transaction.status, - predicateId: transaction.predicateId, - }, - }); - setTimeout(() => socket.disconnect(), 5000); - }); - - socket.on("connect_error", () => { - socket.disconnect(); - }); + + if (!response.ok) { + console.error( + JSON.stringify({ + event: "tx_notify_result_failed", + queue: QUEUE_SUBMIT_TRANSACTION, + transactionId, + status: response.status, + timestamp: new Date().toISOString(), + }) + ); } } catch (e) { console.error( JSON.stringify({ - event: "tx_notify_error", + event: "tx_notify_result_error", queue: QUEUE_SUBMIT_TRANSACTION, - transactionId: transaction.id, + transactionId, error: (e as Error).message, timestamp: new Date().toISOString(), }) @@ 
-303,11 +268,8 @@ submitTransactionQueue.process(async (job) => { ] ); - // 7. Notificar membros via socket - await notifyTransactionUpdate( - { id: transaction.id, hash, status: "success", predicateId: transaction.predicateId }, - psql - ); + // 7. Notify API (email, cache invalidation, socket with full tx data) + await notifyTransactionResult(transaction.id); console.log( JSON.stringify({ @@ -406,11 +368,8 @@ submitTransactionQueue.process(async (job) => { ] ); - // Notificar falha via socket - await notifyTransactionUpdate( - { id: transaction.id, hash, status: "failed", predicateId: transaction.predicateId }, - psql - ); + // Notify API (socket with full tx data) + await notifyTransactionResult(transaction.id); console.error( JSON.stringify({ From 9e9af020ff91960cb1de122514697f2f4d2e3ee9 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 17:56:57 -0300 Subject: [PATCH 08/16] fix: add shared secret and status validation to notify-result endpoint --- .../api/src/modules/transaction/controller.ts | 29 +++++++++++++++++++ .../src/queues/submitTransaction/queue.ts | 8 ++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/packages/api/src/modules/transaction/controller.ts b/packages/api/src/modules/transaction/controller.ts index 0196fa8e8..bf86ee866 100644 --- a/packages/api/src/modules/transaction/controller.ts +++ b/packages/api/src/modules/transaction/controller.ts @@ -875,6 +875,23 @@ export class TransactionController { async notifyResult(params: any) { const { params: { id } } = params; try { + // Validate shared secret (skip if not configured) + const expectedSecret = process.env.WORKER_SHARED_SECRET; + if (expectedSecret) { + const secret = params.headers?.['x-worker-secret']; + if (secret !== expectedSecret) { + logger.warn({ id }, '[TX_NOTIFY_RESULT] Unauthorized request'); + return error( + new BadRequest({ + type: ErrorTypes.Unauthorized, + title: 'Unauthorized', + detail: 'Invalid worker secret', + }), + 401, + ); + } + } + 
const transaction = await Transaction.findOne({ where: { id }, relations: ['predicate', 'createdBy'], @@ -884,6 +901,18 @@ export class TransactionController { return successful(false, Responses.Ok); } + // Only notify for terminal statuses + if ( + transaction.status !== TransactionStatus.SUCCESS && + transaction.status !== TransactionStatus.FAILED + ) { + logger.info( + { id, status: transaction.status }, + '[TX_NOTIFY_RESULT] Skipping non-terminal status', + ); + return successful(false, Responses.Ok); + } + // Notification (email + in-app) on success if (transaction.status === TransactionStatus.SUCCESS) { await new NotificationService().transactionSuccess(id, transaction.network); diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts index 815d9d8a1..f3276d88d 100644 --- a/packages/worker/src/queues/submitTransaction/queue.ts +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -115,9 +115,15 @@ async function notifyTransactionResult(transactionId: string): Promise { const API_URL = process.env.WORKER_API_URL || "http://localhost:3333"; try { + const headers: Record = { "Content-Type": "application/json" }; + const workerSecret = process.env.WORKER_SHARED_SECRET; + if (workerSecret) { + headers["x-worker-secret"] = workerSecret; + } + const response = await fetch( `${API_URL}/transaction/notify-result/${transactionId}`, - { method: "POST", headers: { "Content-Type": "application/json" } } + { method: "POST", headers } ); if (!response.ok) { From 1e6ab605fb782c2c4f4d3b4f98cfc95c92652905 Mon Sep 17 00:00:00 2001 From: guimroque Date: Wed, 8 Apr 2026 18:28:24 -0300 Subject: [PATCH 09/16] fix: use snake_case column names in worker SQL queries --- packages/worker/src/queues/submitTransaction/queue.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts 
index f3276d88d..43bafcba3 100644 --- a/packages/worker/src/queues/submitTransaction/queue.ts +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -178,7 +178,7 @@ submitTransactionQueue.process(async (job) => { FROM transactions WHERE hash = $1 AND status NOT IN ('declined', 'failed', 'canceled') - ORDER BY "createdAt" DESC + ORDER BY created_at DESC LIMIT 1`, [hash] ); @@ -261,8 +261,8 @@ submitTransactionQueue.process(async (job) => { await psql.query( `UPDATE transactions SET status = 'success', - "sendTime" = NOW(), - "gasUsed" = $1, + send_time = NOW(), + gas_used = $1, resume = $2, retry_attempts = $3 WHERE id = $4`, @@ -362,8 +362,8 @@ submitTransactionQueue.process(async (job) => { await psql.query( `UPDATE transactions SET status = 'failed', - "sendTime" = NOW(), - "gasUsed" = '0.0', + send_time = NOW(), + gas_used = '0.0', resume = $1, retry_attempts = $2 WHERE id = $3`, From 02919891478a6ce46a7c156cf8a03ac99bf927c5 Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 09:28:19 -0300 Subject: [PATCH 10/16] feat: stateless worker with fat job payload and dual Redis support --- .../api/src/modules/transaction/controller.ts | 66 +++- .../api/src/utils/submitTransactionQueue.ts | 54 +-- packages/worker/src/index.ts | 3 +- .../src/queues/submitTransaction/index.ts | 2 +- .../src/queues/submitTransaction/queue.ts | 352 +++++++----------- .../src/queues/submitTransaction/types.ts | 8 +- 6 files changed, 234 insertions(+), 251 deletions(-) diff --git a/packages/api/src/modules/transaction/controller.ts b/packages/api/src/modules/transaction/controller.ts index bf86ee866..8abf5a993 100644 --- a/packages/api/src/modules/transaction/controller.ts +++ b/packages/api/src/modules/transaction/controller.ts @@ -559,7 +559,16 @@ export class TransactionController { ); if (newStatus === TransactionStatus.PENDING_SENDER) { - await enqueueTransactionSubmit(transaction.hash, network.url); + await enqueueTransactionSubmit({ + hash: transaction.hash, + 
transactionId: transaction.id, + apiUrl: process.env.API_URL || '', + networkUrl: network.url, + txData: transaction.txData, + resume: transaction.resume, + predicateConfigurable: transaction.predicate.configurable, + predicateVersion: transaction.predicate.version, + }); } await new NotificationService().transactionUpdate(transaction.id); @@ -859,7 +868,26 @@ export class TransactionController { params: { hash }, } = params; try { - await enqueueTransactionSubmit(hash.slice(2), params.network.url); + const txHash = hash.startsWith('0x') ? hash.slice(2) : hash; + const transaction = await Transaction.findOne({ + where: { hash: txHash }, + relations: ['predicate'], + }); + + if (!transaction) { + return successful(false, Responses.Ok); + } + + await enqueueTransactionSubmit({ + hash: transaction.hash, + transactionId: transaction.id, + apiUrl: process.env.API_URL || '', + networkUrl: params.network.url, + txData: transaction.txData, + resume: transaction.resume, + predicateConfigurable: transaction.predicate.configurable, + predicateVersion: transaction.predicate.version, + }); return successful(true, Responses.Ok); } catch (e) { logger.error({ error: e }, '[TX_SEND]'); @@ -873,7 +901,7 @@ export class TransactionController { * transaction data — replicating what sendToChain did after on-chain confirmation. 
*/ async notifyResult(params: any) { - const { params: { id } } = params; + const { params: { id }, body } = params; try { // Validate shared secret (skip if not configured) const expectedSecret = process.env.WORKER_SHARED_SECRET; @@ -892,6 +920,13 @@ export class TransactionController { } } + const { status, gasUsed, errorData, retryAttempts } = body || {}; + + // Only accept terminal statuses + if (status !== 'success' && status !== 'failed') { + return successful(false, Responses.Ok); + } + const transaction = await Transaction.findOne({ where: { id }, relations: ['predicate', 'createdBy'], @@ -901,17 +936,22 @@ export class TransactionController { return successful(false, Responses.Ok); } - // Only notify for terminal statuses - if ( - transaction.status !== TransactionStatus.SUCCESS && - transaction.status !== TransactionStatus.FAILED - ) { - logger.info( - { id, status: transaction.status }, - '[TX_NOTIFY_RESULT] Skipping non-terminal status', - ); - return successful(false, Responses.Ok); + // Update transaction in DB + transaction.status = status === 'success' + ? TransactionStatus.SUCCESS + : TransactionStatus.FAILED; + transaction.sendTime = new Date(); + transaction.gasUsed = gasUsed || '0.0'; + transaction.resume = { + ...transaction.resume, + gasUsed: gasUsed || '0.0', + status: transaction.status, + ...(errorData ? 
{ error: errorData } : {}), + }; + if (retryAttempts) { + transaction.retryAttempts = retryAttempts; } + await transaction.save(); // Notification (email + in-app) on success if (transaction.status === TransactionStatus.SUCCESS) { diff --git a/packages/api/src/utils/submitTransactionQueue.ts b/packages/api/src/utils/submitTransactionQueue.ts index bfd6e5dcf..f4fe25634 100644 --- a/packages/api/src/utils/submitTransactionQueue.ts +++ b/packages/api/src/utils/submitTransactionQueue.ts @@ -9,9 +9,15 @@ const BACKOFF_STEP_MS = 5000; const BACKOFF_CYCLE = 5; const MAX_ATTEMPTS = 120; -type SubmitTransactionJob = { +export type SubmitTransactionJob = { hash: string; - network_url: string; + transactionId: string; + apiUrl: string; + networkUrl: string; + txData: any; + resume: any; + predicateConfigurable: string; + predicateVersion: string; }; function parseRedisUrl(url: string) { @@ -45,54 +51,48 @@ function jobIdForHash(hash: string) { /** * Enqueues a transaction for on-chain submission. - * - * Uses a deterministic jobId (tx_submit_{hash}) to prevent duplicates. - * If a failed job already exists for this hash, it is removed first - * so the transaction can be re-enqueued with fresh attempts. + * The job carries all data the worker needs — no DB access required. 
*/ -export async function enqueueTransactionSubmit( - hash: string, - networkUrl: string, -) { - const jobId = jobIdForHash(hash); +export async function enqueueTransactionSubmit(payload: SubmitTransactionJob) { + const jobId = jobIdForHash(payload.hash); try { - // Remove any existing failed job for this hash so it can be re-enqueued const existingJob = await submitTransactionQueue.getJob(jobId); if (existingJob) { const state = await existingJob.getState(); if (state === 'failed') { await existingJob.remove(); logger.info( - { hash, jobId, previousState: state }, + { hash: payload.hash, jobId, previousState: state }, '[SUBMIT_TX_QUEUE] Removed previous failed job for re-enqueue', ); - } else if (state === 'active' || state === 'waiting' || state === 'delayed') { + } else if ( + state === 'active' || + state === 'waiting' || + state === 'delayed' + ) { logger.info( - { hash, jobId, state }, + { hash: payload.hash, jobId, state }, '[SUBMIT_TX_QUEUE] Job already in queue, skipping duplicate', ); return; } } - const job = await submitTransactionQueue.add( - { hash, network_url: networkUrl }, - { - attempts: MAX_ATTEMPTS, - backoff: { type: 'cyclic' as any }, - removeOnComplete: true, - removeOnFail: false, - jobId, - }, - ); + const job = await submitTransactionQueue.add(payload, { + attempts: MAX_ATTEMPTS, + backoff: { type: 'cyclic' as any }, + removeOnComplete: true, + removeOnFail: false, + jobId, + }); logger.info( - { hash, jobId: job.id, network: networkUrl, maxAttempts: MAX_ATTEMPTS }, + { hash: payload.hash, jobId: job.id, maxAttempts: MAX_ATTEMPTS }, '[SUBMIT_TX_QUEUE] Transaction enqueued for submission', ); } catch (e) { logger.error( - { error: e, hash, network: networkUrl }, + { error: e, hash: payload.hash }, '[SUBMIT_TX_QUEUE] Failed to enqueue transaction', ); } diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index b9b4c6d91..282be81c6 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -9,7 +9,7 
@@ import assetQueue from "./queues/assetsValue/queue"; import { MongoDatabase } from "./clients/mongoClient"; import { PsqlClient } from "./clients"; import { userBlockSyncQueue, userLogoutSyncQueue, UserBlockSyncCron } from "./queues/userBlockSync"; -import { submitTransactionQueue } from "./queues/submitTransaction"; +import { submitTransactionQueue, submitTransactionQueueProd } from "./queues/submitTransaction"; const { WORKER_PORT, @@ -59,6 +59,7 @@ createBullBoard({ new BullAdapter(userBlockSyncQueue), new BullAdapter(userLogoutSyncQueue), new BullAdapter(submitTransactionQueue), + ...(submitTransactionQueueProd ? [new BullAdapter(submitTransactionQueueProd)] : []), ], serverAdapter, }); diff --git a/packages/worker/src/queues/submitTransaction/index.ts b/packages/worker/src/queues/submitTransaction/index.ts index ec9407d29..39a8cb64c 100644 --- a/packages/worker/src/queues/submitTransaction/index.ts +++ b/packages/worker/src/queues/submitTransaction/index.ts @@ -1,4 +1,4 @@ export * from "./types"; export * from "./constants"; export * from "./utils"; -export { default as submitTransactionQueue } from "./queue"; +export { default as submitTransactionQueue, submitTransactionQueueProd } from "./queue"; diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts index 43bafcba3..0e0d12474 100644 --- a/packages/worker/src/queues/submitTransaction/queue.ts +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -1,5 +1,5 @@ import Queue from "bull"; -import { redisConfig, PsqlClient } from "@/clients"; +import { redisConfig } from "@/clients"; import { Vault, TransactionStatus } from "bakosafe"; import { Provider, transactionRequestify } from "fuels"; import { hexlify } from "fuels"; @@ -11,31 +11,41 @@ import { import type { QueueSubmitTransaction, RetryAttemptEntry } from "./types"; import { isTransientError } from "./utils"; +const { WORKER_REDIS_HOST_PROD } = process.env; + +const 
queueSettings = { + backoffStrategies: { + cyclic: (attemptsMade: number) => { + const position = ((attemptsMade - 1) % BACKOFF_CYCLE) + 1; + return position * BACKOFF_STEP_MS; + }, + }, +}; + +// Primary queue — consumes from the default Redis (hmg/staging) const submitTransactionQueue = new Queue( QUEUE_SUBMIT_TRANSACTION, - { - redis: redisConfig, - settings: { - backoffStrategies: { - cyclic: (attemptsMade: number) => { - // Progressao de +5s, reseta a cada 5 tentativas - // 5s, 10s, 15s, 20s, 25s, 5s, 10s, 15s, 20s, 25s, ... - const position = ((attemptsMade - 1) % BACKOFF_CYCLE) + 1; - return position * BACKOFF_STEP_MS; - }, - }, - }, - } + { redis: redisConfig, settings: queueSettings } ); +// Secondary queue — consumes from prod Redis (if configured) +const isLocal = WORKER_REDIS_HOST_PROD?.includes("127.") ?? false; +const submitTransactionQueueProd = WORKER_REDIS_HOST_PROD + ? new Queue(QUEUE_SUBMIT_TRANSACTION, { + redis: { + host: WORKER_REDIS_HOST_PROD, + port: 6379, + ...(!isLocal ? { tls: { rejectUnauthorized: false } } : {}), + }, + settings: queueSettings, + }) + : null; + /** - * Extrai witnesses de uma transacao, replicando a logica de Transaction.getWitnesses() - * da API (models/Transaction.ts:185-211). + * Extracts witnesses from resume and txData, replicating + * Transaction.getWitnesses() from the API (models/Transaction.ts:192-214). */ -function extractWitnesses( - resume: any, - txData: any -): string[] { +function extractWitnesses(resume: any, txData: any): string[] { const witnesses = (resume.witnesses || []) .filter((w: any) => !!w.signature) .map((w: any) => w.signature); @@ -62,9 +72,7 @@ function extractWitnesses( } /** - * Appends an attempt to the retry_attempts array, grouping consecutive - * entries with the same error. If the last entry has the same error, - * it increments count and updates last_* fields instead of creating a new entry. + * Groups consecutive retry attempts with the same error into a single entry. 
*/ function appendAttempt( attempts: RetryAttemptEntry[], @@ -76,7 +84,6 @@ function appendAttempt( const last = attempts.length > 0 ? attempts[attempts.length - 1] : null; if (last && last.error === error) { - // Same error as last entry — merge const updated = [...attempts]; const prev = updated[updated.length - 1]; const totalDuration = prev.avg_duration_ms * prev.count + durationMs; @@ -91,7 +98,6 @@ function appendAttempt( return updated; } - // Different error — new entry return [ ...attempts, { @@ -107,23 +113,37 @@ function appendAttempt( } /** - * Calls the API's /notify-result endpoint after updating the DB. - * The API handles notification (email + in-app), cache invalidation (Redis), - * and socket emission with full formatted transaction data. + * Calls the API's /notify-result endpoint to update DB, invalidate cache, + * emit socket with full data, and send notifications. */ -async function notifyTransactionResult(transactionId: string): Promise { - const API_URL = process.env.WORKER_API_URL || "http://localhost:3333"; +async function notifyTransactionResult( + apiUrl: string, + transactionId: string, + body: { + status: string; + gasUsed?: string; + errorData?: any; + retryAttempts?: RetryAttemptEntry[]; + } +): Promise { + const baseUrl = apiUrl.replace(/\/+$/, ""); try { - const headers: Record = { "Content-Type": "application/json" }; + const headers: Record = { + "Content-Type": "application/json", + }; const workerSecret = process.env.WORKER_SHARED_SECRET; if (workerSecret) { headers["x-worker-secret"] = workerSecret; } const response = await fetch( - `${API_URL}/transaction/notify-result/${transactionId}`, - { method: "POST", headers } + `${baseUrl}/transaction/notify-result/${transactionId}`, + { + method: "POST", + headers, + body: JSON.stringify(body), + } ); if (!response.ok) { @@ -132,6 +152,7 @@ async function notifyTransactionResult(transactionId: string): Promise { event: "tx_notify_result_failed", queue: QUEUE_SUBMIT_TRANSACTION, 
transactionId, + apiUrl: baseUrl, status: response.status, timestamp: new Date().toISOString(), }) @@ -143,6 +164,7 @@ async function notifyTransactionResult(transactionId: string): Promise { event: "tx_notify_result_error", queue: QUEUE_SUBMIT_TRANSACTION, transactionId, + apiUrl: baseUrl, error: (e as Error).message, timestamp: new Date().toISOString(), }) @@ -150,12 +172,26 @@ async function notifyTransactionResult(transactionId: string): Promise { } } -submitTransactionQueue.process(async (job) => { - const { hash, network_url } = job.data; +// Track retry attempts in memory per job (not persisted — sent to API on completion) +const jobAttempts = new Map(); + +async function processSubmitTransaction(job: Queue.Job) { + const { + hash, + transactionId, + apiUrl, + networkUrl, + txData, + resume, + predicateConfigurable, + predicateVersion, + } = job.data; + const startTime = Date.now(); const attemptNumber = job.attemptsMade + 1; const maxAttempts = (job.opts.attempts as number) || 120; const isFirstAttempt = attemptNumber === 1; + const jobKey = job.id?.toString() || hash; console.log( JSON.stringify({ @@ -164,118 +200,46 @@ submitTransactionQueue.process(async (job) => { hash, attempt: attemptNumber, maxAttempts, - network: network_url, + apiUrl, + network: networkUrl, jobId: job.id, timestamp: new Date().toISOString(), }) ); - const psql = await PsqlClient.connect(); - - // 1. 
Buscar tx no Postgres - const transaction = await psql.query( - `SELECT id, hash, tx_data as "txData", status, resume, network, predicate_id as "predicateId" - FROM transactions - WHERE hash = $1 - AND status NOT IN ('declined', 'failed', 'canceled') - ORDER BY created_at DESC - LIMIT 1`, - [hash] - ); - - if (!transaction) { - console.log( - `[${QUEUE_SUBMIT_TRANSACTION}] Transaction ${hash} not found or not eligible` - ); - return { hash, status: "skipped" }; - } - - if (transaction.status !== TransactionStatus.PENDING_SENDER) { - console.log( - `[${QUEUE_SUBMIT_TRANSACTION}] Transaction ${hash} status is ${transaction.status}, skipping` - ); - return { hash, status: "skipped" }; - } - - // 2. Buscar predicate - const predicate = await psql.query( - `SELECT configurable, version, predicate_address as "predicateAddress" - FROM predicates - WHERE id = $1`, - [transaction.predicateId] - ); - - if (!predicate) { - console.error( - `[${QUEUE_SUBMIT_TRANSACTION}] Predicate not found for tx ${hash}` - ); - return { hash, status: "error", reason: "predicate_not_found" }; - } - - // 3. Montar Vault e tx - // IMPORTANTE: usar Provider do fuels, NUNCA BakoProvider. - // Vault.send() verifica `provider instanceof BakoProvider`: - // - BakoProvider → chama POST /transaction/send/:hash → enfileira de novo → LOOP INFINITO - // - Provider normal → provider.operations.submit() → direto na blockchain - const providerUrl = network_url.replace(/^https?:\/\/[^@]+@/, "https://"); + // Build Vault and transaction from job data — no DB access needed + // IMPORTANT: use Provider from fuels, NEVER BakoProvider. 
+ // Vault.send() checks `provider instanceof BakoProvider`: + // - BakoProvider → calls POST /transaction/send/:hash → enqueues again → INFINITE LOOP + // - Regular Provider → provider.operations.submit() → direct to blockchain + const providerUrl = networkUrl.replace(/^https?:\/\/[^@]+@/, "https://"); const provider = new Provider(providerUrl); const vault = new Vault( provider, - JSON.parse(predicate.configurable), - predicate.version + JSON.parse(predicateConfigurable), + predicateVersion ); - // 4. Extrair witnesses - const { resume, txData } = transaction; const witnesses = extractWitnesses(resume, txData); + const tx = transactionRequestify({ ...txData, witnesses }); - const tx = transactionRequestify({ - ...txData, - witnesses, - }); + // Get accumulated attempts for this job + const previousAttempts = jobAttempts.get(jobKey) || []; - // 5. Enviar on-chain try { const transactionResponse = await vault.send(tx); const { gasUsed } = await transactionResponse.waitForResult(); const durationMs = Date.now() - startTime; - // 6. Sucesso: atualizar DB - const updatedResume = { - ...resume, - gasUsed: gasUsed.format(), - status: "success", - }; + const retryAttempts = appendAttempt(previousAttempts, attemptNumber, null, durationMs); + jobAttempts.delete(jobKey); - const existing = await psql.query( - `SELECT retry_attempts FROM transactions WHERE id = $1`, - [transaction.id] - ); - const retryAttempts = appendAttempt( - existing?.retry_attempts || [], - attemptNumber, - null, - durationMs - ); - - await psql.query( - `UPDATE transactions - SET status = 'success', - send_time = NOW(), - gas_used = $1, - resume = $2, - retry_attempts = $3 - WHERE id = $4`, - [ - gasUsed.format(), - JSON.stringify(updatedResume), - JSON.stringify(retryAttempts), - transaction.id, - ] - ); - - // 7. 
Notify API (email, cache invalidation, socket with full tx data) - await notifyTransactionResult(transaction.id); + // Notify API — it handles DB update, cache, socket, notification + await notifyTransactionResult(apiUrl, transactionId, { + status: "success", + gasUsed: gasUsed.format(), + retryAttempts, + }); console.log( JSON.stringify({ @@ -300,34 +264,11 @@ submitTransactionQueue.process(async (job) => { const retriable = isTransientError(e); const isLastAttempt = attemptNumber >= maxAttempts; - const existing = await psql.query( - `SELECT retry_attempts FROM transactions WHERE id = $1`, - [transaction.id] - ); - const retryAttempts = appendAttempt( - existing?.retry_attempts || [], - attemptNumber, - errorStr, - durationMs - ); + const retryAttempts = appendAttempt(previousAttempts, attemptNumber, errorStr, durationMs); if (retriable && !isLastAttempt) { - // Erro transiente, ainda tem tentativas: manter PENDING_SENDER - await psql.query( - `UPDATE transactions - SET resume = $1, - retry_attempts = $2 - WHERE id = $3`, - [ - JSON.stringify({ - ...resume, - error: errorObj, - retry: { count: attemptNumber }, - }), - JSON.stringify(retryAttempts), - transaction.id, - ] - ); + // Store attempts in memory for next retry + jobAttempts.set(jobKey, retryAttempts); const cyclePosition = ((attemptNumber - 1) % BACKOFF_CYCLE) + 1; const nextDelay = cyclePosition * BACKOFF_STEP_MS; @@ -351,31 +292,15 @@ submitTransactionQueue.process(async (job) => { throw e; // Re-throw → Bull retries with cyclic backoff } - // Erro permanente ou ultima tentativa: marcar FAILED - const updatedResume = { - ...resume, - gasUsed: "0.0", - status: "failed", - error: errorObj, - }; + jobAttempts.delete(jobKey); - await psql.query( - `UPDATE transactions - SET status = 'failed', - send_time = NOW(), - gas_used = '0.0', - resume = $1, - retry_attempts = $2 - WHERE id = $3`, - [ - JSON.stringify(updatedResume), - JSON.stringify(retryAttempts), - transaction.id, - ] - ); - - // Notify API 
(socket with full tx data) - await notifyTransactionResult(transaction.id); + // Permanent failure — notify API + await notifyTransactionResult(apiUrl, transactionId, { + status: "failed", + gasUsed: "0.0", + errorData: errorObj, + retryAttempts, + }); console.error( JSON.stringify({ @@ -388,43 +313,54 @@ submitTransactionQueue.process(async (job) => { retriable: false, isLastAttempt, duration_ms: durationMs, - totalElapsed_ms: Date.now() - startTime, jobId: job.id, timestamp: new Date().toISOString(), }) ); return { hash, status: "failed" }; } -}); +} -// Event handlers -submitTransactionQueue.on("completed", (job, result) => { - console.log( - JSON.stringify({ - event: "tx_submit_job_completed", - queue: QUEUE_SUBMIT_TRANSACTION, - hash: result.hash, - result: result.status, - jobId: job.id, - totalAttempts: job.attemptsMade + 1, - timestamp: new Date().toISOString(), - }) - ); -}); +function registerEventHandlers(queue: Queue.Queue) { + queue.on("completed", (job, result) => { + console.log( + JSON.stringify({ + event: "tx_submit_job_completed", + queue: QUEUE_SUBMIT_TRANSACTION, + hash: result.hash, + result: result.status, + jobId: job.id, + totalAttempts: job.attemptsMade + 1, + timestamp: new Date().toISOString(), + }) + ); + }); -submitTransactionQueue.on("failed", (job, err) => { - console.error( - JSON.stringify({ - event: "tx_submit_job_failed", - queue: QUEUE_SUBMIT_TRANSACTION, - hash: job.data.hash, - attempt: job.attemptsMade, - maxAttempts: job.opts.attempts, - error: err.message, - jobId: job.id, - timestamp: new Date().toISOString(), - }) - ); -}); + queue.on("failed", (job, err) => { + console.error( + JSON.stringify({ + event: "tx_submit_job_failed", + queue: QUEUE_SUBMIT_TRANSACTION, + hash: job.data.hash, + attempt: job.attemptsMade, + maxAttempts: job.opts.attempts, + error: err.message, + jobId: job.id, + timestamp: new Date().toISOString(), + }) + ); + }); +} + +// Register processor and event handlers on both queues 
+submitTransactionQueue.process(processSubmitTransaction); +registerEventHandlers(submitTransactionQueue); + +if (submitTransactionQueueProd) { + submitTransactionQueueProd.process(processSubmitTransaction); + registerEventHandlers(submitTransactionQueueProd); + console.log(`[${QUEUE_SUBMIT_TRANSACTION}] Prod Redis queue registered`); +} +export { submitTransactionQueueProd }; export default submitTransactionQueue; diff --git a/packages/worker/src/queues/submitTransaction/types.ts b/packages/worker/src/queues/submitTransaction/types.ts index 0a1f9a7d2..ed8177edf 100644 --- a/packages/worker/src/queues/submitTransaction/types.ts +++ b/packages/worker/src/queues/submitTransaction/types.ts @@ -1,6 +1,12 @@ export type QueueSubmitTransaction = { hash: string; - network_url: string; + transactionId: string; + apiUrl: string; + networkUrl: string; + txData: any; + resume: any; + predicateConfigurable: string; + predicateVersion: string; }; export type RetryAttemptEntry = { From dee3c425ed29cf9705c110d533f5e81c22e0225c Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 09:28:24 -0300 Subject: [PATCH 11/16] docs: update RFC with fat job architecture and dual Redis diagrams --- docs/transaction-submit-queue.md | 538 +++++++++++++++++++------------ 1 file changed, 325 insertions(+), 213 deletions(-) diff --git a/docs/transaction-submit-queue.md b/docs/transaction-submit-queue.md index d55f107c7..f13e02e8b 100644 --- a/docs/transaction-submit-queue.md +++ b/docs/transaction-submit-queue.md @@ -1,7 +1,7 @@ # RFC: Bull Queue for Transaction Submission **Date:** 2026-04-08 -**Status:** Proposal +**Status:** Implementation **Author:** Guilherme Roque --- @@ -10,173 +10,302 @@ `sendToChain()` lives in the API and is called synchronously, blocking the HTTP response. If `vault.send()` fails (network timeout, provider unavailable, gas estimation error), the transaction is permanently marked as `FAILED` with no retry. 
Additionally, it blocks the HTTP response indefinitely while waiting for on-chain confirmation. -## Entry Point Mapping +--- -All on-chain send paths converge to `sendToChain()` in `services.ts:600`. No component sends directly to the blockchain outside this method. +## Architecture Overview -### Direct call sites of `sendToChain()` +``` +┌─────────────┐ ┌──────────────┐ ┌──────────────┐ +│ Frontend │ │ API (stg) │ │ API (prod) │ +│ (UI/SDK) │ │ │ │ │ +└──────┬───────┘ └──────┬───────┘ └──────┬───────┘ + │ │ │ + │ PUT /sign/:hash │ │ + │ POST /send/:hash │ │ + │───────────────────────>│ │ + │ │ │ + │ HTTP 200 (immediate)│ │ + │<───────────────────────│ │ + │ │ │ + │ ┌───────┴────────┐ ┌───────┴────────┐ + │ │ Redis (hmg) │ │ Redis (prod) │ + │ │ LPUSH job │ │ LPUSH job │ + │ └───────┬────────┘ └───────┬────────┘ + │ │ │ + │ └───────────┬────────────┘ + │ │ + │ ┌────────┴────────┐ + │ │ Worker │ + │ │ (single inst) │ + │ │ │ + │ │ consumes both │ + │ │ Redis queues │ + │ └────────┬────────┘ + │ │ + │ │ vault.send(tx) + │ │ (direct to blockchain) + │ │ + │ ┌────────┴────────┐ + │ │ Fuel Blockchain │ + │ └────────┬────────┘ + │ │ + │ │ result (success/failed) + │ │ + │ ┌────────┴────────┐ + │ │ Worker │ + │ │ │ + │ │ POST /notify- │ + │ │ result/:id │ + │ │ (uses apiUrl │ + │ │ from job) │ + │ └───────┬─────────┘ + │ │ + │ ┌──────────────┴──────────────┐ + │ │ API (correct environment) │ + │ │ │ + │ │ • Update DB (status, gas) │ + │ │ • Send email notification │ + │ │ • Invalidate Redis cache │ + │ │ • Emit socket with full tx │ + │ └──────────────┬───────────────┘ + │ │ + │ socket [TRANSACTION] │ + │<──────────────────────────────────┘ + │ + │ handleAsyncResult: + │ toast.success() or toast.error() +``` -| # | Location | Trigger | Entry path | -|---|----------|---------|------------| -| 1 | `controller.ts:561` — `signByID()` | Signature quorum reached | `PUT /transaction/sign/:hash` | -| 2 | `controller.ts:861` — `send()` | Explicit send call | `POST 
/transaction/send/:hash` | +--- -### Who calls these endpoints +## Step-by-step Flow + +### 1. User signs the last required signature (Frontend) ``` - ┌─────────────────────────────────────┐ - │ sendToChain() │ - │ services.ts:600 │ - └──────────┬──────────────┬───────────┘ - │ │ - signByID() │ send() │ - controller:561 │ controller:861 - │ │ - ┌──────────────────────────┤ │ - │ │ │ - PUT /sign/:hash PUT /sign/:hash POST /send/:hash - │ │ │ - │ │ │ - ┌─────────┴──────────┐ ┌───────────┴────────┐ ┌────┴──────────────────┐ - │ UI: user signs │ │ Socket Server: │ │ SDK: BakoProvider │ - │ from vault │ │ TX_SIGN handler │ │ .send(hash) │ - │ dashboard │ │ (dApp flow) │ │ → Service │ - │ │ │ │ │ .sendTransaction() │ - └────────────────────┘ └─────────────────────┘ └───────────────────────┘ - │ - │ (previous step) - ┌───────┴───────────┐ - │ Socket Server: │ - │ TX_CREATE handler │ - │ vault.BakoTransfer │ - │ (only CREATES tx, │ - │ does NOT send) │ - └────────────────────┘ +useSendTransaction → executeTransaction() + → toast.loading("Sending your transaction") + → setIsCurrentTxPending({ isPending: true, transactionId: tx.id }) + → vault.send(tx) + → BakoProvider.send(hash) + → POST /transaction/send/:hash ``` -### Flow 1: User signs from vault dashboard (UI) +### 2. API receives and enqueues (API — staging or prod) + +The API serializes all transaction data into a "fat job" payload. The worker needs nothing else — no DB access required. 
``` -UI → useSendTransaction() → Vault.send() with BakoProvider - → BakoProvider.send(hash) → Service.sendTransaction(hash) - → POST /transaction/send/:hash → sendToChain() +controller.signByID() or controller.send(): + → enqueueTransactionSubmit({ + hash, + transactionId, + apiUrl: process.env.API_URL, ← "https://stg-api.bako.global" or "https://api.bako.global" + networkUrl, ← Fuel provider URL + txData, ← full TransactionRequest (JSONB) + resume, ← witnesses, signatures, requiredSigners + predicateConfigurable, ← vault config (JSON string) + predicateVersion, ← predicate version + }) + → Bull LPUSH to Redis of the current environment + → HTTP 200 returns immediately ``` -### Flow 2: dApp via socket (connector) +### 3. Frontend receives HTTP 200 ``` -dApp Connector → Socket TX_REQUEST → Socket Server creates recovery code - → Socket TX_CREATE → Socket Server calls vault.BakoTransfer() (only saves tx to DB) - → UI displays tx for user review - → User signs → Socket TX_SIGN - → Socket Server calls PUT /transaction/sign/:hash on the API - → signByID() checks quorum → sendToChain() +BakoProvider.send() returns +getByHash(hash) → status = PENDING_SENDER (worker hasn't processed yet) +validateResult(PENDING_SENDER) → no action (loading toast already active) + +handleAsyncResult is listening for socket events... ``` -The socket server is an authentication intermediary (temporary 2-min recovery codes). It **never** sends on-chain — it delegates to the API via `PUT /sign/:hash`. +### 4. Worker picks the job from Redis -### Flow 3: Standalone SDK (without BakoProvider) +The worker is stateless for transaction submission — all data comes from the job payload. 
``` -Vault.send() with regular Provider → provider.operations.submit() → direct to blockchain +Worker consumes from Redis hmg (staging) AND/OR Redis prod + → job.data has everything needed + → new Provider(networkUrl) ← regular Provider, NEVER BakoProvider + → new Vault(provider, predicateConfigurable, predicateVersion) + → extractWitnesses(resume, txData) + → transactionRequestify({...txData, witnesses}) + → vault.send(tx) ← direct to Fuel blockchain + → waitForResult() ← waits for on-chain confirmation ``` -This flow **does not go through the API** — it's for standalone SDK usage, outside the Bako Safe ecosystem. Not affected by the queue. +> **CRITICAL:** The worker uses `Provider` from fuels, never `BakoProvider`. +> `Vault.send()` checks `provider instanceof BakoProvider` internally: +> - BakoProvider → calls `POST /transaction/send/:hash` → enqueues again → **infinite loop** +> - Regular Provider → `provider.operations.submit()` → direct to blockchain -### Conclusion - -Replacing `sendToChain()` with `enqueueTransactionSubmit()` at the **2 API call sites** (signByID and send) covers all app flows: direct signing, SDK-based sending, and dApp flow via socket. - -## Before vs After - -### How it works TODAY +### 5a. Success ``` -User signs → quorum reached → API calls vault.send() immediately - │ - blocks HTTP - waiting for blockchain - │ - ┌─────┴──────┐ - │ │ - success failure - STATUS=SUCCESS STATUS=FAILED - (permanent, no retry) +Worker: + → POST {apiUrl}/transaction/notify-result/{transactionId} + headers: { x-worker-secret: WORKER_SHARED_SECRET } + body: { status: "success", gasUsed: "0.001", retryAttempts: [...] 
} + +API /notify-result: + → Update transaction in DB (status=SUCCESS, gasUsed, sendTime, retryAttempts) + → NotificationService.transactionSuccess() — email + in-app notification + → invalidateCaches() — Redis balance + tx cache + → emitTransaction() — socket [TRANSACTION] with full formatted tx data + +Frontend: + → useTransactionsSocketListener receives socket + → updateTransactions: replaces tx in React Query cache (full data) + → updateHistory: updates transaction history + → handleSignaturePending: invalidates pending queries + → useSendTransaction.handleAsyncResult: + → detects: isCurrentTxPending + matching transactionId + status SUCCESS + has name + → toast.success(tx) — closes loading toast, shows success with "View on explorer" + → setIsCurrentTxPending(false) ``` -- HTTP response only returns after blockchain confirms (or fails) -- Network error, timeout, provider down — that's it. Tx dies as FAILED -- User must create a new tx from scratch - -### How it will work AFTER +### 5b. Transient error (e.g., network timeout) ``` -User signs → quorum reached → API enqueues to Bull → HTTP 200 immediate - │ - Worker picks from queue - calls vault.send() - │ - ┌─────────┴──────────┐ - │ │ - success failure - STATUS=SUCCESS │ - transient error? - (network, timeout) - ┌────┴────┐ - YES NO - │ │ - Bull retries STATUS=FAILED - up to 20x (permanent) - (5s,10s,15s,20s,25s, - 5s,10s,15s,20s,25s...) +Worker: + → vault.send() throws error + → isTransientError("ETIMEDOUT") → true + → isLastAttempt? no (attempt 1/120) + → stores retry attempt in memory + → throw error → Bull retries with cyclic backoff (5s, 10s, 15s, 20s, 25s, 5s...) + +Frontend: nothing happens, loading toast continues ``` -### Key differences +### 5c. 
Permanent error or attempts exhausted -| Aspect | Before | After | -|--------|--------|-------| -| HTTP response | Blocks until blockchain confirms | Returns immediately | -| Network/timeout failure | Tx dies as FAILED | Bull retries up to 20x over ~5 min | -| Permanent error (funds, signature) | FAILED | FAILED (same) | -| Attempt auditing | None | `retry_attempts` column with timestamp, error, duration | -| Frontend changes | — | None. Tx shows "Sending..." during retries, socket notifies on resolution | +``` +Worker: + → POST {apiUrl}/transaction/notify-result/{transactionId} + body: { status: "failed", gasUsed: "0.0", errorData: {...}, retryAttempts: [...] } + +API /notify-result: + → Update transaction in DB (status=FAILED) + → invalidateCaches() + → emitTransaction() — socket with full tx data + +Frontend handleAsyncResult: + → detects status FAILED + → toast.error("Transaction failed") + → setIsCurrentTxPending(false) +``` --- -## Solution +## Dual Redis — Single Worker for Multiple Environments -Create a Bull queue `QUEUE_SUBMIT_TRANSACTION` in the worker. Every transaction that reaches quorum is enqueued. The worker executes the send logic (build Vault, `vault.send()`, `waitForResult()`) with automatic retry on transient errors. +A single worker instance consumes from both staging and production Redis queues. Each job carries `apiUrl` identifying which API to call back. 
-## Architecture Decisions +``` +┌─────────────────┐ ┌─────────────────┐ +│ API Staging │ │ API Production │ +│ │ │ │ +│ enqueue job │ │ enqueue job │ +│ apiUrl=stg-api │ │ apiUrl=api │ +└────────┬─────────┘ └────────┬─────────┘ + │ │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ +│ Redis hmg │ │ Redis prod │ +│ (WORKER_REDIS │ │ (WORKER_REDIS │ +│ _HOST) │ │ _HOST_PROD) │ +└────────┬─────────┘ └────────┬─────────┘ + │ │ + └───────────┬────────────┘ + │ + ┌────────┴────────┐ + │ Worker │ + │ │ + │ Queue 1: hmg │ + │ Queue 2: prod │ + │ Same processor │ + └────────┬────────┘ + │ + │ On completion: + │ POST {job.apiUrl}/transaction/notify-result/:id + │ + ┌───────────┴───────────┐ + │ │ + ▼ ▼ + stg-api.bako.global api.bako.global + (updates stg DB) (updates prod DB) +``` -### 1. Send logic moves to the worker +### Configuration -`sendToChain` moves from the API to the Bull queue processor in the worker. Add `bakosafe` as a worker dependency (already has `fuels@0.103.0`). Eliminates coupling of a heavy operation to the HTTP cycle. +| Environment Variable | Where | Value | +|---------------------|-------|-------| +| `WORKER_REDIS_HOST` | Worker | `master.bako-safe-hmg-elasticache...` (staging Redis) | +| `WORKER_REDIS_HOST_PROD` | Worker | `master.bako-safe-prod-elasticache...` (prod Redis) | +| `WORKER_SHARED_SECRET` | Worker + API | Shared secret for `/notify-result` auth (optional) | +| `API_URL` | API staging | `https://stg-api.bako.global` | +| `API_URL` | API prod | `https://api.bako.global` | +| `REDIS_URL_WRITE` | API staging | `rediss://master.bako-safe-hmg-elasticache...:6379` | +| `REDIS_URL_WRITE` | API prod | `rediss://master.bako-safe-prod-elasticache...:6379` | -### 2. API only enqueues +If `WORKER_REDIS_HOST_PROD` is not set, the worker only consumes from the default Redis. -`signByID` and `send` now call `enqueueTransactionSubmit(hash, networkUrl)` and return immediately. 
The API gains `bull` and `ioredis` as dependencies (producer only — never consumer). +--- -### 3. Worker accesses DB directly +## Entry Point Mapping -Already has `PsqlClient` (raw SQL via `pg`). Fetches transaction, predicate, updates status, records attempts. Does not use TypeORM. +All on-chain send paths converge to `enqueueTransactionSubmit()`. No component sends directly to the blockchain outside this path. -### 4. Worker emits socket events +| # | Location | Trigger | Entry path | +|---|----------|---------|------------| +| 1 | `controller.ts` — `signByID()` | Signature quorum reached | `PUT /transaction/sign/:hash` | +| 2 | `controller.ts` — `send()` | Explicit send call | `POST /transaction/send/:hash` | -Uses `socket.io-client` (same lib the API uses in `SocketClient`) to notify vault members in real time after success or failure. +### Who calls these endpoints + +``` + ┌──────────────────────────────────────┐ + │ enqueueTransactionSubmit() │ + └──────────┬──────────────┬────────────┘ + │ │ + signByID() │ send() │ + │ │ + ┌──────────────────────────┤ │ + │ │ │ + PUT /sign/:hash PUT /sign/:hash POST /send/:hash + │ │ │ + ┌─────────┴──────────┐ ┌───────────┴────────┐ ┌────┴──────────────────┐ + │ UI: user signs │ │ Socket Server: │ │ SDK: BakoProvider │ + │ from vault │ │ TX_SIGN handler │ │ .send(hash) │ + │ dashboard │ │ (dApp flow) │ │ → Service │ + │ │ │ │ │ .sendTransaction() │ + └────────────────────┘ └─────────────────────┘ └───────────────────────┘ +``` -### 5. Error classification +--- -- **Transient** (retry): ECONNREFUSED, ETIMEDOUT, ENOTFOUND, socket hang up, network error, timeout, 502, 503, 504, rate limit, AbortError, FetchError -- **Permanent** (FAILED): insufficient funds, predicate validation, invalid signature, not enough coins +## Fat Job Payload -If transient and attempts remain: status stays `PENDING_SENDER`, Bull retries. If permanent or attempts exhausted: status goes to `FAILED`. 
+The API serializes all data needed for submission into the job. The worker is stateless — no DB access required for transaction processing. -### 6. Automatic retry — no manual retry +```typescript +{ + hash: string; // transaction hash + transactionId: string; // transaction UUID (for notify-result callback) + apiUrl: string; // API URL of the originating environment + networkUrl: string; // Fuel provider URL (blockchain) + txData: TransactionRequest; // full transaction data (inputs, outputs, witnesses) + resume: ITransactionResume; // witnesses with signatures, requiredSigners + predicateConfigurable: string; // vault configurable (JSON string) + predicateVersion: string; // predicate version +} +``` -Fully automatic system. No manual retry endpoint, no button in the frontend. +--- -### 7. Retry configuration +## Retry Configuration | Parameter | Value | |-----------|-------| @@ -185,18 +314,27 @@ Fully automatic system. No manual retry endpoint, no button in the frontend. | Delay pattern | 5s, 10s, 15s, 20s, 25s, 5s, 10s, 15s, 20s, 25s, ... | | Worst case total | ~30 minutes until terminal FAILED (24 cycles x 75s) | -Backoff implemented via Bull's custom `backoffStrategies`: +### Error classification -```typescript -backoffStrategies: { - cyclic: (attemptsMade: number) => { - const position = ((attemptsMade - 1) % 5) + 1; - return position * 5000; - }, -} +- **Transient** (retry): ECONNREFUSED, ETIMEDOUT, ENOTFOUND, socket hang up, network error, timeout, 502, 503, 504, rate limit, AbortError, FetchError +- **Permanent** (FAILED immediately): insufficient funds, predicate validation, invalid signature, not enough coins + +### Timing diagram (worst case) + +``` +Each cycle: 5s + 10s + 15s + 20s + 25s = 75s + +Cycle 1-5: 75s each (375s cumulative) +Cycle 6-10: 75s each (750s cumulative) +Cycle 11-15: 75s each (1125s cumulative) +Cycle 16-20: 75s each (1500s cumulative) +Cycle 21-24: 75s each (1800s cumulative) + ~30 min → FAILED ``` -### 8. 
Attempt auditing +--- + +## Attempt Auditing New column `retry_attempts jsonb` (array) on the `transactions` table. Consecutive attempts with the same error are grouped into a single entry to avoid duplication: @@ -215,124 +353,98 @@ New column `retry_attempts jsonb` (array) on the `transactions` table. Consecuti Example: 85 consecutive ETIMEDOUT errors followed by a 503 phase and then success: ```json [ - { "error": "ETIMEDOUT", "first_attempt": 1, "last_attempt": 85, "count": 85, "avg_duration_ms": 5000, ... }, - { "error": "503", "first_attempt": 86, "last_attempt": 100, "count": 15, "avg_duration_ms": 3200, ... }, - { "error": null, "first_attempt": 101, "last_attempt": 101, "count": 1, "avg_duration_ms": 2100, ... } + { "error": "ETIMEDOUT", "first_attempt": 1, "last_attempt": 85, "count": 85, "avg_duration_ms": 5000 }, + { "error": "503", "first_attempt": 86, "last_attempt": 100, "count": 15, "avg_duration_ms": 3200 }, + { "error": null, "first_attempt": 101, "last_attempt": 101, "count": 1, "avg_duration_ms": 2100 } ] ``` --- -## Flow +## Notify Result Endpoint -``` -1. signByID: quorum reached - → enqueueTransactionSubmit(hash, networkUrl) - → HTTP 200 returns immediately - -2. Bull queue: job arrives at worker - -3. Worker processor: - a. Fetch transaction from Postgres (status, txData, resume, network) - b. Fetch predicate (configurable, version) - c. Create FuelProvider with network URL - d. Instantiate Vault (bakosafe) with configurable and version - e. Extract witnesses (signatures) from resume - f. Build TransactionRequest via transactionRequestify() - g. vault.send(tx) + waitForResult() - -4. 
Result: - SUCCESS - → UPDATE status='success', gasUsed, retry_attempts - → Emit socket TRANSACTION_UPDATED to members - → Bull marks job as completed - - TRANSIENT ERROR (attempts remaining) - → UPDATE retry_attempts (record attempt) - → Status remains PENDING_SENDER - → throw → Bull retries with cyclic backoff - - PERMANENT ERROR or LAST ATTEMPT - → UPDATE status='failed', retry_attempts - → Emit socket TRANSACTION_UPDATED to members - → Bull marks job as completed -``` +`POST /transaction/notify-result/:id` — internal endpoint called by the worker after on-chain submission. -### Timing diagram (worst case) +### Authentication -``` -Each cycle: 5s + 10s + 15s + 20s + 25s = 75s +- If `WORKER_SHARED_SECRET` is configured: validates `x-worker-secret` header +- If not configured: endpoint is open (for dev/staging without the secret) -Cycle 1-5: 75s each (375s cumulative) -Cycle 6-10: 75s each (750s cumulative) -Cycle 11-15: 75s each (1125s cumulative) -Cycle 16-20: 75s each (1500s cumulative) -Cycle 21-24: 75s each (1800s cumulative) - ~30 min → FAILED +### Request body + +```typescript +{ + status: "success" | "failed"; + gasUsed?: string; + errorData?: any; + retryAttempts?: RetryAttemptEntry[]; +} ``` +### What it does + +1. Validates shared secret (if configured) +2. Validates status is terminal (success or failed) +3. Updates transaction in DB (status, sendTime, gasUsed, resume, retryAttempts) +4. Sends email + in-app notification on success +5. Invalidates Redis cache (balance + transactions) +6. Emits socket `[TRANSACTION]` with fully formatted transaction data + history + +--- + +## Deduplication + +Jobs use a deterministic `jobId: tx_submit_{hash}`. Bull prevents duplicate jobs with the same ID. 
+ +| Job state | New enqueue with same hash | Result | +|-----------|---------------------------|--------| +| waiting/active/delayed | Skipped (logged) | Protected | +| completed (`removeOnComplete: true`) | Accepted (job was removed) | Re-send works | +| failed (`removeOnFail: false`) | Previous job removed, new one created | Manual retry via `/send/:hash` works | + --- -## Modified files +## Modified Files ### Worker (`packages/worker/`) | File | Action | |------|--------| -| `package.json` | Add `bakosafe@0.6.3`, `socket.io-client@4.7.5` | -| `src/queues/submitTransaction/types.ts` | New — job types and audit entry | -| `src/queues/submitTransaction/constants.ts` | New — queue name, max attempts, backoff config | -| `src/queues/submitTransaction/utils.ts` | New — `isTransientError()` | -| `src/queues/submitTransaction/queue.ts` | New — processor with on-chain send logic | -| `src/queues/submitTransaction/index.ts` | New — exports | -| `src/index.ts` | Register queue in Bull Board | +| `package.json` | Add `bakosafe@0.6.3` | +| `src/queues/submitTransaction/types.ts` | Fat job type + audit entry type | +| `src/queues/submitTransaction/constants.ts` | Queue name, max attempts, backoff config | +| `src/queues/submitTransaction/utils.ts` | `isTransientError()` | +| `src/queues/submitTransaction/queue.ts` | Stateless processor, dual Redis, notify-result callback | +| `src/queues/submitTransaction/index.ts` | Exports | +| `src/index.ts` | Register both queues in Bull Board | ### API (`packages/api/`) | File | Action | |------|--------| | `package.json` | Add `bull@^4.16.5`, `ioredis@^5.7.0` | -| `src/utils/submitTransactionQueue.ts` | New — Bull producer + `enqueueTransactionSubmit()` | -| `src/modules/transaction/controller.ts` | `signByID` and `send`: replace `await sendToChain` with `enqueueTransactionSubmit` | +| `src/utils/submitTransactionQueue.ts` | Bull producer with fat job payload | +| `src/modules/transaction/controller.ts` | `signByID`/`send`: enqueue fat 
job; `notifyResult`: update DB + notify + socket | +| `src/modules/transaction/routes.ts` | Add `POST /notify-result/:id` route | | `src/models/Transaction.ts` | Add `retryAttempts` column | -| `src/migrations/` | New migration: `AddRetryAttemptsToTransactions` | +| `src/migrations/` | `AddRetryAttemptsToTransactions` | -### Removed/deprecated +### Frontend (`bako-safe-ui-stg`) | File | Action | |------|--------| -| `src/modules/transaction/services.ts` | `sendToChain()` can be removed (logic moved to worker) | - ---- - -## Frontend impact - -No mandatory changes. The frontend already: -- Shows "Sending..." for `PENDING_SENDER` status -- Shows "Error" for `FAILED` status -- Listens to socket events `TRANSACTION_UPDATED` for real-time updates - -Optional: display attempt count by reading `resume.retry.count` in the `Status.tsx` component. - ---- - -## Verification - -1. **Enqueue**: reach quorum in signByID, verify job appears in Bull Board (`/worker/queues`) -2. **Send**: verify worker executes `vault.send()` and tx appears on-chain -3. **Retry**: mock invalid provider URL, verify 120 attempts with cyclic backoff in Bull Board -4. **Auditing**: verify `retry_attempts` column with an entry per attempt -5. **Non-blocking**: signByID returns HTTP 200 before `vault.send()` completes -6. **Socket**: frontend receives real-time update via socket after success/failure +| `src/modules/transactions/hooks/send/useSendTransaction.ts` | `handleAsyncResult`: listens socket for tx completion, resolves loading toast | --- -## Risks and mitigations +## Risks and Mitigations | Risk | Mitigation | |------|------------| -| **Infinite loop with BakoProvider** | Worker MUST use `Provider` from fuels, NEVER `BakoProvider`. `Vault.send()` in bakosafe checks `provider instanceof BakoProvider` — if BakoProvider, it calls `POST /transaction/send/:hash` which enqueues again → infinite loop. With a regular `Provider`, it goes directly to the blockchain via `provider.operations.submit()`. 
The API currently uses `FuelProvider` (a `Provider` wrapper from fuels) — the worker must do the same. | -| Duplicate tx on-chain (vault.send ok but waitForResult timeout) | Before sending, check if tx hash already exists on-chain via provider | -| Worker and API with different bakosafe versions | Keep the same version in both package.json files | -| Resume JSONB concurrency (worker and API writing) | Worker is the only writer after enqueue — API only reads | -| Bull queue loses jobs (Redis restart) | `removeOnFail: false` keeps failed jobs visible; Redis with AOF persistence | +| **Infinite loop with BakoProvider** | Worker uses `Provider` from fuels, never `BakoProvider`. Enforced by code comment and architecture. | +| **Duplicate tx on-chain** | `vault.send()` ok but `waitForResult()` times out → check on-chain before retrying | +| **Worker/API bakosafe version mismatch** | Keep same version in both package.json files | +| **notify-result abuse** | Shared secret validation via `WORKER_SHARED_SECRET` + only accepts terminal statuses | +| **Bull queue loses jobs** | `removeOnFail: false` keeps failed jobs visible; Redis with AOF persistence | +| **Worker restart loses in-memory retry attempts** | Attempts are re-accumulated from scratch on restart; worst case is less granular audit data | +| **API_URL not configured** | Fat job carries `apiUrl` — if empty, notify-result silently fails; transaction stays PENDING_SENDER | From 9459fd36ac26a175f684eb2d7586c9c051799b08 Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 13:35:22 -0300 Subject: [PATCH 12/16] chore: bump bakosafe to 0.6.4 --- packages/api/package.json | 2 +- packages/worker/package.json | 2 +- pnpm-lock.yaml | 31 +++++++++++++++++++++++++++---- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json index fed799dc0..f0463c767 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -43,7 +43,7 @@ 
"@sentry/profiling-node": "8.32.0", "@testcontainers/postgresql": "11.0.0", "axios": "1.13.5", - "bakosafe": "0.6.3", + "bakosafe": "0.6.4", "body-parser": "1.20.4", "bull": "^4.16.5", "cheerio": "1.0.0-rc.12", diff --git a/packages/worker/package.json b/packages/worker/package.json index 8cd731ff5..f839daf09 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -16,7 +16,7 @@ "@envio-dev/hypersync-client": "0.6.2", "@types/bull": "^4.10.4", "@types/node-cron": "3.0.11", - "bakosafe": "0.6.3", + "bakosafe": "0.6.4", "bull": "^4.16.5", "express": "4.21.2", "fuels": "0.103.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3214c6983..6234b5e19 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -79,8 +79,8 @@ importers: specifier: 1.13.5 version: 1.13.5 bakosafe: - specifier: 0.6.3 - version: 0.6.3(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) + specifier: 0.6.4 + version: 0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) body-parser: specifier: 1.20.4 version: 1.20.4 @@ -437,8 +437,8 @@ importers: specifier: 3.0.11 version: 3.0.11 bakosafe: - specifier: 0.6.3 - version: 0.6.3(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) + specifier: 0.6.4 + version: 0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) bull: specifier: ^4.16.5 version: 4.16.5 @@ -2931,6 +2931,11 @@ packages: peerDependencies: fuels: ^0.102.0 + bakosafe@0.6.4: + resolution: {integrity: sha512-MCCLTqIXl4LeTjHAl2txc5X+2rikSaH1kot37/7hwz92ULvyExkJnfJh2h02GrWbi2YkHiVl+cli0SyN8qDhUA==} + peerDependencies: + fuels: ^0.102.0 + balanced-match@1.0.2: resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} @@ -9440,6 +9445,24 @@ snapshots: - utf-8-validate - zod + 
bakosafe@0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5): + dependencies: + '@ethereumjs/util': 9.0.3 + '@ethersproject/bytes': 5.7.0 + '@noble/curves': 1.9.7 + '@noble/secp256k1': 2.3.0 + axios: 1.13.5 + fuels: 0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)) + lodash.partition: 4.6.0 + uuid: 9.0.1 + viem: 2.45.1(typescript@5.4.5) + transitivePeerDependencies: + - bufferutil + - debug + - typescript + - utf-8-validate + - zod + balanced-match@1.0.2: {} balanced-match@4.0.4: {} From 3a956ef4a7b5c5e9837130377c8057eaf651116f Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 13:36:45 -0300 Subject: [PATCH 13/16] chore: bump bakosafe to 0.6.4 --- packages/socket-server/package.json | 2 +- pnpm-lock.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/socket-server/package.json b/packages/socket-server/package.json index 4f0b48313..3ddd39803 100644 --- a/packages/socket-server/package.json +++ b/packages/socket-server/package.json @@ -14,7 +14,7 @@ "dependencies": { "@socket.io/redis-adapter": "^8.3.0", "axios": "1.13.5", - "bakosafe": "0.6.3", + "bakosafe": "0.6.4", "date-fns": "2.30.0", "express": "4.21.2", "express-joi-validation": "5.0.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6234b5e19..57f5734a7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -304,8 +304,8 @@ importers: specifier: 1.13.5 version: 1.13.5 bakosafe: - specifier: 0.6.3 - version: 0.6.3(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) + specifier: 0.6.4 + version: 0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) date-fns: specifier: 2.30.0 version: 2.30.0 @@ -2926,8 +2926,8 @@ packages: peerDependencies: '@babel/core': ^7.0.0 - bakosafe@0.6.3: - resolution: {integrity: sha512-nEfaHc7S5LGGe2la4gOwq/VV5eSFnxYk4N093WfVPT6D1dxA/5W/E963N/ag9Ma5Q3O7XKi1BSFDezqp0b94cg==} + bakosafe@0.6.4: 
+ resolution: {integrity: sha512-MCCLTqIXl4LeTjHAl2txc5X+2rikSaH1kot37/7hwz92ULvyExkJnfJh2h02GrWbi2YkHiVl+cli0SyN8qDhUA==} peerDependencies: fuels: ^0.102.0 @@ -9427,7 +9427,7 @@ snapshots: babel-plugin-jest-hoist: 29.6.3 babel-preset-current-node-syntax: 1.2.0(@babel/core@7.29.0) - bakosafe@0.6.3(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5): + bakosafe@0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5): dependencies: '@ethereumjs/util': 9.0.3 '@ethersproject/bytes': 5.7.0 From 762177c6009286e8af056f8a5f57ca3c1255a4c1 Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 16:04:36 -0300 Subject: [PATCH 14/16] fix: await provider chain info before submitting transaction --- packages/worker/src/queues/submitTransaction/queue.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/worker/src/queues/submitTransaction/queue.ts b/packages/worker/src/queues/submitTransaction/queue.ts index 0e0d12474..d97c16b96 100644 --- a/packages/worker/src/queues/submitTransaction/queue.ts +++ b/packages/worker/src/queues/submitTransaction/queue.ts @@ -214,6 +214,9 @@ async function processSubmitTransaction(job: Queue.Job) // - Regular Provider → provider.operations.submit() → direct to blockchain const providerUrl = networkUrl.replace(/^https?:\/\/[^@]+@/, "https://"); const provider = new Provider(providerUrl); + // Ensure provider has fetched chain info and consensus parameters + // before using it. Without this, estimatePredicates may fail with OutOfGas. 
+ await provider.getChain(); const vault = new Vault( provider, JSON.parse(predicateConfigurable), From be8ad03b5379e2cb1ffb917d23f1a1eda6ce7e3e Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 17:24:14 -0300 Subject: [PATCH 15/16] chore: bump bakosafe to 0.6.5 --- packages/api/package.json | 2 +- packages/socket-server/package.json | 2 +- packages/worker/package.json | 2 +- pnpm-lock.yaml | 41 +++++++---------------------- 4 files changed, 12 insertions(+), 35 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json index f0463c767..4b8274f1e 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -43,7 +43,7 @@ "@sentry/profiling-node": "8.32.0", "@testcontainers/postgresql": "11.0.0", "axios": "1.13.5", - "bakosafe": "0.6.4", + "bakosafe": "0.6.5", "body-parser": "1.20.4", "bull": "^4.16.5", "cheerio": "1.0.0-rc.12", diff --git a/packages/socket-server/package.json b/packages/socket-server/package.json index 3ddd39803..df4db8c12 100644 --- a/packages/socket-server/package.json +++ b/packages/socket-server/package.json @@ -14,7 +14,7 @@ "dependencies": { "@socket.io/redis-adapter": "^8.3.0", "axios": "1.13.5", - "bakosafe": "0.6.4", + "bakosafe": "0.6.5", "date-fns": "2.30.0", "express": "4.21.2", "express-joi-validation": "5.0.0", diff --git a/packages/worker/package.json b/packages/worker/package.json index f839daf09..93ebca0fc 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -16,7 +16,7 @@ "@envio-dev/hypersync-client": "0.6.2", "@types/bull": "^4.10.4", "@types/node-cron": "3.0.11", - "bakosafe": "0.6.4", + "bakosafe": "0.6.5", "bull": "^4.16.5", "express": "4.21.2", "fuels": "0.103.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 57f5734a7..00ee2d609 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -79,8 +79,8 @@ importers: specifier: 1.13.5 version: 1.13.5 bakosafe: - specifier: 0.6.4 - version: 
0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) + specifier: 0.6.5 + version: 0.6.5(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) body-parser: specifier: 1.20.4 version: 1.20.4 @@ -304,8 +304,8 @@ importers: specifier: 1.13.5 version: 1.13.5 bakosafe: - specifier: 0.6.4 - version: 0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) + specifier: 0.6.5 + version: 0.6.5(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) date-fns: specifier: 2.30.0 version: 2.30.0 @@ -437,8 +437,8 @@ importers: specifier: 3.0.11 version: 3.0.11 bakosafe: - specifier: 0.6.4 - version: 0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) + specifier: 0.6.5 + version: 0.6.5(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5) bull: specifier: ^4.16.5 version: 4.16.5 @@ -2926,13 +2926,8 @@ packages: peerDependencies: '@babel/core': ^7.0.0 - bakosafe@0.6.4: - resolution: {integrity: sha512-MCCLTqIXl4LeTjHAl2txc5X+2rikSaH1kot37/7hwz92ULvyExkJnfJh2h02GrWbi2YkHiVl+cli0SyN8qDhUA==} - peerDependencies: - fuels: ^0.102.0 - - bakosafe@0.6.4: - resolution: {integrity: sha512-MCCLTqIXl4LeTjHAl2txc5X+2rikSaH1kot37/7hwz92ULvyExkJnfJh2h02GrWbi2YkHiVl+cli0SyN8qDhUA==} + bakosafe@0.6.5: + resolution: {integrity: sha512-M/zF2i2MWxnuuZkUy7nzhDY2V3YfFCGdjlbKVnIP12C0Rm6P4rc+SbS68SMVbS5gilpQacrYn/YMtx/0wRedlA==} peerDependencies: fuels: ^0.102.0 @@ -9427,25 +9422,7 @@ snapshots: babel-plugin-jest-hoist: 29.6.3 babel-preset-current-node-syntax: 1.2.0(@babel/core@7.29.0) - bakosafe@0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5): - dependencies: - '@ethereumjs/util': 9.0.3 - '@ethersproject/bytes': 5.7.0 - '@noble/curves': 1.9.7 - '@noble/secp256k1': 2.3.0 - axios: 1.13.5 - fuels: 
0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)) - lodash.partition: 4.6.0 - uuid: 9.0.1 - viem: 2.45.1(typescript@5.4.5) - transitivePeerDependencies: - - bufferutil - - debug - - typescript - - utf-8-validate - - zod - - bakosafe@0.6.4(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5): + bakosafe@0.6.5(fuels@0.103.0(vitest@3.0.9(@types/node@20.6.0)(tsx@4.21.0)(yaml@2.8.2)))(typescript@5.4.5): dependencies: '@ethereumjs/util': 9.0.3 '@ethersproject/bytes': 5.7.0 From c6f97be13be7a3e5cad4160e18ce7ba49f0ed0bb Mon Sep 17 00:00:00 2001 From: guimroque Date: Thu, 9 Apr 2026 18:29:56 -0300 Subject: [PATCH 16/16] fix(worker): use bookworm-slim instead of alpine in production stage hypersync-client has no prebuilt binary for linux-arm64-musl, causing pnpm install to hang under QEMU emulation when Docker layer cache misses --- packages/worker/Dockerfile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/worker/Dockerfile b/packages/worker/Dockerfile index 988cf59c1..8cf4f9be7 100644 --- a/packages/worker/Dockerfile +++ b/packages/worker/Dockerfile @@ -21,8 +21,9 @@ COPY . . # Build the application RUN pnpm build -# Production stage - smaller final image -FROM node:22-alpine AS production +# Production stage - use bookworm-slim (glibc) instead of alpine (musl) +# because @envio-dev/hypersync-client has no prebuilt binary for linux-arm64-musl +FROM node:22-bookworm-slim AS production # Install pnpm globally RUN npm install -g pnpm