diff --git a/.codex/review-prompt.md b/.codex/review-prompt.md new file mode 100644 index 0000000000..0f92a9aa1f --- /dev/null +++ b/.codex/review-prompt.md @@ -0,0 +1,186 @@ +# PR Review Instructions + +You are a senior code reviewer for the OriginTrail DKG Engine (ot-node). Your job is to review a pull request diff and produce structured, actionable feedback as inline comments on specific changed lines. You review like a staff engineer who cares deeply about code quality, readability, and simplicity. + +## Context Files + +Read these files before reviewing: + +1. **`pr-diff.patch`** — The PR diff (generated at runtime). This is the primary input. + +You may read other files in the repository **only** to understand how code changed in the diff is called or referenced. Do not review, comment on, or mention code in files that are not part of the diff. All review comments and the summary must be strictly scoped to changes introduced by this PR's diff — nothing else. + +## Project Architecture + +- **Node.js** application (ESM modules, `.js` and `.mjs` files) +- **Awilix** dependency injection container for service management +- **libp2p** for peer-to-peer networking and message passing +- **Ethers.js / Web3.js** for multi-chain blockchain interactions (NeuroWeb, Gnosis, Base) +- **Sequelize** ORM for local SQLite database +- **Blazegraph** triple store for RDF/SPARQL knowledge graph operations +- **Pino** for structured logging +- **Command pattern** for async operations (publish, get, query) +- **BDD tests** using Cucumber.js with Gherkin feature files + +### Key Directories + +- `src/commands/` — Command implementations (publish, get, query protocols) +- `src/modules/` — Core modules (blockchain, network, repository, triple-store) +- `src/service/` — Service layer (pending-storage, operation, validation) +- `src/constants/` — System-wide constants and error definitions +- `test/bdd/` — Cucumber BDD tests (features, steps, utilities) + +## Review Philosophy + +Most PR issues in this codebase are maintainability problems — bloat, poor naming, scattered validation, hardcoded values, pattern drift. These matter a lot. + +However, review priority is always **severity-first**: + +1. **Blockers first** — correctness, security, auth, data integrity, blockchain safety. +2. **Then maintainability** — readability, simplicity, pattern conformance. + +When both exist, report blockers first. + +### Review Method + +Do three passes: + +1. **Context + risk-map pass (mandatory)** — Start from diff hunks, then read surrounding or full touched files when needed to evaluate maintainability, coupling, naming, and extraction opportunities. Use this context to assess changed behavior, not to run unrelated file-wide audits. +2. **Blockers pass** — Scan for correctness bugs, security issues, blockchain transaction safety, gas handling issues, data integrity risks, and missing tests for changed behavior. These are `🔴 Bug` comments. +3. **Maintainability pass** — Scan for code bloat, readability issues, naming problems, pattern violations, hardcoded values, and architecture drift in touched areas. These are `🟡 Issue`, `🔵 Nit`, or `💡 Suggestion` comments. + +### Comment Gate + +Before posting any comment, verify all four conditions: + +1. **Introduced by this diff** — The issue is introduced or materially worsened by the changes in this PR, not pre-existing. +2. **Materially impactful** — The issue affects correctness, security, readability, or maintainability in a meaningful way. Not a theoretical concern. +3. **Concrete fix direction** — You can suggest a specific fix or clear direction. If you can only say "this seems off" without a concrete suggestion, do not comment. +4. **Scope fit** — If the issue is mainly in pre-existing code, the PR must touch the same function/module and fixing it must directly simplify, de-risk, or de-duplicate the new/changed code. + +If any check fails, skip the comment. +Every comment must be traceable to changed behavior in this PR and anchored to a right-side line present in `pr-diff.patch`. Prefer added/modified lines; use nearby unchanged hunk lines only when necessary to explain a directly related issue. + +**Uncertainty guard:** If you are not certain an issue is real and cannot verify it from the diff and allowed context, do not label it `🔴 Bug`. Downgrade to `🟡 Issue` or `💡 Suggestion`, or skip it entirely. + +**Deduplication:** One comment per root cause. If the same pattern repeats across multiple lines, comment on the first occurrence and note "same pattern at lines X, Y, Z." Aim for a maximum of ~10 comments, highest impact first. + +## What to Review + +### Pass 1: Blockers + +#### Correctness + +- Logic errors, off-by-one, null/undefined handling, incorrect assumptions, race conditions. +- Boundary conditions — empty arrays, null inputs, zero values, maximum values. +- Error handling — swallowed errors, missing error propagation, unhelpful error messages. Do not flag missing error handling for internal code that cannot reasonably fail. +- Async/await correctness — unhandled promise rejections, missing awaits, race conditions in concurrent operations. +- Nonce management — verify blockchain nonce allocation and retry logic does not create orphan transactions or nonce gaps. + +#### Security + +- Injection risks (SQL, command, XSS) when handling user input. +- Hardcoded secrets — API keys, passwords, private keys, tokens in code. Private keys must never appear in source. +- Missing input validation at system boundaries (user input, external APIs, RPC responses). Not for internal function calls. +- Auth bypass, privilege escalation, or missing authorization checks. +- RPC endpoint exposure — verify no private/paid RPC URLs or API keys are hardcoded in committed code. + +#### Blockchain Safety + +- Gas handling — verify gas price calculations, multipliers, and buffers are reasonable and consistent across testnet/mainnet. +- Transaction retry logic — ensure retries don't waste gas, create duplicate transactions, or cause nonce conflicts. +- Wallet/key management — no hardcoded private keys, proper key isolation between environments. +- Multi-chain consistency — changes affecting one chain should be verified for impact on other supported chains (NeuroWeb, Gnosis, Base). +- BigNumber handling — verify arithmetic operations use BigNumber-safe methods, no precision loss from floating point. + +#### Tests for Changed Behavior + +- New behavior must have corresponding tests covering core functionality and error handling. +- Bug fixes must include a regression test that would have caught the original bug. +- Changed behavior must have updated tests reflecting the new expectations. +- If tests are present but brittle (testing implementation details rather than behavior), flag it. + +Missing tests for changed behavior are blockers (`🔴 Bug`) only when the change affects user-facing behavior, API contracts, or data integrity. Missing tests for internal refactors or trivial changes are `🟡 Issue`. + +### Pass 2: Maintainability + +#### Code Bloat and Unnecessary Complexity + +- **Excessive code** — More lines than necessary. Could this be done in fewer lines without sacrificing clarity? +- **Over-engineering** — Abstractions, helpers, or utilities for one-time operations. Premature generalization. +- **Dead code** — Unused variables, unreachable branches, commented-out code, leftover debug logging. +- **Duplicate code** — Same logic repeated instead of extracted. Do not suggest extraction for only 2-3 similar lines unless the repeated logic encodes a correctness invariant across multiple paths. + +#### Readability and Naming + +- **Confusing variable/function names** — Names that don't describe what the thing is or does. Generic names like `data`, `result`, `item`, `temp`, `val` when a specific name would be clearer. +- **Misleading names** — Names that suggest different behavior than what the code does. +- **Inconsistent naming** — Not following conventions in the rest of the codebase. +- **Long functions** — Functions doing too many things. If you need a comment to explain a section, it should probably be its own function. +- **Deep nesting** — More than 2-3 levels. Suggest early returns, guard clauses, or extraction. +- **Unclear control flow** — Complex conditionals that could be simplified or decomposed. + +#### Hardcoded Values and Magic Constants + +Flag only when the value is: + +- **Reused 3+ times** in touched files or the diff — should be a named constant. +- **Domain-significant** — timeout values, retry counts, gas multipliers, RPC URLs, network message timeouts. Even if used once, these belong in constants or configuration. + +Do not flag one-off numeric literals that are self-explanatory in context (e.g., `array.slice(0, 2)`, `Math.round(x * 100) / 100`). + +#### Performance (Only Obvious Issues) + +- N+1 queries — database queries inside loops. +- Blocking operations in async contexts — synchronous I/O in async code. +- Unnecessary work in hot paths — redundant allocations, repeated computations. +- Memory leaks — Maps/Sets/caches that grow unboundedly without cleanup. + +## What NOT to Review + +- Formatting or style — ESLint and Prettier handle this. +- Things that are clearly intentional design choices backed by existing patterns. +- Pre-existing issues in unchanged code outside the diff. +- Pre-existing issues in touched files when the PR does not introduce/worsen them. +- Adding documentation unless a public API is clearly undocumented. +- Repository-wide or file-wide audits not required by the changed behavior. +- Test configuration files (cucumber.js, .eslintrc) unless they introduce issues. + +## Comment Format + +Use severity prefixes: + +- `🔴 Bug:` — Correctness error, security issue, blockchain safety issue, data integrity risk. Will cause incorrect behavior. +- `🟡 Issue:` — Code quality problem that should be fixed. Bloated code, bad naming, pattern violation, missing tests. +- `🔵 Nit:` — Minor improvement, optional. +- `💡 Suggestion:` — Alternative approach worth considering. + +Be specific, be concise, explain why. One clear sentence with a concrete fix is better than a paragraph of theory. + +## Output Format + +Return raw JSON only. No markdown fences, no prose before or after the JSON object. Your output MUST be valid JSON matching the provided output schema. Example: + +```json +{ + "summary": "This PR improves blockchain error handling but introduces a potential gas waste issue in the retry loop and has leftover debug logging.", + "comments": [ + { + "path": "src/modules/blockchain/implementation/web3-service.js", + "line": 142, + "body": "🔴 Bug: Gas price is bumped on every retry including network errors, which wastes gas. Only bump for nonce conflicts and execution errors. Add a `shouldBumpGas` guard." + }, + { + "path": "src/commands/protocols/publish/sender/publish-replication-command.js", + "line": 58, + "body": "🟡 Issue: `console.log` debug statement left in production code. Use `this.logger.debug()` instead or remove it." + } + ] +} +``` + +The `line` field must refer to the line number in the new version of the file (right side of the diff), and it must be a line that actually appears in the diff hunks. Do not comment on lines outside the diff. + +## Summary + +Write a brief (2–4 sentence) overall assessment in the `summary` field covering **only** what this PR's diff changes. Do not mention code, packages, or behavior outside the diff. Lead with blockers if any exist. Mention whether the PR is clean/minimal or has code quality issues. Include one sentence on maintainability direction in touched areas (improved / neutral / worsened, and why). If the PR looks good, say so. diff --git a/.codex/review-schema.json b/.codex/review-schema.json new file mode 100644 index 0000000000..0039cffb63 --- /dev/null +++ b/.codex/review-schema.json @@ -0,0 +1,35 @@ +{ + "type": "object", + "properties": { + "summary": { + "type": "string", + "description": "Brief overall assessment of the PR (2-4 sentences)" + }, + "comments": { + "type": "array", + "description": "Inline review comments on specific changed lines", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "File path relative to repository root" + }, + "line": { + "type": "integer", + "minimum": 1, + "description": "Line number in the new version of the file (must be within the diff)" + }, + "body": { + "type": "string", + "description": "Review comment with severity prefix" + } + }, + "required": ["path", "line", "body"], + "additionalProperties": false + } + } + }, + "required": ["summary", "comments"], + "additionalProperties": false +} diff --git a/.github/workflows/codex-review.yml b/.github/workflows/codex-review.yml new file mode 100644 index 0000000000..a5be0afe16 --- /dev/null +++ b/.github/workflows/codex-review.yml @@ -0,0 +1,148 @@ +name: Codex PR Review + +on: + pull_request: + types: [opened, synchronize, reopened] + +concurrency: + group: codex-review-${{ github.event.pull_request.number }} + cancel-in-progress: true + +permissions: + contents: read + pull-requests: write + +jobs: + review: + name: Codex Review + runs-on: ubuntu-latest + timeout-minutes: 15 + # Skip fork PRs — they cannot access repository secrets + if: github.event.pull_request.head.repo.full_name == github.repository + + steps: + - name: Checkout PR merge commit + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + with: + ref: refs/pull/${{ github.event.pull_request.number }}/merge + fetch-depth: 0 + + - name: Generate PR diff + run: git diff ${{ github.event.pull_request.base.sha }}...HEAD > pr-diff.patch + + - name: Allow unprivileged user namespaces for bubblewrap sandbox + run: | + if sudo sysctl -n kernel.apparmor_restrict_unprivileged_userns 2>/dev/null; then + sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0 + fi + + - name: Run Codex review + id: codex + uses: openai/codex-action@f5c0ca71642badb34c1e66321d8d85685a0fa3dc # v1 + with: + openai-api-key: ${{ secrets.OPENAI_API_KEY }} + prompt-file: .codex/review-prompt.md + output-schema-file: .codex/review-schema.json + effort: high + sandbox: read-only + + - name: Post PR review with inline comments + uses: actions/github-script@f28e40c7f34bde8b3046d885e986cb6290c5673b # v7 + env: + REVIEW_JSON: ${{ steps.codex.outputs.final-message }} + with: + script: | + let review; + try { + review = JSON.parse(process.env.REVIEW_JSON); + } catch (e) { + console.error('Failed to parse Codex output:', e.message); + console.error('Raw output:', process.env.REVIEW_JSON?.slice(0, 500)); + await github.rest.pulls.createReview({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.issue.number, + body: '⚠️ Codex review failed to produce valid JSON output. Check the [workflow logs](' + + `${process.env.GITHUB_SERVER_URL}/${context.repo.owner}/${context.repo.repo}/actions/runs/${process.env.GITHUB_RUN_ID}) for details.`, + event: 'COMMENT', + comments: [], + }); + return; + } + + // Fetch all changed files (paginated for large PRs) + const files = await github.paginate( + github.rest.pulls.listFiles, + { + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.issue.number, + per_page: 100, + } + ); + + // Build set of valid (path:line) pairs from right-side diff hunk lines + // (added + context). This keeps comments bound to changed areas. + const validLines = new Set(); + for (const file of files) { + // Skip binary/large/truncated files with no patch + if (!file.patch) continue; + + const lines = file.patch.split('\n'); + let currentLine = 0; + for (const line of lines) { + const hunkMatch = line.match(/^@@ -\d+(?:,\d+)? \+(\d+)/); + if (hunkMatch) { + currentLine = parseInt(hunkMatch[1], 10); + continue; + } + // Added lines are valid comment targets + if (line.startsWith('+')) { + validLines.add(`${file.filename}:${currentLine}`); + currentLine++; + continue; + } + // Deleted lines don't exist in the new file + if (line.startsWith('-')) continue; + // Ignore hunk metadata lines + if (line.startsWith('\\')) continue; + // Context lines on the right side are also valid targets + validLines.add(`${file.filename}:${currentLine}`); + currentLine++; + } + } + + // Partition comments into valid (on right-side diff lines) and dropped + const comments = Array.isArray(review.comments) ? review.comments : []; + const validComments = []; + const droppedComments = []; + + for (const comment of comments) { + const key = `${comment.path}:${comment.line}`; + if (validLines.has(key)) { + validComments.push({ + path: comment.path, + line: comment.line, + body: comment.body, + side: 'RIGHT', + }); + } else { + droppedComments.push(comment); + } + } + + // Build review body from summary only. + // Intentionally do NOT publish out-of-diff comments. + let body = review.summary || 'Codex review complete.'; + + // Post the review + await github.rest.pulls.createReview({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.issue.number, + body, + event: 'COMMENT', + comments: validComments, + }); + + console.log(`Review posted: ${validComments.length} inline comments, ${droppedComments.length} dropped out-of-diff comments`); diff --git a/package-lock.json b/package-lock.json index 2fae598307..4859837f2f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "8.2.5", + "version": "8.2.6", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "8.2.5", + "version": "8.2.6", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^4.0.2", diff --git a/package.json b/package.json index 36aafc45eb..31c3005709 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.5", + "version": "8.2.6", "description": "OTNode V8", "main": "index.js", "type": "module", diff --git a/src/commands/blockchain-event-listener/blockchain-event-listener-command.js b/src/commands/blockchain-event-listener/blockchain-event-listener-command.js index 9ceabcac8f..aaaa5e21ce 100644 --- a/src/commands/blockchain-event-listener/blockchain-event-listener-command.js +++ b/src/commands/blockchain-event-listener/blockchain-event-listener-command.js @@ -65,7 +65,13 @@ class BlockchainEventListenerCommand extends Command { ); if (eventsMissed) { - // TODO: Add some logic for missed events in the future + const missedFrom = (lastCheckedBlockRecord?.lastCheckedBlock ?? 0) + 1; + this.logger.warn( + `[EVENT LISTENER] Blockchain events missed on ${blockchainId}! ` + + `Gap too large: blocks ${missedFrom}–${currentBlock} ` + + `(${currentBlock - missedFrom + 1} blocks). ` + + `Publish finality for assets created during this window will not complete.`, + ); } if (newEvents.length !== 0) { diff --git a/src/commands/protocols/common/handle-protocol-message-command.js b/src/commands/protocols/common/handle-protocol-message-command.js index 6248ded96b..16dab5dfaf 100644 --- a/src/commands/protocols/common/handle-protocol-message-command.js +++ b/src/commands/protocols/common/handle-protocol-message-command.js @@ -126,13 +126,19 @@ class HandleProtocolMessageCommand extends Command { this.errorType, ); - await this.networkModuleManager.sendMessageResponse( - protocol, - remotePeerId, - NETWORK_MESSAGE_TYPES.RESPONSES.NACK, - operationId, - { errorMessage }, - ); + try { + await this.networkModuleManager.sendMessageResponse( + protocol, + remotePeerId, + NETWORK_MESSAGE_TYPES.RESPONSES.NACK, + operationId, + { errorMessage }, + ); + } catch (sendErr) { + this.logger.debug( + `Failed to send NACK to ${remotePeerId} for operation ${operationId}: ${sendErr.message}`, + ); + } this.networkModuleManager.removeCachedSession(operationId, remotePeerId); } } diff --git a/src/commands/protocols/get/sender/get-command.js b/src/commands/protocols/get/sender/get-command.js index 9fd6a0c28b..e77645841c 100644 --- a/src/commands/protocols/get/sender/get-command.js +++ b/src/commands/protocols/get/sender/get-command.js @@ -30,6 +30,7 @@ class GetCommand extends Command { this.cryptoService = ctx.cryptoService; this.messagingService = ctx.messagingService; this.tripleStoreModuleManager = ctx.tripleStoreModuleManager; + this.pendingStorageService = ctx.pendingStorageService; } async handleError(operationId, blockchain, errorMessage, errorType) { @@ -72,19 +73,67 @@ class GetCommand extends Command { OPERATION_ID_STATUS.GET.GET_VALIDATE_ASSET_START, ); - const { isValid, errorMessage } = await this.validateUAL( - operationId, - blockchain, - contract, - knowledgeCollectionId, - ual, - ); + const maxGetRetries = 3; + const getRetryDelayMs = 5_000; + let ualValidationPassed = false; + let ualValidationError = null; + + for (let attempt = 1; attempt <= maxGetRetries; attempt += 1) { + try { + // eslint-disable-next-line no-await-in-loop + const { isValid, errorMessage: valMsg } = await this.validateUAL( + operationId, + blockchain, + contract, + knowledgeCollectionId, + ual, + ); + if (isValid) { + ualValidationPassed = true; + break; + } + ualValidationError = valMsg; + } catch (err) { + ualValidationError = `UAL validation failed: ${err.message}`; + } - if (!isValid) { + if (!ualValidationPassed) { + try { + // eslint-disable-next-line no-await-in-loop + const cachedResult = await this._tryCacheFallback( + blockchain, + contract, + knowledgeCollectionId, + knowledgeAssetId, + ual, + operationId, + paranetNodesAccessPolicy, + contentType, + ); + if (cachedResult) { + return cachedResult; + } + } catch (_cacheErr) { + // cache fallback also failed, will retry + } + } + + if (attempt < maxGetRetries) { + this.logger.debug( + `Get validation/cache attempt ${attempt}/${maxGetRetries} failed for ${ual}, retrying in ${getRetryDelayMs}ms`, + ); + // eslint-disable-next-line no-await-in-loop + await new Promise((resolve) => { + setTimeout(resolve, getRetryDelayMs); + }); + } + } + + if (!ualValidationPassed) { await this.handleError( operationId, blockchain, - errorMessage, + ualValidationError, ERROR_TYPE.GET.GET_VALIDATE_ASSET_ERROR, ); return Command.empty(); @@ -262,6 +311,26 @@ class GetCommand extends Command { } this.logger.debug(`Could not find asset with UAL: ${ual} locally`); + try { + const cachedResult = await this._tryCacheFallback( + blockchain, + contract, + knowledgeCollectionId, + knowledgeAssetId, + ual, + operationId, + paranetNodesAccessPolicy, + contentType, + ); + if (cachedResult) { + return cachedResult; + } + } catch (cacheErr) { + this.logger.debug( + `Pending storage cache fallback failed for ${ual}: ${cacheErr.message}`, + ); + } + await this.operationIdService.emitChangeEvent( OPERATION_ID_STATUS.GET.GET_LOCAL_END, operationId, @@ -391,6 +460,97 @@ class GetCommand extends Command { return Command.empty(); } + async _tryCacheFallback( + blockchain, + contract, + knowledgeCollectionId, + knowledgeAssetId, + ual, + operationId, + paranetNodesAccessPolicy, + contentType, + ) { + if (knowledgeAssetId) return null; + + const latestMerkleRoot = + await this.blockchainModuleManager.getKnowledgeCollectionLatestMerkleRoot( + blockchain, + contract, + knowledgeCollectionId, + ); + if (!latestMerkleRoot) return null; + + const publishOpId = this.pendingStorageService.getOperationIdByMerkleRoot(latestMerkleRoot); + if (!publishOpId) return null; + + const cachedAssertion = await this.pendingStorageService.getCachedDataset(publishOpId); + if ( + !cachedAssertion || + (!cachedAssertion.public?.length && !cachedAssertion.private?.length) + ) { + return null; + } + + const filteredAssertion = this._filterAssertionByContentType(cachedAssertion, contentType); + if (!filteredAssertion.public?.length && !filteredAssertion.private?.length) { + return null; + } + + let cachePassed = true; + if (paranetNodesAccessPolicy === PARANET_ACCESS_POLICY.PERMISSIONED) { + if (Array.isArray(filteredAssertion.public)) { + const shouldHavePrivate = filteredAssertion.public.some((triple) => + triple.includes(`${PRIVATE_ASSERTION_PREDICATE}`), + ); + if (shouldHavePrivate) { + cachePassed = filteredAssertion.private?.length > 0; + } + } else { + cachePassed = false; + } + } + + if (!cachePassed) return null; + + const cachedResponseData = { assertion: filteredAssertion }; + const isValid = await this.validateResponse( + cachedResponseData, + blockchain, + contract, + knowledgeCollectionId, + knowledgeAssetId, + paranetNodesAccessPolicy, + contentType, + ); + if (!isValid) return null; + + this.logger.info( + `Serving asset ${ual} from pending storage cache (merkleRoot: ${latestMerkleRoot})`, + ); + await this.operationService.markOperationAsCompleted( + operationId, + blockchain, + cachedResponseData, + [ + OPERATION_ID_STATUS.GET.GET_LOCAL_END, + OPERATION_ID_STATUS.GET.GET_END, + OPERATION_ID_STATUS.COMPLETED, + ], + ); + return Command.empty(); + } + + _filterAssertionByContentType(assertion, contentType) { + if (!contentType || contentType === 'all') return assertion; + if (contentType === 'public') { + return { public: assertion.public || [] }; + } + if (contentType === 'private') { + return { private: assertion.private || [] }; + } + return assertion; + } + async validateUAL(operationId, blockchain, contract, knowledgeCollectionId, ual) { const isUAL = this.ualService.isUAL(ual); diff --git a/src/commands/protocols/publish/publish-finalization-command.js b/src/commands/protocols/publish/publish-finalization-command.js index 78db13bd01..22eba1b99b 100644 --- a/src/commands/protocols/publish/publish-finalization-command.js +++ b/src/commands/protocols/publish/publish-finalization-command.js @@ -137,14 +137,42 @@ class PublishFinalizationCommand extends Command { const node = { id: publisherPeerId, protocol: networkProtocols[0] }; const message = { ual, publishOperationId, blockchain, operationId }; - // TODO: Add retry logic maybe - const response = await this.messagingService.sendProtocolMessage( - node, - operationId, - message, - NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, - NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST, - ); + + const maxFinalityAttempts = 3; + const backoffDelays = [0, 5_000, 10_000]; + let response; + let lastError; + + for (let attempt = 0; attempt < maxFinalityAttempts; attempt += 1) { + if (backoffDelays[attempt] > 0) { + // eslint-disable-next-line no-await-in-loop + await new Promise((r) => { + setTimeout(r, backoffDelays[attempt]); + }); + } + try { + // eslint-disable-next-line no-await-in-loop + response = await this.messagingService.sendProtocolMessage( + node, + operationId, + message, + NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, + NETWORK_MESSAGE_TIMEOUT_MILLS.FINALITY.REQUEST, + ); + lastError = null; + break; + } catch (err) { + lastError = err; + this.logger.warn( + `Finality request to publisher ${publisherPeerId} failed ` + + `(attempt ${attempt + 1}/${maxFinalityAttempts}): ${err.message}`, + ); + } + } + + if (lastError) { + throw lastError; + } await this.messagingService.handleProtocolResponse( response, @@ -196,16 +224,21 @@ class PublishFinalizationCommand extends Command { return cachedData; } catch (error) { attempt += 1; - // eslint-disable-next-line no-await-in-loop - await new Promise((resolve) => { - setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA); - }); + if (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) { + this.logger.debug( + `[Cache] Read attempt ${attempt}/${MAX_RETRIES_READ_CACHED_PUBLISH_DATA} ` + + `failed for publishOperationId: ${publishOperationId}, retrying in ${RETRY_DELAY_READ_CACHED_PUBLISH_DATA}ms...`, + ); + // eslint-disable-next-line no-await-in-loop + await new Promise((resolve) => { + setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA); + }); + } } } this.logger.warn( `[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`, ); - // TODO: Mark this operation as failed throw new Error('Failed to read cached publish data'); } diff --git a/src/commands/protocols/publish/sender/publish-replication-command.js b/src/commands/protocols/publish/sender/publish-replication-command.js index 1568b1ddfa..0e5c700dca 100644 --- a/src/commands/protocols/publish/sender/publish-replication-command.js +++ b/src/commands/protocols/publish/sender/publish-replication-command.js @@ -1,3 +1,4 @@ +import { Semaphore } from 'async-mutex'; import { OPERATION_ID_STATUS, ERROR_TYPE, @@ -10,6 +11,8 @@ import { } from '../../../../constants/constants.js'; import Command from '../../../command.js'; +const replicationSemaphore = new Semaphore(3); + class PublishReplicationCommand extends Command { constructor(ctx) { super(ctx); @@ -21,6 +24,7 @@ class PublishReplicationCommand extends Command { this.signatureService = ctx.signatureService; this.cryptoService = ctx.cryptoService; this.messagingService = ctx.messagingService; + this.pendingStorageService = ctx.pendingStorageService; this.errorType = ERROR_TYPE.LOCAL_STORE.LOCAL_STORE_ERROR; } @@ -133,18 +137,59 @@ class PublishReplicationCommand extends Command { return Command.empty(); } const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId); + + await this.pendingStorageService.cacheDataset( + operationId, + datasetRoot, + dataset, + currentPeerId, + ); + const message = { dataset: dataset.public, datasetRoot, blockchain, }; - // Run all message sending operations in parallel - await Promise.all( - shardNodes.map((node) => - this.sendAndHandleMessage(node, operationId, message, command, blockchain), - ), - ); + const replicationBatchSize = minAckResponses + 2; + + await replicationSemaphore.runExclusive(async () => { + this.logger.info( + `[REPLICATION] Starting for operationId: ${operationId}, ` + + `shard: ${shardNodes.length} nodes, batch: ${replicationBatchSize}, min ACKs: ${minAckResponses}`, + ); + + for (let i = 0; i < shardNodes.length; i += replicationBatchSize) { + if (i > 0) { + // eslint-disable-next-line no-await-in-loop + const record = await this.operationIdService.getOperationIdRecord( + operationId, + ); + if (record?.minAcksReached) { + this.logger.info( + `[REPLICATION] Minimum replication reached after ${i} nodes, ` + + `skipping remaining ${ + shardNodes.length - i + } for operationId: ${operationId}`, + ); + break; + } + } + + const batch = shardNodes.slice(i, i + replicationBatchSize); + this.logger.debug( + `Sending replication batch ${Math.floor(i / replicationBatchSize) + 1} ` + + `(${batch.length} nodes) for operationId: ${operationId}`, + ); + + // eslint-disable-next-line no-await-in-loop + await Promise.all( + batch.map((node) => + this.sendAndHandleMessage(node, operationId, message, command), + ), + ); + } + }); } catch (e) { await this.handleError(operationId, blockchain, e.message, this.errorType, true); this.operationIdService.emitChangeEvent( @@ -158,44 +203,69 @@ class PublishReplicationCommand extends Command { return Command.empty(); } - async sendAndHandleMessage(node, operationId, message, command, blockchain) { - const response = await this.messagingService.sendProtocolMessage( - node, - operationId, - message, - NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, - NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST, - ); - const responseData = response.data; - if (response.header.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.ACK) { - // eslint-disable-next-line no-await-in-loop - await this.signatureService.addSignatureToStorage( - NETWORK_SIGNATURES_FOLDER, + async sendAndHandleMessage(node, operationId, message, command) { + try { + let response = await this.messagingService.sendProtocolMessage( + node, operationId, - responseData.identityId, - responseData.v, - responseData.r, - responseData.s, - responseData.vs, - ); - // eslint-disable-next-line no-await-in-loop - await this.operationService.processResponse( - command, - OPERATION_REQUEST_STATUS.COMPLETED, - responseData, + message, + NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, + NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST, ); - } else { - // eslint-disable-next-line no-await-in-loop - await this.operationService.processResponse( - command, - OPERATION_REQUEST_STATUS.FAILED, - responseData, - ); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - operationId, - blockchain, + + if (response.header.messageType !== NETWORK_MESSAGE_TYPES.RESPONSES.ACK) { + const preRetryRecord = await this.operationIdService.getOperationIdRecord( + operationId, + ); + if (preRetryRecord?.minAcksReached) return; + + this.logger.info( + `[REPLICATION] Peer ${node.id} NACK for operationId: ${operationId}: ` + + `${response.data?.errorMessage || 'unknown reason'}, retrying...`, + ); + response = await this.messagingService.sendProtocolMessage( + node, + operationId, + message, + NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST, + NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST, + ); + } + + const responseData = response.data; + if (response.header.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.ACK) { + await this.signatureService.addSignatureToStorage( + NETWORK_SIGNATURES_FOLDER, + operationId, + responseData.identityId, + responseData.v, + responseData.r, + responseData.s, + responseData.vs, + ); + await this.operationService.processResponse( + command, + OPERATION_REQUEST_STATUS.COMPLETED, + responseData, + ); + } else { + this.logger.warn( + `[REPLICATION] Peer ${node.id} failed after retry for operationId: ${operationId}: ` + + `${responseData?.errorMessage || 'unknown reason'}`, + ); + await this.operationService.processResponse( + command, + OPERATION_REQUEST_STATUS.FAILED, + responseData, + ); + } + } catch (error) { + this.logger.warn( + `[REPLICATION] Peer ${node.id} error for operationId: ${operationId}: ${error.message}`, ); + await this.operationService.processResponse(command, OPERATION_REQUEST_STATUS.FAILED, { + errorMessage: error.message, + }); } } diff --git a/src/constants/constants.js b/src/constants/constants.js index 560bd08735..e4182cadfd 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -371,7 +371,7 @@ export const PARANET_NODES_ACCESS_POLICIES = ['OPEN', 'PERMISSIONED']; export const NETWORK_MESSAGE_TIMEOUT_MILLS = { PUBLISH: { - REQUEST: 15 * 1000, + REQUEST: 60 * 1000, }, UPDATE: { REQUEST: 60 * 1000, @@ -715,6 +715,15 @@ export const EXPECTED_TRANSACTION_ERRORS = { NONCE_TOO_LOW: 'nonce too low', REPLACEMENT_UNDERPRICED: 'replacement transaction underpriced', ALREADY_KNOWN: 'already known', + EXECUTION_FAILED: 'transaction execution fails', + FEE_TOO_LOW: 'feetoolow', + SOCKET_HANG_UP: 'socket hang up', + ECONNRESET: 'econnreset', + ECONNREFUSED: 'econnrefused', + SERVER_ERROR: 'server error', + BAD_GATEWAY: '502', + SERVICE_UNAVAILABLE: '503', + EXPECT_BLOCK_NUMBER: 'expect block number from id', }; /** @@ -1067,8 +1076,8 @@ export const LOCAL_INSERT_FOR_ASSET_SYNC_RETRY_DELAY = 1000; export const LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS = 5; export const LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY = 1000; -export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 10; -export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 10 * 1000; +export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 5; +export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 5 * 1000; export const TRIPLE_STORE_REPOSITORY = { DKG: 'dkg', diff --git a/src/controllers/http-api/v1/result-http-api-controller-v1.js b/src/controllers/http-api/v1/result-http-api-controller-v1.js index 5502f593d9..299faef34d 100644 --- a/src/controllers/http-api/v1/result-http-api-controller-v1.js +++ b/src/controllers/http-api/v1/result-http-api-controller-v1.js @@ -55,22 +55,33 @@ class ResultController extends BaseController { case 'update': { const minAcksReached = handlerRecord.minAcksReached || false; response.data = { ...response.data, minAcksReached }; - if (minAcksReached) { - const publisherNodeSignature = ( - await this.signatureService.getSignaturesFromStorage( - PUBLISHER_NODE_SIGNATURES_FOLDER, - operationId, - ) - )[0]; - const signatures = await this.signatureService.getSignaturesFromStorage( - NETWORK_SIGNATURES_FOLDER, - operationId, - ); - response.data = { - ...response.data, - publisherNodeSignature, - signatures, - }; + const shouldIncludeSignatures = + minAcksReached || + handlerRecord.status === OPERATION_ID_STATUS.COMPLETED; + if (shouldIncludeSignatures) { + try { + const publisherNodeSignature = ( + await this.signatureService.getSignaturesFromStorage( + PUBLISHER_NODE_SIGNATURES_FOLDER, + operationId, + ) + )[0]; + const signatures = + await this.signatureService.getSignaturesFromStorage( + NETWORK_SIGNATURES_FOLDER, + operationId, + ); + response.data = { + ...response.data, + minAcksReached: true, + publisherNodeSignature, + signatures, + }; + } catch (e) { + this.logger.warn( + `Failed to read signatures for operationId ${operationId}: ${e.message}`, + ); + } } break; } diff --git a/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js b/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js index 61bb1c5bb5..b1d5499617 100644 --- a/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js +++ b/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js @@ -56,6 +56,38 @@ class OtEthers extends BlockchainEventsService { return blockchainProviders[randomIndex]; } + _getShuffledProviders(blockchain) { + const blockchainProviders = this.providers[blockchain]; + if (!blockchainProviders || blockchainProviders.length === 0) { + throw new Error(`No providers available for blockchain: ${blockchain}`); + } + const shuffled = [...blockchainProviders]; + for (let i = shuffled.length - 1; i > 0; i -= 1) { + const j = Math.floor(Math.random() * (i + 1)); + [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]]; + } + return shuffled; + } + + async _sendWithFailover(blockchain, method, params) { + const providers = this._getShuffledProviders(blockchain); + let lastError; + for (const provider of providers) { + try { + return await provider.send(method, params); + } catch (error) { + lastError = error; + this.logger.warn( + `RPC provider failed for ${method} on ${blockchain}: ${error.message}. ` + + `Trying next provider (${providers.indexOf(provider) + 1}/${ + providers.length + })...`, + ); + } + } + throw lastError; + } + async _initializeContracts() { this.contracts = {}; @@ -156,8 +188,7 @@ class OtEthers extends BlockchainEventsService { const toBlockParam = ethers.BigNumber.from(toBlock) .toHexString() .replace(/^0x0+/, '0x'); - const provider = this._getRandomProvider(blockchain); - const newLogs = await provider.send('eth_getLogs', [ + const newLogs = await this._sendWithFailover(blockchain, 'eth_getLogs', [ { address: contractAddresses, fromBlock: fromBlockParam, diff --git a/src/modules/blockchain/implementation/gnosis/gnosis-service.js b/src/modules/blockchain/implementation/gnosis/gnosis-service.js index a656746363..cc763569b2 100644 --- a/src/modules/blockchain/implementation/gnosis/gnosis-service.js +++ b/src/modules/blockchain/implementation/gnosis/gnosis-service.js @@ -44,16 +44,32 @@ class GnosisService extends Web3Service { ); return this.defaultGasPrice; } - if ( - gasPrice && - ethers.utils.parseUnits(gasPrice.toString(), 'gwei').gt(this.defaultGasPrice) - ) { + if (gasPrice && gasPrice.gt && gasPrice.gt(this.defaultGasPrice)) { return gasPrice; } return this.defaultGasPrice; } + buildTransactionGasParams(gasPrice) { + const minPriorityFee = ethers.BigNumber.from(2_000_000_000); + + let maxPriorityFeePerGas = minPriorityFee; + if (ethers.BigNumber.isBigNumber(gasPrice)) { + const derived = gasPrice.div(5); + if (derived.gt(minPriorityFee)) { + maxPriorityFeePerGas = derived; + } + } + + const maxFeePerGas = + ethers.BigNumber.isBigNumber(gasPrice) && gasPrice.gt(maxPriorityFeePerGas) + ? gasPrice + : maxPriorityFeePerGas.mul(2); + + return { maxFeePerGas, maxPriorityFeePerGas }; + } + async healthCheck() { try { const blockNumber = await this.getBlockNumber(); diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 1f3157f515..fc53d0d95c 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -522,6 +522,10 @@ class Web3Service { } } + buildTransactionGasParams(gasPrice) { + return { gasPrice }; + } + async callContractFunction(contractInstance, functionName, args, contractName = null) { const maxNumberOfRetries = 3; const retryDelayInSec = 12; @@ -567,11 +571,35 @@ class Web3Service { let retryCount = 0; const maxRetries = 3; - try { - /* eslint-disable no-await-in-loop */ - gasLimit = await contractInstance.estimateGas[functionName](...args); - } catch (error) { - this._decodeEstimateGasError(contractInstance, functionName, error, args); + for (let estimateAttempt = 0; estimateAttempt < 3; estimateAttempt += 1) { + try { + /* eslint-disable no-await-in-loop */ + gasLimit = await contractInstance.estimateGas[functionName](...args); + break; + } catch (error) { + const errMsg = error.message?.toLowerCase() ?? ''; + const isTransient = + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.EXECUTION_FAILED.toLowerCase()) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.FEE_TOO_LOW.toLowerCase()) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.SOCKET_HANG_UP) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.ECONNRESET) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.ECONNREFUSED) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.SERVER_ERROR) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.BAD_GATEWAY) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.SERVICE_UNAVAILABLE) || + errMsg.includes(EXPECTED_TRANSACTION_ERRORS.EXPECT_BLOCK_NUMBER); + if (isTransient && estimateAttempt < 2) { + this.logger.warn( + `Gas estimation for ${functionName} failed with transient error on ${this.getBlockchainId()}, ` + + `retrying (${estimateAttempt + 1}/3): ${error.message}`, + ); + await new Promise((r) => { + setTimeout(r, 2000); + }); + continue; + } + this._decodeEstimateGasError(contractInstance, functionName, error, args); + } } gasLimit = gasLimit ?? ethers.utils.parseUnits('900', 'kwei'); @@ -590,12 +618,12 @@ class Web3Service { }${retryCount > 0 ? ` (retry ${retryCount})` : ''}`, ); + const txOverrides = this.buildTransactionGasParams(gasPrice); + txOverrides.gasLimit = gasLimit; + const tx = await contractInstance .connect(operationalWallet) - [functionName](...args, { - gasPrice, - gasLimit, - }); + [functionName](...args, txOverrides); try { result = await this.provider.waitForTransaction( @@ -608,38 +636,90 @@ class Web3Service { await this.provider.call(tx, tx.blockNumber); } } catch (error) { + if ( + error.message + .toLowerCase() + .includes(EXPECTED_TRANSACTION_ERRORS.TIMEOUT_EXCEEDED.toLowerCase()) + ) { + const existingReceipt = await this.provider.getTransactionReceipt(tx.hash); + if (existingReceipt) { + this.logger.info( + `Transaction ${functionName} (${tx.hash}) confirmed despite timeout. Block: ${existingReceipt.blockNumber}`, + ); + if (existingReceipt.status === 0) { + await this.provider.call(tx, existingReceipt.blockNumber); + } + return existingReceipt; + } + throw error; + } this._decodeWaitForTxError(contractInstance, functionName, error, args); } return result; } catch (error) { const errorMessage = error.message.toLowerCase(); - // Check for nonce-related errors - if ( + const isNonceError = errorMessage.includes( EXPECTED_TRANSACTION_ERRORS.NONCE_TOO_LOW.toLowerCase(), ) || errorMessage.includes( EXPECTED_TRANSACTION_ERRORS.REPLACEMENT_UNDERPRICED.toLowerCase(), ) || - errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ALREADY_KNOWN.toLowerCase()) - ) { + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ALREADY_KNOWN.toLowerCase()); + + const isTimeoutError = errorMessage.includes( + EXPECTED_TRANSACTION_ERRORS.TIMEOUT_EXCEEDED.toLowerCase(), + ); + + const isExecutionError = + errorMessage.includes( + EXPECTED_TRANSACTION_ERRORS.EXECUTION_FAILED.toLowerCase(), + ) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.FEE_TOO_LOW.toLowerCase()); + + const isNetworkError = + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.SOCKET_HANG_UP) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ECONNRESET) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.ECONNREFUSED) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.SERVER_ERROR) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.BAD_GATEWAY) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.SERVICE_UNAVAILABLE) || + errorMessage.includes(EXPECTED_TRANSACTION_ERRORS.EXPECT_BLOCK_NUMBER); + + if (isNonceError || isTimeoutError || isExecutionError || isNetworkError) { retryCount += 1; if (retryCount < maxRetries) { - // Increase gas price by 20% for nonce errors - gasPrice = Math.ceil(gasPrice * 1.2); + const shouldBumpGas = isNonceError || isExecutionError; + if (shouldBumpGas) { + gasPrice = ethers.BigNumber.isBigNumber(gasPrice) + ? gasPrice.mul(120).div(100) + : Math.ceil(gasPrice * 1.2); + } + let errorType = 'Nonce'; + if (isTimeoutError) errorType = 'Timeout'; + else if (isExecutionError) errorType = 'Execution/fee'; + else if (isNetworkError) errorType = 'Network'; this.logger.warn( - `Nonce error detected for ${functionName}. Retrying with increased gas price: ${gasPrice} (retry ${retryCount}/${maxRetries})`, + `${errorType} error detected for ${functionName} on ${this.getBlockchainId()}. ` + + `Retrying ${ + shouldBumpGas + ? `with increased gas price: ${gasPrice}` + : 'with same gas price' + } (retry ${retryCount}/${maxRetries})`, ); continue; - } else { - this.logger.error( - `Max retries (${maxRetries}) reached for nonce error in ${functionName}. Final gas price: ${gasPrice}`, - ); } + let errorType = 'nonce'; + if (isTimeoutError) errorType = 'timeout'; + else if (isExecutionError) errorType = 'execution/fee'; + else if (isNetworkError) errorType = 'network'; + this.logger.error( + `Max retries (${maxRetries}) reached for ${errorType} error in ${functionName} on ${this.getBlockchainId()}. ` + + `Final gas price: ${gasPrice}`, + ); } - // If it's not a nonce error or we've exhausted retries, re-throw the error throw error; } } diff --git a/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js b/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js index 53e10639d1..b8f68c9756 100644 --- a/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/finality-status-repository.js @@ -14,6 +14,15 @@ class FinalityStatusRepository { async saveFinalityAck(operationId, ual, peerId, options) { return this.model.upsert({ operationId, ual, peerId }, options); } + + async getPublishOperationIdByUal(ual, options) { + const record = await this.model.findOne({ + where: { ual }, + attributes: ['operationId'], + ...options, + }); + return record?.operationId ?? null; + } } export default FinalityStatusRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 36f6e66268..69fbfb34f3 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -467,6 +467,10 @@ class RepositoryModuleManager extends BaseModuleManager { return this.getRepository('finality_status').getFinalityAcksCount(ual, options); } + async getPublishOperationIdByUal(ual, options = {}) { + return this.getRepository('finality_status').getPublishOperationIdByUal(ual, options); + } + async getLatestRandomSamplingChallengeRecordForBlockchainId(blockchainId, limit = 1) { return this.getRepository( 'random_sampling_challenge', diff --git a/src/service/ask-service.js b/src/service/ask-service.js index 450ca76933..d68396bd04 100644 --- a/src/service/ask-service.js +++ b/src/service/ask-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -21,7 +20,6 @@ class AskService extends OperationService { OPERATION_ID_STATUS.ASK.ASK_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData) { diff --git a/src/service/batch-get-service.js b/src/service/batch-get-service.js index 3ecfedf53a..4f025ddfbd 100644 --- a/src/service/batch-get-service.js +++ b/src/service/batch-get-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -19,7 +18,6 @@ class BatchGetService extends OperationService { OPERATION_ID_STATUS.BATCH_GET.BATCH_GET_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } } diff --git a/src/service/finality-service.js b/src/service/finality-service.js index 637d413195..7ad0b7072a 100644 --- a/src/service/finality-service.js +++ b/src/service/finality-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -26,7 +25,6 @@ class FinalityService extends OperationService { this.repositoryModuleManager = ctx.repositoryModuleManager; this.blockchainModuleManager = ctx.blockchainModuleManager; this.paranetService = ctx.paranetService; - this.operationMutex = new Mutex(); } async processResponse(operationId, blockchain, responseStatus, responseData) { diff --git a/src/service/get-service.js b/src/service/get-service.js index b70dbd951a..17e6327455 100644 --- a/src/service/get-service.js +++ b/src/service/get-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { OPERATION_ID_STATUS, @@ -22,7 +21,6 @@ class GetService extends OperationService { OPERATION_ID_STATUS.GET.GET_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData) { diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 10d84b2f48..2ff9cde8d3 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -1,15 +1,51 @@ +import { Mutex } from 'async-mutex'; import { OPERATION_ID_STATUS, OPERATION_REQUEST_STATUS, OPERATION_STATUS, } from '../constants/constants.js'; +const MUTEX_TTL_MS = 5 * 60 * 1000; +const MUTEX_SWEEP_INTERVAL_MS = 5 * 60 * 1000; + class OperationService { constructor(ctx) { this.logger = ctx.logger; this.repositoryModuleManager = ctx.repositoryModuleManager; this.operationIdService = ctx.operationIdService; this.commandExecutor = ctx.commandExecutor; + this._operationMutexes = new Map(); + this._terminalOperations = new Map(); + + this._sweepInterval = setInterval(() => this._sweepStaleMutexes(), MUTEX_SWEEP_INTERVAL_MS); + if (this._sweepInterval.unref) { + this._sweepInterval.unref(); + } + } + + _getOperationMutex(operationId) { + if (!this._operationMutexes.has(operationId)) { + this._operationMutexes.set(operationId, new Mutex()); + } + return this._operationMutexes.get(operationId); + } + + _markOperationTerminal(operationId) { + this._terminalOperations.set(operationId, Date.now()); + } + + _isOperationTerminal(operationId) { + return this._terminalOperations.has(operationId); + } + + _sweepStaleMutexes() { + const now = Date.now(); + for (const [operationId, terminatedAt] of this._terminalOperations) { + if (now - terminatedAt >= MUTEX_TTL_MS) { + this._operationMutexes.delete(operationId); + this._terminalOperations.delete(operationId); + } + } } getOperationName() { @@ -28,9 +64,14 @@ class OperationService { } async getResponsesStatuses(responseStatus, errorMessage, operationId) { - let responses = 0; + let responses = []; const self = this; - await this.operationMutex.runExclusive(async () => { + const mutex = this._getOperationMutex(operationId); + await mutex.runExclusive(async () => { + if (self._isOperationTerminal(operationId)) { + self.logger.debug(`Skipping late response for terminal operation ${operationId}`); + return; + } await self.repositoryModuleManager.createOperationResponseRecord( responseStatus, this.operationName, @@ -44,10 +85,9 @@ class OperationService { }); const operationIdStatuses = {}; - for (const response of responses) { - if (!operationIdStatuses[operationId]) - operationIdStatuses[operationId] = { failedNumber: 0, completedNumber: 0 }; + operationIdStatuses[operationId] = { failedNumber: 0, completedNumber: 0 }; + for (const response of responses) { if (response.status === OPERATION_REQUEST_STATUS.FAILED) { operationIdStatuses[operationId].failedNumber += 1; } else { @@ -65,6 +105,7 @@ class OperationService { endStatuses, options = {}, ) { + this._markOperationTerminal(operationId); const { reuseExistingCache = false } = options; this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`); @@ -98,6 +139,7 @@ class OperationService { } async markOperationAsFailed(operationId, blockchain, message, errorType) { + this._markOperationTerminal(operationId); this.logger.info(`${this.operationName} for operationId: ${operationId} failed.`); await this.operationIdService.removeOperationIdCache(operationId); diff --git a/src/service/pending-storage-service.js b/src/service/pending-storage-service.js index f049ad927a..74e2621af7 100644 --- a/src/service/pending-storage-service.js +++ b/src/service/pending-storage-service.js @@ -10,6 +10,7 @@ class PendingStorageService { this.fileService = ctx.fileService; this.repositoryModuleManager = ctx.repositoryModuleManager; // this is not used this.tripleStoreService = ctx.tripleStoreService; // this is not used + this._merkleRootIndex = new Map(); } async cacheDataset(operationId, datasetRoot, dataset, remotePeerId) { @@ -17,6 +18,8 @@ class PendingStorageService { `Caching ${datasetRoot} dataset root, operation id: ${operationId} in file in pending storage`, ); + this._merkleRootIndex.set(datasetRoot, operationId); + await this.fileService.writeContentsToFile( this.fileService.getPendingStorageCachePath(), operationId, @@ -28,6 +31,10 @@ class PendingStorageService { ); } + getOperationIdByMerkleRoot(merkleRoot) { + return this._merkleRootIndex.get(merkleRoot) ?? null; + } + async getCachedDataset(operationId) { this.logger.debug(`Retrieving cached dataset for ${operationId} from pending storage`); @@ -94,6 +101,7 @@ class PendingStorageService { const createdDate = fileStats.mtime; if (createdDate.getTime() + expirationTimeMillis < now) { + this._removeMerkleRootIndexEntry(file); await this.fileService.removeFile(filePath); this.logger.debug(`Deleted expired file: ${filePath}`); return true; @@ -155,6 +163,8 @@ class PendingStorageService { `Removing cached assertion for ual: ${ual} operation id: ${operationId} from file in ${repository} pending storage`, ); + this._removeMerkleRootIndexEntry(operationId); + const pendingAssertionPath = await this.fileService.getPendingStorageDocumentPath( operationId, ); @@ -177,6 +187,15 @@ class PendingStorageService { } } + _removeMerkleRootIndexEntry(operationId) { + for (const [root, opId] of this._merkleRootIndex) { + if (opId === operationId) { + this._merkleRootIndex.delete(root); + break; + } + } + } + async getPendingState(operationId) { return this.fileService.getPendingStorageLatestDocument(operationId); } diff --git a/src/service/publish-service.js b/src/service/publish-service.js index 1e6046240d..bc2ab28921 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { @@ -23,7 +22,6 @@ class PublishService extends OperationService { OPERATION_ID_STATUS.PUBLISH.PUBLISH_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData, errorMessage = null) { @@ -79,6 +77,7 @@ class PublishService extends OperationService { `[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` + `datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`, ); + await this.repositoryModuleManager.updateMinAcksReached(operationId, true); const cachedData = (await this.operationIdService.getCachedOperationIdData(operationId)) || null; await this.markOperationAsCompleted( @@ -88,7 +87,6 @@ class PublishService extends OperationService { this.completedStatuses, { reuseExistingCache: true }, ); - await this.repositoryModuleManager.updateMinAcksReached(operationId, true); this.logResponsesSummary(completedNumber, failedNumber); } // 2.2 Otherwise, mark as failed diff --git a/src/service/update-service.js b/src/service/update-service.js index 0ee39c6e1d..55c0f1af49 100644 --- a/src/service/update-service.js +++ b/src/service/update-service.js @@ -1,4 +1,3 @@ -import { Mutex } from 'async-mutex'; import OperationService from './operation-service.js'; import { @@ -22,7 +21,6 @@ class UpdateService extends OperationService { OPERATION_ID_STATUS.UPDATE.UPDATE_END, OPERATION_ID_STATUS.COMPLETED, ]; - this.operationMutex = new Mutex(); } async processResponse(command, responseStatus, responseData, errorMessage = null) {