diff --git a/docs/action-queue.md b/docs/action-queue.md
new file mode 100644
index 00000000..f14c02c7
--- /dev/null
+++ b/docs/action-queue.md
@@ -0,0 +1,149 @@
+# Action queue
+
+The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single action group execution. This reduces the number of API calls and helps prevent rate-limiting errors such as `TooManyRequestsException`, `TooManyConcurrentRequestsException`, `TooManyExecutionsException`, or `ExecutionQueueFullException`, which can occur when actions are sent individually in quick succession.
+
+Important limitation:
+- Gateways only allow a single action per device in each action group. The queue
+  merges commands for the same `device_url` into a single action to keep the
+  batch valid and preserve command order for that device.
+- If you pass multiple actions for the same `device_url` in a single
+  `execute_action_group()` call, the queue merges them for you as well.
+
+## Enable with defaults
+
+Set `action_queue=True` to enable batching with default settings:
+
+```python
+import asyncio
+
+from pyoverkiz.auth import UsernamePasswordCredentials
+from pyoverkiz.client import OverkizClient
+from pyoverkiz.enums import OverkizCommand, Server
+from pyoverkiz.models import Action, Command
+
+client = OverkizClient(
+    server=Server.SOMFY_EUROPE,
+    credentials=UsernamePasswordCredentials("user@example.com", "password"),
+    action_queue=True,  # uses defaults
+)
+
+action1 = Action(
+    device_url="io://1234-5678-1234/12345678",
+    commands=[Command(name=OverkizCommand.CLOSE)],
+)
+action2 = Action(
+    device_url="io://1234-5678-1234/87654321",
+    commands=[Command(name=OverkizCommand.OPEN)],
+)
+
+task1 = asyncio.create_task(client.execute_action_group([action1]))
+task2 = asyncio.create_task(client.execute_action_group([action2]))
+exec_id1, exec_id2 = await asyncio.gather(task1, task2)
+
+print(exec_id1 == exec_id2)  # True: both calls were batched into one execution
+```
+
+Defaults:
+- `delay=0.5` (seconds to wait before auto-flush)
+- `max_actions=20` (auto-flush when this count is reached)
+
+## Advanced settings
+
+If you need to tune batching behavior, pass `ActionQueueSettings`:
+
+```python
+import asyncio
+
+from pyoverkiz.action_queue import ActionQueueSettings
+from pyoverkiz.auth import UsernamePasswordCredentials
+from pyoverkiz.client import OverkizClient
+from pyoverkiz.enums import OverkizCommand, Server
+from pyoverkiz.models import Action, Command
+
+client = OverkizClient(
+    server=Server.SOMFY_EUROPE,
+    credentials=UsernamePasswordCredentials("user@example.com", "password"),
+    action_queue=ActionQueueSettings(
+        delay=0.5,  # seconds to wait before auto-flush
+        max_actions=20,  # auto-flush when this count is reached
+    ),
+)
+```
+
+## `flush_action_queue()` (force immediate execution)
+
+Normally, queued actions are sent when the delay window expires or when `max_actions` is reached. Call `flush_action_queue()` to send any pending actions immediately, without waiting for the delay timer to expire.
+
+```python
+import asyncio
+
+from pyoverkiz.action_queue import ActionQueueSettings
+from pyoverkiz.auth import UsernamePasswordCredentials
+from pyoverkiz.client import OverkizClient
+from pyoverkiz.enums import OverkizCommand, Server
+from pyoverkiz.models import Action, Command
+
+client = OverkizClient(
+    server=Server.SOMFY_EUROPE,
+    credentials=UsernamePasswordCredentials("user@example.com", "password"),
+    action_queue=ActionQueueSettings(delay=10.0),  # long delay
+)
+
+action = Action(
+    device_url="io://1234-5678-1234/12345678",
+    commands=[Command(name=OverkizCommand.CLOSE)],
+)
+
+exec_task = asyncio.create_task(client.execute_action_group([action]))
+
+# Give it time to enter the queue
+await asyncio.sleep(0.05)
+
+# Force immediate execution instead of waiting 10 seconds
+await client.flush_action_queue()
+
+exec_id = await exec_task
+print(exec_id)
+```
+
+Why this matters:
+- It lets you keep a long delay for batching, but still force a quick execution when a user interaction demands it.
+- Useful before shutdown to avoid leaving actions waiting in the queue.
+
+## `get_pending_actions_count()` (best-effort count)
+
+`get_pending_actions_count()` returns a snapshot of how many actions are currently queued. Because the queue can change concurrently (and the method does not acquire the queue lock), the value is approximate. Use it for logging, diagnostics, or UI hints, not for critical control flow.
+
+```python
+import asyncio
+
+from pyoverkiz.auth import UsernamePasswordCredentials
+from pyoverkiz.client import OverkizClient
+from pyoverkiz.enums import OverkizCommand, Server
+from pyoverkiz.models import Action, Command
+
+client = OverkizClient(
+    server=Server.SOMFY_EUROPE,
+    credentials=UsernamePasswordCredentials("user@example.com", "password"),
+    action_queue=True,
+)
+
+action = Action(
+    device_url="io://1234-5678-1234/12345678",
+    commands=[Command(name=OverkizCommand.CLOSE)],
+)
+
+exec_task = asyncio.create_task(client.execute_action_group([action]))
+await asyncio.sleep(0.01)
+
+pending = client.get_pending_actions_count()
+print(f"Pending actions (approx): {pending}")
+
+exec_id = await exec_task
+print(exec_id)
+```
+
+Why it’s best-effort:
+- Actions may flush automatically while you read the count.
+- New actions may be added concurrently by other tasks.
+- The count can be briefly stale, so avoid using it to decide whether a flush is needed.
diff --git a/docs/device-control.md b/docs/device-control.md
index d1bf7757..50467f99 100644
--- a/docs/device-control.md
+++ b/docs/device-control.md
@@ -85,8 +85,20 @@ await client.execute_action_group(
     ],
     label="Execution: multiple commands",
 )
-```
-## Polling vs event-driven
+```
+
+## Limitations and rate limits
+
+Gateways impose limits on how many executions can run or be queued simultaneously. If the execution queue is full, the API raises an `ExecutionQueueFullException`. Most gateways allow up to 10 concurrent executions.
+
+### Action queue (batching across calls)
+
+If you trigger many action groups in rapid succession, you can enable the action
+queue to batch calls within a short window. This reduces API calls and helps
+avoid rate limits. See the [Action queue](action-queue.md) guide for setup,
+advanced settings, and usage patterns.
+
+### Polling vs event-driven
 
 Polling with `get_devices()` is straightforward but may introduce latency and increase server load. For more immediate updates, consider using event listeners. Refer to the [event handling](event-handling.md) guide for implementation details.
diff --git a/mkdocs.yml b/mkdocs.yml index 28c5d6f0..2a8508e7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -46,6 +46,7 @@ nav: - Getting started: getting-started.md - Core concepts: core-concepts.md - Device control: device-control.md + - Action queue: action-queue.md - Event handling: event-handling.md - Error handling: error-handling.md - Troubleshooting: troubleshooting.md diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py new file mode 100644 index 00000000..57ae5c49 --- /dev/null +++ b/pyoverkiz/action_queue.py @@ -0,0 +1,324 @@ +"""Action queue for batching multiple action executions into single API calls.""" + +from __future__ import annotations + +import asyncio +import contextlib +from collections.abc import Callable, Coroutine, Generator +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from pyoverkiz.enums import CommandMode + from pyoverkiz.models import Action + + +@dataclass(frozen=True, slots=True) +class ActionQueueSettings: + """Settings for configuring the action queue behavior.""" + + delay: float = 0.5 + max_actions: int = 20 + + def validate(self) -> None: + """Validate configuration values for the action queue.""" + if self.delay <= 0: + raise ValueError(f"action_queue_delay must be positive, got {self.delay!r}") + if self.max_actions < 1: + raise ValueError( + f"action_queue_max_actions must be at least 1, got {self.max_actions!r}" + ) + + +class QueuedExecution: + """Represents a queued action execution that will resolve to an exec_id when the batch executes.""" + + def __init__(self) -> None: + """Initialize the queued execution.""" + # Future is created lazily to ensure it is bound to the running event loop. + # Creating it in __init__ would fail if no loop is running yet. + self._future: asyncio.Future[str] | None = None + + def _ensure_future(self) -> asyncio.Future[str]: + """Create the underlying future lazily, bound to the running event loop.""" + # This method is the single point of future creation to guarantee + # consistent loop binding for callers that await or set results later. + if self._future is None: + loop = asyncio.get_running_loop() + self._future = loop.create_future() + return self._future + + def set_result(self, exec_id: str) -> None: + """Set the execution ID result.""" + future = self._ensure_future() + if not future.done(): + future.set_result(exec_id) + + def set_exception(self, exception: BaseException) -> None: + """Set an exception if the batch execution failed.""" + future = self._ensure_future() + if not future.done(): + future.set_exception(exception) + + def is_done(self) -> bool: + """Check if the execution has completed (either with result or exception).""" + return self._future.done() if self._future is not None else False + + def __await__(self) -> Generator[Any, None, str]: + """Make this awaitable.""" + return self._ensure_future().__await__() + + +class ActionQueue: + """Batches multiple action executions into single API calls. + + When actions are added, they are held for a configurable delay period. + If more actions arrive during this window, they are batched together. 
+ The batch is flushed when: + - The delay timer expires + - The max actions limit is reached + - The command mode changes + - The label changes + - Manual flush is requested + """ + + def __init__( + self, + executor: Callable[ + [list[Action], CommandMode | None, str | None], Coroutine[None, None, str] + ], + delay: float = 0.5, + max_actions: int = 20, + ) -> None: + """Initialize the action queue. + + :param executor: Async function to execute batched actions + :param delay: Seconds to wait before auto-flushing (default 0.5) + :param max_actions: Maximum actions per batch before forced flush (default 20) + """ + self._executor = executor + self._delay = delay + self._max_actions = max_actions + + self._pending_actions: list[Action] = [] + self._pending_mode: CommandMode | None = None + self._pending_label: str | None = None + self._pending_waiters: list[QueuedExecution] = [] + + self._flush_task: asyncio.Task[None] | None = None + self._lock = asyncio.Lock() + + async def add( + self, + actions: list[Action], + mode: CommandMode | None = None, + label: str | None = None, + ) -> QueuedExecution: + """Add actions to the queue. + + When multiple actions target the same device, their commands are merged + into a single action to respect the gateway limitation of one action per + device in each action group. + + :param actions: Actions to queue + :param mode: Command mode (will flush if different from pending mode) + :param label: Label for the action group + :return: QueuedExecution that resolves to exec_id when batch executes + """ + batches_to_execute: list[ + tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]] + ] = [] + + if not actions: + raise ValueError("actions must contain at least one Action") + + normalized_actions: list[Action] = [] + normalized_index: dict[str, Action] = {} + for action in actions: + existing = normalized_index.get(action.device_url) + if existing is None: + normalized_actions.append(action) + normalized_index[action.device_url] = action + else: + existing.commands.extend(action.commands) + + async with self._lock: + # If mode or label changes, flush existing queue first + if self._pending_actions and ( + mode != self._pending_mode or label != self._pending_label + ): + batches_to_execute.append(self._prepare_flush()) + + # Add actions to pending queue + for action in normalized_actions: + pending = next( + ( + pending_action + for pending_action in self._pending_actions + if pending_action.device_url == action.device_url + ), + None, + ) + if pending is None: + self._pending_actions.append(action) + else: + pending.commands.extend(action.commands) + self._pending_mode = mode + self._pending_label = label + + # Create waiter for this caller. This waiter is added to the current + # batch being built, even if we flushed a previous batch above due to + # a mode/label change. This ensures the waiter belongs to the batch + # containing the actions we just added. + waiter = QueuedExecution() + self._pending_waiters.append(waiter) + + # If we hit max actions, flush immediately + if len(self._pending_actions) >= self._max_actions: + # Prepare the current batch for flushing (which includes the actions + # we just added). If we already flushed due to mode change, this is + # a second batch. 
+ batches_to_execute.append(self._prepare_flush()) + elif self._flush_task is None or self._flush_task.done(): + # Schedule delayed flush if not already scheduled + self._flush_task = asyncio.create_task(self._delayed_flush()) + + # Execute batches outside the lock if we flushed + for batch in batches_to_execute: + if batch[0]: + await self._execute_batch(*batch) + + return waiter + + async def _delayed_flush(self) -> None: + """Wait for the delay period, then flush the queue.""" + waiters: list[QueuedExecution] = [] + try: + await asyncio.sleep(self._delay) + async with self._lock: + if not self._pending_actions: + return + + # Take snapshot and clear state while holding lock + actions = self._pending_actions + mode = self._pending_mode + label = self._pending_label + waiters = self._pending_waiters + + self._pending_actions = [] + self._pending_mode = None + self._pending_label = None + self._pending_waiters = [] + self._flush_task = None + + # Execute outside the lock + await self._execute_batch(actions, mode, label, waiters) + except asyncio.CancelledError as exc: + # Ensure all waiters are notified if this task is cancelled + for waiter in waiters: + waiter.set_exception(exc) + raise + + def _prepare_flush( + self, + ) -> tuple[list[Action], CommandMode | None, str | None, list[QueuedExecution]]: + """Prepare a flush by taking snapshot and clearing state (must be called with lock held). + + Returns a tuple of (actions, mode, label, waiters) that should be executed + outside the lock using _execute_batch(). + """ + if not self._pending_actions: + return ([], None, None, []) + + # Cancel any pending flush task + if self._flush_task and not self._flush_task.done(): + self._flush_task.cancel() + self._flush_task = None + + # Take snapshot of current batch + actions = self._pending_actions + mode = self._pending_mode + label = self._pending_label + waiters = self._pending_waiters + + # Clear pending state + self._pending_actions = [] + self._pending_mode = None + self._pending_label = None + self._pending_waiters = [] + + return (actions, mode, label, waiters) + + async def _execute_batch( + self, + actions: list[Action], + mode: CommandMode | None, + label: str | None, + waiters: list[QueuedExecution], + ) -> None: + """Execute a batch of actions and notify waiters (must be called without lock).""" + if not actions: + return + + try: + exec_id = await self._executor(actions, mode, label) + # Notify all waiters + for waiter in waiters: + waiter.set_result(exec_id) + except asyncio.CancelledError as exc: + # Propagate cancellation to all waiters, then re-raise. + for waiter in waiters: + waiter.set_exception(exc) + raise + except Exception as exc: + # Propagate exceptions to all waiters without swallowing system-level exits. + for waiter in waiters: + waiter.set_exception(exc) + + async def flush(self) -> None: + """Force flush all pending actions immediately. + + This method forces the queue to execute any pending batched actions + without waiting for the delay timer. The execution results are delivered + to the corresponding QueuedExecution objects returned by add(). + + This method is useful for forcing immediate execution without having to + wait for the delay timer to expire. 
+ """ + batch_to_execute = None + async with self._lock: + if self._pending_actions: + batch_to_execute = self._prepare_flush() + + # Execute outside the lock + if batch_to_execute: + await self._execute_batch(*batch_to_execute) + + def get_pending_count(self) -> int: + """Get the (approximate) number of actions currently waiting in the queue. + + This method does not acquire the internal lock and therefore returns a + best-effort snapshot that may be slightly out of date if the queue is + being modified concurrently by other coroutines. Do not rely on this + value for critical control flow or for making flush decisions. + """ + return len(self._pending_actions) + + async def shutdown(self) -> None: + """Shutdown the queue, flushing any pending actions.""" + batch_to_execute = None + async with self._lock: + if self._flush_task and not self._flush_task.done(): + task = self._flush_task + task.cancel() + self._flush_task = None + # Wait for cancellation to complete + with contextlib.suppress(asyncio.CancelledError): + await task + + if self._pending_actions: + batch_to_execute = self._prepare_flush() + + # Execute outside the lock + if batch_to_execute: + await self._execute_batch(*batch_to_execute) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index c81b20ea..310059a0 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -19,6 +19,7 @@ ServerDisconnectedError, ) +from pyoverkiz.action_queue import ActionQueue, ActionQueueSettings from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server @@ -141,6 +142,7 @@ class OverkizClient: session: ClientSession _ssl: ssl.SSLContext | bool = True _auth: AuthStrategy + _action_queue: ActionQueue | None = None def __init__( self, @@ -149,11 +151,15 @@ def __init__( credentials: Credentials, verify_ssl: bool = True, session: ClientSession | None = None, + action_queue: bool | ActionQueueSettings = False, ) -> None: """Constructor. :param server: ServerConfig + :param credentials: Credentials for authentication + :param verify_ssl: Enable SSL certificate verification :param session: optional ClientSession + :param action_queue: enable batching or provide queue settings (default False) """ self.server_config = self._normalize_server(server) @@ -173,6 +179,26 @@ def __init__( # Use the prebuilt SSL context with disabled strict validation for local API. 
self._ssl = SSL_CONTEXT_LOCAL_API + # Initialize action queue if enabled + queue_settings: ActionQueueSettings | None + if isinstance(action_queue, ActionQueueSettings): + queue_settings = action_queue + elif isinstance(action_queue, bool): + queue_settings = ActionQueueSettings() if action_queue else None + else: + raise TypeError( + "action_queue must be a bool or ActionQueueSettings, " + f"got {type(action_queue).__name__}" + ) + + if queue_settings: + queue_settings.validate() + self._action_queue = ActionQueue( + executor=self._execute_action_group_direct, + delay=queue_settings.delay, + max_actions=queue_settings.max_actions, + ) + self._auth = build_auth_strategy( server_config=self.server_config, credentials=credentials, @@ -210,6 +236,10 @@ def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: async def close(self) -> None: """Close the session.""" + # Flush any pending actions in queue + if self._action_queue: + await self._action_queue.shutdown() + if self.event_listener_id: await self.unregister_event_listener() @@ -431,13 +461,13 @@ async def get_api_version(self) -> str: @retry_on_too_many_executions @retry_on_auth_error - async def execute_action_group( + async def _execute_action_group_direct( self, actions: list[Action], mode: CommandMode | None = None, label: str | None = "python-overkiz-api", ) -> str: - """Execute a non-persistent action group. + """Execute a non-persistent action group directly (internal method). The executed action group does not have to be persisted on the server before use. Per-session rate-limit : 1 calls per 28min 48s period for all operations of the same category (exec) @@ -462,6 +492,62 @@ async def execute_action_group( return cast(str, response["execId"]) + async def execute_action_group( + self, + actions: list[Action], + mode: CommandMode | None = None, + label: str | None = "python-overkiz-api", + ) -> str: + """Execute a non-persistent action group. + + When action queue is enabled, actions will be batched with other actions + executed within the configured delay window. The method will wait for the + batch to execute and return the exec_id. + + Gateways only allow a single action per device in each action group. The + action queue enforces this by merging commands for the same device into + a single action in the batch. + + When action queue is disabled, executes immediately and returns exec_id. + + The API is consistent regardless of queue configuration - always returns + exec_id string directly. + + :param actions: List of actions to execute + :param mode: Command mode (GEOLOCATED, INTERNAL, HIGH_PRIORITY, or None) + :param label: Label for the action group + :return: exec_id string from the executed action group + + Example usage:: + + # Works the same with or without queue + exec_id = await client.execute_action_group([action]) + """ + if self._action_queue: + queued = await self._action_queue.add(actions, mode, label) + return await queued + else: + return await self._execute_action_group_direct(actions, mode, label) + + async def flush_action_queue(self) -> None: + """Force flush all pending actions in the queue immediately. + + If action queue is disabled, this method does nothing. + If there are no pending actions, this method does nothing. + """ + if self._action_queue: + await self._action_queue.flush() + + def get_pending_actions_count(self) -> int: + """Get the approximate number of actions currently waiting in the queue. + + Returns 0 if action queue is disabled. 
This is a best-effort snapshot + and may be stale if other coroutines modify the queue concurrently. + """ + if self._action_queue: + return self._action_queue.get_pending_count() + return 0 + @retry_on_auth_error async def cancel_command(self, exec_id: str) -> None: """Cancel a running setup-level execution.""" diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index ce2c0064..624a4331 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -472,12 +472,12 @@ class Command: """Represents an OverKiz Command.""" type: int | None = None - name: OverkizCommand + name: str | OverkizCommand parameters: list[str | int | float | OverkizCommandParam] | None def __init__( self, - name: OverkizCommand, + name: str | OverkizCommand, parameters: list[str | int | float | OverkizCommandParam] | None = None, type: int | None = None, **_: Any, diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py new file mode 100644 index 00000000..752e344d --- /dev/null +++ b/tests/test_action_queue.py @@ -0,0 +1,323 @@ +"""Tests for ActionQueue.""" + +import asyncio +from unittest.mock import AsyncMock + +import pytest + +from pyoverkiz.action_queue import ActionQueue, QueuedExecution +from pyoverkiz.enums import CommandMode, OverkizCommand +from pyoverkiz.models import Action, Command + + +@pytest.fixture +def mock_executor(): + """Create a mock executor function.""" + + async def executor(actions, mode, label): + # Return immediately, no delay + return f"exec-{len(actions)}-{mode}-{label}" + + return AsyncMock(side_effect=executor) + + +@pytest.mark.asyncio +async def test_action_queue_single_action(mock_executor): + """Test queue with a single action.""" + queue = ActionQueue(executor=mock_executor, delay=0.1) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + assert isinstance(queued, QueuedExecution) + + # Wait for the batch to execute + exec_id = await queued + assert exec_id.startswith("exec-1-") + + # Verify executor was called + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_batching(mock_executor): + """Test that multiple actions are batched together.""" + queue = ActionQueue(executor=mock_executor, delay=0.2) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(3) + ] + + # Add actions in quick succession + queued1 = await queue.add([actions[0]]) + queued2 = await queue.add([actions[1]]) + queued3 = await queue.add([actions[2]]) + + # All should return the same exec_id + exec_id1 = await queued1 + exec_id2 = await queued2 + exec_id3 = await queued3 + + assert exec_id1 == exec_id2 == exec_id3 + assert "exec-3-" in exec_id1 # 3 actions in batch + + # Executor should be called only once + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_max_actions_flush(mock_executor): + """Test that queue flushes when max actions is reached.""" + queue = ActionQueue(executor=mock_executor, delay=10.0, max_actions=3) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(5) + ] + + # Add 3 actions - should trigger flush + queued1 = await queue.add([actions[0]]) + queued2 = await queue.add([actions[1]]) + queued3 = await queue.add([actions[2]]) + + # Wait a bit for flush to complete + await asyncio.sleep(0.05) + + # First 3 should be done + assert 
queued1.is_done() + assert queued2.is_done() + assert queued3.is_done() + + # Add 2 more - should start a new batch + queued4 = await queue.add([actions[3]]) + queued5 = await queue.add([actions[4]]) + + # Wait for second batch + await queued4 + await queued5 + + # Should have been called twice (2 batches) + assert mock_executor.call_count == 2 + + +@pytest.mark.asyncio +async def test_action_queue_mode_change_flush(mock_executor): + """Test that queue flushes when command mode changes.""" + queue = ActionQueue(executor=mock_executor, delay=0.5) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # Add action with normal mode + queued1 = await queue.add([action], mode=None) + + # Add action with high priority - should flush previous batch + queued2 = await queue.add([action], mode=CommandMode.HIGH_PRIORITY) + + # Wait for both batches + exec_id1 = await queued1 + exec_id2 = await queued2 + + # Should be different exec_ids (different batches) + assert exec_id1 != exec_id2 + + # Should have been called twice + assert mock_executor.call_count == 2 + + +@pytest.mark.asyncio +async def test_action_queue_label_change_flush(mock_executor): + """Test that queue flushes when label changes.""" + queue = ActionQueue(executor=mock_executor, delay=0.5) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # Add action with label1 + queued1 = await queue.add([action], label="label1") + + # Add action with label2 - should flush previous batch + queued2 = await queue.add([action], label="label2") + + # Wait for both batches + exec_id1 = await queued1 + exec_id2 = await queued2 + + # Should be different exec_ids (different batches) + assert exec_id1 != exec_id2 + + # Should have been called twice + assert mock_executor.call_count == 2 + + +@pytest.mark.asyncio +async def test_action_queue_duplicate_device_merge(mock_executor): + """Test that queue merges commands for duplicate devices.""" + queue = ActionQueue(executor=mock_executor, delay=0.5) + + action1 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + action2 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.OPEN)], + ) + + queued1 = await queue.add([action1]) + queued2 = await queue.add([action2]) + + exec_id1 = await queued1 + exec_id2 = await queued2 + + assert exec_id1 == exec_id2 + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_duplicate_device_merge_order(mock_executor): + """Test that command order is preserved when merging.""" + queue = ActionQueue(executor=mock_executor, delay=0.1) + + action1 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + action2 = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.OPEN)], + ) + + queued = await queue.add([action1, action2]) + await queued + + args, _ = mock_executor.call_args + actions = args[0] + assert len(actions) == 1 + assert [command.name for command in actions[0].commands] == [ + OverkizCommand.CLOSE, + OverkizCommand.OPEN, + ] + + +@pytest.mark.asyncio +async def test_action_queue_manual_flush(mock_executor): + """Test manual flush of the queue.""" + queue = ActionQueue(executor=mock_executor, delay=10.0) # Long delay + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await 
queue.add([action]) + + # Manually flush + await queue.flush() + + # Should be done now + assert queued.is_done() + exec_id = await queued + assert exec_id.startswith("exec-1-") + + +@pytest.mark.asyncio +async def test_action_queue_shutdown(mock_executor): + """Test that shutdown flushes pending actions.""" + queue = ActionQueue(executor=mock_executor, delay=10.0) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + + # Shutdown should flush + await queue.shutdown() + + # Should be done + assert queued.is_done() + mock_executor.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_queue_error_propagation(mock_executor): + """Test that exceptions are propagated to all waiters.""" + # Make executor raise an exception + mock_executor.side_effect = ValueError("API Error") + + queue = ActionQueue(executor=mock_executor, delay=0.1) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued1 = await queue.add([action]) + queued2 = await queue.add([action]) + + # Both should raise the same exception + with pytest.raises(ValueError, match="API Error"): + await queued1 + + with pytest.raises(ValueError, match="API Error"): + await queued2 + + +@pytest.mark.asyncio +async def test_action_queue_get_pending_count(): + """Test getting pending action count.""" + mock_executor = AsyncMock(return_value="exec-123") + queue = ActionQueue(executor=mock_executor, delay=0.5) + + assert queue.get_pending_count() == 0 + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + await queue.add([action]) + assert queue.get_pending_count() == 1 + + await queue.add([action]) + assert queue.get_pending_count() == 1 + + # Wait for flush + await asyncio.sleep(0.6) + assert queue.get_pending_count() == 0 + + +@pytest.mark.asyncio +async def test_queued_execution_awaitable(): + """Test that QueuedExecution is properly awaitable.""" + queued = QueuedExecution() + + # Set result in background + async def set_result(): + await asyncio.sleep(0.05) + queued.set_result("exec-123") + + task = asyncio.create_task(set_result()) + + # Await the result + result = await queued + assert result == "exec-123" + + # Ensure background task has completed + await task diff --git a/tests/test_client_queue_integration.py b/tests/test_client_queue_integration.py new file mode 100644 index 00000000..01df9d12 --- /dev/null +++ b/tests/test_client_queue_integration.py @@ -0,0 +1,223 @@ +"""Integration tests for OverkizClient with ActionQueue.""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest + +from pyoverkiz.action_queue import ActionQueueSettings +from pyoverkiz.auth import UsernamePasswordCredentials +from pyoverkiz.client import OverkizClient +from pyoverkiz.enums import OverkizCommand, Server +from pyoverkiz.models import Action, Command + + +@pytest.mark.asyncio +async def test_client_without_queue_executes_immediately(): + """Test that client without queue executes actions immediately.""" + client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), + action_queue=False, + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + # Mock the internal execution + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: 
+ mock_post.return_value = {"execId": "exec-123"} + + result = await client.execute_action_group([action]) + + # Should return exec_id directly (string) + assert isinstance(result, str) + assert result == "exec-123" + + # Should have called API immediately + mock_post.assert_called_once() + + await client.close() + + +@pytest.mark.asyncio +async def test_client_with_queue_batches_actions(): + """Test that client with queue batches multiple actions.""" + client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), + action_queue=ActionQueueSettings(delay=0.1), + ) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(3) + ] + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-batched"} + + # Queue multiple actions quickly - start them as tasks to allow batching + task1 = asyncio.create_task(client.execute_action_group([actions[0]])) + task2 = asyncio.create_task(client.execute_action_group([actions[1]])) + task3 = asyncio.create_task(client.execute_action_group([actions[2]])) + + # Give them a moment to queue + await asyncio.sleep(0.01) + + # Should have 3 actions pending + assert client.get_pending_actions_count() == 3 + + # Wait for all to execute + exec_id1 = await task1 + exec_id2 = await task2 + exec_id3 = await task3 + + # All should have the same exec_id (batched together) + assert exec_id1 == exec_id2 == exec_id3 == "exec-batched" + + # Should have called API only once (batched) + mock_post.assert_called_once() + + # Check that all 3 actions were in the batch + call_args = mock_post.call_args + payload = call_args[0][1] # Second argument is the payload + assert len(payload["actions"]) == 3 + + await client.close() + + +@pytest.mark.asyncio +async def test_client_manual_flush(): + """Test manually flushing the queue.""" + client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), + action_queue=ActionQueueSettings(delay=10.0), # Long delay + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-flushed"} + + # Start execution as a task to allow checking pending count + exec_task = asyncio.create_task(client.execute_action_group([action])) + + # Give it a moment to queue + await asyncio.sleep(0.01) + + # Should have 1 action pending + assert client.get_pending_actions_count() == 1 + + # Manually flush + await client.flush_action_queue() + + # Should be executed now + assert client.get_pending_actions_count() == 0 + + exec_id = await exec_task + assert exec_id == "exec-flushed" + + mock_post.assert_called_once() + + await client.close() + + +@pytest.mark.asyncio +async def test_client_close_flushes_queue(): + """Test that closing the client flushes pending actions.""" + client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), + action_queue=ActionQueueSettings(delay=10.0), + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": 
"exec-closed"} + + # Start execution as a task + exec_task = asyncio.create_task(client.execute_action_group([action])) + + # Give it a moment to queue + await asyncio.sleep(0.01) + + # Close should flush + await client.close() + + # Should be executed + exec_id = await exec_task + assert exec_id == "exec-closed" + + mock_post.assert_called_once() + + +@pytest.mark.asyncio +async def test_client_queue_respects_max_actions(): + """Test that queue flushes when max actions is reached.""" + client = OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("test@example.com", "test"), + action_queue=ActionQueueSettings( + delay=10.0, + max_actions=2, # Max 2 actions + ), + ) + + actions = [ + Action( + device_url=f"io://1234-5678-9012/{i}", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + for i in range(3) + ] + + with patch.object( + client, "_OverkizClient__post", new_callable=AsyncMock + ) as mock_post: + mock_post.return_value = {"execId": "exec-123"} + + # Add 2 actions as tasks to trigger flush + task1 = asyncio.create_task(client.execute_action_group([actions[0]])) + task2 = asyncio.create_task(client.execute_action_group([actions[1]])) + + # Wait a bit for flush + await asyncio.sleep(0.05) + + # First 2 should be done + exec_id1 = await task1 + exec_id2 = await task2 + assert exec_id1 == "exec-123" + assert exec_id2 == "exec-123" + + # Add third action - starts new batch + exec_id3 = await client.execute_action_group([actions[2]]) + + # Should have exec_id directly (waited for batch to complete) + assert exec_id3 == "exec-123" + + # Should have been called twice (2 batches) + assert mock_post.call_count == 2 + + await client.close()