Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/copilot_usage/docs/implementation.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,17 @@ for idx, sd in fp.all_shutdowns:

### Model metrics merging

When two shutdowns reference the **same model**, their `ModelMetrics` are summed field-by-field. Because `ModelMetrics` is a frozen dataclass, accumulation returns a **new** instance via `add_to_model_metrics()`:

```python
for model_name, mm in sd.modelMetrics.items():
    if model_name in merged_metrics:
        merged_metrics[model_name] = add_to_model_metrics(merged_metrics[model_name], mm)
    else:
        merged_metrics[model_name] = copy_model_metrics(mm)
```

When two shutdowns reference **different models**, separate entries are kept in the result dict.

### Model resolution for shutdowns

Expand Down
58 changes: 31 additions & 27 deletions src/copilot_usage/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import builtins
import dataclasses
import math
from datetime import UTC, datetime
from enum import StrEnum
Expand Down Expand Up @@ -103,7 +104,8 @@ class SessionContext(BaseModel):
cwd: str | None = None


class TokenUsage(BaseModel):
@dataclasses.dataclass(frozen=True, slots=True)
class TokenUsage:
"""Token usage breakdown for a single model."""

inputTokens: int = 0
Expand All @@ -112,46 +114,48 @@ class TokenUsage(BaseModel):
cacheWriteTokens: int = 0


class RequestMetrics(BaseModel):
@dataclasses.dataclass(frozen=True, slots=True)
class RequestMetrics:
"""Request count and cost for a single model."""

count: int = 0
cost: int = 0


@dataclasses.dataclass(frozen=True, slots=True)
class ModelMetrics:
    """Combined request + usage metrics for one model.

    ``default_factory`` gives each instance its own nested metric objects at
    construction time; once built, this class and both nested dataclasses
    are frozen, so instances may safely share references.
    """

    requests: RequestMetrics = dataclasses.field(default_factory=RequestMetrics)
    usage: TokenUsage = dataclasses.field(default_factory=TokenUsage)
Comment on lines 107 to +130
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TokenUsage/RequestMetrics/ModelMetrics are no longer Pydantic models, so they no longer have .model_dump()/.model_fields/.model_copy(). There is still at least one usage of ModelMetrics.model_dump() in the test suite (tests/copilot_usage/test_report.py:5723), which will now fail. Please update those remaining call sites to use dataclasses.asdict() (or another equivalent serialization helper) instead of Pydantic APIs.

Copilot uses AI. Check for mistakes.


def add_to_model_metrics(target: ModelMetrics, source: ModelMetrics) -> ModelMetrics:
    """Return a new ``ModelMetrics`` with *source* fields added to *target*.

    Neither operand is mutated: ``ModelMetrics`` and its nested
    ``RequestMetrics``/``TokenUsage`` are frozen dataclasses, so the
    field-wise sum is materialised as freshly-constructed instances.

    Args:
        target: Accumulated metrics so far.
        source: Metrics to fold in.

    Returns:
        A new instance whose every numeric field is ``target + source``.
    """
    return ModelMetrics(
        requests=RequestMetrics(
            count=target.requests.count + source.requests.count,
            cost=target.requests.cost + source.requests.cost,
        ),
        usage=TokenUsage(
            inputTokens=target.usage.inputTokens + source.usage.inputTokens,
            outputTokens=target.usage.outputTokens + source.usage.outputTokens,
            cacheReadTokens=target.usage.cacheReadTokens + source.usage.cacheReadTokens,
            cacheWriteTokens=target.usage.cacheWriteTokens + source.usage.cacheWriteTokens,
        ),
    )


def copy_model_metrics(mm: ModelMetrics) -> ModelMetrics:
    """Create a shallow copy of *mm*.

    With frozen dataclasses every instance is already immutable, so a
    shallow ``dataclasses.replace`` (no field overrides) is sufficient —
    the nested ``RequestMetrics`` and ``TokenUsage`` references are shared,
    which is safe because they are themselves frozen. This deliberately
    avoids ``copy.deepcopy``, which is significantly slower for simple
    int-field records.
    """
    return dataclasses.replace(mm)


def merge_model_metrics(
Expand All @@ -162,7 +166,7 @@ def merge_model_metrics(
result = {name: copy_model_metrics(mm) for name, mm in base.items()}
for name, mm in additional.items():
if name in result:
add_to_model_metrics(result[name], mm)
result[name] = add_to_model_metrics(result[name], mm)
else:
result[name] = copy_model_metrics(mm)
return result
Expand Down
4 changes: 3 additions & 1 deletion src/copilot_usage/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,9 @@ def _build_completed_summary(
all_files_modified.update(sd.codeChanges.filesModified)
for model_name, mm in sd.modelMetrics.items():
if model_name in merged_metrics:
add_to_model_metrics(merged_metrics[model_name], mm)
merged_metrics[model_name] = add_to_model_metrics(
merged_metrics[model_name], mm
)
else:
merged_metrics[model_name] = copy_model_metrics(mm)
shutdown_cycles.append(
Expand Down
7 changes: 4 additions & 3 deletions src/copilot_usage/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,15 @@ def _aggregate_model_metrics(
) -> dict[str, ModelMetrics]:
"""Merge model metrics across all sessions into a single dict.

Accumulates in-place so each unique model name is copied at most once,
reducing copy overhead from O(n × m) to O(m).
The returned dict contains one final accumulated ``ModelMetrics``
value per unique model name. Accumulation uses immutable addition,
so intermediate merged instances may be created while aggregating.
"""
result: dict[str, ModelMetrics] = {}
for s in sessions:
for model_name, mm in s.model_metrics.items():
if model_name in result:
add_to_model_metrics(result[model_name], mm)
result[model_name] = add_to_model_metrics(result[model_name], mm)
else:
result[model_name] = copy_model_metrics(mm)
return result
Expand Down
127 changes: 46 additions & 81 deletions tests/copilot_usage/test_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for copilot_usage.models — Pydantic v2 event parsing."""
"""Tests for copilot_usage.models — event parsing and metric helpers."""

import dataclasses
import json
from datetime import UTC, datetime
from unittest.mock import patch
Expand Down Expand Up @@ -463,18 +464,17 @@ class TestAddToModelMetrics:
"""Unit tests for the add_to_model_metrics helper."""

def test_all_fields_accumulated(self) -> None:
# Assign distinct values per field so mis-mapped fields will fail the test.
target_requests_kwargs = {
name: idx + 1 for idx, name in enumerate(RequestMetrics.model_fields)
}
"""Every field from both operands is summed in the result."""
req_fields = [f.name for f in dataclasses.fields(RequestMetrics)]
usage_fields = [f.name for f in dataclasses.fields(TokenUsage)]

target_requests_kwargs = {name: idx + 1 for idx, name in enumerate(req_fields)}
source_requests_kwargs = {
name: (idx + 1) * 10 for idx, name in enumerate(RequestMetrics.model_fields)
}
target_usage_kwargs = {
name: idx + 100 for idx, name in enumerate(TokenUsage.model_fields)
name: (idx + 1) * 10 for idx, name in enumerate(req_fields)
}
target_usage_kwargs = {name: idx + 100 for idx, name in enumerate(usage_fields)}
source_usage_kwargs = {
name: (idx + 1) * 1000 for idx, name in enumerate(TokenUsage.model_fields)
name: (idx + 1) * 1000 for idx, name in enumerate(usage_fields)
}

target = ModelMetrics(
Expand All @@ -486,21 +486,22 @@ def test_all_fields_accumulated(self) -> None:
usage=TokenUsage(**source_usage_kwargs),
)

add_to_model_metrics(target, source)
result = add_to_model_metrics(target, source)

expected_requests = {
name: target_requests_kwargs[name] + source_requests_kwargs[name]
for name in RequestMetrics.model_fields
for name in req_fields
}
expected_usage = {
name: target_usage_kwargs[name] + source_usage_kwargs[name]
for name in TokenUsage.model_fields
for name in usage_fields
}

assert target.requests.model_dump() == expected_requests
assert target.usage.model_dump() == expected_usage
assert dataclasses.asdict(result.requests) == expected_requests
assert dataclasses.asdict(result.usage) == expected_usage

def test_zero_source_is_identity(self) -> None:
"""Adding a default (zero) source returns an equal value."""
target = ModelMetrics(
requests=RequestMetrics(count=5, cost=3),
usage=TokenUsage(
Expand All @@ -510,30 +511,32 @@ def test_zero_source_is_identity(self) -> None:
cacheWriteTokens=5,
),
)
before = target.model_dump()
add_to_model_metrics(target, ModelMetrics())
assert target.model_dump() == before
result = add_to_model_metrics(target, ModelMetrics())
assert dataclasses.asdict(result) == dataclasses.asdict(target)

def test_source_not_mutated(self) -> None:
def test_neither_operand_mutated(self) -> None:
"""add_to_model_metrics must not modify *target* or *source*."""
target = ModelMetrics(requests=RequestMetrics(count=1))
source = ModelMetrics(requests=RequestMetrics(count=5))
source_before = source.model_dump()
target_before = dataclasses.asdict(target)
source_before = dataclasses.asdict(source)
add_to_model_metrics(target, source)
assert source.model_dump() == source_before # source unchanged
assert dataclasses.asdict(target) == target_before
assert dataclasses.asdict(source) == source_before

def test_accumulates_incrementally(self) -> None:
"""Multiple sequential calls accumulate correctly."""
target = ModelMetrics()
acc = ModelMetrics()
for _ in range(3):
add_to_model_metrics(
target,
acc = add_to_model_metrics(
acc,
ModelMetrics(
requests=RequestMetrics(count=2),
usage=TokenUsage(outputTokens=10),
),
)
assert target.requests.count == 6
assert target.usage.outputTokens == 30
assert acc.requests.count == 6
assert acc.usage.outputTokens == 30


# ---------------------------------------------------------------------------
Expand All @@ -546,63 +549,34 @@ class TestCopyModelMetrics:

def test_returns_equal_value(self) -> None:
"""All fields are faithfully copied, including any future additions."""
# Build kwargs with non-default values for every field so newly-added
# fields are automatically covered by the model_dump() comparison.
req_kwargs = {
name: idx + 1 for idx, name in enumerate(RequestMetrics.model_fields)
}
usage_kwargs = {
name: (idx + 1) * 100 for idx, name in enumerate(TokenUsage.model_fields)
}
req_fields = [f.name for f in dataclasses.fields(RequestMetrics)]
usage_fields = [f.name for f in dataclasses.fields(TokenUsage)]
req_kwargs = {name: idx + 1 for idx, name in enumerate(req_fields)}
usage_kwargs = {name: (idx + 1) * 100 for idx, name in enumerate(usage_fields)}
mm = ModelMetrics(
requests=RequestMetrics(**req_kwargs),
usage=TokenUsage(**usage_kwargs),
)
result = copy_model_metrics(mm)
assert result.model_dump() == mm.model_dump()
assert dataclasses.asdict(result) == dataclasses.asdict(mm)

def test_requests_copy_is_independent(self) -> None:
"""Mutating the copy's requests must not affect the original."""
def test_copy_is_distinct_object(self) -> None:
"""The copy must be a new object, not the same reference."""
mm = ModelMetrics(requests=RequestMetrics(count=5, cost=3))
copy = copy_model_metrics(mm)
copy.requests.count = 999
copy.requests.cost = 888
assert mm.requests.count == 5
assert mm.requests.cost == 3
assert copy is not mm

def test_usage_copy_is_independent(self) -> None:
"""Mutating the copy's usage must not affect the original."""
mm = ModelMetrics(
usage=TokenUsage(
inputTokens=100,
outputTokens=50,
cacheReadTokens=20,
cacheWriteTokens=10,
),
)
copy = copy_model_metrics(mm)
copy.usage.inputTokens = 999
copy.usage.outputTokens = 888
copy.usage.cacheReadTokens = 777
copy.usage.cacheWriteTokens = 666
assert mm.usage.inputTokens == 100
assert mm.usage.outputTokens == 50
assert mm.usage.cacheReadTokens == 20
assert mm.usage.cacheWriteTokens == 10

def test_original_is_independent_of_copy_mutations(self) -> None:
"""Mutating the original must not affect the copy."""
def test_frozen_prevents_mutation(self) -> None:
"""Frozen dataclasses reject attribute assignment at runtime."""
mm = ModelMetrics(requests=RequestMetrics(count=5))
copy = copy_model_metrics(mm)
mm.requests.count = 999
assert copy.requests.count == 5
with pytest.raises(dataclasses.FrozenInstanceError):
mm.requests = RequestMetrics(count=999) # type: ignore[misc]

def test_defaults_copied_correctly(self) -> None:
"""Default (zero) values are preserved in the copy for all fields."""
mm = ModelMetrics()
copy = copy_model_metrics(mm)
# Ensure *all* default values (including any future fields) are preserved
assert copy.model_dump() == mm.model_dump()
assert dataclasses.asdict(copy) == dataclasses.asdict(mm)


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -728,19 +702,10 @@ def test_no_deep_copy_regression(self) -> None:
)
}

with (
patch.object(
ModelMetrics,
"model_copy",
side_effect=AssertionError(
"merge_model_metrics must not call model_copy"
),
),
patch(
"copy.deepcopy",
side_effect=AssertionError(
"merge_model_metrics must not call copy.deepcopy"
),
with patch(
"copy.deepcopy",
side_effect=AssertionError(
"merge_model_metrics must not call copy.deepcopy"
),
):
result = merge_model_metrics(base, additional)
Expand Down
3 changes: 2 additions & 1 deletion tests/copilot_usage/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# pyright: reportPrivateUsage=false

import dataclasses
import json
import re
import warnings
Expand Down Expand Up @@ -5720,7 +5721,7 @@ def test_identical_field_values(self) -> None:
m = merged[model_name]
a = aggregated[model_name]
# Compare full ModelMetrics contents to catch any future field additions
assert m.model_dump() == a.model_dump()
assert dataclasses.asdict(m) == dataclasses.asdict(a)


# ---------------------------------------------------------------------------
Expand Down
Loading