def _concat_masked(xs: "List[MaskedArrayAny]") -> "MaskedArrayAny":
    """
    Concatenate a list of masked arrays into a single masked array.

    An empty list yields an empty masked array. A single-element list returns
    that element as-is (no copy is made).
    """
    if not xs:
        return ma.masked_array(np.array([]))
    if len(xs) == 1:
        return xs[0]

    return ma.concatenate(xs)


def _reader_batch_to_arrays(
    batch: "Dict[str, MaskedArrayAny]",
) -> "IndexedMaskedArrays":
    """
    Split a reader batch into its `$timestamp` index and the remaining columns.

    Returns a pair of the `$timestamp` values and a new dict holding every
    other column, in the order the batch provides them.
    """
    # We always expect the timestamp column, which serves as the index.
    assert "$timestamp" in batch, "reader batch is missing the $timestamp column"

    idx = batch["$timestamp"]
    xs = {cname: values for (cname, values) in batch.items() if cname != "$timestamp"}

    return idx, xs


def _concat_array_batches(
    xs: "Iterable[IndexedMaskedArrays]",
) -> "IndexedMaskedArrays":
    """
    Concatenate indexed batches into a single index and one array per column.

    Raises:
    -------

    ValueError
        If `xs` yields no batches, or if a column is not present in every
        batch (its concatenated values could not line up with the index).
    """
    idx_batches: List[NDArrayTime] = []
    value_batches: Dict[str, List[MaskedArrayAny]] = {}

    for idx, batch in xs:
        idx_batches.append(idx)

        for cname, values in batch.items():
            value_batches.setdefault(cname, []).append(values)

    if not idx_batches:
        raise ValueError("No array batches to concatenate")

    # Every column must contribute exactly one chunk per index chunk; otherwise
    # the concatenated column would silently be shorter than the index.
    for cname, batches in value_batches.items():
        if len(batches) != len(idx_batches):
            raise ValueError(
                "Column '{}' is present in {} out of {} batches".format(
                    cname, len(batches), len(idx_batches)
                )
            )

    # Avoid copying the index when there is nothing to concatenate;
    # _concat_masked applies the same shortcut to the column values.
    if len(idx_batches) == 1:
        index = idx_batches[0]
    else:
        index = np.concatenate(idx_batches)

    return index, {
        cname: _concat_masked(batches) for (cname, batches) in value_batches.items()
    }


def read_arrays(
    conn: "Cluster",
    tables: "List[TableLike]",
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[Sequence[str]] = None,
    ranges: "Optional[RangeSet]" = None,
) -> "IndexedMaskedArrays":
    """
    Read any number of columns from tables as numpy masked arrays.

    Parameters:
    -----------

    conn: quasardb.Cluster
      Connection to the QuasarDB database.

    tables : list[str | quasardb.Table]
      QuasarDB tables to read, as a list of strings or quasardb table objects.

    batch_size : int
      The amount of rows to fetch in a single read operation. If unset, uses 2^16
      (65536) rows as batch size by default.

    column_names: optional sequence[str]
      Column names to read.
      If None or an empty sequence is provided, all table columns are read.

    ranges: optional list[tuple]
      Time ranges to read.
      If None, the full available range is read.

    Returns:
    --------

    tuple[numpy.ndarray, dict[str, numpy.ma.MaskedArray]]
      A pair consisting of the shared timestamp index and a mapping of column
      names to masked arrays. When the reader yields no batches, an empty
      `datetime64[ns]` index and an empty dict are returned.

    Raises:
    -------

    TypeError
      If `batch_size` is not an integer, or `column_names` is a single string.

    ValueError
      If the batches cannot be concatenated into aligned arrays.

    Examples:
    ---------

    Read all columns:

    >>> idx, cols = qdbnp.read_arrays(conn, [my_table], column_names=[])

    Read a subset of columns for a given time range:

    >>> idx, cols = qdbnp.read_arrays(
    ...     conn,
    ...     [my_table],
    ...     column_names=["open", "close"],
    ...     ranges=[(start, end)],
    ... )
    >>> opens = cols["open"]
    >>> closes = cols["close"]
    """
    # Concatenation needs every batch in memory anyway, so materialize them
    # up front; this lets us tell "no data" apart from a genuine failure.
    batches = list(
        stream_arrays(
            conn,
            tables,
            batch_size=batch_size,
            column_names=column_names,
            ranges=ranges,
        )
    )

    if not batches:
        # An empty result set is a regular outcome, not an error.
        return np.array([], dtype=np.dtype("datetime64[ns]")), {}

    return _concat_array_batches(batches)


def stream_arrays(
    conn: "Cluster",
    tables: "List[TableLike]",
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[Sequence[str]] = None,
    ranges: "Optional[RangeSet]" = None,
) -> "Iterator[IndexedMaskedArrays]":
    """
    Read one or more tables as numpy masked arrays. Returns a generator with
    indexed batches of size `batch_size`, which is useful when traversing a
    large dataset which does not fit into memory.

    Accepts the same parameters as `read_arrays`. Arguments are validated when
    this function is called; the reader itself is only opened once the returned
    generator is first advanced.

    Raises:
    -------

    TypeError
      If `batch_size` is not an integer, or `column_names` is a single string.
    """
    # Sanitize batch_size
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError(
            "batch_size should be an integer, but got: {} with value {}".format(
                type(batch_size), str(batch_size)
            )
        )

    # A bare string is itself a sequence of strings, so it would satisfy the
    # type hint while almost certainly being a mistake.
    if isinstance(column_names, str):
        raise TypeError(
            "column_names should be a sequence of column names, "
            "but got a single string: {!r}".format(column_names)
        )

    kwargs: Dict[str, Any] = {"batch_size": batch_size}

    if column_names:
        kwargs["column_names"] = column_names

    if ranges:
        kwargs["ranges"] = ranges

    # Tables may be given either by name or as table objects.
    kwargs["table_names"] = [x if isinstance(x, str) else x.get_name() for x in tables]

    return _stream_reader_batches(conn, kwargs)


def _stream_reader_batches(
    conn: "Cluster", reader_kwargs: Dict[str, Any]
) -> "Iterator[IndexedMaskedArrays]":
    """
    Open a reader with `reader_kwargs` and yield each batch as an
    (index, columns) pair.
    """
    with conn.reader(**reader_kwargs) as reader:
        for batch in reader:
            yield _reader_batch_to_arrays(batch)
def _seconds_index(count, start="2017-01-01T00:00:00"):
    """Build a datetime64[ns] array of `count` timestamps, one second apart."""
    first = np.datetime64(start, "ns")
    return np.array(
        [first + np.timedelta64(offset, "s") for offset in range(count)],
        dtype=np.dtype("datetime64[ns]"),
    )


def _write_columns(conn, table, index, columns):
    """Write `columns` to `table`, using each array's own dtype and no inference."""
    qdbnp.write_arrays(
        columns,
        conn,
        table,
        index=index,
        infer_types=False,
        dtype={cname: values.dtype for cname, values in columns.items()},
    )


def _assert_table_column(xs, table, nrows):
    """Check that the `$table` column repeats the table name `nrows` times."""
    np.testing.assert_array_equal(xs["$table"], np.array([table.get_name()] * nrows))


def test_read_arrays_reads_all_columns_when_columns_empty(qdbd_connection, table):
    """An empty `column_names` reads every column of the table."""
    index = _seconds_index(3)
    written = {
        tslib._double_col_name(table): np.array([1.0, 2.0, 3.0], dtype=np.float64),
        tslib._blob_col_name(table): np.array(
            [b"a\x00b", b"cd", b"ef"], dtype=np.object_
        ),
        tslib._string_col_name(table): np.array(
            ["content_0", "content_1", "content_2"], dtype=np.dtype("U")
        ),
        tslib._int64_col_name(table): np.array([10, 11, 12], dtype=np.int64),
        tslib._ts_col_name(table): _seconds_index(3, start="2017-01-02T00:00:00"),
        tslib._symbol_col_name(table): np.array(
            ["sym_0", "sym_1", "sym_2"], dtype=np.dtype("U")
        ),
    }
    _write_columns(qdbd_connection, table, index, written)

    idx, xs = qdbnp.read_arrays(qdbd_connection, [table], column_names=[])

    np.testing.assert_array_equal(idx, index)
    assert list(xs.keys()) == ["$table"] + [c.name for c in table.list_columns()]
    _assert_table_column(xs, table, len(index))
    for cname, expected in written.items():
        np.testing.assert_array_equal(xs[cname], expected)


def test_read_arrays_reads_selected_columns_with_ranges(qdbd_connection, table):
    """Selected columns and a time range are both honoured across small batches."""
    index = _seconds_index(4)
    double_col = tslib._double_col_name(table)
    int64_col = tslib._int64_col_name(table)
    doubles = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64)
    integers = np.array([10, 11, 12, 13], dtype=np.int64)
    _write_columns(
        qdbd_connection, table, index, {double_col: doubles, int64_col: integers}
    )

    columns = [double_col, int64_col]
    # Range end is one nanosecond past the third row's timestamp
    ranges = [(index[1], index[2] + np.timedelta64(1, "ns"))]

    idx, xs = qdbnp.read_arrays(
        qdbd_connection,
        [table],
        batch_size=1,
        column_names=columns,
        ranges=ranges,
    )

    expected_rows = slice(1, 3)
    np.testing.assert_array_equal(idx, index[expected_rows])
    assert list(xs.keys()) == ["$table"] + columns
    _assert_table_column(xs, table, len(index[expected_rows]))
    np.testing.assert_array_equal(xs[double_col], doubles[expected_rows])
    np.testing.assert_array_equal(xs[int64_col], integers[expected_rows])


def test_read_arrays_accepts_ranges(qdbd_connection, table):
    """A `ranges` argument restricts the rows that are returned."""
    index = _seconds_index(3)
    double_col = tslib._double_col_name(table)
    doubles = np.array([1.0, 2.0, 3.0], dtype=np.float64)
    _write_columns(qdbd_connection, table, index, {double_col: doubles})

    ranges = [(index[1], index[2] + np.timedelta64(1, "ns"))]
    idx, xs = qdbnp.read_arrays(
        qdbd_connection,
        [table],
        column_names=[double_col],
        ranges=ranges,
    )

    np.testing.assert_array_equal(idx, index[1:3])
    _assert_table_column(xs, table, len(index[1:3]))
    np.testing.assert_array_equal(xs[double_col], doubles[1:3])


def test_read_arrays_supports_table_object(qdbd_connection, table):
    """Tables may be passed as table objects."""
    index = _seconds_index(2)
    double_col = tslib._double_col_name(table)
    doubles = np.array([1.0, 2.0], dtype=np.float64)
    _write_columns(qdbd_connection, table, index, {double_col: doubles})

    idx, xs = qdbnp.read_arrays(
        qdbd_connection,
        [table],
        column_names=[double_col],
    )

    np.testing.assert_array_equal(idx, index)
    _assert_table_column(xs, table, len(index))
    np.testing.assert_array_equal(xs[double_col], doubles)


def test_read_arrays_supports_table_name(qdbd_connection, table):
    """Tables may be passed by name."""
    index = _seconds_index(2)
    double_col = tslib._double_col_name(table)
    doubles = np.array([1.0, 2.0], dtype=np.float64)
    _write_columns(qdbd_connection, table, index, {double_col: doubles})

    idx, xs = qdbnp.read_arrays(
        qdbd_connection,
        [table.get_name()],
        column_names=[double_col],
    )

    np.testing.assert_array_equal(idx, index)
    _assert_table_column(xs, table, len(index))
    np.testing.assert_array_equal(xs[double_col], doubles)


def test_read_arrays_rejects_string_column_names(qdbd_connection, table):
    """A single string is not accepted in place of a sequence of column names."""
    with pytest.raises(TypeError):
        qdbnp.read_arrays(qdbd_connection, [table], column_names="the_double")


def test_stream_arrays_reads_batched_results(qdbd_connection, table):
    """With `batch_size=1`, three rows arrive as three separate batches."""
    index = _seconds_index(3)
    double_col = tslib._double_col_name(table)
    doubles = np.array([1.0, 2.0, 3.0], dtype=np.float64)
    _write_columns(qdbd_connection, table, index, {double_col: doubles})

    stream = qdbnp.stream_arrays(
        qdbd_connection,
        [table],
        batch_size=1,
        column_names=[double_col],
    )
    xs = list(stream)

    assert len(xs) == 3
    np.testing.assert_array_equal(np.concatenate([idx for idx, _ in xs]), index)
    np.testing.assert_array_equal(
        ma.concatenate([batch[double_col] for _, batch in xs]),
        doubles,
    )