diff --git a/CMakeLists.txt b/CMakeLists.txt index cb1b5a6..81261a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -202,7 +202,7 @@ endif() if(PJ_INSTALL_SDK) include(CMakePackageConfigHelpers) - set(PJ_PACKAGE_VERSION "0.4.2") + set(PJ_PACKAGE_VERSION "0.5.0") set(PJ_PACKAGE_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/plotjuggler_core) install(EXPORT plotjuggler_coreTargets diff --git a/conanfile.py b/conanfile.py index e20dde6..b442f98 100644 --- a/conanfile.py +++ b/conanfile.py @@ -7,7 +7,7 @@ plugin_sdk — umbrella for plugin authors (base + dialog SDK + parser SDK) plugin_host — umbrella for host loaders (data_source/parser/toolbox/dialog) -A consuming Conan recipe declares e.g. `plotjuggler_core/0.4.2` and then: +A consuming Conan recipe declares e.g. `plotjuggler_core/0.5.0` and then: find_package(plotjuggler_core REQUIRED COMPONENTS plugin_sdk) target_link_libraries(my_plugin PRIVATE plotjuggler_core::plugin_sdk) @@ -27,7 +27,7 @@ class PlotjugglerCoreConan(ConanFile): name = "plotjuggler_core" - version = "0.4.2" + version = "0.5.0" # Apache-2.0 covers pj_base + pj_plugins (the plugin-facing SDK); # MPL-2.0 covers pj_datastore (the storage engine). See LICENSE. license = "Apache-2.0 AND MPL-2.0" diff --git a/pj_base/include/pj_base/buffer_anchor.hpp b/pj_base/include/pj_base/buffer_anchor.hpp index ebaa973..17441d0 100644 --- a/pj_base/include/pj_base/buffer_anchor.hpp +++ b/pj_base/include/pj_base/buffer_anchor.hpp @@ -15,6 +15,8 @@ #include #include +#include +#include #include "pj_base/span.hpp" @@ -44,10 +46,22 @@ struct PayloadView { PayloadView(Span bytes_, BufferAnchor anchor_) : bytes(bytes_), anchor(std::move(anchor_)) {} - PayloadView(std::shared_ptr> buffer) + PayloadView(std::shared_ptr> buffer) : bytes(buffer ? Span(buffer->data(), buffer->size()) : Span()), anchor(std::move(buffer)) {} }; +/// Wrap a fresh vector as both anchor and Span. Use when the producer has +/// no natural anchor (e.g. raw bytes from a C-ABI fetch); when an upstream +/// allocation already exists, construct PayloadView directly to avoid this +/// helper's copy. +inline PayloadView makePayloadView(std::vector bytes) { + auto shared = std::make_shared>(std::move(bytes)); + return PayloadView{ + Span{shared->data(), shared->size()}, + BufferAnchor{shared}, + }; +} + } // namespace sdk } // namespace PJ diff --git a/pj_datastore/docs/OBJECT_STORE_DESIGN.md b/pj_datastore/docs/OBJECT_STORE_DESIGN.md index d857869..b34bf65 100644 --- a/pj_datastore/docs/OBJECT_STORE_DESIGN.md +++ b/pj_datastore/docs/OBJECT_STORE_DESIGN.md @@ -30,11 +30,14 @@ struct ObjectTopicDescriptor { std::string metadata_json; }; +// Eager payload: store-owned bytes, counted against the retention budget. +using SharedBuffer = std::shared_ptr>; +// Lazy payload: idempotent fetcher returning a view + ownership anchor. +using LazyCallback = std::function; + struct ObjectEntry { Timestamp timestamp; - std::variant< - std::shared_ptr>, - std::function()>> payload; + std::variant payload; }; struct RetentionBudget { @@ -55,9 +58,13 @@ order. Equal timestamps are allowed. Out-of-order writes fail. shared buffer owned by the store. Owned entries contribute to `memoryUsage()`. `pushLazy(id, timestamp, fetch)` stores a callable instead of bytes. The callable -is invoked on each read and returns a fresh byte vector. Lazy entries do not -contribute to `memoryUsage()` because the store does not retain the bytes between -reads. +is invoked on each read and returns a `sdk::PayloadView` — a `Span` +paired with a type-erased `BufferAnchor` that keeps those bytes alive for as long +as the resolved view is held. The producer anchors on whatever already owns the +bytes (a decompressed chunk, an mmap, or a fresh allocation via +`sdk::makePayloadView`), so the store never copies on resolve. Lazy entries do not +contribute to `memoryUsage()` because the store retains the callable, not the +fetched bytes. Both write paths apply the topic retention budget after the new entry is inserted. @@ -80,12 +87,14 @@ Resolved entries contain: ```cpp struct ResolvedObjectEntry { Timestamp timestamp; - std::shared_ptr> data; + sdk::PayloadView payload; // { Span bytes; BufferAnchor anchor; } }; ``` -The shared pointer keeps resolved bytes alive independently of later store -mutation. +`payload.bytes` is the resolved view; `payload.anchor` keeps those bytes alive +independently of later store mutation (eviction, removal, or `clear()`). For an +owned entry the anchor is the store's `SharedBuffer`; for a lazy entry it is +whatever the fetcher anchored on. An empty `anchor` means "no bytes". ## Retention diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index 8f3c6c0..987996a 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -16,7 +16,9 @@ #include #include +#include "pj_base/buffer_anchor.hpp" #include "pj_base/expected.hpp" +#include "pj_base/span.hpp" #include "pj_base/types.hpp" namespace PJ { @@ -38,14 +40,21 @@ struct ObjectTopicDescriptor { std::string metadata_json; }; +/// Eager payload: store-owned bytes, counted against the retention budget. +using SharedBuffer = std::shared_ptr>; + +/// Lazy payload: idempotent, thread-safe fetcher returning bytes + anchor. +/// Invoked on every read; bytes are not counted against the retention budget. +using LazyCallback = std::function; + struct ObjectEntry { Timestamp timestamp = 0; - std::variant>, std::function()>> payload; + std::variant payload; }; struct ResolvedObjectEntry { Timestamp timestamp = 0; - std::shared_ptr> data; + sdk::PayloadView payload; }; struct RetentionBudget { @@ -106,10 +115,12 @@ class ObjectStore { // --- Write --- - Expected pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); + Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); - Expected pushLazy( - ObjectTopicId id, Timestamp timestamp, std::function()> fetch); + // Fetcher runs on every read. Producers anchor on whatever owns the bytes + // (chunk cache, mmap, fresh allocation); the store never copies — it just + // retains the anchor through PayloadView. + Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch); // --- Read --- diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index a75bc88..2cae153 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -67,8 +67,7 @@ std::vector ObjectStore::listTopics(DatasetId dataset_id) const { // --- Write --- -Expected ObjectStore::pushOwned( - ObjectTopicId id, Timestamp timestamp, std::vector payload) { +Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload) { std::shared_lock store_lock(store_mutex_); auto* series = findSeries(id); if (series == nullptr) { @@ -94,8 +93,7 @@ Expected ObjectStore::pushOwned( return {}; } -Expected ObjectStore::pushLazy( - ObjectTopicId id, Timestamp timestamp, std::function()> fetch) { +Status ObjectStore::pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch) { std::shared_lock store_lock(store_mutex_); auto* series = findSeries(id); if (series == nullptr) { @@ -306,11 +304,15 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { ResolvedObjectEntry resolved; resolved.timestamp = entry.timestamp; - if (const auto* owned = std::get_if>>(&entry.payload)) { - resolved.data = *owned; - } else if (const auto* lazy = std::get_if()>>(&entry.payload)) { - auto bytes = (*lazy)(); - resolved.data = std::make_shared>(std::move(bytes)); + if (const auto* owned = std::get_if(&entry.payload)) { + if (*owned) { + resolved.payload = sdk::PayloadView{ + Span{(*owned)->data(), (*owned)->size()}, + sdk::BufferAnchor{*owned}, + }; + } + } else if (const auto* lazy = std::get_if(&entry.payload)) { + resolved.payload = (*lazy)(); } return resolved; @@ -322,7 +324,7 @@ void ObjectStore::evictFront(ObjectSeries& series) { } const auto& front = series.entries.front(); - if (const auto* owned = std::get_if>>(&front.payload)) { + if (const auto* owned = std::get_if(&front.payload); owned != nullptr && *owned) { series.memory_bytes -= (*owned)->size(); } diff --git a/pj_datastore/src/plugin_data_host.cpp b/pj_datastore/src/plugin_data_host.cpp index 8aa5f7c..1a1611c 100644 --- a/pj_datastore/src/plugin_data_host.cpp +++ b/pj_datastore/src/plugin_data_host.cpp @@ -1392,7 +1392,9 @@ bool sourceObjectPushLazy( // the lambda; destructor runs exactly once when ObjectStore drops the // entry (retention, evict, removeTopic, clear, or store teardown). auto holder = std::make_shared(fetch_fn, fetch_ctx, fetch_ctx_destroy); - auto closure = [holder]() -> std::vector { return holder->invoke(); }; + // Plugins return raw bytes via the C ABI; wrap them as a PayloadView whose + // anchor is a shared_ptr>, per the pushLazy contract. + auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); }; auto result = impl->store.pushLazy(ObjectTopicId{topic.id}, timestamp_ns, std::move(closure)); if (!result) { impl->setError(result.error()); @@ -1433,10 +1435,10 @@ void sourceObjectSetRetentionBudget( // Toolbox object read host trampolines // --------------------------------------------------------------------------- -/// Box holding the shared_ptr that keeps ObjectStore bytes alive. One -/// allocated per successful read_latest_at; freed by release_bytes. +/// Heap holder for the PayloadView backing PJ_object_bytes_handle_t. +/// Allocated by read_latest_at; freed by release_bytes. struct ObjectBytesBox { - std::shared_ptr> bytes; + sdk::PayloadView payload; }; PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept { @@ -1506,12 +1508,12 @@ bool toolboxObjectReadLatestAt( *out_handle = nullptr; try { auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns); - if (!entry.has_value() || entry->data == nullptr) { + if (!entry.has_value() || entry->payload.anchor == nullptr) { impl->setError("no entry at-or-before timestamp"); propagateError(out_error, impl->last_error.c_str()); return false; } - auto* box = new ObjectBytesBox{std::move(entry->data)}; + auto* box = new ObjectBytesBox{std::move(entry->payload)}; *out_handle = reinterpret_cast(box); if (out_timestamp != nullptr) { *out_timestamp = entry->timestamp; @@ -1540,14 +1542,14 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_ return; } auto* box = reinterpret_cast(handle); - if (!box->bytes) { + if (box->payload.bytes.empty()) { return; } if (out_data != nullptr) { - *out_data = box->bytes->data(); + *out_data = box->payload.bytes.data(); } if (out_size != nullptr) { - *out_size = box->bytes->size(); + *out_size = box->payload.bytes.size(); } } @@ -1631,7 +1633,7 @@ bool parserObjectPushLazy( } try { auto holder = std::make_shared(fetch_fn, fetch_ctx, fetch_ctx_destroy); - auto closure = [holder]() -> std::vector { return holder->invoke(); }; + auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); }; auto result = impl->store.pushLazy(impl->bound_topic, timestamp_ns, std::move(closure)); if (!result) { impl->setError(result.error()); diff --git a/pj_datastore/tests/object_store_test.cpp b/pj_datastore/tests/object_store_test.cpp index 0edeb2d..6c46500 100644 --- a/pj_datastore/tests/object_store_test.cpp +++ b/pj_datastore/tests/object_store_test.cpp @@ -5,8 +5,10 @@ #include +#include #include #include +#include #include #include #include @@ -148,7 +150,7 @@ TEST(ObjectStoreTest, LatestAtExact) { auto r = store.latestAt(id, 200); ASSERT_TRUE(r.has_value()); EXPECT_EQ(r->timestamp, 200); - EXPECT_EQ((*r->data)[0], 0x02); + EXPECT_EQ(r->payload.bytes[0], 0x02); } TEST(ObjectStoreTest, LatestAtBetween) { @@ -281,17 +283,71 @@ TEST(ObjectStoreTest, PushLazyResolves) { ObjectStore store; auto id = registerTestTopic(store); int call_count = 0; - store.pushLazy(id, 100, [&call_count]() -> std::vector { + store.pushLazy(id, 100, [&call_count]() -> sdk::PayloadView { ++call_count; - return {0xDE, 0xAD}; + return sdk::makePayloadView({0xDE, 0xAD}); }); EXPECT_EQ(call_count, 0); auto r = store.latestAt(id, 100); ASSERT_TRUE(r.has_value()); EXPECT_EQ(call_count, 1); - EXPECT_EQ(r->data->size(), 2u); - EXPECT_EQ((*r->data)[0], 0xDE); + EXPECT_EQ(r->payload.bytes.size(), 2u); + EXPECT_EQ(r->payload.bytes[0], 0xDE); +} + +// Regression: anchor type-erasure must survive resolveEntry. The anchor here +// is a shared_ptr (not vector); a prior static_pointer_cast to +// vector would UB. +TEST(ObjectStoreTest, PushLazyPreservesAnchorType) { + struct TestBuffer { + std::array bytes{0x11, 0x22, 0x33, 0x44}; + }; + ObjectStore store; + auto id = registerTestTopic(store); + + auto buffer = std::make_shared(); + std::weak_ptr weak_buffer = buffer; + + store.pushLazy(id, 100, [buffer]() -> sdk::PayloadView { + return sdk::PayloadView{ + Span{buffer->bytes.data(), buffer->bytes.size()}, + sdk::BufferAnchor{buffer}, + }; + }); + + auto r = store.latestAt(id, 100); + ASSERT_TRUE(r.has_value()); + ASSERT_EQ(r->payload.bytes.size(), 4u); + EXPECT_EQ(r->payload.bytes[0], 0x11); + EXPECT_EQ(r->payload.bytes[3], 0x44); + EXPECT_FALSE(weak_buffer.expired()); // anchor still holds the buffer alive +} + +// Regression: the producer's Span is a sub-range of the anchor's storage. +// resolveEntry must propagate it verbatim — not the anchor's full extent. +TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) { + ObjectStore store; + auto id = registerTestTopic(store); + + auto chunk = std::make_shared>(100); + for (size_t i = 0; i < chunk->size(); ++i) { + (*chunk)[i] = static_cast(i); + } + + store.pushLazy(id, 100, [chunk]() -> sdk::PayloadView { + return sdk::PayloadView{ + Span{chunk->data() + 20, 10}, // bytes [20, 30) + sdk::BufferAnchor{chunk}, + }; + }); + + auto r = store.latestAt(id, 100); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(r->payload.bytes.size(), 10u); + EXPECT_EQ(r->payload.bytes.data(), chunk->data() + 20); + EXPECT_EQ(r->payload.bytes[0], 20); + EXPECT_EQ(r->payload.bytes[9], 29); } // ========================================================================= @@ -328,13 +384,13 @@ TEST(ObjectStoreTest, HandleSurvivesEviction) { auto handle = store.latestAt(id, 100); ASSERT_TRUE(handle.has_value()); - EXPECT_EQ((*handle->data)[0], 0xAA); + EXPECT_EQ(handle->payload.bytes[0], 0xAA); store.evictBefore(id, 150); EXPECT_EQ(store.entryCount(id), 1u); - EXPECT_EQ(handle->data->size(), 4u); - EXPECT_EQ((*handle->data)[0], 0xAA); + EXPECT_EQ(handle->payload.bytes.size(), 4u); + EXPECT_EQ(handle->payload.bytes[0], 0xAA); } // ========================================================================= @@ -420,7 +476,9 @@ TEST(ObjectStoreTest, LazyEntriesZeroMemory) { ObjectStore store; auto id = registerTestTopic(store); for (int i = 0; i < 10; ++i) { - store.pushLazy(id, static_cast(i) * 100, []() { return makePayload(1000); }); + store.pushLazy(id, static_cast(i) * 100, []() -> sdk::PayloadView { + return sdk::makePayloadView(makePayload(1000)); + }); } EXPECT_EQ(store.memoryUsage(id), 0u); } diff --git a/pj_datastore/tests/plugin_data_host_object_test.cpp b/pj_datastore/tests/plugin_data_host_object_test.cpp index c23f324..63085a9 100644 --- a/pj_datastore/tests/plugin_data_host_object_test.cpp +++ b/pj_datastore/tests/plugin_data_host_object_test.cpp @@ -3,6 +3,7 @@ #include +#include #include #include #include @@ -63,9 +64,10 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) { EXPECT_EQ(f.store.entryCount(store_id), 2U); auto resolved = f.store.latestAt(store_id, 2000); ASSERT_TRUE(resolved.has_value()); - ASSERT_NE(resolved->data, nullptr); - EXPECT_EQ(resolved->data->size(), payload.size()); - EXPECT_EQ(*resolved->data, payload); + ASSERT_NE(resolved->payload.anchor, nullptr); + EXPECT_EQ(resolved->payload.bytes.size(), payload.size()); + EXPECT_TRUE( + std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), payload.begin(), payload.end())); } TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) { @@ -93,8 +95,10 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) { // Each read invokes the fetch closure. auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42); ASSERT_TRUE(first.has_value()); - ASSERT_NE(first->data, nullptr); - EXPECT_EQ(*first->data, shared->payload); + ASSERT_NE(first->payload.anchor, nullptr); + EXPECT_TRUE( + std::equal( + first->payload.bytes.begin(), first->payload.bytes.end(), shared->payload.begin(), shared->payload.end())); EXPECT_GE(shared->fetch_calls.load(), 1); auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42); @@ -149,7 +153,9 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction) // Fetch once — the callback runs but the ctx stays alive. auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100); ASSERT_TRUE(resolved.has_value()); - EXPECT_EQ(*resolved->data, ctx->payload); + EXPECT_TRUE( + std::equal( + resolved->payload.bytes.begin(), resolved->payload.bytes.end(), ctx->payload.begin(), ctx->payload.end())); EXPECT_EQ(ctx->destroy_count.load(), 0); // Evict — destroy_fn runs exactly once. diff --git a/pj_datastore/tests/plugin_parser_object_write_test.cpp b/pj_datastore/tests/plugin_parser_object_write_test.cpp index 2f0e135..19068ef 100644 --- a/pj_datastore/tests/plugin_parser_object_write_test.cpp +++ b/pj_datastore/tests/plugin_parser_object_write_test.cpp @@ -9,6 +9,7 @@ #include +#include #include #include #include @@ -140,9 +141,10 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) { // Object-store side: bytes landed. auto resolved = store.latestAt(ObjectTopicId{obj_topic.id}, 100); ASSERT_TRUE(resolved.has_value()); - ASSERT_NE(resolved->data, nullptr); + ASSERT_NE(resolved->payload.anchor, nullptr); const std::vector expected{0xAA, 0xBB, 0xCC}; - EXPECT_EQ(*resolved->data, expected); + EXPECT_TRUE( + std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end())); // (Scalar side requires flushing + a read path; Phase-3 scope is proving // both hosts were resolved and invoked. Scalar writes go into DataEngine @@ -202,7 +204,9 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) { auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10); ASSERT_TRUE(resolved.has_value()); - EXPECT_EQ(*resolved->data, (std::vector{0xAA, 0xBB})); + const std::vector expected{0xAA, 0xBB}; + EXPECT_TRUE( + std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end())); EXPECT_GE(fetch_calls, 1); } diff --git a/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp b/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp index d372ed1..e27ceca 100644 --- a/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp +++ b/pj_plugins/include/pj_plugins/host/service_registry_builder.hpp @@ -43,7 +43,7 @@ class ServiceRegistryBuilder { /// not take ownership of ctx/vtable. /// @return `Expected`: ok on success, error string on duplicate or /// null fat-pointer field. - [[nodiscard]] ::PJ::Expected tryRegisterService( + [[nodiscard]] ::PJ::Status tryRegisterService( std::string_view name, uint32_t protocol_version, PJ_service_t service) { const std::string service_name(name); if (service.ctx == nullptr || service.vtable == nullptr) {