Change MetricsAggregatorService to publisher to support live metrics without shmem #306
nv-alicheng wants to merge 6 commits into main from
Conversation
Replace EventRecord-specific publisher/subscriber classes with generic ZmqMessagePublisher[T] / ZmqMessageSubscriber[T] parameterized by a MessageCodec[T] Protocol. EventRecordCodec preserves the existing wire format and decode-error wrapping behavior. This sets up the generic transport that the upcoming MetricsSnapshot publisher will reuse.

- protocol.py: drop the EventRecordPublisher/Subscriber ABCs; add MessageCodec, MessagePublisher[T], MessageSubscriber[T].
- pubsub.py: rewrite as ZmqMessagePublisher[T] / ZmqMessageSubscriber[T]; expose sndhwm/linger/conflate so future callers (e.g. live snapshots) can choose drop-old vs. delivery-guarantee semantics.
- record.py: add EventRecordCodec next to the encode/decode helpers.
- Update EventPublisherService, EventLoggerService, MetricsAggregatorService, and tests to use the generic classes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per Gemini review on PR #300: catching only msgspec.DecodeError in MessageSubscriber._on_readable bakes the codec implementation into the supposedly-generic base class. A future codec backed by json, pickle, etc. raises different exception types that would bypass on_decode_error and crash the reader.

- protocol.py: widen the catch back to Exception so the base class makes no assumption about which decoder library a codec uses; drop the now-unused msgspec import.
- record.py: tighten EventRecordCodec.on_decode_error to wrap only msgspec.DecodeError and re-raise other exceptions. This preserves parity with the previous behavior: only malformed-payload errors become ErrorEventType.GENERIC records; programmer bugs in the decode path still surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
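In miniature, the division of responsibility reads roughly as follows. JsonCodec and read_message are illustrative stand-ins (using stdlib json), not the PR's actual classes: the generic reader catches Exception, and each codec decides which exception types count as malformed payloads.

```python
import json


class JsonCodec:
    """Hypothetical codec: wraps only json.JSONDecodeError; other errors re-raise."""

    def decode(self, payload: bytes):
        return json.loads(payload)

    def on_decode_error(self, payload: bytes, exc: Exception):
        if isinstance(exc, json.JSONDecodeError):
            # Malformed payload: return a sentinel instead of crashing the reader.
            return {"error": "malformed payload", "size": len(payload)}
        raise exc  # programmer bugs in the decode path still surface


def read_message(codec, payload: bytes):
    """Generic reader: catches Exception so any codec library works."""
    try:
        return codec.decode(payload)
    except Exception as exc:
        return codec.on_decode_error(payload, exc)
```

A codec backed by pickle or msgspec plugs into read_message unchanged; only its on_decode_error needs to know its own library's exception types.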
Introduces the three primitives that the upcoming pub/sub metrics path will compose on top of:

- snapshot.py: MetricsSnapshot wire struct (msgspec, tagged union of CounterStat | SeriesStat), plus a SessionState enum (LIVE / DRAINING / COMPLETE) and a msgpack codec.
- registry.py: MetricsRegistry holding CounterSamplers and SeriesSamplers. Series samplers carry an HDR Histogram for cheap live percentiles, an array.array of raw values for exact final computation, and exact rollup primitives. Histogram bucket edges are log-spaced over the observed [min, max] per snapshot, so they auto-zoom to the data instead of wasting buckets on empty range.
- New unit tests cover the wire-codec round-trip, the sampler hot path, and registry registration/collision behavior.

Adds hdrhistogram==0.10.3 as a runtime dependency. Wiring these primitives into the aggregator and removing the old KVStore path follow in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
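The log-spaced bucket edges can be sketched as follows. This is illustrative only; the actual guard conditions and edge handling in registry.py may differ.

```python
import math


def log_spaced_edges(lo: float, hi: float, n_buckets: int) -> list[float]:
    """Return n_buckets + 1 edges, log-spaced over [lo, hi].

    Recomputing edges from the observed min/max each snapshot makes the
    buckets auto-zoom to the data rather than covering empty range.
    """
    if lo <= 0:
        # Log spacing needs a positive lower bound; clamp defensively.
        lo = min(1e-9, hi / 2) if hi > 0 else 1e-9
    if hi <= lo:
        return [lo, lo]  # degenerate range: single point
    ratio = (hi / lo) ** (1.0 / n_buckets)
    return [lo * ratio**i for i in range(n_buckets + 1)]
```

For example, three buckets over [1, 1000] give edges at 1, 10, 100, 1000: each bucket covers one decade instead of one third of the linear range.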
- publisher.py: MetricsPublisher owns the periodic tick task that publishes live MetricsSnapshots over IPC pub/sub at refresh_hz, plus publish_final(), which the aggregator awaits on ENDED. Final delivery is dual-path:
  * pub/sub publish (best-effort, telemetry knobs sndhwm=4, linger=10s)
  * disk fallback (atomic: tmp + fsync(file) + rename + fsync(parent dir))
  Both paths are independently wrapped in try/except; neither failure suppresses the other. publish_final is async and awaits tick-task cancellation before publishing COMPLETE, so a late LIVE/DRAINING tick can never land after COMPLETE on the wire.
- subscriber.py: MetricsSnapshotSubscriber tracks ``latest`` and the ``COMPLETE``-state snapshot. Defaults to conflate=True (TUI / report consumer) but is parameterized for any consumer that needs every tick.
- New unit tests cover the tick-task lifecycle, the atomic disk fallback, the independence of the pub/sub and disk failure paths, and the regression that publish_final must await tick-task cancellation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
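The atomic disk-fallback sequence (tmp + fsync(file) + rename + fsync(parent dir)) can be sketched with the stdlib. atomic_write here is a hypothetical helper, not the PR's code:

```python
import os
import tempfile


def atomic_write(path: str, data: bytes) -> None:
    """Write data to path atomically: readers see either the old file or the
    complete new one, never a partial write."""
    parent = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=parent)  # tmp must be on the same filesystem
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # data durable before the rename
        os.replace(tmp, path)  # atomic on POSIX
        dirfd = os.open(parent, os.O_RDONLY)
        try:
            os.fsync(dirfd)  # make the directory entry (the rename) durable
        finally:
            os.close(dirfd)
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise
```

The final fsync on the parent directory matters: without it, a crash after the rename can lose the directory entry even though the file's data blocks were flushed.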
Replaces the mmap-backed BasicKVStore with the registry/publisher path introduced in the previous two commits.

Aggregator changes:
- MetricsAggregatorService now constructs a MetricsRegistry and MetricsPublisher on entry; trigger callbacks call registry.record / registry.increment instead of kv_store.update.
- Tracks SessionState (LIVE → DRAINING on ENDED → COMPLETE on publish_final). The publisher tick task captures (state, n_pending_tasks) per tick via a callback; consumers detect a drain timeout as state == COMPLETE and n_pending_tasks > 0.
- Adds a TRACKED_SAMPLES_FAILED counter, incremented on ERROR events whose tracked row still exists at processing time. Correctness depends on the load_generator emitting ERROR before COMPLETE; the matching test asserts that order.
- The ENDED handler awaits drain_tasks (30s timeout) and publish_final, then closes the publisher (linger=10s drains pending pub/sub frames).

Report changes:
- Replaces from_kv_reader with from_snapshot (a pure function on a MetricsSnapshot). complete is derived from state == COMPLETE and n_pending_tasks == 0. The display warns when not complete.

Main-process changes (commands/benchmark/execute.py):
- Spawns a MetricsSnapshotSubscriber on the main loop. Triple-redundant report sourcing: pub/sub COMPLETE → disk fallback → latest live.
- Removes _setup_kv_reader, ARM tmpfs branching, and mmap salvage in _salvage_tmpfs (events.jsonl salvage is preserved).
- Awaits subscriber.wait_for_complete(timeout=2.0) after launcher exit so the loop can dispatch the COMPLETE frame before deciding the pub/sub path missed.

Removed:
- async_utils/services/metrics_aggregator/kv_store.py
- async_utils/services/metrics_aggregator/fs_check.py

Tests:
- Deletes test_kv_store.py.
- Marks test_aggregator.py / test_aggregator_e2e.py / test_metrics_table.py / test_report_builder.py / conftest.py with a module-level skip plus a TODO referencing the design doc; rewriting these on the new fixtures is a tracked follow-up.
- Adds test_aggregator_error_handler.py covering the TRACKED_SAMPLES_FAILED increment path and the negative case where COMPLETE arrives before ERROR (documenting the bug the ERROR/COMPLETE swap fixes).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
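The triple-redundant report sourcing amounts to a simple selection chain. choose_report_source and disk_loader below are hypothetical names for illustration, not the actual API:

```python
def choose_report_source(complete_snapshot, disk_loader, latest):
    """Pick the best available snapshot, in preference order:
    pub/sub COMPLETE frame, then the on-disk final snapshot,
    then the last live frame seen (which may be stale)."""
    if complete_snapshot is not None:
        return complete_snapshot, "pubsub"
    try:
        disk = disk_loader()  # loads the publisher's disk-fallback file, if any
    except OSError:
        disk = None
    if disk is not None:
        return disk, "disk"
    return latest, "live"
```

Each tier only engages when every tier above it has failed, so a healthy run always reports from the authoritative COMPLETE frame, while a crashed or partitioned run still produces a (possibly incomplete) report.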
Swaps the publish order in BenchmarkSession._handle_response so that a QueryResult carrying an error emits ErrorEventType.GENERIC first, then SampleEventType.COMPLETE.

This is required for metrics-aggregator correctness: COMPLETE causes MetricsTable.set_field to remove the tracked row, so an ERROR observed afterward has no row to inspect and TRACKED_SAMPLES_FAILED would silently stay at 0. Emitting ERROR first keeps the row alive long enough for the aggregator's error handler to identify the failure as tracked. EventLoggerService and other event consumers treat the two event types independently, so the order is invisible to them.

The test_failed_query_published_as_error_event test now asserts the order explicitly so a future revert is caught immediately, and the aggregator-side regression is covered by test_aggregator_error_handler.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
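A toy model (not the real MetricsTable or aggregator code) makes the ordering hazard concrete: COMPLETE removes the tracked row, so the failure counter only increments if ERROR arrives while the row still exists.

```python
class ToyTable:
    """Toy stand-in for the tracked-row table."""

    def __init__(self):
        self.rows = {"req-1": {"tracked": True}}
        self.tracked_samples_failed = 0

    def on_complete(self, rid):
        self.rows.pop(rid, None)  # COMPLETE removes the tracked row

    def on_error(self, rid):
        row = self.rows.get(rid)
        if row is not None and row["tracked"]:
            self.tracked_samples_failed += 1  # only counts if the row survives


def run(order):
    t = ToyTable()
    handlers = {"ERROR": t.on_error, "COMPLETE": t.on_complete}
    for evt in order:
        handlers[evt]("req-1")
    return t.tracked_samples_failed
```

With ERROR first the counter reaches 1; with COMPLETE first the row is already gone and the failure is silently dropped, which is exactly the bug this commit fixes.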
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
```python
def encode(self, item: T) -> tuple[bytes, bytes]:
    """Return (topic, payload). topic must be exactly TOPIC_FRAME_SIZE bytes."""
    ...

def decode(self, payload: bytes) -> T:
    """Decode payload back to T. May raise; the caller routes failures
    through on_decode_error."""
    ...

def on_decode_error(self, payload: bytes, exc: Exception) -> T | None:
    """Fallback for malformed payloads. Return a sentinel item or None
    to drop the message."""
    ...
```
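For illustration, a codec satisfying this Protocol might look like the following stdlib-json sketch. The real EventRecordCodec uses msgspec/msgpack, and the TOPIC_FRAME_SIZE value and topic padding here are assumptions, not the codebase's actual constants.

```python
import json

TOPIC_FRAME_SIZE = 8  # assumed fixed topic width for this sketch


class JsonDictCodec:
    """Illustrative MessageCodec[dict] backed by stdlib json."""

    def encode(self, item: dict) -> tuple[bytes, bytes]:
        # Pad/truncate the topic to exactly TOPIC_FRAME_SIZE bytes.
        topic = b"metrics".ljust(TOPIC_FRAME_SIZE, b"\x00")[:TOPIC_FRAME_SIZE]
        return topic, json.dumps(item).encode()

    def decode(self, payload: bytes) -> dict:
        return json.loads(payload)

    def on_decode_error(self, payload: bytes, exc: Exception):
        if isinstance(exc, json.JSONDecodeError):
            return None  # drop malformed frames
        raise exc  # anything else is a bug, not a bad payload
```

The generic ZmqMessagePublisher/Subscriber never need to know which serialization library is in play; they only call these three methods.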
Code Review
This pull request refactors the metrics aggregation system to use a registry-based architecture with HDR histograms and a generalized pub/sub transport layer, replacing the legacy mmap-backed storage. The update introduces periodic snapshot publishing with disk fallback and updates reporting logic to consume these snapshots. Feedback suggests improving encapsulation by exposing in-flight task metrics through public properties and adopting a more numerically stable variance formula for high-precision latency calculations. Additionally, several legacy tests have been skipped pending migration to the new system.
```python
self._refresh_hz,
get_runtime_state=lambda: (
    self._session_state,
    len(table._in_flight_tasks),
```
```python
# ENDED has been observed; transition to DRAINING so any tick
# that fires before publish_final reflects the new state.
self._session_state = SessionState.DRAINING
logger.info("Draining %d async tasks...", len(table._in_flight_tasks))
```
Directly accessing the protected member _in_flight_tasks of MetricsTable breaks encapsulation. It is better to expose this information through a public property on the MetricsTable class.
```diff
-logger.info("Draining %d async tasks...", len(table._in_flight_tasks))
+logger.info("Draining %d async tasks...", table.in_flight_tasks_count)
```
```python
for t in list(table._in_flight_tasks):
    if not t.done():
        t.cancel()
n_pending = len(table._in_flight_tasks)
```
```python
self._registry = registry
self._in_flight: dict[str, SampleRow] = {}
self._triggers: dict[str, list[EmitTrigger]] = {}
self._in_flight_tasks: set[asyncio.Task] = set()
```
To improve encapsulation and support the refactored MetricsAggregatorService, please expose the count of in-flight tasks and a method to cancel them.
```python
self._in_flight_tasks: set[asyncio.Task] = set()

@property
def in_flight_tasks_count(self) -> int:
    """Number of async trigger tasks currently in flight."""
    return len(self._in_flight_tasks)

def cancel_all_tasks(self) -> None:
    """Cancel all in-flight async trigger tasks."""
    for task in self._in_flight_tasks:
        if not task.done():
            task.cancel()
```

```python
var_num = stat.sum_sq - stat.total * stat.total / n
std_dev = math.sqrt(var_num / (n - 1)) if var_num > 0 else 0.0
```
The naive variance formula sum_sq - total^2 / n is numerically unstable when total^2 / n is very close to sum_sq, which is common for high-precision latency metrics in nanoseconds. Since total and sum_sq are Python integers for latency series, using the integer-based formula (n * sum_sq - total^2) / (n * (n - 1)) provides exact precision before the final square root.
```diff
-var_num = stat.sum_sq - stat.total * stat.total / n
-std_dev = math.sqrt(var_num / (n - 1)) if var_num > 0 else 0.0
+# Use a more numerically stable formula for variance when aggregates are large.
+# If using integers, (n * sum_sq - total**2) is exact.
+if isinstance(stat.total, int) and isinstance(stat.sum_sq, int):
+    var_num = n * stat.sum_sq - stat.total**2
+    std_dev = math.sqrt(max(0, var_num)) / math.sqrt(n * (n - 1))
+else:
+    var_num = stat.sum_sq - stat.total * stat.total / n
+    std_dev = math.sqrt(max(0, var_num / (n - 1)))
```
What does this PR do?
The shmem implementation of KVStore in MetricsAggregatorService causes issues on ARM. Several solutions exist; this PR implements option (2): publishing metrics over pub/sub instead of sharing memory.
Type of change
Related issues
Testing
Checklist