From 978d0d746e978b6fd2019f1b1222f04ee9d2cc8d Mon Sep 17 00:00:00 2001 From: Zack White Date: Thu, 28 May 2026 18:17:42 -0600 Subject: [PATCH 1/2] feat: resume workflows from cursor-aware snapshots --- lib/runic/runner.ex | 145 ++++++++++++++++---- lib/runic/runner/store.ex | 26 +++- test/runner/worker_test.exs | 110 +++++++++++++++ test/support/snapshot_store_without_tail.ex | 115 ++++++++++++++++ test/support/snapshot_tail_store.ex | 122 ++++++++++++++++ 5 files changed, 494 insertions(+), 24 deletions(-) create mode 100644 test/support/snapshot_store_without_tail.ex create mode 100644 test/support/snapshot_tail_store.ex diff --git a/lib/runic/runner.ex b/lib/runic/runner.ex index 23a3ca1..a5df727 100644 --- a/lib/runic/runner.ex +++ b/lib/runic/runner.ex @@ -52,6 +52,11 @@ defmodule Runic.Runner do use Supervisor + alias Runic.Workflow + + @snapshot_tag :runic_workflow_snapshot + @snapshot_version 1 + # --- Public API --- def start_link(opts) do @@ -235,11 +240,44 @@ defmodule Runic.Runner do end end + @doc """ + Encodes a workflow snapshot for stores implementing `Runic.Runner.Store`. + + The encoded format is tagged and versioned so `resume/3` can distinguish + Runic workflow snapshots from legacy adapter-specific blobs. + """ + @spec encode_snapshot(Workflow.t()) :: binary() + def encode_snapshot(%Workflow{} = workflow) do + :erlang.term_to_binary({@snapshot_tag, @snapshot_version, workflow}) + end + + @doc """ + Decodes a workflow snapshot produced by `encode_snapshot/1`. 
+ """ + @spec decode_snapshot(binary()) :: + {:ok, Workflow.t()} | {:error, :invalid_snapshot | {:unsupported_snapshot, term()}} + def decode_snapshot(snapshot) when is_binary(snapshot) do + case :erlang.binary_to_term(snapshot) do + {@snapshot_tag, @snapshot_version, %Workflow{} = workflow} -> + {:ok, workflow} + + {@snapshot_tag, version, _workflow} -> + {:error, {:unsupported_snapshot, version}} + + _other -> + {:error, :invalid_snapshot} + end + rescue + ArgumentError -> {:error, :invalid_snapshot} + end + @doc """ Resumes a workflow from persisted state. - Loads the workflow log from the store, rebuilds the workflow via - `Workflow.from_log/1`, and starts a new Worker. + Loads persisted workflow state from the configured store and starts a new + Worker. Event-sourced stores are replayed through `Workflow.from_events/2`. + Stores that implement snapshots plus cursor-aware replay can resume from a + saved workflow snapshot and replay only events after its cursor. ## Options @@ -256,14 +294,11 @@ defmodule Runic.Runner do {store_mod, store_state} = get_store(runner) if Runic.Runner.Store.supports_stream?(store_mod) do - case store_mod.stream(workflow_id, store_state) do - {:ok, event_stream} -> - rehydration = Keyword.get(opts, :rehydration, :full) - store = {store_mod, store_state} - events = Enum.to_list(event_stream) - - {workflow, resolver} = resume_from_events(events, rehydration, store) + rehydration = Keyword.get(opts, :rehydration, :full) + store = {store_mod, store_state} + case resume_from_streaming_store(workflow_id, rehydration, store) do + {:ok, workflow, resolver} -> worker_opts = opts |> Keyword.put(:resumed, true) @@ -283,7 +318,71 @@ defmodule Runic.Runner do end end - defp resume_from_events(events, :full, store) do + defp resume_from_streaming_store(workflow_id, rehydration, {store_mod, store_state} = store) do + if Runic.Runner.Store.supports_snapshots?(store_mod) and + Runic.Runner.Store.supports_stream_after?(store_mod) do + case 
store_mod.load_snapshot(workflow_id, store_state) do + {:ok, {cursor, snapshot}} -> + resume_from_snapshot(workflow_id, cursor, snapshot, rehydration, store) + + {:error, :not_found} -> + resume_from_full_stream(workflow_id, rehydration, store) + + {:error, _} = error -> + error + end + else + resume_from_full_stream(workflow_id, rehydration, store) + end + end + + defp resume_from_snapshot(workflow_id, cursor, snapshot, rehydration, store) do + case decode_snapshot(snapshot) do + {:ok, %Workflow{} = base_workflow} -> + resume_from_snapshot_tail(workflow_id, cursor, base_workflow, rehydration, store) + + {:error, _reason} -> + resume_from_full_stream(workflow_id, rehydration, store) + end + end + + defp resume_from_snapshot_tail( + workflow_id, + cursor, + %Workflow{} = base_workflow, + rehydration, + {store_mod, store_state} = store + ) do + case store_mod.stream_after(workflow_id, cursor, store_state) do + {:ok, event_stream} -> + events = Enum.to_list(event_stream) + {workflow, resolver} = resume_from_events(events, rehydration, store, base_workflow) + {:ok, workflow, resolver} + + {:error, :not_found} -> + {workflow, resolver} = resume_from_events([], rehydration, store, base_workflow) + {:ok, workflow, resolver} + + {:error, _} = error -> + error + end + end + + defp resume_from_full_stream(workflow_id, rehydration, {store_mod, store_state} = store) do + case store_mod.stream(workflow_id, store_state) do + {:ok, event_stream} -> + events = Enum.to_list(event_stream) + {workflow, resolver} = resume_from_events(events, rehydration, store) + {:ok, workflow, resolver} + + {:error, _} = error -> + error + end + end + + defp resume_from_events(events, rehydration, store, base_workflow \\ nil) + + defp resume_from_events(events, :full, store, base_workflow) do # Check if any FactProduced events have been stripped of values has_stripped = Enum.any?(events, fn @@ -293,35 +392,35 @@ defmodule Runic.Runner do if has_stripped do # Lean replay + resolve all facts to 
restore full in-memory state - workflow = Runic.Workflow.from_events(events, nil, fact_mode: :ref) + workflow = Workflow.from_events(events, base_workflow, fact_mode: :ref) all_ref_hashes = - for {hash, %Runic.Workflow.FactRef{}} <- workflow.graph.vertices, + for {hash, %Workflow.FactRef{}} <- workflow.graph.vertices, into: MapSet.new(), do: hash - resolver = Runic.Workflow.FactResolver.new(store) + resolver = Workflow.FactResolver.new(store) {workflow, _resolver} = - Runic.Workflow.Rehydration.resolve_hot(workflow, all_ref_hashes, resolver) + Workflow.Rehydration.resolve_hot(workflow, all_ref_hashes, resolver) {workflow, nil} else - {Runic.Workflow.from_events(events), nil} + {Workflow.from_events(events, base_workflow), nil} end end - defp resume_from_events(events, :hybrid, store) do - workflow = Runic.Workflow.from_events(events, nil, fact_mode: :ref) - %{hot: hot} = Runic.Workflow.Rehydration.classify(workflow) - resolver = Runic.Workflow.FactResolver.new(store) - {workflow, resolver} = Runic.Workflow.Rehydration.resolve_hot(workflow, hot, resolver) + defp resume_from_events(events, :hybrid, store, base_workflow) do + workflow = Workflow.from_events(events, base_workflow, fact_mode: :ref) + %{hot: hot} = Workflow.Rehydration.classify(workflow) + resolver = Workflow.FactResolver.new(store) + {workflow, resolver} = Workflow.Rehydration.resolve_hot(workflow, hot, resolver) {workflow, resolver} end - defp resume_from_events(events, :lazy, store) do - workflow = Runic.Workflow.from_events(events, nil, fact_mode: :ref) - {workflow, Runic.Workflow.FactResolver.new(store)} + defp resume_from_events(events, :lazy, store, base_workflow) do + workflow = Workflow.from_events(events, base_workflow, fact_mode: :ref) + {workflow, Workflow.FactResolver.new(store)} end defp resume_from_log(runner, workflow_id, store_mod, store_state, opts) do diff --git a/lib/runic/runner/store.ex b/lib/runic/runner/store.ex index 28c12f6..152be0d 100644 --- a/lib/runic/runner/store.ex +++ 
b/lib/runic/runner/store.ex @@ -14,6 +14,10 @@ defmodule Runic.Runner.Store do Stores that implement `append/3` and `stream/2` get automatic event-sourced checkpointing and recovery from the Worker. + `stream/2` must always return the full event stream for a workflow. Stores + that support snapshot-accelerated recovery can additionally implement + `stream_after/3` to return only events after a saved snapshot cursor. + ## Legacy Semantics (Snapshot) The `save/3` and `load/2` callbacks persist the full workflow log as a @@ -23,7 +27,7 @@ defmodule Runic.Runner.Store do ## Optional Capabilities - - **Snapshots** (`save_snapshot/4`, `load_snapshot/3`): Point-in-time + - **Snapshots** (`save_snapshot/4`, `load_snapshot/2`): Point-in-time workflow snapshots for faster recovery (replay from snapshot + events after cursor instead of full replay). - **Fact storage** (`save_fact/3`, `load_fact/2`): Content-addressed fact @@ -46,6 +50,8 @@ defmodule Runic.Runner.Store do {:ok, cursor()} | {:error, term()} @callback stream(workflow_id(), state()) :: {:ok, Enumerable.t()} | {:error, :not_found | term()} + @callback stream_after(workflow_id(), cursor(), state()) :: + {:ok, Enumerable.t()} | {:error, :not_found | term()} # Snapshot (optional — faster recovery with stream semantics) @callback save_snapshot(workflow_id(), cursor(), snapshot :: binary(), state()) :: @@ -68,6 +74,7 @@ defmodule Runic.Runner.Store do @optional_callbacks [ append: 3, stream: 2, + stream_after: 3, save_snapshot: 4, load_snapshot: 2, save_fact: 3, @@ -85,4 +92,21 @@ defmodule Runic.Runner.Store do def supports_stream?(store_mod) do function_exported?(store_mod, :append, 3) and function_exported?(store_mod, :stream, 2) end + + @doc """ + Returns true if the store module supports cursor-aware stream replay. 
+ """ + @spec supports_stream_after?(module()) :: boolean() + def supports_stream_after?(store_mod) do + function_exported?(store_mod, :stream_after, 3) + end + + @doc """ + Returns true if the store module supports snapshot save/load semantics. + """ + @spec supports_snapshots?(module()) :: boolean() + def supports_snapshots?(store_mod) do + function_exported?(store_mod, :save_snapshot, 4) and + function_exported?(store_mod, :load_snapshot, 2) + end end diff --git a/test/runner/worker_test.exs b/test/runner/worker_test.exs index bb6833b..aa84b69 100644 --- a/test/runner/worker_test.exs +++ b/test/runner/worker_test.exs @@ -368,6 +368,95 @@ defmodule Runic.Runner.WorkerTest do assert Enum.sort(original_results) == Enum.sort(resumed_results) end + test "resume uses workflow snapshot plus cursor-aware tail stream when available" do + runner = :"test_runner_snapshot_tail_#{System.unique_integer([:positive])}" + + start_supervised!( + {Runic.Runner, name: runner, store: Runic.TestSupport.SnapshotTailStore}, + id: runner + ) + + workflow_id = :wf_snapshot_tail + {snapshot_workflow, cursor, events} = build_snapshot_tail_fixture() + {store_mod, store_state} = Runic.Runner.get_store(runner) + + assert {:ok, _cursor} = store_mod.append(workflow_id, events, store_state) + + assert :ok = + store_mod.save_snapshot( + workflow_id, + cursor, + Runic.Runner.encode_snapshot(snapshot_workflow), + store_state + ) + + assert {:ok, _pid} = Runic.Runner.resume(runner, workflow_id) + assert {:ok, results} = Runic.Runner.get_results(runner, workflow_id) + assert 12 in results + + assert store_mod.call_count(store_state, {:stream_after, workflow_id}) == 1 + assert store_mod.call_count(store_state, {:stream, workflow_id}) == 0 + end + + test "resume ignores snapshots when the store has no cursor-aware tail stream" do + runner = :"test_runner_snapshot_no_tail_#{System.unique_integer([:positive])}" + + start_supervised!( + {Runic.Runner, name: runner, store: 
Runic.TestSupport.SnapshotStoreWithoutTail}, + id: runner + ) + + workflow_id = :wf_snapshot_no_tail + {snapshot_workflow, cursor, events} = build_snapshot_tail_fixture() + {store_mod, store_state} = Runic.Runner.get_store(runner) + + assert {:ok, _cursor} = store_mod.append(workflow_id, events, store_state) + + assert :ok = + store_mod.save_snapshot( + workflow_id, + cursor, + Runic.Runner.encode_snapshot(snapshot_workflow), + store_state + ) + + assert {:ok, _pid} = Runic.Runner.resume(runner, workflow_id) + assert {:ok, results} = Runic.Runner.get_results(runner, workflow_id) + assert 12 in results + + assert store_mod.call_count(store_state, {:stream, workflow_id}) == 1 + end + + test "resume falls back to full stream when snapshot blob is not a Runic workflow snapshot" do + runner = :"test_runner_invalid_snapshot_#{System.unique_integer([:positive])}" + + start_supervised!( + {Runic.Runner, name: runner, store: Runic.TestSupport.SnapshotTailStore}, + id: runner + ) + + workflow_id = :wf_invalid_snapshot + {_snapshot_workflow, cursor, events} = build_snapshot_tail_fixture() + {store_mod, store_state} = Runic.Runner.get_store(runner) + + assert {:ok, _cursor} = store_mod.append(workflow_id, events, store_state) + + assert :ok = + store_mod.save_snapshot( + workflow_id, + cursor, + :erlang.term_to_binary({:legacy_event_log_snapshot, []}), + store_state + ) + + assert {:ok, _pid} = Runic.Runner.resume(runner, workflow_id) + assert {:ok, results} = Runic.Runner.get_results(runner, workflow_id) + assert 12 in results + + assert store_mod.call_count(store_state, {:stream_after, workflow_id}) == 0 + assert store_mod.call_count(store_state, {:stream, workflow_id}) == 1 + end + test "stop with persist: true (default) saves final state", %{runner: runner} do workflow = build_single_step_workflow() {:ok, _} = Runic.Runner.start_workflow(runner, :wf_persist_stop, workflow) @@ -625,6 +714,27 @@ defmodule Runic.Runner.WorkerTest do Runic.workflow(steps: [step]) end + defp 
build_snapshot_tail_fixture do + step_a = Runic.step(fn x -> x + 1 end, name: :snapshot_a) + step_b = Runic.step(fn x -> x * 2 end, name: :snapshot_b) + + workflow = + Runic.workflow(steps: [{step_a, [step_b]}]) + |> Workflow.enable_event_emission() + + snapshot_events = Workflow.build_log(workflow) + snapshot_workflow = Workflow.from_events(snapshot_events) + cursor = length(snapshot_events) + + tail_events = + workflow + |> Workflow.react_until_satisfied(5) + |> Map.fetch!(:uncommitted_events) + |> Enum.reverse() + + {snapshot_workflow, cursor, snapshot_events ++ tail_events} + end + defp assert_workflow_idle(runner, workflow_id, timeout \\ 2000) do deadline = System.monotonic_time(:millisecond) + timeout diff --git a/test/support/snapshot_store_without_tail.ex b/test/support/snapshot_store_without_tail.ex new file mode 100644 index 0000000..e45cdaf --- /dev/null +++ b/test/support/snapshot_store_without_tail.ex @@ -0,0 +1,115 @@ +defmodule Runic.TestSupport.SnapshotStoreWithoutTail do + @behaviour Runic.Runner.Store + + @impl Runic.Runner.Store + def init_store(opts) do + runner_name = Keyword.fetch!(opts, :runner_name) + + events_table = Module.concat(runner_name, SnapshotEvents) + counters_table = Module.concat(runner_name, SnapshotCounters) + snapshots_table = Module.concat(runner_name, Snapshots) + calls_table = Module.concat(runner_name, SnapshotCalls) + + ensure_table(events_table, [:ordered_set]) + ensure_table(counters_table, [:set]) + ensure_table(snapshots_table, [:set]) + ensure_table(calls_table, [:set]) + + {:ok, + %{ + events_table: events_table, + counters_table: counters_table, + snapshots_table: snapshots_table, + calls_table: calls_table + }} + end + + @impl Runic.Runner.Store + def save(_workflow_id, _log, _state), do: :ok + + @impl Runic.Runner.Store + def load(_workflow_id, _state), do: {:error, :not_found} + + @impl Runic.Runner.Store + def append(workflow_id, events, %{events_table: events_table, counters_table: counters_table}) + when 
is_list(events) do + count = length(events) + cursor = :ets.update_counter(counters_table, workflow_id, {2, count}, {workflow_id, 0}) + start_seq = cursor - count + 1 + + events + |> Enum.with_index(start_seq) + |> Enum.each(fn {event, seq} -> + :ets.insert(events_table, {{workflow_id, seq}, event}) + end) + + {:ok, cursor} + end + + @impl Runic.Runner.Store + def stream(workflow_id, %{calls_table: calls_table} = state) do + increment_call(calls_table, {:stream, workflow_id}) + stream_from(workflow_id, 1, state) + end + + @impl Runic.Runner.Store + def save_snapshot(workflow_id, cursor, snapshot, %{snapshots_table: snapshots_table}) + when is_integer(cursor) and is_binary(snapshot) do + :ets.insert(snapshots_table, {workflow_id, cursor, snapshot}) + :ok + end + + @impl Runic.Runner.Store + def load_snapshot(workflow_id, %{snapshots_table: snapshots_table}) do + case :ets.lookup(snapshots_table, workflow_id) do + [{^workflow_id, cursor, snapshot}] -> {:ok, {cursor, snapshot}} + [] -> {:error, :not_found} + end + end + + def call_count(%{calls_table: calls_table}, key) do + case :ets.lookup(calls_table, key) do + [{^key, count}] -> count + [] -> 0 + end + end + + defp stream_from(workflow_id, start_seq, %{ + events_table: events_table, + counters_table: counters_table + }) do + case :ets.lookup(counters_table, workflow_id) do + [] -> + {:error, :not_found} + + [{^workflow_id, count}] -> + stream = + Stream.resource( + fn -> start_seq end, + fn + seq when seq > count -> + {:halt, seq} + + seq -> + case :ets.lookup(events_table, {workflow_id, seq}) do + [{{^workflow_id, ^seq}, event}] -> {[event], seq + 1} + [] -> {:halt, seq} + end + end, + fn _acc -> :ok end + ) + + {:ok, stream} + end + end + + defp increment_call(calls_table, key) do + :ets.update_counter(calls_table, key, {2, 1}, {key, 0}) + end + + defp ensure_table(table, opts) do + if :ets.whereis(table) == :undefined do + :ets.new(table, [:named_table, :public, read_concurrency: true] ++ opts) + end + end +end 
diff --git a/test/support/snapshot_tail_store.ex b/test/support/snapshot_tail_store.ex new file mode 100644 index 0000000..a6e5dad --- /dev/null +++ b/test/support/snapshot_tail_store.ex @@ -0,0 +1,122 @@ +defmodule Runic.TestSupport.SnapshotTailStore do + @behaviour Runic.Runner.Store + + @impl Runic.Runner.Store + def init_store(opts) do + runner_name = Keyword.fetch!(opts, :runner_name) + + events_table = Module.concat(runner_name, SnapshotTailEvents) + counters_table = Module.concat(runner_name, SnapshotTailCounters) + snapshots_table = Module.concat(runner_name, SnapshotTailSnapshots) + calls_table = Module.concat(runner_name, SnapshotTailCalls) + + ensure_table(events_table, [:ordered_set]) + ensure_table(counters_table, [:set]) + ensure_table(snapshots_table, [:set]) + ensure_table(calls_table, [:set]) + + {:ok, + %{ + events_table: events_table, + counters_table: counters_table, + snapshots_table: snapshots_table, + calls_table: calls_table + }} + end + + @impl Runic.Runner.Store + def save(_workflow_id, _log, _state), do: :ok + + @impl Runic.Runner.Store + def load(_workflow_id, _state), do: {:error, :not_found} + + @impl Runic.Runner.Store + def append(workflow_id, events, %{events_table: events_table, counters_table: counters_table}) + when is_list(events) do + count = length(events) + cursor = :ets.update_counter(counters_table, workflow_id, {2, count}, {workflow_id, 0}) + start_seq = cursor - count + 1 + + events + |> Enum.with_index(start_seq) + |> Enum.each(fn {event, seq} -> + :ets.insert(events_table, {{workflow_id, seq}, event}) + end) + + {:ok, cursor} + end + + @impl Runic.Runner.Store + def stream(workflow_id, %{calls_table: calls_table} = state) do + increment_call(calls_table, {:stream, workflow_id}) + stream_from(workflow_id, 1, state) + end + + @impl Runic.Runner.Store + def stream_after(workflow_id, cursor, %{calls_table: calls_table} = state) + when is_integer(cursor) do + increment_call(calls_table, {:stream_after, workflow_id}) + 
stream_from(workflow_id, cursor + 1, state) + end + + @impl Runic.Runner.Store + def save_snapshot(workflow_id, cursor, snapshot, %{snapshots_table: snapshots_table}) + when is_integer(cursor) and is_binary(snapshot) do + :ets.insert(snapshots_table, {workflow_id, cursor, snapshot}) + :ok + end + + @impl Runic.Runner.Store + def load_snapshot(workflow_id, %{snapshots_table: snapshots_table}) do + case :ets.lookup(snapshots_table, workflow_id) do + [{^workflow_id, cursor, snapshot}] -> {:ok, {cursor, snapshot}} + [] -> {:error, :not_found} + end + end + + def call_count(%{calls_table: calls_table}, key) do + case :ets.lookup(calls_table, key) do + [{^key, count}] -> count + [] -> 0 + end + end + + defp stream_from(workflow_id, start_seq, %{ + events_table: events_table, + counters_table: counters_table + }) do + case :ets.lookup(counters_table, workflow_id) do + [] -> + {:error, :not_found} + + [{^workflow_id, count}] -> + stream = + Stream.resource( + fn -> start_seq end, + fn + seq when seq > count -> + {:halt, seq} + + seq -> + case :ets.lookup(events_table, {workflow_id, seq}) do + [{{^workflow_id, ^seq}, event}] -> {[event], seq + 1} + [] -> {:halt, seq} + end + end, + fn _acc -> :ok end + ) + + {:ok, stream} + end + end + + defp increment_call(calls_table, key) do + :ets.update_counter(calls_table, key, {2, 1}, {key, 0}) + end + + defp ensure_table(table, opts) do + if :ets.whereis(table) == :undefined do + :ets.new(table, [:named_table, :public, read_concurrency: true] ++ opts) + end + end +end From e0561f6795d1eeba7378a6486329ecb18732b8f0 Mon Sep 17 00:00:00 2001 From: Zack White Date: Fri, 29 May 2026 14:15:21 -0600 Subject: [PATCH 2/2] use stream/3 with opts instead of stream_after/3 for extensibility --- ...t-checkpoint-policy-implementation-plan.md | 322 ++++++++++++++++++ lib/runic/runner.ex | 4 +- lib/runic/runner/store.ex | 34 +- test/runner/worker_test.exs | 4 +- test/support/snapshot_tail_store.ex | 13 +- 5 files changed, 362 insertions(+), 15 
deletions(-) create mode 100644 .docs/snapshot-checkpoint-policy-implementation-plan.md diff --git a/.docs/snapshot-checkpoint-policy-implementation-plan.md b/.docs/snapshot-checkpoint-policy-implementation-plan.md new file mode 100644 index 0000000..722ab16 --- /dev/null +++ b/.docs/snapshot-checkpoint-policy-implementation-plan.md @@ -0,0 +1,322 @@ +# Snapshot Checkpoint Policy — Implementation Plan + +**Status:** Draft follow-up +**Depends on:** `Runic.Runner.Store.stream/3`, tagged runner workflow snapshots +**Related:** [Checkpointing Implementation Plan](checkpointing-implementation-plan.md), [Lean Replay Implementation Plan](lean-replay-implementation-plan.md), [Full Breadth Runner Scheduling Considerations](full-breadth-runner-scheduling-considerations.md) +**Goal:** Add an explicit, opt-in Worker policy for writing workflow snapshots during checkpointing so snapshot + event-tail resume is useful without changing the event stream source-of-truth model. + +--- + +## Current Context + +Runic has two persistence mechanisms with different purposes: + +1. **Event checkpoints** are the correctness path. + The Worker drains `workflow.uncommitted_events` into `Store.append/3` for event-sourced stores. `stream/2` remains the complete event stream. + +2. **Workflow snapshots** are an acceleration path. + The Runner can now consume a tagged workflow snapshot via `load_snapshot/2` and replay `stream/3` with `after_cursor: cursor`. This bounds resume work to the event tail after the snapshot. + +The remaining gap is write-side policy: the Worker does not yet decide when to call `save_snapshot/4`. Stores can implement snapshots, but Runic will not automatically create them. + +--- + +## Design Principles + +- **Events remain source of truth.** Snapshots are optional acceleration and compaction aids, not the canonical history. +- **Snapshot writing is explicit policy.** Avoid surprising adapters with large blob writes just because they expose `save_snapshot/4`. 
+- **Keyword-oriented API.** Match existing Runner/Worker ergonomics: clear keyword options passed to `Runic.Runner.start_workflow/4`. +- **Store capabilities are discovered.** Snapshot policy is inert unless the store implements `save_snapshot/4` and `load_snapshot/2`. +- **Append before snapshot.** Save snapshots only after `append/3` succeeds; use the returned cursor as the snapshot boundary. +- **Tagged snapshots only.** Use `Runic.Runner.encode_snapshot/1` so resume never confuses legacy event-log blobs with workflow-state snapshots. + +--- + +## Proposed Public API + +Add a Worker option: + +```elixir +Runic.Runner.start_workflow(runner, workflow_id, workflow, + checkpoint_strategy: :every_cycle, + snapshot_strategy: {:after_events, 1_000} +) +``` + +### `:snapshot_strategy` + +```elixir +:never +:on_complete +:every_checkpoint +{:every_n_checkpoints, pos_integer()} +{:after_events, pos_integer()} +``` + +Recommended initial default: + +```elixir +snapshot_strategy: :never +``` + +Rationale: this is fully backward compatible and avoids sudden storage growth. Users can opt into snapshots once their store and workload justify it. + +### Optional Snapshot Options + +Start with one small option map/keyword list rather than many top-level options: + +```elixir +snapshot_opts: [ + mode: :full, + keep: :all +] +``` + +Initial supported values: + +```elixir +mode: :full +keep: :all +``` + +Reserved follow-up values: + +```elixir +mode: :lean +keep: {:latest, 1} +keep: {:latest, n} +``` + +Do not implement pruning in the first slice unless it is necessary to prove snapshot creation. Snapshot pruning should be a store-adapter concern or an explicit later callback. + +--- + +## Policy Semantics + +### `:never` + +No Worker-created snapshots. + +This remains the safest default. Event-sourced resume still works via full stream replay. + +### `:on_complete` + +Save a snapshot after the workflow reaches idle/satisfied and final persistence succeeds. 
+ +Useful for completed workflow inspection and restart/reporting without adding mid-run snapshot cost. + +### `:every_checkpoint` + +After each successful event-sourced checkpoint append, save a workflow snapshot at the returned cursor. + +Useful only for low-frequency, expensive workflows. Risky for hot workflows because it serializes the whole workflow graph every checkpoint. + +### `{:every_n_checkpoints, n}` + +Count successful checkpoints and snapshot each nth checkpoint. + +This is easy to reason about and mirrors existing `checkpoint_strategy: {:every_n, n}` without overloading it. + +### `{:after_events, n}` + +Snapshot once `event_cursor - last_snapshot_cursor >= n`. + +This best matches snapshot + WAL compaction semantics. It is the preferred production strategy for long-running workflows because it is tied to log growth, not wall time or cycle count. + +--- + +## Worker State Changes + +Add fields to `Runic.Runner.Worker`: + +```elixir +:snapshot_strategy, +:snapshot_opts, +last_snapshot_cursor: 0, +snapshot_checkpoint_count: 0 +``` + +Initialize from `opts`: + +```elixir +snapshot_strategy: Keyword.get(opts, :snapshot_strategy, :never), +snapshot_opts: Keyword.get(opts, :snapshot_opts, []) +``` + +When resuming from a snapshot, seed `last_snapshot_cursor` from the loaded snapshot cursor. This prevents an immediate duplicate snapshot on first checkpoint after resume. + +Implementation note: `Runic.Runner.resume/3` currently passes `resumed: true` and `resolver`. Add a private worker option such as `snapshot_cursor: cursor` when resume used a valid snapshot. 
+ +--- + +## Checkpoint Flow + +Current event-sourced checkpoint path: + +```elixir +store_mod.append(id, state.uncommitted_events, store_state) +%{state | uncommitted_events: []} +``` + +Target flow: + +```elixir +case store_mod.append(id, state.uncommitted_events, store_state) do + {:ok, cursor} -> + state = + state + |> mark_checkpoint_persisted(cursor) + |> maybe_save_workflow_snapshot(cursor) + + %{state | uncommitted_events: []} + + {:error, reason} -> + # existing error policy / match behavior should remain unchanged initially +end +``` + +`maybe_save_workflow_snapshot/2` should no-op when: + +- `snapshot_strategy == :never` +- the store does not implement snapshot callbacks +- no checkpoint was successfully appended +- policy predicate does not select this cursor/checkpoint + +Snapshot payload: + +```elixir +snapshot = Runic.Runner.encode_snapshot(state.workflow) +store_mod.save_snapshot(state.id, cursor, snapshot, store_state) +``` + +Telemetry should wrap snapshot writes separately from append writes: + +```elixir +Telemetry.store_span(:save_snapshot, %{workflow_id: id}, fn -> + store_mod.save_snapshot(id, cursor, snapshot, store_state) +end) +``` + +--- + +## Resume Flow Adjustments + +The current snapshot-aware resume path should remain: + +```text +load_snapshot/2 +decode tagged workflow snapshot +stream/3 with after_cursor from cursor +Workflow.from_events(tail_events, base_workflow) +``` + +Add one extra output from the resume path: + +```elixir +{:ok, workflow, resolver, snapshot_cursor} +``` + +Use `snapshot_cursor = 0` for full-stream resume. Pass it into Worker opts: + +```elixir +snapshot_cursor: snapshot_cursor +``` + +This keeps Worker snapshot counters consistent after snapshot-based recovery. 
+ +--- + +## Testing Plan + +### Store Contract Tests + +Add tests for the new policy helpers and no-op behavior: + +- stores without snapshot callbacks still run with `snapshot_strategy` configured +- `stream/2` remains full stream +- `stream/3` with `after_cursor:` remains tail-only + +### Worker Integration Tests + +Use a test store that records snapshot writes. + +1. `snapshot_strategy: :never` + - run workflow + - checkpoint + - assert `save_snapshot/4` was not called + +2. `snapshot_strategy: :every_checkpoint` + - run workflow + - assert `save_snapshot/4` was called with cursor returned by `append/3` + - assert snapshot decodes with `Runic.Runner.decode_snapshot/1` + +3. `snapshot_strategy: {:after_events, n}` + - append fewer than `n` events, no snapshot + - append enough events, snapshot at current cursor + +4. `snapshot_strategy: {:every_n_checkpoints, 2}` + - first checkpoint no snapshot + - second checkpoint writes snapshot + +5. Resume from Worker-written snapshot + - run and snapshot a workflow + - stop + - resume + - assert resume uses `stream/3` with `after_cursor:` + - assert results match full replay + +6. Resume seeds `last_snapshot_cursor` + - resume from cursor N + - run a small tail below `{:after_events, threshold}` + - assert no immediate duplicate snapshot + +### Full Regression + +Run: + +```sh +mix format +git diff --check +mix compile +mix test test/runner/worker_test.exs +mix test test/runner/store_ets_test.exs test/runner/store_mnesia_test.exs +mix test +``` + +--- + +## Implementation Steps + +1. Extend Worker struct/options with `snapshot_strategy`, `snapshot_opts`, `last_snapshot_cursor`, and `snapshot_checkpoint_count`. +2. Add private snapshot strategy predicate helpers. +3. Capture `append/3` returned cursor in `do_checkpoint/1`. +4. Add `maybe_save_workflow_snapshot/2`. +5. Pass `snapshot_cursor` from snapshot-aware resume into Worker opts. +6. Add focused tests using a recording snapshot store. +7. 
Update `Runic.Runner.Store` docs to describe write-side snapshot policy once implemented. + +--- + +## Open Questions + +- Should `snapshot_strategy: :on_complete` run only when the workflow is satisfied, or also when a worker is explicitly stopped with `persist: true`? +- Should built-in ETS and Mnesia implement `save_snapshot/4`, `load_snapshot/2`, and `stream/3` in the same follow-up, or should the first slice stay limited to external stores plus test stores? +- Should snapshot pruning be an adapter responsibility, a future Store callback, or a separate Runner API? +- Should `snapshot_opts: [mode: :lean]` produce a graph with cold `FactRef`s, or should lean snapshots wait until hybrid rehydration has more runtime evidence? + +--- + +## Recommended First Slice + +Implement the minimum useful policy: + +```elixir +snapshot_strategy: :never | :every_checkpoint | {:after_events, n} +snapshot_opts: [mode: :full] +``` + +Keep `:never` as the default. + +Do not implement pruning or lean snapshot mode yet. + +This gives Postgres/SQLite adapters a clear, working compaction path while preserving Runic's event-sourced correctness model and avoiding surprise storage costs. 
diff --git a/lib/runic/runner.ex b/lib/runic/runner.ex index a5df727..f88314d 100644 --- a/lib/runic/runner.ex +++ b/lib/runic/runner.ex @@ -320,7 +320,7 @@ defmodule Runic.Runner do defp resume_from_streaming_store(workflow_id, rehydration, {store_mod, store_state} = store) do if Runic.Runner.Store.supports_snapshots?(store_mod) and - Runic.Runner.Store.supports_stream_after?(store_mod) do + Runic.Runner.Store.supports_stream_options?(store_mod) do case store_mod.load_snapshot(workflow_id, store_state) do {:ok, {cursor, snapshot}} -> resume_from_snapshot(workflow_id, cursor, snapshot, rehydration, store) @@ -353,7 +353,7 @@ defmodule Runic.Runner do rehydration, {store_mod, store_state} = store ) do - case store_mod.stream_after(workflow_id, cursor, store_state) do + case store_mod.stream(workflow_id, store_state, after_cursor: cursor) do {:ok, event_stream} -> events = Enum.to_list(event_stream) {workflow, resolver} = resume_from_events(events, rehydration, store, base_workflow) diff --git a/lib/runic/runner/store.ex b/lib/runic/runner/store.ex index 152be0d..457dab7 100644 --- a/lib/runic/runner/store.ex +++ b/lib/runic/runner/store.ex @@ -15,8 +15,21 @@ defmodule Runic.Runner.Store do event-sourced checkpointing and recovery from the Worker. `stream/2` must always return the full event stream for a workflow. Stores - that support snapshot-accelerated recovery can additionally implement - `stream_after/3` to return only events after a saved snapshot cursor. + that support cursor-aware or windowed replay can additionally implement + `stream/3` with options. + + Supported `stream/3` options: + + * `:after_cursor` — exclusive lower bound. `after_cursor: 10` returns + events with sequence/cursor greater than `10`. + * `:limit` — optional maximum number of events to return. Adapters may + ignore this when their backing stream does not support bounded reads. + * `:batch_size` — optional page-size hint for stores that fetch event rows + in batches. 
+ + `stream/3` should be a superset of `stream/2`: calling it with an empty + option list should return the full stream. Adapters should ignore unknown + options they do not support. ## Legacy Semantics (Snapshot) @@ -39,6 +52,11 @@ defmodule Runic.Runner.Store do @type cursor :: non_neg_integer() @type log :: [struct()] @type state :: term() + @type stream_opts :: [ + after_cursor: cursor(), + limit: pos_integer(), + batch_size: pos_integer() + ] # Core (required) — snapshot-based @callback init_store(opts :: keyword()) :: {:ok, state()} | {:error, term()} @@ -50,7 +68,7 @@ defmodule Runic.Runner.Store do {:ok, cursor()} | {:error, term()} @callback stream(workflow_id(), state()) :: {:ok, Enumerable.t()} | {:error, :not_found | term()} - @callback stream_after(workflow_id(), cursor(), state()) :: + @callback stream(workflow_id(), state(), stream_opts()) :: {:ok, Enumerable.t()} | {:error, :not_found | term()} # Snapshot (optional — faster recovery with stream semantics) @@ -74,7 +92,7 @@ defmodule Runic.Runner.Store do @optional_callbacks [ append: 3, stream: 2, - stream_after: 3, + stream: 3, save_snapshot: 4, load_snapshot: 2, save_fact: 3, @@ -94,11 +112,11 @@ defmodule Runic.Runner.Store do end @doc """ - Returns true if the store module supports cursor-aware stream replay. + Returns true if the store module supports option-aware stream replay. 
""" - @spec supports_stream_after?(module()) :: boolean() - def supports_stream_after?(store_mod) do - function_exported?(store_mod, :stream_after, 3) + @spec supports_stream_options?(module()) :: boolean() + def supports_stream_options?(store_mod) do + function_exported?(store_mod, :stream, 3) end @doc """ diff --git a/test/runner/worker_test.exs b/test/runner/worker_test.exs index aa84b69..ad95d28 100644 --- a/test/runner/worker_test.exs +++ b/test/runner/worker_test.exs @@ -394,7 +394,7 @@ defmodule Runic.Runner.WorkerTest do assert {:ok, results} = Runic.Runner.get_results(runner, workflow_id) assert 12 in results - assert store_mod.call_count(store_state, {:stream_after, workflow_id}) == 1 + assert store_mod.call_count(store_state, {:stream, workflow_id, :after_cursor}) == 1 assert store_mod.call_count(store_state, {:stream, workflow_id}) == 0 end @@ -453,7 +453,7 @@ defmodule Runic.Runner.WorkerTest do assert {:ok, results} = Runic.Runner.get_results(runner, workflow_id) assert 12 in results - assert store_mod.call_count(store_state, {:stream_after, workflow_id}) == 0 + assert store_mod.call_count(store_state, {:stream, workflow_id, :after_cursor}) == 0 assert store_mod.call_count(store_state, {:stream, workflow_id}) == 1 end diff --git a/test/support/snapshot_tail_store.ex b/test/support/snapshot_tail_store.ex index a6e5dad..1936c0e 100644 --- a/test/support/snapshot_tail_store.ex +++ b/test/support/snapshot_tail_store.ex @@ -53,9 +53,9 @@ defmodule Runic.TestSupport.SnapshotTailStore do end @impl Runic.Runner.Store - def stream_after(workflow_id, cursor, %{calls_table: calls_table} = state) - when is_integer(cursor) do - increment_call(calls_table, {:stream_after, workflow_id}) + def stream(workflow_id, %{calls_table: calls_table} = state, opts) when is_list(opts) do + cursor = Keyword.get(opts, :after_cursor, 0) + increment_call(calls_table, {:stream, workflow_id, opts_key(opts)}) stream_from(workflow_id, cursor + 1, state) end @@ -114,6 +114,13 @@ 
defmodule Runic.TestSupport.SnapshotTailStore do :ets.update_counter(calls_table, key, {2, 1}, {key, 0}) end + defp opts_key(opts) do + cond do + Keyword.has_key?(opts, :after_cursor) -> :after_cursor + true -> :opts + end + end + defp ensure_table(table, opts) do if :ets.whereis(table) == :undefined do :ets.new(table, [:named_table, :public, read_concurrency: true] ++ opts)