Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ The side-effect record the host executes after one runtime turn. Lives in `crate

## EffectsSink

The typed per-field hook surface a [Runtime Host](#runtime-host) implements to apply one turn's [`Effects`](#effects); the **apply policy** that drives it lives in core, once. `Effects::apply(&self, sink: &mut impl EffectsSink)` (`crates/microflow-core/src/runtime/context.rs`) iterates the fields in the **canonical order** — `outbound_bytes → cancellations → wakeups → cloud_requests → component_events` — calling one hook each: `write_bytes`, `cancel_wakeup`, `arm_wakeup`, `perform_cloud`, `dispatch_event`. Bytes first (wire latency); cancel-before-arm (so a cancel + re-arm of the same logical timer in one turn is safe); cloud calls launched before UI events leave; UI events last (they leave the runtime and do not feed back this turn). Decided in [ADR-0008](docs/adr/0008-effects-apply-policy.md) after the two hosts' inline apply loops had already drifted in order. The platform *primitives* behind each hook stay per-host (desktop: serial flush + Tauri `emit` + Tokio timer; browser: `connection.write` + store ingest + `setTimeout`). The desktop `Actor` calls `Effects::apply` directly; the browser reactor cannot reach into Rust, so it mirrors the same shape in `apps/web/src/lib/firmata/effects-sink.ts` (`applyEffects` + an `EffectsSink` interface `FlowReactor` implements), held in lockstep by a conformance test on both sides (`context::apply_tests` / `__tests__/effects-sink.test.ts`). Adding an `Effects` field adds a hook here — a compile error in every sink until handled (exactly how ADR-0009's `cloud_requests` field forced `perform_cloud`, for [`CloudRequest`](#cloudrequest)s, into the order).
The typed per-field hook surface a [Runtime Host](#runtime-host) implements to apply one turn's [`Effects`](#effects); the **apply policy** that drives it lives in core, once. `Effects::apply(&self, sink: &mut impl EffectsSink)` (`crates/microflow-core/src/runtime/context.rs`) iterates the fields in the **canonical order** — `outbound_bytes → cancellations → wakeups → cloud_requests → component_events` — calling one hook each: `write_bytes`, `cancel_wakeup`, `arm_wakeup`, `perform_cloud`, `dispatch_event`. Bytes first (wire latency); cancel-before-arm (so a cancel + re-arm of the same logical timer in one turn is safe); cloud calls launched before UI events leave; UI events last (they leave the runtime and do not feed back this turn). Decided in [ADR-0008](docs/adr/0008-effects-apply-policy.md) after the two hosts' inline apply loops had already drifted in order. The platform *primitives* behind each hook stay per-host (desktop: serial flush + Tauri `emit` + Tokio timer; browser: `connection.write` + store ingest + `setTimeout`). The desktop `Actor` calls `Effects::apply` directly; the browser reactor cannot reach into Rust, so it mirrors the same shape in `apps/web/src/lib/firmata/effects-sink.ts` (`applyEffects` + an `EffectsSink` interface `FlowReactor` implements). The mirror is held **structurally**, not by a test alone: `applyEffects` drives an `EFFECT_HANDLERS` map and an `APPLY_ORDER` tuple, both typed exhaustive over `keyof Effects`, so a new `Effects` field is a TypeScript compile error (unhandled / unordered) on the browser side just as it is a missing trait method on every Rust sink — the conformance test (`context::apply_tests` / `__tests__/effects-sink.test.ts`) now asserts the runtime *order*, no longer carrying the coverage guarantee alone. Adding an `Effects` field thus forces a hook in core's trait **and** in the browser handlers (exactly how ADR-0009's `cloud_requests` field forced `perform_cloud`, for [`CloudRequest`](#cloudrequest)s, into the order).

## BoardWriter

Expand Down Expand Up @@ -176,8 +176,14 @@ services + the latest-wins LLM task table). Phase 3 added the browser performer:
desktop `HttpLlmProvider`, with latest-wins `AbortController` cancellation) and
re-enters the result through the wasm `injectEvent` binding. `MqttPublish`
(MQTT + Figma) publishes over WSS via `mqtt.js`; inbound subscribe routing comes
back through the wasm `deliverMessage` binding, reconciled from
`subscriberWirings()` on each `applyFlow` (mirroring the desktop `flow_update`).
back through the wasm `deliverMessage` binding. The desired subscription set is
reconciled by core's **`reconcile_desired`** (the shared winner-selection policy:
collapse to one sub per `(broker, topic)`, routing kinds beat display-echo, ties
break on the lower node id — `crates/microflow-core/src/runtime/subscriptions.rs`)
via the wasm `reconcileSubscriptions()` binding on each `applyFlow`; each host then
diffs that set against its own live subscriptions and owns its broker I/O. The
same `reconcile_desired` feeds the desktop `flow_update`, so both hosts pick the
identical owner per topic instead of mirroring the policy in two languages.

## Cloud Node Registration

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions apps/web/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,12 @@ use tokio::sync::Mutex as TokioMutex;
/// subscription so `flow_update` can tell when a topic's *owner* changed (and
/// must be re-subscribed) versus left untouched — the broker holds one callback
/// per topic, so the owner is part of a subscription's identity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubKind {
/// Payload-only delivery routed to a component.
Plain,
/// (topic, payload) delivery routed to a component (Figma).
TopicAware,
/// Payload echoed to the frontend only — no per-component routing.
DisplayEcho,
}
///
/// Re-exported from core: the subscription winner-selection policy that consumes
/// this kind ([`microflow_core::runtime::reconcile_desired`]) is the single
/// source shared with the browser host via the wasm `reconcileSubscriptions()`
/// binding, so both hosts pick the same owner per topic.
pub use microflow_core::runtime::SubKind;

/// One active Figma/MQTT subscription. Identity is `(broker_id, topic)`; the
/// `component_id`/`kind` record which wiring currently owns the broker's single
Expand Down
72 changes: 15 additions & 57 deletions apps/web/src-tauri/src/runtime/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::AppState;
use crate::mqtt::broker::BrokerConfig;
use microflow_core::flow::FlowUpdate;
use crate::SubKind;
use microflow_core::runtime::{ComponentValue, SubscriberWiring};
use microflow_core::runtime::{reconcile_desired, ComponentValue, DesiredSub, SubscriberWiring};
use std::collections::HashMap;
use std::sync::Arc;
use tauri::Emitter;
Expand Down Expand Up @@ -42,19 +42,11 @@ fn microflow_uid(topic: &str) -> Option<&str> {
parts.next().filter(|s| !s.is_empty())
}

/// The `SubKind` a wiring resolves to (records which callback shape owns the
/// broker's single per-topic callback).
fn sub_kind(wiring: &SubscriberWiring) -> SubKind {
match wiring {
SubscriberWiring::Plain { .. } => SubKind::Plain,
SubscriberWiring::TopicAware { .. } => SubKind::TopicAware,
SubscriberWiring::DisplayEcho { .. } => SubKind::DisplayEcho,
}
}

/// Map the live/desired subscription set to `uid -> broker_id` over its
/// `microflow/{uid}` topics. Generic over the map value so it serves both the
/// `FigmaSubscription` (live) and `DesiredSub` (desired) maps.
/// `FigmaSubscription` (live) and core [`DesiredSub`] (desired) maps. Host-side:
/// it drives the Figma connect/disconnect lifecycle against this host's own
/// broker pool, so it stays here rather than in the shared reconcile policy.
fn uid_brokers<V>(set: &HashMap<(String, String), V>) -> HashMap<String, String> {
let mut out: HashMap<String, String> = HashMap::new();
for (broker_id, topic) in set.keys() {
Expand All @@ -65,31 +57,6 @@ fn uid_brokers<V>(set: &HashMap<(String, String), V>) -> HashMap<String, String>
out
}

/// One desired subscription, before it is reconciled against the live set.
#[derive(Clone)]
struct DesiredSub {
component_id: String,
kind: SubKind,
}

impl DesiredSub {
/// Deterministic winner when several components resolve to the same
/// `(broker, topic)` — the broker keeps a single callback per topic.
/// Routing kinds (`Plain`/`TopicAware`) win over `DisplayEcho` so a
/// display-only echo never shadows component delivery; ties break on the
/// lower component id. Being deterministic (not dependent on component
/// `HashMap` iteration order) is what keeps the desired set stable across
/// `flow_update`s, so an unchanged flow reconciles to *zero* broker traffic.
fn beats(&self, other: &DesiredSub) -> bool {
let echo = |k: SubKind| matches!(k, SubKind::DisplayEcho);
match (echo(self.kind), echo(other.kind)) {
(false, true) => true,
(true, false) => false,
_ => self.component_id < other.component_id,
}
}
}

/// Update the flow with new nodes and edges
#[tauri::command]
pub async fn flow_update(
Expand Down Expand Up @@ -182,23 +149,14 @@ pub async fn flow_update(
.map_err(|_| "runtime actor dropped the flow_update reply".to_string())?
};

// Desired subscription set, one entry per (broker_id, topic). The broker
// keeps a single callback per topic, so when several components resolve to
// the same topic we pick a deterministic winner (see DesiredSub::beats),
// which also subsumes the old DisplayEcho dedup.
let mut desired: HashMap<(String, String), DesiredSub> = HashMap::new();
for (component_id, wiring) in &component_wirings {
let key = (wiring.broker_id().to_string(), wiring.topic().to_string());
let cand = DesiredSub { component_id: component_id.clone(), kind: sub_kind(wiring) };
desired
.entry(key)
.and_modify(|cur| {
if cand.beats(cur) {
*cur = cand.clone();
}
})
.or_insert(cand);
}
// Desired subscription set, one entry per (broker_id, topic). The collapse +
// deterministic winner-selection is core policy (`reconcile_desired`), shared
// with the browser host so both pick the same owner per topic; here we key it
// by (broker, topic) for the diff against the live set below.
let desired: HashMap<(String, String), DesiredSub> = reconcile_desired(&component_wirings)
.into_iter()
.map(|d| ((d.broker_id.clone(), d.topic.clone()), d))
.collect();

// Snapshot the live subscriptions, then release the lock for the network I/O
// below (re-locked at the end to commit) — matching the original's
Expand Down Expand Up @@ -244,7 +202,7 @@ pub async fn flow_update(
let mut next_live: Vec<crate::FigmaSubscription> = Vec::with_capacity(desired.len());
for ((broker_id, topic), d) in &desired {
if let Some(existing) = live.get(&(broker_id.clone(), topic.clone())) {
if existing.component_id == d.component_id && existing.kind == d.kind {
if existing.component_id == d.node_id && existing.kind == d.kind {
next_live.push(existing.clone());
continue;
}
Expand All @@ -253,12 +211,12 @@ pub async fn flow_update(
if !state.mqtt_manager.is_connected(broker_id).await {
log::warn!(
"[MQTT] Broker {broker_id} not connected, skipping subscription for {}",
d.component_id
d.node_id
);
continue;
}

let component_id = d.component_id.clone();
let component_id = d.node_id.clone();
let callback: Arc<dyn Fn(crate::mqtt::broker::MqttMessage) + Send + Sync> = match d.kind {
SubKind::Plain | SubKind::TopicAware => {
let actor = state.actor.clone();
Expand Down
46 changes: 6 additions & 40 deletions apps/web/src/lib/firmata/__tests__/mqtt-subscriptions.test.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,26 @@
// Reconcile-logic conformance for the browser MQTT host (ADR-0009 Phase 3),
// mirroring the desktop `flow_update` dedup/diff (commands.rs).
// Host-local subscription diffing for the browser MQTT host (ADR-0009 Phase 3).
//
// The collapse + winner-selection policy (`reconcileDesired`/`beats`) moved to
// core (`microflow-core` `subscriptions.rs` tests) — both hosts share it via the
// wasm `reconcileSubscriptions()` binding. What remains here is the host-local
// diff against this host's live set + the Figma uid lifecycle keys.

import { describe, expect, test } from "bun:test";
import {
beats,
diffSubscriptions,
reconcileDesired,
subKey,
uidBrokers,
type ActiveSub,
type SubKind,
type SubscriberWiring,
} from "../cloud/mqtt-subscriptions";

const wiring = (nodeId: string, kind: SubKind, brokerId: string, topic: string): SubscriberWiring => ({
nodeId,
kind,
brokerId,
topic,
});
const active = (nodeId: string, kind: SubKind, brokerId: string, topic: string): ActiveSub => ({
nodeId,
kind,
brokerId,
topic,
});

describe("reconcileDesired", () => {
test("a routing kind wins over displayEcho on the same (broker, topic)", () => {
const desired = reconcileDesired([
wiring("zEcho", "displayEcho", "b", "t"),
wiring("aRoute", "topicAware", "b", "t"),
]);
expect(desired.size).toBe(1);
expect(desired.get(subKey("b", "t"))?.nodeId).toBe("aRoute");
expect(desired.get(subKey("b", "t"))?.kind).toBe("topicAware");
});

test("ties break on the lower node id", () => {
const desired = reconcileDesired([wiring("n2", "plain", "b", "t"), wiring("n1", "plain", "b", "t")]);
expect(desired.get(subKey("b", "t"))?.nodeId).toBe("n1");
});

test("distinct topics are each kept", () => {
const desired = reconcileDesired([wiring("n1", "plain", "b", "t1"), wiring("n1", "plain", "b", "t2")]);
expect(desired.size).toBe(2);
});
});

describe("diffSubscriptions", () => {
test("new subscribes, gone unsubscribes, identical untouched", () => {
const live = new Map([
Expand Down Expand Up @@ -81,10 +54,3 @@ describe("uidBrokers", () => {
expect(map.size).toBe(1);
});
});

test("beats: routing beats echo, else lower id wins", () => {
const route = active("z", "plain", "b", "t");
const echo = active("a", "displayEcho", "b", "t");
expect(beats(route, echo)).toBe(true);
expect(beats(echo, route)).toBe(false);
});
Binary file modified apps/web/src/lib/firmata/cloud/mqtt-subscriptions.ts
Binary file not shown.
92 changes: 72 additions & 20 deletions apps/web/src/lib/firmata/effects-sink.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// The browser mirror of the core `EffectsSink` + `Effects::apply` (ADR-0008).
//
// The Rust↔TS boundary means the browser host cannot call into core's
// `Effects::apply`; instead it mirrors the same four-hook shape in the same
// canonical order. The shared *order* — not shared code — is the contract, and
// `__tests__/effects-sink.test.ts` is the browser half of the conformance
// scenario that holds this in lockstep with the Rust `apply_tests`.
// `Effects::apply`; instead it mirrors the same hook shape in the same canonical
// order. The order + the per-field handlers below are both typed exhaustive over
// `keyof Effects`, so a field added to the Rust `Effects` (regenerated into the
// wasm type) is a **compile error** here until it is ordered and handled — the
// browser can no longer silently drop a new field, the way the desktop sink
// can't (its Rust trait gains a required method). `__tests__/effects-sink.test.ts`
// remains the behavioural twin of the Rust `apply_tests`, asserting the order at
// runtime; the types now guarantee coverage so the test can't be the only guard.

import type { CloudRequest, Effects } from "@/lib/runtime/wasm";

Expand All @@ -15,11 +19,12 @@ export type Wakeup = Effects["wakeups"][number];
export type ComponentEvent = Effects["componentEvents"][number];

/**
* The four platform primitives an effects application drives — the TypeScript
* shape of the Rust `EffectsSink`. The {@link FlowReactor} implements these
* (serial write, `clearTimeout`, `setTimeout`, store ingest); {@link applyEffects}
* sequences them. A new `Effects` field adds a method here, mirroring the
* compile-time new-field guard the Rust trait gives the desktop sink.
* The platform primitives an effects application drives — the TypeScript shape
* of the Rust `EffectsSink`. The {@link FlowReactor} implements these (serial
* write, `clearTimeout`, `setTimeout`, store ingest); {@link applyEffects}
* sequences them. A new field that needs a new primitive adds a method here, via
* the {@link EFFECT_HANDLERS} entry that would reference it — mirroring the
* compile-time new-field guard the Rust `EffectsSink` trait gives the desktop.
*/
export interface EffectsSink {
writeBytes(bytes: number[]): void;
Expand All @@ -30,17 +35,64 @@ export interface EffectsSink {
}

/**
* Apply one turn's effects in the **canonical order** (ADR-0008, extended by
* ADR-0009), mirroring the Rust `Effects::apply`: `outboundBytes → cancellations
* → wakeups → cloudRequests → componentEvents`. Bytes first (wire latency),
* cancel-before-arm (so a cancel + re-arm of the same logical timer in one turn
* is safe), cloud calls launched before UI events leave, UI events last (they
* leave the runtime and do not feed back this turn).
* One handler per `Effects` field. Typed `Record<keyof Effects, …>`, so adding a
* field to the Rust `Effects` (regenerated into the wasm `Effects` type) is a
* **compile error here** — a missing property — until it is handled, exactly the
* way a new field breaks every Rust `EffectsSink` impl. This is the structural
* guard that was previously only a conformance test: the browser can no longer
* silently drop a new field (ADR-0008/0009).
*/
const EFFECT_HANDLERS: { [K in keyof Effects]: (fx: Effects, sink: EffectsSink) => void } = {
outboundBytes: (fx, sink) => {
if (fx.outboundBytes.length > 0) sink.writeBytes(fx.outboundBytes);
},
cancellations: (fx, sink) => {
for (const id of fx.cancellations) sink.cancelWakeup(id);
},
wakeups: (fx, sink) => {
for (const wakeup of fx.wakeups) sink.armWakeup(wakeup);
},
cloudRequests: (fx, sink) => {
for (const request of fx.cloudRequests) sink.performCloud(request);
},
componentEvents: (fx, sink) => {
for (const event of fx.componentEvents) sink.dispatchEvent(event);
},
};

/**
* The **canonical order** (ADR-0008, extended by ADR-0009) the fields apply in:
* `outboundBytes → cancellations → wakeups → cloudRequests → componentEvents`.
* Bytes first (wire latency), cancel-before-arm (so a cancel + re-arm of one
* logical timer in a turn is safe), cloud launched before UI events leave, UI
* events last (they leave the runtime and do not feed back this turn).
*
* `satisfies` pins every entry to a real field; {@link AssertOrderIsExhaustive}
* below pins the *reverse* — a new field absent from this tuple fails to compile,
* so the order can never silently lose a field either.
*/
const APPLY_ORDER = [
"outboundBytes",
"cancellations",
"wakeups",
"cloudRequests",
"componentEvents",
] as const satisfies readonly (keyof Effects)[];

/** Errors unless {@link APPLY_ORDER} lists every key of `Effects` (the wrap in a
* 1-tuple stops the conditional distributing, so `never` reads as covered). */
type AssertOrderIsExhaustive = [Exclude<keyof Effects, (typeof APPLY_ORDER)[number]>] extends [never]
? true
: ["Effects field missing from APPLY_ORDER", Exclude<keyof Effects, (typeof APPLY_ORDER)[number]>];
const _orderIsExhaustive: AssertOrderIsExhaustive = true;

/**
* Apply one turn's effects in the canonical order, driving one {@link EFFECT_HANDLERS}
* entry per field. The order lives in {@link APPLY_ORDER}; the per-field work in
* the handlers. Both are exhaustive over `keyof Effects`, so a new field is a
* compile error until it is ordered *and* handled — the browser mirror of the
* Rust `Effects::apply` + `EffectsSink` trait guard.
*/
export function applyEffects(fx: Effects, sink: EffectsSink): void {
if (fx.outboundBytes.length > 0) sink.writeBytes(fx.outboundBytes);
for (const id of fx.cancellations) sink.cancelWakeup(id);
for (const wakeup of fx.wakeups) sink.armWakeup(wakeup);
for (const request of fx.cloudRequests) sink.performCloud(request);
for (const event of fx.componentEvents) sink.dispatchEvent(event);
for (const field of APPLY_ORDER) EFFECT_HANDLERS[field](fx, sink);
}
Loading
Loading