From a910d26dc8c65675b8f7d55ae74f43421ec88fd3 Mon Sep 17 00:00:00 2001 From: vikonix Date: Thu, 9 Apr 2026 16:52:57 +0200 Subject: [PATCH 01/11] add writer --- quasardb/quasardb/_table.pyi | 2 ++ quasardb/table.cpp | 7 +++++++ quasardb/table.hpp | 4 ++++ 3 files changed, 13 insertions(+) diff --git a/quasardb/quasardb/_table.pyi b/quasardb/quasardb/_table.pyi index 00c792c4..ac70c2da 100644 --- a/quasardb/quasardb/_table.pyi +++ b/quasardb/quasardb/_table.pyi @@ -5,6 +5,7 @@ import typing from typing import Any, Optional, Union from quasardb.quasardb._reader import Reader +from quasardb.quasardb._writer import Writer from quasardb.typing import MaskedArrayAny, NDArrayAny, NDArrayTime, RangeSet from ._entry import Entry @@ -118,6 +119,7 @@ class Table(Entry): batch_size: int = 0, ranges: RangeSet = [], ) -> Reader: ... + def writer(self) -> Writer: ... def retrieve_metadata(self) -> None: ... def string_get_ranges( self, column: str, ranges: Optional[RangeSet] = None diff --git a/quasardb/table.cpp b/quasardb/table.cpp index 818b920a..039fcbb6 100644 --- a/quasardb/table.cpp +++ b/quasardb/table.cpp @@ -5,6 +5,7 @@ #include "reader.hpp" #include "traits.hpp" #include "convert/point.hpp" +#include "writer.hpp" #include // for make_unique namespace qdb @@ -263,4 +264,10 @@ qdb::reader_ptr table::reader( // return std::make_unique(_handle, table_names, column_names, batch_size, ranges); }; +qdb::writer_ptr table::writer() const +{ + _handle->check_open(); + return std::make_unique(_handle); +} + }; // namespace qdb diff --git a/quasardb/table.hpp b/quasardb/table.hpp index 5e3541be..713b6a7b 100644 --- a/quasardb/table.hpp +++ b/quasardb/table.hpp @@ -34,6 +34,7 @@ #include "masked_array.hpp" #include "reader_fwd.hpp" #include "table_fwd.hpp" +#include "writer_fwd.hpp" #include "detail/ts_column.hpp" namespace qdb @@ -163,6 +164,8 @@ class table : public entry std::size_t batch_size, // std::vector const & ranges) const; + qdb::writer_ptr writer() const; + /** * Returns true if this table has a TTL assigned. */ @@ -350,6 +353,7 @@ static inline void register_table(Module & m) py::arg("column_names") = std::vector{}, // py::arg("batch_size") = std::size_t{0}, // py::arg("ranges") = std::vector{}) + .def("writer", &qdb::table::writer) .def("subscribe", &qdb::table::subscribe) .def("erase_ranges", &qdb::table::erase_ranges) From 67d53f9042462189820df6033f10faee892ad491 Mon Sep 17 00:00:00 2001 From: vikonix Date: Thu, 9 Apr 2026 23:12:16 +0200 Subject: [PATCH 02/11] read arrays --- quasardb/numpy/__init__.py | 163 +++++++++++++++++++++++++++++++ tests/test_numpy.py | 195 +++++++++++++++++++++++++++++++++++++ 2 files changed, 358 insertions(+) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index 5fead519..d306a0f5 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -705,6 +705,169 @@ def write_array( write_with[ctype](column, index, data) +def _column_infos_by_names( + table: Table, columns: Optional[Sequence[str]] +) -> List[Tuple[str, quasardb.ColumnType]]: + infos = table.list_columns() + + if columns is None or len(columns) == 0: + return [(cinfo.name, cinfo.type) for cinfo in infos] + + if isinstance(columns, str): + raise TypeError("columns is expected to be a sequence of column names, got str") + + by_name = {cinfo.name: (cinfo.name, cinfo.type) for cinfo in infos} + missing = [column for column in columns if column not in by_name] + if missing: + raise KeyError("Column not found: {}".format(missing[0])) + + # Preserve explicit request order. + return [by_name[column] for column in columns] + + +def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny: + if len(xs) == 0: + return ma.masked_array(np.array([])) + if len(xs) == 1: + return xs[0] + + return ma.concatenate(xs) + + +def _empty_masked_for_ctype(ctype: quasardb.ColumnType) -> MaskedArrayAny: + return ma.masked_array(np.array([], dtype=_best_dtype_for_ctype(ctype))) + + +def _coerce_ranges(ranges: Any) -> Any: + if ranges is None: + return None + + if isinstance(ranges, np.ndarray): + if ranges.ndim != 2 or ranges.shape[1] != 2: + raise TypeError( + "ranges numpy array is expected to have shape (n, 2), got {}".format( + ranges.shape + ) + ) + + return [(ranges[i, 0], ranges[i, 1]) for i in range(ranges.shape[0])] + + return ranges + + +def _read_arrays_with_reader( + reader: Any, + cinfos: List[Tuple[str, quasardb.ColumnType]], +) -> Tuple[NDArrayTime, Dict[str, MaskedArrayAny]]: + column_names = [cname for (cname, _) in cinfos] + + idx_batches: List[NDArrayTime] = [] + value_batches: Dict[str, List[MaskedArrayAny]] = { + cname: [] for cname in column_names + } + + for batch in reader: + idx_batches.append(batch["$timestamp"]) + + for cname in column_names: + value_batches[cname].append(batch[cname]) + + if len(idx_batches) == 0: + return ( + np.array([], dtype=np.dtype("datetime64[ns]")), + {cname: _empty_masked_for_ctype(ctype) for (cname, ctype) in cinfos}, + ) + + if len(idx_batches) == 1: + return idx_batches[0], { + cname: value_batches[cname][0] for cname in column_names + } + + return np.concatenate(idx_batches), { + cname: _concat_masked(batches) for (cname, batches) in value_batches.items() + } + + +def read_arrays( + cluster: Optional[quasardb.Cluster] = None, + table: Optional[Union[str, Table]] = None, + columns: Optional[Sequence[str]] = None, + ranges: Any = None, +) -> Tuple[NDArrayTime, Dict[str, MaskedArrayAny]]: + """ + Read any number of columns from a table as numpy masked arrays. + + Parameters: + ----------- + + cluster: optional quasardb.Cluster + Active connection to the QuasarDB cluster. + Required when `table` is provided as a table name. + + table: quasardb.Table or str + Table object or table name to read from. + + columns: optional sequence[str] + Column names to read. + If None or an empty sequence is provided, all table columns are read. + + ranges: optional list[tuple] or numpy.ndarray + Time ranges to read. When provided as a numpy array, it is expected to + have shape (n, 2) and contain datetime64[ns] values. + If None, the full available range is read. + + Returns: + -------- + + tuple[numpy.ndarray, dict[str, numpy.ma.MaskedArray]] + A pair consisting of the shared timestamp index and a mapping of column + names to masked arrays. + + Examples: + --------- + + Read all columns: + + >>> idx, cols = qdbnp.read_arrays(table=my_table, columns=[]) + + Read a subset of columns for a given time range: + + >>> idx, cols = qdbnp.read_arrays( + ... table=my_table, + ... columns=["open", "close"], + ... ranges=[(start, end)], + ... ) + >>> opens = cols["open"] + >>> closes = cols["close"] + """ + if table is None: + raise RuntimeError("A table is required.") + + ranges = _coerce_ranges(ranges) + + if isinstance(table, str): + if cluster is None: + raise RuntimeError("A cluster is required when a table name is provided.") + table = table_cache.lookup(table, cluster) + + cinfos = _column_infos_by_names(table, columns) + column_names = [cname for (cname, _) in cinfos] + + reader_kwargs: Dict[str, Any] = {"batch_size": 0} + if len(column_names) > 0: + reader_kwargs["column_names"] = column_names + if ranges is not None: + reader_kwargs["ranges"] = ranges + + if cluster is not None: + reader_kwargs["table_names"] = [table.get_name()] + with cluster.reader(**reader_kwargs) as reader: + return _read_arrays_with_reader(reader, cinfos) + + with table.reader(**reader_kwargs) as reader: + return _read_arrays_with_reader(reader, cinfos) + + def write_arrays( data: Any, cluster: quasardb.Cluster, diff --git a/tests/test_numpy.py b/tests/test_numpy.py index 86dda992..85aa751f 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -10,6 +10,7 @@ import quasardb import quasardb.numpy as qdbnp +import test_table as tslib from utils import assert_indexed_arrays_equal, assert_ma_equal logger = logging.getLogger("test-numpy") @@ -314,6 +315,200 @@ def test_string_array_returns_unicode(array_with_index_and_table, qdbd_connectio assert qdbnp.dtypes_equal(xs.dtype, np.dtype("unicode")) +def test_read_arrays_reads_all_columns_when_columns_empty(qdbd_connection, table): + index = np.array( + [ + np.datetime64("2017-01-01T00:00:00", "ns"), + np.datetime64("2017-01-01T00:00:01", "ns"), + np.datetime64("2017-01-01T00:00:02", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + doubles = np.array([1.0, 2.0, 3.0], dtype=np.float64) + blobs = np.array([b"a\x00b", b"cd", b"ef"], dtype=np.object_) + strings = np.array(["content_0", "content_1", "content_2"], dtype=np.dtype("U")) + integers = np.array([10, 11, 12], dtype=np.int64) + timestamps = np.array( + [ + np.datetime64("2017-01-02T00:00:00", "ns"), + np.datetime64("2017-01-02T00:00:01", "ns"), + np.datetime64("2017-01-02T00:00:02", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + symbols = np.array(["sym_0", "sym_1", "sym_2"], dtype=np.dtype("U")) + + qdbnp.write_arrays( + { + tslib._double_col_name(table): doubles, + tslib._blob_col_name(table): blobs, + tslib._string_col_name(table): strings, + tslib._int64_col_name(table): integers, + tslib._ts_col_name(table): timestamps, + tslib._symbol_col_name(table): symbols, + }, + qdbd_connection, + table, + index=index, + infer_types=False, + dtype={ + tslib._double_col_name(table): doubles.dtype, + tslib._blob_col_name(table): blobs.dtype, + tslib._string_col_name(table): strings.dtype, + tslib._int64_col_name(table): integers.dtype, + tslib._ts_col_name(table): timestamps.dtype, + tslib._symbol_col_name(table): symbols.dtype, + }, + ) + + idx, xs = qdbnp.read_arrays(table, columns=[]) + + np.testing.assert_array_equal(idx, index) + assert list(xs.keys()) == [c.name for c in table.list_columns()] + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) + np.testing.assert_array_equal(xs[tslib._blob_col_name(table)], blobs) + np.testing.assert_array_equal(xs[tslib._string_col_name(table)], strings) + np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers) + np.testing.assert_array_equal(xs[tslib._ts_col_name(table)], timestamps) + np.testing.assert_array_equal(xs[tslib._symbol_col_name(table)], symbols) + + +def test_read_arrays_reads_selected_columns_with_ranges(qdbd_connection, table): + index = np.array( + [ + np.datetime64("2017-01-01T00:00:00", "ns"), + np.datetime64("2017-01-01T00:00:01", "ns"), + np.datetime64("2017-01-01T00:00:02", "ns"), + np.datetime64("2017-01-01T00:00:03", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + doubles = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64) + integers = np.array([10, 11, 12, 13], dtype=np.int64) + + qdbnp.write_arrays( + { + tslib._double_col_name(table): doubles, + tslib._int64_col_name(table): integers, + }, + qdbd_connection, + table, + index=index, + infer_types=False, + dtype={ + tslib._double_col_name(table): doubles.dtype, + tslib._int64_col_name(table): integers.dtype, + }, + ) + + columns = [tslib._double_col_name(table), tslib._int64_col_name(table)] + ranges = [(index[1], index[2] + np.timedelta64(1, "ns"))] + + idx, xs = qdbnp.read_arrays(table, columns=columns, ranges=ranges) + + np.testing.assert_array_equal(idx, index[1:3]) + assert list(xs.keys()) == columns + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles[1:3]) + np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers[1:3]) + + +def test_read_arrays_accepts_numpy_ranges(qdbd_connection, table): + index = np.array( + [ + np.datetime64("2017-01-01T00:00:00", "ns"), + np.datetime64("2017-01-01T00:00:01", "ns"), + np.datetime64("2017-01-01T00:00:02", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + doubles = np.array([1.0, 2.0, 3.0], dtype=np.float64) + + qdbnp.write_arrays( + {tslib._double_col_name(table): doubles}, + qdbd_connection, + table, + index=index, + infer_types=False, + dtype={tslib._double_col_name(table): doubles.dtype}, + ) + + ranges = np.array([(index[1], index[2] + np.timedelta64(1, "ns"))]) + idx, xs = qdbnp.read_arrays( + table, columns=[tslib._double_col_name(table)], ranges=ranges + ) + + np.testing.assert_array_equal(idx, index[1:3]) + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles[1:3]) + + +def test_read_arrays_supports_cluster_with_table_object(qdbd_connection, table): + index = np.array( + [ + np.datetime64("2017-01-01T00:00:00", "ns"), + np.datetime64("2017-01-01T00:00:01", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + doubles = np.array([1.0, 2.0], dtype=np.float64) + + qdbnp.write_arrays( + {tslib._double_col_name(table): doubles}, + qdbd_connection, + table, + index=index, + infer_types=False, + dtype={tslib._double_col_name(table): doubles.dtype}, + ) + + idx, xs = qdbnp.read_arrays( + table, + columns=[tslib._double_col_name(table)], + cluster=qdbd_connection, + ) + + np.testing.assert_array_equal(idx, index) + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) + + +def test_read_arrays_supports_cluster_with_table_name(qdbd_connection, table): + index = np.array( + [ + np.datetime64("2017-01-01T00:00:00", "ns"), + np.datetime64("2017-01-01T00:00:01", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + doubles = np.array([1.0, 2.0], dtype=np.float64) + + qdbnp.write_arrays( + {tslib._double_col_name(table): doubles}, + qdbd_connection, + table, + index=index, + infer_types=False, + dtype={tslib._double_col_name(table): doubles.dtype}, + ) + + idx, xs = qdbnp.read_arrays( + table.get_name(), + columns=[tslib._double_col_name(table)], + cluster=qdbd_connection, + ) + + np.testing.assert_array_equal(idx, index) + np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) + + +def test_read_arrays_rejects_string_columns(table): + with pytest.raises(TypeError): + qdbnp.read_arrays(table, columns="the_double") + + +def test_read_arrays_rejects_table_name_without_cluster(table): + with pytest.raises(RuntimeError): + qdbnp.read_arrays(table.get_name(), columns=["the_double"]) + + ###### # # Query tests From 00729193b80b183c91fbd39229b75ae2527e8f72 Mon Sep 17 00:00:00 2001 From: vikonix Date: Thu, 9 Apr 2026 23:26:17 +0200 Subject: [PATCH 03/11] fix calls --- tests/test_numpy.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_numpy.py b/tests/test_numpy.py index 85aa751f..0204f2bc 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -361,7 +361,7 @@ def test_read_arrays_reads_all_columns_when_columns_empty(qdbd_connection, table }, ) - idx, xs = qdbnp.read_arrays(table, columns=[]) + idx, xs = qdbnp.read_arrays(table=table, columns=[]) np.testing.assert_array_equal(idx, index) assert list(xs.keys()) == [c.name for c in table.list_columns()] @@ -404,7 +404,7 @@ def test_read_arrays_reads_selected_columns_with_ranges(qdbd_connection, table): columns = [tslib._double_col_name(table), tslib._int64_col_name(table)] ranges = [(index[1], index[2] + np.timedelta64(1, "ns"))] - idx, xs = qdbnp.read_arrays(table, columns=columns, ranges=ranges) + idx, xs = qdbnp.read_arrays(table=table, columns=columns, ranges=ranges) np.testing.assert_array_equal(idx, index[1:3]) assert list(xs.keys()) == columns @@ -434,7 +434,7 @@ def test_read_arrays_accepts_numpy_ranges(qdbd_connection, table): ranges = np.array([(index[1], index[2] + np.timedelta64(1, "ns"))]) idx, xs = qdbnp.read_arrays( - table, columns=[tslib._double_col_name(table)], ranges=ranges + table=table, columns=[tslib._double_col_name(table)], ranges=ranges ) np.testing.assert_array_equal(idx, index[1:3]) @@ -461,7 +461,7 @@ def test_read_arrays_supports_cluster_with_table_object(qdbd_connection, table): ) idx, xs = qdbnp.read_arrays( - table, + table=table, columns=[tslib._double_col_name(table)], cluster=qdbd_connection, ) @@ -490,7 +490,7 @@ def test_read_arrays_supports_cluster_with_table_name(qdbd_connection, table): ) idx, xs = qdbnp.read_arrays( - table.get_name(), + table=table.get_name(), columns=[tslib._double_col_name(table)], cluster=qdbd_connection, ) @@ -501,12 +501,12 @@ def test_read_arrays_supports_cluster_with_table_name(qdbd_connection, table): def test_read_arrays_rejects_string_columns(table): with pytest.raises(TypeError): - qdbnp.read_arrays(table, columns="the_double") + qdbnp.read_arrays(table=table, columns="the_double") def test_read_arrays_rejects_table_name_without_cluster(table): with pytest.raises(RuntimeError): - qdbnp.read_arrays(table.get_name(), columns=["the_double"]) + qdbnp.read_arrays(table=table.get_name(), columns=["the_double"]) ###### From 36c86ded08229d2fa14f3a5d3485f2704213d8d5 Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 11:34:05 +0200 Subject: [PATCH 04/11] revert writer --- quasardb/quasardb/_table.pyi | 2 -- quasardb/table.cpp | 7 ------- quasardb/table.hpp | 4 ---- 3 files changed, 13 deletions(-) diff --git a/quasardb/quasardb/_table.pyi b/quasardb/quasardb/_table.pyi index ac70c2da..00c792c4 100644 --- a/quasardb/quasardb/_table.pyi +++ b/quasardb/quasardb/_table.pyi @@ -5,7 +5,6 @@ import typing from typing import Any, Optional, Union from quasardb.quasardb._reader import Reader -from quasardb.quasardb._writer import Writer from quasardb.typing import MaskedArrayAny, NDArrayAny, NDArrayTime, RangeSet from ._entry import Entry @@ -119,7 +118,6 @@ class Table(Entry): batch_size: int = 0, ranges: RangeSet = [], ) -> Reader: ... - def writer(self) -> Writer: ... def retrieve_metadata(self) -> None: ... def string_get_ranges( self, column: str, ranges: Optional[RangeSet] = None diff --git a/quasardb/table.cpp b/quasardb/table.cpp index 039fcbb6..818b920a 100644 --- a/quasardb/table.cpp +++ b/quasardb/table.cpp @@ -5,7 +5,6 @@ #include "reader.hpp" #include "traits.hpp" #include "convert/point.hpp" -#include "writer.hpp" #include // for make_unique namespace qdb @@ -264,10 +263,4 @@ qdb::reader_ptr table::reader( // return std::make_unique(_handle, table_names, column_names, batch_size, ranges); }; -qdb::writer_ptr table::writer() const -{ - _handle->check_open(); - return std::make_unique(_handle); -} - }; // namespace qdb diff --git a/quasardb/table.hpp b/quasardb/table.hpp index 713b6a7b..5e3541be 100644 --- a/quasardb/table.hpp +++ b/quasardb/table.hpp @@ -34,7 +34,6 @@ #include "masked_array.hpp" #include "reader_fwd.hpp" #include "table_fwd.hpp" -#include "writer_fwd.hpp" #include "detail/ts_column.hpp" namespace qdb @@ -164,8 +163,6 @@ class table : public entry std::size_t batch_size, // std::vector const & ranges) const; - qdb::writer_ptr writer() const; - /** * Returns true if this table has a TTL assigned. */ @@ -353,7 +350,6 @@ static inline void register_table(Module & m) py::arg("column_names") = std::vector{}, // py::arg("batch_size") = std::size_t{0}, // py::arg("ranges") = std::vector{}) - .def("writer", &qdb::table::writer) .def("subscribe", &qdb::table::subscribe) .def("erase_ranges", &qdb::table::erase_ranges) From bd3a6696c67d882ba27f50d6380b02e9f0212049 Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:07:09 +0200 Subject: [PATCH 05/11] variant 2 --- quasardb/numpy/__init__.py | 181 ++++++++++++++++++------------------ quasardb/pandas/__init__.py | 45 +++------ tests/test_numpy.py | 73 +++++++++++---- 3 files changed, 156 insertions(+), 143 deletions(-) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index d306a0f5..c6fbff40 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -31,16 +31,19 @@ import logging import time import warnings -from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, Union +from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Type, Union import quasardb -import quasardb.table_cache as table_cache -from quasardb.quasardb import Table, Writer -from quasardb.typing import DType, MaskedArrayAny, NDArrayAny, NDArrayTime +from quasardb.quasardb import Cluster, Table, Writer +from quasardb.typing import DType, MaskedArrayAny, NDArrayAny, NDArrayTime, RangeSet logger = logging.getLogger("quasardb.numpy") +TableLike = Union[str, Table] +IndexedMaskedArrays = Tuple[NDArrayTime, Dict[str, MaskedArrayAny]] + + class NumpyRequired(ImportError): """ Exception raised when trying to use QuasarDB pandas integration, but @@ -705,26 +708,6 @@ def write_array( write_with[ctype](column, index, data) -def _column_infos_by_names( - table: Table, columns: Optional[Sequence[str]] -) -> List[Tuple[str, quasardb.ColumnType]]: - infos = table.list_columns() - - if columns is None or len(columns) == 0: - return [(cinfo.name, cinfo.type) for cinfo in infos] - - if isinstance(columns, str): - raise TypeError("columns is expected to be a sequence of column names, got str") - - by_name = {cinfo.name: (cinfo.name, cinfo.type) for cinfo in infos} - missing = [column for column in columns if column not in by_name] - if missing: - raise KeyError("Column not found: {}".format(missing[0])) - - # Preserve explicit request order. - return [by_name[column] for column in columns] - - def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny: if len(xs) == 0: return ma.masked_array(np.array([])) @@ -734,53 +717,31 @@ def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny: return ma.concatenate(xs) -def _empty_masked_for_ctype(ctype: quasardb.ColumnType) -> MaskedArrayAny: - return ma.masked_array(np.array([], dtype=_best_dtype_for_ctype(ctype))) +def _clean_batch(batch: Dict[str, MaskedArrayAny]) -> IndexedMaskedArrays: + assert "$timestamp" in batch + idx = batch["$timestamp"] + xs = {cname: values for (cname, values) in batch.items() if cname != "$timestamp"} -def _coerce_ranges(ranges: Any) -> Any: - if ranges is None: - return None - - if isinstance(ranges, np.ndarray): - if ranges.ndim != 2 or ranges.shape[1] != 2: - raise TypeError( - "ranges numpy array is expected to have shape (n, 2), got {}".format( - ranges.shape - ) - ) - - return [(ranges[i, 0], ranges[i, 1]) for i in range(ranges.shape[0])] - - return ranges + return idx, xs -def _read_arrays_with_reader( - reader: Any, - cinfos: List[Tuple[str, quasardb.ColumnType]], -) -> Tuple[NDArrayTime, Dict[str, MaskedArrayAny]]: - column_names = [cname for (cname, _) in cinfos] - +def _concat_array_batches(xs: Iterable[IndexedMaskedArrays]) -> IndexedMaskedArrays: idx_batches: List[NDArrayTime] = [] - value_batches: Dict[str, List[MaskedArrayAny]] = { - cname: [] for cname in column_names - } + value_batches: Dict[str, List[MaskedArrayAny]] = {} - for batch in reader: - idx_batches.append(batch["$timestamp"]) + for idx, batch in xs: + idx_batches.append(idx) - for cname in column_names: - value_batches[cname].append(batch[cname]) + for cname, values in batch.items(): + value_batches.setdefault(cname, []).append(values) if len(idx_batches) == 0: - return ( - np.array([], dtype=np.dtype("datetime64[ns]")), - {cname: _empty_masked_for_ctype(ctype) for (cname, ctype) in cinfos}, - ) + raise ValueError("No array batches to concatenate") if len(idx_batches) == 1: return idx_batches[0], { - cname: value_batches[cname][0] for cname in column_names + cname: batches[0] for cname, batches in value_batches.items() } return np.concatenate(idx_batches), { @@ -789,25 +750,30 @@ def _read_arrays_with_reader( def read_arrays( - cluster: Optional[quasardb.Cluster] = None, - table: Optional[Union[str, Table]] = None, - columns: Optional[Sequence[str]] = None, - ranges: Any = None, -) -> Tuple[NDArrayTime, Dict[str, MaskedArrayAny]]: + conn: Cluster, + tables: List[TableLike], + *, + batch_size: Optional[int] = 2**16, + column_names: Optional[Sequence[str]] = None, + ranges: Optional[RangeSet] = None, +) -> IndexedMaskedArrays: """ Read any number of columns from a table as numpy masked arrays. Parameters: ----------- - cluster: optional quasardb.Cluster - Active connection to the QuasarDB cluster. - Required when `table` is provided as a table name. + conn: quasardb.Cluster + Connection to the QuasarDB database. - table: quasardb.Table or str - Table object or table name to read from. + tables : list[str | quasardb.Table] + QuasarDB tables to read, as a list of strings or quasardb table objects. - columns: optional sequence[str] + batch_size : int + The amount of rows to fetch in a single read operation. If unset, uses 2^16 + (65536) rows as batch size by default. + + column_names: optional sequence[str] Column names to read. If None or an empty sequence is provided, all table columns are read. @@ -828,44 +794,73 @@ def read_arrays( Read all columns: - >>> idx, cols = qdbnp.read_arrays(table=my_table, columns=[]) + >>> idx, cols = qdbnp.read_arrays(conn, [my_table], column_names=[]) Read a subset of columns for a given time range: >>> idx, cols = qdbnp.read_arrays( - ... table=my_table, - ... columns=["open", "close"], + ... conn, + ... [my_table], + ... column_names=["open", "close"], ... ranges=[(start, end)], ... ) >>> opens = cols["open"] >>> closes = cols["close"] """ - if table is None: - raise RuntimeError("A table is required.") + xs = stream_arrays( + conn, + tables, + batch_size=batch_size, + column_names=column_names, + ranges=ranges, + ) - ranges = _coerce_ranges(ranges) + try: + return _concat_array_batches(xs) + except ValueError as e: + logger.error( + "Error while concatenating arrays. This can happen if result set is empty. Returning empty arrays. Error: %s", + e, + ) + return np.array([], dtype=np.dtype("datetime64[ns]")), {} - if isinstance(table, str): - if cluster is None: - raise RuntimeError("A cluster is required when a table name is provided.") - table = table_cache.lookup(table, cluster) - cinfos = _column_infos_by_names(table, columns) - column_names = [cname for (cname, _) in cinfos] +def stream_arrays( + conn: Cluster, + tables: List[TableLike], + *, + batch_size: Optional[int] = 2**16, + column_names: Optional[Sequence[str]] = None, + ranges: Optional[RangeSet] = None, +) -> Iterator[IndexedMaskedArrays]: + """ + Read one or more tables as numpy masked arrays. Returns a generator with + indexed batches of size `batch_size`, which is useful when traversing a + large dataset which does not fit into memory. + """ + # Sanitize batch_size + if batch_size is None: + batch_size = 2**16 + elif not isinstance(batch_size, int): + raise TypeError( + "batch_size should be an integer, but got: {} with value {}".format( + type(batch_size), str(batch_size) + ) + ) - reader_kwargs: Dict[str, Any] = {"batch_size": 0} - if len(column_names) > 0: - reader_kwargs["column_names"] = column_names - if ranges is not None: - reader_kwargs["ranges"] = ranges + kwargs: Dict[str, Any] = {"batch_size": batch_size} - if cluster is not None: - reader_kwargs["table_names"] = [table.get_name()] - with cluster.reader(**reader_kwargs) as reader: - return _read_arrays_with_reader(reader, cinfos) + if column_names: + kwargs["column_names"] = column_names + + if ranges: + kwargs["ranges"] = ranges - with table.reader(**reader_kwargs) as reader: - return _read_arrays_with_reader(reader, cinfos) + coerce_table_name_fn = lambda x: x if isinstance(x, str) else x.get_name() + kwargs["table_names"] = [coerce_table_name_fn(x) for x in tables] + with conn.reader(**kwargs) as reader: + for batch in reader: + yield _clean_batch(batch) def write_arrays( diff --git a/quasardb/pandas/__init__.py b/quasardb/pandas/__init__.py index 1221ded8..a091e5cf 100644 --- a/quasardb/pandas/__init__.py +++ b/quasardb/pandas/__init__.py @@ -250,10 +250,9 @@ def stream_dataframes( ranges: Optional[RangeSet] = None, ) -> Iterator[pd.DataFrame]: """ - Read a Pandas Dataframe from a QuasarDB Timeseries table. Returns a generator with dataframes of size `batch_size`, which is useful - when traversing a large dataset which does not fit into memory. - - Accepts the same parameters as `stream_dataframes`. + Read Pandas DataFrames from one or more QuasarDB tables. Returns a generator + with dataframes of size `batch_size`, which is useful when traversing a + large dataset which does not fit into memory. Parameters: ----------- @@ -279,36 +278,14 @@ def stream_dataframes( Defaults to the entire table. """ - # Sanitize batch_size - if batch_size is None: - batch_size = 2**16 - elif not isinstance(batch_size, int): - raise TypeError( - "batch_size should be an integer, but got: {} with value {}".format( - type(batch_size), str(batch_size) - ) - ) - - kwargs: Dict[str, Any] = {"batch_size": batch_size} - - if column_names: - kwargs["column_names"] = column_names - - if ranges: - kwargs["ranges"] = ranges - - coerce_table_name_fn = lambda x: x if isinstance(x, str) else x.get_name() - kwargs["table_names"] = [coerce_table_name_fn(x) for x in tables] - - with conn.reader(**kwargs) as reader: - for batch in reader: - # We always expect the timestamp column, and set this as the index - assert "$timestamp" in batch - - idx = pd.Index(batch.pop("$timestamp"), copy=False, name="$timestamp") - df = pd.DataFrame(batch, index=idx) - - yield df + for idx, xs in qdbnp.stream_arrays( + conn, + tables, + batch_size=batch_size, + column_names=column_names, + ranges=ranges, + ): + yield pd.DataFrame(xs, index=pd.Index(idx, copy=False, name="$timestamp")) def stream_dataframe( diff --git a/tests/test_numpy.py b/tests/test_numpy.py index 0204f2bc..29dcd614 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -361,7 +361,7 @@ def test_read_arrays_reads_all_columns_when_columns_empty(qdbd_connection, table }, ) - idx, xs = qdbnp.read_arrays(table=table, columns=[]) + idx, xs = qdbnp.read_arrays(qdbd_connection, [table], column_names=[]) np.testing.assert_array_equal(idx, index) assert list(xs.keys()) == [c.name for c in table.list_columns()] @@ -404,7 +404,13 @@ def test_read_arrays_reads_selected_columns_with_ranges(qdbd_connection, table): columns = [tslib._double_col_name(table), tslib._int64_col_name(table)] ranges = [(index[1], index[2] + np.timedelta64(1, "ns"))] - idx, xs = qdbnp.read_arrays(table=table, columns=columns, ranges=ranges) + idx, xs = qdbnp.read_arrays( + qdbd_connection, + [table], + batch_size=1, + column_names=columns, + ranges=ranges, + ) np.testing.assert_array_equal(idx, index[1:3]) assert list(xs.keys()) == columns @@ -434,14 +440,17 @@ def test_read_arrays_accepts_numpy_ranges(qdbd_connection, table): ranges = np.array([(index[1], index[2] + np.timedelta64(1, "ns"))]) idx, xs = qdbnp.read_arrays( - table=table, columns=[tslib._double_col_name(table)], ranges=ranges + qdbd_connection, + [table], + column_names=[tslib._double_col_name(table)], + ranges=ranges, ) np.testing.assert_array_equal(idx, index[1:3]) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles[1:3]) -def test_read_arrays_supports_cluster_with_table_object(qdbd_connection, table): +def test_read_arrays_supports_table_object(qdbd_connection, table): index = np.array( [ np.datetime64("2017-01-01T00:00:00", "ns"), @@ -461,16 +470,16 @@ def test_read_arrays_supports_cluster_with_table_object(qdbd_connection, table): ) idx, xs = qdbnp.read_arrays( - table=table, - columns=[tslib._double_col_name(table)], - cluster=qdbd_connection, + qdbd_connection, + [table], + column_names=[tslib._double_col_name(table)], ) np.testing.assert_array_equal(idx, index) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) -def test_read_arrays_supports_cluster_with_table_name(qdbd_connection, table): +def test_read_arrays_supports_table_name(qdbd_connection, table): index = np.array( [ np.datetime64("2017-01-01T00:00:00", "ns"), @@ -490,23 +499,55 @@ def test_read_arrays_supports_cluster_with_table_name(qdbd_connection, table): ) idx, xs = qdbnp.read_arrays( - table=table.get_name(), - columns=[tslib._double_col_name(table)], - cluster=qdbd_connection, + qdbd_connection, + [table.get_name()], + column_names=[tslib._double_col_name(table)], ) np.testing.assert_array_equal(idx, index) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) -def test_read_arrays_rejects_string_columns(table): +def test_read_arrays_rejects_string_column_names(qdbd_connection, table): with pytest.raises(TypeError): - qdbnp.read_arrays(table=table, columns="the_double") + qdbnp.read_arrays(qdbd_connection, [table], column_names="the_double") + + +def test_stream_arrays_reads_batched_results(qdbd_connection, table): + index = np.array( + [ + np.datetime64("2017-01-01T00:00:00", "ns"), + np.datetime64("2017-01-01T00:00:01", "ns"), + np.datetime64("2017-01-01T00:00:02", "ns"), + ], + dtype=np.dtype("datetime64[ns]"), + ) + doubles = np.array([1.0, 2.0, 3.0], dtype=np.float64) + qdbnp.write_arrays( + {tslib._double_col_name(table): doubles}, + qdbd_connection, + table, + index=index, + infer_types=False, + dtype={tslib._double_col_name(table): doubles.dtype}, + ) -def test_read_arrays_rejects_table_name_without_cluster(table): - with pytest.raises(RuntimeError): - qdbnp.read_arrays(table=table.get_name(), columns=["the_double"]) + xs = list( + qdbnp.stream_arrays( + qdbd_connection, + [table], + batch_size=1, + column_names=[tslib._double_col_name(table)], + ) + ) + + assert len(xs) == 3 + np.testing.assert_array_equal(np.concatenate([idx for idx, _ in xs]), index) + np.testing.assert_array_equal( + ma.concatenate([batch[tslib._double_col_name(table)] for _, batch in xs]), + doubles, + ) ###### From ab4f3f7013b973c53c2219b475d4af38a74a8190 Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:09:43 +0200 Subject: [PATCH 06/11] cosmetic --- quasardb/numpy/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index c6fbff40..ed2844ba 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -717,7 +717,7 @@ def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny: return ma.concatenate(xs) -def _clean_batch(batch: Dict[str, MaskedArrayAny]) -> IndexedMaskedArrays: +def _reader_batch_to_arrays(batch: Dict[str, MaskedArrayAny]) -> IndexedMaskedArrays: assert "$timestamp" in batch idx = batch["$timestamp"] @@ -860,7 +860,7 @@ def stream_arrays( kwargs["table_names"] = [coerce_table_name_fn(x) for x in tables] with conn.reader(**kwargs) as reader: for batch in reader: - yield _clean_batch(batch) + yield _reader_batch_to_arrays(batch) def write_arrays( From 5680e5164ad1075db35f30b4ea934e70acf7d220 Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:14:27 +0200 Subject: [PATCH 07/11] cosmetic 2 --- quasardb/numpy/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index ed2844ba..5e4f6439 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -758,7 +758,7 @@ def read_arrays( ranges: Optional[RangeSet] = None, ) -> IndexedMaskedArrays: """ - Read any number of columns from a table as numpy masked arrays. + Read any number of columns from tables as numpy masked arrays. Parameters: ----------- @@ -777,9 +777,8 @@ def read_arrays( Column names to read. If None or an empty sequence is provided, all table columns are read. - ranges: optional list[tuple] or numpy.ndarray - Time ranges to read. When provided as a numpy array, it is expected to - have shape (n, 2) and contain datetime64[ns] values. + ranges: optional list[tuple] + Time ranges to read. If None, the full available range is read. Returns: From e33747a12d2481cf5d4aada28147e48e1bfd3b7a Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:17:49 +0200 Subject: [PATCH 08/11] cosmetic 3 --- quasardb/pandas/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/quasardb/pandas/__init__.py b/quasardb/pandas/__init__.py index a091e5cf..9c72c47e 100644 --- a/quasardb/pandas/__init__.py +++ b/quasardb/pandas/__init__.py @@ -250,9 +250,8 @@ def stream_dataframes( ranges: Optional[RangeSet] = None, ) -> Iterator[pd.DataFrame]: """ - Read Pandas DataFrames from one or more QuasarDB tables. Returns a generator - with dataframes of size `batch_size`, which is useful when traversing a - large dataset which does not fit into memory. + Read a Pandas Dataframe from a QuasarDB Timeseries table. Returns a generator with dataframes of size `batch_size`, which is useful + when traversing a large dataset which does not fit into memory. Parameters: ----------- From 851ebbcf72e9ce484670fe4e4fae0e01e52af6d9 Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:20:55 +0200 Subject: [PATCH 09/11] formatting --- quasardb/numpy/__init__.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index 5e4f6439..c12647ff 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -31,7 +31,18 @@ import logging import time import warnings -from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Type, Union +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Sequence, + Tuple, + Type, + Union, +) import quasardb from quasardb.quasardb import Cluster, Table, Writer From 4e2f75dd24983c9897ebf66ea52dbc3bc4502cbe Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:24:52 +0200 Subject: [PATCH 10/11] validation --- quasardb/numpy/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/quasardb/numpy/__init__.py b/quasardb/numpy/__init__.py index c12647ff..bb1f6f89 100644 --- a/quasardb/numpy/__init__.py +++ b/quasardb/numpy/__init__.py @@ -45,6 +45,7 @@ ) import quasardb +import quasardb.table_cache as table_cache from quasardb.quasardb import Cluster, Table, Writer from quasardb.typing import DType, MaskedArrayAny, NDArrayAny, NDArrayTime, RangeSet From ac5d6d628eb3878b1345d8e928ede6155e987b60 Mon Sep 17 00:00:00 2001 From: vikonix Date: Fri, 10 Apr 2026 14:36:25 +0200 Subject: [PATCH 11/11] fix tests --- tests/test_numpy.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/tests/test_numpy.py b/tests/test_numpy.py index 29dcd614..e816f3e0 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -364,7 +364,10 @@ def test_read_arrays_reads_all_columns_when_columns_empty(qdbd_connection, table idx, xs = qdbnp.read_arrays(qdbd_connection, [table], column_names=[]) np.testing.assert_array_equal(idx, index) - assert list(xs.keys()) == [c.name for c in table.list_columns()] + assert list(xs.keys()) == ["$table"] + [c.name for c in table.list_columns()] + np.testing.assert_array_equal( + xs["$table"], np.array([table.get_name()] * len(index)) + ) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) np.testing.assert_array_equal(xs[tslib._blob_col_name(table)], blobs) np.testing.assert_array_equal(xs[tslib._string_col_name(table)], strings) @@ -413,12 +416,15 @@ def test_read_arrays_reads_selected_columns_with_ranges(qdbd_connection, table): ) np.testing.assert_array_equal(idx, index[1:3]) - assert list(xs.keys()) == columns + assert list(xs.keys()) == ["$table"] + columns + np.testing.assert_array_equal( + xs["$table"], np.array([table.get_name()] * len(index[1:3])) + ) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles[1:3]) np.testing.assert_array_equal(xs[tslib._int64_col_name(table)], integers[1:3]) -def test_read_arrays_accepts_numpy_ranges(qdbd_connection, table): +def test_read_arrays_accepts_ranges(qdbd_connection, table): index = np.array( [ np.datetime64("2017-01-01T00:00:00", "ns"), @@ -438,7 +444,7 @@ def test_read_arrays_accepts_numpy_ranges(qdbd_connection, table): dtype={tslib._double_col_name(table): doubles.dtype}, ) - ranges = np.array([(index[1], index[2] + np.timedelta64(1, "ns"))]) + ranges = [(index[1], index[2] + np.timedelta64(1, "ns"))] idx, xs = qdbnp.read_arrays( qdbd_connection, [table], @@ -447,6 +453,9 @@ def test_read_arrays_accepts_numpy_ranges(qdbd_connection, table): ) np.testing.assert_array_equal(idx, index[1:3]) + np.testing.assert_array_equal( + xs["$table"], np.array([table.get_name()] * len(index[1:3])) + ) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles[1:3]) @@ -476,6 +485,9 @@ def test_read_arrays_supports_table_object(qdbd_connection, table): ) np.testing.assert_array_equal(idx, index) + np.testing.assert_array_equal( + xs["$table"], np.array([table.get_name()] * len(index)) + ) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles) @@ -505,6 +517,9 @@ def test_read_arrays_supports_table_name(qdbd_connection, table): ) np.testing.assert_array_equal(idx, index) + np.testing.assert_array_equal( + xs["$table"], np.array([table.get_name()] * len(index)) + ) np.testing.assert_array_equal(xs[tslib._double_col_name(table)], doubles)