175 changes: 172 additions & 3 deletions quasardb/numpy/__init__.py
@@ -31,16 +31,31 @@
import logging
import time
import warnings
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, Union
from typing import (
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

import quasardb
import quasardb.table_cache as table_cache
from quasardb.quasardb import Table, Writer
from quasardb.typing import DType, MaskedArrayAny, NDArrayAny, NDArrayTime
from quasardb.quasardb import Cluster, Table, Writer
from quasardb.typing import DType, MaskedArrayAny, NDArrayAny, NDArrayTime, RangeSet

logger = logging.getLogger("quasardb.numpy")


TableLike = Union[str, Table]
IndexedMaskedArrays = Tuple[NDArrayTime, Dict[str, MaskedArrayAny]]


class NumpyRequired(ImportError):
"""
Exception raised when trying to use QuasarDB pandas integration, but
@@ -705,6 +720,160 @@ def write_array(
    write_with[ctype](column, index, data)


def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny:
    if len(xs) == 0:
        return ma.masked_array(np.array([]))
    if len(xs) == 1:
        return xs[0]

    return ma.concatenate(xs)
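
A quick illustration of the mask-preserving concatenation this helper relies on; the arrays below are made up:

import numpy as np
import numpy.ma as ma

# Hypothetical batches: the second element of `a` is masked (i.e. null).
a = ma.masked_array([1.0, 2.0], mask=[False, True])
b = ma.masked_array([3.0], mask=[False])

merged = ma.concatenate([a, b])
print(merged)       # [1.0 -- 3.0]
print(merged.mask)  # [False  True False]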


def _reader_batch_to_arrays(batch: Dict[str, MaskedArrayAny]) -> IndexedMaskedArrays:
    assert "$timestamp" in batch

    idx = batch["$timestamp"]
    xs = {cname: values for (cname, values) in batch.items() if cname != "$timestamp"}

    return idx, xs
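
For clarity, a toy reader batch and how it splits; the "close" column and its timestamp are hypothetical:

import numpy as np
import numpy.ma as ma

batch = {
    "$timestamp": np.array(["2024-01-01"], dtype="datetime64[ns]"),
    "close": ma.masked_array([10.0], mask=[False]),
}

idx, xs = _reader_batch_to_arrays(batch)
print(list(xs))  # ['close'] -- the timestamp index is returned separately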


def _concat_array_batches(xs: Iterable[IndexedMaskedArrays]) -> IndexedMaskedArrays:
    idx_batches: List[NDArrayTime] = []
    value_batches: Dict[str, List[MaskedArrayAny]] = {}

    for idx, batch in xs:
        idx_batches.append(idx)

        for cname, values in batch.items():
            value_batches.setdefault(cname, []).append(values)

    if len(idx_batches) == 0:
        raise ValueError("No array batches to concatenate")

    if len(idx_batches) == 1:
        return idx_batches[0], {
            cname: batches[0] for cname, batches in value_batches.items()
        }

    return np.concatenate(idx_batches), {
        cname: _concat_masked(batches) for (cname, batches) in value_batches.items()
    }
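
To make the batch layout concrete, a minimal sketch of two indexed batches being combined; the timestamps and the single "open" column are hypothetical:

import numpy as np
import numpy.ma as ma

batch_a = (
    np.array(["2024-01-01", "2024-01-02"], dtype="datetime64[ns]"),
    {"open": ma.masked_array([1.0, 2.0], mask=[False, True])},
)
batch_b = (
    np.array(["2024-01-03"], dtype="datetime64[ns]"),
    {"open": ma.masked_array([3.0], mask=[False])},
)

idx, cols = _concat_array_batches([batch_a, batch_b])
print(idx.shape)     # (3,)
print(cols["open"])  # [1.0 -- 3.0]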


def read_arrays(
    conn: Cluster,
    tables: List[TableLike],
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[Sequence[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> IndexedMaskedArrays:
    """
    Read any number of columns from tables as numpy masked arrays.

    Parameters:
    -----------

    conn : quasardb.Cluster
        Connection to the QuasarDB database.

    tables : list[str | quasardb.Table]
        QuasarDB tables to read, as a list of strings or quasardb table objects.

    batch_size : optional int
        The number of rows to fetch in a single read operation. If None,
        defaults to 2^16 (65536) rows per batch.

    column_names : optional sequence[str]
        Column names to read.
        If None or an empty sequence is provided, all table columns are read.

    ranges : optional list[tuple]
        Time ranges to read.
        If None, the full available range is read.

    Returns:
    --------

    tuple[numpy.ndarray, dict[str, numpy.ma.MaskedArray]]
        A pair consisting of the shared timestamp index and a mapping of column
        names to masked arrays.

    Examples:
    ---------

    Read all columns:

    >>> idx, cols = qdbnp.read_arrays(conn, [my_table], column_names=[])

    Read a subset of columns for a given time range:

    >>> idx, cols = qdbnp.read_arrays(
    ...     conn,
    ...     [my_table],
    ...     column_names=["open", "close"],
    ...     ranges=[(start, end)],
    ... )
    >>> opens = cols["open"]
    >>> closes = cols["close"]
    """
    xs = stream_arrays(
        conn,
        tables,
        batch_size=batch_size,
        column_names=column_names,
        ranges=ranges,
    )

    try:
        return _concat_array_batches(xs)
    except ValueError as e:
        logger.error(
            "Error while concatenating arrays. This can happen if the result set is empty. Returning empty arrays. Error: %s",
            e,
        )
        return np.array([], dtype=np.dtype("datetime64[ns]")), {}
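
Because the returned columns are masked arrays, null entries need explicit handling downstream. A sketch, assuming an open connection `conn` and a hypothetical "prices" table with a float "close" column:

import numpy as np
import numpy.ma as ma

idx, cols = read_arrays(conn, ["prices"], column_names=["close"])
closes = cols["close"]

valid = closes.compressed()               # 1-D ndarray with null entries dropped
dense = closes.filled(np.nan)             # ndarray with nulls replaced by NaN
valid_ts = idx[~ma.getmaskarray(closes)]  # timestamps of the non-null rows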


def stream_arrays(
    conn: Cluster,
    tables: List[TableLike],
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[Sequence[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> Iterator[IndexedMaskedArrays]:
    """
    Read one or more tables as numpy masked arrays. Returns a generator that
    yields indexed batches of size `batch_size`, which is useful when
    traversing a large dataset that does not fit into memory.
    """
    # Sanitize batch_size
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError(
            "batch_size should be an integer, but got: {} with value {}".format(
                type(batch_size), str(batch_size)
            )
        )

    kwargs: Dict[str, Any] = {"batch_size": batch_size}

    if column_names:
        kwargs["column_names"] = column_names

    if ranges:
        kwargs["ranges"] = ranges

    coerce_table_name_fn = lambda x: x if isinstance(x, str) else x.get_name()
    kwargs["table_names"] = [coerce_table_name_fn(x) for x in tables]
    with conn.reader(**kwargs) as reader:
        for batch in reader:
            yield _reader_batch_to_arrays(batch)
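
For example, a running aggregate can be computed batch by batch without ever materializing the full result set; the "prices" table and "close" column are hypothetical:

total = 0.0
count = 0
for idx, cols in stream_arrays(conn, ["prices"], column_names=["close"]):
    closes = cols["close"]
    total += float(closes.filled(0.0).sum())  # treat nulls as 0 in the sum
    count += closes.count()                   # count() skips null entries

if count > 0:
    print("mean close:", total / count)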


def write_arrays(
    data: Any,
    cluster: quasardb.Cluster,
40 changes: 8 additions & 32 deletions quasardb/pandas/__init__.py
@@ -253,8 +253,6 @@ def stream_dataframes(
    Read a Pandas DataFrame from a QuasarDB Timeseries table. Returns a generator
    that yields dataframes of size `batch_size`, which is useful when traversing
    a large dataset that does not fit into memory.

    Accepts the same parameters as `stream_dataframes`.

    Parameters:
    -----------

@@ -279,36 +277,14 @@
        Defaults to the entire table.

    """
    # Sanitize batch_size
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError(
            "batch_size should be an integer, but got: {} with value {}".format(
                type(batch_size), str(batch_size)
            )
        )

    kwargs: Dict[str, Any] = {"batch_size": batch_size}

    if column_names:
        kwargs["column_names"] = column_names

    if ranges:
        kwargs["ranges"] = ranges

    coerce_table_name_fn = lambda x: x if isinstance(x, str) else x.get_name()
    kwargs["table_names"] = [coerce_table_name_fn(x) for x in tables]

    with conn.reader(**kwargs) as reader:
        for batch in reader:
            # We always expect the timestamp column, and set this as the index
            assert "$timestamp" in batch

            idx = pd.Index(batch.pop("$timestamp"), copy=False, name="$timestamp")
            df = pd.DataFrame(batch, index=idx)

            yield df
    for idx, xs in qdbnp.stream_arrays(
        conn,
        tables,
        batch_size=batch_size,
        column_names=column_names,
        ranges=ranges,
    ):
        yield pd.DataFrame(xs, index=pd.Index(idx, copy=False, name="$timestamp"))
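
A brief usage sketch (table name hypothetical). Note that pandas treats masked entries as missing, so nulls surface as NaN in the resulting frames:

for df in stream_dataframes(conn, ["prices"], batch_size=2**14):
    print(df.index.min(), df.index.max(), len(df))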


def stream_dataframe(