Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/utils/pipeline-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ export function createL2Runner(opts: {
logger.debug?.(
`${TAG} [L2] No new L1 records since cursor (session=${sessionKey}, updatedAfter=${cursor ?? "(full)"}), skipping scene extraction`,
);
return;
return { latestCursor: cursor || undefined };
}

logger.debug?.(
Expand Down Expand Up @@ -497,7 +497,7 @@ export function createL2Runner(opts: {

if (sessionRecords.length === 0) {
logger.debug?.(`${TAG} [L2] No new L1 records found (JSONL fallback, session=${sessionKey}), skipping scene extraction`);
return;
return { latestCursor: cursor || undefined };
}

records = sessionRecords.map((r) => ({
Expand Down
5 changes: 5 additions & 0 deletions src/utils/pipeline-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,11 @@ export class MemoryPipelineManager {
// Advance cursor using the record timestamp returned by the runner
if (result?.latestCursor) {
state.last_extraction_updated_time = result.latestCursor;
} else if (!state.last_extraction_updated_time) {
// Runner returned void (zero records or extraction failed) and cursor
// is still empty. Seed with current time so the next L2 run fetches
// incrementally instead of re-scanning all records.
state.last_extraction_updated_time = new Date().toISOString();
}

await this.persistStates();
Expand Down
Loading