diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 30cf131c3..48a869b06 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -299,6 +299,7 @@ remotecontrol resetmidnight resultid resultmid +retrips rname Roboto rowspan diff --git a/apps/predbat/solis.py b/apps/predbat/solis.py index d165e9dbe..ecaeb5330 100644 --- a/apps/predbat/solis.py +++ b/apps/predbat/solis.py @@ -32,6 +32,13 @@ SOLIS_INITIAL_RETRY_DELAY = 1 # seconds SOLIS_REQUEST_TIMEOUT = 30 # seconds +# Circuit breaker configuration +SOLIS_CB_FAILURE_THRESHOLD = 3 # consecutive non-throttle failures before tripping +SOLIS_CB_INITIAL_RECOVERY_TIME = 60 # seconds before first retry after trip +SOLIS_CB_MAX_RECOVERY_TIME = 900 # 15-minute cap on recovery wait +SOLIS_CB_BACKOFF_MULTIPLIER = 2 # double recovery time on each successive trip +SOLIS_CB_THROTTLE_CODES = {"B0600", "B0173", "B0115"} # API response codes that trip the breaker immediately + # CID Constants (Control IDs for inverter registers) SOLIS_CID_STORAGE_MODE = 636 SOLIS_CID_BATTERY_RESERVE_SOC = 157 @@ -288,6 +295,13 @@ def initialize(self, api_key, api_secret, inverter_sn=None, automatic=False, bas # Tracking self.slots_reset = set() # Track which inverters had slots reset + # Circuit breaker state + self._cb_state = "CLOSED" # "CLOSED" | "OPEN" | "HALF_OPEN" + self._cb_failure_count = 0 # consecutive non-throttle failures + self._cb_trip_count = 0 # how many times the breaker has tripped (drives backoff) + self._cb_open_at = None # time.monotonic() when circuit last opened + self._cb_recovery_time = SOLIS_CB_INITIAL_RECOVERY_TIME # current wait duration before retry + self.log(f"Solis API: Initialised with inverter_sn={self.inverter_sn}, automatic={automatic}") # ==================== Helper Methods ==================== @@ -350,8 +364,56 @@ def _build_headers(self, endpoint, payload): # ==================== Core API Methods ==================== + def _cb_record_success(self): + """Record a successful API call and close the circuit breaker if it was recovering.""" + if self._cb_state == "HALF_OPEN": + self.log("Solis API: Circuit breaker closed (recovered)") + self._cb_state = "CLOSED" + self._cb_failure_count = 0 + self._cb_trip_count = 0 + self._cb_recovery_time = SOLIS_CB_INITIAL_RECOVERY_TIME + + def _cb_record_failure(self, response_code=None): + """Record a failed API call and trip the circuit breaker when appropriate.""" + if response_code is not None and response_code in SOLIS_CB_THROTTLE_CODES: + # Throttle code — trip immediately regardless of failure count + self._cb_trip_count += 1 + self._cb_recovery_time = min(SOLIS_CB_INITIAL_RECOVERY_TIME * (SOLIS_CB_BACKOFF_MULTIPLIER ** (self._cb_trip_count - 1)), SOLIS_CB_MAX_RECOVERY_TIME) + self._cb_state = "OPEN" + self._cb_open_at = time.monotonic() + self._cb_failure_count = 0 + self.log(f"Warn: Solis API: Circuit breaker OPEN for {self._cb_recovery_time:.0f}s (trip {self._cb_trip_count}) — throttle code {response_code}") + else: + self._cb_failure_count += 1 + if self._cb_failure_count >= SOLIS_CB_FAILURE_THRESHOLD: + self._cb_trip_count += 1 + self._cb_recovery_time = min(SOLIS_CB_INITIAL_RECOVERY_TIME * (SOLIS_CB_BACKOFF_MULTIPLIER ** (self._cb_trip_count - 1)), SOLIS_CB_MAX_RECOVERY_TIME) + self._cb_state = "OPEN" + self._cb_open_at = time.monotonic() + self._cb_failure_count = 0 + self.log(f"Warn: Solis API: Circuit breaker OPEN for {self._cb_recovery_time:.0f}s (trip {self._cb_trip_count}) — {self._cb_trip_count * SOLIS_CB_FAILURE_THRESHOLD} consecutive failures") + + @property + def _cb_is_open(self): + """Return True if the circuit breaker is blocking outgoing calls.""" + if self._cb_state == "CLOSED": + return False + if self._cb_state == "OPEN": + elapsed = time.monotonic() - self._cb_open_at + if elapsed >= self._cb_recovery_time: + self._cb_state = "HALF_OPEN" + self.log(f"Solis API: Circuit breaker transitioning to HALF_OPEN after {elapsed:.0f}s — allowing probe request") + return False + return True + # HALF_OPEN — let the next probe request through + return False + async def _execute_request(self, endpoint, payload): """Execute HTTP POST request to Solis API""" + # Circuit breaker check — bail immediately if the breaker is OPEN + if self._cb_is_open: + raise SolisAPIError("Circuit breaker OPEN, skipping API call") + url = f"{self.base_url}{endpoint}" headers = self._build_headers(endpoint, payload) @@ -363,6 +425,7 @@ async def _execute_request(self, endpoint, payload): error_text = await response.text() reason = "auth_error" if response.status in (401, 403) else "server_error" record_api_call("solis", False, reason) + self._cb_record_failure() raise SolisAPIError(f"HTTP error: {error_text}", status_code=response.status) # Parse JSON response @@ -375,18 +438,23 @@ async def _execute_request(self, endpoint, payload): if str(code) != "0": error_msg = response_json.get("msg", "Unknown error") error_detail = SOLIS_API_CODES.get(str(code), f"Unknown code: {code}") - record_api_call("solis", False, "server_error") + is_throttle = str(code) in SOLIS_CB_THROTTLE_CODES + record_api_call("solis", False, "rate_limit" if is_throttle else "server_error") + self._cb_record_failure(str(code)) raise SolisAPIError(f"API error: {error_msg} ({error_detail} - {response_json})", response_code=str(code)) # Return data field record_api_call("solis") + self._cb_record_success() return response_json.get("data") except asyncio.TimeoutError as err: record_api_call("solis", False, "connection_error") + self._cb_record_failure() raise SolisAPIError(f"Timeout accessing {url}") from err except aiohttp.ClientError as err: record_api_call("solis", False, "connection_error") + self._cb_record_failure() raise SolisAPIError(f"Network error accessing {url}: {str(err)}") from err async def _with_retry(self, operation, max_retry_time=SOLIS_MAX_RETRY_TIME): @@ -403,6 +471,10 @@ async def _with_retry(self, operation, max_retry_time=SOLIS_MAX_RETRY_TIME): if elapsed_time >= max_retry_time: raise err + # Bail immediately if the circuit just tripped — no point burning the retry window + if self._cb_state != "CLOSED": + raise err + attempt += 1 self.log(f"Warn: Solis API retry {attempt} after {elapsed_time:.1f}s: {str(err)}") @@ -2764,6 +2836,13 @@ async def run(self, seconds, first): """Main run cycle called every 5 seconds""" poll_success = True + # Circuit breaker — skip all API calls this cycle if the breaker is OPEN + if self._cb_is_open: + remaining = self._cb_recovery_time - (time.monotonic() - self._cb_open_at) + self.log(f"Solis API: Circuit breaker OPEN, skipping API calls this cycle ({remaining:.0f}s remaining)") + await self.publish_entities() # stale cache is fine; keeps HA entities alive + return False + # One-time startup configuration if first: # Create aiohttp session diff --git a/apps/predbat/tests/test_solis.py b/apps/predbat/tests/test_solis.py index f64ef6c37..049eca709 100644 --- a/apps/predbat/tests/test_solis.py +++ b/apps/predbat/tests/test_solis.py @@ -20,6 +20,7 @@ from solis import get_solis_mode_enum, compute_solis_mode_value from solis import ENUM_OTHER, ENUM_SELF_USE, ENUM_SELF_USE_NO_GRID_CHARGING, ENUM_FEED_IN_PRIORITY, ENUM_FEED_IN_PRIORITY_NO_GRID_CHARGING from solis import SOLIS_BIT_SELF_USE, SOLIS_BIT_FEED_IN_PRIORITY, SOLIS_BIT_GRID_CHARGING, SOLIS_BIT_OFF_GRID +from solis import SOLIS_CB_FAILURE_THRESHOLD, SOLIS_CB_INITIAL_RECOVERY_TIME, SOLIS_CB_MAX_RECOVERY_TIME, SOLIS_CB_BACKOFF_MULTIPLIER, SOLIS_CB_THROTTLE_CODES, SolisAPIError class MockBase: @@ -64,6 +65,13 @@ def __init__(self, prefix="predbat"): self.cached_infos = {} self.slots_reset = set() + # Circuit breaker state (mirrors initialize() in SolisAPI) + self._cb_state = "CLOSED" + self._cb_failure_count = 0 + self._cb_trip_count = 0 + self._cb_open_at = None + self._cb_recovery_time = SOLIS_CB_INITIAL_RECOVERY_TIME + # Logging self.log_messages = [] self.dashboard_items = {} @@ -308,6 +316,257 @@ async def test_fetch_entity_data_invalid_values(): return False +# ==================== Circuit Breaker Tests ==================== + + +async def test_cb_closed_to_open_on_threshold(): + """CLOSED → OPEN after SOLIS_CB_FAILURE_THRESHOLD consecutive non-throttle failures""" + print("\n=== Test: circuit breaker CLOSED → OPEN on threshold ===") + + api = MockSolisAPI() + assert api._cb_state == "CLOSED" + + # Fire (threshold - 1) failures — breaker should still be CLOSED + for i in range(SOLIS_CB_FAILURE_THRESHOLD - 1): + api._cb_record_failure() + assert api._cb_state == "CLOSED", f"Expected CLOSED after {i + 1} failure(s), got {api._cb_state}" + + # One more — should trip now + api._cb_record_failure() + assert api._cb_state == "OPEN", f"Expected OPEN after {SOLIS_CB_FAILURE_THRESHOLD} failures, got {api._cb_state}" + assert api._cb_trip_count == 1 + assert api._cb_recovery_time == SOLIS_CB_INITIAL_RECOVERY_TIME + assert api._cb_open_at is not None + + print("PASSED: circuit breaker CLOSED → OPEN on threshold") + return False + + +async def test_cb_immediate_trip_on_throttle_code(): + """CLOSED → OPEN immediately on a known throttle code (e.g. B0600)""" + print("\n=== Test: circuit breaker immediate trip on throttle code ===") + + for code in SOLIS_CB_THROTTLE_CODES: + api = MockSolisAPI() + assert api._cb_state == "CLOSED" + api._cb_record_failure(response_code=code) + assert api._cb_state == "OPEN", f"Expected OPEN for code {code}, got {api._cb_state}" + assert api._cb_trip_count == 1 + assert api._cb_failure_count == 0 # reset on trip + + print("PASSED: circuit breaker immediate trip on throttle code") + return False + + +async def test_cb_open_blocks_calls(): + """OPEN circuit breaker raises SolisAPIError without making HTTP requests""" + print("\n=== Test: OPEN circuit breaker blocks calls ===") + + api = MockSolisAPI() + # Trip the breaker + api._cb_record_failure(response_code="B0600") + assert api._cb_state == "OPEN" + + # _cb_is_open should return True (recovery time has NOT elapsed) + assert api._cb_is_open is True + + # Confirm it raises immediately when _execute_request is called + http_called = [] + + async def mock_post(*args, **kwargs): + http_called.append(True) + raise AssertionError("HTTP request should not have been made") + + # Patch session to detect if a real HTTP call is attempted + api.session = MagicMock() + api.session.post = mock_post + + try: + await api._execute_request("/v2/api/atRead", {"inverterSn": "TEST", "cid": 636}) + assert False, "Expected SolisAPIError to be raised" + except SolisAPIError as e: + assert "Circuit breaker OPEN" in str(e), f"Unexpected error message: {e}" + + assert not http_called, "HTTP call was made despite circuit being OPEN" + + print("PASSED: OPEN circuit breaker blocks calls") + return False + + +async def test_cb_open_to_half_open_after_recovery(): + """OPEN → HALF_OPEN after recovery time elapses""" + print("\n=== Test: OPEN → HALF_OPEN after recovery time ===") + + api = MockSolisAPI() + api._cb_record_failure(response_code="B0600") + assert api._cb_state == "OPEN" + + # Simulate recovery time having passed by backdating _cb_open_at + api._cb_open_at = api._cb_open_at - api._cb_recovery_time - 1 + + assert api._cb_is_open is False + assert api._cb_state == "HALF_OPEN" + + # _cb_is_open should now return False (probe allowed through) + assert api._cb_is_open is False + + print("PASSED: OPEN → HALF_OPEN after recovery time") + return False + + +async def test_cb_half_open_to_closed_on_success(): + """HALF_OPEN → CLOSED when the probe request succeeds""" + print("\n=== Test: HALF_OPEN → CLOSED on success ===") + + api = MockSolisAPI() + api._cb_record_failure(response_code="B0600") + api._cb_open_at = api._cb_open_at - api._cb_recovery_time - 1 + api._cb_is_open # trigger transition to HALF_OPEN + assert api._cb_state == "HALF_OPEN" + + api._cb_record_success() + assert api._cb_state == "CLOSED" + assert api._cb_failure_count == 0 + assert api._cb_trip_count == 0 + assert api._cb_recovery_time == SOLIS_CB_INITIAL_RECOVERY_TIME + + print("PASSED: HALF_OPEN → CLOSED on success") + return False + + +async def test_cb_half_open_retrips_on_failure(): + """HALF_OPEN → OPEN (re-trip) with doubled recovery time when probe request fails""" + print("\n=== Test: HALF_OPEN re-trips on failure with doubled recovery time ===") + + api = MockSolisAPI() + # First trip + api._cb_record_failure(response_code="B0600") + assert api._cb_trip_count == 1 + first_recovery = api._cb_recovery_time + + # Transition to HALF_OPEN + api._cb_open_at = api._cb_open_at - api._cb_recovery_time - 1 + api._cb_is_open # trigger transition + assert api._cb_state == "HALF_OPEN" + + # Probe fails — should re-trip + api._cb_record_failure(response_code="B0600") + assert api._cb_state == "OPEN" + assert api._cb_trip_count == 2 + expected_recovery = min(first_recovery * SOLIS_CB_BACKOFF_MULTIPLIER, SOLIS_CB_MAX_RECOVERY_TIME) + assert api._cb_recovery_time == expected_recovery, f"Expected {expected_recovery}s recovery, got {api._cb_recovery_time}s" + + print("PASSED: HALF_OPEN re-trips on failure with doubled recovery time") + return False + + +async def test_cb_backoff_sequence_and_cap(): + """Recovery time doubles on each trip and caps at SOLIS_CB_MAX_RECOVERY_TIME""" + print("\n=== Test: circuit breaker backoff sequence and cap ===") + + api = MockSolisAPI() + expected = SOLIS_CB_INITIAL_RECOVERY_TIME + + trip = 0 + while expected < SOLIS_CB_MAX_RECOVERY_TIME: + api._cb_record_failure(response_code="B0600") + trip += 1 + assert api._cb_trip_count == trip + assert api._cb_recovery_time == expected, f"Trip {trip}: expected {expected}s, got {api._cb_recovery_time}s" + + # Transition to HALF_OPEN, then re-trip + api._cb_open_at = api._cb_open_at - api._cb_recovery_time - 1 + api._cb_is_open # OPEN → HALF_OPEN + + expected = min(expected * SOLIS_CB_BACKOFF_MULTIPLIER, SOLIS_CB_MAX_RECOVERY_TIME) + + # One more trip — should be capped + api._cb_record_failure(response_code="B0600") + assert api._cb_recovery_time == SOLIS_CB_MAX_RECOVERY_TIME, f"Expected cap at {SOLIS_CB_MAX_RECOVERY_TIME}s, got {api._cb_recovery_time}s" + + print("PASSED: circuit breaker backoff sequence and cap") + return False + + +async def test_cb_full_recovery_resets_trip_count(): + """Full recovery (_cb_record_success from HALF_OPEN) resets trip count and recovery time""" + print("\n=== Test: full recovery resets trip count ===") + + api = MockSolisAPI() + # Trip multiple times + for _ in range(3): + api._cb_record_failure(response_code="B0600") + api._cb_open_at = api._cb_open_at - api._cb_recovery_time - 1 + api._cb_is_open # OPEN → HALF_OPEN + + assert api._cb_trip_count == 3 + assert api._cb_state == "HALF_OPEN" + + # Successful probe + api._cb_record_success() + assert api._cb_state == "CLOSED" + assert api._cb_trip_count == 0 + assert api._cb_recovery_time == SOLIS_CB_INITIAL_RECOVERY_TIME + + print("PASSED: full recovery resets trip count") + return False + + +async def test_cb_with_retry_bails_on_open(): + """_with_retry bails immediately when circuit is OPEN (does not sleep and retry)""" + print("\n=== Test: _with_retry bails immediately when circuit is OPEN ===") + + api = MockSolisAPI() + api._cb_record_failure(response_code="B0600") + assert api._cb_state == "OPEN" + + call_count = [0] + + async def failing_operation(): + call_count[0] += 1 + raise SolisAPIError("simulated API error") + + try: + await api._with_retry(failing_operation, max_retry_time=30) + assert False, "Expected SolisAPIError to be raised" + except SolisAPIError: + pass + + assert call_count[0] == 1, f"Expected exactly 1 call (no retries), got {call_count[0]}" + + print("PASSED: _with_retry bails immediately when circuit is OPEN") + return False + + +async def test_cb_run_skips_when_open(): + """run() short-circuits and calls publish_entities() when circuit is OPEN""" + print("\n=== Test: run() skips API calls when circuit is OPEN ===") + + api = MockSolisAPI() + + # Pre-populate so publish_entities() doesn't crash + api.inverter_sn = [] + api._cb_record_failure(response_code="B0600") + assert api._cb_state == "OPEN" + + publish_called = [] + original_publish = api.publish_entities + + async def mock_publish(): + publish_called.append(True) + + api.publish_entities = mock_publish + + result = await api.run(seconds=60, first=False) + + assert result is False, f"Expected run() to return False when circuit is OPEN, got {result}" + assert publish_called, "Expected publish_entities() to be called when circuit is OPEN" + assert any("Circuit breaker OPEN" in m for m in api.log_messages), "Expected circuit breaker log message" + + print("PASSED: run() skips API calls when circuit is OPEN") + return False + + def run_solis_tests(my_predbat): """ Run all Solis API tests @@ -366,6 +625,17 @@ def run_solis_tests(my_predbat): failed |= asyncio.run(test_fetch_entity_data_power_clamping()) failed |= asyncio.run(test_fetch_entity_data_invalid_values()) failed |= asyncio.run(test_automatic_config()) + # Circuit breaker tests + failed |= asyncio.run(test_cb_closed_to_open_on_threshold()) + failed |= asyncio.run(test_cb_immediate_trip_on_throttle_code()) + failed |= asyncio.run(test_cb_open_blocks_calls()) + failed |= asyncio.run(test_cb_open_to_half_open_after_recovery()) + failed |= asyncio.run(test_cb_half_open_to_closed_on_success()) + failed |= asyncio.run(test_cb_half_open_retrips_on_failure()) + failed |= asyncio.run(test_cb_backoff_sequence_and_cap()) + failed |= asyncio.run(test_cb_full_recovery_resets_trip_count()) + failed |= asyncio.run(test_cb_with_retry_bails_on_open()) + failed |= asyncio.run(test_cb_run_skips_when_open()) except Exception as e: print(f"Error running Solis tests: {e}")