Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 194 additions & 23 deletions cli/selftune/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ export interface SyncStepResult {
skipped: number;
}

export interface SyncPhaseTiming {
phase: string;
elapsed_ms: number;
}

export interface SyncResult {
since: string | null;
dry_run: boolean;
Expand All @@ -87,6 +92,8 @@ export interface SyncResult {
repaired_records: number;
codex_repaired_records: number;
};
timings: SyncPhaseTiming[];
total_elapsed_ms: number;
}

export interface SyncOptions {
Expand All @@ -107,6 +114,8 @@ export interface SyncOptions {
rebuildSkillUsage: boolean;
}

export type SyncProgressCallback = (message: string) => void;

export interface SyncDeps {
syncClaude?: (options: SyncOptions) => SyncStepResult;
syncCodex?: (options: SyncOptions) => SyncStepResult;
Expand Down Expand Up @@ -139,14 +148,29 @@ export function createDefaultSyncOptions(overrides: Partial<SyncOptions> = {}):
};
}

function syncClaudeSource(options: SyncOptions): SyncStepResult {
/** Shared file-list cache so repair can reuse the ingest-phase scan. */
interface FileListCache {
claudeTranscripts?: string[];
codexRollouts?: string[];
}

function syncClaudeSource(
options: SyncOptions,
onProgress?: SyncProgressCallback,
cache?: FileListCache,
): SyncStepResult {
if (!existsSync(options.projectsDir)) {
return { available: false, scanned: 0, synced: 0, skipped: 0 };
}

onProgress?.("scanning Claude transcripts...");
const transcriptFiles = findTranscriptFiles(options.projectsDir, options.since);
if (cache) cache.claudeTranscripts = transcriptFiles;

const alreadyIngested = options.force ? new Set<string>() : loadMarker(CLAUDE_CODE_MARKER);
const pending = transcriptFiles.filter((f) => !alreadyIngested.has(f));
onProgress?.(`found ${transcriptFiles.length} transcripts, ${pending.length} pending`);

const newIngested = new Set<string>();
let synced = 0;
let skipped = 0;
Expand Down Expand Up @@ -180,14 +204,23 @@ function syncClaudeSource(options: SyncOptions): SyncStepResult {
};
}

function syncCodexSource(options: SyncOptions): SyncStepResult {
function syncCodexSource(
options: SyncOptions,
onProgress?: SyncProgressCallback,
cache?: FileListCache,
): SyncStepResult {
onProgress?.("scanning Codex rollouts...");
const rolloutFiles = findRolloutFiles(options.codexHome, options.since);
if (cache) cache.codexRollouts = rolloutFiles;

if (rolloutFiles.length === 0 && !existsSync(join(options.codexHome, "sessions"))) {
return { available: false, scanned: 0, synced: 0, skipped: 0 };
}

const alreadyIngested = options.force ? new Set<string>() : loadMarker(CODEX_INGEST_MARKER);
const pending = rolloutFiles.filter((f) => !alreadyIngested.has(f));
onProgress?.(`found ${rolloutFiles.length} rollouts, ${pending.length} pending`);

const skillNames = findCodexSkillNames();
const newIngested = new Set<string>();
let synced = 0;
Expand Down Expand Up @@ -216,11 +249,15 @@ function syncCodexSource(options: SyncOptions): SyncStepResult {
};
}

function syncOpenCodeSource(options: SyncOptions): SyncStepResult {
function syncOpenCodeSource(
options: SyncOptions,
onProgress?: SyncProgressCallback,
): SyncStepResult {
if (!existsSync(options.opencodeDataDir)) {
return { available: false, scanned: 0, synced: 0, skipped: 0 };
}

onProgress?.("scanning OpenCode sessions...");
const dbPath = join(options.opencodeDataDir, "opencode.db");
const storageDir = join(options.opencodeDataDir, "storage");
const skillNames = findOpenCodeSkillNames();
Expand All @@ -237,6 +274,7 @@ function syncOpenCodeSource(options: SyncOptions): SyncStepResult {

const alreadyIngested = options.force ? new Set<string>() : loadMarker(OPENCODE_INGEST_MARKER);
const pending = allSessions.filter((session) => !alreadyIngested.has(session.session_id));
onProgress?.(`found ${allSessions.length} sessions, ${pending.length} pending`);
const newIngested = new Set<string>();

for (const session of pending) {
Expand All @@ -256,16 +294,21 @@ function syncOpenCodeSource(options: SyncOptions): SyncStepResult {
};
}

function syncOpenClawSource(options: SyncOptions): SyncStepResult {
function syncOpenClawSource(
options: SyncOptions,
onProgress?: SyncProgressCallback,
): SyncStepResult {
if (!existsSync(options.openclawAgentsDir)) {
return { available: false, scanned: 0, synced: 0, skipped: 0 };
}

onProgress?.("scanning OpenClaw sessions...");
const sinceTs = options.since ? options.since.getTime() : null;
const allSessions = findOpenClawSessions(options.openclawAgentsDir, sinceTs);
const skillNames = findOpenClawSkillNames(options.openclawAgentsDir);
const alreadyIngested = options.force ? new Set<string>() : loadMarker(OPENCLAW_INGEST_MARKER);
const pending = allSessions.filter((session) => !alreadyIngested.has(session.sessionId));
onProgress?.(`found ${allSessions.length} sessions, ${pending.length} pending`);
const newIngested = new Set<string>();
let synced = 0;
let skipped = 0;
Expand Down Expand Up @@ -293,13 +336,26 @@ function syncOpenClawSource(options: SyncOptions): SyncStepResult {
};
}

function rebuildSkillUsageOverlay(options: SyncOptions): {
function rebuildSkillUsageOverlay(
options: SyncOptions,
onProgress?: SyncProgressCallback,
cache?: FileListCache,
): {
repairedSessions: number;
repairedRecords: number;
codexRepairedRecords: number;
} {
const transcriptPaths = findTranscriptFiles(options.projectsDir, options.since);
const rolloutPaths = findRolloutFiles(options.codexHome, options.since);
// Reuse cached file lists from ingest phase when available to avoid re-walking the filesystem
const transcriptPaths =
cache?.claudeTranscripts ?? findTranscriptFiles(options.projectsDir, options.since);
const rolloutPaths = cache?.codexRollouts ?? findRolloutFiles(options.codexHome, options.since);

const reusedClaude = cache?.claudeTranscripts ? " (cached)" : "";
const reusedCodex = cache?.codexRollouts ? " (cached)" : "";
onProgress?.(
`repairing from ${transcriptPaths.length} transcripts${reusedClaude}, ${rolloutPaths.length} rollouts${reusedCodex}`,
);

const rawSkillRecords = readJsonl<SkillUsageRecord>(options.skillLogPath);
const { repairedRecords, repairedSessionIds } = rebuildSkillUsageFromTranscripts(
transcriptPaths,
Expand All @@ -326,37 +382,86 @@ function rebuildSkillUsageOverlay(options: SyncOptions): {
);
}

onProgress?.(
`repaired ${repairedRecords.length} records across ${repairedSessionIds.size} sessions`,
);

return {
repairedSessions: repairedSessionIds.size,
repairedRecords: repairedRecords.length,
codexRepairedRecords: codexRecords.length,
};
}

export function syncSources(options: SyncOptions, deps: SyncDeps = {}): SyncResult {
const runClaude = deps.syncClaude ?? syncClaudeSource;
const runCodex = deps.syncCodex ?? syncCodexSource;
const runOpenCode = deps.syncOpenCode ?? syncOpenCodeSource;
const runOpenClaw = deps.syncOpenClaw ?? syncOpenClawSource;
const runRepair = deps.rebuildSkillUsage ?? rebuildSkillUsageOverlay;
function timePhase<T>(name: string, fn: () => T, timings: SyncPhaseTiming[]): T {
const start = performance.now();
const result = fn();
timings.push({ phase: name, elapsed_ms: Math.round(performance.now() - start) });
return result;
}

export function syncSources(
options: SyncOptions,
deps: SyncDeps = {},
onProgress?: SyncProgressCallback,
): SyncResult {
const totalStart = performance.now();
const timings: SyncPhaseTiming[] = [];
const cache: FileListCache = {};

const runClaude = deps.syncClaude;
const runCodex = deps.syncCodex;
const runOpenCode = deps.syncOpenCode;
const runOpenClaw = deps.syncOpenClaw;
const runRepair = deps.rebuildSkillUsage;

const disabledStep: SyncStepResult = { available: false, scanned: 0, synced: 0, skipped: 0 };

onProgress?.("starting sync...");

const claude = options.syncClaude
? runClaude(options)
: { available: false, scanned: 0, synced: 0, skipped: 0 };
? timePhase(
"claude",
() => (runClaude ? runClaude(options) : syncClaudeSource(options, onProgress, cache)),
timings,
)
: disabledStep;

const codex = options.syncCodex
? runCodex(options)
: { available: false, scanned: 0, synced: 0, skipped: 0 };
? timePhase(
"codex",
() => (runCodex ? runCodex(options) : syncCodexSource(options, onProgress, cache)),
timings,
)
: disabledStep;

const opencode = options.syncOpenCode
? runOpenCode(options)
: { available: false, scanned: 0, synced: 0, skipped: 0 };
? timePhase(
"opencode",
() => (runOpenCode ? runOpenCode(options) : syncOpenCodeSource(options, onProgress)),
timings,
)
: disabledStep;

const openclaw = options.syncOpenClaw
? runOpenClaw(options)
: { available: false, scanned: 0, synced: 0, skipped: 0 };
? timePhase(
"openclaw",
() => (runOpenClaw ? runOpenClaw(options) : syncOpenClawSource(options, onProgress)),
timings,
)
: disabledStep;

const repair = options.rebuildSkillUsage
? runRepair(options)
? timePhase(
"repair",
() =>
runRepair ? runRepair(options) : rebuildSkillUsageOverlay(options, onProgress, cache),
timings,
)
: { repairedSessions: 0, repairedRecords: 0, codexRepairedRecords: 0 };

const totalElapsed = Math.round(performance.now() - totalStart);

return {
since: options.since ? options.since.toISOString() : null,
dry_run: options.dryRun,
Expand All @@ -367,9 +472,25 @@ export function syncSources(options: SyncOptions, deps: SyncDeps = {}): SyncResu
repaired_records: repair.repairedRecords,
codex_repaired_records: repair.codexRepairedRecords,
},
timings,
total_elapsed_ms: totalElapsed,
};
}

function formatMs(ms: number): string {
if (ms < 1000) return `${ms}ms`;
return `${(ms / 1000).toFixed(1)}s`;
}

function formatStepLine(label: string, step: SyncStepResult, timing?: SyncPhaseTiming): string {
if (!step.available) return ` ${label}: not available`;
const parts = [`scanned ${step.scanned}`];
if (step.synced > 0) parts.push(`synced ${step.synced}`);
if (step.skipped > 0) parts.push(`skipped ${step.skipped}`);
const time = timing ? ` (${formatMs(timing.elapsed_ms)})` : "";
return ` ${label}: ${parts.join(", ")}${time}`;
}

export function cliMain(): void {
const { values } = parseArgs({
options: {
Expand All @@ -388,6 +509,7 @@ export function cliMain(): void {
"no-opencode": { type: "boolean", default: false },
"no-openclaw": { type: "boolean", default: false },
"no-repair": { type: "boolean", default: false },
json: { type: "boolean", default: false },
help: { type: "boolean", short: "h", default: false },
},
strict: true,
Expand Down Expand Up @@ -415,6 +537,7 @@ Options:
--no-opencode Skip OpenCode ingest
--no-openclaw Skip OpenClaw ingest
--no-repair Skip rebuilt skill-usage overlay
--json Output raw JSON instead of human-readable summary
-h, --help Show this help`);
process.exit(0);
}
Expand All @@ -428,6 +551,23 @@ Options:
}
}

// JSON output: explicit --json flag, or auto when stdout is not a TTY (preserves contract for automation)
const jsonOutput = (values.json ?? false) || !process.stdout.isTTY;

const onProgress: SyncProgressCallback | undefined = jsonOutput
? undefined
: (msg) => {
process.stderr.write(` ${msg}\n`);
};

if (!jsonOutput) {
const flags: string[] = [];
if (values.force) flags.push("--force");
if (values["dry-run"]) flags.push("--dry-run");
if (since) flags.push(`--since ${values.since}`);
process.stderr.write(`selftune sync${flags.length ? ` ${flags.join(" ")}` : ""}\n`);
}

const result = syncSources(
createDefaultSyncOptions({
projectsDir: values["projects-dir"] ?? CLAUDE_CODE_PROJECTS_DIR,
Expand All @@ -446,9 +586,40 @@ Options:
syncOpenClaw: !(values["no-openclaw"] ?? false),
rebuildSkillUsage: !(values["no-repair"] ?? false),
}),
{},
onProgress,
);

console.log(JSON.stringify(result, null, 2));
if (jsonOutput) {
console.log(JSON.stringify(result, null, 2));
} else {
const timingMap = new Map(result.timings.map((t) => [t.phase, t]));

process.stderr.write("\nSources:\n");
process.stderr.write(
`${formatStepLine("Claude", result.sources.claude, timingMap.get("claude"))}\n`,
);
process.stderr.write(
`${formatStepLine("Codex", result.sources.codex, timingMap.get("codex"))}\n`,
);
process.stderr.write(
`${formatStepLine("OpenCode", result.sources.opencode, timingMap.get("opencode"))}\n`,
);
process.stderr.write(
`${formatStepLine("OpenClaw", result.sources.openclaw, timingMap.get("openclaw"))}\n`,
);

if (result.repair.ran) {
const repairTiming = timingMap.get("repair");
const repairTime = repairTiming ? ` (${formatMs(repairTiming.elapsed_ms)})` : "";
process.stderr.write(
`\nRepair: ${result.repair.repaired_records} records, ` +
`${result.repair.repaired_sessions} sessions${repairTime}\n`,
);
}

process.stderr.write(`\nDone in ${formatMs(result.total_elapsed_ms)}\n`);
}
}

if (import.meta.main) {
Expand Down
Loading
Loading