Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 4 additions & 54 deletions packages/control-plane/src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ import {
SourceControlProviderError,
type SourceControlProviderName,
} from "./source-control";
import { IntegrationSettingsStore } from "./db/integration-settings";
import { SessionIndexStore } from "./db/session-index";
import { UserScmTokenStore, DEFAULT_TOKEN_LIFETIME_MS } from "./db/user-scm-tokens";
import { UserStore, type ProviderIdentity } from "./db/user-store";
import { buildSessionInternalUrl, SessionInternalPaths } from "./session/contracts";
import { initializeSession, type SessionInitInput } from "./session/initialize";
import {
resolveCodeServerEnabled,
resolveSandboxSettings,
} from "./session/integration-settings-resolution";

import {
getValidModelOrDefault,
isValidModel,
isValidReasoningEffort,
VALID_MODELS,
type CodeServerSettings,
type SandboxSettings,
type ScreenshotArtifactMetadata,
type SessionStatus,
type CallbackContext,
Expand Down Expand Up @@ -75,57 +76,6 @@ const MAX_SPAWN_DEPTH = 2;
const MAX_CONCURRENT_CHILDREN = 5;
const MAX_TOTAL_CHILDREN = 15;

/**
* Resolve whether code-server should be enabled for a given repo,
* checking both the `enabled` setting and the `enabledRepos` allowlist.
*/
async function resolveCodeServerEnabled(
db: D1Database | undefined,
repoOwner: string,
repoName: string
): Promise<boolean> {
if (!db) return false;
const repo = `${repoOwner}/${repoName}`;
try {
const store = new IntegrationSettingsStore(db);
const { enabledRepos, settings } = await store.getResolvedConfig("code-server", repo);
const csSettings = settings as CodeServerSettings;
if (csSettings.enabled !== true) return false;
// enabledRepos: null → all repos, [] → none, [...] → allowlist
if (enabledRepos !== null && !enabledRepos.includes(repo)) return false;
return true;
} catch (e) {
logger.warn("Failed to resolve code-server integration settings, defaulting to disabled", {
error: e instanceof Error ? e.message : String(e),
});
return false;
}
}

/**
* Resolve sandbox settings for a given repo, merging global defaults with per-repo overrides.
*/
async function resolveSandboxSettings(
db: D1Database | undefined,
repoOwner: string,
repoName: string
): Promise<SandboxSettings> {
if (!db) return {};
const repo = `${repoOwner}/${repoName}`;
try {
const store = new IntegrationSettingsStore(db);
const { enabledRepos, settings } = await store.getResolvedConfig("sandbox", repo);
// enabledRepos: null → all repos, [] → none, [...] → allowlist
if (enabledRepos !== null && !enabledRepos.includes(repo)) return {};
return settings as SandboxSettings;
} catch (e) {
logger.warn("Failed to resolve sandbox settings, using defaults", {
error: e instanceof Error ? e.message : String(e),
});
return {};
}
}

const SESSION_STATUSES: SessionStatus[] = [
"created",
"active",
Expand Down
92 changes: 82 additions & 10 deletions packages/control-plane/src/scheduler/durable-object.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ vi.mock("../db/automation-store", () => ({
}));

const mockSessionStoreCreate = vi.fn().mockResolvedValue(undefined);
const mockSessionStoreUpdateStatus = vi.fn().mockResolvedValue(undefined);
vi.mock("../db/session-index", () => ({
SessionIndexStore: vi.fn().mockImplementation(() => ({
create: mockSessionStoreCreate,
updateStatus: mockSessionStoreUpdateStatus,
})),
}));

Expand Down Expand Up @@ -83,10 +85,70 @@ function createMockSessionStub(): DurableObjectStub {
} as never;
}

function createEmptyDbMock(): D1Database {
return {
prepare: vi.fn(() => ({
bind: vi.fn(() => ({
first: vi.fn(async () => null),
})),
})),
} as unknown as D1Database;
}

function createIntegrationSettingsDbMock(): D1Database {
return {
prepare: vi.fn((query: string) => ({
bind: vi.fn((integrationId: string, repo?: string) => ({
first: vi.fn(async () => {
if (query.includes("integration_settings")) {
if (integrationId === "code-server") {
return {
settings: JSON.stringify({ enabledRepos: null, defaults: { enabled: true } }),
};
}
if (integrationId === "sandbox") {
return {
settings: JSON.stringify({
enabledRepos: null,
defaults: { tunnelPorts: [3000], terminalEnabled: true },
}),
};
}
}

if (query.includes("integration_repo_settings") && repo === "acme/web-app") {
if (integrationId === "sandbox") {
return { settings: JSON.stringify({ tunnelPorts: [5173] }) };
}
}

return null;
}),
})),
})),
} as unknown as D1Database;
}

async function getInitBody(fetchMock: ReturnType<typeof vi.fn>): Promise<Record<string, unknown>> {
const initCall = fetchMock.mock.calls.find((call) => {
const input = call[0];
const url =
typeof input === "string" ? input : input instanceof Request ? input.url : String(input);
return new URL(url).pathname === "/internal/init";
});

expect(initCall).toBeDefined();
const [input, init] = initCall!;
if (input instanceof Request) {
return (await input.json()) as Record<string, unknown>;
}
return JSON.parse(String(init?.body)) as Record<string, unknown>;
}

function createEnv(overrides?: Partial<Env>): Env {
const sessionStub = createMockSessionStub();
return {
DB: {} as D1Database,
DB: createEmptyDbMock(),
SESSION: {
idFromName: vi.fn().mockReturnValue("fake-do-id"),
get: vi.fn().mockReturnValue(sessionStub),
Expand Down Expand Up @@ -200,18 +262,28 @@ describe("SchedulerDO", () => {
);

expect(res.status).toBe(200);
const initCall = fetchMock.mock.calls.find((call) => {
const input = call[0];
const url =
typeof input === "string" ? input : input instanceof Request ? input.url : String(input);
return new URL(url).pathname === "/internal/init";
});

expect(initCall).toBeDefined();
const initBody = JSON.parse(String(initCall?.[1]?.body));
const initBody = await getInitBody(fetchMock);
expect(initBody.reasoningEffort).toBe("high");
});

it("passes resolved code-server and sandbox settings into automation sessions", async () => {
mockStore.getOverdueAutomations.mockResolvedValue([sampleAutomation]);

const env = createEnv({ DB: createIntegrationSettingsDbMock() });
const stub = env.SESSION.get(env.SESSION.idFromName("any"));
const fetchMock = vi.mocked(stub.fetch);

const scheduler = createSchedulerDO(env);
const res = await scheduler.fetch(
new Request("http://internal/internal/tick", { method: "POST" })
);

expect(res.status).toBe(200);
const initBody = await getInitBody(fetchMock);
expect(initBody.codeServerEnabled).toBe(true);
expect(initBody.sandboxSettings).toEqual({ tunnelPorts: [5173], terminalEnabled: true });
});

it("skips automation with active run (concurrency guard)", async () => {
mockStore.getOverdueAutomations.mockResolvedValue([sampleAutomation]);
mockStore.getActiveRunForAutomation.mockResolvedValue({
Expand Down
84 changes: 39 additions & 45 deletions packages/control-plane/src/scheduler/durable-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import {
type TriggerConfig,
} from "@open-inspect/shared";
import { AutomationStore, toAutomationRun, type AutomationRow } from "../db/automation-store";
import { SessionIndexStore } from "../db/session-index";
import { UserStore } from "../db/user-store";
import { createRequestMetrics } from "../db/instrumented-d1";
import { generateId } from "../auth/crypto";
import { createLogger, parseLogLevel } from "../logger";
import type { Logger } from "../logger";
import type { Env } from "../types";
import { initializeSession } from "../session/initialize";
import {
resolveCodeServerEnabled,
resolveSandboxSettings,
} from "../session/integration-settings-resolution";

/** Max automations to process per tick (backpressure). */
const MAX_PER_TICK = 25;
Expand Down Expand Up @@ -542,30 +547,6 @@ export class SchedulerDO extends DurableObject<Env> {
runId: string
): Promise<{ sessionId: string }> {
const sessionId = generateId();
const doId = this.env.SESSION.idFromName(sessionId);
const stub = this.env.SESSION.get(doId);

// Initialize the session DO
const initResponse = await stub.fetch("http://internal/internal/init", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
sessionName: sessionId,
repoOwner: automation.repo_owner,
repoName: automation.repo_name,
repoId: automation.repo_id,
defaultBranch: automation.base_branch,
model: automation.model,
reasoningEffort: automation.reasoning_effort,
title: `[Auto] ${automation.name}`,
userId: automation.created_by,
spawnSource: "automation",
}),
});

if (!initResponse.ok) {
throw new Error(`Session init failed with status ${initResponse.status}`);
}

// Resolve the canonical user_id for the session index.
// New automations (post-Phase 5) have user_id populated at creation time, so this
Expand All @@ -587,26 +568,39 @@ export class SchedulerDO extends DurableObject<Env> {
}
}

// Index the session in D1
const now = Date.now();
const sessionStore = new SessionIndexStore(this.env.DB);
await sessionStore.create({
id: sessionId,
title: `[Auto] ${automation.name}`,
repoOwner: automation.repo_owner,
repoName: automation.repo_name,
model: automation.model,
reasoningEffort: automation.reasoning_effort,
baseBranch: automation.base_branch,
status: "created",
spawnSource: "automation",
spawnDepth: 0,
automationId: automation.id,
automationRunId: runId,
userId,
createdAt: now,
updatedAt: now,
});
const [codeServerEnabled, sandboxSettings] = await Promise.all([
resolveCodeServerEnabled(this.env.DB, automation.repo_owner, automation.repo_name),
resolveSandboxSettings(this.env.DB, automation.repo_owner, automation.repo_name),
]);

await initializeSession(
this.env,
{
sessionId,
repoOwner: automation.repo_owner,
repoName: automation.repo_name,
repoId: automation.repo_id,
defaultBranch: automation.base_branch,
title: `[Auto] ${automation.name}`,
model: automation.model,
reasoningEffort: automation.reasoning_effort,
participantUserId: automation.created_by,
platformUserId: userId,
scmTokenEncrypted: null,
scmRefreshTokenEncrypted: null,
codeServerEnabled,
sandboxSettings,
spawnSource: "automation",
spawnDepth: 0,
automationId: automation.id,
automationRunId: runId,
},
{
trace_id: `automation:${automation.id}`,
request_id: runId,
metrics: createRequestMetrics(),
}
);

return { sessionId };
}
Expand Down
22 changes: 22 additions & 0 deletions packages/control-plane/src/session/initialize.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ describe("initializeSession", () => {
spawnDepth: 0,
codeServerEnabled: false,
sandboxSettings: {},
automationId: null,
automationRunId: null,
};

const ctx = {
Expand Down Expand Up @@ -127,12 +129,32 @@ describe("initializeSession", () => {
expect(d1Entry.parentSessionId).toBeNull();
expect(d1Entry.spawnSource).toBe("user");
expect(d1Entry.spawnDepth).toBe(0);
expect(d1Entry.automationId).toBeNull();
expect(d1Entry.automationRunId).toBeNull();
expect(d1Entry.scmLogin).toBe("acmedev");
expect(d1Entry.userId).toBe("platform-user-1");
expect(d1Entry.createdAt).toBeTypeOf("number");
expect(d1Entry.updatedAt).toBeTypeOf("number");
});

it("passes automation lineage to D1 session index", async () => {
await initializeSession(
createEnv(),
{
...baseInput,
spawnSource: "automation",
automationId: "auto-1",
automationRunId: "run-1",
},
ctx as never
);

const d1Entry = createMock.mock.calls[0][0];
expect(d1Entry.spawnSource).toBe("automation");
expect(d1Entry.automationId).toBe("auto-1");
expect(d1Entry.automationRunId).toBe("run-1");
});

it("sends the correct body to DO init endpoint", async () => {
await initializeSession(createEnv(), baseInput, ctx as never);

Expand Down
4 changes: 4 additions & 0 deletions packages/control-plane/src/session/initialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export interface SessionInitInput {
parentSessionId?: string | null;
spawnSource?: SpawnSource;
spawnDepth?: number;
automationId?: string | null;
automationRunId?: string | null;
}

/**
Expand Down Expand Up @@ -78,6 +80,8 @@ export async function initializeSession(
parentSessionId: input.parentSessionId,
spawnSource: input.spawnSource,
spawnDepth: input.spawnDepth,
automationId: input.automationId,
automationRunId: input.automationRunId,
scmLogin: input.scmLogin || null,
userId: input.platformUserId,
createdAt: now,
Expand Down
Loading
Loading