From 5464d53a0133abc92013e6f17926a6cb0d957f16 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 12:37:43 +0000 Subject: [PATCH 01/12] Implement ActionQueue for batching actions in OverkizClient and add integration tests --- pyoverkiz/action_queue.py | 217 ++++++++++++++++++++ pyoverkiz/client.py | 66 +++++- test_queue_example.py | 151 ++++++++++++++ tests/test_action_queue.py | 270 +++++++++++++++++++++++++ tests/test_client_queue_integration.py | 221 ++++++++++++++++++++ 5 files changed, 923 insertions(+), 2 deletions(-) create mode 100644 pyoverkiz/action_queue.py create mode 100644 test_queue_example.py create mode 100644 tests/test_action_queue.py create mode 100644 tests/test_client_queue_integration.py diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py new file mode 100644 index 00000000..c709ca8b --- /dev/null +++ b/pyoverkiz/action_queue.py @@ -0,0 +1,217 @@ +"""Action queue for batching multiple action executions into single API calls.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable, Coroutine +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pyoverkiz.enums import CommandMode + from pyoverkiz.models import Action + + +class QueuedExecution: + """Represents a queued action execution that will resolve to an exec_id when the batch executes.""" + + def __init__(self) -> None: + self._future: asyncio.Future[str] = asyncio.Future() + + def set_result(self, exec_id: str) -> None: + """Set the execution ID result.""" + if not self._future.done(): + self._future.set_result(exec_id) + + def set_exception(self, exception: Exception) -> None: + """Set an exception if the batch execution failed.""" + if not self._future.done(): + self._future.set_exception(exception) + + def __await__(self): + """Make this awaitable.""" + return self._future.__await__() + + +class ActionQueue: + """ + Batches multiple action executions into single API calls. + + When actions are added, they are held for a configurable delay period. + If more actions arrive during this window, they are batched together. + The batch is flushed when: + - The delay timer expires + - The max actions limit is reached + - The command mode changes + - Manual flush is requested + """ + + def __init__( + self, + executor: Callable[ + [list[Action], CommandMode | None, str | None], Coroutine[None, None, str] + ], + delay: float = 0.5, + max_actions: int = 20, + ) -> None: + """ + Initialize the action queue. + + :param executor: Async function to execute batched actions + :param delay: Seconds to wait before auto-flushing (default 0.5) + :param max_actions: Maximum actions per batch before forced flush (default 20) + """ + self._executor = executor + self._delay = delay + self._max_actions = max_actions + + self._pending_actions: list[Action] = [] + self._pending_mode: CommandMode | None = None + self._pending_label: str | None = None + self._pending_waiters: list[QueuedExecution] = [] + + self._flush_task: asyncio.Task[None] | None = None + self._lock = asyncio.Lock() + + async def add( + self, + actions: list[Action], + mode: CommandMode | None = None, + label: str | None = None, + ) -> QueuedExecution: + """ + Add actions to the queue. + + :param actions: Actions to queue + :param mode: Command mode (will flush if different from pending mode) + :param label: Label for the action group + :return: QueuedExecution that resolves to exec_id when batch executes + """ + async with self._lock: + # If mode or label changes, flush existing queue first + if self._pending_actions and ( + mode != self._pending_mode or label != self._pending_label + ): + await self._flush_now() + + # Add actions to pending queue + self._pending_actions.extend(actions) + self._pending_mode = mode + self._pending_label = label + + # Create waiter for this caller + waiter = QueuedExecution() + self._pending_waiters.append(waiter) + + # If we hit max actions, flush immediately + if len(self._pending_actions) >= self._max_actions: + await self._flush_now() + else: + # Schedule delayed flush if not already scheduled + if self._flush_task is None or self._flush_task.done(): + self._flush_task = asyncio.create_task(self._delayed_flush()) + + return waiter + + async def _delayed_flush(self) -> None: + """Wait for the delay period, then flush the queue.""" + await asyncio.sleep(self._delay) + async with self._lock: + if not self._pending_actions: + return + + # Take snapshot and clear state while holding lock + actions = self._pending_actions + mode = self._pending_mode + label = self._pending_label + waiters = self._pending_waiters + + self._pending_actions = [] + self._pending_mode = None + self._pending_label = None + self._pending_waiters = [] + self._flush_task = None + + # Execute outside the lock + try: + exec_id = await self._executor(actions, mode, label) + for waiter in waiters: + waiter.set_result(exec_id) + except Exception as exc: + for waiter in waiters: + waiter.set_exception(exc) + + async def _flush_now(self) -> None: + """Execute pending actions immediately (must be called with lock held).""" + if not self._pending_actions: + return + + # Cancel any pending flush task + if self._flush_task and not self._flush_task.done(): + self._flush_task.cancel() + self._flush_task = None + + # Take snapshot of current batch + actions = self._pending_actions + mode = self._pending_mode + label = self._pending_label + waiters = self._pending_waiters + + # Clear pending state + self._pending_actions = [] + self._pending_mode = None + self._pending_label = None + self._pending_waiters = [] + + # Execute the batch (must release lock before calling executor to avoid deadlock) + # Note: This is called within a lock context, we'll execute outside + try: + exec_id = await self._executor(actions, mode, label) + # Notify all waiters + for waiter in waiters: + waiter.set_result(exec_id) + except Exception as exc: + # Propagate exception to all waiters + for waiter in waiters: + waiter.set_exception(exc) + raise + + async def flush(self) -> list[str]: + """ + Force flush all pending actions immediately. + + :return: List of exec_ids from flushed batches + """ + async with self._lock: + if not self._pending_actions: + return [] + + # Since we can only have one batch pending at a time, + # this will return a single exec_id in a list + exec_ids: list[str] = [] + + try: + await self._flush_now() + # If flush succeeded, we can't actually return the exec_id here + # since it's delivered via the waiters. This method is mainly + # for forcing a flush, not retrieving results. + # Return empty list to indicate flush completed + except Exception: + # If flush fails, the exception will be propagated to waiters + # and also raised here + raise + + return exec_ids + + def get_pending_count(self) -> int: + """Get the number of actions currently waiting in the queue.""" + return len(self._pending_actions) + + async def shutdown(self) -> None: + """Shutdown the queue, flushing any pending actions.""" + async with self._lock: + if self._flush_task and not self._flush_task.done(): + self._flush_task.cancel() + self._flush_task = None + + if self._pending_actions: + await self._flush_now() diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index c81b20ea..32050d38 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -19,6 +19,7 @@ ServerDisconnectedError, ) +from pyoverkiz.action_queue import ActionQueue, QueuedExecution from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server @@ -141,6 +142,7 @@ class OverkizClient: session: ClientSession _ssl: ssl.SSLContext | bool = True _auth: AuthStrategy + _action_queue: ActionQueue | None = None def __init__( self, @@ -149,11 +151,17 @@ def __init__( credentials: Credentials, verify_ssl: bool = True, session: ClientSession | None = None, + action_queue_enabled: bool = False, + action_queue_delay: float = 0.5, + action_queue_max_actions: int = 20, ) -> None: """Constructor. :param server: ServerConfig :param session: optional ClientSession + :param action_queue_enabled: enable action batching queue (default False) + :param action_queue_delay: seconds to wait before flushing queue (default 0.5) + :param action_queue_max_actions: max actions per batch (default 20) """ self.server_config = self._normalize_server(server) @@ -173,6 +181,14 @@ def __init__( # Use the prebuilt SSL context with disabled strict validation for local API. self._ssl = SSL_CONTEXT_LOCAL_API + # Initialize action queue if enabled + if action_queue_enabled: + self._action_queue = ActionQueue( + executor=self._execute_action_group_direct, + delay=action_queue_delay, + max_actions=action_queue_max_actions, + ) + self._auth = build_auth_strategy( server_config=self.server_config, credentials=credentials, @@ -210,6 +226,10 @@ def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: async def close(self) -> None: """Close the session.""" + # Flush any pending actions in queue + if self._action_queue: + await self._action_queue.shutdown() + if self.event_listener_id: await self.unregister_event_listener() @@ -431,13 +451,13 @@ async def get_api_version(self) -> str: @retry_on_too_many_executions @retry_on_auth_error - async def execute_action_group( + async def _execute_action_group_direct( self, actions: list[Action], mode: CommandMode | None = None, label: str | None = "python-overkiz-api", ) -> str: - """Execute a non-persistent action group. + """Execute a non-persistent action group directly (internal method). The executed action group does not have to be persisted on the server before use. Per-session rate-limit : 1 calls per 28min 48s period for all operations of the same category (exec) @@ -462,6 +482,48 @@ async def execute_action_group( return cast(str, response["execId"]) + async def execute_action_group( + self, + actions: list[Action], + mode: CommandMode | None = None, + label: str | None = "python-overkiz-api", + ) -> str | QueuedExecution: + """Execute a non-persistent action group. + + If action queue is enabled, actions will be batched with other actions + executed within the configured delay window. Returns a QueuedExecution + that can be awaited to get the exec_id. + + If action queue is disabled, executes immediately and returns exec_id directly. + + :param actions: List of actions to execute + :param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None) + :param label: Label for the action group + :return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled) + """ + if self._action_queue: + return await self._action_queue.add(actions, mode, label) + else: + return await self._execute_action_group_direct(actions, mode, label) + + async def flush_action_queue(self) -> None: + """Force flush all pending actions in the queue immediately. + + If action queue is disabled, this method does nothing. + If there are no pending actions, this method does nothing. + """ + if self._action_queue: + await self._action_queue.flush() + + def get_pending_actions_count(self) -> int: + """Get the number of actions currently waiting in the queue. + + Returns 0 if action queue is disabled. + """ + if self._action_queue: + return self._action_queue.get_pending_count() + return 0 + @retry_on_auth_error async def cancel_command(self, exec_id: str) -> None: """Cancel a running setup-level execution.""" diff --git a/test_queue_example.py b/test_queue_example.py new file mode 100644 index 00000000..15bc527a --- /dev/null +++ b/test_queue_example.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +"""Simple example demonstrating the action queue feature.""" + +import asyncio + +from pyoverkiz.client import OverkizClient +from pyoverkiz.const import SUPPORTED_SERVERS +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + + +async def example_without_queue(): + """Example: Execute actions without queue (immediate execution).""" + print("\n=== Example 1: Without Queue (Immediate Execution) ===") + + client = OverkizClient( + username="user@example.com", + password="password", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=False, # Queue disabled + ) + + # Create some example actions + _action1 = Action( + device_url="io://1234-5678-9012/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # This will execute immediately + print("Executing action 1...") + # exec_id = await client.execute_action_group([action1]) + # print(f"Got exec_id immediately: {exec_id}") + + print("Without queue: Each call executes immediately as a separate API request") + await client.close() + + +async def example_with_queue(): + """Example: Execute actions with queue (batched execution).""" + print("\n=== Example 2: With Queue (Batched Execution) ===") + + client = OverkizClient( + username="user@example.com", + password="password", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=True, # Queue enabled! + action_queue_delay=0.5, # Wait 500ms before flushing + action_queue_max_actions=20, # Max 20 actions per batch + ) + + # Create some example actions + action1 = Action( + device_url="io://1234-5678-9012/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + action2 = Action( + device_url="io://1234-5678-9012/87654321", + commands=[Command(name=OverkizCommand.OPEN)], + ) + + action3 = Action( + device_url="io://1234-5678-9012/11111111", + commands=[Command(name=OverkizCommand.STOP)], + ) + + # These will be queued and batched together! + print("Queueing action 1...") + queued1 = await client.execute_action_group([action1]) + print(f"Got QueuedExecution object: {queued1}") + + print("Queueing action 2...") + _queued2 = await client.execute_action_group([action2]) + + print("Queueing action 3...") + _queued3 = await client.execute_action_group([action3]) + + print(f"Pending actions in queue: {client.get_pending_actions_count()}") + + # Wait for all actions to execute (they'll be batched together) + print("\nWaiting for batch to execute...") + # exec_id1 = await queued1 + # exec_id2 = await queued2 + # exec_id3 = await queued3 + + # All three will have the same exec_id since they were batched together! + # print(f"Exec ID 1: {exec_id1}") + # print(f"Exec ID 2: {exec_id2}") + # print(f"Exec ID 3: {exec_id3}") + # print(f"All same? {exec_id1 == exec_id2 == exec_id3}") + + print("\nWith queue: Multiple actions batched into single API request!") + await client.close() + + +async def example_manual_flush(): + """Example: Manually flush the queue.""" + print("\n=== Example 3: Manual Flush ===") + + client = OverkizClient( + username="user@example.com", + password="password", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=True, + action_queue_delay=10.0, # Long delay + ) + + action = Action( + device_url="io://1234-5678-9012/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + print("Queueing action with 10s delay...") + _queued = await client.execute_action_group([action]) + + print(f"Pending actions: {client.get_pending_actions_count()}") + + # Don't want to wait 10 seconds? Flush manually! + print("Manually flushing queue...") + await client.flush_action_queue() + + print(f"Pending actions after flush: {client.get_pending_actions_count()}") + + # Now we can await the result + # exec_id = await queued + # print(f"Got exec_id: {exec_id}") + + await client.close() + + +async def main(): + """Run all examples.""" + print("=" * 60) + print("Action Queue Feature Examples") + print("=" * 60) + + await example_without_queue() + await example_with_queue() + await example_manual_flush() + + print("\n" + "=" * 60) + print("Key Benefits:") + print("- Reduces API calls by batching actions") + print("- Helps avoid Overkiz rate limits") + print("- Perfect for scenes/automations with multiple devices") + print("- Fully backward compatible (disabled by default)") + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py new file mode 100644 index 00000000..aea39313 --- /dev/null +++ b/tests/test_action_queue.py @@ -0,0 +1,270 @@ +"""Tests for ActionQueue.""" + +import asyncio +from unittest.mock import AsyncMock + +import pytest + +from pyoverkiz.action_queue import ActionQueue, QueuedExecution +from pyoverkiz.enums import CommandMode, OverkizCommand +from pyoverkiz.models import Action, Command + + +@pytest.fixture +def mock_executor(): + """Create a mock executor function.""" + + async def executor(actions, mode, label): + # Return immediately, no delay + return f"exec-{len(actions)}-{mode}-{label}" + + return AsyncMock(side_effect=executor) + + +@pytest.mark.asyncio +async def test_action_queue_single_action(mock_executor): + """Test queue with a single action.""" + queue = ActionQueue(executor=mock_executor, delay=0.1) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + assert isinstance(queued, QueuedExecution) + + # Wait for the batch to execute + exec_id = await queued + assert exec_id.startswith("exec-1-") + + # Verify executor was called + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_batching(mock_executor): + """Test that multiple actions are batched together.""" + queue = ActionQueue(executor=mock_executor, delay=0.2) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(3) + ] + + # Add actions in quick succession + queued1 = await queue.add([actions[0]]) + queued2 = await queue.add([actions[1]]) + queued3 = await queue.add([actions[2]]) + + # All should return the same exec_id + exec_id1 = await queued1 + exec_id2 = await queued2 + exec_id3 = await queued3 + + assert exec_id1 == exec_id2 == exec_id3 + assert "exec-3-" in exec_id1 # 3 actions in batch + + # Executor should be called only once + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_max_actions_flush(mock_executor): + """Test that queue flushes when max actions is reached.""" + queue = ActionQueue(executor=mock_executor, delay=10.0, max_actions=3) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(5) + ] + + # Add 3 actions - should trigger flush + queued1 = await queue.add([actions[0]]) + queued2 = await queue.add([actions[1]]) + queued3 = await queue.add([actions[2]]) + + # Wait a bit for flush to complete + await asyncio.sleep(0.05) + + # First 3 should be done + assert queued1._future.done() + assert queued2._future.done() + assert queued3._future.done() + + # Add 2 more - should start a new batch + queued4 = await queue.add([actions[3]]) + queued5 = await queue.add([actions[4]]) + + # Wait for second batch + await queued4 + await queued5 + + # Should have been called twice (2 batches) + assert mock_executor.call_count == 2 + + +@pytest.mark.asyncio +async def test_action_queue_mode_change_flush(mock_executor): + """Test that queue flushes when command mode changes.""" + queue = ActionQueue(executor=mock_executor, delay=0.5) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # Add action with normal mode + queued1 = await queue.add([action], mode=None) + + # Add action with high priority - should flush previous batch + queued2 = await queue.add([action], mode=CommandMode.HIGH_PRIORITY) + + # Wait for both batches + exec_id1 = await queued1 + exec_id2 = await queued2 + + # Should be different exec_ids (different batches) + assert exec_id1 != exec_id2 + + # Should have been called twice + assert mock_executor.call_count == 2 + + +@pytest.mark.asyncio +async def test_action_queue_label_change_flush(mock_executor): + """Test that queue flushes when label changes.""" + queue = ActionQueue(executor=mock_executor, delay=0.5) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # Add action with label1 + queued1 = await queue.add([action], label="label1") + + # Add action with label2 - should flush previous batch + queued2 = await queue.add([action], label="label2") + + # Wait for both batches + exec_id1 = await queued1 + exec_id2 = await queued2 + + # Should be different exec_ids (different batches) + assert exec_id1 != exec_id2 + + # Should have been called twice + assert mock_executor.call_count == 2 + + +@pytest.mark.asyncio +async def test_action_queue_manual_flush(mock_executor): + """Test manual flush of the queue.""" + queue = ActionQueue(executor=mock_executor, delay=10.0) # Long delay + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + + # Manually flush + await queue.flush() + + # Should be done now + assert queued._future.done() + exec_id = await queued + assert exec_id.startswith("exec-1-") + + +@pytest.mark.asyncio +async def test_action_queue_shutdown(mock_executor): + """Test that shutdown flushes pending actions.""" + queue = ActionQueue(executor=mock_executor, delay=10.0) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + + # Shutdown should flush + await queue.shutdown() + + # Should be done + assert queued._future.done() + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_error_propagation(mock_executor): + """Test that exceptions are propagated to all waiters.""" + # Make executor raise an exception + mock_executor.side_effect = ValueError("API Error") + + queue = ActionQueue(executor=mock_executor, delay=0.1) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued1 = await queue.add([action]) + queued2 = await queue.add([action]) + + # Both should raise the same exception + with pytest.raises(ValueError, match="API Error"): + await queued1 + + with pytest.raises(ValueError, match="API Error"): + await queued2 + + +@pytest.mark.asyncio +async def test_action_queue_get_pending_count(): + """Test getting pending action count.""" + mock_executor = AsyncMock(return_value="exec-123") + queue = ActionQueue(executor=mock_executor, delay=0.5) + + assert queue.get_pending_count() == 0 + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + await queue.add([action]) + assert queue.get_pending_count() == 1 + + await queue.add([action]) + assert queue.get_pending_count() == 2 + + # Wait for flush + await asyncio.sleep(0.6) + assert queue.get_pending_count() == 0 + + +@pytest.mark.asyncio +async def test_queued_execution_awaitable(): + """Test that QueuedExecution is properly awaitable.""" + queued = QueuedExecution() + + # Set result in background + async def set_result(): + await asyncio.sleep(0.05) + queued.set_result("exec-123") + + _task = asyncio.create_task(set_result()) # noqa: RUF006 + + # Await the result + result = await queued + assert result == "exec-123" diff --git a/tests/test_client_queue_integration.py b/tests/test_client_queue_integration.py new file mode 100644 index 00000000..40cd91f6 --- /dev/null +++ b/tests/test_client_queue_integration.py @@ -0,0 +1,221 @@ +"""Integration tests for OverkizClient with ActionQueue.""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from pyoverkiz.client import OverkizClient +from pyoverkiz.const import SUPPORTED_SERVERS +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + + +@pytest.mark.asyncio +async def test_client_without_queue_executes_immediately(): + """Test that client without queue executes actions immediately.""" + client = OverkizClient( + username="test@example.com", + password="test", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=False, + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # Mock the internal execution + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-123"} + + result = await client.execute_action_group([action]) + + # Should return exec_id directly (string) + assert isinstance(result, str) + assert result == "exec-123" + + # Should have called API immediately + mock_post.assert_called_once() + + await client.close() + + +@pytest.mark.asyncio +async def test_client_with_queue_batches_actions(): + """Test that client with queue batches multiple actions.""" + client = OverkizClient( + username="test@example.com", + password="test", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=True, + action_queue_delay=0.1, + ) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(3) + ] + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-batched"} + + # Queue multiple actions quickly + queued1 = await client.execute_action_group([actions[0]]) + queued2 = await client.execute_action_group([actions[1]]) + queued3 = await client.execute_action_group([actions[2]]) + + # Should have 3 actions pending + assert client.get_pending_actions_count() == 3 + + # Wait for all to execute + exec_id1 = await queued1 + exec_id2 = await queued2 + exec_id3 = await queued3 + + # All should have the same exec_id (batched together) + assert exec_id1 == exec_id2 == exec_id3 == "exec-batched" + + # Should have called API only once (batched) + mock_post.assert_called_once() + + # Check that all 3 actions were in the batch + call_args = mock_post.call_args + payload = call_args[0][1] # Second argument is the payload + assert len(payload["actions"]) == 3 + + await client.close() + + +@pytest.mark.asyncio +async def test_client_manual_flush(): + """Test manually flushing the queue.""" + client = OverkizClient( + username="test@example.com", + password="test", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=True, + action_queue_delay=10.0, # Long delay + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-flushed"} + + queued = await client.execute_action_group([action]) + + # Should have 1 action pending + assert client.get_pending_actions_count() == 1 + + # Manually flush + await client.flush_action_queue() + + # Should be executed now + assert client.get_pending_actions_count() == 0 + + exec_id = await queued + assert exec_id == "exec-flushed" + + mock_post.assert_called_once() + + await client.close() + + +@pytest.mark.asyncio +async def test_client_close_flushes_queue(): + """Test that closing the client flushes pending actions.""" + client = OverkizClient( + username="test@example.com", + password="test", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=True, + action_queue_delay=10.0, + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-closed"} + + queued = await client.execute_action_group([action]) + + # Close should flush + await client.close() + + # Should be executed + exec_id = await queued + assert exec_id == "exec-closed" + + mock_post.assert_called_once() + + +@pytest.mark.asyncio +async def test_client_queue_respects_max_actions(): + """Test that queue flushes when max actions is reached.""" + client = OverkizClient( + username="test@example.com", + password="test", + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + action_queue_enabled=True, + action_queue_delay=10.0, + action_queue_max_actions=2, # Max 2 actions + ) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(3) + ] + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-123"} + + # Add 2 actions - should trigger flush + queued1 = await client.execute_action_group([actions[0]]) + queued2 = await client.execute_action_group([actions[1]]) + + # Wait a bit for flush + await asyncio.sleep(0.05) + + # First 2 should be done + exec_id1 = await queued1 + exec_id2 = await queued2 + assert exec_id1 == "exec-123" + assert exec_id2 == "exec-123" + + # Add third action - starts new batch + queued3 = await client.execute_action_group([actions[2]]) + + # Flush to complete + await client.flush_action_queue() + + exec_id3 = await queued3 + assert exec_id3 == "exec-123" + + # Should have been called twice (2 batches) + assert mock_post.call_count == 2 + + await client.close() From 2ddffc7d7c005ddc7d23515b8eb0af11556b4814 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 16:23:30 +0000 Subject: [PATCH 02/12] Refactor docstrings for QueuedExecution and ActionQueue classes for consistency and clarity --- pyoverkiz/action_queue.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index c709ca8b..6d01aab2 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -15,6 +15,7 @@ class QueuedExecution: """Represents a queued action execution that will resolve to an exec_id when the batch executes.""" def __init__(self) -> None: + """Initialize the queued execution.""" self._future: asyncio.Future[str] = asyncio.Future() def set_result(self, exec_id: str) -> None: @@ -33,8 +34,7 @@ def __await__(self): class ActionQueue: - """ - Batches multiple action executions into single API calls. + """Batches multiple action executions into single API calls. When actions are added, they are held for a configurable delay period. If more actions arrive during this window, they are batched together. @@ -53,8 +53,7 @@ def __init__( delay: float = 0.5, max_actions: int = 20, ) -> None: - """ - Initialize the action queue. + """Initialize the action queue. :param executor: Async function to execute batched actions :param delay: Seconds to wait before auto-flushing (default 0.5) @@ -78,8 +77,7 @@ async def add( mode: CommandMode | None = None, label: str | None = None, ) -> QueuedExecution: - """ - Add actions to the queue. + """Add actions to the queue. :param actions: Actions to queue :param mode: Command mode (will flush if different from pending mode) @@ -176,8 +174,7 @@ async def _flush_now(self) -> None: raise async def flush(self) -> list[str]: - """ - Force flush all pending actions immediately. + """Force flush all pending actions immediately. :return: List of exec_ids from flushed batches """ From 279b57668bec6c935e23f76f5a9b619d45be99a9 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 16:28:54 +0000 Subject: [PATCH 03/12] Update Command class to accept OverkizCommand or str for name parameter and enhance example script with type hints --- pyoverkiz/models.py | 2 +- test_queue_example.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index ce2c0064..9ac74062 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -472,7 +472,7 @@ class Command: """Represents an OverKiz Command.""" type: int | None = None - name: OverkizCommand + name: str | OverkizCommand parameters: list[str | int | float | OverkizCommandParam] | None def __init__( diff --git a/test_queue_example.py b/test_queue_example.py index 15bc527a..beafe6a0 100644 --- a/test_queue_example.py +++ b/test_queue_example.py @@ -1,6 +1,11 @@ #!/usr/bin/env python3 +# mypy: ignore-errors +# ty: ignore + """Simple example demonstrating the action queue feature.""" +from __future__ import annotations + import asyncio from pyoverkiz.client import OverkizClient From d1e73516b47f647cd4ba3eee2b247298224bae12 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Sat, 3 Jan 2026 22:34:26 +0100 Subject: [PATCH 04/12] Fix concurrency issues and simplify API in ActionQueue implementation (#1876) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [x] Update integration tests to match the new simplified API - Updated test_client_with_queue_batches_actions to use asyncio.create_task for concurrent execution - Updated test_client_manual_flush to start execution as a task before checking pending count - Updated test_client_close_flushes_queue to work with str return type instead of QueuedExecution - Updated test_client_queue_respects_max_actions to handle direct exec_id returns - All 15 queue-related tests now passing (10 unit tests + 5 integration tests) --- ✨ Let Copilot coding agent [set things up for you](https://github.com/iMicknl/python-overkiz-api/issues/new?title=✨+Set+up+Copilot+instructions&body=Configure%20instructions%20for%20this%20repository%20as%20documented%20in%20%5BBest%20practices%20for%20Copilot%20coding%20agent%20in%20your%20repository%5D%28https://gh.io/copilot-coding-agent-tips%29%2E%0A%0A%3COnboard%20this%20repo%3E&assignees=copilot) — coding agent works faster and does higher quality work when set up for your repo. --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: iMicknl <1424596+iMicknl@users.noreply.github.com> --- pyoverkiz/action_queue.py | 167 ++++++++++++++++--------- pyoverkiz/client.py | 32 +++-- test_queue_example.py | 37 +++--- tests/test_action_queue.py | 15 ++- tests/test_client_queue_integration.py | 50 ++++---- 5 files changed, 187 insertions(+), 114 deletions(-) diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 6d01aab2..118c5245 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import contextlib from collections.abc import Callable, Coroutine from typing import TYPE_CHECKING @@ -23,11 +24,15 @@ def set_result(self, exec_id: str) -> None: if not self._future.done(): self._future.set_result(exec_id) - def set_exception(self, exception: Exception) -> None: + def set_exception(self, exception: BaseException) -> None: """Set an exception if the batch execution failed.""" if not self._future.done(): self._future.set_exception(exception) + def is_done(self) -> bool: + """Check if the execution has completed (either with result or exception).""" + return self._future.done() + def __await__(self): """Make this awaitable.""" return self._future.__await__() @@ -84,64 +89,92 @@ async def add( :param label: Label for the action group :return: QueuedExecution that resolves to exec_id when batch executes """ + batch_to_execute = None + async with self._lock: # If mode or label changes, flush existing queue first if self._pending_actions and ( mode != self._pending_mode or label != self._pending_label ): - await self._flush_now() + batch_to_execute = self._prepare_flush() # Add actions to pending queue self._pending_actions.extend(actions) self._pending_mode = mode self._pending_label = label - # Create waiter for this caller + # Create waiter for this caller. This waiter is added to the current + # batch being built, even if we flushed a previous batch above due to + # a mode/label change. This ensures the waiter belongs to the batch + # containing the actions we just added. waiter = QueuedExecution() self._pending_waiters.append(waiter) # If we hit max actions, flush immediately if len(self._pending_actions) >= self._max_actions: - await self._flush_now() - else: + # Prepare the current batch for flushing (which includes the actions + # we just added). If we already flushed due to mode change, this is + # a second batch. + new_batch = self._prepare_flush() + # Execute the first batch if it exists, then the second + if batch_to_execute: + await self._execute_batch(*batch_to_execute) + batch_to_execute = new_batch + elif self._flush_task is None or self._flush_task.done(): # Schedule delayed flush if not already scheduled - if self._flush_task is None or self._flush_task.done(): - self._flush_task = asyncio.create_task(self._delayed_flush()) + self._flush_task = asyncio.create_task(self._delayed_flush()) + + # Execute batch outside the lock if we flushed + if batch_to_execute: + await self._execute_batch(*batch_to_execute) - return waiter + return waiter async def _delayed_flush(self) -> None: """Wait for the delay period, then flush the queue.""" - await asyncio.sleep(self._delay) - async with self._lock: - if not self._pending_actions: - return - - # Take snapshot and clear state while holding lock - actions = self._pending_actions - mode = self._pending_mode - label = self._pending_label - waiters = self._pending_waiters - - self._pending_actions = [] - self._pending_mode = None - self._pending_label = None - self._pending_waiters = [] - self._flush_task = None - - # Execute outside the lock + waiters: list[QueuedExecution] = [] try: - exec_id = await self._executor(actions, mode, label) - for waiter in waiters: - waiter.set_result(exec_id) - except Exception as exc: + await asyncio.sleep(self._delay) + async with self._lock: + if not self._pending_actions: + return + + # Take snapshot and clear state while holding lock + actions = self._pending_actions + mode = self._pending_mode + label = self._pending_label + waiters = self._pending_waiters + + self._pending_actions = [] + self._pending_mode = None + self._pending_label = None + self._pending_waiters = [] + self._flush_task = None + + # Execute outside the lock + try: + exec_id = await self._executor(actions, mode, label) + for waiter in waiters: + waiter.set_result(exec_id) + except Exception as exc: + for waiter in waiters: + waiter.set_exception(exc) + except asyncio.CancelledError as exc: + # Ensure all waiters are notified if this task is cancelled for waiter in waiters: waiter.set_exception(exc) + raise + + def _prepare_flush( + self, + ) -> tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]]: + """Prepare a flush by taking snapshot and clearing state (must be called with lock held). - async def _flush_now(self) -> None: - """Execute pending actions immediately (must be called with lock held).""" + Returns a tuple of (actions, mode, label, waiters) that should be executed + outside the lock using _execute_batch(). + """ if not self._pending_actions: - return + return ([], None, None, []) # Cancel any pending flush task if self._flush_task and not self._flush_task.done(): @@ -160,8 +193,19 @@ async def _flush_now(self) -> None: self._pending_label = None self._pending_waiters = [] - # Execute the batch (must release lock before calling executor to avoid deadlock) - # Note: This is called within a lock context, we'll execute outside + return (actions, mode, label, waiters) + + async def _execute_batch( + self, + actions: list[Action], + mode: CommandMode | None, + label: str | None, + waiters: list[QueuedExecution], + ) -> None: + """Execute a batch of actions and notify waiters (must be called without lock).""" + if not actions: + return + try: exec_id = await self._executor(actions, mode, label) # Notify all waiters @@ -173,42 +217,49 @@ async def _flush_now(self) -> None: waiter.set_exception(exc) raise - async def flush(self) -> list[str]: + async def flush(self) -> None: """Force flush all pending actions immediately. - :return: List of exec_ids from flushed batches + This method forces the queue to execute any pending batched actions + without waiting for the delay timer. The execution results are delivered + to the corresponding QueuedExecution objects returned by add(). + + This method is useful for forcing immediate execution without having to + wait for the delay timer to expire. """ + batch_to_execute = None async with self._lock: - if not self._pending_actions: - return [] - - # Since we can only have one batch pending at a time, - # this will return a single exec_id in a list - exec_ids: list[str] = [] + if self._pending_actions: + batch_to_execute = self._prepare_flush() - try: - await self._flush_now() - # If flush succeeded, we can't actually return the exec_id here - # since it's delivered via the waiters. This method is mainly - # for forcing a flush, not retrieving results. - # Return empty list to indicate flush completed - except Exception: - # If flush fails, the exception will be propagated to waiters - # and also raised here - raise - - return exec_ids + # Execute outside the lock + if batch_to_execute: + await self._execute_batch(*batch_to_execute) def get_pending_count(self) -> int: - """Get the number of actions currently waiting in the queue.""" + """Get the (approximate) number of actions currently waiting in the queue. + + This method does not acquire the internal lock and therefore returns a + best-effort snapshot that may be slightly out of date if the queue is + being modified concurrently by other coroutines. + """ return len(self._pending_actions) async def shutdown(self) -> None: """Shutdown the queue, flushing any pending actions.""" + batch_to_execute = None async with self._lock: if self._flush_task and not self._flush_task.done(): - self._flush_task.cancel() + task = self._flush_task + task.cancel() self._flush_task = None + # Wait for cancellation to complete + with contextlib.suppress(asyncio.CancelledError): + await task if self._pending_actions: - await self._flush_now() + batch_to_execute = self._prepare_flush() + + # Execute outside the lock + if batch_to_execute: + await self._execute_batch(*batch_to_execute) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 32050d38..5fb5ac9b 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -19,7 +19,7 @@ ServerDisconnectedError, ) -from pyoverkiz.action_queue import ActionQueue, QueuedExecution +from pyoverkiz.action_queue import ActionQueue from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server @@ -161,7 +161,7 @@ def __init__( :param session: optional ClientSession :param action_queue_enabled: enable action batching queue (default False) :param action_queue_delay: seconds to wait before flushing queue (default 0.5) - :param action_queue_max_actions: max actions per batch (default 20) + :param action_queue_max_actions: maximum actions per batch before auto-flush (default 20) """ self.server_config = self._normalize_server(server) @@ -183,6 +183,11 @@ def __init__( # Initialize action queue if enabled if action_queue_enabled: + if action_queue_delay <= 0: + raise ValueError("action_queue_delay must be positive") + if action_queue_max_actions < 1: + raise ValueError("action_queue_max_actions must be at least 1") + self._action_queue = ActionQueue( executor=self._execute_action_group_direct, delay=action_queue_delay, @@ -487,22 +492,31 @@ async def execute_action_group( actions: list[Action], mode: CommandMode | None = None, label: str | None = "python-overkiz-api", - ) -> str | QueuedExecution: + ) -> str: """Execute a non-persistent action group. - If action queue is enabled, actions will be batched with other actions - executed within the configured delay window. Returns a QueuedExecution - that can be awaited to get the exec_id. + When action queue is enabled, actions will be batched with other actions + executed within the configured delay window. The method will wait for the + batch to execute and return the exec_id. + + When action queue is disabled, executes immediately and returns exec_id. - If action queue is disabled, executes immediately and returns exec_id directly. + The API is consistent regardless of queue configuration - always returns + exec_id string directly. :param actions: List of actions to execute :param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None) :param label: Label for the action group - :return: exec_id string (if queue disabled) or QueuedExecution (if queue enabled) + :return: exec_id string from the executed action group + + Example usage:: + + # Works the same with or without queue + exec_id = await client.execute_action_group([action]) """ if self._action_queue: - return await self._action_queue.add(actions, mode, label) + queued = await self._action_queue.add(actions, mode, label) + return await queued else: return await self._execute_action_group_direct(actions, mode, label) diff --git a/test_queue_example.py b/test_queue_example.py index beafe6a0..b39e64eb 100644 --- a/test_queue_example.py +++ b/test_queue_example.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # mypy: ignore-errors -# ty: ignore +# type: ignore """Simple example demonstrating the action queue feature.""" @@ -26,7 +26,7 @@ async def example_without_queue(): ) # Create some example actions - _action1 = Action( + Action( device_url="io://1234-5678-9012/12345678", commands=[Command(name=OverkizCommand.CLOSE)], ) @@ -71,28 +71,22 @@ async def example_with_queue(): # These will be queued and batched together! print("Queueing action 1...") - queued1 = await client.execute_action_group([action1]) - print(f"Got QueuedExecution object: {queued1}") + exec_id1 = await client.execute_action_group([action1]) + print(f"Got exec_id: {exec_id1}") print("Queueing action 2...") - _queued2 = await client.execute_action_group([action2]) + exec_id2 = await client.execute_action_group([action2]) print("Queueing action 3...") - _queued3 = await client.execute_action_group([action3]) + exec_id3 = await client.execute_action_group([action3]) print(f"Pending actions in queue: {client.get_pending_actions_count()}") - # Wait for all actions to execute (they'll be batched together) - print("\nWaiting for batch to execute...") - # exec_id1 = await queued1 - # exec_id2 = await queued2 - # exec_id3 = await queued3 - # All three will have the same exec_id since they were batched together! - # print(f"Exec ID 1: {exec_id1}") - # print(f"Exec ID 2: {exec_id2}") - # print(f"Exec ID 3: {exec_id3}") - # print(f"All same? {exec_id1 == exec_id2 == exec_id3}") + print(f"\nExec ID 1: {exec_id1}") + print(f"Exec ID 2: {exec_id2}") + print(f"Exec ID 3: {exec_id3}") + print(f"All same? {exec_id1 == exec_id2 == exec_id3}") print("\nWith queue: Multiple actions batched into single API request!") await client.close() @@ -116,8 +110,11 @@ async def example_manual_flush(): ) print("Queueing action with 10s delay...") - _queued = await client.execute_action_group([action]) + # Start execution in background (don't await yet) + exec_task = asyncio.create_task(client.execute_action_group([action])) + # Give it a moment to queue + await asyncio.sleep(0.1) print(f"Pending actions: {client.get_pending_actions_count()}") # Don't want to wait 10 seconds? Flush manually! @@ -126,9 +123,9 @@ async def example_manual_flush(): print(f"Pending actions after flush: {client.get_pending_actions_count()}") - # Now we can await the result - # exec_id = await queued - # print(f"Got exec_id: {exec_id}") + # Now get the result + exec_id = await exec_task + print(f"Got exec_id: {exec_id}") await client.close() diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py index aea39313..7e131397 100644 --- a/tests/test_action_queue.py +++ b/tests/test_action_queue.py @@ -94,9 +94,9 @@ async def test_action_queue_max_actions_flush(mock_executor): await asyncio.sleep(0.05) # First 3 should be done - assert queued1._future.done() - assert queued2._future.done() - assert queued3._future.done() + assert queued1.is_done() + assert queued2.is_done() + assert queued3.is_done() # Add 2 more - should start a new batch queued4 = await queue.add([actions[3]]) @@ -180,7 +180,7 @@ async def test_action_queue_manual_flush(mock_executor): await queue.flush() # Should be done now - assert queued._future.done() + assert queued.is_done() exec_id = await queued assert exec_id.startswith("exec-1-") @@ -201,7 +201,7 @@ async def test_action_queue_shutdown(mock_executor): await queue.shutdown() # Should be done - assert queued._future.done() + assert queued.is_done() mock_executor.assert_called_once() @@ -263,8 +263,11 @@ async def set_result(): await asyncio.sleep(0.05) queued.set_result("exec-123") - _task = asyncio.create_task(set_result()) # noqa: RUF006 + task = asyncio.create_task(set_result()) # Await the result result = await queued assert result == "exec-123" + + # Ensure background task has completed + await task diff --git a/tests/test_client_queue_integration.py b/tests/test_client_queue_integration.py index 40cd91f6..5376ef7b 100644 --- a/tests/test_client_queue_integration.py +++ b/tests/test_client_queue_integration.py @@ -68,18 +68,21 @@ async def test_client_with_queue_batches_actions(): ) as mock_post: mock_post.return_value = {"execId": "exec-batched"} - # Queue multiple actions quickly - queued1 = await client.execute_action_group([actions[0]]) - queued2 = await client.execute_action_group([actions[1]]) - queued3 = await client.execute_action_group([actions[2]]) + # Queue multiple actions quickly - start them as tasks to allow batching + task1 = asyncio.create_task(client.execute_action_group([actions[0]])) + task2 = asyncio.create_task(client.execute_action_group([actions[1]])) + task3 = asyncio.create_task(client.execute_action_group([actions[2]])) + + # Give them a moment to queue + await asyncio.sleep(0.01) # Should have 3 actions pending assert client.get_pending_actions_count() == 3 # Wait for all to execute - exec_id1 = await queued1 - exec_id2 = await queued2 - exec_id3 = await queued3 + exec_id1 = await task1 + exec_id2 = await task2 + exec_id3 = await task3 # All should have the same exec_id (batched together) assert exec_id1 == exec_id2 == exec_id3 == "exec-batched" @@ -116,7 +119,11 @@ async def test_client_manual_flush(): ) as mock_post: mock_post.return_value = {"execId": "exec-flushed"} - queued = await client.execute_action_group([action]) + # Start execution as a task to allow checking pending count + exec_task = asyncio.create_task(client.execute_action_group([action])) + + # Give it a moment to queue + await asyncio.sleep(0.01) # Should have 1 action pending assert client.get_pending_actions_count() == 1 @@ -127,7 +134,7 @@ async def test_client_manual_flush(): # Should be executed now assert client.get_pending_actions_count() == 0 - exec_id = await queued + exec_id = await exec_task assert exec_id == "exec-flushed" mock_post.assert_called_once() @@ -156,13 +163,17 @@ async def test_client_close_flushes_queue(): ) as mock_post: mock_post.return_value = {"execId": "exec-closed"} - queued = await client.execute_action_group([action]) + # Start execution as a task + exec_task = asyncio.create_task(client.execute_action_group([action])) + + # Give it a moment to queue + await asyncio.sleep(0.01) # Close should flush await client.close() # Should be executed - exec_id = await queued + exec_id = await exec_task assert exec_id == "exec-closed" mock_post.assert_called_once() @@ -193,26 +204,23 @@ async def test_client_queue_respects_max_actions(): ) as mock_post: mock_post.return_value = {"execId": "exec-123"} - # Add 2 actions - should trigger flush - queued1 = await client.execute_action_group([actions[0]]) - queued2 = await client.execute_action_group([actions[1]]) + # Add 2 actions as tasks to trigger flush + task1 = asyncio.create_task(client.execute_action_group([actions[0]])) + task2 = asyncio.create_task(client.execute_action_group([actions[1]])) # Wait a bit for flush await asyncio.sleep(0.05) # First 2 should be done - exec_id1 = await queued1 - exec_id2 = await queued2 + exec_id1 = await task1 + exec_id2 = await task2 assert exec_id1 == "exec-123" assert exec_id2 == "exec-123" # Add third action - starts new batch - queued3 = await client.execute_action_group([actions[2]]) - - # Flush to complete - await client.flush_action_queue() + exec_id3 = await client.execute_action_group([actions[2]]) - exec_id3 = await queued3 + # Should have exec_id directly (waited for batch to complete) assert exec_id3 == "exec-123" # Should have been called twice (2 batches) From ae7b702594bfafdccdcb3912c7aeec57712e4436 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 3 Jan 2026 21:48:42 +0000 Subject: [PATCH 05/12] Refactor OverkizClient initialization in examples and tests to use UsernamePasswordCredentials for improved clarity and consistency --- test_queue_example.py | 17 +++++++--------- tests/test_client_queue_integration.py | 27 +++++++++++--------------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/test_queue_example.py b/test_queue_example.py index b39e64eb..8365e102 100644 --- a/test_queue_example.py +++ b/test_queue_example.py @@ -8,8 +8,8 @@ import asyncio +from pyoverkiz.auth import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient -from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import OverkizCommand, Server from pyoverkiz.models import Action, Command @@ -19,9 +19,8 @@ async def example_without_queue(): print("\n=== Example 1: Without Queue (Immediate Execution) ===") client = OverkizClient( - username="user@example.com", - password="password", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), action_queue_enabled=False, # Queue disabled ) @@ -45,9 +44,8 @@ async def example_with_queue(): print("\n=== Example 2: With Queue (Batched Execution) ===") client = OverkizClient( - username="user@example.com", - password="password", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), action_queue_enabled=True, # Queue enabled! action_queue_delay=0.5, # Wait 500ms before flushing action_queue_max_actions=20, # Max 20 actions per batch @@ -97,9 +95,8 @@ async def example_manual_flush(): print("\n=== Example 3: Manual Flush ===") client = OverkizClient( - username="user@example.com", - password="password", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), action_queue_enabled=True, action_queue_delay=10.0, # Long delay ) diff --git a/tests/test_client_queue_integration.py b/tests/test_client_queue_integration.py index 5376ef7b..100b00d3 100644 --- a/tests/test_client_queue_integration.py +++ b/tests/test_client_queue_integration.py @@ -5,8 +5,8 @@ import pytest +from pyoverkiz.auth import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient -from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import OverkizCommand, Server from pyoverkiz.models import Action, Command @@ -15,9 +15,8 @@ async def test_client_without_queue_executes_immediately(): """Test that client without queue executes actions immediately.""" client = OverkizClient( - username="test@example.com", - password="test", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), action_queue_enabled=False, ) @@ -48,9 +47,8 @@ async def test_client_without_queue_executes_immediately(): async def test_client_with_queue_batches_actions(): """Test that client with queue batches multiple actions.""" client = OverkizClient( - username="test@example.com", - password="test", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), action_queue_enabled=True, action_queue_delay=0.1, ) @@ -102,9 +100,8 @@ async def test_client_with_queue_batches_actions(): async def test_client_manual_flush(): """Test manually flushing the queue.""" client = OverkizClient( - username="test@example.com", - password="test", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), action_queue_enabled=True, action_queue_delay=10.0, # Long delay ) @@ -146,9 +143,8 @@ async def test_client_manual_flush(): async def test_client_close_flushes_queue(): """Test that closing the client flushes pending actions.""" client = OverkizClient( - username="test@example.com", - password="test", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), action_queue_enabled=True, action_queue_delay=10.0, ) @@ -183,9 +179,8 @@ async def test_client_close_flushes_queue(): async def test_client_queue_respects_max_actions(): """Test that queue flushes when max actions is reached.""" client = OverkizClient( - username="test@example.com", - password="test", - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), action_queue_enabled=True, action_queue_delay=10.0, action_queue_max_actions=2, # Max 2 actions From e94ec529ca18676f923220d852c778920ee6987b Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 21:16:56 +0000 Subject: [PATCH 06/12] Add action queue feature for batching API calls and enhance examples with logging --- docs/device-control.md | 43 +++++++++++++++++++++++++ pyoverkiz/action_queue.py | 68 ++++++++++++++++++++++----------------- pyoverkiz/client.py | 11 +++++-- pyoverkiz/models.py | 2 +- test_queue_example.py | 66 ++++++++++++++++++++----------------- 5 files changed, 127 insertions(+), 63 deletions(-) diff --git a/docs/device-control.md b/docs/device-control.md index d1bf7757..3dff2ae3 100644 --- a/docs/device-control.md +++ b/docs/device-control.md @@ -87,6 +87,49 @@ await client.execute_action_group( ) ``` +## Action queue (batching across calls) + +If you trigger many action groups in rapid succession, you can enable the action +queue to batch calls within a short window. This reduces API calls and helps +avoid rate limits. + +```python +from pyoverkiz.client import OverkizClient +from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + +client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), + action_queue_enabled=True, + action_queue_delay=0.5, # seconds to wait before auto-flush + action_queue_max_actions=20, # auto-flush when this count is reached +) + +action1 = Action( + device_url="io://1234-5678-1234/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], +) +action2 = Action( + device_url="io://1234-5678-1234/87654321", + commands=[Command(name=OverkizCommand.OPEN)], +) + +# These calls will be batched and return the same exec_id. +exec_id1 = await client.execute_action_group([action1]) +exec_id2 = await client.execute_action_group([action2]) + +print(exec_id1 == exec_id2) +``` + +Notes: +- `action_queue_delay` must be positive. +- `action_queue_max_actions` must be at least 1 and triggers an immediate flush. +- `flush_action_queue()` forces an immediate flush. +- `get_pending_actions_count()` returns a best-effort count and should not be + used for critical control flow. + ## Polling vs event-driven Polling with `get_devices()` is straightforward but may introduce latency and increase server load. For more immediate updates, consider using event listeners. Refer to the [event handling](event-handling.md) guide for implementation details. diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 118c5245..2d44825f 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -4,8 +4,8 @@ import asyncio import contextlib -from collections.abc import Callable, Coroutine -from typing import TYPE_CHECKING +from collections.abc import Callable, Coroutine, Generator +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from pyoverkiz.enums import CommandMode @@ -17,25 +17,38 @@ class QueuedExecution: def __init__(self) -> None: """Initialize the queued execution.""" - self._future: asyncio.Future[str] = asyncio.Future() + # Future is created lazily to ensure it is bound to the running event loop. + # Creating it in __init__ would fail if no loop is running yet. + self._future: asyncio.Future[str] | None = None + + def _ensure_future(self) -> asyncio.Future[str]: + """Create the underlying future lazily, bound to the running event loop.""" + # This method is the single point of future creation to guarantee + # consistent loop binding for callers that await or set results later. + if self._future is None: + loop = asyncio.get_running_loop() + self._future = loop.create_future() + return self._future def set_result(self, exec_id: str) -> None: """Set the execution ID result.""" - if not self._future.done(): - self._future.set_result(exec_id) + future = self._ensure_future() + if not future.done(): + future.set_result(exec_id) def set_exception(self, exception: BaseException) -> None: """Set an exception if the batch execution failed.""" - if not self._future.done(): - self._future.set_exception(exception) + future = self._ensure_future() + if not future.done(): + future.set_exception(exception) def is_done(self) -> bool: """Check if the execution has completed (either with result or exception).""" - return self._future.done() + return self._future.done() if self._future is not None else False - def __await__(self): + def __await__(self) -> Generator[Any, None, str]: """Make this awaitable.""" - return self._future.__await__() + return self._ensure_future().__await__() class ActionQueue: @@ -89,14 +102,16 @@ async def add( :param label: Label for the action group :return: QueuedExecution that resolves to exec_id when batch executes """ - batch_to_execute = None + batches_to_execute: list[ + tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]] + ] = [] async with self._lock: # If mode or label changes, flush existing queue first if self._pending_actions and ( mode != self._pending_mode or label != self._pending_label ): - batch_to_execute = self._prepare_flush() + batches_to_execute.append(self._prepare_flush()) # Add actions to pending queue self._pending_actions.extend(actions) @@ -115,18 +130,15 @@ async def add( # Prepare the current batch for flushing (which includes the actions # we just added). If we already flushed due to mode change, this is # a second batch. - new_batch = self._prepare_flush() - # Execute the first batch if it exists, then the second - if batch_to_execute: - await self._execute_batch(*batch_to_execute) - batch_to_execute = new_batch + batches_to_execute.append(self._prepare_flush()) elif self._flush_task is None or self._flush_task.done(): # Schedule delayed flush if not already scheduled self._flush_task = asyncio.create_task(self._delayed_flush()) - # Execute batch outside the lock if we flushed - if batch_to_execute: - await self._execute_batch(*batch_to_execute) + # Execute batches outside the lock if we flushed + for batch in batches_to_execute: + if batch[0]: + await self._execute_batch(*batch) return waiter @@ -152,13 +164,7 @@ async def _delayed_flush(self) -> None: self._flush_task = None # Execute outside the lock - try: - exec_id = await self._executor(actions, mode, label) - for waiter in waiters: - waiter.set_result(exec_id) - except Exception as exc: - for waiter in waiters: - waiter.set_exception(exc) + await self._execute_batch(actions, mode, label, waiters) except asyncio.CancelledError as exc: # Ensure all waiters are notified if this task is cancelled for waiter in waiters: @@ -211,11 +217,12 @@ async def _execute_batch( # Notify all waiters for waiter in waiters: waiter.set_result(exec_id) - except Exception as exc: + except BaseException as exc: # Propagate exception to all waiters for waiter in waiters: waiter.set_exception(exc) - raise + if isinstance(exc, asyncio.CancelledError): + raise async def flush(self) -> None: """Force flush all pending actions immediately. @@ -241,7 +248,8 @@ def get_pending_count(self) -> int: This method does not acquire the internal lock and therefore returns a best-effort snapshot that may be slightly out of date if the queue is - being modified concurrently by other coroutines. + being modified concurrently by other coroutines. Do not rely on this + value for critical control flow. """ return len(self._pending_actions) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 5fb5ac9b..3a6d427c 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -158,6 +158,8 @@ def __init__( """Constructor. :param server: ServerConfig + :param credentials: Credentials for authentication + :param verify_ssl: Enable SSL certificate verification :param session: optional ClientSession :param action_queue_enabled: enable action batching queue (default False) :param action_queue_delay: seconds to wait before flushing queue (default 0.5) @@ -184,9 +186,14 @@ def __init__( # Initialize action queue if enabled if action_queue_enabled: if action_queue_delay <= 0: - raise ValueError("action_queue_delay must be positive") + raise ValueError( + f"action_queue_delay must be positive, got {action_queue_delay!r}" + ) if action_queue_max_actions < 1: - raise ValueError("action_queue_max_actions must be at least 1") + raise ValueError( + "action_queue_max_actions must be at least 1, " + f"got {action_queue_max_actions!r}" + ) self._action_queue = ActionQueue( executor=self._execute_action_group_direct, diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 9ac74062..624a4331 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -477,7 +477,7 @@ class Command: def __init__( self, - name: OverkizCommand, + name: str | OverkizCommand, parameters: list[str | int | float | OverkizCommandParam] | None = None, type: int | None = None, **_: Any, diff --git a/test_queue_example.py b/test_queue_example.py index 8365e102..b3f11137 100644 --- a/test_queue_example.py +++ b/test_queue_example.py @@ -7,16 +7,19 @@ from __future__ import annotations import asyncio +import logging from pyoverkiz.auth import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient from pyoverkiz.enums import OverkizCommand, Server from pyoverkiz.models import Action, Command +_LOGGER = logging.getLogger(__name__) + async def example_without_queue(): """Example: Execute actions without queue (immediate execution).""" - print("\n=== Example 1: Without Queue (Immediate Execution) ===") + _LOGGER.info("=== Example 1: Without Queue (Immediate Execution) ===") client = OverkizClient( server=Server.SOMFY_EUROPE, @@ -31,17 +34,19 @@ async def example_without_queue(): ) # This will execute immediately - print("Executing action 1...") + _LOGGER.info("Executing action 1...") # exec_id = await client.execute_action_group([action1]) # print(f"Got exec_id immediately: {exec_id}") - print("Without queue: Each call executes immediately as a separate API request") + _LOGGER.info( + "Without queue: Each call executes immediately as a separate API request" + ) await client.close() async def example_with_queue(): """Example: Execute actions with queue (batched execution).""" - print("\n=== Example 2: With Queue (Batched Execution) ===") + _LOGGER.info("=== Example 2: With Queue (Batched Execution) ===") client = OverkizClient( server=Server.SOMFY_EUROPE, @@ -68,31 +73,31 @@ async def example_with_queue(): ) # These will be queued and batched together! - print("Queueing action 1...") + _LOGGER.info("Queueing action 1...") exec_id1 = await client.execute_action_group([action1]) - print(f"Got exec_id: {exec_id1}") + _LOGGER.info("Got exec_id: %s", exec_id1) - print("Queueing action 2...") + _LOGGER.info("Queueing action 2...") exec_id2 = await client.execute_action_group([action2]) - print("Queueing action 3...") + _LOGGER.info("Queueing action 3...") exec_id3 = await client.execute_action_group([action3]) - print(f"Pending actions in queue: {client.get_pending_actions_count()}") + _LOGGER.info("Pending actions in queue: %s", client.get_pending_actions_count()) # All three will have the same exec_id since they were batched together! - print(f"\nExec ID 1: {exec_id1}") - print(f"Exec ID 2: {exec_id2}") - print(f"Exec ID 3: {exec_id3}") - print(f"All same? {exec_id1 == exec_id2 == exec_id3}") + _LOGGER.info("Exec ID 1: %s", exec_id1) + _LOGGER.info("Exec ID 2: %s", exec_id2) + _LOGGER.info("Exec ID 3: %s", exec_id3) + _LOGGER.info("All same? %s", exec_id1 == exec_id2 == exec_id3) - print("\nWith queue: Multiple actions batched into single API request!") + _LOGGER.info("With queue: Multiple actions batched into single API request!") await client.close() async def example_manual_flush(): """Example: Manually flush the queue.""" - print("\n=== Example 3: Manual Flush ===") + _LOGGER.info("=== Example 3: Manual Flush ===") client = OverkizClient( server=Server.SOMFY_EUROPE, @@ -106,44 +111,45 @@ async def example_manual_flush(): commands=[Command(name=OverkizCommand.CLOSE)], ) - print("Queueing action with 10s delay...") + _LOGGER.info("Queueing action with 10s delay...") # Start execution in background (don't await yet) exec_task = asyncio.create_task(client.execute_action_group([action])) # Give it a moment to queue await asyncio.sleep(0.1) - print(f"Pending actions: {client.get_pending_actions_count()}") + _LOGGER.info("Pending actions: %s", client.get_pending_actions_count()) # Don't want to wait 10 seconds? Flush manually! - print("Manually flushing queue...") + _LOGGER.info("Manually flushing queue...") await client.flush_action_queue() - print(f"Pending actions after flush: {client.get_pending_actions_count()}") + _LOGGER.info("Pending actions after flush: %s", client.get_pending_actions_count()) # Now get the result exec_id = await exec_task - print(f"Got exec_id: {exec_id}") + _LOGGER.info("Got exec_id: %s", exec_id) await client.close() async def main(): """Run all examples.""" - print("=" * 60) - print("Action Queue Feature Examples") - print("=" * 60) + logging.basicConfig(level=logging.INFO) + _LOGGER.info("=" * 60) + _LOGGER.info("Action Queue Feature Examples") + _LOGGER.info("=" * 60) await example_without_queue() await example_with_queue() await example_manual_flush() - print("\n" + "=" * 60) - print("Key Benefits:") - print("- Reduces API calls by batching actions") - print("- Helps avoid Overkiz rate limits") - print("- Perfect for scenes/automations with multiple devices") - print("- Fully backward compatible (disabled by default)") - print("=" * 60) + _LOGGER.info("%s", "=" * 60) + _LOGGER.info("Key Benefits:") + _LOGGER.info("- Reduces API calls by batching actions") + _LOGGER.info("- Helps avoid Overkiz rate limits") + _LOGGER.info("- Perfect for scenes/automations with multiple devices") + _LOGGER.info("- Fully backward compatible (disabled by default)") + _LOGGER.info("%s", "=" * 60) if __name__ == "__main__": From ff27052e0926dde5286b3c6f81be00884bd1e6af Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 21:39:50 +0000 Subject: [PATCH 07/12] Add action queue documentation, integrate into device control guide, and update client and example usage --- docs/action-queue.md | 137 +++++++++++++++++++++++++ docs/device-control.md | 40 +------- mkdocs.yml | 1 + pyoverkiz/action_queue.py | 18 ++++ pyoverkiz/client.py | 36 +++---- test_queue_example.py | 13 +-- tests/test_client_queue_integration.py | 19 ++-- 7 files changed, 191 insertions(+), 73 deletions(-) create mode 100644 docs/action-queue.md diff --git a/docs/action-queue.md b/docs/action-queue.md new file mode 100644 index 00000000..d5070660 --- /dev/null +++ b/docs/action-queue.md @@ -0,0 +1,137 @@ +# Action queue + +The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsException`, `TooManyConcurrentRequestsException`, or `TooManyExecutionsException`, which can occur if actions are sent individually in quick succession. + +## Enable with defaults + +Set `action_queue=True` to enable batching with default settings: + +```python +from pyoverkiz.client import OverkizClient +from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + +client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), + action_queue=True, # uses defaults +) + +action1 = Action( + device_url="io://1234-5678-1234/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], +) +action2 = Action( + device_url="io://1234-5678-1234/87654321", + commands=[Command(name=OverkizCommand.OPEN)], +) + +exec_id1 = await client.execute_action_group([action1]) +exec_id2 = await client.execute_action_group([action2]) + +print(exec_id1 == exec_id2) +``` + +Defaults: +- `delay=0.5` +- `max_actions=20` + +## Advanced settings + +If you need to tune batching behavior, pass `ActionQueueSettings`: + +```python +import asyncio + +from pyoverkiz.action_queue import ActionQueueSettings +from pyoverkiz.client import OverkizClient +from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + +client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), + action_queue=ActionQueueSettings( + delay=0.5, # seconds to wait before auto-flush + max_actions=20, # auto-flush when this count is reached + ), +) +``` + +## `flush_action_queue()` (force immediate execution) + +Normally, queued actions are sent after the delay window or when `max_actions` is reached. Call `flush_action_queue()` to force the queue to execute immediately, which is useful when you want to send any pending actions without waiting for the delay timer to expire. + +```python +from pyoverkiz.action_queue import ActionQueueSettings +import asyncio + +from pyoverkiz.client import OverkizClient +from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + +client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), + action_queue=ActionQueueSettings(delay=10.0), # long delay +) + +action = Action( + device_url="io://1234-5678-1234/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], +) + +exec_task = asyncio.create_task(client.execute_action_group([action])) + +# Give it time to enter the queue +await asyncio.sleep(0.05) + +# Force immediate execution instead of waiting 10 seconds +await client.flush_action_queue() + +exec_id = await exec_task +print(exec_id) +``` + +Why this matters: +- It lets you keep a long delay for batching, but still force a quick execution when a user interaction demands it. +- Useful before shutdown to avoid leaving actions waiting in the queue. + +## `get_pending_actions_count()` (best-effort count) + +`get_pending_actions_count()` returns a snapshot of how many actions are currently queued. Because the queue can change concurrently (and the method does not acquire the queue lock), the value is approximate. Use it for logging, diagnostics, or UI hints—not for critical control flow. + +```python +from pyoverkiz.client import OverkizClient +from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + +client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("user@example.com", "password"), + action_queue=True, +) + +action = Action( + device_url="io://1234-5678-1234/12345678", + commands=[Command(name=OverkizCommand.CLOSE)], +) + +exec_task = asyncio.create_task(client.execute_action_group([action])) +await asyncio.sleep(0.01) + +pending = client.get_pending_actions_count() +print(f"Pending actions (approx): {pending}") + +exec_id = await exec_task +print(exec_id) +``` + +Why it’s best-effort: +- Actions may flush automatically while you read the count. +- New actions may be added concurrently by other tasks. +- The count can be briefly stale, so avoid using it to decide whether you must flush or not. diff --git a/docs/device-control.md b/docs/device-control.md index 3dff2ae3..b972a66e 100644 --- a/docs/device-control.md +++ b/docs/device-control.md @@ -91,44 +91,8 @@ await client.execute_action_group( If you trigger many action groups in rapid succession, you can enable the action queue to batch calls within a short window. This reduces API calls and helps -avoid rate limits. - -```python -from pyoverkiz.client import OverkizClient -from pyoverkiz.auth import UsernamePasswordCredentials -from pyoverkiz.enums import OverkizCommand, Server -from pyoverkiz.models import Action, Command - -client = OverkizClient( - server=Server.SOMFY_EUROPE, - credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue_enabled=True, - action_queue_delay=0.5, # seconds to wait before auto-flush - action_queue_max_actions=20, # auto-flush when this count is reached -) - -action1 = Action( - device_url="io://1234-5678-1234/12345678", - commands=[Command(name=OverkizCommand.CLOSE)], -) -action2 = Action( - device_url="io://1234-5678-1234/87654321", - commands=[Command(name=OverkizCommand.OPEN)], -) - -# These calls will be batched and return the same exec_id. -exec_id1 = await client.execute_action_group([action1]) -exec_id2 = await client.execute_action_group([action2]) - -print(exec_id1 == exec_id2) -``` - -Notes: -- `action_queue_delay` must be positive. -- `action_queue_max_actions` must be at least 1 and triggers an immediate flush. -- `flush_action_queue()` forces an immediate flush. -- `get_pending_actions_count()` returns a best-effort count and should not be - used for critical control flow. +avoid rate limits. See the [Action queue](action-queue.md) guide for setup, +advanced settings, and usage patterns. ## Polling vs event-driven diff --git a/mkdocs.yml b/mkdocs.yml index 28c5d6f0..2a8508e7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -46,6 +46,7 @@ nav: - Getting started: getting-started.md - Core concepts: core-concepts.md - Device control: device-control.md + - Action queue: action-queue.md - Event handling: event-handling.md - Error handling: error-handling.md - Troubleshooting: troubleshooting.md diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 2d44825f..50de5cfd 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -5,6 +5,7 @@ import asyncio import contextlib from collections.abc import Callable, Coroutine, Generator +from dataclasses import dataclass from typing import TYPE_CHECKING, Any if TYPE_CHECKING: @@ -12,6 +13,23 @@ from pyoverkiz.models import Action +@dataclass(frozen=True, slots=True) +class ActionQueueSettings: + """Settings for configuring the action queue behavior.""" + + delay: float = 0.5 + max_actions: int = 20 + + def validate(self) -> None: + """Validate configuration values for the action queue.""" + if self.delay <= 0: + raise ValueError(f"action_queue.delay must be positive, got {self.delay!r}") + if self.max_actions < 1: + raise ValueError( + f"action_queue.max_actions must be at least 1, got {self.max_actions!r}" + ) + + class QueuedExecution: """Represents a queued action execution that will resolve to an exec_id when the batch executes.""" diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 3a6d427c..54d6659b 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -19,7 +19,7 @@ ServerDisconnectedError, ) -from pyoverkiz.action_queue import ActionQueue +from pyoverkiz.action_queue import ActionQueue, ActionQueueSettings from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server @@ -151,9 +151,7 @@ def __init__( credentials: Credentials, verify_ssl: bool = True, session: ClientSession | None = None, - action_queue_enabled: bool = False, - action_queue_delay: float = 0.5, - action_queue_max_actions: int = 20, + action_queue: bool | ActionQueueSettings = False, ) -> None: """Constructor. @@ -161,9 +159,7 @@ def __init__( :param credentials: Credentials for authentication :param verify_ssl: Enable SSL certificate verification :param session: optional ClientSession - :param action_queue_enabled: enable action batching queue (default False) - :param action_queue_delay: seconds to wait before flushing queue (default 0.5) - :param action_queue_max_actions: maximum actions per batch before auto-flush (default 20) + :param action_queue: enable batching or provide queue settings (default False) """ self.server_config = self._normalize_server(server) @@ -184,21 +180,23 @@ def __init__( self._ssl = SSL_CONTEXT_LOCAL_API # Initialize action queue if enabled - if action_queue_enabled: - if action_queue_delay <= 0: - raise ValueError( - f"action_queue_delay must be positive, got {action_queue_delay!r}" - ) - if action_queue_max_actions < 1: - raise ValueError( - "action_queue_max_actions must be at least 1, " - f"got {action_queue_max_actions!r}" - ) + queue_settings: ActionQueueSettings | None + if isinstance(action_queue, ActionQueueSettings): + queue_settings = action_queue + elif isinstance(action_queue, bool): + queue_settings = ActionQueueSettings() if action_queue else None + else: + raise TypeError( + "action_queue must be a bool or ActionQueueSettings, " + f"got {type(action_queue).__name__}" + ) + if queue_settings: + queue_settings.validate() self._action_queue = ActionQueue( executor=self._execute_action_group_direct, - delay=action_queue_delay, - max_actions=action_queue_max_actions, + delay=queue_settings.delay, + max_actions=queue_settings.max_actions, ) self._auth = build_auth_strategy( diff --git a/test_queue_example.py b/test_queue_example.py index b3f11137..07293f96 100644 --- a/test_queue_example.py +++ b/test_queue_example.py @@ -9,6 +9,7 @@ import asyncio import logging +from pyoverkiz.action_queue import ActionQueueSettings from pyoverkiz.auth import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient from pyoverkiz.enums import OverkizCommand, Server @@ -24,7 +25,7 @@ async def example_without_queue(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue_enabled=False, # Queue disabled + action_queue=False, # Queue disabled ) # Create some example actions @@ -51,9 +52,10 @@ async def example_with_queue(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue_enabled=True, # Queue enabled! - action_queue_delay=0.5, # Wait 500ms before flushing - action_queue_max_actions=20, # Max 20 actions per batch + action_queue=ActionQueueSettings( + delay=0.5, # Wait 500ms before flushing + max_actions=20, # Max 20 actions per batch + ), ) # Create some example actions @@ -102,8 +104,7 @@ async def example_manual_flush(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue_enabled=True, - action_queue_delay=10.0, # Long delay + action_queue=ActionQueueSettings(delay=10.0), # Long delay ) action = Action( diff --git a/tests/test_client_queue_integration.py b/tests/test_client_queue_integration.py index 100b00d3..01df9d12 100644 --- a/tests/test_client_queue_integration.py +++ b/tests/test_client_queue_integration.py @@ -5,6 +5,7 @@ import pytest +from pyoverkiz.action_queue import ActionQueueSettings from pyoverkiz.auth import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient from pyoverkiz.enums import OverkizCommand, Server @@ -17,7 +18,7 @@ async def test_client_without_queue_executes_immediately(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("test@example.com", "test"), - action_queue_enabled=False, + action_queue=False, ) action = Action( @@ -49,8 +50,7 @@ async def test_client_with_queue_batches_actions(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("test@example.com", "test"), - action_queue_enabled=True, - action_queue_delay=0.1, + action_queue=ActionQueueSettings(delay=0.1), ) actions = [ @@ -102,8 +102,7 @@ async def test_client_manual_flush(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("test@example.com", "test"), - action_queue_enabled=True, - action_queue_delay=10.0, # Long delay + action_queue=ActionQueueSettings(delay=10.0), # Long delay ) action = Action( @@ -145,8 +144,7 @@ async def test_client_close_flushes_queue(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("test@example.com", "test"), - action_queue_enabled=True, - action_queue_delay=10.0, + action_queue=ActionQueueSettings(delay=10.0), ) action = Action( @@ -181,9 +179,10 @@ async def test_client_queue_respects_max_actions(): client = OverkizClient( server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("test@example.com", "test"), - action_queue_enabled=True, - action_queue_delay=10.0, - action_queue_max_actions=2, # Max 2 actions + action_queue=ActionQueueSettings( + delay=10.0, + max_actions=2, # Max 2 actions + ), ) actions = [ From 65ef9cc295de8f0e0a97dfd8487d2b82a96dce3c Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 21:43:11 +0000 Subject: [PATCH 08/12] Remove example script for action queue feature to streamline repository --- test_queue_example.py | 157 ------------------------------------------ 1 file changed, 157 deletions(-) delete mode 100644 test_queue_example.py diff --git a/test_queue_example.py b/test_queue_example.py deleted file mode 100644 index 07293f96..00000000 --- a/test_queue_example.py +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/env python3 -# mypy: ignore-errors -# type: ignore - -"""Simple example demonstrating the action queue feature.""" - -from __future__ import annotations - -import asyncio -import logging - -from pyoverkiz.action_queue import ActionQueueSettings -from pyoverkiz.auth import UsernamePasswordCredentials -from pyoverkiz.client import OverkizClient -from pyoverkiz.enums import OverkizCommand, Server -from pyoverkiz.models import Action, Command - -_LOGGER = logging.getLogger(__name__) - - -async def example_without_queue(): - """Example: Execute actions without queue (immediate execution).""" - _LOGGER.info("=== Example 1: Without Queue (Immediate Execution) ===") - - client = OverkizClient( - server=Server.SOMFY_EUROPE, - credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue=False, # Queue disabled - ) - - # Create some example actions - Action( - device_url="io://1234-5678-9012/12345678", - commands=[Command(name=OverkizCommand.CLOSE)], - ) - - # This will execute immediately - _LOGGER.info("Executing action 1...") - # exec_id = await client.execute_action_group([action1]) - # print(f"Got exec_id immediately: {exec_id}") - - _LOGGER.info( - "Without queue: Each call executes immediately as a separate API request" - ) - await client.close() - - -async def example_with_queue(): - """Example: Execute actions with queue (batched execution).""" - _LOGGER.info("=== Example 2: With Queue (Batched Execution) ===") - - client = OverkizClient( - server=Server.SOMFY_EUROPE, - credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue=ActionQueueSettings( - delay=0.5, # Wait 500ms before flushing - max_actions=20, # Max 20 actions per batch - ), - ) - - # Create some example actions - action1 = Action( - device_url="io://1234-5678-9012/12345678", - commands=[Command(name=OverkizCommand.CLOSE)], - ) - - action2 = Action( - device_url="io://1234-5678-9012/87654321", - commands=[Command(name=OverkizCommand.OPEN)], - ) - - action3 = Action( - device_url="io://1234-5678-9012/11111111", - commands=[Command(name=OverkizCommand.STOP)], - ) - - # These will be queued and batched together! - _LOGGER.info("Queueing action 1...") - exec_id1 = await client.execute_action_group([action1]) - _LOGGER.info("Got exec_id: %s", exec_id1) - - _LOGGER.info("Queueing action 2...") - exec_id2 = await client.execute_action_group([action2]) - - _LOGGER.info("Queueing action 3...") - exec_id3 = await client.execute_action_group([action3]) - - _LOGGER.info("Pending actions in queue: %s", client.get_pending_actions_count()) - - # All three will have the same exec_id since they were batched together! - _LOGGER.info("Exec ID 1: %s", exec_id1) - _LOGGER.info("Exec ID 2: %s", exec_id2) - _LOGGER.info("Exec ID 3: %s", exec_id3) - _LOGGER.info("All same? %s", exec_id1 == exec_id2 == exec_id3) - - _LOGGER.info("With queue: Multiple actions batched into single API request!") - await client.close() - - -async def example_manual_flush(): - """Example: Manually flush the queue.""" - _LOGGER.info("=== Example 3: Manual Flush ===") - - client = OverkizClient( - server=Server.SOMFY_EUROPE, - credentials=UsernamePasswordCredentials("user@example.com", "password"), - action_queue=ActionQueueSettings(delay=10.0), # Long delay - ) - - action = Action( - device_url="io://1234-5678-9012/12345678", - commands=[Command(name=OverkizCommand.CLOSE)], - ) - - _LOGGER.info("Queueing action with 10s delay...") - # Start execution in background (don't await yet) - exec_task = asyncio.create_task(client.execute_action_group([action])) - - # Give it a moment to queue - await asyncio.sleep(0.1) - _LOGGER.info("Pending actions: %s", client.get_pending_actions_count()) - - # Don't want to wait 10 seconds? Flush manually! - _LOGGER.info("Manually flushing queue...") - await client.flush_action_queue() - - _LOGGER.info("Pending actions after flush: %s", client.get_pending_actions_count()) - - # Now get the result - exec_id = await exec_task - _LOGGER.info("Got exec_id: %s", exec_id) - - await client.close() - - -async def main(): - """Run all examples.""" - logging.basicConfig(level=logging.INFO) - _LOGGER.info("=" * 60) - _LOGGER.info("Action Queue Feature Examples") - _LOGGER.info("=" * 60) - - await example_without_queue() - await example_with_queue() - await example_manual_flush() - - _LOGGER.info("%s", "=" * 60) - _LOGGER.info("Key Benefits:") - _LOGGER.info("- Reduces API calls by batching actions") - _LOGGER.info("- Helps avoid Overkiz rate limits") - _LOGGER.info("- Perfect for scenes/automations with multiple devices") - _LOGGER.info("- Fully backward compatible (disabled by default)") - _LOGGER.info("%s", "=" * 60) - - -if __name__ == "__main__": - asyncio.run(main()) From aaab0a23cec88ae88bed28ce7106573eb2592651 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 21:50:27 +0000 Subject: [PATCH 09/12] Refactor action execution in action queue documentation to use asyncio tasks for improved concurrency --- docs/action-queue.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/action-queue.md b/docs/action-queue.md index d5070660..16248931 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -7,8 +7,8 @@ The action queue automatically groups rapid, consecutive calls to `execute_actio Set `action_queue=True` to enable batching with default settings: ```python -from pyoverkiz.client import OverkizClient from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.client import OverkizClient from pyoverkiz.enums import OverkizCommand, Server from pyoverkiz.models import Action, Command @@ -27,8 +27,9 @@ action2 = Action( commands=[Command(name=OverkizCommand.OPEN)], ) -exec_id1 = await client.execute_action_group([action1]) -exec_id2 = await client.execute_action_group([action2]) +task1 = asyncio.create_task(client.execute_action_group([action1])) +task2 = asyncio.create_task(client.execute_action_group([action2])) +exec_id1, exec_id2 = await asyncio.gather(task1, task2) print(exec_id1 == exec_id2) ``` From 78a878ea0261f421e2426451ce09969453c1ae1d Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 22:08:20 +0000 Subject: [PATCH 10/12] Enhance action queue to merge commands for duplicate devices and preserve order --- docs/action-queue.md | 7 +++++ pyoverkiz/action_queue.py | 31 ++++++++++++++++++++++- pyoverkiz/client.py | 4 +++ tests/test_action_queue.py | 52 +++++++++++++++++++++++++++++++++++++- 4 files changed, 92 insertions(+), 2 deletions(-) diff --git a/docs/action-queue.md b/docs/action-queue.md index 16248931..b24efd54 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -2,6 +2,13 @@ The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsException`, `TooManyConcurrentRequestsException`, or `TooManyExecutionsException`, which can occur if actions are sent individually in quick succession. +Important limitation: +- Gateways only allow a single action per device in each action group. The queue + merges commands for the same `device_url` into a single action to keep the + batch valid and preserve command order for that device. +- If you pass multiple actions for the same `device_url` in a single + `execute_action_group()` call, the queue will merge them for you. + ## Enable with defaults Set `action_queue=True` to enable batching with default settings: diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 50de5cfd..13283adc 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -115,6 +115,10 @@ async def add( ) -> QueuedExecution: """Add actions to the queue. + When multiple actions target the same device, their commands are merged + into a single action to respect the gateway limitation of one action per + device in each action group. + :param actions: Actions to queue :param mode: Command mode (will flush if different from pending mode) :param label: Label for the action group @@ -124,6 +128,19 @@ async def add( tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]] ] = [] + if not actions: + raise ValueError("actions must contain at least one Action") + + normalized_actions: list[Action] = [] + normalized_index: dict[str, Action] = {} + for action in actions: + existing = normalized_index.get(action.device_url) + if existing is None: + normalized_actions.append(action) + normalized_index[action.device_url] = action + else: + existing.commands.extend(action.commands) + async with self._lock: # If mode or label changes, flush existing queue first if self._pending_actions and ( @@ -132,7 +149,19 @@ async def add( batches_to_execute.append(self._prepare_flush()) # Add actions to pending queue - self._pending_actions.extend(actions) + for action in normalized_actions: + pending = next( + ( + pending_action + for pending_action in self._pending_actions + if pending_action.device_url == action.device_url + ), + None, + ) + if pending is None: + self._pending_actions.append(action) + else: + pending.commands.extend(action.commands) self._pending_mode = mode self._pending_label = label diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 54d6659b..1e92cf53 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -504,6 +504,10 @@ async def execute_action_group( executed within the configured delay window. The method will wait for the batch to execute and return the exec_id. + Gateways only allow a single action per device in each action group. The + action queue enforces this by merging commands for the same device into + a single action in the batch. + When action queue is disabled, executes immediately and returns exec_id. The API is consistent regardless of queue configuration - always returns diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py index 7e131397..752e344d 100644 --- a/tests/test_action_queue.py +++ b/tests/test_action_queue.py @@ -164,6 +164,56 @@ async def test_action_queue_label_change_flush(mock_executor): assert mock_executor.call_count == 2 +@pytest.mark.asyncio +async def test_action_queue_duplicate_device_merge(mock_executor): + """Test that queue merges commands for duplicate devices.""" + queue = ActionQueue(executor=mock_executor, delay=0.5) + + action1 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + action2 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.OPEN)], + ) + + queued1 = await queue.add([action1]) + queued2 = await queue.add([action2]) + + exec_id1 = await queued1 + exec_id2 = await queued2 + + assert exec_id1 == exec_id2 + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_duplicate_device_merge_order(mock_executor): + """Test that command order is preserved when merging.""" + queue = ActionQueue(executor=mock_executor, delay=0.1) + + action1 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + action2 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.OPEN)], + ) + + queued = await queue.add([action1, action2]) + await queued + + args, _ = mock_executor.call_args + actions = args[0] + assert len(actions) == 1 + assert [command.name for command in actions[0].commands] == [ + OverkizCommand.CLOSE, + OverkizCommand.OPEN, + ] + + @pytest.mark.asyncio async def test_action_queue_manual_flush(mock_executor): """Test manual flush of the queue.""" @@ -246,7 +296,7 @@ async def test_action_queue_get_pending_count(): assert queue.get_pending_count() == 1 await queue.add([action]) - assert queue.get_pending_count() == 2 + assert queue.get_pending_count() == 1 # Wait for flush await asyncio.sleep(0.6) From 476535af471124ed156537d2201e4c977e5b44fc Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 22:13:04 +0000 Subject: [PATCH 11/12] Update action queue documentation and improve error messages for clarity --- docs/action-queue.md | 2 ++ pyoverkiz/action_queue.py | 18 +++++++++++------- pyoverkiz/client.py | 5 +++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/docs/action-queue.md b/docs/action-queue.md index b24efd54..8ff64910 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -14,6 +14,8 @@ Important limitation: Set `action_queue=True` to enable batching with default settings: ```python +import asyncio + from pyoverkiz.auth import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient from pyoverkiz.enums import OverkizCommand, Server diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index 13283adc..57ae5c49 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -23,10 +23,10 @@ class ActionQueueSettings: def validate(self) -> None: """Validate configuration values for the action queue.""" if self.delay <= 0: - raise ValueError(f"action_queue.delay must be positive, got {self.delay!r}") + raise ValueError(f"action_queue_delay must be positive, got {self.delay!r}") if self.max_actions < 1: raise ValueError( - f"action_queue.max_actions must be at least 1, got {self.max_actions!r}" + f"action_queue_max_actions must be at least 1, got {self.max_actions!r}" ) @@ -78,6 +78,7 @@ class ActionQueue: - The delay timer expires - The max actions limit is reached - The command mode changes + - The label changes - Manual flush is requested """ @@ -264,12 +265,15 @@ async def _execute_batch( # Notify all waiters for waiter in waiters: waiter.set_result(exec_id) - except BaseException as exc: - # Propagate exception to all waiters + except asyncio.CancelledError as exc: + # Propagate cancellation to all waiters, then re-raise. + for waiter in waiters: + waiter.set_exception(exc) + raise + except Exception as exc: + # Propagate exceptions to all waiters without swallowing system-level exits. for waiter in waiters: waiter.set_exception(exc) - if isinstance(exc, asyncio.CancelledError): - raise async def flush(self) -> None: """Force flush all pending actions immediately. @@ -296,7 +300,7 @@ def get_pending_count(self) -> int: This method does not acquire the internal lock and therefore returns a best-effort snapshot that may be slightly out of date if the queue is being modified concurrently by other coroutines. Do not rely on this - value for critical control flow. + value for critical control flow or for making flush decisions. """ return len(self._pending_actions) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 1e92cf53..310059a0 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -539,9 +539,10 @@ async def flush_action_queue(self) -> None: await self._action_queue.flush() def get_pending_actions_count(self) -> int: - """Get the number of actions currently waiting in the queue. + """Get the approximate number of actions currently waiting in the queue. - Returns 0 if action queue is disabled. + Returns 0 if action queue is disabled. This is a best-effort snapshot + and may be stale if other coroutines modify the queue concurrently. """ if self._action_queue: return self._action_queue.get_pending_count() From 01eece878ff656e3a9bcb2e1e4aba73ab989db5b Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sun, 25 Jan 2026 22:18:46 +0000 Subject: [PATCH 12/12] Update action queue documentation to clarify rate limits and improve structure --- docs/action-queue.md | 2 +- docs/device-control.md | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/action-queue.md b/docs/action-queue.md index 8ff64910..f14c02c7 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -1,6 +1,6 @@ # Action queue -The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsException`, `TooManyConcurrentRequestsException`, or `TooManyExecutionsException`, which can occur if actions are sent individually in quick succession. +The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsException`, `TooManyConcurrentRequestsException`, `TooManyExecutionsException`, or `ExecutionQueueFullException` which can occur if actions are sent individually in quick succession. Important limitation: - Gateways only allow a single action per device in each action group. The queue diff --git a/docs/device-control.md b/docs/device-control.md index b972a66e..50467f99 100644 --- a/docs/device-control.md +++ b/docs/device-control.md @@ -85,15 +85,18 @@ await client.execute_action_group( ], label="Execution: multiple commands", ) -``` -## Action queue (batching across calls) +## Limitations and rate limits + +Gateways impose limits on how many executions can run or be queued simultaneously. If the execution queue is full, the API will raise an `ExecutionQueueFullException`. Most gateways allow up to 10 concurrent executions. + +### Action queue (batching across calls) If you trigger many action groups in rapid succession, you can enable the action queue to batch calls within a short window. This reduces API calls and helps avoid rate limits. See the [Action queue](action-queue.md) guide for setup, advanced settings, and usage patterns. -## Polling vs event-driven +### Polling vs event-driven Polling with `get_devices()` is straightforward but may introduce latency and increase server load. For more immediate updates, consider using event listeners. Refer to the [event handling](event-handling.md) guide for implementation details.