From 0c822a41891d4db087bfe08141ae75a15e11cd48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 13:26:47 +0300 Subject: [PATCH 1/8] feat(battery): add integration result model --- pythrust/battery/__init__.py | 3 ++- pythrust/battery/state.py | 52 +++++++++++++++++++++++++++++++++++- tests/test_battery.py | 48 ++++++++++++++++++++++++++++++++- 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/pythrust/battery/__init__.py b/pythrust/battery/__init__.py index 663d1e9..bcd525e 100644 --- a/pythrust/battery/__init__.py +++ b/pythrust/battery/__init__.py @@ -2,9 +2,10 @@ from .fixed import FixedVoltageBattery from .rate_map import RateMapBattery -from .state import BatteryPoint, BatteryState +from .state import BatteryIntegrationResult, BatteryPoint, BatteryState __all__ = [ + "BatteryIntegrationResult", "BatteryPoint", "BatteryState", "FixedVoltageBattery", diff --git a/pythrust/battery/state.py b/pythrust/battery/state.py index 778b534..9fbff32 100644 --- a/pythrust/battery/state.py +++ b/pythrust/battery/state.py @@ -4,7 +4,7 @@ from dataclasses import dataclass import math -from typing import Optional +from typing import Optional, Sequence @dataclass(frozen=True) @@ -45,3 +45,53 @@ class BatteryPoint: resistance_ohm: float is_feasible: bool infeasible_reason: Optional[str] = None + + +@dataclass(frozen=True) +class BatteryIntegrationResult: + """Battery integration time-history and summary result.""" + + final_state: BatteryState + time_s: Sequence[float] + dod: Sequence[float] + voltage_v: Sequence[float] + current_a: Sequence[float] + c_rate: Sequence[float] + power_w: Sequence[float] + efficiency: Sequence[float] + delivered_energy_wh: float + consumed_charge_ah: float + is_feasible: bool + stop_reason: str + + def __post_init__(self) -> None: + histories = { + "time_s": _as_float_tuple(self.time_s), + "dod": _as_float_tuple(self.dod), + "voltage_v": _as_float_tuple(self.voltage_v), + "current_a": _as_float_tuple(self.current_a), + "c_rate": _as_float_tuple(self.c_rate), + "power_w": _as_float_tuple(self.power_w), + "efficiency": _as_float_tuple(self.efficiency), + } + lengths = {len(values) for values in histories.values()} + if len(lengths) != 1: + raise ValueError("integration histories must have the same length") + if not histories["time_s"]: + raise ValueError("integration histories must contain at least one sample") + if not math.isfinite(self.delivered_energy_wh): + raise ValueError("delivered_energy_wh must be finite") + if not math.isfinite(self.consumed_charge_ah): + raise ValueError("consumed_charge_ah must be finite") + if not self.stop_reason: + raise ValueError("stop_reason must not be empty") + + for name, values in histories.items(): + object.__setattr__(self, name, values) + + +def _as_float_tuple(values: Sequence[float]) -> tuple[float, ...]: + result = tuple(float(value) for value in values) + if any(not math.isfinite(value) for value in result): + raise ValueError("integration histories must contain finite values") + return result diff --git a/tests/test_battery.py b/tests/test_battery.py index 51f382c..15ab00c 100644 --- a/tests/test_battery.py +++ b/tests/test_battery.py @@ -4,7 +4,12 @@ import pytest -from pythrust.battery import BatteryState, FixedVoltageBattery, RateMapBattery +from pythrust.battery import ( + BatteryIntegrationResult, + BatteryState, + FixedVoltageBattery, + RateMapBattery, +) from pythrust.propulsion.models import BatterySpec @@ -54,6 +59,47 @@ def test_battery_state_rejects_non_finite_dod(dod): BatteryState.from_dod(dod) +def test_battery_integration_result_normalizes_histories(): + final_state = BatteryState(soc=0.9) + result = BatteryIntegrationResult( + final_state=final_state, + time_s=[0, 10], + dod=[0.0, 0.1], + voltage_v=[16.0, 15.8], + current_a=[8.0, 8.0], + c_rate=[1.0, 1.0], + power_w=[128.0, 126.4], + efficiency=[0.95, 0.94], + delivered_energy_wh=0.35, + consumed_charge_ah=0.022, + is_feasible=True, + stop_reason="duration_complete", + ) + + assert result.final_state is final_state + assert result.time_s == (0.0, 10.0) + assert result.dod == (0.0, 0.1) + assert result.stop_reason == "duration_complete" + + +def test_battery_integration_result_rejects_mismatched_histories(): + with pytest.raises(ValueError, match="same length"): + BatteryIntegrationResult( + final_state=BatteryState(soc=0.9), + time_s=[0.0, 10.0], + dod=[0.0], + voltage_v=[16.0, 15.8], + current_a=[8.0, 8.0], + c_rate=[1.0, 1.0], + power_w=[128.0, 126.4], + efficiency=[0.95, 0.94], + delivered_energy_wh=0.35, + consumed_charge_ah=0.022, + is_feasible=True, + stop_reason="duration_complete", + ) + + def test_rate_map_state_at_current(rate_map_battery): state = BatteryState(soc=0.5) point = rate_map_battery.state_at_current(state=state, current_a=8.0) From dfa2003bf1680bafa856f34c78e8f9c36b3f5204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 13:41:18 +0300 Subject: [PATCH 2/8] feat(battery): add constant current integration --- pythrust/battery/rate_map.py | 213 ++++++++++++++++++++++++++++++++++- tests/test_battery.py | 116 +++++++++++++++++++ 2 files changed, 328 insertions(+), 1 deletion(-) diff --git a/pythrust/battery/rate_map.py b/pythrust/battery/rate_map.py index 09c2867..9599984 100644 --- a/pythrust/battery/rate_map.py +++ b/pythrust/battery/rate_map.py @@ -10,7 +10,7 @@ import numpy as np -from .state import BatteryPoint, BatteryState +from .state import BatteryIntegrationResult, BatteryPoint, BatteryState @dataclass(frozen=True) @@ -213,6 +213,118 @@ def step_power(self, state: BatteryState, power_w: float, dt_s: float) -> Batter raise ValueError(point.infeasible_reason or "infeasible battery power") return self.step_current(state=state, current_a=point.current_a, dt_s=dt_s) + def integrate_current( + self, + state: BatteryState, + current_a: float, + dt_s: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate battery state under constant pack current.""" + self._validate_integration_inputs(current_a, dt_s, max_step_s, "current_a") + + elapsed_s = 0.0 + current_state = state + current_point = self.state_at_current(state=current_state, current_a=current_a) + histories = self._initial_integration_histories(current_state, current_point) + delivered_energy_wh = 0.0 + consumed_charge_ah = 0.0 + + if not current_point.is_feasible: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=current_point.infeasible_reason or "infeasible_state", + ) + if dt_s == 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + if current_a == 0.0: + self._append_integration_sample(histories, dt_s, current_state, current_point) + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + + while elapsed_s < dt_s: + step_s = min(max_step_s, dt_s - elapsed_s) + time_to_dod_limit_s = self._time_to_dod_limit(current_state, current_a) + reaches_dod_limit = time_to_dod_limit_s <= step_s + if reaches_dod_limit: + step_s = time_to_dod_limit_s + if step_s <= 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="dod_limit", + ) + + next_state = self._advance_state_for_current(current_state, current_a, step_s) + next_point = self.state_at_current(state=next_state, current_a=current_a) + + if not next_point.is_feasible: + reason = next_point.infeasible_reason or "infeasible_state" + step_s, next_state, next_point = self._find_current_integration_boundary( + current_state, + current_a, + step_s, + ) + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + return self._integration_result( + final_state=next_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=reason, + ) + + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + current_state = next_state + current_point = next_point + + if reaches_dod_limit and elapsed_s < dt_s: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="dod_limit", + ) + + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + def _point_from_cell_state( self, state: BatteryState, @@ -253,3 +365,102 @@ def _point_from_cell_state( @staticmethod def _clip_dod(dod: float) -> float: return min(1.0, max(0.0, float(dod))) + + @staticmethod + def _trapezoid(left: float, right: float, width: float) -> float: + return 0.5 * (left + right) * width + + @staticmethod + def _validate_integration_inputs(load: float, dt_s: float, max_step_s: float, load_name: str) -> None: + if not math.isfinite(load) or load < 0.0: + raise ValueError(f"{load_name} must be finite and non-negative") + if not math.isfinite(dt_s) or dt_s < 0.0: + raise ValueError("dt_s must be finite and non-negative") + if not math.isfinite(max_step_s) or max_step_s <= 0.0: + raise ValueError("max_step_s must be finite and positive") + + def _advance_state_for_current(self, state: BatteryState, current_a: float, dt_s: float) -> BatteryState: + cell_current_a = current_a / self.parallel + dod_next = state.dod + cell_current_a * dt_s / self.capacity_as + return BatteryState.from_dod(self._clip_dod(dod_next)) + + def _time_to_dod_limit(self, state: BatteryState, current_a: float) -> float: + cell_current_a = current_a / self.parallel + if cell_current_a <= 0.0: + return math.inf + return max(0.0, (1.0 - state.dod) * self.capacity_as / cell_current_a) + + def _find_current_integration_boundary( + self, + state: BatteryState, + current_a: float, + step_s: float, + ) -> tuple[float, BatteryState, BatteryPoint]: + low_s = 0.0 + high_s = step_s + boundary_state = state + boundary_point = self.state_at_current(state=state, current_a=current_a) + + for _ in range(40): + mid_s = 0.5 * (low_s + high_s) + mid_state = self._advance_state_for_current(state, current_a, mid_s) + mid_point = self.state_at_current(state=mid_state, current_a=current_a) + if mid_point.is_feasible: + low_s = mid_s + boundary_state = mid_state + boundary_point = mid_point + else: + high_s = mid_s + + return low_s, boundary_state, boundary_point + + @staticmethod + def _initial_integration_histories(state: BatteryState, point: BatteryPoint) -> dict[str, list[float]]: + return { + "time_s": [0.0], + "dod": [state.dod], + "voltage_v": [point.terminal_voltage_v], + "current_a": [point.current_a], + "c_rate": [point.c_rate], + "power_w": [point.power_w], + "efficiency": [point.efficiency], + } + + @staticmethod + def _append_integration_sample( + histories: dict[str, list[float]], + time_s: float, + state: BatteryState, + point: BatteryPoint, + ) -> None: + histories["time_s"].append(time_s) + histories["dod"].append(state.dod) + histories["voltage_v"].append(point.terminal_voltage_v) + histories["current_a"].append(point.current_a) + histories["c_rate"].append(point.c_rate) + histories["power_w"].append(point.power_w) + histories["efficiency"].append(point.efficiency) + + @staticmethod + def _integration_result( + final_state: BatteryState, + histories: dict[str, list[float]], + delivered_energy_wh: float, + consumed_charge_ah: float, + is_feasible: bool, + stop_reason: str, + ) -> BatteryIntegrationResult: + return BatteryIntegrationResult( + final_state=final_state, + time_s=histories["time_s"], + dod=histories["dod"], + voltage_v=histories["voltage_v"], + current_a=histories["current_a"], + c_rate=histories["c_rate"], + power_w=histories["power_w"], + efficiency=histories["efficiency"], + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=is_feasible, + stop_reason=stop_reason, + ) diff --git a/tests/test_battery.py b/tests/test_battery.py index 15ab00c..eb39504 100644 --- a/tests/test_battery.py +++ b/tests/test_battery.py @@ -154,6 +154,122 @@ def test_rate_map_step_current(rate_map_battery): assert math.isclose(next_state.soc, 0.5) +def test_rate_map_integrates_constant_current(rate_map_battery): + result = rate_map_battery.integrate_current( + state=BatteryState(soc=1.0), + current_a=8.4, + dt_s=1800.0, + max_step_s=60.0, + ) + + expected_energy_wh = ((16.464 + 14.696) / 2.0) * 8.4 * 0.5 + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert math.isclose(result.final_state.dod, 0.5) + assert math.isclose(result.consumed_charge_ah, 4.2) + assert math.isclose(result.delivered_energy_wh, expected_energy_wh) + assert result.time_s[0] == 0.0 + assert result.time_s[-1] == 1800.0 + assert result.dod[-1] == result.final_state.dod + + +def test_rate_map_integrates_zero_current_without_changing_state(rate_map_battery): + result = rate_map_battery.integrate_current( + state=BatteryState(soc=0.8), + current_a=0.0, + dt_s=120.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.time_s == (0.0, 120.0) + assert all(math.isclose(dod, 0.2) for dod in result.dod) + assert result.consumed_charge_ah == 0.0 + assert result.delivered_energy_wh == 0.0 + + +def test_rate_map_integrates_zero_duration_without_extra_sample(rate_map_battery): + result = rate_map_battery.integrate_current( + state=BatteryState(soc=0.8), + current_a=8.0, + dt_s=0.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.time_s == (0.0,) + assert math.isclose(result.dod[0], 0.2) + + +def test_rate_map_integrates_current_until_voltage_cutoff(rate_map_battery): + result = rate_map_battery.integrate_current( + state=BatteryState(soc=1.0), + current_a=36.0, + dt_s=1200.0, + max_step_s=60.0, + ) + + assert result.is_feasible is False + assert result.stop_reason == "voltage_cutoff" + assert result.time_s[-1] < 1200.0 + assert math.isclose(result.voltage_v[-1] / rate_map_battery.series, rate_map_battery.cutoff_voltage_v) + assert result.final_state.dod < 1.0 + + +def test_rate_map_integrates_current_until_dod_limit(rate_map_battery): + result = rate_map_battery.integrate_current( + state=BatteryState(soc=1.0), + current_a=1.0, + dt_s=40000.0, + max_step_s=5000.0, + ) + + assert result.is_feasible is False + assert result.stop_reason == "dod_limit" + assert result.final_state.dod == 1.0 + assert math.isclose(result.time_s[-1], 30240.0) + assert math.isclose(result.consumed_charge_ah, 8.4) + + +def test_rate_map_integrates_current_reports_initial_infeasible_state(rate_map_battery): + result = rate_map_battery.integrate_current( + state=BatteryState(soc=0.5), + current_a=1000.0, + dt_s=60.0, + ) + + assert result.is_feasible is False + assert result.stop_reason == "current_limit" + assert result.time_s == (0.0,) + assert result.consumed_charge_ah == 0.0 + assert result.delivered_energy_wh == 0.0 + + +@pytest.mark.parametrize( + ("kwargs", "message"), + [ + ({"current_a": -1.0}, "current_a"), + ({"current_a": math.nan}, "current_a"), + ({"dt_s": -1.0}, "dt_s"), + ({"dt_s": math.inf}, "dt_s"), + ({"max_step_s": 0.0}, "max_step_s"), + ({"max_step_s": math.nan}, "max_step_s"), + ], +) +def test_rate_map_integrate_current_rejects_invalid_inputs(rate_map_battery, kwargs, message): + params = { + "state": BatteryState(soc=1.0), + "current_a": 1.0, + "dt_s": 10.0, + "max_step_s": 1.0, + } + params.update(kwargs) + + with pytest.raises(ValueError, match=message): + rate_map_battery.integrate_current(**params) + + def test_rate_map_loads_from_json(tmp_path): path = tmp_path / "battery.json" path.write_text( From ef5f4ad96e073a34ebb9ab36ed78dc2496690666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 13:49:17 +0300 Subject: [PATCH 3/8] feat(battery): add constant power integration --- pythrust/battery/rate_map.py | 138 +++++++++++++++++++++++++++++++++++ tests/test_battery.py | 115 +++++++++++++++++++++++++++++ 2 files changed, 253 insertions(+) diff --git a/pythrust/battery/rate_map.py b/pythrust/battery/rate_map.py index 9599984..a01e0cf 100644 --- a/pythrust/battery/rate_map.py +++ b/pythrust/battery/rate_map.py @@ -325,6 +325,119 @@ def integrate_current( stop_reason="duration_complete", ) + def integrate_power( + self, + state: BatteryState, + power_w: float, + dt_s: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate battery state under constant pack power draw.""" + self._validate_integration_inputs(power_w, dt_s, max_step_s, "power_w") + + elapsed_s = 0.0 + current_state = state + current_point = self.state_at_power(state=current_state, power_w=power_w) + histories = self._initial_integration_histories(current_state, current_point) + delivered_energy_wh = 0.0 + consumed_charge_ah = 0.0 + + if not current_point.is_feasible: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=current_point.infeasible_reason or "infeasible_state", + ) + if dt_s == 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + if power_w == 0.0: + self._append_integration_sample(histories, dt_s, current_state, current_point) + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + + while elapsed_s < dt_s: + step_s = min(max_step_s, dt_s - elapsed_s) + time_to_dod_limit_s = self._time_to_dod_limit(current_state, current_point.current_a) + reaches_dod_limit = time_to_dod_limit_s <= step_s + if reaches_dod_limit: + step_s = time_to_dod_limit_s + if step_s <= 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="dod_limit", + ) + + next_state = self._advance_state_for_current(current_state, current_point.current_a, step_s) + next_point = self.state_at_power(state=next_state, power_w=power_w) + + if not next_point.is_feasible: + reason = next_point.infeasible_reason or "infeasible_state" + step_s, next_state, next_point = self._find_power_integration_boundary( + current_state, + current_point.current_a, + power_w, + step_s, + ) + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + return self._integration_result( + final_state=next_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=reason, + ) + + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + current_state = next_state + current_point = next_point + + if reaches_dod_limit and elapsed_s < dt_s: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="dod_limit", + ) + + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + def _point_from_cell_state( self, state: BatteryState, @@ -414,6 +527,31 @@ def _find_current_integration_boundary( return low_s, boundary_state, boundary_point + def _find_power_integration_boundary( + self, + state: BatteryState, + current_a: float, + power_w: float, + step_s: float, + ) -> tuple[float, BatteryState, BatteryPoint]: + low_s = 0.0 + high_s = step_s + boundary_state = state + boundary_point = self.state_at_power(state=state, power_w=power_w) + + for _ in range(40): + mid_s = 0.5 * (low_s + high_s) + mid_state = self._advance_state_for_current(state, current_a, mid_s) + mid_point = self.state_at_power(state=mid_state, power_w=power_w) + if mid_point.is_feasible: + low_s = mid_s + boundary_state = mid_state + boundary_point = mid_point + else: + high_s = mid_s + + return low_s, boundary_state, boundary_point + @staticmethod def _initial_integration_histories(state: BatteryState, point: BatteryPoint) -> dict[str, list[float]]: return { diff --git a/tests/test_battery.py b/tests/test_battery.py index eb39504..3c1a73c 100644 --- a/tests/test_battery.py +++ b/tests/test_battery.py @@ -270,6 +270,121 @@ def test_rate_map_integrate_current_rejects_invalid_inputs(rate_map_battery, kwa rate_map_battery.integrate_current(**params) +def test_rate_map_integrates_constant_power(rate_map_battery): + result = rate_map_battery.integrate_power( + state=BatteryState(soc=1.0), + power_w=100.0, + dt_s=600.0, + max_step_s=10.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.final_state.dod > 0.0 + assert result.time_s[0] == 0.0 + assert result.time_s[-1] == 600.0 + assert math.isclose(result.delivered_energy_wh, 100.0 * 600.0 / 3600.0) + assert result.consumed_charge_ah > 0.0 + assert result.current_a[-1] > result.current_a[0] + assert result.voltage_v[-1] < result.voltage_v[0] + + +def test_rate_map_integrates_zero_power_without_changing_state(rate_map_battery): + result = rate_map_battery.integrate_power( + state=BatteryState(soc=0.8), + power_w=0.0, + dt_s=120.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.time_s == (0.0, 120.0) + assert all(math.isclose(dod, 0.2) for dod in result.dod) + assert result.consumed_charge_ah == 0.0 + assert result.delivered_energy_wh == 0.0 + + +def test_rate_map_integrates_zero_power_duration_without_extra_sample(rate_map_battery): + result = rate_map_battery.integrate_power( + state=BatteryState(soc=0.8), + power_w=100.0, + dt_s=0.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.time_s == (0.0,) + assert math.isclose(result.dod[0], 0.2) + + +def test_rate_map_integrates_power_until_voltage_cutoff(rate_map_battery): + result = rate_map_battery.integrate_power( + state=BatteryState(soc=1.0), + power_w=300.0, + dt_s=20000.0, + max_step_s=20.0, + ) + + assert result.is_feasible is False + assert result.stop_reason == "voltage_cutoff" + assert result.time_s[-1] < 20000.0 + assert math.isclose(result.voltage_v[-1] / rate_map_battery.series, rate_map_battery.cutoff_voltage_v) + assert math.isclose(result.delivered_energy_wh, 300.0 * result.time_s[-1] / 3600.0) + + +def test_rate_map_integrates_power_until_dod_limit(rate_map_battery): + result = rate_map_battery.integrate_power( + state=BatteryState(soc=1.0), + power_w=120.0, + dt_s=20000.0, + max_step_s=20.0, + ) + + assert result.is_feasible is False + assert result.stop_reason == "dod_limit" + assert result.final_state.dod == 1.0 + assert result.voltage_v[-1] / rate_map_battery.series > rate_map_battery.cutoff_voltage_v + assert math.isclose(result.delivered_energy_wh, 120.0 * result.time_s[-1] / 3600.0) + + +def test_rate_map_integrates_power_reports_initial_power_limit(rate_map_battery): + result = rate_map_battery.integrate_power( + state=BatteryState(soc=0.5), + power_w=2000.0, + dt_s=60.0, + ) + + assert result.is_feasible is False + assert result.stop_reason == "power_limit" + assert result.time_s == (0.0,) + assert result.consumed_charge_ah == 0.0 + assert result.delivered_energy_wh == 0.0 + + +@pytest.mark.parametrize( + ("kwargs", "message"), + [ + ({"power_w": -1.0}, "power_w"), + ({"power_w": math.nan}, "power_w"), + ({"dt_s": -1.0}, "dt_s"), + ({"dt_s": math.inf}, "dt_s"), + ({"max_step_s": 0.0}, "max_step_s"), + ({"max_step_s": math.nan}, "max_step_s"), + ], +) +def test_rate_map_integrate_power_rejects_invalid_inputs(rate_map_battery, kwargs, message): + params = { + "state": BatteryState(soc=1.0), + "power_w": 100.0, + "dt_s": 10.0, + "max_step_s": 1.0, + } + params.update(kwargs) + + with pytest.raises(ValueError, match=message): + rate_map_battery.integrate_power(**params) + + def test_rate_map_loads_from_json(tmp_path): path = tmp_path / "battery.json" path.write_text( From be39942dba4f526d544f29a88abf8fb89fce8d6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 13:55:43 +0300 Subject: [PATCH 4/8] feat(battery): add energy knockdown factors --- pythrust/battery/rate_map.py | 60 ++++++++++++++++++++++++++++++++++++ tests/test_battery.py | 57 ++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) diff --git a/pythrust/battery/rate_map.py b/pythrust/battery/rate_map.py index a01e0cf..315304e 100644 --- a/pythrust/battery/rate_map.py +++ b/pythrust/battery/rate_map.py @@ -438,6 +438,40 @@ def integrate_power( stop_reason="duration_complete", ) + def energy_knockdown_dod( + self, + dod_initial: float, + dod_final: float, + *, + samples: int = 1001, + ) -> float: + """Return reversible energy fraction available over a DOD interval.""" + self._validate_dod_interval(dod_initial, dod_final) + if samples < 2: + raise ValueError("samples must be at least 2") + + total_energy_wh = self._reversible_energy_wh(0.0, 1.0, samples=samples) + interval_energy_wh = self._reversible_energy_wh(dod_initial, dod_final, samples=samples) + return interval_energy_wh / total_energy_wh + + def energy_knockdown_c_rate( + self, + c_rate: float, + *, + max_step_s: float = 1.0, + ) -> float: + """Return usable-energy fraction at C-rate relative to reversible energy.""" + if not math.isfinite(c_rate) or c_rate < 0.0: + raise ValueError("c_rate must be finite and non-negative") + if not math.isfinite(max_step_s) or max_step_s <= 0.0: + raise ValueError("max_step_s must be finite and positive") + if c_rate == 0.0: + return 1.0 + + energy_wh = self._usable_energy_at_c_rate(c_rate, max_step_s=max_step_s) + reversible_energy_wh = self._reversible_energy_wh(0.0, 1.0, samples=1001) + return energy_wh / reversible_energy_wh + def _point_from_cell_state( self, state: BatteryState, @@ -483,6 +517,23 @@ def _clip_dod(dod: float) -> float: def _trapezoid(left: float, right: float, width: float) -> float: return 0.5 * (left + right) * width + def _reversible_energy_wh(self, dod_initial: float, dod_final: float, *, samples: int) -> float: + dod_values = np.linspace(dod_initial, dod_final, samples) + ocv_values = np.array([self.ocv(float(dod)) for dod in dod_values]) + cell_energy_wh = float(np.trapezoid(ocv_values, dod_values)) * self.capacity_ah + return cell_energy_wh * self.series * self.parallel + + def _usable_energy_at_c_rate(self, c_rate: float, *, max_step_s: float) -> float: + current_a = c_rate * self.rated_current_a * self.parallel + duration_s = self.capacity_as / (self.rated_current_a * c_rate) if c_rate > 0.0 else 0.0 + result = self.integrate_current( + state=BatteryState(soc=1.0), + current_a=current_a, + dt_s=duration_s, + max_step_s=max_step_s, + ) + return result.delivered_energy_wh + @staticmethod def _validate_integration_inputs(load: float, dt_s: float, max_step_s: float, load_name: str) -> None: if not math.isfinite(load) or load < 0.0: @@ -492,6 +543,15 @@ def _validate_integration_inputs(load: float, dt_s: float, max_step_s: float, lo if not math.isfinite(max_step_s) or max_step_s <= 0.0: raise ValueError("max_step_s must be finite and positive") + @staticmethod + def _validate_dod_interval(dod_initial: float, dod_final: float) -> None: + if not math.isfinite(dod_initial) or not math.isfinite(dod_final): + raise ValueError("dod values must be finite") + if dod_initial < 0.0 or dod_initial > 1.0 or dod_final < 0.0 or dod_final > 1.0: + raise ValueError("dod values must be between 0 and 1") + if dod_final <= dod_initial: + raise ValueError("dod_final must be greater than dod_initial") + def _advance_state_for_current(self, state: BatteryState, current_a: float, dt_s: float) -> BatteryState: cell_current_a = current_a / self.parallel dod_next = state.dod + cell_current_a * dt_s / self.capacity_as diff --git a/tests/test_battery.py b/tests/test_battery.py index 3c1a73c..16fb54d 100644 --- a/tests/test_battery.py +++ b/tests/test_battery.py @@ -385,6 +385,63 @@ def test_rate_map_integrate_power_rejects_invalid_inputs(rate_map_battery, kwarg rate_map_battery.integrate_power(**params) +def test_rate_map_dod_energy_knockdown(rate_map_battery): + full = rate_map_battery.energy_knockdown_dod(0.0, 1.0) + first_half = rate_map_battery.energy_knockdown_dod(0.0, 0.5) + + assert math.isclose(full, 1.0) + assert math.isclose(first_half, 2.0 / 3.75) + + +@pytest.mark.parametrize( + ("dod_initial", "dod_final"), + [ + (-0.1, 0.5), + (0.5, 1.1), + (0.5, 0.5), + (0.8, 0.2), + (math.nan, 0.5), + ], +) +def test_rate_map_dod_energy_knockdown_rejects_invalid_inputs( + rate_map_battery, + dod_initial, + dod_final, +): + with pytest.raises(ValueError, match="dod"): + rate_map_battery.energy_knockdown_dod(dod_initial, dod_final) + + +def test_rate_map_c_rate_energy_knockdown(rate_map_battery): + zero_c = rate_map_battery.energy_knockdown_c_rate(0.0) + half_c = rate_map_battery.energy_knockdown_c_rate(0.5, max_step_s=60.0) + one_c = rate_map_battery.energy_knockdown_c_rate(1.0, max_step_s=60.0) + + assert zero_c == 1.0 + assert 0.0 < one_c < half_c < zero_c + + +@pytest.mark.parametrize( + ("kwargs", "message"), + [ + ({"c_rate": -1.0}, "c_rate"), + ({"c_rate": math.nan}, "c_rate"), + ({"max_step_s": 0.0}, "max_step_s"), + ({"max_step_s": math.inf}, "max_step_s"), + ], +) +def test_rate_map_c_rate_energy_knockdown_rejects_invalid_inputs( + rate_map_battery, + kwargs, + message, +): + params = {"c_rate": 1.0, "max_step_s": 60.0} + params.update(kwargs) + + with pytest.raises(ValueError, match=message): + rate_map_battery.energy_knockdown_c_rate(**params) + + def test_rate_map_loads_from_json(tmp_path): path = tmp_path / "battery.json" path.write_text( From 829fb7a4df26273761409e79d12351763ad4f53c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 14:36:44 +0300 Subject: [PATCH 5/8] feat(battery): add bat-perf integration modes --- pythrust/battery/rate_map.py | 600 ++++++++++++++++++++++++++++++++++- tests/test_battery.py | 203 ++++++++++++ 2 files changed, 802 insertions(+), 1 deletion(-) diff --git a/pythrust/battery/rate_map.py b/pythrust/battery/rate_map.py index 315304e..152cc85 100644 --- a/pythrust/battery/rate_map.py +++ b/pythrust/battery/rate_map.py @@ -6,7 +6,7 @@ import json import math from pathlib import Path -from typing import Sequence +from typing import Callable, Sequence import numpy as np @@ -198,6 +198,17 @@ def state_at_load_resistance(self, state: BatteryState, resistance_ohm: float) - resistance_ohm=cell_internal_ohm, ) + def state_at_power_loss(self, state: BatteryState, power_loss_w: float) -> BatteryPoint: + """Evaluate battery point state for a specified pack internal power loss.""" + if not math.isfinite(power_loss_w) or power_loss_w < 0.0: + raise ValueError("power_loss_w must be finite and non-negative") + + dod = state.dod + resistance_ohm = self.resistance(dod) + cell_power_loss_w = power_loss_w / (self.series * self.parallel) + cell_current_a = math.sqrt(cell_power_loss_w / resistance_ohm) if cell_power_loss_w > 0.0 else 0.0 + return self.state_at_current(state=state, current_a=cell_current_a * self.parallel) + def step_current(self, state: BatteryState, current_a: float, dt_s: float) -> BatteryState: """Advance state using constant pack current over a time step.""" if dt_s < 0.0: @@ -325,6 +336,24 @@ def integrate_current( stop_reason="duration_complete", ) + def integrate_c_rate( + self, + state: BatteryState, + c_rate: float, + dt_s: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate battery state under constant cell C-rate.""" + if not math.isfinite(c_rate) or c_rate < 0.0: + raise ValueError("c_rate must be finite and non-negative") + return self.integrate_current( + state=state, + current_a=c_rate * self.rated_current_a * self.parallel, + dt_s=dt_s, + max_step_s=max_step_s, + ) + def integrate_power( self, state: BatteryState, @@ -438,6 +467,247 @@ def integrate_power( stop_reason="duration_complete", ) + def integrate_voltage( + self, + state: BatteryState, + voltage_v: float, + dt_s: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate battery state under constant pack terminal voltage.""" + self._validate_integration_inputs(voltage_v, dt_s, max_step_s, "voltage_v") + return self._integrate_with_point_function( + state=state, + dt_s=dt_s, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_voltage(current_state, voltage_v), + ) + + def integrate_load_resistance( + self, + state: BatteryState, + resistance_ohm: float, + dt_s: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate battery state under constant pack load resistance.""" + if not math.isfinite(resistance_ohm) or resistance_ohm <= 0.0: + raise ValueError("resistance_ohm must be finite and positive") + self._validate_integration_inputs(0.0, dt_s, max_step_s, "load") + return self._integrate_with_point_function( + state=state, + dt_s=dt_s, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_load_resistance(current_state, resistance_ohm), + ) + + def integrate_power_loss( + self, + state: BatteryState, + power_loss_w: float, + dt_s: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate battery state under constant pack internal power loss.""" + self._validate_integration_inputs(power_loss_w, dt_s, max_step_s, "power_loss_w") + return self._integrate_with_point_function( + state=state, + dt_s=dt_s, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_power_loss(current_state, power_loss_w), + ) + + def integrate_current_to_dod( + self, + state: BatteryState, + current_a: float, + dod_final: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate constant current until a target DOD is reached.""" + self._validate_integration_inputs(current_a, 0.0, max_step_s, "current_a") + return self._integrate_to_dod( + state=state, + dod_final=dod_final, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_current(current_state, current_a), + ) + + def integrate_c_rate_to_dod( + self, + state: BatteryState, + c_rate: float, + dod_final: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate constant cell C-rate until a target DOD is reached.""" + if not math.isfinite(c_rate) or c_rate < 0.0: + raise ValueError("c_rate must be finite and non-negative") + return self.integrate_current_to_dod( + state=state, + current_a=c_rate * self.rated_current_a * self.parallel, + dod_final=dod_final, + max_step_s=max_step_s, + ) + + def integrate_power_to_dod( + self, + state: BatteryState, + power_w: float, + dod_final: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate constant pack power until a target DOD is reached.""" + self._validate_integration_inputs(power_w, 0.0, max_step_s, "power_w") + return self._integrate_to_dod( + state=state, + dod_final=dod_final, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_power(current_state, power_w), + ) + + def integrate_voltage_to_dod( + self, + state: BatteryState, + voltage_v: float, + dod_final: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate constant pack terminal voltage until a target DOD is reached.""" + self._validate_integration_inputs(voltage_v, 0.0, max_step_s, "voltage_v") + return self._integrate_to_dod( + state=state, + dod_final=dod_final, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_voltage(current_state, voltage_v), + ) + + def integrate_load_resistance_to_dod( + self, + state: BatteryState, + resistance_ohm: float, + dod_final: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate constant pack load resistance until a target DOD is reached.""" + if not math.isfinite(resistance_ohm) or resistance_ohm <= 0.0: + raise ValueError("resistance_ohm must be finite and positive") + self._validate_integration_inputs(0.0, 0.0, max_step_s, "load") + return self._integrate_to_dod( + state=state, + dod_final=dod_final, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_load_resistance(current_state, resistance_ohm), + ) + + def integrate_power_loss_to_dod( + self, + state: BatteryState, + power_loss_w: float, + dod_final: float, + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate constant pack internal power loss until a target DOD is reached.""" + self._validate_integration_inputs(power_loss_w, 0.0, max_step_s, "power_loss_w") + return self._integrate_to_dod( + state=state, + dod_final=dod_final, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_power_loss(current_state, power_loss_w), + ) + + def dod_at_voltage_power(self, voltage_v: float, power_w: float) -> float: + """Solve DOD where constant-voltage and requested-power states coincide.""" + self._validate_integration_inputs(voltage_v, 0.0, 1.0, "voltage_v") + self._validate_integration_inputs(power_w, 0.0, 1.0, "power_w") + return self._solve_dod( + lambda dod: self.state_at_voltage(BatteryState.from_dod(dod), voltage_v).power_w - power_w + ) + + def dod_at_power_voltage(self, power_w: float, voltage_v: float) -> float: + """Solve DOD where constant-power and requested-voltage states coincide.""" + self._validate_integration_inputs(power_w, 0.0, 1.0, "power_w") + self._validate_integration_inputs(voltage_v, 0.0, 1.0, "voltage_v") + return self._solve_dod( + lambda dod: self.state_at_power(BatteryState.from_dod(dod), power_w).terminal_voltage_v - voltage_v + ) + + def integrate_power_profile( + self, + state: BatteryState, + power_w: Sequence[float], + durations_s: Sequence[float], + *, + max_step_s: float = 1.0, + ) -> BatteryIntegrationResult: + """Integrate consecutive constant-power segments into one time history.""" + if len(power_w) != len(durations_s): + raise ValueError("power_w and durations_s must have the same length") + if len(power_w) == 0: + point = self.state_at_power(state=state, power_w=0.0) + return self._integration_result( + final_state=state, + histories=self._initial_integration_histories(state, point), + delivered_energy_wh=0.0, + consumed_charge_ah=0.0, + is_feasible=True, + stop_reason="duration_complete", + ) + if not math.isfinite(max_step_s) or max_step_s <= 0.0: + raise ValueError("max_step_s must be finite and positive") + + histories: dict[str, list[float]] | None = None + current_state = state + elapsed_s = 0.0 + delivered_energy_wh = 0.0 + consumed_charge_ah = 0.0 + + for power, duration_s in zip(power_w, durations_s): + result = self.integrate_power( + state=current_state, + power_w=power, + dt_s=duration_s, + max_step_s=max_step_s, + ) + if histories is None: + histories = {name: list(values) for name, values in self._histories_from_result(result).items()} + else: + self._extend_profile_histories(histories, result, elapsed_s) + + elapsed_s = histories["time_s"][-1] + delivered_energy_wh += result.delivered_energy_wh + consumed_charge_ah += result.consumed_charge_ah + current_state = result.final_state + if not result.is_feasible: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=result.stop_reason, + ) + + if histories is None: + raise RuntimeError("integration profile did not produce histories") + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + def energy_knockdown_dod( self, dod_initial: float, @@ -534,6 +804,283 @@ def _usable_energy_at_c_rate(self, c_rate: float, *, max_step_s: float) -> float ) return result.delivered_energy_wh + def _integrate_with_point_function( + self, + state: BatteryState, + dt_s: float, + max_step_s: float, + point_function: Callable[[BatteryState], BatteryPoint], + ) -> BatteryIntegrationResult: + elapsed_s = 0.0 + current_state = state + current_point = point_function(current_state) + histories = self._initial_integration_histories(current_state, current_point) + delivered_energy_wh = 0.0 + consumed_charge_ah = 0.0 + + if not current_point.is_feasible: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=current_point.infeasible_reason or "infeasible_state", + ) + if dt_s == 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + if current_point.current_a == 0.0: + self._append_integration_sample(histories, dt_s, current_state, current_point) + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + + while elapsed_s < dt_s: + step_s = min(max_step_s, dt_s - elapsed_s) + time_to_dod_limit_s = self._time_to_dod_limit(current_state, current_point.current_a) + reaches_dod_limit = time_to_dod_limit_s <= step_s + if reaches_dod_limit: + step_s = time_to_dod_limit_s + if step_s <= 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="dod_limit", + ) + + next_state = self._advance_state_for_current(current_state, current_point.current_a, step_s) + next_point = point_function(next_state) + + if not next_point.is_feasible: + reason = next_point.infeasible_reason or "infeasible_state" + step_s, next_state, next_point = self._find_integration_boundary( + current_state, + current_point.current_a, + step_s, + point_function, + ) + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + return self._integration_result( + final_state=next_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=reason, + ) + + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + current_state = next_state + current_point = next_point + + if reaches_dod_limit and elapsed_s < dt_s: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="dod_limit", + ) + + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="duration_complete", + ) + + def _integrate_to_dod( + self, + state: BatteryState, + dod_final: float, + max_step_s: float, + point_function: Callable[[BatteryState], BatteryPoint], + ) -> BatteryIntegrationResult: + if not math.isfinite(dod_final) or dod_final < 0.0 or dod_final > 1.0: + raise ValueError("dod_final must be finite and between 0 and 1") + if dod_final <= state.dod: + raise ValueError("dod_final must be greater than the initial DOD") + + elapsed_s = 0.0 + current_state = state + current_point = point_function(current_state) + histories = self._initial_integration_histories(current_state, current_point) + delivered_energy_wh = 0.0 + consumed_charge_ah = 0.0 + + if not current_point.is_feasible: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=current_point.infeasible_reason or "infeasible_state", + ) + if current_point.current_a <= 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="zero_discharge_current", + ) + + while current_state.dod < dod_final: + cell_current_a = current_point.current_a / self.parallel + if cell_current_a <= 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="zero_discharge_current", + ) + + time_to_target_s = (dod_final - current_state.dod) * self.capacity_as / cell_current_a + step_s = min(max_step_s, time_to_target_s) + if step_s <= 0.0: + final_state = BatteryState.from_dod(dod_final) + final_point = point_function(final_state) + self._append_integration_sample(histories, elapsed_s, final_state, final_point) + return self._integration_result( + final_state=final_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="dod_target", + ) + + reaches_target = time_to_target_s <= max_step_s + next_state = BatteryState.from_dod(dod_final) if reaches_target else self._advance_state_for_current( + current_state, + current_point.current_a, + step_s, + ) + next_point = point_function(next_state) + + if not next_point.is_feasible: + reason = next_point.infeasible_reason or "infeasible_state" + step_s, next_state, next_point = self._find_integration_boundary( + current_state, + current_point.current_a, + step_s, + point_function, + ) + delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 + consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 + elapsed_s += step_s + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + return self._integration_result( + final_state=next_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason=reason, + ) + + dod_width = next_state.dod - current_state.dod + next_cell_current_a = next_point.current_a / self.parallel + if next_cell_current_a <= 0.0: + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=False, + stop_reason="zero_discharge_current", + ) + + elapsed_s += self._trapezoid( + self.capacity_as / cell_current_a, + self.capacity_as / next_cell_current_a, + dod_width, + ) + delivered_energy_wh += ( + self._trapezoid(current_point.terminal_voltage_v, next_point.terminal_voltage_v, dod_width) + * self.capacity_ah + * self.parallel + ) + consumed_charge_ah += self.capacity_ah * self.parallel * dod_width + self._append_integration_sample(histories, elapsed_s, next_state, next_point) + + if reaches_target: + return self._integration_result( + final_state=next_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="dod_target", + ) + + current_state = next_state + current_point = next_point + + return self._integration_result( + final_state=current_state, + histories=histories, + delivered_energy_wh=delivered_energy_wh, + consumed_charge_ah=consumed_charge_ah, + is_feasible=True, + stop_reason="dod_target", + ) + + def _solve_dod(self, residual: Callable[[float], float]) -> float: + low = 0.0 + high = 1.0 + low_value = residual(low) + high_value = residual(high) + if not math.isfinite(low_value) or not math.isfinite(high_value): + raise ValueError("DOD solve residual must be finite at interval bounds") + if low_value == 0.0: + return low + if high_value == 0.0: + return high + if low_value * high_value > 0.0: + raise ValueError("voltage and power do not bracket a DOD solution") + + for _ in range(80): + mid = 0.5 * (low + high) + mid_value = residual(mid) + if mid_value == 0.0: + return mid + if low_value * mid_value <= 0.0: + high = mid + else: + low = mid + low_value = mid_value + + return 0.5 * (low + high) + @staticmethod def _validate_integration_inputs(load: float, dt_s: float, max_step_s: float, load_name: str) -> None: if not math.isfinite(load) or load < 0.0: @@ -612,6 +1159,31 @@ def _find_power_integration_boundary( return low_s, boundary_state, boundary_point + def _find_integration_boundary( + self, + state: BatteryState, + current_a: float, + step_s: float, + point_function: Callable[[BatteryState], BatteryPoint], + ) -> tuple[float, BatteryState, BatteryPoint]: + low_s = 0.0 + high_s = step_s + boundary_state = state + boundary_point = point_function(state) + + for _ in range(40): + mid_s = 0.5 * (low_s + high_s) + mid_state = self._advance_state_for_current(state, current_a, mid_s) + mid_point = point_function(mid_state) + if mid_point.is_feasible: + low_s = mid_s + boundary_state = mid_state + boundary_point = mid_point + else: + high_s = mid_s + + return low_s, boundary_state, boundary_point + @staticmethod def _initial_integration_histories(state: BatteryState, point: BatteryPoint) -> dict[str, list[float]]: return { @@ -662,3 +1234,29 @@ def _integration_result( is_feasible=is_feasible, stop_reason=stop_reason, ) + + @staticmethod + def _histories_from_result(result: BatteryIntegrationResult) -> dict[str, Sequence[float]]: + return { + "time_s": result.time_s, + "dod": result.dod, + "voltage_v": result.voltage_v, + "current_a": result.current_a, + "c_rate": result.c_rate, + "power_w": result.power_w, + "efficiency": result.efficiency, + } + + @staticmethod + def _extend_profile_histories( + histories: dict[str, list[float]], + result: BatteryIntegrationResult, + time_offset_s: float, + ) -> None: + histories["time_s"].extend(time_offset_s + value for value in result.time_s[1:]) + histories["dod"].extend(result.dod[1:]) + histories["voltage_v"].extend(result.voltage_v[1:]) + histories["current_a"].extend(result.current_a[1:]) + histories["c_rate"].extend(result.c_rate[1:]) + histories["power_w"].extend(result.power_w[1:]) + histories["efficiency"].extend(result.efficiency[1:]) diff --git a/tests/test_battery.py b/tests/test_battery.py index 16fb54d..f6c779a 100644 --- a/tests/test_battery.py +++ b/tests/test_battery.py @@ -128,6 +128,27 @@ def test_rate_map_state_at_power(rate_map_battery): assert math.isclose(point.power_w, 100.0, rel_tol=1e-12) +def test_rate_map_state_at_power_loss(rate_map_battery): + state = BatteryState(soc=0.5) + point = rate_map_battery.state_at_power_loss(state=state, power_loss_w=7.68) + + internal_loss_w = ( + point.cell_current_a**2 + * point.resistance_ohm + * rate_map_battery.series + * rate_map_battery.parallel + ) + + assert point.is_feasible is True + assert math.isclose(internal_loss_w, 7.68) + + +@pytest.mark.parametrize("power_loss_w", [-1.0, math.nan]) +def test_rate_map_state_at_power_loss_rejects_invalid_inputs(rate_map_battery, power_loss_w): + with pytest.raises(ValueError, match="power_loss_w"): + rate_map_battery.state_at_power_loss(state=BatteryState(soc=0.5), power_loss_w=power_loss_w) + + def test_rate_map_reports_infeasible_power(rate_map_battery): state = BatteryState(soc=0.5) point = rate_map_battery.state_at_power(state=state, power_w=2000.0) @@ -174,6 +195,25 @@ def test_rate_map_integrates_constant_current(rate_map_battery): assert result.dod[-1] == result.final_state.dod +def test_rate_map_integrates_constant_c_rate_like_current(rate_map_battery): + c_rate = rate_map_battery.integrate_c_rate( + state=BatteryState(soc=1.0), + c_rate=1.0, + dt_s=600.0, + max_step_s=60.0, + ) + current = rate_map_battery.integrate_current( + state=BatteryState(soc=1.0), + current_a=rate_map_battery.rated_current_a * rate_map_battery.parallel, + dt_s=600.0, + max_step_s=60.0, + ) + + assert c_rate.final_state == current.final_state + assert c_rate.delivered_energy_wh == current.delivered_energy_wh + assert c_rate.consumed_charge_ah == current.consumed_charge_ah + + def test_rate_map_integrates_zero_current_without_changing_state(rate_map_battery): result = rate_map_battery.integrate_current( state=BatteryState(soc=0.8), @@ -385,6 +425,169 @@ def test_rate_map_integrate_power_rejects_invalid_inputs(rate_map_battery, kwarg rate_map_battery.integrate_power(**params) +def test_rate_map_integrates_constant_voltage(rate_map_battery): + result = rate_map_battery.integrate_voltage( + state=BatteryState(soc=0.5), + voltage_v=14.8, + dt_s=120.0, + max_step_s=10.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.final_state.dod > 0.5 + assert all(math.isclose(voltage, 14.8) for voltage in result.voltage_v) + + +def test_rate_map_integrates_constant_load_resistance(rate_map_battery): + result = rate_map_battery.integrate_load_resistance( + state=BatteryState(soc=1.0), + resistance_ohm=1.5, + dt_s=120.0, + max_step_s=10.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.final_state.dod > 0.0 + assert result.current_a[-1] > 0.0 + assert math.isclose(result.power_w[-1], result.voltage_v[-1] * result.current_a[-1]) + + +def test_rate_map_integrates_constant_power_loss(rate_map_battery): + result = rate_map_battery.integrate_power_loss( + state=BatteryState(soc=1.0), + power_loss_w=4.0, + dt_s=120.0, + max_step_s=10.0, + ) + + point_loss_w = ( + (result.current_a[-1] / rate_map_battery.parallel) ** 2 + * rate_map_battery.resistance(result.final_state.dod) + * rate_map_battery.series + * rate_map_battery.parallel + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert result.final_state.dod > 0.0 + assert math.isclose(point_loss_w, 4.0) + + +@pytest.mark.parametrize( + ("method_name", "kwargs", "message"), + [ + ("integrate_voltage", {"voltage_v": -1.0, "dt_s": 10.0}, "voltage_v"), + ("integrate_load_resistance", {"resistance_ohm": 0.0, "dt_s": 10.0}, "resistance_ohm"), + ("integrate_power_loss", {"power_loss_w": math.nan, "dt_s": 10.0}, "power_loss_w"), + ], +) +def test_rate_map_extended_integrators_reject_invalid_inputs( + rate_map_battery, + method_name, + kwargs, + message, +): + method = getattr(rate_map_battery, method_name) + + with pytest.raises(ValueError, match=message): + method(state=BatteryState(soc=1.0), **kwargs) + + +def test_rate_map_integrates_current_to_target_dod(rate_map_battery): + result = rate_map_battery.integrate_current_to_dod( + state=BatteryState(soc=1.0), + current_a=8.4, + dod_final=0.5, + max_step_s=60.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "dod_target" + assert math.isclose(result.final_state.dod, 0.5) + assert math.isclose(result.time_s[-1], 1800.0) + assert math.isclose(result.consumed_charge_ah, 4.2) + + +def test_rate_map_integrates_power_to_target_dod(rate_map_battery): + result = rate_map_battery.integrate_power_to_dod( + state=BatteryState(soc=1.0), + power_w=100.0, + dod_final=0.3, + max_step_s=10.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "dod_target" + assert math.isclose(result.final_state.dod, 0.3, abs_tol=1e-10) + assert math.isclose(result.delivered_energy_wh, 100.0 * result.time_s[-1] / 3600.0) + + +@pytest.mark.parametrize("dod_final", [0.0, -0.1, 1.1, math.nan]) +def test_rate_map_integrate_to_dod_rejects_invalid_target(rate_map_battery, dod_final): + with pytest.raises(ValueError, match="dod_final"): + rate_map_battery.integrate_current_to_dod( + state=BatteryState(soc=1.0), + current_a=8.4, + dod_final=dod_final, + ) + + +def test_rate_map_solves_dod_at_voltage_power(rate_map_battery): + state = BatteryState.from_dod(0.5) + point = rate_map_battery.state_at_power(state=state, power_w=100.0) + + dod_from_voltage = rate_map_battery.dod_at_voltage_power( + voltage_v=point.terminal_voltage_v, + power_w=point.power_w, + ) + dod_from_power = rate_map_battery.dod_at_power_voltage( + power_w=point.power_w, + voltage_v=point.terminal_voltage_v, + ) + + assert math.isclose(dod_from_voltage, 0.5, abs_tol=1e-10) + assert math.isclose(dod_from_power, 0.5, abs_tol=1e-10) + + +def test_rate_map_integrates_power_profile(rate_map_battery): + result = rate_map_battery.integrate_power_profile( + state=BatteryState(soc=1.0), + power_w=[100.0, 60.0], + durations_s=[120.0, 180.0], + max_step_s=10.0, + ) + first = rate_map_battery.integrate_power( + state=BatteryState(soc=1.0), + power_w=100.0, + dt_s=120.0, + max_step_s=10.0, + ) + second = rate_map_battery.integrate_power( + state=first.final_state, + power_w=60.0, + dt_s=180.0, + max_step_s=10.0, + ) + + assert result.is_feasible is True + assert result.stop_reason == "duration_complete" + assert math.isclose(result.time_s[-1], 300.0) + assert result.final_state == second.final_state + assert math.isclose(result.delivered_energy_wh, first.delivered_energy_wh + second.delivered_energy_wh) + assert math.isclose(result.consumed_charge_ah, first.consumed_charge_ah + second.consumed_charge_ah) + + +def test_rate_map_integrate_power_profile_rejects_mismatched_segments(rate_map_battery): + with pytest.raises(ValueError, match="same length"): + rate_map_battery.integrate_power_profile( + state=BatteryState(soc=1.0), + power_w=[100.0, 60.0], + durations_s=[120.0], + ) + + def test_rate_map_dod_energy_knockdown(rate_map_battery): full = rate_map_battery.energy_knockdown_dod(0.0, 1.0) first_half = rate_map_battery.energy_knockdown_dod(0.0, 0.5) From 512884fcd4e2b260fb6f44cae8723c14f63fef20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 14:55:07 +0300 Subject: [PATCH 6/8] feat(battery): use adaptive rate-map integration --- docs/api_reference.md | 32 ++ docs/battery_model.md | 89 +++- docs/examples.md | 7 +- examples/rate_map_battery_mission.py | 7 +- examples/rate_map_battery_point_states.py | 5 +- pythrust/battery/rate_map.py | 576 +++++++--------------- 6 files changed, 304 insertions(+), 412 deletions(-) diff --git a/docs/api_reference.md b/docs/api_reference.md index e83b6d8..4869629 100644 --- a/docs/api_reference.md +++ b/docs/api_reference.md @@ -36,6 +36,38 @@ or `RateMapBattery` directly. `RateMapBattery.from_json(path, series=..., parallel=...)` loads one cell dataset and applies the requested pack topology at runtime. +Main `RateMapBattery` point-state helpers: + +| Method | Description | +|---|---| +| `state_at_current(state, current_a)` | Evaluate voltage, power, C-rate, efficiency, and feasibility at pack current | +| `state_at_c_rate(state, c_rate)` | Evaluate the state at cell C-rate | +| `state_at_voltage(state, voltage_v)` | Evaluate current required to hold pack terminal voltage | +| `state_at_power(state, power_w)` | Evaluate current and voltage at pack terminal power | +| `state_at_load_resistance(state, resistance_ohm)` | Evaluate a resistive load | +| `state_at_power_loss(state, power_loss_w)` | Evaluate a requested pack internal loss power | + +Main `RateMapBattery` integration helpers: + +| Method | Description | +|---|---| +| `integrate_current(...)` | Integrate over time at constant pack current | +| `integrate_c_rate(...)` | Integrate over time at constant cell C-rate | +| `integrate_power(...)` | Integrate over time at constant pack terminal power | +| `integrate_voltage(...)` | Integrate over time at constant pack terminal voltage | +| `integrate_load_resistance(...)` | Integrate over time at constant pack load resistance | +| `integrate_power_loss(...)` | Integrate over time at constant pack internal loss power | +| `integrate_current_to_dod(...)` | Integrate constant current until target DOD | +| `integrate_c_rate_to_dod(...)` | Integrate constant C-rate until target DOD | +| `integrate_power_to_dod(...)` | Integrate constant power until target DOD | +| `integrate_voltage_to_dod(...)` | Integrate constant voltage until target DOD | +| `integrate_load_resistance_to_dod(...)` | Integrate constant load resistance until target DOD | +| `integrate_power_loss_to_dod(...)` | Integrate constant internal loss power until target DOD | +| `integrate_power_profile(...)` | Integrate consecutive constant-power mission segments | + +`BatteryIntegrationResult` reports final state, sampled histories, delivered +energy, consumed charge, feasibility, and stop reason. + ## System and Propeller Specs | Class | Purpose | diff --git a/docs/battery_model.md b/docs/battery_model.md index 1a6ca3c..372ad25 100644 --- a/docs/battery_model.md +++ b/docs/battery_model.md @@ -1,7 +1,5 @@ # Battery Model -Status: implemented model note for issue #3. - PyThrust supports a fixed-voltage battery model and a lightweight rate-map battery model. The fixed-voltage path is useful for quick propulsion sizing, but it hides two effects that matter for electric aircraft performance studies: @@ -104,6 +102,7 @@ The most important point-state functions are: | Specified voltage | `cellStateV` | $I = (OCV - V) / R$ | | Specified power | `cellStateP` | solve $P = I(OCV - RI)$ | | Specified load resistance | `cellStateR` | $I = OCV / (R + R_{load})$ | +| Specified internal loss | `cellStatePloss` | solve $P_{loss} = I^2R$ | For specified power, the current is obtained from the quadratic: @@ -128,6 +127,73 @@ The implementation reports infeasible states when requested power exceeds this limit, when current exceeds the configured limit, or when terminal voltage falls below cutoff. +## Integration Modes + +`RateMapBattery` can integrate battery state through time or to a target depth +of discharge. All integration methods return `BatteryIntegrationResult`, which +contains the final state, sampled histories, delivered energy, consumed charge, +feasibility, and stop reason. + +| PyThrust method | bat-perf analogue | Load held constant | +|---|---|---| +| `integrate_current(...)` | `cellIntIt` | Pack current | +| `integrate_c_rate(...)` | `cellIntCt` | Cell C-rate | +| `integrate_power(...)` | `cellIntPt` | Pack terminal power | +| `integrate_voltage(...)` | `cellIntVt` | Pack terminal voltage | +| `integrate_load_resistance(...)` | `cellIntRt` | Pack load resistance | +| `integrate_power_loss(...)` | `cellIntPlosst` | Pack internal loss power | + +The target-DOD variants stop at a requested final DOD instead of a requested +duration: + +| PyThrust method | bat-perf analogue | +|---|---| +| `integrate_current_to_dod(...)` | `cellIntIdod` | +| `integrate_c_rate_to_dod(...)` | `cellIntCdod` | +| `integrate_power_to_dod(...)` | `cellIntPdod` | +| `integrate_voltage_to_dod(...)` | `cellIntVdod` | +| `integrate_load_resistance_to_dod(...)` | `cellIntRdod` | +| `integrate_power_loss_to_dod(...)` | `cellIntPlossdod` | + +Additional helpers cover inverse and segmented calculations: + +| Method | Purpose | +|---|---| +| `dod_at_voltage_power(...)` | Find the DOD where a requested voltage and power coincide | +| `dod_at_power_voltage(...)` | Equivalent solve from the constant-power state equation | +| `integrate_power_profile(...)` | Integrate consecutive constant-power mission segments | + +!!! note "Numerical method" + Time and target-DOD integrations use SciPy's adaptive `solve_ivp` + integrator with `max_step_s` as the maximum time step for time-domain + solves. Stop events detect current limits, voltage limits, cutoff voltage, + and DOD exhaustion. This is closer to the `ode45` workflow used by + `bat-perf` than fixed-step Coulomb counting. + +Example: + +```python +from pythrust.battery import BatteryState, RateMapBattery + +battery = RateMapBattery.from_json( + "data/batteries/example_liion_cell.json", + series=4, + parallel=2, +) +state = BatteryState(soc=1.0) + +result = battery.integrate_power( + state=state, + power_w=180.0, + dt_s=300.0, + max_step_s=1.0, +) + +print(result.delivered_energy_wh) +print(result.final_state.dod) +print(result.stop_reason) +``` + ## Python API Use explicit names for the two battery fidelities: @@ -173,10 +239,11 @@ voltage = battery.terminal_voltage(current_a=current, state=state) power = battery.terminal_power(current_a=current, state=state) ``` -`RateMapBattery` also supports state advancement: +`RateMapBattery` also supports state advancement and endurance integration: ```python next_state = battery.step_power(power_w=power, dt_s=dt, state=state) +result = battery.integrate_power(state=state, power_w=power, dt_s=dt) ``` For the fixed-voltage model, `terminal_voltage` returns the configured voltage. @@ -209,11 +276,9 @@ dataset: battery = RateMapBattery.from_json(cell_path, series=4, parallel=2) ``` -The current implementation interpolates `OCV(dod)` and `R(dod)` directly. A -later calibration utility can derive these curves from manufacturer C-rate -discharge maps. Manufacturer discharge curves are usually terminal voltage -under load, so real datasets should document how `OCV(dod)` and `R(dod)` were -derived. +The current implementation interpolates `OCV(dod)` and `R(dod)` directly. +Manufacturer discharge curves are usually terminal voltage under load, so real +datasets should document how `OCV(dod)` and `R(dod)` were derived. ## Solver Integration @@ -242,7 +307,7 @@ $$ This keeps the propeller/motor equilibrium as a one-dimensional root solve because current remains a function of RPM through the propeller torque demand. -For mission simulation, evaluate each time step with the current state, compute +For mission simulation, evaluate each segment with the current state, compute pack current/power, then advance DOD: $$ @@ -251,7 +316,7 @@ $$ ## Implementation Status -The initial implementation includes: +The implementation includes: - `pythrust.battery.FixedVoltageBattery` - `pythrust.battery.RateMapBattery` @@ -259,6 +324,10 @@ The initial implementation includes: - JSON cell datasets with explicit series and parallel counts at load time - Solver integration through `solve_operating_point(..., battery_state=...)` - `OperatingPoint` battery outputs for voltage, current, C-rate, and efficiency +- SciPy-based integration for current, C-rate, power, voltage, resistance, and + internal power-loss modes +- Target-DOD integration, energy knockdown helpers, and power-profile + integration - A runnable rate-map mission example ## References diff --git a/docs/examples.md b/docs/examples.md index c3d9d9a..9aff472 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -82,6 +82,7 @@ It demonstrates: | `state_at_voltage` | Current required to hold a requested pack voltage | | `state_at_power` | Current and voltage for a requested pack power | | `state_at_load_resistance` | Battery behavior under a resistive load | +| `state_at_power_loss` | Battery behavior for a requested internal loss power | | Infeasible power | How the model reports a power limit | This example is intentionally independent from the propulsion solver. It @@ -97,8 +98,8 @@ PYTHONPATH=. python examples/rate_map_battery_mission.py This example couples `RateMapBattery` to `PropulsionSolver` over a short segment schedule. Each segment solves the propulsion operating point using the -current battery state, reports pack current and voltage, then advances state of -charge from the solved battery current. +current battery state, reports pack current and voltage, then integrates state +of charge from the solved battery current. It demonstrates: @@ -107,7 +108,7 @@ It demonstrates: | Load cell data | Use `data/batteries/example_liion_cell.json` with explicit series and parallel counts | | Solve segment | Pass `battery_state` into `solve_operating_point(...)` | | Read outputs | Inspect `battery_voltage_v` and `battery_current_a` on `OperatingPoint` | -| Advance state | Use `step_current(...)` to update SoC for the next segment | +| Advance state | Use `integrate_current(...)` to update SoC and delivered energy for the next segment | ![Rate-map battery mission simulation](images/rate_map_battery_mission.png) diff --git a/examples/rate_map_battery_mission.py b/examples/rate_map_battery_mission.py index 20a47ac..d61c6fa 100644 --- a/examples/rate_map_battery_mission.py +++ b/examples/rate_map_battery_mission.py @@ -1,7 +1,7 @@ """Simulate a simple mission with a rate-map battery. This example couples RateMapBattery to PropulsionSolver. Each segment solves -the propulsion operating point from the current battery state, then advances +the propulsion operating point from the current battery state, then integrates state of charge using the solved battery current. Usage:: @@ -83,13 +83,14 @@ def main(): reason = op.infeasible_reason or "unknown" raise SystemExit(f"Mission segment '{segment['name']}' is infeasible: {reason}") - next_state = battery.step_current( + battery_result = battery.integrate_current( state=state, current_a=op.battery_current_a, dt_s=segment["duration_s"], ) + next_state = battery_result.final_state - total_energy_wh += op.battery_power_w * segment["duration_s"] / 3600.0 + total_energy_wh += battery_result.delivered_energy_wh total_time_s += segment["duration_s"] print( diff --git a/examples/rate_map_battery_point_states.py b/examples/rate_map_battery_point_states.py index 711a966..1131f65 100644 --- a/examples/rate_map_battery_point_states.py +++ b/examples/rate_map_battery_point_states.py @@ -1,8 +1,8 @@ """Evaluate rate-map battery point states. This example loads a cell-level rate-map dataset, applies a 4S2P pack topology, -and evaluates the same battery state under current, C-rate, voltage, power, and -load-resistance requests. +and evaluates the same battery state under current, C-rate, voltage, power, +load-resistance, and internal-loss requests. Usage:: @@ -51,6 +51,7 @@ def main(): print_point("voltage 14 V", battery.state_at_voltage(state=state, voltage_v=14.0)) print_point("power 180 W", battery.state_at_power(state=state, power_w=180.0)) print_point("load 1.5 ohm", battery.state_at_load_resistance(state=state, resistance_ohm=1.5)) + print_point("loss 8 W", battery.state_at_power_loss(state=state, power_loss_w=8.0)) print_point("too much power", battery.state_at_power(state=state, power_w=3000.0)) next_state = battery.step_current(state=state, current_a=12.0, dt_s=60.0) diff --git a/pythrust/battery/rate_map.py b/pythrust/battery/rate_map.py index 152cc85..c1c1ff8 100644 --- a/pythrust/battery/rate_map.py +++ b/pythrust/battery/rate_map.py @@ -9,6 +9,7 @@ from typing import Callable, Sequence import numpy as np +from scipy.integrate import solve_ivp from .state import BatteryIntegrationResult, BatteryPoint, BatteryState @@ -234,106 +235,11 @@ def integrate_current( ) -> BatteryIntegrationResult: """Integrate battery state under constant pack current.""" self._validate_integration_inputs(current_a, dt_s, max_step_s, "current_a") - - elapsed_s = 0.0 - current_state = state - current_point = self.state_at_current(state=current_state, current_a=current_a) - histories = self._initial_integration_histories(current_state, current_point) - delivered_energy_wh = 0.0 - consumed_charge_ah = 0.0 - - if not current_point.is_feasible: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason=current_point.infeasible_reason or "infeasible_state", - ) - if dt_s == 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", - ) - if current_a == 0.0: - self._append_integration_sample(histories, dt_s, current_state, current_point) - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", - ) - - while elapsed_s < dt_s: - step_s = min(max_step_s, dt_s - elapsed_s) - time_to_dod_limit_s = self._time_to_dod_limit(current_state, current_a) - reaches_dod_limit = time_to_dod_limit_s <= step_s - if reaches_dod_limit: - step_s = time_to_dod_limit_s - if step_s <= 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="dod_limit", - ) - - next_state = self._advance_state_for_current(current_state, current_a, step_s) - next_point = self.state_at_current(state=next_state, current_a=current_a) - - if not next_point.is_feasible: - reason = next_point.infeasible_reason or "infeasible_state" - step_s, next_state, next_point = self._find_current_integration_boundary( - current_state, - current_a, - step_s, - ) - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - return self._integration_result( - final_state=next_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason=reason, - ) - - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - current_state = next_state - current_point = next_point - - if reaches_dod_limit and elapsed_s < dt_s: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="dod_limit", - ) - - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", + return self._integrate_with_point_function( + state=state, + dt_s=dt_s, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_current(current_state, current_a), ) def integrate_c_rate( @@ -364,107 +270,11 @@ def integrate_power( ) -> BatteryIntegrationResult: """Integrate battery state under constant pack power draw.""" self._validate_integration_inputs(power_w, dt_s, max_step_s, "power_w") - - elapsed_s = 0.0 - current_state = state - current_point = self.state_at_power(state=current_state, power_w=power_w) - histories = self._initial_integration_histories(current_state, current_point) - delivered_energy_wh = 0.0 - consumed_charge_ah = 0.0 - - if not current_point.is_feasible: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason=current_point.infeasible_reason or "infeasible_state", - ) - if dt_s == 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", - ) - if power_w == 0.0: - self._append_integration_sample(histories, dt_s, current_state, current_point) - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", - ) - - while elapsed_s < dt_s: - step_s = min(max_step_s, dt_s - elapsed_s) - time_to_dod_limit_s = self._time_to_dod_limit(current_state, current_point.current_a) - reaches_dod_limit = time_to_dod_limit_s <= step_s - if reaches_dod_limit: - step_s = time_to_dod_limit_s - if step_s <= 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="dod_limit", - ) - - next_state = self._advance_state_for_current(current_state, current_point.current_a, step_s) - next_point = self.state_at_power(state=next_state, power_w=power_w) - - if not next_point.is_feasible: - reason = next_point.infeasible_reason or "infeasible_state" - step_s, next_state, next_point = self._find_power_integration_boundary( - current_state, - current_point.current_a, - power_w, - step_s, - ) - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - return self._integration_result( - final_state=next_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason=reason, - ) - - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - current_state = next_state - current_point = next_point - - if reaches_dod_limit and elapsed_s < dt_s: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="dod_limit", - ) - - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", + return self._integrate_with_point_function( + state=state, + dt_s=dt_s, + max_step_s=max_step_s, + point_function=lambda current_state: self.state_at_power(current_state, power_w), ) def integrate_voltage( @@ -811,106 +621,89 @@ def _integrate_with_point_function( max_step_s: float, point_function: Callable[[BatteryState], BatteryPoint], ) -> BatteryIntegrationResult: - elapsed_s = 0.0 - current_state = state - current_point = point_function(current_state) - histories = self._initial_integration_histories(current_state, current_point) - delivered_energy_wh = 0.0 - consumed_charge_ah = 0.0 + initial_point = point_function(state) + histories = self._initial_integration_histories(state, initial_point) - if not current_point.is_feasible: + if not initial_point.is_feasible: return self._integration_result( - final_state=current_state, + final_state=state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, + delivered_energy_wh=0.0, + consumed_charge_ah=0.0, is_feasible=False, - stop_reason=current_point.infeasible_reason or "infeasible_state", + stop_reason=initial_point.infeasible_reason or "infeasible_state", ) if dt_s == 0.0: return self._integration_result( - final_state=current_state, + final_state=state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, + delivered_energy_wh=0.0, + consumed_charge_ah=0.0, is_feasible=True, stop_reason="duration_complete", ) - if current_point.current_a == 0.0: - self._append_integration_sample(histories, dt_s, current_state, current_point) + if initial_point.current_a == 0.0: + self._append_integration_sample(histories, dt_s, state, initial_point) return self._integration_result( - final_state=current_state, + final_state=state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, + delivered_energy_wh=0.0, + consumed_charge_ah=0.0, is_feasible=True, stop_reason="duration_complete", ) - while elapsed_s < dt_s: - step_s = min(max_step_s, dt_s - elapsed_s) - time_to_dod_limit_s = self._time_to_dod_limit(current_state, current_point.current_a) - reaches_dod_limit = time_to_dod_limit_s <= step_s - if reaches_dod_limit: - step_s = time_to_dod_limit_s - if step_s <= 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="dod_limit", - ) - - next_state = self._advance_state_for_current(current_state, current_point.current_a, step_s) - next_point = point_function(next_state) - - if not next_point.is_feasible: - reason = next_point.infeasible_reason or "infeasible_state" - step_s, next_state, next_point = self._find_integration_boundary( - current_state, - current_point.current_a, - step_s, - point_function, - ) - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - return self._integration_result( - final_state=next_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason=reason, - ) - - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - current_state = next_state - current_point = next_point - - if reaches_dod_limit and elapsed_s < dt_s: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="dod_limit", - ) + def rhs(_time_s: float, values: np.ndarray) -> list[float]: + state_at_t = BatteryState.from_dod(self._clip_dod(float(values[0]))) + point = point_function(state_at_t) + return [ + point.cell_current_a / self.capacity_as, + point.power_w / 3600.0, + point.current_a / 3600.0, + ] + + events = self._time_integration_events(point_function) + solution = solve_ivp( + rhs, + (0.0, dt_s), + [state.dod, 0.0, 0.0], + events=events, + max_step=max_step_s, + rtol=1e-9, + atol=1e-11, + ) + if not solution.success: + raise RuntimeError(solution.message) + + histories = { + "time_s": [], + "dod": [], + "voltage_v": [], + "current_a": [], + "c_rate": [], + "power_w": [], + "efficiency": [], + } + for time_s, dod in zip(solution.t, solution.y[0]): + sample_state = BatteryState.from_dod(self._clip_dod(float(dod))) + sample_point = point_function(sample_state) + self._append_integration_sample(histories, float(time_s), sample_state, sample_point) + + final_state = BatteryState.from_dod(self._clip_dod(float(solution.y[0, -1]))) + final_point = point_function(final_state) + stop_reason = "duration_complete" + is_feasible = True + if solution.t[-1] < dt_s - 1e-9: + stop_reason = self._stop_reason(final_state, final_point) + is_feasible = False return self._integration_result( - final_state=current_state, + final_state=final_state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="duration_complete", + delivered_energy_wh=float(solution.y[1, -1]), + consumed_charge_ah=float(solution.y[2, -1]), + is_feasible=is_feasible, + stop_reason=stop_reason, ) def _integrate_to_dod( @@ -925,133 +718,81 @@ def _integrate_to_dod( if dod_final <= state.dod: raise ValueError("dod_final must be greater than the initial DOD") - elapsed_s = 0.0 - current_state = state - current_point = point_function(current_state) - histories = self._initial_integration_histories(current_state, current_point) - delivered_energy_wh = 0.0 - consumed_charge_ah = 0.0 + initial_point = point_function(state) + histories = self._initial_integration_histories(state, initial_point) - if not current_point.is_feasible: + if not initial_point.is_feasible: return self._integration_result( - final_state=current_state, + final_state=state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, + delivered_energy_wh=0.0, + consumed_charge_ah=0.0, is_feasible=False, - stop_reason=current_point.infeasible_reason or "infeasible_state", + stop_reason=initial_point.infeasible_reason or "infeasible_state", ) - if current_point.current_a <= 0.0: + if initial_point.current_a <= 0.0: return self._integration_result( - final_state=current_state, + final_state=state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, + delivered_energy_wh=0.0, + consumed_charge_ah=0.0, is_feasible=False, stop_reason="zero_discharge_current", ) - while current_state.dod < dod_final: - cell_current_a = current_point.current_a / self.parallel - if cell_current_a <= 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="zero_discharge_current", - ) - - time_to_target_s = (dod_final - current_state.dod) * self.capacity_as / cell_current_a - step_s = min(max_step_s, time_to_target_s) - if step_s <= 0.0: - final_state = BatteryState.from_dod(dod_final) - final_point = point_function(final_state) - self._append_integration_sample(histories, elapsed_s, final_state, final_point) - return self._integration_result( - final_state=final_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="dod_target", - ) - - reaches_target = time_to_target_s <= max_step_s - next_state = BatteryState.from_dod(dod_final) if reaches_target else self._advance_state_for_current( - current_state, - current_point.current_a, - step_s, - ) - next_point = point_function(next_state) - - if not next_point.is_feasible: - reason = next_point.infeasible_reason or "infeasible_state" - step_s, next_state, next_point = self._find_integration_boundary( - current_state, - current_point.current_a, - step_s, - point_function, - ) - delivered_energy_wh += self._trapezoid(current_point.power_w, next_point.power_w, step_s) / 3600.0 - consumed_charge_ah += self._trapezoid(current_point.current_a, next_point.current_a, step_s) / 3600.0 - elapsed_s += step_s - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - return self._integration_result( - final_state=next_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason=reason, - ) - - dod_width = next_state.dod - current_state.dod - next_cell_current_a = next_point.current_a / self.parallel - if next_cell_current_a <= 0.0: - return self._integration_result( - final_state=current_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=False, - stop_reason="zero_discharge_current", - ) - - elapsed_s += self._trapezoid( - self.capacity_as / cell_current_a, - self.capacity_as / next_cell_current_a, - dod_width, - ) - delivered_energy_wh += ( - self._trapezoid(current_point.terminal_voltage_v, next_point.terminal_voltage_v, dod_width) - * self.capacity_ah - * self.parallel - ) - consumed_charge_ah += self.capacity_ah * self.parallel * dod_width - self._append_integration_sample(histories, elapsed_s, next_state, next_point) - - if reaches_target: - return self._integration_result( - final_state=next_state, - histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="dod_target", - ) - - current_state = next_state - current_point = next_point + def rhs(dod: float, _values: np.ndarray) -> list[float]: + state_at_dod = BatteryState.from_dod(self._clip_dod(dod)) + point = point_function(state_at_dod) + if point.cell_current_a <= 0.0: + return [0.0, 0.0, 0.0] + return [ + self.capacity_as / point.cell_current_a, + point.terminal_voltage_v * self.capacity_ah * self.parallel, + self.capacity_ah * self.parallel, + ] + + events = self._dod_integration_events(point_function) + solution = solve_ivp( + rhs, + (state.dod, dod_final), + [0.0, 0.0, 0.0], + events=events, + max_step=min(0.01, dod_final - state.dod), + rtol=1e-9, + atol=1e-11, + ) + if not solution.success: + raise RuntimeError(solution.message) + + histories = { + "time_s": [], + "dod": [], + "voltage_v": [], + "current_a": [], + "c_rate": [], + "power_w": [], + "efficiency": [], + } + for dod, time_s in zip(solution.t, solution.y[0]): + sample_state = BatteryState.from_dod(self._clip_dod(float(dod))) + sample_point = point_function(sample_state) + self._append_integration_sample(histories, float(time_s), sample_state, sample_point) + + final_state = BatteryState.from_dod(self._clip_dod(float(solution.t[-1]))) + final_point = point_function(final_state) + stop_reason = "dod_target" + is_feasible = True + if solution.t[-1] < dod_final - 1e-10: + stop_reason = self._stop_reason(final_state, final_point) + is_feasible = False return self._integration_result( - final_state=current_state, + final_state=final_state, histories=histories, - delivered_energy_wh=delivered_energy_wh, - consumed_charge_ah=consumed_charge_ah, - is_feasible=True, - stop_reason="dod_target", + delivered_energy_wh=float(solution.y[1, -1]), + consumed_charge_ah=float(solution.y[2, -1]), + is_feasible=is_feasible, + stop_reason=stop_reason, ) def _solve_dod(self, residual: Callable[[float], float]) -> float: @@ -1081,6 +822,53 @@ def _solve_dod(self, residual: Callable[[float], float]) -> float: return 0.5 * (low + high) + def _time_integration_events( + self, + point_function: Callable[[BatteryState], BatteryPoint], + ) -> list[Callable[[float, np.ndarray], float]]: + def event(_time_s: float, values: np.ndarray) -> float: + dod = float(values[0]) + state = BatteryState.from_dod(self._clip_dod(dod)) + point = point_function(state) + return self._feasibility_margin(dod, point) + + event.terminal = True # type: ignore[attr-defined] + event.direction = -1 # type: ignore[attr-defined] + return [event] + + def _dod_integration_events( + self, + point_function: Callable[[BatteryState], BatteryPoint], + ) -> list[Callable[[float, np.ndarray], float]]: + def event(dod: float, _values: np.ndarray) -> float: + state = BatteryState.from_dod(self._clip_dod(dod)) + point = point_function(state) + return self._feasibility_margin(dod, point) + + event.terminal = True # type: ignore[attr-defined] + event.direction = -1 # type: ignore[attr-defined] + return [event] + + def _feasibility_margin(self, dod: float, point: BatteryPoint) -> float: + return min( + self.max_current_a - point.cell_current_a, + point.cell_voltage_v - self.cutoff_voltage_v, + self.charge_voltage_v - point.cell_voltage_v, + 1.0 - dod, + ) + + def _stop_reason(self, state: BatteryState, point: BatteryPoint) -> str: + if not point.is_feasible: + return point.infeasible_reason or "infeasible_state" + + margins = { + "current_limit": self.max_current_a - point.cell_current_a, + "voltage_cutoff": point.cell_voltage_v - self.cutoff_voltage_v, + "voltage_limit": self.charge_voltage_v - point.cell_voltage_v, + "dod_limit": 1.0 - state.dod, + } + return min(margins, key=lambda name: abs(margins[name])) + @staticmethod def _validate_integration_inputs(load: float, dt_s: float, max_step_s: float, load_name: str) -> None: if not math.isfinite(load) or load < 0.0: From a6a387549ae79f2f62a3bc349cdf1b17f351b9df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 17:29:12 +0300 Subject: [PATCH 7/8] feat(battery): update section title and clarify implementation details --- docs/battery_model.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/battery_model.md b/docs/battery_model.md index 372ad25..e275958 100644 --- a/docs/battery_model.md +++ b/docs/battery_model.md @@ -314,9 +314,9 @@ $$ x_{next} = x + \frac{I_{cell}}{Q_{cell}} \Delta t $$ -## Implementation Status +## Feature Summary -The implementation includes: +The battery package provides: - `pythrust.battery.FixedVoltageBattery` - `pythrust.battery.RateMapBattery` From 76c871b37fbadd80be79fcf12362b60d1df83168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=BCseyin=20Karakaya?= Date: Sun, 21 Jun 2026 17:33:41 +0300 Subject: [PATCH 8/8] fix(battery): clean up adaptive integration helpers --- pythrust/battery/rate_map.py | 95 ++---------------------------------- 1 file changed, 4 insertions(+), 91 deletions(-) diff --git a/pythrust/battery/rate_map.py b/pythrust/battery/rate_map.py index c1c1ff8..94645a6 100644 --- a/pythrust/battery/rate_map.py +++ b/pythrust/battery/rate_map.py @@ -593,10 +593,6 @@ def _point_from_cell_state( def _clip_dod(dod: float) -> float: return min(1.0, max(0.0, float(dod))) - @staticmethod - def _trapezoid(left: float, right: float, width: float) -> float: - return 0.5 * (left + right) * width - def _reversible_energy_wh(self, dod_initial: float, dod_final: float, *, samples: int) -> float: dod_values = np.linspace(dod_initial, dod_final, samples) ocv_values = np.array([self.ocv(float(dod)) for dod in dod_values]) @@ -740,6 +736,8 @@ def _integrate_to_dod( stop_reason="zero_discharge_current", ) + max_dod_step = max_step_s * initial_point.cell_current_a / self.capacity_as + def rhs(dod: float, _values: np.ndarray) -> list[float]: state_at_dod = BatteryState.from_dod(self._clip_dod(dod)) point = point_function(state_at_dod) @@ -757,7 +755,7 @@ def rhs(dod: float, _values: np.ndarray) -> list[float]: (state.dod, dod_final), [0.0, 0.0, 0.0], events=events, - max_step=min(0.01, dod_final - state.dod), + max_step=min(max_dod_step, dod_final - state.dod), rtol=1e-9, atol=1e-11, ) @@ -867,7 +865,7 @@ def _stop_reason(self, state: BatteryState, point: BatteryPoint) -> str: "voltage_limit": self.charge_voltage_v - point.cell_voltage_v, "dod_limit": 1.0 - state.dod, } - return min(margins, key=lambda name: abs(margins[name])) + return min(margins, key=lambda name: margins[name]) @staticmethod def _validate_integration_inputs(load: float, dt_s: float, max_step_s: float, load_name: str) -> None: @@ -887,91 +885,6 @@ def _validate_dod_interval(dod_initial: float, dod_final: float) -> None: if dod_final <= dod_initial: raise ValueError("dod_final must be greater than dod_initial") - def _advance_state_for_current(self, state: BatteryState, current_a: float, dt_s: float) -> BatteryState: - cell_current_a = current_a / self.parallel - dod_next = state.dod + cell_current_a * dt_s / self.capacity_as - return BatteryState.from_dod(self._clip_dod(dod_next)) - - def _time_to_dod_limit(self, state: BatteryState, current_a: float) -> float: - cell_current_a = current_a / self.parallel - if cell_current_a <= 0.0: - return math.inf - return max(0.0, (1.0 - state.dod) * self.capacity_as / cell_current_a) - - def _find_current_integration_boundary( - self, - state: BatteryState, - current_a: float, - step_s: float, - ) -> tuple[float, BatteryState, BatteryPoint]: - low_s = 0.0 - high_s = step_s - boundary_state = state - boundary_point = self.state_at_current(state=state, current_a=current_a) - - for _ in range(40): - mid_s = 0.5 * (low_s + high_s) - mid_state = self._advance_state_for_current(state, current_a, mid_s) - mid_point = self.state_at_current(state=mid_state, current_a=current_a) - if mid_point.is_feasible: - low_s = mid_s - boundary_state = mid_state - boundary_point = mid_point - else: - high_s = mid_s - - return low_s, boundary_state, boundary_point - - def _find_power_integration_boundary( - self, - state: BatteryState, - current_a: float, - power_w: float, - step_s: float, - ) -> tuple[float, BatteryState, BatteryPoint]: - low_s = 0.0 - high_s = step_s - boundary_state = state - boundary_point = self.state_at_power(state=state, power_w=power_w) - - for _ in range(40): - mid_s = 0.5 * (low_s + high_s) - mid_state = self._advance_state_for_current(state, current_a, mid_s) - mid_point = self.state_at_power(state=mid_state, power_w=power_w) - if mid_point.is_feasible: - low_s = mid_s - boundary_state = mid_state - boundary_point = mid_point - else: - high_s = mid_s - - return low_s, boundary_state, boundary_point - - def _find_integration_boundary( - self, - state: BatteryState, - current_a: float, - step_s: float, - point_function: Callable[[BatteryState], BatteryPoint], - ) -> tuple[float, BatteryState, BatteryPoint]: - low_s = 0.0 - high_s = step_s - boundary_state = state - boundary_point = point_function(state) - - for _ in range(40): - mid_s = 0.5 * (low_s + high_s) - mid_state = self._advance_state_for_current(state, current_a, mid_s) - mid_point = point_function(mid_state) - if mid_point.is_feasible: - low_s = mid_s - boundary_state = mid_state - boundary_point = mid_point - else: - high_s = mid_s - - return low_s, boundary_state, boundary_point - @staticmethod def _initial_integration_histories(state: BatteryState, point: BatteryPoint) -> dict[str, list[float]]: return {