From 1ceb30f3d4ac40639cf77fb51ca516829387a24d Mon Sep 17 00:00:00 2001 From: Bojan Date: Wed, 18 Mar 2026 11:58:56 +0100 Subject: [PATCH 01/20] Fix publish replication reliability - Add semaphore (3 concurrent) to limit parallel replication messages - Batch replication to groups of minAcks+2 with early exit when minimum reached - Wrap individual node messages in try/catch so one failing peer doesn't kill the whole operation - Add single retry on NACK before giving up on a peer - Increase publish message timeout from 15s to 60s for large knowledge assets Made-with: Cursor --- .../sender/publish-replication-command.js | 154 +++++++++++++----- src/constants/constants.js | 2 +- 2 files changed, 114 insertions(+), 42 deletions(-) diff --git a/src/commands/protocols/publish/sender/publish-replication-command.js b/src/commands/protocols/publish/sender/publish-replication-command.js index 1568b1ddf..8381e424d 100644 --- a/src/commands/protocols/publish/sender/publish-replication-command.js +++ b/src/commands/protocols/publish/sender/publish-replication-command.js @@ -1,3 +1,4 @@ +import { Semaphore } from 'async-mutex'; import { OPERATION_ID_STATUS, ERROR_TYPE, @@ -10,6 +11,8 @@ import { } from '../../../../constants/constants.js'; import Command from '../../../command.js'; +const replicationSemaphore = new Semaphore(3); + class PublishReplicationCommand extends Command { constructor(ctx) { super(ctx); @@ -21,6 +24,7 @@ class PublishReplicationCommand extends Command { this.signatureService = ctx.signatureService; this.cryptoService = ctx.cryptoService; this.messagingService = ctx.messagingService; + this.pendingStorageService = ctx.pendingStorageService; this.errorType = ERROR_TYPE.LOCAL_STORE.LOCAL_STORE_ERROR; } @@ -133,18 +137,61 @@ class PublishReplicationCommand extends Command { return Command.empty(); } const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId); + + if (nodePartOfShard) { + await this.pendingStorageService.cacheDataset( 
+ operationId, + datasetRoot, + dataset, + currentPeerId, + ); + } + const message = { dataset: dataset.public, datasetRoot, blockchain, }; - // Run all message sending operations in parallel - await Promise.all( - shardNodes.map((node) => - this.sendAndHandleMessage(node, operationId, message, command, blockchain), - ), - ); + const replicationBatchSize = minAckResponses + 2; + + await replicationSemaphore.runExclusive(async () => { + this.logger.info( + `[REPLICATION] Starting for operationId: ${operationId}, ` + + `shard: ${shardNodes.length} nodes, batch: ${replicationBatchSize}, min ACKs: ${minAckResponses}`, + ); + + for (let i = 0; i < shardNodes.length; i += replicationBatchSize) { + if (i > 0) { + // eslint-disable-next-line no-await-in-loop + const record = await this.operationIdService.getOperationIdRecord( + operationId, + ); + if (record?.minAcksReached) { + this.logger.info( + `[REPLICATION] Minimum replication reached after ${i} nodes, ` + + `skipping remaining ${ + shardNodes.length - i + } for operationId: ${operationId}`, + ); + break; + } + } + + const batch = shardNodes.slice(i, i + replicationBatchSize); + this.logger.debug( + `Sending replication batch ${Math.floor(i / replicationBatchSize) + 1} ` + + `(${batch.length} nodes) for operationId: ${operationId}`, + ); + + // eslint-disable-next-line no-await-in-loop + await Promise.all( + batch.map((node) => + this.sendAndHandleMessage(node, operationId, message, command), + ), + ); + } + }); } catch (e) { await this.handleError(operationId, blockchain, e.message, this.errorType, true); this.operationIdService.emitChangeEvent( @@ -158,44 +205,69 @@ class PublishReplicationCommand extends Command { return Command.empty(); } - async sendAndHandleMessage(node, operationId, message, command, blockchain) { - const response = await this.messagingService.sendProtocolMessage( - node, - operationId, - message, - NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, - 
NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST, - ); - const responseData = response.data; - if (response.header.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.ACK) { - // eslint-disable-next-line no-await-in-loop - await this.signatureService.addSignatureToStorage( - NETWORK_SIGNATURES_FOLDER, + async sendAndHandleMessage(node, operationId, message, command) { + try { + let response = await this.messagingService.sendProtocolMessage( + node, operationId, - responseData.identityId, - responseData.v, - responseData.r, - responseData.s, - responseData.vs, + message, + NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, + NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST, ); - // eslint-disable-next-line no-await-in-loop - await this.operationService.processResponse( - command, - OPERATION_REQUEST_STATUS.COMPLETED, - responseData, - ); - } else { - // eslint-disable-next-line no-await-in-loop - await this.operationService.processResponse( - command, - OPERATION_REQUEST_STATUS.FAILED, - responseData, - ); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - operationId, - blockchain, + + if (response.header.messageType !== NETWORK_MESSAGE_TYPES.RESPONSES.ACK) { + const preRetryRecord = await this.operationIdService.getOperationIdRecord( + operationId, + ); + if (preRetryRecord?.minAcksReached) return; + + this.logger.info( + `[REPLICATION] Peer ${node.id} NACK for operationId: ${operationId}: ` + + `${response.data?.errorMessage || 'unknown reason'}, retrying...`, + ); + response = await this.messagingService.sendProtocolMessage( + node, + operationId, + message, + NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, + NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST, + ); + } + + const responseData = response.data; + if (response.header.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.ACK) { + await this.signatureService.addSignatureToStorage( + NETWORK_SIGNATURES_FOLDER, + operationId, + responseData.identityId, + responseData.v, + responseData.r, + 
responseData.s, + responseData.vs, + ); + await this.operationService.processResponse( + command, + OPERATION_REQUEST_STATUS.COMPLETED, + responseData, + ); + } else { + this.logger.warn( + `[REPLICATION] Peer ${node.id} failed after retry for operationId: ${operationId}: ` + + `${responseData?.errorMessage || 'unknown reason'}`, + ); + await this.operationService.processResponse( + command, + OPERATION_REQUEST_STATUS.FAILED, + responseData, + ); + } + } catch (error) { + this.logger.warn( + `[REPLICATION] Peer ${node.id} error for operationId: ${operationId}: ${error.message}`, ); + await this.operationService.processResponse(command, OPERATION_REQUEST_STATUS.FAILED, { + errorMessage: error.message, + }); } } diff --git a/src/constants/constants.js b/src/constants/constants.js index 560bd0873..2a6269d2c 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -371,7 +371,7 @@ export const PARANET_NODES_ACCESS_POLICIES = ['OPEN', 'PERMISSIONED']; export const NETWORK_MESSAGE_TIMEOUT_MILLS = { PUBLISH: { - REQUEST: 15 * 1000, + REQUEST: 60 * 1000, }, UPDATE: { REQUEST: 60 * 1000, From aedef5cc2e440c91e9a63b2ce8068af9cd1277e6 Mon Sep 17 00:00:00 2001 From: Bojan Date: Wed, 18 Mar 2026 11:59:45 +0100 Subject: [PATCH 02/20] Fix blockchain transaction error handling - Add retry logic for transient errors (socket hang up, connection reset, 502/503, fee too low) - Only bump gas price on nonce/fee errors, not on network errors - Add gas estimation retries for transient RPC failures - Add 60s timeout on RPC provider connection to prevent node hanging on unresponsive RPCs - Fix BigNumber NaN bug in gas price calculation during retry - Fix Gnosis EIP-1559 gas params and double gwei-parsing in gas price comparison - Add RPC failover for blockchain event fetching (try all providers before failing) - Log warning when blockchain events are missed due to large block gaps Made-with: Cursor --- .../blockchain-event-listener-command.js | 8 +- 
src/constants/constants.js | 9 ++ .../implementation/ot-ethers/ot-ethers.js | 35 ++++- .../implementation/gnosis/gnosis-service.js | 24 +++- .../blockchain/implementation/web3-service.js | 122 +++++++++++++++--- 5 files changed, 170 insertions(+), 28 deletions(-) diff --git a/src/commands/blockchain-event-listener/blockchain-event-listener-command.js b/src/commands/blockchain-event-listener/blockchain-event-listener-command.js index 9ceabcac8..aaaa5e21c 100644 --- a/src/commands/blockchain-event-listener/blockchain-event-listener-command.js +++ b/src/commands/blockchain-event-listener/blockchain-event-listener-command.js @@ -65,7 +65,13 @@ class BlockchainEventListenerCommand extends Command { ); if (eventsMissed) { - // TODO: Add some logic for missed events in the future + const missedFrom = (lastCheckedBlockRecord?.lastCheckedBlock ?? 0) + 1; + this.logger.warn( + `[EVENT LISTENER] Blockchain events missed on ${blockchainId}! ` + + `Gap too large: blocks ${missedFrom}–${currentBlock} ` + + `(${currentBlock - missedFrom + 1} blocks). 
` + + `Publish finality for assets created during this window will not complete.`, + ); } if (newEvents.length !== 0) { diff --git a/src/constants/constants.js b/src/constants/constants.js index 560bd0873..ac747f638 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -715,6 +715,15 @@ export const EXPECTED_TRANSACTION_ERRORS = { NONCE_TOO_LOW: 'nonce too low', REPLACEMENT_UNDERPRICED: 'replacement transaction underpriced', ALREADY_KNOWN: 'already known', + EXECUTION_FAILED: 'transaction execution fails', + FEE_TOO_LOW: 'feetoolow', + SOCKET_HANG_UP: 'socket hang up', + ECONNRESET: 'econnreset', + ECONNREFUSED: 'econnrefused', + SERVER_ERROR: 'server error', + BAD_GATEWAY: '502', + SERVICE_UNAVAILABLE: '503', + EXPECT_BLOCK_NUMBER: 'expect block number from id', }; /** diff --git a/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js b/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js index 61bb1c5bb..b1d549961 100644 --- a/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js +++ b/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js @@ -56,6 +56,38 @@ class OtEthers extends BlockchainEventsService { return blockchainProviders[randomIndex]; } + _getShuffledProviders(blockchain) { + const blockchainProviders = this.providers[blockchain]; + if (!blockchainProviders || blockchainProviders.length === 0) { + throw new Error(`No providers available for blockchain: ${blockchain}`); + } + const shuffled = [...blockchainProviders]; + for (let i = shuffled.length - 1; i > 0; i -= 1) { + const j = Math.floor(Math.random() * (i + 1)); + [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]]; + } + return shuffled; + } + + async _sendWithFailover(blockchain, method, params) { + const providers = this._getShuffledProviders(blockchain); + let lastError; + for (const provider of providers) { + try { + return await provider.send(method, params); + } catch (error) { + lastError = error; + 
this.logger.warn( + `RPC provider failed for ${method} on ${blockchain}: ${error.message}. ` + + `Trying next provider (${providers.indexOf(provider) + 1}/${ + providers.length + })...`, + ); + } + } + throw lastError; + } + async _initializeContracts() { this.contracts = {}; @@ -156,8 +188,7 @@ class OtEthers extends BlockchainEventsService { const toBlockParam = ethers.BigNumber.from(toBlock) .toHexString() .replace(/^0x0+/, '0x'); - const provider = this._getRandomProvider(blockchain); - const newLogs = await provider.send('eth_getLogs', [ + const newLogs = await this._sendWithFailover(blockchain, 'eth_getLogs', [ { address: contractAddresses, fromBlock: fromBlockParam, diff --git a/src/modules/blockchain/implementation/gnosis/gnosis-service.js b/src/modules/blockchain/implementation/gnosis/gnosis-service.js index a65674636..cc763569b 100644 --- a/src/modules/blockchain/implementation/gnosis/gnosis-service.js +++ b/src/modules/blockchain/implementation/gnosis/gnosis-service.js @@ -44,16 +44,32 @@ class GnosisService extends Web3Service { ); return this.defaultGasPrice; } - if ( - gasPrice && - ethers.utils.parseUnits(gasPrice.toString(), 'gwei').gt(this.defaultGasPrice) - ) { + if (gasPrice && gasPrice.gt && gasPrice.gt(this.defaultGasPrice)) { return gasPrice; } return this.defaultGasPrice; } + buildTransactionGasParams(gasPrice) { + const minPriorityFee = ethers.BigNumber.from(2_000_000_000); + + let maxPriorityFeePerGas = minPriorityFee; + if (ethers.BigNumber.isBigNumber(gasPrice)) { + const derived = gasPrice.div(5); + if (derived.gt(minPriorityFee)) { + maxPriorityFeePerGas = derived; + } + } + + const maxFeePerGas = + ethers.BigNumber.isBigNumber(gasPrice) && gasPrice.gt(maxPriorityFeePerGas) + ? 
gasPrice + : maxPriorityFeePerGas.mul(2); + + return { maxFeePerGas, maxPriorityFeePerGas }; + } + async healthCheck() { try { const blockNumber = await this.getBlockNumber(); diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 1f3157f51..fc53d0d95 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -522,6 +522,10 @@ class Web3Service { } } + buildTransactionGasParams(gasPrice) { + return { gasPrice }; + } + async callContractFunction(contractInstance, functionName, args, contractName = null) { const maxNumberOfRetries = 3; const retryDelayInSec = 12; @@ -567,11 +571,35 @@ class Web3Service { let retryCount = 0; const maxRetries = 3; - try { - /* eslint-disable no-await-in-loop */ - gasLimit = await contractInstance.estimateGas[functionName](...args); - } catch (error) { - this._decodeEstimateGasError(contractInstance, functionName, error, args); + for (let estimateAttempt = 0; estimateAttempt < 3; estimateAttempt += 1) { + try { + /* eslint-disable no-await-in-loop */ + gasLimit = await contractInstance.estimateGas[functionName](...args); + break; + } catch (error) { + const errMsg = error.message?.toLowerCase() ?? 
''; + const isTransient = + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.EXECUTION_FAILED.toLowerCase()) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.FEE_TOO_LOW.toLowerCase()) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.SOCKET_HANG_UP) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.ECONNRESET) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.ECONNREFUSED) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.SERVER_ERROR) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.BAD_GATEWAY) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.SERVICE_UNAVAILABLE) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.EXPECT_BLOCK_NUMBER); + if (isTransient && estimateAttempt < 2) { + this.logger.warn( + `Gas estimation for ${functionName} failed with transient error on ${this.getBlockchainId()}, ` + + `retrying (${estimateAttempt + 1}/3): ${error.message}`, + ); + await new Promise((r) => { + setTimeout(r, 2000); + }); + continue; + } + this._decodeEstimateGasError(contractInstance, functionName, error, args); + } } gasLimit = gasLimit ?? ethers.utils.parseUnits('900', 'kwei'); @@ -590,12 +618,12 @@ class Web3Service { }${retryCount > 0 ? ` (retry ${retryCount})` : ''}`, ); + const txOverrides = this.buildTransactionGasParams(gasPrice); + txOverrides.gasLimit = gasLimit; + const tx = await contractInstance .connect(operationalWallet) - [functionName](...args, { - gasPrice, - gasLimit, - }); + [functionName](...args, txOverrides); try { result = await this.provider.waitForTransaction( @@ -608,38 +636,90 @@ class Web3Service { await this.provider.call(tx, tx.blockNumber); } } catch (error) { + if ( + error.message + .toLowerCase() + .includes(EXPECTED_TRANSACTION_ERRORS.TIMEOUT_EXCEEDED.toLowerCase()) + ) { + const existingReceipt = await this.provider.getTransactionReceipt(tx.hash); + if (existingReceipt) { + this.logger.info( + `Transaction ${functionName} (${tx.hash}) confirmed despite timeout. 
Block: ${existingReceipt.blockNumber}`, + ); + if (existingReceipt.status === 0) { + await this.provider.call(tx, existingReceipt.blockNumber); + } + return existingReceipt; + } + throw error; + } this._decodeWaitForTxError(contractInstance, functionName, error, args); } return result; } catch (error) { const errorMessage = error.message.toLowerCase(); - // Check for nonce-related errors - if ( + const isNonceError = errorMessage.includes( EXPECTED_TRANSACTION_ERRORS.NONCE_TOO_LOW.toLowerCase(), ) || errorMessage.includes( EXPECTED_TRANSACTION_ERRORS.REPLACEMENT_UNDERPRICED.toLowerCase(), ) || - errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ALREADY_KNOWN.toLowerCase()) - ) { + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ALREADY_KNOWN.toLowerCase()); + + const isTimeoutError = errorMessage.includes( + EXPECTED_TRANSACTION_ERRORS.TIMEOUT_EXCEEDED.toLowerCase(), + ); + + const isExecutionError = + errorMessage.includes( + EXPECTED_TRANSACTION_ERRORS.EXECUTION_FAILED.toLowerCase(), + ) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.FEE_TOO_LOW.toLowerCase()); + + const isNetworkError = + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.SOCKET_HANG_UP) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ECONNRESET) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ECONNREFUSED) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.SERVER_ERROR) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.BAD_GATEWAY) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.SERVICE_UNAVAILABLE) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.EXPECT_BLOCK_NUMBER); + + if (isNonceError || isTimeoutError || isExecutionError || isNetworkError) { retryCount += 1; if (retryCount < maxRetries) { - // Increase gas price by 20% for nonce errors - gasPrice = Math.ceil(gasPrice * 1.2); + const shouldBumpGas = isNonceError || isExecutionError; + if (shouldBumpGas) { + gasPrice = ethers.BigNumber.isBigNumber(gasPrice) + ? 
gasPrice.mul(120).div(100) + : Math.ceil(gasPrice * 1.2); + } + let errorType = 'Nonce'; + if (isTimeoutError) errorType = 'Timeout'; + else if (isExecutionError) errorType = 'Execution/fee'; + else if (isNetworkError) errorType = 'Network'; this.logger.warn( - `Nonce error detected for ${functionName}. Retrying with increased gas price: ${gasPrice} (retry ${retryCount}/${maxRetries})`, + `${errorType} error detected for ${functionName} on ${this.getBlockchainId()}. ` + + `Retrying ${ + shouldBumpGas + ? `with increased gas price: ${gasPrice}` + : 'with same gas price' + } (retry ${retryCount}/${maxRetries})`, ); continue; - } else { - this.logger.error( - `Max retries (${maxRetries}) reached for nonce error in ${functionName}. Final gas price: ${gasPrice}`, - ); } + let errorType = 'nonce'; + if (isTimeoutError) errorType = 'timeout'; + else if (isExecutionError) errorType = 'execution/fee'; + else if (isNetworkError) errorType = 'network'; + this.logger.error( + `Max retries (${maxRetries}) reached for ${errorType} error in ${functionName} on ${this.getBlockchainId()}. 
` + + `Final gas price: ${gasPrice}`, + ); } - // If it's not a nonce error or we've exhausted retries, re-throw the error throw error; } } From 0e40733bf6edde5047fe5893f92c7edc8d31fb53 Mon Sep 17 00:00:00 2001 From: Bojan Date: Wed, 18 Mar 2026 12:00:29 +0100 Subject: [PATCH 03/20] Fix local get after publish and cache reliability - Add pending storage cache fallback in get-command so gets work right after publish - Add merkle root index in pending-storage-service for fast operationId lookups - Clean up merkle root index entries on cache removal to prevent memory leak - Add retry logic in publish-finalization-command for reading cached assertion data - Reduce cache retry window from 100s to 25s (5 retries x 5s) for faster failure detection - Add getPublishOperationIdByUal repository method for cache lookups Made-with: Cursor --- .../protocols/get/sender/get-command.js | 44 ++++++++++++++ .../publish/publish-finalization-command.js | 59 +++++++++++++++---- src/constants/constants.js | 4 +- .../finality-status-repository.js | 9 +++ .../repository/repository-module-manager.js | 4 ++ src/service/pending-storage-service.js | 19 ++++++ 6 files changed, 124 insertions(+), 15 deletions(-) diff --git a/src/commands/protocols/get/sender/get-command.js b/src/commands/protocols/get/sender/get-command.js index 9fd6a0c28..49b5dfb1e 100644 --- a/src/commands/protocols/get/sender/get-command.js +++ b/src/commands/protocols/get/sender/get-command.js @@ -30,6 +30,7 @@ class GetCommand extends Command { this.cryptoService = ctx.cryptoService; this.messagingService = ctx.messagingService; this.tripleStoreModuleManager = ctx.tripleStoreModuleManager; + this.pendingStorageService = ctx.pendingStorageService; } async handleError(operationId, blockchain, errorMessage, errorType) { @@ -262,6 +263,49 @@ class GetCommand extends Command { } this.logger.debug(`Could not find asset with UAL: ${ual} locally`); + try { + const latestMerkleRoot = + await 
this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( + blockchain, + contract, + knowledgeCollectionId, + ); + if (latestMerkleRoot) { + const publishOpId = + this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot); + if (publishOpId) { + const cachedAssertion = await this.pendingStorageService.getCachedDataset( + publishOpId, + ); + if ( + cachedAssertion && + (cachedAssertion.public?.length || cachedAssertion.private?.length) + ) { + const cachedResponseData = { assertion: cachedAssertion }; + + this.logger.info( + `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, + ); + await this.operationService.markOperationAsCompleted( + operationId, + blockchain, + cachedResponseData, + [ + OPERATION_ID_STATUS.GET.GET_LOCAL_END, + OPERATION_ID_STATUS.GET.GET_END, + OPERATION_ID_STATUS.COMPLETED, + ], + ); + return Command.empty(); + } + } + } + } catch (cacheErr) { + this.logger.debug( + `Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`, + ); + } + await this.operationIdService.emitChangeEvent( OPERATION_ID_STATUS.GET.GET_LOCAL_END, operationId, diff --git a/src/commands/protocols/publish/publish-finalization-command.js b/src/commands/protocols/publish/publish-finalization-command.js index 78db13bd0..22eba1b99 100644 --- a/src/commands/protocols/publish/publish-finalization-command.js +++ b/src/commands/protocols/publish/publish-finalization-command.js @@ -137,14 +137,42 @@ class PublishFinalizationCommand extends Command { const node = { id: publisherPeerId, protocol: networkProtocols[0] }; const message = { ual, publishOperationId, blockchain, operationId }; - // TODO: Add retry logic maybe - const response = await this.messagingService.sendProtocolMessage( - node, - operationId, - message, - NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, - NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST, - ); + + const maxFinalityAttempts = 3; + const backoffDelays = [0, 5_000, 10_000]; + let 
response; + let lastError; + + for (let attempt = 0; attempt < maxFinalityAttempts; attempt += 1) { + if (backoffDelays[attempt] > 0) { + // eslint-disable-next-line no-await-in-loop + await new Promise((r) => { + setTimeout(r, backoffDelays[attempt]); + }); + } + try { + // eslint-disable-next-line no-await-in-loop + response = await this.messagingService.sendProtocolMessage( + node, + operationId, + message, + NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, + NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST, + ); + lastError = null; + break; + } catch (err) { + lastError = err; + this.logger.warn( + `Finality request to publisher ${publisherPeerId} failed ` + + `(attempt ${attempt + 1}/${maxFinalityAttempts}): ${err.message}`, + ); + } + } + + if (lastError) { + throw lastError; + } await this.messagingService.handleProtocolResponse( response, @@ -196,16 +224,21 @@ class PublishFinalizationCommand extends Command { return cachedData; } catch (error) { attempt += 1; - // eslint-disable-next-line no-await-in-loop - await new Promise((resolve) => { - setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA); - }); + if (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) { + this.logger.debug( + `[Cache] Read attempt ${attempt}/${MAX_RETRIES_READ_CACHED_PUBLISH_DATA} ` + + `failed for publishOperationId: ${publishOperationId}, retrying in ${RETRY_DELAY_READ_CACHED_PUBLISH_DATA}ms...`, + ); + // eslint-disable-next-line no-await-in-loop + await new Promise((resolve) => { + setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA); + }); + } } } this.logger.warn( `[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`, ); - // TODO: Mark this operation as failed throw new Error('Failed to read cached publish data'); } diff --git a/src/constants/constants.js b/src/constants/constants.js index 560bd0873..31ab88105 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -1067,8 +1067,8 @@ 
export const LOCAL_INSERT_FOR_ASSET_SYNC_RETRY_DELAY = 1000; export const LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS = 5; export const LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY = 1000; -export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 10; -export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 10 * 1000; +export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 5; +export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 5 * 1000; export const TRIPLE_STORE_REPOSITORY = { DKG: 'dkg', diff --git a/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js b/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js index 53e10639d..b8f68c975 100644 --- a/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js @@ -14,6 +14,15 @@ class FinalityStatusRepository { async saveFinalityAck(operationId, ual, peerId, options) { return this.model.upsert({ operationId, ual, peerId }, options); } + + async getPublishOperationIdByUal(ual, options) { + const record = await this.model.findOne({ + where: { ual }, + attributes: ['operationId'], + ...options, + }); + return record?.operationId ?? 
null; + } } export default FinalityStatusRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 36f6e6626..69fbfb34f 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -467,6 +467,10 @@ class RepositoryModuleManager extends BaseModuleManager { return this.getRepository('finality_status').getFinalityAcksCount(ual, options); } + async getPublishOperationIdByUal(ual, options = {}) { + return this.getRepository('finality_status').getPublishOperationIdByUal(ual, options); + } + async getLatestRandomSamplingChallengeRecordForBlockchainId(blockchainId, limit = 1) { return this.getRepository( 'random_sampling_challenge', diff --git a/src/service/pending-storage-service.js b/src/service/pending-storage-service.js index f049ad927..74e2621af 100644 --- a/src/service/pending-storage-service.js +++ b/src/service/pending-storage-service.js @@ -10,6 +10,7 @@ class PendingStorageService { this.fileService = ctx.fileService; this.repositoryModuleManager = ctx.repositoryModuleManager; // this is not used this.tripleStoreService = ctx.tripleStoreService; // this is not used + this._merkleRootIndex = new Map(); } async cacheDataset(operationId, datasetRoot, dataset, remotePeerId) { @@ -17,6 +18,8 @@ class PendingStorageService { `Caching ${datasetRoot} dataset root, operation id: ${operationId} in file in pending storage`, ); + this._merkleRootIndex.set(datasetRoot, operationId); + await this.fileService.writeContentsToFile( this.fileService.getPendingStorageCachePath(), operationId, @@ -28,6 +31,10 @@ class PendingStorageService { ); } + getOperationIdByMerkleRoot(merkleRoot) { + return this._merkleRootIndex.get(merkleRoot) ?? 
null; + } + async getCachedDataset(operationId) { this.logger.debug(`Retrieving cached dataset for ${operationId} from pending storage`); @@ -94,6 +101,7 @@ class PendingStorageService { const createdDate = fileStats.mtime; if (createdDate.getTime() + expirationTimeMillis < now) { + this._removeMerkleRootIndexEntry(file); await this.fileService.removeFile(filePath); this.logger.debug(`Deleted expired file: ${filePath}`); return true; @@ -155,6 +163,8 @@ class PendingStorageService { `Removing cached assertion for ual: ${ual} operation id: ${operationId} from file in ${repository} pending storage`, ); + this._removeMerkleRootIndexEntry(operationId); + const pendingAssertionPath = await this.fileService.getPendingStorageDocumentPath( operationId, ); @@ -177,6 +187,15 @@ class PendingStorageService { } } + _removeMerkleRootIndexEntry(operationId) { + for (const [root, opId] of this._merkleRootIndex) { + if (opId === operationId) { + this._merkleRootIndex.delete(root); + break; + } + } + } + async getPendingState(operationId) { return this.fileService.getPendingStorageLatestDocument(operationId); } From 377fb6d0a687209ff2849380a4908f398c4523bf Mon Sep 17 00:00:00 2001 From: Bojan Date: Wed, 18 Mar 2026 12:01:05 +0100 Subject: [PATCH 04/20] Fix operation concurrency and response handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move mutex from individual services to base OperationService with per-operation granularity - Previously all operations shared one global mutex per service, which caused unnecessary blocking between unrelated operations. Now each operationId gets its own mutex. 
- Clean up mutex when operation completes or fails to prevent memory buildup - Fix updateMinAcksReached ordering in publish-service — was being called after markOperationAsCompleted which could cause a race where the result API returns before the flag is set - Fix result controller to return signatures when status is COMPLETED even if minAcksReached flag wasn't set yet (handles the race condition from the client side too) - Add try/catch around signature loading in result controller to not crash on missing files Made-with: Cursor --- .../v1/result-http-api-controller-v1.js | 43 ++++++++++++------- src/service/ask-service.js | 2 - src/service/batch-get-service.js | 2 - src/service/finality-service.js | 2 - src/service/get-service.js | 2 - src/service/operation-service.js | 18 +++++++- src/service/publish-service.js | 4 +- src/service/update-service.js | 2 - 8 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/controllers/http-api/v1/result-http-api-controller-v1.js b/src/controllers/http-api/v1/result-http-api-controller-v1.js index 5502f593d..299faef34 100644 --- a/src/controllers/http-api/v1/result-http-api-controller-v1.js +++ b/src/controllers/http-api/v1/result-http-api-controller-v1.js @@ -55,22 +55,33 @@ class ResultController extends BaseController { case 'update': { const minAcksReached = handlerRecord.minAcksReached || false; response.data = { ...response.data, minAcksReached }; - if (minAcksReached) { - const publisherNodeSignature = ( - await this.signatureService.getSignaturesFromStorage( - PUBLISHER_NODE_SIGNATURES_FOLDER, - operationId, - ) - )[0]; - const signatures = await this.signatureService.getSignaturesFromStorage( - NETWORK_SIGNATURES_FOLDER, - operationId, - ); - response.data = { - ...response.data, - publisherNodeSignature, - signatures, - }; + const shouldIncludeSignatures = + minAcksReached || + handlerRecord.status === OPERATION_ID_STATUS.COMPLETED; + if (shouldIncludeSignatures) { + try { + const publisherNodeSignature = 
( + await this.signatureService.getSignaturesFromStorage( + PUBLISHER_NODE_SIGNATURES_FOLDER, + operationId, + ) + )[0]; + const signatures = + await this.signatureService.getSignaturesFromStorage( + NETWORK_SIGNATURES_FOLDER, + operationId, + ); + response.data = { + ...response.data, + minAcksReached: true, + publisherNodeSignature, + signatures, + }; + } catch (e) { + this.logger.warn( + `Failed to read signatures for operationId ${operationId}: ${e.message}`, + ); + } } break; } diff --git a/src/service/ask-service.js b/src/service/ask-service.js index 450ca7693..d68396bd0 100644 --- a/src/service/ask-service.js +++ b/src/service/ask-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -21,7 +20,6 @@ class AskService extends OperationService { OPERATION_ID_STATUS.ASK.ASK_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData) { diff --git a/src/service/batch-get-service.js b/src/service/batch-get-service.js index 3ecfedf53..4f025ddfb 100644 --- a/src/service/batch-get-service.js +++ b/src/service/batch-get-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -19,7 +18,6 @@ class BatchGetService extends OperationService { OPERATION_ID_STATUS.BATCH_GET.BATCH_GET_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } } diff --git a/src/service/finality-service.js b/src/service/finality-service.js index 637d41319..7ad0b7072 100644 --- a/src/service/finality-service.js +++ b/src/service/finality-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -26,7 +25,6 @@ class FinalityService extends OperationService { this.repositoryModuleManager = 
ctx.repositoryModuleManager; this.blockchainModuleManager = ctx.blockchainModuleManager; this.paranetService = ctx.paranetService; - this.operationMutex = new Mutex(); } async processResponse(operationId, blockchain, responseStatus, responseData) { diff --git a/src/service/get-service.js b/src/service/get-service.js index b70dbd951..17e632745 100644 --- a/src/service/get-service.js +++ b/src/service/get-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -22,7 +21,6 @@ class GetService extends OperationService { OPERATION_ID_STATUS.GET.GET_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData) { diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 10d84b2f4..8b0745f3d 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -1,3 +1,4 @@ +import { Mutex } from 'async-mutex'; import { OPERATION_ID_STATUS, OPERATION_REQUEST_STATUS, @@ -10,6 +11,18 @@ class OperationService { this.repositoryModuleManager = ctx.repositoryModuleManager; this.operationIdService = ctx.operationIdService; this.commandExecutor = ctx.commandExecutor; + this._operationMutexes = new Map(); + } + + _getOperationMutex(operationId) { + if (!this._operationMutexes.has(operationId)) { + this._operationMutexes.set(operationId, new Mutex()); + } + return this._operationMutexes.get(operationId); + } + + _cleanupOperationMutex(operationId) { + this._operationMutexes.delete(operationId); } getOperationName() { @@ -30,7 +43,8 @@ class OperationService { async getResponsesStatuses(responseStatus, errorMessage, operationId) { let responses = 0; const self = this; - await this.operationMutex.runExclusive(async () => { + const mutex = this._getOperationMutex(operationId); + await mutex.runExclusive(async () => { await 
self.repositoryModuleManager.createOperationResponseRecord( responseStatus, this.operationName, @@ -65,6 +79,7 @@ class OperationService { endStatuses, options = {}, ) { + this._cleanupOperationMutex(operationId); const { reuseExistingCache = false } = options; this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`); @@ -98,6 +113,7 @@ class OperationService { } async markOperationAsFailed(operationId, blockchain, message, errorType) { + this._cleanupOperationMutex(operationId); this.logger.info(`${this.operationName} for operationId: ${operationId} failed.`); await this.operationIdService.removeOperationIdCache(operationId); diff --git a/src/service/publish-service.js b/src/service/publish-service.js index 1e6046240..bc2ab2892 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { @@ -23,7 +22,6 @@ class PublishService extends OperationService { OPERATION_ID_STATUS.PUBLISH.PUBLISH_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData, errorMessage = null) { @@ -79,6 +77,7 @@ class PublishService extends OperationService { `[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` + `datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`, ); + await this.repositoryModuleManager.updateMinAcksReached(operationId, true); const cachedData = (await this.operationIdService.getCachedOperationIdData(operationId)) || null; await this.markOperationAsCompleted( @@ -88,7 +87,6 @@ class PublishService extends OperationService { this.completedStatuses, { reuseExistingCache: true }, ); - await this.repositoryModuleManager.updateMinAcksReached(operationId, true); this.logResponsesSummary(completedNumber, failedNumber); } // 2.2 Otherwise, mark as failed diff --git 
a/src/service/update-service.js b/src/service/update-service.js index 0ee39c6e1..55c0f1af4 100644 --- a/src/service/update-service.js +++ b/src/service/update-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { @@ -22,7 +21,6 @@ class UpdateService extends OperationService { OPERATION_ID_STATUS.UPDATE.UPDATE_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData, errorMessage = null) { From cc78a790811b92a9587e8b9eceecec971a0f7dc4 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 11:56:51 +0100 Subject: [PATCH 05/20] add codex review --- .codex/review-prompt.md | 186 +++++++++++++++++++++++++++++ .codex/review-schema.json | 35 ++++++ .github/workflows/codex-review.yml | 142 ++++++++++++++++++++++ 3 files changed, 363 insertions(+) create mode 100644 .codex/review-prompt.md create mode 100644 .codex/review-schema.json create mode 100644 .github/workflows/codex-review.yml diff --git a/.codex/review-prompt.md b/.codex/review-prompt.md new file mode 100644 index 000000000..0f92a9aa1 --- /dev/null +++ b/.codex/review-prompt.md @@ -0,0 +1,186 @@ +# PR Review Instructions + +You are a senior code reviewer for the OriginTrail DKG Engine (ot-node). Your job is to review a pull request diff and produce structured, actionable feedback as inline comments on specific changed lines. You review like a staff engineer who cares deeply about code quality, readability, and simplicity. + +## Context Files + +Read these files before reviewing: + +1. **`pr-diff.patch`** — The PR diff (generated at runtime). This is the primary input. + +You may read other files in the repository **only** to understand how code changed in the diff is called or referenced. Do not review, comment on, or mention code in files that are not part of the diff. 
All review comments and the summary must be strictly scoped to changes introduced by this PR's diff — nothing else. + +## Project Architecture + +- **Node.js** application (ESM modules, `.js` and `.mjs` files) +- **Awilix** dependency injection container for service management +- **libp2p** for peer-to-peer networking and message passing +- **Ethers.js / Web3.js** for multi-chain blockchain interactions (NeuroWeb, Gnosis, Base) +- **Sequelize** ORM for local SQLite database +- **Blazegraph** triple store for RDF/SPARQL knowledge graph operations +- **Pino** for structured logging +- **Command pattern** for async operations (publish, get, query) +- **BDD tests** using Cucumber.js with Gherkin feature files + +### Key Directories + +- `src/commands/` — Command implementations (publish, get, query protocols) +- `src/modules/` — Core modules (blockchain, network, repository, triple-store) +- `src/service/` — Service layer (pending-storage, operation, validation) +- `src/constants/` — System-wide constants and error definitions +- `test/bdd/` — Cucumber BDD tests (features, steps, utilities) + +## Review Philosophy + +Most PR issues in this codebase are maintainability problems — bloat, poor naming, scattered validation, hardcoded values, pattern drift. These matter a lot. + +However, review priority is always **severity-first**: + +1. **Blockers first** — correctness, security, auth, data integrity, blockchain safety. +2. **Then maintainability** — readability, simplicity, pattern conformance. + +When both exist, report blockers first. + +### Review Method + +Do three passes: + +1. **Context + risk-map pass (mandatory)** — Start from diff hunks, then read surrounding or full touched files when needed to evaluate maintainability, coupling, naming, and extraction opportunities. Use this context to assess changed behavior, not to run unrelated file-wide audits. +2. 
**Blockers pass** — Scan for correctness bugs, security issues, blockchain transaction safety, gas handling issues, data integrity risks, and missing tests for changed behavior. These are `🔴 Bug` comments. +3. **Maintainability pass** — Scan for code bloat, readability issues, naming problems, pattern violations, hardcoded values, and architecture drift in touched areas. These are `🟡 Issue`, `🔵 Nit`, or `💡 Suggestion` comments. + +### Comment Gate + +Before posting any comment, verify all four conditions: + +1. **Introduced by this diff** — The issue is introduced or materially worsened by the changes in this PR, not pre-existing. +2. **Materially impactful** — The issue affects correctness, security, readability, or maintainability in a meaningful way. Not a theoretical concern. +3. **Concrete fix direction** — You can suggest a specific fix or clear direction. If you can only say "this seems off" without a concrete suggestion, do not comment. +4. **Scope fit** — If the issue is mainly in pre-existing code, the PR must touch the same function/module and fixing it must directly simplify, de-risk, or de-duplicate the new/changed code. + +If any check fails, skip the comment. +Every comment must be traceable to changed behavior in this PR and anchored to a right-side line present in `pr-diff.patch`. Prefer added/modified lines; use nearby unchanged hunk lines only when necessary to explain a directly related issue. + +**Uncertainty guard:** If you are not certain an issue is real and cannot verify it from the diff and allowed context, do not label it `🔴 Bug`. Downgrade to `🟡 Issue` or `💡 Suggestion`, or skip it entirely. + +**Deduplication:** One comment per root cause. If the same pattern repeats across multiple lines, comment on the first occurrence and note "same pattern at lines X, Y, Z." Aim for a maximum of ~10 comments, highest impact first. 
+ +## What to Review + +### Blockers Pass + +#### Correctness + +- Logic errors, off-by-one, null/undefined handling, incorrect assumptions, race conditions. +- Boundary conditions — empty arrays, null inputs, zero values, maximum values. +- Error handling — swallowed errors, missing error propagation, unhelpful error messages. Do not flag missing error handling for internal code that cannot reasonably fail. +- Async/await correctness — unhandled promise rejections, missing awaits, race conditions in concurrent operations. +- Nonce management — verify blockchain nonce allocation and retry logic does not create orphan transactions or nonce gaps. + +#### Security + +- Injection risks (SQL, command, XSS) when handling user input. +- Hardcoded secrets — API keys, passwords, private keys, tokens in code. Private keys must never appear in source. +- Missing input validation at system boundaries (user input, external APIs, RPC responses). Not for internal function calls. +- Auth bypass, privilege escalation, or missing authorization checks. +- RPC endpoint exposure — verify no private/paid RPC URLs or API keys are hardcoded in committed code. + +#### Blockchain Safety + +- Gas handling — verify gas price calculations, multipliers, and buffers are reasonable and consistent across testnet/mainnet. +- Transaction retry logic — ensure retries don't waste gas, create duplicate transactions, or cause nonce conflicts. +- Wallet/key management — no hardcoded private keys, proper key isolation between environments. +- Multi-chain consistency — changes affecting one chain should be verified for impact on other supported chains (NeuroWeb, Gnosis, Base). +- BigNumber handling — verify arithmetic operations use BigNumber-safe methods, no precision loss from floating point. + +#### Tests for Changed Behavior + +- New behavior must have corresponding tests covering core functionality and error handling.
+- Bug fixes must include a regression test that would have caught the original bug. +- Changed behavior must have updated tests reflecting the new expectations. +- If tests are present but brittle (testing implementation details rather than behavior), flag it. + +Missing tests for changed behavior are blockers (`🔴 Bug`) only when the change affects user-facing behavior, API contracts, or data integrity. Missing tests for internal refactors or trivial changes are `🟡 Issue`. + +### Maintainability Pass + +#### Code Bloat and Unnecessary Complexity + +- **Excessive code** — More lines than necessary. Could this be done in fewer lines without sacrificing clarity? +- **Over-engineering** — Abstractions, helpers, or utilities for one-time operations. Premature generalization. +- **Dead code** — Unused variables, unreachable branches, commented-out code, leftover debug logging. +- **Duplicate code** — Same logic repeated instead of extracted. Do not suggest extraction for only 2-3 similar lines unless the repeated logic encodes a correctness invariant across multiple paths. + +#### Readability and Naming + +- **Confusing variable/function names** — Names that don't describe what the thing is or does. Generic names like `data`, `result`, `item`, `temp`, `val` when a specific name would be clearer. +- **Misleading names** — Names that suggest different behavior than what the code does. +- **Inconsistent naming** — Not following conventions in the rest of the codebase. +- **Long functions** — Functions doing too many things. If you need a comment to explain a section, it should probably be its own function. +- **Deep nesting** — More than 2-3 levels. Suggest early returns, guard clauses, or extraction. +- **Unclear control flow** — Complex conditionals that could be simplified or decomposed. + +#### Hardcoded Values and Magic Constants + +Flag only when the value is: + +- **Reused 3+ times** in touched files or the diff — should be a named constant.
+- **Domain-significant** — timeout values, retry counts, gas multipliers, RPC URLs, network message timeouts. Even if used once, these belong in constants or configuration. + +Do not flag one-off numeric literals that are self-explanatory in context (e.g., `array.slice(0, 2)`, `Math.round(x * 100) / 100`). + +#### Performance (Only Obvious Issues) + +- N+1 queries — database queries inside loops. +- Blocking operations in async contexts — synchronous I/O in async code. +- Unnecessary work in hot paths — redundant allocations, repeated computations. +- Memory leaks — Maps/Sets/caches that grow unboundedly without cleanup. + +## What NOT to Review + +- Formatting or style — ESLint and Prettier handle this. +- Things that are clearly intentional design choices backed by existing patterns. +- Pre-existing issues in unchanged code outside the diff. +- Pre-existing issues in touched files when the PR does not introduce/worsen them. +- Adding documentation unless a public API is clearly undocumented. +- Repository-wide or file-wide audits not required by the changed behavior. +- Test configuration files (cucumber.js, .eslintrc) unless they introduce issues. + +## Comment Format + +Use severity prefixes: + +- `🔴 Bug:` — Correctness error, security issue, blockchain safety issue, data integrity risk. Will cause incorrect behavior. +- `🟡 Issue:` — Code quality problem that should be fixed. Bloated code, bad naming, pattern violation, missing tests. +- `🔵 Nit:` — Minor improvement, optional. +- `💡 Suggestion:` — Alternative approach worth considering. + +Be specific, be concise, explain why. One clear sentence with a concrete fix is better than a paragraph of theory. + +## Output Format + +Return raw JSON only. No markdown fences, no prose before or after the JSON object. Your output MUST be valid JSON matching the provided output schema. 
Example: + +```json +{ + "summary": "This PR improves blockchain error handling but introduces a potential gas waste issue in the retry loop and has leftover debug logging.", + "comments": [ + { + "path": "src/modules/blockchain/implementation/web3-service.js", + "line": 142, + "body": "🔴 Bug: Gas price is bumped on every retry including network errors, which wastes gas. Only bump for nonce conflicts and execution errors. Add a `shouldBumpGas` guard." + }, + { + "path": "src/commands/protocols/publish/sender/publish-replication-command.js", + "line": 58, + "body": "🟡 Issue: `console.log` debug statement left in production code. Use `this.logger.debug()` instead or remove it." + } + ] +} +``` + +The `line` field must refer to the line number in the new version of the file (right side of the diff), and it must be a line that actually appears in the diff hunks. Do not comment on lines outside the diff. + +## Summary + +Write a brief (2–4 sentence) overall assessment in the `summary` field covering **only** what this PR's diff changes. Do not mention code, packages, or behavior outside the diff. Lead with blockers if any exist. Mention whether the PR is clean/minimal or has code quality issues. Include one sentence on maintainability direction in touched areas (improved / neutral / worsened, and why). If the PR looks good, say so. 
diff --git a/.codex/review-schema.json b/.codex/review-schema.json new file mode 100644 index 000000000..0039cffb6 --- /dev/null +++ b/.codex/review-schema.json @@ -0,0 +1,35 @@ +{ + "type": "object", + "properties": { + "summary": { + "type": "string", + "description": "Brief overall assessment of the PR (2-4 sentences)" + }, + "comments": { + "type": "array", + "description": "Inline review comments on specific changed lines", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "File path relative to repository root" + }, + "line": { + "type": "integer", + "minimum": 1, + "description": "Line number in the new version of the file (must be within the diff)" + }, + "body": { + "type": "string", + "description": "Review comment with severity prefix" + } + }, + "required": ["path", "line", "body"], + "additionalProperties": false + } + } + }, + "required": ["summary", "comments"], + "additionalProperties": false +} diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml new file mode 100644 index 000000000..08d8e2c5f --- /dev/null +++ b/.github/workflows/codex-review.yml @@ -0,0 +1,142 @@ +name: Codex PR Review + +on: + pull_request: + types: [opened, synchronize, reopened] + +concurrency: + group: codex-review-${{ github.event.pull_request.number }} + cancel-in-progress: true + +permissions: + contents: read + pull-requests: write + +jobs: + review: + name: Codex Review + runs-on: ubuntu-latest + timeout-minutes: 15 + # Skip fork PRs — they cannot access repository secrets + if: github.event.pull_request.head.repo.full_name == github.repository + + steps: + - name: Checkout PR merge commit + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + with: + ref: refs/pull/${{ github.event.pull_request.number }}/merge + fetch-depth: 0 + + - name: Generate PR diff + run: git diff ${{ github.event.pull_request.base.sha }}...HEAD > pr-diff.patch + + - name: Run Codex review + id: 
codex + uses: openai/codex-action@f5c0ca71642badb34c1e66321d8d85685a0fa3dc # v1 + with: + openai-api-key: ${{ secrets.OPENAI_API_KEY }} + prompt-file: .codex/review-prompt.md + output-schema-file: .codex/review-schema.json + effort: high + sandbox: read-only + + - name: Post PR review with inline comments + uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 + env: + REVIEW_JSON: ${{ steps.codex.outputs.final-message }} + with: + script: | + let review; + try { + review = JSON.parse(process.env.REVIEW_JSON); + } catch (e) { + console.error('Failed to parse Codex output:', e.message); + console.error('Raw output:', process.env.REVIEW_JSON?.slice(0, 500)); + await github.rest.pulls.createReview({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.issue.number, + body: '⚠️ Codex review failed to produce valid JSON output. Check the [workflow logs](' + + `${process.env.GITHUB_SERVER_URL}/${context.repo.owner}/${context.repo.repo}/actions/runs/${process.env.GITHUB_RUN_ID}) for details.`, + event: 'COMMENT', + comments: [], + }); + return; + } + + // Fetch all changed files (paginated for large PRs) + const files = await github.paginate( + github.rest.pulls.listFiles, + { + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.issue.number, + per_page: 100, + } + ); + + // Build set of valid (path:line) pairs from right-side diff hunk lines + // (added + context). This keeps comments bound to changed areas. + const validLines = new Set(); + for (const file of files) { + // Skip binary/large/truncated files with no patch + if (!file.patch) continue; + + const lines = file.patch.split('\n'); + let currentLine = 0; + for (const line of lines) { + const hunkMatch = line.match(/^@@ -\d+(?:,\d+)? 
\+(\d+)/); + if (hunkMatch) { + currentLine = parseInt(hunkMatch[1], 10); + continue; + } + // Added lines are valid comment targets + if (line.startsWith('+')) { + validLines.add(`${file.filename}:${currentLine}`); + currentLine++; + continue; + } + // Deleted lines don't exist in the new file + if (line.startsWith('-')) continue; + // Ignore hunk metadata lines + if (line.startsWith('\\')) continue; + // Context lines on the right side are also valid targets + validLines.add(`${file.filename}:${currentLine}`); + currentLine++; + } + } + + // Partition comments into valid (on right-side diff lines) and dropped + const comments = Array.isArray(review.comments) ? review.comments : []; + const validComments = []; + const droppedComments = []; + + for (const comment of comments) { + const key = `${comment.path}:${comment.line}`; + if (validLines.has(key)) { + validComments.push({ + path: comment.path, + line: comment.line, + body: comment.body, + side: 'RIGHT', + }); + } else { + droppedComments.push(comment); + } + } + + // Build review body from summary only. + // Intentionally do NOT publish out-of-diff comments. + let body = review.summary || 'Codex review complete.'; + + // Post the review + await github.rest.pulls.createReview({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.issue.number, + body, + event: 'COMMENT', + comments: validComments, + }); + + console.log(`Review posted: ${validComments.length} inline comments, ${droppedComments.length} dropped out-of-diff comments`); From 64a3428ab3e9cf62dd68c37d883166e1389f5ff6 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:01:49 +0100 Subject: [PATCH 06/20] fix: remove sandbox from codex review workflow bwrap sandbox fails on GitHub Actions runners with "Failed RTM_NEWADDR: Operation not permitted". GitHub Actions already provides process isolation so the sandbox is unnecessary. 
Made-with: Cursor --- .github/workflows/codex-review.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml index 08d8e2c5f..4355a2ad4 100644 --- a/.github/workflows/codex-review.yml +++ b/.github/workflows/codex-review.yml @@ -38,7 +38,6 @@ jobs: prompt-file: .codex/review-prompt.md output-schema-file: .codex/review-schema.json effort: high - sandbox: read-only - name: Post PR review with inline comments uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 From 8b73b7637bd9b4c33efaf971907d7e78bb0e9e2f Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:06:35 +0100 Subject: [PATCH 07/20] fix github actions --- .github/workflows/codex-review.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml index 4355a2ad4..bdb41a68a 100644 --- a/.github/workflows/codex-review.yml +++ b/.github/workflows/codex-review.yml @@ -38,6 +38,7 @@ jobs: prompt-file: .codex/review-prompt.md output-schema-file: .codex/review-schema.json effort: high + sandbox: full-auto - name: Post PR review with inline comments uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 From 581f9a42ed56f0c8ce1158d3f7b1454c7ae115df Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:07:55 +0100 Subject: [PATCH 08/20] fix github actions codex --- .github/workflows/codex-review.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml index bdb41a68a..784e6b637 100644 --- a/.github/workflows/codex-review.yml +++ b/.github/workflows/codex-review.yml @@ -38,7 +38,7 @@ jobs: prompt-file: .codex/review-prompt.md output-schema-file: .codex/review-schema.json effort: high - sandbox: full-auto + sandbox: workspace-write - name: Post PR review with inline comments uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 
From d58d42aa898ff10c7476986c43368887d2cde519 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:12:34 +0100 Subject: [PATCH 09/20] fix codex --- .github/workflows/codex-review.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml index 784e6b637..1a40f798c 100644 --- a/.github/workflows/codex-review.yml +++ b/.github/workflows/codex-review.yml @@ -38,7 +38,7 @@ jobs: prompt-file: .codex/review-prompt.md output-schema-file: .codex/review-schema.json effort: high - sandbox: workspace-write + sandbox: danger-full-access - name: Post PR review with inline comments uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 From 2c1901fefb566f958bdd54571a7cb1db019258b6 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:16:29 +0100 Subject: [PATCH 10/20] update codex --- .github/workflows/codex-review.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml index 1a40f798c..8087a3091 100644 --- a/.github/workflows/codex-review.yml +++ b/.github/workflows/codex-review.yml @@ -30,6 +30,9 @@ jobs: - name: Generate PR diff run: git diff ${{ github.event.pull_request.base.sha }}...HEAD > pr-diff.patch + - name: Allow unprivileged user namespaces for bubblewrap sandbox + run: sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0 + - name: Run Codex review id: codex uses: openai/codex-action@f5c0ca71642badb34c1e66321d8d85685a0fa3dc # v1 @@ -38,7 +41,7 @@ jobs: prompt-file: .codex/review-prompt.md output-schema-file: .codex/review-schema.json effort: high - sandbox: danger-full-access + sandbox: read-only - name: Post PR review with inline comments uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 From 7c5ec91b9abe51da340220cf9ff2db0cd16bda92 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:19:33 +0100 Subject: [PATCH 
11/20] update codex --- .github/workflows/codex-review.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml index 8087a3091..a5be0afe1 100644 --- a/.github/workflows/codex-review.yml +++ b/.github/workflows/codex-review.yml @@ -31,7 +31,10 @@ jobs: run: git diff ${{ github.event.pull_request.base.sha }}...HEAD > pr-diff.patch - name: Allow unprivileged user namespaces for bubblewrap sandbox - run: sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0 + run: | + if sudo sysctl -n kernel.apparmor_restrict_unprivileged_userns 2>/dev/null; then + sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0 + fi - name: Run Codex review id: codex From a1d22bb93d3e870bbffe8cc65dfb112f63001289 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 12:59:11 +0100 Subject: [PATCH 12/20] fix get command comment --- .../protocols/get/sender/get-command.js | 122 ++++++++++++------ 1 file changed, 86 insertions(+), 36 deletions(-) diff --git a/src/commands/protocols/get/sender/get-command.js b/src/commands/protocols/get/sender/get-command.js index 49b5dfb1e..792cdfcf7 100644 --- a/src/commands/protocols/get/sender/get-command.js +++ b/src/commands/protocols/get/sender/get-command.js @@ -263,47 +263,84 @@ class GetCommand extends Command { } this.logger.debug(`Could not find asset with UAL: ${ual} locally`); - try { - const latestMerkleRoot = - await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( - blockchain, - contract, - knowledgeCollectionId, - ); - if (latestMerkleRoot) { - const publishOpId = - this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot); - if (publishOpId) { - const cachedAssertion = await this.pendingStorageService.getCachedDataset( - publishOpId, + if (!knowledgeAssetId) { + try { + const latestMerkleRoot = + await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( + blockchain, + contract, + 
knowledgeCollectionId, ); - if ( - cachedAssertion && - (cachedAssertion.public?.length || cachedAssertion.private?.length) - ) { - const cachedResponseData = { assertion: cachedAssertion }; - - this.logger.info( - `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, - ); - await this.operationService.markOperationAsCompleted( - operationId, - blockchain, - cachedResponseData, - [ - OPERATION_ID_STATUS.GET.GET_LOCAL_END, - OPERATION_ID_STATUS.GET.GET_END, - OPERATION_ID_STATUS.COMPLETED, - ], + if (latestMerkleRoot) { + const publishOpId = + this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot); + if (publishOpId) { + const cachedAssertion = await this.pendingStorageService.getCachedDataset( + publishOpId, ); - return Command.empty(); + if (cachedAssertion) { + const filteredAssertion = this._filterAssertionByContentType( + cachedAssertion, + contentType, + ); + + if ( + filteredAssertion.public?.length || + filteredAssertion.private?.length + ) { + let cachePassed = true; + if ( + paranetNodesAccessPolicy === PARANET_ACCESS_POLICY.PERMISSIONED + ) { + if (Array.isArray(filteredAssertion.public)) { + const shouldHavePrivate = filteredAssertion.public.some( + (triple) => + triple.includes(`${PRIVATE_ASSERTION_PREDICATE}`), + ); + if (shouldHavePrivate) { + cachePassed = filteredAssertion.private?.length > 0; + } + } else { + cachePassed = false; + } + } + + const cacheResponseData = { assertion: filteredAssertion }; + const cacheValid = await this.validateResponse( + cacheResponseData, + blockchain, + contract, + knowledgeCollectionId, + knowledgeAssetId, + paranetNodesAccessPolicy, + contentType, + ); + + if (cachePassed && cacheValid) { + this.logger.info( + `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, + ); + await this.operationService.markOperationAsCompleted( + operationId, + blockchain, + cacheResponseData, + [ + OPERATION_ID_STATUS.GET.GET_LOCAL_END, + 
OPERATION_ID_STATUS.GET.GET_END, + OPERATION_ID_STATUS.COMPLETED, + ], + ); + return Command.empty(); + } + } + } } } + } catch (cacheErr) { + this.logger.debug( + `Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`, + ); } - } catch (cacheErr) { - this.logger.debug( - `Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`, - ); } await this.operationIdService.emitChangeEvent( @@ -673,6 +710,19 @@ class GetCommand extends Command { return false; } + _filterAssertionByContentType(assertion, contentType) { + if (contentType === 'private') { + return { private: assertion.private ?? [] }; + } + if (contentType === 'public') { + return { public: assertion.public ?? [] }; + } + return { + public: assertion.public ?? [], + private: assertion.private ?? [], + }; + } + /** * Builds default GetCommand * @param map From 86232a6194ac8e22fb4877587530ba4b6a3c95b3 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:08:08 +0100 Subject: [PATCH 13/20] fix local get --- .../protocols/get/sender/get-command.js | 229 ++++++++++-------- .../sender/publish-replication-command.js | 8 + 2 files changed, 138 insertions(+), 99 deletions(-) diff --git a/src/commands/protocols/get/sender/get-command.js b/src/commands/protocols/get/sender/get-command.js index 792cdfcf7..2bdc9bd0a 100644 --- a/src/commands/protocols/get/sender/get-command.js +++ b/src/commands/protocols/get/sender/get-command.js @@ -73,19 +73,65 @@ class GetCommand extends Command { OPERATION_ID_STATUS.GET.GET_VALIDATE_ASSET_START, ); - const { isValid, errorMessage } = await this.validateUAL( - operationId, - blockchain, - contract, - knowledgeCollectionId, - ual, - ); + const maxGetRetries = 3; + const getRetryDelayMs = 5_000; + let ualValidationPassed = false; + let ualValidationError = null; + + for (let attempt = 1; attempt <= maxGetRetries; attempt += 1) { + try { + // eslint-disable-next-line no-await-in-loop + const { isValid, errorMessage: valMsg } = await 
this.validateUAL( + operationId, + blockchain, + contract, + knowledgeCollectionId, + ual, + ); + if (isValid) { + ualValidationPassed = true; + break; + } + ualValidationError = valMsg; + } catch (err) { + ualValidationError = `UAL validation failed: ${err.message}`; + } + + if (!ualValidationPassed) { + try { + // eslint-disable-next-line no-await-in-loop + const cachedResult = await this._tryCacheFallback( + blockchain, + contract, + knowledgeCollectionId, + ual, + operationId, + paranetNodesAccessPolicy, + ); + if (cachedResult) { + return cachedResult; + } + } catch (_cacheErr) { + // cache fallback also failed, will retry + } + } - if (!isValid) { + if (attempt < maxGetRetries) { + this.logger.debug( + `Get validation/cache attempt ${attempt}/${maxGetRetries} failed for ${ual}, retrying in ${getRetryDelayMs}ms`, + ); + // eslint-disable-next-line no-await-in-loop + await new Promise((resolve) => { + setTimeout(resolve, getRetryDelayMs); + }); + } + } + + if (!ualValidationPassed) { await this.handleError( operationId, blockchain, - errorMessage, + ualValidationError, ERROR_TYPE.GET.GET_VALIDATE_ASSET_ERROR, ); return Command.empty(); @@ -263,84 +309,22 @@ class GetCommand extends Command { } this.logger.debug(`Could not find asset with UAL: ${ual} locally`); - if (!knowledgeAssetId) { - try { - const latestMerkleRoot = - await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( - blockchain, - contract, - knowledgeCollectionId, - ); - if (latestMerkleRoot) { - const publishOpId = - this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot); - if (publishOpId) { - const cachedAssertion = await this.pendingStorageService.getCachedDataset( - publishOpId, - ); - if (cachedAssertion) { - const filteredAssertion = this._filterAssertionByContentType( - cachedAssertion, - contentType, - ); - - if ( - filteredAssertion.public?.length || - filteredAssertion.private?.length - ) { - let cachePassed = true; - if ( - paranetNodesAccessPolicy 
=== PARANET_ACCESS_POLICY.PERMISSIONED - ) { - if (Array.isArray(filteredAssertion.public)) { - const shouldHavePrivate = filteredAssertion.public.some( - (triple) => - triple.includes(`${PRIVATE_ASSERTION_PREDICATE}`), - ); - if (shouldHavePrivate) { - cachePassed = filteredAssertion.private?.length > 0; - } - } else { - cachePassed = false; - } - } - - const cacheResponseData = { assertion: filteredAssertion }; - const cacheValid = await this.validateResponse( - cacheResponseData, - blockchain, - contract, - knowledgeCollectionId, - knowledgeAssetId, - paranetNodesAccessPolicy, - contentType, - ); - - if (cachePassed && cacheValid) { - this.logger.info( - `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, - ); - await this.operationService.markOperationAsCompleted( - operationId, - blockchain, - cacheResponseData, - [ - OPERATION_ID_STATUS.GET.GET_LOCAL_END, - OPERATION_ID_STATUS.GET.GET_END, - OPERATION_ID_STATUS.COMPLETED, - ], - ); - return Command.empty(); - } - } - } - } - } - } catch (cacheErr) { - this.logger.debug( - `Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`, - ); + try { + const cachedResult = await this._tryCacheFallback( + blockchain, + contract, + knowledgeCollectionId, + ual, + operationId, + paranetNodesAccessPolicy, + ); + if (cachedResult) { + return cachedResult; } + } catch (cacheErr) { + this.logger.debug( + `Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`, + ); } await this.operationIdService.emitChangeEvent( @@ -472,6 +456,66 @@ class GetCommand extends Command { return Command.empty(); } + async _tryCacheFallback( + blockchain, + contract, + knowledgeCollectionId, + ual, + operationId, + paranetNodesAccessPolicy, + ) { + const latestMerkleRoot = + await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( + blockchain, + contract, + knowledgeCollectionId, + ); + if (!latestMerkleRoot) return null; + + const publishOpId = 
this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot); + if (!publishOpId) return null; + + const cachedAssertion = await this.pendingStorageService.getCachedDataset(publishOpId); + if ( + !cachedAssertion || + (!cachedAssertion.public?.length && !cachedAssertion.private?.length) + ) { + return null; + } + + let cachePassed = true; + if (paranetNodesAccessPolicy === PARANET_ACCESS_POLICY.PERMISSIONED) { + if (Array.isArray(cachedAssertion.public)) { + const shouldHavePrivate = cachedAssertion.public.some((triple) => + triple.includes(`${PRIVATE_ASSERTION_PREDICATE}`), + ); + if (shouldHavePrivate) { + cachePassed = cachedAssertion.private?.length > 0; + } + } else { + cachePassed = false; + } + } + + if (!cachePassed) return null; + + const cachedResponseData = { assertion: cachedAssertion }; + this.logger.info( + `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, + ); + await this.operationService.markOperationAsCompleted( + operationId, + blockchain, + cachedResponseData, + [ + OPERATION_ID_STATUS.GET.GET_LOCAL_END, + OPERATION_ID_STATUS.GET.GET_END, + OPERATION_ID_STATUS.COMPLETED, + ], + ); + return Command.empty(); + } + async validateUAL(operationId, blockchain, contract, knowledgeCollectionId, ual) { const isUAL = this.ualService.isUAL(ual); @@ -710,19 +754,6 @@ class GetCommand extends Command { return false; } - _filterAssertionByContentType(assertion, contentType) { - if (contentType === 'private') { - return { private: assertion.private ?? [] }; - } - if (contentType === 'public') { - return { public: assertion.public ?? [] }; - } - return { - public: assertion.public ?? [], - private: assertion.private ?? 
[], - }; - } - /** * Builds default GetCommand * @param map diff --git a/src/commands/protocols/publish/sender/publish-replication-command.js b/src/commands/protocols/publish/sender/publish-replication-command.js index 1568b1ddf..09cf1c488 100644 --- a/src/commands/protocols/publish/sender/publish-replication-command.js +++ b/src/commands/protocols/publish/sender/publish-replication-command.js @@ -133,6 +133,14 @@ class PublishReplicationCommand extends Command { return Command.empty(); } const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId); + + await this.pendingStorageService.cacheDataset( + operationId, + datasetRoot, + dataset, + currentPeerId, + ); + const message = { dataset: dataset.public, datasetRoot, From 268ff241d3589b0110a747667308d5c3a2a64334 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:20:02 +0100 Subject: [PATCH 14/20] fix per-operation mutex cleanup timing - Don't delete mutex on operation completion/failure; late responses could create a new mutex and break serialization for the same operationId - Track terminal operations with timestamps so late responses short-circuit inside the same mutex instead of processing stale data - Add periodic sweeper (5min TTL) to clean up old terminal mutexes and prevent memory buildup Made-with: Cursor --- src/service/operation-service.js | 35 ++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 8b0745f3d..76eaccce3 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -5,6 +5,9 @@ import { OPERATION_STATUS, } from '../constants/constants.js'; +const MUTEX_TTL_MS = 5 * 60 * 1000; +const MUTEX_SWEEP_INTERVAL_MS = 5 * 60 * 1000; + class OperationService { constructor(ctx) { this.logger = ctx.logger; @@ -12,6 +15,12 @@ class OperationService { this.operationIdService = ctx.operationIdService; this.commandExecutor = 
ctx.commandExecutor; this._operationMutexes = new Map(); + this._terminalOperations = new Map(); + + this._sweepInterval = setInterval(() => this._sweepStaleMutexes(), MUTEX_SWEEP_INTERVAL_MS); + if (this._sweepInterval.unref) { + this._sweepInterval.unref(); + } } _getOperationMutex(operationId) { @@ -21,8 +30,22 @@ class OperationService { return this._operationMutexes.get(operationId); } - _cleanupOperationMutex(operationId) { - this._operationMutexes.delete(operationId); + _markOperationTerminal(operationId) { + this._terminalOperations.set(operationId, Date.now()); + } + + _isOperationTerminal(operationId) { + return this._terminalOperations.has(operationId); + } + + _sweepStaleMutexes() { + const now = Date.now(); + for (const [operationId, terminatedAt] of this._terminalOperations) { + if (now - terminatedAt >= MUTEX_TTL_MS) { + this._operationMutexes.delete(operationId); + this._terminalOperations.delete(operationId); + } + } } getOperationName() { @@ -45,6 +68,10 @@ class OperationService { const self = this; const mutex = this._getOperationMutex(operationId); await mutex.runExclusive(async () => { + if (self._isOperationTerminal(operationId)) { + self.logger.debug(`Skipping late response for terminal operation ${operationId}`); + return; + } await self.repositoryModuleManager.createOperationResponseRecord( responseStatus, this.operationName, @@ -79,7 +106,7 @@ class OperationService { endStatuses, options = {}, ) { - this._cleanupOperationMutex(operationId); + this._markOperationTerminal(operationId); const { reuseExistingCache = false } = options; this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`); @@ -113,7 +140,7 @@ class OperationService { } async markOperationAsFailed(operationId, blockchain, message, errorType) { - this._cleanupOperationMutex(operationId); + this._markOperationTerminal(operationId); this.logger.info(`${this.operationName} for operationId: ${operationId} failed.`); await 
this.operationIdService.removeOperationIdCache(operationId); From 1eb86145731bea1adc1c69f0506484e8a5c45b93 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:25:02 +0100 Subject: [PATCH 15/20] fix responses not iterable when operation is terminal Initialize responses as empty array instead of 0 so the terminal short-circuit path returns an empty result instead of crashing on for..of iteration. Made-with: Cursor --- src/service/operation-service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 76eaccce3..57acb2820 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -64,7 +64,7 @@ class OperationService { } async getResponsesStatuses(responseStatus, errorMessage, operationId) { - let responses = 0; + let responses = []; const self = this; const mutex = this._getOperationMutex(operationId); await mutex.runExclusive(async () => { From 78883581ebd4989beab50b05028d269b18c28b1d Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:26:01 +0100 Subject: [PATCH 16/20] prevent node crash when peer stream is already closed Wrap sendMessageResponse in handleError with try/catch so a closed stream does not bring down the entire node. 
Made-with: Cursor --- .../common/handle-protocol-message-command.js | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/commands/protocols/common/handle-protocol-message-command.js b/src/commands/protocols/common/handle-protocol-message-command.js index 6248ded96..16dab5dfa 100644 --- a/src/commands/protocols/common/handle-protocol-message-command.js +++ b/src/commands/protocols/common/handle-protocol-message-command.js @@ -126,13 +126,19 @@ class HandleProtocolMessageCommand extends Command { this.errorType, ); - await this.networkModuleManager.sendMessageResponse( - protocol, - remotePeerId, - NETWORK_MESSAGE_TYPES.RESPONSES.NACK, - operationId, - { errorMessage }, - ); + try { + await this.networkModuleManager.sendMessageResponse( + protocol, + remotePeerId, + NETWORK_MESSAGE_TYPES.RESPONSES.NACK, + operationId, + { errorMessage }, + ); + } catch (sendErr) { + this.logger.debug( + `Failed to send NACK to ${remotePeerId} for operation ${operationId}: ${sendErr.message}`, + ); + } this.networkModuleManager.removeCachedSession(operationId, remotePeerId); } } From bbe0b0edc6ad4bf0612157c31beb40c657a69f40 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:29:35 +0100 Subject: [PATCH 17/20] fix undefined destructure for terminal operation responses Always initialize the operationId entry in the status map so callers don't crash when responses is empty (terminal short-circuit). 
Made-with: Cursor --- src/service/operation-service.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 57acb2820..2ff9cde8d 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -85,10 +85,9 @@ class OperationService { }); const operationIdStatuses = {}; - for (const response of responses) { - if (!operationIdStatuses[operationId]) - operationIdStatuses[operationId] = { failedNumber: 0, completedNumber: 0 }; + operationIdStatuses[operationId] = { failedNumber: 0, completedNumber: 0 }; + for (const response of responses) { if (response.status === OPERATION_REQUEST_STATUS.FAILED) { operationIdStatuses[operationId].failedNumber += 1; } else { From 54a3b28d5f217baa7c81c93969e4274653db4c14 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:37:16 +0100 Subject: [PATCH 18/20] add cache fallback safety checks per code review - Skip cache fallback for single-KA requests so a KA-scoped get never receives the whole KC payload - Filter cached assertion by contentType so public-only requests never receive private triples - Run validateResponse() on cached data before completing, same validation gate as the normal local and network paths Made-with: Cursor --- .../protocols/get/sender/get-command.js | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/commands/protocols/get/sender/get-command.js b/src/commands/protocols/get/sender/get-command.js index 2bdc9bd0a..e77645841 100644 --- a/src/commands/protocols/get/sender/get-command.js +++ b/src/commands/protocols/get/sender/get-command.js @@ -104,9 +104,11 @@ class GetCommand extends Command { blockchain, contract, knowledgeCollectionId, + knowledgeAssetId, ual, operationId, paranetNodesAccessPolicy, + contentType, ); if (cachedResult) { return cachedResult; @@ -314,9 +316,11 @@ class GetCommand extends Command { blockchain, contract, 
knowledgeCollectionId, + knowledgeAssetId, ual, operationId, paranetNodesAccessPolicy, + contentType, ); if (cachedResult) { return cachedResult; @@ -460,10 +464,14 @@ class GetCommand extends Command { blockchain, contract, knowledgeCollectionId, + knowledgeAssetId, ual, operationId, paranetNodesAccessPolicy, + contentType, ) { + if (knowledgeAssetId) return null; + const latestMerkleRoot = await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( blockchain, @@ -483,14 +491,19 @@ class GetCommand extends Command { return null; } + const filteredAssertion = this._filterAssertionByContentType(cachedAssertion, contentType); + if (!filteredAssertion.public?.length && !filteredAssertion.private?.length) { + return null; + } + let cachePassed = true; if (paranetNodesAccessPolicy === PARANET_ACCESS_POLICY.PERMISSIONED) { - if (Array.isArray(cachedAssertion.public)) { - const shouldHavePrivate = cachedAssertion.public.some((triple) => + if (Array.isArray(filteredAssertion.public)) { + const shouldHavePrivate = filteredAssertion.public.some((triple) => triple.includes(`${PRIVATE_ASSERTION_PREDICATE}`), ); if (shouldHavePrivate) { - cachePassed = cachedAssertion.private?.length > 0; + cachePassed = filteredAssertion.private?.length > 0; } } else { cachePassed = false; @@ -499,7 +512,18 @@ class GetCommand extends Command { if (!cachePassed) return null; - const cachedResponseData = { assertion: cachedAssertion }; + const cachedResponseData = { assertion: filteredAssertion }; + const isValid = await this.validateResponse( + cachedResponseData, + blockchain, + contract, + knowledgeCollectionId, + knowledgeAssetId, + paranetNodesAccessPolicy, + contentType, + ); + if (!isValid) return null; + this.logger.info( `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, ); @@ -516,6 +540,17 @@ class GetCommand extends Command { return Command.empty(); } + _filterAssertionByContentType(assertion, contentType) { + if (!contentType || 
contentType === 'all') return assertion; + if (contentType === 'public') { + return { public: assertion.public || [] }; + } + if (contentType === 'private') { + return { private: assertion.private || [] }; + } + return assertion; + } + async validateUAL(operationId, blockchain, contract, knowledgeCollectionId, ual) { const isUAL = this.ualService.isUAL(ual); From 1b2f12667f059d3968e0b10a64290d5365ca4454 Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 15:45:55 +0100 Subject: [PATCH 19/20] add missing pendingStorageService to publish-replication-command constructor The cacheDataset call at line 137 uses this.pendingStorageService but it was never assigned in the constructor, causing a runtime crash. Made-with: Cursor --- .../protocols/publish/sender/publish-replication-command.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/commands/protocols/publish/sender/publish-replication-command.js b/src/commands/protocols/publish/sender/publish-replication-command.js index 09cf1c488..8616cdc47 100644 --- a/src/commands/protocols/publish/sender/publish-replication-command.js +++ b/src/commands/protocols/publish/sender/publish-replication-command.js @@ -21,6 +21,7 @@ class PublishReplicationCommand extends Command { this.signatureService = ctx.signatureService; this.cryptoService = ctx.cryptoService; this.messagingService = ctx.messagingService; + this.pendingStorageService = ctx.pendingStorageService; this.errorType = ERROR_TYPE.LOCAL_STORE.LOCAL_STORE_ERROR; } From ca93851eaed1d60e396978528d48abc8300554bc Mon Sep 17 00:00:00 2001 From: Bojan Date: Thu, 26 Mar 2026 16:03:42 +0100 Subject: [PATCH 20/20] bump version to 8.2.6 Made-with: Cursor --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2fae59830..4859837f2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "8.2.5", + "version": 
"8.2.6", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "8.2.5", + "version": "8.2.6", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^4.0.2", diff --git a/package.json b/package.json index 36aafc45e..31c300570 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.5", + "version": "8.2.6", "description": "OTNode V8", "main": "index.js", "type": "module",