Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions apps/code/src/main/db/repositories/workspace-repository.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ export function createMockWorkspaceRepository(): MockWorkspaceRepository {
taskIndex.set(workspace.taskId, workspace.id);
return { ...workspace };
},
createCloudMany: (taskIds: string[]) => {
const now = new Date().toISOString();
for (const taskId of taskIds) {
const workspace: Workspace = {
id: crypto.randomUUID(),
taskId,
repositoryId: null,
mode: "cloud",
pinnedAt: null,
lastViewedAt: null,
lastActivityAt: null,
linkedBranch: null,
createdAt: now,
updatedAt: now,
};
workspaces.set(workspace.id, workspace);
taskIndex.set(workspace.taskId, workspace.id);
}
},
deleteByTaskId: (taskId: string) => {
const id = taskIndex.get(taskId);
if (id) {
Expand Down
15 changes: 15 additions & 0 deletions apps/code/src/main/db/repositories/workspace-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export interface IWorkspaceRepository {
findAllPinned(): Workspace[];
findAll(): Workspace[];
create(data: CreateWorkspaceData): Workspace;
createCloudMany(taskIds: string[]): void;
deleteByTaskId(taskId: string): void;
deleteById(id: string): void;
updatePinnedAt(taskId: string, pinnedAt: string | null): void;
Expand Down Expand Up @@ -98,6 +99,20 @@ export class WorkspaceRepository implements IWorkspaceRepository {
return created;
}

createCloudMany(taskIds: string[]): void {
if (taskIds.length === 0) return;
const timestamp = now();
const rows: NewWorkspace[] = taskIds.map((taskId) => ({
id: crypto.randomUUID(),
taskId,
repositoryId: null,
mode: "cloud",
createdAt: timestamp,
updatedAt: timestamp,
}));
this.db.insert(workspaces).values(rows).run();
}

deleteByTaskId(taskId: string): void {
this.db.delete(workspaces).where(byTaskId(taskId)).run();
}
Expand Down
62 changes: 35 additions & 27 deletions apps/code/src/main/services/enrichment/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,15 @@ const STALE_FLAG_SUGGESTION_CAP = 4;
const STALE_FLAG_REFERENCES_PER_FLAG = 5;
const STALE_LOOKBACK_DAYS = 30;

// Yields to the event loop between batches; without this, parse-heavy scans
// freeze IPC / UI on the main process.
const SCAN_BATCH_SIZE = 32;
// Tree-sitter parse() is synchronous and runs on the main process event
// loop. To keep IPC responsive we (1) yield after every file (not every
// batch), (2) skip files past a size threshold — they're almost always
// minified bundles or generated code where parsing buys nothing, and
// (3) cap total parsed files so a monorepo (e.g. PostHog itself) doesn't
// stall boot for tens of seconds. When the cap trips we fall back to
// manifest-only install detection rather than failing outright.
const MAX_FILE_BYTES = 256 * 1024;
const MAX_FILES_TO_PARSE = 500;

interface ParsedRepoEntry {
langId: string;
Expand All @@ -102,21 +108,6 @@ function yieldToEventLoop(): Promise<void> {
return new Promise<void>((resolve) => setImmediate(resolve));
}

async function processInBatches<T, R>(
items: T[],
batchSize: number,
fn: (item: T) => Promise<R>,
): Promise<R[]> {
const out: R[] = [];
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchResults = await Promise.all(batch.map(fn));
for (const r of batchResults) out.push(r);
await yieldToEventLoop();
}
return out;
}

function shouldSkipPath(relPath: string): boolean {
const parts = relPath.split(/[\\/]/);
return parts.some((segment) => SKIP_PATH_SEGMENTS.has(segment));
Expand Down Expand Up @@ -352,23 +343,40 @@ export class EnrichmentService {
const langId = langIdMap[ext];
if (!langId || !enricher.isSupported(langId)) continue;
toParse.push({ relPath, langId });
if (toParse.length >= MAX_FILES_TO_PARSE) {
log.info("Capping repo parse to keep main process responsive", {
repoPath,
totalCandidates: posthogFiles.length,
parseLimit: MAX_FILES_TO_PARSE,
});
Comment thread
charlesvien marked this conversation as resolved.
break;
}
}

const files = new Map<string, ParsedRepoEntry>();
await processInBatches(toParse, SCAN_BATCH_SIZE, async (candidate) => {
// Serial with a yield after every file. Tree-sitter parse() is sync CPU
// on the event loop; batching with Promise.all stacked all parses in one
// synchronous burst between yields, which froze IPC. Per-file yields cap
// each blocking window at one file's parse cost.
for (const candidate of toParse) {
const absPath = path.join(repoPath, candidate.relPath);
let content: string;
try {
content = await fs.readFile(
path.join(repoPath, candidate.relPath),
"utf-8",
);
const stat = await fs.stat(absPath);
if (stat.size > MAX_FILE_BYTES) {
files.set(candidate.relPath, {
langId: candidate.langId,
result: null,
});
continue;
}
content = await fs.readFile(absPath, "utf-8");
} catch {
return null;
continue;
}
try {
const result = await enricher.parse(content, candidate.langId);
files.set(candidate.relPath, { langId: candidate.langId, result });
return null;
} catch (err) {
log.debug("enricher.parse threw during repo scan, skipping file", {
file: candidate.relPath,
Expand All @@ -378,9 +386,9 @@ export class EnrichmentService {
langId: candidate.langId,
result: null,
});
return null;
}
});
await yieldToEventLoop();
}

const entry: ParsedRepoCacheEntry = { files, manifestHit };
this.repoScanCache.set(repoPath, entry);
Expand Down
43 changes: 36 additions & 7 deletions apps/code/src/main/services/llm-gateway/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ export class LlmGatewayService {
system?: string;
maxTokens?: number;
model?: string;
signal?: AbortSignal;
timeoutMs?: number;
} = {},
): Promise<PromptOutput> {
const { system, maxTokens, model = "claude-haiku-4-5" } = options;
const {
system,
maxTokens,
model = "claude-haiku-4-5",
signal,
timeoutMs = 60_000,
} = options;

const auth = await this.authService.getValidAccessToken();
const gatewayUrl = getLlmGatewayUrl(auth.apiHost);
Expand All @@ -72,17 +80,38 @@ export class LlmGatewayService {
messageCount: messages.length,
});

const response = await this.authService.authenticatedFetch(
fetch,
messagesUrl,
{
const timeoutController = new AbortController();
const timeoutId = setTimeout(() => {
timeoutController.abort();
}, timeoutMs);
const onCallerAbort = () => timeoutController.abort();
if (signal) {
if (signal.aborted) timeoutController.abort();
else signal.addEventListener("abort", onCallerAbort, { once: true });
}

let response: Response;
try {
response = await this.authService.authenticatedFetch(fetch, messagesUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(requestBody),
},
);
signal: timeoutController.signal,
});
} catch (err) {
if (timeoutController.signal.aborted && !signal?.aborted) {
throw new LlmGatewayError(
`LLM gateway request timed out after ${timeoutMs}ms`,
"timeout",
);
}
throw err;
} finally {
clearTimeout(timeoutId);
signal?.removeEventListener("abort", onCallerAbort);
}

if (!response.ok) {
const errorBody = await response.text();
Expand Down
14 changes: 14 additions & 0 deletions apps/code/src/main/services/workspace/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ export const createWorkspaceInput = z
},
);

export const reconcileCloudWorkspacesInput = z.object({
taskIds: z.array(z.string()),
});

export const reconcileCloudWorkspacesOutput = z.object({
created: z.array(z.string()),
});

export const deleteWorkspaceInput = z.object({
taskId: z.string(),
mainRepoPath: z.string(),
Expand Down Expand Up @@ -264,6 +272,12 @@ export type WorkspaceInfo = z.infer<typeof workspaceInfoSchema>;
export type Workspace = z.infer<typeof workspaceSchema>;

export type CreateWorkspaceInput = z.infer<typeof createWorkspaceInput>;
export type ReconcileCloudWorkspacesInput = z.infer<
typeof reconcileCloudWorkspacesInput
>;
export type ReconcileCloudWorkspacesOutput = z.infer<
typeof reconcileCloudWorkspacesOutput
>;
export type DeleteWorkspaceInput = z.infer<typeof deleteWorkspaceInput>;
export type VerifyWorkspaceInput = z.infer<typeof verifyWorkspaceInput>;
export type GetWorkspaceInfoInput = z.infer<typeof getWorkspaceInfoInput>;
Expand Down
25 changes: 25 additions & 0 deletions apps/code/src/main/services/workspace/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import type {
BranchChangedPayload,
CreateWorkspaceInput,
LinkedBranchChangedPayload,
ReconcileCloudWorkspacesOutput,
Workspace,
WorkspaceErrorPayload,
WorkspaceInfo,
Expand Down Expand Up @@ -433,6 +434,30 @@ export class WorkspaceService extends TypedEventEmitter<WorkspaceServiceEvents>
}
}

// Batched cloud-workspace reconcile. The renderer calls this once on boot
// with every cloud taskId it sees that has no local workspace row, instead
// of firing one createWorkspace mutation per task. With 100+ cloud tasks
// the N-call pattern saturates the main thread on the tRPC IPC path; this
// collapses it to one IPC + one batched insert.
async reconcileCloudWorkspaces(
taskIds: string[],
): Promise<ReconcileCloudWorkspacesOutput> {
if (taskIds.length === 0) return { created: [] };

const existingTaskIds = new Set(
this.workspaceRepo.findAll().map((w) => w.taskId),
);
const uniqueRequested = Array.from(new Set(taskIds));
const toCreate = uniqueRequested.filter((id) => !existingTaskIds.has(id));
if (toCreate.length === 0) return { created: [] };

log.info(
`Reconciling ${toCreate.length} cloud workspaces (requested ${taskIds.length})`,
);
this.workspaceRepo.createCloudMany(toCreate);
return { created: toCreate };
}

async createWorkspace(options: CreateWorkspaceInput): Promise<WorkspaceInfo> {
// Prevent concurrent workspace creation for the same task
const existingPromise = this.creatingWorkspaces.get(options.taskId);
Expand Down
9 changes: 9 additions & 0 deletions apps/code/src/main/trpc/routers/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
listGitWorktreesOutput,
markActivityInput,
markViewedInput,
reconcileCloudWorkspacesInput,
reconcileCloudWorkspacesOutput,
taskPrStatusInput,
taskPrStatusOutput,
togglePinInput,
Expand Down Expand Up @@ -66,6 +68,13 @@ export const workspaceRouter = router({
.output(createWorkspaceOutput)
.mutation(({ input }) => getService().createWorkspace(input)),

reconcileCloudWorkspaces: publicProcedure
.input(reconcileCloudWorkspacesInput)
.output(reconcileCloudWorkspacesOutput)
.mutation(({ input }) =>
getService().reconcileCloudWorkspaces(input.taskIds),
),

delete: publicProcedure
.input(deleteWorkspaceInput)
.mutation(({ input }) =>
Expand Down
44 changes: 17 additions & 27 deletions apps/code/src/renderer/components/MainLayout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,34 +97,24 @@ export function MainLayout() {
!reconcilingTaskIds.current.has(t.id),
);
if (missing.length === 0) return;
for (const t of missing) reconcilingTaskIds.current.add(t.id);
void Promise.allSettled(
missing.map((t) =>
workspaceApi.create({
taskId: t.id,
mainRepoPath: "",
folderId: "",
folderPath: "",
mode: "cloud",
}),
),
).then((results) => {
let anySucceeded = false;
for (const [i, r] of results.entries()) {
const id = missing[i].id;
reconcilingTaskIds.current.delete(id);
if (r.status === "rejected") {
log.warn(`Failed to reconcile workspace for task ${id}`, r.reason);
} else {
anySucceeded = true;
const missingIds = missing.map((t) => t.id);
for (const id of missingIds) reconcilingTaskIds.current.add(id);
// Single batched IPC instead of one mutation per task — with many cloud
// tasks the per-task pattern saturates the main thread at boot.
workspaceApi
.reconcileCloudWorkspaces(missingIds)
.then((result) => {
for (const id of missingIds) reconcilingTaskIds.current.delete(id);
if (result.created.length > 0) {
void queryClient.invalidateQueries(
trpcReact.workspace.getAll.pathFilter(),
);
}
}
if (anySucceeded) {
void queryClient.invalidateQueries(
trpcReact.workspace.getAll.pathFilter(),
);
}
});
})
.catch((err) => {
for (const id of missingIds) reconcilingTaskIds.current.delete(id);
log.warn("Failed to reconcile cloud workspaces", err);
});
}, [
syncCloudTasksEnabled,
tasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ function TaskCommandIcon({ task }: { task: Task }) {
const { prState, hasDiff } = useTaskPrStatus({
id: task.id,
cloudPrUrl: null,
taskRunEnvironment: task.latest_run?.environment,
});
return (
<TaskIcon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ export function useSetupDiscovery() {

useEffect(() => {
if (startedRef.current) return;
if (discoveryStatus === "done") return;
// Only auto-fire from a clean "idle" state. "done" needs no rerun, and
// "error" (which now includes interrupted runs persisted across boots —
// see setupStore partialize) requires an explicit user retry to recover.
if (discoveryStatus !== "idle") return;
if (!selectedDirectory) return;

startedRef.current = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ export class SetupRunService {
private enricherSuggestionsRunning = false;

startSetup(directory: string): void {
// Defense in depth: never auto-run from a non-idle persisted state.
// The hook (useSetupDiscovery) is the primary gate, but a direct call
// path could otherwise re-enter the loop that wedged users on boot —
// creating fresh cloud tasks and a tree-sitter parse storm against the
// user's repo on every launch.
const status = useSetupStore.getState().discoveryStatus;
if (status !== "idle") return;
this.injectEnricherSuggestions(directory);
this.startDiscovery(directory);
}
Expand Down
Loading
Loading