Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ endif()
if(PJ_INSTALL_SDK)
include(CMakePackageConfigHelpers)

set(PJ_PACKAGE_VERSION "0.4.2")
set(PJ_PACKAGE_VERSION "0.5.0")
set(PJ_PACKAGE_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/plotjuggler_core)

install(EXPORT plotjuggler_coreTargets
Expand Down
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
plugin_sdk — umbrella for plugin authors (base + dialog SDK + parser SDK)
plugin_host — umbrella for host loaders (data_source/parser/toolbox/dialog)

A consuming Conan recipe declares e.g. `plotjuggler_core/0.4.2` and then:
A consuming Conan recipe declares e.g. `plotjuggler_core/0.5.0` and then:

find_package(plotjuggler_core REQUIRED COMPONENTS plugin_sdk)
target_link_libraries(my_plugin PRIVATE plotjuggler_core::plugin_sdk)
Expand All @@ -27,7 +27,7 @@

class PlotjugglerCoreConan(ConanFile):
name = "plotjuggler_core"
version = "0.4.2"
version = "0.5.0"
# Apache-2.0 covers pj_base + pj_plugins (the plugin-facing SDK);
# MPL-2.0 covers pj_datastore (the storage engine). See LICENSE.
license = "Apache-2.0 AND MPL-2.0"
Expand Down
16 changes: 15 additions & 1 deletion pj_base/include/pj_base/buffer_anchor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "pj_base/span.hpp"

Expand Down Expand Up @@ -44,10 +46,22 @@ struct PayloadView {

PayloadView(Span<const uint8_t> bytes_, BufferAnchor anchor_) : bytes(bytes_), anchor(std::move(anchor_)) {}

PayloadView(std::shared_ptr<std::vector<uint8_t>> buffer)
PayloadView(std::shared_ptr<const std::vector<uint8_t>> buffer)
: bytes(buffer ? Span<const uint8_t>(buffer->data(), buffer->size()) : Span<const uint8_t>()),
anchor(std::move(buffer)) {}
};

/// Wrap a fresh vector as both anchor and Span. Use when the producer has
/// no natural anchor (e.g. raw bytes from a C-ABI fetch); when an upstream
/// allocation already exists, construct PayloadView directly to avoid this
/// helper's copy.
inline PayloadView makePayloadView(std::vector<uint8_t> bytes) {
auto shared = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
return PayloadView{
Span<const uint8_t>{shared->data(), shared->size()},
BufferAnchor{shared},
};
}
Comment thread
facontidavide marked this conversation as resolved.

} // namespace sdk
} // namespace PJ
27 changes: 18 additions & 9 deletions pj_datastore/docs/OBJECT_STORE_DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ struct ObjectTopicDescriptor {
std::string metadata_json;
};

// Eager payload: store-owned bytes, counted against the retention budget.
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;
// Lazy payload: idempotent fetcher returning a view + ownership anchor.
using LazyCallback = std::function<sdk::PayloadView()>;

struct ObjectEntry {
Timestamp timestamp;
std::variant<
std::shared_ptr<const std::vector<uint8_t>>,
std::function<std::vector<uint8_t>()>> payload;
std::variant<SharedBuffer, LazyCallback> payload;
};

struct RetentionBudget {
Expand All @@ -55,9 +58,13 @@ order. Equal timestamps are allowed. Out-of-order writes fail.
shared buffer owned by the store. Owned entries contribute to `memoryUsage()`.

`pushLazy(id, timestamp, fetch)` stores a callable instead of bytes. The callable
is invoked on each read and returns a fresh byte vector. Lazy entries do not
contribute to `memoryUsage()` because the store does not retain the bytes between
reads.
is invoked on each read and returns a `sdk::PayloadView` — a `Span<const uint8_t>`
paired with a type-erased `BufferAnchor` that keeps those bytes alive for as long
as the resolved view is held. The producer anchors on whatever already owns the
bytes (a decompressed chunk, an mmap, or a fresh allocation via
`sdk::makePayloadView`), so the store never copies on resolve. Lazy entries do not
contribute to `memoryUsage()` because the store retains the callable, not the
fetched bytes.

Both write paths apply the topic retention budget after the new entry is
inserted.
Expand All @@ -80,12 +87,14 @@ Resolved entries contain:
```cpp
struct ResolvedObjectEntry {
Timestamp timestamp;
std::shared_ptr<const std::vector<uint8_t>> data;
sdk::PayloadView payload; // { Span<const uint8_t> bytes; BufferAnchor anchor; }
};
```

The shared pointer keeps resolved bytes alive independently of later store
mutation.
`payload.bytes` is the resolved view; `payload.anchor` keeps those bytes alive
independently of later store mutation (eviction, removal, or `clear()`). For an
owned entry the anchor is the store's `SharedBuffer`; for a lazy entry it is
whatever the fetcher anchored on. An empty `anchor` means "no bytes".

## Retention

Expand Down
21 changes: 16 additions & 5 deletions pj_datastore/include/pj_datastore/object_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include <variant>
#include <vector>

#include "pj_base/buffer_anchor.hpp"
#include "pj_base/expected.hpp"
#include "pj_base/span.hpp"
#include "pj_base/types.hpp"

namespace PJ {
Expand All @@ -38,14 +40,21 @@ struct ObjectTopicDescriptor {
std::string metadata_json;
};

/// Eager payload: store-owned bytes, counted against the retention budget.
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;

/// Lazy payload: idempotent, thread-safe fetcher returning bytes + anchor.
/// Invoked on every read; bytes are not counted against the retention budget.
using LazyCallback = std::function<sdk::PayloadView()>;

struct ObjectEntry {
Timestamp timestamp = 0;
std::variant<std::shared_ptr<const std::vector<uint8_t>>, std::function<std::vector<uint8_t>()>> payload;
std::variant<SharedBuffer, LazyCallback> payload;
};

struct ResolvedObjectEntry {
Timestamp timestamp = 0;
std::shared_ptr<const std::vector<uint8_t>> data;
sdk::PayloadView payload;
};

struct RetentionBudget {
Expand Down Expand Up @@ -106,10 +115,12 @@ class ObjectStore {

// --- Write ---

Expected<void, std::string> pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);

Expected<void, std::string> pushLazy(
ObjectTopicId id, Timestamp timestamp, std::function<std::vector<uint8_t>()> fetch);
// Fetcher runs on every read. Producers anchor on whatever owns the bytes
// (chunk cache, mmap, fresh allocation); the store never copies — it just
// retains the anchor through PayloadView.
Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);

// --- Read ---

Expand Down
22 changes: 12 additions & 10 deletions pj_datastore/src/object_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ std::vector<ObjectTopicId> ObjectStore::listTopics(DatasetId dataset_id) const {

// --- Write ---

Expected<void, std::string> ObjectStore::pushOwned(
ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload) {
Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload) {
std::shared_lock store_lock(store_mutex_);
auto* series = findSeries(id);
if (series == nullptr) {
Expand All @@ -94,8 +93,7 @@ Expected<void, std::string> ObjectStore::pushOwned(
return {};
}

Expected<void, std::string> ObjectStore::pushLazy(
ObjectTopicId id, Timestamp timestamp, std::function<std::vector<uint8_t>()> fetch) {
Status ObjectStore::pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch) {
std::shared_lock store_lock(store_mutex_);
auto* series = findSeries(id);
if (series == nullptr) {
Expand Down Expand Up @@ -306,11 +304,15 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
ResolvedObjectEntry resolved;
resolved.timestamp = entry.timestamp;

if (const auto* owned = std::get_if<std::shared_ptr<const std::vector<uint8_t>>>(&entry.payload)) {
resolved.data = *owned;
} else if (const auto* lazy = std::get_if<std::function<std::vector<uint8_t>()>>(&entry.payload)) {
auto bytes = (*lazy)();
resolved.data = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
if (*owned) {
resolved.payload = sdk::PayloadView{
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
sdk::BufferAnchor{*owned},
};
}
} else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
resolved.payload = (*lazy)();
}

return resolved;
Expand All @@ -322,7 +324,7 @@ void ObjectStore::evictFront(ObjectSeries& series) {
}

const auto& front = series.entries.front();
if (const auto* owned = std::get_if<std::shared_ptr<const std::vector<uint8_t>>>(&front.payload)) {
if (const auto* owned = std::get_if<SharedBuffer>(&front.payload); owned != nullptr && *owned) {
series.memory_bytes -= (*owned)->size();
}

Expand Down
22 changes: 12 additions & 10 deletions pj_datastore/src/plugin_data_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,9 @@ bool sourceObjectPushLazy(
// the lambda; destructor runs exactly once when ObjectStore drops the
// entry (retention, evict, removeTopic, clear, or store teardown).
auto holder = std::make_shared<PluginFetchCtx>(fetch_fn, fetch_ctx, fetch_ctx_destroy);
auto closure = [holder]() -> std::vector<uint8_t> { return holder->invoke(); };
// Plugins return raw bytes via the C ABI; wrap them as a PayloadView whose
// anchor is a shared_ptr<const vector<uint8_t>>, per the pushLazy contract.
auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); };
auto result = impl->store.pushLazy(ObjectTopicId{topic.id}, timestamp_ns, std::move(closure));
if (!result) {
impl->setError(result.error());
Expand Down Expand Up @@ -1433,10 +1435,10 @@ void sourceObjectSetRetentionBudget(
// Toolbox object read host trampolines
// ---------------------------------------------------------------------------

/// Box holding the shared_ptr that keeps ObjectStore bytes alive. One
/// allocated per successful read_latest_at; freed by release_bytes.
/// Heap holder for the PayloadView backing PJ_object_bytes_handle_t.
/// Allocated by read_latest_at; freed by release_bytes.
struct ObjectBytesBox {
std::shared_ptr<const std::vector<uint8_t>> bytes;
sdk::PayloadView payload;
};

PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept {
Expand Down Expand Up @@ -1506,12 +1508,12 @@ bool toolboxObjectReadLatestAt(
*out_handle = nullptr;
try {
auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns);
if (!entry.has_value() || entry->data == nullptr) {
if (!entry.has_value() || entry->payload.anchor == nullptr) {
impl->setError("no entry at-or-before timestamp");
propagateError(out_error, impl->last_error.c_str());
return false;
}
auto* box = new ObjectBytesBox{std::move(entry->data)};
auto* box = new ObjectBytesBox{std::move(entry->payload)};
*out_handle = reinterpret_cast<PJ_object_bytes_handle_t>(box);
if (out_timestamp != nullptr) {
*out_timestamp = entry->timestamp;
Expand Down Expand Up @@ -1540,14 +1542,14 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_
return;
}
auto* box = reinterpret_cast<ObjectBytesBox*>(handle);
if (!box->bytes) {
if (box->payload.bytes.empty()) {
return;
}
if (out_data != nullptr) {
*out_data = box->bytes->data();
*out_data = box->payload.bytes.data();
}
if (out_size != nullptr) {
*out_size = box->bytes->size();
*out_size = box->payload.bytes.size();
}
}

Expand Down Expand Up @@ -1631,7 +1633,7 @@ bool parserObjectPushLazy(
}
try {
auto holder = std::make_shared<PluginFetchCtx>(fetch_fn, fetch_ctx, fetch_ctx_destroy);
auto closure = [holder]() -> std::vector<uint8_t> { return holder->invoke(); };
auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); };
auto result = impl->store.pushLazy(impl->bound_topic, timestamp_ns, std::move(closure));
if (!result) {
impl->setError(result.error());
Expand Down
Loading
Loading