feat: Cognitive Load Balancer - intelligent task distribution (ORCH-6) #571
nikolasdehor wants to merge 5 commits into SynkraAI:main
Conversation
@nikolasdehor is attempting to deploy a commit to the Pedro Valério Lopez's projects Team on Vercel. A member of the Team first needs to authorize it.
Walkthrough
Adds two new orchestration modules, CognitiveLoadBalancer and SwarmIntelligence, with event-driven task/agent orchestration, affinity-based assignment, throttling/rebalancing, and metrics persistence, plus comprehensive test suites and manifest/registry updates listing the new modules.
Changes
Sequence Diagram
sequenceDiagram
participant Client as Client
participant LB as LoadBalancer
participant Registry as AgentRegistry
participant Queue as Queue
participant Agent as Agent
participant Metrics as Metrics
Client->>LB: submitTask(task)
activate LB
LB->>Registry: getOptimalAgent(task)
activate Registry
Registry->>Registry: calculateAffinityScore(task,agents)
Registry-->>LB: optimalAgent
deactivate Registry
alt Agent Available
LB->>Agent: assignTask(task)
activate Agent
Agent->>Agent: incrementLoad()
Agent-->>LB: taskAssigned
deactivate Agent
LB->>Metrics: recordAssignment(agent,task)
LB-->>Client: taskAssigned
else Overloaded / Overflow
LB->>Queue: handleOverflow(task)
LB->>LB: applyThrottlePolicy()
alt queue-when-full
Queue->>Queue: enqueue(task)
LB-->>Client: taskQueued
else reject-when-full
LB-->>Client: taskRejected
else spillover
LB->>Registry: getLeastLoadedAgent()
LB->>Agent: forceAssign(task)
end
end
deactivate LB
Agent->>LB: completeTask(taskId)
activate LB
LB->>Agent: releaseCapacity()
LB->>Metrics: updateMetrics(success)
LB->>Queue: processQueue()
LB->>LB: rebalance()
deactivate LB
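The getOptimalAgent step in the diagram relies on the affinity weights declared in the module (specialty match 40%, inverse load 30%, processing speed 20%, success rate 10%). A minimal standalone sketch of that scoring, assuming each component is normalized to [0, 1] before weighting; the helper and its internals are illustrative, not the module's actual implementation:

```javascript
// Affinity weights as declared in the module's AFFINITY_WEIGHTS constant.
const WEIGHTS = { SPECIALTY: 0.4, LOAD_INVERSE: 0.3, SPEED: 0.2, SUCCESS_RATE: 0.1 };

// Score one agent for one task; higher is better.
function affinityScore(agent, task) {
  const required = task.requiredSpecialties ?? [];
  const matched = required.filter((s) => agent.specialties.includes(s)).length;
  const specialty = required.length === 0 ? 1 : matched / required.length;

  const loadInverse = 1 - Math.min(1, agent.currentLoad / agent.maxLoad);
  const speed = Math.min(1, agent.processingSpeed); // multiplier capped at 1 for scoring
  const attempts = agent.completedCount + agent.failedCount;
  const successRate = attempts === 0 ? 1 : agent.completedCount / attempts;

  return (
    WEIGHTS.SPECIALTY * specialty +
    WEIGHTS.LOAD_INVERSE * loadInverse +
    WEIGHTS.SPEED * speed +
    WEIGHTS.SUCCESS_RATE * successRate
  );
}

const agentA = { specialties: ['nlp'], currentLoad: 50, maxLoad: 100, processingSpeed: 1.0, completedCount: 9, failedCount: 1 };
const agentB = { specialties: [], currentLoad: 10, maxLoad: 100, processingSpeed: 1.0, completedCount: 10, failedCount: 0 };
const task = { requiredSpecialties: ['nlp'] };

// Specialty carries the largest weight, so agentA wins despite its higher load.
console.log(affinityScore(agentA, task) > affinityScore(agentB, task)); // true
```

With these numbers agentA scores 0.84 against agentB's 0.57, which is why the diagram routes through affinity scoring before falling back to overflow handling.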
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 7
🧹 Nitpick comments (2)
tests/core/orchestration/cognitive-load-balancer.test.js (1)
1030-1071: Add one export smoke test for the .aios-core compatibility path.
The PR promises a backward-compatible copy, but this suite only exercises .aiox-core. A single assertion against the compatibility path would catch drift immediately.
As per coding guidelines, .aiox-core/core/**: Ensure backwards compatibility — core modules are consumed by all agents. Verify test coverage exists for new/modified functions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/core/orchestration/cognitive-load-balancer.test.js` around lines 1030-1071, add a smoke test that imports the backward-compatibility entry (the ".aios-core" compatibility path) and asserts it exposes the same core exports as the main module; specifically require the compatibility path and check that CognitiveLoadBalancer (and that compatibility.CognitiveLoadBalancer === CognitiveLoadBalancer), AgentStatus, TaskStatus, TaskPriority, ThrottlePolicy, AFFINITY_WEIGHTS and OVERLOAD_THRESHOLD are defined/equal to the expected values used in the existing tests so drift is caught early.
.aios-core/core/orchestration/cognitive-load-balancer.js (1)
1-934: Prefer a thin compatibility wrapper here.
Keeping a second 900-line implementation means every scheduler bugfix now has to land twice. A wrapper around the canonical .aiox-core module preserves backward compatibility without the drift risk.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.aios-core/core/orchestration/cognitive-load-balancer.js around lines 1-934, this file duplicates a large implementation; replace it with a thin compatibility wrapper that imports the canonical implementation and re-exports the same symbols to avoid duplication and drift: require the canonical module (the one in .aiox-core), grab and re-export CognitiveLoadBalancer, AgentStatus, TaskStatus, TaskPriority, ThrottlePolicy, AFFINITY_WEIGHTS and OVERLOAD_THRESHOLD so existing consumers using CognitiveLoadBalancer, module.exports.CognitiveLoadBalancer, and the named exports continue to work; keep the file as a small shim that forwards the constructor/class and constants and removes the 900-line implementation, preserving emitted event/API behavior by delegating to the canonical class (e.g., module.exports = canonical.CognitiveLoadBalancer; module.exports.CognitiveLoadBalancer = canonical.CognitiveLoadBalancer; module.exports.AgentStatus = canonical.AgentStatus; etc.).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.aiox-core/core/orchestration/cognitive-load-balancer.js:
- Around line 312-338: Prevent state transitions on terminal or
already-processed tasks by adding checks at the start of assignTask,
completeTask, and failTask to verify the task's current TaskStatus (via
this.tasks.get(taskId).status) and bail with an error or no-op if the task is in
a terminal state (e.g., TaskStatus.COMPLETED or TaskStatus.FAILED) or already
assigned/processed; only mutate metrics like this.activeTasks and call helpers
(_assignTaskToAgent, _removeTaskFromAgent, _completeTaskMetrics,
_failTaskMetrics or similar) after passing these checks to avoid double-counting
and keep the task.record.status and activeTasks in sync.
- Around line 210-214: When registering a new agent in the block that calls
createAgentProfile and this.agents.set, invoke the queue processor so waiting
tasks get scheduled immediately; add a call to this._processQueue() right after
this.emit('agent:registered', { agentId, profile: agentProfile }) and before
return agentProfile. If _processQueue is async, await this._processQueue();
otherwise call it directly; ensure you reference the existing methods
createAgentProfile, this.agents.set, the 'agent:registered' emit, and
_processQueue so the queue drains as soon as capacity registers.
- Around line 96-102: The createAgentProfile and other public API entry points
must validate external inputs before normalizing them: ensure agentId is a
non-empty string, overrides is an object, and individual overrides (maxLoad,
specialties, processingSpeed, throttlePolicy, complexity) are of expected
types/values (e.g., maxLoad and processingSpeed are finite numbers, specialties
is an array, throttlePolicy and complexity are limited to allowed enum values)
and fall back to DEFAULT_* only after validation; add explicit guards in
createAgentProfile and the public methods that call it (e.g., registerAgent and
any complexity/throttle setters referenced around lines 117-123 and 163-169) to
coerce or reject invalid inputs and prevent NaN/invalid modes by normalizing or
throwing a clear error.
- Around line 270-272: When adding a new task (created by createTask) do not
overwrite an existing entry in this.tasks: check this.tasks.has(task.id) and
reject (throw or return an error/validation failure) if the ID already exists so
the original task object remains the canonical instance referenced by queue and
activeTasks; only call this.tasks.set(task.id, task) and increment
this.metrics.totalSubmitted when the ID is absent. Ensure the rejection path
does not change metrics or mutate queue/activeTasks and surfaces a clear
validation error to callers.
- Around line 378-379: completeTask() and failTask() call the async
_persistMetrics() without awaiting it, causing concurrent persistence races and
an empty catch block silently swallowing errors; fix by ensuring
_persistMetrics() calls are awaited and serialized (e.g., add a simple in-memory
mutex or promise-queue around _persistMetrics()) so writes cannot interleave,
remove/replace the empty catch block around the persistence call and add robust
error handling that logs contextual information (include the function name, task
id/snapshot id and the error) or rethrows as appropriate; update references in
completeTask(), failTask(), and the _persistMetrics() implementation to use the
new serialization primitive and logging so persistence failures are visible.
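The serialization primitive this item asks for can be a per-instance promise chain: each call appends its write behind the previous one, and errors are logged rather than swallowed. A sketch, where `writeSnapshot` is a timer-based stand-in for the real JSON file write:

```javascript
const order = [];

// Stand-in for the real write; resolves on a timer so that, without the
// chain, a later short write could overtake an earlier long one.
function writeSnapshot(label, ms) {
  return new Promise((resolve) => setTimeout(() => { order.push(label); resolve(); }, ms));
}

class MetricsPersister {
  constructor() {
    this._persistPromise = Promise.resolve();
  }

  // Chain each write behind the previous one so snapshots never interleave;
  // a failure is logged with context and does not break the chain.
  persist(label, ms) {
    this._persistPromise = this._persistPromise
      .then(() => writeSnapshot(label, ms))
      .catch((err) => console.error(`persist failed for ${label}:`, err));
    return this._persistPromise;
  }
}

const p = new MetricsPersister();
p.persist('first', 30);  // slow write issued first
p.persist('second', 1);  // fast write issued second
p.persist('third', 1).then(() => console.log(order.join(','))); // first,second,third
```

Returning the chained promise also lets completeTask()/failTask() await persistence when they need to, which addresses the unhandled-rejection concern in the same change.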
- Around line 737-744: Before calling this._updateAgentStatus(agent) capture the
agent's prior status (e.g., const prevStatus = agent.status), then call
this._updateAgentStatus(agent) and compare prevStatus to agent.status; only call
this.emit('agent:overloaded', { agentId: agent.id, load: loadPct }) when
agent.status has transitioned to OVERLOADED (and compute loadPct as you already
do), and only call this.emit('agent:available', { agentId: agent.id }) when
agent.status has transitioned to AVAILABLE. Apply this change in every place
that currently calls _updateAgentStatus(agent) followed by immediate emits (the
three spots around the existing overload/available emit logic) so events are
emitted only on actual status transitions.
In `@tests/core/orchestration/cognitive-load-balancer.test.js`:
- Line 21: Replace the brittle deep-relative require for CognitiveLoadBalancer
in tests/core/orchestration/cognitive-load-balancer.test.js with the
repository's absolute import path used elsewhere (e.g.,
require('core/orchestration/cognitive-load-balancer') or the project's
configured module-alias equivalent) so the test imports CognitiveLoadBalancer by
its absolute module name instead of
'../../../.aiox-core/core/orchestration/cognitive-load-balancer'; update the
require call referencing the CognitiveLoadBalancer symbol accordingly and run
the test to ensure module resolution works with the repo's absolute import
configuration.
---
Nitpick comments:
In @.aios-core/core/orchestration/cognitive-load-balancer.js:
- Around line 1-934: This file duplicates a large implementation; replace it
with a thin compatibility wrapper that imports the canonical implementation and
re-exports the same symbols to avoid duplication and drift: require the
canonical module (the one in .aiox-core), grab and re-export
CognitiveLoadBalancer, AgentStatus, TaskStatus, TaskPriority, ThrottlePolicy,
AFFINITY_WEIGHTS and OVERLOAD_THRESHOLD so existing consumers using
CognitiveLoadBalancer, module.exports.CognitiveLoadBalancer, and the named
exports continue to work; keep the file as a small shim that forwards
constructor/class and constants and removes the 900-line implementation,
preserving emitted event/API behavior by delegating to the canonical class
(e.g., module.exports = canonical.CognitiveLoadBalancer;
module.exports.CognitiveLoadBalancer = canonical.CognitiveLoadBalancer;
module.exports.AgentStatus = canonical.AgentStatus; etc.).
In `@tests/core/orchestration/cognitive-load-balancer.test.js`:
- Around line 1030-1071: Add a smoke test that imports the
backward-compatibility entry (the ".aios-core" compatibility path) and asserts
it exposes the same core exports as the main module; specifically require the
compatibility path and check that CognitiveLoadBalancer (and that
compatibility.CognitiveLoadBalancer === CognitiveLoadBalancer), AgentStatus,
TaskStatus, TaskPriority, ThrottlePolicy, AFFINITY_WEIGHTS and
OVERLOAD_THRESHOLD are defined/equal to the expected values used in the existing
tests so drift is caught early.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b88b9406-79c6-422a-a849-d2ca1832abd7
📒 Files selected for processing (4)
- .aios-core/core/orchestration/cognitive-load-balancer.js
- .aiox-core/core/orchestration/cognitive-load-balancer.js
- .aiox-core/install-manifest.yaml
- tests/core/orchestration/cognitive-load-balancer.test.js
const fs = require('fs');
const os = require('os');

const CognitiveLoadBalancer = require('../../../.aiox-core/core/orchestration/cognitive-load-balancer');
Use the repository's absolute import style in this test.
This deep relative hop is brittle and violates the JS import convention used by the repo.
As per coding guidelines, **/*.{js,jsx,ts,tsx}: Use absolute imports instead of relative imports in all code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/core/orchestration/cognitive-load-balancer.test.js` at line 21, Replace
the brittle deep-relative require for CognitiveLoadBalancer in
tests/core/orchestration/cognitive-load-balancer.test.js with the repository's
absolute import path used elsewhere (e.g.,
require('core/orchestration/cognitive-load-balancer') or the project's
configured module-alias equivalent) so the test imports CognitiveLoadBalancer by
its absolute module name instead of
'../../../.aiox-core/core/orchestration/cognitive-load-balancer'; update the
require call referencing the CognitiveLoadBalancer symbol accordingly and run
the test to ensure module resolution works with the repo's absolute import
configuration.
Actionable comments posted: 9
♻️ Duplicate comments (1)
.aiox-core/core/orchestration/cognitive-load-balancer.js (1)
960-972: ⚠️ Potential issue | 🟠 Major: Serialize _persistMetrics() itself, not just the callers.
Awaiting the call sites helps, but parallel completeTask()/failTask() invocations can still enter _persistMetrics() together and race on the same JSON file. A small per-instance promise chain or mutex here would keep snapshots ordered.
As per coding guidelines, .aiox-core/core/**: Validate async/await patterns and ensure no unhandled promise rejections.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 960-972, concurrent calls to _persistMetrics can race writing the same JSON file; serialize _persistMetrics per instance by introducing a per-instance promise chain or mutex (e.g. this._persistPromise) and chain the write work onto it so each invocation waits for the previous write to finish; in _persistMetrics (and where callers like completeTask/failTask call it) set this._persistPromise = (this._persistPromise || Promise.resolve()).then(() => { /* perform mkdir/write and return result */ }).catch(err => { /* log and swallow to prevent unhandled rejections */ }); ensure the function returns the chained promise so callers can await completion and errors are caught and logged with context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.aios-core/core/orchestration/cognitive-load-balancer.js:
- Around line 1-987: Replace this duplicated full implementation with a thin
re-export: delete the implementation body in cognitive-load-balancer.js and
require/import the canonical module (the single source of truth) and re-export
its default and named exports (CognitiveLoadBalancer, AgentStatus, TaskStatus,
TaskPriority, ThrottlePolicy, AFFINITY_WEIGHTS, OVERLOAD_THRESHOLD, etc.) so
callers using either entrypoint get the exact same module object; ensure the
module.exports shape matches the current exports (default export and the named
properties) and do not keep any duplicate logic or helper functions in this
file.
In @.aiox-core/core/orchestration/cognitive-load-balancer.js:
- Around line 216-223: The current registration unconditionally overwrites
runtime state by calling createAgentProfile and this.agents.set(agentId,
agentProfile); change this so duplicate registrations either get rejected or
merge into the existing runtime record: check this.agents.has(agentId) inside
the registration flow, and if an entry exists merge incoming profile fields into
the existing agent object rather than replacing it (preserve existing
activeTasks, currentLoad, and any counter properties), or alternatively
throw/emit an error to reject duplicate registrations; after merging or
accepting, still emit('agent:registered', { agentId, profile: existingAgent })
and call this._processQueue() so queued tasks are handled with the preserved
runtime state.
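A sketch of the merge-on-duplicate registration this item describes. The field names follow the profile shape shown in the diff below, and the merge policy here is one of the two options the review offers (the alternative is to reject duplicates outright):

```javascript
const agents = new Map();

// Illustrative registration that merges configuration on re-registration
// instead of replacing the whole runtime record.
function registerAgent(agentId, profile = {}) {
  const existing = agents.get(agentId);
  if (existing) {
    // Merge configuration fields only; runtime state (activeTasks,
    // currentLoad, counters) is preserved rather than reset.
    if (profile.maxLoad !== undefined) existing.maxLoad = profile.maxLoad;
    if (profile.specialties !== undefined) existing.specialties = profile.specialties;
    if (profile.processingSpeed !== undefined) existing.processingSpeed = profile.processingSpeed;
    return existing;
  }
  const fresh = {
    id: agentId,
    maxLoad: profile.maxLoad ?? 100,
    specialties: profile.specialties ?? [],
    processingSpeed: profile.processingSpeed ?? 1.0,
    currentLoad: 0,
    activeTasks: [],
    completedCount: 0,
  };
  agents.set(agentId, fresh);
  return fresh;
}

const first = registerAgent('a1', { maxLoad: 50 });
first.currentLoad = 30;                              // runtime state accrues
const again = registerAgent('a1', { maxLoad: 80 });  // re-registration
console.log(again === first, again.maxLoad, again.currentLoad); // true 80 30
```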
- Around line 873-887: The SPILLOVER case and default branch push task.id into
this.queue unconditionally; change both to enforce the queue cap by checking
this.queue.length >= this.maxQueueSize before pushing. If the queue is not full,
push and return the existing queued response; if it is full, do not push and
return a rejection/overflow response (e.g., { taskId: task.id, assignedTo: null,
status: TaskStatus.REJECTED } or introduce a suitable TaskStatus constant) so
callers know the task was refused. Update the branches that call
this.queue.push(task.id) (referenced symbols: ThrottlePolicy.SPILLOVER,
this.queue, this.maxQueueSize, TaskStatus, _assignTaskToAgent,
_findLeastLoadedAgent) accordingly.
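The cap check this item asks for is a single guard ahead of the push. A minimal sketch, assuming a `rejected` status string where the review suggests introducing a suitable TaskStatus constant:

```javascript
const maxQueueSize = 2;
const queue = [];

// Enforce the cap in the spillover/default branches: push only while there
// is room, otherwise surface an explicit rejection to the caller.
function handleOverflow(task) {
  if (queue.length >= maxQueueSize) {
    return { taskId: task.id, assignedTo: null, status: 'rejected' };
  }
  queue.push(task.id);
  return { taskId: task.id, assignedTo: null, status: 'queued' };
}

console.log(handleOverflow({ id: 't1' }).status); // queued
console.log(handleOverflow({ id: 't2' }).status); // queued
console.log(handleOverflow({ id: 't3' }).status); // rejected (cap of 2 reached)
```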
- Around line 778-780: The current _assignTaskToAgent() unconditionally
overwrites task.startedAt when assigning, which resets timers for in-flight
tasks; change _assignTaskToAgent (used by rebalance() and manual reassignment
flows) to only set task.startedAt if it is not already defined (e.g., if
task.startedAt is null/undefined), leaving existing startedAt values intact when
moving an existing task and only initializing startedAt for brand-new
assignments.
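The fix this item describes is a one-line nullish guard on the timestamp. A sketch (the helper name is illustrative; in the module this would live inside _assignTaskToAgent):

```javascript
// Initialize startedAt only for brand-new assignments; a task moved by
// rebalance() or manual reassignment keeps its original timer.
function assignStart(task, now = Date.now()) {
  task.startedAt = task.startedAt ?? now;
  return task.startedAt;
}

const task = { id: 't1', startedAt: null };
const t0 = assignStart(task, 1000); // first assignment stamps the clock
const t1 = assignStart(task, 2000); // reassignment preserves it
console.log(t0, t1); // 1000 1000
```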
In @.aiox-core/core/orchestration/swarm-intelligence.js:
- Around line 880-891: The _saveToDisk method writes JSON directly to
this._persistPath which can leave the file truncated and cause loadFromDisk to
read partial JSON; change _saveToDisk to write the JSON to a temporary file in
the same directory (e.g., `${this._persistPath}.tmp.${pid || uniqueId}`),
fs.writeFile the temp file fully, then fs.rename the temp into this._persistPath
to atomically replace it; ensure the temp file is written with the same encoding
and handle/propagate errors so the temp file won’t be left as the active
snapshot.
In @.aiox-core/data/entity-registry.yaml:
- Around line 3-4: Do not hand-edit .aiox-core/data/entity-registry.yaml;
instead run the project’s entity-registry generator (the tool that scans
.aiox-core/core/**/*.js) to regenerate the registry so the new file
.aiox-core/core/orchestration/cognitive-load-balancer.js is discovered and the
manifest/hash/entityCount are updated; ensure the regenerated registry includes
both orchestration modules and commit the generated artifact rather than a
manual change to entity-registry.yaml.
- Line 12785: The purpose field currently contains a template fragment
("proposal.description,") for the swarm-intelligence module; replace that
placeholder with the module's actual descriptive text (a plain YAML string) —
remove the trailing comma and template token, and populate purpose with a
concise free-text description sourced from the module's manifest/README (e.g., a
one- or two-sentence summary of swarm-intelligence functionality) so registry
discovery and matching work correctly.
In `@tests/core/orchestration/swarm-intelligence.test.js`:
- Line 11: The test imports SwarmIntelligence using a brittle deep relative
path; replace the require call that assigns SwarmIntelligence (currently
require('../../../.aiox-core/core/orchestration/swarm-intelligence')) with the
repo's canonical absolute module import (e.g.,
require('aiox-core/core/orchestration/swarm-intelligence')) so tests follow the
project's absolute-import convention and avoid deep relative hops.
- Around line 945-955: The test is racing because it calls _saveToDisk() while
earlier calls scheduled via _persistAsync() may still be pending; before calling
_saveToDisk() ensure the queued persistence chain completes by awaiting the
internal persistence promise (e.g., if persisted._persistChain exists await
persisted._persistChain; otherwise invoke and await persisted._persistAsync()).
In short, after createSwarm/joinSwarm/shareKnowledge, wait for the pending
persistence chain (await persisted._persistChain || await
persisted._persistAsync()) before calling _saveToDisk().
---
Duplicate comments:
In @.aiox-core/core/orchestration/cognitive-load-balancer.js:
- Around line 960-972: Concurrent calls to _persistMetrics can race writing the
same JSON file; serialize _persistMetrics per instance by introducing a
per-instance promise chain or mutex (e.g. this._persistPromise) and chain the
write work onto it so each invocation waits for the previous write to finish; in
_persistMetrics (and where callers like completeTask/failTask call it) set
this._persistPromise = (this._persistPromise || Promise.resolve()).then(() => {
/* perform mkdir/write and return result*/ }).catch(err => { /* log and swallow
to prevent unhandled rejections */ }); ensure the function returns the chained
promise so callers can await completion and errors are caught and logged with
context.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9e6feb97-8035-46d0-a989-0ec49f6e5782
📒 Files selected for processing (8)
- .aios-core/core/orchestration/cognitive-load-balancer.js
- .aios-core/core/orchestration/swarm-intelligence.js
- .aiox-core/core/orchestration/cognitive-load-balancer.js
- .aiox-core/core/orchestration/swarm-intelligence.js
- .aiox-core/data/entity-registry.yaml
- .aiox-core/install-manifest.yaml
- tests/core/orchestration/cognitive-load-balancer.test.js
- tests/core/orchestration/swarm-intelligence.test.js
✅ Files skipped from review due to trivial changes (1)
- .aios-core/core/orchestration/swarm-intelligence.js
/**
 * Cognitive Load Balancer
 * Story ORCH-6 - Intelligent task distribution based on agent cognitive capacity
 * @module aiox-core/orchestration/cognitive-load-balancer
 * @version 1.0.0
 */

'use strict';

const fs = require('fs').promises;
const path = require('path');
const EventEmitter = require('events');

// ═══════════════════════════════════════════════════════════════════════════════════
// CONSTANTS
// ═══════════════════════════════════════════════════════════════════════════════════

const METRICS_FILENAME = 'load-balancer-metrics.json';
const METRICS_DIR = '.aiox';

/** Default max concurrent tasks per agent */
const DEFAULT_MAX_LOAD = 100;

/** Default processing speed multiplier */
const DEFAULT_PROCESSING_SPEED = 1.0;

/** Overload threshold percentage */
const OVERLOAD_THRESHOLD = 85;

/** Agent status enum */
const AgentStatus = {
  AVAILABLE: 'available',
  BUSY: 'busy',
  OVERLOADED: 'overloaded',
  OFFLINE: 'offline',
};

/** Task status enum */
const TaskStatus = {
  QUEUED: 'queued',
  ASSIGNED: 'assigned',
  IN_PROGRESS: 'in-progress',
  COMPLETED: 'completed',
  FAILED: 'failed',
};

/** Task priority enum */
const TaskPriority = {
  LOW: 'low',
  NORMAL: 'normal',
  HIGH: 'high',
  CRITICAL: 'critical',
};

/** Priority weight for scoring */
const PRIORITY_WEIGHTS = {
  [TaskPriority.LOW]: 1,
  [TaskPriority.NORMAL]: 2,
  [TaskPriority.HIGH]: 4,
  [TaskPriority.CRITICAL]: 8,
};

/** Throttle policies */
const ThrottlePolicy = {
  QUEUE_WHEN_FULL: 'queue-when-full',
  REJECT_WHEN_FULL: 'reject-when-full',
  SPILLOVER: 'spillover',
};

/** Affinity weight distribution */
const AFFINITY_WEIGHTS = {
  SPECIALTY: 0.4,
  LOAD_INVERSE: 0.3,
  SPEED: 0.2,
  SUCCESS_RATE: 0.1,
};

// ═══════════════════════════════════════════════════════════════════════════════════
// HELPER FUNCTIONS
// ═══════════════════════════════════════════════════════════════════════════════════

/**
 * Generate a unique task ID
 * @returns {string} Unique task ID
 */
function generateTaskId() {
  return `task-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`;
}

/**
 * Create a default agent profile
 * @param {string} agentId - Agent identifier
 * @param {Object} overrides - Profile overrides
 * @returns {Object} Complete agent profile
 */
function createAgentProfile(agentId, overrides = {}) {
  return {
    id: agentId,
    maxLoad: overrides.maxLoad ?? DEFAULT_MAX_LOAD,
    currentLoad: 0,
    specialties: overrides.specialties ?? [],
    processingSpeed: overrides.processingSpeed ?? DEFAULT_PROCESSING_SPEED,
    activeTasks: [],
    completedCount: 0,
    failedCount: 0,
    totalCompletionTime: 0,
    avgCompletionTime: 0,
    status: AgentStatus.AVAILABLE,
  };
}

/**
 * Create a task object
 * @param {Object} taskInput - Task input
 * @returns {Object} Normalized task object
 */
function createTask(taskInput) {
  return {
    id: taskInput.id ?? generateTaskId(),
    type: taskInput.type ?? 'general',
    priority: taskInput.priority ?? TaskPriority.NORMAL,
    complexity: Math.min(10, Math.max(1, taskInput.complexity ?? 5)),
    requiredSpecialties: taskInput.requiredSpecialties ?? [],
    assignedTo: null,
    status: TaskStatus.QUEUED,
    submittedAt: Date.now(),
    startedAt: null,
    completedAt: null,
    result: null,
    error: null,
  };
}

// ═══════════════════════════════════════════════════════════════════════════════════
// COGNITIVE LOAD BALANCER CLASS
// ═══════════════════════════════════════════════════════════════════════════════════

/**
 * CognitiveLoadBalancer - Intelligent task distribution based on cognitive capacity
 *
 * Distributes tasks across agents using an affinity scoring algorithm:
 * - Specialty match (40%) - How well agent specialties align with task requirements
 * - Load inverse (30%) - Agents with less load score higher
 * - Processing speed (20%) - Faster agents score higher
 * - Success rate (10%) - Agents with better track records score higher
 *
 * Supports throttle policies for overload scenarios:
 * - queue-when-full: Tasks wait in queue when all agents are at capacity
 * - reject-when-full: Tasks are rejected when no agent can accept them
 * - spillover: Tasks assigned to least-loaded agent regardless of capacity
 *
 * @extends EventEmitter
 */
class CognitiveLoadBalancer extends EventEmitter {
  /**
   * Creates a new CognitiveLoadBalancer instance
   * @param {Object} [options={}] - Configuration options
   * @param {string} [options.projectRoot] - Project root for metrics persistence
   * @param {string} [options.throttlePolicy='queue-when-full'] - Default throttle policy
   * @param {number} [options.maxQueueSize=1000] - Maximum queue size
   * @param {boolean} [options.persistMetrics=true] - Whether to persist metrics to disk
   */
  constructor(options = {}) {
    super();

    this.projectRoot = options.projectRoot ?? process.cwd();
    this.throttlePolicy = options.throttlePolicy ?? ThrottlePolicy.QUEUE_WHEN_FULL;
    this.maxQueueSize = options.maxQueueSize ?? 1000;
    this.persistMetrics = options.persistMetrics ?? true;

    /** @type {Map<string, Object>} Registered agents */
    this.agents = new Map();

    /** @type {Map<string, Object>} All tasks (active + completed) */
    this.tasks = new Map();

    /** @type {Array<string>} Task queue (task IDs in order) */
    this.queue = [];

    /** @type {Object} Runtime metrics */
    this.metrics = {
      totalSubmitted: 0,
      totalCompleted: 0,
      totalFailed: 0,
      totalRejected: 0,
      totalRebalanced: 0,
      startTime: Date.now(),
    };
  }

  // ═══════════════════════════════════════════════════════════════════════════════
  // AGENT MANAGEMENT
  // ═══════════════════════════════════════════════════════════════════════════════

  /**
   * Register an agent with a cognitive profile
   * @param {string} agentId - Unique agent identifier
   * @param {Object} [profile={}] - Agent cognitive profile
   * @param {number} [profile.maxLoad=100] - Maximum cognitive load capacity
   * @param {string[]} [profile.specialties=[]] - List of specialties
   * @param {number} [profile.processingSpeed=1.0] - Processing speed multiplier
   * @returns {Object} Registered agent profile
   * @throws {Error} If agentId is not a non-empty string
   * @throws {Error} If profile is provided but is not an object
   */
  registerAgent(agentId, profile = {}) {
    if (!agentId || typeof agentId !== 'string') {
      throw new Error('agentId must be a non-empty string');
    }

    // Validate profile overrides must be object or undefined
    if (profile !== undefined && profile !== null && typeof profile !== 'object') {
      throw new Error('profile must be an object or undefined');
    }

    const agentProfile = createAgentProfile(agentId, profile || {});
    this.agents.set(agentId, agentProfile);

    this.emit('agent:registered', { agentId, profile: agentProfile });

    // Process queue when new agent registers so queued tasks get assigned
    this._processQueue();

    return agentProfile;
  }

  /**
   * Unregister an agent and redistribute its tasks
   * @param {string} agentId - Agent to unregister
   * @returns {string[]} IDs of tasks that were reassigned or queued
   * @throws {Error} If agent is not found
   */
  unregisterAgent(agentId) {
    const agent = this.agents.get(agentId);
    if (!agent) {
      throw new Error(`Agent '${agentId}' not found`);
    }

    const orphanedTaskIds = [...agent.activeTasks];

    // Re-queue active tasks
    for (const taskId of orphanedTaskIds) {
      const task = this.tasks.get(taskId);
      if (task) {
        task.assignedTo = null;
        task.status = TaskStatus.QUEUED;
        task.startedAt = null;
        this.queue.unshift(taskId);
      }
    }

    this.agents.delete(agentId);
    this.emit('agent:unregistered', { agentId, orphanedTasks: orphanedTaskIds });

    // Try to process queue after unregistration
    this._processQueue();

    return orphanedTaskIds;
  }

  // ═══════════════════════════════════════════════════════════════════════════════
  // TASK SUBMISSION
  // ═══════════════════════════════════════════════════════════════════════════════

  /**
   * Submit a task for automatic distribution
   * @param {Object} taskInput - Task to submit
   * @param {string} [taskInput.id] - Task ID (auto-generated if omitted)
   * @param {string} [taskInput.type='general'] - Task type
   * @param {string} [taskInput.priority='normal'] - Priority level
   * @param {number} [taskInput.complexity=5] - Complexity 1-10
   * @param {string[]} [taskInput.requiredSpecialties=[]] - Required specialties
   * @returns {Object} Submission result with taskId and assignedTo
   * @throws {Error} If task input is not a non-null object
   * @throws {Error} If task with same ID already exists
   */
  submitTask(taskInput) {
    if (!taskInput || typeof taskInput !== 'object') {
      throw new Error('Task must be a non-null object');
    }

    const task = createTask(taskInput);

    // Reject duplicate task IDs
    if (this.tasks.has(task.id)) {
      throw new Error(`Task '${task.id}' already exists`);
    }

    this.tasks.set(task.id, task);
    this.metrics.totalSubmitted++;

    this.emit('task:submitted', { taskId: task.id, task });

    // Critical priority tasks bypass queue
    if (task.priority === TaskPriority.CRITICAL) {
      const agent = this._findOptimalAgent(task);
      if (agent) {
        this._assignTaskToAgent(task, agent);
        return { taskId: task.id, assignedTo: agent.id, status: TaskStatus.ASSIGNED };
      }
      // Even critical tasks can be queued if using queue policy
      if (this.throttlePolicy === ThrottlePolicy.REJECT_WHEN_FULL) {
        task.status = TaskStatus.FAILED;
        task.error = 'No available agent for critical task';
        this.metrics.totalRejected++;
        this.emit('task:failed', { taskId: task.id, error: task.error });
        return { taskId: task.id, assignedTo: null, status: TaskStatus.FAILED };
      }
    }

    // Try to find an optimal agent
    const optimalAgent = this._findOptimalAgent(task);

    if (optimalAgent) {
      this._assignTaskToAgent(task, optimalAgent);
      return { taskId: task.id, assignedTo: optimalAgent.id, status: TaskStatus.ASSIGNED };
    }

    // Handle overflow based on throttle policy
    return this._handleOverflow(task);
  }

  /**
   * Manually assign a task to a specific agent
   * @param {string} taskId - Task to assign
   * @param {string} agentId - Target agent
   * @returns {Object} Assignment result
   * @throws {Error} If task or agent not found
   * @throws {Error} If task is already completed or failed
   */
  assignTask(taskId, agentId) {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task '${taskId}' not found`);
    }

    // Enforce task state transitions - cannot assign completed/failed tasks
| if (task.status === TaskStatus.COMPLETED) { | ||
| throw new Error(`Task '${taskId}' is already completed`); | ||
| } | ||
| if (task.status === TaskStatus.FAILED) { | ||
| throw new Error(`Task '${taskId}' is already failed`); | ||
| } | ||
|
|
||
| const agent = this.agents.get(agentId); | ||
| if (!agent) { | ||
| throw new Error(`Agent '${agentId}' not found`); | ||
| } | ||
|
|
||
| // If task was in queue, remove it | ||
| const queueIndex = this.queue.indexOf(taskId); | ||
| if (queueIndex !== -1) { | ||
| this.queue.splice(queueIndex, 1); | ||
| } | ||
|
|
||
| // If task was assigned to another agent, remove it | ||
| if (task.assignedTo) { | ||
| const prevAgent = this.agents.get(task.assignedTo); | ||
| if (prevAgent) { | ||
| this._removeTaskFromAgent(prevAgent, taskId); | ||
| } | ||
| } | ||
|
|
||
| this._assignTaskToAgent(task, agent); | ||
| return { taskId, assignedTo: agentId, status: TaskStatus.ASSIGNED }; | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
| // TASK COMPLETION | ||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| /** | ||
| * Mark a task as completed and free capacity | ||
| * @param {string} taskId - Task to complete | ||
| * @param {*} [result=null] - Task result | ||
| * @returns {Object} Completion info | ||
| * @throws {Error} If task not found | ||
| * @throws {Error} If task is already completed or failed | ||
| */ | ||
| async completeTask(taskId, result = null) { | ||
| const task = this.tasks.get(taskId); | ||
| if (!task) { | ||
| throw new Error(`Task '${taskId}' not found`); | ||
| } | ||
|
|
||
| // Enforce task state transitions | ||
| if (task.status === TaskStatus.COMPLETED) { | ||
| throw new Error(`Task '${taskId}' is already completed`); | ||
| } | ||
| if (task.status === TaskStatus.FAILED) { | ||
| throw new Error(`Task '${taskId}' is already failed`); | ||
| } | ||
|
|
||
| task.status = TaskStatus.COMPLETED; | ||
| task.completedAt = Date.now(); | ||
| task.result = result; | ||
|
|
||
| const agent = task.assignedTo ? this.agents.get(task.assignedTo) : null; | ||
| if (agent) { | ||
| this._removeTaskFromAgent(agent, taskId); | ||
| agent.completedCount++; | ||
| const completionTime = task.completedAt - (task.startedAt ?? task.submittedAt); | ||
| agent.totalCompletionTime += completionTime; | ||
| agent.avgCompletionTime = agent.totalCompletionTime / agent.completedCount; | ||
| this._updateAgentStatus(agent); | ||
| } | ||
|
|
||
| this.metrics.totalCompleted++; | ||
| this.emit('task:completed', { taskId, result, agentId: task.assignedTo }); | ||
|
|
||
| // Try to process queue after freeing capacity | ||
| this._processQueue(); | ||
|
|
||
| // Await metrics persistence instead of fire-and-forget | ||
| await this._persistMetrics(); | ||
|
|
||
| return { | ||
| taskId, | ||
| agentId: task.assignedTo, | ||
| completionTime: task.completedAt - (task.startedAt ?? task.submittedAt), | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Mark a task as failed and free capacity | ||
| * @param {string} taskId - Task that failed | ||
| * @param {string|Error} [error='Unknown error'] - Error description | ||
| * @returns {Object} Failure info | ||
| * @throws {Error} If task not found | ||
| * @throws {Error} If task is already completed or failed | ||
| */ | ||
| async failTask(taskId, error = 'Unknown error') { | ||
| const task = this.tasks.get(taskId); | ||
| if (!task) { | ||
| throw new Error(`Task '${taskId}' not found`); | ||
| } | ||
|
|
||
| // Enforce task state transitions | ||
| if (task.status === TaskStatus.COMPLETED) { | ||
| throw new Error(`Task '${taskId}' is already completed`); | ||
| } | ||
| if (task.status === TaskStatus.FAILED) { | ||
| throw new Error(`Task '${taskId}' is already failed`); | ||
| } | ||
|
|
||
| const errorMessage = error instanceof Error ? error.message : String(error); | ||
| task.status = TaskStatus.FAILED; | ||
| task.completedAt = Date.now(); | ||
| task.error = errorMessage; | ||
|
|
||
| const agent = task.assignedTo ? this.agents.get(task.assignedTo) : null; | ||
| if (agent) { | ||
| this._removeTaskFromAgent(agent, taskId); | ||
| agent.failedCount++; | ||
| this._updateAgentStatus(agent); | ||
| } | ||
|
|
||
| this.metrics.totalFailed++; | ||
| this.emit('task:failed', { taskId, error: errorMessage, agentId: task.assignedTo }); | ||
|
|
||
| // Try to process queue after freeing capacity | ||
| this._processQueue(); | ||
|
|
||
| // Await metrics persistence instead of fire-and-forget | ||
| await this._persistMetrics(); | ||
|
|
||
| return { | ||
| taskId, | ||
| agentId: task.assignedTo, | ||
| error: errorMessage, | ||
| }; | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
| // QUERY METHODS | ||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| /** | ||
| * Get current load percentage for an agent (0-100%) | ||
| * @param {string} agentId - Agent to query | ||
| * @returns {number} Load percentage | ||
| * @throws {Error} If agent not found | ||
| */ | ||
| getAgentLoad(agentId) { | ||
| const agent = this.agents.get(agentId); | ||
| if (!agent) { | ||
| throw new Error(`Agent '${agentId}' not found`); | ||
| } | ||
|
|
||
| if (agent.maxLoad === 0) return 100; | ||
| return Math.min(100, (agent.currentLoad / agent.maxLoad) * 100); | ||
| } | ||
|
|
||
| /** | ||
| * Find the optimal agent for a task without assigning | ||
| * @param {Object} task - Task descriptor | ||
| * @returns {Object|null} Best agent info or null if none available | ||
| */ | ||
| getOptimalAgent(task) { | ||
| const normalizedTask = createTask(task); | ||
| const agent = this._findOptimalAgent(normalizedTask); | ||
|
|
||
| if (!agent) return null; | ||
|
|
||
| return { | ||
| agentId: agent.id, | ||
| currentLoad: this.getAgentLoad(agent.id), | ||
| affinityScore: this._calculateAffinityScore(agent, normalizedTask), | ||
| specialties: agent.specialties, | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Get the current task queue | ||
| * @returns {Object[]} Queued tasks with details | ||
| */ | ||
| getQueue() { | ||
| return this.queue.map((taskId) => { | ||
| const task = this.tasks.get(taskId); | ||
| return task ? { ...task } : null; | ||
| }).filter(Boolean); | ||
| } | ||
|
|
||
| /** | ||
| * Get comprehensive metrics | ||
| * @returns {Object} Metrics snapshot | ||
| */ | ||
| getMetrics() { | ||
| const agentUtilization = {}; | ||
| for (const [agentId, agent] of this.agents) { | ||
| agentUtilization[agentId] = { | ||
| load: agent.maxLoad > 0 ? (agent.currentLoad / agent.maxLoad) * 100 : 0, | ||
| activeTasks: agent.activeTasks.length, | ||
| completedCount: agent.completedCount, | ||
| failedCount: agent.failedCount, | ||
| avgCompletionTime: agent.avgCompletionTime, | ||
| successRate: this._getSuccessRate(agent), | ||
| status: agent.status, | ||
| }; | ||
| } | ||
|
|
||
| const uptime = Date.now() - this.metrics.startTime; | ||
| const throughput = uptime > 0 | ||
| ? (this.metrics.totalCompleted / (uptime / 1000)) * 60 | ||
| : 0; | ||
|
|
||
| return { | ||
| totalSubmitted: this.metrics.totalSubmitted, | ||
| totalCompleted: this.metrics.totalCompleted, | ||
| totalFailed: this.metrics.totalFailed, | ||
| totalRejected: this.metrics.totalRejected, | ||
| totalRebalanced: this.metrics.totalRebalanced, | ||
| queueLength: this.queue.length, | ||
| activeAgents: this.agents.size, | ||
| throughputPerMinute: Math.round(throughput * 100) / 100, | ||
| avgWaitTime: this._calculateAvgWaitTime(), | ||
| agentUtilization, | ||
| uptime, | ||
| }; | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
| // REBALANCING | ||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| /** | ||
| * Rebalance tasks from overloaded to underloaded agents | ||
| * @returns {Object} Rebalance summary | ||
| */ | ||
| rebalance() { | ||
| const movements = []; | ||
| const overloaded = []; | ||
| const underloaded = []; | ||
|
|
||
| // Categorize agents | ||
| for (const [, agent] of this.agents) { | ||
| if (agent.status === AgentStatus.OFFLINE) continue; | ||
|
|
||
| const loadPct = agent.maxLoad > 0 ? (agent.currentLoad / agent.maxLoad) * 100 : 100; | ||
| if (loadPct > OVERLOAD_THRESHOLD) { | ||
| overloaded.push(agent); | ||
| } else if (loadPct < 50) { | ||
| underloaded.push(agent); | ||
| } | ||
| } | ||
|
|
||
| if (overloaded.length === 0 || underloaded.length === 0) { | ||
| return { movements: [], overloadedCount: overloaded.length, underloadedCount: underloaded.length }; | ||
| } | ||
|
|
||
| // Sort underloaded by available capacity (descending) | ||
| underloaded.sort((a, b) => { | ||
| const capA = a.maxLoad - a.currentLoad; | ||
| const capB = b.maxLoad - b.currentLoad; | ||
| return capB - capA; | ||
| }); | ||
|
|
||
| // Move tasks from overloaded to underloaded | ||
| for (const source of overloaded) { | ||
| const tasksToMove = [...source.activeTasks]; | ||
|
|
||
| for (const taskId of tasksToMove) { | ||
| const task = this.tasks.get(taskId); | ||
| if (!task) continue; | ||
|
|
||
| // Find best underloaded target | ||
| const target = this._findBestRebalanceTarget(task, underloaded, source.id); | ||
| if (!target) continue; | ||
|
|
||
| // Check if source is still overloaded | ||
| const sourceLoad = source.maxLoad > 0 ? (source.currentLoad / source.maxLoad) * 100 : 100; | ||
| if (sourceLoad <= OVERLOAD_THRESHOLD) break; | ||
|
|
||
| // Move task | ||
| this._removeTaskFromAgent(source, taskId); | ||
| this._assignTaskToAgent(task, target); | ||
|
|
||
| movements.push({ | ||
| taskId, | ||
| from: source.id, | ||
| to: target.id, | ||
| }); | ||
|
|
||
| this.metrics.totalRebalanced++; | ||
| this.emit('task:rebalanced', { taskId, from: source.id, to: target.id }); | ||
| } | ||
| } | ||
|
|
||
| // Update all agent statuses | ||
| for (const [, agent] of this.agents) { | ||
| this._updateAgentStatus(agent); | ||
| } | ||
|
|
||
| return { | ||
| movements, | ||
| overloadedCount: overloaded.length, | ||
| underloadedCount: underloaded.length, | ||
| }; | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
| // THROTTLE POLICY | ||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| /** | ||
| * Set throttle policy for overload scenarios | ||
| * @param {string} policy - Policy: 'queue-when-full', 'reject-when-full', 'spillover' | ||
| * @throws {Error} If policy is invalid | ||
| */ | ||
| setThrottlePolicy(policy) { | ||
| const validPolicies = Object.values(ThrottlePolicy); | ||
| if (!validPolicies.includes(policy)) { | ||
| throw new Error(`Invalid throttle policy '${policy}'. Valid: ${validPolicies.join(', ')}`); | ||
| } | ||
| this.throttlePolicy = policy; | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
| // INTERNAL METHODS | ||
| // ═══════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| /** | ||
| * Calculate affinity score for an agent-task pair | ||
| * @param {Object} agent - Agent profile | ||
| * @param {Object} task - Task object | ||
| * @returns {number} Affinity score 0-1 | ||
| * @private | ||
| */ | ||
| _calculateAffinityScore(agent, task) { | ||
| // Specialty match (40%) | ||
| let specialtyScore = 0; | ||
| if (task.requiredSpecialties.length > 0 && agent.specialties.length > 0) { | ||
| const matches = task.requiredSpecialties.filter( | ||
| (s) => agent.specialties.includes(s) | ||
| ).length; | ||
| specialtyScore = matches / task.requiredSpecialties.length; | ||
| } else if (task.requiredSpecialties.length === 0) { | ||
| specialtyScore = 0.5; // Neutral when no specialties required | ||
| } | ||
|
|
||
| // Load inverse (30%) - Less load = higher score | ||
| const loadPct = agent.maxLoad > 0 ? agent.currentLoad / agent.maxLoad : 1; | ||
| const loadScore = 1 - loadPct; | ||
|
|
||
| // Processing speed (20%) | ||
| const speedScore = Math.min(1, agent.processingSpeed / 2.0); | ||
|
|
||
| // Success rate (10%) | ||
| const successRate = this._getSuccessRate(agent); | ||
|
|
||
| return ( | ||
| specialtyScore * AFFINITY_WEIGHTS.SPECIALTY + | ||
| loadScore * AFFINITY_WEIGHTS.LOAD_INVERSE + | ||
| speedScore * AFFINITY_WEIGHTS.SPEED + | ||
| successRate * AFFINITY_WEIGHTS.SUCCESS_RATE | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Get success rate for an agent | ||
| * @param {Object} agent - Agent profile | ||
| * @returns {number} Success rate 0-1 | ||
| * @private | ||
| */ | ||
| _getSuccessRate(agent) { | ||
| const total = agent.completedCount + agent.failedCount; | ||
| if (total === 0) return 1; // Benefit of the doubt for new agents | ||
| return agent.completedCount / total; | ||
| } | ||
|
|
||
| /** | ||
| * Find optimal agent for a task | ||
| * @param {Object} task - Task to assign | ||
| * @returns {Object|null} Best agent or null | ||
| * @private | ||
| */ | ||
| _findOptimalAgent(task) { | ||
| let bestAgent = null; | ||
| let bestScore = -1; | ||
|
|
||
| for (const [, agent] of this.agents) { | ||
| if (agent.status === AgentStatus.OFFLINE) continue; | ||
|
|
||
| // Check capacity (unless spillover policy) | ||
| if (this.throttlePolicy !== ThrottlePolicy.SPILLOVER) { | ||
| const loadAfter = agent.currentLoad + task.complexity; | ||
| if (loadAfter > agent.maxLoad) continue; | ||
| } | ||
|
|
||
| const score = this._calculateAffinityScore(agent, task); | ||
| if (score > bestScore) { | ||
| bestScore = score; | ||
| bestAgent = agent; | ||
| } | ||
| } | ||
|
|
||
| return bestAgent; | ||
| } | ||
|
|
||
| /** | ||
| * Find best rebalance target from underloaded agents | ||
| * @param {Object} task - Task to move | ||
| * @param {Object[]} candidates - Underloaded agents | ||
| * @param {string} excludeId - Agent to exclude (source) | ||
| * @returns {Object|null} Best target agent | ||
| * @private | ||
| */ | ||
| _findBestRebalanceTarget(task, candidates, excludeId) { | ||
| let bestTarget = null; | ||
| let bestScore = -1; | ||
|
|
||
| for (const candidate of candidates) { | ||
| if (candidate.id === excludeId) continue; | ||
|
|
||
| const loadAfter = candidate.currentLoad + task.complexity; | ||
| if (loadAfter > candidate.maxLoad) continue; | ||
|
|
||
| const score = this._calculateAffinityScore(candidate, task); | ||
| if (score > bestScore) { | ||
| bestScore = score; | ||
| bestTarget = candidate; | ||
| } | ||
| } | ||
|
|
||
| return bestTarget; | ||
| } | ||
|
|
||
| /** | ||
| * Assign a task to an agent (internal) | ||
| * @param {Object} task - Task object | ||
| * @param {Object} agent - Agent profile | ||
| * @private | ||
| */ | ||
| _assignTaskToAgent(task, agent) { | ||
| // Save previous status for transition-only events | ||
| const previousStatus = agent.status; | ||
|
|
||
| task.assignedTo = agent.id; | ||
| task.status = TaskStatus.ASSIGNED; | ||
| task.startedAt = Date.now(); | ||
|
|
||
| agent.activeTasks.push(task.id); | ||
| agent.currentLoad += task.complexity; | ||
|
|
||
| this._updateAgentStatus(agent); | ||
| this.emit('task:assigned', { taskId: task.id, agentId: agent.id }); | ||
|
|
||
| // Only emit agent:overloaded on actual status transition | ||
| if (agent.status === AgentStatus.OVERLOADED && previousStatus !== AgentStatus.OVERLOADED) { | ||
| const loadPct = agent.maxLoad > 0 ? (agent.currentLoad / agent.maxLoad) * 100 : 100; | ||
| this.emit('agent:overloaded', { agentId: agent.id, load: loadPct }); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Remove a task from an agent's active list | ||
| * @param {Object} agent - Agent profile | ||
| * @param {string} taskId - Task to remove | ||
| * @private | ||
| */ | ||
| _removeTaskFromAgent(agent, taskId) { | ||
| // Save previous status for transition-only events | ||
| const previousStatus = agent.status; | ||
|
|
||
| const idx = agent.activeTasks.indexOf(taskId); | ||
| if (idx !== -1) { | ||
| agent.activeTasks.splice(idx, 1); | ||
| } | ||
|
|
||
| const task = this.tasks.get(taskId); | ||
| if (task) { | ||
| agent.currentLoad = Math.max(0, agent.currentLoad - task.complexity); | ||
| } | ||
|
|
||
| this._updateAgentStatus(agent); | ||
|
|
||
| // Only emit agent:available on transition FROM overloaded | ||
| if (agent.status !== AgentStatus.OVERLOADED && agent.status !== AgentStatus.OFFLINE | ||
| && previousStatus === AgentStatus.OVERLOADED) { | ||
| const loadPct = agent.maxLoad > 0 ? (agent.currentLoad / agent.maxLoad) * 100 : 100; | ||
| this.emit('agent:available', { agentId: agent.id, load: loadPct }); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Update agent status based on current load | ||
| * @param {Object} agent - Agent profile | ||
| * @private | ||
| */ | ||
| _updateAgentStatus(agent) { | ||
| if (agent.status === AgentStatus.OFFLINE) return; | ||
|
|
||
| const loadPct = agent.maxLoad > 0 ? (agent.currentLoad / agent.maxLoad) * 100 : 100; | ||
|
|
||
| if (loadPct >= OVERLOAD_THRESHOLD) { | ||
| agent.status = AgentStatus.OVERLOADED; | ||
| } else if (agent.activeTasks.length > 0) { | ||
| agent.status = AgentStatus.BUSY; | ||
| } else { | ||
| agent.status = AgentStatus.AVAILABLE; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handle overflow when no agent can accept the task | ||
| * @param {Object} task - Task to handle | ||
| * @returns {Object} Handling result | ||
| * @private | ||
| */ | ||
| _handleOverflow(task) { | ||
| switch (this.throttlePolicy) { | ||
| case ThrottlePolicy.QUEUE_WHEN_FULL: { | ||
| if (this.queue.length >= this.maxQueueSize) { | ||
| task.status = TaskStatus.FAILED; | ||
| task.error = 'Queue is full'; | ||
| this.metrics.totalRejected++; | ||
| this.emit('queue:full', { taskId: task.id, queueSize: this.queue.length }); | ||
| this.emit('task:failed', { taskId: task.id, error: task.error }); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.FAILED }; | ||
| } | ||
| this.queue.push(task.id); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.QUEUED }; | ||
| } | ||
|
|
||
| case ThrottlePolicy.REJECT_WHEN_FULL: { | ||
| task.status = TaskStatus.FAILED; | ||
| task.error = 'All agents at capacity'; | ||
| this.metrics.totalRejected++; | ||
| this.emit('task:failed', { taskId: task.id, error: task.error }); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.FAILED }; | ||
| } | ||
|
|
||
| case ThrottlePolicy.SPILLOVER: { | ||
| // Force assign to least loaded agent | ||
| const leastLoaded = this._findLeastLoadedAgent(); | ||
| if (leastLoaded) { | ||
| this._assignTaskToAgent(task, leastLoaded); | ||
| return { taskId: task.id, assignedTo: leastLoaded.id, status: TaskStatus.ASSIGNED }; | ||
| } | ||
| // No agents at all — queue it | ||
| this.queue.push(task.id); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.QUEUED }; | ||
| } | ||
|
|
||
| default: | ||
| this.queue.push(task.id); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.QUEUED }; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Find the least loaded agent (for spillover policy) | ||
| * @returns {Object|null} Least loaded agent | ||
| * @private | ||
| */ | ||
| _findLeastLoadedAgent() { | ||
| let bestAgent = null; | ||
| let lowestLoad = Infinity; | ||
|
|
||
| for (const [, agent] of this.agents) { | ||
| if (agent.status === AgentStatus.OFFLINE) continue; | ||
|
|
||
| const loadPct = agent.maxLoad > 0 ? agent.currentLoad / agent.maxLoad : 1; | ||
| if (loadPct < lowestLoad) { | ||
| lowestLoad = loadPct; | ||
| bestAgent = agent; | ||
| } | ||
| } | ||
|
|
||
| return bestAgent; | ||
| } | ||
|
|
||
| /** | ||
| * Process queued tasks, assigning to available agents | ||
| * @private | ||
| */ | ||
| _processQueue() { | ||
| if (this.queue.length === 0) return; | ||
|
|
||
| const remaining = []; | ||
|
|
||
| for (const taskId of this.queue) { | ||
| const task = this.tasks.get(taskId); | ||
| if (!task || task.status !== TaskStatus.QUEUED) continue; | ||
|
|
||
| const agent = this._findOptimalAgent(task); | ||
| if (agent) { | ||
| this._assignTaskToAgent(task, agent); | ||
| } else { | ||
| remaining.push(taskId); | ||
| } | ||
| } | ||
|
|
||
| this.queue = remaining; | ||
| } | ||
|
|
||
| /** | ||
| * Calculate average wait time for completed tasks | ||
| * @returns {number} Average wait time in ms | ||
| * @private | ||
| */ | ||
| _calculateAvgWaitTime() { | ||
| let totalWait = 0; | ||
| let count = 0; | ||
|
|
||
| for (const [, task] of this.tasks) { | ||
| if (task.startedAt && task.submittedAt) { | ||
| totalWait += task.startedAt - task.submittedAt; | ||
| count++; | ||
| } | ||
| } | ||
|
|
||
| return count > 0 ? Math.round(totalWait / count) : 0; | ||
| } | ||
|
|
||
| /** | ||
| * Persist metrics to disk | ||
| * @private | ||
| */ | ||
| async _persistMetrics() { | ||
| if (!this.persistMetrics) return; | ||
|
|
||
| try { | ||
| const metricsDir = path.join(this.projectRoot, METRICS_DIR); | ||
| const metricsPath = path.join(metricsDir, METRICS_FILENAME); | ||
|
|
||
| await fs.mkdir(metricsDir, { recursive: true }); | ||
| await fs.writeFile(metricsPath, JSON.stringify(this.getMetrics(), null, 2), 'utf8'); | ||
| } catch (err) { | ||
| // Log persistence errors with context instead of silently ignoring | ||
| console.error(`Failed to persist load balancer metrics: ${err.message}`); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // ═══════════════════════════════════════════════════════════════════════════════════ | ||
| // EXPORTS | ||
| // ═══════════════════════════════════════════════════════════════════════════════════ | ||
|
|
||
| module.exports = CognitiveLoadBalancer; | ||
| module.exports.CognitiveLoadBalancer = CognitiveLoadBalancer; | ||
| module.exports.AgentStatus = AgentStatus; | ||
| module.exports.TaskStatus = TaskStatus; | ||
| module.exports.TaskPriority = TaskPriority; | ||
| module.exports.ThrottlePolicy = ThrottlePolicy; | ||
| module.exports.AFFINITY_WEIGHTS = AFFINITY_WEIGHTS; | ||
| module.exports.OVERLOAD_THRESHOLD = OVERLOAD_THRESHOLD; |
🛠️ Refactor suggestion | 🟠 Major
Make the legacy path a thin re-export.
This file is a second full implementation of the balancer, so every fix now has to land twice and the two entry points can drift. Re-export the canonical .aiox-core/core/orchestration/cognitive-load-balancer module here instead of copying the entire implementation.
🧰 Tools
🪛 GitHub Check: ESLint
[warning] 674-674:
Missing trailing comma
[warning] 56-56:
'PRIORITY_WEIGHTS' is assigned a value but never used. Allowed unused vars must match /^_/u
🪛 GitHub Check: Performance Metrics
[warning] 674-674:
Missing trailing comma
[warning] 56-56:
'PRIORITY_WEIGHTS' is assigned a value but never used. Allowed unused vars must match /^_/u
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aios-core/core/orchestration/cognitive-load-balancer.js around lines 1 -
987, Replace this duplicated full implementation with a thin re-export: delete
the implementation body in cognitive-load-balancer.js and require/import the
canonical module (the single source of truth) and re-export its default and
named exports (CognitiveLoadBalancer, AgentStatus, TaskStatus, TaskPriority,
ThrottlePolicy, AFFINITY_WEIGHTS, OVERLOAD_THRESHOLD, etc.) so callers using
either entrypoint get the exact same module object; ensure the module.exports
shape matches the current exports (default export and the named properties) and
do not keep any duplicate logic or helper functions in this file.
| const agentProfile = createAgentProfile(agentId, profile || {}); | ||
| this.agents.set(agentId, agentProfile); | ||
|
|
||
| this.emit('agent:registered', { agentId, profile: agentProfile }); | ||
|
|
||
| // Process queue when new agent registers so queued tasks get assigned | ||
| this._processQueue(); | ||
|
|
Preserve live agent state on re-registration.
this.agents.set(agentId, agentProfile) replaces activeTasks, currentLoad, and the counters with a fresh profile. If callers re-register an agent after work has already been assigned, the scheduler loses track of that workload even though the tasks still point at the same agentId. Either reject duplicate registrations or merge profile updates into the existing runtime record instead of recreating it from scratch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 216 -
223, The current registration unconditionally overwrites runtime state by
calling createAgentProfile and this.agents.set(agentId, agentProfile); change
this so duplicate registrations either get rejected or merge into the existing
runtime record: check this.agents.has(agentId) inside the registration flow, and
if an entry exists merge incoming profile fields into the existing agent object
rather than replacing it (preserve existing activeTasks, currentLoad, and any
counter properties), or alternatively throw/emit an error to reject duplicate
registrations; after merging or accepting, still emit('agent:registered', {
agentId, profile: existingAgent }) and call this._processQueue() so queued tasks
are handled with the preserved runtime state.
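A merge-on-re-registration guard could look like the sketch below. Field names mirror the runtime profile used in this module (activeTasks, currentLoad, completedCount, failedCount); the helper name registerOrMerge is illustrative, not the module's actual API:

```javascript
// Merge declarative profile updates into an existing runtime record instead
// of replacing it, so in-flight workload and counters survive re-registration.
function registerOrMerge(agents, agentId, profile = {}) {
  const existing = agents.get(agentId);
  if (existing) {
    // Accept declarative updates (specialties, maxLoad, ...) but keep the
    // live workload and counters that assigned tasks still reference.
    const { activeTasks, currentLoad, completedCount, failedCount } = existing;
    Object.assign(existing, profile, {
      activeTasks, currentLoad, completedCount, failedCount,
    });
    return existing;
  }
  const fresh = {
    id: agentId,
    activeTasks: [],
    currentLoad: 0,
    completedCount: 0,
    failedCount: 0,
    ...profile,
  };
  agents.set(agentId, fresh);
  return fresh;
}

// Re-registering after work was assigned keeps the workload visible:
const agents = new Map();
const a = registerOrMerge(agents, 'agent-1', { specialties: ['nlp'] });
a.activeTasks.push('task-42');
a.currentLoad = 5;
const merged = registerOrMerge(agents, 'agent-1', { specialties: ['nlp', 'vision'] });
console.log(merged.currentLoad, merged.activeTasks.length); // → 5 1
```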
| task.assignedTo = agent.id; | ||
| task.status = TaskStatus.ASSIGNED; | ||
| task.startedAt = Date.now(); |
Don't reset startedAt when moving an existing task.
rebalance() and manual reassignment both route through _assignTaskToAgent(). Overwriting startedAt there resets the clock for tasks that were already in flight, so their recorded completion times shrink and the per-agent averages drift low.
🛠️ Proposed fix
- task.startedAt = Date.now();
+ task.startedAt ??= Date.now();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 778 -
780, The current _assignTaskToAgent() unconditionally overwrites task.startedAt
when assigning, which resets timers for in-flight tasks; change
_assignTaskToAgent (used by rebalance() and manual reassignment flows) to only
set task.startedAt if it is not already defined (e.g., if task.startedAt is
null/undefined), leaving existing startedAt values intact when moving an
existing task and only initializing startedAt for brand-new assignments.
| case ThrottlePolicy.SPILLOVER: { | ||
| // Force assign to least loaded agent | ||
| const leastLoaded = this._findLeastLoadedAgent(); | ||
| if (leastLoaded) { | ||
| this._assignTaskToAgent(task, leastLoaded); | ||
| return { taskId: task.id, assignedTo: leastLoaded.id, status: TaskStatus.ASSIGNED }; | ||
| } | ||
| // No agents at all — queue it | ||
| this.queue.push(task.id); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.QUEUED }; | ||
| } | ||
|
|
||
| default: | ||
| this.queue.push(task.id); | ||
| return { taskId: task.id, assignedTo: null, status: TaskStatus.QUEUED }; |
Keep the queue cap enforced when spillover has no target agent.
In the SPILLOVER fallback and the default branch, tasks are pushed straight into this.queue without checking maxQueueSize. A cluster with no registered agents can therefore accumulate an unbounded backlog even though the balancer advertises a hard queue limit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 873 -
887, The SPILLOVER case and default branch push task.id into this.queue
unconditionally; change both to enforce the queue cap by checking
this.queue.length >= this.maxQueueSize before pushing. If the queue is not full,
push and return the existing queued response; if it is full, do not push and
return a rejection/overflow response (e.g., { taskId: task.id, assignedTo: null,
status: TaskStatus.REJECTED } or introduce a suitable TaskStatus constant) so
callers know the task was refused. Update the branches that call
this.queue.push(task.id) (referenced symbols: ThrottlePolicy.SPILLOVER,
this.queue, this.maxQueueSize, TaskStatus, _assignTaskToAgent,
_findLeastLoadedAgent) accordingly.
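A bounded-enqueue helper for both fallback branches could be as small as the sketch below. The 'rejected' status string is an assumption; the module itself currently reuses FAILED plus the totalRejected counter for refused tasks:

```javascript
// Enforce the queue cap before pushing, instead of pushing unconditionally.
function enqueueBounded(queue, maxQueueSize, taskId) {
  if (queue.length >= maxQueueSize) {
    // Refuse rather than grow an unbounded backlog.
    return { taskId, assignedTo: null, status: 'rejected' };
  }
  queue.push(taskId);
  return { taskId, assignedTo: null, status: 'queued' };
}

const queue = [];
console.log(enqueueBounded(queue, 1, 't1').status); // → queued
console.log(enqueueBounded(queue, 1, 't2').status); // → rejected
```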
| async _saveToDisk() { | ||
| const dir = path.dirname(this._persistPath); | ||
| await fs.mkdir(dir, { recursive: true }); | ||
|
|
||
| const data = { | ||
| version: '1.0.0', | ||
| savedAt: new Date().toISOString(), | ||
| stats: this._stats, | ||
| swarms: this._serializeSwarms(), | ||
| }; | ||
|
|
||
| await fs.writeFile(this._persistPath, JSON.stringify(data, null, 2), 'utf8'); |
Write swarms.json atomically.
fs.writeFile(this._persistPath, ...) truncates the file before the new JSON is fully written, so loadFromDisk() can observe partial content and hit the Unexpected end of JSON input failure CI is reporting. Write to a temp file and rename it into place only after the snapshot is complete.
🛠️ Proposed fix
async _saveToDisk() {
const dir = path.dirname(this._persistPath);
+ const tempPath = `${this._persistPath}.${process.pid}.${Date.now()}.tmp`;
await fs.mkdir(dir, { recursive: true });
const data = {
version: '1.0.0',
savedAt: new Date().toISOString(),
stats: this._stats,
swarms: this._serializeSwarms(),
};
- await fs.writeFile(this._persistPath, JSON.stringify(data, null, 2), 'utf8');
+ await fs.writeFile(tempPath, JSON.stringify(data, null, 2), 'utf8');
+ await fs.rename(tempPath, this._persistPath);
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/swarm-intelligence.js around lines 880 - 891,
The _saveToDisk method writes JSON directly to this._persistPath which can leave
the file truncated and cause loadFromDisk to read partial JSON; change
_saveToDisk to write the JSON to a temporary file in the same directory (e.g.,
`${this._persistPath}.tmp.${pid || uniqueId}`), fs.writeFile the temp file
fully, then fs.rename the temp into this._persistPath to atomically replace it;
ensure the temp file is written with the same encoding and handle/propagate
errors so the temp file won’t be left as the active snapshot.
```yaml
lastUpdated: '2026-03-08T05:58:54.622Z'
entityCount: 746
```
Regenerate the registry artifacts instead of hand-editing this metadata.
Line 4 only accounts for one new entity, but this PR also adds .aiox-core/core/orchestration/cognitive-load-balancer.js. Since the registry generator scans .aiox-core/core/**/*.js, this file should include both orchestration modules; right now CI is already failing with an outdated manifest/hash mismatch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/data/entity-registry.yaml around lines 3 - 4, Do not hand-edit
.aiox-core/data/entity-registry.yaml; instead run the project’s entity-registry
generator (the tool that scans .aiox-core/core/**/*.js) to regenerate the
registry so the new file
.aiox-core/core/orchestration/cognitive-load-balancer.js is discovered and the
manifest/hash/entityCount are updated; ensure the regenerated registry includes
both orchestration modules and commit the generated artifact rather than a
manual change to entity-registry.yaml.
```yaml
path: .aiox-core/core/orchestration/swarm-intelligence.js
layer: L1
type: module
purpose: proposal.description,
```
Replace the placeholder purpose value with the module’s real description.
`proposal.description,` is not valid registry metadata; it looks like a template fragment rather than extracted source text. This will degrade registry-based discovery and matching for swarm-intelligence.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/data/entity-registry.yaml at line 12785, The purpose field
currently contains a template fragment ("proposal.description,") for the
swarm-intelligence module; replace that placeholder with the module's actual
descriptive text (a plain YAML string) — remove the trailing comma and template
token, and populate purpose with a concise free-text description sourced from
the module's manifest/README (e.g., a one- or two-sentence summary of
swarm-intelligence functionality) so registry discovery and matching work
correctly.
```js
const path = require('path');
const fs = require('fs').promises;

const SwarmIntelligence = require('../../../.aiox-core/core/orchestration/swarm-intelligence');
```
Use the repo's absolute module path in these requires.
The deep relative hops are brittle and don't follow the repo-wide JS import rule.
As per coding guidelines, **/*.{js,jsx,ts,tsx}: Use absolute imports instead of relative imports in all code.
Also applies to: 1016-1016
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/core/orchestration/swarm-intelligence.test.js` at line 11, The test
imports SwarmIntelligence using a brittle deep relative path; replace the
require call that assigns SwarmIntelligence (currently
require('../../../.aiox-core/core/orchestration/swarm-intelligence')) with the
repo's canonical absolute module import (e.g.,
require('aiox-core/core/orchestration/swarm-intelligence')) so tests follow the
project's absolute-import convention and avoid deep relative hops.
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/core/orchestration/swarm-intelligence.test.js (1)
792-796: Avoid variable shadowing in the callback. The outer `const k` is shadowed by the arrow function parameter `(k) =>`, which can harm readability.

♻️ Suggested fix
```diff
 it('should increment citations on queried results', () => {
   si.queryKnowledge(swarm.id, { topic: 'deployment' });
-  const k = swarm.knowledgeBase.find((k) => k.topic === 'deployment strategies');
-  expect(k.citations).toBe(1);
+  const entry = swarm.knowledgeBase.find((k) => k.topic === 'deployment strategies');
+  expect(entry.citations).toBe(1);
 });
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/core/orchestration/swarm-intelligence.test.js` around lines 792 - 796, The test shadows the outer constant k by using (k) => in the Array.find callback; change the callback parameter name (for example to kb or item) so it doesn't shadow the outer const k used for assertions; update the find call in the it block (the function passed to swarm.knowledgeBase.find) to use the new parameter name (e.g., kb => kb.topic === 'deployment strategies') and leave the outer const k unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.aiox-core/install-manifest.yaml:
- Around line 2594-2626: The manifest SIZE_MISMATCHes happen because hashFile()
normalizes text (removes BOM, converts CRLF->LF) before hashing while entry.size
is taken from raw fs.stat.size; update the pipeline so stored manifest size
matches the normalized bytes used for hashing: either (A) change hashFile() (or
the code that constructs the manifest) to compute size as
Buffer.byteLength(normalizedContent, 'utf8') and write that into entry.size
alongside the hash, or (B) change post-install-validator to skip raw-size checks
for files identified as text (using the same text-detection logic used by
hashFile()) and only validate hash for text files; fix references are to
hashFile(), post-install-validator, and entry.size so the normalization logic
and size assignment/validation stay consistent.
---
Nitpick comments:
In `@tests/core/orchestration/swarm-intelligence.test.js`:
- Around line 792-796: The test shadows the outer constant k by using (k) => in
the Array.find callback; change the callback parameter name (for example to kb
or item) so it doesn't shadow the outer const k used for assertions; update the
find call in the it block (the function passed to swarm.knowledgeBase.find) to
use the new parameter name (e.g., kb => kb.topic === 'deployment strategies')
and leave the outer const k unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 1d37ee33-7364-47ce-9215-fddd21cbf79f
📒 Files selected for processing (2)
- .aiox-core/install-manifest.yaml
- tests/core/orchestration/swarm-intelligence.test.js
```diff
   size: 9573
 - path: development/templates/service-template/client.ts.hbs
   hash: sha256:f342c60695fe611192002bdb8c04b3a0dbce6345b7fa39834ea1898f71689198
   type: template
-  size: 12213
+  size: 11810
 - path: development/templates/service-template/errors.ts.hbs
   hash: sha256:e0be40d8be19b71b26e35778eadffb20198e7ca88e9d140db9da1bfe12de01ec
   type: template
-  size: 5395
+  size: 5213
 - path: development/templates/service-template/index.ts.hbs
   hash: sha256:d44012d54b76ab98356c7163d257ca939f7fed122f10fecf896fe1e7e206d10a
   type: template
-  size: 3206
+  size: 3086
 - path: development/templates/service-template/jest.config.js
   hash: sha256:1681bfd7fbc0d330d3487d3427515847c4d57ef300833f573af59e0ad69ed159
   type: template
   size: 1750
 - path: development/templates/service-template/package.json.hbs
   hash: sha256:d89d35f56992ee95c2ceddf17fa1d455c18007a4d24af914ba83cf4abc38bca9
   type: template
-  size: 2314
+  size: 2227
 - path: development/templates/service-template/README.md.hbs
   hash: sha256:2c3dd4c2bf6df56b9b6db439977be7e1cc35820438c0e023140eccf6ccd227a0
   type: template
-  size: 3584
+  size: 3426
 - path: development/templates/service-template/tsconfig.json
   hash: sha256:8b465fcbdd45c4d6821ba99aea62f2bd7998b1bca8de80486a1525e77d43c9a1
   type: template
   size: 1135
 - path: development/templates/service-template/types.ts.hbs
   hash: sha256:3e52e0195003be8cd1225a3f27f4d040686c8b8c7762f71b41055f04cd1b841b
   type: template
-  size: 2661
+  size: 2516
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
# Goal:
#   Verify whether manifest size is computed/validated from the same byte stream as SHA256.
# Expected:
#   - Generator uses fs.stat.size or Buffer.byteLength on the exact bytes it hashes.
#   - Validator either uses the same rule or ignores `size` for pass/fail decisions.
echo "=== Generator / validator candidates ==="
fd 'generate-install-manifest\.js$|manifest-generator\.js$|manifest-validator\.js$|validate\.js$' . | while read -r file; do
  echo "--- $file"
  rg -n -C2 'size|sha256|hash|readFile|readFileSync|stat|byteLength' "$file" || true
  echo
done
echo "=== Current manifest structure (sanity spot-check) ==="
rg -n -C1 '^(version|generated_at|file_count| - path:| hash:| size:)' .aiox-core/install-manifest.yaml | sed -n '1,160p'
```
Repository: SynkraAI/aiox-core
Length of output: 8404
🏁 Script executed:
```bash
cat -n packages/installer/src/installer/file-hasher.js
```
Repository: SynkraAI/aiox-core
Length of output: 8341

🏁 Script executed:
```bash
cat -n packages/gemini-aiox-extension/commands/aiox-validate.js
```
Repository: SynkraAI/aiox-core
Length of output: 1037

🏁 Script executed:
```bash
rg -r 'install-manifest\.yaml|validate.*manifest|manifest.*validat' --type js -A 5 -B 2
```
Repository: SynkraAI/aiox-core
Length of output: 458

🏁 Script executed:
```bash
fd -name '*validator*.js' -o -name '*validate*.js' | head -20 | while read f; do
  echo "=== $f ==="
  rg -n 'size|hash' "$f" | head -15
done
```
Repository: SynkraAI/aiox-core
Length of output: 292

🏁 Script executed:
```bash
rg 'install-manifest|validate.*manifest' --type js -A 3 -B 1 | head -80
```
Repository: SynkraAI/aiox-core
Length of output: 5939

🏁 Script executed:
```bash
fd '\.js$' | xargs grep -l 'install-manifest' 2>/dev/null | head -10
```
Repository: SynkraAI/aiox-core
Length of output: 506

🏁 Script executed:
```bash
cat -n scripts/validate-manifest.js
```
Repository: SynkraAI/aiox-core
Length of output: 8906

🏁 Script executed:
```bash
cat -n packages/installer/src/installer/post-install-validator.js | head -150
```
Repository: SynkraAI/aiox-core
Length of output: 5188

🏁 Script executed:
```bash
rg 'SIZE_MISMATCH' packages/installer/src/installer/post-install-validator.js -A 3 -B 1
```
Repository: SynkraAI/aiox-core
Length of output: 641

🏁 Script executed:
```bash
cat -n packages/installer/src/installer/post-install-validator.js | sed -n '140,250p'
```
Repository: SynkraAI/aiox-core
Length of output: 5089

🏁 Script executed:
```bash
rg 'SIZE_MISMATCH|actualSize|entry\.size' packages/installer/src/installer/post-install-validator.js -B 5 -A 5
```
Repository: SynkraAI/aiox-core
Length of output: 3850

🏁 Script executed:
```bash
cat -n packages/installer/src/installer/post-install-validator.js | sed -n '460,520p'
```
Repository: SynkraAI/aiox-core
Length of output: 2797
Confirm and address the size/hash consistency issue affecting text file validation.
The concern is well-founded: hashFile() normalizes text file content before hashing (removes UTF-8 BOM, converts CRLF to LF), but size is always recorded as raw filesystem bytes from fs.stat.size. When post-install-validator runs in quick mode (without hash verification), it validates entry.size directly against actual file size, which causes false SIZE_MISMATCH failures when:
- Text files are checked after git `autocrlf` operations that normalize line endings
- Files are moved between Windows and Unix without re-hashing
- BOM is added/removed by editors or tools
The manifest should either:
- Store size as the byte length of the normalized content (to match the hashed bytes), or
- Never use size for validation—only for informational purposes
Applies to all affected line ranges (2594–2626, 3378–3422, 3518–3554, 3702–4058).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/install-manifest.yaml around lines 2594 - 2626, The manifest
SIZE_MISMATCHes happen because hashFile() normalizes text (removes BOM, converts
CRLF->LF) before hashing while entry.size is taken from raw fs.stat.size; update
the pipeline so stored manifest size matches the normalized bytes used for
hashing: either (A) change hashFile() (or the code that constructs the manifest)
to compute size as Buffer.byteLength(normalizedContent, 'utf8') and write that
into entry.size alongside the hash, or (B) change post-install-validator to skip
raw-size checks for files identified as text (using the same text-detection
logic used by hashFile()) and only validate hash for text files; fix references
are to hashFile(), post-install-validator, and entry.size so the normalization
logic and size assignment/validation stay consistent.
…task distribution

Implements the cognitive load balancing module (Story ORCH-6), which distributes tasks across agents based on cognitive capacity using an affinity algorithm: specialty match (40%), load inverse (30%), processing speed (20%), and success rate (10%). Features:
- Agent registration/deregistration with a cognitive profile
- Task submission and automatic routing
- Throttle policies: queue-when-full, reject-when-full, spillover
- Task rebalancing across agents
- Utilization and throughput metrics
- Metrics persistence to disk
- 99 unit tests passing
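A minimal sketch of the weighted affinity formula described in this commit; the field names (`specialties`, `currentLoad`, `processingSpeed`, `successRate`) are assumptions for illustration, not the module's actual schema:

```javascript
// Affinity score: specialty match 40%, load inverse 30%,
// processing speed 20%, success rate 10% (all normalized to 0..1).
function affinityScore(agent, task) {
  const specialty = agent.specialties.includes(task.specialty) ? 1 : 0;
  const loadInverse = 1 - agent.currentLoad / agent.maxLoad; // 0 when saturated
  return (
    0.4 * specialty +
    0.3 * loadInverse +
    0.2 * agent.processingSpeed +
    0.1 * agent.successRate
  );
}
```

A matching, idle, fast, reliable agent scores 1.0; a saturated agent with no specialty match scores near 0, so the balancer prefers the former.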
Implements emergent intelligence via swarm coordination (ORCH-5):
- Creation and management of swarms with voting configuration
- Collective decision system with 4 strategies (majority, weighted, unanimous, quorum)
- Shared knowledge base with search by topic/tags
- Leader election (most-capable, highest-reputation, round-robin)
- Health metrics and dynamic agent reputation
- Persistence to .aiox/swarms.json
- 83 tests covering all methods, strategies, and edge cases
- Converts the .aios-core wrapper into a re-export of .aiox-core (eliminates duplication)
- Rejects expired proposals before applying the voting strategy in resolveProposal()
- Fixes the quorum strategy: counts only approves toward the quorum (not total votes)
- Persists citation bumps after queryKnowledge()
- Serializes persistence writes with a promise chain (_persistAsync)
- Distinguishes ENOENT from other errors in loadFromDisk() (rethrows real errors)
- Adds 4 new tests covering all the fixes (87 total, all passing)
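The quorum fix in this commit (counting only approve votes toward the quorum, not total votes cast) can be illustrated with a hypothetical helper:

```javascript
// Illustrative only: `votes` is an array of { voter, choice } records and
// `quorum` is the minimum number of approvals required.
function quorumReached(votes, quorum) {
  const approves = votes.filter((v) => v.choice === 'approve').length;
  return approves >= quorum; // reject/abstain votes do not count
}
```

Under the old (buggy) rule, five votes with only two approvals would satisfy a quorum of three; under the corrected rule it does not.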
…Load Balancer
- Duplicate-ID validation when submitting tasks
- State-transition guards (COMPLETED/FAILED are terminal)
- Validation of the profile parameter in registerAgent
- Process the queue when a new agent registers
- Async metrics persistence with error handling
- Event emission only on real status transitions
The 'save and load state from disk' test now awaits _pendingSave before calling _saveToDisk(), avoiding a concurrent write that corrupted the JSON (SyntaxError: Unexpected end of JSON input in CI)
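The `_pendingSave` serialization referenced here can be sketched as a promise chain; `MetricsPersister` and `persistFn` are illustrative stand-ins for the real mkdir-plus-writeFile body:

```javascript
// Serialize disk writes: each save chains onto the previous one, so two
// overlapping persist calls can never write the same file concurrently.
class MetricsPersister {
  constructor(persistFn) {
    this._persistFn = persistFn;
    this._pendingSave = Promise.resolve();
  }
  persistMetrics(snapshot) {
    this._pendingSave = this._pendingSave
      .then(() => this._persistFn(snapshot))
      .catch(() => {}); // swallow so one failure doesn't poison the chain
    return this._pendingSave;
  }
}
```

Because each call chains onto the previous write, a slow earlier write can never be overtaken by a later one, which is exactly the race the corrupted-JSON test exposed.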
Force-pushed 0ceae2c to 708b376
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Actionable comments posted: 8
♻️ Duplicate comments (3)
.aiox-core/core/orchestration/swarm-intelligence.js (1)
880-891: ⚠️ Potential issue | 🟠 Major: Write the swarm snapshot atomically.

`fs.writeFile(this._persistPath, ...)` updates the live file in place, so a concurrent read or interrupted write can leave `swarms.json` truncated and unparsable. Write to a temp file in the same directory and rename it into place only after the JSON payload is fully flushed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.aiox-core/core/orchestration/swarm-intelligence.js around lines 880 - 891, The _saveToDisk method currently writes JSON directly to this._persistPath which can leave the file truncated; instead, write the serialized payload from _serializeSwarms() to a temp file in the same directory (e.g. this._persistPath + a .tmp suffix or include process.pid/timestamp), ensure the temp file is fully flushed (open the temp file, write the JSON, fsync the file handle, close it), then atomically rename the temp file to this._persistPath (fs.rename) to replace the live file; use the same directory for the temp file so rename is atomic across most platforms.

.aiox-core/core/orchestration/cognitive-load-balancer.js (2)
216-223: ⚠️ Potential issue | 🔴 Critical: Reject duplicate agent registrations instead of replacing live runtime state.

Re-registering an existing `agentId` here overwrites `activeTasks`, `currentLoad`, and historical counters, while existing tasks still reference that same agent ID. That desynchronizes scheduling and metrics as soon as a live agent is registered twice. Either throw on duplicates or merge profile fields into the existing runtime object instead of replacing it.

As per coding guidelines, `.aiox-core/core/**`: Ensure backwards compatibility — core modules are consumed by all agents.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 216 - 223, The current registration code unconditionally replaces the runtime entry created by createAgentProfile(agentId, profile || {}) via this.agents.set(agentId, agentProfile), which wipes activeTasks, currentLoad and counters and desynchronizes live tasks; change this to first check this.agents.has(agentId) and either throw an error to reject duplicate registrations or, preferably, merge the incoming profile fields into the existing runtime object (lookup existing = this.agents.get(agentId)), update only metadata fields on existing without replacing activeTasks/currentLoad/historical counters, then emit('agent:registered', { agentId, profile: existing }) and call this._processQueue() so queued tasks are processed while preserving runtime state.
873-887: ⚠️ Potential issue | 🟠 Major: Keep `maxQueueSize` enforced in the spillover fallback paths.

When `SPILLOVER` has no target agent, and again in the `default` branch, tasks are pushed straight into `this.queue` without the capacity check used by `QUEUE_WHEN_FULL`. A balancer with no registered agents can therefore grow an unbounded backlog even though the module advertises a hard queue limit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 873 - 887, In the SPILLOVER case fallback and in the default branch, enforce the queue capacity before pushing: replicate the same maxQueueSize check/behavior used in the QUEUE_WHEN_FULL branch (check this.queue.length >= this.maxQueueSize), and if full return the same overflow response (e.g., TaskStatus.QUEUED or a specific full/overflow status used elsewhere) instead of unconditionally this.queue.push(task.id); otherwise push and return the queued result; update the SPILLOVER branch (the block that calls this._findLeastLoadedAgent() and this._assignTaskToAgent()) and the default branch to use this capacity check against this.maxQueueSize and the same return shape as the QUEUE_WHEN_FULL logic.
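A sketch of the capacity check this comment asks for in the spillover and default branches; the status strings are placeholders for the module's TaskStatus constants, and `enqueueWithLimit` is a hypothetical helper:

```javascript
// Enforce the queue cap on every enqueue path, not just QUEUE_WHEN_FULL.
function enqueueWithLimit(queue, maxQueueSize, taskId) {
  if (queue.length >= maxQueueSize) {
    // Same overflow shape as the queue-when-full branch:
    // refuse instead of growing an unbounded backlog.
    return { taskId, assignedTo: null, status: 'rejected' };
  }
  queue.push(taskId);
  return { taskId, assignedTo: null, status: 'queued' };
}
```

Routing every fallback through one capacity-checked helper keeps the advertised hard limit true even when no agents are registered.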
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.aios-core/core/orchestration/cognitive-load-balancer.js:
- Around line 614-616: When moving a task you must preserve its in-flight timing
instead of calling _assignTaskToAgent which resets startedAt and re-emits
"task:assigned": change the rebalance path to use a move-specific flow (either
add a new _moveTaskToAgent(task, target, {preserveStartedAt: true,
suppressAssignedEvent: true}) or extend _assignTaskToAgent to accept options) so
that startedAt is not overwritten and the "task:assigned" event is not emitted;
ensure completionTime/avgCompletionTime/avgWaitTime calculations keep the
original startedAt and that any metrics/timers updated by _assignTaskToAgent are
skipped for this move.
- Around line 345-364: assignTask currently only verifies agent existence and
can assign tasks to agents whose status is 'offline', causing stuck work; update
assignTask() to block manual assignment to offline agents by checking the target
agent's status (e.g., agent.status or agent.isOnline) after retrieving it via
this.agents.get(agentId) and before calling this._assignTaskToAgent(task,
agent). If the agent is offline, either throw a clear Error (e.g., "Agent '<id>'
is offline") or requeue the task instead of assigning, and ensure any prior
removal from this.queue or previous agent via this._removeTaskFromAgent remains
consistent with this new early-exit behavior.
- Around line 206-217: The registerAgent function currently overwrites an
existing entry in this.agents which desyncs activeTasks/currentLoad; change it
to reject duplicate agent IDs by checking this.agents.has(agentId) at the top of
registerAgent and throwing a clear Error (e.g., "agentId already registered")
instead of creating/setting a new profile; update the code path around
createAgentProfile and this.agents.set(agentId, agentProfile) so it only runs
when the agentId is not already present (and add/update any tests that expect
duplicate registration to raise).
- Around line 960-968: The _persistMetrics method can be called concurrently
from completeTask()/failTask(), causing overlapping fs.writeFile on the same
metricsPath; introduce a serialization promise chain (e.g. a private property
_pendingSave) on the class and make _persistMetrics enqueue its work by setting
this._pendingSave = (this._pendingSave || Promise.resolve()).then(() => { /*
perform mkdir+writeFile */ }).catch(() => {/* swallow/log errors */}); ensure
callers still await _persistMetrics as before and mirror the pattern used in
swarm-intelligence.js to prevent concurrent writes.
In @.aiox-core/core/orchestration/cognitive-load-balancer.js:
- Around line 386-392: The current guards in completeTask() and failTask() only
prevent transitions from COMPLETED or FAILED but still allow terminating
queued/never-assigned tasks; change the checks so you only permit transitions to
COMPLETED or FAILED when the task.status is either TaskStatus.ASSIGNED or
TaskStatus.IN_PROGRESS. In both functions (completeTask and failTask) replace
the existing status checks with a guard that throws an Error referencing taskId
unless task.status === TaskStatus.ASSIGNED || task.status ===
TaskStatus.IN_PROGRESS, keeping the existing error messages for completed/failed
cases as appropriate.
In @.aiox-core/core/orchestration/swarm-intelligence.js:
- Around line 867-872: _persistAsync currently swallows _saveToDisk errors by
routing them to _log (which is silent by default); change the catch to both emit
a visible event and produce a non-silent log so callers can detect persistent
failures: in the promise catch for this._pendingSave (the block that calls
this._saveToDisk()), call this.emit('persistenceError', err) (or
this._emitPersistenceError if you have an event helper) with the error and
include contextual details (swarm id/state) and then call a non-silent logging
method (e.g., this._logger.error or this._log with a flag) so the failure is
surfaced even when debug logging is off; keep the serialization behavior but
ensure the error is re-emitted/visible to callers.
- Around line 372-375: The code sets proposal.status = PROPOSAL_STATUS.EXPIRED
then immediately throws, which prevents finalization and persistence; either
finalize here or defer to resolveProposal(). Fix by completing the expiry
transition before throwing: set proposal.status = PROPOSAL_STATUS.EXPIRED, set
proposal.resolvedAt to now, increment proposalsResolved, emit
"proposal:resolved" with the proposalId and status EXPIRED, and call
_persistAsync() to persist the change; alternatively, remove the status mutation
and let resolveProposal() perform the full transition so the throw does not
leave an unpersisted terminal state.
In @.aiox-core/data/entity-registry.yaml:
- Around line 12789-12796: This module record is missing the keys present in
neighboring entries (externalDeps, plannedDeps, and lifecycle) causing schema
inconsistency; update the record that currently contains usedBy, dependencies,
adaptability, checksum, and lastVerified to include externalDeps: [],
plannedDeps: [], and lifecycle: {} (or the same empty structure used by adjacent
entries) so the entry shape matches others and consumers won't break; locate the
block containing the checksum sha256:960b90f... and lastVerified
'2026-03-08T05:58:54.620Z' to add the missing keys in the same style as
neighboring module entries.
---
Duplicate comments:
In @.aiox-core/core/orchestration/cognitive-load-balancer.js:
- Around line 216-223: The current registration code unconditionally replaces
the runtime entry created by createAgentProfile(agentId, profile || {}) via
this.agents.set(agentId, agentProfile), which wipes activeTasks, currentLoad and
counters and desynchronizes live tasks; change this to first check
this.agents.has(agentId) and either throw an error to reject duplicate
registrations or, preferably, merge the incoming profile fields into the
existing runtime object (lookup existing = this.agents.get(agentId)), update
only metadata fields on existing without replacing
activeTasks/currentLoad/historical counters, then emit('agent:registered', {
agentId, profile: existing }) and call this._processQueue() so queued tasks are
processed while preserving runtime state.
- Around line 873-887: In the SPILLOVER case fallback and in the default branch,
enforce the queue capacity before pushing: replicate the same maxQueueSize
check/behavior used in the QUEUE_WHEN_FULL branch (check this.queue.length >=
this.maxQueueSize), and if full return the same overflow response (e.g.,
TaskStatus.QUEUED or a specific full/overflow status used elsewhere) instead of
unconditionally this.queue.push(task.id); otherwise push and return the queued
result; update the SPILLOVER branch (the block that calls
this._findLeastLoadedAgent() and this._assignTaskToAgent()) and the default
branch to use this capacity check against this.maxQueueSize and the same return
shape as the QUEUE_WHEN_FULL logic.
In @.aiox-core/core/orchestration/swarm-intelligence.js:
- Around line 880-891: The _saveToDisk method currently writes JSON directly to
this._persistPath which can leave the file truncated; instead, write the
serialized payload from _serializeSwarms() to a temp file in the same directory
(e.g. this._persistPath + a .tmp suffix or include process.pid/timestamp),
ensure the temp file is fully flushed (open the temp file, write the JSON, fsync
the file handle, close it), then atomically rename the temp file to
this._persistPath (fs.rename) to replace the live file; use the same directory
for the temp file so rename is atomic across most platforms.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ec398aeb-5c08-445c-b1e0-cd1a764f3bd2
📒 Files selected for processing (8)
- .aios-core/core/orchestration/cognitive-load-balancer.js
- .aios-core/core/orchestration/swarm-intelligence.js
- .aiox-core/core/orchestration/cognitive-load-balancer.js
- .aiox-core/core/orchestration/swarm-intelligence.js
- .aiox-core/data/entity-registry.yaml
- .aiox-core/install-manifest.yaml
- tests/core/orchestration/cognitive-load-balancer.test.js
- tests/core/orchestration/swarm-intelligence.test.js
✅ Files skipped from review due to trivial changes (1)
- tests/core/orchestration/cognitive-load-balancer.test.js
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/core/orchestration/swarm-intelligence.test.js
```js
registerAgent(agentId, profile = {}) {
  if (!agentId || typeof agentId !== 'string') {
    throw new Error('agentId must be a non-empty string');
  }

  // Validate profile overrides must be object or undefined
  if (profile !== undefined && profile !== null && typeof profile !== 'object') {
    throw new Error('profile must be an object or undefined');
  }

  const agentProfile = createAgentProfile(agentId, profile || {});
  this.agents.set(agentId, agentProfile);
```
Reject duplicate agent IDs instead of overwriting live state.
Re-registering an existing agentId replaces the tracked profile, but already-assigned tasks still point at that id. After that, currentLoad and activeTasks no longer reflect reality, so the balancer can over-assign work.
Proposed fix
```diff
 registerAgent(agentId, profile = {}) {
   if (!agentId || typeof agentId !== 'string') {
     throw new Error('agentId must be a non-empty string');
   }
+  if (this.agents.has(agentId)) {
+    throw new Error(`Agent '${agentId}' already exists`);
+  }
   // Validate profile overrides must be object or undefined
   if (profile !== undefined && profile !== null && typeof profile !== 'object') {
     throw new Error('profile must be an object or undefined');
   }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```js
registerAgent(agentId, profile = {}) {
  if (!agentId || typeof agentId !== 'string') {
    throw new Error('agentId must be a non-empty string');
  }
  if (this.agents.has(agentId)) {
    throw new Error(`Agent '${agentId}' already exists`);
  }
  // Validate profile overrides must be object or undefined
  if (profile !== undefined && profile !== null && typeof profile !== 'object') {
    throw new Error('profile must be an object or undefined');
  }
  const agentProfile = createAgentProfile(agentId, profile || {});
  this.agents.set(agentId, agentProfile);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aios-core/core/orchestration/cognitive-load-balancer.js around lines 206 -
217, The registerAgent function currently overwrites an existing entry in
this.agents which desyncs activeTasks/currentLoad; change it to reject duplicate
agent IDs by checking this.agents.has(agentId) at the top of registerAgent and
throwing a clear Error (e.g., "agentId already registered") instead of
creating/setting a new profile; update the code path around createAgentProfile
and this.agents.set(agentId, agentProfile) so it only runs when the agentId is
not already present (and add/update any tests that expect duplicate registration
to raise).
```js
const agent = this.agents.get(agentId);
if (!agent) {
  throw new Error(`Agent '${agentId}' not found`);
}

// If task was in queue, remove it
const queueIndex = this.queue.indexOf(taskId);
if (queueIndex !== -1) {
  this.queue.splice(queueIndex, 1);
}

// If task was assigned to another agent, remove it
if (task.assignedTo) {
  const prevAgent = this.agents.get(task.assignedTo);
  if (prevAgent) {
    this._removeTaskFromAgent(prevAgent, taskId);
  }
}

this._assignTaskToAgent(task, agent);
```
Block manual assignment to offline agents.
assignTask() only checks that the agent exists. If the target is offline, the task becomes assigned and its load increases, but _updateAgentStatus() preserves offline, so the work can get stuck on an agent the scheduler will never pick.
Proposed fix

```diff
 const agent = this.agents.get(agentId);
 if (!agent) {
   throw new Error(`Agent '${agentId}' not found`);
 }
+if (agent.status === AgentStatus.OFFLINE) {
+  throw new Error(`Agent '${agentId}' is offline`);
+}
 // If task was in queue, remove it
 const queueIndex = this.queue.indexOf(taskId);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aios-core/core/orchestration/cognitive-load-balancer.js around lines 345 -
364, assignTask currently only verifies agent existence and can assign tasks to
agents whose status is 'offline', causing stuck work; update assignTask() to
block manual assignment to offline agents by checking the target agent's status
(e.g., agent.status or agent.isOnline) after retrieving it via
this.agents.get(agentId) and before calling this._assignTaskToAgent(task,
agent). If the agent is offline, either throw a clear Error (e.g., "Agent '<id>'
is offline") or requeue the task instead of assigning, and ensure any prior
removal from this.queue or previous agent via this._removeTaskFromAgent remains
consistent with this new early-exit behavior.
```js
// Move task
this._removeTaskFromAgent(source, taskId);
this._assignTaskToAgent(task, target);
```
Preserve in-flight task timing during rebalance.
This path reuses _assignTaskToAgent(), which overwrites startedAt and re-emits task:assigned. A moved task then looks newly started, so completionTime, avgCompletionTime, and avgWaitTime become wrong for rebalanced work.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aios-core/core/orchestration/cognitive-load-balancer.js around lines 614 -
616, When moving a task you must preserve its in-flight timing instead of
calling _assignTaskToAgent which resets startedAt and re-emits "task:assigned":
change the rebalance path to use a move-specific flow (either add a new
_moveTaskToAgent(task, target, {preserveStartedAt: true, suppressAssignedEvent:
true}) or extend _assignTaskToAgent to accept options) so that startedAt is not
overwritten and the "task:assigned" event is not emitted; ensure
completionTime/avgCompletionTime/avgWaitTime calculations keep the original
startedAt and that any metrics/timers updated by _assignTaskToAgent are skipped
for this move.
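The options-based variant the prompt suggests can be sketched as follows; the method names, option flags, and event list are assumptions drawn from the review comment, not the module's real implementation.

```javascript
// Sketch of a move-specific path that preserves in-flight timing.
// Method names, option flags, and the events array are illustrative.
class Balancer {
  constructor() {
    this.tasks = new Map();
    this.events = [];
  }

  _assignTaskToAgent(task, agentId, opts = {}) {
    task.assignedTo = agentId;
    if (!opts.preserveStartedAt) {
      task.startedAt = Date.now();
    }
    if (!opts.suppressAssignedEvent) {
      this.events.push({ type: 'task:assigned', taskId: task.id });
    }
  }

  moveTask(task, targetAgentId) {
    // Rebalance path: keep the original startedAt and stay quiet on
    // events, so completion/wait-time metrics use the true start time.
    this._assignTaskToAgent(task, targetAgentId, {
      preserveStartedAt: true,
      suppressAssignedEvent: true,
    });
  }
}
```

A fresh assignment still stamps `startedAt` and emits, while a rebalance move changes only the owner.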
```js
async _persistMetrics() {
  if (!this.persistMetrics) return;

  try {
    const metricsDir = path.join(this.projectRoot, METRICS_DIR);
    const metricsPath = path.join(metricsDir, METRICS_FILENAME);

    await fs.mkdir(metricsDir, { recursive: true });
    await fs.writeFile(metricsPath, JSON.stringify(this.getMetrics(), null, 2), 'utf8');
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
rg -n -C3 '_persistMetrics|_pendingSave|writeFile' .aios-core/core/orchestration/cognitive-load-balancer.js .aiox-core/core/orchestration/swarm-intelligence.js
```

Repository: SynkraAI/aiox-core

Length of output: 3803
Serialize metrics writes to prevent race conditions under concurrent task completion.
Multiple concurrent completeTask() / failTask() calls can reach _persistMetrics() simultaneously and trigger overlapping writeFile() operations against the same file path. This creates nondeterministic metrics snapshots under load. The sibling orchestrator in swarm-intelligence.js already uses a serialized _pendingSave chain to prevent this.
Proposed fix

```diff
 async _persistMetrics() {
   if (!this.persistMetrics) return;
-  try {
-    const metricsDir = path.join(this.projectRoot, METRICS_DIR);
-    const metricsPath = path.join(metricsDir, METRICS_FILENAME);
-
-    await fs.mkdir(metricsDir, { recursive: true });
-    await fs.writeFile(metricsPath, JSON.stringify(this.getMetrics(), null, 2), 'utf8');
-  } catch (err) {
-    // Log persistence errors with context instead of silently ignoring
-    console.error(`Failed to persist load balancer metrics: ${err.message}`);
-  }
+  this._pendingSave = (this._pendingSave || Promise.resolve())
+    .then(async () => {
+      const metricsDir = path.join(this.projectRoot, METRICS_DIR);
+      const metricsPath = path.join(metricsDir, METRICS_FILENAME);
+
+      await fs.mkdir(metricsDir, { recursive: true });
+      await fs.writeFile(metricsPath, JSON.stringify(this.getMetrics(), null, 2), 'utf8');
+    })
+    .catch((err) => {
+      console.error(`Failed to persist load balancer metrics: ${err.message}`);
+    });
+
+  return this._pendingSave;
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```js
async _persistMetrics() {
  if (!this.persistMetrics) return;

  this._pendingSave = (this._pendingSave || Promise.resolve())
    .then(async () => {
      const metricsDir = path.join(this.projectRoot, METRICS_DIR);
      const metricsPath = path.join(metricsDir, METRICS_FILENAME);

      await fs.mkdir(metricsDir, { recursive: true });
      await fs.writeFile(metricsPath, JSON.stringify(this.getMetrics(), null, 2), 'utf8');
    })
    .catch((err) => {
      console.error(`Failed to persist load balancer metrics: ${err.message}`);
    });

  return this._pendingSave;
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aios-core/core/orchestration/cognitive-load-balancer.js around lines 960 -
968, The _persistMetrics method can be called concurrently from
completeTask()/failTask(), causing overlapping fs.writeFile on the same
metricsPath; introduce a serialization promise chain (e.g. a private property
_pendingSave) on the class and make _persistMetrics enqueue its work by setting
this._pendingSave = (this._pendingSave || Promise.resolve()).then(() => { /*
perform mkdir+writeFile */ }).catch(() => {/* swallow/log errors */}); ensure
callers still await _persistMetrics as before and mirror the pattern used in
swarm-intelligence.js to prevent concurrent writes.
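The serialization pattern itself is easy to demonstrate without touching the filesystem. The sketch below replaces `fs` with an in-memory array; the class and method names are illustrative stand-ins for the balancer's persistence path.

```javascript
// In-memory sketch of the _pendingSave promise-chain pattern.
// _writeSnapshot stands in for fs.mkdir + fs.writeFile.
class MetricsStore {
  constructor() {
    this.writes = [];
    this._pendingSave = Promise.resolve();
  }

  _writeSnapshot(snapshot) {
    return new Promise((resolve) => {
      // Simulate async I/O; without chaining, two calls could interleave.
      setTimeout(() => {
        this.writes.push(snapshot);
        resolve();
      }, 0);
    });
  }

  persist(snapshot) {
    // Each call chains onto the previous write, so concurrent persist()
    // calls can never run overlapping file operations.
    this._pendingSave = this._pendingSave
      .then(() => this._writeSnapshot(snapshot))
      .catch((err) => {
        console.error(`Failed to persist metrics: ${err.message}`);
      });
    return this._pendingSave;
  }
}
```

Because every `persist()` returns the tail of the chain, callers can still `await` it exactly as they awaited the old `_persistMetrics()`.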
```js
// Enforce task state transitions
if (task.status === TaskStatus.COMPLETED) {
  throw new Error(`Task '${taskId}' is already completed`);
}
if (task.status === TaskStatus.FAILED) {
  throw new Error(`Task '${taskId}' is already failed`);
}
```
Only allow terminal transitions from an active execution state.
A queued or never-assigned task can currently be completed or failed, which increments terminal metrics without ever consuming agent capacity and leaves stale queue entries until some later drain. Guard both paths so only ASSIGNED or IN_PROGRESS tasks can move to COMPLETED/FAILED.
🛠️ Proposed guard

```diff
 if (task.status === TaskStatus.COMPLETED) {
   throw new Error(`Task '${taskId}' is already completed`);
 }
 if (task.status === TaskStatus.FAILED) {
   throw new Error(`Task '${taskId}' is already failed`);
 }
+if (task.status !== TaskStatus.ASSIGNED && task.status !== TaskStatus.IN_PROGRESS) {
+  throw new Error(`Task '${taskId}' is not currently assigned`);
+}
```

Apply the same guard in both completeTask() and failTask().
Also applies to: 438-444
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/cognitive-load-balancer.js around lines 386 -
392, The current guards in completeTask() and failTask() only prevent
transitions from COMPLETED or FAILED but still allow terminaling
queued/never-assigned tasks; change the checks so you only permit transitions to
COMPLETED or FAILED when the task.status is either TaskStatus.ASSIGNED or
TaskStatus.IN_PROGRESS. In both functions (completeTask and failTask) replace
the existing status checks with a guard that throws an Error referencing taskId
unless task.status === TaskStatus.ASSIGNED || task.status ===
TaskStatus.IN_PROGRESS, keeping the existing error messages for completed/failed
cases as appropriate.
```js
// Check deadline
if (new Date(proposal.deadline) < new Date()) {
  proposal.status = PROPOSAL_STATUS.EXPIRED;
  throw new Error(`Proposal ${proposalId} has expired`);
```
Don't strand expired proposals in an unpersisted terminal state.
This branch sets proposal.status = EXPIRED and then throws immediately. That skips resolvedAt, proposalsResolved, proposal:resolved, and _persistAsync(), so an expired proposal can disappear from resolution metrics and come back as pending after a restart. Either finalize the expiry here before returning/throwing, or leave the proposal untouched and let resolveProposal() own the transition.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/swarm-intelligence.js around lines 372 - 375,
The code sets proposal.status = PROPOSAL_STATUS.EXPIRED then immediately throws,
which prevents finalization and persistence; either finalize here or defer to
resolveProposal(). Fix by completing the expiry transition before throwing: set
proposal.status = PROPOSAL_STATUS.EXPIRED, set proposal.resolvedAt to now,
increment proposalsResolved, emit "proposal:resolved" with the proposalId and
status EXPIRED, and call _persistAsync() to persist the change; alternatively,
remove the status mutation and let resolveProposal() perform the full transition
so the throw does not leave an unpersisted terminal state.
```js
// Serialize writes to prevent concurrent fs operations
this._pendingSave = (this._pendingSave || Promise.resolve())
  .then(() => this._saveToDisk())
  .catch((err) => {
    this._log(`Persistence error: ${err.message}`);
  });
```
Surface persistence failures even when debug logging is off.
_persistAsync() catches _saveToDisk() failures and forwards them to _log(), but _log() is silent by default. In normal operation, persistence can fail forever with no caller-visible signal that swarm state is no longer durable. Emit a dedicated persistence error event and/or log the failure with context here.
As per coding guidelines, .aiox-core/core/**: Verify error handling is comprehensive with proper try/catch and error context.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/core/orchestration/swarm-intelligence.js around lines 867 - 872,
_persistAsync currently swallows _saveToDisk errors by routing them to _log
(which is silent by default); change the catch to both emit a visible event and
produce a non-silent log so callers can detect persistent failures: in the
promise catch for this._pendingSave (the block that calls this._saveToDisk()),
call this.emit('persistenceError', err) (or this._emitPersistenceError if you
have an event helper) with the error and include contextual details (swarm
id/state) and then call a non-silent logging method (e.g., this._logger.error or
this._log with a flag) so the failure is surfaced even when debug logging is
off; keep the serialization behavior but ensure the error is re-emitted/visible
to callers.
```yaml
usedBy: []
dependencies: []
adaptability:
  score: 0.4
  constraints: []
  extensionPoints: []
checksum: sha256:960b90fbcafb68d5728a645e892891d9bdec053b4e9f36086f8e6d35259a08c1
lastVerified: '2026-03-08T05:58:54.620Z'
```
Keep the new module entry schema-consistent.
This record skips externalDeps, plannedDeps, and lifecycle, while neighboring module entries include them. That inconsistency can leak into registry consumers that expect a uniform shape.
🧩 Minimal fix

```diff
 usedBy: []
 dependencies: []
+externalDeps: []
+plannedDeps: []
+lifecycle: production
 adaptability:
   score: 0.4
```

📝 Committable suggestion
```yaml
usedBy: []
dependencies: []
externalDeps: []
plannedDeps: []
lifecycle: production
adaptability:
  score: 0.4
  constraints: []
  extensionPoints: []
checksum: sha256:960b90fbcafb68d5728a645e892891d9bdec053b4e9f36086f8e6d35259a08c1
lastVerified: '2026-03-08T05:58:54.620Z'
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.aiox-core/data/entity-registry.yaml around lines 12789 - 12796, This module
record is missing the keys present in neighboring entries (externalDeps,
plannedDeps, and lifecycle) causing schema inconsistency; update the record that
currently contains usedBy, dependencies, adaptability, checksum, and
lastVerified to include externalDeps: [], plannedDeps: [], and lifecycle: {} (or
the same empty structure used by adjacent entries) so the entry shape matches
others and consumers won't break; locate the block containing the checksum
sha256:960b90f... and lastVerified '2026-03-08T05:58:54.620Z' to add the missing
keys in the same style as neighboring module entries.
Summary

- Throttle policies: `queue-when-full`, `reject-when-full`, `spillover`

Features

- Metrics persisted to `.aiox/load-balancer-metrics.json`
- Events: `task:submitted`, `task:assigned`, `task:completed`, `task:failed`, `task:rebalanced`, `agent:registered`, `agent:overloaded`, `agent:available`, `queue:full`
- Module lives under the `.aios-core/` path

Files

- `.aiox-core/core/orchestration/cognitive-load-balancer.js`
- `.aios-core/core/orchestration/cognitive-load-balancer.js`
- `tests/core/orchestration/cognitive-load-balancer.test.js`

Test plan

- `npm test -- tests/core/orchestration/cognitive-load-balancer.test.js`
- `??` usage for defaults (nullish coalescing, not `||`)
New Features
Tests
Chores