diff --git a/CONTEXT.md b/CONTEXT.md index c0339d9..aa8ab05 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -105,7 +105,7 @@ The side-effect record the host executes after one runtime turn. Lives in `crate ## EffectsSink -The typed per-field hook surface a [Runtime Host](#runtime-host) implements to apply one turn's [`Effects`](#effects); the **apply policy** that drives it lives in core, once. `Effects::apply(&self, sink: &mut impl EffectsSink)` (`crates/microflow-core/src/runtime/context.rs`) iterates the fields in the **canonical order** — `outbound_bytes → cancellations → wakeups → cloud_requests → component_events` — calling one hook each: `write_bytes`, `cancel_wakeup`, `arm_wakeup`, `perform_cloud`, `dispatch_event`. Bytes first (wire latency); cancel-before-arm (so a cancel + re-arm of the same logical timer in one turn is safe); cloud calls launched before UI events leave; UI events last (they leave the runtime and do not feed back this turn). Decided in [ADR-0008](docs/adr/0008-effects-apply-policy.md) after the two hosts' inline apply loops had already drifted in order. The platform *primitives* behind each hook stay per-host (desktop: serial flush + Tauri `emit` + Tokio timer; browser: `connection.write` + store ingest + `setTimeout`). The desktop `Actor` calls `Effects::apply` directly; the browser reactor cannot reach into Rust, so it mirrors the same shape in `apps/web/src/lib/firmata/effects-sink.ts` (`applyEffects` + an `EffectsSink` interface `FlowReactor` implements), held in lockstep by a conformance test on both sides (`context::apply_tests` / `__tests__/effects-sink.test.ts`). Adding an `Effects` field adds a hook here — a compile error in every sink until handled (exactly how ADR-0009's `cloud_requests` field forced `perform_cloud`, for [`CloudRequest`](#cloudrequest)s, into the order). +The typed per-field hook surface a [Runtime Host](#runtime-host) implements to apply one turn's [`Effects`](#effects); the **apply policy** that drives it lives in core, once. `Effects::apply(&self, sink: &mut impl EffectsSink)` (`crates/microflow-core/src/runtime/context.rs`) iterates the fields in the **canonical order** — `outbound_bytes → cancellations → wakeups → cloud_requests → component_events` — calling one hook each: `write_bytes`, `cancel_wakeup`, `arm_wakeup`, `perform_cloud`, `dispatch_event`. Bytes first (wire latency); cancel-before-arm (so a cancel + re-arm of the same logical timer in one turn is safe); cloud calls launched before UI events leave; UI events last (they leave the runtime and do not feed back this turn). Decided in [ADR-0008](docs/adr/0008-effects-apply-policy.md) after the two hosts' inline apply loops had already drifted in order. The platform *primitives* behind each hook stay per-host (desktop: serial flush + Tauri `emit` + Tokio timer; browser: `connection.write` + store ingest + `setTimeout`). The desktop `Actor` calls `Effects::apply` directly; the browser reactor cannot reach into Rust, so it mirrors the same shape in `apps/web/src/lib/firmata/effects-sink.ts` (`applyEffects` + an `EffectsSink` interface `FlowReactor` implements). The mirror is held **structurally**, not by a test alone: `applyEffects` drives an `EFFECT_HANDLERS` map and an `APPLY_ORDER` tuple, both typed exhaustive over `keyof Effects`, so a new `Effects` field is a TypeScript compile error (unhandled / unordered) on the browser side just as it is a missing trait method on every Rust sink — the conformance test (`context::apply_tests` / `__tests__/effects-sink.test.ts`) now asserts the runtime *order*, no longer carrying the coverage guarantee alone. Adding an `Effects` field thus forces a hook in core's trait **and** in the browser handlers (exactly how ADR-0009's `cloud_requests` field forced `perform_cloud`, for [`CloudRequest`](#cloudrequest)s, into the order). ## BoardWriter @@ -176,8 +176,14 @@ services + the latest-wins LLM task table). Phase 3 added the browser performer: desktop `HttpLlmProvider`, with latest-wins `AbortController` cancellation) and re-enters the result through the wasm `injectEvent` binding. `MqttPublish` (MQTT + Figma) publishes over WSS via `mqtt.js`; inbound subscribe routing comes -back through the wasm `deliverMessage` binding, reconciled from -`subscriberWirings()` on each `applyFlow` (mirroring the desktop `flow_update`). +back through the wasm `deliverMessage` binding. The desired subscription set is +reconciled by core's **`reconcile_desired`** (the shared winner-selection policy: +collapse to one sub per `(broker, topic)`, routing kinds beat display-echo, ties +break on the lower node id — `crates/microflow-core/src/runtime/subscriptions.rs`) +via the wasm `reconcileSubscriptions()` binding on each `applyFlow`; each host then +diffs that set against its own live subscriptions and owns its broker I/O. The +same `reconcile_desired` feeds the desktop `flow_update`, so both hosts pick the +identical owner per topic instead of mirroring the policy in two languages. ## Cloud Node Registration diff --git a/Cargo.lock b/Cargo.lock index 4f5c74d..73b4b5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2803,7 +2803,7 @@ dependencies = [ [[package]] name = "microflow" -version = "0.9.1" +version = "0.9.2" dependencies = [ "async-trait", "dashmap", diff --git a/apps/web/src-tauri/src/lib.rs b/apps/web/src-tauri/src/lib.rs index 602dafa..4de762f 100644 --- a/apps/web/src-tauri/src/lib.rs +++ b/apps/web/src-tauri/src/lib.rs @@ -62,15 +62,12 @@ use tokio::sync::Mutex as TokioMutex; /// subscription so `flow_update` can tell when a topic's *owner* changed (and /// must be re-subscribed) versus left untouched — the broker holds one callback /// per topic, so the owner is part of a subscription's identity. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum SubKind { - /// Payload-only delivery routed to a component. - Plain, - /// (topic, payload) delivery routed to a component (Figma). - TopicAware, - /// Payload echoed to the frontend only — no per-component routing. - DisplayEcho, -} +/// +/// Re-exported from core: the subscription winner-selection policy that consumes +/// this kind ([`microflow_core::runtime::reconcile_desired`]) is the single +/// source shared with the browser host via the wasm `reconcileSubscriptions()` +/// binding, so both hosts pick the same owner per topic. +pub use microflow_core::runtime::SubKind; /// One active Figma/MQTT subscription. Identity is `(broker_id, topic)`; the /// `component_id`/`kind` record which wiring currently owns the broker's single diff --git a/apps/web/src-tauri/src/runtime/commands.rs b/apps/web/src-tauri/src/runtime/commands.rs index 1384384..1dee060 100644 --- a/apps/web/src-tauri/src/runtime/commands.rs +++ b/apps/web/src-tauri/src/runtime/commands.rs @@ -11,7 +11,7 @@ use crate::AppState; use crate::mqtt::broker::BrokerConfig; use microflow_core::flow::FlowUpdate; use crate::SubKind; -use microflow_core::runtime::{ComponentValue, SubscriberWiring}; +use microflow_core::runtime::{reconcile_desired, ComponentValue, DesiredSub, SubscriberWiring}; use std::collections::HashMap; use std::sync::Arc; use tauri::Emitter; @@ -42,19 +42,11 @@ fn microflow_uid(topic: &str) -> Option<&str> { parts.next().filter(|s| !s.is_empty()) } -/// The `SubKind` a wiring resolves to (records which callback shape owns the -/// broker's single per-topic callback). -fn sub_kind(wiring: &SubscriberWiring) -> SubKind { - match wiring { - SubscriberWiring::Plain { .. } => SubKind::Plain, - SubscriberWiring::TopicAware { .. } => SubKind::TopicAware, - SubscriberWiring::DisplayEcho { .. } => SubKind::DisplayEcho, - } -} - /// Map the live/desired subscription set to `uid -> broker_id` over its /// `microflow/{uid}` topics. Generic over the map value so it serves both the -/// `FigmaSubscription` (live) and `DesiredSub` (desired) maps. +/// `FigmaSubscription` (live) and core [`DesiredSub`] (desired) maps. Host-side: +/// it drives the Figma connect/disconnect lifecycle against this host's own +/// broker pool, so it stays here rather than in the shared reconcile policy. fn uid_brokers(set: &HashMap<(String, String), V>) -> HashMap { let mut out: HashMap = HashMap::new(); for (broker_id, topic) in set.keys() { @@ -65,31 +57,6 @@ fn uid_brokers(set: &HashMap<(String, String), V>) -> HashMap out } -/// One desired subscription, before it is reconciled against the live set. -#[derive(Clone)] -struct DesiredSub { - component_id: String, - kind: SubKind, -} - -impl DesiredSub { - /// Deterministic winner when several components resolve to the same - /// `(broker, topic)` — the broker keeps a single callback per topic. - /// Routing kinds (`Plain`/`TopicAware`) win over `DisplayEcho` so a - /// display-only echo never shadows component delivery; ties break on the - /// lower component id. Being deterministic (not dependent on component - /// `HashMap` iteration order) is what keeps the desired set stable across - /// `flow_update`s, so an unchanged flow reconciles to *zero* broker traffic. - fn beats(&self, other: &DesiredSub) -> bool { - let echo = |k: SubKind| matches!(k, SubKind::DisplayEcho); - match (echo(self.kind), echo(other.kind)) { - (false, true) => true, - (true, false) => false, - _ => self.component_id < other.component_id, - } - } -} - /// Update the flow with new nodes and edges #[tauri::command] pub async fn flow_update( @@ -182,23 +149,14 @@ pub async fn flow_update( .map_err(|_| "runtime actor dropped the flow_update reply".to_string())? }; - // Desired subscription set, one entry per (broker_id, topic). The broker - // keeps a single callback per topic, so when several components resolve to - // the same topic we pick a deterministic winner (see DesiredSub::beats), - // which also subsumes the old DisplayEcho dedup. - let mut desired: HashMap<(String, String), DesiredSub> = HashMap::new(); - for (component_id, wiring) in &component_wirings { - let key = (wiring.broker_id().to_string(), wiring.topic().to_string()); - let cand = DesiredSub { component_id: component_id.clone(), kind: sub_kind(wiring) }; - desired - .entry(key) - .and_modify(|cur| { - if cand.beats(cur) { - *cur = cand.clone(); - } - }) - .or_insert(cand); - } + // Desired subscription set, one entry per (broker_id, topic). The collapse + + // deterministic winner-selection is core policy (`reconcile_desired`), shared + // with the browser host so both pick the same owner per topic; here we key it + // by (broker, topic) for the diff against the live set below. + let desired: HashMap<(String, String), DesiredSub> = reconcile_desired(&component_wirings) + .into_iter() + .map(|d| ((d.broker_id.clone(), d.topic.clone()), d)) + .collect(); // Snapshot the live subscriptions, then release the lock for the network I/O // below (re-locked at the end to commit) — matching the original's @@ -244,7 +202,7 @@ pub async fn flow_update( let mut next_live: Vec = Vec::with_capacity(desired.len()); for ((broker_id, topic), d) in &desired { if let Some(existing) = live.get(&(broker_id.clone(), topic.clone())) { - if existing.component_id == d.component_id && existing.kind == d.kind { + if existing.component_id == d.node_id && existing.kind == d.kind { next_live.push(existing.clone()); continue; } @@ -253,12 +211,12 @@ pub async fn flow_update( if !state.mqtt_manager.is_connected(broker_id).await { log::warn!( "[MQTT] Broker {broker_id} not connected, skipping subscription for {}", - d.component_id + d.node_id ); continue; } - let component_id = d.component_id.clone(); + let component_id = d.node_id.clone(); let callback: Arc = match d.kind { SubKind::Plain | SubKind::TopicAware => { let actor = state.actor.clone(); diff --git a/apps/web/src/lib/firmata/__tests__/mqtt-subscriptions.test.ts b/apps/web/src/lib/firmata/__tests__/mqtt-subscriptions.test.ts index 077f965..8097da6 100644 --- a/apps/web/src/lib/firmata/__tests__/mqtt-subscriptions.test.ts +++ b/apps/web/src/lib/firmata/__tests__/mqtt-subscriptions.test.ts @@ -1,24 +1,19 @@ -// Reconcile-logic conformance for the browser MQTT host (ADR-0009 Phase 3), -// mirroring the desktop `flow_update` dedup/diff (commands.rs). +// Host-local subscription diffing for the browser MQTT host (ADR-0009 Phase 3). +// +// The collapse + winner-selection policy (`reconcileDesired`/`beats`) moved to +// core (`microflow-core` `subscriptions.rs` tests) — both hosts share it via the +// wasm `reconcileSubscriptions()` binding. What remains here is the host-local +// diff against this host's live set + the Figma uid lifecycle keys. import { describe, expect, test } from "bun:test"; import { - beats, diffSubscriptions, - reconcileDesired, subKey, uidBrokers, type ActiveSub, type SubKind, - type SubscriberWiring, } from "../cloud/mqtt-subscriptions"; -const wiring = (nodeId: string, kind: SubKind, brokerId: string, topic: string): SubscriberWiring => ({ - nodeId, - kind, - brokerId, - topic, -}); const active = (nodeId: string, kind: SubKind, brokerId: string, topic: string): ActiveSub => ({ nodeId, kind, @@ -26,28 +21,6 @@ const active = (nodeId: string, kind: SubKind, brokerId: string, topic: string): topic, }); -describe("reconcileDesired", () => { - test("a routing kind wins over displayEcho on the same (broker, topic)", () => { - const desired = reconcileDesired([ - wiring("zEcho", "displayEcho", "b", "t"), - wiring("aRoute", "topicAware", "b", "t"), - ]); - expect(desired.size).toBe(1); - expect(desired.get(subKey("b", "t"))?.nodeId).toBe("aRoute"); - expect(desired.get(subKey("b", "t"))?.kind).toBe("topicAware"); - }); - - test("ties break on the lower node id", () => { - const desired = reconcileDesired([wiring("n2", "plain", "b", "t"), wiring("n1", "plain", "b", "t")]); - expect(desired.get(subKey("b", "t"))?.nodeId).toBe("n1"); - }); - - test("distinct topics are each kept", () => { - const desired = reconcileDesired([wiring("n1", "plain", "b", "t1"), wiring("n1", "plain", "b", "t2")]); - expect(desired.size).toBe(2); - }); -}); - describe("diffSubscriptions", () => { test("new subscribes, gone unsubscribes, identical untouched", () => { const live = new Map([ @@ -81,10 +54,3 @@ describe("uidBrokers", () => { expect(map.size).toBe(1); }); }); - -test("beats: routing beats echo, else lower id wins", () => { - const route = active("z", "plain", "b", "t"); - const echo = active("a", "displayEcho", "b", "t"); - expect(beats(route, echo)).toBe(true); - expect(beats(echo, route)).toBe(false); -}); diff --git a/apps/web/src/lib/firmata/cloud/mqtt-subscriptions.ts b/apps/web/src/lib/firmata/cloud/mqtt-subscriptions.ts index e35e621..43cb990 100644 Binary files a/apps/web/src/lib/firmata/cloud/mqtt-subscriptions.ts and b/apps/web/src/lib/firmata/cloud/mqtt-subscriptions.ts differ diff --git a/apps/web/src/lib/firmata/effects-sink.ts b/apps/web/src/lib/firmata/effects-sink.ts index 4632044..0d1050e 100644 --- a/apps/web/src/lib/firmata/effects-sink.ts +++ b/apps/web/src/lib/firmata/effects-sink.ts @@ -1,10 +1,14 @@ // The browser mirror of the core `EffectsSink` + `Effects::apply` (ADR-0008). // // The Rust↔TS boundary means the browser host cannot call into core's -// `Effects::apply`; instead it mirrors the same four-hook shape in the same -// canonical order. The shared *order* — not shared code — is the contract, and -// `__tests__/effects-sink.test.ts` is the browser half of the conformance -// scenario that holds this in lockstep with the Rust `apply_tests`. +// `Effects::apply`; instead it mirrors the same hook shape in the same canonical +// order. The order + the per-field handlers below are both typed exhaustive over +// `keyof Effects`, so a field added to the Rust `Effects` (regenerated into the +// wasm type) is a **compile error** here until it is ordered and handled — the +// browser can no longer silently drop a new field, the way the desktop sink +// can't (its Rust trait gains a required method). `__tests__/effects-sink.test.ts` +// remains the behavioural twin of the Rust `apply_tests`, asserting the order at +// runtime; the types now guarantee coverage so the test can't be the only guard. import type { CloudRequest, Effects } from "@/lib/runtime/wasm"; @@ -15,11 +19,12 @@ export type Wakeup = Effects["wakeups"][number]; export type ComponentEvent = Effects["componentEvents"][number]; /** - * The four platform primitives an effects application drives — the TypeScript - * shape of the Rust `EffectsSink`. The {@link FlowReactor} implements these - * (serial write, `clearTimeout`, `setTimeout`, store ingest); {@link applyEffects} - * sequences them. A new `Effects` field adds a method here, mirroring the - * compile-time new-field guard the Rust trait gives the desktop sink. + * The platform primitives an effects application drives — the TypeScript shape + * of the Rust `EffectsSink`. The {@link FlowReactor} implements these (serial + * write, `clearTimeout`, `setTimeout`, store ingest); {@link applyEffects} + * sequences them. A new field that needs a new primitive adds a method here, via + * the {@link EFFECT_HANDLERS} entry that would reference it — mirroring the + * compile-time new-field guard the Rust `EffectsSink` trait gives the desktop. */ export interface EffectsSink { writeBytes(bytes: number[]): void; @@ -30,17 +35,64 @@ export interface EffectsSink { } /** - * Apply one turn's effects in the **canonical order** (ADR-0008, extended by - * ADR-0009), mirroring the Rust `Effects::apply`: `outboundBytes → cancellations - * → wakeups → cloudRequests → componentEvents`. Bytes first (wire latency), - * cancel-before-arm (so a cancel + re-arm of the same logical timer in one turn - * is safe), cloud calls launched before UI events leave, UI events last (they - * leave the runtime and do not feed back this turn). + * One handler per `Effects` field. Typed `Record`, so adding a + * field to the Rust `Effects` (regenerated into the wasm `Effects` type) is a + * **compile error here** — a missing property — until it is handled, exactly the + * way a new field breaks every Rust `EffectsSink` impl. This is the structural + * guard that was previously only a conformance test: the browser can no longer + * silently drop a new field (ADR-0008/0009). + */ +const EFFECT_HANDLERS: { [K in keyof Effects]: (fx: Effects, sink: EffectsSink) => void } = { + outboundBytes: (fx, sink) => { + if (fx.outboundBytes.length > 0) sink.writeBytes(fx.outboundBytes); + }, + cancellations: (fx, sink) => { + for (const id of fx.cancellations) sink.cancelWakeup(id); + }, + wakeups: (fx, sink) => { + for (const wakeup of fx.wakeups) sink.armWakeup(wakeup); + }, + cloudRequests: (fx, sink) => { + for (const request of fx.cloudRequests) sink.performCloud(request); + }, + componentEvents: (fx, sink) => { + for (const event of fx.componentEvents) sink.dispatchEvent(event); + }, +}; + +/** + * The **canonical order** (ADR-0008, extended by ADR-0009) the fields apply in: + * `outboundBytes → cancellations → wakeups → cloudRequests → componentEvents`. + * Bytes first (wire latency), cancel-before-arm (so a cancel + re-arm of one + * logical timer in a turn is safe), cloud launched before UI events leave, UI + * events last (they leave the runtime and do not feed back this turn). + * + * `satisfies` pins every entry to a real field; {@link AssertOrderIsExhaustive} + * below pins the *reverse* — a new field absent from this tuple fails to compile, + * so the order can never silently lose a field either. + */ +const APPLY_ORDER = [ + "outboundBytes", + "cancellations", + "wakeups", + "cloudRequests", + "componentEvents", +] as const satisfies readonly (keyof Effects)[]; + +/** Errors unless {@link APPLY_ORDER} lists every key of `Effects` (the wrap in a + * 1-tuple stops the conditional distributing, so `never` reads as covered). */ +type AssertOrderIsExhaustive = [Exclude] extends [never] + ? true + : ["Effects field missing from APPLY_ORDER", Exclude]; +const _orderIsExhaustive: AssertOrderIsExhaustive = true; + +/** + * Apply one turn's effects in the canonical order, driving one {@link EFFECT_HANDLERS} + * entry per field. The order lives in {@link APPLY_ORDER}; the per-field work in + * the handlers. Both are exhaustive over `keyof Effects`, so a new field is a + * compile error until it is ordered *and* handled — the browser mirror of the + * Rust `Effects::apply` + `EffectsSink` trait guard. */ export function applyEffects(fx: Effects, sink: EffectsSink): void { - if (fx.outboundBytes.length > 0) sink.writeBytes(fx.outboundBytes); - for (const id of fx.cancellations) sink.cancelWakeup(id); - for (const wakeup of fx.wakeups) sink.armWakeup(wakeup); - for (const request of fx.cloudRequests) sink.performCloud(request); - for (const event of fx.componentEvents) sink.dispatchEvent(event); + for (const field of APPLY_ORDER) EFFECT_HANDLERS[field](fx, sink); } diff --git a/apps/web/src/lib/firmata/flow-reactor.ts b/apps/web/src/lib/firmata/flow-reactor.ts index 7c9f252..d8b1c63 100644 --- a/apps/web/src/lib/firmata/flow-reactor.ts +++ b/apps/web/src/lib/firmata/flow-reactor.ts @@ -9,16 +9,16 @@ // events into the very same UI stores the desktop path feeds. So the canvas // (node values + edge signals) renders identically on both platforms. +import type { EmitOf } from "@/components/flow/nodes/_base/_base.types"; import { applyComponentEvent } from "@/lib/event-ingest"; import { createFlowRuntime, type Effects, type FlowRuntime } from "@/lib/runtime/wasm"; import { performLlmGenerate, type LlmProviderConn } from "./cloud/llm-client"; import { BrokerConnections, type BrokerConn } from "./cloud/mqtt-client"; import { diffSubscriptions, - reconcileDesired, + subKey, uidBrokers, type ActiveSub, - type SubscriberWiring, } from "./cloud/mqtt-subscriptions"; import { applyEffects, @@ -55,15 +55,18 @@ export type CloudDeps = { onMqttMessage?: (topic: string, payload: Uint8Array, nodeId?: string) => void; }; -// The `Llm` node's output handles. ADR-0007 contract: these MUST equal the -// catalog `emits` for `Llm` (`thinking`/`value`/`done`/`error`), which the -// Catalog Parity Guard pins to the Rust `Llm::emits()` / `Llm::E_*` consts. The -// browser host injects results on exactly these handles, mirroring the desktop -// `CloudPerformer`. -const LLM_THINKING = "thinking"; -const LLM_VALUE = "value"; -const LLM_DONE = "done"; -const LLM_ERROR = "error"; +// The `Llm` node's output handles, typed against the catalog's `Llm` emits +// (ADR-0007). `EmitOf<"Llm">` is the literal union the codegen derives from +// node-components.json — the SAME source the Catalog Parity Guard pins the Rust +// `Llm::emits()` / `Llm::E_*` consts to. Annotating each const with it means a +// renamed/removed handle in the catalog makes these assignments fail to compile, +// closing the gap where the browser hard-coded a string the desktop sourced from +// a Rust const. The browser host injects results on exactly these handles, +// mirroring the desktop `CloudPerformer`. +const LLM_THINKING: EmitOf<"Llm"> = "thinking"; +const LLM_VALUE: EmitOf<"Llm"> = "value"; +const LLM_DONE: EmitOf<"Llm"> = "done"; +const LLM_ERROR: EmitOf<"Llm"> = "error"; const now = (): number => typeof performance !== "undefined" ? performance.now() : Date.now(); @@ -252,15 +255,19 @@ export class FlowReactor implements EffectsSink { * Called after every `applyFlow`. */ private reconcileSubscriptions(): void { if (!this.runtime || this.disposed) return; - let wirings: SubscriberWiring[]; + // The collapse + winner-selection is core policy (`reconcile_desired`); the + // wasm binding hands back an already-reconciled desired set, one per topic. + let reconciled: ActiveSub[]; try { - wirings = JSON.parse(this.runtime.subscriberWirings()) as SubscriberWiring[]; + reconciled = JSON.parse(this.runtime.reconcileSubscriptions()) as ActiveSub[]; } catch (error) { - console.error("[flow-reactor] bad subscriberWirings json:", error); + console.error("[flow-reactor] bad reconcileSubscriptions json:", error); return; } - const desired = reconcileDesired(wirings); + const desired = new Map( + reconciled.map((sub) => [subKey(sub.brokerId, sub.topic), sub] as const), + ); const { subscribe, unsubscribe } = diffSubscriptions(desired, this.liveSubs); this.figmaLifecycle(uidBrokers(this.liveSubs.values()), uidBrokers(desired.values())); diff --git a/crates/microflow-core/src/runtime/mod.rs b/crates/microflow-core/src/runtime/mod.rs index fb2bbcb..21435bd 100644 --- a/crates/microflow-core/src/runtime/mod.rs +++ b/crates/microflow-core/src/runtime/mod.rs @@ -20,6 +20,7 @@ pub mod error; pub mod pin_mode; pub mod registry; pub mod router; +pub mod subscriptions; pub mod value; pub mod wiring; @@ -49,6 +50,7 @@ pub use context::{ pub use error::{HardwareError, RuntimeError}; pub use registry::ComponentRegistry; pub use router::{ComponentLookup, DispatchCall, EdgeTarget, FlowRouter}; +pub use subscriptions::{reconcile_desired, DesiredSub, SubKind}; pub use value::{ComponentEvent, ComponentValue, PinConfig}; pub use wiring::{ListenerWiring, SubscriberWiring}; diff --git a/crates/microflow-core/src/runtime/subscriptions.rs b/crates/microflow-core/src/runtime/subscriptions.rs new file mode 100644 index 0000000..ed9f067 --- /dev/null +++ b/crates/microflow-core/src/runtime/subscriptions.rs @@ -0,0 +1,188 @@ +//! Subscription reconciliation policy — shared by both **Runtime Host**s. +//! +//! A flow's subscribe nodes each return a [`SubscriberWiring`]; several can +//! resolve to the same `(broker_id, topic)`, but a broker keeps exactly one +//! callback per topic. Collapsing the wirings to one **desired** subscription per +//! topic — and picking a *deterministic* winner when they collide — is policy +//! BOTH hosts must apply identically, or the desktop and browser would disagree +//! on which node owns a topic. It previously lived in two languages (desktop +//! `commands.rs` `DesiredSub::beats`, browser `mqtt-subscriptions.ts` +//! `beats`/`reconcileDesired`), kept in lockstep only by a comment. This is the +//! single source. +//! +//! Each host still owns the *diff against its own live set* and the broker I/O — +//! those are irreducibly per-platform (`rumqttc` vs `mqtt.js`) and operate on +//! host-local state, so they are not policy this module centralizes. + +use crate::runtime::wiring::SubscriberWiring; +use serde::Serialize; +use std::collections::HashMap; + +/// Which callback shape a subscription drives — the routing identity a broker's +/// single per-topic callback carries. Serializes to the `plain`/`topicAware`/ +/// `displayEcho` strings both hosts use on the wire. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum SubKind { + /// Payload-only delivery routed to a node. + Plain, + /// (topic, payload) delivery routed to a node (Figma). + TopicAware, + /// Payload echoed to the frontend only — no per-node routing. + DisplayEcho, +} + +impl SubKind { + /// The kind a wiring resolves to. + #[must_use] + pub fn of(wiring: &SubscriberWiring) -> Self { + match wiring { + SubscriberWiring::Plain { .. } => SubKind::Plain, + SubscriberWiring::TopicAware { .. } => SubKind::TopicAware, + SubscriberWiring::DisplayEcho { .. } => SubKind::DisplayEcho, + } + } + + #[must_use] + fn is_echo(self) -> bool { + matches!(self, SubKind::DisplayEcho) + } +} + +/// One reconciled subscription: exactly one per `(broker_id, topic)`. Carries the +/// winning node id + kind so a host can tell when a topic's *owner* changed (and +/// must be re-subscribed) versus left untouched. Serializes to the +/// `{ brokerId, topic, nodeId, kind }` shape the browser host consumes via the +/// wasm `reconcileSubscriptions()` binding. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DesiredSub { + pub broker_id: String, + pub topic: String, + pub node_id: String, + pub kind: SubKind, +} + +impl DesiredSub { + /// Deterministic winner when several nodes resolve to the same + /// `(broker, topic)`. Routing kinds (`Plain`/`TopicAware`) beat `DisplayEcho` + /// so a display-only echo never shadows node delivery; ties break on the + /// lower node id. Determinism (not `HashMap` iteration order) is what keeps + /// the desired set stable across `update_flow`s, so an unchanged flow + /// reconciles to *zero* broker traffic. + #[must_use] + fn beats(&self, other: &DesiredSub) -> bool { + match (self.kind.is_echo(), other.kind.is_echo()) { + (false, true) => true, + (true, false) => false, + _ => self.node_id < other.node_id, + } + } +} + +/// Collapse raw `(node_id, wiring)` pairs to one [`DesiredSub`] per +/// `(broker_id, topic)`, choosing the [`DesiredSub::beats`] winner on collisions. +/// The result is sorted by `(broker_id, topic)`, so it is deterministic across +/// calls regardless of input order — both hosts derive the identical desired set +/// and an unchanged flow diffs to nothing. +#[must_use] +pub fn reconcile_desired(wirings: &[(String, SubscriberWiring)]) -> Vec { + let mut desired: HashMap<(&str, &str), DesiredSub> = HashMap::new(); + for (node_id, wiring) in wirings { + let candidate = DesiredSub { + broker_id: wiring.broker_id().to_string(), + topic: wiring.topic().to_string(), + node_id: node_id.clone(), + kind: SubKind::of(wiring), + }; + let key = (wiring.broker_id(), wiring.topic()); + let wins = match desired.get(&key) { + Some(current) => candidate.beats(current), + None => true, + }; + if wins { + desired.insert(key, candidate); + } + } + let mut out: Vec = desired.into_values().collect(); + out.sort_by(|a, b| (&a.broker_id, &a.topic).cmp(&(&b.broker_id, &b.topic))); + out +} + +#[cfg(test)] +mod tests { + use super::*; + + fn wiring(node: &str, kind: SubKind, broker: &str, topic: &str) -> (String, SubscriberWiring) { + let broker_id = broker.to_string(); + let topic = topic.to_string(); + let w = match kind { + SubKind::Plain => SubscriberWiring::Plain { broker_id, topic }, + SubKind::TopicAware => SubscriberWiring::TopicAware { broker_id, topic }, + SubKind::DisplayEcho => SubscriberWiring::DisplayEcho { broker_id, topic }, + }; + (node.to_string(), w) + } + + fn find<'a>(subs: &'a [DesiredSub], broker: &str, topic: &str) -> &'a DesiredSub { + subs.iter() + .find(|s| s.broker_id == broker && s.topic == topic) + .expect("a desired sub for that (broker, topic)") + } + + #[test] + fn routing_kind_beats_display_echo_on_same_topic() { + // A display-only echo must never shadow a routing wiring on the same topic. + let desired = reconcile_desired(&[ + wiring("zEcho", SubKind::DisplayEcho, "b", "t"), + wiring("aRoute", SubKind::TopicAware, "b", "t"), + ]); + assert_eq!(desired.len(), 1); + let s = find(&desired, "b", "t"); + assert_eq!(s.node_id, "aRoute"); + assert_eq!(s.kind, SubKind::TopicAware); + } + + #[test] + fn ties_break_on_lower_node_id() { + let desired = reconcile_desired(&[ + wiring("n2", SubKind::Plain, "b", "t"), + wiring("n1", SubKind::Plain, "b", "t"), + ]); + assert_eq!(find(&desired, "b", "t").node_id, "n1"); + } + + #[test] + fn distinct_topics_are_each_kept() { + let desired = reconcile_desired(&[ + wiring("n1", SubKind::Plain, "b", "t1"), + wiring("n1", SubKind::Plain, "b", "t2"), + ]); + assert_eq!(desired.len(), 2); + } + + #[test] + fn reconcile_is_deterministic_regardless_of_input_order() { + // The whole point of the tie-break: input order (which is `HashMap` + // iteration order at the call site) must not change the winner, or an + // unchanged flow would churn broker subscriptions. + let a = reconcile_desired(&[ + wiring("n2", SubKind::Plain, "b", "t"), + wiring("n1", SubKind::Plain, "b", "t"), + ]); + let b = reconcile_desired(&[ + wiring("n1", SubKind::Plain, "b", "t"), + wiring("n2", SubKind::Plain, "b", "t"), + ]); + assert_eq!(a, b); + } + + #[test] + fn serializes_to_the_host_wire_shape() { + let desired = reconcile_desired(&[wiring("n1", SubKind::TopicAware, "b", "t")]); + let json = serde_json::to_string(&desired).expect("serialize"); + assert!(json.contains("\"brokerId\":\"b\""), "{json}"); + assert!(json.contains("\"nodeId\":\"n1\""), "{json}"); + assert!(json.contains("\"kind\":\"topicAware\""), "{json}"); + } +} diff --git a/crates/microflow-runtime-wasm/src/lib.rs b/crates/microflow-runtime-wasm/src/lib.rs index 22f98b1..5ebf85b 100644 --- a/crates/microflow-runtime-wasm/src/lib.rs +++ b/crates/microflow-runtime-wasm/src/lib.rs @@ -16,7 +16,7 @@ //! Every entry point returns the turn's `Effects` as JSON, ready to `JSON.parse`. use microflow_core::flow::FlowUpdate; -use microflow_core::runtime::{ComponentValue, Effects, FlowRuntime as CoreRuntime, SubscriberWiring}; +use microflow_core::runtime::{reconcile_desired, ComponentValue, Effects, FlowRuntime as CoreRuntime}; use wasm_bindgen::prelude::*; /// Install a panic hook so a Rust panic surfaces as a readable `console.error`. @@ -157,36 +157,22 @@ impl FlowRuntime { effects_json(&self.inner.inject_event(source, handle, value)) } - /// The active subscribe components' broker wirings, as a JSON array of - /// `{ nodeId, kind, brokerId, topic }` (`kind` ∈ `plain`/`topicAware`/ - /// `displayEcho`). The browser host reconciles these into WSS subscriptions - /// and routes inbound payloads back via [`deliver_message`](FlowRuntime::deliver_message) + /// The active subscribe nodes' broker wirings, **reconciled** to one desired + /// subscription per `(broker_id, topic)` — the deterministic winner-selection + /// policy lives in core ([`reconcile_desired`]), shared with the desktop host + /// so both pick the same owner per topic. Returns a JSON array of + /// `{ brokerId, topic, nodeId, kind }` (`kind` ∈ `plain`/`topicAware`/ + /// `displayEcho`). The browser host diffs this against its live set, (un)subscribes + /// WSS, and routes inbound payloads back via [`deliver_message`](FlowRuntime::deliver_message) /// (the analog of the desktop `flow_update` reply + MQTT manager). /// /// # Errors - /// `JsError` only if the wiring list fails to serialize. - #[wasm_bindgen(js_name = subscriberWirings)] - pub fn subscriber_wirings(&self) -> Result { - let arr: Vec = self - .inner - .collect_subscriber_wirings() - .iter() - .map(|(node_id, wiring)| { - let kind = match wiring { - SubscriberWiring::Plain { .. } => "plain", - SubscriberWiring::TopicAware { .. } => "topicAware", - SubscriberWiring::DisplayEcho { .. } => "displayEcho", - }; - serde_json::json!({ - "nodeId": node_id, - "kind": kind, - "brokerId": wiring.broker_id(), - "topic": wiring.topic(), - }) - }) - .collect(); - serde_json::to_string(&arr) - .map_err(|e| JsError::new(&format!("failed to serialize wirings: {e}"))) + /// `JsError` only if the reconciled list fails to serialize. + #[wasm_bindgen(js_name = reconcileSubscriptions)] + pub fn reconcile_subscriptions(&self) -> Result { + let desired = reconcile_desired(&self.inner.collect_subscriber_wirings()); + serde_json::to_string(&desired) + .map_err(|e| JsError::new(&format!("failed to serialize subscriptions: {e}"))) } /// Deliver an inbound broker payload (MQTT / Figma) to subscribe component @@ -284,9 +270,9 @@ mod tests { } #[test] - fn subscriber_wirings_reports_subscribe_topics() { + fn reconcile_subscriptions_reports_desired_topics() { // An Mqtt subscribe node advertises a `plain` wiring the browser host - // turns into a WSS subscription. + // turns into a WSS subscription — core reconciles it to one desired sub. let mut rt = FlowRuntime::new(); let flow = r#"{ "nodes": [ @@ -295,7 +281,7 @@ mod tests { "edges": [] }"#; rt.update_flow(flow, 0.0).expect("update ok"); - let json = rt.subscriber_wirings().expect("wirings ok"); + let json = rt.reconcile_subscriptions().expect("subscriptions ok"); assert!(json.contains("\"kind\":\"plain\""), "got: {json}"); assert!(json.contains("sensors/x"), "got: {json}"); assert!(json.contains("\"nodeId\":\"m\""), "got: {json}");