From fafa62419c61c1b82bf700949bc0f02aafad3b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20I=C3=B1igo=20Blasco?= Date: Fri, 29 May 2026 09:08:52 +0200 Subject: [PATCH 1/2] refactor(object_store): PayloadView end-to-end; cross-store flushTo; atomic target swap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A consolidated refactor of the ObjectStore / DataEngine surface that makes PayloadView the universal vocabulary for ownership of bytes across the SDK, adds zero-copy cross-instance transfer of entries between stores, and exposes an atomic target-swap primitive on the plugin data host used to redirect writes between two stores at runtime. ObjectStore + ObjectEntry - ObjectEntry::payload is std::any holding either std::shared_ptr> (eager owned bytes, counted against the retention budget) or std::function (lazy resolver returning Span + BufferAnchor; not counted, bytes owned upstream). resolveEntry dispatches via std::any_cast; each branch handles its own concrete type. No static_pointer_cast on the lazy anchor — its type erasure (BufferAnchor = shared_ptr) is preserved end-to-end so producers can anchor on any shared_ptr. - ResolvedObjectEntry consolidates on PayloadView (Span + anchor). Removes the previous shared_ptr> field, which locked consumers to a concrete anchor type and required a hidden static_pointer_cast in resolveEntry that would fail silently for non-vector anchors. - ObjectStore::flushTo(ObjectStore& dst): two-phase, atomic, zero- copy bulk transfer of entries between two instances. Topics matched by descriptor; monotonicity enforced strictly per series; failure leaves both sides untouched. Each ObjectEntry is moved by value via std::move; the std::any inside transfers its buffer intact. - DataEngine::flushTo(DataEngine& dst): symmetric primitive for the columnar scalar store. Each TopicStorage's sealed-chunk deque is moved through to the destination; no chunk constructor invoked. - sdk::makePayloadView(std::vector) replaces sdk::makeOwnedPayloadView. Wraps a vector into a shared_ptr that serves as both the bytes backing and the BufferAnchor. - ObjectBytesBox removed from plugin_data_host. The C-ABI toolbox handle becomes a heap-allocated PayloadView directly. plugin_data_host atomic target swap - setTarget on engine and parser write hosts atomically swap which engine/store receives subsequent writes. Safe under concurrent ingest: in-flight writes complete on the previous target. Version bump - 0.4.0 -> 0.5.0. Source-incompatible: ResolvedObjectEntry::data renamed to payload (PayloadView instead of shared_ptr). Consumers migrate from entry->data->{data,size,empty}() to entry->payload.bytes.{data,size,empty}(); from entry->data to entry->payload.anchor for ownership retention. --- pj_datastore/include/pj_datastore/engine.hpp | 19 +- .../include/pj_datastore/object_store.hpp | 40 +++- .../include/pj_datastore/plugin_data_host.hpp | 29 +++ .../include/pj_datastore/topic_storage.hpp | 8 + pj_datastore/src/engine.cpp | 57 +++++ pj_datastore/src/object_store.cpp | 80 ++++++- pj_datastore/src/plugin_data_host.cpp | 142 +++++++---- pj_datastore/src/topic_storage.cpp | 11 +- .../tests/engine_integration_test.cpp | 167 +++++++++++++ pj_datastore/tests/object_store_test.cpp | 223 ++++++++++++++++++ .../tests/plugin_data_host_object_test.cpp | 75 ++++++ 11 files changed, 799 insertions(+), 52 deletions(-) diff --git a/pj_datastore/include/pj_datastore/engine.hpp b/pj_datastore/include/pj_datastore/engine.hpp index 708189d..09c5b60 100644 --- a/pj_datastore/include/pj_datastore/engine.hpp +++ b/pj_datastore/include/pj_datastore/engine.hpp @@ -79,9 +79,26 @@ class DataEngine { /// derived.onSourceCommitted(engine.commitChunks(writer.flushAll())); std::vector commitChunks(std::vector> chunks); - /// Evict old chunks outside retention window. + /// Evict old chunks outside the retention window. void enforceRetention(PJ::Timestamp retention_window_ns); + /// Move every committed chunk from this engine into `dst`, leaving this + /// engine's topic storages empty (datasets, topics, schemas, and time + /// domains remain registered). Topics are matched by descriptor + /// (`dataset_id` + `name`); both engines must have the matching topics + /// registered or the call fails without partial mutation. Monotonicity is + /// enforced strictly per topic: the source's earliest chunk timestamp must + /// be greater than or equal to the destination's current `time_max()`. + /// + /// Zero-copy on the chunk data: the destination's `std::deque` + /// receives the source's chunks via `std::move`, never via copy. The chunk + /// internals (column buffers, value arrays) are pointer/buffer moves. + /// + /// Schema compatibility is the caller's responsibility — typically the + /// destination is kept in lockstep with the source through parallel + /// `createDataset` / `createTopic` registration at startup. + PJ::Expected flushTo(DataEngine& dst); + // Writer/Reader factories /// Create a writer bound to this engine. [[nodiscard]] DataWriter createWriter(); diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index 987996a..03c3e09 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -49,11 +49,24 @@ using LazyCallback = std::function; struct ObjectEntry { Timestamp timestamp = 0; + // Holds either a SharedBuffer (eager owned payload, counted against the + // retention budget) or a LazyCallback (lazy resolver). resolveEntry + // discriminates via std::get_if; the variant is exhaustive over the two + // (and only two) payload kinds. std::variant payload; }; struct ResolvedObjectEntry { Timestamp timestamp = 0; + // PayloadView lets the entry hold an opaque anchor (any shared_ptr) plus + // a non-owning Span over the bytes. Consumers read `payload.bytes` for the + // data; they retain `payload.anchor` if they need the bytes to outlive the + // resolve call. Both `pushOwned` and `pushLazy` paths land here: + // - owned: payload.bytes spans the shared_ptr>; anchor IS that shared_ptr. + // - lazy: payload is whatever the closure returns; anchor can be any shared_ptr + // (e.g. a C-ABI payload anchor wrapped as shared_ptr). + // resolveEntry never casts the anchor to a concrete type; the type erasure + // stays opaque all the way to the consumer. sdk::PayloadView payload; }; @@ -119,7 +132,9 @@ class ObjectStore { // Fetcher runs on every read. Producers anchor on whatever owns the bytes // (chunk cache, mmap, fresh allocation); the store never copies — it just - // retains the anchor through PayloadView. + // retains the anchor through PayloadView. When the producer already holds + // the bytes behind a shared_ptr (e.g. a streaming buffer handed off between + // stores), the closure captures it and returns a view backed by it. Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch); // --- Read --- @@ -147,6 +162,29 @@ class ObjectStore { void evictBefore(ObjectTopicId id, Timestamp threshold); void evictAllBefore(Timestamp threshold); + // --- Cross-store flush --- + + // Move every entry from this store into `dst`, leaving this store empty + // (topic registrations are preserved). Topics are matched by descriptor + // (dataset_id + topic_name); both stores must have registered the same + // descriptors or the call fails without partial mutation. For each series, + // monotonicity is enforced strictly: the earliest timestamp being moved + // must be greater than or equal to the destination's current last + // timestamp. On any validation failure the call returns an error and + // neither store is mutated. + // + // Zero-copy on the payload bytes. Each ObjectEntry is moved into the + // destination's series by value; the std::variant inside holds either a + // shared_ptr or a std::function, and moving it is a pointer/buffer move — + // bytes captured by the closure or owned by the shared_ptr are never + // copied or materialized during the flush. Lazy + // entries preserve their semantics in the destination: their closure is + // re-invoked only when the destination is read. + // + // After the move, the destination's retention budget is applied to each + // touched series in normal order. + Expected flushTo(ObjectStore& dst); + // --- Lifecycle --- void removeTopic(ObjectTopicId id); diff --git a/pj_datastore/include/pj_datastore/plugin_data_host.hpp b/pj_datastore/include/pj_datastore/plugin_data_host.hpp index 954c056..1224578 100644 --- a/pj_datastore/include/pj_datastore/plugin_data_host.hpp +++ b/pj_datastore/include/pj_datastore/plugin_data_host.hpp @@ -31,6 +31,13 @@ class DatastoreSourceWriteHost { [[nodiscard]] PJ_source_write_host_t raw() noexcept; void flushPending(); + // Atomically swap the destination DataEngine. Mirrors the parser write + // host's setTarget: the streaming two-engine flow routes source-level + // scalar pushes to a secondary DataEngine during pause and back to the + // primary on resume. Pending rows are flushed to the current engine + // before the switch. Does not take ownership. + void setTarget(DataEngine* target); + private: std::unique_ptr state_; }; @@ -51,6 +58,13 @@ class DatastoreSourceObjectWriteHost { [[nodiscard]] PJ_object_write_host_t raw() noexcept; + // Atomically swap the destination store. Used by the streaming two-store + // flow to route pushes to a secondary ObjectStore during pause and back to + // the primary on resume. Must point to a live store; the host does not + // take ownership and the caller must keep the target alive for as long as + // the host can receive pushes against it. + void setTarget(ObjectStore* target) noexcept; + private: std::unique_ptr state_; }; @@ -68,6 +82,14 @@ class DatastoreParserWriteHost { [[nodiscard]] PJ_parser_write_host_t raw() noexcept; void flushPending(); + // Atomically swap the destination DataEngine. Mirrors the object write + // host's setTarget: the streaming two-store flow routes scalar pushes to a + // secondary DataEngine during pause and back to the primary on resume. + // Pending rows are flushed to the current engine before the switch; the + // bound topic must exist in `target` with the same TopicId (the streaming + // manager registers it lockstep on both engines). Does not take ownership. + void setTarget(DataEngine* target); + private: std::unique_ptr state_; }; @@ -111,6 +133,13 @@ class DatastoreParserObjectWriteHost { [[nodiscard]] PJ_parser_object_write_host_t raw() noexcept; + // Atomically swap the destination store. Used by the streaming two-store + // flow to route pushes to a secondary ObjectStore during pause and back to + // the primary on resume. The bound topic id must exist in `target` (the + // streaming manager ensures this via lockstep registerTopic on both + // stores). The host does not take ownership of the target. + void setTarget(ObjectStore* target) noexcept; + private: std::unique_ptr state_; }; diff --git a/pj_datastore/include/pj_datastore/topic_storage.hpp b/pj_datastore/include/pj_datastore/topic_storage.hpp index f1615b7..cf06419 100644 --- a/pj_datastore/include/pj_datastore/topic_storage.hpp +++ b/pj_datastore/include/pj_datastore/topic_storage.hpp @@ -57,6 +57,8 @@ struct TopicMetadata { uint32_t truncated_sample_count = 0; }; +class DataEngine; + /// Storage container for committed chunks of one topic. class TopicStorage { public: @@ -123,6 +125,12 @@ class TopicStorage { void setArrayExpansionCount(const std::string& field_path, uint32_t count); private: + // DataEngine::flushTo needs to move sealed_chunks_ between TopicStorage + // instances of different engines without copying. Friending it lets the + // transfer happen entirely inside DataEngine without exposing the move + // primitive on the public TopicStorage API. + friend class DataEngine; + TopicId topic_id_; TopicDescriptor descriptor_; std::deque sealed_chunks_; diff --git a/pj_datastore/src/engine.cpp b/pj_datastore/src/engine.cpp index 45b44b1..f77a6b1 100644 --- a/pj_datastore/src/engine.cpp +++ b/pj_datastore/src/engine.cpp @@ -182,6 +182,63 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) { } } +Expected DataEngine::flushTo(DataEngine& dst) { + if (&dst == this) { + return PJ::unexpected("flushTo: source and destination are the same engine"); + } + + // Phase 1: validate. Walk every src topic with sealed chunks and look up + // the matching dst topic by descriptor (dataset_id + name). Verify + // monotonicity against dst's current time_max. No mutation yet. + struct Step { + TopicStorage* src; + TopicStorage* dst; + }; + std::vector plan; + plan.reserve(impl_->topics.size()); + + for (auto it = impl_->topics.begin(); it != impl_->topics.end(); ++it) { + auto& src_storage = it.value(); + if (src_storage.empty()) { + continue; + } + TopicStorage* dst_storage = nullptr; + for (auto dst_it = dst.impl_->topics.begin(); dst_it != dst.impl_->topics.end(); ++dst_it) { + auto& candidate = dst_it.value(); + if (candidate.descriptor().dataset_id == src_storage.descriptor().dataset_id && + candidate.descriptor().name == src_storage.descriptor().name) { + dst_storage = &candidate; + break; + } + } + if (dst_storage == nullptr) { + return PJ::unexpected( + "flushTo: destination has no topic '" + src_storage.descriptor().name + "' for dataset " + + std::to_string(src_storage.descriptor().dataset_id)); + } + if (!dst_storage->empty() && src_storage.time_min() < dst_storage->time_max()) { + return PJ::unexpected("flushTo: monotonicity violation for topic '" + src_storage.descriptor().name + "'"); + } + plan.push_back({&src_storage, dst_storage}); + } + + // Phase 2: execute. friend access lets us move sealed_chunks_ directly + // between TopicStorage instances of different engines — the deque move + // transfers chunk ownership without copying any column data or value + // buffers. Each chunk's TopicChunkStats (t_min/t_max/row_count) rides + // along inside the chunk by value, so dst's time_min/time_max queries + // reflect the new state immediately after the move. + for (auto& step : plan) { + auto drained = std::move(step.src->sealed_chunks_); + step.src->sealed_chunks_.clear(); // post-move state: deque is valid but empty. + for (auto& chunk : drained) { + step.dst->sealed_chunks_.push_back(std::move(chunk)); + } + } + + return {}; +} + // --------------------------------------------------------------------------- // Listing helpers // --------------------------------------------------------------------------- diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index 2cae153..0488878 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -79,7 +79,8 @@ Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector return unexpected("timestamp not monotonically non-decreasing"); } - size_t payload_size = payload.size(); + const size_t payload_size = payload.size(); + auto shared_data = std::make_shared>(std::move(payload)); ObjectEntry entry; @@ -264,6 +265,74 @@ void ObjectStore::evictAllBefore(Timestamp threshold) { } } +// --- Cross-store flush --- + +Expected ObjectStore::flushTo(ObjectStore& dst) { + if (&dst == this) { + return unexpected("flushTo: source and destination are the same store"); + } + + // Deterministic lock order by address to avoid deadlock with concurrent flushTo calls. + ObjectStore* first = this < &dst ? this : &dst; + ObjectStore* second = first == this ? &dst : this; + std::unique_lock first_lock(first->store_mutex_); + std::unique_lock second_lock(second->store_mutex_); + + // Phase 1: validate every source series can be matched to a destination + // topic by descriptor and that the move respects monotonicity. No mutation. + struct Step { + ObjectSeries* src; + ObjectSeries* dst; + }; + std::vector plan; + plan.reserve(topics_.size()); + + for (auto& [src_id, src_series] : topics_) { + if (src_series->entry_timestamps.empty()) { + continue; + } + ObjectSeries* dst_series = nullptr; + for (auto& [dst_id, dst_series_ptr] : dst.topics_) { + if (dst_series_ptr->descriptor.dataset_id == src_series->descriptor.dataset_id && + dst_series_ptr->descriptor.topic_name == src_series->descriptor.topic_name) { + dst_series = dst_series_ptr.get(); + break; + } + } + if (dst_series == nullptr) { + return unexpected( + "flushTo: destination has no topic '" + src_series->descriptor.topic_name + "' for dataset " + + std::to_string(src_series->descriptor.dataset_id)); + } + if (!dst_series->entry_timestamps.empty() && + src_series->entry_timestamps.front() < dst_series->entry_timestamps.back()) { + return unexpected("flushTo: monotonicity violation for topic '" + src_series->descriptor.topic_name + "'"); + } + plan.push_back({src_series.get(), dst_series}); + } + + // Phase 2: execute the moves. Holding both store_mutex_ unique means no + // other reader or writer can observe an intermediate state; per-series + // mutexes are not needed because no concurrent access can occur. + for (auto& step : plan) { + for (auto& entry : step.src->entries) { + step.dst->entries.push_back(std::move(entry)); + } + step.dst->entry_timestamps.insert( + step.dst->entry_timestamps.end(), step.src->entry_timestamps.begin(), step.src->entry_timestamps.end()); + step.dst->memory_bytes += step.src->memory_bytes; + + step.src->entries.clear(); + step.src->entry_timestamps.clear(); + step.src->memory_bytes = 0; + + const Timestamp newest = step.dst->entry_timestamps.empty() ? 0 : step.dst->entry_timestamps.back(); + applyRetention(*step.dst, newest); + } + + return {}; +} + // --- Lifecycle --- void ObjectStore::removeTopic(ObjectTopicId id) { @@ -305,6 +374,9 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { resolved.timestamp = entry.timestamp; if (const auto* owned = std::get_if(&entry.payload)) { + // Owned branch: build a PayloadView whose Span covers the vector and + // whose anchor IS the same shared_ptr — refcount bump, no copy. A + // default-constructed entry holds a null SharedBuffer, so guard it. if (*owned) { resolved.payload = sdk::PayloadView{ Span{(*owned)->data(), (*owned)->size()}, @@ -312,6 +384,12 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { }; } } else if (const auto* lazy = std::get_if(&entry.payload)) { + // Lazy branch: forward whatever the closure returns. The anchor stays + // opaque (shared_ptr); consumers retain it without needing + // to know the concrete type. No static_pointer_cast here — that was the + // hidden contract that locked the slot to shared_ptr>; + // dropping it lets producers anchor on arrow::Buffer, mmap pools, or + // C-ABI payload anchors equally. resolved.payload = (*lazy)(); } diff --git a/pj_datastore/src/plugin_data_host.cpp b/pj_datastore/src/plugin_data_host.cpp index 1a1611c..8c19fca 100644 --- a/pj_datastore/src/plugin_data_host.cpp +++ b/pj_datastore/src/plugin_data_host.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -909,14 +910,22 @@ struct ToolboxCore { struct DatastoreSourceWriteHostState { DatastoreSourceWriteHostState(DataEngine& engine, DataSourceHandle source_handle) - : core(engine), source(source_handle) {} - WriteCore core; + : core(std::make_unique(engine)), source(source_handle) {} + // Held by pointer so setTarget() can rebind to a different engine (streaming + // two-engine pause/resume) by reconstructing the WriteCore — WriteCore holds + // DataEngine by reference and is not reseatable. + std::unique_ptr core; DataSourceHandle source; }; struct DatastoreParserWriteHostState { - DatastoreParserWriteHostState(DataEngine& engine, TopicHandle topic_handle) : core(engine), topic(topic_handle) {} - WriteCore core; + DatastoreParserWriteHostState(DataEngine& engine, TopicHandle topic_handle) + : core(std::make_unique(engine)), topic(topic_handle) {} + // Held by pointer so setTarget() can rebind to a different engine (streaming + // two-store pause/resume) by reconstructing the WriteCore — its writer and + // caches are engine-specific. WriteCore itself is not reassignable (holds a + // DataEngine reference). + std::unique_ptr core; TopicHandle topic; }; @@ -935,8 +944,13 @@ struct DatastoreToolboxHostState { }; struct DatastoreSourceObjectWriteHostState { - DatastoreSourceObjectWriteHostState(ObjectStore& s, DatasetId dataset) : store(s), dataset_id(dataset) {} - ObjectStore& store; + DatastoreSourceObjectWriteHostState(ObjectStore& s, DatasetId dataset) : target(&s), dataset_id(dataset) {} + // Atomic pointer rather than reference: the streaming two-store flow + // retargets the host between the primary and secondary ObjectStore on each + // pause/resume transition. Plain reference would not be reassignable. The + // atomic guarantees the worker thread sees a fully-published swap from the + // manager thread without locking on the hot push path. + std::atomic target; DatasetId dataset_id; std::string last_error; @@ -956,8 +970,9 @@ struct DatastoreToolboxObjectReadHostState { }; struct DatastoreParserObjectWriteHostState { - DatastoreParserObjectWriteHostState(ObjectStore& s, ObjectTopicId topic) : store(s), bound_topic(topic) {} - ObjectStore& store; + DatastoreParserObjectWriteHostState(ObjectStore& s, ObjectTopicId topic) : target(&s), bound_topic(topic) {} + // Atomic pointer rather than reference: see DatastoreSourceObjectWriteHostState. + std::atomic target; ObjectTopicId bound_topic; std::string last_error; @@ -985,8 +1000,8 @@ bool guardHostCallback(PJ_error_t* out_error, Fn&& fn) noexcept { bool sourceEnsureTopic(void* ctx, PJ_string_view_t topic_name, TopicHandle* out_topic, PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.ensureTopic(impl->source, toStringView(topic_name), out_topic)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->ensureTopic(impl->source, toStringView(topic_name), out_topic)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -998,8 +1013,8 @@ bool sourceEnsureField( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.ensureField(topic, toStringView(field_name), type, out_field)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->ensureField(topic, toStringView(field_name), type, out_field)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -1011,8 +1026,8 @@ bool sourceAppendRecord( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.appendRecord(topic, timestamp, fields, field_count)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->appendRecord(topic, timestamp, fields, field_count)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -1024,8 +1039,8 @@ bool sourceAppendBoundRecord( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.appendBoundRecord(topic, timestamp, fields, field_count)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->appendBoundRecord(topic, timestamp, fields, field_count)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -1037,9 +1052,9 @@ bool sourceAppendArrowStream( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.appendArrowStream(topic, stream, timestamp_column)) { + if (!impl->core->appendArrowStream(topic, stream, timestamp_column)) { // Failure: plugin retains ownership of the stream; we do NOT release. - propagateError(out_error, impl->core.lastError()); + propagateError(out_error, impl->core->lastError()); return false; } // Success: host now owns the stream — release it. @@ -1055,8 +1070,8 @@ bool parserEnsureField( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.ensureField(impl->topic, toStringView(field_name), type, out_field)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->ensureField(impl->topic, toStringView(field_name), type, out_field)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -1068,8 +1083,8 @@ bool parserAppendRecord( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.appendRecord(impl->topic, timestamp, fields, field_count)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->appendRecord(impl->topic, timestamp, fields, field_count)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -1081,8 +1096,8 @@ bool parserAppendBoundRecord( PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.appendBoundRecord(impl->topic, timestamp, fields, field_count)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->appendBoundRecord(impl->topic, timestamp, fields, field_count)) { + propagateError(out_error, impl->core->lastError()); return false; } return true; @@ -1093,8 +1108,8 @@ bool parserAppendArrowStream( void* ctx, struct ArrowArrayStream* stream, PJ_string_view_t timestamp_column, PJ_error_t* out_error) noexcept { return guardHostCallback(out_error, [&] { auto* impl = static_cast(ctx); - if (!impl->core.appendArrowStream(impl->topic, stream, timestamp_column)) { - propagateError(out_error, impl->core.lastError()); + if (!impl->core->appendArrowStream(impl->topic, stream, timestamp_column)) { + propagateError(out_error, impl->core->lastError()); return false; } if (stream != nullptr && stream->release != nullptr) { @@ -1323,12 +1338,13 @@ bool sourceObjectRegisterTopic( propagateError(out_error, "out_handle must not be null"); return false; } + auto* target = impl->target.load(std::memory_order_acquire); try { ObjectTopicDescriptor desc{}; desc.dataset_id = impl->dataset_id; desc.topic_name = std::string(toStringView(topic_name)); desc.metadata_json = std::string(toStringView(metadata_json)); - auto result = impl->store.registerTopic(desc); + auto result = target->registerTopic(desc); if (!result) { impl->setError(result.error()); propagateError(out_error, impl->last_error.c_str()); @@ -1352,12 +1368,13 @@ bool sourceObjectPushOwned( void* ctx, PJ_object_topic_handle_t topic, int64_t timestamp_ns, const uint8_t* data, std::size_t size, PJ_error_t* out_error) noexcept { auto* impl = static_cast(ctx); + auto* target = impl->target.load(std::memory_order_acquire); try { std::vector bytes; if (data != nullptr && size > 0) { bytes.assign(data, data + size); } - auto result = impl->store.pushOwned(ObjectTopicId{topic.id}, timestamp_ns, std::move(bytes)); + auto result = target->pushOwned(ObjectTopicId{topic.id}, timestamp_ns, std::move(bytes)); if (!result) { impl->setError(result.error()); propagateError(out_error, impl->last_error.c_str()); @@ -1387,6 +1404,7 @@ bool sourceObjectPushLazy( propagateError(out_error, "fetch_fn must not be null"); return false; } + auto* target = impl->target.load(std::memory_order_acquire); try { // shared_ptr keeps the ctx holder alive as long as ObjectStore keeps // the lambda; destructor runs exactly once when ObjectStore drops the @@ -1394,8 +1412,10 @@ bool sourceObjectPushLazy( auto holder = std::make_shared(fetch_fn, fetch_ctx, fetch_ctx_destroy); // Plugins return raw bytes via the C ABI; wrap them as a PayloadView whose // anchor is a shared_ptr>, per the pushLazy contract. + // Target pointer comes from the atomic swap layer (so writes follow the + // current target store, not a captured-at-construction one). auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); }; - auto result = impl->store.pushLazy(ObjectTopicId{topic.id}, timestamp_ns, std::move(closure)); + auto result = target->pushLazy(ObjectTopicId{topic.id}, timestamp_ns, std::move(closure)); if (!result) { impl->setError(result.error()); propagateError(out_error, impl->last_error.c_str()); @@ -1421,11 +1441,12 @@ bool sourceObjectPushLazy( void sourceObjectSetRetentionBudget( void* ctx, PJ_object_topic_handle_t topic, int64_t time_window_ns, std::size_t max_memory_bytes) noexcept { auto* impl = static_cast(ctx); + auto* target = impl->target.load(std::memory_order_acquire); try { RetentionBudget budget{}; budget.time_window_ns = time_window_ns; budget.max_memory_bytes = max_memory_bytes; - impl->store.setRetentionBudget(ObjectTopicId{topic.id}, budget); + target->setRetentionBudget(ObjectTopicId{topic.id}, budget); } catch (...) { // Infallible by contract — swallow any exception from the store. } @@ -1435,12 +1456,12 @@ void sourceObjectSetRetentionBudget( // Toolbox object read host trampolines // --------------------------------------------------------------------------- -/// Heap holder for the PayloadView backing PJ_object_bytes_handle_t. -/// Allocated by read_latest_at; freed by release_bytes. -struct ObjectBytesBox { - sdk::PayloadView payload; -}; - +// The C-ABI exposes the bytes via a void-handle (PJ_object_bytes_handle_t) +// that the plugin must later release. We allocate a sdk::PayloadView on the +// heap and reinterpret_cast its pointer to the handle: the PayloadView's +// anchor is what keeps the underlying buffer alive until the plugin calls +// release_bytes. No wrapper struct needed — PayloadView already carries +// bytes (Span) + anchor (BufferAnchor) in one value. PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept { auto* impl = static_cast(ctx); try { @@ -1513,8 +1534,8 @@ bool toolboxObjectReadLatestAt( propagateError(out_error, impl->last_error.c_str()); return false; } - auto* box = new ObjectBytesBox{std::move(entry->payload)}; - *out_handle = reinterpret_cast(box); + auto* payload_handle = new sdk::PayloadView(std::move(entry->payload)); + *out_handle = reinterpret_cast(payload_handle); if (out_timestamp != nullptr) { *out_timestamp = entry->timestamp; } @@ -1541,15 +1562,15 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_ if (handle == nullptr) { return; } - auto* box = reinterpret_cast(handle); - if (box->payload.bytes.empty()) { + const auto* payload = reinterpret_cast(handle); + if (payload->anchor == nullptr) { return; } if (out_data != nullptr) { - *out_data = box->payload.bytes.data(); + *out_data = payload->bytes.data(); } if (out_size != nullptr) { - *out_size = box->payload.bytes.size(); + *out_size = payload->bytes.size(); } } @@ -1557,7 +1578,7 @@ void toolboxObjectReleaseBytes(PJ_object_bytes_handle_t handle) noexcept { if (handle == nullptr) { return; } - delete reinterpret_cast(handle); + delete reinterpret_cast(handle); } std::size_t toolboxObjectEntryCount(void* ctx, PJ_object_topic_handle_t topic) noexcept { @@ -1596,12 +1617,13 @@ bool toolboxObjectTimeRange( bool parserObjectPushOwned( void* ctx, int64_t timestamp_ns, const uint8_t* data, std::size_t size, PJ_error_t* out_error) noexcept { auto* impl = static_cast(ctx); + auto* target = impl->target.load(std::memory_order_acquire); try { std::vector bytes; if (data != nullptr && size > 0) { bytes.assign(data, data + size); } - auto result = impl->store.pushOwned(impl->bound_topic, timestamp_ns, std::move(bytes)); + auto result = target->pushOwned(impl->bound_topic, timestamp_ns, std::move(bytes)); if (!result) { impl->setError(result.error()); propagateError(out_error, impl->last_error.c_str()); @@ -1631,10 +1653,11 @@ bool parserObjectPushLazy( propagateError(out_error, "fetch_fn must not be null"); return false; } + auto* target = impl->target.load(std::memory_order_acquire); try { auto holder = std::make_shared(fetch_fn, fetch_ctx, fetch_ctx_destroy); auto closure = [holder]() -> sdk::PayloadView { return sdk::makePayloadView(holder->invoke()); }; - auto result = impl->store.pushLazy(impl->bound_topic, timestamp_ns, std::move(closure)); + auto result = target->pushLazy(impl->bound_topic, timestamp_ns, std::move(closure)); if (!result) { impl->setError(result.error()); propagateError(out_error, impl->last_error.c_str()); @@ -1712,7 +1735,15 @@ PJ_source_write_host_t DatastoreSourceWriteHost::raw() noexcept { } void DatastoreSourceWriteHost::flushPending() { - state_->core.flushPending(); + state_->core->flushPending(); +} + +void DatastoreSourceWriteHost::setTarget(DataEngine* target) { + // Seal + commit any open chunk to the current engine so no rows are lost, + // then rebind to the new engine with a fresh WriteCore (its writer and + // per-engine caches must not carry over). Mirrors DatastoreParserWriteHost. + state_->core->flushPending(); + state_->core = std::make_unique(*target); } DatastoreParserWriteHost::DatastoreParserWriteHost(DataEngine& engine, TopicHandle topic) @@ -1726,7 +1757,16 @@ PJ_parser_write_host_t DatastoreParserWriteHost::raw() noexcept { } void DatastoreParserWriteHost::flushPending() { - state_->core.flushPending(); + state_->core->flushPending(); +} + +void DatastoreParserWriteHost::setTarget(DataEngine* target) { + // Seal + commit any open chunk to the current engine so no rows are lost, + // then rebind to the new engine with a fresh WriteCore (its writer and + // per-engine caches must not carry over). The bound topic is expected to + // already exist in `target` with the same TopicId. + state_->core->flushPending(); + state_->core = std::make_unique(*target); } DatastoreToolboxHost::DatastoreToolboxHost(DataEngine& engine, ObjectStore& object_store) @@ -1754,6 +1794,10 @@ PJ_object_write_host_t DatastoreSourceObjectWriteHost::raw() noexcept { return PJ_object_write_host_t{.ctx = state_.get(), .vtable = &kSourceObjectWriteVTable}; } +void DatastoreSourceObjectWriteHost::setTarget(ObjectStore* target) noexcept { + state_->target.store(target, std::memory_order_release); +} + DatastoreToolboxObjectReadHost::DatastoreToolboxObjectReadHost(ObjectStore& store) : state_(std::make_unique(store)) {} DatastoreToolboxObjectReadHost::~DatastoreToolboxObjectReadHost() = default; @@ -1776,4 +1820,8 @@ PJ_parser_object_write_host_t DatastoreParserObjectWriteHost::raw() noexcept { return PJ_parser_object_write_host_t{.ctx = state_.get(), .vtable = &kParserObjectWriteVTable}; } +void DatastoreParserObjectWriteHost::setTarget(ObjectStore* target) noexcept { + state_->target.store(target, std::memory_order_release); +} + } // namespace PJ diff --git a/pj_datastore/src/topic_storage.cpp b/pj_datastore/src/topic_storage.cpp index 5bf7373..8d91c7b 100644 --- a/pj_datastore/src/topic_storage.cpp +++ b/pj_datastore/src/topic_storage.cpp @@ -29,8 +29,15 @@ PJ::Status TopicStorage::appendSealedChunk(TopicChunk chunk) { } void TopicStorage::evictBefore(Timestamp t_keep_min) { - while (!sealed_chunks_.empty() && sealed_chunks_.front().stats.t_max < t_keep_min) { - sealed_chunks_.pop_front(); + // Chunks are commit-ordered: evict the contiguous prefix whose chunks are + // entirely older than t_keep_min. + size_t end_to_remove = 0; + while (end_to_remove < sealed_chunks_.size() && sealed_chunks_[end_to_remove].stats.t_max < t_keep_min) { + ++end_to_remove; + } + + if (end_to_remove > 0) { + sealed_chunks_.erase(sealed_chunks_.begin(), sealed_chunks_.begin() + static_cast(end_to_remove)); } } diff --git a/pj_datastore/tests/engine_integration_test.cpp b/pj_datastore/tests/engine_integration_test.cpp index 5499d01..3d77173 100644 --- a/pj_datastore/tests/engine_integration_test.cpp +++ b/pj_datastore/tests/engine_integration_test.cpp @@ -952,5 +952,172 @@ TEST(EngineIntegrationTest, BulkAppendEmpty) { EXPECT_TRUE(flushed.empty()); } +// ========================================================================= +// Cross-engine flush (flushTo) — zero-copy chunk transfer +// ========================================================================= + +namespace { + +// Builds two engines with the same topic registered (lockstep pattern used by +// pj4's StreamingSourceManager dual-buffer) and writes `row_count` rows to src. +struct FlushFixture { + DataEngine src; + DataEngine dst; + DatasetId src_dataset = 0; + DatasetId dst_dataset = 0; + ScalarSeriesHandle src_handle; + ScalarSeriesHandle dst_handle; +}; + +FlushFixture buildFlushFixture(const std::string& topic = "scalar/topic") { + FlushFixture f; + f.src_dataset = *f.src.createDataset(DatasetDescriptor{.source_name = "src", .time_domain_id = 0}); + f.dst_dataset = *f.dst.createDataset(DatasetDescriptor{.source_name = "dst", .time_domain_id = 0}); + + DataWriter sw = f.src.createWriter(); + f.src_handle = *sw.registerScalarSeries(f.src_dataset, topic, NumericType::kFloat64); + + DataWriter dw = f.dst.createWriter(); + f.dst_handle = *dw.registerScalarSeries(f.dst_dataset, topic, NumericType::kFloat64); + return f; +} + +void writeScalars(DataEngine& engine, ScalarSeriesHandle handle, Timestamp start, std::size_t count) { + DataWriter w = engine.createWriter(); + for (std::size_t i = 0; i < count; ++i) { + w.appendScalar(handle, start + static_cast(i) * 1000, static_cast(i)); + } + auto flushed = w.flushAll(); + engine.commitChunks(std::move(flushed)); +} + +} // namespace + +TEST(DataEngineFlushTest, MovesAllChunksFromSrcToDst) { + auto f = buildFlushFixture(); + writeScalars(f.src, f.src_handle, /*start=*/0, /*count=*/2500); // ~3 chunks at default 1024 rows. + + const auto* src_storage = f.src.getTopicStorage(f.src_handle.topic_id); + const auto* dst_storage = f.dst.getTopicStorage(f.dst_handle.topic_id); + ASSERT_NE(src_storage, nullptr); + ASSERT_NE(dst_storage, nullptr); + + const std::size_t pre_src_chunks = src_storage->sealedChunks().size(); + ASSERT_GE(pre_src_chunks, 2U); + ASSERT_EQ(dst_storage->sealedChunks().size(), 0U); + + auto result = f.src.flushTo(f.dst); + ASSERT_TRUE(result.has_value()) << result.error(); + + EXPECT_EQ(src_storage->sealedChunks().size(), 0U); + EXPECT_EQ(dst_storage->sealedChunks().size(), pre_src_chunks); + + // The destination can read the data via the standard reader interface. + DataReader reader = f.dst.createReader(); + std::size_t count = 0; + auto cursor = reader.rangeQuery( + QueryRange{ + .topic_id = f.dst_handle.topic_id, + .t_min = 0, + .t_max = static_cast(2499) * 1000, + }); + ASSERT_TRUE(cursor.has_value()) << cursor.error(); + cursor->forEach([&count](const SampleRow& row) { + (void)row; + ++count; + }); + EXPECT_EQ(count, 2500U); +} + +TEST(DataEngineFlushTest, AppendsToExistingDstChunks) { + auto f = buildFlushFixture(); + // dst already has data covering [0, 1023*1000]. + writeScalars(f.dst, f.dst_handle, /*start=*/0, /*count=*/1024); + // src has the next window [1024*1000, 2047*1000]. + writeScalars(f.src, f.src_handle, /*start=*/static_cast(1024) * 1000, /*count=*/1024); + + ASSERT_TRUE(f.src.flushTo(f.dst).has_value()); + + DataReader reader = f.dst.createReader(); + std::size_t count = 0; + auto cursor = reader.rangeQuery( + QueryRange{ + .topic_id = f.dst_handle.topic_id, + .t_min = 0, + .t_max = static_cast(2047) * 1000, + }); + ASSERT_TRUE(cursor.has_value()); + cursor->forEach([&count](const SampleRow& row) { + (void)row; + ++count; + }); + EXPECT_EQ(count, 2048U); +} + +TEST(DataEngineFlushTest, RejectsMonotonicityViolation) { + auto f = buildFlushFixture(); + writeScalars(f.dst, f.dst_handle, /*start=*/static_cast(1000) * 1000, /*count=*/1024); + writeScalars(f.src, f.src_handle, /*start=*/0, /*count=*/1024); // earlier than dst. + + const auto* src_storage = f.src.getTopicStorage(f.src_handle.topic_id); + const auto* dst_storage = f.dst.getTopicStorage(f.dst_handle.topic_id); + const std::size_t pre_src = src_storage->sealedChunks().size(); + const std::size_t pre_dst = dst_storage->sealedChunks().size(); + + auto result = f.src.flushTo(f.dst); + EXPECT_FALSE(result.has_value()); + + // Neither engine mutated. + EXPECT_EQ(src_storage->sealedChunks().size(), pre_src); + EXPECT_EQ(dst_storage->sealedChunks().size(), pre_dst); +} + +TEST(DataEngineFlushTest, RejectsUnknownTopicInDst) { + DataEngine src, dst; + DatasetId src_dataset = *src.createDataset(DatasetDescriptor{.source_name = "src", .time_domain_id = 0}); + DatasetId dst_dataset = *dst.createDataset(DatasetDescriptor{.source_name = "dst", .time_domain_id = 0}); + + DataWriter sw = src.createWriter(); + auto src_handle = *sw.registerScalarSeries(src_dataset, "only/in/src", NumericType::kFloat64); + + DataWriter dw = dst.createWriter(); + (void)dw.registerScalarSeries(dst_dataset, "only/in/dst", NumericType::kFloat64); + + writeScalars(src, src_handle, /*start=*/0, /*count=*/100); + + auto result = src.flushTo(dst); + EXPECT_FALSE(result.has_value()); + const auto* src_storage = src.getTopicStorage(src_handle.topic_id); + EXPECT_GE(src_storage->sealedChunks().size(), 1U); // src not mutated. +} + +TEST(DataEngineFlushTest, RejectsSameEngine) { + DataEngine engine; + auto dataset_id = *engine.createDataset(DatasetDescriptor{.source_name = "self", .time_domain_id = 0}); + DataWriter w = engine.createWriter(); + auto handle = *w.registerScalarSeries(dataset_id, "topic", NumericType::kFloat64); + writeScalars(engine, handle, /*start=*/0, /*count=*/100); + + auto result = engine.flushTo(engine); + EXPECT_FALSE(result.has_value()); + const auto* storage = engine.getTopicStorage(handle.topic_id); + EXPECT_GE(storage->sealedChunks().size(), 1U); // not mutated. +} + +TEST(DataEngineFlushTest, PreservesTopicRegistrationOnSrc) { + auto f = buildFlushFixture(); + writeScalars(f.src, f.src_handle, /*start=*/0, /*count=*/1024); + + ASSERT_TRUE(f.src.flushTo(f.dst).has_value()); + + // src topic is still registered — a fresh writer can push more data. + const auto* src_storage = f.src.getTopicStorage(f.src_handle.topic_id); + ASSERT_NE(src_storage, nullptr); + EXPECT_TRUE(src_storage->empty()); + + writeScalars(f.src, f.src_handle, /*start=*/static_cast(2000) * 1000, /*count=*/100); + EXPECT_FALSE(src_storage->empty()); +} + } // namespace } // namespace PJ diff --git a/pj_datastore/tests/object_store_test.cpp b/pj_datastore/tests/object_store_test.cpp index 6c46500..bd82507 100644 --- a/pj_datastore/tests/object_store_test.cpp +++ b/pj_datastore/tests/object_store_test.cpp @@ -510,5 +510,228 @@ TEST(ObjectStoreTest, ConcurrentReadWriteSmoke) { EXPECT_EQ(store.entryCount(id), static_cast(kPushCount)); } +// ========================================================================= +// Cross-store flush (flushTo) +// ========================================================================= + +namespace { + +ObjectTopicId registerSameDescriptor( + ObjectStore& store, DatasetId dataset_id = 1, const std::string& name = "test/topic") { + auto id_or = store.registerTopic({.dataset_id = dataset_id, .topic_name = name, .metadata_json = "{}"}); + EXPECT_TRUE(id_or.has_value()); + return *id_or; +} + +} // namespace + +TEST(ObjectStoreFlushTest, BasicMoveOwnedEntries) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + auto dst_id = registerSameDescriptor(dst); + + src.pushOwned(src_id, 100, makePayload(8, 0xAA)); + src.pushOwned(src_id, 200, makePayload(8, 0xBB)); + src.pushOwned(src_id, 300, makePayload(8, 0xCC)); + + ASSERT_EQ(src.entryCount(src_id), 3u); + ASSERT_EQ(dst.entryCount(dst_id), 0u); + + auto result = src.flushTo(dst); + ASSERT_TRUE(result.has_value()) << result.error(); + + EXPECT_EQ(src.entryCount(src_id), 0u); + EXPECT_EQ(dst.entryCount(dst_id), 3u); + + auto e0 = dst.at(dst_id, 0); + ASSERT_TRUE(e0.has_value()); + EXPECT_EQ(e0->timestamp, 100); + EXPECT_EQ(e0->payload.bytes.size(), 8u); + EXPECT_EQ(e0->payload.bytes[0], 0xAA); + + auto e2 = dst.at(dst_id, 2); + ASSERT_TRUE(e2.has_value()); + EXPECT_EQ(e2->timestamp, 300); + EXPECT_EQ(e2->payload.bytes[0], 0xCC); +} + +TEST(ObjectStoreFlushTest, PreservesTopicRegistrationOnSrc) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + registerSameDescriptor(dst); + + src.pushOwned(src_id, 100, makePayload(4)); + + ASSERT_TRUE(src.flushTo(dst).has_value()); + + // src is empty but the topic is still registered — the same id can accept new pushes. + EXPECT_EQ(src.entryCount(src_id), 0u); + auto post_push = src.pushOwned(src_id, 400, makePayload(4)); + EXPECT_TRUE(post_push.has_value()); + EXPECT_EQ(src.entryCount(src_id), 1u); +} + +TEST(ObjectStoreFlushTest, AppendsToExistingDstEntries) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + auto dst_id = registerSameDescriptor(dst); + + dst.pushOwned(dst_id, 50, makePayload(4, 0x11)); + dst.pushOwned(dst_id, 100, makePayload(4, 0x22)); + src.pushOwned(src_id, 200, makePayload(4, 0x33)); + src.pushOwned(src_id, 300, makePayload(4, 0x44)); + + ASSERT_TRUE(src.flushTo(dst).has_value()); + + EXPECT_EQ(dst.entryCount(dst_id), 4u); + EXPECT_EQ(dst.at(dst_id, 0)->timestamp, 50); + EXPECT_EQ(dst.at(dst_id, 1)->timestamp, 100); + EXPECT_EQ(dst.at(dst_id, 2)->timestamp, 200); + EXPECT_EQ(dst.at(dst_id, 3)->timestamp, 300); +} + +TEST(ObjectStoreFlushTest, RejectsMonotonicityViolation) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + auto dst_id = registerSameDescriptor(dst); + + dst.pushOwned(dst_id, 200, makePayload(4)); + src.pushOwned(src_id, 100, makePayload(4)); // earlier than dst's last + + auto result = src.flushTo(dst); + EXPECT_FALSE(result.has_value()); + + // Neither store is mutated on failure. + EXPECT_EQ(src.entryCount(src_id), 1u); + EXPECT_EQ(dst.entryCount(dst_id), 1u); +} + +TEST(ObjectStoreFlushTest, RejectsUnknownTopicInDst) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src, 1, "missing/topic"); + // dst has a different topic. + registerSameDescriptor(dst, 1, "other/topic"); + + src.pushOwned(src_id, 100, makePayload(4)); + + auto result = src.flushTo(dst); + EXPECT_FALSE(result.has_value()); + EXPECT_EQ(src.entryCount(src_id), 1u); +} + +TEST(ObjectStoreFlushTest, EmptySourceSeriesSkipped) { + ObjectStore src, dst; + auto src_id_with_data = registerSameDescriptor(src, 1, "with/data"); + registerSameDescriptor(src, 1, "empty/series"); // registered but no pushes + + auto dst_id_with_data = registerSameDescriptor(dst, 1, "with/data"); + // dst does NOT register "empty/series" — flush should still succeed since src + // has no entries for it. + + src.pushOwned(src_id_with_data, 100, makePayload(4)); + + auto result = src.flushTo(dst); + EXPECT_TRUE(result.has_value()) << (result.has_value() ? "" : result.error()); + EXPECT_EQ(dst.entryCount(dst_id_with_data), 1u); +} + +TEST(ObjectStoreFlushTest, RejectsSameStore) { + ObjectStore store; + auto id = registerSameDescriptor(store); + store.pushOwned(id, 100, makePayload(4)); + + auto result = store.flushTo(store); + EXPECT_FALSE(result.has_value()); + EXPECT_EQ(store.entryCount(id), 1u); +} + +TEST(ObjectStoreFlushTest, MultipleTopicsFlushed) { + ObjectStore src, dst; + auto src_a = registerSameDescriptor(src, 1, "topic/a"); + auto src_b = registerSameDescriptor(src, 1, "topic/b"); + auto dst_a = registerSameDescriptor(dst, 1, "topic/a"); + auto dst_b = registerSameDescriptor(dst, 1, "topic/b"); + + src.pushOwned(src_a, 100, makePayload(4, 0xAA)); + src.pushOwned(src_b, 100, makePayload(4, 0xBB)); + src.pushOwned(src_a, 200, makePayload(4, 0xCC)); + src.pushOwned(src_b, 200, makePayload(4, 0xDD)); + + ASSERT_TRUE(src.flushTo(dst).has_value()); + + EXPECT_EQ(dst.entryCount(dst_a), 2u); + EXPECT_EQ(dst.entryCount(dst_b), 2u); + EXPECT_EQ(dst.at(dst_a, 0)->payload.bytes[0], 0xAA); + EXPECT_EQ(dst.at(dst_b, 1)->payload.bytes[0], 0xDD); +} + +TEST(ObjectStoreFlushTest, LazyEntriesPreserveSemantics) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + auto dst_id = registerSameDescriptor(dst); + + int src_invocations = 0; + src.pushLazy(src_id, 100, [&src_invocations]() -> sdk::PayloadView { + ++src_invocations; + return sdk::makePayloadView({0xDE, 0xAD, 0xBE, 0xEF}); + }); + + // Flush itself must NOT invoke the closure — it just moves the std::function. + EXPECT_EQ(src_invocations, 0); + ASSERT_TRUE(src.flushTo(dst).has_value()); + EXPECT_EQ(src_invocations, 0); + + EXPECT_EQ(src.entryCount(src_id), 0u); + EXPECT_EQ(dst.entryCount(dst_id), 1u); + + // Reading from dst invokes the closure once, exactly like in src. + auto resolved = dst.latestAt(dst_id, 100); + ASSERT_TRUE(resolved.has_value()); + EXPECT_EQ(src_invocations, 1); + ASSERT_EQ(resolved->payload.bytes.size(), 4u); + EXPECT_EQ(resolved->payload.bytes[0], 0xDE); +} + +TEST(ObjectStoreFlushTest, RetentionBudgetAppliedAfterFlush) { + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + auto dst_id = registerSameDescriptor(dst); + + // Time-window retention of 100 ns on dst: anything older than newest - 100 evicts. + dst.setRetentionBudget(dst_id, RetentionBudget{.time_window_ns = 100, .max_memory_bytes = 0}); + + src.pushOwned(src_id, 100, makePayload(4)); + src.pushOwned(src_id, 150, makePayload(4)); + src.pushOwned(src_id, 250, makePayload(4)); // newest after flush; evict anything < 150. + + ASSERT_TRUE(src.flushTo(dst).has_value()); + + // After flush: newest is 250, threshold = 150, so the entry at 100 evicts. + EXPECT_EQ(dst.entryCount(dst_id), 2u); + EXPECT_EQ(dst.at(dst_id, 0)->timestamp, 150); + EXPECT_EQ(dst.at(dst_id, 1)->timestamp, 250); +} + +TEST(ObjectStoreFlushTest, ZeroCopyOwnershipChainSurvives) { + // A shared_ptr handed in via pushOwned is shared between the entry's payload + // and any consumer of `at()`. After flushTo, the same shared_ptr instance + // must be the one observed in the destination — no copy, just refcount + // transfer through the move of the underlying ObjectEntry. + ObjectStore src, dst; + auto src_id = registerSameDescriptor(src); + auto dst_id = registerSameDescriptor(dst); + + src.pushOwned(src_id, 100, makePayload(16, 0x77)); + auto pre_handle = src.at(src_id, 0); + ASSERT_TRUE(pre_handle.has_value()); + const auto* pre_ptr = pre_handle->payload.anchor.get(); + + ASSERT_TRUE(src.flushTo(dst).has_value()); + + auto post_handle = dst.at(dst_id, 0); + ASSERT_TRUE(post_handle.has_value()); + EXPECT_EQ(post_handle->payload.anchor.get(), pre_ptr) << "shared_ptr identity must survive the flush"; +} + } // namespace } // namespace PJ diff --git a/pj_datastore/tests/plugin_data_host_object_test.cpp b/pj_datastore/tests/plugin_data_host_object_test.cpp index 63085a9..c786599 100644 --- a/pj_datastore/tests/plugin_data_host_object_test.cpp +++ b/pj_datastore/tests/plugin_data_host_object_test.cpp @@ -211,5 +211,80 @@ TEST(PluginDataHostObjectTest, ViewReportsNotBoundWhenRawIsEmpty) { EXPECT_FALSE(status.has_value()); } +// =========================================================================== +// setTarget — streaming two-store flow +// =========================================================================== + +TEST(PluginDataHostObjectTest, SetTargetRedirectsRegisterAndPushToSecondary) { + // Simulates the streaming pause/resume routing: the manager flips the host + // between a primary and a secondary store. After setTarget(secondary) the + // host's registerTopic + pushOwned land in the secondary; pushes against + // primary stop. Flipping back to primary resumes routing there. + ObjectStore primary; + ObjectStore secondary; + DatastoreSourceObjectWriteHost host_impl{primary, kDatasetId}; + SourceObjectWriteHostView host{host_impl.raw()}; + + // Lockstep registration on both stores BEFORE the swap, so the topic id + // is the same on each side (auto-counter ticks identically). This matches + // the manager wiring described in the two-store plan. + const auto primary_topic = *host.registerTopic("cam", "{}"); + host_impl.setTarget(&secondary); + const auto secondary_topic = *host.registerTopic("cam", "{}"); + EXPECT_EQ(primary_topic.id, secondary_topic.id); + + // A push now must land in secondary, not primary. + const std::vector payload_a{0x01, 0x02}; + ASSERT_TRUE(host.pushOwned(secondary_topic, 1000, payload_a).has_value()); + EXPECT_EQ(primary.entryCount(ObjectTopicId{primary_topic.id}), 0U); + EXPECT_EQ(secondary.entryCount(ObjectTopicId{secondary_topic.id}), 1U); + + // Flip back: subsequent pushes return to primary. + host_impl.setTarget(&primary); + const std::vector payload_b{0x03}; + ASSERT_TRUE(host.pushOwned(primary_topic, 2000, payload_b).has_value()); + EXPECT_EQ(primary.entryCount(ObjectTopicId{primary_topic.id}), 1U); + EXPECT_EQ(secondary.entryCount(ObjectTopicId{secondary_topic.id}), 1U); +} + +TEST(PluginDataHostObjectTest, ParserSetTargetRedirectsPushToSecondary) { + // Same test as above but for the parser-scoped host, which is the one the + // streaming worker actually drives (parser-bound topic id captured at + // bind() — invariant under the swap because the manager has registered + // the topic in both stores via lockstep registerTopic). + ObjectStore primary; + ObjectStore secondary; + // Pre-register on both stores in lockstep. Both auto-assign the same id. + const auto primary_topic = + *primary.registerTopic({.dataset_id = kDatasetId, .topic_name = "cam", .metadata_json = "{}"}); + const auto secondary_topic = + *secondary.registerTopic({.dataset_id = kDatasetId, .topic_name = "cam", .metadata_json = "{}"}); + ASSERT_EQ(primary_topic.id, secondary_topic.id); + + DatastoreParserObjectWriteHost host_impl{primary, primary_topic.id}; + sdk::ParserObjectWriteHostView host{host_impl.raw()}; + + const std::vector payload_a{0x01}; + const std::vector payload_b{0x02}; + const std::vector payload_c{0x03}; + + // Push before swap goes to primary. + ASSERT_TRUE(host.pushOwned(1000, payload_a).has_value()); + EXPECT_EQ(primary.entryCount(primary_topic), 1U); + EXPECT_EQ(secondary.entryCount(secondary_topic), 0U); + + // Swap and push: now lands in secondary, primary untouched. + host_impl.setTarget(&secondary); + ASSERT_TRUE(host.pushOwned(2000, payload_b).has_value()); + EXPECT_EQ(primary.entryCount(primary_topic), 1U); + EXPECT_EQ(secondary.entryCount(secondary_topic), 1U); + + // Swap back: resumes pushing into primary. + host_impl.setTarget(&primary); + ASSERT_TRUE(host.pushOwned(3000, payload_c).has_value()); + EXPECT_EQ(primary.entryCount(primary_topic), 2U); + EXPECT_EQ(secondary.entryCount(secondary_topic), 1U); +} + } // namespace } // namespace PJ From 94f0e4f4fae37e64e4d37c41ab57ea8405546319 Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Fri, 29 May 2026 11:28:53 +0200 Subject: [PATCH 2/2] minor changes --- pj_datastore/include/pj_datastore/engine.hpp | 26 ++++----- .../include/pj_datastore/object_store.hpp | 54 ++++++------------- pj_datastore/src/engine.cpp | 2 +- pj_datastore/src/object_store.cpp | 15 ++---- pj_datastore/src/plugin_data_host.cpp | 9 ++-- 5 files changed, 37 insertions(+), 69 deletions(-) diff --git a/pj_datastore/include/pj_datastore/engine.hpp b/pj_datastore/include/pj_datastore/engine.hpp index 09c5b60..ac80546 100644 --- a/pj_datastore/include/pj_datastore/engine.hpp +++ b/pj_datastore/include/pj_datastore/engine.hpp @@ -82,22 +82,18 @@ class DataEngine { /// Evict old chunks outside the retention window. void enforceRetention(PJ::Timestamp retention_window_ns); - /// Move every committed chunk from this engine into `dst`, leaving this - /// engine's topic storages empty (datasets, topics, schemas, and time - /// domains remain registered). Topics are matched by descriptor - /// (`dataset_id` + `name`); both engines must have the matching topics - /// registered or the call fails without partial mutation. Monotonicity is - /// enforced strictly per topic: the source's earliest chunk timestamp must - /// be greater than or equal to the destination's current `time_max()`. + /// Move every committed chunk into `dst`, leaving this engine's storages + /// empty (datasets, topics, schemas, time domains stay registered). Topics + /// are matched by descriptor (`dataset_id` + `name`); both engines must have + /// them registered. Monotonicity is enforced per topic: the source's + /// earliest chunk timestamp must be >= the destination's `time_max()`. Any + /// failure mutates neither engine. /// - /// Zero-copy on the chunk data: the destination's `std::deque` - /// receives the source's chunks via `std::move`, never via copy. The chunk - /// internals (column buffers, value arrays) are pointer/buffer moves. - /// - /// Schema compatibility is the caller's responsibility — typically the - /// destination is kept in lockstep with the source through parallel - /// `createDataset` / `createTopic` registration at startup. - PJ::Expected flushTo(DataEngine& dst); + /// Zero-copy: dst's `std::deque` receives the chunks via + /// `std::move` (column buffers/value arrays are pointer moves). Schema + /// compatibility is the caller's responsibility — typically dst is kept in + /// lockstep with the source via parallel registration at startup. + PJ::Status flushTo(DataEngine& dst); // Writer/Reader factories /// Create a writer bound to this engine. diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index 03c3e09..acd5980 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -49,24 +49,15 @@ using LazyCallback = std::function; struct ObjectEntry { Timestamp timestamp = 0; - // Holds either a SharedBuffer (eager owned payload, counted against the - // retention budget) or a LazyCallback (lazy resolver). resolveEntry - // discriminates via std::get_if; the variant is exhaustive over the two - // (and only two) payload kinds. + // Eager owned bytes or a lazy resolver; resolveEntry discriminates via std::get_if. std::variant payload; }; struct ResolvedObjectEntry { Timestamp timestamp = 0; - // PayloadView lets the entry hold an opaque anchor (any shared_ptr) plus - // a non-owning Span over the bytes. Consumers read `payload.bytes` for the - // data; they retain `payload.anchor` if they need the bytes to outlive the - // resolve call. Both `pushOwned` and `pushLazy` paths land here: - // - owned: payload.bytes spans the shared_ptr>; anchor IS that shared_ptr. - // - lazy: payload is whatever the closure returns; anchor can be any shared_ptr - // (e.g. a C-ABI payload anchor wrapped as shared_ptr). - // resolveEntry never casts the anchor to a concrete type; the type erasure - // stays opaque all the way to the consumer. + // Non-owning Span over the bytes plus an opaque anchor (any shared_ptr). + // Consumers read `payload.bytes`; retain `payload.anchor` to keep the bytes + // alive past the resolve call. resolveEntry never casts the anchor. sdk::PayloadView payload; }; @@ -130,11 +121,9 @@ class ObjectStore { Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector payload); - // Fetcher runs on every read. Producers anchor on whatever owns the bytes - // (chunk cache, mmap, fresh allocation); the store never copies — it just - // retains the anchor through PayloadView. When the producer already holds - // the bytes behind a shared_ptr (e.g. a streaming buffer handed off between - // stores), the closure captures it and returns a view backed by it. + // Fetcher runs on every read; the store retains the anchor via PayloadView + // and never copies. The closure can return a view over bytes the producer + // already owns (chunk cache, mmap, hand-off between stores). Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch); // --- Read --- @@ -164,26 +153,17 @@ class ObjectStore { // --- Cross-store flush --- - // Move every entry from this store into `dst`, leaving this store empty - // (topic registrations are preserved). Topics are matched by descriptor - // (dataset_id + topic_name); both stores must have registered the same - // descriptors or the call fails without partial mutation. For each series, - // monotonicity is enforced strictly: the earliest timestamp being moved - // must be greater than or equal to the destination's current last - // timestamp. On any validation failure the call returns an error and - // neither store is mutated. + // Move every entry into `dst`, leaving this store empty (registrations kept). + // Topics are matched by descriptor (dataset_id + topic_name); both stores + // must share descriptors. Monotonicity is enforced per series: the earliest + // moved timestamp must be >= the destination's last. Any validation failure + // returns an error and mutates neither store. // - // Zero-copy on the payload bytes. Each ObjectEntry is moved into the - // destination's series by value; the std::variant inside holds either a - // shared_ptr or a std::function, and moving it is a pointer/buffer move — - // bytes captured by the closure or owned by the shared_ptr are never - // copied or materialized during the flush. Lazy - // entries preserve their semantics in the destination: their closure is - // re-invoked only when the destination is read. - // - // After the move, the destination's retention budget is applied to each - // touched series in normal order. - Expected flushTo(ObjectStore& dst); + // Zero-copy: each ObjectEntry is moved by value, so the variant's shared_ptr + // or closure transfers as a pointer move — bytes are never copied. Lazy + // entries keep their semantics; their closure re-runs only on a dst read. + // Afterward, dst's retention budget is applied to each touched series. + Status flushTo(ObjectStore& dst); // --- Lifecycle --- diff --git a/pj_datastore/src/engine.cpp b/pj_datastore/src/engine.cpp index f77a6b1..693e1d2 100644 --- a/pj_datastore/src/engine.cpp +++ b/pj_datastore/src/engine.cpp @@ -182,7 +182,7 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) { } } -Expected DataEngine::flushTo(DataEngine& dst) { +Status DataEngine::flushTo(DataEngine& dst) { if (&dst == this) { return PJ::unexpected("flushTo: source and destination are the same engine"); } diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index 0488878..f24fc0e 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -267,7 +267,7 @@ void ObjectStore::evictAllBefore(Timestamp threshold) { // --- Cross-store flush --- -Expected ObjectStore::flushTo(ObjectStore& dst) { +Status ObjectStore::flushTo(ObjectStore& dst) { if (&dst == this) { return unexpected("flushTo: source and destination are the same store"); } @@ -374,9 +374,8 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { resolved.timestamp = entry.timestamp; if (const auto* owned = std::get_if(&entry.payload)) { - // Owned branch: build a PayloadView whose Span covers the vector and - // whose anchor IS the same shared_ptr — refcount bump, no copy. A - // default-constructed entry holds a null SharedBuffer, so guard it. + // Span the vector, anchor on the same shared_ptr — refcount bump, no copy. + // A default-constructed entry holds a null SharedBuffer, so guard it. if (*owned) { resolved.payload = sdk::PayloadView{ Span{(*owned)->data(), (*owned)->size()}, @@ -384,12 +383,8 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) { }; } } else if (const auto* lazy = std::get_if(&entry.payload)) { - // Lazy branch: forward whatever the closure returns. The anchor stays - // opaque (shared_ptr); consumers retain it without needing - // to know the concrete type. No static_pointer_cast here — that was the - // hidden contract that locked the slot to shared_ptr>; - // dropping it lets producers anchor on arrow::Buffer, mmap pools, or - // C-ABI payload anchors equally. + // Forward the closure's PayloadView verbatim. The anchor stays opaque (no + // cast), so producers can back it with arrow::Buffer, mmap, or a C-ABI anchor. resolved.payload = (*lazy)(); } diff --git a/pj_datastore/src/plugin_data_host.cpp b/pj_datastore/src/plugin_data_host.cpp index 8c19fca..5c9957c 100644 --- a/pj_datastore/src/plugin_data_host.cpp +++ b/pj_datastore/src/plugin_data_host.cpp @@ -1456,12 +1456,9 @@ void sourceObjectSetRetentionBudget( // Toolbox object read host trampolines // --------------------------------------------------------------------------- -// The C-ABI exposes the bytes via a void-handle (PJ_object_bytes_handle_t) -// that the plugin must later release. We allocate a sdk::PayloadView on the -// heap and reinterpret_cast its pointer to the handle: the PayloadView's -// anchor is what keeps the underlying buffer alive until the plugin calls -// release_bytes. No wrapper struct needed — PayloadView already carries -// bytes (Span) + anchor (BufferAnchor) in one value. +// PJ_object_bytes_handle_t is a heap-allocated sdk::PayloadView: its anchor +// keeps the buffer alive until the plugin calls release_bytes. No wrapper +// struct needed — PayloadView already carries the Span + anchor. PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept { auto* impl = static_cast(ctx); try {