Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/lib/messaging/applier/host-state-applier.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { beforeEach, describe, expect, it, vi } from "vitest";

import type { SandboxMessagingPlan } from "../manifest";
import { compactSandboxMessagingPlanForPersistence } from "../persistence";
import { MessagingHostStateApplier } from "./host-state-applier";
import { MessagingSetupApplier } from "./setup-applier";
import * as registry from "../../state/registry";
Expand Down Expand Up @@ -73,6 +74,38 @@ describe("MessagingHostStateApplier", () => {
});
});

it("hydrates compact existing plans before merging host state", () => {
registryMock.__setSandbox("demo", {
name: "demo",
messaging: {
schemaVersion: 1,
plan: compactSandboxMessagingPlanForPersistence(makePlan(["telegram"])),
},
});

const updated = MessagingHostStateApplier.applyPlanToRegistry(
"demo",
makePlan(["slack"], {
credentialBindings: [
makeCredentialBinding("slack", "bot"),
makeCredentialBinding("slack", "app"),
],
}),
{ mode: "merge" },
);

expect(updated).toBe(true);
const entry = registryMock.__getSandbox("demo");
const plan = (entry?.messaging as { plan: SandboxMessagingPlan }).plan;
expect(plan.channels.map((channel) => channel.channelId)).toEqual(["telegram", "slack"]);
expect(
plan.channels
.find((channel) => channel.channelId === "telegram")
?.hooks.some((hook) => hook.channelId === "telegram"),
).toBe(true);
expect(plan.agentRender.some((entry) => entry.channelId === "telegram")).toBe(true);
});

it("can merge a single-channel add plan into existing messaging state", () => {
registryMock.__setSandbox("demo", {
name: "demo",
Expand Down
10 changes: 8 additions & 2 deletions src/lib/messaging/applier/host-state-applier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

import type { SandboxMessagingPlan } from "../manifest";
import { hydrateDerivedSandboxMessagingPlanFields } from "../persistence";
import { parseSandboxMessagingPlan } from "../plan-validation";
import * as registry from "../../state/registry";
import { MessagingSetupApplier } from "./setup-applier";
import type { MessagingSetupEnvOptions } from "./types";
Expand Down Expand Up @@ -42,9 +44,13 @@ export class MessagingHostStateApplier {
if (plan.sandboxName !== sandboxName) return false;
const entry = registry.getSandbox(sandboxName);
if (!entry) return false;
const existingPlan = parseSandboxMessagingPlan(entry.messaging?.plan);
const hydratedExistingPlan = existingPlan
? hydrateDerivedSandboxMessagingPlanFields(existingPlan)
: null;
const nextPlan =
options.mode === "merge" && entry.messaging?.plan
? mergeSandboxMessagingPlans(entry.messaging.plan, plan)
options.mode === "merge" && hydratedExistingPlan
? mergeSandboxMessagingPlans(hydratedExistingPlan, plan)
: clonePlan(plan);
return registry.updateSandbox(sandboxName, {
messaging: {
Expand Down
123 changes: 74 additions & 49 deletions src/lib/messaging/compiler/workflow-planner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
createBuiltInRenderTemplateResolver,
} from "../channels";
import { createBuiltInMessagingHookRegistry, MessagingHookRegistry } from "../hooks";
import { createChannelManifestRegistry, type ChannelManifest } from "../manifest";
import { MessagingWorkflowPlanner } from "./workflow-planner";

const TEST_CREDENTIALS: Readonly<Record<string, string>> = {
Expand All @@ -24,6 +25,34 @@ const TEST_WECHAT_LOGIN = {
userId: "test-wechat-user",
} as const;

const CREDENTIAL_ONLY_MANIFEST: ChannelManifest = {
schemaVersion: 1,
id: "matrix",
displayName: "Matrix",
supportedAgents: ["openclaw"],
auth: { mode: "none" },
inputs: [
{
id: "botToken",
kind: "secret",
required: true,
envKey: "MATRIX_BOT_TOKEN",
},
],
credentials: [
{
id: "botToken",
sourceInput: "botToken",
providerName: "{sandboxName}-matrix",
providerEnvKey: "MATRIX_BOT_TOKEN",
placeholder: "MATRIX_BOT_TOKEN",
},
],
render: [],
state: {},
hooks: [],
};

function planner(): MessagingWorkflowPlanner {
return new MessagingWorkflowPlanner(
createBuiltInChannelManifestRegistry(),
Expand Down Expand Up @@ -446,55 +475,6 @@ describe("MessagingWorkflowPlanner", () => {
]);
});

it("refreshes missing manifest render entries from stale rebuild plans", async () => {
const existingPlan = await planner().buildPlan({
sandboxName: "demo",
agent: "hermes",
workflow: "onboard",
isInteractive: false,
configuredChannels: ["discord"],
credentialAvailability: {
DISCORD_BOT_TOKEN: true,
},
});
const stalePlan = {
...existingPlan,
credentialBindings: existingPlan.credentialBindings.map((binding) => ({
...binding,
credentialHash: "hash-discord-token",
})),
agentRender: [],
buildSteps: [],
};

const plan = await planner().buildRebuildPlanFromSandboxEntry({
sandboxName: "demo",
agent: "hermes",
sandboxEntry: {
name: "demo",
agent: "hermes",
messaging: { schemaVersion: 1, plan: stalePlan },
},
supportedChannelIds: ["discord"],
});

expect(plan?.workflow).toBe("rebuild");
expect(
plan?.credentialBindings.find((binding) => binding.providerEnvKey === "DISCORD_BOT_TOKEN")
?.credentialHash,
).toBe("hash-discord-token");
const discordEnvRender = plan?.agentRender.find(
(entry) =>
entry.channelId === "discord" &&
entry.kind === "env-lines" &&
entry.target === "~/.hermes/.env",
);
expect(discordEnvRender).toMatchObject({
kind: "env-lines",
lines: expect.arrayContaining(["DISCORD_BOT_TOKEN=openshell:resolve:env:DISCORD_BOT_TOKEN"]),
});
});

it("adds one manifest channel into an existing sandbox entry plan", async () => {
const existingPlan = await planner().buildPlan({
sandboxName: "demo",
Expand Down Expand Up @@ -578,6 +558,51 @@ describe("MessagingWorkflowPlanner", () => {
]);
});

it("does not trust credential availability from mismatched sandbox entry plans", async () => {
const registry = createChannelManifestRegistry([CREDENTIAL_ONLY_MANIFEST]);
const localPlanner = new MessagingWorkflowPlanner(
registry,
new MessagingHookRegistry(),
createBuiltInRenderTemplateResolver(),
);
const stalePlan = await localPlanner.buildPlan({
sandboxName: "other-sandbox",
agent: "openclaw",
workflow: "onboard",
isInteractive: false,
configuredChannels: ["matrix"],
credentialAvailability: {
MATRIX_BOT_TOKEN: true,
},
});

const plan = await localPlanner.buildChannelAddPlanFromSandboxEntry({
sandboxName: "demo",
agent: "openclaw",
sandboxEntry: {
name: "demo",
messaging: {
schemaVersion: 1,
plan: stalePlan,
},
},
channelId: "matrix",
isInteractive: false,
supportedChannelIds: ["matrix"],
});

expect(plan.channels).toHaveLength(1);
expect(plan.channels[0]).toMatchObject({
channelId: "matrix",
active: false,
configured: true,
});
expect(plan.credentialBindings[0]).toMatchObject({
channelId: "matrix",
credentialAvailable: false,
});
});

it("mutates disabled channel state in an existing sandbox entry plan", async () => {
const existingPlan = await planner().buildPlan({
sandboxName: "demo",
Expand Down
81 changes: 11 additions & 70 deletions src/lib/messaging/compiler/workflow-planner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

import { MessagingHookRegistry } from "../hooks";
import { hydrateDerivedSandboxMessagingPlanFields } from "../persistence";
import { parseSandboxMessagingPlan } from "../plan-validation";
import type {
ChannelManifestRegistry,
MessagingAgentId,
Expand Down Expand Up @@ -68,7 +70,7 @@ export class MessagingWorkflowPlanner {
supportedChannelIds: context.supportedChannelIds,
credentialAvailability: mergeAvailability(
credentialAvailabilityFromPlan(existingPlan),
this.credentialAvailabilityFromSandboxEntry(context.sandboxEntry, [context.channelId]),
this.credentialAvailabilityFromSandboxEntry(context, [context.channelId]),
context.credentialAvailability,
),
});
Expand Down Expand Up @@ -101,34 +103,11 @@ export class MessagingWorkflowPlanner {
): Promise<SandboxMessagingPlan | null> {
const existingPlan = readSandboxEntryPlan(context);
if (existingPlan) {
const normalizedPlan = setPlanDisabledChannels(
return setPlanDisabledChannels(
existingPlan,
disabledChannelsFromSandboxEntry(context.sandboxEntry, existingPlan),
"rebuild",
);
if (!planMissingActiveChannelRender(normalizedPlan)) return normalizedPlan;

const configuredChannels = uniqueChannels(
normalizedPlan.channels.map((channel) => channel.channelId),
);
const refreshedPlan = await this.buildPlan({
sandboxName: context.sandboxName,
agent: context.agent,
workflow: "rebuild",
isInteractive: false,
configuredChannels,
disabledChannels: normalizedPlan.disabledChannels,
supportedChannelIds: context.supportedChannelIds,
credentialAvailability: mergeAvailability(
credentialAvailabilityFromPlan(normalizedPlan),
this.credentialAvailabilityFromSandboxEntry(context.sandboxEntry, configuredChannels),
context.credentialAvailability,
),
});
return mergeSandboxMessagingPlans(
normalizedPlan,
preserveCredentialBindingHashes(normalizedPlan, refreshedPlan),
);
}
return null;
}
Expand Down Expand Up @@ -174,10 +153,10 @@ export class MessagingWorkflowPlanner {
}

private credentialAvailabilityFromSandboxEntry(
sandboxEntry: MessagingWorkflowPlannerSandboxEntry | null | undefined,
context: Pick<MessagingWorkflowPlannerSandboxContext, "agent" | "sandboxEntry" | "sandboxName">,
channelIds: readonly MessagingChannelId[],
): MessagingCompilerCredentialAvailability | undefined {
const plan = sandboxEntry?.messaging?.plan;
const plan = readSandboxEntryPlan(context);
if (!plan) return undefined;

const availability: Record<string, boolean> = {};
Expand Down Expand Up @@ -247,16 +226,11 @@ function onlyConfiguredChannels(
function readSandboxEntryPlan(
context: Pick<MessagingWorkflowPlannerSandboxContext, "agent" | "sandboxEntry" | "sandboxName">,
): SandboxMessagingPlan | null {
const plan = context.sandboxEntry?.messaging?.plan;
if (
!plan ||
plan.schemaVersion !== 1 ||
plan.sandboxName !== context.sandboxName ||
plan.agent !== context.agent
) {
return null;
}
return clonePlan(plan);
const plan = parseSandboxMessagingPlan(context.sandboxEntry?.messaging?.plan, {
sandboxName: context.sandboxName,
agent: context.agent,
});
return plan ? hydrateDerivedSandboxMessagingPlanFields(plan) : null;
}

function disabledChannelsFromSandboxEntry(
Expand Down Expand Up @@ -376,39 +350,6 @@ function setPlanDisabledChannels(
});
}

function preserveCredentialBindingHashes(
existing: SandboxMessagingPlan,
incoming: SandboxMessagingPlan,
): SandboxMessagingPlan {
const existingHashes = new Map(
existing.credentialBindings
.filter((binding) => binding.credentialHash)
.map((binding) => [credentialBindingKey(binding), binding.credentialHash] as const),
);
if (existingHashes.size === 0) return incoming;

return clonePlan({
...incoming,
credentialBindings: incoming.credentialBindings.map((binding) => ({
...binding,
credentialHash: binding.credentialHash ?? existingHashes.get(credentialBindingKey(binding)),
})),
});
}

function credentialBindingKey(
binding: Pick<SandboxMessagingPlan["credentialBindings"][number], "channelId" | "providerEnvKey">,
): string {
return binding.channelId + "\0" + binding.providerEnvKey;
}

function planMissingActiveChannelRender(plan: SandboxMessagingPlan): boolean {
const renderedChannels = new Set(plan.agentRender.map((entry) => entry.channelId));
return plan.channels.some(
(channel) => channel.active && !channel.disabled && !renderedChannels.has(channel.channelId),
);
}

function removePlanChannel(
plan: SandboxMessagingPlan,
channelId: MessagingChannelId,
Expand Down
1 change: 1 addition & 0 deletions src/lib/messaging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export * from "./compiler";
export * from "./hooks";
export * from "./applier";
export * from "./manifest";
export * from "./persistence";
export * from "./utils";
Loading
Loading