diff --git a/src/database/sync/JsonlVaultWatcher.ts b/src/database/sync/JsonlVaultWatcher.ts index c08ae17e2..03df75369 100644 --- a/src/database/sync/JsonlVaultWatcher.ts +++ b/src/database/sync/JsonlVaultWatcher.ts @@ -279,11 +279,11 @@ export class JsonlVaultWatcher { this.suppressed.delete(key); return false; } - // Consume the suppression so later remote writes aren't silently - // dropped if Obsidian Sync lands quickly after ours. If the plugin - // appends multiple events to the same stream in rapid succession, - // `JSONLWriter` re-suppresses before each append. - this.suppressed.delete(key); + // Don't consume: let the entry live until its TTL expires so that + // shard rotation (which fires two vault events for the same stream) + // doesn't trigger a needless sync on the second event. JSONLWriter + // re-suppresses before each separate append, so remote writes that + // land after the TTL window are never silently dropped. return true; } @@ -291,6 +291,16 @@ export class JsonlVaultWatcher { if (this.debounceTimer !== undefined) { clearTimeout(this.debounceTimer); } + + // Sweep expired suppression entries to prevent unbounded Map growth + // in long-running sessions. Runs at most once per debounce cycle. + const now = Date.now(); + for (const [key, expiry] of this.suppressed) { + if (now > expiry) { + this.suppressed.delete(key); + } + } + this.debounceTimer = setTimeout(() => { this.debounceTimer = undefined; if (!this.running) { diff --git a/src/database/sync/SyncCoordinator.ts b/src/database/sync/SyncCoordinator.ts index 9e67871b7..863541f0c 100644 --- a/src/database/sync/SyncCoordinator.ts +++ b/src/database/sync/SyncCoordinator.ts @@ -37,30 +37,30 @@ function isValidWorkspaceId(id: string): boolean { // Interfaces // ============================================================================ -export interface IJSONLWriter { - getDeviceId(): string; - listFiles(category: 'workspaces' | 'conversations' | 'tasks'): Promise; - getFileModTime(file: string): Promise; - readEvents(file: string): Promise; - getEventsNotFromDevice( - file: string, - deviceId: string, - sinceTimestamp?: number - ): Promise; -} - -export interface ISQLiteCacheManager { - getSyncState(deviceId: string): Promise; - updateSyncState(deviceId: string, lastEventTimestamp: number, fileTimestamps: Record): Promise; - isEventApplied(eventId: string): Promise; - markEventApplied(eventId: string): Promise; - run(sql: string, params?: unknown[]): Promise; - query(sql: string, params?: unknown[]): Promise; - queryOne(sql: string, params?: unknown[]): Promise; - clearAllData(): Promise; - rebuildFTSIndexes(): Promise; - save(): Promise; -} +export interface IJSONLWriter { + getDeviceId(): string; + listFiles(category: 'workspaces' | 'conversations' | 'tasks'): Promise; + getFileModTime(file: string): Promise; + readEvents(file: string): Promise; + getEventsNotFromDevice( + file: string, + deviceId: string, + sinceTimestamp?: number + ): Promise; +} + +export interface ISQLiteCacheManager { + getSyncState(deviceId: string): Promise; + updateSyncState(deviceId: string, lastEventTimestamp: number, fileTimestamps: Record): Promise; + isEventApplied(eventId: string): Promise; + markEventApplied(eventId: string): Promise; + run(sql: string, params?: unknown[]): Promise; + query(sql: string, params?: unknown[]): Promise; + queryOne(sql: string, params?: unknown[]): Promise; + clearAllData(): Promise; + rebuildFTSIndexes(): Promise; + save(): Promise; +} export interface SyncState { deviceId: string; @@ -96,6 +96,11 @@ export class SyncCoordinator { private conversationApplier: ConversationEventApplier; private taskApplier: TaskEventApplier; + /** Guards against overlapping sync() calls. */ + private syncing = false; + /** Set when a sync() call arrives while another is in-flight. */ + private syncQueued = false; + constructor(jsonlWriter: IJSONLWriter, sqliteCache: ISQLiteCacheManager) { this.jsonlWriter = jsonlWriter; this.sqliteCache = sqliteCache; @@ -107,53 +112,79 @@ export class SyncCoordinator { /** * Synchronize JSONL files to SQLite cache. + * + * Guarded by an async mutex: if a sync is already running, the call is + * queued and the in-flight run will re-check for pending changes when it + * finishes. This prevents two overlapping runs from applying the same + * events twice or writing stale timestamps. */ - async sync(options: SyncOptions = {}): Promise { - const startTime = Date.now(); - const errors: string[] = []; - let eventsApplied = 0; - let eventsSkipped = 0; - const filesProcessed: string[] = []; - const nextFileTimestamps: Record = {}; - - try { - if (options.forceRebuild) { - return this.fullRebuild(options); - } - - const syncState = await this.sqliteCache.getSyncState(this.deviceId); - const previousFileTimestamps = syncState?.fileTimestamps ?? {}; - - // Process workspace files - const workspaceResult = await this.processWorkspaceFiles(previousFileTimestamps, options, errors); - eventsApplied += workspaceResult.applied; - eventsSkipped += workspaceResult.skipped; - filesProcessed.push(...workspaceResult.files); - Object.assign(nextFileTimestamps, workspaceResult.fileTimestamps); - - // Process conversation files - const conversationResult = await this.processConversationFiles(previousFileTimestamps, options, errors); - eventsApplied += conversationResult.applied; - eventsSkipped += conversationResult.skipped; - filesProcessed.push(...conversationResult.files); - Object.assign(nextFileTimestamps, conversationResult.fileTimestamps); - - // Process task files - const taskResult = await this.processTaskFiles(previousFileTimestamps, options, errors); - eventsApplied += taskResult.applied; - eventsSkipped += taskResult.skipped; - filesProcessed.push(...taskResult.files); - Object.assign(nextFileTimestamps, taskResult.fileTimestamps); - - // Update sync state and save - await this.sqliteCache.updateSyncState(this.deviceId, Date.now(), nextFileTimestamps); - await this.sqliteCache.save(); + async sync(options: SyncOptions = {}): Promise { + if (this.syncing) { + this.syncQueued = true; + return this.createResult(true, 0, 0, [], Date.now(), []); + } + + this.syncing = true; + try { + return await this.syncInner(options); + } finally { + this.syncing = false; + if (this.syncQueued) { + this.syncQueued = false; + // Re-run to pick up changes that landed during the previous sync. + // Don't await — callers of the queued sync already got their + // early-return result above. + void this.sync(options); + } + } + } + + private async syncInner(options: SyncOptions = {}): Promise { + const startTime = Date.now(); + const errors: string[] = []; + let eventsApplied = 0; + let eventsSkipped = 0; + const filesProcessed: string[] = []; + const nextFileTimestamps: Record = {}; + + try { + if (options.forceRebuild) { + return this.fullRebuild(options); + } + + const syncState = await this.sqliteCache.getSyncState(this.deviceId); + const previousFileTimestamps = syncState?.fileTimestamps ?? {}; + + // Process workspace files + const workspaceResult = await this.processWorkspaceFiles(previousFileTimestamps, options, errors); + eventsApplied += workspaceResult.applied; + eventsSkipped += workspaceResult.skipped; + filesProcessed.push(...workspaceResult.files); + Object.assign(nextFileTimestamps, workspaceResult.fileTimestamps); + + // Process conversation files + const conversationResult = await this.processConversationFiles(previousFileTimestamps, options, errors); + eventsApplied += conversationResult.applied; + eventsSkipped += conversationResult.skipped; + filesProcessed.push(...conversationResult.files); + Object.assign(nextFileTimestamps, conversationResult.fileTimestamps); + + // Process task files + const taskResult = await this.processTaskFiles(previousFileTimestamps, options, errors); + eventsApplied += taskResult.applied; + eventsSkipped += taskResult.skipped; + filesProcessed.push(...taskResult.files); + Object.assign(nextFileTimestamps, taskResult.fileTimestamps); + + // Update sync state and save + await this.sqliteCache.updateSyncState(this.deviceId, Date.now(), nextFileTimestamps); + await this.sqliteCache.save(); options.onProgress?.('Complete', 1, 1); return this.createResult(errors.length === 0, eventsApplied, eventsSkipped, errors, startTime, filesProcessed); - } catch (error) { - return this.createResult(false, eventsApplied, eventsSkipped, [...errors, `Sync failed: ${String(error)}`], startTime, filesProcessed); + } catch (error) { + return this.createResult(false, eventsApplied, eventsSkipped, [...errors, `Sync failed: ${String(error)}`], startTime, filesProcessed); } } @@ -199,8 +230,8 @@ export class SyncCoordinator { options.onProgress?.('Complete', 1, 1); return this.createResult(errors.length === 0, eventsApplied, 0, errors, startTime, filesProcessed); - } catch (error) { - console.error('[SyncCoordinator] Full rebuild failed:', error); + } catch (error) { + console.error('[SyncCoordinator] Full rebuild failed:', error); // Still save sync state so we don't rebuild again on next restart try { await this.sqliteCache.updateSyncState(this.deviceId, Date.now(), {}); @@ -208,7 +239,7 @@ export class SyncCoordinator { } catch (saveError) { console.error('[SyncCoordinator] Failed to save sync state:', saveError); } - return this.createResult(false, eventsApplied, 0, [...errors, `Rebuild failed: ${String(error)}`], startTime, filesProcessed); + return this.createResult(false, eventsApplied, 0, [...errors, `Rebuild failed: ${String(error)}`], startTime, filesProcessed); } } @@ -216,18 +247,18 @@ export class SyncCoordinator { // Private Helpers // ============================================================================ - private async processWorkspaceFiles( - previousFileTimestamps: Record, - options: SyncOptions, - errors: string[] - ): Promise<{ applied: number; skipped: number; files: string[]; fileTimestamps: Record }> { - let applied = 0; - let skipped = 0; - const files: string[] = []; - const fileTimestamps: Record = {}; - - const workspaceFiles = await this.jsonlWriter.listFiles('workspaces'); - options.onProgress?.('Processing workspaces', 0, workspaceFiles.length); + private async processWorkspaceFiles( + previousFileTimestamps: Record, + options: SyncOptions, + errors: string[] + ): Promise<{ applied: number; skipped: number; files: string[]; fileTimestamps: Record }> { + let applied = 0; + let skipped = 0; + const files: string[] = []; + const fileTimestamps: Record = {}; + + const workspaceFiles = await this.jsonlWriter.listFiles('workspaces'); + options.onProgress?.('Processing workspaces', 0, workspaceFiles.length); for (let i = 0; i < workspaceFiles.length; i++) { const file = workspaceFiles[i]; @@ -236,24 +267,24 @@ export class SyncCoordinator { const wsIdMatch = file.match(/ws_(.+)\.jsonl$/); if (wsIdMatch && !isValidWorkspaceId(wsIdMatch[1])) { continue; - } - - try { - const modTime = await this.jsonlWriter.getFileModTime(file); - if (typeof modTime === 'number' && Number.isFinite(modTime)) { - fileTimestamps[file] = modTime; - if ((previousFileTimestamps[file] ?? 0) >= modTime) { - files.push(file); - options.onProgress?.('Processing workspaces', i + 1, workspaceFiles.length); - continue; - } - } - - const events = await this.jsonlWriter.getEventsNotFromDevice( - file, this.deviceId - ); - - for (const event of events) { + } + + try { + const modTime = await this.jsonlWriter.getFileModTime(file); + if (typeof modTime === 'number' && Number.isFinite(modTime)) { + fileTimestamps[file] = modTime; + if ((previousFileTimestamps[file] ?? 0) >= modTime) { + files.push(file); + options.onProgress?.('Processing workspaces', i + 1, workspaceFiles.length); + continue; + } + } + + const events = await this.jsonlWriter.getEventsNotFromDevice( + file, this.deviceId + ); + + for (const event of events) { if (await this.sqliteCache.isEventApplied(event.id)) { skipped++; continue; @@ -265,45 +296,45 @@ export class SyncCoordinator { files.push(file); options.onProgress?.('Processing workspaces', i + 1, workspaceFiles.length); - } catch (e) { - errors.push(`Failed to process ${file}: ${String(e)}`); - } - } - - return { applied, skipped, files, fileTimestamps }; - } - - private async processConversationFiles( - previousFileTimestamps: Record, - options: SyncOptions, - errors: string[] - ): Promise<{ applied: number; skipped: number; files: string[]; fileTimestamps: Record }> { - let applied = 0; - let skipped = 0; - const files: string[] = []; - const fileTimestamps: Record = {}; - - const conversationFiles = await this.jsonlWriter.listFiles('conversations'); - options.onProgress?.('Processing conversations', 0, conversationFiles.length); - - for (let i = 0; i < conversationFiles.length; i++) { - const file = conversationFiles[i]; - try { - const modTime = await this.jsonlWriter.getFileModTime(file); - if (typeof modTime === 'number' && Number.isFinite(modTime)) { - fileTimestamps[file] = modTime; - if ((previousFileTimestamps[file] ?? 0) >= modTime) { - files.push(file); - options.onProgress?.('Processing conversations', i + 1, conversationFiles.length); - continue; - } - } - - const events = await this.jsonlWriter.getEventsNotFromDevice( - file, this.deviceId - ); - - for (const event of events) { + } catch (e) { + errors.push(`Failed to process ${file}: ${String(e)}`); + } + } + + return { applied, skipped, files, fileTimestamps }; + } + + private async processConversationFiles( + previousFileTimestamps: Record, + options: SyncOptions, + errors: string[] + ): Promise<{ applied: number; skipped: number; files: string[]; fileTimestamps: Record }> { + let applied = 0; + let skipped = 0; + const files: string[] = []; + const fileTimestamps: Record = {}; + + const conversationFiles = await this.jsonlWriter.listFiles('conversations'); + options.onProgress?.('Processing conversations', 0, conversationFiles.length); + + for (let i = 0; i < conversationFiles.length; i++) { + const file = conversationFiles[i]; + try { + const modTime = await this.jsonlWriter.getFileModTime(file); + if (typeof modTime === 'number' && Number.isFinite(modTime)) { + fileTimestamps[file] = modTime; + if ((previousFileTimestamps[file] ?? 0) >= modTime) { + files.push(file); + options.onProgress?.('Processing conversations', i + 1, conversationFiles.length); + continue; + } + } + + const events = await this.jsonlWriter.getEventsNotFromDevice( + file, this.deviceId + ); + + for (const event of events) { if (await this.sqliteCache.isEventApplied(event.id)) { skipped++; continue; @@ -315,13 +346,13 @@ export class SyncCoordinator { files.push(file); options.onProgress?.('Processing conversations', i + 1, conversationFiles.length); - } catch (e) { - errors.push(`Failed to process ${file}: ${String(e)}`); - } - } - - return { applied, skipped, files, fileTimestamps }; - } + } catch (e) { + errors.push(`Failed to process ${file}: ${String(e)}`); + } + } + + return { applied, skipped, files, fileTimestamps }; + } private async rebuildWorkspaces( options: SyncOptions, @@ -373,8 +404,8 @@ export class SyncCoordinator { // Save after each file to prevent memory accumulation (OOM prevention) await this.sqliteCache.save(); - } catch (e) { - errors.push(`Failed to process ${file}: ${String(e)}`); + } catch (e) { + errors.push(`Failed to process ${file}: ${String(e)}`); } } @@ -417,45 +448,45 @@ export class SyncCoordinator { // Save after each file to prevent memory accumulation (OOM prevention) await this.sqliteCache.save(); - } catch (e) { - errors.push(`Failed to process ${file}: ${String(e)}`); + } catch (e) { + errors.push(`Failed to process ${file}: ${String(e)}`); } } return { applied, files }; } - private async processTaskFiles( - previousFileTimestamps: Record, - options: SyncOptions, - errors: string[] - ): Promise<{ applied: number; skipped: number; files: string[]; fileTimestamps: Record }> { - let applied = 0; - let skipped = 0; - const files: string[] = []; - const fileTimestamps: Record = {}; - - const taskFiles = await this.jsonlWriter.listFiles('tasks'); - options.onProgress?.('Processing tasks', 0, taskFiles.length); - - for (let i = 0; i < taskFiles.length; i++) { - const file = taskFiles[i]; - try { - const modTime = await this.jsonlWriter.getFileModTime(file); - if (typeof modTime === 'number' && Number.isFinite(modTime)) { - fileTimestamps[file] = modTime; - if ((previousFileTimestamps[file] ?? 0) >= modTime) { - files.push(file); - options.onProgress?.('Processing tasks', i + 1, taskFiles.length); - continue; - } - } - - const events = await this.jsonlWriter.getEventsNotFromDevice( - file, this.deviceId - ); - - for (const event of events) { + private async processTaskFiles( + previousFileTimestamps: Record, + options: SyncOptions, + errors: string[] + ): Promise<{ applied: number; skipped: number; files: string[]; fileTimestamps: Record }> { + let applied = 0; + let skipped = 0; + const files: string[] = []; + const fileTimestamps: Record = {}; + + const taskFiles = await this.jsonlWriter.listFiles('tasks'); + options.onProgress?.('Processing tasks', 0, taskFiles.length); + + for (let i = 0; i < taskFiles.length; i++) { + const file = taskFiles[i]; + try { + const modTime = await this.jsonlWriter.getFileModTime(file); + if (typeof modTime === 'number' && Number.isFinite(modTime)) { + fileTimestamps[file] = modTime; + if ((previousFileTimestamps[file] ?? 0) >= modTime) { + files.push(file); + options.onProgress?.('Processing tasks', i + 1, taskFiles.length); + continue; + } + } + + const events = await this.jsonlWriter.getEventsNotFromDevice( + file, this.deviceId + ); + + for (const event of events) { if (await this.sqliteCache.isEventApplied(event.id)) { skipped++; continue; @@ -467,13 +498,13 @@ export class SyncCoordinator { files.push(file); options.onProgress?.('Processing tasks', i + 1, taskFiles.length); - } catch (e) { - errors.push(`Failed to process ${file}: ${String(e)}`); - } - } - - return { applied, skipped, files, fileTimestamps }; - } + } catch (e) { + errors.push(`Failed to process ${file}: ${String(e)}`); + } + } + + return { applied, skipped, files, fileTimestamps }; + } private async rebuildTasks( options: SyncOptions, @@ -511,9 +542,9 @@ export class SyncCoordinator { // Save after each file to prevent memory accumulation (OOM prevention) await this.sqliteCache.save(); - } catch (e) { - errors.push(`Failed to process ${file}: ${String(e)}`); - } + } catch (e) { + errors.push(`Failed to process ${file}: ${String(e)}`); + } } return { applied, files }; diff --git a/src/settings/tabs/WorkspacesTab.ts b/src/settings/tabs/WorkspacesTab.ts index ce18199e5..e4191447e 100644 --- a/src/settings/tabs/WorkspacesTab.ts +++ b/src/settings/tabs/WorkspacesTab.ts @@ -138,8 +138,30 @@ export class WorkspacesTab { if (workspaceChanges.length > 0) { try { + // If the user is editing a workspace detail form, avoid + // destroying unsaved inputs with a full re-render. + const isEditingDetail = this.currentView === 'detail' && !!this.currentWorkspace?.id; + const editedWorkspaceModified = isEditingDetail && + workspaceChanges.some((m) => m.businessId === this.currentWorkspace!.id); + await this.loadWorkspaces(); - this.render(); + + if (isEditingDetail) { + // The user has a detail form open. If the externally- + // modified workspace is NOT the one being edited we can + // silently refresh the backing list — the detail view + // stays untouched. If it IS the edited workspace we + // still skip the re-render to preserve dirty form state; + // the user will pick up the remote changes next time + // they navigate away and back. + if (!editedWorkspaceModified) { + // List data refreshed; no visual change needed. + } + // else: edited workspace was modified externally — keep + // the user's unsaved edits intact. + } else { + this.render(); + } } catch (error) { console.error('[WorkspacesTab] Failed to refresh workspaces on external-sync:', error); } diff --git a/src/ui/chat/components/ConversationList.ts b/src/ui/chat/components/ConversationList.ts index 6c4af7306..946390ffa 100644 --- a/src/ui/chat/components/ConversationList.ts +++ b/src/ui/chat/components/ConversationList.ts @@ -75,6 +75,9 @@ export class ConversationList { * Render the conversation list */ private render(): void { + // Preserve scroll position across re-renders (M4 fix) + const savedScrollTop = this.container.scrollTop; + this.container.empty(); this.loadMoreBtn = null; // container.empty() destroys child nodes this.container.addClass('conversation-list'); @@ -151,6 +154,9 @@ export class ConversationList { // Load More button this.renderLoadMoreButton(); + + // Restore scroll position after DOM rebuild + this.container.scrollTop = savedScrollTop; } /** diff --git a/tests/unit/JsonlVaultWatcher.test.ts b/tests/unit/JsonlVaultWatcher.test.ts index 7243d947d..638ee0e88 100644 --- a/tests/unit/JsonlVaultWatcher.test.ts +++ b/tests/unit/JsonlVaultWatcher.test.ts @@ -222,19 +222,19 @@ describe('JsonlVaultWatcher', () => { expect(onChange).not.toHaveBeenCalled(); }); - it('only suppresses one event per suppressLogicalPath call', async () => { - const { watcher, fire, onChange } = build({ debounceMs: 100 }); + it('suppresses all events within TTL window (e.g. shard rotation)', async () => { + const { watcher, fire, onChange } = build({ debounceMs: 100, suppressTtlMs: 500 }); watcher.start(); watcher.suppressLogicalPath('conversations/conv_abc.jsonl'); - // First modify: consumed by suppression. - fire('modify', makeTFile('Nexus/data/conversations/conv_abc/shard-000.jsonl')); - // Second modify (e.g. remote write lands right after): NOT suppressed. + // Both modifies (e.g. shard rotation producing two vault events) are + // suppressed within the TTL window — no needless sync triggered. fire('modify', makeTFile('Nexus/data/conversations/conv_abc/shard-000.jsonl')); + fire('modify', makeTFile('Nexus/data/conversations/conv_abc/shard-001.jsonl')); await jest.advanceTimersByTimeAsync(100); - expect(onChange).toHaveBeenCalledTimes(1); + expect(onChange).not.toHaveBeenCalled(); }); it('expires suppression after TTL', async () => { @@ -352,4 +352,194 @@ describe('JsonlVaultWatcher', () => { expect(onChange).toHaveBeenCalledTimes(1); expect(onChange.mock.calls[0][0][0].streamId).toBe('conv_xyz'); }); + + describe('create and delete events', () => { + it('fires onChange for a create event on a new JSONL shard', async () => { + const { watcher, fire, onChange } = build({ debounceMs: 50 }); + watcher.start(); + + fire('create', makeTFile('Nexus/data/conversations/conv_sync/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + + expect(onChange).toHaveBeenCalledTimes(1); + const modified: ModifiedStream[] = onChange.mock.calls[0][0]; + expect(modified).toHaveLength(1); + expect(modified[0]).toMatchObject({ + category: 'conversations', + streamId: 'conv_sync', + businessId: 'sync' + }); + }); + + it('fires onChange for a delete event on a JSONL shard', async () => { + const { watcher, fire, onChange } = build({ debounceMs: 50 }); + watcher.start(); + + fire('delete', makeTFile('Nexus/data/workspaces/ws_work-1/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + + expect(onChange).toHaveBeenCalledTimes(1); + const modified: ModifiedStream[] = onChange.mock.calls[0][0]; + expect(modified).toHaveLength(1); + expect(modified[0]).toMatchObject({ + category: 'workspaces', + streamId: 'ws_work-1', + businessId: 'work-1' + }); + }); + + it('classifies stream types correctly for create events across categories', async () => { + const { watcher, fire, onChange } = build({ debounceMs: 50 }); + watcher.start(); + + fire('create', makeTFile('Nexus/data/conversations/conv_chat-1/shard-000.jsonl')); + fire('create', makeTFile('Nexus/data/workspaces/ws_ws-2/shard-000.jsonl')); + fire('create', makeTFile('Nexus/data/tasks/tasks_proj-3/shard-000.jsonl')); + + await jest.advanceTimersByTimeAsync(50); + + expect(onChange).toHaveBeenCalledTimes(1); + const modified: ModifiedStream[] = onChange.mock.calls[0][0]; + expect(modified).toHaveLength(3); + + const byCategory = Object.fromEntries(modified.map((m) => [m.category, m])); + expect(byCategory['conversations']).toMatchObject({ + streamId: 'conv_chat-1', + businessId: 'chat-1' + }); + expect(byCategory['workspaces']).toMatchObject({ + streamId: 'ws_ws-2', + businessId: 'ws-2' + }); + expect(byCategory['tasks']).toMatchObject({ + streamId: 'tasks_proj-3', + businessId: 'proj-3' + }); + }); + + it('ignores create events for non-jsonl files inside the data path', async () => { + const { watcher, fire, onChange } = build({ debounceMs: 50 }); + watcher.start(); + + fire('create', makeTFile('Nexus/data/_meta/storage-manifest.json')); + fire('create', makeTFile('Nexus/data/conversations/conv_abc/metadata.json')); + + await jest.advanceTimersByTimeAsync(50); + expect(onChange).not.toHaveBeenCalled(); + }); + + it('respects self-write suppression for create events', async () => { + const { watcher, fire, onChange } = build({ debounceMs: 50 }); + watcher.start(); + + watcher.suppressLogicalPath('conversations/conv_local.jsonl'); + fire('create', makeTFile('Nexus/data/conversations/conv_local/shard-000.jsonl')); + + await jest.advanceTimersByTimeAsync(50); + expect(onChange).not.toHaveBeenCalled(); + }); + }); + + describe('error resilience', () => { + it('continues processing events after onChange throws synchronously', async () => { + const onChange = jest.fn, [ModifiedStream[]]>(); + const { app, fire } = createMockApp(); + const watcher = new JsonlVaultWatcher({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + app: app as any, + dataPath: 'Nexus/data', + onChange, + debounceMs: 50 + }); + + // First call throws. + onChange.mockImplementationOnce(() => { throw new Error('boom'); }); + // Second call succeeds. + onChange.mockImplementationOnce(() => Promise.resolve()); + + watcher.start(); + + // First event — onChange throws. + fire('modify', makeTFile('Nexus/data/conversations/conv_a/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + expect(onChange).toHaveBeenCalledTimes(1); + + // Second event — should still work (dispatching flag was reset). + fire('modify', makeTFile('Nexus/data/conversations/conv_b/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + expect(onChange).toHaveBeenCalledTimes(2); + }); + + it('continues processing events after onChange rejects', async () => { + const onChange = jest.fn, [ModifiedStream[]]>(); + const { app, fire } = createMockApp(); + const watcher = new JsonlVaultWatcher({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + app: app as any, + dataPath: 'Nexus/data', + onChange, + debounceMs: 50 + }); + + // First call rejects. + onChange.mockImplementationOnce(() => Promise.reject(new Error('async boom'))); + // Second call succeeds. + onChange.mockImplementationOnce(() => Promise.resolve()); + + watcher.start(); + + // First event — onChange rejects. + fire('modify', makeTFile('Nexus/data/conversations/conv_a/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + // Allow the rejected promise to settle. + await Promise.resolve(); + await Promise.resolve(); + expect(onChange).toHaveBeenCalledTimes(1); + + // Second event — should still work (dispatching flag was reset by finally). + fire('modify', makeTFile('Nexus/data/conversations/conv_b/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + expect(onChange).toHaveBeenCalledTimes(2); + }); + + it('processes queued changes after onChange throws during dispatch', async () => { + const onChange = jest.fn, [ModifiedStream[]]>(); + const { app, fire } = createMockApp(); + const watcher = new JsonlVaultWatcher({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + app: app as any, + dataPath: 'Nexus/data', + onChange, + debounceMs: 50 + }); + + // First call: long-running then throws. Second: succeeds. + let rejectFirst: ((err: Error) => void) | undefined; + onChange.mockImplementationOnce( + () => new Promise((_, reject) => { rejectFirst = reject; }) + ); + onChange.mockImplementationOnce(() => Promise.resolve()); + + watcher.start(); + + // First event triggers dispatch. + fire('modify', makeTFile('Nexus/data/conversations/conv_a/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + expect(onChange).toHaveBeenCalledTimes(1); + + // While first dispatch is in-flight, a new change lands (queued). + fire('modify', makeTFile('Nexus/data/conversations/conv_b/shard-000.jsonl')); + await jest.advanceTimersByTimeAsync(50); + + // First dispatch fails — queued dispatch should still fire. + rejectFirst?.(new Error('dispatch failure')); + await Promise.resolve(); + await Promise.resolve(); + await jest.advanceTimersByTimeAsync(50); + + expect(onChange).toHaveBeenCalledTimes(2); + const secondBatch = onChange.mock.calls[1][0]; + expect(secondBatch.map((s: ModifiedStream) => s.streamId)).toEqual(['conv_b']); + }); + }); });