Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/benchmarks/density-v0.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,42 @@ process; with the stub workload it does, comfortably. The realistic
per-session footprint validation (and the ~50-100 sessions per 4 GB
working number) is deferred to the §8.4 real-LiveKit integration tests
once the dev-server harness lands in Phase 2.

### 2026-05-05 — local: macOS Darwin 24.3.0 / Python 3.13.5 / arm64 (10 cores, 16 GB)

Re-run after `tests/benchmarks/density.py` was extended with scheduler-
latency sampling (10 ms cadence, median + p99 + max) and a hardware
fingerprint dict (commit `32bde3a`). Three back-to-back runs at the
§7 gate:

| Run | Sessions | Successes | Failures | Baseline RSS | Peak RSS | Delta RSS | Elapsed | Sched p99 | Sched max | Within budget |
|-----|----------|-----------|----------|--------------|----------|-----------|---------|-----------|-----------|----------------|
| A | 50 | 50 | 0 | 116.1 MB | 367.0 MB | 251.0 MB | 1.06 s | 6.17 ms | 50.27 ms | ✓ |
| B | 50 | 50 | 0 | 116.5 MB | 354.0 MB | 237.5 MB | 1.08 s | 5.64 ms | 63.66 ms | ✓ |
| C | 50 | 50 | 0 | 116.5 MB | 367.4 MB | 251.0 MB | 1.02 s | 3.19 ms | 3.36 ms | ✓ |

Hardware fingerprint (identical across runs): `arm` / `10 cores` /
`16 GB total` / `Darwin 24.3.0` / `Python 3.13.5`.

Notes:

- Peak-RSS numbers track the 2026-05-03 row at the same N=50 config
(~367 MB), confirming no regression from the benchmark instrumentation
additions.
- Scheduler median latency holds in the 1.06-1.10 ms band — well below
the 10 ms sampling interval, so the loop is not starved at this load.
- Scheduler p99 sits at 3-6 ms; the higher values (50-64 ms) on runs A
and B come from a single-sample tail spike each (the `max` column),
most likely a transient OS scheduling event on a busy laptop. Run C,
with all background processes quiet, lands at p99 = 3.19 ms / max =
3.36 ms — the clean baseline. The p99 is the load-bearing number for
worker stability; the tail max is an environmental artefact.
- Walltime stays in the same 1.0-1.1 s band (≈ 1 s sleep + setup).

### Verdict (2026-05-05)

**Phase 1 §7 gate continues to pass.** All three runs hit 50/50
sessions, 0 failures, peak RSS ≤ 367 MB. The new scheduler-latency
metric provides additional Phase 2 capacity-planning input: a healthy
loop runs at ~1 ms median / ~3 ms p99 under a 50-session stub
workload.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ branch = true
dev = [
"mypy>=1.19.1",
"pre-commit>=4.5.1",
"psutil>=7",
"pytest>=9.0.2",
"pytest-asyncio>=1.2.0",
"pytest-cov>=7.0.0",
Expand Down
113 changes: 111 additions & 2 deletions tests/benchmarks/density.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
import contextlib
import json
import multiprocessing as mp
import os
import platform
import statistics
import sys
import time
from dataclasses import asdict, dataclass
from dataclasses import asdict, dataclass, field
from types import SimpleNamespace
from typing import Any

import psutil
from livekit.agents import JobExecutorType

from openrtc.execution.coroutine import CoroutinePool
Expand All @@ -48,6 +52,24 @@

_RSS_SAMPLE_INTERVAL_SECONDS = 0.05
_SESSION_HOLD_SECONDS = 1.0
_LATENCY_SAMPLE_INTERVAL_SECONDS = 0.01


@dataclass
class SchedulerLatency:
    """Summary statistics for event-loop wakeup latency, in milliseconds.

    Serialized (via ``asdict``) into benchmark result JSON, so field names
    double as the stable output keys.
    """

    # Number of latency measurements collected over the run.
    samples: int
    # Median wakeup latency in ms.
    median: float
    # 99th-percentile wakeup latency in ms (linear interpolation).
    p99: float
    # Largest single observed wakeup latency in ms.
    max: float # noqa: A003 — keep the JSON key stable for downstream consumers


@dataclass
class Hardware:
    """Host fingerprint recorded alongside each benchmark result.

    Populated by ``_hardware_fingerprint()`` so numbers from different
    machines can be compared apples-to-apples.
    """

    # From platform.processor(), falling back to uname().machine.
    cpu_model: str
    # From os.cpu_count(); None when the count cannot be determined.
    cpu_count: int | None
    # Total physical RAM in GiB, rounded to 2 decimal places.
    total_ram_gb: float
    # "<system> <release>", e.g. "Darwin 24.3.0".
    kernel: str
    # From platform.python_version(), e.g. "3.13.5".
    python_version: str


@dataclass
Expand All @@ -61,7 +83,22 @@ class DensityResult:
delta_rss_mb: float | None
elapsed_seconds: float
rss_within_budget: bool
notes: list[str]
scheduler_latency_ms: SchedulerLatency | None = None
hardware: Hardware | None = None
notes: list[str] = field(default_factory=list)


def _hardware_fingerprint() -> Hardware:
    """Capture the host signature so benchmark results stay reproducible.

    Returns:
        A :class:`Hardware` record describing CPU, RAM, kernel, and the
        Python version of the current host.
    """
    host = platform.uname()
    # platform.processor() may be empty on some platforms; fall back to the
    # machine architecture string from uname().
    cpu = platform.processor() or host.machine
    ram_gib = psutil.virtual_memory().total / (1024**3)
    return Hardware(
        cpu_model=cpu,
        cpu_count=os.cpu_count(),
        total_ram_gb=round(ram_gib, 2),
        kernel=f"{host.system} {host.release}",
        python_version=platform.python_version(),
    )


def _stub_running_job_info(job_id: str) -> Any:
Expand Down Expand Up @@ -129,6 +166,38 @@ async def _sample_rss(stop: asyncio.Event, samples: list[int]) -> None:
await asyncio.wait_for(stop.wait(), timeout=_RSS_SAMPLE_INTERVAL_SECONDS)


async def _sample_loop_latency(stop: asyncio.Event, samples: list[float]) -> None:
    """Background task: measure scheduler wakeup latency in milliseconds.

    A timed wait is requested every interval; the delta between the
    requested wakeup time and the actual return time is the loop's
    scheduling latency. Under heavy task pressure this rises and signals
    starvation of the event loop.

    Args:
        stop: setting this event terminates the sampler.
        samples: output list; one latency value (ms, >= 0.0) is appended
            per fully elapsed interval.
    """
    while not stop.is_set():
        target = time.monotonic() + _LATENCY_SAMPLE_INTERVAL_SECONDS
        with contextlib.suppress(TimeoutError):
            await asyncio.wait_for(
                stop.wait(), timeout=_LATENCY_SAMPLE_INTERVAL_SECONDS
            )
        if stop.is_set():
            # The wait returned because the benchmark finished, not because
            # the interval elapsed — this wakeup does not measure scheduler
            # latency, so recording it (it clamps to ~0.0) would bias the
            # median and p99 low. Drop it and exit.
            break
        actual = time.monotonic()
        # Clock granularity can make the delta marginally negative; clamp.
        samples.append(max(0.0, (actual - target) * 1000.0))


def _percentile(values: list[float], pct: float) -> float:
"""Linear-interpolation percentile (no numpy dependency)."""
if not values:
return 0.0
ordered = sorted(values)
if len(ordered) == 1:
return ordered[0]
rank = (pct / 100.0) * (len(ordered) - 1)
lower = int(rank)
upper = min(lower + 1, len(ordered) - 1)
weight = rank - lower
return ordered[lower] * (1 - weight) + ordered[upper] * weight


async def run_density_benchmark(
*,
sessions: int,
Expand All @@ -144,7 +213,11 @@ async def run_density_benchmark(
pool = _build_pool(max_concurrent_sessions=sessions)
stop_event = asyncio.Event()
samples: list[int] = []
latency_samples: list[float] = []
sampler = asyncio.create_task(_sample_rss(stop_event, samples))
latency_sampler = asyncio.create_task(
_sample_loop_latency(stop_event, latency_samples)
)

start = time.monotonic()
try:
Expand All @@ -162,6 +235,7 @@ async def run_density_benchmark(
elapsed = time.monotonic() - start
stop_event.set()
await sampler
await latency_sampler

bookkeeping = pool._density_results # type: ignore[attr-defined]
successes = len(bookkeeping["successes"])
Expand All @@ -178,6 +252,18 @@ async def run_density_benchmark(

rss_within_budget = peak_rss_mb is None or peak_rss_mb <= rss_budget_mb

scheduler_latency_ms: SchedulerLatency | None
if latency_samples:
scheduler_latency_ms = SchedulerLatency(
samples=len(latency_samples),
median=round(statistics.median(latency_samples), 3),
p99=round(_percentile(latency_samples, 99.0), 3),
max=round(max(latency_samples), 3),
)
else:
scheduler_latency_ms = None
notes.append("scheduler latency unavailable: no samples collected.")

return DensityResult(
sessions=sessions,
successes=successes,
Expand All @@ -188,6 +274,8 @@ async def run_density_benchmark(
delta_rss_mb=delta_rss_mb,
elapsed_seconds=elapsed,
rss_within_budget=rss_within_budget,
scheduler_latency_ms=scheduler_latency_ms,
hardware=_hardware_fingerprint(),
notes=notes,
)

Expand All @@ -207,6 +295,27 @@ def _mb(value: float | None) -> str:
f"within budget: {result.rss_within_budget}",
f"elapsed: {result.elapsed_seconds:.2f} s",
]
if result.scheduler_latency_ms is not None:
lines.extend(
[
"scheduler latency (ms):",
f" samples: {result.scheduler_latency_ms.samples}",
f" median: {result.scheduler_latency_ms.median:.3f}",
f" p99: {result.scheduler_latency_ms.p99:.3f}",
f" max: {result.scheduler_latency_ms.max:.3f}",
]
)
if result.hardware is not None:
lines.extend(
[
"hardware:",
f" cpu_model: {result.hardware.cpu_model}",
f" cpu_count: {result.hardware.cpu_count}",
f" total_ram_gb: {result.hardware.total_ram_gb}",
f" kernel: {result.hardware.kernel}",
f" python_version: {result.hardware.python_version}",
]
)
if result.notes:
lines.append("notes:")
lines.extend(f" - {note}" for note in result.notes)
Expand Down
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading