diff --git a/desktop/CodexProviderSync.Core.Tests/CoreIntegrationTests.cs b/desktop/CodexProviderSync.Core.Tests/CoreIntegrationTests.cs index 9f300a6..4bdff36 100644 --- a/desktop/CodexProviderSync.Core.Tests/CoreIntegrationTests.cs +++ b/desktop/CodexProviderSync.Core.Tests/CoreIntegrationTests.cs @@ -84,6 +84,66 @@ await fixture.WriteStateDbAsync( Assert.Contains("\"model_provider\":\"apigather\"", rollout); } + [Fact] + public async Task RunSync_NormalizesRolloutLastWriteTimeToSqliteUpdatedAtMs() + { + TestCodexHomeFixture fixture = await TestCodexHomeFixture.CreateAsync(); + await fixture.WriteConfigAsync("model_provider = \"openai\""); + string sessionPath = fixture.RolloutPath("sessions", "rollout-a.jsonl"); + const string rolloutActivityIso = "2026-03-22T15:45:30.000Z"; + const string sqliteActivityIso = "2026-03-21T09:10:11.123Z"; + await fixture.WriteRolloutLinesAsync( + sessionPath, + """ + {"timestamp":"2026-03-19T00:00:20.000Z","type":"session_meta","payload":{"id":"thread-a","timestamp":"2026-03-19T00:00:00.000Z","cwd":"C:\\AITemp","source":"cli","cli_version":"0.115.0","model_provider":"apigather"}} + """, + $$$""" + {"timestamp":"{{{rolloutActivityIso}}}","type":"event_msg","payload":{"type":"assistant_message","message":"latest"}} + """); + DateTime wrongTimestamp = new(2026, 4, 1, 0, 0, 0, DateTimeKind.Utc); + File.SetLastWriteTimeUtc(sessionPath, wrongTimestamp); + DateTimeOffset sqliteActivity = DateTimeOffset.Parse(sqliteActivityIso, null, System.Globalization.DateTimeStyles.RoundtripKind).ToUniversalTime(); + await fixture.WriteStateDbAsync( + [ + ("thread-a", "apigather", false, sqliteActivity.ToUnixTimeSeconds(), sqliteActivity.ToUnixTimeMilliseconds()) + ]); + + CodexSyncService service = new(); + await service.RunSyncAsync(fixture.CodexHome); + + DateTime expectedUtc = sqliteActivity.UtcDateTime; + DateTime actualUtc = File.GetLastWriteTimeUtc(sessionPath); + Assert.InRange(Math.Abs((actualUtc - expectedUtc).TotalSeconds), 0, 2); + } + + 
[Fact] + public async Task RunSync_PreservesRolloutLastWriteTime_WhenSqliteActivityIsUnavailable() + { + TestCodexHomeFixture fixture = await TestCodexHomeFixture.CreateAsync(); + await fixture.WriteConfigAsync("model_provider = \"openai\""); + string sessionPath = fixture.RolloutPath("sessions", "rollout-a.jsonl"); + await fixture.WriteRolloutLinesAsync( + sessionPath, + """ + {"timestamp":"2026-03-19T00:00:20.000Z","type":"session_meta","payload":{"id":"thread-a","timestamp":"2026-03-19T00:00:00.000Z","cwd":"C:\\AITemp","source":"cli","cli_version":"0.115.0","model_provider":"apigather"}} + """, + """ + {"timestamp":"2026-03-25T12:00:00.000Z","type":"event_msg","payload":{"type":"assistant_message","message":"later in rollout only"}} + """); + DateTime preservedTimestamp = new(2026, 4, 1, 0, 0, 0, DateTimeKind.Utc); + File.SetLastWriteTimeUtc(sessionPath, preservedTimestamp); + await fixture.WriteStateDbAsync( + [ + ("thread-a", "apigather", false) + ]); + + CodexSyncService service = new(); + await service.RunSyncAsync(fixture.CodexHome); + + DateTime actualUtc = File.GetLastWriteTimeUtc(sessionPath); + Assert.InRange(Math.Abs((actualUtc - preservedTimestamp).TotalSeconds), 0, 2); + } + [Fact] public async Task GetStatus_ReportsImplicitDefaultProviderAndCounts() { diff --git a/desktop/CodexProviderSync.Core.Tests/TestCodexHomeFixture.cs b/desktop/CodexProviderSync.Core.Tests/TestCodexHomeFixture.cs index 6cf96ad..50b0c98 100644 --- a/desktop/CodexProviderSync.Core.Tests/TestCodexHomeFixture.cs +++ b/desktop/CodexProviderSync.Core.Tests/TestCodexHomeFixture.cs @@ -77,6 +77,11 @@ public async Task WriteRolloutAsync(string filePath, string id, string provider) await File.WriteAllTextAsync(filePath, $"{first}\n{second}\n"); } + public Task WriteRolloutLinesAsync(string filePath, params string[] lines) + { + return File.WriteAllTextAsync(filePath, string.Join("\n", lines) + "\n"); + } + public async Task WriteBackupAsync(string directoryName, params (string 
RelativePath, string Content)[] files) { string backupDir = BackupPath(directoryName); @@ -113,6 +118,26 @@ public async Task WriteBackupAsync(string directoryName, params (string Re } public async Task WriteStateDbAsync(IEnumerable<(string Id, string ModelProvider, bool Archived)> rows) + { + await WriteStateDbRowsAsync(rows.Select(static row => new ThreadStateRow( + row.Id, + row.ModelProvider, + row.Archived, + null, + null))); + } + + public async Task WriteStateDbAsync(IEnumerable<(string Id, string ModelProvider, bool Archived, long? UpdatedAt, long? UpdatedAtMs)> rows) + { + await WriteStateDbRowsAsync(rows.Select(static row => new ThreadStateRow( + row.Id, + row.ModelProvider, + row.Archived, + row.UpdatedAt, + row.UpdatedAtMs))); + } + + private async Task WriteStateDbRowsAsync(IEnumerable<ThreadStateRow> rows) { string dbPath = Path.Combine(CodexHome, "state_5.sqlite"); await using SqliteConnection connection = OpenSqliteConnection(); @@ -121,6 +146,10 @@ public async Task WriteStateDbAsync(IEnumerable<(string Id, string ModelProvider create.CommandText = """ CREATE TABLE threads ( id TEXT PRIMARY KEY, + rollout_path TEXT, + created_at INTEGER, + updated_at INTEGER, + updated_at_ms INTEGER, model_provider TEXT, archived INTEGER NOT NULL DEFAULT 0, first_user_message TEXT NOT NULL DEFAULT '' @@ -128,16 +157,27 @@ first_user_message TEXT NOT NULL DEFAULT '' """; await create.ExecuteNonQueryAsync(); - foreach ((string id, string modelProvider, bool archived) in rows) + foreach (ThreadStateRow row in rows) { SqliteCommand insert = connection.CreateCommand(); insert.CommandText = """ - INSERT INTO threads (id, model_provider, archived, first_user_message) - VALUES ($id, $provider, $archived, 'hello') + INSERT INTO threads ( + id, + rollout_path, + created_at, + updated_at, + updated_at_ms, + model_provider, + archived, + first_user_message + ) + VALUES ($id, '', NULL, $updatedAt, $updatedAtMs, $provider, $archived, 'hello') """; - insert.Parameters.AddWithValue("$id", id); - 
insert.Parameters.AddWithValue("$provider", modelProvider); - insert.Parameters.AddWithValue("$archived", archived ? 1 : 0); + insert.Parameters.AddWithValue("$id", row.Id); + insert.Parameters.AddWithValue("$updatedAt", (object?)row.UpdatedAt ?? DBNull.Value); + insert.Parameters.AddWithValue("$updatedAtMs", (object?)row.UpdatedAtMs ?? DBNull.Value); + insert.Parameters.AddWithValue("$provider", row.ModelProvider); + insert.Parameters.AddWithValue("$archived", row.Archived ? 1 : 0); await insert.ExecuteNonQueryAsync(); } } @@ -146,4 +186,11 @@ public SqliteConnection OpenSqliteConnection() { return new SqliteConnection($"Data Source={Path.Combine(CodexHome, "state_5.sqlite")};Mode=ReadWriteCreate;Pooling=False"); } + + private sealed record ThreadStateRow( + string Id, + string ModelProvider, + bool Archived, + long? UpdatedAt, + long? UpdatedAtMs); } diff --git a/desktop/CodexProviderSync.Core/BackupService.cs b/desktop/CodexProviderSync.Core/BackupService.cs index 3727429..bca813d 100644 --- a/desktop/CodexProviderSync.Core/BackupService.cs +++ b/desktop/CodexProviderSync.Core/BackupService.cs @@ -57,7 +57,8 @@ public async Task CreateBackupAsync( { Path = change.Path, OriginalFirstLine = change.OriginalFirstLine, - OriginalSeparator = change.OriginalSeparator + OriginalSeparator = change.OriginalSeparator, + NormalizedLastWriteTimeUtcTicks = change.NormalizedLastWriteTimeUtcTicks }).ToList() }; await File.WriteAllTextAsync( @@ -177,7 +178,8 @@ await File.ReadAllTextAsync(metadataPath), { Path = change.Path, OriginalFirstLine = change.OriginalFirstLine, - OriginalSeparator = change.OriginalSeparator + OriginalSeparator = change.OriginalSeparator, + NormalizedLastWriteTimeUtcTicks = change.NormalizedLastWriteTimeUtcTicks }).ToList() }; metadata = new BackupMetadataFile diff --git a/desktop/CodexProviderSync.Core/CodexSyncService.cs b/desktop/CodexProviderSync.Core/CodexSyncService.cs index 4f17706..ed7d5ae 100644 --- 
a/desktop/CodexProviderSync.Core/CodexSyncService.cs +++ b/desktop/CodexProviderSync.Core/CodexSyncService.cs @@ -93,6 +107,17 @@ public async Task RunSyncAsync( await using LockHandle _ = await _lockService.AcquireLockAsync(codexHome, "sync"); SessionChangeCollection sessionInfo = await _sessionRolloutService.CollectSessionChangesAsync(codexHome, targetProvider, skipLockedReads: true); + IReadOnlyDictionary<string, long> threadActivityUtcTicks = await _sqliteStateService.ReadThreadActivityUtcTicksByIdAsync( + codexHome, + sessionInfo.Changes.Select(static change => change.ThreadId)); + foreach (SessionChange change in sessionInfo.Changes) + { + if (change.ThreadId is not null + && threadActivityUtcTicks.TryGetValue(change.ThreadId, out long normalizedTicks)) + { + change.NormalizedLastWriteTimeUtcTicks = normalizedTicks; + } + } (IReadOnlyList<SessionChange> writableChanges, IReadOnlyList<SessionChange> lockedChanges) = await _sessionRolloutService.SplitLockedSessionChangesAsync(sessionInfo.Changes); diff --git a/desktop/CodexProviderSync.Core/Models.cs b/desktop/CodexProviderSync.Core/Models.cs index 1e09e75..f0ef864 100644 --- a/desktop/CodexProviderSync.Core/Models.cs +++ b/desktop/CodexProviderSync.Core/Models.cs @@ -46,6 +46,7 @@ public sealed class SessionChange public required int OriginalOffset { get; init; } public required long OriginalFileLength { get; init; } public required long OriginalLastWriteTimeUtcTicks { get; init; } + public required long NormalizedLastWriteTimeUtcTicks { get; set; } public required string UpdatedFirstLine { get; init; } } @@ -159,4 +160,5 @@ internal sealed class SessionBackupManifestEntry public required string Path { get; init; } public required string OriginalFirstLine { get; init; } public required string OriginalSeparator { get; init; } + public long? 
NormalizedLastWriteTimeUtcTicks { get; init; } } diff --git a/desktop/CodexProviderSync.Core/SessionRolloutService.cs b/desktop/CodexProviderSync.Core/SessionRolloutService.cs index 9ddacc8..13795f7 100644 --- a/desktop/CodexProviderSync.Core/SessionRolloutService.cs +++ b/desktop/CodexProviderSync.Core/SessionRolloutService.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Text; +using System.Text.Json; using System.Text.Json.Nodes; namespace CodexProviderSync.Core; @@ -63,6 +64,7 @@ record = await ReadFirstLineRecordAsync(rolloutPath); OriginalOffset = record.Offset, OriginalFileLength = snapshot.Length, OriginalLastWriteTimeUtcTicks = snapshot.LastWriteTimeUtcTicks, + NormalizedLastWriteTimeUtcTicks = snapshot.LastWriteTimeUtcTicks, UpdatedFirstLine = root!.ToJsonString() }); } @@ -157,7 +159,11 @@ internal async Task RestoreSessionChangesAsync(IEnumerable changes) { Path = change.Path, OriginalFirstLine = change.OriginalFirstLine, - OriginalSeparator = change.OriginalSeparator + OriginalSeparator = change.OriginalSeparator, + NormalizedLastWriteTimeUtcTicks = change.NormalizedLastWriteTimeUtcTicks })); } @@ -226,26 +233,30 @@ private async Task TryRewriteCollectedSessionChangeAsync(SessionChange cha { try { - await using FileStream sourceStream = OpenExclusiveRewriteStream(change.Path); - if (sourceStream.Length != change.OriginalFileLength) + await using (FileStream sourceStream = OpenExclusiveRewriteStream(change.Path)) { - return false; - } + if (sourceStream.Length != change.OriginalFileLength) + { + return false; + } - FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream); - if (!string.Equals(current.FirstLine, change.OriginalFirstLine, StringComparison.Ordinal) - || current.Offset != change.OriginalOffset) - { - return false; + FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream); + if (!string.Equals(current.FirstLine, change.OriginalFirstLine, StringComparison.Ordinal) + || current.Offset != change.OriginalOffset) 
+ { + return false; + } + + await RewriteFirstLineAsync( + sourceStream, + change.Path, + change.UpdatedFirstLine, + change.OriginalSeparator, + change.OriginalOffset, + headerOnly: change.OriginalOffset >= change.OriginalFileLength); } - await RewriteFirstLineAsync( - sourceStream, - change.Path, - change.UpdatedFirstLine, - change.OriginalSeparator, - change.OriginalOffset, - headerOnly: change.OriginalOffset >= change.OriginalFileLength); + ApplyNormalizedLastWriteTime(change.Path, change.NormalizedLastWriteTimeUtcTicks); return true; } catch (Exception error) when (IsRolloutFileBusyError(error)) @@ -254,15 +265,23 @@ await RewriteFirstLineAsync( } } - private async Task RewriteFirstLineAsync(string filePath, string nextFirstLine, string separator) + private async Task RewriteFirstLineAsync( + string filePath, + string nextFirstLine, + string separator, + long? normalizedLastWriteTimeUtcTicks = null) { try { - await using FileStream sourceStream = OpenExclusiveRewriteStream(filePath); - FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream); - bool headerOnly = string.IsNullOrEmpty(current.Separator) - && current.Offset == Encoding.UTF8.GetByteCount(current.FirstLine); - await RewriteFirstLineAsync(sourceStream, filePath, nextFirstLine, separator, current.Offset, headerOnly); + await using (FileStream sourceStream = OpenExclusiveRewriteStream(filePath)) + { + FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream); + bool headerOnly = string.IsNullOrEmpty(current.Separator) + && current.Offset == Encoding.UTF8.GetByteCount(current.FirstLine); + await RewriteFirstLineAsync(sourceStream, filePath, nextFirstLine, separator, current.Offset, headerOnly); + } + + ApplyNormalizedLastWriteTime(filePath, normalizedLastWriteTimeUtcTicks); } catch (Exception error) { @@ -399,6 +418,18 @@ private static FileSnapshot GetFileSnapshot(string filePath) return new FileSnapshot(fileInfo.Length, fileInfo.LastWriteTimeUtc.Ticks); } + private static 
void ApplyNormalizedLastWriteTime(string filePath, long? normalizedLastWriteTimeUtcTicks) + { + if (normalizedLastWriteTimeUtcTicks is null or <= 0) + { + return; + } + + DateTime normalizedUtc = new(normalizedLastWriteTimeUtcTicks.Value, DateTimeKind.Utc); + File.SetLastWriteTimeUtc(filePath, normalizedUtc); + File.SetLastAccessTimeUtc(filePath, normalizedUtc); + } + private static async Task<IReadOnlyList<string>> FindLockedFilesAsync(IEnumerable<string> filePaths) { List<string> lockedPaths = []; diff --git a/desktop/CodexProviderSync.Core/SqliteStateService.cs b/desktop/CodexProviderSync.Core/SqliteStateService.cs index dabef24..bc6b835 100644 --- a/desktop/CodexProviderSync.Core/SqliteStateService.cs +++ b/desktop/CodexProviderSync.Core/SqliteStateService.cs @@ -59,6 +59,78 @@ FROM threads }; } + public async Task<IReadOnlyDictionary<string, long>> ReadThreadActivityUtcTicksByIdAsync( + string codexHome, + IEnumerable<string?> threadIds) + { + string dbPath = StateDbPath(codexHome); + if (!File.Exists(dbPath)) + { + return new Dictionary<string, long>(StringComparer.Ordinal); + } + + string[] normalizedIds = threadIds + .Where(static id => !string.IsNullOrWhiteSpace(id)) + .Select(static id => id!) + .Distinct(StringComparer.Ordinal) + .ToArray(); + if (normalizedIds.Length == 0) + { + return new Dictionary<string, long>(StringComparer.Ordinal); + } + + await using SqliteConnection connection = OpenConnection(dbPath); + try + { + await connection.OpenAsync(); + string? 
activityColumn = await GetThreadActivityColumnAsync(connection); + if (activityColumn is null) + { + return new Dictionary<string, long>(StringComparer.Ordinal); + } + + Dictionary<string, long> result = new(StringComparer.Ordinal); + foreach (string[] chunk in normalizedIds.Chunk(500)) + { + await using SqliteCommand command = connection.CreateCommand(); + string[] parameterNames = new string[chunk.Length]; + for (int index = 0; index < chunk.Length; index += 1) + { + parameterNames[index] = $"$id{index}"; + command.Parameters.AddWithValue(parameterNames[index], chunk[index]); + } + + command.CommandText = $""" + SELECT id, {activityColumn} AS activity_value + FROM threads + WHERE id IN ({string.Join(", ", parameterNames)}) + """; + + await using SqliteDataReader reader = await command.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + if (reader.IsDBNull(1)) + { + continue; + } + + long rawValue = reader.GetInt64(1); + long? utcTicks = ToActivityUtcTicks(activityColumn, rawValue); + if (utcTicks is not null) + { + result[reader.GetString(0)] = utcTicks.Value; + } + } + } + + return result; + } + catch + { + return new Dictionary<string, long>(StringComparer.Ordinal); + } + } + public async Task AssertSqliteWritableAsync(string codexHome, int? 
busyTimeoutMs = null) { string dbPath = StateDbPath(codexHome); @@ -168,6 +240,46 @@ private static async Task ExecuteNonQueryAsync(SqliteConnection connection, stri await command.ExecuteNonQueryAsync(); } + private static async Task<string?> GetThreadActivityColumnAsync(SqliteConnection connection) + { + await using SqliteCommand command = connection.CreateCommand(); + command.CommandText = "PRAGMA table_info(threads)"; + + HashSet<string> columns = new(StringComparer.Ordinal); + await using SqliteDataReader reader = await command.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + columns.Add(reader.GetString(1)); + } + + if (columns.Contains("updated_at_ms")) + { + return "updated_at_ms"; + } + + if (columns.Contains("updated_at")) + { + return "updated_at"; + } + + return null; + } + + private static long? ToActivityUtcTicks(string activityColumn, long rawValue) + { + if (rawValue <= 0) + { + return null; + } + + return activityColumn switch + { + "updated_at_ms" => DateTimeOffset.FromUnixTimeMilliseconds(rawValue).UtcDateTime.Ticks, + "updated_at" => DateTimeOffset.FromUnixTimeSeconds(rawValue).UtcDateTime.Ticks, + _ => null + }; + } + private static Exception WrapSqliteBusyError(Exception error, string action) { if (error is not SqliteException sqliteError diff --git a/src/backup.js b/src/backup.js index 7f37af5..d279601 100644 --- a/src/backup.js +++ b/src/backup.js @@ -64,7 +64,8 @@ export async function createBackup({ files: sessionChanges.map((change) => ({ path: change.path, originalFirstLine: change.originalFirstLine, - originalSeparator: change.originalSeparator + originalSeparator: change.originalSeparator, + lastActivityTimestampMs: change.lastActivityTimestampMs ?? 
null })) }; await fs.writeFile( @@ -103,7 +104,8 @@ export async function updateSessionBackupManifest(backupDir, sessionChanges) { sessionManifest.files = sessionChanges.map((change) => ({ path: change.path, originalFirstLine: change.originalFirstLine, - originalSeparator: change.originalSeparator + originalSeparator: change.originalSeparator, + lastActivityTimestampMs: change.lastActivityTimestampMs ?? null })); metadata.changedSessionFiles = sessionChanges.length; diff --git a/src/service.js b/src/service.js index 1251822..81f3e36 100644 --- a/src/service.js +++ b/src/service.js @@ -32,6 +32,7 @@ import { } from "./session-files.js"; import { assertSqliteWritable, + readSqliteThreadActivityTimestamps, readSqliteProviderCounts, updateSqliteProvider } from "./sqlite-state.js"; @@ -145,6 +146,16 @@ export async function runSync({ lockedPaths: lockedReadPaths, providerCounts } = await collectSessionChanges(codexHome, targetProvider, { skipLockedReads: true }); + const threadActivityTimestamps = await readSqliteThreadActivityTimestamps( + codexHome, + changes.map((change) => change.threadId) + ); + for (const change of changes) { + const timestampMs = threadActivityTimestamps.get(change.threadId); + if (Number.isFinite(timestampMs)) { + change.lastActivityTimestampMs = timestampMs; + } + } emitProgress(onProgress, { stage: "scan_rollout_files", status: "complete", @@ -269,7 +280,8 @@ export async function runSync({ await restoreSessionChanges(appliedSessionChanges.map((change) => ({ path: change.path, originalFirstLine: change.originalFirstLine, - originalSeparator: change.originalSeparator + originalSeparator: change.originalSeparator, + lastActivityTimestampMs: change.lastActivityTimestampMs ?? 
null }))); } catch (restoreError) { throw new Error( diff --git a/src/session-files.js b/src/session-files.js index ee7e10a..c6e1250 100644 --- a/src/session-files.js +++ b/src/session-files.js @@ -35,6 +35,15 @@ async function getFileSnapshot(filePath) { }; } +async function applyNormalizedMtime(filePath, normalizedMtimeMs) { + if (!Number.isFinite(normalizedMtimeMs)) { + return; + } + + const normalizedDate = new Date(normalizedMtimeMs); + await fsp.utimes(filePath, normalizedDate, normalizedDate); +} + function snapshotMatches(change, snapshot) { return change.originalSize === snapshot.size && change.originalMtimeMs === snapshot.mtimeMs; @@ -305,7 +314,7 @@ async function invokeWindowsExclusiveRewrite(change, options) { return result; } -async function rewriteFirstLine(filePath, nextFirstLine, separator) { +async function rewriteFirstLine(filePath, nextFirstLine, separator, normalizedMtimeMs = null) { if (process.platform === "win32") { const result = await invokeWindowsExclusiveRewrite( { @@ -355,6 +364,7 @@ async function rewriteFirstLine(filePath, nextFirstLine, separator) { }); await fsp.rename(tmpPath, filePath); + await applyNormalizedMtime(filePath, normalizedMtimeMs); } catch (error) { await fsp.rm(tmpPath, { force: true }); throw wrapRolloutFileBusyError(error, filePath, "rewrite"); @@ -404,6 +414,7 @@ async function tryRewriteCollectedFirstLine(change) { } await fsp.rename(tmpPath, change.path); + await applyNormalizedMtime(change.path, change.lastActivityTimestampMs); return true; } catch (error) { await fsp.rm(tmpPath, { force: true }); @@ -502,6 +513,7 @@ export async function collectSessionChanges(codexHome, targetProvider, options = originalOffset: record.offset, originalSize: snapshot.size, originalMtimeMs: snapshot.mtimeMs, + lastActivityTimestampMs: snapshot.mtimeMs, updatedFirstLine: JSON.stringify(parsed) }); } @@ -521,6 +533,10 @@ export async function applySessionChanges(changes) { const results = await 
invokeWindowsExclusiveRewriteBatch(normalizedChanges, { requireOriginalMatch: true }); for (let index = 0; index < normalizedChanges.length; index += 1) { if (results[index] === "APPLIED") { + await applyNormalizedMtime( + normalizedChanges[index].path, + normalizedChanges[index].lastActivityTimestampMs + ); appliedChanges += 1; appliedPaths.push(normalizedChanges[index].path); } else { @@ -606,7 +622,8 @@ export async function restoreSessionChanges(manifestEntries) { const changes = manifestEntries.map((entry) => ({ path: entry.path, separator: entry.originalSeparator ?? "\n", - updatedFirstLine: entry.originalFirstLine + updatedFirstLine: entry.originalFirstLine, + lastActivityTimestampMs: entry.lastActivityTimestampMs })); const results = await invokeWindowsExclusiveRewriteBatch(changes, { requireOriginalMatch: false }); const firstFailureIndex = results.findIndex((result) => result !== "APPLIED"); @@ -616,11 +633,20 @@ export async function restoreSessionChanges(manifestEntries) { `Unable to rewrite rollout file because it is currently in use. Close Codex and the Codex app, then retry. Locked file: ${filePath}` ); } + + for (const change of changes) { + await applyNormalizedMtime(change.path, change.lastActivityTimestampMs); + } return; } for (const entry of manifestEntries) { - await rewriteFirstLine(entry.path, entry.originalFirstLine, entry.originalSeparator ?? "\n"); + await rewriteFirstLine( + entry.path, + entry.originalFirstLine, + entry.originalSeparator ?? "\n", + entry.lastActivityTimestampMs ?? null + ); } } diff --git a/src/sqlite-state.js b/src/sqlite-state.js index 9456669..12348c4 100644 --- a/src/sqlite-state.js +++ b/src/sqlite-state.js @@ -14,6 +14,40 @@ function openDatabase(dbPath) { return new DatabaseSync(dbPath); } +function normalizeThreadIds(threadIds) { + return [...new Set( + (threadIds ?? 
[]) + .filter((threadId) => typeof threadId === "string") + .map((threadId) => threadId.trim()) + .filter(Boolean) + )]; +} + +function detectThreadActivityColumn(db) { + const rows = db.prepare("PRAGMA table_info(threads)").all(); + const columns = new Set(rows.map((row) => row.name)); + if (columns.has("updated_at_ms")) { + return "updated_at_ms"; + } + if (columns.has("updated_at")) { + return "updated_at"; + } + return null; +} + +function toActivityTimestampMs(columnName, value) { + if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) { + return null; + } + if (columnName === "updated_at_ms") { + return value; + } + if (columnName === "updated_at") { + return value * 1000; + } + return null; +} + function normalizeBusyTimeoutMs(busyTimeoutMs) { return Number.isInteger(busyTimeoutMs) && busyTimeoutMs >= 0 ? busyTimeoutMs @@ -74,6 +108,53 @@ export async function readSqliteProviderCounts(codexHome) { } } +export async function readSqliteThreadActivityTimestamps(codexHome, threadIds) { + const normalizedThreadIds = normalizeThreadIds(threadIds); + if (normalizedThreadIds.length === 0) { + return new Map(); + } + + const dbPath = stateDbPath(codexHome); + try { + await fs.access(dbPath); + } catch { + return new Map(); + } + + const db = openDatabase(dbPath); + try { + const activityColumn = detectThreadActivityColumn(db); + if (!activityColumn) { + return new Map(); + } + + const result = new Map(); + const chunkSize = 500; + for (let index = 0; index < normalizedThreadIds.length; index += chunkSize) { + const chunk = normalizedThreadIds.slice(index, index + chunkSize); + const placeholders = chunk.map(() => "?").join(", "); + const rows = db.prepare(` + SELECT id, ${activityColumn} AS activity_value + FROM threads + WHERE id IN (${placeholders}) + `).all(...chunk); + + for (const row of rows) { + const timestampMs = toActivityTimestampMs(activityColumn, row.activity_value); + if (Number.isFinite(timestampMs)) { + result.set(row.id, 
timestampMs); + } + } + } + + return result; + } catch { + return new Map(); + } finally { + db.close(); + } +} + export async function assertSqliteWritable(codexHome, options = {}) { const dbPath = stateDbPath(codexHome); try { diff --git a/test/sync-service.test.js b/test/sync-service.test.js index 8548405..39f8bac 100644 --- a/test/sync-service.test.js +++ b/test/sync-service.test.js @@ -49,6 +49,10 @@ async function writeCustomRollout(filePath, payload, message = "hi") { await fs.writeFile(filePath, `${lines.join("\n")}\n`, "utf8"); } +async function writeRolloutLines(filePath, lines) { + await fs.writeFile(filePath, `${lines.join("\n")}\n`, "utf8"); +} + function backupRoot(codexHome) { return path.join(codexHome, "backups_state", "provider-sync"); } @@ -94,14 +98,38 @@ async function writeStateDb(codexHome, rows) { db.exec(` CREATE TABLE threads ( id TEXT PRIMARY KEY, + rollout_path TEXT, + created_at INTEGER, + updated_at INTEGER, + updated_at_ms INTEGER, model_provider TEXT, archived INTEGER NOT NULL DEFAULT 0, first_user_message TEXT NOT NULL DEFAULT '' ) `); - const stmt = db.prepare("INSERT INTO threads (id, model_provider, archived, first_user_message) VALUES (?, ?, ?, ?)"); + const stmt = db.prepare(` + INSERT INTO threads ( + id, + rollout_path, + created_at, + updated_at, + updated_at_ms, + model_provider, + archived, + first_user_message + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `); for (const row of rows) { - stmt.run(row.id, row.model_provider, row.archived ? 1 : 0, row.first_user_message ?? "hello"); + stmt.run( + row.id, + row.rollout_path ?? "", + row.created_at ?? null, + row.updated_at ?? null, + row.updated_at_ms ?? null, + row.model_provider, + row.archived ? 1 : 0, + row.first_user_message ?? 
"hello" + ); } } finally { db.close(); @@ -274,6 +302,90 @@ test("runSync reports stage progress and backup duration", async () => { assert.ok(backupCompleteEvent.durationMs >= 0); }); +test("runSync normalizes rollout mtime to sqlite thread updated_at_ms", async () => { + const { codexHome } = await makeTempCodexHome(); + await writeConfig(codexHome, 'model_provider = "openai"'); + const sessionPath = path.join(codexHome, "sessions", "2026", "03", "19", "rollout-a.jsonl"); + const rolloutActivityIso = "2026-03-22T15:45:30.000Z"; + const sqliteActivityIso = "2026-03-21T09:10:11.123Z"; + await writeRolloutLines(sessionPath, [ + JSON.stringify({ + timestamp: "2026-03-19T00:00:20.000Z", + type: "session_meta", + payload: { + id: "thread-a", + timestamp: "2026-03-19T00:00:00.000Z", + cwd: "C:\\AITemp", + source: "cli", + cli_version: "0.115.0", + model_provider: "apigather" + } + }), + JSON.stringify({ + timestamp: rolloutActivityIso, + type: "event_msg", + payload: { + type: "assistant_message", + message: "latest" + } + }) + ]); + const wrongTimestamp = new Date("2026-04-01T00:00:00.000Z"); + await fs.utimes(sessionPath, wrongTimestamp, wrongTimestamp); + await writeStateDb(codexHome, [ + { + id: "thread-a", + model_provider: "apigather", + archived: false, + updated_at: Math.floor(Date.parse(sqliteActivityIso) / 1000), + updated_at_ms: Date.parse(sqliteActivityIso) + } + ]); + + await runSync({ codexHome }); + + const stat = await fs.stat(sessionPath); + assert.ok(Math.abs(stat.mtimeMs - Date.parse(sqliteActivityIso)) < 2000); +}); + +test("runSync preserves original rollout mtime when sqlite thread activity is unavailable", async () => { + const { codexHome } = await makeTempCodexHome(); + await writeConfig(codexHome, 'model_provider = "openai"'); + const sessionPath = path.join(codexHome, "sessions", "2026", "03", "19", "rollout-a.jsonl"); + await writeRolloutLines(sessionPath, [ + JSON.stringify({ + timestamp: "2026-03-19T00:00:20.000Z", + type: "session_meta", + 
payload: { + id: "thread-a", + timestamp: "2026-03-19T00:00:00.000Z", + cwd: "C:\\AITemp", + source: "cli", + cli_version: "0.115.0", + model_provider: "apigather" + } + }), + JSON.stringify({ + timestamp: "2026-03-25T12:00:00.000Z", + type: "event_msg", + payload: { + type: "assistant_message", + message: "later in rollout only" + } + }) + ]); + const preservedTimestamp = new Date("2026-04-01T00:00:00.000Z"); + await fs.utimes(sessionPath, preservedTimestamp, preservedTimestamp); + await writeStateDb(codexHome, [ + { id: "thread-a", model_provider: "apigather", archived: false } + ]); + + await runSync({ codexHome }); + + const stat = await fs.stat(sessionPath); + assert.ok(Math.abs(stat.mtimeMs - preservedTimestamp.getTime()) < 2000); +}); + test("runSwitch updates config and syncs provider metadata", async () => { const { codexHome } = await makeTempCodexHome(); await writeConfig(codexHome);