Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions cloudflare/scripts/test_e2e_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ async function run() {
const dbState = buildMockDb();
const queue: QueueTask[] = [];

const API_INGRESS_TOKEN = "e2e-ingress-token";
const WORKFLOW_INTERNAL_TOKEN = "e2e-workflow-internal-token";

const workflowEnv: Env = {
DB: dbState.db,
AUTOMATION_QUEUE: {
Expand All @@ -120,21 +123,35 @@ async function run() {
} as unknown as Queue<QueueTask>,
WORKFLOW_SERVICE: {} as Fetcher,
ENV_NAME: "test",
OPENAI_API_KEY: "fixture-openai"
OPENAI_API_KEY: "fixture-openai",
WORKFLOW_INTERNAL_TOKEN
};

const workflowService = {
async fetch(input: string | URL | Request, init?: RequestInit) {
const originalRequest = input instanceof Request ? input : undefined;
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
const path = new URL(url).pathname;
const forwardedToken =
(init?.headers as Record<string, string> | undefined)?.["x-workflow-internal-token"] ??
originalRequest?.headers.get("x-workflow-internal-token") ??
"";
if (path === "/health/config") {
return workflowWorker.fetch(new Request("https://workflow.example/health/config"), workflowEnv);
return workflowWorker.fetch(
new Request("https://workflow.example/health/config", {
headers: { "x-workflow-internal-token": forwardedToken }
}),
workflowEnv
);
}
if (path === "/run-sync" || path === "/run-async") {
return workflowWorker.fetch(
new Request(`https://workflow.example${path}`, {
method: "POST",
headers: { "content-type": "application/json" },
headers: {
"content-type": "application/json",
"x-workflow-internal-token": forwardedToken
},
body: String(init?.body ?? "{}")
}),
workflowEnv
Expand All @@ -152,7 +169,9 @@ async function run() {
}
} as unknown as Queue<QueueTask>,
WORKFLOW_SERVICE: workflowService,
ENV_NAME: "test"
ENV_NAME: "test",
API_INGRESS_TOKEN,
WORKFLOW_INTERNAL_TOKEN
};

const originalFetch = globalThis.fetch;
Expand All @@ -173,7 +192,8 @@ async function run() {
method: "POST",
headers: {
"content-type": "application/json",
"x-trace-id": "e2e-sync-1"
"x-trace-id": "e2e-sync-1",
"x-api-token": API_INGRESS_TOKEN
},
body: JSON.stringify({
prompt: "hello from e2e"
Expand All @@ -188,7 +208,8 @@ async function run() {
method: "POST",
headers: {
"content-type": "application/json",
"x-trace-id": "e2e-async-1"
"x-trace-id": "e2e-async-1",
"x-api-token": API_INGRESS_TOKEN
},
body: JSON.stringify({ hello: "async" })
}),
Expand All @@ -209,7 +230,8 @@ async function run() {
DB: dbState.db,
AUTOMATION_QUEUE: apiEnv.AUTOMATION_QUEUE,
WORKFLOW_SERVICE: workflowService,
ENV_NAME: "test"
ENV_NAME: "test",
WORKFLOW_INTERNAL_TOKEN
} as Env
);

Expand All @@ -222,7 +244,8 @@ async function run() {
method: "POST",
headers: {
"content-type": "application/json",
"x-trace-id": "e2e-fail-1"
"x-trace-id": "e2e-fail-1",
"x-api-token": API_INGRESS_TOKEN
},
body: JSON.stringify({ text: "will fail due missing secret" })
}),
Expand All @@ -242,7 +265,8 @@ async function run() {
DB: dbState.db,
AUTOMATION_QUEUE: apiEnv.AUTOMATION_QUEUE,
WORKFLOW_SERVICE: workflowService,
ENV_NAME: "test"
ENV_NAME: "test",
WORKFLOW_INTERNAL_TOKEN
} as Env
);

Expand Down
12 changes: 9 additions & 3 deletions cloudflare/scripts/test_failed_run_replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ async function run() {
queued.push(task);
}
} as unknown as Queue<QueueTask>,
ENV_NAME: "test"
ENV_NAME: "test",
OPS_DASHBOARD_TOKEN: "failed-run-replay-token",
OPS_DASHBOARD_WRITE_TOKEN: "failed-run-replay-token"
};

const authHeaders = { authorization: `Bearer ${env.OPS_DASHBOARD_TOKEN}` };

const successResponse = await opsDashboardWorker.fetch(
new Request("https://ops.example.com/api/replay/trace-failed", {
method: "POST"
method: "POST",
headers: authHeaders
}),
env as any
);
Expand All @@ -113,7 +118,8 @@ async function run() {

const invalidResponse = await opsDashboardWorker.fetch(
new Request("https://ops.example.com/api/replay/trace-succeeded", {
method: "POST"
method: "POST",
headers: authHeaders
}),
env as any
);
Expand Down
15 changes: 12 additions & 3 deletions cloudflare/scripts/test_failure_modes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@ import assert from "node:assert/strict";
import type { Env } from "../shared/types";
import workflowWorker from "../workers/workflow/src/index";

const WORKFLOW_INTERNAL_TOKEN = "failure-modes-internal-token";

function env(): Env {
return {
DB: {} as D1Database,
AUTOMATION_QUEUE: {} as Queue<unknown>,
WORKFLOW_SERVICE: {} as Fetcher,
ENV_NAME: "test"
ENV_NAME: "test",
WORKFLOW_INTERNAL_TOKEN
} as unknown as Env;
}

async function run() {
const unsupportedTaskResponse = await workflowWorker.fetch(
new Request("https://workflow.example/run-sync", {
method: "POST",
headers: { "content-type": "application/json" },
headers: {
"content-type": "application/json",
"x-workflow-internal-token": WORKFLOW_INTERNAL_TOKEN
},
body: JSON.stringify({
kind: "unsupported_kind",
traceId: "trace-unsupported",
Expand All @@ -33,7 +39,10 @@ async function run() {
const missingRouteResponse = await workflowWorker.fetch(
new Request("https://workflow.example/run-sync", {
method: "POST",
headers: { "content-type": "application/json" },
headers: {
"content-type": "application/json",
"x-workflow-internal-token": WORKFLOW_INTERNAL_TOKEN
},
body: JSON.stringify({
kind: "http_route",
traceId: "trace-missing-route",
Expand Down
31 changes: 25 additions & 6 deletions cloudflare/scripts/test_ingress_security.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,25 @@ async function run() {
}

{
const { env } = createTestEnv({ API_RATE_LIMIT_PER_MINUTE: "1" });
const { env } = createTestEnv({
API_RATE_LIMIT_PER_MINUTE: "1",
API_INGRESS_TOKEN: "rate-limit-token"
});

const first = await apiWorker.fetch(
makeRequest("/api/webhook_echo", body, "security-rate-1", { "cf-connecting-ip": "203.0.113.1" }),
makeRequest("/api/webhook_echo", body, "security-rate-1", {
"cf-connecting-ip": "203.0.113.1",
"x-api-token": "rate-limit-token"
}),
env
);
assert.equal(first.status, 202);

const second = await apiWorker.fetch(
makeRequest("/api/webhook_echo", body, "security-rate-2", { "cf-connecting-ip": "203.0.113.1" }),
makeRequest("/api/webhook_echo", body, "security-rate-2", {
"cf-connecting-ip": "203.0.113.1",
"x-api-token": "rate-limit-token"
}),
env
);
assert.equal(second.status, 429);
Expand All @@ -152,6 +161,7 @@ async function run() {
{
const { env } = createTestEnv({
API_RATE_LIMIT_PER_MINUTE: "10",
API_INGRESS_TOKEN: "route-limit-token",
API_ROUTE_LIMITS_JSON: JSON.stringify({
webhook_echo: {
rpm: 1,
Expand All @@ -161,19 +171,28 @@ async function run() {
});

const first = await apiWorker.fetch(
makeRequest("/api/webhook_echo", body, "security-route-limit-1", { "cf-connecting-ip": "203.0.113.90" }),
makeRequest("/api/webhook_echo", body, "security-route-limit-1", {
"cf-connecting-ip": "203.0.113.90",
"x-api-token": "route-limit-token"
}),
env
);
assert.equal(first.status, 202);

const blocked = await apiWorker.fetch(
makeRequest("/api/webhook_echo", body, "security-route-limit-2", { "cf-connecting-ip": "203.0.113.90" }),
makeRequest("/api/webhook_echo", body, "security-route-limit-2", {
"cf-connecting-ip": "203.0.113.90",
"x-api-token": "route-limit-token"
}),
env
);
assert.equal(blocked.status, 429);

const allowedOtherRoute = await apiWorker.fetch(
makeRequest("/api/noop_ack", body, "security-route-limit-3", { "cf-connecting-ip": "203.0.113.90" }),
makeRequest("/api/noop_ack", body, "security-route-limit-3", {
"cf-connecting-ip": "203.0.113.90",
"x-api-token": "route-limit-token"
}),
env
);
assert.equal(allowedOtherRoute.status, 202);
Expand Down
5 changes: 4 additions & 1 deletion cloudflare/scripts/test_route_fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ function createTestEnv() {
});
}
} as unknown as Fetcher,
ENV_NAME: "test"
ENV_NAME: "test",
API_INGRESS_TOKEN: "fixture-ingress-token",
WORKFLOW_INTERNAL_TOKEN: "fixture-workflow-internal-token"
};

return { env, queuedTasks, syncTasks };
Expand Down Expand Up @@ -154,6 +156,7 @@ function buildPostRequest(path: string, body: unknown, traceId: string) {
headers: {
"content-type": "application/json",
"x-trace-id": traceId,
"x-api-token": "fixture-ingress-token",
origin: "https://fixture-client.example"
},
body: JSON.stringify(body)
Expand Down
7 changes: 5 additions & 2 deletions cloudflare/scripts/test_route_validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ function env() {
});
}
} as unknown as Fetcher,
ENV_NAME: "test"
ENV_NAME: "test",
API_INGRESS_TOKEN: "route-validation-token",
WORKFLOW_INTERNAL_TOKEN: "route-validation-internal"
} as Env,
queuedTasks
};
Expand All @@ -80,7 +82,8 @@ async function run() {
method: "POST",
headers: {
"content-type": "application/json",
"x-trace-id": "route-validation-1"
"x-trace-id": "route-validation-1",
"x-api-token": "route-validation-token"
},
body: JSON.stringify({ amount: "bad" })
}),
Expand Down
10 changes: 10 additions & 0 deletions cloudflare/scripts/test_service_registry_urls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ function isRetryableStatus(status: number) {
return status === 408 || status === 425 || status === 429 || status >= 500;
}

function isReachableButBlockedStatus(status: number) {
return status === 401 || status === 403;
}

async function fetchWithTimeout(url: string, method: "HEAD" | "GET") {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS);
Expand Down Expand Up @@ -83,6 +87,9 @@ async function checkSingleUrl(url: string): Promise<UrlCheckResult> {
if (status >= 200 && status < 400) {
return { url, ok: true, status };
}
if (isReachableButBlockedStatus(status)) {
return { url, ok: true, status };
}

if (attempt < MAX_RETRIES && isRetryableStatus(status)) {
await sleep(250 * (attempt + 1));
Expand Down Expand Up @@ -113,6 +120,9 @@ async function checkSingleUrl(url: string): Promise<UrlCheckResult> {
if (Number.isFinite(statusCode) && statusCode >= 200 && statusCode < 400) {
return { url, ok: true, status: statusCode };
}
if (Number.isFinite(statusCode) && isReachableButBlockedStatus(statusCode)) {
return { url, ok: true, status: statusCode };
}
return { url, ok: false, status: statusCode, reason: `curl status ${statusCode}` };
}

Expand Down
13 changes: 9 additions & 4 deletions cloudflare/scripts/test_workspace_isolation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,24 @@ async function run() {
const env = {
DB: buildMockDb(),
AUTOMATION_QUEUE: {} as Queue<unknown>,
ENV_NAME: "test"
ENV_NAME: "test",
OPS_DASHBOARD_TOKEN: "workspace-isolation-token",
OPS_DASHBOARD_READ_TOKEN: "workspace-isolation-token",
OPS_DASHBOARD_WRITE_TOKEN: "workspace-isolation-token"
};

const authHeaders = { authorization: `Bearer ${env.OPS_DASHBOARD_TOKEN}` };

const allRuns = await opsDashboardWorker.fetch(
new Request("https://ops.example.com/api/runs?limit=20", { method: "GET" }),
new Request("https://ops.example.com/api/runs?limit=20", { method: "GET", headers: authHeaders }),
env as any
);
assert.equal(allRuns.status, 200);
const allPayload = (await allRuns.json()) as { runs: unknown[] };
assert.equal(allPayload.runs.length, 2);

const teamRuns = await opsDashboardWorker.fetch(
new Request("https://ops.example.com/api/runs?limit=20&workspace=team-a", { method: "GET" }),
new Request("https://ops.example.com/api/runs?limit=20&workspace=team-a", { method: "GET", headers: authHeaders }),
env as any
);
assert.equal(teamRuns.status, 200);
Expand All @@ -146,7 +151,7 @@ async function run() {
assert.equal(teamPayload.runs[0]?.workspaceId, "team-a");

const teamDeadLetters = await opsDashboardWorker.fetch(
new Request("https://ops.example.com/api/dead-letters?workspace=team-a", { method: "GET" }),
new Request("https://ops.example.com/api/dead-letters?workspace=team-a", { method: "GET", headers: authHeaders }),
env as any
);
assert.equal(teamDeadLetters.status, 200);
Expand Down
1 change: 1 addition & 0 deletions cloudflare/shared/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export interface Env {
ENABLED_HTTP_ROUTES?: string;
DISABLED_HTTP_ROUTES?: string;
API_INGRESS_TOKEN?: string;
WORKFLOW_INTERNAL_TOKEN?: string;
API_HMAC_SECRET?: string;
API_HMAC_MAX_SKEW_SECONDS?: string;
API_RATE_LIMIT_PER_MINUTE?: string;
Expand Down
Loading
Loading