Skip to content

feat(storage): replay + ring buffer (PR3 of 4)#110

Draft
sidd-27 wants to merge 13 commits into
kornia:feat/bubbaloop-storage-specfrom
sidd-27:storage-pr3-replay-ringbuffer
Draft

feat(storage): replay + ring buffer (PR3 of 4)#110
sidd-27 wants to merge 13 commits into
kornia:feat/bubbaloop-storage-specfrom
sidd-27:storage-pr3-replay-ringbuffer

Conversation

@sidd-27

@sidd-27 sidd-27 commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Stacked split of the storage subsystem — part 3 of 4. Builds on PR2. Scoped to spec §17 PR3 (in-tree half).

PR3 — replay + ring buffer

  • ring_buffer (§3.3.5) — a pure, IO-free sliding window bounded by BOTH a time window and a 256 MiB byte cap, FIFO eviction by log_time, never drops the sole sample; non-destructive snapshot(), drain(). seal_samples is the circular MCAP writer: materializes a snapshot into chunks/chunk-{idx:06}-{prefix8}.mcap with rosbag2 wire defaults (786 432-byte chunks, zstd/fast, CRC on, §3.3.7), dual timestamps, §4.5 channel metadata (zenoh.encoding/zenoh.topic/bubbaloop.schema_name), streaming SHA-256 finalized into the name (§3.3.6), rolling a new chunk at the size target; atomic manifest write.
  • replay (§4.5) — read_recording_messages recovers messages from a recording's MCAP chunks preserving topic/encoding + both timestamps; a pure plan() planner (filter by --topics/--exclude key-expr intersection, --start/--end trim, timeline order, rate-scale, --remap); ReplaySink trait + run_replay driver that paces to recorded timing, --loop, prompt cancel via CancellationToken.
  • CLIbubbaloop storage replay <name> with --rate/--loop/--start-time/--end-time/--topics/--exclude/--remap/--use-log-time, publishing through a Zenoh sink (Ctrl-C cancels).

New dep: mcap = "0.24" (§12). The mcap-recorder marketplace-node relocation (the other half of PR3) lives in the separate bubbaloop-nodes-official repo.

Verification

122 storage unit tests; clippy-clean; s3 still builds. Includes PR3 review fixes (replay back-pressure via CongestionControl::Block, pre-parsed key patterns, cancel-after-sleep, extracted shared atomic_write, >=1 GiB warning).

🤖 Generated with Claude Code

Siddharth Rambhia and others added 13 commits June 14, 2026 22:38
Implements the pure-logic core of the bubbaloop storage subsystem from
docs/superpowers/specs/2026-05-26-bubbaloop-storage-design.md (spec §17 "PR1"),
with no Zenoh dependencies and full unit coverage:

- integrity: streaming + one-shot SHA-256, hex/prefix8/base64 helpers, verify
  (mandatory per-chunk integrity, §11)
- recording: manifest.json data model (§4.4) with forward-compatible
  unknown-field preservation and derived lifecycle/upload state
- manifest: atomic write_tmp_then_rename save, load, structural validation (§3.4.5)
- profile: profile YAML + full v1 validator (§4.3.1), reserved `pipelines:`->v2
  gate (§4.3.2), canonical profile_sha256 (§4.3.3)
- secrets: opaque zeroize-on-drop Secret (redacted Debug), secrets.toml chmod 0600 (§4.2)
- backend: async StorageBackend trait + LocalFs impl with checksum verification
  and path-traversal guards; deterministic object-key builders (§8)

S3Compat backend, discover/sync/reconcile/replay, CLI/MCP surfaces and the
dashboard tab are deferred to later PRs per the spec phasing.

Adds sha2, zeroize, async-trait deps. 57 unit tests; clippy-clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Enables building/testing on Apple Silicon (the workspace previously declared
only linux-aarch64 and linux-64). No dependency changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements `storage::reconcile` per spec §3.5 — the user-facing healing
primitive that diffs the three sources of truth for a recording (local
manifest, local chunk files, remote listing) and converges them.

- Applies the §3.5.2 diff matrix row-by-row: uploads missing-remote
  chunks, re-uploads on remote content mismatch, marks `uploaded_at_ns`
  when the remote already has a chunk, flags local corruption and
  chunks missing everywhere, and reports (never touches) orphans.
- Optional `restore` re-downloads remote-only chunks, verifying SHA-256
  on receipt before writing (§8).
- Never deletes anything (§3.5.4) — idempotent and safe to re-run.
- Backend-agnostic (`&dyn StorageBackend`) and clock-free (caller passes
  `now_ns`), so it's fully unit-testable against `LocalFs` as a stand-in
  remote. Re-uploads the manifest at the end (§3.5.3 step 6).

11 new unit tests (68 storage tests total); clippy-clean within storage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…quence (PR2)

Implements the deterministic core of `storage::sync` per spec §3.4 that the
daemon's background queue driver is built from:

- BackoffSchedule + SharedBackoff: the shared exponential backoff (§3.4.3) —
  immediate, then 5s→10s→20s→40s→60s clamp, dead-letter after max_elapsed_time.
  Backoff is shared across jobs and resets on the first success.
- classify(): retryable / terminal / pause-sync error mapping (§3.4.3). Adds
  Forbidden + NoSuchBucket terminal variants to BackendError for the future
  S3 backend.
- upload_one(): the per-chunk upload sequence (§3.4.4) — re-verify local
  SHA-256, HEAD-before-PUT idempotency (AlreadyPresent / Conflict / PUT), PUT
  with the checksum header.
- pending_jobs() + mark_chunk_uploaded(): the startup-scan enqueue (§3.4.1
  trigger 3) and the atomic manifest mutation recording uploaded_at_ns (§3.4.5).
- DeadLetterList: the persisted ~/.bubbaloop/storage/dead_letter.json
  (§3.4.6) that survives restarts and that reconcile flushes; dedup add +
  remove + atomic save.

The async queue driver (bounded VecDeque + semaphore + the in-process MPSC of
ChunkFinalized events) is intentionally deferred — it consumes recorder events
that land in a later PR. The decision logic it needs lives here.

12 new unit tests (80 storage tests total); clippy-clean within storage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…PR2)

Wires the data-plane CLI verbs (spec §5) over the StorageBackend trait, so the
whole flow runs end-to-end against the `local` backend without R2:

- `storage::config` — parses the `[storage]` table of ~/.bubbaloop/config.toml
  (§4.1) and a backend factory. `local` builds a LocalFs object store; the cloud
  backends (r2/s3/gcs/minio) parse but return BackendNotImplemented until the
  S3-compatible backend lands. Secrets are never read here (§4.2).
- `cli/storage.rs` — argh subcommands (NOT clap), output via `log` (never
  println!):
  - `list [--remote] [--machine] [--tag] [--since]` — scan local manifests (or
    remote objects) into a summarised table.
  - `info <name>` — pretty-print manifest.json.
  - `upload <name>` — flush un-uploaded chunks via sync::upload_one (idempotent).
  - `download <name> [--to] [--machine]` — pull + SHA-256-verify each chunk.
  - `reconcile <name> [--restore]` — drive storage::reconcile.
  - `rm <name> [--remote] [--machine]` — delete local (+ remote objects).
- Registered `Command::Storage` in the binary; pure-local so no Zenoh/daemon.

Recording-control verbs (configure/topics/profile/record/replay) and the cloud
backends are later slices. Collection/formatting logic lives in pure helpers
(`collect_local`, `format_table`, `parse_since`) with 13 new unit tests (93
storage tests total). Smoke-tested end-to-end against a temp HOME: list→upload→
list --remote→download→reconcile --restore→info→rm. clippy-clean within storage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ficiency)

Security:
- Validate recording names before they become filesystem paths or object
  keys. New storage::validate_recording_name (charset [A-Za-z0-9._-], no
  ./.. /NUL) is enforced inside recording_dir/chunks_dir, closing a path
  traversal where `storage rm '../../x'` would remove_dir_all an arbitrary
  directory (and info/download/reconcile reads/writes escaping the sandbox).
- Enforce canonical chunk filenames in manifest::validate, so a tampered
  remote manifest can't smuggle a `../evil` chunk.name into a download write;
  download also derives the on-disk name from (index, sha256) as defense.

Correctness:
- `--remote` (list/rm) now refuses a non-remote backend instead of silently
  operating on the local object store (StorageConfig::is_remote).
- run_upload only persists + re-uploads the manifest when ≥1 chunk changed,
  so an all-failed/all-conflict run can't clobber newer remote manifest state.
- reconcile's "present remote, absent local" branch now counts the chunk
  (marked/verified) instead of mutating the manifest with no report signal.
- reconcile marks the recording durably `corrupt` (new Recording.corrupt
  field, surfaced in `list` status) when a chunk is missing both locally and
  remotely, instead of only emitting a transient error.
- reconcile fetches+hashes the full digest when the backend HEAD returns no
  checksum, instead of trusting the 8-char key prefix (§8 integrity).
- SharedBackoff::on_failure re-anchors on a backward clock step so a hopeless
  job still dead-letters instead of retrying forever (no saturating_sub=0 trap).
- `rm` errors when a name matched nothing (local or remote) instead of a
  silent exit-0 success on a typo with --remote.

Efficiency:
- run_upload mutates the manifest in memory and saves once, replacing the
  per-chunk load+save (O(N) full-manifest rewrites).
- collect_remote lists the per-machine prefix when --machine is given rather
  than enumerating the whole bucket.

Adds 5 unit tests (name validation, canonical-name rejection, backward-clock
dead-letter, is_remote) and tightens reconcile tests; 98 storage tests pass,
clippy-clean. Smoke-tested: traversal/typo/--remote paths rejected, valid
local flow intact.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Adds create/list/show/rm over storage::profile (§5):
- create builds a Profile from flags and validates it against the v1 schema
  via profile::save (rejects e.g. ring_buffer without window_secs); --force to
  overwrite; profile names are run through validate_recording_name.
- list scans ~/.bubbaloop/profiles/*.yaml with name/sha256/description.
- show prints the on-disk YAML after confirming it parses.
- rm deletes the profile (errors if absent).

Pure-local, no backend/Zenoh. +1 unit test (split_csv); 99 storage tests pass,
clippy-clean. Smoke-tested create/list/show/rm + validation + name guard.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- StorageConfig::save/save_to writes the [storage] table into config.toml,
  merging via toml::Table so unrelated tables (e.g. [agent]) survive; atomic
  tmp+rename.
- `storage configure --backend ... [--endpoint --region --bucket
  --retention-days-local --disk-quota-gb --local-path --no-auto-upload]` writes
  the config; `--access-key-id`/`--secret-access-key` (both required together)
  write secrets.toml via storage::secrets::save_r2 (chmod 0600). Secrets never
  touch config.toml. Validates backend kind and requires --bucket for cloud
  backends. Flag-driven/non-interactive (the wizard will call the same handler).

+1 config round-trip test; clippy-clean. Smoke-tested: local/r2 config,
secrets split + 0600, [agent] preservation, validation paths.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- storage::discover queries `bubbaloop/**/manifest`, decodes the CBOR node
  manifests, and groups output topics by machine_id → instance → topic with
  per-topic liveness (live/idle from still_live && ever_fired), mirroring the
  `bubbaloop dataflow` query pattern. Zenoh I/O (query_nodes) is thin; the
  grouping/filtering (group) is pure with --machine / --grep filters.
- `storage topics [--machine] [--grep] [-z endpoint] [--timeout-secs]` prints
  the grouped tree.

4 unit tests on the pure grouping (sort order, machine filter, case-insensitive
grep dropping empty nodes). The Zenoh path is compile-checked; it needs a live
router + nodes to exercise. 103 storage tests pass, clippy-clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements StorageBackend over aws-sdk-s3 for R2/AWS/GCS/MinIO (spec §8):
- Explicit static Credentials (never the ambient AWS chain), region (auto for
  R2), explicit endpoint_url, force_path_style for MinIO. TLS via rustls+ring
  (aws-smithy-http-client) so the build needs no aws-lc/cmake.
- Per-chunk x-amz-checksum-sha256 (base64) on PUT for server-side validation;
  checksum requested on HEAD/GET so reconcile can compare the full digest.
- HEAD→None on not-found; ListObjectsV2 paginated; DeleteObject idempotent.
- Error mapping to BackendError per §3.4.3: NoSuchBucket→terminal-pause,
  AccessDenied/Forbidden→terminal, BadDigest→terminal, NoSuchKey→NotFound,
  everything else→retryable Io.

Optional `s3` cargo feature keeps aws-sdk-s3 out of default builds; the config
factory builds S3Compat under the feature (reading creds from secrets.toml) and
returns BackendNotImplemented without it. So `bubbaloop storage configure
--backend r2 ...` + the existing upload/download/reconcile/rm verbs now reach
real R2 when built with --features s3.

Verified: `cargo check/test/clippy --features s3` clean (rustls-ring, no cmake);
default build unchanged (103 storage tests). Functional R2 round-trips need live
credentials (CI/minio) — compile + unit-verified here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…rface

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements the in-tree half of spec §17 PR3: the circular MCAP writer and
sliding-window ring buffer (§3.3.5), and MCAP-based replay into Zenoh (§4.5).

storage::ring_buffer
- `RingBuffer`: pure, IO-free sliding window bounded by BOTH a time window
  and a byte cap (256 MiB default OOM safety net), FIFO eviction by log_time,
  never drops the sole sample. snapshot() is non-destructive; drain() clears.
- `seal_samples`: the circular MCAP writer — materializes a snapshot into
  `chunks/chunk-{idx:06}-{prefix8}.mcap` with rosbag2 wire defaults (786 432-byte
  chunks, zstd/fast, CRC on, §3.3.7), dual timestamps, §4.5 channel metadata
  (zenoh.encoding / zenoh.topic / bubbaloop.schema_name), streaming SHA-256 with
  the prefix embedded at finalize (§3.3.6), rolling a new chunk file at the size
  target. Writes manifest.json atomically and returns the assembled Recording.

storage::replay
- `read_recording_messages`: recover messages from a recording's MCAP chunks,
  preserving topic/encoding (from channel metadata) and both timestamps.
- `plan` (pure): filter by --topics/--exclude (zenoh key-expr intersection) and
  --start-time/--end-time trim, order by the selected timeline (publish_time, or
  log_time with --use-log-time), rate-scale offsets, apply --remap.
- `ReplaySink` trait + `run_replay` driver: sleeps between messages to honor the
  recorded timing, loops with --loop, cancels promptly via CancellationToken.

CLI: `bubbaloop storage replay <name>` with --rate/--loop/--start-time/
--end-time/--topics/--exclude/--remap/--use-log-time, publishing through a
Zenoh-backed sink (Ctrl-C cancels; the only way to stop --loop).

Deps: add `mcap = "0.24"` (§12). 19 new unit tests (eviction, byte cap,
seal→read round-trip with both timestamps, planner trim/filter/remap/rate,
driver ordering + loop-until-cancel). 122 storage tests pass; clippy-clean.

The `mcap-recorder` marketplace-node relocation (the other half of PR3) lives in
the separate bubbaloop-nodes-official repo, not present in this checkout.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review of d1adc50 surfaced no correctness bugs; these harden replay fidelity,
scaling, and shared-mechanism reuse.

- Replay back-pressures instead of dropping: ZenohReplaySink now PUTs with
  CongestionControl::Block (not the put-default Drop), so a slow link/subscriber
  stalls the (already-paced) replay rather than silently losing samples (§4.5).
- Planner pre-parses --topics/--exclude key patterns once instead of re-parsing
  them per message (was O(messages × patterns) KeyExpr parses in plan()).
- run_replay re-checks cancellation after the inter-message sleep, before the
  publish, so Ctrl-C in that window doesn't emit one extra sample.
- Shared atomic write: extract storage::atomic_write (tmp + fsync + rename);
  manifest::save_atomic and ring_buffer chunk writes both delegate, so the
  crash-atomicity guarantee lives in one place (§3.4.5).
- Warn before replaying a >=1 GiB recording (v1 buffers it fully in RAM).
- Document that McapWriteConfig.chunk_size_bytes is an uncompressed-payload
  threshold (rosbag2 semantics), so on-disk .mcap sizes differ post-zstd.

122 storage tests pass; PR3 + manifest clippy-clean; s3 feature still builds.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@sidd-27

sidd-27 commented Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Stacked PR — review in order. Each merges into feat/bubbaloop-storage-spec before the next.

  1. feat(storage): foundation — integrity, manifest, profile, secrets, backend (PR1 of 4) #108 — PR1 foundation
  2. feat(storage): discover / sync / reconcile / CLI (PR2 of 4) #109 — PR2 discover/sync/reconcile/CLI
  3. feat(storage): replay + ring buffer (PR3 of 4) #110 — PR3 replay + ring buffer
  4. feat(storage): sync driver + daemon wiring + MCP parity + docs (PR4 of 4) #111 — PR4 sync driver + daemon + MCP parity

Drafts #109#111 currently show a cumulative diff against the spec branch; each collapses to its own phase automatically once the PR above it merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant