Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 390 additions & 10 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/daemon-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"@adhdev/session-host-core": "*",
"@agentclientprotocol/sdk": "^0.16.1",
"@xterm/xterm": "^6.0.0",
"better-sqlite3": "^12.10.0",
"chalk": "^5.3.0",
"chokidar": "^4.0.3",
"conf": "^13.0.0",
Expand All @@ -60,6 +61,7 @@
"@adhdev/ghostty-vt-node": "*"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"@types/js-yaml": "^4.0.9",
"@types/node": "^22.0.0",
"@types/ws": "^8.18.1",
Expand Down
6 changes: 4 additions & 2 deletions packages/daemon-core/src/commands/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import {
import { buildMachineInfo, buildStatusSnapshot } from '../status/snapshot.js';
import { getSessionCompletionMarker } from '../status/snapshot.js';
import { execNpmCommandSync, resolveCurrentGlobalInstallSurface, spawnDetachedDaemonUpgradeHelper } from './upgrade-helper.js';
import { getMeshQueueRevision } from '../mesh/mesh-work-queue.js';
import type { RepoMeshSessionCleanupMode } from '../repo-mesh-types.js';
import { homedir } from 'os';
import { join as pathJoin, resolve as pathResolve } from 'path';
Expand Down Expand Up @@ -1706,7 +1707,7 @@ export class DaemonCommandRouter {
* the mesh doesn't exist in the local meshes.json file. */
private inlineMeshCache = new Map<string, any>();
/** Coordinator-owned whole-mesh aggregate status snapshots. Browser callers read this by default. */
private aggregateMeshStatusCache = new Map<string, { builtAt: number; snapshot: any }>();
private aggregateMeshStatusCache = new Map<string, { builtAt: number; snapshot: any; queueRevision: string }>();
/** In-memory async Refinery jobs keyed by meshId:nodeId to reject/return duplicate in-flight requests. */
private runningRefineJobs = new Map<string, MeshRefineJobHandle>();
/** Terminal async Refinery jobs preserve a clear answer after the worktree node has been removed. */
Expand Down Expand Up @@ -1796,6 +1797,7 @@ export class DaemonCommandRouter {
private getCachedAggregateMeshStatus(meshId: string, mesh?: any, options?: { requireDirectPeerTruth?: boolean }): any | null {
const cached = this.aggregateMeshStatusCache.get(meshId);
if (!cached?.snapshot || cached.snapshot.success !== true || !Array.isArray(cached.snapshot.nodes)) return null;
if (cached.queueRevision !== getMeshQueueRevision(meshId)) return null;
let snapshot = this.cloneJsonValue(cached.snapshot);
snapshot = this.hydrateCachedAggregateMeshStatusFromInline(snapshot, mesh, options);
if (shouldRefreshStalePendingAggregate(snapshot, options)) return null;
Expand Down Expand Up @@ -1840,7 +1842,7 @@ export class DaemonCommandRouter {
returnedAt: new Date(builtAt).toISOString(),
},
};
this.aggregateMeshStatusCache.set(meshId, { builtAt, snapshot: this.cloneJsonValue(next) });
this.aggregateMeshStatusCache.set(meshId, { builtAt, snapshot: this.cloneJsonValue(next), queueRevision: getMeshQueueRevision(meshId) });
return next;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/daemon-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export { buildMeshLedgerReconciliationEvidence, buildMeshLedgerReplicaEvidence }
export type { MeshLedgerReconciliationEvidence, MeshLedgerReplicaEvidence, MeshLedgerReplicaStatus } from './mesh/mesh-ledger-reconciliation.js';

// ── Mesh Work Queue (GUPP) ──
export { enqueueTask, getQueue, claimNextTask, updateTaskStatus, updateSessionTaskStatus, cancelTask, requeueTask, getMeshQueueStats, normalizeMeshTaskMode, validateMeshTaskModeRequest } from './mesh/mesh-work-queue.js';
export { enqueueTask, getQueue, claimNextTask, updateTaskStatus, updateSessionTaskStatus, cancelTask, requeueTask, getMeshQueueStats, getMeshQueueRevision, normalizeMeshTaskMode, validateMeshTaskModeRequest } from './mesh/mesh-work-queue.js';
export type { MeshWorkQueueEntry, MeshTaskStatus, MeshTaskMode, MeshWorkQueueStats, MeshQueueMutationOptions, MeshTaskModeValidationResult } from './mesh/mesh-work-queue.js';
export { buildMeshActiveWork, buildMeshActiveWorkSummary } from './mesh/mesh-active-work.js';
export type { MeshActiveWorkRecord, MeshActiveWorkStatus, MeshActiveWorkSummary, MeshActiveWorkSource } from './mesh/mesh-active-work.js';
Expand Down
163 changes: 163 additions & 0 deletions packages/daemon-core/src/mesh/beads-db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import Database from 'better-sqlite3';
import { existsSync, mkdirSync, readFileSync } from 'fs';
import { dirname, join } from 'path';
import { getLedgerDir } from './mesh-ledger.js';
import type { MeshTaskStatus, MeshWorkQueueEntry } from './mesh-work-queue.js';

/**
 * Turns a mesh id into a filesystem-safe token.
 *
 * ASCII letters, digits, `_` and `-` are kept as-is; every other UTF-16 code
 * unit is replaced by a single `_`.
 */
function safeMeshId(meshId: string): string {
  const allowedUnit = /^[\w-]$/;
  return meshId
    .split('')
    .map(unit => (allowedUnit.test(unit) ? unit : '_'))
    .join('');
}

/**
 * Builds the path of the legacy JSON queue file for a mesh:
 * `<ledger dir>/<sanitized mesh id>.queue.json`.
 */
function legacyQueuePath(meshId: string): string {
  const ledgerDir = getLedgerDir();
  const fileName = safeMeshId(meshId) + '.queue.json';
  return join(ledgerDir, fileName);
}

/**
 * SQLite-backed store for mesh work-queue entries.
 *
 * A single process-wide instance (see {@link BeadsDB.getInstance}) opens
 * `beads.db` inside the ledger directory. Each queue entry is stored as one row
 * of `mesh_queue`: a handful of columns are extracted for filtering/indexing and
 * the full entry is kept as JSON in `payload`.
 *
 * The first time a mesh is touched, a legacy `<mesh>.queue.json` file (if any)
 * is imported, provided the database holds no rows for that mesh yet.
 */
export class BeadsDB {
  private static instance: BeadsDB | undefined;
  private readonly db: Database.Database;
  /** Mesh ids whose legacy-file import has already been attempted by this instance. */
  private readonly migratedMeshIds = new Set<string>();

  private constructor(dbPath: string) {
    // `recursive: true` makes this a no-op when the directory already exists.
    mkdirSync(dirname(dbPath), { recursive: true });

    this.db = new Database(dbPath);
    try {
      this.db.pragma('journal_mode = WAL');
      this.db.pragma('synchronous = NORMAL');
      this.db.pragma('foreign_keys = ON');
      this.db.pragma('busy_timeout = 5000');
      this.migrate();
    } catch (error) {
      // Do not leak the open handle when configuration or schema setup fails.
      this.db.close();
      throw error;
    }
  }

  /** Returns the shared instance, opening `<ledger dir>/beads.db` on first use. */
  static getInstance(): BeadsDB {
    if (!this.instance) {
      this.instance = new BeadsDB(join(getLedgerDir(), 'beads.db'));
    }
    return this.instance;
  }

  /** Closes and forgets the shared instance so the next `getInstance()` reopens the database. */
  static resetForTests(): void {
    this.instance?.close();
    this.instance = undefined;
  }

  /** Closes the underlying database connection. */
  close(): void {
    this.db.close();
  }

  /**
   * Runs `fn` inside an immediate transaction and returns its result.
   * If `fn` throws, the transaction is rolled back and the error propagates.
   */
  transaction<T>(fn: () => T): T {
    return this.db.transaction(fn).immediate();
  }

  /** Creates the `mesh_queue` table and its indexes when they do not exist yet. */
  private migrate(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS mesh_queue (
        id TEXT PRIMARY KEY,
        mesh_id TEXT NOT NULL,
        status TEXT NOT NULL,
        target_node_id TEXT,
        target_session_id TEXT,
        assigned_node_id TEXT,
        assigned_session_id TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        payload TEXT NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_mesh_queue_mesh_status_created
        ON mesh_queue(mesh_id, status, created_at);
      CREATE INDEX IF NOT EXISTS idx_mesh_queue_assignment
        ON mesh_queue(mesh_id, assigned_node_id, assigned_session_id, status);
    `);
  }

  /** Prepares the row insert shared by the legacy import and `replaceQueue`. */
  private insertStatement(verb: 'INSERT' | 'INSERT OR REPLACE') {
    return this.db.prepare(`
      ${verb} INTO mesh_queue (
        id, mesh_id, status, target_node_id, target_session_id,
        assigned_node_id, assigned_session_id, created_at, updated_at, payload
      ) VALUES (
        @id, @meshId, @status, @targetNodeId, @targetSessionId,
        @assignedNodeId, @assignedSessionId, @createdAt, @updatedAt, @payload
      )
    `);
  }

  /**
   * True when `entry` is an object carrying a bindable (string or number) value
   * for the columns that are populated straight from the entry and declared
   * NOT NULL / PRIMARY KEY in the schema.
   */
  private static hasStorableColumns(entry: unknown): boolean {
    if (typeof entry !== 'object' || entry === null) return false;
    const candidate = entry as Record<string, unknown>;
    return ['id', 'status', 'createdAt', 'updatedAt'].every(key => {
      const value = candidate[key];
      return typeof value === 'string' || typeof value === 'number';
    });
  }

  /**
   * One-time, best-effort import of the legacy JSON queue file for `meshId`.
   *
   * Skipped when the database already has rows for the mesh, or when the file is
   * missing, unreadable, or not a JSON array. Entries that cannot be stored are
   * skipped individually; the remaining ones are imported atomically so a
   * failure never leaves a partial import behind. If the database write itself
   * fails, the mesh is un-marked so a later call can retry.
   */
  private ensureLegacyQueueMigrated(meshId: string): void {
    if (this.migratedMeshIds.has(meshId)) return;

    const existing = this.db
      .prepare('SELECT COUNT(*) AS count FROM mesh_queue WHERE mesh_id = ?')
      .get(meshId) as { count: number };
    this.migratedMeshIds.add(meshId);
    if (existing.count > 0) return;

    const path = legacyQueuePath(meshId);
    if (!existsSync(path)) return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(readFileSync(path, 'utf-8'));
    } catch {
      // Unreadable or corrupt legacy file: nothing usable to import.
      return;
    }
    if (!Array.isArray(parsed)) return;

    const entries = parsed.filter(item => BeadsDB.hasStorableColumns(item)) as MeshWorkQueueEntry[];
    if (entries.length === 0) return;

    try {
      const insert = this.insertStatement('INSERT OR REPLACE');
      // Nested inside an outer transaction this runs as a savepoint.
      const importAll = this.db.transaction(() => {
        for (const entry of entries) insert.run(this.toRow(entry, meshId));
      });
      importAll();
    } catch {
      // Everything was rolled back; allow a later call to try again.
      this.migratedMeshIds.delete(meshId);
    }
  }

  /**
   * Returns the stored entries of a mesh, oldest first.
   *
   * @param meshId   Mesh whose queue is read.
   * @param statuses When non-empty, only entries in one of these statuses are returned.
   * @throws SyntaxError if a stored payload is not valid JSON.
   */
  getQueueEntries(meshId: string, statuses?: MeshTaskStatus[]): MeshWorkQueueEntry[] {
    this.ensureLegacyQueueMigrated(meshId);
    const wanted = statuses ?? [];
    const statusFilter = wanted.length
      ? ` AND status IN (${wanted.map(() => '?').join(', ')})`
      : '';
    // rowid breaks ties between entries sharing a created_at value, so they come
    // back in the order they were inserted instead of an unspecified order.
    const rows = this.db
      .prepare(`SELECT payload FROM mesh_queue WHERE mesh_id = ?${statusFilter} ORDER BY created_at ASC, rowid ASC`)
      .all(meshId, ...wanted) as Array<{ payload: string }>;
    return rows.map(row => JSON.parse(row.payload) as MeshWorkQueueEntry);
  }

  /**
   * Returns a string that changes whenever an entry of the mesh is added,
   * removed, or has its status / `updated_at` changed: the `id:status:updated_at`
   * triples of all rows, ordered by id and joined with `|`. Empty for an empty queue.
   */
  getQueueRevision(meshId: string): string {
    this.ensureLegacyQueueMigrated(meshId);
    const rows = this.db
      .prepare('SELECT id, status, updated_at FROM mesh_queue WHERE mesh_id = ? ORDER BY id ASC')
      .all(meshId) as Array<{ id: string; status: string; updated_at: string }>;
    return rows.map(row => `${row.id}:${row.status}:${row.updated_at}`).join('|');
  }

  /**
   * Replaces the whole stored queue of `meshId` with `queue`.
   *
   * The delete and the inserts run as one atomic unit: either the new queue is
   * stored completely or the previous rows stay untouched. When called inside
   * `transaction()` this nests as a savepoint.
   */
  replaceQueue(meshId: string, queue: MeshWorkQueueEntry[]): void {
    const remove = this.db.prepare('DELETE FROM mesh_queue WHERE mesh_id = ?');
    const insert = this.insertStatement('INSERT');
    const swap = this.db.transaction(() => {
      remove.run(meshId);
      for (const entry of queue) insert.run(this.toRow(entry, meshId));
    });
    swap.immediate();
  }

  /**
   * Deletes every stored entry of `meshId` and forgets that its legacy file was
   * already considered, so the next access checks for the legacy file again.
   */
  deleteQueue(meshId: string): void {
    this.db.prepare('DELETE FROM mesh_queue WHERE mesh_id = ?').run(meshId);
    this.migratedMeshIds.delete(meshId);
  }

  /**
   * Maps an entry to the named parameters of the insert statement.
   *
   * @param entry          Entry to store; also serialized unchanged into `payload`.
   * @param fallbackMeshId Used for the `mesh_id` column only when the entry itself carries no mesh id.
   */
  private toRow(entry: MeshWorkQueueEntry, fallbackMeshId?: string): Record<string, unknown> {
    return {
      id: entry.id,
      meshId: entry.meshId ?? fallbackMeshId,
      status: entry.status,
      targetNodeId: entry.targetNodeId ?? null,
      targetSessionId: entry.targetSessionId ?? null,
      assignedNodeId: entry.assignedNodeId ?? null,
      assignedSessionId: entry.assignedSessionId ?? null,
      createdAt: entry.createdAt,
      updatedAt: entry.updatedAt,
      payload: JSON.stringify(entry),
    };
  }
}
64 changes: 23 additions & 41 deletions packages/daemon-core/src/mesh/mesh-work-queue.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { existsSync, writeFileSync, readFileSync, openSync, closeSync, unlinkSync } from 'fs';
import { join } from 'path';
import { randomUUID } from 'crypto';
import { getLedgerDir } from './mesh-ledger.js';
import { requireMeshHostQueueOwner } from './mesh-host-ownership.js';
import type { RepoMeshDaemonRole } from '../repo-mesh-types.js';
import { BeadsDB } from './beads-db.js';

export type MeshTaskStatus = 'pending' | 'assigned' | 'completed' | 'failed' | 'cancelled';
export type MeshActiveTaskStatus = Extract<MeshTaskStatus, 'pending' | 'assigned'>;
Expand Down Expand Up @@ -99,50 +97,16 @@ export interface MeshQueueMutationOptions {
ownerRole?: RepoMeshDaemonRole;
}

function getQueuePath(meshId: string): string {
const safe = meshId.replace(/[^a-zA-Z0-9_-]/g, '_');
return join(getLedgerDir(), `${safe}.queue.json`);
}

function getLockPath(meshId: string): string {
const safe = meshId.replace(/[^a-zA-Z0-9_-]/g, '_');
return join(getLedgerDir(), `${safe}.queue.lock`);
}

/**
* Simple advisory file lock using O_EXCL (atomic create) for queue mutations.
* Retries up to 10 times at 30 ms intervals; proceeds without lock on timeout
* to prevent deadlock (best-effort — far better than no locking at all).
*/
function withQueueLock<T>(meshId: string, fn: () => T): T {
const lockPath = getLockPath(meshId);
let fd = -1;
for (let i = 0; i < 10; i++) {
try { fd = openSync(lockPath, 'wx'); break; } catch {
const deadline = Date.now() + 30;
while (Date.now() < deadline) { /* spin */ }
}
}
try { return fn(); } finally {
if (fd !== -1) try { closeSync(fd); } catch { /* noop */ }
try { unlinkSync(lockPath); } catch { /* already removed */ }
}
function withQueueLock<T>(_meshId: string, fn: () => T): T {
return BeadsDB.getInstance().transaction(fn);
}

function readQueue(meshId: string): MeshWorkQueueEntry[] {
const path = getQueuePath(meshId);
if (!existsSync(path)) return [];
try {
const content = readFileSync(path, 'utf-8');
return JSON.parse(content) as MeshWorkQueueEntry[];
} catch {
return [];
}
return BeadsDB.getInstance().getQueueEntries(meshId);
}

function writeQueue(meshId: string, queue: MeshWorkQueueEntry[]): void {
const path = getQueuePath(meshId);
writeFileSync(path, JSON.stringify(queue, null, 2), 'utf-8');
BeadsDB.getInstance().replaceQueue(meshId, queue);
}

/**
Expand Down Expand Up @@ -189,6 +153,10 @@ export function getQueue(meshId: string, opts?: { status?: MeshTaskStatus[] }):
return queue;
}

/**
 * Returns the current revision string of a mesh's stored queue, as computed by
 * the shared BeadsDB instance. Callers can compare two revisions for equality
 * to detect that the queue changed in between.
 */
export function getMeshQueueRevision(meshId: string): string {
  const store = BeadsDB.getInstance();
  return store.getQueueRevision(meshId);
}

/**
* Find the next pending task that this node is allowed to claim, and mark it as assigned.
*/
Expand Down Expand Up @@ -408,3 +376,17 @@ export function getMeshQueueStats(meshId: string): MeshWorkQueueStats {
})),
};
}

/**
 * Test helper: overwrites the stored queue of `meshId` with `queue`, inside a
 * single store transaction.
 */
export function __replaceMeshQueueForTests(meshId: string, queue: MeshWorkQueueEntry[]): void {
  const store = BeadsDB.getInstance();
  store.transaction(() => store.replaceQueue(meshId, queue));
}

/** Test helper: removes every stored queue entry of `meshId` from the shared store. */
export function __clearMeshQueueForTests(meshId: string): void {
  const store = BeadsDB.getInstance();
  store.deleteQueue(meshId);
}

/**
 * Test helper: delegates to `BeadsDB.resetForTests()` to discard the shared
 * BeadsDB instance.
 */
export function __resetBeadsDBForTests(): void {
  return BeadsDB.resetForTests();
}
51 changes: 51 additions & 0 deletions packages/daemon-core/test/commands/mesh-status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,57 @@ describe('mesh_status', () => {
}
})

// Regression test: a task enqueued directly through the work-queue module
// (i.e. not via a router command) must make the next `mesh_status` call report
// a non-cached aggregate that includes the new task.
it('does not reuse cached mesh_status after queue storage changes outside the router', async () => {
const configDir = await mkdtemp(join(tmpdir(), 'mesh-status-queue-cache-'))
const { dir, repoRoot } = await createTempGitRepo('mesh-status-queue-revision-')
// Remembered so the finally block can restore the caller's environment.
const previousConfigDir = process.env.ADHDEV_CONFIG_DIR

try {
// NOTE(review): presumably mesh config and the queue store are resolved under
// ADHDEV_CONFIG_DIR — confirm against the config / ledger-dir helpers.
process.env.ADHDEV_CONFIG_DIR = configDir
const { createMesh, addNode } = await import('../../src/config/mesh-config.js')
const { enqueueTask, __resetBeadsDBForTests } = await import('../../src/mesh/mesh-work-queue.js')
// Reset the queue store after switching the config dir and before first use.
__resetBeadsDBForTests()

const mesh = createMesh({
name: 'Queue Cache Mesh',
repoIdentity: 'github.com/acme/queue-cache',
defaultBranch: 'master',
})
addNode(mesh.id, { workspace: repoRoot, repoRoot })

const { router, sessionHostControl } = createRouter()
// First call forces a refresh while the queue is still empty; this is expected
// to leave an aggregate snapshot behind for the second call to (not) reuse.
const initial = await router.execute('mesh_status', { meshId: mesh.id, refresh: true }) as any
expect(initial.success).toBe(true)
expect(initial.queue.summary.total).toBe(0)
expect(initial.sourceOfTruth.aggregateSnapshot.cached).toBe(false)

// Clear recorded calls so the count asserted below covers only the second mesh_status call.
sessionHostControl.listSessions.mockClear()
// Mutate the queue without going through the router.
enqueueTask(mesh.id, 'queue task created by another mesh process')

// No `refresh` flag here: the result must still not be a cache hit.
const afterQueueChange = await router.execute('mesh_status', { meshId: mesh.id }) as any

expect(afterQueueChange.success).toBe(true)
expect(afterQueueChange.queue.summary.total).toBe(1)
expect(afterQueueChange.queue.tasks).toHaveLength(1)
expect(afterQueueChange.queue.tasks[0]).toEqual(expect.objectContaining({
message: 'queue task created by another mesh process',
status: 'pending',
}))
expect(afterQueueChange.sourceOfTruth.aggregateSnapshot).toMatchObject({
cached: false,
refreshReason: 'stale_pending_cache_refresh',
})
// Exactly one session listing is expected for the second call.
expect(sessionHostControl.listSessions).toHaveBeenCalledTimes(1)
} finally {
// Re-imported because the bindings destructured inside `try` are out of scope here.
const { __resetBeadsDBForTests } = await import('../../src/mesh/mesh-work-queue.js')
// Reset the store before the temporary directories are deleted.
__resetBeadsDBForTests()
if (previousConfigDir === undefined) delete process.env.ADHDEV_CONFIG_DIR
else process.env.ADHDEV_CONFIG_DIR = previousConfigDir
await rm(configDir, { recursive: true, force: true })
await rm(dir, { recursive: true, force: true })
}
})

it('refreshes instead of returning a stale pending cache hit when direct peer truth is required', async () => {
const { dir, repoRoot } = await createTempGitRepo('mesh-status-stale-cache-refresh-')
try {
Expand Down
Loading