Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/linux-ci.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: Linux CI
on:
push:
branches: [development, main]
branches: [main]
pull_request:
branches: [development, main]
branches: [main]
types: [opened, synchronize, reopened, ready_for_review]
workflow_dispatch: {}

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/macos-ci.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: macOS CI
on:
push:
branches: [development, main]
branches: [main]
pull_request:
branches: [development, main]
branches: [main]
types: [opened, synchronize, reopened, ready_for_review]
workflow_dispatch: {}

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pre-commit.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: pre-commit
on:
push:
branches: [development, main]
branches: [main]
pull_request:
branches: [development, main]
branches: [main]
types: [opened, synchronize, reopened, ready_for_review]
workflow_dispatch: {}

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/windows-ci.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: Windows CI
on:
push:
branches: [development, main]
branches: [main]
pull_request:
branches: [development, main]
branches: [main]
types: [opened, synchronize, reopened, ready_for_review]
workflow_dispatch: {}

Expand Down
2 changes: 2 additions & 0 deletions pj_base/include/pj_base/number_parse.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#pragma once
// Copyright 2026 Davide Faconti
// SPDX-License-Identifier: Apache-2.0

#include <charconv>
#include <optional>
Expand Down
3 changes: 3 additions & 0 deletions pj_base/src/number_parse.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2026 Davide Faconti
// SPDX-License-Identifier: Apache-2.0

#include "pj_base/number_parse.hpp"

#include <cerrno>
Expand Down
95 changes: 51 additions & 44 deletions pj_datastore/src/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,37 @@ void RangeCursor::forEachChunk(std::function<void(const ChunkRowRange&)> callbac
}

void RangeCursor::findFirstValid() {
// Linear scan to find the first chunk whose t_max >= t_min_
// (i.e., that could contain data in our range)
for (chunk_index_ = 0; chunk_index_ < chunks_->size(); ++chunk_index_) {
const auto& chunk = (*chunks_)[chunk_index_];
if (chunk.stats.t_max >= t_min_) {
// This chunk might contain data in range.
// Now find the first row where timestamp >= t_min_.
for (row_index_ = 0; row_index_ < chunk.stats.row_count; ++row_index_) {
Timestamp ts = chunk.readTimestamp(row_index_);
if (ts >= t_min_) {
// Check if this row is also within t_max
if (ts <= t_max_) {
return; // Found a valid starting position
}
// ts > t_max_ means no valid data in range at all
chunk_index_ = chunks_->size();
return;
}
}
// All rows in this chunk are before t_min, try next chunk
continue;
}
const auto& chunks = *chunks_;

// First chunk that could contain a row in range, i.e. whose t_max >= t_min_.
// Committed chunks are non-empty and time-ordered (each chunk's t_min >= the
// previous chunk's t_max), so t_max is non-decreasing across the deque and we
// can binary-search it.
const auto chunk_it = std::lower_bound(
chunks.begin(), chunks.end(), t_min_,
[](const TopicChunk& chunk, Timestamp value) { return chunk.stats.t_max < value; });
if (chunk_it == chunks.end()) {
// All data is strictly before t_min_.
chunk_index_ = chunks.size();
row_index_ = 0;
return;
}
chunk_index_ = static_cast<std::size_t>(chunk_it - chunks.begin());

// First row with timestamp >= t_min_ within that chunk. Such a row exists
// because t_max (the chunk's last timestamp) >= t_min_.
const TopicChunk& chunk = *chunk_it;
const auto ts_begin = chunk.timestamps.begin();
const auto ts_end = ts_begin + static_cast<std::ptrdiff_t>(chunk.stats.row_count);
const auto row_it = std::lower_bound(ts_begin, ts_end, t_min_);
row_index_ = static_cast<std::size_t>(row_it - ts_begin);

// If the first row at or after t_min_ is already past t_max_, nothing in the
// deque falls inside [t_min_, t_max_].
if (row_it == ts_end || *row_it > t_max_) {
chunk_index_ = chunks.size();
row_index_ = 0;
}
// No valid chunk found: chunk_index_ == chunks_->size() (past-end)
}

void RangeCursor::skipToValid() {
Expand All @@ -171,30 +178,30 @@ void RangeCursor::skipToValid() {
// ===========================================================================

std::optional<SampleRow> latestAt(const std::deque<TopicChunk>& chunks, Timestamp t) {
if (chunks.empty()) {
// Last chunk that can contain a row at or before t, i.e. the latest chunk
// whose t_min <= t. Committed chunks are non-empty and have non-decreasing
// t_min, so upper_bound finds the first chunk strictly after t; the chunk
// before it is the answer. (At a shared boundary timestamp this selects the
// later chunk, matching the previous reverse-scan behaviour.)
const auto after = std::upper_bound(chunks.begin(), chunks.end(), t, [](Timestamp value, const TopicChunk& chunk) {
return value < chunk.stats.t_min;
});
if (after == chunks.begin()) {
// Empty deque, or every chunk starts strictly after t.
return std::nullopt;
}

// Reverse iterate chunks. For each chunk, if t_min <= t, search within it.
for (std::size_t ci = chunks.size(); ci > 0; --ci) {
const auto& chunk = chunks[ci - 1];
if (chunk.stats.t_min > t) {
continue; // Entire chunk is after t
}
// chunk.stats.t_min <= t, so there might be a row <= t in this chunk.
// Reverse scan within the chunk to find the last row with timestamp <= t.
for (std::size_t ri = chunk.stats.row_count; ri > 0; --ri) {
Timestamp ts = chunk.readTimestamp(ri - 1);
if (ts <= t) {
return SampleRow{ts, &chunk, ri - 1};
}
}
// All rows in this chunk are after t, but t_min <= t was true.
// This shouldn't happen with sorted data, but handle gracefully
// by continuing to the previous chunk.
const TopicChunk& chunk = *(after - 1);

// Last row with timestamp <= t within that chunk. Such a row exists because
// the chunk's first timestamp (t_min) is <= t.
const auto ts_begin = chunk.timestamps.begin();
const auto ts_end = ts_begin + static_cast<std::ptrdiff_t>(chunk.stats.row_count);
const auto row_after = std::upper_bound(ts_begin, ts_end, t);
if (row_after == ts_begin) {
return std::nullopt; // unreachable for committed chunks (row 0 ts == t_min <= t)
}

return std::nullopt;
const std::size_t row = static_cast<std::size_t>((row_after - 1) - ts_begin);
return SampleRow{chunk.readTimestamp(row), &chunk, row};
}

// ===========================================================================
Expand Down
69 changes: 69 additions & 0 deletions pj_datastore/tests/query_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ TopicChunk make_test_chunk(Timestamp t_start, uint32_t num_rows, Timestamp step)
return builder.seal();
}

// Helper: build a chunk from an explicit (possibly non-uniform / duplicated)
// timestamp list. The column value equals the row index, so a returned
// row_index can be cross-checked against value.
TopicChunk make_chunk_from_timestamps(const std::vector<Timestamp>& ts) {
std::vector<ColumnDescriptor> cols = {{0, PrimitiveType::kFloat32, "value"}};
TopicChunkBuilder builder(1, 1, cols, static_cast<uint32_t>(ts.size()));
for (std::size_t i = 0; i < ts.size(); ++i) {
builder.beginRow(ts[i]);
builder.set(0, static_cast<float>(i));
builder.finishRow();
}
return builder.seal();
}

// Build the standard 5-chunk test fixture:
// Chunk 0: t=[0, 90], step=10
// Chunk 1: t=[100, 190], step=10
Expand Down Expand Up @@ -160,6 +174,61 @@ TEST(QueryTest, LatestAtBetweenChunks) {
EXPECT_EQ(result->timestamp, 90);
}

// =========================================================================
// Binary-search edge cases (duplicate timestamps, shared chunk boundaries)
// =========================================================================

TEST(QueryTest, LatestAtWithDuplicateTimestampsReturnsLastDuplicate) {
std::deque<TopicChunk> chunks;
// Rows: 0 1 2 3 4
chunks.push_back(make_chunk_from_timestamps({10, 20, 20, 20, 30}));

auto result = latestAt(chunks, 20);
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result->timestamp, 20);
// upper_bound semantics: the last row with ts <= 20 is row index 3.
EXPECT_EQ(result->row_index, 3u);
}

TEST(QueryTest, RangeQueryWithDuplicateTimestampsStartsAtFirstDuplicate) {
std::deque<TopicChunk> chunks;
// Rows: 0 1 2 3 4
chunks.push_back(make_chunk_from_timestamps({10, 20, 20, 20, 30}));

auto cursor = rangeQuery(chunks, 20, 20);
std::vector<std::size_t> rows;
cursor.forEach([&](const SampleRow& row) { rows.push_back(row.row_index); });

// lower_bound semantics: starts at the first ts >= 20 (row 1) and includes
// every row with ts <= 20 (rows 1, 2, 3).
ASSERT_EQ(rows.size(), 3u);
EXPECT_EQ(rows.front(), 1u);
EXPECT_EQ(rows.back(), 3u);
}

TEST(QueryTest, LatestAtAtSharedChunkBoundarySelectsLaterChunk) {
std::deque<TopicChunk> chunks;
chunks.push_back(make_chunk_from_timestamps({70, 80, 90})); // chunk A, t_max=90
chunks.push_back(make_chunk_from_timestamps({90, 100, 110})); // chunk B, t_min=90

auto result = latestAt(chunks, 90);
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result->timestamp, 90);
// The boundary value 90 exists in both chunks; the later chunk (B, row 0) wins.
EXPECT_EQ(result->chunk, &chunks[1]);
EXPECT_EQ(result->row_index, 0u);
}

TEST(QueryTest, RangeQuerySingleTimestampPoint) {
auto chunks = make_standard_chunks();
// Degenerate inclusive range [200, 200] hits exactly one row.
auto cursor = rangeQuery(chunks, 200, 200);
std::vector<Timestamp> timestamps;
cursor.forEach([&](const SampleRow& row) { timestamps.push_back(row.timestamp); });
ASSERT_EQ(timestamps.size(), 1u);
EXPECT_EQ(timestamps.front(), 200);
}

// =========================================================================
// Empty deque tests
// =========================================================================
Expand Down
Loading