From 834b6133463adbb0e4f4f8218e420536aae9e8a7 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Tue, 12 May 2026 12:18:38 -0700 Subject: [PATCH 1/7] [DataLoader] Emit OTel metrics for read operations Wire OpenTelemetry instruments into the dataloader read path so operators can observe per-split and per-batch latency, throughput, and failure rates per table. Adds load_table/plan_files duration & attempt counters around the retry loop, and split/batch duration, row, byte, batch-count and error instruments around DataLoaderSplit and _TimedBatchIter. Common attributes (database, table, branch, and caller-provided execution_context under openhouse.ctx.*) are attached via a single build_attributes helper. execution_context is plumbed through TableScanContext so worker-side splits carry the same labels. --- .../dataloader/_table_scan_context.py | 12 +- .../src/openhouse/dataloader/data_loader.py | 51 ++- .../openhouse/dataloader/data_loader_split.py | 97 ++++-- .../openhouse/dataloader/metrics/__init__.py | 11 +- .../dataloader/metrics/attributes.py | 29 ++ .../dataloader/metrics/instruments.py | 95 ++++++ .../python/dataloader/tests/test_metrics.py | 304 +++++++++++++++++- 7 files changed, 551 insertions(+), 48 deletions(-) create mode 100644 integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py create mode 100644 integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py index ae20bd9c5..b1644bd2b 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py @@ -1,6 +1,8 @@ from __future__ import annotations -from dataclasses import dataclass +from collections.abc import Mapping +from dataclasses import dataclass, field +from types import MappingProxyType from pyiceberg.expressions import AlwaysTrue, BooleanExpression from pyiceberg.io import FileIO, load_file_io @@ -9,6 +11,8 @@ from openhouse.dataloader.table_identifier import TableIdentifier +_EMPTY_EXECUTION_CONTEXT: Mapping[str, str] = MappingProxyType({}) + def _unpickle_scan_context( table_metadata: TableMetadata, @@ -17,6 +21,7 @@ def _unpickle_scan_context( row_filter: BooleanExpression, table_id: TableIdentifier, worker_jvm_args: str | None = None, + execution_context: Mapping[str, str] | None = None, ) -> TableScanContext: return TableScanContext( table_metadata=table_metadata, @@ -25,6 +30,7 @@ def _unpickle_scan_context( row_filter=row_filter, table_id=table_id, worker_jvm_args=worker_jvm_args, + execution_context=execution_context if execution_context is not None else _EMPTY_EXECUTION_CONTEXT, ) @@ -42,6 +48,8 @@ class TableScanContext: table_id: Identifier for the table being scanned row_filter: Row-level filter expression pushed down to the scan worker_jvm_args: JVM arguments applied when the JNI JVM is created in worker processes + execution_context: Caller-provided context (e.g. tenant, environment) attached as + attributes on metrics emitted while iterating splits. """ table_metadata: TableMetadata @@ -50,6 +58,7 @@ class TableScanContext: table_id: TableIdentifier row_filter: BooleanExpression = AlwaysTrue() worker_jvm_args: str | None = None + execution_context: Mapping[str, str] = field(default_factory=lambda: _EMPTY_EXECUTION_CONTEXT) def __reduce__(self) -> tuple: return ( @@ -61,5 +70,6 @@ def __reduce__(self) -> tuple: self.row_filter, self.table_id, self.worker_jvm_args, + dict(self.execution_context), ), ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 26424d52c..806d5296e 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import time from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence from dataclasses import dataclass from functools import cached_property @@ -8,6 +9,7 @@ from types import MappingProxyType from typing import TypeVar +from opentelemetry.metrics import Counter, Histogram from pyiceberg.catalog import Catalog from pyiceberg.table import Table from pyiceberg.table.snapshots import Snapshot @@ -27,6 +29,7 @@ _to_pyiceberg, always_true, ) +from openhouse.dataloader.metrics import build_attributes, instruments from openhouse.dataloader.scan_optimizer import optimize_scan from openhouse.dataloader.table_identifier import TableIdentifier from openhouse.dataloader.table_transformer import TableTransformer @@ -51,22 +54,37 @@ def _batched(iterable: Iterable[_T], n: int) -> Iterator[tuple[_T, ...]]: yield batch -def _retry(fn: Callable[[], _T], label: str, max_attempts: int) -> _T: - """Call *fn* with retry logic, logging duration of each attempt. +def _retry( + fn: Callable[[], _T], + label: str, + max_attempts: int, + duration_histogram: Histogram, + attempts_counter: Counter, + attributes: Mapping[str, str], +) -> _T: + """Call *fn* with retry logic, logging and emitting metrics for each attempt. Retries on ``OSError`` (transient network/storage I/O failures), except ``HTTPError`` which is only retried for 5xx status codes. Uses exponential backoff with up to *max_attempts* total attempts. + + Bumps *attempts_counter* once per attempt and records *duration_histogram* + once when the call ultimately returns or raises (wall-clock across retries). """ - for attempt in Retrying( - retry=retry_if_exception(_is_transient), - stop=stop_after_attempt(max_attempts), - wait=wait_exponential(), - reraise=True, - ): - with attempt, log_duration(logger, "%s (attempt %d)", label, attempt.retry_state.attempt_number): - return fn() - raise AssertionError("unreachable") # pragma: no cover + overall_start = time.monotonic() + try: + for attempt in Retrying( + retry=retry_if_exception(_is_transient), + stop=stop_after_attempt(max_attempts), + wait=wait_exponential(), + reraise=True, + ): + with attempt, log_duration(logger, "%s (attempt %d)", label, attempt.retry_state.attempt_number): + attempts_counter.add(1, attributes) + return fn() + raise AssertionError("unreachable") # pragma: no cover + finally: + duration_histogram.record(time.monotonic() - overall_start, attributes) @dataclass(frozen=True) @@ -173,6 +191,9 @@ def _iceberg_table(self) -> Table: lambda: self._catalog.load_table((self._table_id.database, self._table_id.table)), label=f"load_table {self._table_id}", max_attempts=self._max_attempts, + duration_histogram=instruments.load_table_duration, + attempts_counter=instruments.load_table_attempts, + attributes=build_attributes(self._table_id, self._context.execution_context), ) @property @@ -276,12 +297,18 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: row_filter=row_filter, table_id=self._table_id, worker_jvm_args=self._context.jvm_config.worker_args if self._context.jvm_config else None, + execution_context=self._context.execution_context or {}, ) # plan_files() materializes all tasks at once (PyIceberg doesn't support streaming) # Manifests are read in parallel with one thread per manifest scan_tasks = _retry( - lambda: scan.plan_files(), label=f"plan_files {self._table_id}", max_attempts=self._max_attempts + lambda: scan.plan_files(), + label=f"plan_files {self._table_id}", + max_attempts=self._max_attempts, + duration_histogram=instruments.plan_files_duration, + attempts_counter=instruments.plan_files_attempts, + attributes=build_attributes(self._table_id, self._context.execution_context), ) for chunk in _batched(scan_tasks, self._files_per_split): diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 77e8c1d81..ad4f8881e 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -18,6 +18,7 @@ from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader._timer import log_duration from openhouse.dataloader.filters import _quote_identifier +from openhouse.dataloader.metrics import build_attributes, instruments from openhouse.dataloader.table_identifier import TableIdentifier from openhouse.dataloader.udf_registry import NoOpRegistry, UDFRegistry @@ -57,12 +58,21 @@ def _bind_batch_table(session: SessionContext, table_id: TableIdentifier, batch: class _TimedBatchIter: - """Wraps a RecordBatch iterator to log the wall-clock time of each ``next()`` call.""" + """Wraps a RecordBatch iterator to log and emit metrics for each ``next()`` call.""" - def __init__(self, inner: Iterator[RecordBatch], split_id: str) -> None: + def __init__( + self, + inner: Iterator[RecordBatch], + split_id: str, + attributes: Mapping[str, str], + ) -> None: self._inner = inner self._split_id = split_id + self._attributes = attributes self._idx = 0 + self.total_rows = 0 + self.total_bytes = 0 + self.batch_count = 0 def __iter__(self) -> _TimedBatchIter: return self @@ -74,11 +84,20 @@ def __next__(self) -> RecordBatch: except StopIteration: raise except Exception: - logger.warning( - "record_batch %s [%d] failed after %.3fs", self._split_id, self._idx, time.monotonic() - start - ) + elapsed = time.monotonic() - start + logger.warning("record_batch %s [%d] failed after %.3fs", self._split_id, self._idx, elapsed) + instruments.batch_errors.add(1, self._attributes) raise - logger.info("record_batch %s [%d] in %.3fs", self._split_id, self._idx, time.monotonic() - start) + elapsed = time.monotonic() - start + logger.info("record_batch %s [%d] in %.3fs", self._split_id, self._idx, elapsed) + rows = batch.num_rows + nbytes = batch.nbytes + instruments.batch_duration.record(elapsed, self._attributes) + instruments.batch_rows.record(rows, self._attributes) + instruments.batch_bytes.record(nbytes, self._attributes) + self.total_rows += rows + self.total_bytes += nbytes + self.batch_count += 1 self._idx += 1 return batch @@ -140,34 +159,48 @@ def __iter__(self) -> Iterator[RecordBatch]: ctx = self._scan_context if ctx.worker_jvm_args is not None: apply_libhdfs_opts(ctx.worker_jvm_args) - arrow_scan = ArrowScan( - table_metadata=ctx.table_metadata, - io=ctx.io, - projected_schema=ctx.projected_schema, - row_filter=ctx.row_filter, - ) - - split_id = self.id[:12] - - with log_duration(logger, "setup_scan %s", split_id): - batches = arrow_scan.to_record_batches( - self._file_scan_tasks, - order=ArrivalOrder(concurrent_streams=len(self._file_scan_tasks), batch_size=self._batch_size), + attributes = build_attributes(ctx.table_id, ctx.execution_context) + split_start = time.monotonic() + timed: _TimedBatchIter | None = None + try: + arrow_scan = ArrowScan( + table_metadata=ctx.table_metadata, + io=ctx.io, + projected_schema=ctx.projected_schema, + row_filter=ctx.row_filter, ) - timed = _TimedBatchIter(iter(batches), split_id) - - if self._transform_sql is None: - yield from timed - else: - # Materialize the first batch before creating the transform session - # so that the HDFS JVM starts (and picks up worker_jvm_args) before - # any UDF registration code can trigger JNI. - first = next(timed, None) - if first is None: - return - session = _create_transform_session(self._scan_context.table_id, self._udf_registry, self._batch_size) - yield from _timed_transform(chain([first], timed), split_id, session, self._apply_transform) + split_id = self.id[:12] + + with log_duration(logger, "setup_scan %s", split_id): + batches = arrow_scan.to_record_batches( + self._file_scan_tasks, + order=ArrivalOrder(concurrent_streams=len(self._file_scan_tasks), batch_size=self._batch_size), + ) + + timed = _TimedBatchIter(iter(batches), split_id, attributes) + + if self._transform_sql is None: + yield from timed + else: + # Materialize the first batch before creating the transform session + # so that the HDFS JVM starts (and picks up worker_jvm_args) before + # any UDF registration code can trigger JNI. + first = next(timed, None) + if first is None: + return + session = _create_transform_session(self._scan_context.table_id, self._udf_registry, self._batch_size) + yield from _timed_transform(chain([first], timed), split_id, session, self._apply_transform) + except BaseException: + instruments.split_errors.add(1, attributes) + raise + finally: + instruments.split_duration.record(time.monotonic() - split_start, attributes) + instruments.split_files.record(len(self._file_scan_tasks), attributes) + if timed is not None: + instruments.split_rows.record(timed.total_rows, attributes) + instruments.split_bytes.record(timed.total_bytes, attributes) + instruments.split_batches.record(timed.batch_count, attributes) def _apply_transform(self, session: SessionContext, batch: RecordBatch) -> Iterator[RecordBatch]: """Execute the transform SQL against a single RecordBatch.""" diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py index f180e5d27..e657b3ecc 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py @@ -4,12 +4,21 @@ fallback when no SDK is configured. The *application* (not this library) is responsible for installing an SDK and configuring exporters. -Call sites should obtain a ``Meter`` via the OTEL API directly:: +External code that wants the dataloader's meter can call:: from opentelemetry.metrics import get_meter from openhouse.dataloader.metrics import METER_NAME meter = get_meter(METER_NAME) + +Internal emission inside the dataloader uses the module-level instruments +in :mod:`openhouse.dataloader.metrics.instruments` paired with the attribute +helper :func:`openhouse.dataloader.metrics.attributes.build_attributes`. """ METER_NAME = "openhouse.dataloader" + +from openhouse.dataloader.metrics import instruments # noqa: E402 +from openhouse.dataloader.metrics.attributes import build_attributes # noqa: E402 + +__all__ = ["METER_NAME", "build_attributes", "instruments"] diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py new file mode 100644 index 000000000..90db99b5e --- /dev/null +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py @@ -0,0 +1,29 @@ +"""Helpers for building the common metric attribute dict.""" + +from __future__ import annotations + +from collections.abc import Mapping + +from openhouse.dataloader.table_identifier import TableIdentifier + + +def build_attributes( + table_id: TableIdentifier, + execution_context: Mapping[str, str] | None = None, +) -> dict[str, str]: + """Return the standard metric attributes for a table read. + + Caller-provided ``execution_context`` keys are namespaced under + ``openhouse.ctx.`` so they cannot collide with built-in attributes. + ``openhouse.branch`` is omitted when the table identifier has no branch. + """ + attrs: dict[str, str] = { + "openhouse.database": table_id.database, + "openhouse.table": table_id.table, + } + if table_id.branch is not None: + attrs["openhouse.branch"] = table_id.branch + if execution_context: + for key, value in execution_context.items(): + attrs[f"openhouse.ctx.{key}"] = value + return attrs diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py new file mode 100644 index 000000000..7d020f5c8 --- /dev/null +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py @@ -0,0 +1,95 @@ +"""OpenTelemetry instrument singletons for the dataloader. + +All instruments are bound to a single ``Meter`` obtained via ``get_meter(METER_NAME)``. +Call sites import the module-level objects rather than constructing their own so the +instrument inventory stays in one place. + +When no SDK is configured, ``opentelemetry-api`` returns no-op instruments and the +``record``/``add`` calls are cheap. +""" + +from __future__ import annotations + +from opentelemetry.metrics import get_meter + +from openhouse.dataloader.metrics import METER_NAME + +_meter = get_meter(METER_NAME) + +# Retried catalog/metadata operations. Duration is wall-clock across all attempts; +# attempts counts every try (success or failure). +load_table_duration = _meter.create_histogram( + name="openhouse.dataloader.load_table.duration", + unit="s", + description="Wall-clock duration of the load_table call (across retries).", +) +load_table_attempts = _meter.create_counter( + name="openhouse.dataloader.load_table.attempts", + unit="1", + description="Attempt count for the load_table call (each try, success or failure).", +) + +plan_files_duration = _meter.create_histogram( + name="openhouse.dataloader.plan_files.duration", + unit="s", + description="Wall-clock duration of the plan_files call (across retries).", +) +plan_files_attempts = _meter.create_counter( + name="openhouse.dataloader.plan_files.attempts", + unit="1", + description="Attempt count for the plan_files call (each try, success or failure).", +) + +# Per-split read. +split_duration = _meter.create_histogram( + name="openhouse.dataloader.split.duration", + unit="s", + description="Wall-clock duration of a DataLoaderSplit iteration.", +) +split_files = _meter.create_histogram( + name="openhouse.dataloader.split.files", + unit="1", + description="Number of files in a DataLoaderSplit.", +) +split_rows = _meter.create_histogram( + name="openhouse.dataloader.split.rows", + unit="1", + description="Total rows yielded by a DataLoaderSplit.", +) +split_bytes = _meter.create_histogram( + name="openhouse.dataloader.split.bytes", + unit="By", + description="Total decompressed bytes yielded by a DataLoaderSplit.", +) +split_batches = _meter.create_histogram( + name="openhouse.dataloader.split.batches", + unit="1", + description="Total RecordBatches yielded by a DataLoaderSplit.", +) +split_errors = _meter.create_counter( + name="openhouse.dataloader.split.errors", + unit="1", + description="Unhandled exceptions raised from a DataLoaderSplit iteration.", +) + +# Per-batch read. +batch_duration = _meter.create_histogram( + name="openhouse.dataloader.batch.duration", + unit="s", + description="Wall-clock duration of reading a single RecordBatch.", +) +batch_rows = _meter.create_histogram( + name="openhouse.dataloader.batch.rows", + unit="1", + description="Rows in a single RecordBatch.", +) +batch_bytes = _meter.create_histogram( + name="openhouse.dataloader.batch.bytes", + unit="By", + description="Decompressed bytes of a single RecordBatch.", +) +batch_errors = _meter.create_counter( + name="openhouse.dataloader.batch.errors", + unit="1", + description="Unhandled exceptions raised while reading a RecordBatch.", +) diff --git a/integrations/python/dataloader/tests/test_metrics.py b/integrations/python/dataloader/tests/test_metrics.py index 00273ceda..55ee3dcde 100644 --- a/integrations/python/dataloader/tests/test_metrics.py +++ b/integrations/python/dataloader/tests/test_metrics.py @@ -1,8 +1,35 @@ -"""Tests for the OpenTelemetry metrics infrastructure.""" +"""Tests for the OpenTelemetry metrics emitted by the dataloader.""" +from __future__ import annotations + +import os +import pickle +from collections.abc import Iterator + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest +from opentelemetry import metrics as otel_metrics from opentelemetry.metrics import Meter, get_meter +from opentelemetry.metrics import _internal as otel_metrics_internal +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from pyiceberg.io import load_file_io +from pyiceberg.manifest import DataFile, FileFormat +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC +from pyiceberg.schema import Schema +from pyiceberg.table import FileScanTask +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER +from pyiceberg.types import LongType, NestedField + +from openhouse.dataloader._table_scan_context import TableScanContext +from openhouse.dataloader.data_loader import _retry +from openhouse.dataloader.data_loader_split import DataLoaderSplit +from openhouse.dataloader.metrics import METER_NAME, build_attributes, instruments +from openhouse.dataloader.table_identifier import TableIdentifier -from openhouse.dataloader.metrics import METER_NAME +# --- Meter / METER_NAME basics --- def test_meter_name_is_stable(): @@ -11,3 +38,276 @@ def test_meter_name_is_stable(): def test_get_meter_with_meter_name_returns_a_meter(): assert isinstance(get_meter(METER_NAME), Meter) + + +# --- build_attributes --- + + +def test_build_attributes_includes_database_table(): + table_id = TableIdentifier("db1", "tbl1") + attrs = build_attributes(table_id, None) + assert attrs == {"openhouse.database": "db1", "openhouse.table": "tbl1"} + + +def test_build_attributes_includes_branch_when_set(): + table_id = TableIdentifier("db1", "tbl1", branch="release") + attrs = build_attributes(table_id, None) + assert attrs["openhouse.branch"] == "release" + + +def test_build_attributes_namespaces_execution_context(): + table_id = TableIdentifier("db1", "tbl1") + attrs = build_attributes(table_id, {"tenant": "team-a", "env": "prod"}) + assert attrs["openhouse.ctx.tenant"] == "team-a" + assert attrs["openhouse.ctx.env"] == "prod" + + +# --- InMemoryMetricReader harness --- + + +@pytest.fixture +def metrics_reader() -> Iterator[InMemoryMetricReader]: + """Install an SDK MeterProvider with an InMemoryMetricReader for the test. + + Resets the one-shot ``_METER_PROVIDER_SET_ONCE`` guard and restores the + prior MeterProvider on exit so other tests are not affected. + """ + reader = InMemoryMetricReader() + provider = MeterProvider(metric_readers=[reader]) + once = otel_metrics_internal._METER_PROVIDER_SET_ONCE + prior_provider = otel_metrics_internal._METER_PROVIDER + prior_done = once._done + once._done = False + otel_metrics.set_meter_provider(provider) + try: + yield reader + finally: + otel_metrics_internal._METER_PROVIDER = prior_provider + once._done = prior_done + + +def _data_points(reader: InMemoryMetricReader, metric_name: str) -> list: + """Collect and return all data points for *metric_name* across scopes.""" + data = reader.get_metrics_data() + points: list = [] + if data is None: + return points + for resource_metric in data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + if metric.name == metric_name: + points.extend(metric.data.data_points) + return points + + +def _attrs(point) -> dict: + return dict(point.attributes) + + +# --- _retry attempts / duration --- + + +def test_retry_emits_one_attempt_and_one_duration_on_success(metrics_reader): + table_id = TableIdentifier("db", "tbl") + attrs = build_attributes(table_id, None) + result = _retry( + lambda: "ok", + label="load_table db.tbl", + max_attempts=3, + duration_histogram=instruments.load_table_duration, + attempts_counter=instruments.load_table_attempts, + attributes=attrs, + ) + assert result == "ok" + + attempts = _data_points(metrics_reader, "openhouse.dataloader.load_table.attempts") + assert len(attempts) == 1 + assert _attrs(attempts[0]) == attrs + assert attempts[0].value == 1 + + durations = _data_points(metrics_reader, "openhouse.dataloader.load_table.duration") + assert len(durations) == 1 + assert _attrs(durations[0]) == attrs + + +def test_retry_counts_each_attempt_on_transient_then_success(metrics_reader): + table_id = TableIdentifier("db", "tbl") + attrs = build_attributes(table_id, {"tenant": "t1"}) + calls = {"n": 0} + + def fn(): + calls["n"] += 1 + if calls["n"] == 1: + raise OSError("transient") + return "ok" + + result = _retry( + fn, + label="plan_files db.tbl", + max_attempts=3, + duration_histogram=instruments.plan_files_duration, + attempts_counter=instruments.plan_files_attempts, + attributes=attrs, + ) + assert result == "ok" + assert calls["n"] == 2 + + attempts = _data_points(metrics_reader, "openhouse.dataloader.plan_files.attempts") + assert len(attempts) == 1 + assert attempts[0].value == 2 + assert _attrs(attempts[0])["openhouse.ctx.tenant"] == "t1" + + durations = _data_points(metrics_reader, "openhouse.dataloader.plan_files.duration") + assert len(durations) == 1 + + +def test_retry_permanent_failure_still_records_duration(metrics_reader): + table_id = TableIdentifier("db", "tbl") + attrs = build_attributes(table_id, None) + + class _NonTransient(Exception): + pass + + def fn(): + raise _NonTransient("nope") + + with pytest.raises(_NonTransient): + _retry( + fn, + label="load_table", + max_attempts=3, + duration_histogram=instruments.load_table_duration, + attempts_counter=instruments.load_table_attempts, + attributes=attrs, + ) + + attempts = _data_points(metrics_reader, "openhouse.dataloader.load_table.attempts") + assert len(attempts) == 1 + assert attempts[0].value == 1 + + durations = _data_points(metrics_reader, "openhouse.dataloader.load_table.duration") + assert len(durations) == 1 + + +# --- DataLoaderSplit instrumentation --- + +_SPLIT_SCHEMA = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=False)) +_SPLIT_TABLE_ID = TableIdentifier("db", "tbl") + + +def _make_split(tmp_path, execution_context: dict | None = None) -> DataLoaderSplit: + file_path = str(tmp_path / "data.parquet") + table = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())}) + fields = [field.with_metadata({b"PARQUET:field_id": str(i + 1).encode()}) for i, field in enumerate(table.schema)] + pq.write_table(table.cast(pa.schema(fields)), file_path) + + metadata = new_table_metadata( + schema=_SPLIT_SCHEMA, + partition_spec=UNPARTITIONED_PARTITION_SPEC, + sort_order=UNSORTED_SORT_ORDER, + location=str(tmp_path), + ) + scan_context = TableScanContext( + table_metadata=metadata, + io=load_file_io(properties={}, location=file_path), + projected_schema=_SPLIT_SCHEMA, + table_id=_SPLIT_TABLE_ID, + execution_context=execution_context or {}, + ) + data_file = DataFile.from_args( + file_path=file_path, + file_format=FileFormat.PARQUET, + record_count=table.num_rows, + file_size_in_bytes=os.path.getsize(file_path), + ) + data_file._spec_id = 0 + task = FileScanTask(data_file=data_file) + return DataLoaderSplit(file_scan_tasks=[task], scan_context=scan_context) + + +def test_split_emits_per_split_and_per_batch_metrics(tmp_path, metrics_reader): + split = _make_split(tmp_path, execution_context={"tenant": "t1"}) + batches = list(split) + assert sum(b.num_rows for b in batches) == 3 + + expected_attrs = { + "openhouse.database": "db", + "openhouse.table": "tbl", + "openhouse.ctx.tenant": "t1", + } + + split_duration = _data_points(metrics_reader, "openhouse.dataloader.split.duration") + assert len(split_duration) == 1 + assert _attrs(split_duration[0]) == expected_attrs + + split_files = _data_points(metrics_reader, "openhouse.dataloader.split.files") + assert len(split_files) == 1 + assert split_files[0].sum == 1 + + split_rows = _data_points(metrics_reader, "openhouse.dataloader.split.rows") + assert len(split_rows) == 1 + assert split_rows[0].sum == 3 + + split_bytes = _data_points(metrics_reader, "openhouse.dataloader.split.bytes") + assert len(split_bytes) == 1 + assert split_bytes[0].sum > 0 + + split_batches = _data_points(metrics_reader, "openhouse.dataloader.split.batches") + assert len(split_batches) == 1 + assert split_batches[0].sum >= 1 + + batch_duration = _data_points(metrics_reader, "openhouse.dataloader.batch.duration") + assert len(batch_duration) == 1 + assert _attrs(batch_duration[0]) == expected_attrs + + batch_rows = _data_points(metrics_reader, "openhouse.dataloader.batch.rows") + assert len(batch_rows) == 1 + assert batch_rows[0].sum == 3 + + +def test_batch_read_failure_bumps_error_counters(tmp_path, monkeypatch, metrics_reader): + split = _make_split(tmp_path) + + class _ReaderError(Exception): + pass + + def _fake_to_record_batches(self, scan_tasks, **kwargs): + def _gen(): + raise _ReaderError("boom") + yield # pragma: no cover -- makes this a generator + + return _gen() + + monkeypatch.setattr( + "openhouse.dataloader.data_loader_split.ArrowScan.to_record_batches", + _fake_to_record_batches, + ) + + with pytest.raises(_ReaderError): + list(split) + + batch_errors = _data_points(metrics_reader, "openhouse.dataloader.batch.errors") + assert len(batch_errors) == 1 + assert batch_errors[0].value == 1 + + split_errors = _data_points(metrics_reader, "openhouse.dataloader.split.errors") + assert len(split_errors) == 1 + assert split_errors[0].value == 1 + + # split.duration is still recorded on failure + split_duration = _data_points(metrics_reader, "openhouse.dataloader.split.duration") + assert len(split_duration) == 1 + + +# --- TableScanContext.execution_context --- + + +def test_table_scan_context_default_execution_context_is_empty(tmp_path): + split = _make_split(tmp_path) + assert dict(split._scan_context.execution_context) == {} + + +def test_table_scan_context_pickle_preserves_execution_context(tmp_path): + split = _make_split(tmp_path, execution_context={"tenant": "t1"}) + restored = pickle.loads(pickle.dumps(split._scan_context)) + assert dict(restored.execution_context) == {"tenant": "t1"} From 82b9fee2f228fbcb5528d0dd5d225cff0df429cf Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Tue, 12 May 2026 13:01:58 -0700 Subject: [PATCH 2/7] [DataLoader] Switch metric attributes to explicit caller-provided dict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DataLoaderContext gains a new ``metric_attributes`` field; values are attached verbatim to every emitted metric so callers pick exactly which dimensions are dimensions (and at what cardinality), rather than the loader auto-promoting every ``execution_context`` key. ``execution_context`` keeps its original transformer-only role. Drops the unused MappingProxyType empty-context singleton in favor of ``default_factory=dict`` per Python dataclass idiom, removes the now-superfluous branch attribute, and documents why metric names stay lowercase — OSS OpenTelemetry SDK normalizes names to lowercase on export, which would turn LinkedIn-spec PascalCase into unreadable concatenated tokens (``OpenHouse.DataLoader.LoadTableTime`` → ``openhouse.dataloader.loadtabletime``). PascalCase is only preserved by the linkedin.opentelemetry wrapper, which this OSS library does not depend on. --- .../dataloader/_table_scan_context.py | 15 +++----- .../src/openhouse/dataloader/data_loader.py | 11 ++++-- .../openhouse/dataloader/data_loader_split.py | 2 +- .../dataloader/metrics/attributes.py | 16 ++++---- .../dataloader/metrics/instruments.py | 7 ++++ .../python/dataloader/tests/test_metrics.py | 38 +++++++++---------- 6 files changed, 48 insertions(+), 41 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py index b1644bd2b..7038242b0 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py @@ -2,7 +2,6 @@ from collections.abc import Mapping from dataclasses import dataclass, field -from types import MappingProxyType from pyiceberg.expressions import AlwaysTrue, BooleanExpression from pyiceberg.io import FileIO, load_file_io @@ -11,8 +10,6 @@ from openhouse.dataloader.table_identifier import TableIdentifier -_EMPTY_EXECUTION_CONTEXT: Mapping[str, str] = MappingProxyType({}) - def _unpickle_scan_context( table_metadata: TableMetadata, @@ -21,7 +18,7 @@ def _unpickle_scan_context( row_filter: BooleanExpression, table_id: TableIdentifier, worker_jvm_args: str | None = None, - execution_context: Mapping[str, str] | None = None, + metric_attributes: Mapping[str, str] | None = None, ) -> TableScanContext: return TableScanContext( table_metadata=table_metadata, @@ -30,7 +27,7 @@ def _unpickle_scan_context( row_filter=row_filter, table_id=table_id, worker_jvm_args=worker_jvm_args, - execution_context=execution_context if execution_context is not None else _EMPTY_EXECUTION_CONTEXT, + metric_attributes=metric_attributes if metric_attributes is not None else {}, ) @@ -48,8 +45,8 @@ class TableScanContext: table_id: Identifier for the table being scanned row_filter: Row-level filter expression pushed down to the scan worker_jvm_args: JVM arguments applied when the JNI JVM is created in worker processes - execution_context: Caller-provided context (e.g. tenant, environment) attached as - attributes on metrics emitted while iterating splits. + metric_attributes: Caller-provided attributes (e.g. tenant, environment) attached + verbatim to every metric emitted while iterating splits. Caller controls naming. """ table_metadata: TableMetadata @@ -58,7 +55,7 @@ class TableScanContext: table_id: TableIdentifier row_filter: BooleanExpression = AlwaysTrue() worker_jvm_args: str | None = None - execution_context: Mapping[str, str] = field(default_factory=lambda: _EMPTY_EXECUTION_CONTEXT) + metric_attributes: Mapping[str, str] = field(default_factory=dict) def __reduce__(self) -> tuple: return ( @@ -70,6 +67,6 @@ def __reduce__(self) -> tuple: self.row_filter, self.table_id, self.worker_jvm_args, - dict(self.execution_context), + dict(self.metric_attributes), ), ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 806d5296e..4525aed2a 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -119,6 +119,10 @@ class DataLoaderContext: Args: execution_context: Dictionary of execution context information (e.g. tenant, environment) + metric_attributes: Optional attributes attached to every metric emitted by this loader + (e.g. ``{"Tenant": "team-a"}``). Keys/values are passed through verbatim so + callers control naming. Keep cardinality bounded — LinkedIn pre-agg expects + fewer than ~100 unique values per dimension. table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation jvm_config: JVM configuration for JNI-based storage access. Currently only HDFS is supported @@ -126,6 +130,7 @@ class DataLoaderContext: """ execution_context: Mapping[str, str] | None = None + metric_attributes: Mapping[str, str] | None = None table_transformer: TableTransformer | None = None udf_registry: UDFRegistry | None = None jvm_config: JvmConfig | None = None @@ -193,7 +198,7 @@ def _iceberg_table(self) -> Table: max_attempts=self._max_attempts, duration_histogram=instruments.load_table_duration, attempts_counter=instruments.load_table_attempts, - attributes=build_attributes(self._table_id, self._context.execution_context), + attributes=build_attributes(self._table_id, self._context.metric_attributes), ) @property @@ -297,7 +302,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: row_filter=row_filter, table_id=self._table_id, worker_jvm_args=self._context.jvm_config.worker_args if self._context.jvm_config else None, - execution_context=self._context.execution_context or {}, + metric_attributes=self._context.metric_attributes or {}, ) # plan_files() materializes all tasks at once (PyIceberg doesn't support streaming) @@ -308,7 +313,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: max_attempts=self._max_attempts, duration_histogram=instruments.plan_files_duration, attempts_counter=instruments.plan_files_attempts, - attributes=build_attributes(self._table_id, self._context.execution_context), + attributes=build_attributes(self._table_id, self._context.metric_attributes), ) for chunk in _batched(scan_tasks, self._files_per_split): diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index ad4f8881e..3c78d3b80 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -159,7 +159,7 @@ def __iter__(self) -> Iterator[RecordBatch]: ctx = self._scan_context if ctx.worker_jvm_args is not None: apply_libhdfs_opts(ctx.worker_jvm_args) - attributes = build_attributes(ctx.table_id, ctx.execution_context) + attributes = build_attributes(ctx.table_id, ctx.metric_attributes) split_start = time.monotonic() timed: _TimedBatchIter | None = None try: diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py index 90db99b5e..5febfbf82 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py @@ -9,21 +9,19 @@ def build_attributes( table_id: TableIdentifier, - execution_context: Mapping[str, str] | None = None, + extra: Mapping[str, str] | None = None, ) -> dict[str, str]: """Return the standard metric attributes for a table read. - Caller-provided ``execution_context`` keys are namespaced under - ``openhouse.ctx.`` so they cannot collide with built-in attributes. - ``openhouse.branch`` is omitted when the table identifier has no branch. + Caller-provided ``extra`` attributes are merged in verbatim (no namespacing), + so callers can supply exactly the dimensions they want on every emission — + e.g. ``{"tenant": "team-a", "env": "prod"}``. Caller keys override built-ins + on collision. """ attrs: dict[str, str] = { "openhouse.database": table_id.database, "openhouse.table": table_id.table, } - if table_id.branch is not None: - attrs["openhouse.branch"] = table_id.branch - if execution_context: - for key, value in execution_context.items(): - attrs[f"openhouse.ctx.{key}"] = value + if extra: + attrs.update(extra) return attrs diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py index 7d020f5c8..64d91fb20 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py @@ -4,6 +4,13 @@ Call sites import the module-level objects rather than constructing their own so the instrument inventory stays in one place. +Names use lowercase dotted form (``openhouse.dataloader..``) because +the OSS OpenTelemetry SDK normalizes metric names to lowercase on export, which would +otherwise turn the LinkedIn-spec PascalCase form into unreadable concatenated tokens +(e.g. ``OpenHouse.DataLoader.LoadTableTime`` → ``openhouse.dataloader.loadtabletime``). +The LinkedIn ``linkedin.opentelemetry`` wrapper preserves PascalCase via its registry, +but this library depends only on ``opentelemetry-api`` so consumers can wire any SDK. + When no SDK is configured, ``opentelemetry-api`` returns no-op instruments and the ``record``/``add`` calls are cheap. """ diff --git a/integrations/python/dataloader/tests/test_metrics.py b/integrations/python/dataloader/tests/test_metrics.py index 55ee3dcde..90953c633 100644 --- a/integrations/python/dataloader/tests/test_metrics.py +++ b/integrations/python/dataloader/tests/test_metrics.py @@ -49,17 +49,17 @@ def test_build_attributes_includes_database_table(): assert attrs == {"openhouse.database": "db1", "openhouse.table": "tbl1"} -def test_build_attributes_includes_branch_when_set(): - table_id = TableIdentifier("db1", "tbl1", branch="release") - attrs = build_attributes(table_id, None) - assert attrs["openhouse.branch"] == "release" +def test_build_attributes_merges_caller_provided_attributes_verbatim(): + table_id = TableIdentifier("db1", "tbl1") + attrs = build_attributes(table_id, {"tenant": "team-a", "env": "prod"}) + assert attrs["tenant"] == "team-a" + assert attrs["env"] == "prod" -def test_build_attributes_namespaces_execution_context(): +def test_build_attributes_caller_keys_override_builtins(): table_id = TableIdentifier("db1", "tbl1") - attrs = build_attributes(table_id, {"tenant": "team-a", "env": "prod"}) - assert attrs["openhouse.ctx.tenant"] == "team-a" - assert attrs["openhouse.ctx.env"] == "prod" + attrs = build_attributes(table_id, {"openhouse.table": "override"}) + assert attrs["openhouse.table"] == "override" # --- InMemoryMetricReader harness --- @@ -155,7 +155,7 @@ def fn(): attempts = _data_points(metrics_reader, "openhouse.dataloader.plan_files.attempts") assert len(attempts) == 1 assert attempts[0].value == 2 - assert _attrs(attempts[0])["openhouse.ctx.tenant"] == "t1" + assert _attrs(attempts[0])["tenant"] == "t1" durations = _data_points(metrics_reader, "openhouse.dataloader.plan_files.duration") assert len(durations) == 1 @@ -195,7 +195,7 @@ def fn(): _SPLIT_TABLE_ID = TableIdentifier("db", "tbl") -def _make_split(tmp_path, execution_context: dict | None = None) -> DataLoaderSplit: +def _make_split(tmp_path, metric_attributes: dict | None = None) -> DataLoaderSplit: file_path = str(tmp_path / "data.parquet") table = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())}) fields = [field.with_metadata({b"PARQUET:field_id": str(i + 1).encode()}) for i, field in enumerate(table.schema)] @@ -212,7 +212,7 @@ def _make_split(tmp_path, execution_context: dict | None = None) -> DataLoaderSp io=load_file_io(properties={}, location=file_path), projected_schema=_SPLIT_SCHEMA, table_id=_SPLIT_TABLE_ID, - execution_context=execution_context or {}, + metric_attributes=metric_attributes or {}, ) data_file = DataFile.from_args( file_path=file_path, @@ -226,14 +226,14 @@ def _make_split(tmp_path, execution_context: dict | None = None) -> DataLoaderSp def test_split_emits_per_split_and_per_batch_metrics(tmp_path, metrics_reader): - split = _make_split(tmp_path, execution_context={"tenant": "t1"}) + split = _make_split(tmp_path, metric_attributes={"tenant": "t1"}) batches = list(split) assert sum(b.num_rows for b in batches) == 3 expected_attrs = { "openhouse.database": "db", "openhouse.table": "tbl", - "openhouse.ctx.tenant": "t1", + "tenant": "t1", } split_duration = _data_points(metrics_reader, "openhouse.dataloader.split.duration") @@ -299,15 +299,15 @@ def _gen(): assert len(split_duration) == 1 -# --- TableScanContext.execution_context --- +# --- TableScanContext.metric_attributes --- -def test_table_scan_context_default_execution_context_is_empty(tmp_path): +def test_table_scan_context_default_metric_attributes_is_empty(tmp_path): split = _make_split(tmp_path) - assert dict(split._scan_context.execution_context) == {} + assert dict(split._scan_context.metric_attributes) == {} -def test_table_scan_context_pickle_preserves_execution_context(tmp_path): - split = _make_split(tmp_path, execution_context={"tenant": "t1"}) +def test_table_scan_context_pickle_preserves_metric_attributes(tmp_path): + split = _make_split(tmp_path, metric_attributes={"tenant": "t1"}) restored = pickle.loads(pickle.dumps(split._scan_context)) - assert dict(restored.execution_context) == {"tenant": "t1"} + assert dict(restored.metric_attributes) == {"tenant": "t1"} From 4af7af8f81b40d93c2b9955fc38d11bb165ac1fd Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Tue, 12 May 2026 13:42:43 -0700 Subject: [PATCH 3/7] [DataLoader] Inline instruments and adopt LinkedIn PascalCase naming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drops the dedicated metrics/instruments.py module; instruments are now declared at the top of the file that uses them (data_loader.py for the load_table / plan_files pair, data_loader_split.py for the split / batch pairs). Matches the pattern used by OSS Python OTel libraries that emit their own metrics — no central inventory file, no extra import indirection. Metric and attribute names switch to LinkedIn's ``{Domain}.{Subsystem}.{MetricName}`` PascalCase convention (``OpenHouse.DataLoader.LoadTableTime``, ``OpenHouse.Database``, etc.) per observability-instrumentation/concepts/metrics.md. METER_NAME becomes ``OpenHouse.DataLoader``. The OSS opentelemetry-sdk lowercases instrument names at registration time, so the stored/exported form ends up as ``openhouse.dataloader.loadtabletime``; tests assert against that lowercased form and a docstring on the test helper points to the SDK source so the discrepancy is obvious to a future reader. Source matches LinkedIn schema-registry convention; runtime form is whatever the SDK emits. build_attributes now merges caller-provided extras verbatim — no ``openhouse.ctx.*`` namespacing — so callers control dimension naming entirely. --- .../dataloader/_table_scan_context.py | 3 +- .../src/openhouse/dataloader/data_loader.py | 43 +++++--- .../openhouse/dataloader/data_loader_split.py | 76 +++++++++++-- .../openhouse/dataloader/metrics/__init__.py | 5 +- .../dataloader/metrics/attributes.py | 12 +-- .../dataloader/metrics/instruments.py | 102 ------------------ .../python/dataloader/tests/test_metrics.py | 92 +++++++++------- 7 files changed, 153 insertions(+), 180 deletions(-) delete mode 100644 integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py index 7038242b0..1b7d3ac33 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py @@ -45,8 +45,7 @@ class TableScanContext: table_id: Identifier for the table being scanned row_filter: Row-level filter expression pushed down to the scan worker_jvm_args: JVM arguments applied when the JNI JVM is created in worker processes - metric_attributes: Caller-provided attributes (e.g. tenant, environment) attached - verbatim to every metric emitted while iterating splits. Caller controls naming. + metric_attributes: Attributes attached to every metric emitted while iterating splits. """ table_metadata: TableMetadata diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 4525aed2a..f0fc5e023 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -9,7 +9,7 @@ from types import MappingProxyType from typing import TypeVar -from opentelemetry.metrics import Counter, Histogram +from opentelemetry.metrics import Counter, Histogram, get_meter from pyiceberg.catalog import Catalog from pyiceberg.table import Table from pyiceberg.table.snapshots import Snapshot @@ -29,7 +29,7 @@ _to_pyiceberg, always_true, ) -from openhouse.dataloader.metrics import build_attributes, instruments +from openhouse.dataloader.metrics import METER_NAME, build_attributes from openhouse.dataloader.scan_optimizer import optimize_scan from openhouse.dataloader.table_identifier import TableIdentifier from openhouse.dataloader.table_transformer import TableTransformer @@ -37,6 +37,29 @@ logger = logging.getLogger(__name__) +_meter = get_meter(METER_NAME) + +_load_table_duration = _meter.create_histogram( + name="OpenHouse.DataLoader.LoadTableTime", + unit="s", + description="Duration of the load_table call.", +) +_load_table_attempts = _meter.create_counter( + name="OpenHouse.DataLoader.LoadTableAttempts", + unit="1", + description="Attempt count for the load_table call.", +) +_plan_files_duration = _meter.create_histogram( + name="OpenHouse.DataLoader.PlanFilesTime", + unit="s", + description="Duration of the plan_files call.", +) +_plan_files_attempts = _meter.create_counter( + name="OpenHouse.DataLoader.PlanFilesAttempts", + unit="1", + description="Attempt count for the plan_files call.", +) + def _is_transient(exc: BaseException) -> bool: """Return True if the exception is transient and worth retrying.""" @@ -67,9 +90,6 @@ def _retry( Retries on ``OSError`` (transient network/storage I/O failures), except ``HTTPError`` which is only retried for 5xx status codes. Uses exponential backoff with up to *max_attempts* total attempts. - - Bumps *attempts_counter* once per attempt and records *duration_histogram* - once when the call ultimately returns or raises (wall-clock across retries). """ overall_start = time.monotonic() try: @@ -119,10 +139,7 @@ class DataLoaderContext: Args: execution_context: Dictionary of execution context information (e.g. tenant, environment) - metric_attributes: Optional attributes attached to every metric emitted by this loader - (e.g. ``{"Tenant": "team-a"}``). Keys/values are passed through verbatim so - callers control naming. Keep cardinality bounded — LinkedIn pre-agg expects - fewer than ~100 unique values per dimension. + metric_attributes: Attributes attached to every metric emitted by this loader. table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation jvm_config: JVM configuration for JNI-based storage access. Currently only HDFS is supported @@ -196,8 +213,8 @@ def _iceberg_table(self) -> Table: lambda: self._catalog.load_table((self._table_id.database, self._table_id.table)), label=f"load_table {self._table_id}", max_attempts=self._max_attempts, - duration_histogram=instruments.load_table_duration, - attempts_counter=instruments.load_table_attempts, + duration_histogram=_load_table_duration, + attempts_counter=_load_table_attempts, attributes=build_attributes(self._table_id, self._context.metric_attributes), ) @@ -311,8 +328,8 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: lambda: scan.plan_files(), label=f"plan_files {self._table_id}", max_attempts=self._max_attempts, - duration_histogram=instruments.plan_files_duration, - attempts_counter=instruments.plan_files_attempts, + duration_histogram=_plan_files_duration, + attempts_counter=_plan_files_attempts, attributes=build_attributes(self._table_id, self._context.metric_attributes), ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 3c78d3b80..73bd12a19 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -10,6 +10,7 @@ from datafusion import SessionConfig from datafusion.context import SessionContext +from opentelemetry.metrics import get_meter from pyarrow import RecordBatch from pyiceberg.io.pyarrow import ArrowScan from pyiceberg.table import ArrivalOrder, FileScanTask @@ -18,12 +19,65 @@ from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader._timer import log_duration from openhouse.dataloader.filters import _quote_identifier -from openhouse.dataloader.metrics import build_attributes, instruments +from openhouse.dataloader.metrics import METER_NAME, build_attributes from openhouse.dataloader.table_identifier import TableIdentifier from openhouse.dataloader.udf_registry import NoOpRegistry, UDFRegistry logger = logging.getLogger(__name__) +_meter = get_meter(METER_NAME) + +_split_duration = _meter.create_histogram( + name="OpenHouse.DataLoader.SplitTime", + unit="s", + description="Duration of a DataLoaderSplit iteration.", +) +_split_files = _meter.create_histogram( + name="OpenHouse.DataLoader.SplitFiles", + unit="1", + description="Files in a DataLoaderSplit.", +) +_split_rows = _meter.create_histogram( + name="OpenHouse.DataLoader.SplitRows", + unit="1", + description="Rows yielded by a DataLoaderSplit.", +) +_split_bytes = _meter.create_histogram( + name="OpenHouse.DataLoader.SplitBytes", + unit="By", + description="Bytes yielded by a DataLoaderSplit.", +) +_split_batches = _meter.create_histogram( + name="OpenHouse.DataLoader.SplitBatches", + unit="1", + description="RecordBatches yielded by a DataLoaderSplit.", +) +_split_errors = _meter.create_counter( + name="OpenHouse.DataLoader.SplitErrors", + unit="1", + description="Errors from a DataLoaderSplit iteration.", +) +_batch_duration = _meter.create_histogram( + name="OpenHouse.DataLoader.BatchTime", + unit="s", + description="Duration of a RecordBatch read.", +) +_batch_rows = _meter.create_histogram( + name="OpenHouse.DataLoader.BatchRows", + unit="1", + description="Rows in a RecordBatch.", +) +_batch_bytes = _meter.create_histogram( + name="OpenHouse.DataLoader.BatchBytes", + unit="By", + description="Bytes in a RecordBatch.", +) +_batch_errors = _meter.create_counter( + name="OpenHouse.DataLoader.BatchErrors", + unit="1", + description="Errors raised while reading a RecordBatch.", +) + def to_sql_identifier(table_id: TableIdentifier) -> str: """Return the quoted DataFusion SQL identifier, e.g. ``"db"."tbl"``.""" @@ -86,15 +140,15 @@ def __next__(self) -> RecordBatch: except Exception: elapsed = time.monotonic() - start logger.warning("record_batch %s [%d] failed after %.3fs", self._split_id, self._idx, elapsed) - instruments.batch_errors.add(1, self._attributes) + _batch_errors.add(1, self._attributes) raise elapsed = time.monotonic() - start logger.info("record_batch %s [%d] in %.3fs", self._split_id, self._idx, elapsed) rows = batch.num_rows nbytes = batch.nbytes - instruments.batch_duration.record(elapsed, self._attributes) - instruments.batch_rows.record(rows, self._attributes) - instruments.batch_bytes.record(nbytes, self._attributes) + _batch_duration.record(elapsed, self._attributes) + _batch_rows.record(rows, self._attributes) + _batch_bytes.record(nbytes, self._attributes) self.total_rows += rows self.total_bytes += nbytes self.batch_count += 1 @@ -192,15 +246,15 @@ def __iter__(self) -> Iterator[RecordBatch]: session = _create_transform_session(self._scan_context.table_id, self._udf_registry, self._batch_size) yield from _timed_transform(chain([first], timed), split_id, session, self._apply_transform) except BaseException: - instruments.split_errors.add(1, attributes) + _split_errors.add(1, attributes) raise finally: - instruments.split_duration.record(time.monotonic() - split_start, attributes) - instruments.split_files.record(len(self._file_scan_tasks), attributes) + _split_duration.record(time.monotonic() - split_start, attributes) + _split_files.record(len(self._file_scan_tasks), attributes) if timed is not None: - instruments.split_rows.record(timed.total_rows, attributes) - instruments.split_bytes.record(timed.total_bytes, attributes) - instruments.split_batches.record(timed.batch_count, attributes) + _split_rows.record(timed.total_rows, attributes) + _split_bytes.record(timed.total_bytes, attributes) + _split_batches.record(timed.batch_count, attributes) def _apply_transform(self, session: SessionContext, batch: RecordBatch) -> Iterator[RecordBatch]: """Execute the transform SQL against a single RecordBatch.""" diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py index e657b3ecc..758b17797 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py @@ -16,9 +16,8 @@ helper :func:`openhouse.dataloader.metrics.attributes.build_attributes`. """ -METER_NAME = "openhouse.dataloader" +METER_NAME = "OpenHouse.DataLoader" -from openhouse.dataloader.metrics import instruments # noqa: E402 from openhouse.dataloader.metrics.attributes import build_attributes # noqa: E402 -__all__ = ["METER_NAME", "build_attributes", "instruments"] +__all__ = ["METER_NAME", "build_attributes"] diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py index 5febfbf82..23567a10a 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py @@ -11,16 +11,10 @@ def build_attributes( table_id: TableIdentifier, extra: Mapping[str, str] | None = None, ) -> dict[str, str]: - """Return the standard metric attributes for a table read. - - Caller-provided ``extra`` attributes are merged in verbatim (no namespacing), - so callers can supply exactly the dimensions they want on every emission — - e.g. ``{"tenant": "team-a", "env": "prod"}``. Caller keys override built-ins - on collision. - """ + """Return the metric attributes for *table_id*, merged with *extra*.""" attrs: dict[str, str] = { - "openhouse.database": table_id.database, - "openhouse.table": table_id.table, + "OpenHouse.Database": table_id.database, + "OpenHouse.Table": table_id.table, } if extra: attrs.update(extra) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py deleted file mode 100644 index 64d91fb20..000000000 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/instruments.py +++ /dev/null @@ -1,102 +0,0 @@ -"""OpenTelemetry instrument singletons for the dataloader. - -All instruments are bound to a single ``Meter`` obtained via ``get_meter(METER_NAME)``. -Call sites import the module-level objects rather than constructing their own so the -instrument inventory stays in one place. - -Names use lowercase dotted form (``openhouse.dataloader..``) because -the OSS OpenTelemetry SDK normalizes metric names to lowercase on export, which would -otherwise turn the LinkedIn-spec PascalCase form into unreadable concatenated tokens -(e.g. ``OpenHouse.DataLoader.LoadTableTime`` → ``openhouse.dataloader.loadtabletime``). -The LinkedIn ``linkedin.opentelemetry`` wrapper preserves PascalCase via its registry, -but this library depends only on ``opentelemetry-api`` so consumers can wire any SDK. - -When no SDK is configured, ``opentelemetry-api`` returns no-op instruments and the -``record``/``add`` calls are cheap. -""" - -from __future__ import annotations - -from opentelemetry.metrics import get_meter - -from openhouse.dataloader.metrics import METER_NAME - -_meter = get_meter(METER_NAME) - -# Retried catalog/metadata operations. Duration is wall-clock across all attempts; -# attempts counts every try (success or failure). -load_table_duration = _meter.create_histogram( - name="openhouse.dataloader.load_table.duration", - unit="s", - description="Wall-clock duration of the load_table call (across retries).", -) -load_table_attempts = _meter.create_counter( - name="openhouse.dataloader.load_table.attempts", - unit="1", - description="Attempt count for the load_table call (each try, success or failure).", -) - -plan_files_duration = _meter.create_histogram( - name="openhouse.dataloader.plan_files.duration", - unit="s", - description="Wall-clock duration of the plan_files call (across retries).", -) -plan_files_attempts = _meter.create_counter( - name="openhouse.dataloader.plan_files.attempts", - unit="1", - description="Attempt count for the plan_files call (each try, success or failure).", -) - -# Per-split read. -split_duration = _meter.create_histogram( - name="openhouse.dataloader.split.duration", - unit="s", - description="Wall-clock duration of a DataLoaderSplit iteration.", -) -split_files = _meter.create_histogram( - name="openhouse.dataloader.split.files", - unit="1", - description="Number of files in a DataLoaderSplit.", -) -split_rows = _meter.create_histogram( - name="openhouse.dataloader.split.rows", - unit="1", - description="Total rows yielded by a DataLoaderSplit.", -) -split_bytes = _meter.create_histogram( - name="openhouse.dataloader.split.bytes", - unit="By", - description="Total decompressed bytes yielded by a DataLoaderSplit.", -) -split_batches = _meter.create_histogram( - name="openhouse.dataloader.split.batches", - unit="1", - description="Total RecordBatches yielded by a DataLoaderSplit.", -) -split_errors = _meter.create_counter( - name="openhouse.dataloader.split.errors", - unit="1", - description="Unhandled exceptions raised from a DataLoaderSplit iteration.", -) - -# Per-batch read. -batch_duration = _meter.create_histogram( - name="openhouse.dataloader.batch.duration", - unit="s", - description="Wall-clock duration of reading a single RecordBatch.", -) -batch_rows = _meter.create_histogram( - name="openhouse.dataloader.batch.rows", - unit="1", - description="Rows in a single RecordBatch.", -) -batch_bytes = _meter.create_histogram( - name="openhouse.dataloader.batch.bytes", - unit="By", - description="Decompressed bytes of a single RecordBatch.", -) -batch_errors = _meter.create_counter( - name="openhouse.dataloader.batch.errors", - unit="1", - description="Unhandled exceptions raised while reading a RecordBatch.", -) diff --git a/integrations/python/dataloader/tests/test_metrics.py b/integrations/python/dataloader/tests/test_metrics.py index 90953c633..9c2c0133c 100644 --- a/integrations/python/dataloader/tests/test_metrics.py +++ b/integrations/python/dataloader/tests/test_metrics.py @@ -24,16 +24,22 @@ from pyiceberg.types import LongType, NestedField from openhouse.dataloader._table_scan_context import TableScanContext -from openhouse.dataloader.data_loader import _retry +from openhouse.dataloader.data_loader import ( + _load_table_attempts, + _load_table_duration, + _plan_files_attempts, + _plan_files_duration, + _retry, +) from openhouse.dataloader.data_loader_split import DataLoaderSplit -from openhouse.dataloader.metrics import METER_NAME, build_attributes, instruments +from openhouse.dataloader.metrics import METER_NAME, build_attributes from openhouse.dataloader.table_identifier import TableIdentifier # --- Meter / METER_NAME basics --- def test_meter_name_is_stable(): - assert METER_NAME == "openhouse.dataloader" + assert METER_NAME == "OpenHouse.DataLoader" def test_get_meter_with_meter_name_returns_a_meter(): @@ -46,20 +52,20 @@ def test_get_meter_with_meter_name_returns_a_meter(): def test_build_attributes_includes_database_table(): table_id = TableIdentifier("db1", "tbl1") attrs = build_attributes(table_id, None) - assert attrs == {"openhouse.database": "db1", "openhouse.table": "tbl1"} + assert attrs == {"OpenHouse.Database": "db1", "OpenHouse.Table": "tbl1"} def test_build_attributes_merges_caller_provided_attributes_verbatim(): table_id = TableIdentifier("db1", "tbl1") - attrs = build_attributes(table_id, {"tenant": "team-a", "env": "prod"}) - assert attrs["tenant"] == "team-a" - assert attrs["env"] == "prod" + attrs = build_attributes(table_id, {"Tenant": "team-a", "Env": "prod"}) + assert attrs["Tenant"] == "team-a" + assert attrs["Env"] == "prod" def test_build_attributes_caller_keys_override_builtins(): table_id = TableIdentifier("db1", "tbl1") - attrs = build_attributes(table_id, {"openhouse.table": "override"}) - assert attrs["openhouse.table"] == "override" + attrs = build_attributes(table_id, {"OpenHouse.Table": "override"}) + assert attrs["OpenHouse.Table"] == "override" # --- InMemoryMetricReader harness --- @@ -87,7 +93,13 @@ def metrics_reader() -> Iterator[InMemoryMetricReader]: def _data_points(reader: InMemoryMetricReader, metric_name: str) -> list: - """Collect and return all data points for *metric_name* across scopes.""" + """Collect and return all data points for *metric_name* across scopes. + + ``metric_name`` must be the lowercase form stored by the SDK — the + OpenTelemetry SDK lowercases instrument names at registration time + (``opentelemetry/sdk/metrics/_internal/instrument.py``), even though + the declared names are PascalCase. + """ data = reader.get_metrics_data() points: list = [] if data is None: @@ -114,25 +126,25 @@ def test_retry_emits_one_attempt_and_one_duration_on_success(metrics_reader): lambda: "ok", label="load_table db.tbl", max_attempts=3, - duration_histogram=instruments.load_table_duration, - attempts_counter=instruments.load_table_attempts, + duration_histogram=_load_table_duration, + attempts_counter=_load_table_attempts, attributes=attrs, ) assert result == "ok" - attempts = _data_points(metrics_reader, "openhouse.dataloader.load_table.attempts") + attempts = _data_points(metrics_reader, "openhouse.dataloader.loadtableattempts") assert len(attempts) == 1 assert _attrs(attempts[0]) == attrs assert attempts[0].value == 1 - durations = _data_points(metrics_reader, "openhouse.dataloader.load_table.duration") + durations = _data_points(metrics_reader, "openhouse.dataloader.loadtabletime") assert len(durations) == 1 assert _attrs(durations[0]) == attrs def test_retry_counts_each_attempt_on_transient_then_success(metrics_reader): table_id = TableIdentifier("db", "tbl") - attrs = build_attributes(table_id, {"tenant": "t1"}) + attrs = build_attributes(table_id, {"Tenant": "t1"}) calls = {"n": 0} def fn(): @@ -145,19 +157,19 @@ def fn(): fn, label="plan_files db.tbl", max_attempts=3, - duration_histogram=instruments.plan_files_duration, - attempts_counter=instruments.plan_files_attempts, + duration_histogram=_plan_files_duration, + attempts_counter=_plan_files_attempts, attributes=attrs, ) assert result == "ok" assert calls["n"] == 2 - attempts = _data_points(metrics_reader, "openhouse.dataloader.plan_files.attempts") + attempts = _data_points(metrics_reader, "openhouse.dataloader.planfilesattempts") assert len(attempts) == 1 assert attempts[0].value == 2 - assert _attrs(attempts[0])["tenant"] == "t1" + assert _attrs(attempts[0])["Tenant"] == "t1" - durations = _data_points(metrics_reader, "openhouse.dataloader.plan_files.duration") + durations = _data_points(metrics_reader, "openhouse.dataloader.planfilestime") assert len(durations) == 1 @@ -176,16 +188,16 @@ def fn(): fn, label="load_table", max_attempts=3, - duration_histogram=instruments.load_table_duration, - attempts_counter=instruments.load_table_attempts, + duration_histogram=_load_table_duration, + attempts_counter=_load_table_attempts, attributes=attrs, ) - attempts = _data_points(metrics_reader, "openhouse.dataloader.load_table.attempts") + attempts = _data_points(metrics_reader, "openhouse.dataloader.loadtableattempts") assert len(attempts) == 1 assert attempts[0].value == 1 - durations = _data_points(metrics_reader, "openhouse.dataloader.load_table.duration") + durations = _data_points(metrics_reader, "openhouse.dataloader.loadtabletime") assert len(durations) == 1 @@ -226,41 +238,41 @@ def _make_split(tmp_path, metric_attributes: dict | None = None) -> DataLoaderSp def test_split_emits_per_split_and_per_batch_metrics(tmp_path, metrics_reader): - split = _make_split(tmp_path, metric_attributes={"tenant": "t1"}) + split = _make_split(tmp_path, metric_attributes={"Tenant": "t1"}) batches = list(split) assert sum(b.num_rows for b in batches) == 3 expected_attrs = { - "openhouse.database": "db", - "openhouse.table": "tbl", - "tenant": "t1", + "OpenHouse.Database": "db", + "OpenHouse.Table": "tbl", + "Tenant": "t1", } - split_duration = _data_points(metrics_reader, "openhouse.dataloader.split.duration") + split_duration = _data_points(metrics_reader, "openhouse.dataloader.splittime") assert len(split_duration) == 1 assert _attrs(split_duration[0]) == expected_attrs - split_files = _data_points(metrics_reader, "openhouse.dataloader.split.files") + split_files = _data_points(metrics_reader, "openhouse.dataloader.splitfiles") assert len(split_files) == 1 assert split_files[0].sum == 1 - split_rows = _data_points(metrics_reader, "openhouse.dataloader.split.rows") + split_rows = _data_points(metrics_reader, "openhouse.dataloader.splitrows") assert len(split_rows) == 1 assert split_rows[0].sum == 3 - split_bytes = _data_points(metrics_reader, "openhouse.dataloader.split.bytes") + split_bytes = _data_points(metrics_reader, "openhouse.dataloader.splitbytes") assert len(split_bytes) == 1 assert split_bytes[0].sum > 0 - split_batches = _data_points(metrics_reader, "openhouse.dataloader.split.batches") + split_batches = _data_points(metrics_reader, "openhouse.dataloader.splitbatches") assert len(split_batches) == 1 assert split_batches[0].sum >= 1 - batch_duration = _data_points(metrics_reader, "openhouse.dataloader.batch.duration") + batch_duration = _data_points(metrics_reader, "openhouse.dataloader.batchtime") assert len(batch_duration) == 1 assert _attrs(batch_duration[0]) == expected_attrs - batch_rows = _data_points(metrics_reader, "openhouse.dataloader.batch.rows") + batch_rows = _data_points(metrics_reader, "openhouse.dataloader.batchrows") assert len(batch_rows) == 1 assert batch_rows[0].sum == 3 @@ -286,16 +298,16 @@ def _gen(): with pytest.raises(_ReaderError): list(split) - batch_errors = _data_points(metrics_reader, "openhouse.dataloader.batch.errors") + batch_errors = _data_points(metrics_reader, "openhouse.dataloader.batcherrors") assert len(batch_errors) == 1 assert batch_errors[0].value == 1 - split_errors = _data_points(metrics_reader, "openhouse.dataloader.split.errors") + split_errors = _data_points(metrics_reader, "openhouse.dataloader.spliterrors") assert len(split_errors) == 1 assert split_errors[0].value == 1 # split.duration is still recorded on failure - split_duration = _data_points(metrics_reader, "openhouse.dataloader.split.duration") + split_duration = _data_points(metrics_reader, "openhouse.dataloader.splittime") assert len(split_duration) == 1 @@ -308,6 +320,6 @@ def test_table_scan_context_default_metric_attributes_is_empty(tmp_path): def test_table_scan_context_pickle_preserves_metric_attributes(tmp_path): - split = _make_split(tmp_path, metric_attributes={"tenant": "t1"}) + split = _make_split(tmp_path, metric_attributes={"Tenant": "t1"}) restored = pickle.loads(pickle.dumps(split._scan_context)) - assert dict(restored.metric_attributes) == {"tenant": "t1"} + assert dict(restored.metric_attributes) == {"Tenant": "t1"} From 61d607e4053ab95a13ef9a77a8b29e1556c378e3 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Wed, 13 May 2026 13:49:26 -0700 Subject: [PATCH 4/7] [DataLoader] Rewrite metric descriptions in operator-readable language Replaces internal identifiers (load_table, plan_files, DataLoaderSplit, RecordBatch) in instrument descriptions with prose suited to dashboard labels. Also reverts the metrics package docstring to match master. --- .../src/openhouse/dataloader/data_loader.py | 8 ++++---- .../openhouse/dataloader/data_loader_split.py | 20 +++++++++---------- .../openhouse/dataloader/metrics/__init__.py | 6 +----- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index f0fc5e023..407a86cf5 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -42,22 +42,22 @@ _load_table_duration = _meter.create_histogram( name="OpenHouse.DataLoader.LoadTableTime", unit="s", - description="Duration of the load_table call.", + description="Time spent loading the Iceberg table from the catalog.", ) _load_table_attempts = _meter.create_counter( name="OpenHouse.DataLoader.LoadTableAttempts", unit="1", - description="Attempt count for the load_table call.", + description="Number of attempts to load the Iceberg table from the catalog.", ) _plan_files_duration = _meter.create_histogram( name="OpenHouse.DataLoader.PlanFilesTime", unit="s", - description="Duration of the plan_files call.", + description="Time spent planning which files to scan.", ) _plan_files_attempts = _meter.create_counter( name="OpenHouse.DataLoader.PlanFilesAttempts", unit="1", - description="Attempt count for the plan_files call.", + description="Number of attempts to plan files for the scan.", ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 73bd12a19..67c2dff35 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -30,52 +30,52 @@ _split_duration = _meter.create_histogram( name="OpenHouse.DataLoader.SplitTime", unit="s", - description="Duration of a DataLoaderSplit iteration.", + description="Time spent iterating a split.", ) _split_files = _meter.create_histogram( name="OpenHouse.DataLoader.SplitFiles", unit="1", - description="Files in a DataLoaderSplit.", + description="Number of files in a split.", ) _split_rows = _meter.create_histogram( name="OpenHouse.DataLoader.SplitRows", unit="1", - description="Rows yielded by a DataLoaderSplit.", + description="Rows yielded by a split.", ) _split_bytes = _meter.create_histogram( name="OpenHouse.DataLoader.SplitBytes", unit="By", - description="Bytes yielded by a DataLoaderSplit.", + description="Bytes yielded by a split.", ) _split_batches = _meter.create_histogram( name="OpenHouse.DataLoader.SplitBatches", unit="1", - description="RecordBatches yielded by a DataLoaderSplit.", + description="Record batches yielded by a split.", ) _split_errors = _meter.create_counter( name="OpenHouse.DataLoader.SplitErrors", unit="1", - description="Errors from a DataLoaderSplit iteration.", + description="Errors raised while iterating a split.", ) _batch_duration = _meter.create_histogram( name="OpenHouse.DataLoader.BatchTime", unit="s", - description="Duration of a RecordBatch read.", + description="Time spent reading a record batch.", ) _batch_rows = _meter.create_histogram( name="OpenHouse.DataLoader.BatchRows", unit="1", - description="Rows in a RecordBatch.", + description="Rows in a record batch.", ) _batch_bytes = _meter.create_histogram( name="OpenHouse.DataLoader.BatchBytes", unit="By", - description="Bytes in a RecordBatch.", + description="Bytes in a record batch.", ) _batch_errors = _meter.create_counter( name="OpenHouse.DataLoader.BatchErrors", unit="1", - description="Errors raised while reading a RecordBatch.", + description="Errors raised while reading a record batch.", ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py index 758b17797..94fb994f5 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py @@ -4,16 +4,12 @@ fallback when no SDK is configured. The *application* (not this library) is responsible for installing an SDK and configuring exporters. -External code that wants the dataloader's meter can call:: +Call sites should obtain a ``Meter`` via the OTEL API directly:: from opentelemetry.metrics import get_meter from openhouse.dataloader.metrics import METER_NAME meter = get_meter(METER_NAME) - -Internal emission inside the dataloader uses the module-level instruments -in :mod:`openhouse.dataloader.metrics.instruments` paired with the attribute -helper :func:`openhouse.dataloader.metrics.attributes.build_attributes`. """ METER_NAME = "OpenHouse.DataLoader" From 35c8a95802111d75441f4b317c76c503f0e89644 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Thu, 14 May 2026 11:39:21 -0700 Subject: [PATCH 5/7] [DataLoader] Derive metric attributes from execution_context whitelist Replaces DataLoaderContext.metric_attributes with metric_attribute_keys, a whitelist of keys pulled from execution_context. Single source of truth for caller-supplied identifiers; callers opt in to which keys become metric dimensions, keeping cardinality controlled. --- .../src/openhouse/dataloader/data_loader.py | 18 ++++++--- .../python/dataloader/tests/test_metrics.py | 39 +++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 0bf9aee7d..95bbb4402 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -140,7 +140,7 @@ class DataLoaderContext: Args: execution_context: Dictionary of execution context information (e.g. tenant, environment) - metric_attributes: Attributes attached to every metric emitted by this loader. + metric_attribute_keys: Keys from ``execution_context`` to attach as dimensions on emitted metrics. table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation jvm_config: JVM configuration for JNI-based storage access. Currently only HDFS is supported @@ -148,7 +148,7 @@ class DataLoaderContext: """ execution_context: Mapping[str, str] | None = None - metric_attributes: Mapping[str, str] | None = None + metric_attribute_keys: Sequence[str] | None = None table_transformer: TableTransformer | None = None udf_registry: UDFRegistry | None = None jvm_config: JvmConfig | None = None @@ -209,6 +209,14 @@ def __init__( if self._context.jvm_config is not None and self._context.jvm_config.planner_args is not None: apply_libhdfs_opts(self._context.jvm_config.planner_args) + @cached_property + def _resolved_metric_attributes(self) -> Mapping[str, str]: + keys = self._context.metric_attribute_keys + if not keys: + return {} + execution_context = self._context.execution_context or {} + return {k: execution_context[k] for k in keys if k in execution_context} + @cached_property def _iceberg_table(self) -> Table: return _retry( @@ -217,7 +225,7 @@ def _iceberg_table(self) -> Table: max_attempts=self._max_attempts, duration_histogram=_load_table_duration, attempts_counter=_load_table_attempts, - attributes=build_attributes(self._table_id, self._context.metric_attributes), + attributes=build_attributes(self._table_id, self._resolved_metric_attributes), ) @property @@ -326,7 +334,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: row_filter=row_filter, table_id=self._table_id, worker_jvm_args=self._context.jvm_config.worker_args if self._context.jvm_config else None, - metric_attributes=self._context.metric_attributes or {}, + metric_attributes=self._resolved_metric_attributes, ) # plan_files() materializes all tasks at once (PyIceberg doesn't support streaming) @@ -337,7 +345,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: max_attempts=self._max_attempts, duration_histogram=_plan_files_duration, attempts_counter=_plan_files_attempts, - attributes=build_attributes(self._table_id, self._context.metric_attributes), + attributes=build_attributes(self._table_id, self._resolved_metric_attributes), ) for chunk in _batched(scan_tasks, self._files_per_split): diff --git a/integrations/python/dataloader/tests/test_metrics.py b/integrations/python/dataloader/tests/test_metrics.py index 9c2c0133c..1102263cd 100644 --- a/integrations/python/dataloader/tests/test_metrics.py +++ b/integrations/python/dataloader/tests/test_metrics.py @@ -5,6 +5,7 @@ import os import pickle from collections.abc import Iterator +from unittest.mock import MagicMock import pyarrow as pa import pyarrow.parquet as pq @@ -23,6 +24,7 @@ from pyiceberg.table.sorting import UNSORTED_SORT_ORDER from pyiceberg.types import LongType, NestedField +from openhouse.dataloader import DataLoaderContext, OpenHouseDataLoader from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader.data_loader import ( _load_table_attempts, @@ -68,6 +70,43 @@ def test_build_attributes_caller_keys_override_builtins(): assert attrs["OpenHouse.Table"] == "override" +# --- DataLoaderContext.metric_attribute_keys resolution --- + + +def _loader(context: DataLoaderContext) -> OpenHouseDataLoader: + return OpenHouseDataLoader(catalog=MagicMock(), database="db", table="tbl", context=context) + + +def test_resolved_metric_attributes_picks_whitelisted_keys(): + loader = _loader( + DataLoaderContext( + execution_context={"tenant": "t1", "env": "prod", "user_id": "u-42"}, + metric_attribute_keys=["tenant", "env"], + ) + ) + assert dict(loader._resolved_metric_attributes) == {"tenant": "t1", "env": "prod"} + + +def test_resolved_metric_attributes_skips_missing_keys(): + loader = _loader( + DataLoaderContext( + execution_context={"tenant": "t1"}, + metric_attribute_keys=["tenant", "env"], + ) + ) + assert dict(loader._resolved_metric_attributes) == {"tenant": "t1"} + + +def test_resolved_metric_attributes_empty_when_no_keys_configured(): + loader = _loader(DataLoaderContext(execution_context={"tenant": "t1"})) + assert dict(loader._resolved_metric_attributes) == {} + + +def test_resolved_metric_attributes_empty_when_execution_context_missing(): + loader = _loader(DataLoaderContext(metric_attribute_keys=["tenant"])) + assert dict(loader._resolved_metric_attributes) == {} + + # --- InMemoryMetricReader harness --- From 9f163c8974a69d4c0e15d0052ba3224f100c12da Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Thu, 14 May 2026 11:45:37 -0700 Subject: [PATCH 6/7] [DataLoader] Fold table identifier into resolved metric attributes The build_attributes helper merged OpenHouse.Database / OpenHouse.Table with caller-supplied extras at every emission boundary. Move that merge into the loader's cached _resolved_metric_attributes property so the table-level identifier is part of the dict from the start, and drop the helper plus its module. --- .../src/openhouse/dataloader/data_loader.py | 20 ++++--- .../openhouse/dataloader/data_loader_split.py | 4 +- .../openhouse/dataloader/metrics/__init__.py | 4 +- .../dataloader/metrics/attributes.py | 21 ------- .../python/dataloader/tests/test_metrics.py | 60 ++++++------------- 5 files changed, 35 insertions(+), 74 deletions(-) delete mode 100644 integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 95bbb4402..44b8904f3 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -30,7 +30,7 @@ _to_pyiceberg, always_true, ) -from openhouse.dataloader.metrics import METER_NAME, build_attributes +from openhouse.dataloader.metrics import METER_NAME from openhouse.dataloader.scan_optimizer import optimize_scan from openhouse.dataloader.table_identifier import TableIdentifier from openhouse.dataloader.table_transformer import TableTransformer @@ -211,11 +211,17 @@ def __init__( @cached_property def _resolved_metric_attributes(self) -> Mapping[str, str]: + attrs: dict[str, str] = { + "OpenHouse.Database": self._table_id.database, + "OpenHouse.Table": self._table_id.table, + } keys = self._context.metric_attribute_keys - if not keys: - return {} - execution_context = self._context.execution_context or {} - return {k: execution_context[k] for k in keys if k in execution_context} + if keys: + execution_context = self._context.execution_context or {} + for k in keys: + if k in execution_context: + attrs[k] = execution_context[k] + return attrs @cached_property def _iceberg_table(self) -> Table: @@ -225,7 +231,7 @@ def _iceberg_table(self) -> Table: max_attempts=self._max_attempts, duration_histogram=_load_table_duration, attempts_counter=_load_table_attempts, - attributes=build_attributes(self._table_id, self._resolved_metric_attributes), + attributes=self._resolved_metric_attributes, ) @property @@ -345,7 +351,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: max_attempts=self._max_attempts, duration_histogram=_plan_files_duration, attempts_counter=_plan_files_attempts, - attributes=build_attributes(self._table_id, self._resolved_metric_attributes), + attributes=self._resolved_metric_attributes, ) for chunk in _batched(scan_tasks, self._files_per_split): diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 67c2dff35..75b882cc9 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -19,7 +19,7 @@ from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader._timer import log_duration from openhouse.dataloader.filters import _quote_identifier -from openhouse.dataloader.metrics import METER_NAME, build_attributes +from openhouse.dataloader.metrics import METER_NAME from openhouse.dataloader.table_identifier import TableIdentifier from openhouse.dataloader.udf_registry import NoOpRegistry, UDFRegistry @@ -213,7 +213,7 @@ def __iter__(self) -> Iterator[RecordBatch]: ctx = self._scan_context if ctx.worker_jvm_args is not None: apply_libhdfs_opts(ctx.worker_jvm_args) - attributes = build_attributes(ctx.table_id, ctx.metric_attributes) + attributes = ctx.metric_attributes split_start = time.monotonic() timed: _TimedBatchIter | None = None try: diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py index 94fb994f5..d41e2aff1 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/metrics/__init__.py @@ -14,6 +14,4 @@ METER_NAME = "OpenHouse.DataLoader" -from openhouse.dataloader.metrics.attributes import build_attributes # noqa: E402 - -__all__ = ["METER_NAME", "build_attributes"] +__all__ = ["METER_NAME"] diff --git a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py b/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py deleted file mode 100644 index 23567a10a..000000000 --- a/integrations/python/dataloader/src/openhouse/dataloader/metrics/attributes.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Helpers for building the common metric attribute dict.""" - -from __future__ import annotations - -from collections.abc import Mapping - -from openhouse.dataloader.table_identifier import TableIdentifier - - -def build_attributes( - table_id: TableIdentifier, - extra: Mapping[str, str] | None = None, -) -> dict[str, str]: - """Return the metric attributes for *table_id*, merged with *extra*.""" - attrs: dict[str, str] = { - "OpenHouse.Database": table_id.database, - "OpenHouse.Table": table_id.table, - } - if extra: - attrs.update(extra) - return attrs diff --git a/integrations/python/dataloader/tests/test_metrics.py b/integrations/python/dataloader/tests/test_metrics.py index 1102263cd..6ecc701de 100644 --- a/integrations/python/dataloader/tests/test_metrics.py +++ b/integrations/python/dataloader/tests/test_metrics.py @@ -34,7 +34,7 @@ _retry, ) from openhouse.dataloader.data_loader_split import DataLoaderSplit -from openhouse.dataloader.metrics import METER_NAME, build_attributes +from openhouse.dataloader.metrics import METER_NAME from openhouse.dataloader.table_identifier import TableIdentifier # --- Meter / METER_NAME basics --- @@ -48,33 +48,19 @@ def test_get_meter_with_meter_name_returns_a_meter(): assert isinstance(get_meter(METER_NAME), Meter) -# --- build_attributes --- - - -def test_build_attributes_includes_database_table(): - table_id = TableIdentifier("db1", "tbl1") - attrs = build_attributes(table_id, None) - assert attrs == {"OpenHouse.Database": "db1", "OpenHouse.Table": "tbl1"} - - -def test_build_attributes_merges_caller_provided_attributes_verbatim(): - table_id = TableIdentifier("db1", "tbl1") - attrs = build_attributes(table_id, {"Tenant": "team-a", "Env": "prod"}) - assert attrs["Tenant"] == "team-a" - assert attrs["Env"] == "prod" +# --- DataLoaderContext.metric_attribute_keys resolution --- -def test_build_attributes_caller_keys_override_builtins(): - table_id = TableIdentifier("db1", "tbl1") - attrs = build_attributes(table_id, {"OpenHouse.Table": "override"}) - assert attrs["OpenHouse.Table"] == "override" +def _loader(context: DataLoaderContext) -> OpenHouseDataLoader: + return OpenHouseDataLoader(catalog=MagicMock(), database="db", table="tbl", context=context) -# --- DataLoaderContext.metric_attribute_keys resolution --- +_BASE_ATTRS = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl"} -def _loader(context: DataLoaderContext) -> OpenHouseDataLoader: - return OpenHouseDataLoader(catalog=MagicMock(), database="db", table="tbl", context=context) +def test_resolved_metric_attributes_includes_table_identifier_only_by_default(): + loader = _loader(DataLoaderContext()) + assert dict(loader._resolved_metric_attributes) == _BASE_ATTRS def test_resolved_metric_attributes_picks_whitelisted_keys(): @@ -84,7 +70,7 @@ def test_resolved_metric_attributes_picks_whitelisted_keys(): metric_attribute_keys=["tenant", "env"], ) ) - assert dict(loader._resolved_metric_attributes) == {"tenant": "t1", "env": "prod"} + assert dict(loader._resolved_metric_attributes) == {**_BASE_ATTRS, "tenant": "t1", "env": "prod"} def test_resolved_metric_attributes_skips_missing_keys(): @@ -94,17 +80,17 @@ def test_resolved_metric_attributes_skips_missing_keys(): metric_attribute_keys=["tenant", "env"], ) ) - assert dict(loader._resolved_metric_attributes) == {"tenant": "t1"} + assert dict(loader._resolved_metric_attributes) == {**_BASE_ATTRS, "tenant": "t1"} -def test_resolved_metric_attributes_empty_when_no_keys_configured(): +def test_resolved_metric_attributes_no_extras_when_no_keys_configured(): loader = _loader(DataLoaderContext(execution_context={"tenant": "t1"})) - assert dict(loader._resolved_metric_attributes) == {} + assert dict(loader._resolved_metric_attributes) == _BASE_ATTRS -def test_resolved_metric_attributes_empty_when_execution_context_missing(): +def test_resolved_metric_attributes_no_extras_when_execution_context_missing(): loader = _loader(DataLoaderContext(metric_attribute_keys=["tenant"])) - assert dict(loader._resolved_metric_attributes) == {} + assert dict(loader._resolved_metric_attributes) == _BASE_ATTRS # --- InMemoryMetricReader harness --- @@ -159,8 +145,7 @@ def _attrs(point) -> dict: def test_retry_emits_one_attempt_and_one_duration_on_success(metrics_reader): - table_id = TableIdentifier("db", "tbl") - attrs = build_attributes(table_id, None) + attrs = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl"} result = _retry( lambda: "ok", label="load_table db.tbl", @@ -182,8 +167,7 @@ def test_retry_emits_one_attempt_and_one_duration_on_success(metrics_reader): def test_retry_counts_each_attempt_on_transient_then_success(metrics_reader): - table_id = TableIdentifier("db", "tbl") - attrs = build_attributes(table_id, {"Tenant": "t1"}) + attrs = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl", "Tenant": "t1"} calls = {"n": 0} def fn(): @@ -213,8 +197,7 @@ def fn(): def test_retry_permanent_failure_still_records_duration(metrics_reader): - table_id = TableIdentifier("db", "tbl") - attrs = build_attributes(table_id, None) + attrs = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl"} class _NonTransient(Exception): pass @@ -277,16 +260,11 @@ def _make_split(tmp_path, metric_attributes: dict | None = None) -> DataLoaderSp def test_split_emits_per_split_and_per_batch_metrics(tmp_path, metrics_reader): - split = _make_split(tmp_path, metric_attributes={"Tenant": "t1"}) + expected_attrs = {**_BASE_ATTRS, "Tenant": "t1"} + split = _make_split(tmp_path, metric_attributes=expected_attrs) batches = list(split) assert sum(b.num_rows for b in batches) == 3 - expected_attrs = { - "OpenHouse.Database": "db", - "OpenHouse.Table": "tbl", - "Tenant": "t1", - } - split_duration = _data_points(metrics_reader, "openhouse.dataloader.splittime") assert len(split_duration) == 1 assert _attrs(split_duration[0]) == expected_attrs From 170b923bdc75ef713221c3c292c5a66150af7128 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Fri, 15 May 2026 14:56:03 -0700 Subject: [PATCH 7/7] [DataLoader] Address PR #582 review: outcome counters + transform timing - Replace LoadTable/PlanFiles Attempts counters with explicit Success and Failure counters so alerting can target final outcome directly without deriving from durations. - Add a TransformTime histogram emitted per-batch from inside the existing _timed_transform generator so transformer execution is observable when a table transformer is configured. --- .../src/openhouse/dataloader/data_loader.py | 40 ++++++--- .../openhouse/dataloader/data_loader_split.py | 18 ++++- .../python/dataloader/tests/test_metrics.py | 81 ++++++++++++++----- 3 files changed, 101 insertions(+), 38 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 44b8904f3..da9a3e943 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -45,20 +45,30 @@ unit="s", description="Time spent loading the Iceberg table from the catalog.", ) -_load_table_attempts = _meter.create_counter( - name="OpenHouse.DataLoader.LoadTableAttempts", +_load_table_success = _meter.create_counter( + name="OpenHouse.DataLoader.LoadTableSuccess", unit="1", - description="Number of attempts to load the Iceberg table from the catalog.", + description="Successful loads of the Iceberg table from the catalog.", +) +_load_table_failure = _meter.create_counter( + name="OpenHouse.DataLoader.LoadTableFailure", + unit="1", + description="Failed loads of the Iceberg table from the catalog.", ) _plan_files_duration = _meter.create_histogram( name="OpenHouse.DataLoader.PlanFilesTime", unit="s", description="Time spent planning which files to scan.", ) -_plan_files_attempts = _meter.create_counter( - name="OpenHouse.DataLoader.PlanFilesAttempts", +_plan_files_success = _meter.create_counter( + name="OpenHouse.DataLoader.PlanFilesSuccess", + unit="1", + description="Successful file-planning operations for the scan.", +) +_plan_files_failure = _meter.create_counter( + name="OpenHouse.DataLoader.PlanFilesFailure", unit="1", - description="Number of attempts to plan files for the scan.", + description="Failed file-planning operations for the scan.", ) @@ -83,16 +93,18 @@ def _retry( label: str, max_attempts: int, duration_histogram: Histogram, - attempts_counter: Counter, + success_counter: Counter, + failure_counter: Counter, attributes: Mapping[str, str], ) -> _T: - """Call *fn* with retry logic, logging and emitting metrics for each attempt. + """Call *fn* with retry logic, logging the duration and recording the outcome. Retries on ``OSError`` (transient network/storage I/O failures), except ``HTTPError`` which is only retried for 5xx status codes. Uses exponential backoff with up to *max_attempts* total attempts. """ overall_start = time.monotonic() + succeeded = False try: for attempt in Retrying( retry=retry_if_exception(_is_transient), @@ -101,11 +113,13 @@ def _retry( reraise=True, ): with attempt, log_duration(logger, "%s (attempt %d)", label, attempt.retry_state.attempt_number): - attempts_counter.add(1, attributes) - return fn() + result = fn() + succeeded = True + return result raise AssertionError("unreachable") # pragma: no cover finally: duration_histogram.record(time.monotonic() - overall_start, attributes) + (success_counter if succeeded else failure_counter).add(1, attributes) @dataclass(frozen=True) @@ -230,7 +244,8 @@ def _iceberg_table(self) -> Table: label=f"load_table {self._table_id}", max_attempts=self._max_attempts, duration_histogram=_load_table_duration, - attempts_counter=_load_table_attempts, + success_counter=_load_table_success, + failure_counter=_load_table_failure, attributes=self._resolved_metric_attributes, ) @@ -350,7 +365,8 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: label=f"plan_files {self._table_id}", max_attempts=self._max_attempts, duration_histogram=_plan_files_duration, - attempts_counter=_plan_files_attempts, + success_counter=_plan_files_success, + failure_counter=_plan_files_failure, attributes=self._resolved_metric_attributes, ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 75b882cc9..e9bd9ae82 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -77,6 +77,11 @@ unit="1", description="Errors raised while reading a record batch.", ) +_transform_duration = _meter.create_histogram( + name="OpenHouse.DataLoader.TransformTime", + unit="s", + description="Time spent applying the transform to a record batch.", +) def to_sql_identifier(table_id: TableIdentifier) -> str: @@ -161,11 +166,16 @@ def _timed_transform( split_id: str, session: SessionContext, apply_fn: Callable[[SessionContext, RecordBatch], Iterator[RecordBatch]], + attributes: Mapping[str, str], ) -> Iterator[RecordBatch]: - """Apply a transform to each batch, logging the wall-clock time of each.""" + """Apply a transform to each batch, logging and recording the wall-clock time of each.""" for idx, batch in enumerate(batches): - with log_duration(logger, "transform_batch %s [%d]", split_id, idx): - transformed = list(apply_fn(session, batch)) + transform_start = time.monotonic() + try: + with log_duration(logger, "transform_batch %s [%d]", split_id, idx): + transformed = list(apply_fn(session, batch)) + finally: + _transform_duration.record(time.monotonic() - transform_start, attributes) yield from transformed @@ -244,7 +254,7 @@ def __iter__(self) -> Iterator[RecordBatch]: if first is None: return session = _create_transform_session(self._scan_context.table_id, self._udf_registry, self._batch_size) - yield from _timed_transform(chain([first], timed), split_id, session, self._apply_transform) + yield from _timed_transform(chain([first], timed), split_id, session, self._apply_transform, attributes) except BaseException: _split_errors.add(1, attributes) raise diff --git a/integrations/python/dataloader/tests/test_metrics.py b/integrations/python/dataloader/tests/test_metrics.py index 6ecc701de..6f074f127 100644 --- a/integrations/python/dataloader/tests/test_metrics.py +++ b/integrations/python/dataloader/tests/test_metrics.py @@ -27,10 +27,12 @@ from openhouse.dataloader import DataLoaderContext, OpenHouseDataLoader from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader.data_loader import ( - _load_table_attempts, _load_table_duration, - _plan_files_attempts, + _load_table_failure, + _load_table_success, _plan_files_duration, + _plan_files_failure, + _plan_files_success, _retry, ) from openhouse.dataloader.data_loader_split import DataLoaderSplit @@ -141,32 +143,35 @@ def _attrs(point) -> dict: return dict(point.attributes) -# --- _retry attempts / duration --- +# --- _retry success / failure / duration --- -def test_retry_emits_one_attempt_and_one_duration_on_success(metrics_reader): +def test_retry_emits_success_and_duration_on_first_try(metrics_reader): attrs = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl"} result = _retry( lambda: "ok", label="load_table db.tbl", max_attempts=3, duration_histogram=_load_table_duration, - attempts_counter=_load_table_attempts, + success_counter=_load_table_success, + failure_counter=_load_table_failure, attributes=attrs, ) assert result == "ok" - attempts = _data_points(metrics_reader, "openhouse.dataloader.loadtableattempts") - assert len(attempts) == 1 - assert _attrs(attempts[0]) == attrs - assert attempts[0].value == 1 + successes = _data_points(metrics_reader, "openhouse.dataloader.loadtablesuccess") + assert len(successes) == 1 + assert _attrs(successes[0]) == attrs + assert successes[0].value == 1 + + assert _data_points(metrics_reader, "openhouse.dataloader.loadtablefailure") == [] durations = _data_points(metrics_reader, "openhouse.dataloader.loadtabletime") assert len(durations) == 1 assert _attrs(durations[0]) == attrs -def test_retry_counts_each_attempt_on_transient_then_success(metrics_reader): +def test_retry_emits_single_success_after_transient_retry(metrics_reader): attrs = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl", "Tenant": "t1"} calls = {"n": 0} @@ -181,22 +186,25 @@ def fn(): label="plan_files db.tbl", max_attempts=3, duration_histogram=_plan_files_duration, - attempts_counter=_plan_files_attempts, + success_counter=_plan_files_success, + failure_counter=_plan_files_failure, attributes=attrs, ) assert result == "ok" assert calls["n"] == 2 - attempts = _data_points(metrics_reader, "openhouse.dataloader.planfilesattempts") - assert len(attempts) == 1 - assert attempts[0].value == 2 - assert _attrs(attempts[0])["Tenant"] == "t1" + successes = _data_points(metrics_reader, "openhouse.dataloader.planfilessuccess") + assert len(successes) == 1 + assert successes[0].value == 1 + assert _attrs(successes[0])["Tenant"] == "t1" + + assert _data_points(metrics_reader, "openhouse.dataloader.planfilesfailure") == [] durations = _data_points(metrics_reader, "openhouse.dataloader.planfilestime") assert len(durations) == 1 -def test_retry_permanent_failure_still_records_duration(metrics_reader): +def test_retry_emits_failure_and_duration_on_permanent_failure(metrics_reader): attrs = {"OpenHouse.Database": "db", "OpenHouse.Table": "tbl"} class _NonTransient(Exception): @@ -211,13 +219,16 @@ def fn(): label="load_table", max_attempts=3, duration_histogram=_load_table_duration, - attempts_counter=_load_table_attempts, + success_counter=_load_table_success, + failure_counter=_load_table_failure, attributes=attrs, ) - attempts = _data_points(metrics_reader, "openhouse.dataloader.loadtableattempts") - assert len(attempts) == 1 - assert attempts[0].value == 1 + failures = _data_points(metrics_reader, "openhouse.dataloader.loadtablefailure") + assert len(failures) == 1 + assert failures[0].value == 1 + + assert _data_points(metrics_reader, "openhouse.dataloader.loadtablesuccess") == [] durations = _data_points(metrics_reader, "openhouse.dataloader.loadtabletime") assert len(durations) == 1 @@ -229,7 +240,11 @@ def fn(): _SPLIT_TABLE_ID = TableIdentifier("db", "tbl") -def _make_split(tmp_path, metric_attributes: dict | None = None) -> DataLoaderSplit: +def _make_split( + tmp_path, + metric_attributes: dict | None = None, + transform_sql: str | None = None, +) -> DataLoaderSplit: file_path = str(tmp_path / "data.parquet") table = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())}) fields = [field.with_metadata({b"PARQUET:field_id": str(i + 1).encode()}) for i, field in enumerate(table.schema)] @@ -256,7 +271,7 @@ def _make_split(tmp_path, metric_attributes: dict | None = None) -> DataLoaderSp ) data_file._spec_id = 0 task = FileScanTask(data_file=data_file) - return DataLoaderSplit(file_scan_tasks=[task], scan_context=scan_context) + return DataLoaderSplit(file_scan_tasks=[task], scan_context=scan_context, transform_sql=transform_sql) def test_split_emits_per_split_and_per_batch_metrics(tmp_path, metrics_reader): @@ -328,6 +343,28 @@ def _gen(): assert len(split_duration) == 1 +def test_split_with_transform_emits_transform_time(tmp_path, metrics_reader): + expected_attrs = {**_BASE_ATTRS, "Tenant": "t1"} + split = _make_split( + tmp_path, + metric_attributes=expected_attrs, + transform_sql='SELECT id FROM "db"."tbl"', + ) + list(split) + + transform_times = _data_points(metrics_reader, "openhouse.dataloader.transformtime") + assert len(transform_times) == 1 + assert _attrs(transform_times[0]) == expected_attrs + assert transform_times[0].sum > 0 + + +def test_split_without_transform_does_not_emit_transform_time(tmp_path, metrics_reader): + split = _make_split(tmp_path) + list(split) + + assert _data_points(metrics_reader, "openhouse.dataloader.transformtime") == [] + + # --- TableScanContext.metric_attributes ---