Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions desktop/CodexProviderSync.Core.Tests/CoreIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,66 @@ await fixture.WriteStateDbAsync(
Assert.Contains("\"model_provider\":\"apigather\"", rollout);
}

[Fact]
public async Task RunSync_NormalizesRolloutLastWriteTimeToSqliteUpdatedAtMs()
{
TestCodexHomeFixture fixture = await TestCodexHomeFixture.CreateAsync();
await fixture.WriteConfigAsync("model_provider = \"openai\"");
string sessionPath = fixture.RolloutPath("sessions", "rollout-a.jsonl");
const string rolloutActivityIso = "2026-03-22T15:45:30.000Z";
const string sqliteActivityIso = "2026-03-21T09:10:11.123Z";
await fixture.WriteRolloutLinesAsync(
sessionPath,
"""
{"timestamp":"2026-03-19T00:00:20.000Z","type":"session_meta","payload":{"id":"thread-a","timestamp":"2026-03-19T00:00:00.000Z","cwd":"C:\\AITemp","source":"cli","cli_version":"0.115.0","model_provider":"apigather"}}
""",
$$$"""
{"timestamp":"{{{rolloutActivityIso}}}","type":"event_msg","payload":{"type":"assistant_message","message":"latest"}}
""");
DateTime wrongTimestamp = new(2026, 4, 1, 0, 0, 0, DateTimeKind.Utc);
File.SetLastWriteTimeUtc(sessionPath, wrongTimestamp);
DateTimeOffset sqliteActivity = DateTimeOffset.Parse(sqliteActivityIso, null, System.Globalization.DateTimeStyles.RoundtripKind).ToUniversalTime();
await fixture.WriteStateDbAsync(
[
("thread-a", "apigather", false, sqliteActivity.ToUnixTimeSeconds(), sqliteActivity.ToUnixTimeMilliseconds())
]);

CodexSyncService service = new();
await service.RunSyncAsync(fixture.CodexHome);

DateTime expectedUtc = sqliteActivity.UtcDateTime;
DateTime actualUtc = File.GetLastWriteTimeUtc(sessionPath);
Assert.InRange(Math.Abs((actualUtc - expectedUtc).TotalSeconds), 0, 2);
}

[Fact]
public async Task RunSync_PreservesRolloutLastWriteTime_WhenSqliteActivityIsUnavailable()
{
TestCodexHomeFixture fixture = await TestCodexHomeFixture.CreateAsync();
await fixture.WriteConfigAsync("model_provider = \"openai\"");
string sessionPath = fixture.RolloutPath("sessions", "rollout-a.jsonl");
await fixture.WriteRolloutLinesAsync(
sessionPath,
"""
{"timestamp":"2026-03-19T00:00:20.000Z","type":"session_meta","payload":{"id":"thread-a","timestamp":"2026-03-19T00:00:00.000Z","cwd":"C:\\AITemp","source":"cli","cli_version":"0.115.0","model_provider":"apigather"}}
""",
"""
{"timestamp":"2026-03-25T12:00:00.000Z","type":"event_msg","payload":{"type":"assistant_message","message":"later in rollout only"}}
""");
DateTime preservedTimestamp = new(2026, 4, 1, 0, 0, 0, DateTimeKind.Utc);
File.SetLastWriteTimeUtc(sessionPath, preservedTimestamp);
await fixture.WriteStateDbAsync(
[
("thread-a", "apigather", false)
]);

CodexSyncService service = new();
await service.RunSyncAsync(fixture.CodexHome);

DateTime actualUtc = File.GetLastWriteTimeUtc(sessionPath);
Assert.InRange(Math.Abs((actualUtc - preservedTimestamp).TotalSeconds), 0, 2);
}

[Fact]
public async Task GetStatus_ReportsImplicitDefaultProviderAndCounts()
{
Expand Down
59 changes: 53 additions & 6 deletions desktop/CodexProviderSync.Core.Tests/TestCodexHomeFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public async Task WriteRolloutAsync(string filePath, string id, string provider)
await File.WriteAllTextAsync(filePath, $"{first}\n{second}\n");
}

public Task WriteRolloutLinesAsync(string filePath, params string[] lines)
{
return File.WriteAllTextAsync(filePath, string.Join("\n", lines) + "\n");
}

public async Task<long> WriteBackupAsync(string directoryName, params (string RelativePath, string Content)[] files)
{
string backupDir = BackupPath(directoryName);
Expand Down Expand Up @@ -113,6 +118,26 @@ public async Task<long> WriteBackupAsync(string directoryName, params (string Re
}

public async Task WriteStateDbAsync(IEnumerable<(string Id, string ModelProvider, bool Archived)> rows)
{
await WriteStateDbRowsAsync(rows.Select(static row => new ThreadStateRow(
row.Id,
row.ModelProvider,
row.Archived,
null,
null)));
}

public async Task WriteStateDbAsync(IEnumerable<(string Id, string ModelProvider, bool Archived, long? UpdatedAt, long? UpdatedAtMs)> rows)
{
await WriteStateDbRowsAsync(rows.Select(static row => new ThreadStateRow(
row.Id,
row.ModelProvider,
row.Archived,
row.UpdatedAt,
row.UpdatedAtMs)));
}

private async Task WriteStateDbRowsAsync(IEnumerable<ThreadStateRow> rows)
{
string dbPath = Path.Combine(CodexHome, "state_5.sqlite");
await using SqliteConnection connection = OpenSqliteConnection();
Expand All @@ -121,23 +146,38 @@ public async Task WriteStateDbAsync(IEnumerable<(string Id, string ModelProvider
create.CommandText = """
CREATE TABLE threads (
id TEXT PRIMARY KEY,
rollout_path TEXT,
created_at INTEGER,
updated_at INTEGER,
updated_at_ms INTEGER,
model_provider TEXT,
archived INTEGER NOT NULL DEFAULT 0,
first_user_message TEXT NOT NULL DEFAULT ''
)
""";
await create.ExecuteNonQueryAsync();

foreach ((string id, string modelProvider, bool archived) in rows)
foreach (ThreadStateRow row in rows)
{
SqliteCommand insert = connection.CreateCommand();
insert.CommandText = """
INSERT INTO threads (id, model_provider, archived, first_user_message)
VALUES ($id, $provider, $archived, 'hello')
INSERT INTO threads (
id,
rollout_path,
created_at,
updated_at,
updated_at_ms,
model_provider,
archived,
first_user_message
)
VALUES ($id, '', NULL, $updatedAt, $updatedAtMs, $provider, $archived, 'hello')
""";
insert.Parameters.AddWithValue("$id", id);
insert.Parameters.AddWithValue("$provider", modelProvider);
insert.Parameters.AddWithValue("$archived", archived ? 1 : 0);
insert.Parameters.AddWithValue("$id", row.Id);
insert.Parameters.AddWithValue("$updatedAt", (object?)row.UpdatedAt ?? DBNull.Value);
insert.Parameters.AddWithValue("$updatedAtMs", (object?)row.UpdatedAtMs ?? DBNull.Value);
insert.Parameters.AddWithValue("$provider", row.ModelProvider);
insert.Parameters.AddWithValue("$archived", row.Archived ? 1 : 0);
await insert.ExecuteNonQueryAsync();
}
}
Expand All @@ -146,4 +186,11 @@ public SqliteConnection OpenSqliteConnection()
{
return new SqliteConnection($"Data Source={Path.Combine(CodexHome, "state_5.sqlite")};Mode=ReadWriteCreate;Pooling=False");
}

private sealed record ThreadStateRow(
string Id,
string ModelProvider,
bool Archived,
long? UpdatedAt,
long? UpdatedAtMs);
}
6 changes: 4 additions & 2 deletions desktop/CodexProviderSync.Core/BackupService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public async Task<string> CreateBackupAsync(
{
Path = change.Path,
OriginalFirstLine = change.OriginalFirstLine,
OriginalSeparator = change.OriginalSeparator
OriginalSeparator = change.OriginalSeparator,
NormalizedLastWriteTimeUtcTicks = change.NormalizedLastWriteTimeUtcTicks
}).ToList()
};
await File.WriteAllTextAsync(
Expand Down Expand Up @@ -177,7 +178,8 @@ await File.ReadAllTextAsync(metadataPath),
{
Path = change.Path,
OriginalFirstLine = change.OriginalFirstLine,
OriginalSeparator = change.OriginalSeparator
OriginalSeparator = change.OriginalSeparator,
NormalizedLastWriteTimeUtcTicks = change.NormalizedLastWriteTimeUtcTicks
}).ToList()
};
metadata = new BackupMetadataFile
Expand Down
11 changes: 11 additions & 0 deletions desktop/CodexProviderSync.Core/CodexSyncService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ public async Task<SyncResult> RunSyncAsync(
await using LockHandle _ = await _lockService.AcquireLockAsync(codexHome, "sync");

SessionChangeCollection sessionInfo = await _sessionRolloutService.CollectSessionChangesAsync(codexHome, targetProvider, skipLockedReads: true);
IReadOnlyDictionary<string, long> threadActivityUtcTicks = await _sqliteStateService.ReadThreadActivityUtcTicksByIdAsync(
codexHome,
sessionInfo.Changes.Select(static change => change.ThreadId));
foreach (SessionChange change in sessionInfo.Changes)
{
if (change.ThreadId is not null
&& threadActivityUtcTicks.TryGetValue(change.ThreadId, out long normalizedTicks))
{
change.NormalizedLastWriteTimeUtcTicks = normalizedTicks;
}
}
(IReadOnlyList<SessionChange> writableChanges, IReadOnlyList<SessionChange> lockedChanges) =
await _sessionRolloutService.SplitLockedSessionChangesAsync(sessionInfo.Changes);

Expand Down
2 changes: 2 additions & 0 deletions desktop/CodexProviderSync.Core/Models.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public sealed class SessionChange
public required int OriginalOffset { get; init; }
public required long OriginalFileLength { get; init; }
public required long OriginalLastWriteTimeUtcTicks { get; init; }
public required long NormalizedLastWriteTimeUtcTicks { get; set; }
public required string UpdatedFirstLine { get; init; }
}

Expand Down Expand Up @@ -159,4 +160,5 @@ internal sealed class SessionBackupManifestEntry
public required string Path { get; init; }
public required string OriginalFirstLine { get; init; }
public required string OriginalSeparator { get; init; }
public long? NormalizedLastWriteTimeUtcTicks { get; init; }
}
79 changes: 55 additions & 24 deletions desktop/CodexProviderSync.Core/SessionRolloutService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace CodexProviderSync.Core;
Expand Down Expand Up @@ -63,6 +64,7 @@ record = await ReadFirstLineRecordAsync(rolloutPath);
OriginalOffset = record.Offset,
OriginalFileLength = snapshot.Length,
OriginalLastWriteTimeUtcTicks = snapshot.LastWriteTimeUtcTicks,
NormalizedLastWriteTimeUtcTicks = snapshot.LastWriteTimeUtcTicks,
UpdatedFirstLine = root!.ToJsonString()
});
}
Expand Down Expand Up @@ -157,7 +159,11 @@ internal async Task RestoreSessionChangesAsync(IEnumerable<SessionBackupManifest
{
foreach (SessionBackupManifestEntry entry in manifestEntries)
{
await RewriteFirstLineAsync(entry.Path, entry.OriginalFirstLine, entry.OriginalSeparator);
await RewriteFirstLineAsync(
entry.Path,
entry.OriginalFirstLine,
entry.OriginalSeparator,
entry.NormalizedLastWriteTimeUtcTicks);
}
}

Expand All @@ -168,7 +174,8 @@ internal Task RestoreSessionChangesAsync(IEnumerable<SessionChange> changes)
{
Path = change.Path,
OriginalFirstLine = change.OriginalFirstLine,
OriginalSeparator = change.OriginalSeparator
OriginalSeparator = change.OriginalSeparator,
NormalizedLastWriteTimeUtcTicks = change.NormalizedLastWriteTimeUtcTicks
}));
}

Expand Down Expand Up @@ -226,26 +233,30 @@ private async Task<bool> TryRewriteCollectedSessionChangeAsync(SessionChange cha
{
try
{
await using FileStream sourceStream = OpenExclusiveRewriteStream(change.Path);
if (sourceStream.Length != change.OriginalFileLength)
await using (FileStream sourceStream = OpenExclusiveRewriteStream(change.Path))
{
return false;
}
if (sourceStream.Length != change.OriginalFileLength)
{
return false;
}

FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream);
if (!string.Equals(current.FirstLine, change.OriginalFirstLine, StringComparison.Ordinal)
|| current.Offset != change.OriginalOffset)
{
return false;
FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream);
if (!string.Equals(current.FirstLine, change.OriginalFirstLine, StringComparison.Ordinal)
|| current.Offset != change.OriginalOffset)
{
return false;
}

await RewriteFirstLineAsync(
sourceStream,
change.Path,
change.UpdatedFirstLine,
change.OriginalSeparator,
change.OriginalOffset,
headerOnly: change.OriginalOffset >= change.OriginalFileLength);
}

await RewriteFirstLineAsync(
sourceStream,
change.Path,
change.UpdatedFirstLine,
change.OriginalSeparator,
change.OriginalOffset,
headerOnly: change.OriginalOffset >= change.OriginalFileLength);
ApplyNormalizedLastWriteTime(change.Path, change.NormalizedLastWriteTimeUtcTicks);
return true;
}
catch (Exception error) when (IsRolloutFileBusyError(error))
Expand All @@ -254,15 +265,23 @@ await RewriteFirstLineAsync(
}
}

private async Task RewriteFirstLineAsync(string filePath, string nextFirstLine, string separator)
private async Task RewriteFirstLineAsync(
string filePath,
string nextFirstLine,
string separator,
long? normalizedLastWriteTimeUtcTicks = null)
{
try
{
await using FileStream sourceStream = OpenExclusiveRewriteStream(filePath);
FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream);
bool headerOnly = string.IsNullOrEmpty(current.Separator)
&& current.Offset == Encoding.UTF8.GetByteCount(current.FirstLine);
await RewriteFirstLineAsync(sourceStream, filePath, nextFirstLine, separator, current.Offset, headerOnly);
await using (FileStream sourceStream = OpenExclusiveRewriteStream(filePath))
{
FirstLineRecord current = await ReadFirstLineRecordAsync(sourceStream);
bool headerOnly = string.IsNullOrEmpty(current.Separator)
&& current.Offset == Encoding.UTF8.GetByteCount(current.FirstLine);
await RewriteFirstLineAsync(sourceStream, filePath, nextFirstLine, separator, current.Offset, headerOnly);
}

ApplyNormalizedLastWriteTime(filePath, normalizedLastWriteTimeUtcTicks);
}
catch (Exception error)
{
Expand Down Expand Up @@ -399,6 +418,18 @@ private static FileSnapshot GetFileSnapshot(string filePath)
return new FileSnapshot(fileInfo.Length, fileInfo.LastWriteTimeUtc.Ticks);
}

private static void ApplyNormalizedLastWriteTime(string filePath, long? normalizedLastWriteTimeUtcTicks)
{
if (normalizedLastWriteTimeUtcTicks is null or <= 0)
{
return;
}

DateTime normalizedUtc = new(normalizedLastWriteTimeUtcTicks.Value, DateTimeKind.Utc);
File.SetLastWriteTimeUtc(filePath, normalizedUtc);
File.SetLastAccessTimeUtc(filePath, normalizedUtc);
}

private static async Task<List<string>> FindLockedFilesAsync(IEnumerable<string> filePaths)
{
List<string> lockedPaths = [];
Expand Down
Loading