From 3ad2968ae1dd5d19a931e40a83750a33856a745d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20I=C3=B1igo=20Blasco?= Date: Thu, 28 May 2026 18:02:39 +0200 Subject: [PATCH 1/5] refactor(object_store): lazy fetch returns PayloadView; payload slot uses std::any MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The lazy resolver registered with ObjectStore::pushLazy now returns a PayloadView (Span + BufferAnchor) instead of a std::vector. This lets producers hand off bytes they already hold behind a shared_ptr (e.g. a streaming buffer being reused across stores) without copying — the returned anchor extends the buffer's lifetime through the read. For producers whose only payload is a plain std::vector, the new sdk::makePayloadView() helper in pj_base wraps the vector into a shared_ptr (which becomes both owner and type-erased anchor) and returns a PayloadView pointing at its contents. This is also the contract resolveEntry() relies on: it recovers the shared_ptr from the anchor via static_pointer_cast. ObjectEntry::payload now stores a std::any instead of a std::variant. The dispatch in ObjectStore::resolveEntry uses std::any_cast for each branch. Rationale: the variant alternatives are not part of any public discriminator (callers go through pushOwned/pushLazy), so the variant tag was carrying no semantic value — std::any keeps the storage slot flexible without constraining future producer shapes. --- pj_base/include/pj_base/buffer_anchor.hpp | 16 +++++++++++++++ .../include/pj_datastore/object_store.hpp | 18 +++++++++++++---- pj_datastore/src/object_store.cpp | 20 +++++++++++++------ pj_datastore/src/plugin_data_host.cpp | 6 ++++-- pj_datastore/tests/object_store_test.cpp | 8 +++++--- 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/pj_base/include/pj_base/buffer_anchor.hpp b/pj_base/include/pj_base/buffer_anchor.hpp index ebaa973..bce49e7 100644 --- a/pj_base/include/pj_base/buffer_anchor.hpp +++ b/pj_base/include/pj_base/buffer_anchor.hpp @@ -15,6 +15,8 @@ #include #include +#include +#include #include "pj_base/span.hpp" @@ -49,5 +51,19 @@ struct PayloadView { anchor(std::move(buffer)) {} }; +/// Convenience helper for producers whose only payload is a plain +/// std::vector. Wraps it in a shared_ptr (which becomes both the +/// owner of the bytes and the type-erased anchor), and returns a PayloadView +/// whose Span points at that vector's contents. Satisfies the contract +/// expected by ObjectStore::resolveEntry on the lazy branch: the anchor is a +/// shared_ptr> recoverable via static_pointer_cast. +inline PayloadView makePayloadView(std::vector bytes) { + auto shared = std::make_shared>(std::move(bytes)); + return PayloadView{ + Span{shared->data(), shared->size()}, + BufferAnchor{shared}, + }; +} + } // namespace sdk } // namespace PJ diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index 8f3c6c0..bbaea23 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -2,6 +2,7 @@ // Copyright 2026 Davide Faconti // SPDX-License-Identifier: MPL-2.0 +#include #include #include #include @@ -13,9 +14,9 @@ #include #include #include -#include #include +#include "pj_base/buffer_anchor.hpp" #include "pj_base/expected.hpp" #include "pj_base/types.hpp" @@ -40,7 +41,10 @@ struct ObjectTopicDescriptor { struct ObjectEntry { Timestamp timestamp = 0; - std::variant>, std::function()>> payload; + // Holds either a shared_ptr> (eager owned payload) + // or a std::function (lazy resolver). resolveEntry + // discriminates via std::any_cast. + std::any payload; }; struct ResolvedObjectEntry { @@ -108,8 +112,14 @@ class ObjectStore { Expected pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); - Expected pushLazy( - ObjectTopicId id, Timestamp timestamp, std::function()> fetch); + // The fetch callable is invoked on every read. It returns a PayloadView + // (Span + anchor). When the producer already holds the bytes in memory + // behind a shared_ptr (e.g. a streaming buffer being handed off between + // stores), the closure can capture that shared_ptr and return a view + // backed by it — no copy. For producers that materialize bytes from disk + // or other sources, the closure allocates a fresh buffer and uses it as + // the anchor. + Expected pushLazy(ObjectTopicId id, Timestamp timestamp, std::function fetch); // --- Read --- diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index a75bc88..10795fa 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -95,7 +95,7 @@ Expected ObjectStore::pushOwned( } Expected ObjectStore::pushLazy( - ObjectTopicId id, Timestamp timestamp, std::function()> fetch) { + ObjectTopicId id, Timestamp timestamp, std::function fetch) { std::shared_lock store_lock(store_mutex_); auto* series = findSeries(id); if (series == nullptr) { @@ -306,11 +306,19 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { ResolvedObjectEntry resolved; resolved.timestamp = entry.timestamp; - if (const auto* owned = std::get_if>>(&entry.payload)) { + if (const auto* owned = std::any_cast>>(&entry.payload)) { resolved.data = *owned; - } else if (const auto* lazy = std::get_if()>>(&entry.payload)) { - auto bytes = (*lazy)(); - resolved.data = std::make_shared>(std::move(bytes)); + } else if (const auto* lazy = std::any_cast>(&entry.payload)) { + // Recover the shared_ptr> from the PayloadView's + // type-erased anchor. Contract: callers of pushLazy must construct the + // PayloadView with an anchor that is exactly a + // shared_ptr> — typically the same buffer the + // closure's Span points at. The static_pointer_cast reinterprets the + // shared_ptr back to that concrete type, sharing ownership + // without copying bytes. Producers that hold their bytes in any other + // container should use pushOwned instead. + auto pv = (*lazy)(); + resolved.data = std::static_pointer_cast>(pv.anchor); } return resolved; @@ -322,7 +330,7 @@ void ObjectStore::evictFront(ObjectSeries& series) { } const auto& front = series.entries.front(); - if (const auto* owned = std::get_if>>(&front.payload)) { + if (const auto* owned = std::any_cast>>(&front.payload)) { series.memory_bytes -= (*owned)->size(); } diff --git a/pj_datastore/src/plugin_data_host.cpp b/pj_datastore/src/plugin_data_host.cpp index 8aa5f7c..7a0652e 100644 --- a/pj_datastore/src/plugin_data_host.cpp +++ b/pj_datastore/src/plugin_data_host.cpp @@ -1392,7 +1392,9 @@ bool sourceObjectPushLazy( // the lambda; destructor runs exactly once when ObjectStore drops the // entry (retention, evict, removeTopic, clear, or store teardown). auto holder = std::make_shared(fetch_fn, fetch_ctx, fetch_ctx_destroy); - auto closure = [holder]() -> std::vector { return holder->invoke(); }; + // Plugins return raw bytes via the C ABI; wrap them as a PayloadView whose + // anchor is a shared_ptr>, per the pushLazy contract. + auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); }; auto result = impl->store.pushLazy(ObjectTopicId{topic.id}, timestamp_ns, std::move(closure)); if (!result) { impl->setError(result.error()); @@ -1631,7 +1633,7 @@ bool parserObjectPushLazy( } try { auto holder = std::make_shared(fetch_fn, fetch_ctx, fetch_ctx_destroy); - auto closure = [holder]() -> std::vector { return holder->invoke(); }; + auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); }; auto result = impl->store.pushLazy(impl->bound_topic, timestamp_ns, std::move(closure)); if (!result) { impl->setError(result.error()); diff --git a/pj_datastore/tests/object_store_test.cpp b/pj_datastore/tests/object_store_test.cpp index 0edeb2d..0c299c2 100644 --- a/pj_datastore/tests/object_store_test.cpp +++ b/pj_datastore/tests/object_store_test.cpp @@ -281,9 +281,9 @@ TEST(ObjectStoreTest, PushLazyResolves) { ObjectStore store; auto id = registerTestTopic(store); int call_count = 0; - store.pushLazy(id, 100, [&call_count]() -> std::vector { + store.pushLazy(id, 100, [&call_count]() -> sdk::PayloadView { ++call_count; - return {0xDE, 0xAD}; + return sdk::makePayloadView({0xDE, 0xAD}); }); EXPECT_EQ(call_count, 0); @@ -420,7 +420,9 @@ TEST(ObjectStoreTest, LazyEntriesZeroMemory) { ObjectStore store; auto id = registerTestTopic(store); for (int i = 0; i < 10; ++i) { - store.pushLazy(id, static_cast(i) * 100, []() { return makePayload(1000); }); + store.pushLazy(id, static_cast(i) * 100, []() -> sdk::PayloadView { + return sdk::makePayloadView(makePayload(1000)); + }); } EXPECT_EQ(store.memoryUsage(id), 0u); } From 4f6d935833dd6c00544533f059dfee462ac9d265 Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Thu, 28 May 2026 22:17:22 +0200 Subject: [PATCH 2/5] refactor(object_store): preserve BufferAnchor end-to-end; restore typed variant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on the prior commit's PayloadView-returning pushLazy signature, but fixes two anchor-discarding bugs and restores a typed payload variant. Bugs in the prior implementation: 1. resolveEntry did static_pointer_cast>(pv.anchor), forcing every producer's anchor to be exactly a shared_ptr. Any other anchor type (chunk cache, mmap, parquet column slice) was UB. This negated the entire point of BufferAnchor = shared_ptr. 2. resolveEntry discarded PayloadView::bytes (the Span) and returned the whole anchor's vector. Producers can no longer publish a sub-range of a larger backing buffer — which is the canonical zero-copy use case (one decompressed MCAP chunk anchored once, many message Spans into it). Both bugs traced to a single root cause: ResolvedObjectEntry::data was still shared_ptr>, so resolution had to materialize a vector somehow. Fixed by making ResolvedObjectEntry carry {BufferAnchor anchor, Span view} — type-erased anchor + producer-published span. No static_pointer_cast anywhere. Variant + named aliases: ObjectEntry::payload returns to std::variant; the prior std::any traded compile-time exhaustiveness for nothing (still two alternatives, still typed access via the cast). Two named aliases capture the two payload shapes at the type level: using SharedBuffer = std::shared_ptr>; using LazyCallback = std::function; Trampolines (plugin_data_host.cpp): ObjectBytesBox now holds {BufferAnchor, Span}; toolboxObjectGetBytes returns view.data()/view.size() — no shared_ptr in the read path. Misc consistency: - pushOwned/pushLazy now return Status (alias for Expected); the one other Expected use at service_registry_builder.hpp also switched. - PayloadView(shared_ptr) constructor takes shared_ptr to match SharedBuffer and makePayloadView. Tests: - All resolved->data accessors migrated to resolved->view / resolved->anchor across object_store_test, plugin_data_host_object_test, plugin_parser_object_write_test. - Two regression tests added: PushLazyPreservesAnchorType — anchor is shared_ptr (not vector). Bug-1 regression: ASAN would catch the prior static cast. PushLazyHonorsSpanSubview — anchor is a 100-byte vector with Span [20,30). Bug-2 regression: verifies view.data()==chunk+20. ./build.sh --debug && ./test.sh → 62/62 passing under ASAN. Co-Authored-By: Claude Opus 4.7 (1M context) --- pj_base/include/pj_base/buffer_anchor.hpp | 10 ++- .../include/pj_datastore/object_store.hpp | 30 ++++++-- pj_datastore/src/object_store.cpp | 28 +++---- pj_datastore/src/plugin_data_host.cpp | 18 +++-- pj_datastore/tests/object_store_test.cpp | 74 +++++++++++++++++-- .../tests/plugin_data_host_object_test.cpp | 13 ++-- .../tests/plugin_parser_object_write_test.cpp | 8 +- .../host/service_registry_builder.hpp | 2 +- 8 files changed, 130 insertions(+), 53 deletions(-) diff --git a/pj_base/include/pj_base/buffer_anchor.hpp b/pj_base/include/pj_base/buffer_anchor.hpp index bce49e7..6f41001 100644 --- a/pj_base/include/pj_base/buffer_anchor.hpp +++ b/pj_base/include/pj_base/buffer_anchor.hpp @@ -46,7 +46,7 @@ struct PayloadView { PayloadView(Span bytes_, BufferAnchor anchor_) : bytes(bytes_), anchor(std::move(anchor_)) {} - PayloadView(std::shared_ptr> buffer) + PayloadView(std::shared_ptr> buffer) : bytes(buffer ? Span(buffer->data(), buffer->size()) : Span()), anchor(std::move(buffer)) {} }; @@ -54,9 +54,11 @@ struct PayloadView { /// Convenience helper for producers whose only payload is a plain /// std::vector. Wraps it in a shared_ptr (which becomes both the /// owner of the bytes and the type-erased anchor), and returns a PayloadView -/// whose Span points at that vector's contents. Satisfies the contract -/// expected by ObjectStore::resolveEntry on the lazy branch: the anchor is a -/// shared_ptr> recoverable via static_pointer_cast. +/// whose Span points at that vector's contents. Use this when the producer +/// materializes bytes from a source that has no natural anchor (e.g. a +/// C-ABI fetch returning a raw byte buffer); when an upstream allocation +/// already exists (a chunk cache, an mmap region), construct the PayloadView +/// directly with that anchor to avoid the helper's copy. inline PayloadView makePayloadView(std::vector bytes) { auto shared = std::make_shared>(std::move(bytes)); return PayloadView{ diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index bbaea23..0e63282 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -2,7 +2,6 @@ // Copyright 2026 Davide Faconti // SPDX-License-Identifier: MPL-2.0 -#include #include #include #include @@ -14,10 +13,12 @@ #include #include #include +#include #include #include "pj_base/buffer_anchor.hpp" #include "pj_base/expected.hpp" +#include "pj_base/span.hpp" #include "pj_base/types.hpp" namespace PJ { @@ -39,17 +40,30 @@ struct ObjectTopicDescriptor { std::string metadata_json; }; +/// Eager payload alternative: shared ownership of an owned byte buffer. +/// memoryUsage() counts the buffer's size against the retention budget. +using SharedBuffer = std::shared_ptr>; + +/// Lazy payload alternative: idempotent, thread-safe callable returning a +/// PayloadView (Span + BufferAnchor). The anchor may be any shared_ptr-backed +/// storage (chunk cache, mmap, etc.); type erasure is preserved end-to-end +/// through the store. Lazy entries are not counted against the retention +/// budget — bytes are owned upstream, not by the store. +using LazyCallback = std::function; + struct ObjectEntry { Timestamp timestamp = 0; - // Holds either a shared_ptr> (eager owned payload) - // or a std::function (lazy resolver). resolveEntry - // discriminates via std::any_cast. - std::any payload; + std::variant payload; }; +/// Result of resolving an ObjectEntry. Holds the type-erased BufferAnchor +/// keeping the bytes alive and a Span over the producer-published range. +/// For eager entries the anchor wraps the SharedBuffer's underlying vector; +/// for lazy entries it is whatever the producer's PayloadView carries. struct ResolvedObjectEntry { Timestamp timestamp = 0; - std::shared_ptr> data; + sdk::BufferAnchor anchor; + Span view; }; struct RetentionBudget { @@ -110,7 +124,7 @@ class ObjectStore { // --- Write --- - Expected pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); + Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); // The fetch callable is invoked on every read. It returns a PayloadView // (Span + anchor). When the producer already holds the bytes in memory @@ -119,7 +133,7 @@ class ObjectStore { // backed by it — no copy. For producers that materialize bytes from disk // or other sources, the closure allocates a fresh buffer and uses it as // the anchor. - Expected pushLazy(ObjectTopicId id, Timestamp timestamp, std::function fetch); + Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch); // --- Read --- diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index 10795fa..09ca050 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -67,8 +67,7 @@ std::vector ObjectStore::listTopics(DatasetId dataset_id) const { // --- Write --- -Expected ObjectStore::pushOwned( - ObjectTopicId id, Timestamp timestamp, std::vector payload) { +Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload) { std::shared_lock store_lock(store_mutex_); auto* series = findSeries(id); if (series == nullptr) { @@ -94,8 +93,7 @@ Expected ObjectStore::pushOwned( return {}; } -Expected ObjectStore::pushLazy( - ObjectTopicId id, Timestamp timestamp, std::function fetch) { +Status ObjectStore::pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch) { std::shared_lock store_lock(store_mutex_); auto* series = findSeries(id); if (series == nullptr) { @@ -306,19 +304,15 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { ResolvedObjectEntry resolved; resolved.timestamp = entry.timestamp; - if (const auto* owned = std::any_cast>>(&entry.payload)) { - resolved.data = *owned; - } else if (const auto* lazy = std::any_cast>(&entry.payload)) { - // Recover the shared_ptr> from the PayloadView's - // type-erased anchor. Contract: callers of pushLazy must construct the - // PayloadView with an anchor that is exactly a - // shared_ptr> — typically the same buffer the - // closure's Span points at. The static_pointer_cast reinterprets the - // shared_ptr back to that concrete type, sharing ownership - // without copying bytes. Producers that hold their bytes in any other - // container should use pushOwned instead. + if (const auto* owned = std::get_if(&entry.payload)) { + if (*owned) { + resolved.anchor = sdk::BufferAnchor{*owned}; + resolved.view = Span{(*owned)->data(), (*owned)->size()}; + } + } else if (const auto* lazy = std::get_if(&entry.payload)) { auto pv = (*lazy)(); - resolved.data = std::static_pointer_cast>(pv.anchor); + resolved.anchor = std::move(pv.anchor); + resolved.view = pv.bytes; } return resolved; @@ -330,7 +324,7 @@ void ObjectStore::evictFront(ObjectSeries& series) { } const auto& front = series.entries.front(); - if (const auto* owned = std::any_cast>>(&front.payload)) { + if (const auto* owned = std::get_if(&front.payload); owned != nullptr && *owned) { series.memory_bytes -= (*owned)->size(); } diff --git a/pj_datastore/src/plugin_data_host.cpp b/pj_datastore/src/plugin_data_host.cpp index 7a0652e..65ffa74 100644 --- a/pj_datastore/src/plugin_data_host.cpp +++ b/pj_datastore/src/plugin_data_host.cpp @@ -1435,10 +1435,12 @@ void sourceObjectSetRetentionBudget( // Toolbox object read host trampolines // --------------------------------------------------------------------------- -/// Box holding the shared_ptr that keeps ObjectStore bytes alive. One -/// allocated per successful read_latest_at; freed by release_bytes. +/// Box holding the type-erased anchor that keeps ObjectStore bytes alive, +/// plus the Span over the producer-published range. One allocated per +/// successful read_latest_at; freed by release_bytes. struct ObjectBytesBox { - std::shared_ptr> bytes; + sdk::BufferAnchor anchor; + Span view; }; PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept { @@ -1508,12 +1510,12 @@ bool toolboxObjectReadLatestAt( *out_handle = nullptr; try { auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns); - if (!entry.has_value() || entry->data == nullptr) { + if (!entry.has_value() || entry->anchor == nullptr) { impl->setError("no entry at-or-before timestamp"); propagateError(out_error, impl->last_error.c_str()); return false; } - auto* box = new ObjectBytesBox{std::move(entry->data)}; + auto* box = new ObjectBytesBox{std::move(entry->anchor), entry->view}; *out_handle = reinterpret_cast(box); if (out_timestamp != nullptr) { *out_timestamp = entry->timestamp; @@ -1542,14 +1544,14 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_ return; } auto* box = reinterpret_cast(handle); - if (!box->bytes) { + if (box->view.empty()) { return; } if (out_data != nullptr) { - *out_data = box->bytes->data(); + *out_data = box->view.data(); } if (out_size != nullptr) { - *out_size = box->bytes->size(); + *out_size = box->view.size(); } } diff --git a/pj_datastore/tests/object_store_test.cpp b/pj_datastore/tests/object_store_test.cpp index 0c299c2..c1997a7 100644 --- a/pj_datastore/tests/object_store_test.cpp +++ b/pj_datastore/tests/object_store_test.cpp @@ -5,8 +5,10 @@ #include +#include #include #include +#include #include #include #include @@ -148,7 +150,7 @@ TEST(ObjectStoreTest, LatestAtExact) { auto r = store.latestAt(id, 200); ASSERT_TRUE(r.has_value()); EXPECT_EQ(r->timestamp, 200); - EXPECT_EQ((*r->data)[0], 0x02); + EXPECT_EQ(r->view[0], 0x02); } TEST(ObjectStoreTest, LatestAtBetween) { @@ -290,8 +292,68 @@ TEST(ObjectStoreTest, PushLazyResolves) { auto r = store.latestAt(id, 100); ASSERT_TRUE(r.has_value()); EXPECT_EQ(call_count, 1); - EXPECT_EQ(r->data->size(), 2u); - EXPECT_EQ((*r->data)[0], 0xDE); + EXPECT_EQ(r->view.size(), 2u); + EXPECT_EQ(r->view[0], 0xDE); +} + +// Regression: the lazy path must preserve the BufferAnchor's concrete type. +// Anchor here is a shared_ptr — NOT a shared_ptr. If +// resolveEntry static_pointer_cast'd to vector (the prior implementation), +// view would point at garbage and ASAN would flag it. We assert that +// (a) the bytes the producer published survive resolution unchanged, and +// (b) the anchor's refcount stays > 0 through the resolve (the store does +// not silently swap it for a different anchor type). +TEST(ObjectStoreTest, PushLazyPreservesAnchorType) { + struct TestBuffer { + std::array bytes{0x11, 0x22, 0x33, 0x44}; + }; + ObjectStore store; + auto id = registerTestTopic(store); + + auto buffer = std::make_shared(); + std::weak_ptr weak_buffer = buffer; + + store.pushLazy(id, 100, [buffer]() -> sdk::PayloadView { + return sdk::PayloadView{ + Span{buffer->bytes.data(), buffer->bytes.size()}, + sdk::BufferAnchor{buffer}, + }; + }); + + auto r = store.latestAt(id, 100); + ASSERT_TRUE(r.has_value()); + ASSERT_EQ(r->view.size(), 4u); + EXPECT_EQ(r->view[0], 0x11); + EXPECT_EQ(r->view[3], 0x44); + EXPECT_FALSE(weak_buffer.expired()); // anchor still holds the buffer alive +} + +// Regression: PayloadView::bytes is the *producer-chosen sub-range* of the +// anchor's storage. resolveEntry must propagate that Span verbatim — not the +// anchor's full extent. The prior implementation ignored the Span and +// returned the anchor's whole vector. +TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) { + ObjectStore store; + auto id = registerTestTopic(store); + + auto chunk = std::make_shared>(100); + for (size_t i = 0; i < chunk->size(); ++i) { + (*chunk)[i] = static_cast(i); + } + + store.pushLazy(id, 100, [chunk]() -> sdk::PayloadView { + return sdk::PayloadView{ + Span{chunk->data() + 20, 10}, // bytes [20, 30) + sdk::BufferAnchor{chunk}, + }; + }); + + auto r = store.latestAt(id, 100); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(r->view.size(), 10u); + EXPECT_EQ(r->view.data(), chunk->data() + 20); + EXPECT_EQ(r->view[0], 20); + EXPECT_EQ(r->view[9], 29); } // ========================================================================= @@ -328,13 +390,13 @@ TEST(ObjectStoreTest, HandleSurvivesEviction) { auto handle = store.latestAt(id, 100); ASSERT_TRUE(handle.has_value()); - EXPECT_EQ((*handle->data)[0], 0xAA); + EXPECT_EQ(handle->view[0], 0xAA); store.evictBefore(id, 150); EXPECT_EQ(store.entryCount(id), 1u); - EXPECT_EQ(handle->data->size(), 4u); - EXPECT_EQ((*handle->data)[0], 0xAA); + EXPECT_EQ(handle->view.size(), 4u); + EXPECT_EQ(handle->view[0], 0xAA); } // ========================================================================= diff --git a/pj_datastore/tests/plugin_data_host_object_test.cpp b/pj_datastore/tests/plugin_data_host_object_test.cpp index c23f324..e4987d7 100644 --- a/pj_datastore/tests/plugin_data_host_object_test.cpp +++ b/pj_datastore/tests/plugin_data_host_object_test.cpp @@ -3,6 +3,7 @@ #include +#include #include #include #include @@ -63,9 +64,9 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) { EXPECT_EQ(f.store.entryCount(store_id), 2U); auto resolved = f.store.latestAt(store_id, 2000); ASSERT_TRUE(resolved.has_value()); - ASSERT_NE(resolved->data, nullptr); - EXPECT_EQ(resolved->data->size(), payload.size()); - EXPECT_EQ(*resolved->data, payload); + ASSERT_NE(resolved->anchor, nullptr); + EXPECT_EQ(resolved->view.size(), payload.size()); + EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), payload.begin(), payload.end())); } TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) { @@ -93,8 +94,8 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) { // Each read invokes the fetch closure. auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42); ASSERT_TRUE(first.has_value()); - ASSERT_NE(first->data, nullptr); - EXPECT_EQ(*first->data, shared->payload); + ASSERT_NE(first->anchor, nullptr); + EXPECT_TRUE(std::equal(first->view.begin(), first->view.end(), shared->payload.begin(), shared->payload.end())); EXPECT_GE(shared->fetch_calls.load(), 1); auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42); @@ -149,7 +150,7 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction) // Fetch once — the callback runs but the ctx stays alive. auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100); ASSERT_TRUE(resolved.has_value()); - EXPECT_EQ(*resolved->data, ctx->payload); + EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), ctx->payload.begin(), ctx->payload.end())); EXPECT_EQ(ctx->destroy_count.load(), 0); // Evict — destroy_fn runs exactly once. diff --git a/pj_datastore/tests/plugin_parser_object_write_test.cpp b/pj_datastore/tests/plugin_parser_object_write_test.cpp index 2f0e135..d856b5a 100644 --- a/pj_datastore/tests/plugin_parser_object_write_test.cpp +++ b/pj_datastore/tests/plugin_parser_object_write_test.cpp @@ -9,6 +9,7 @@ #include +#include #include #include #include @@ -140,9 +141,9 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) { // Object-store side: bytes landed. auto resolved = store.latestAt(ObjectTopicId{obj_topic.id}, 100); ASSERT_TRUE(resolved.has_value()); - ASSERT_NE(resolved->data, nullptr); + ASSERT_NE(resolved->anchor, nullptr); const std::vector expected{0xAA, 0xBB, 0xCC}; - EXPECT_EQ(*resolved->data, expected); + EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end())); // (Scalar side requires flushing + a read path; Phase-3 scope is proving // both hosts were resolved and invoked. Scalar writes go into DataEngine @@ -202,7 +203,8 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) { auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10); ASSERT_TRUE(resolved.has_value()); - EXPECT_EQ(*resolved->data, (std::vector{0xAA, 0xBB})); + const std::vector expected{0xAA, 0xBB}; + EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end())); EXPECT_GE(fetch_calls, 1); } diff --git a/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp b/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp index d372ed1..e27ceca 100644 --- a/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp +++ b/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp @@ -43,7 +43,7 @@ class ServiceRegistryBuilder { /// not take ownership of ctx/vtable. /// @return `Expected`: ok on success, error string on duplicate or /// null fat-pointer field. - [[nodiscard]] ::PJ::Expected tryRegisterService( + [[nodiscard]] ::PJ::Status tryRegisterService( std::string_view name, uint32_t protocol_version, PJ_service_t service) { const std::string service_name(name); if (service.ctx == nullptr || service.vtable == nullptr) { From a4127b003412c0274f507c524db896a5cd07b6f3 Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Thu, 28 May 2026 22:26:40 +0200 Subject: [PATCH 3/5] refactor(object_store): consolidate ResolvedObjectEntry/ObjectBytesBox on PayloadView MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ResolvedObjectEntry and ObjectBytesBox both held an inline {anchor, view} pair — the same shape as sdk::PayloadView. Collapse both onto a single PayloadView field; one named point of truth for "bytes + their lifetime anchor" across producer, store, and consumer-handle layers. Also tightens the doc comments touched in the prior commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- pj_base/include/pj_base/buffer_anchor.hpp | 12 ++---- .../include/pj_datastore/object_store.hpp | 27 ++++-------- pj_datastore/src/object_store.cpp | 10 ++--- pj_datastore/src/plugin_data_host.cpp | 18 ++++---- pj_datastore/tests/object_store_test.cpp | 42 ++++++++----------- .../tests/plugin_data_host_object_test.cpp | 17 +++++--- .../tests/plugin_parser_object_write_test.cpp | 8 ++-- 7 files changed, 58 insertions(+), 76 deletions(-) diff --git a/pj_base/include/pj_base/buffer_anchor.hpp b/pj_base/include/pj_base/buffer_anchor.hpp index 6f41001..17441d0 100644 --- a/pj_base/include/pj_base/buffer_anchor.hpp +++ b/pj_base/include/pj_base/buffer_anchor.hpp @@ -51,14 +51,10 @@ struct PayloadView { anchor(std::move(buffer)) {} }; -/// Convenience helper for producers whose only payload is a plain -/// std::vector. Wraps it in a shared_ptr (which becomes both the -/// owner of the bytes and the type-erased anchor), and returns a PayloadView -/// whose Span points at that vector's contents. Use this when the producer -/// materializes bytes from a source that has no natural anchor (e.g. a -/// C-ABI fetch returning a raw byte buffer); when an upstream allocation -/// already exists (a chunk cache, an mmap region), construct the PayloadView -/// directly with that anchor to avoid the helper's copy. +/// Wrap a fresh vector as both anchor and Span. Use when the producer has +/// no natural anchor (e.g. raw bytes from a C-ABI fetch); when an upstream +/// allocation already exists, construct PayloadView directly to avoid this +/// helper's copy. inline PayloadView makePayloadView(std::vector bytes) { auto shared = std::make_shared>(std::move(bytes)); return PayloadView{ diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index 0e63282..987996a 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -40,15 +40,11 @@ struct ObjectTopicDescriptor { std::string metadata_json; }; -/// Eager payload alternative: shared ownership of an owned byte buffer. -/// memoryUsage() counts the buffer's size against the retention budget. +/// Eager payload: store-owned bytes, counted against the retention budget. using SharedBuffer = std::shared_ptr>; -/// Lazy payload alternative: idempotent, thread-safe callable returning a -/// PayloadView (Span + BufferAnchor). The anchor may be any shared_ptr-backed -/// storage (chunk cache, mmap, etc.); type erasure is preserved end-to-end -/// through the store. Lazy entries are not counted against the retention -/// budget — bytes are owned upstream, not by the store. +/// Lazy payload: idempotent, thread-safe fetcher returning bytes + anchor. +/// Invoked on every read; bytes are not counted against the retention budget. using LazyCallback = std::function; struct ObjectEntry { @@ -56,14 +52,9 @@ struct ObjectEntry { std::variant payload; }; -/// Result of resolving an ObjectEntry. Holds the type-erased BufferAnchor -/// keeping the bytes alive and a Span over the producer-published range. -/// For eager entries the anchor wraps the SharedBuffer's underlying vector; -/// for lazy entries it is whatever the producer's PayloadView carries. struct ResolvedObjectEntry { Timestamp timestamp = 0; - sdk::BufferAnchor anchor; - Span view; + sdk::PayloadView payload; }; struct RetentionBudget { @@ -126,13 +117,9 @@ class ObjectStore { Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); - // The fetch callable is invoked on every read. It returns a PayloadView - // (Span + anchor). When the producer already holds the bytes in memory - // behind a shared_ptr (e.g. a streaming buffer being handed off between - // stores), the closure can capture that shared_ptr and return a view - // backed by it — no copy. For producers that materialize bytes from disk - // or other sources, the closure allocates a fresh buffer and uses it as - // the anchor. + // Fetcher runs on every read. Producers anchor on whatever owns the bytes + // (chunk cache, mmap, fresh allocation); the store never copies — it just + // retains the anchor through PayloadView. Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch); // --- Read --- diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index 09ca050..2cae153 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -306,13 +306,13 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { if (const auto* owned = std::get_if(&entry.payload)) { if (*owned) { - resolved.anchor = sdk::BufferAnchor{*owned}; - resolved.view = Span{(*owned)->data(), (*owned)->size()}; + resolved.payload = sdk::PayloadView{ + Span{(*owned)->data(), (*owned)->size()}, + sdk::BufferAnchor{*owned}, + }; } } else if (const auto* lazy = std::get_if(&entry.payload)) { - auto pv = (*lazy)(); - resolved.anchor = std::move(pv.anchor); - resolved.view = pv.bytes; + resolved.payload = (*lazy)(); } return resolved; diff --git a/pj_datastore/src/plugin_data_host.cpp b/pj_datastore/src/plugin_data_host.cpp index 65ffa74..1a1611c 100644 --- a/pj_datastore/src/plugin_data_host.cpp +++ b/pj_datastore/src/plugin_data_host.cpp @@ -1435,12 +1435,10 @@ void sourceObjectSetRetentionBudget( // Toolbox object read host trampolines // --------------------------------------------------------------------------- -/// Box holding the type-erased anchor that keeps ObjectStore bytes alive, -/// plus the Span over the producer-published range. One allocated per -/// successful read_latest_at; freed by release_bytes. +/// Heap holder for the PayloadView backing PJ_object_bytes_handle_t. +/// Allocated by read_latest_at; freed by release_bytes. struct ObjectBytesBox { - sdk::BufferAnchor anchor; - Span view; + sdk::PayloadView payload; }; PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept { @@ -1510,12 +1508,12 @@ bool toolboxObjectReadLatestAt( *out_handle = nullptr; try { auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns); - if (!entry.has_value() || entry->anchor == nullptr) { + if (!entry.has_value() || entry->payload.anchor == nullptr) { impl->setError("no entry at-or-before timestamp"); propagateError(out_error, impl->last_error.c_str()); return false; } - auto* box = new ObjectBytesBox{std::move(entry->anchor), entry->view}; + auto* box = new ObjectBytesBox{std::move(entry->payload)}; *out_handle = reinterpret_cast(box); if (out_timestamp != nullptr) { *out_timestamp = entry->timestamp; @@ -1544,14 +1542,14 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_ return; } auto* box = reinterpret_cast(handle); - if (box->view.empty()) { + if (box->payload.bytes.empty()) { return; } if (out_data != nullptr) { - *out_data = box->view.data(); + *out_data = box->payload.bytes.data(); } if (out_size != nullptr) { - *out_size = box->view.size(); + *out_size = box->payload.bytes.size(); } } diff --git a/pj_datastore/tests/object_store_test.cpp b/pj_datastore/tests/object_store_test.cpp index c1997a7..6c46500 100644 --- a/pj_datastore/tests/object_store_test.cpp +++ b/pj_datastore/tests/object_store_test.cpp @@ -150,7 +150,7 @@ TEST(ObjectStoreTest, LatestAtExact) { auto r = store.latestAt(id, 200); ASSERT_TRUE(r.has_value()); EXPECT_EQ(r->timestamp, 200); - EXPECT_EQ(r->view[0], 0x02); + EXPECT_EQ(r->payload.bytes[0], 0x02); } TEST(ObjectStoreTest, LatestAtBetween) { @@ -292,17 +292,13 @@ TEST(ObjectStoreTest, PushLazyResolves) { auto r = store.latestAt(id, 100); ASSERT_TRUE(r.has_value()); EXPECT_EQ(call_count, 1); - EXPECT_EQ(r->view.size(), 2u); - EXPECT_EQ(r->view[0], 0xDE); + EXPECT_EQ(r->payload.bytes.size(), 2u); + EXPECT_EQ(r->payload.bytes[0], 0xDE); } -// Regression: the lazy path must preserve the BufferAnchor's concrete type. -// Anchor here is a shared_ptr — NOT a shared_ptr. If -// resolveEntry static_pointer_cast'd to vector (the prior implementation), -// view would point at garbage and ASAN would flag it. We assert that -// (a) the bytes the producer published survive resolution unchanged, and -// (b) the anchor's refcount stays > 0 through the resolve (the store does -// not silently swap it for a different anchor type). +// Regression: anchor type-erasure must survive resolveEntry. The anchor here +// is a shared_ptr (not vector); a prior static_pointer_cast to +// vector would UB. TEST(ObjectStoreTest, PushLazyPreservesAnchorType) { struct TestBuffer { std::array bytes{0x11, 0x22, 0x33, 0x44}; @@ -322,16 +318,14 @@ TEST(ObjectStoreTest, PushLazyPreservesAnchorType) { auto r = store.latestAt(id, 100); ASSERT_TRUE(r.has_value()); - ASSERT_EQ(r->view.size(), 4u); - EXPECT_EQ(r->view[0], 0x11); - EXPECT_EQ(r->view[3], 0x44); + ASSERT_EQ(r->payload.bytes.size(), 4u); + EXPECT_EQ(r->payload.bytes[0], 0x11); + EXPECT_EQ(r->payload.bytes[3], 0x44); EXPECT_FALSE(weak_buffer.expired()); // anchor still holds the buffer alive } -// Regression: PayloadView::bytes is the *producer-chosen sub-range* of the -// anchor's storage. resolveEntry must propagate that Span verbatim — not the -// anchor's full extent. The prior implementation ignored the Span and -// returned the anchor's whole vector. +// Regression: the producer's Span is a sub-range of the anchor's storage. +// resolveEntry must propagate it verbatim — not the anchor's full extent. TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) { ObjectStore store; auto id = registerTestTopic(store); @@ -350,10 +344,10 @@ TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) { auto r = store.latestAt(id, 100); ASSERT_TRUE(r.has_value()); - EXPECT_EQ(r->view.size(), 10u); - EXPECT_EQ(r->view.data(), chunk->data() + 20); - EXPECT_EQ(r->view[0], 20); - EXPECT_EQ(r->view[9], 29); + EXPECT_EQ(r->payload.bytes.size(), 10u); + EXPECT_EQ(r->payload.bytes.data(), chunk->data() + 20); + EXPECT_EQ(r->payload.bytes[0], 20); + EXPECT_EQ(r->payload.bytes[9], 29); } // ========================================================================= @@ -390,13 +384,13 @@ TEST(ObjectStoreTest, HandleSurvivesEviction) { auto handle = store.latestAt(id, 100); ASSERT_TRUE(handle.has_value()); - EXPECT_EQ(handle->view[0], 0xAA); + EXPECT_EQ(handle->payload.bytes[0], 0xAA); store.evictBefore(id, 150); EXPECT_EQ(store.entryCount(id), 1u); - EXPECT_EQ(handle->view.size(), 4u); - EXPECT_EQ(handle->view[0], 0xAA); + EXPECT_EQ(handle->payload.bytes.size(), 4u); + EXPECT_EQ(handle->payload.bytes[0], 0xAA); } // ========================================================================= diff --git a/pj_datastore/tests/plugin_data_host_object_test.cpp b/pj_datastore/tests/plugin_data_host_object_test.cpp index e4987d7..63085a9 100644 --- a/pj_datastore/tests/plugin_data_host_object_test.cpp +++ b/pj_datastore/tests/plugin_data_host_object_test.cpp @@ -64,9 +64,10 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) { EXPECT_EQ(f.store.entryCount(store_id), 2U); auto resolved = f.store.latestAt(store_id, 2000); ASSERT_TRUE(resolved.has_value()); - ASSERT_NE(resolved->anchor, nullptr); - EXPECT_EQ(resolved->view.size(), payload.size()); - EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), payload.begin(), payload.end())); + ASSERT_NE(resolved->payload.anchor, nullptr); + EXPECT_EQ(resolved->payload.bytes.size(), payload.size()); + EXPECT_TRUE( + std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), payload.begin(), payload.end())); } TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) { @@ -94,8 +95,10 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) { // Each read invokes the fetch closure. auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42); ASSERT_TRUE(first.has_value()); - ASSERT_NE(first->anchor, nullptr); - EXPECT_TRUE(std::equal(first->view.begin(), first->view.end(), shared->payload.begin(), shared->payload.end())); + ASSERT_NE(first->payload.anchor, nullptr); + EXPECT_TRUE( + std::equal( + first->payload.bytes.begin(), first->payload.bytes.end(), shared->payload.begin(), shared->payload.end())); EXPECT_GE(shared->fetch_calls.load(), 1); auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42); @@ -150,7 +153,9 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction) // Fetch once — the callback runs but the ctx stays alive. auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100); ASSERT_TRUE(resolved.has_value()); - EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), ctx->payload.begin(), ctx->payload.end())); + EXPECT_TRUE( + std::equal( + resolved->payload.bytes.begin(), resolved->payload.bytes.end(), ctx->payload.begin(), ctx->payload.end())); EXPECT_EQ(ctx->destroy_count.load(), 0); // Evict — destroy_fn runs exactly once. diff --git a/pj_datastore/tests/plugin_parser_object_write_test.cpp b/pj_datastore/tests/plugin_parser_object_write_test.cpp index d856b5a..19068ef 100644 --- a/pj_datastore/tests/plugin_parser_object_write_test.cpp +++ b/pj_datastore/tests/plugin_parser_object_write_test.cpp @@ -141,9 +141,10 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) { // Object-store side: bytes landed. auto resolved = store.latestAt(ObjectTopicId{obj_topic.id}, 100); ASSERT_TRUE(resolved.has_value()); - ASSERT_NE(resolved->anchor, nullptr); + ASSERT_NE(resolved->payload.anchor, nullptr); const std::vector expected{0xAA, 0xBB, 0xCC}; - EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end())); + EXPECT_TRUE( + std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end())); // (Scalar side requires flushing + a read path; Phase-3 scope is proving // both hosts were resolved and invoked. Scalar writes go into DataEngine @@ -204,7 +205,8 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) { auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10); ASSERT_TRUE(resolved.has_value()); const std::vector expected{0xAA, 0xBB}; - EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end())); + EXPECT_TRUE( + std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end())); EXPECT_GE(fetch_calls, 1); } From 24ecc1e863f666180c8d24ed6661070bc342db62 Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Thu, 28 May 2026 22:30:35 +0200 Subject: [PATCH 4/5] chore: bump version to 0.5.0 Source-incompatible changes in pj_datastore/object_store.hpp: - ObjectEntry::payload variant tag types - pushLazy fetch signature (returns PayloadView) - ResolvedObjectEntry field layout (carries a PayloadView) Per the pre-1.0 convention, this warrants a MINOR bump. Co-Authored-By: Claude Opus 4.7 (1M context) --- CMakeLists.txt | 2 +- conanfile.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index cb1b5a6..81261a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -202,7 +202,7 @@ endif() if(PJ_INSTALL_SDK) include(CMakePackageConfigHelpers) - set(PJ_PACKAGE_VERSION "0.4.2") + set(PJ_PACKAGE_VERSION "0.5.0") set(PJ_PACKAGE_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/plotjuggler_core) install(EXPORT plotjuggler_coreTargets diff --git a/conanfile.py b/conanfile.py index e20dde6..b442f98 100644 --- a/conanfile.py +++ b/conanfile.py @@ -7,7 +7,7 @@ plugin_sdk — umbrella for plugin authors (base + dialog SDK + parser SDK) plugin_host — umbrella for host loaders (data_source/parser/toolbox/dialog) -A consuming Conan recipe declares e.g. `plotjuggler_core/0.4.2` and then: +A consuming Conan recipe declares e.g. `plotjuggler_core/0.5.0` and then: find_package(plotjuggler_core REQUIRED COMPONENTS plugin_sdk) target_link_libraries(my_plugin PRIVATE plotjuggler_core::plugin_sdk) @@ -27,7 +27,7 @@ class PlotjugglerCoreConan(ConanFile): name = "plotjuggler_core" - version = "0.4.2" + version = "0.5.0" # Apache-2.0 covers pj_base + pj_plugins (the plugin-facing SDK); # MPL-2.0 covers pj_datastore (the storage engine). See LICENSE. license = "Apache-2.0 AND MPL-2.0" From afe0997693e85508e31b65d04a8a39febe12ab1c Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Fri, 29 May 2026 09:46:36 +0200 Subject: [PATCH 5/5] docs(object_store): sync design doc with PayloadView refactor OBJECT_STORE_DESIGN.md still described the pre-refactor API: a vector-returning lazy callback and a ResolvedObjectEntry carrying a shared_ptr> data field. Bring it in line with the code already on this branch: - ObjectEntry::payload is std::variant; document the two named aliases. - pushLazy's callable returns sdk::PayloadView (Span + type-erased BufferAnchor), enabling zero-copy sub-range views; the store never copies on resolve. - ResolvedObjectEntry carries an sdk::PayloadView; bytes/anchor replace the old data field, and an empty anchor means "no bytes". Docs-only; no behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) --- pj_datastore/docs/OBJECT_STORE_DESIGN.md | 27 ++++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pj_datastore/docs/OBJECT_STORE_DESIGN.md b/pj_datastore/docs/OBJECT_STORE_DESIGN.md index d857869..b34bf65 100644 --- a/pj_datastore/docs/OBJECT_STORE_DESIGN.md +++ b/pj_datastore/docs/OBJECT_STORE_DESIGN.md @@ -30,11 +30,14 @@ struct ObjectTopicDescriptor { std::string metadata_json; }; +// Eager payload: store-owned bytes, counted against the retention budget. +using SharedBuffer = std::shared_ptr>; +// Lazy payload: idempotent fetcher returning a view + ownership anchor. +using LazyCallback = std::function; + struct ObjectEntry { Timestamp timestamp; - std::variant< - std::shared_ptr>, - std::function()>> payload; + std::variant payload; }; struct RetentionBudget { @@ -55,9 +58,13 @@ order. Equal timestamps are allowed. Out-of-order writes fail. shared buffer owned by the store. Owned entries contribute to `memoryUsage()`. `pushLazy(id, timestamp, fetch)` stores a callable instead of bytes. The callable -is invoked on each read and returns a fresh byte vector. Lazy entries do not -contribute to `memoryUsage()` because the store does not retain the bytes between -reads. +is invoked on each read and returns a `sdk::PayloadView` — a `Span` +paired with a type-erased `BufferAnchor` that keeps those bytes alive for as long +as the resolved view is held. The producer anchors on whatever already owns the +bytes (a decompressed chunk, an mmap, or a fresh allocation via +`sdk::makePayloadView`), so the store never copies on resolve. Lazy entries do not +contribute to `memoryUsage()` because the store retains the callable, not the +fetched bytes. Both write paths apply the topic retention budget after the new entry is inserted. @@ -80,12 +87,14 @@ Resolved entries contain: ```cpp struct ResolvedObjectEntry { Timestamp timestamp; - std::shared_ptr> data; + sdk::PayloadView payload; // { Span bytes; BufferAnchor anchor; } }; ``` -The shared pointer keeps resolved bytes alive independently of later store -mutation. +`payload.bytes` is the resolved view; `payload.anchor` keeps those bytes alive +independently of later store mutation (eviction, removal, or `clear()`). For an +owned entry the anchor is the store's `SharedBuffer`; for a lazy entry it is +whatever the fetcher anchored on. An empty `anchor` means "no bytes". ## Retention