diff --git a/config.ini.example b/config.ini.example index 280cd004..62069784 100644 --- a/config.ini.example +++ b/config.ini.example @@ -50,20 +50,6 @@ THROTTLE_INTERVAL = 0 ## relayed and batteries decide on their own. #ACTIVE_CONTROL = True -## --- Smoothing --- -## EMA alpha for the grid reading (0.0-1.0). Higher = tracks load changes faster; -## lower = filters noise but adds lag that can cause overshoot with multiple batteries. -## The battery's own ramp rate already filters noise, so 0.9 works well when the -## powermeter updates at >= 1 Hz. Reduce toward 0.3 for slower powermeters. -#SMOOTH_TARGET_ALPHA = 0.9 -## When |grid total| is within this band (W), the smoothed target decays toward zero -## instead of chasing noise. Keeps batteries from hunting around the zero-crossing. -## 10-30 W is sensible; 0 disables. -#DEADBAND = 20 -## Maximum watts the smoothed target may change per cycle. Acts as a slew-rate limit. -## 0 disables (default). Rarely needed at high alpha. -#MAX_SMOOTH_STEP = 0 - ## --- Fair distribution (multi-battery balancing) --- ## Adjust each battery's target so they share the load evenly. ## Only matters with 2+ batteries. @@ -149,6 +135,14 @@ THROTTLE_INTERVAL = 0 ## Per-powermeter throttling override (optional) ## Shelly devices are typically fast, so throttling may not be needed #THROTTLE_INTERVAL = 0 +## Per-powermeter smoothing (optional, defaults come from [GENERAL]) +## EMA alpha (0.0-1.0). Higher = tracks faster; lower = filters noise. +## 0.9 works well at >= 1 Hz; reduce toward 0.3 for slower powermeters. +#SMOOTH_TARGET_ALPHA = 0.9 +## Return zeros when |grid total| < DEADBAND (W). 10-30 W sensible; 0 disables. +#DEADBAND = 20 +## Max watts the smoothed target may change per cycle (slew-rate limit). 0 disables. +#MAX_SMOOTH_STEP = 0 #[TASMOTA] #IP = 192.168.1.101 diff --git a/src/astrameter/config/config_loader.py b/src/astrameter/config/config_loader.py index 630cbacf..3f49cfd1 100644 --- a/src/astrameter/config/config_loader.py +++ b/src/astrameter/config/config_loader.py @@ -37,6 +37,10 @@ VZLogger, parse_sml_obis_config, ) +from astrameter.powermeter.wrappers.smoothing import ( + DeadbandPowermeter, + SmoothedPowermeter, +) SHELLY_SECTION = "SHELLY" TASMOTA_SECTION = "TASMOTA" @@ -153,6 +157,11 @@ def read_all_powermeter_configs( global_wait_for_next_message = config.getboolean( "GENERAL", "WAIT_FOR_NEXT_MESSAGE", fallback=True ) + global_smooth_alpha = config.getfloat( + "GENERAL", "SMOOTH_TARGET_ALPHA", fallback=0.0 + ) + global_max_smooth_step = config.getfloat("GENERAL", "MAX_SMOOTH_STEP", fallback=0.0) + global_deadband = config.getfloat("GENERAL", "DEADBAND", fallback=0.0) global_pid_kp = config.getfloat("GENERAL", "PID_KP", fallback=0.0) global_pid_ki = config.getfloat("GENERAL", "PID_KI", fallback=0.0) global_pid_kd = config.getfloat("GENERAL", "PID_KD", fallback=0.0) @@ -199,6 +208,49 @@ def read_all_powermeter_configs( ) powermeter = ThrottledPowermeter(powermeter, section_throttle_interval) + section_smooth_alpha = config.getfloat( + section, "SMOOTH_TARGET_ALPHA", fallback=global_smooth_alpha + ) + if section_smooth_alpha > 0: + section_smooth_alpha = max(0.01, min(1.0, section_smooth_alpha)) + section_max_smooth_step = config.getfloat( + section, "MAX_SMOOTH_STEP", fallback=global_max_smooth_step + ) + smooth_source = ( + "section-specific" + if config.has_option(section, "SMOOTH_TARGET_ALPHA") + else "global" + ) + logger.info( + "Applying %s EMA smoothing (alpha=%.2f, max_step=%.0f) to %s", + smooth_source, + section_smooth_alpha, + section_max_smooth_step, + section, + ) + powermeter = SmoothedPowermeter( + powermeter, + alpha=section_smooth_alpha, + max_step=section_max_smooth_step, + ) + + section_deadband = config.getfloat( + section, "DEADBAND", fallback=global_deadband + ) + if section_deadband > 0: + deadband_source = ( + "section-specific" + if config.has_option(section, "DEADBAND") + else "global" + ) + logger.info( + "Applying %s deadband (%.0fW) to %s", + deadband_source, + section_deadband, + section, + ) + powermeter = DeadbandPowermeter(powermeter, deadband=section_deadband) + section_pid_kp = config.getfloat(section, "PID_KP", fallback=global_pid_kp) if section_pid_kp > 0: pid_source = ( diff --git a/src/astrameter/ct002/balancer.py b/src/astrameter/ct002/balancer.py index 30cfa853..2fdb7288 100644 --- a/src/astrameter/ct002/balancer.py +++ b/src/astrameter/ct002/balancer.py @@ -10,7 +10,6 @@ from astrameter.config.logger import logger from .protocol import parse_int -from .smoother import TargetSmoother EFFICIENCY_HYSTERESIS_FACTOR = 1.2 # Seconds to suppress saturation checks after a battery is promoted from @@ -285,7 +284,7 @@ def __init__( *, saturation_enabled: bool = True, clock: Callable[[], float] | None = None, - smoother: TargetSmoother | None = None, + reset_fn: Callable[[], None] | None = None, ) -> None: self._clock = clock or time.time self._cfg = config @@ -298,10 +297,10 @@ def __init__( clock=self._clock, ) self._saturation_grace_seconds = max(0.0, saturation_grace_seconds) - # Optional: the meter smoother is reseeded after every probe - # commit / rejection so post-handoff state cannot drag in a - # stale pre-probe EMA value. Injected by CT002 at construction. - self._smoother = smoother + # Optional: called after every probe commit / rejection so + # post-handoff state cannot drag in stale pre-probe EMA values. + # Injected by CT002 at construction. + self._reset_fn = reset_fn self._consumers: dict[str, BalancerConsumerState] = {} self._deprioritized: set[str] = set() self._priority: list[str] = [] @@ -425,7 +424,7 @@ def _commit_probe(self, reports: dict, now: float, actual: float) -> None: actual, ) self._invalidate_efficiency_cache() - # Reseed the meter smoother so the post-handoff balance runs + # Reset powermeter wrapper state so the post-handoff balance runs # against a fresh baseline instead of an EMA that still carries # pre-probe state (including the transient zero-crossing that # happens while the candidate ramps up and the backup drops out). @@ -434,12 +433,12 @@ def _commit_probe(self, reports: dict, now: float, actual: float) -> None: # ``_resolve_probe_state`` which is called from # ``_compute_efficiency_deprioritized`` from # ``_compute_auto_target`` — the current ``compute_target`` call - # has already captured ``smoothed_target`` as a parameter, so - # the reseed here does NOT affect the current tick's target. - # It only affects the NEXT ``_compute_smooth_target`` call in - # :class:`CT002`, which is the desired semantics. - if self._smoother is not None: - self._smoother.reseed() + # has already captured ``grid_total`` as a parameter, so the + # reset here does NOT affect the current tick's target. It only + # affects the NEXT powermeter reading, which is the desired + # semantics. + if self._reset_fn is not None: + self._reset_fn() def _reject_probe(self, now: float, reason: str) -> None: probe = self._probe_state @@ -469,11 +468,11 @@ def _reject_probe(self, now: float, reason: str) -> None: self._invalidate_efficiency_cache() # See _commit_probe — same rationale: force a fresh baseline # after the probe window ends. - if self._smoother is not None: - self._smoother.reseed() + if self._reset_fn is not None: + self._reset_fn() def _resolve_probe_state( - self, reports: dict, now: float, smoothed_target: float + self, reports: dict, now: float, grid_total: float ) -> bool: probe = self._probe_state if probe is None: @@ -488,7 +487,7 @@ def _resolve_probe_state( actual = parse_int(reports.get(probe.candidate_id, {}).get("power", 0)) desired_total = ( sum(parse_int(report.get("power", 0)) for report in reports.values()) - + smoothed_target + + grid_total ) probe_success_threshold = self._probe_success_threshold demand_sign = 1 if desired_total > 0 else -1 if desired_total < 0 else 0 @@ -536,7 +535,7 @@ def _compute_probe_target( self, consumer_id: str | None, reports: dict, - smoothed_target: float, + grid_total: float, eff_part: dict[str, float], ) -> list[float] | None: probe = self._probe_state @@ -558,7 +557,7 @@ def _compute_probe_target( desired_total = ( sum(parse_int(report.get("power", 0)) for report in reports.values()) - + smoothed_target + + grid_total ) state = self._get_consumer(consumer_id) probe_actual = parse_int(reports.get(candidate_id, {}).get("power", 0)) @@ -614,8 +613,7 @@ def compute_target( consumer_id: str | None, consumer_mode: ConsumerMode, all_reports: dict, - smoothed_target: float, - raw_total: float, + grid_total: float, inactive: frozenset[str], manual: frozenset[str], sample_id: tuple = (), @@ -675,9 +673,7 @@ def compute_target( # Auto-pool reports (exclude manual consumers) reports = {cid: r for cid, r in active_reports.items() if cid not in manual} - return self._compute_auto_target( - consumer_id, reports, smoothed_target, raw_total, sample_id - ) + return self._compute_auto_target(consumer_id, reports, grid_total, sample_id) # ------------------------------------------------------------------ # Lifecycle @@ -808,8 +804,7 @@ def _compute_auto_target( self, consumer_id: str | None, reports: dict, - smoothed_target: float, - raw_total: float, + grid_total: float, sample_id: tuple = (), ) -> list[float]: """Automatic allocation for auto-pool consumers.""" @@ -818,7 +813,7 @@ def _compute_auto_target( eff_part = {cid: max(0.01, 1.0 - saturation.get(cid, 0.0)) for cid in reports} efficiency_adjustments = self._compute_efficiency_deprioritized( - reports, sample_id, smoothed_target + reports, sample_id, grid_total ) faded_adjustments = self._fade_efficiency_weights( efficiency_adjustments, set(reports.keys()) @@ -826,7 +821,7 @@ def _compute_auto_target( any_fading = any(0.0 < w < 1.0 for w in faded_adjustments.values()) probe_target = self._compute_probe_target( - consumer_id, reports, smoothed_target, eff_part + consumer_id, reports, grid_total, eff_part ) if probe_target is not None: return probe_target @@ -842,7 +837,7 @@ def _compute_auto_target( total_battery = sum( parse_int(reports.get(cid, {}).get("power", 0)) for cid in reports ) - demand = total_battery + smoothed_target + demand = total_battery + grid_total total_fade = sum(self._get_consumer(cid).fade_weight for cid in reports) desired = demand * fade_w / total_fade if total_fade > 0 else 0.0 target = desired - reported @@ -864,9 +859,9 @@ def _compute_auto_target( total_effective = sum(eff_part.values()) fair_share = ( - (smoothed_target / total_effective) * eff_part.get(consumer_id, 1.0) + (grid_total / total_effective) * eff_part.get(consumer_id, 1.0) if consumer_id and consumer_id in reports - else smoothed_target / num_consumers + else grid_total / num_consumers ) cfg = self._cfg @@ -874,7 +869,6 @@ def _compute_auto_target( not cfg.fair_distribution or consumer_id is None or consumer_id not in reports - or (cfg.deadband > 0 and abs(raw_total) < cfg.deadband) ): target = fair_share elif consumer_id in eff_part: @@ -884,8 +878,9 @@ def _compute_auto_target( else: target = fair_share - # Clamp sign disagreement - if (raw_total < 0 and target > 0) or (raw_total > 0 and target < 0): + # Clamp sign disagreement: prevent the inverter from acting + # against the current grid direction. + if (grid_total < 0 and target > 0) or (grid_total > 0 and target < 0): target = 0 if consumer_id: @@ -938,7 +933,7 @@ def _balance_correction( # ------------------------------------------------------------------ def _compute_efficiency_deprioritized( - self, reports: dict, sample_id: tuple, smoothed_target: float + self, reports: dict, sample_id: tuple, grid_total: float ) -> dict[str, float]: """Decide which consumers to deprioritize for efficiency.""" cfg = self._cfg @@ -964,7 +959,7 @@ def _compute_efficiency_deprioritized( 0, min(len(self._priority), len(self._priority) - len(self._deprioritized)) ) previous_active = tuple(self._priority[:prev_slots]) - probe_resolved = self._resolve_probe_state(reports, now, smoothed_target) + probe_resolved = self._resolve_probe_state(reports, now, grid_total) probe_active = self._probe_state is not None # Rotation check BEFORE cache @@ -1003,7 +998,7 @@ def _compute_efficiency_deprioritized( total_battery_power = sum( parse_int(reports.get(cid, {}).get("power", 0)) for cid in self._priority ) - abs_target = abs(total_battery_power + smoothed_target) + abs_target = abs(total_battery_power + grid_total) n = len(self._priority) per_consumer = abs_target / n diff --git a/src/astrameter/ct002/ct002.py b/src/astrameter/ct002/ct002.py index 84e94b93..8b6ea5e4 100644 --- a/src/astrameter/ct002/ct002.py +++ b/src/astrameter/ct002/ct002.py @@ -30,7 +30,6 @@ parse_int, parse_request, ) -from .smoother import TargetSmoother # Re-export protocol symbols for backward compatibility __all__ = [ @@ -104,8 +103,6 @@ def __init__( consumer_ttl=120, debug_status=False, active_control=True, - smooth_target_alpha=0.9, - max_smooth_step=0, fair_distribution=True, balance_gain=0.2, error_boost_threshold=150, @@ -128,6 +125,7 @@ def __init__( saturation_stall_timeout_seconds=SATURATION_STALL_TIMEOUT_SECONDS, device_id="", clock=None, + reset_fn=None, ): self.udp_port = udp_port self.ct_mac = ct_mac @@ -159,11 +157,7 @@ def __init__( self._before_send_last_warn: float = 0.0 # Composed components - self._smoother = TargetSmoother( - alpha=max(0.01, min(1.0, smooth_target_alpha)), - max_step=max(0, max_smooth_step), - deadband=max(0, deadband), - ) + self._last_smooth_target: float = 0.0 self._balancer = LoadBalancer( config=BalancerConfig( fair_distribution=fair_distribution, @@ -188,7 +182,7 @@ def __init__( saturation_stall_timeout_seconds=saturation_stall_timeout_seconds, saturation_enabled=saturation_detection, clock=clock, - smoother=self._smoother, + reset_fn=reset_fn, ) def _consumer_key(self, addr, fields): @@ -333,9 +327,9 @@ def _compute_smooth_target(self, values, consumer_id=None): if not self.active_control or not values or len(values) != 3: return values - raw_total = sum(parse_int(v, 0) for v in values) + total = sum(parse_int(v, 0) for v in values) + self._last_smooth_target = total sample_id = tuple(values) - smoothed = self._smoother.update(raw_total, sample_id) mode = self._consumer_mode(consumer_id) reports = { @@ -352,8 +346,7 @@ def _compute_smooth_target(self, values, consumer_id=None): consumer_id, mode, reports, - smoothed, - raw_total, + total, inactive, manual, sample_id, @@ -656,9 +649,7 @@ async def _handle_request(self, data, addr, transport): "active": is_active, "poll_interval": consumer.poll_interval if consumer else None, "last_seen": datetime.now(timezone.utc).isoformat(), - "smooth_target": self._smoother.value - if self._smoother.value is not None - else 0.0, + "smooth_target": self._last_smooth_target, "manual_target": consumer.manual_target if consumer else None, "auto_target": not consumer.manual_enabled if consumer else True, "active_control": self.active_control, diff --git a/src/astrameter/ct002/smoother.py b/src/astrameter/ct002/smoother.py deleted file mode 100644 index 43992124..00000000 --- a/src/astrameter/ct002/smoother.py +++ /dev/null @@ -1,120 +0,0 @@ -"""EMA-based target smoother with deadband and sign-change catchup.""" - -from __future__ import annotations - -from astrameter.config.logger import logger - - -class TargetSmoother: - """Exponential moving average smoother with deadband and step limiting. - - Smooths a noisy input signal using EMA. Provides faster catchup when - the sign of the input flips (e.g. grid switches from import to export) - and respects a deadband around zero. - - Multiple consumers polling within a single meter cycle must not - compound the EMA update. Dedup is keyed on the tuple identity of - *sample_id* **and** the actual *raw_total*: if ``raw_total`` differs - between two calls we always accept the new value, even when the - caller happens to reuse the same ``sample_id``. This prevents a - contrived caller from masking fresh readings, and also means that a - stale push-based powermeter cannot silently freeze the smoother at - an arbitrary past value — the smoother can still be forcibly - reseeded via :meth:`reseed`. - """ - - def __init__( - self, - alpha: float, - max_step: float = 0, - deadband: float = 0, - ) -> None: - self._alpha = alpha - self._max_step = max_step - self._deadband = deadband - self._value: float | None = None - self._last_sample: tuple | None = None - self._last_raw_total: float | None = None - - @property - def value(self) -> float | None: - """Current smoothed value, or ``None`` if no samples yet.""" - return self._value - - def reseed(self) -> None: - """Clear all smoother state. - - The next :meth:`update` call will seed ``_value`` directly from - the caller's ``raw_total`` — bypassing the EMA entirely so that - post-event state catches up in a single step. Used after - efficiency-rotation probe handoffs where the balancer needs a - fresh baseline and any residual EMA state would drag in stale - pre-handoff readings. - """ - logger.debug("TargetSmoother: reseed (previous value=%s)", self._value) - self._value = None - self._last_sample = None - self._last_raw_total = None - - def update(self, raw_total: float, sample_id: tuple) -> float: - """Smooth *raw_total*. - - Dedup fires only when **both** ``sample_id`` and ``raw_total`` - are unchanged; multi-consumer polls within one meter tick will - therefore still coalesce (they share the same ``sample_id`` - and ``raw_total``), but a fresh ``raw_total`` is never lost to - a stale dedup key. - - Returns the current smoothed value. - """ - if self._value is None: - self._value = raw_total - self._last_sample = sample_id - self._last_raw_total = raw_total - logger.debug( - "TargetSmoother: seed value=%.2f (raw=%.2f)", - self._value, - raw_total, - ) - return self._value - - # ``raw_total == self._last_raw_total`` uses exact equality on - # a float, which would normally be fragile. It's safe here - # because the production caller is - # :meth:`astrameter.ct002.ct002.CT002._compute_smooth_target` - # which computes ``raw_total = sum(parse_int(v, 0) for v in values)`` - # — all ints, so equality is exact. Tests pass floats but - # reuse the same value without intervening arithmetic, so - # equality is also exact there. If a future caller starts - # feeding computed floats through this path, swap to - # ``math.isclose`` and expect to justify the tolerance. - if sample_id == self._last_sample and raw_total == self._last_raw_total: - logger.debug( - "TargetSmoother: dedup hit (raw=%.2f value=%.2f)", - raw_total, - self._value, - ) - return self._value - - self._last_sample = sample_id - self._last_raw_total = raw_total - - if self._deadband > 0 and abs(raw_total) < self._deadband: - delta = -self._alpha * self._value - else: - catchup_alpha = self._alpha - if (raw_total > 0) != (self._value > 0): - catchup_alpha = min(0.5, self._alpha * 4) - delta = catchup_alpha * (raw_total - self._value) - if self._max_step > 0: - delta = max(-self._max_step, min(self._max_step, delta)) - prev = self._value - self._value += delta - logger.debug( - "TargetSmoother: update raw=%.2f prev=%.2f delta=%.2f new=%.2f", - raw_total, - prev, - delta, - self._value, - ) - return self._value diff --git a/src/astrameter/main.py b/src/astrameter/main.py index 6e6c2171..2dca93db 100644 --- a/src/astrameter/main.py +++ b/src/astrameter/main.py @@ -4,6 +4,7 @@ import os import signal from collections import OrderedDict +from collections.abc import Sequence from astrameter.config.config_loader import ( ClientFilter, @@ -103,6 +104,13 @@ async def test_powermeter(powermeter: Powermeter, client_filter: ClientFilter): ) from e +def _reset_all_powermeters( + powermeters: Sequence[tuple[Powermeter, object, object]], +) -> None: + for pm, *_ in powermeters: + pm.reset() + + async def run_device( device_type: str, cfg: configparser.ConfigParser, @@ -129,10 +137,6 @@ async def run_device( if os.environ.get("DEBUG_STATUS", "").lower() in ("1", "true", "yes"): debug_status = True active_control = cfg.getboolean(ct_section, "ACTIVE_CONTROL", fallback=True) - smooth_target_alpha = cfg.getfloat( - ct_section, "SMOOTH_TARGET_ALPHA", fallback=0.9 - ) - max_smooth_step = cfg.getint(ct_section, "MAX_SMOOTH_STEP", fallback=0) fair_distribution = cfg.getboolean( ct_section, "FAIR_DISTRIBUTION", fallback=True ) @@ -200,8 +204,7 @@ async def run_device( if min_efficient_power > 0: extras.append(f"efficiency optimization ({min_efficient_power}W)") logger.info( - "Active control enabled (alpha=%.2f): smooth target + load split%s", - smooth_target_alpha, + "Active control enabled: load split%s", " + " + " + ".join(extras) if extras else "", ) @@ -214,8 +217,6 @@ async def run_device( consumer_ttl=consumer_ttl, debug_status=debug_status, active_control=active_control, - smooth_target_alpha=smooth_target_alpha, - max_smooth_step=max_smooth_step, fair_distribution=fair_distribution, balance_gain=balance_gain, error_boost_threshold=error_boost_threshold, @@ -237,6 +238,7 @@ async def run_device( efficiency_saturation_threshold=efficiency_saturation_threshold, saturation_decay_factor=saturation_decay_factor, device_id=device_id or "", + reset_fn=lambda: _reset_all_powermeters(powermeters), ) async def update_readings(addr, _fields=None, _consumer_id=None): diff --git a/src/astrameter/powermeter/__init__.py b/src/astrameter/powermeter/__init__.py index 1171496f..437dee1e 100644 --- a/src/astrameter/powermeter/__init__.py +++ b/src/astrameter/powermeter/__init__.py @@ -8,20 +8,26 @@ from .json_http import JsonHttpPowermeter from .modbus import ModbusPowermeter from .mqtt import MqttPowermeter -from .pid import PidPowermeter from .script import Script from .shelly import Shelly, Shelly1PM, Shelly3EM, Shelly3EMPro, ShellyEM, ShellyPlus1PM from .shrdzm import Shrdzm from .sma_energy_meter import SmaEnergyMeter from .sml import Sml, parse_sml_obis_config from .tasmota import Tasmota -from .throttling import ThrottledPowermeter from .tq_em import TQEnergyManager -from .transform import TransformedPowermeter from .vzlogger import VZLogger +from .wrappers import ( + DeadbandPowermeter, + PidPowermeter, + PowermeterWrapper, + SmoothedPowermeter, + ThrottledPowermeter, + TransformedPowermeter, +) __all__ = [ "AmisReader", + "DeadbandPowermeter", "ESPHome", "Emlog", "HomeAssistant", @@ -32,6 +38,7 @@ "MqttPowermeter", "PidPowermeter", "Powermeter", + "PowermeterWrapper", "Script", "Shelly", "Shelly1PM", @@ -42,6 +49,7 @@ "Shrdzm", "SmaEnergyMeter", "Sml", + "SmoothedPowermeter", "TQEnergyManager", "Tasmota", "ThrottledPowermeter", diff --git a/src/astrameter/powermeter/base.py b/src/astrameter/powermeter/base.py index 5448add7..dc7de1bd 100644 --- a/src/astrameter/powermeter/base.py +++ b/src/astrameter/powermeter/base.py @@ -22,3 +22,6 @@ async def start(self): async def stop(self): pass + + def reset(self): + pass diff --git a/src/astrameter/powermeter/wrappers/__init__.py b/src/astrameter/powermeter/wrappers/__init__.py new file mode 100644 index 00000000..d83bf582 --- /dev/null +++ b/src/astrameter/powermeter/wrappers/__init__.py @@ -0,0 +1,14 @@ +from .base import PowermeterWrapper +from .pid import PidPowermeter +from .smoothing import DeadbandPowermeter, SmoothedPowermeter +from .throttling import ThrottledPowermeter +from .transform import TransformedPowermeter + +__all__ = [ + "DeadbandPowermeter", + "PidPowermeter", + "PowermeterWrapper", + "SmoothedPowermeter", + "ThrottledPowermeter", + "TransformedPowermeter", +] diff --git a/src/astrameter/powermeter/wrappers/base.py b/src/astrameter/powermeter/wrappers/base.py new file mode 100644 index 00000000..291a9b28 --- /dev/null +++ b/src/astrameter/powermeter/wrappers/base.py @@ -0,0 +1,26 @@ +from astrameter.powermeter.base import Powermeter + + +class PowermeterWrapper(Powermeter): + """Base for wrappers that decorate another Powermeter.""" + + def __init__(self, wrapped_powermeter: Powermeter) -> None: + self.wrapped_powermeter = wrapped_powermeter + + async def get_powermeter_watts(self) -> list[float]: + raise NotImplementedError() + + async def wait_for_message(self, timeout=5): + await self.wrapped_powermeter.wait_for_message(timeout) + + async def wait_for_next_message(self, timeout=5): + await self.wrapped_powermeter.wait_for_next_message(timeout) + + async def start(self): + await self.wrapped_powermeter.start() + + async def stop(self): + await self.wrapped_powermeter.stop() + + def reset(self): + self.wrapped_powermeter.reset() diff --git a/src/astrameter/powermeter/pid.py b/src/astrameter/powermeter/wrappers/pid.py similarity index 89% rename from src/astrameter/powermeter/pid.py rename to src/astrameter/powermeter/wrappers/pid.py index c626123a..1086ad5c 100644 --- a/src/astrameter/powermeter/pid.py +++ b/src/astrameter/powermeter/wrappers/pid.py @@ -1,10 +1,12 @@ import asyncio import time -from .base import Powermeter +from astrameter.powermeter.base import Powermeter +from .base import PowermeterWrapper -class PidPowermeter(Powermeter): + +class PidPowermeter(PowermeterWrapper): """ A wrapper around a powermeter that applies a PID (Proportional-Integral- Derivative) controller to steer the reported power toward zero (grid balance). @@ -78,7 +80,7 @@ def __init__( f"PID mode must be one of {self.VALID_MODES}, got '{mode}'" ) - self.wrapped_powermeter = wrapped_powermeter + super().__init__(wrapped_powermeter) self.kp = kp self.ki = ki self.kd = kd @@ -91,21 +93,6 @@ def __init__( self._prev_time: float | None = None self._lock = asyncio.Lock() - async def wait_for_message(self, timeout=5): - """Pass through to wrapped powermeter.""" - return await self.wrapped_powermeter.wait_for_message(timeout) - - async def wait_for_next_message(self, timeout=5): - return await self.wrapped_powermeter.wait_for_next_message(timeout) - - async def start(self): - """Pass through to wrapped powermeter.""" - await self.wrapped_powermeter.start() - - async def stop(self): - """Pass through to wrapped powermeter.""" - await self.wrapped_powermeter.stop() - async def get_powermeter_watts(self) -> list[float]: """Return PID-adjusted power readings for each phase.""" async with self._lock: diff --git a/src/astrameter/powermeter/pid_test.py b/src/astrameter/powermeter/wrappers/pid_test.py similarity index 95% rename from src/astrameter/powermeter/pid_test.py rename to src/astrameter/powermeter/wrappers/pid_test.py index 20b9bd32..05d1ac91 100644 --- a/src/astrameter/powermeter/pid_test.py +++ b/src/astrameter/powermeter/wrappers/pid_test.py @@ -104,11 +104,11 @@ async def test_integral_accumulates_over_time(mock_powermeter): pm = PidPowermeter(mock_powermeter, kp=0.0, ki=1.0, output_max=800.0) t0 = 1000.0 - with patch("astrameter.powermeter.pid.time") as mock_time: + with patch("astrameter.powermeter.wrappers.pid.time") as mock_time: mock_time.monotonic.return_value = t0 await pm.get_powermeter_watts() # first call — init state - with patch("astrameter.powermeter.pid.time") as mock_time: + with patch("astrameter.powermeter.wrappers.pid.time") as mock_time: mock_time.monotonic.return_value = t0 + 1.0 r2 = await pm.get_powermeter_watts() @@ -123,11 +123,11 @@ async def test_anti_windup_stops_integration(mock_powermeter): pm = PidPowermeter(mock_powermeter, kp=0.0, ki=1.0, output_max=200.0) t0 = 1000.0 - with patch("astrameter.powermeter.pid.time") as mock_time: + with patch("astrameter.powermeter.wrappers.pid.time") as mock_time: mock_time.monotonic.return_value = t0 await pm.get_powermeter_watts() # init - with patch("astrameter.powermeter.pid.time") as mock_time: + with patch("astrameter.powermeter.wrappers.pid.time") as mock_time: mock_time.monotonic.return_value = t0 + 10.0 result = await pm.get_powermeter_watts() @@ -147,12 +147,12 @@ async def test_derivative_reacts_to_change(mock_powermeter): t0 = 1000.0 mock_powermeter.get_powermeter_watts.return_value = [100.0] - with patch("astrameter.powermeter.pid.time") as mock_time: + with patch("astrameter.powermeter.wrappers.pid.time") as mock_time: mock_time.monotonic.return_value = t0 await pm.get_powermeter_watts() mock_powermeter.get_powermeter_watts.return_value = [200.0] - with patch("astrameter.powermeter.pid.time") as mock_time: + with patch("astrameter.powermeter.wrappers.pid.time") as mock_time: mock_time.monotonic.return_value = t0 + 1.0 result = await pm.get_powermeter_watts() diff --git a/src/astrameter/powermeter/wrappers/smoothing.py b/src/astrameter/powermeter/wrappers/smoothing.py new file mode 100644 index 00000000..de18e38f --- /dev/null +++ b/src/astrameter/powermeter/wrappers/smoothing.py @@ -0,0 +1,122 @@ +"""EMA-based smoothing and deadband powermeter wrappers.""" + +from __future__ import annotations + +from astrameter.config.logger import logger +from astrameter.powermeter.base import Powermeter + +from .base import PowermeterWrapper + + +class SmoothedPowermeter(PowermeterWrapper): + """EMA smoother that filters per-phase power readings. + + Applies an exponential moving average on the *total* power, then + distributes the smoothed total proportionally across phases so that + per-phase ratios are preserved. + + Dedup logic prevents multiple consumers polling within a single meter + cycle from compounding the EMA update: an update is skipped only when + **both** the per-phase sample identity and the raw total are unchanged. + """ + + def __init__( + self, + wrapped_powermeter: Powermeter, + alpha: float, + max_step: float = 0, + ) -> None: + super().__init__(wrapped_powermeter) + self._alpha = alpha + self._max_step = max_step + self._value: float | None = None + self._last_sample: tuple[float, ...] | None = None + self._last_raw_total: float | None = None + + @property + def smoothed_value(self) -> float | None: + """Current smoothed value, or ``None`` if no samples yet.""" + return self._value + + def reset(self) -> None: + super().reset() + logger.debug("SmoothedPowermeter: reset (previous value=%s)", self._value) + self._value = None + self._last_sample = None + self._last_raw_total = None + + async def get_powermeter_watts(self) -> list[float]: + raw_values = await self.wrapped_powermeter.get_powermeter_watts() + raw_total = sum(raw_values) + sample_id = tuple(raw_values) + + if self._value is None: + self._value = raw_total + self._last_sample = sample_id + self._last_raw_total = raw_total + logger.debug( + "SmoothedPowermeter: seed value=%.2f (raw=%.2f)", + self._value, + raw_total, + ) + return self._distribute(raw_values, raw_total) + + if sample_id == self._last_sample and raw_total == self._last_raw_total: + logger.debug( + "SmoothedPowermeter: dedup hit (raw=%.2f value=%.2f)", + raw_total, + self._value, + ) + return self._distribute(raw_values, raw_total) + + self._last_sample = sample_id + self._last_raw_total = raw_total + + catchup_alpha = self._alpha + if (raw_total > 0) != (self._value > 0): + catchup_alpha = min(0.5, self._alpha * 4) + delta = catchup_alpha * (raw_total - self._value) + + if self._max_step > 0: + delta = max(-self._max_step, min(self._max_step, delta)) + + prev = self._value + self._value += delta + logger.debug( + "SmoothedPowermeter: update raw=%.2f prev=%.2f delta=%.2f new=%.2f", + raw_total, + prev, + delta, + self._value, + ) + return self._distribute(raw_values, raw_total) + + def _distribute(self, raw_values: list[float], raw_total: float) -> list[float]: + """Distribute the smoothed total proportionally across phases.""" + if raw_total == 0 or self._value is None: + return list(raw_values) + ratio = self._value / raw_total + return [v * ratio for v in raw_values] + + +class DeadbandPowermeter(PowermeterWrapper): + """Gate that returns zeros when total power is below the deadband threshold. + + Stateless: the upstream :class:`SmoothedPowermeter` provides EMA + inertia, so the signal approaches the threshold gradually and the + entry/exit discontinuity is bounded by the deadband value. + """ + + def __init__( + self, + wrapped_powermeter: Powermeter, + deadband: float, + ) -> None: + super().__init__(wrapped_powermeter) + self._deadband = deadband + + async def get_powermeter_watts(self) -> list[float]: + values = await self.wrapped_powermeter.get_powermeter_watts() + if self._deadband > 0 and abs(sum(values)) < self._deadband: + return [0.0] * len(values) + return values diff --git a/src/astrameter/powermeter/wrappers/smoothing_test.py b/src/astrameter/powermeter/wrappers/smoothing_test.py new file mode 100644 index 00000000..a311ca3a --- /dev/null +++ b/src/astrameter/powermeter/wrappers/smoothing_test.py @@ -0,0 +1,243 @@ +"""Tests for SmoothedPowermeter and DeadbandPowermeter.""" + +import pytest + +from .smoothing import DeadbandPowermeter, SmoothedPowermeter + + +class FakePowermeter: + """Minimal powermeter stub for testing wrappers.""" + + def __init__(self, values: list[float] | None = None): + self._values: list[float] = values or [0.0] + self.started = False + self.stopped = False + self.reset_count = 0 + + def set(self, values: list[float]) -> None: + self._values = values + + async def get_powermeter_watts(self) -> list[float]: + return list(self._values) + + async def wait_for_message(self, timeout=5): + pass + + async def start(self): + self.started = True + + async def stop(self): + self.stopped = True + + def reset(self): + self.reset_count += 1 + + +# --------------------------------------------------------------------------- +# SmoothedPowermeter +# --------------------------------------------------------------------------- + + +class TestSmoothedPowermeter: + @pytest.mark.asyncio + async def test_first_call_seeds_value(self): + fake = FakePowermeter([100.0, 50.0, -30.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + result = await sm.get_powermeter_watts() + assert result == [100.0, 50.0, -30.0] + assert sm.smoothed_value == 120.0 + + @pytest.mark.asyncio + async def test_ema_converges_toward_raw(self): + fake = FakePowermeter([100.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + + # Seed + await sm.get_powermeter_watts() + assert sm.smoothed_value == 100.0 + + # Change raw → 200, EMA should move toward it + fake.set([200.0]) + await sm.get_powermeter_watts() + # delta = 0.5 * (200 - 100) = 50 → new = 150 + assert sm.smoothed_value == 150.0 + + # Another step — use two phases with same total to bypass dedup + fake.set([120.0, 80.0]) + await sm.get_powermeter_watts() + # delta = 0.5 * (200 - 150) = 25 → new = 175 + assert sm.smoothed_value == 175.0 + + @pytest.mark.asyncio + async def test_sign_change_catchup(self): + fake = FakePowermeter([100.0]) + sm = SmoothedPowermeter(fake, alpha=0.1) + + await sm.get_powermeter_watts() + assert sm.smoothed_value == 100.0 + + # Sign flip: raw goes negative + fake.set([-100.0]) + await sm.get_powermeter_watts() + # catchup_alpha = min(0.5, 0.1 * 4) = 0.4 + # delta = 0.4 * (-100 - 100) = -80 → new = 20 + assert sm.smoothed_value == pytest.approx(20.0) + + @pytest.mark.asyncio + async def test_max_step_limits_delta(self): + fake = FakePowermeter([100.0]) + sm = SmoothedPowermeter(fake, alpha=0.9, max_step=10) + + await sm.get_powermeter_watts() + + fake.set([1000.0]) + await sm.get_powermeter_watts() + # Without max_step: delta = 0.9 * 900 = 810 + # With max_step=10: clamped to 10 → new = 110 + assert sm.smoothed_value == 110.0 + + @pytest.mark.asyncio + async def test_dedup_identical_values(self): + fake = FakePowermeter([100.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + + await sm.get_powermeter_watts() + assert sm.smoothed_value == 100.0 + + # Same values → dedup hit, no EMA update + result = await sm.get_powermeter_watts() + assert sm.smoothed_value == 100.0 + assert result == [100.0] + + @pytest.mark.asyncio + async def test_dedup_same_sample_different_total_advances_ema(self): + """When sample_id matches but raw_total differs, EMA should advance.""" + fake = FakePowermeter([100.0, 50.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + + await sm.get_powermeter_watts() + assert sm.smoothed_value == 150.0 + + # Change values (different sample_id and different total) + fake.set([120.0, 60.0]) + await sm.get_powermeter_watts() + # delta = 0.5 * (180 - 150) = 15 → new = 165 + assert sm.smoothed_value == 165.0 + + @pytest.mark.asyncio + async def test_reset_clears_state(self): + fake = FakePowermeter([100.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + + await sm.get_powermeter_watts() + assert sm.smoothed_value == 100.0 + + sm.reset() + assert sm.smoothed_value is None + + # Next call seeds again + fake.set([200.0]) + await sm.get_powermeter_watts() + assert sm.smoothed_value == 200.0 + + @pytest.mark.asyncio + async def test_proportional_phase_distribution(self): + fake = FakePowermeter([60.0, 30.0, 10.0]) # total=100 + sm = SmoothedPowermeter(fake, alpha=0.5) + + # Seed + await sm.get_powermeter_watts() + + # Change to different values + fake.set([80.0, 40.0, 80.0]) # total=200 + result = await sm.get_powermeter_watts() + # smoothed: 100 + 0.5*(200-100) = 150 + # ratio = 150/200 = 0.75 + assert result == pytest.approx([60.0, 30.0, 60.0]) + + @pytest.mark.asyncio + async def test_all_zero_returns_raw(self): + fake = FakePowermeter([0.0, 0.0, 0.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + + result = await sm.get_powermeter_watts() + assert result == [0.0, 0.0, 0.0] + + @pytest.mark.asyncio + async def test_lifecycle_delegation(self): + fake = FakePowermeter([100.0]) + sm = SmoothedPowermeter(fake, alpha=0.5) + + await sm.start() + assert fake.started + + await sm.stop() + assert fake.stopped + + await sm.wait_for_message(timeout=1) + + sm.reset() + assert fake.reset_count == 1 + + +# --------------------------------------------------------------------------- +# DeadbandPowermeter +# --------------------------------------------------------------------------- + + +class TestDeadbandPowermeter: + @pytest.mark.asyncio + async def test_values_within_deadband_return_zeros(self): + fake = FakePowermeter([5.0, -3.0, 2.0]) # total=4 + db = DeadbandPowermeter(fake, deadband=20.0) + + result = await db.get_powermeter_watts() + assert result == [0.0, 0.0, 0.0] + + @pytest.mark.asyncio + async def test_values_outside_deadband_pass_through(self): + fake = FakePowermeter([50.0, 30.0, -10.0]) # total=70 + db = DeadbandPowermeter(fake, deadband=20.0) + + result = await db.get_powermeter_watts() + assert result == [50.0, 30.0, -10.0] + + @pytest.mark.asyncio + async def test_zero_deadband_disables_gating(self): + fake = FakePowermeter([1.0, -1.0, 0.5]) # total=0.5 + db = DeadbandPowermeter(fake, deadband=0.0) + + result = await db.get_powermeter_watts() + assert result == [1.0, -1.0, 0.5] + + @pytest.mark.asyncio + async def test_negative_total_within_deadband(self): + fake = FakePowermeter([-5.0, -3.0, 2.0]) # total=-6 + db = DeadbandPowermeter(fake, deadband=20.0) + + result = await db.get_powermeter_watts() + assert result == [0.0, 0.0, 0.0] + + @pytest.mark.asyncio + async def test_exactly_at_threshold_returns_zeros(self): + fake = FakePowermeter([19.9]) + db = DeadbandPowermeter(fake, deadband=20.0) + + result = await db.get_powermeter_watts() + assert result == [0.0] + + @pytest.mark.asyncio + async def test_lifecycle_delegation(self): + fake = FakePowermeter([100.0]) + db = DeadbandPowermeter(fake, deadband=20.0) + + await db.start() + assert fake.started + + await db.stop() + assert fake.stopped + + await db.wait_for_message(timeout=1) + + db.reset() + assert fake.reset_count == 1 diff --git a/src/astrameter/powermeter/throttling.py b/src/astrameter/powermeter/wrappers/throttling.py similarity index 87% rename from src/astrameter/powermeter/throttling.py rename to src/astrameter/powermeter/wrappers/throttling.py index 993f2590..29f8b6a7 100644 --- a/src/astrameter/powermeter/throttling.py +++ b/src/astrameter/powermeter/wrappers/throttling.py @@ -2,11 +2,12 @@ import time from astrameter.config.logger import logger +from astrameter.powermeter.base import Powermeter -from .base import Powermeter +from .base import PowermeterWrapper -class ThrottledPowermeter(Powermeter): +class ThrottledPowermeter(PowermeterWrapper): """ A wrapper around powermeter that throttles the rate of value fetching. @@ -18,7 +19,7 @@ class ThrottledPowermeter(Powermeter): """ def __init__(self, wrapped_powermeter: Powermeter, throttle_interval: float = 0.0): - self.wrapped_powermeter = wrapped_powermeter + super().__init__(wrapped_powermeter) self.throttle_interval = throttle_interval # Coalescing fetch pattern: when a fetch is in flight (including the @@ -28,18 +29,6 @@ def __init__(self, wrapped_powermeter: Powermeter, throttle_interval: float = 0. self._last_values: list[float] | None = None self._pending_fetch: asyncio.Future[list[float]] | None = None - async def wait_for_message(self, timeout=5): - return await self.wrapped_powermeter.wait_for_message(timeout) - - async def wait_for_next_message(self, timeout=5): - return await self.wrapped_powermeter.wait_for_next_message(timeout) - - async def start(self): - await self.wrapped_powermeter.start() - - async def stop(self): - await self.wrapped_powermeter.stop() - async def get_powermeter_watts(self) -> list[float]: if self.throttle_interval <= 0: return await self.wrapped_powermeter.get_powermeter_watts() diff --git a/src/astrameter/powermeter/throttling_test.py b/src/astrameter/powermeter/wrappers/throttling_test.py similarity index 100% rename from src/astrameter/powermeter/throttling_test.py rename to src/astrameter/powermeter/wrappers/throttling_test.py diff --git a/src/astrameter/powermeter/transform.py b/src/astrameter/powermeter/wrappers/transform.py similarity index 81% rename from src/astrameter/powermeter/transform.py rename to src/astrameter/powermeter/wrappers/transform.py index 5e06041b..10e12482 100644 --- a/src/astrameter/powermeter/transform.py +++ b/src/astrameter/powermeter/wrappers/transform.py @@ -1,9 +1,10 @@ from astrameter.config.logger import logger +from astrameter.powermeter.base import Powermeter -from .base import Powermeter +from .base import PowermeterWrapper -class TransformedPowermeter(Powermeter): +class TransformedPowermeter(PowermeterWrapper): """ A wrapper around a powermeter that applies a linear transformation (multiplier and offset) to each returned power value. @@ -25,24 +26,12 @@ def __init__( raise ValueError("offsets must be a non-empty list") if not multipliers: raise ValueError("multipliers must be a non-empty list") - self.wrapped_powermeter = wrapped_powermeter + super().__init__(wrapped_powermeter) self.offsets = offsets self.multipliers = multipliers self._offsets_mismatch_warned = False self._multipliers_mismatch_warned = False - async def wait_for_message(self, timeout=5): - return await self.wrapped_powermeter.wait_for_message(timeout) - - async def wait_for_next_message(self, timeout=5): - return await self.wrapped_powermeter.wait_for_next_message(timeout) - - async def start(self): - await self.wrapped_powermeter.start() - - async def stop(self): - await self.wrapped_powermeter.stop() - def _apply_transform(self, values: list[float]) -> list[float]: result = [] for i, value in enumerate(values): diff --git a/src/astrameter/powermeter/transform_test.py b/src/astrameter/powermeter/wrappers/transform_test.py similarity index 100% rename from src/astrameter/powermeter/transform_test.py rename to src/astrameter/powermeter/wrappers/transform_test.py diff --git a/tests/smoke_efficiency_saturation.py b/tests/smoke_efficiency_saturation.py index 82481350..206d7e1e 100644 --- a/tests/smoke_efficiency_saturation.py +++ b/tests/smoke_efficiency_saturation.py @@ -109,13 +109,13 @@ def __init__( ct_mac=ct_mac, active_control=True, fair_distribution=True, - smooth_target_alpha=0.9, deadband=5, min_efficient_power=min_efficient_power, efficiency_rotation_interval=scaled_rotation, efficiency_saturation_threshold=efficiency_saturation_threshold, saturation_decay_factor=saturation_decay_factor, consumer_ttl=120 / time_scale, + reset_fn=None, **ct_kwargs, ) diff --git a/tests/test_balancer_probe_lockup.py b/tests/test_balancer_probe_lockup.py index 01bc67ed..7af9d024 100644 --- a/tests/test_balancer_probe_lockup.py +++ b/tests/test_balancer_probe_lockup.py @@ -9,20 +9,17 @@ (visible in the log for ~1.5 hours until manual restart). The grid drifts ~97 W uncompensated for the entire window. -The root cause is exercised here at two levels: - -1. A *unit-level* repro that drives :class:`LoadBalancer.compute_target` - with the exact report sequence from the log (two phase-B consumers, - scripted power outputs and meter readings) and asserts the active - battery receives a reasonable target after the handoff. - -2. A *smoother-level* repro in ``test_smoother.py`` covering the - ``sample_id`` dedup that fires even when ``raw_total`` has changed. +The root cause is exercised here via a *unit-level* repro that drives +:class:`LoadBalancer.compute_target` with the exact report sequence from +the log (two phase-B consumers, scripted power outputs and meter +readings) and asserts the active battery receives a reasonable target +after the handoff. """ from __future__ import annotations import time +from collections.abc import Callable from astrameter.ct002.balancer import ( BalancerConfig, @@ -30,7 +27,6 @@ LoadBalancer, ProbeState, ) -from astrameter.ct002.smoother import TargetSmoother class _FakeClock: @@ -46,17 +42,13 @@ def advance(self, dt: float) -> None: def _make_balancer( clock: _FakeClock, - smoother: TargetSmoother | None = None, + reset_fn: Callable[[], None] | None = None, ) -> LoadBalancer: """Match the user's configuration (defaults plus efficiency enabled). - The ``smoother`` is injected the same way :class:`CT002` does in - production (`ct002.py:159`). Tests that exercise the probe - commit/reject path **must** pass their smoother here, otherwise - the balancer has no reference to the smoother and - ``_commit_probe``/``_reject_probe`` can't call ``reseed()`` on it — - the test would silently skip the reseed path the code is - supposed to cover. + Tests that exercise the probe commit/reject path **must** pass a + ``reset_fn`` here so ``_commit_probe``/``_reject_probe`` can + invoke it — otherwise the test silently skips the reset path. """ return LoadBalancer( config=BalancerConfig( @@ -82,7 +74,7 @@ def _make_balancer( saturation_stall_timeout_seconds=60.0, saturation_enabled=True, clock=clock, - smoother=smoother, + reset_fn=reset_fn, ) @@ -97,7 +89,6 @@ def _reports(active_power: int, backup_power: int) -> dict: def _tick( lb: LoadBalancer, - smoother: TargetSmoother, reports: dict, grid_reading: float, ) -> tuple[list[float], list[float]]: @@ -105,42 +96,35 @@ def _tick( Returns ``(active_target, backup_target)`` as 3-element phase lists. """ - sample_id = (grid_reading, 0.0, 0.0) - smoothed = smoother.update(grid_reading, sample_id) - # Order mirrors the real log: active battery first, backup second. active_target = lb.compute_target( consumer_id="24215edb1936", consumer_mode=ConsumerMode("auto"), all_reports=reports, - smoothed_target=smoothed, - raw_total=grid_reading, + grid_total=grid_reading, inactive=frozenset(), manual=frozenset(), - sample_id=sample_id, ) backup_target = lb.compute_target( consumer_id="acd929a74b20", consumer_mode=ConsumerMode("auto"), all_reports=reports, - smoothed_target=smoothed, - raw_total=grid_reading, + grid_total=grid_reading, inactive=frozenset(), manual=frozenset(), - sample_id=sample_id, ) return active_target, backup_target class TestProbeReseedsSmoother: - """After a probe commits or rejects, the balancer must reseed any - attached smoother so the post-handoff control loop cannot drag in - pre-probe EMA state. + """After a probe commits or rejects, the balancer must invoke the + injected ``reset_fn`` so the post-handoff control loop cannot drag + in pre-probe EMA state. """ - def test_probe_commit_reseeds_injected_smoother(self) -> None: + def test_probe_commit_calls_reset_fn(self) -> None: clock = _FakeClock() - smoother = TargetSmoother(alpha=0.5) + calls: list[str] = [] lb = LoadBalancer( config=BalancerConfig( min_efficient_power=50, @@ -153,11 +137,8 @@ def test_probe_commit_reseeds_injected_smoother(self) -> None: saturation_grace_seconds=90.0, saturation_stall_timeout_seconds=60.0, clock=clock, - smoother=smoother, + reset_fn=lambda: calls.append("reset"), ) - # Seed the smoother so reseed has something to clear. - smoother.update(50.0, (50.0,)) - assert smoother.value == 50.0 # Inject a fake in-flight probe and commit it. lb._probe_state = ProbeState( # type: ignore[attr-defined] @@ -177,11 +158,11 @@ def test_probe_commit_reseeds_injected_smoother(self) -> None: actual=22.0, ) - assert smoother.value is None, "smoother was not reseeded after probe commit" + assert len(calls) == 1, "reset_fn was not called after probe commit" - def test_probe_reject_reseeds_injected_smoother(self) -> None: + def test_probe_reject_calls_reset_fn(self) -> None: clock = _FakeClock() - smoother = TargetSmoother(alpha=0.5) + calls: list[str] = [] lb = LoadBalancer( config=BalancerConfig( min_efficient_power=50, @@ -194,10 +175,8 @@ def test_probe_reject_reseeds_injected_smoother(self) -> None: saturation_grace_seconds=90.0, saturation_stall_timeout_seconds=60.0, clock=clock, - smoother=smoother, + reset_fn=lambda: calls.append("reset"), ) - smoother.update(75.0, (75.0,)) - assert smoother.value == 75.0 lb._probe_state = ProbeState( # type: ignore[attr-defined] candidate_id="24215edb1936", @@ -209,7 +188,30 @@ def test_probe_reject_reseeds_injected_smoother(self) -> None: ) lb._reject_probe(now=clock(), reason="test") # type: ignore[attr-defined] - assert smoother.value is None, "smoother was not reseeded after probe reject" + assert len(calls) == 1, "reset_fn was not called after probe reject" + + +class _TestSmoother: + """Minimal EMA smoother for test use only.""" + + def __init__(self, alpha: float = 0.9) -> None: + self._alpha = alpha + self._value: float | None = None + + @property + def value(self) -> float | None: + return self._value + + def update(self, raw: float) -> float: + if self._value is None: + self._value = raw + else: + delta = self._alpha * (raw - self._value) + self._value += delta + return self._value + + def reset(self) -> None: + self._value = None class TestProbeHandoffLockup: @@ -230,11 +232,11 @@ def test_active_battery_keeps_covering_demand_after_probe_handoff(self) -> None: target, not zero it. """ clock = _FakeClock() - smoother = TargetSmoother(alpha=0.9, deadband=20.0) - # Wire the smoother into the balancer the same way CT002 does - # in production, so ``_commit_probe`` will reseed it and the - # test actually exercises the production reseed path. - lb = _make_balancer(clock, smoother=smoother) + smoother = _TestSmoother(alpha=0.9) + # Wire the smoother's reset into the balancer so + # ``_commit_probe`` will reset it and the test actually + # exercises the production reset path. + lb = _make_balancer(clock, reset_fn=smoother.reset) # --- Warm-up: drive to a single-active steady state -------------- # Prime: seed both consumers on phase B with a 94 W load on the @@ -243,34 +245,23 @@ def test_active_battery_keeps_covering_demand_after_probe_handoff(self) -> None: # anything while the priority list settles). for _ in range(10): reports = _reports(active_power=0, backup_power=94) - _tick(lb, smoother, reports, grid_reading=0.0) + smoothed = smoother.update(0.0) + _tick(lb, reports, grid_reading=smoothed) clock.advance(3.0) - # Feed a unique ``sample_id`` each tick so - # ``TargetSmoother.update()`` isn't deduped against the - # previous _tick call. The ``raw_total`` is deliberately - # the *true* zero reading (not ``1e-6 * clock()`` — that - # would be ~1700 because ``_FakeClock`` starts at - # ``time.time()``, which would hugely contaminate the EMA). - smoother.update(0.0, (clock(),)) # Strict exclusivity check: after warm-up, the balancer has # populated the priority list from ``sorted(current_pool)`` # (`balancer.py:867`), so ``24215edb1936`` (alphabetically # first) sits at slot 0 and ``acd929a74b20`` is the sole - # deprioritized consumer. The earlier assertion used an - # ``or`` expression that was tautologically true because - # ``acd929a74b20`` is always in ``_priority`` regardless of - # which slot it's in — this stricter form actually catches - # regressions where the warm-up leaves the pool in an - # unexpected state. + # deprioritized consumer. assert lb._priority == ["24215edb1936", "acd929a74b20"], ( f"Unexpected priority after warm-up: {lb._priority}" ) assert lb._deprioritized == {"acd929a74b20"}, ( f"Unexpected deprioritized after warm-up: {lb._deprioritized}" ) - # Sanity: the smoother pollution fix above must keep the EMA - # at true zero during a zero-grid warm-up. + # Sanity: the smoother must stay at true zero during a + # zero-grid warm-up. assert smoother.value == 0.0, ( f"Warm-up contaminated the smoother: {smoother.value}" ) @@ -284,7 +275,8 @@ def test_active_battery_keeps_covering_demand_after_probe_handoff(self) -> None: reports = _reports(active_power=p, backup_power=94) # Grid during probe: total battery = p + 94, load still 94, # so grid = 94 - (p + 94) = -p (slight export while probe ramps). - _tick(lb, smoother, reports, grid_reading=float(-p)) + smoothed = smoother.update(float(-p)) + _tick(lb, reports, grid_reading=smoothed) clock.advance(3.0) # --- Post-probe fade: backup collapses, active stays stuck at 22 W @@ -308,17 +300,12 @@ def test_active_battery_keeps_covering_demand_after_probe_handoff(self) -> None: grid = 94.0 - (active_power + backup_power) reports = _reports(active_power=active_power, backup_power=backup_power) - active_target, backup_target = _tick( - lb, smoother, reports, grid_reading=grid - ) + smoothed = smoother.update(grid) + active_target, backup_target = _tick(lb, reports, grid_reading=smoothed) clock.advance(3.0) # Phase B target for the active battery. b_target = active_target[1] - # ``smoother.value`` can be ``None`` on the tick immediately - # following a probe commit (the balancer reseeds its - # injected smoother in ``_commit_probe``). Use a sentinel - # so the trace print survives that transient. smoothed_str = ( f"{smoother.value:6.1f}" if smoother.value is not None else " None" ) diff --git a/tests/test_ct002_active_control.py b/tests/test_ct002_active_control.py index 3d7fa7a9..59ab542d 100644 --- a/tests/test_ct002_active_control.py +++ b/tests/test_ct002_active_control.py @@ -19,18 +19,6 @@ def test_smooth_target_splits_across_consumers(self): assert out[1] == 0 assert out[2] == 0 - def test_smooth_target_ema_smooths_raw_input(self): - device = CT002( - active_control=True, - fair_distribution=False, - smooth_target_alpha=0.5, - ) - device._update_consumer_report("a", "A", 0) - first = device._compute_smooth_target([400, 0, 0], "a") - second = device._compute_smooth_target([100, 0, 0], "a") - assert first[0] == 400 - assert second[0] == 250 - def test_active_control_off_passes_through_values(self): device = CT002(active_control=False) device._update_consumer_report("a", "A", 0) @@ -54,67 +42,6 @@ def test_active_control_splits_target_across_detected_phases(self): assert out[1] == 100 assert out[2] == 0 - def test_deadband_decays_smoothed_toward_zero(self): - """When raw total is within deadband, smoothed target should decay - toward zero rather than holding stale values.""" - device = CT002( - active_control=True, - fair_distribution=False, - smooth_target_alpha=0.3, - deadband=20, - ) - device._update_consumer_report("a", "A", 0) - # Set a large initial smoothed target - device._compute_smooth_target([500, 0, 0], "a") - assert device._smoother.value == 500 - - # Feed readings within deadband (grid balanced). - # Each call uses a unique value so the sample-dedup sees a fresh reading. - for i in range(20): - device._compute_smooth_target([i, 0, 0], "a") - - # Smoothed should have decayed significantly toward zero - assert device._smoother.value < 10 - - def test_deadband_decay_does_not_overshoot_zero(self): - """Deadband decay should not make smoothed target cross zero.""" - device = CT002( - active_control=True, - fair_distribution=False, - smooth_target_alpha=0.5, - deadband=20, - ) - device._update_consumer_report("a", "A", 0) - device._compute_smooth_target([100, 0, 0], "a") - # Decay multiple times with unique values within deadband - for i in range(50): - device._compute_smooth_target([i % 19, 0, 0], "a") - # Should approach zero but stay non-negative - assert device._smoother.value >= 0 - - def test_smoothing_applies_once_per_sample(self): - """Multiple consumers calling with the same meter reading should - not compound the smoothing update.""" - device = CT002( - active_control=True, - fair_distribution=False, - smooth_target_alpha=0.5, - ) - device._update_consumer_report("a", "A", 0) - device._update_consumer_report("b", "A", 0) - device._compute_smooth_target([400, 0, 0], "a") - assert device._smoother.value == 400 - - # Two consumers call with the same new reading - device._compute_smooth_target([100, 0, 0], "a") - after_first = device._smoother.value - device._compute_smooth_target([100, 0, 0], "b") - after_second = device._smoother.value - - # Smoothing should have applied only once - assert after_first == 250 # 400 + 0.5*(100-400) - assert after_second == 250 # unchanged - class TestFairDistribution: """Tests for fair load distribution across consumers.""" @@ -1254,7 +1181,6 @@ def test_probe_backup_uses_delta_not_absolute_output(self): device = CT002( active_control=True, fair_distribution=False, - smooth_target_alpha=1.0, min_efficient_power=150, probe_min_power=80, efficiency_fade_alpha=1.0, @@ -1279,7 +1205,6 @@ def test_probe_backup_ignores_probe_output_and_follows_demand(self): device = CT002( active_control=True, fair_distribution=False, - smooth_target_alpha=1.0, min_efficient_power=150, probe_min_power=80, efficiency_fade_alpha=1.0, @@ -1391,7 +1316,6 @@ def test_reactivated_consumer_rejoins_distribution(self): # Reactivate bat1 device.set_consumer_active("bat1", True) - device._smoother._last_sample = None # force re-evaluation result = device._compute_smooth_target([400, 0, 0], "bat1") assert result[0] == 200 # now split between two diff --git a/tests/test_e2e_probe_lockup.py b/tests/test_e2e_probe_lockup.py index f206dfc4..b9a77d2a 100644 --- a/tests/test_e2e_probe_lockup.py +++ b/tests/test_e2e_probe_lockup.py @@ -149,12 +149,12 @@ def __init__( ct_mac=ct_mac, active_control=True, fair_distribution=True, - smooth_target_alpha=0.9, deadband=5, min_efficient_power=min_efficient_power, efficiency_rotation_interval=efficiency_rotation_interval, probe_min_power=20, # lower so the test's small loads can probe clock=self.clock, + reset_fn=None, ) async def update_readings(_addr, _fields=None, _consumer_id=None): @@ -365,7 +365,7 @@ async def test_stale_meter_during_probe_causes_persistent_lockup( after = h.battery_powers() grid_after = h.grid_total() - smoothed = h.ct002._smoother.value + smoothed = h.ct002._last_smooth_target # The real grid is measurably off-balance because the # emulator drove the handoff blind. Accept either sign: diff --git a/tests/test_efficiency_e2e.py b/tests/test_efficiency_e2e.py index e12b7b97..612b237d 100644 --- a/tests/test_efficiency_e2e.py +++ b/tests/test_efficiency_e2e.py @@ -141,11 +141,11 @@ def __init__( ct_mac=ct_mac, active_control=True, fair_distribution=True, - smooth_target_alpha=0.9, deadband=5, min_efficient_power=min_efficient_power, efficiency_rotation_interval=efficiency_rotation_interval, clock=self.clock, + reset_fn=None, **ct_kwargs, ) diff --git a/tests/test_smoother.py b/tests/test_smoother.py deleted file mode 100644 index bb66bd06..00000000 --- a/tests/test_smoother.py +++ /dev/null @@ -1,119 +0,0 @@ -"""Unit tests for :class:`TargetSmoother`. - -These exercise both the happy paths (EMA convergence, deadband decay, -sign-change catchup) and the regression case described in -``tests/test_balancer_probe_lockup.py`` — a stable sensor value trapping -the smoother on the "wrong" side of a zero-crossing. -""" - -from __future__ import annotations - -from astrameter.ct002.smoother import TargetSmoother - - -class TestTargetSmootherBasics: - def test_first_sample_seeds_value(self) -> None: - s = TargetSmoother(alpha=0.5) - assert s.update(100.0, (100.0, 0.0, 0.0)) == 100.0 - assert s.value == 100.0 - - def test_ema_converges_toward_raw(self) -> None: - s = TargetSmoother(alpha=0.5) - s.update(0.0, (0.0,)) - # Each new sample must be a distinct sample_id or the dedup fires. - s.update(100.0, (100.0,)) - assert 49.0 < s.value < 51.0 # 0 + 0.5 * (100 - 0) - s.update(100.0, (100.1,)) - assert 74.0 < s.value < 76.0 - - def test_deadband_decays_toward_zero(self) -> None: - s = TargetSmoother(alpha=0.5, deadband=10.0) - s.update(100.0, (100.0,)) - s.update(2.0, (2.0,)) # inside deadband - assert 49.0 < s.value < 51.0 # decayed by alpha - s.update(3.0, (3.0,)) # still inside deadband - assert 24.0 < s.value < 26.0 - - def test_sign_flip_catchup(self) -> None: - s = TargetSmoother(alpha=0.1) - s.update(100.0, (100.0,)) - assert s.value == 100.0 - # Sign flip: catchup_alpha = min(0.5, 0.1*4) = 0.4 - s.update(-100.0, (-100.0,)) - # delta = 0.4 * (-100 - 100) = -80 → value = 100 + (-80) = 20 - assert 19.0 < s.value < 21.0 - - def test_max_step_limits_delta(self) -> None: - s = TargetSmoother(alpha=1.0, max_step=10.0) - s.update(0.0, (0.0,)) - s.update(100.0, (100.0,)) - assert s.value == 10.0 - - -class TestTargetSmootherLockupRegression: - """Regression: a stable sensor value must not trap the smoother. - - The balancer uses ``smoothed_target`` to compute per-consumer targets. - If the smoother stops advancing while the raw meter reading is stale - (e.g. push-based powermeter hasn't forwarded a new state event yet), - the entire control loop freezes at the last-known value. - - See the matching test in ``test_balancer_probe_lockup.py`` for the - end-to-end manifestation that prompted the fix. - """ - - def test_identical_samples_still_advance_value_when_changing(self) -> None: - """Two calls with the same ``sample_id`` but different ``raw_total`` - must still be allowed to advance the EMA — sample_id is meant to - coalesce multi-consumer polls within one meter tick, not to cap - the smoother at its first reading. - """ - s = TargetSmoother(alpha=0.5) - s.update(50.0, (0.0,)) # Seed value well away from zero. - assert s.value == 50.0 - # Same sample_id but raw has moved: this is the "stale key, fresh - # value" pattern that could mask a real meter change. The - # smoother must advance the EMA. - s.update(100.0, (0.0,)) - assert s.value != 50.0, ( - "Smoother ignored a fresh raw_total because sample_id was " - "identical — this is the lockup regression." - ) - # 50 + 0.5 * (100 - 50) = 75 - assert 74.0 < s.value < 76.0 - - def test_repeated_identical_call_within_tick_is_still_deduped(self) -> None: - """Within a single meter tick, multiple consumers polling must not - compound the EMA. The dedup still has to fire when *both* - ``raw_total`` and ``sample_id`` match. - """ - s = TargetSmoother(alpha=0.5) - s.update(0.0, (0.0,)) - s.update(100.0, (100.0,)) # Tick 1: advance to 50 - after_first = s.value - # Tick 1 continued: a second consumer reads the same meter value - # and calls update() with an identical (raw, sample_id) pair. The - # EMA must not compound. - s.update(100.0, (100.0,)) - assert s.value == after_first, "Dedup within a tick failed — EMA compounded" - - def test_reseed_clears_state_and_next_update_seeds_directly(self) -> None: - """After :meth:`reseed` the next update must set ``_value`` - directly to ``raw_total`` (bypassing EMA) so post-probe state - can re-anchor in a single step. - """ - s = TargetSmoother(alpha=0.1) - s.update(0.0, (0.0,)) - s.update(50.0, (50.0,)) - assert s.value is not None - assert s.value != 50.0 # EMA has dragged it somewhere in between - - s.reseed() - assert s.value is None - assert s._last_sample is None - assert s._last_raw_total is None - - s.update(100.0, (100.0,)) - assert s.value == 100.0, ( - "First post-reseed update must seed directly, not EMA-smooth" - )