feat(storage): replay + ring buffer (PR3 of 4) #110
Draft
sidd-27 wants to merge 13 commits into
Implements the pure-logic core of the bubbaloop storage subsystem from docs/superpowers/specs/2026-05-26-bubbaloop-storage-design.md (spec §17 "PR1"), with no Zenoh dependencies and full unit coverage:
- integrity: streaming + one-shot SHA-256, hex/prefix8/base64 helpers, verify (mandatory per-chunk integrity, §11)
- recording: manifest.json data model (§4.4) with forward-compatible unknown-field preservation and derived lifecycle/upload state
- manifest: atomic write_tmp_then_rename save, load, structural validation (§3.4.5)
- profile: profile YAML + full v1 validator (§4.3.1), reserved `pipelines:`->v2 gate (§4.3.2), canonical profile_sha256 (§4.3.3)
- secrets: opaque zeroize-on-drop Secret (redacted Debug), secrets.toml chmod 0600 (§4.2)
- backend: async StorageBackend trait + LocalFs impl with checksum verification and path-traversal guards; deterministic object-key builders (§8)
S3Compat backend, discover/sync/reconcile/replay, CLI/MCP surfaces and the dashboard tab are deferred to later PRs per the spec phasing. Adds sha2, zeroize, async-trait deps. 57 unit tests; clippy-clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Enables building/testing on Apple Silicon (the workspace previously declared only linux-aarch64 and linux-64). No dependency changes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements `storage::reconcile` per spec §3.5: the user-facing healing primitive that diffs the three sources of truth for a recording (local manifest, local chunk files, remote listing) and converges them.
- Applies the §3.5.2 diff matrix row-by-row: uploads missing-remote chunks, re-uploads on remote content mismatch, marks `uploaded_at_ns` when the remote already has a chunk, flags local corruption and chunks missing everywhere, and reports (never touches) orphans.
- Optional `restore` re-downloads remote-only chunks, verifying SHA-256 on receipt before writing (§8).
- Never deletes anything (§3.5.4); idempotent and safe to re-run.
- Backend-agnostic (`&dyn StorageBackend`) and clock-free (caller passes `now_ns`), so it's fully unit-testable against `LocalFs` as a stand-in remote.
Re-uploads the manifest at the end (§3.5.3 step 6). 11 new unit tests (68 storage tests total); clippy-clean within storage.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
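The per-chunk decision behind a diff matrix like the one summarised above can be sketched as a pure function. This is only one plausible reading of the bullets in this commit message; the authoritative matrix is spec §3.5.2, which is not reproduced here. The names `Local`, `Remote`, `Action`, and `decide` are hypothetical and are not taken from the PR.

```rust
/// Local state of one manifest chunk (hypothetical names, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Local {
    Missing,
    Corrupt, // file exists but its SHA-256 differs from the manifest
    Good,
}

/// Remote state of the same chunk, judged against the manifest digest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Remote {
    Missing,
    Mismatch, // object exists but its digest differs from the manifest
    Match,
}

/// What a reconcile pass would do for the chunk. Note there is no delete action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    Upload,
    ReUpload,
    MarkUploaded,
    FlagLocalCorrupt,
    FlagMissingEverywhere,
    Restore,
    LeaveRemoteOnly,
}

/// Assumed mapping from (local, remote, restore flag) to an action.
pub fn decide(local: Local, remote: Remote, restore: bool) -> Action {
    match (local, remote) {
        (Local::Good, Remote::Missing) => Action::Upload,
        (Local::Good, Remote::Mismatch) => Action::ReUpload,
        (Local::Good, Remote::Match) => Action::MarkUploaded,
        (Local::Corrupt, _) => Action::FlagLocalCorrupt,
        (Local::Missing, Remote::Missing) => Action::FlagMissingEverywhere,
        // Remote-only chunk: download (and verify) only when the caller asked for it.
        (Local::Missing, _) if restore => Action::Restore,
        (Local::Missing, _) => Action::LeaveRemoteOnly,
    }
}
```

Because the function is pure and every arm is non-destructive, running it twice over the same state yields the same actions, which is the property the "safe to re-run" claim relies on.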
…quence (PR2)
Implements the deterministic core of `storage::sync` per spec §3.4 that the daemon's background queue driver is built from:
- BackoffSchedule + SharedBackoff: the shared exponential backoff (§3.4.3): immediate, then 5s→10s→20s→40s→60s clamp, dead-letter after max_elapsed_time. Backoff is shared across jobs and resets on the first success.
- classify(): retryable / terminal / pause-sync error mapping (§3.4.3). Adds Forbidden + NoSuchBucket terminal variants to BackendError for the future S3 backend.
- upload_one(): the per-chunk upload sequence (§3.4.4): re-verify local SHA-256, HEAD-before-PUT idempotency (AlreadyPresent / Conflict / PUT), PUT with the checksum header.
- pending_jobs() + mark_chunk_uploaded(): the startup-scan enqueue (§3.4.1 trigger 3) and the atomic manifest mutation recording uploaded_at_ns (§3.4.5).
- DeadLetterList: the persisted ~/.bubbaloop/storage/dead_letter.json (§3.4.6) that survives restarts and that reconcile flushes; dedup add + remove + atomic save.
The async queue driver (bounded VecDeque + semaphore + the in-process MPSC of ChunkFinalized events) is intentionally deferred; it consumes recorder events that land in a later PR. The decision logic it needs lives here. 12 new unit tests (80 storage tests total); clippy-clean within storage.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
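The backoff schedule described in this commit (immediate, then 5s→10s→20s→40s, clamped at 60s, dead-letter after a maximum elapsed time, reset on success) can be sketched with the standard library alone. `delay_for_attempt` and `SharedBackoffSketch` are hypothetical names, and passing the elapsed time in as an argument is an assumption made to keep the sketch clock-free; the PR's `BackoffSchedule`/`SharedBackoff` may be shaped differently.

```rust
use std::time::Duration;

/// Delay before retry number `retry` (0 = first retry, which is immediate).
/// Doubles from 5s and clamps at 60s: 0s, 5s, 10s, 20s, 40s, 60s, 60s, ...
pub fn delay_for_attempt(retry: u32) -> Duration {
    const BASE_SECS: u64 = 5;
    const CAP_SECS: u64 = 60;
    if retry == 0 {
        return Duration::ZERO;
    }
    // Bound the shift so the left-shift can never overflow.
    let shift = (retry - 1).min(10);
    Duration::from_secs((BASE_SECS << shift).min(CAP_SECS))
}

/// Hypothetical stand-in for a backoff shared across upload jobs.
pub struct SharedBackoffSketch {
    retries: u32,
    max_elapsed: Duration,
}

impl SharedBackoffSketch {
    pub fn new(max_elapsed: Duration) -> Self {
        Self { retries: 0, max_elapsed }
    }

    /// Returns the delay before the next attempt, or `None` to dead-letter.
    /// `elapsed` is the time since the first failure, supplied by the caller.
    pub fn on_failure(&mut self, elapsed: Duration) -> Option<Duration> {
        if elapsed >= self.max_elapsed {
            return None;
        }
        let delay = delay_for_attempt(self.retries);
        self.retries = self.retries.saturating_add(1);
        Some(delay)
    }

    /// Any success resets the shared schedule back to "immediate".
    pub fn on_success(&mut self) {
        self.retries = 0;
    }
}
```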
…PR2)
Wires the data-plane CLI verbs (spec §5) over the StorageBackend trait, so the
whole flow runs end-to-end against the `local` backend without R2:
- `storage::config` — parses the `[storage]` table of ~/.bubbaloop/config.toml
(§4.1) and a backend factory. `local` builds a LocalFs object store; the cloud
backends (r2/s3/gcs/minio) parse but return BackendNotImplemented until the
S3-compatible backend lands. Secrets are never read here (§4.2).
- `cli/storage.rs` — argh subcommands (NOT clap), output via `log` (never
println!):
- `list [--remote] [--machine] [--tag] [--since]` — scan local manifests (or
remote objects) into a summarised table.
- `info <name>` — pretty-print manifest.json.
- `upload <name>` — flush un-uploaded chunks via sync::upload_one (idempotent).
- `download <name> [--to] [--machine]` — pull + SHA-256-verify each chunk.
- `reconcile <name> [--restore]` — drive storage::reconcile.
- `rm <name> [--remote] [--machine]` — delete local (+ remote objects).
- Registered `Command::Storage` in the binary; pure-local so no Zenoh/daemon.
Recording-control verbs (configure/topics/profile/record/replay) and the cloud
backends are later slices. Collection/formatting logic lives in pure helpers
(`collect_local`, `format_table`, `parse_since`) with 13 new unit tests (93
storage tests total). Smoke-tested end-to-end against a temp HOME: list→upload→
list --remote→download→reconcile --restore→info→rm. clippy-clean within storage.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ficiency)
Security:
- Validate recording names before they become filesystem paths or object keys. New storage::validate_recording_name (charset [A-Za-z0-9._-], no ./.. /NUL) is enforced inside recording_dir/chunks_dir, closing a path traversal where `storage rm '../../x'` would remove_dir_all an arbitrary directory (and info/download/reconcile reads/writes escaping the sandbox).
- Enforce canonical chunk filenames in manifest::validate, so a tampered remote manifest can't smuggle a `../evil` chunk.name into a download write; download also derives the on-disk name from (index, sha256) as defense.
Correctness:
- `--remote` (list/rm) now refuses a non-remote backend instead of silently operating on the local object store (StorageConfig::is_remote).
- run_upload only persists + re-uploads the manifest when ≥1 chunk changed, so an all-failed/all-conflict run can't clobber newer remote manifest state.
- reconcile's "present remote, absent local" branch now counts the chunk (marked/verified) instead of mutating the manifest with no report signal.
- reconcile marks the recording durably `corrupt` (new Recording.corrupt field, surfaced in `list` status) when a chunk is missing both locally and remotely, instead of only emitting a transient error.
- reconcile fetches+hashes the full digest when the backend HEAD returns no checksum, instead of trusting the 8-char key prefix (§8 integrity).
- SharedBackoff::on_failure re-anchors on a backward clock step so a hopeless job still dead-letters instead of retrying forever (no saturating_sub=0 trap).
- `rm` errors when a name matched nothing (local or remote) instead of a silent exit-0 success on a typo with --remote.
Efficiency:
- run_upload mutates the manifest in memory and saves once, replacing the per-chunk load+save (O(N) full-manifest rewrites).
- collect_remote lists the per-machine prefix when --machine is given rather than enumerating the whole bucket.
Adds 5 unit tests (name validation, canonical-name rejection, backward-clock dead-letter, is_remote) and tightens reconcile tests; 98 storage tests pass, clippy-clean. Smoke-tested: traversal/typo/--remote paths rejected, valid local flow intact. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
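A name check of the kind this commit describes (allow-list charset `[A-Za-z0-9._-]`, with `.` and `..` refused) might look roughly like the sketch below. The function name and error type are hypothetical, and rejecting the empty string is an added assumption; the PR's `storage::validate_recording_name` may differ in both signature and detail.

```rust
/// Hypothetical sketch of an allow-list recording-name validator.
/// `/`, NUL, and every other byte outside [A-Za-z0-9._-] fail the charset check.
pub fn validate_recording_name_sketch(name: &str) -> Result<(), String> {
    // Assumption: an empty name is never a valid directory or key component.
    if name.is_empty() {
        return Err("recording name is empty".to_string());
    }
    // `.` and `..` pass the charset check, so they must be refused explicitly.
    if name == "." || name == ".." {
        return Err(format!("recording name {name:?} is a relative path component"));
    }
    let allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-');
    match name.chars().find(|c| !allowed(*c)) {
        Some(bad) => Err(format!("recording name contains disallowed character {bad:?}")),
        None => Ok(()),
    }
}
```

Running the check inside the path-building helpers, as the commit says it does, means no caller can construct a recording path from an unvalidated name.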
Adds create/list/show/rm over storage::profile (§5):
- create builds a Profile from flags and validates it against the v1 schema via profile::save (rejects e.g. ring_buffer without window_secs); --force to overwrite; profile names are run through validate_recording_name.
- list scans ~/.bubbaloop/profiles/*.yaml with name/sha256/description.
- show prints the on-disk YAML after confirming it parses.
- rm deletes the profile (errors if absent).
Pure-local, no backend/Zenoh. +1 unit test (split_csv); 99 storage tests pass, clippy-clean. Smoke-tested create/list/show/rm + validation + name guard.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- StorageConfig::save/save_to writes the [storage] table into config.toml, merging via toml::Table so unrelated tables (e.g. [agent]) survive; atomic tmp+rename.
- `storage configure --backend ... [--endpoint --region --bucket --retention-days-local --disk-quota-gb --local-path --no-auto-upload]` writes the config; `--access-key-id`/`--secret-access-key` (both required together) write secrets.toml via storage::secrets::save_r2 (chmod 0600). Secrets never touch config.toml. Validates backend kind and requires --bucket for cloud backends. Flag-driven/non-interactive (the wizard will call the same handler).
+1 config round-trip test; clippy-clean. Smoke-tested: local/r2 config, secrets split + 0600, [agent] preservation, validation paths.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- storage::discover queries `bubbaloop/**/manifest`, decodes the CBOR node manifests, and groups output topics by machine_id → instance → topic with per-topic liveness (live/idle from still_live && ever_fired), mirroring the `bubbaloop dataflow` query pattern. Zenoh I/O (query_nodes) is thin; the grouping/filtering (group) is pure with --machine / --grep filters.
- `storage topics [--machine] [--grep] [-z endpoint] [--timeout-secs]` prints the grouped tree.
4 unit tests on the pure grouping (sort order, machine filter, case-insensitive grep dropping empty nodes). The Zenoh path is compile-checked; it needs a live router + nodes to exercise. 103 storage tests pass, clippy-clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
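The pure grouping step described above (machine_id → instance → topic, with liveness derived from `still_live && ever_fired` and the `--machine` / `--grep` filters) could be sketched as follows. `TopicEntry`, `Grouped`, and `group_sketch` are hypothetical names, and treating `--grep` as a case-insensitive substring match is an assumption; the PR's `group` function may use a different matcher and data model.

```rust
use std::collections::BTreeMap;

/// One decoded output topic (hypothetical flat shape, for illustration).
pub struct TopicEntry {
    pub machine_id: String,
    pub instance: String,
    pub topic: String,
    pub still_live: bool,
    pub ever_fired: bool,
}

/// machine_id -> instance -> sorted [(topic, "live" | "idle")].
pub type Grouped = BTreeMap<String, BTreeMap<String, Vec<(String, &'static str)>>>;

pub fn group_sketch(entries: &[TopicEntry], machine: Option<&str>, grep: Option<&str>) -> Grouped {
    let needle = grep.map(|g| g.to_lowercase());
    let mut out = Grouped::new();
    for e in entries {
        // --machine filter: keep only the requested machine.
        if machine.map_or(false, |m| m != e.machine_id) {
            continue;
        }
        // --grep filter (assumed substring match, case-insensitive).
        if let Some(n) = &needle {
            if !e.topic.to_lowercase().contains(n.as_str()) {
                continue;
            }
        }
        let liveness = if e.still_live && e.ever_fired { "live" } else { "idle" };
        // Map entries are created only for matching topics, so machines and
        // instances with no surviving topic never appear in the output.
        out.entry(e.machine_id.clone())
            .or_default()
            .entry(e.instance.clone())
            .or_default()
            .push((e.topic.clone(), liveness));
    }
    for instances in out.values_mut() {
        for topics in instances.values_mut() {
            topics.sort();
        }
    }
    out
}
```

Using `BTreeMap` gives a deterministic sort order for machines and instances without an explicit sort pass.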
Implements StorageBackend over aws-sdk-s3 for R2/AWS/GCS/MinIO (spec §8):
- Explicit static Credentials (never the ambient AWS chain), region (auto for R2), explicit endpoint_url, force_path_style for MinIO. TLS via rustls+ring (aws-smithy-http-client) so the build needs no aws-lc/cmake.
- Per-chunk x-amz-checksum-sha256 (base64) on PUT for server-side validation; checksum requested on HEAD/GET so reconcile can compare the full digest.
- HEAD→None on not-found; ListObjectsV2 paginated; DeleteObject idempotent.
- Error mapping to BackendError per §3.4.3: NoSuchBucket→terminal-pause, AccessDenied/Forbidden→terminal, BadDigest→terminal, NoSuchKey→NotFound, everything else→retryable Io.
Optional `s3` cargo feature keeps aws-sdk-s3 out of default builds; the config factory builds S3Compat under the feature (reading creds from secrets.toml) and returns BackendNotImplemented without it. So `bubbaloop storage configure --backend r2 ...` + the existing upload/download/reconcile/rm verbs now reach real R2 when built with --features s3.
Verified: `cargo check/test/clippy --features s3` clean (rustls-ring, no cmake); default build unchanged (103 storage tests). Functional R2 round-trips need live credentials (CI/minio); compile + unit-verified here.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
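The error mapping listed above reduces to a small lookup on the S3 error code. The sketch below mirrors only the codes named in this commit message; `ErrorClass` and `classify_s3_code` are hypothetical names, and the real `BackendError` type and the way the code string is extracted from aws-sdk-s3 errors are not shown.

```rust
/// Hypothetical coarse classes matching the retry policy named in the commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorClass {
    /// Terminal, and sync as a whole should pause (e.g. the bucket is gone).
    TerminalPause,
    /// Terminal for this job; retrying cannot help.
    Terminal,
    /// The object does not exist.
    NotFound,
    /// Transient; retry under the shared backoff.
    RetryableIo,
}

/// Maps an S3 error code string to a class, per the list in the commit message.
pub fn classify_s3_code(code: &str) -> ErrorClass {
    match code {
        "NoSuchBucket" => ErrorClass::TerminalPause,
        "AccessDenied" | "Forbidden" | "BadDigest" => ErrorClass::Terminal,
        "NoSuchKey" => ErrorClass::NotFound,
        // Everything else is treated as a retryable I/O failure.
        _ => ErrorClass::RetryableIo,
    }
}
```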
…rface Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements the in-tree half of spec §17 PR3: the circular MCAP writer and
sliding-window ring buffer (§3.3.5), and MCAP-based replay into Zenoh (§4.5).
storage::ring_buffer
- `RingBuffer`: pure, IO-free sliding window bounded by BOTH a time window
and a byte cap (256 MiB default OOM safety net), FIFO eviction by log_time,
never drops the sole sample. snapshot() is non-destructive; drain() clears.
- `seal_samples`: the circular MCAP writer — materializes a snapshot into
`chunks/chunk-{idx:06}-{prefix8}.mcap` with rosbag2 wire defaults (786 432-byte
chunks, zstd/fast, CRC on, §3.3.7), dual timestamps, §4.5 channel metadata
(zenoh.encoding / zenoh.topic / bubbaloop.schema_name), streaming SHA-256 with
the prefix embedded at finalize (§3.3.6), rolling a new chunk file at the size
target. Writes manifest.json atomically and returns the assembled Recording.
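The eviction rules listed for `RingBuffer` (bounded by both a time window and a byte cap, FIFO by log_time, never dropping the sole sample, non-destructive `snapshot()`, clearing `drain()`) can be illustrated with a small IO-free sketch. `Sample` and `RingBufferSketch` are hypothetical, and the sketch assumes samples are pushed in non-decreasing log_time order; the PR's type may handle out-of-order input differently.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Sample {
    pub log_time_ns: u64,
    pub payload: Vec<u8>,
}

/// Hypothetical sliding window bounded by both age and total payload bytes.
pub struct RingBufferSketch {
    window_ns: u64,
    max_bytes: usize,
    bytes: usize,
    samples: VecDeque<Sample>,
}

impl RingBufferSketch {
    pub fn new(window_ns: u64, max_bytes: usize) -> Self {
        Self { window_ns, max_bytes, bytes: 0, samples: VecDeque::new() }
    }

    /// Appends a sample, then evicts from the front until both bounds hold.
    /// Assumes `sample.log_time_ns` is >= every previously pushed log time.
    pub fn push(&mut self, sample: Sample) {
        let newest = sample.log_time_ns;
        self.bytes += sample.payload.len();
        self.samples.push_back(sample);
        // `len() > 1` keeps the sole sample even if it alone exceeds the cap.
        while self.samples.len() > 1 {
            let oldest = self.samples[0].log_time_ns;
            let too_old = newest.saturating_sub(oldest) > self.window_ns;
            let too_big = self.bytes > self.max_bytes;
            if !(too_old || too_big) {
                break;
            }
            if let Some(evicted) = self.samples.pop_front() {
                self.bytes -= evicted.payload.len();
            }
        }
    }

    /// Non-destructive copy of the current window, oldest first.
    pub fn snapshot(&self) -> Vec<Sample> {
        self.samples.iter().cloned().collect()
    }

    /// Removes and returns everything, leaving the buffer empty.
    pub fn drain(&mut self) -> Vec<Sample> {
        self.bytes = 0;
        self.samples.drain(..).collect()
    }

    pub fn len(&self) -> usize {
        self.samples.len()
    }

    pub fn bytes(&self) -> usize {
        self.bytes
    }
}
```

Keeping the structure free of IO and clocks is what lets the eviction rules be unit-tested directly; sealing to MCAP is a separate step that consumes a snapshot.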
storage::replay
- `read_recording_messages`: recover messages from a recording's MCAP chunks,
preserving topic/encoding (from channel metadata) and both timestamps.
- `plan` (pure): filter by --topics/--exclude (zenoh key-expr intersection) and
--start-time/--end-time trim, order by the selected timeline (publish_time, or
log_time with --use-log-time), rate-scale offsets, apply --remap.
- `ReplaySink` trait + `run_replay` driver: sleeps between messages to honor the
recorded timing, loops with --loop, cancels promptly via CancellationToken.
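The trim, ordering, and rate-scaling parts of the planner can be shown with the standard library alone. This sketch deliberately leaves out key-expression filtering and `--remap`, which depend on Zenoh. `Msg`, `Planned`, and `plan_sketch` are hypothetical names, and treating the start/end bounds as inclusive absolute nanosecond timestamps on the selected timeline is an assumption, not something the commit message states.

```rust
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct Msg {
    pub topic: String,
    pub publish_time_ns: u64,
    pub log_time_ns: u64,
}

/// A message scheduled at `offset` after the start of the replay.
#[derive(Debug, Clone, PartialEq)]
pub struct Planned {
    pub topic: String,
    pub offset: Duration,
}

/// Pure planning step: trim, order by the chosen timeline, scale offsets by rate.
pub fn plan_sketch(
    mut msgs: Vec<Msg>,
    use_log_time: bool,
    start_ns: Option<u64>,
    end_ns: Option<u64>,
    rate: f64,
) -> Result<Vec<Planned>, String> {
    if !(rate > 0.0) || !rate.is_finite() {
        return Err(format!("rate must be a positive finite number, got {rate}"));
    }
    let ts = |m: &Msg| if use_log_time { m.log_time_ns } else { m.publish_time_ns };
    // Trim to the (assumed inclusive) window on the selected timeline.
    msgs.retain(|m| {
        start_ns.map_or(true, |s| ts(m) >= s) && end_ns.map_or(true, |e| ts(m) <= e)
    });
    // Stable sort keeps the recorded order for equal timestamps.
    msgs.sort_by_key(|m| ts(m));
    let first = match msgs.first() {
        Some(m) => ts(m),
        None => return Ok(Vec::new()),
    };
    Ok(msgs
        .into_iter()
        .map(|m| {
            let delta_ns = ts(&m) - first;
            // rate 2.0 halves every gap; rate 0.5 doubles it.
            let scaled = Duration::from_nanos((delta_ns as f64 / rate) as u64);
            Planned { topic: m.topic, offset: scaled }
        })
        .collect())
}
```

A driver would then sleep until each `offset` before publishing, which keeps all timing decisions in the pure planner and leaves the driver with only pacing and cancellation.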
CLI: `bubbaloop storage replay <name>` with --rate/--loop/--start-time/
--end-time/--topics/--exclude/--remap/--use-log-time, publishing through a
Zenoh-backed sink (Ctrl-C cancels; the only way to stop --loop).
Deps: add `mcap = "0.24"` (§12). 19 new unit tests (eviction, byte cap,
seal→read round-trip with both timestamps, planner trim/filter/remap/rate,
driver ordering + loop-until-cancel). 122 storage tests pass; clippy-clean.
The `mcap-recorder` marketplace-node relocation (the other half of PR3) lives in
the separate bubbaloop-nodes-official repo, not present in this checkout.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review of d1adc50 surfaced no correctness bugs; these harden replay fidelity, scaling, and shared-mechanism reuse.
- Replay back-pressures instead of dropping: ZenohReplaySink now PUTs with CongestionControl::Block (not the put-default Drop), so a slow link/subscriber stalls the (already-paced) replay rather than silently losing samples (§4.5).
- Planner pre-parses --topics/--exclude key patterns once instead of re-parsing them per message (was O(messages × patterns) KeyExpr parses in plan()).
- run_replay re-checks cancellation after the inter-message sleep, before the publish, so Ctrl-C in that window doesn't emit one extra sample.
- Shared atomic write: extract storage::atomic_write (tmp + fsync + rename); manifest::save_atomic and ring_buffer chunk writes both delegate, so the crash-atomicity guarantee lives in one place (§3.4.5).
- Warn before replaying a >=1 GiB recording (v1 buffers it fully in RAM).
- Document that McapWriteConfig.chunk_size_bytes is an uncompressed-payload threshold (rosbag2 semantics), so on-disk .mcap sizes differ post-zstd.
122 storage tests pass; PR3 + manifest clippy-clean; s3 feature still builds.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
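The tmp + fsync + rename pattern named in the shared atomic write bullet can be sketched with `std::fs`. This is a generic illustration under stated assumptions, not the PR's `storage::atomic_write`: the function name, the `.tmp` suffix, and the best-effort directory fsync on Unix are all choices made here for the example.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Writes `bytes` to `path` so readers see either the old or the new content,
/// never a partial file. Hypothetical sketch; the temp-file naming is assumed.
pub fn atomic_write_sketch(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let file_name = path
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "path has no file name"))?;
    let dir = match path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    // The temp file lives in the same directory so the rename stays on one filesystem.
    let mut tmp_name = file_name.to_os_string();
    tmp_name.push(".tmp");
    let tmp_path = dir.join(tmp_name);

    let written = (|| -> io::Result<()> {
        let mut f = File::create(&tmp_path)?;
        f.write_all(bytes)?;
        // Flush file data and metadata to disk before the rename publishes it.
        f.sync_all()
    })();
    if let Err(e) = written.and_then(|_| fs::rename(&tmp_path, path)) {
        // Best-effort cleanup; the original file (if any) is untouched.
        let _ = fs::remove_file(&tmp_path);
        return Err(e);
    }
    // On Unix, also fsync the directory so the rename itself is durable.
    #[cfg(unix)]
    {
        File::open(dir)?.sync_all()?;
    }
    Ok(())
}
```

Routing both manifest saves and chunk writes through one helper like this means the crash-safety argument only has to be made once.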
This was referenced Jun 18, 2026
Stacked PR — review in order. Each merges into
Drafts #109–#111 currently show a cumulative diff against the spec branch; each collapses to its own phase automatically once the PR above it merges.
Stacked split of the storage subsystem — part 3 of 4. Builds on PR2. Scoped to spec §17 PR3 (in-tree half).
PR3 — replay + ring buffer
- `ring_buffer` (§3.3.5): a pure, IO-free sliding window bounded by BOTH a time window and a 256 MiB byte cap, FIFO eviction by log_time, never drops the sole sample; non-destructive `snapshot()`, `drain()`. `seal_samples` is the circular MCAP writer: it materializes a snapshot into `chunks/chunk-{idx:06}-{prefix8}.mcap` with rosbag2 wire defaults (786 432-byte chunks, zstd/fast, CRC on, §3.3.7), dual timestamps, §4.5 channel metadata (`zenoh.encoding` / `zenoh.topic` / `bubbaloop.schema_name`), streaming SHA-256 finalized into the name (§3.3.6), rolling a new chunk at the size target; atomic manifest write.
- `replay` (§4.5): `read_recording_messages` recovers messages from a recording's MCAP chunks, preserving topic/encoding + both timestamps; a pure `plan()` planner (filter by `--topics`/`--exclude` key-expr intersection, `--start-time`/`--end-time` trim, timeline order, rate-scale, `--remap`); `ReplaySink` trait + `run_replay` driver that paces to recorded timing, `--loop`, prompt cancel via `CancellationToken`.
- `bubbaloop storage replay <name>` with `--rate`/`--loop`/`--start-time`/`--end-time`/`--topics`/`--exclude`/`--remap`/`--use-log-time`, publishing through a Zenoh sink (Ctrl-C cancels).
New dep: `mcap = "0.24"` (§12). The `mcap-recorder` marketplace-node relocation (the other half of PR3) lives in the separate `bubbaloop-nodes-official` repo.
Verification
122 storage unit tests; clippy-clean; `s3` still builds. Includes PR3 review fixes (replay back-pressure via `CongestionControl::Block`, pre-parsed key patterns, cancel-after-sleep, extracted shared `atomic_write`, >=1 GiB warning).
🤖 Generated with Claude Code