Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pj_datastore/include/pj_datastore/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,22 @@ class DataEngine {
/// derived.onSourceCommitted(engine.commitChunks(writer.flushAll()));
std::vector<PJ::TopicId> commitChunks(std::vector<std::pair<PJ::TopicId, TopicChunk>> chunks);

/// Evict old chunks outside retention window.
/// Evict old chunks outside the retention window.
void enforceRetention(PJ::Timestamp retention_window_ns);

/// Move every committed chunk into `dst`, leaving this engine's storages
/// empty (datasets, topics, schemas, time domains stay registered). Topics
/// are matched by descriptor (`dataset_id` + `name`); every source topic that
/// holds committed chunks must have a matching topic registered in `dst`.
/// Source topics with no committed chunks are skipped and need no match.
/// Monotonicity is enforced per topic: when the destination topic already
/// holds chunks, the source's earliest chunk timestamp must be >= the
/// destination's `time_max()`. Any validation failure returns an error and
/// mutates neither engine.
///
/// Passing this engine itself as `dst` is rejected with an error.
///
/// Zero-copy: dst's `std::deque<TopicChunk>` receives the chunks via
/// `std::move` (column buffers/value arrays are pointer moves). Schema
/// compatibility is the caller's responsibility — typically dst is kept in
/// lockstep with the source via parallel registration at startup.
PJ::Status flushTo(DataEngine& dst);

// Writer/Reader factories
/// Create a writer bound to this engine.
[[nodiscard]] DataWriter createWriter();
Expand Down
24 changes: 21 additions & 3 deletions pj_datastore/include/pj_datastore/object_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ using LazyCallback = std::function<sdk::PayloadView()>;

// One stored object sample: a timestamp plus its payload in one of two forms.
struct ObjectEntry {
// Timestamp of the sample; defaults to 0.
Timestamp timestamp = 0;
// Eager owned bytes (SharedBuffer) or a lazy resolver (LazyCallback, a
// std::function returning sdk::PayloadView); resolveEntry discriminates via
// std::get_if. A default-constructed entry holds the first alternative, i.e.
// a null SharedBuffer, which resolveEntry checks for before dereferencing.
std::variant<SharedBuffer, LazyCallback> payload;
};

// Result type of ObjectStore::resolveEntry: the entry's timestamp together
// with a view over its payload bytes.
struct ResolvedObjectEntry {
// Copied from the resolved ObjectEntry's timestamp.
Timestamp timestamp = 0;
// Non-owning Span over the bytes plus an opaque anchor (any shared_ptr<T>).
// Consumers read `payload.bytes`; retain `payload.anchor` to keep the bytes
// alive past the resolve call. resolveEntry never casts the anchor.
sdk::PayloadView payload;
};

Expand Down Expand Up @@ -117,9 +121,9 @@ class ObjectStore {

Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);

// Fetcher runs on every read. Producers anchor on whatever owns the bytes
// (chunk cache, mmap, fresh allocation); the store never copies — it just
// retains the anchor through PayloadView.
// Fetcher runs on every read; the store retains the anchor via PayloadView
// and never copies. The closure can return a view over bytes the producer
// already owns (chunk cache, mmap, hand-off between stores).
Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);

// --- Read ---
Expand Down Expand Up @@ -147,6 +151,20 @@ class ObjectStore {
void evictBefore(ObjectTopicId id, Timestamp threshold);
void evictAllBefore(Timestamp threshold);

// --- Cross-store flush ---

// Move every entry into `dst`, leaving this store empty (registrations kept).
// Topics are matched by descriptor (dataset_id + topic_name); every source
// series that holds entries must have a matching topic in `dst`. Source
// series with no entries are skipped and need no match. Monotonicity is
// enforced per series: when the destination series already holds entries,
// the earliest moved timestamp must be >= the destination's last. Any
// validation failure returns an error and mutates neither store. Passing
// this store itself as `dst` is rejected with an error.
//
// Both stores' store_mutex_ are locked for the duration of the call.
//
// Zero-copy: each ObjectEntry is moved by value, so the variant's shared_ptr
// or closure transfers as a pointer move — bytes are never copied. Lazy
// entries keep their semantics; their closure re-runs only on a dst read.
// Afterward, dst's retention budget is applied to each touched series.
Status flushTo(ObjectStore& dst);

// --- Lifecycle ---

void removeTopic(ObjectTopicId id);
Expand Down
29 changes: 29 additions & 0 deletions pj_datastore/include/pj_datastore/plugin_data_host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class DatastoreSourceWriteHost {
[[nodiscard]] PJ_source_write_host_t raw() noexcept;
void flushPending();

// Atomically swap the destination DataEngine. Mirrors the parser write
// host's setTarget: the streaming two-engine flow routes source-level
// scalar pushes to a secondary DataEngine during pause and back to the
// primary on resume. Pending rows are flushed to the current engine
// before the switch. Does not take ownership.
void setTarget(DataEngine* target);

private:
std::unique_ptr<DatastoreSourceWriteHostState> state_;
};
Expand All @@ -51,6 +58,13 @@ class DatastoreSourceObjectWriteHost {

[[nodiscard]] PJ_object_write_host_t raw() noexcept;

// Atomically swap the destination store. Used by the streaming two-store
// flow to route pushes to a secondary ObjectStore during pause and back to
// the primary on resume. Must point to a live store; the host does not
// take ownership and the caller must keep the target alive for as long as
// the host can receive pushes against it.
void setTarget(ObjectStore* target) noexcept;

private:
std::unique_ptr<DatastoreSourceObjectWriteHostState> state_;
};
Expand All @@ -68,6 +82,14 @@ class DatastoreParserWriteHost {
[[nodiscard]] PJ_parser_write_host_t raw() noexcept;
void flushPending();

// Atomically swap the destination DataEngine. Mirrors the object write
// host's setTarget: the streaming two-engine flow routes scalar pushes to a
// secondary DataEngine during pause and back to the primary on resume.
// Pending rows are flushed to the current engine before the switch; the
// bound topic must exist in `target` with the same TopicId (the streaming
// manager registers it lockstep on both engines). Does not take ownership.
void setTarget(DataEngine* target);

private:
std::unique_ptr<DatastoreParserWriteHostState> state_;
};
Expand Down Expand Up @@ -111,6 +133,13 @@ class DatastoreParserObjectWriteHost {

[[nodiscard]] PJ_parser_object_write_host_t raw() noexcept;

// Atomically swap the destination store. Used by the streaming two-store
// flow to route pushes to a secondary ObjectStore during pause and back to
// the primary on resume. The bound topic id must exist in `target` (the
// streaming manager ensures this via lockstep registerTopic on both
// stores). The host does not take ownership of the target.
void setTarget(ObjectStore* target) noexcept;

private:
std::unique_ptr<DatastoreParserObjectWriteHostState> state_;
};
Expand Down
8 changes: 8 additions & 0 deletions pj_datastore/include/pj_datastore/topic_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ struct TopicMetadata {
uint32_t truncated_sample_count = 0;
};

class DataEngine;

/// Storage container for committed chunks of one topic.
class TopicStorage {
public:
Expand Down Expand Up @@ -123,6 +125,12 @@ class TopicStorage {
void setArrayExpansionCount(const std::string& field_path, uint32_t count);

private:
// DataEngine::flushTo needs to move sealed_chunks_ between TopicStorage
// instances of different engines without copying. Friending it lets the
// transfer happen entirely inside DataEngine without exposing the move
// primitive on the public TopicStorage API.
friend class DataEngine;

TopicId topic_id_;
TopicDescriptor descriptor_;
std::deque<TopicChunk> sealed_chunks_;
Expand Down
57 changes: 57 additions & 0 deletions pj_datastore/src/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,63 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) {
}
}

// Hands every sealed chunk of this engine over to `dst`, topic by topic.
//
// Runs in two passes: a read-only validation pass that pairs each non-empty
// source topic with the destination topic sharing its descriptor
// (dataset_id + name) and checks timestamp ordering, followed by a transfer
// pass that moves the chunks. An error from the validation pass is returned
// before either engine has been modified.
Status DataEngine::flushTo(DataEngine& dst) {
  if (this == &dst) {
    return PJ::unexpected("flushTo: source and destination are the same engine");
  }

  // One planned hand-over: a non-empty source topic and its destination twin.
  struct Transfer {
    TopicStorage* from;
    TopicStorage* to;
  };

  auto& src_topics = impl_->topics;
  auto& dst_topics = dst.impl_->topics;

  std::vector<Transfer> transfers;
  transfers.reserve(src_topics.size());

  // Pass 1 (validation only): build the transfer list without touching data.
  for (auto src_it = src_topics.begin(); src_it != src_topics.end(); ++src_it) {
    auto& from = src_it.value();
    if (from.empty()) {
      // Nothing to move for this topic; it needs no counterpart in dst.
      continue;
    }

    // Linear search for the destination topic with the same descriptor;
    // the loop condition stops at the first match.
    TopicStorage* to = nullptr;
    for (auto dst_it = dst_topics.begin(); to == nullptr && dst_it != dst_topics.end(); ++dst_it) {
      auto& candidate = dst_it.value();
      const bool same_dataset = candidate.descriptor().dataset_id == from.descriptor().dataset_id;
      if (same_dataset && candidate.descriptor().name == from.descriptor().name) {
        to = &candidate;
      }
    }

    if (to == nullptr) {
      return PJ::unexpected(
          "flushTo: destination has no topic '" + from.descriptor().name + "' for dataset " +
          std::to_string(from.descriptor().dataset_id));
    }

    // Appending must keep dst's timeline non-decreasing: the first source
    // timestamp may not precede the last timestamp already held by dst.
    const bool goes_back_in_time = !to->empty() && from.time_min() < to->time_max();
    if (goes_back_in_time) {
      return PJ::unexpected("flushTo: monotonicity violation for topic '" + from.descriptor().name + "'");
    }

    transfers.push_back(Transfer{&from, to});
  }

  // Pass 2 (transfer): DataEngine is a friend of TopicStorage, so the sealed
  // chunk deque can be moved out of the source and its chunks appended to the
  // destination one by one via std::move rather than by copy.
  for (auto& transfer : transfers) {
    auto taken = std::move(transfer.from->sealed_chunks_);
    transfer.from->sealed_chunks_.clear();  // make the moved-from deque definitely empty.
    auto& sink = transfer.to->sealed_chunks_;
    for (auto& chunk : taken) {
      sink.push_back(std::move(chunk));
    }
  }

  return {};
}

// ---------------------------------------------------------------------------
// Listing helpers
// ---------------------------------------------------------------------------
Expand Down
75 changes: 74 additions & 1 deletion pj_datastore/src/object_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector
return unexpected("timestamp not monotonically non-decreasing");
}

size_t payload_size = payload.size();
const size_t payload_size = payload.size();

auto shared_data = std::make_shared<const std::vector<uint8_t>>(std::move(payload));

ObjectEntry entry;
Expand Down Expand Up @@ -264,6 +265,74 @@ void ObjectStore::evictAllBefore(Timestamp threshold) {
}
}

// --- Cross-store flush ---

// Moves every entry of every non-empty series in this store into the series
// of `dst` that has the same descriptor (dataset_id + topic_name), then
// applies dst's retention to each series that received entries.
//
// Returns an error, without modifying either store, when `dst` is this store,
// when a non-empty source series has no matching destination topic, or when
// the first source timestamp precedes the last timestamp already in the
// destination series. Returns a default-constructed Status on success.
Status ObjectStore::flushTo(ObjectStore& dst) {
  if (&dst == this) {
    return unexpected("flushTo: source and destination are the same store");
  }

  // Lock both stores for the whole operation. std::scoped_lock acquires the
  // two mutexes with the std::lock deadlock-avoidance algorithm, so concurrent
  // a.flushTo(b) / b.flushTo(a) calls cannot deadlock. This replaces ordering
  // by `this < &dst`: built-in relational comparison of pointers to unrelated
  // objects has an unspecified result, so it is not a guaranteed total order.
  std::scoped_lock both_stores_lock(store_mutex_, dst.store_mutex_);

  // Phase 1: validate every source series can be matched to a destination
  // topic by descriptor and that the move respects monotonicity. No mutation.
  struct Step {
    ObjectSeries* src;
    ObjectSeries* dst;
  };
  std::vector<Step> plan;
  plan.reserve(topics_.size());

  for (auto& [src_id, src_series] : topics_) {
    if (src_series->entry_timestamps.empty()) {
      // Nothing to move; this series needs no counterpart in dst.
      continue;
    }
    const auto& src_descriptor = src_series->descriptor;

    ObjectSeries* dst_series = nullptr;
    for (auto& [dst_id, dst_series_ptr] : dst.topics_) {
      if (dst_series_ptr->descriptor.dataset_id == src_descriptor.dataset_id &&
          dst_series_ptr->descriptor.topic_name == src_descriptor.topic_name) {
        dst_series = dst_series_ptr.get();
        break;
      }
    }
    if (dst_series == nullptr) {
      return unexpected(
          "flushTo: destination has no topic '" + src_descriptor.topic_name + "' for dataset " +
          std::to_string(src_descriptor.dataset_id));
    }
    // Appending must keep the destination timestamps non-decreasing.
    if (!dst_series->entry_timestamps.empty() &&
        src_series->entry_timestamps.front() < dst_series->entry_timestamps.back()) {
      return unexpected("flushTo: monotonicity violation for topic '" + src_descriptor.topic_name + "'");
    }
    plan.push_back({src_series.get(), dst_series});
  }

  // Phase 2: execute the moves. Both store mutexes are held exclusively for
  // the duration, so the transfer is performed under the store-level locks
  // only, exactly as before; per-series mutexes are not taken here.
  for (auto& step : plan) {
    for (auto& entry : step.src->entries) {
      step.dst->entries.push_back(std::move(entry));
    }
    step.dst->entry_timestamps.insert(
        step.dst->entry_timestamps.end(), step.src->entry_timestamps.begin(), step.src->entry_timestamps.end());
    step.dst->memory_bytes += step.src->memory_bytes;

    // Reset the source series: entries were moved-from above, so drop them
    // together with their timestamps and the byte accounting.
    step.src->entries.clear();
    step.src->entry_timestamps.clear();
    step.src->memory_bytes = 0;

    // Re-apply dst's retention relative to the newest timestamp it now holds.
    const Timestamp newest = step.dst->entry_timestamps.empty() ? 0 : step.dst->entry_timestamps.back();
    applyRetention(*step.dst, newest);
  }

  return {};
}

// --- Lifecycle ---

void ObjectStore::removeTopic(ObjectTopicId id) {
Expand Down Expand Up @@ -305,13 +374,17 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
resolved.timestamp = entry.timestamp;

if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
// Span the vector, anchor on the same shared_ptr — refcount bump, no copy.
// A default-constructed entry holds a null SharedBuffer, so guard it.
if (*owned) {
resolved.payload = sdk::PayloadView{
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
sdk::BufferAnchor{*owned},
};
}
} else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
// Forward the closure's PayloadView verbatim. The anchor stays opaque (no
// cast), so producers can back it with arrow::Buffer, mmap, or a C-ABI anchor.
resolved.payload = (*lazy)();
}

Expand Down
Loading
Loading