diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2cf19ed..81b5b07 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,3 +52,20 @@ jobs: - name: Test run: pixi run cargo test --lib -p bubbaloop + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Python SDK — lint + run: | + cd python-sdk + pip install -e ".[dev]" -q + python -m ruff format --check bubbaloop_sdk/ tests/ + python -m ruff check bubbaloop_sdk/ tests/ + + - name: Python SDK — test + run: | + cd python-sdk + python -m pytest tests/ -v diff --git a/.markdownlint.json b/.markdownlint.json new file mode 100644 index 0000000..c44ef7c --- /dev/null +++ b/.markdownlint.json @@ -0,0 +1,4 @@ +{ + "MD013": false, + "MD060": false +} diff --git a/python-sdk/CLAUDE.md b/python-sdk/CLAUDE.md new file mode 100644 index 0000000..8a892ee --- /dev/null +++ b/python-sdk/CLAUDE.md @@ -0,0 +1,159 @@ +# bubbaloop-sdk (Python) + +Pure Python wrapper over `zenoh-python`. Synchronous API — no asyncio required. +Mirrors the Rust `bubbaloop-node` SDK surface; nodes written with either SDK are interoperable. + +## Structure + +``` +python-sdk/ + bubbaloop_sdk/ + __init__.py # Public API — edit when adding new public names + context.py # NodeContext: connect(), topic(), publishers, subscribers, queryables + publisher.py # JsonPublisher, ProtoPublisher (wraps session.declare_publisher) + subscriber.py # ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable + node.py # run_node() — CLI arg parsing + health heartbeat + lifecycle + health.py # start_health_heartbeat() — publishes 'ok' every 5s + discover.py # discover_nodes() — GET bubbaloop/**/health + get_sample.py # get_sample() — one-shot async subscribe-and-wait + decode_sample.py # ProtoDecoder — decode zenoh.Sample to protobuf + tests/ + test_context.py # 71 unit tests — NO real Zenoh session needed + pyproject.toml # Build config, deps, ruff/pytest/coverage + pixi.toml # Dev tasks: test, lint, fmt, check +``` + +## Build & verify + +```bash +# With pixi (recommended) +cd python-sdk +pixi run check # fmt-check + lint (run before every commit) +pixi run test # 71 unit tests +pixi run test-cov # tests + coverage report + +# With venv (alternative) +cd python-sdk +.venv/bin/python -m ruff check bubbaloop_sdk/ tests/ +.venv/bin/python -m pytest tests/ -v +``` + +## Conventions — MUST follow + +**Tooling:** +- `ruff` for lint + format — NOT flake8, black, or isort directly +- Config in `pyproject.toml` under `[tool.ruff]` — do NOT add `.flake8` or `setup.cfg` +- Line length: 120 characters +- `TYPE_CHECKING` guard for cross-module type annotations — NEVER string-quoted forward refs (`"Foo"`) + +**Type annotations:** + +- Use modern Python 3.11+ union syntax: `X | Y` and `X | None` — NOT `Union[X, Y]` or `Optional[X]` +- Annotate all public method parameters and return types +- Annotate class attributes and instance variables when the type is not obvious from the assignment +- When fixing type errors, follow this hierarchy: + 1. Add proper type annotations + 2. Use `X | Y` union syntax or `cast()` from `typing` + 3. Use `TYPE_CHECKING` for circular imports + 4. Last resort: `# type: ignore[]` with a comment explaining why +- AVOID `# type: ignore` without an error code — always be specific + +**Docstrings:** +- Google docstring style for all public modules, classes, and functions +- Do NOT add a docstring to `__init__()` — document instantiation at the class level instead +- Include `Args:`, `Returns:`, and `Raises:` sections when applicable + +```python +class CallbackSubscriber: + """Event-driven subscriber that calls a handler on each received message. + + The handler is invoked from Zenoh's internal callback thread by default. + Pass ``max_workers`` to run the handler in a thread pool instead — use + this for slow work (I/O, DB writes, HTTP calls). + + Args: + session: Active Zenoh session. + topic: Key expression to subscribe to. + handler: Callable invoked with each decoded message. + registry: SchemaRegistry for auto-decoding samples by encoding header. + """ + + def __init__( + self, + session: zenoh.Session, + topic: str, + handler: Callable, + registry, + ): ... + + def undeclare(self) -> None: + """Undeclare the Zenoh subscriber and release resources.""" +``` + +**String formatting:** + +- Use `%`-style formatting for log calls — NOT f-strings: `log.info("Started %s", name)` + - Reason: lazy evaluation — the string is only formatted if the log level is active +- Use f-strings everywhere else: `raise ValueError(f"Unknown topic: {topic}")` + +**Imports:** +- Cross-module type-only imports go under `if TYPE_CHECKING:` at the top of the file +- Lazy runtime imports (inside method bodies) are kept to avoid circular import issues +- `__init__.py` must be updated whenever a new public class is added to `subscriber.py` or `publisher.py` + +**Zenoh session:** +- ALWAYS use `mode: "client"` — peer mode does not route through zenohd +- NEVER use `.complete(True)` on queryables — blocks wildcard queries like `bubbaloop/**/schema` +- `query.key_expr` is a **property**, NOT a method — NEVER write `query.key_expr()` +- `query.reply(query.key_expr, payload_bytes)` — correct reply pattern + +**Threading — critical:** +- Zenoh uses **one internal thread** for ALL callbacks and queryables on a session +- A slow handler blocks every other subscriber/queryable until it returns +- Pass `max_workers=N` to `subscriber_callback` / `subscriber_raw_callback` / `queryable` / `queryable_raw` for any handler that does I/O, DB access, or hardware calls +- Shutdown order for thread-pool variants: undeclare Zenoh subscriber FIRST, then `executor.shutdown()` — reversing this causes `RuntimeError: cannot schedule new futures after shutdown` + +**`undeclare()` discipline:** +- Every subscriber, callback subscriber, and queryable must be undeclared when done +- `Queryable`, `CallbackSubscriber`, and `RawCallbackSubscriber` with `max_workers` own a `ThreadPoolExecutor` — GC alone is not enough, always call `undeclare()` +- Blocking subscribers (`RawSubscriber`) are undeclared via `undeclare()` too + +## Testing + +Tests do NOT open a real Zenoh session. Use `_make_context()`: + +```python +def _make_context(machine_id): + from bubbaloop_sdk.context import NodeContext + ctx = object.__new__(NodeContext) + ctx.session = MagicMock() + ctx.machine_id = machine_id + ctx.instance_name = machine_id + ctx._shutdown = threading.Event() + return ctx +``` + +For async/threaded tests use `threading.Event` with a 2s timeout — do NOT use `time.sleep`: + +```python +event = threading.Event() +def handler(msg): + received.append(msg) + event.set() +assert event.wait(timeout=2.0), "handler not called within 2s" +``` + +## DO / DON'T + +**DO:** `pixi run check` before every commit | add tests when adding public methods | update `__init__.py` and its `__all__` for every new public class | call `undeclare()` in tests that create async subscribers or queryables + +**DON'T:** use `asyncio` — the SDK is synchronous by design | use `query.key_expr()` with parentheses | use `.complete(True)` on queryables | add string forward references (`"Foo"`) — use `TYPE_CHECKING` instead | suppress lint rules globally when a per-file or code-level fix is possible + +## Pitfalls + +- `B904` — always `raise Foo from err` inside `except` blocks, never bare `raise Foo(...)` +- `F401` in `__init__.py` is suppressed by ruff config (re-exports are intentional) — do NOT add `# noqa` comments there +- `CallbackSubscriber` and `RawCallbackSubscriber` without `max_workers` do NOT own an executor — `undeclare()` only calls `_sub.undeclare()`; with `max_workers` they own a `ThreadPoolExecutor` and shut it down in `undeclare()` +- `ProtoSubscriber` and `RawSubscriber` are iterable (`for msg in sub`); iteration raises `StopIteration` on exception via `_BaseSubscriber.__next__` — prefer `recv(timeout=...)` in shutdown-aware loops to avoid blocking indefinitely +- `run_node()` reads `config.yaml` by default; override with `-c path/config.yaml`. The `name` field in config sets `instance_name` for health/schema topics — collisions happen if two instances share the same name +- Health topic format: `bubbaloop/global/{machine_id}/{instance_name}/health` — ensure consumer patterns match exactly diff --git a/python-sdk/CONTRIBUTING.md b/python-sdk/CONTRIBUTING.md new file mode 100644 index 0000000..110608d --- /dev/null +++ b/python-sdk/CONTRIBUTING.md @@ -0,0 +1,81 @@ +# Contributing to bubbaloop-sdk (Python) + +## Dev environment + +### With pixi (recommended) + +```bash +cd python-sdk +pixi install # creates env, installs all deps including dev extras +pixi run test +pixi run lint +pixi run fmt +``` + +Available tasks: + +| Task | Description | +|---|---| +| `pixi run test` | Run test suite | +| `pixi run test-cov` | Run tests with coverage report | +| `pixi run lint` | Check for lint errors (ruff) | +| `pixi run lint-fix` | Auto-fix lint errors | +| `pixi run fmt` | Format code | +| `pixi run fmt-check` | Check formatting without changing files | +| `pixi run check` | Run fmt-check + lint (CI equivalent) | + +### With plain venv + +```bash +cd python-sdk +python3 -m venv .venv +.venv/bin/pip install -e ".[dev]" +.venv/bin/pytest tests/ -v +.venv/bin/ruff check bubbaloop_sdk/ tests/ +``` + +## Linting (ruff) + +Config lives in `python-sdk/pyproject.toml` under `[tool.ruff]`. +Follows the same pattern as [kornia/kornia](https://github.com/kornia/kornia). + +Rules enabled: E/W (pycodestyle), F (Pyflakes), I (isort), B (bugbear), +UP (pyupgrade), C4 (comprehensions), RUF (ruff-specific). + +Line length: 120 characters. + +## Lint suppressions + +| File | Rule | Reason | +|---|---|---| +| `*/__init__.py` | F401, F403 | Re-exports allowed | +| `tests/*` | S101, D | Assert and missing docstrings allowed in tests | + +## Testing + +Tests in `tests/test_context.py` do **not** open a real Zenoh session. +`_make_context()` uses `object.__new__(NodeContext)` + `MagicMock()` — no router needed. + +For async subscriber/queryable tests, `threading.Event` with a 2s timeout +verifies that handlers are dispatched to the thread pool correctly. + +## Project structure + +```text +python-sdk/ + pyproject.toml # Build config, deps, ruff/pytest/coverage config + pixi.toml # Pixi tasks (test, lint, fmt, check) + README.md # User-facing API docs + bubbaloop_sdk/ + __init__.py # Public API surface + context.py # NodeContext — main entry point + subscriber.py # ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable + publisher.py # JsonPublisher, ProtoPublisher + node.py # run_node() helper + health.py # Health heartbeat (used internally by run_node) + discover.py # discover_nodes() + get_sample.py # get_sample() one-shot helper + decode_sample.py # ProtoDecoder + tests/ + test_context.py # 68 unit tests (no real Zenoh required) +``` diff --git a/python-sdk/README.md b/python-sdk/README.md index 2a04709..698d478 100644 --- a/python-sdk/README.md +++ b/python-sdk/README.md @@ -1,7 +1,7 @@ # bubbaloop-sdk (Python) -Pure Python wrapper over `zenoh-python` with the same API surface as the Rust Node SDK. -No compilation required — installable directly from the git repository. +Pure Python wrapper over `zenoh-python`. Synchronous API — no asyncio required. +No compilation needed; install directly from the git repository. ## Install @@ -18,7 +18,24 @@ pip install -e ".[dev]" ## Quick start -### Protobuf node +### Publish JSON + +```python +import time +from bubbaloop_sdk import NodeContext + +ctx = NodeContext.connect() +pub = ctx.publisher_json("weather/current") + +while not ctx.is_shutdown(): + pub.put({"temperature": 22.5, "humidity": 60}) + time.sleep(1.0) + +pub.undeclare() +ctx.close() +``` + +### Publish protobuf ```python import time @@ -39,46 +56,63 @@ pub.undeclare() ctx.close() ``` -### JSON node +### Auto-decode subscriber ```python -import time from bubbaloop_sdk import NodeContext ctx = NodeContext.connect() -pub = ctx.publisher_json("weather/current") +sub = ctx.subscribe("sensor/data") -while not ctx.is_shutdown(): - pub.put({"temperature": 22.5, "humidity": 60}) - time.sleep(1.0) +for msg in sub: # auto-decoded: proto, dict, or bytes + print(msg) +``` -pub.undeclare() +### Callback subscriber (event-driven, no loop needed) + +```python +from bubbaloop_sdk import NodeContext + +ctx = NodeContext.connect() + +def on_sensor(msg): + print(f"received: {msg}") # proto, dict, or bytes depending on encoding + +sub = ctx.subscriber_callback("sensor/data", on_sensor) +ctx.wait_shutdown() # block until SIGINT/SIGTERM +sub.undeclare() ctx.close() ``` -### Proto subscriber +Pass `max_workers` when the handler does slow work (DB writes, HTTP calls) — +the handler runs in a thread pool, freeing Zenoh's internal thread: ```python +sub = ctx.subscriber_callback("sensor/data", on_sensor, max_workers=4) +``` + +### Queryable (respond to get requests) + +```python +import json from bubbaloop_sdk import NodeContext -from my_protos_pb2 import SensorData ctx = NodeContext.connect() -sub = ctx.subscriber("sensor/data", SensorData) -for msg in sub: - print(f"value: {msg.value}") +def on_query(query): + query.reply(query.key_expr, json.dumps({"status": "ok"}).encode()) + +qbl = ctx.queryable("status", on_query) +ctx.wait_shutdown() +qbl.undeclare() +ctx.close() ``` -### Schema queryable (protobuf nodes) +Pass `max_workers` when the handler does slow work: ```python -from bubbaloop_sdk.schema import declare_schema_queryable -from my_protos_pb2 import SensorData - -# Declare once — keeps the queryable alive while the reference is held -schema_qbl = declare_schema_queryable( - ctx.session, ctx.machine_id, "my-node", SensorData -) +qbl = ctx.queryable("status", on_query, max_workers=4) +qbl.undeclare() # call when done to release the thread pool ``` ## Configuration @@ -88,13 +122,7 @@ schema_qbl = declare_schema_queryable( | `BUBBALOOP_ZENOH_ENDPOINT` | `tcp/127.0.0.1:7447` | Zenoh router endpoint | | `BUBBALOOP_MACHINE_ID` | hostname (sanitized) | Machine identifier | -## Requirements - -- Python 3.9+ -- `eclipse-zenoh >= 1.7, < 2` -- `protobuf >= 4.0` - -## API +## API reference ### `NodeContext` @@ -105,13 +133,33 @@ schema_qbl = declare_schema_queryable( | `ctx.local_topic(suffix)` | Build `bubbaloop/local/{machine_id}/{suffix}` (SHM-only) | | `ctx.publisher_proto(suffix, msg_class)` | Declared protobuf publisher | | `ctx.publisher_json(suffix)` | Declared JSON publisher | -| `ctx.subscriber(suffix, msg_class=None)` | Proto subscriber (iterable) | -| `ctx.subscriber_raw(key_expr)` | Raw sample subscriber (no topic prefix) | +| `ctx.publisher_raw(suffix, local=False)` | Declared raw publisher (no encoding) | +| `ctx.subscribe(suffix, local=False)` | Auto-decode subscriber (proto/json/bytes) | +| `ctx.subscribe_raw(suffix, local=False)` | Raw bytes subscriber | | `ctx.is_shutdown()` | True after SIGINT/SIGTERM | | `ctx.wait_shutdown()` | Block until shutdown | | `ctx.close()` | Close the Zenoh session | -### `ProtoPublisher` / `JsonPublisher` +#### Callback subscribers (event-driven) + +Handler runs on Zenoh's internal thread by default. Pass `max_workers` to +run the handler in a thread pool instead (for slow work). + +| Method | Description | +|---|---| +| `ctx.subscriber_callback(suffix, handler, max_workers=None)` | Auto-decoded message to handler | +| `ctx.subscriber_raw_callback(key_expr, handler, max_workers=None)` | Raw `zenoh.Sample` to handler | + +#### Queryables + +Do **not** pass `complete=True` — it blocks wildcard queries used by the dashboard. + +| Method | Description | +|---|---| +| `ctx.queryable(suffix, handler, max_workers=None)` | Queryable at `topic(suffix)` | +| `ctx.queryable_raw(key_expr, handler, max_workers=None)` | Queryable at literal key expression | + +#### Publishers | Method | Description | |---|---| @@ -120,5 +168,10 @@ schema_qbl = declare_schema_queryable( ### `ProtoSubscriber` / `RawSubscriber` -Both support `for` iteration. `RawSubscriber` also exposes `recv()` for -direct use and yields `zenoh.Sample` objects directly. +Both support `for` iteration. `RawSubscriber` yields `bytes` directly. + +## Requirements + +- Python 3.10+ +- `eclipse-zenoh >= 1.7, < 2` +- `protobuf >= 4.0` diff --git a/python-sdk/bubbaloop_sdk/__init__.py b/python-sdk/bubbaloop_sdk/__init__.py index d269022..f495fde 100644 --- a/python-sdk/bubbaloop_sdk/__init__.py +++ b/python-sdk/bubbaloop_sdk/__init__.py @@ -10,11 +10,18 @@ from .decode_sample import ProtoDecoder from .discover import NodeInfo, discover_nodes from .get_sample import GetSampleTimeout, get_sample -from .publisher import JsonPublisher, ProtoPublisher, RawPublisher -from .subscriber import ProtoSubscriber, RawSubscriber from .node import run_node +from .publisher import JsonPublisher, ProtoPublisher, RawPublisher +from .subscriber import ( + CallbackSubscriber, + ProtoSubscriber, + Queryable, + RawCallbackSubscriber, + RawSubscriber, +) __all__ = [ + "CallbackSubscriber", "GetSampleTimeout", "JsonPublisher", "NodeContext", @@ -22,6 +29,8 @@ "ProtoDecoder", "ProtoPublisher", "ProtoSubscriber", + "Queryable", + "RawCallbackSubscriber", "RawPublisher", "RawSubscriber", "discover_nodes", diff --git a/python-sdk/bubbaloop_sdk/context.py b/python-sdk/bubbaloop_sdk/context.py index 461666c..f330801 100644 --- a/python-sdk/bubbaloop_sdk/context.py +++ b/python-sdk/bubbaloop_sdk/context.py @@ -13,13 +13,27 @@ msg = sub.recv() # auto-decoded: dict, proto, or bytes """ +from __future__ import annotations + import os import signal import socket import threading +from collections.abc import Callable +from typing import TYPE_CHECKING, Any import zenoh +if TYPE_CHECKING: + from .publisher import JsonPublisher, ProtoPublisher, RawPublisher + from .subscriber import ( + CallbackSubscriber, + ProtoSubscriber, + Queryable, + RawCallbackSubscriber, + RawSubscriber, + ) + def _hostname() -> str: return socket.gethostname().replace("-", "_") @@ -49,7 +63,7 @@ def connect( cls, endpoint: str | None = None, instance_name: str | None = None, - ) -> "NodeContext": + ) -> NodeContext: """Connect to a Zenoh router and return a ready NodeContext. Endpoint resolution: ``endpoint`` arg → ``BUBBALOOP_ZENOH_ENDPOINT`` env @@ -109,18 +123,20 @@ def wait_shutdown(self) -> None: # Publishers # ------------------------------------------------------------------ - def publisher_json(self, suffix: str) -> "JsonPublisher": + def publisher_json(self, suffix: str) -> JsonPublisher: """Declare a JSON publisher at ``topic(suffix)``.""" from .publisher import JsonPublisher + return JsonPublisher._declare(self.session, self.topic(suffix)) - def publisher_proto(self, suffix: str, msg_class=None) -> "ProtoPublisher": + def publisher_proto(self, suffix: str, msg_class: type | None = None) -> ProtoPublisher: """Declare a protobuf publisher at ``topic(suffix)``.""" from .publisher import ProtoPublisher + type_name = msg_class.DESCRIPTOR.full_name if msg_class is not None else None return ProtoPublisher._declare(self.session, self.topic(suffix), type_name) - def publisher_raw(self, suffix: str, local: bool = False) -> "RawPublisher": + def publisher_raw(self, suffix: str, local: bool = False) -> RawPublisher: """Declare a raw publisher with no encoding. When ``local=True``, publishes to ``local/{machine_id}/{suffix}`` with @@ -128,13 +144,14 @@ def publisher_raw(self, suffix: str, local: bool = False) -> "RawPublisher": SHM buffer instead of dropping frames. Never crosses the bridge. """ from .publisher import RawPublisher + return RawPublisher._declare(self.session, self._resolve_topic(suffix, local), local=local) # ------------------------------------------------------------------ # Subscribers # ------------------------------------------------------------------ - def subscribe(self, suffix: str, local: bool = False) -> "ProtoSubscriber": + def subscribe(self, suffix: str, local: bool = False) -> ProtoSubscriber: """Declare a subscriber that auto-decodes every message by its encoding. - ``application/protobuf;`` → decoded proto object (schema fetched on demand) @@ -157,11 +174,12 @@ def subscribe(self, suffix: str, local: bool = False) -> "ProtoSubscriber": """ from .schema_registry import SchemaRegistry from .subscriber import ProtoSubscriber - if not hasattr(self, '_schema_registry'): + + if not hasattr(self, "_schema_registry"): self._schema_registry = SchemaRegistry(self.session) return ProtoSubscriber(self.session, self._resolve_topic(suffix, local), self._schema_registry) - def subscribe_raw(self, suffix: str, local: bool = False) -> "RawSubscriber": + def subscribe_raw(self, suffix: str, local: bool = False) -> RawSubscriber: """Declare a subscriber that yields raw ``bytes`` with no decoding. Use when you need direct access to the payload — e.g. to pass to @@ -170,8 +188,88 @@ def subscribe_raw(self, suffix: str, local: bool = False) -> "RawSubscriber": When ``local=True``, subscribes to the SHM-only local topic. """ from .subscriber import RawSubscriber + return RawSubscriber(self.session, self._resolve_topic(suffix, local)) + # ------------------------------------------------------------------ + # Callback Subscribers + # ------------------------------------------------------------------ + + def subscriber_callback( + self, suffix: str, handler: Callable[[Any], None], max_workers: int | None = None + ) -> CallbackSubscriber: + """Callback subscriber at ``topic(suffix)`` with auto-decode. + + ``handler`` receives auto-decoded messages (proto, dict, or bytes). + + By default the handler runs on Zenoh's internal thread (fast path). + Pass ``max_workers`` to run the handler in a thread pool instead — + use this when the handler does slow work (DB writes, HTTP calls). + """ + from .schema_registry import SchemaRegistry + from .subscriber import CallbackSubscriber + + if not hasattr(self, "_schema_registry"): + self._schema_registry = SchemaRegistry(self.session) + return CallbackSubscriber(self.session, self.topic(suffix), handler, self._schema_registry, max_workers) + + def subscriber_raw_callback( + self, key_expr: str, handler: Callable[[zenoh.Sample], None], max_workers: int | None = None + ) -> RawCallbackSubscriber: + """Callback subscriber at a literal key expression. + + ``handler`` receives raw ``zenoh.Sample`` objects. + + By default the handler runs on Zenoh's internal thread. Pass + ``max_workers`` to run the handler in a thread pool instead. + """ + from .subscriber import RawCallbackSubscriber + + return RawCallbackSubscriber(self.session, key_expr, handler, max_workers) + + # ------------------------------------------------------------------ + # Queryables + # ------------------------------------------------------------------ + + def queryable( + self, suffix: str, handler: Callable[[zenoh.Query], None], max_workers: int | None = None + ) -> Queryable: + """Declare a queryable at ``topic(suffix)``. + + ``handler`` receives a ``zenoh.Query``. Use the standard zenoh API to reply:: + + def on_command(query: zenoh.Query) -> None: + result = process(query.payload.to_string()) + query.reply(query.key_expr, json.dumps(result).encode()) + + qbl = ctx.queryable("command", on_command) + + By default the handler runs on Zenoh's internal thread. Pass + ``max_workers`` to run the handler in a thread pool instead. + + Call ``undeclare()`` on the returned queryable when done. + """ + from .subscriber import Queryable + + return Queryable(self.session, self.topic(suffix), handler, max_workers) + + def queryable_raw( + self, key_expr: str, handler: Callable[[zenoh.Query], None], max_workers: int | None = None + ) -> Queryable: + """Declare a queryable at a literal key expression (no topic prefix). + + Use for wildcard queryables or when the ``bubbaloop/global/{machine_id}/`` + prefix does not apply (e.g. ``bubbaloop/**/schema`` for multi-schema serving). + + By default the handler runs on Zenoh's internal thread. Pass + ``max_workers`` to run the handler in a thread pool instead. + + Call ``undeclare()`` on the returned queryable when done. + """ + from .subscriber import Queryable + + return Queryable(self.session, key_expr, handler, max_workers) + # ------------------------------------------------------------------ # Cleanup # ------------------------------------------------------------------ diff --git a/python-sdk/bubbaloop_sdk/decode_sample.py b/python-sdk/bubbaloop_sdk/decode_sample.py index c91121f..3f2f194 100644 --- a/python-sdk/bubbaloop_sdk/decode_sample.py +++ b/python-sdk/bubbaloop_sdk/decode_sample.py @@ -32,7 +32,8 @@ def _get_proto_class(factory, descriptor): # type: ignore[no-untyped-def] return factory.GetPrototype(descriptor) -from google.protobuf import descriptor_pb2, descriptor_pool, message_factory as _message_factory +from google.protobuf import descriptor_pb2, descriptor_pool +from google.protobuf import message_factory as _message_factory from google.protobuf.json_format import MessageToDict diff --git a/python-sdk/bubbaloop_sdk/get_sample.py b/python-sdk/bubbaloop_sdk/get_sample.py index 43c06e3..aef5125 100644 --- a/python-sdk/bubbaloop_sdk/get_sample.py +++ b/python-sdk/bubbaloop_sdk/get_sample.py @@ -44,7 +44,7 @@ def _handler(sample: zenoh.Sample) -> None: sub = session.declare_subscriber(key_expr, _handler) try: return await asyncio.wait_for(future, timeout=timeout) - except asyncio.TimeoutError: - raise GetSampleTimeout(key_expr, timeout) + except asyncio.TimeoutError as err: + raise GetSampleTimeout(key_expr, timeout) from err finally: sub.undeclare() diff --git a/python-sdk/bubbaloop_sdk/health.py b/python-sdk/bubbaloop_sdk/health.py index f679735..1d7120c 100644 --- a/python-sdk/bubbaloop_sdk/health.py +++ b/python-sdk/bubbaloop_sdk/health.py @@ -1,7 +1,6 @@ """Background health heartbeat thread.""" import threading -import time import zenoh diff --git a/python-sdk/bubbaloop_sdk/node.py b/python-sdk/bubbaloop_sdk/node.py index d5081b6..a4f34a5 100644 --- a/python-sdk/bubbaloop_sdk/node.py +++ b/python-sdk/bubbaloop_sdk/node.py @@ -15,8 +15,6 @@ import argparse import logging -import os -import time import yaml @@ -53,15 +51,18 @@ def run_node(node_class) -> None: ctx = NodeContext.connect(endpoint=args.endpoint, instance_name=instance_name) - start_health_heartbeat(ctx.session, ctx.machine_id, instance_name, ctx._shutdown) - log.info("Health heartbeat: bubbaloop/global/%s/%s/health", ctx.machine_id, instance_name) - - node = node_class(ctx, config) - log.info("Initialized. Running…") + heartbeat = None try: + node = node_class(ctx, config) + log.info("Initialized. Running…") + heartbeat = start_health_heartbeat(ctx.session, ctx.machine_id, instance_name, ctx._shutdown) + log.info("Health heartbeat: bubbaloop/global/%s/%s/health", ctx.machine_id, instance_name) node.run() except KeyboardInterrupt: pass finally: + ctx._shutdown.set() # stop heartbeat before closing session + if heartbeat is not None: + heartbeat.join(timeout=1.0) ctx.close() log.info("Shutdown complete") diff --git a/python-sdk/bubbaloop_sdk/schema_registry.py b/python-sdk/bubbaloop_sdk/schema_registry.py index 3455a38..5e7fde3 100644 --- a/python-sdk/bubbaloop_sdk/schema_registry.py +++ b/python-sdk/bubbaloop_sdk/schema_registry.py @@ -54,7 +54,7 @@ def decode(self, sample: zenoh.Sample) -> object: if not encoding.startswith(_PROTO_PREFIX): return payload - type_name = encoding[len(_PROTO_PREFIX):] + type_name = encoding[len(_PROTO_PREFIX) :] msg_class = self._resolve(type_name) if msg_class is None: log.debug("SchemaRegistry: no class for %s, returning raw bytes", type_name) diff --git a/python-sdk/bubbaloop_sdk/subscriber.py b/python-sdk/bubbaloop_sdk/subscriber.py index 3c9ac96..22875fa 100644 --- a/python-sdk/bubbaloop_sdk/subscriber.py +++ b/python-sdk/bubbaloop_sdk/subscriber.py @@ -1,7 +1,17 @@ -"""Blocking Zenoh subscribers.""" +"""Zenoh subscribers — blocking and callback-based.""" + +from __future__ import annotations + +import concurrent.futures +import threading +from collections.abc import Callable +from typing import TYPE_CHECKING, Any import zenoh +if TYPE_CHECKING: + from .schema_registry import SchemaRegistry + class _BaseSubscriber: """Shared iterator protocol and cleanup for all subscriber types.""" @@ -55,7 +65,7 @@ class ProtoSubscriber(_BaseSubscriber): tensor = torch.frombuffer(msg.data, dtype=torch.uint8) """ - def __init__(self, session: zenoh.Session, topic: str, registry): + def __init__(self, session: zenoh.Session, topic: str, registry: SchemaRegistry): super().__init__(session, topic) self._registry = registry @@ -77,9 +87,188 @@ class RawSubscriber(_BaseSubscriber): sub = ctx.subscribe_raw("camera/raw", local=True) for raw_bytes in sub: tensor = torch.frombuffer(raw_bytes, dtype=torch.uint8) + """ def recv(self) -> bytes: """Block until the next frame arrives and return the raw bytes.""" sample = self._sub.recv() return bytes(sample.payload) + + +class CallbackSubscriber(_BaseSubscriber): + """Callback-based subscriber with auto-decode via SchemaRegistry. + + ``handler`` receives a decoded message: protobuf object, ``dict`` (JSON), + or raw ``bytes`` — determined automatically by the sample's encoding header. + + By default the handler runs on Zenoh's internal thread (fast path). Pass + ``max_workers`` to run the handler in a ``ThreadPoolExecutor`` instead — + use this when the handler does slow work (DB writes, HTTP calls, hardware I/O). + + Args: + session: Active Zenoh session. + topic: Key expression to subscribe to. + handler: Callable invoked with each decoded message. + registry: SchemaRegistry for auto-decoding samples by encoding header. + max_workers: If None (default), handler runs on Zenoh's thread. If int, + handler runs in a ThreadPoolExecutor with that many threads. + + Call ``undeclare()`` when done to stop receiving samples. + """ + + def __init__( + self, + session: zenoh.Session, + topic: str, + handler: Callable[[Any], None], + registry: SchemaRegistry, + max_workers: int | None = None, + ): + self._executor: concurrent.futures.ThreadPoolExecutor | None = None + self._closing: threading.Event | None = None + + if max_workers is not None: + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + self._closing = threading.Event() + + def _wrap(sample: zenoh.Sample) -> None: + if self._closing.is_set(): # type: ignore[union-attr] + return + try: + self._executor.submit(lambda s=sample: handler(registry.decode(s))) # type: ignore[union-attr] + except RuntimeError: + pass # executor already shut down — drop the message + + self._sub = session.declare_subscriber(topic, _wrap) + else: + self._sub = session.declare_subscriber(topic, lambda sample: handler(registry.decode(sample))) + self._undeclared = False + + def undeclare(self) -> None: + """Undeclare the subscriber and shutdown the thread pool (if any). Idempotent.""" + if self._undeclared: + return + if self._closing is not None: + self._closing.set() + super().undeclare() + if self._executor is not None: + self._executor.shutdown(wait=False, cancel_futures=True) + + +class RawCallbackSubscriber(_BaseSubscriber): + """Callback-based subscriber that passes raw ``zenoh.Sample`` to the handler. + + Use when you need access to the full sample metadata (key_expr, encoding, + timestamp). + + By default the handler runs on Zenoh's internal thread. Pass ``max_workers`` + to run the handler in a ``ThreadPoolExecutor`` instead. + + Args: + session: Active Zenoh session. + key_expr: Literal key expression to subscribe to. + handler: Callable invoked with each ``zenoh.Sample``. + max_workers: If None (default), handler runs on Zenoh's thread. If int, + handler runs in a ThreadPoolExecutor with that many threads. + + Call ``undeclare()`` when done to stop receiving samples. + """ + + def __init__( + self, + session: zenoh.Session, + key_expr: str, + handler: Callable[[zenoh.Sample], None], + max_workers: int | None = None, + ): + self._executor: concurrent.futures.ThreadPoolExecutor | None = None + self._closing: threading.Event | None = None + + if max_workers is not None: + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + self._closing = threading.Event() + + def _wrap(sample: zenoh.Sample) -> None: + if self._closing.is_set(): # type: ignore[union-attr] + return + try: + self._executor.submit(handler, sample) # type: ignore[union-attr] + except RuntimeError: + pass # executor already shut down — drop the message + + self._sub = session.declare_subscriber(key_expr, _wrap) + else: + self._sub = session.declare_subscriber(key_expr, handler) + self._undeclared = False + + def undeclare(self) -> None: + """Undeclare the subscriber and shutdown the thread pool (if any). Idempotent.""" + if self._undeclared: + return + if self._closing is not None: + self._closing.set() + super().undeclare() + if self._executor is not None: + self._executor.shutdown(wait=False, cancel_futures=True) + + +class Queryable: + """Queryable that responds to Zenoh GET requests. + + ``handler`` receives a ``zenoh.Query`` and must call ``query.reply()`` to respond. + + By default the handler runs on Zenoh's internal thread (fast path). Pass + ``max_workers`` to run the handler in a ``ThreadPoolExecutor`` instead — + use this when the handler does slow work (DB reads, hardware access, HTTP calls). + + **Important:** do NOT pass ``complete=True`` to the underlying queryable — + it blocks wildcard queries like ``bubbaloop/**/schema`` used by the dashboard. + + Args: + session: Active Zenoh session. + key_expr: Key expression to declare the queryable on. + handler: Callable invoked with each ``zenoh.Query``. + max_workers: If None (default), handler runs on Zenoh's thread. If int, + handler runs in a ThreadPoolExecutor with that many threads. + + Call ``undeclare()`` when done to stop receiving queries. + """ + + def __init__( + self, + session: zenoh.Session, + key_expr: str, + handler: Callable[[zenoh.Query], None], + max_workers: int | None = None, + ): + self._executor: concurrent.futures.ThreadPoolExecutor | None = None + self._closing: threading.Event | None = None + self._undeclared = False + + if max_workers is not None: + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + self._closing = threading.Event() + + def _wrap(query: zenoh.Query) -> None: + if self._closing.is_set(): # type: ignore[union-attr] + return + try: + self._executor.submit(handler, query) # type: ignore[union-attr] + except RuntimeError: + pass # executor already shut down — drop the query + + self._qbl = session.declare_queryable(key_expr, _wrap) + else: + self._qbl = session.declare_queryable(key_expr, handler) + + def undeclare(self) -> None: + """Undeclare the queryable and shutdown the thread pool (if any). Idempotent.""" + if self._undeclared: + return + self._undeclared = True + if self._closing is not None: + self._closing.set() + self._qbl.undeclare() + if self._executor is not None: + self._executor.shutdown(wait=False, cancel_futures=True) diff --git a/python-sdk/pixi.lock b/python-sdk/pixi.lock new file mode 100644 index 0000000..8903e7d --- /dev/null +++ b/python-sdk/pixi.lock @@ -0,0 +1,742 @@ +version: 6 +environments: + default: + channels: + - url: https://conda.anaconda.org/conda-forge/ + indexes: + - https://pypi.org/simple + options: + pypi-prerelease-mode: if-necessary-or-explicit + packages: + linux-64: + - conda: https://conda.anaconda.org/conda-forge/linux-64/_openmp_mutex-4.5-20_gnu.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/bzip2-1.0.8-hda65f42_9.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/ca-certificates-2026.2.25-hbd8a1cb_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/icu-78.3-h33c6efd_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/ld_impl_linux-64-2.45.1-default_hbd61a6d_102.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libexpat-2.7.5-hecca717_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libffi-3.5.2-h3435931_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libgcc-15.2.0-he0feb66_18.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libgomp-15.2.0-he0feb66_18.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/liblzma-5.8.2-hb03c661_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libmpdec-4.0.0-hb03c661_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libsqlite-3.52.0-hf4e2dac_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libstdcxx-15.2.0-h934c35e_18.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libuuid-2.42-h5347b49_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/libzlib-1.3.2-h25fd6f3_2.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/ncurses-6.5-h2d0b736_3.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/openssl-3.6.1-h35e630c_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/python-3.14.3-h32b2ec7_101_cp314.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/python_abi-3.14-8_cp314.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/readline-8.3-h853b02a_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/tk-8.6.13-noxft_h366c992_103.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/tzdata-2025c-hc9c84f9_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/zstd-1.5.7-hb78ec9c_6.conda + - pypi: https://files.pythonhosted.org/packages/22/e5/06b1f88f42a5a99df42ce61208bdec3bddb3d261412874280a19796fc09c/coverage-7.13.5-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl + - pypi: https://files.pythonhosted.org/packages/c6/f9/22883e613eb193f8f956e8e96d8f16e39b369dac4ade7aa3b37f344ddc62/eclipse_zenoh-1.8.0.tar.gz + - pypi: https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/53/1b/3b431694a4dc6d37b9f653f0c64b0a0d9ec074ee810710c0c3da21d67ba7/protobuf-7.34.1-cp310-abi3-manylinux2014_x86_64.whl + - pypi: https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/9d/7a/d968e294073affff457b041c2be9868a40c1c71f4a35fcc1e45e5493067b/pytest_cov-7.1.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/88/f9/16491d7ed2a919954993e48aa941b200f38040928474c9e85ea9e64222c3/pyyaml-6.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl + - pypi: https://files.pythonhosted.org/packages/ff/6b/a1548ac378a78332a4c3dcf4a134c2475a36d2a22ddfa272acd574140b50/ruff-0.15.9-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl + - pypi: ./ + linux-aarch64: + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/_openmp_mutex-4.5-20_gnu.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/bzip2-1.0.8-h4777abc_9.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/ca-certificates-2026.2.25-hbd8a1cb_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/icu-78.3-hcab7f73_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/ld_impl_linux-aarch64-2.45.1-default_h1979696_102.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libexpat-2.7.5-hfae3067_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libffi-3.5.2-h376a255_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libgcc-15.2.0-h8acb6b2_18.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libgomp-15.2.0-h8acb6b2_18.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/liblzma-5.8.2-he30d5cf_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libmpdec-4.0.0-he30d5cf_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libsqlite-3.52.0-h10b116e_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libstdcxx-15.2.0-hef695bb_18.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libuuid-2.42-h1022ec0_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libzlib-1.3.2-hdc9db2a_2.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/ncurses-6.5-ha32ae93_3.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/openssl-3.6.1-h546c87b_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/python-3.14.3-hb06a95a_101_cp314.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/python_abi-3.14-8_cp314.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/readline-8.3-hb682ff5_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/tk-8.6.13-noxft_h0dc03b3_103.conda + - conda: https://conda.anaconda.org/conda-forge/noarch/tzdata-2025c-hc9c84f9_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-aarch64/zstd-1.5.7-h85ac4a6_6.conda + - pypi: https://files.pythonhosted.org/packages/80/28/2a148a51e5907e504fa7b85490277734e6771d8844ebcc48764a15e28155/coverage-7.13.5-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl + - pypi: https://files.pythonhosted.org/packages/cf/ba/7bb452da75a6c3d40d512112e90aa9942996466051ebfb038c6dc41ed302/eclipse_zenoh-1.8.0-cp39-abi3-manylinux_2_28_aarch64.whl + - pypi: https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/eb/9d/aa69df2724ff63efa6f72307b483ce0827f4347cc6d6df24b59e26659fef/protobuf-7.34.1-cp310-abi3-manylinux2014_aarch64.whl + - pypi: https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/9d/7a/d968e294073affff457b041c2be9868a40c1c71f4a35fcc1e45e5493067b/pytest_cov-7.1.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/92/b5/47e807c2623074914e29dabd16cbbdd4bf5e9b2db9f8090fa64411fc5382/pyyaml-6.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl + - pypi: https://files.pythonhosted.org/packages/48/11/690d75f3fd6278fe55fff7c9eb429c92d207e14b25d1cae4064a32677029/ruff-0.15.9-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl + - pypi: ./ +packages: +- conda: https://conda.anaconda.org/conda-forge/linux-64/_openmp_mutex-4.5-20_gnu.conda + build_number: 20 + sha256: 1dd3fffd892081df9726d7eb7e0dea6198962ba775bd88842135a4ddb4deb3c9 + md5: a9f577daf3de00bca7c3c76c0ecbd1de + depends: + - __glibc >=2.17,<3.0.a0 + - libgomp >=7.5.0 + constrains: + - openmp_impl <0.0a0 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 28948 + timestamp: 1770939786096 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/_openmp_mutex-4.5-20_gnu.conda + build_number: 20 + sha256: a2527b1d81792a0ccd2c05850960df119c2b6d8f5fdec97f2db7d25dc23b1068 + md5: 468fd3bb9e1f671d36c2cbc677e56f1d + depends: + - libgomp >=7.5.0 + constrains: + - openmp_impl <0.0a0 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 28926 + timestamp: 1770939656741 +- pypi: ./ + name: bubbaloop-sdk + version: 0.1.0 + sha256: cd379756248058d15ec268b0a35a5292df0f5ec4414162c979070b174874047b + requires_dist: + - eclipse-zenoh>=1.7,<2 + - protobuf>=4.0 + - pyyaml>=6.0 + - pytest ; extra == 'dev' + - pytest-asyncio ; extra == 'dev' + - pytest-cov ; extra == 'dev' + - ruff ; extra == 'dev' + requires_python: '>=3.10' +- conda: https://conda.anaconda.org/conda-forge/linux-64/bzip2-1.0.8-hda65f42_9.conda + sha256: 0b75d45f0bba3e95dc693336fa51f40ea28c980131fec438afb7ce6118ed05f6 + md5: d2ffd7602c02f2b316fd921d39876885 + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + license: bzip2-1.0.6 + license_family: BSD + purls: [] + size: 260182 + timestamp: 1771350215188 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/bzip2-1.0.8-h4777abc_9.conda + sha256: b3495077889dde6bb370938e7db82be545c73e8589696ad0843a32221520ad4c + md5: 840d8fc0d7b3209be93080bc20e07f2d + depends: + - libgcc >=14 + license: bzip2-1.0.6 + license_family: BSD + purls: [] + size: 192412 + timestamp: 1771350241232 +- conda: https://conda.anaconda.org/conda-forge/noarch/ca-certificates-2026.2.25-hbd8a1cb_0.conda + sha256: 67cc7101b36421c5913a1687ef1b99f85b5d6868da3abbf6ec1a4181e79782fc + md5: 4492fd26db29495f0ba23f146cd5638d + depends: + - __unix + license: ISC + purls: [] + size: 147413 + timestamp: 1772006283803 +- pypi: https://files.pythonhosted.org/packages/22/e5/06b1f88f42a5a99df42ce61208bdec3bddb3d261412874280a19796fc09c/coverage-7.13.5-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl + name: coverage + version: 7.13.5 + sha256: 6c36ddb64ed9d7e496028d1d00dfec3e428e0aabf4006583bb1839958d280510 + requires_dist: + - tomli ; python_full_version <= '3.11' and extra == 'toml' + requires_python: '>=3.10' +- pypi: https://files.pythonhosted.org/packages/80/28/2a148a51e5907e504fa7b85490277734e6771d8844ebcc48764a15e28155/coverage-7.13.5-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl + name: coverage + version: 7.13.5 + sha256: 380e8e9084d8eb38db3a9176a1a4f3c0082c3806fa0dc882d1d87abc3c789247 + requires_dist: + - tomli ; python_full_version <= '3.11' and extra == 'toml' + requires_python: '>=3.10' +- pypi: https://files.pythonhosted.org/packages/c6/f9/22883e613eb193f8f956e8e96d8f16e39b369dac4ade7aa3b37f344ddc62/eclipse_zenoh-1.8.0.tar.gz + name: eclipse-zenoh + version: 1.8.0 + sha256: 1cb0b8abdc522d58497c0cd7b8c8e7791f39d2c189c5e0bc80da8840af0ce24d + requires_python: '>=3.8' +- pypi: https://files.pythonhosted.org/packages/cf/ba/7bb452da75a6c3d40d512112e90aa9942996466051ebfb038c6dc41ed302/eclipse_zenoh-1.8.0-cp39-abi3-manylinux_2_28_aarch64.whl + name: eclipse-zenoh + version: 1.8.0 + sha256: 1aca875fd5aa38284cf7161964241a73b4e4090a48385c35a6d8e6169cc8e88a + requires_python: '>=3.8' +- conda: https://conda.anaconda.org/conda-forge/linux-64/icu-78.3-h33c6efd_0.conda + sha256: fbf86c4a59c2ed05bbffb2ba25c7ed94f6185ec30ecb691615d42342baa1a16a + md5: c80d8a3b84358cb967fa81e7075fbc8a + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + - libstdcxx >=14 + license: MIT + license_family: MIT + purls: [] + size: 12723451 + timestamp: 1773822285671 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/icu-78.3-hcab7f73_0.conda + sha256: 49ba6aed2c6b482bb0ba41078057555d29764299bc947b990708617712ef6406 + md5: 546da38c2fa9efacf203e2ad3f987c59 + depends: + - libgcc >=14 + - libstdcxx >=14 + license: MIT + license_family: MIT + purls: [] + size: 12837286 + timestamp: 1773822650615 +- pypi: https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl + name: iniconfig + version: 2.3.0 + sha256: f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12 + requires_python: '>=3.10' +- conda: https://conda.anaconda.org/conda-forge/linux-64/ld_impl_linux-64-2.45.1-default_hbd61a6d_102.conda + sha256: 3d584956604909ff5df353767f3a2a2f60e07d070b328d109f30ac40cd62df6c + md5: 18335a698559cdbcd86150a48bf54ba6 + depends: + - __glibc >=2.17,<3.0.a0 + - zstd >=1.5.7,<1.6.0a0 + constrains: + - binutils_impl_linux-64 2.45.1 + license: GPL-3.0-only + license_family: GPL + purls: [] + size: 728002 + timestamp: 1774197446916 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/ld_impl_linux-aarch64-2.45.1-default_h1979696_102.conda + sha256: 7abd913d81a9bf00abb699e8987966baa2065f5132e37e815f92d90fc6bba530 + md5: a21644fc4a83da26452a718dc9468d5f + depends: + - zstd >=1.5.7,<1.6.0a0 + constrains: + - binutils_impl_linux-aarch64 2.45.1 + license: GPL-3.0-only + license_family: GPL + purls: [] + size: 875596 + timestamp: 1774197520746 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libexpat-2.7.5-hecca717_0.conda + sha256: e8c2b57f6aacabdf2f1b0924bd4831ce5071ba080baa4a9e8c0d720588b6794c + md5: 49f570f3bc4c874a06ea69b7225753af + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + constrains: + - expat 2.7.5.* + license: MIT + license_family: MIT + purls: [] + size: 76624 + timestamp: 1774719175983 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libexpat-2.7.5-hfae3067_0.conda + sha256: 6d438fc0bfdb263c24654fe49c09b31f06ec78eb709eb386392d2499af105f85 + md5: 05d1e0b30acd816a192c03dc6e164f4d + depends: + - libgcc >=14 + constrains: + - expat 2.7.5.* + license: MIT + license_family: MIT + purls: [] + size: 76523 + timestamp: 1774719129371 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libffi-3.5.2-h3435931_0.conda + sha256: 31f19b6a88ce40ebc0d5a992c131f57d919f73c0b92cd1617a5bec83f6e961e6 + md5: a360c33a5abe61c07959e449fa1453eb + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + license: MIT + license_family: MIT + purls: [] + size: 58592 + timestamp: 1769456073053 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libffi-3.5.2-h376a255_0.conda + sha256: 3df4c539449aabc3443bbe8c492c01d401eea894603087fca2917aa4e1c2dea9 + md5: 2f364feefb6a7c00423e80dcb12db62a + depends: + - libgcc >=14 + license: MIT + license_family: MIT + purls: [] + size: 55952 + timestamp: 1769456078358 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libgcc-15.2.0-he0feb66_18.conda + sha256: faf7d2017b4d718951e3a59d081eb09759152f93038479b768e3d612688f83f5 + md5: 0aa00f03f9e39fb9876085dee11a85d4 + depends: + - __glibc >=2.17,<3.0.a0 + - _openmp_mutex >=4.5 + constrains: + - libgcc-ng ==15.2.0=*_18 + - libgomp 15.2.0 he0feb66_18 + license: GPL-3.0-only WITH GCC-exception-3.1 + license_family: GPL + purls: [] + size: 1041788 + timestamp: 1771378212382 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libgcc-15.2.0-h8acb6b2_18.conda + sha256: 43df385bedc1cab11993c4369e1f3b04b4ca5d0ea16cba6a0e7f18dbc129fcc9 + md5: 552567ea2b61e3a3035759b2fdb3f9a6 + depends: + - _openmp_mutex >=4.5 + constrains: + - libgcc-ng ==15.2.0=*_18 + - libgomp 15.2.0 h8acb6b2_18 + license: GPL-3.0-only WITH GCC-exception-3.1 + license_family: GPL + purls: [] + size: 622900 + timestamp: 1771378128706 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libgomp-15.2.0-he0feb66_18.conda + sha256: 21337ab58e5e0649d869ab168d4e609b033509de22521de1bfed0c031bfc5110 + md5: 239c5e9546c38a1e884d69effcf4c882 + depends: + - __glibc >=2.17,<3.0.a0 + license: GPL-3.0-only WITH GCC-exception-3.1 + license_family: GPL + purls: [] + size: 603262 + timestamp: 1771378117851 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libgomp-15.2.0-h8acb6b2_18.conda + sha256: fc716f11a6a8525e27a5d332ef6a689210b0d2a4dd1133edc0f530659aa9faa6 + md5: 4faa39bf919939602e594253bd673958 + license: GPL-3.0-only WITH GCC-exception-3.1 + license_family: GPL + purls: [] + size: 588060 + timestamp: 1771378040807 +- conda: https://conda.anaconda.org/conda-forge/linux-64/liblzma-5.8.2-hb03c661_0.conda + sha256: 755c55ebab181d678c12e49cced893598f2bab22d582fbbf4d8b83c18be207eb + md5: c7c83eecbb72d88b940c249af56c8b17 + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + constrains: + - xz 5.8.2.* + license: 0BSD + purls: [] + size: 113207 + timestamp: 1768752626120 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/liblzma-5.8.2-he30d5cf_0.conda + sha256: 843c46e20519651a3e357a8928352b16c5b94f4cd3d5481acc48be2e93e8f6a3 + md5: 96944e3c92386a12755b94619bae0b35 + depends: + - libgcc >=14 + constrains: + - xz 5.8.2.* + license: 0BSD + purls: [] + size: 125916 + timestamp: 1768754941722 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libmpdec-4.0.0-hb03c661_1.conda + sha256: fe171ed5cf5959993d43ff72de7596e8ac2853e9021dec0344e583734f1e0843 + md5: 2c21e66f50753a083cbe6b80f38268fa + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + license: BSD-2-Clause + license_family: BSD + purls: [] + size: 92400 + timestamp: 1769482286018 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libmpdec-4.0.0-he30d5cf_1.conda + sha256: 57c0dd12d506e84541c4e877898bd2a59cca141df493d34036f18b2751e0a453 + md5: 7b9813e885482e3ccb1fa212b86d7fd0 + depends: + - libgcc >=14 + license: BSD-2-Clause + license_family: BSD + purls: [] + size: 114056 + timestamp: 1769482343003 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libsqlite-3.52.0-hf4e2dac_0.conda + sha256: d716847b7deca293d2e49ed1c8ab9e4b9e04b9d780aea49a97c26925b28a7993 + md5: fd893f6a3002a635b5e50ceb9dd2c0f4 + depends: + - __glibc >=2.17,<3.0.a0 + - icu >=78.2,<79.0a0 + - libgcc >=14 + - libzlib >=1.3.1,<2.0a0 + license: blessing + purls: [] + size: 951405 + timestamp: 1772818874251 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libsqlite-3.52.0-h10b116e_0.conda + sha256: 1ddaf91b44fae83856276f4cb7ce544ffe41d4b55c1e346b504c6b45f19098d6 + md5: 77891484f18eca74b8ad83694da9815e + depends: + - icu >=78.2,<79.0a0 + - libgcc >=14 + - libzlib >=1.3.1,<2.0a0 + license: blessing + purls: [] + size: 952296 + timestamp: 1772818881550 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libstdcxx-15.2.0-h934c35e_18.conda + sha256: 78668020064fdaa27e9ab65cd2997e2c837b564ab26ce3bf0e58a2ce1a525c6e + md5: 1b08cd684f34175e4514474793d44bcb + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc 15.2.0 he0feb66_18 + constrains: + - libstdcxx-ng ==15.2.0=*_18 + license: GPL-3.0-only WITH GCC-exception-3.1 + license_family: GPL + purls: [] + size: 5852330 + timestamp: 1771378262446 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libstdcxx-15.2.0-hef695bb_18.conda + sha256: 31fdb9ffafad106a213192d8319b9f810e05abca9c5436b60e507afb35a6bc40 + md5: f56573d05e3b735cb03efeb64a15f388 + depends: + - libgcc 15.2.0 h8acb6b2_18 + constrains: + - libstdcxx-ng ==15.2.0=*_18 + license: GPL-3.0-only WITH GCC-exception-3.1 + license_family: GPL + purls: [] + size: 5541411 + timestamp: 1771378162499 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libuuid-2.42-h5347b49_0.conda + sha256: bc1b08c92626c91500fd9f26f2c797f3eb153b627d53e9c13cd167f1e12b2829 + md5: 38ffe67b78c9d4de527be8315e5ada2c + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 40297 + timestamp: 1775052476770 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libuuid-2.42-h1022ec0_0.conda + sha256: 7d427edf58c702c337bf62bc90f355b7fc374a65fd9f70ea7a490f13bb76b1b9 + md5: a0b5de740d01c390bdbb46d7503c9fab + depends: + - libgcc >=14 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 43567 + timestamp: 1775052485727 +- conda: https://conda.anaconda.org/conda-forge/linux-64/libzlib-1.3.2-h25fd6f3_2.conda + sha256: 55044c403570f0dc26e6364de4dc5368e5f3fc7ff103e867c487e2b5ab2bcda9 + md5: d87ff7921124eccd67248aa483c23fec + depends: + - __glibc >=2.17,<3.0.a0 + constrains: + - zlib 1.3.2 *_2 + license: Zlib + license_family: Other + purls: [] + size: 63629 + timestamp: 1774072609062 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/libzlib-1.3.2-hdc9db2a_2.conda + sha256: eb111e32e5a7313a5bf799c7fb2419051fa2fe7eff74769fac8d5a448b309f7f + md5: 502006882cf5461adced436e410046d1 + constrains: + - zlib 1.3.2 *_2 + license: Zlib + license_family: Other + purls: [] + size: 69833 + timestamp: 1774072605429 +- conda: https://conda.anaconda.org/conda-forge/linux-64/ncurses-6.5-h2d0b736_3.conda + sha256: 3fde293232fa3fca98635e1167de6b7c7fda83caf24b9d6c91ec9eefb4f4d586 + md5: 47e340acb35de30501a76c7c799c41d7 + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=13 + license: X11 AND BSD-3-Clause + purls: [] + size: 891641 + timestamp: 1738195959188 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/ncurses-6.5-ha32ae93_3.conda + sha256: 91cfb655a68b0353b2833521dc919188db3d8a7f4c64bea2c6a7557b24747468 + md5: 182afabe009dc78d8b73100255ee6868 + depends: + - libgcc >=13 + license: X11 AND BSD-3-Clause + purls: [] + size: 926034 + timestamp: 1738196018799 +- conda: https://conda.anaconda.org/conda-forge/linux-64/openssl-3.6.1-h35e630c_1.conda + sha256: 44c877f8af015332a5d12f5ff0fb20ca32f896526a7d0cdb30c769df1144fb5c + md5: f61eb8cd60ff9057122a3d338b99c00f + depends: + - __glibc >=2.17,<3.0.a0 + - ca-certificates + - libgcc >=14 + license: Apache-2.0 + license_family: Apache + purls: [] + size: 3164551 + timestamp: 1769555830639 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/openssl-3.6.1-h546c87b_1.conda + sha256: 7f8048c0e75b2620254218d72b4ae7f14136f1981c5eb555ef61645a9344505f + md5: 25f5885f11e8b1f075bccf4a2da91c60 + depends: + - ca-certificates + - libgcc >=14 + license: Apache-2.0 + license_family: Apache + purls: [] + size: 3692030 + timestamp: 1769557678657 +- pypi: https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl + name: packaging + version: '26.0' + sha256: b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529 + requires_python: '>=3.8' +- pypi: https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl + name: pluggy + version: 1.6.0 + sha256: e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746 + requires_dist: + - pre-commit ; extra == 'dev' + - tox ; extra == 'dev' + - pytest ; extra == 'testing' + - pytest-benchmark ; extra == 'testing' + - coverage ; extra == 'testing' + requires_python: '>=3.9' +- pypi: https://files.pythonhosted.org/packages/53/1b/3b431694a4dc6d37b9f653f0c64b0a0d9ec074ee810710c0c3da21d67ba7/protobuf-7.34.1-cp310-abi3-manylinux2014_x86_64.whl + name: protobuf + version: 7.34.1 + sha256: 8ff40ce8cd688f7265326b38d5a1bed9bfdf5e6723d49961432f83e21d5713e4 + requires_python: '>=3.10' +- pypi: https://files.pythonhosted.org/packages/eb/9d/aa69df2724ff63efa6f72307b483ce0827f4347cc6d6df24b59e26659fef/protobuf-7.34.1-cp310-abi3-manylinux2014_aarch64.whl + name: protobuf + version: 7.34.1 + sha256: 5185e0e948d07abe94bb76ec9b8416b604cfe5da6f871d67aad30cbf24c3110b + requires_python: '>=3.10' +- pypi: https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl + name: pygments + version: 2.20.0 + sha256: 81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176 + requires_dist: + - colorama>=0.4.6 ; extra == 'windows-terminal' + requires_python: '>=3.9' +- pypi: https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl + name: pytest + version: 9.0.2 + sha256: 711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b + requires_dist: + - colorama>=0.4 ; sys_platform == 'win32' + - exceptiongroup>=1 ; python_full_version < '3.11' + - iniconfig>=1.0.1 + - packaging>=22 + - pluggy>=1.5,<2 + - pygments>=2.7.2 + - tomli>=1 ; python_full_version < '3.11' + - argcomplete ; extra == 'dev' + - attrs>=19.2 ; extra == 'dev' + - hypothesis>=3.56 ; extra == 'dev' + - mock ; extra == 'dev' + - requests ; extra == 'dev' + - setuptools ; extra == 'dev' + - xmlschema ; extra == 'dev' + requires_python: '>=3.10' +- pypi: https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl + name: pytest-asyncio + version: 1.3.0 + sha256: 611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5 + requires_dist: + - backports-asyncio-runner>=1.1,<2 ; python_full_version < '3.11' + - pytest>=8.2,<10 + - typing-extensions>=4.12 ; python_full_version < '3.13' + - sphinx>=5.3 ; extra == 'docs' + - sphinx-rtd-theme>=1 ; extra == 'docs' + - coverage>=6.2 ; extra == 'testing' + - hypothesis>=5.7.1 ; extra == 'testing' + requires_python: '>=3.10' +- pypi: https://files.pythonhosted.org/packages/9d/7a/d968e294073affff457b041c2be9868a40c1c71f4a35fcc1e45e5493067b/pytest_cov-7.1.0-py3-none-any.whl + name: pytest-cov + version: 7.1.0 + sha256: a0461110b7865f9a271aa1b51e516c9a95de9d696734a2f71e3e78f46e1d4678 + requires_dist: + - coverage[toml]>=7.10.6 + - pluggy>=1.2 + - pytest>=7 + - process-tests ; extra == 'testing' + - pytest-xdist ; extra == 'testing' + - virtualenv ; extra == 'testing' + requires_python: '>=3.9' +- conda: https://conda.anaconda.org/conda-forge/linux-64/python-3.14.3-h32b2ec7_101_cp314.conda + build_number: 101 + sha256: cb0628c5f1732f889f53a877484da98f5a0e0f47326622671396fb4f2b0cd6bd + md5: c014ad06e60441661737121d3eae8a60 + depends: + - __glibc >=2.17,<3.0.a0 + - bzip2 >=1.0.8,<2.0a0 + - ld_impl_linux-64 >=2.36.1 + - libexpat >=2.7.3,<3.0a0 + - libffi >=3.5.2,<3.6.0a0 + - libgcc >=14 + - liblzma >=5.8.2,<6.0a0 + - libmpdec >=4.0.0,<5.0a0 + - libsqlite >=3.51.2,<4.0a0 + - libuuid >=2.41.3,<3.0a0 + - libzlib >=1.3.1,<2.0a0 + - ncurses >=6.5,<7.0a0 + - openssl >=3.5.5,<4.0a0 + - python_abi 3.14.* *_cp314 + - readline >=8.3,<9.0a0 + - tk >=8.6.13,<8.7.0a0 + - tzdata + - zstd >=1.5.7,<1.6.0a0 + license: Python-2.0 + purls: [] + size: 36702440 + timestamp: 1770675584356 + python_site_packages_path: lib/python3.14/site-packages +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/python-3.14.3-hb06a95a_101_cp314.conda + build_number: 101 + sha256: 87e9dff5646aba87cecfbc08789634c855871a7325169299d749040b0923a356 + md5: 205011b36899ff0edf41b3db0eda5a44 + depends: + - bzip2 >=1.0.8,<2.0a0 + - ld_impl_linux-aarch64 >=2.36.1 + - libexpat >=2.7.3,<3.0a0 + - libffi >=3.5.2,<3.6.0a0 + - libgcc >=14 + - liblzma >=5.8.2,<6.0a0 + - libmpdec >=4.0.0,<5.0a0 + - libsqlite >=3.51.2,<4.0a0 + - libuuid >=2.41.3,<3.0a0 + - libzlib >=1.3.1,<2.0a0 + - ncurses >=6.5,<7.0a0 + - openssl >=3.5.5,<4.0a0 + - python_abi 3.14.* *_cp314 + - readline >=8.3,<9.0a0 + - tk >=8.6.13,<8.7.0a0 + - tzdata + - zstd >=1.5.7,<1.6.0a0 + license: Python-2.0 + purls: [] + size: 37305578 + timestamp: 1770674395875 + python_site_packages_path: lib/python3.14/site-packages +- conda: https://conda.anaconda.org/conda-forge/noarch/python_abi-3.14-8_cp314.conda + build_number: 8 + sha256: ad6d2e9ac39751cc0529dd1566a26751a0bf2542adb0c232533d32e176e21db5 + md5: 0539938c55b6b1a59b560e843ad864a4 + constrains: + - python 3.14.* *_cp314 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 6989 + timestamp: 1752805904792 +- pypi: https://files.pythonhosted.org/packages/88/f9/16491d7ed2a919954993e48aa941b200f38040928474c9e85ea9e64222c3/pyyaml-6.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl + name: pyyaml + version: 6.0.3 + sha256: c458b6d084f9b935061bc36216e8a69a7e293a2f1e68bf956dcd9e6cbcd143f5 + requires_python: '>=3.8' +- pypi: https://files.pythonhosted.org/packages/92/b5/47e807c2623074914e29dabd16cbbdd4bf5e9b2db9f8090fa64411fc5382/pyyaml-6.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl + name: pyyaml + version: 6.0.3 + sha256: 501a031947e3a9025ed4405a168e6ef5ae3126c59f90ce0cd6f2bfc477be31b7 + requires_python: '>=3.8' +- conda: https://conda.anaconda.org/conda-forge/linux-64/readline-8.3-h853b02a_0.conda + sha256: 12ffde5a6f958e285aa22c191ca01bbd3d6e710aa852e00618fa6ddc59149002 + md5: d7d95fc8287ea7bf33e0e7116d2b95ec + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + - ncurses >=6.5,<7.0a0 + license: GPL-3.0-only + license_family: GPL + purls: [] + size: 345073 + timestamp: 1765813471974 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/readline-8.3-hb682ff5_0.conda + sha256: fe695f9d215e9a2e3dd0ca7f56435ab4df24f5504b83865e3d295df36e88d216 + md5: 3d49cad61f829f4f0e0611547a9cda12 + depends: + - libgcc >=14 + - ncurses >=6.5,<7.0a0 + license: GPL-3.0-only + license_family: GPL + purls: [] + size: 357597 + timestamp: 1765815673644 +- pypi: https://files.pythonhosted.org/packages/48/11/690d75f3fd6278fe55fff7c9eb429c92d207e14b25d1cae4064a32677029/ruff-0.15.9-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl + name: ruff + version: 0.15.9 + sha256: 9439a342adb8725f32f92732e2bafb6d5246bd7a5021101166b223d312e8fc59 + requires_python: '>=3.7' +- pypi: https://files.pythonhosted.org/packages/ff/6b/a1548ac378a78332a4c3dcf4a134c2475a36d2a22ddfa272acd574140b50/ruff-0.15.9-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl + name: ruff + version: 0.15.9 + sha256: 2b0c7c341f68adb01c488c3b7d4b49aa8ea97409eae6462d860a79cf55f431b6 + requires_python: '>=3.7' +- conda: https://conda.anaconda.org/conda-forge/linux-64/tk-8.6.13-noxft_h366c992_103.conda + sha256: cafeec44494f842ffeca27e9c8b0c27ed714f93ac77ddadc6aaf726b5554ebac + md5: cffd3bdd58090148f4cfcd831f4b26ab + depends: + - __glibc >=2.17,<3.0.a0 + - libgcc >=14 + - libzlib >=1.3.1,<2.0a0 + constrains: + - xorg-libx11 >=1.8.12,<2.0a0 + license: TCL + license_family: BSD + purls: [] + size: 3301196 + timestamp: 1769460227866 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/tk-8.6.13-noxft_h0dc03b3_103.conda + sha256: e25c314b52764219f842b41aea2c98a059f06437392268f09b03561e4f6e5309 + md5: 7fc6affb9b01e567d2ef1d05b84aa6ed + depends: + - libgcc >=14 + - libzlib >=1.3.1,<2.0a0 + constrains: + - xorg-libx11 >=1.8.12,<2.0a0 + license: TCL + license_family: BSD + purls: [] + size: 3368666 + timestamp: 1769464148928 +- conda: https://conda.anaconda.org/conda-forge/noarch/tzdata-2025c-hc9c84f9_1.conda + sha256: 1d30098909076af33a35017eed6f2953af1c769e273a0626a04722ac4acaba3c + md5: ad659d0a2b3e47e38d829aa8cad2d610 + license: LicenseRef-Public-Domain + purls: [] + size: 119135 + timestamp: 1767016325805 +- conda: https://conda.anaconda.org/conda-forge/linux-64/zstd-1.5.7-hb78ec9c_6.conda + sha256: 68f0206ca6e98fea941e5717cec780ed2873ffabc0e1ed34428c061e2c6268c7 + md5: 4a13eeac0b5c8e5b8ab496e6c4ddd829 + depends: + - __glibc >=2.17,<3.0.a0 + - libzlib >=1.3.1,<2.0a0 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 601375 + timestamp: 1764777111296 +- conda: https://conda.anaconda.org/conda-forge/linux-aarch64/zstd-1.5.7-h85ac4a6_6.conda + sha256: 569990cf12e46f9df540275146da567d9c618c1e9c7a0bc9d9cfefadaed20b75 + md5: c3655f82dcea2aa179b291e7099c1fcc + depends: + - libzlib >=1.3.1,<2.0a0 + license: BSD-3-Clause + license_family: BSD + purls: [] + size: 614429 + timestamp: 1764777145593 diff --git a/python-sdk/pixi.toml b/python-sdk/pixi.toml new file mode 100644 index 0000000..9261b8a --- /dev/null +++ b/python-sdk/pixi.toml @@ -0,0 +1,22 @@ +[workspace] +name = "bubbaloop-sdk" +version = "0.1.0" +description = "Bubbaloop Node SDK for Python" +channels = ["conda-forge"] +platforms = ["linux-64", "linux-aarch64"] + +[dependencies] +python = ">=3.10" + +[pypi-dependencies] +bubbaloop-sdk = { path = ".", extras = ["dev"] } + +[tasks] +install = "pip install -e '.[dev]'" +test = "pytest tests/ -v" +test-cov = "pytest tests/ -v --cov=bubbaloop_sdk --cov-report=term-missing" +lint = "ruff check bubbaloop_sdk/ tests/" +lint-fix = "ruff check --fix bubbaloop_sdk/ tests/" +fmt = "ruff format bubbaloop_sdk/ tests/" +fmt-check = "ruff format --check bubbaloop_sdk/ tests/" +check = { depends-on = ["fmt-check", "lint"] } diff --git a/python-sdk/pyproject.toml b/python-sdk/pyproject.toml index c391506..267e990 100644 --- a/python-sdk/pyproject.toml +++ b/python-sdk/pyproject.toml @@ -6,15 +6,104 @@ build-backend = "setuptools.build_meta" name = "bubbaloop-sdk" version = "0.1.0" description = "Bubbaloop Node SDK for Python" -requires-python = ">=3.9" +readme = "README.md" +license = { text = "Apache-2.0" } +authors = [{ name = "Kornia Team", email = "edgar.riba@gmail.com" }] +requires-python = ">=3.10" +keywords = ["robotics", "physical-ai", "zenoh", "pub-sub"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development :: Libraries", + "Topic :: System :: Distributed Computing", +] dependencies = [ "eclipse-zenoh>=1.7,<2", "protobuf>=4.0", + "pyyaml>=6.0", ] +[project.urls] +"Bug Tracker" = "https://github.com/kornia/bubbaloop/issues" +"Source Code" = "https://github.com/kornia/bubbaloop" +Homepage = "https://github.com/kornia/bubbaloop" + [project.optional-dependencies] -dev = ["pytest", "pytest-asyncio"] +dev = [ + "pytest", + "pytest-asyncio", + "pytest-cov", + "ruff", +] [tool.setuptools.packages.find] where = ["."] include = ["bubbaloop_sdk*"] + +# --------------------------------------------------------------------------- +# pytest +# --------------------------------------------------------------------------- + +[tool.pytest.ini_options] +addopts = "--color=yes -v" +testpaths = ["tests"] + +# --------------------------------------------------------------------------- +# coverage +# --------------------------------------------------------------------------- + +[tool.coverage.run] +branch = true +source = ["bubbaloop_sdk/"] + +[tool.coverage.report] +show_missing = true +skip_covered = true +exclude_lines = [ + "pragma: no cover", + "raise NotImplementedError", + "if TYPE_CHECKING:", + "def __repr__", +] + +# --------------------------------------------------------------------------- +# Ruff — linter + formatter (replaces flake8, isort, pycodestyle) +# Follows the same pattern as kornia/kornia +# --------------------------------------------------------------------------- + +[tool.ruff] +line-length = 120 +target-version = "py310" + +[tool.ruff.format] +skip-magic-trailing-comma = false + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # Pyflakes + "I", # isort + "B", # flake8-bugbear + "UP", # pyupgrade + "C4", # flake8-comprehensions + "RUF", # Ruff-specific rules +] +ignore = [] + +[tool.ruff.lint.isort] +known-first-party = ["bubbaloop_sdk"] +split-on-trailing-comma = true + +[tool.ruff.lint.per-file-ignores] +# __init__.py files may re-export names (star imports allowed) +"*/__init__.py" = ["F401", "F403"] +# tests don't need docstrings and use assert freely +"tests/*" = ["S101", "D"] diff --git a/python-sdk/tests/test_context.py b/python-sdk/tests/test_context.py index 9bf2004..715b569 100644 --- a/python-sdk/tests/test_context.py +++ b/python-sdk/tests/test_context.py @@ -10,16 +10,14 @@ import pytest - # --------------------------------------------------------------------------- # topic() # --------------------------------------------------------------------------- + def test_topic_formatting(): ctx = _make_context("jetson_orin") - assert ctx.topic("camera/front/compressed") == ( - "bubbaloop/global/jetson_orin/camera/front/compressed" - ) + assert ctx.topic("camera/front/compressed") == ("bubbaloop/global/jetson_orin/camera/front/compressed") def test_local_topic_formatting(): @@ -36,6 +34,7 @@ def test_topic_wildcard_suffix(): # _resolve_topic() # --------------------------------------------------------------------------- + def test_resolve_topic_global(): ctx = _make_context("bot") assert ctx._resolve_topic("data", False) == "bubbaloop/global/bot/data" @@ -60,14 +59,17 @@ def test_global_and_local_share_suffix(): # _hostname() sanitization # --------------------------------------------------------------------------- + def test_hostname_sanitization_hyphens(monkeypatch): from bubbaloop_sdk.context import _hostname + monkeypatch.setattr(socket, "gethostname", lambda: "my-robot-01") assert _hostname() == "my_robot_01" def test_hostname_no_hyphens(monkeypatch): from bubbaloop_sdk.context import _hostname + monkeypatch.setattr(socket, "gethostname", lambda: "myrobot") assert _hostname() == "myrobot" @@ -76,25 +78,43 @@ def test_hostname_no_hyphens(monkeypatch): # Import surface # --------------------------------------------------------------------------- + def test_import_node_context(): from bubbaloop_sdk import NodeContext + assert NodeContext is not None def test_import_publishers(): - from bubbaloop_sdk import ProtoPublisher, JsonPublisher + from bubbaloop_sdk import JsonPublisher, ProtoPublisher + assert ProtoPublisher is not None assert JsonPublisher is not None def test_import_subscribers(): from bubbaloop_sdk import ProtoSubscriber, RawSubscriber + assert ProtoSubscriber is not None assert RawSubscriber is not None +def test_import_callback_subscribers(): + from bubbaloop_sdk import CallbackSubscriber, RawCallbackSubscriber + + assert CallbackSubscriber is not None + assert RawCallbackSubscriber is not None + + +def test_import_queryable(): + from bubbaloop_sdk import Queryable + + assert Queryable is not None + + def test_import_run_node(): from bubbaloop_sdk import run_node + assert callable(run_node) @@ -102,6 +122,7 @@ def test_import_run_node(): # Shutdown # --------------------------------------------------------------------------- + def test_shutdown_not_set_initially(): ctx = _make_context("bot") assert not ctx.is_shutdown() @@ -117,8 +138,10 @@ def test_shutdown_set_manually(): # ProtoPublisher.put() # --------------------------------------------------------------------------- + def test_proto_publisher_rejects_invalid_type(): from bubbaloop_sdk.publisher import ProtoPublisher + pub = ProtoPublisher(MagicMock(), None) with pytest.raises(TypeError): pub.put(12345) @@ -126,6 +149,7 @@ def test_proto_publisher_rejects_invalid_type(): def test_proto_publisher_accepts_bytes(): from bubbaloop_sdk.publisher import ProtoPublisher + mock_pub = MagicMock() ProtoPublisher(mock_pub, None).put(b"\x01\x02\x03") mock_pub.put.assert_called_once_with(b"\x01\x02\x03") @@ -133,6 +157,7 @@ def test_proto_publisher_accepts_bytes(): def test_proto_publisher_calls_serialize(): from bubbaloop_sdk.publisher import ProtoPublisher + fake_msg = MagicMock() fake_msg.SerializeToString.return_value = b"\xde\xad\xbe\xef" mock_pub = MagicMock() @@ -144,8 +169,10 @@ def test_proto_publisher_calls_serialize(): # JsonPublisher.put() # --------------------------------------------------------------------------- + def test_json_publisher_serializes_dict(): from bubbaloop_sdk.publisher import JsonPublisher + mock_pub = MagicMock() JsonPublisher(mock_pub).put({"temperature": 22.5}) assert json.loads(mock_pub.put.call_args[0][0]) == {"temperature": 22.5} @@ -153,6 +180,7 @@ def test_json_publisher_serializes_dict(): def test_json_publisher_passthrough_bytes(): from bubbaloop_sdk.publisher import JsonPublisher + mock_pub = MagicMock() JsonPublisher(mock_pub).put(b"raw") mock_pub.put.assert_called_once_with(b"raw") @@ -160,17 +188,733 @@ def test_json_publisher_passthrough_bytes(): def test_json_publisher_passthrough_str(): from bubbaloop_sdk.publisher import JsonPublisher + mock_pub = MagicMock() JsonPublisher(mock_pub).put("hello") mock_pub.put.assert_called_once_with(b"hello") +def test_raw_subscriber_recv_returns_bytes(): + """RawSubscriber.recv() returns bytes from sample payload.""" + from bubbaloop_sdk.subscriber import RawSubscriber + + fake_sample = MagicMock() + fake_sample.payload = b"\xde\xad\xbe\xef" + + mock_sub = MagicMock() + mock_sub.recv.return_value = fake_sample + + mock_session = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + + sub = RawSubscriber(mock_session, "test/topic") + result = sub.recv() + assert result == b"\xde\xad\xbe\xef" + + +# --------------------------------------------------------------------------- +# RawSubscriber — undeclare unblocks recv() +# --------------------------------------------------------------------------- + + +def test_raw_subscriber_undeclare_is_idempotent(): + """undeclare() can be called twice without error.""" + from bubbaloop_sdk.subscriber import RawSubscriber + + mock_sub = MagicMock() + mock_session = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + + sub = RawSubscriber(mock_session, "test/topic") + sub.undeclare() + sub.undeclare() # second call is a no-op + mock_sub.undeclare.assert_called_once() + + +# --------------------------------------------------------------------------- +# CallbackSubscriber(max_workers=4) / RawCallbackSubscriber(max_workers=4) — _closing flag +# --------------------------------------------------------------------------- + + +def test_callback_subscriber_with_workers_drops_after_undeclare(): + """Callbacks arriving after undeclare() are silently dropped.""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(topic, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + received = [] + called = threading.Event() + + def handler(msg): + received.append(msg) + called.set() + + sub = CallbackSubscriber(mock_session, "test/topic", handler, MagicMock(), max_workers=4) + sub.undeclare() + + # Simulate a late-arriving Zenoh callback after undeclare. + # _closing is already set so _wrap returns early — handler is never submitted. + fake_sample = MagicMock() + captured_handler[0](fake_sample) # must not raise + + # Give the executor no chance to run (it's shut down); assert immediately. + assert not called.wait(timeout=0.1), "handler should not be called after undeclare()" + assert received == [] + + +def test_raw_callback_subscriber_with_workers_drops_after_undeclare(): + """Callbacks arriving after undeclare() are silently dropped.""" + from bubbaloop_sdk.subscriber import RawCallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(key_expr, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + received = [] + called = threading.Event() + + def handler(sample): + received.append(sample) + called.set() + + sub = RawCallbackSubscriber(mock_session, "test/**", handler, max_workers=4) + sub.undeclare() + + fake_sample = MagicMock() + captured_handler[0](fake_sample) # must not raise + + assert not called.wait(timeout=0.1), "handler should not be called after undeclare()" + assert received == [] + + +def test_async_queryable_drops_after_undeclare(): + """Queries arriving after undeclare() are silently dropped (thread pool mode).""" + from bubbaloop_sdk.subscriber import Queryable + + mock_session = MagicMock() + captured_wrapper = [] + + def fake_declare(key_expr, wrapper): + captured_wrapper.append(wrapper) + return MagicMock() + + mock_session.declare_queryable.side_effect = fake_declare + received = [] + called = threading.Event() + + def handler(query): + received.append(query) + called.set() + + aq = Queryable(mock_session, "test/topic", handler, max_workers=4) + aq.undeclare() + + fake_query = MagicMock() + captured_wrapper[0](fake_query) # must not raise + + assert not called.wait(timeout=0.1), "handler should not be called after undeclare()" + assert received == [] + + +# --------------------------------------------------------------------------- +# CallbackSubscriber +# --------------------------------------------------------------------------- + + +def test_callback_subscriber_calls_handler_with_decoded(): + """Handler receives whatever registry.decode() returns.""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(topic, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + + mock_registry = MagicMock() + mock_registry.decode.return_value = {"temperature": 22.5} + + received = [] + sub = CallbackSubscriber(mock_session, "test/topic", lambda msg: received.append(msg), mock_registry) + + fake_sample = MagicMock() + captured_handler[0](fake_sample) + + assert received == [{"temperature": 22.5}] + mock_registry.decode.assert_called_once_with(fake_sample) + sub.undeclare() + + +def test_callback_subscriber_passes_sample_to_registry(): + """CallbackSubscriber passes the zenoh.Sample to registry.decode().""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(topic, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + + mock_registry = MagicMock() + mock_registry.decode.return_value = "decoded_proto" + + received = [] + sub = CallbackSubscriber(mock_session, "test/topic", lambda msg: received.append(msg), mock_registry) + + fake_sample = MagicMock() + captured_handler[0](fake_sample) + + assert received == ["decoded_proto"] + mock_registry.decode.assert_called_once_with(fake_sample) + sub.undeclare() + + +def test_callback_subscriber_undeclare(): + """undeclare() calls undeclare on the underlying zenoh subscriber.""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + mock_sub = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + sub = CallbackSubscriber(mock_session, "test/topic", lambda msg: None, MagicMock()) + sub.undeclare() + mock_sub.undeclare.assert_called_once() + + +# --------------------------------------------------------------------------- +# RawCallbackSubscriber +# --------------------------------------------------------------------------- + + +def test_raw_callback_subscriber_passes_sample(): + """Handler receives the raw zenoh.Sample object.""" + from bubbaloop_sdk.subscriber import RawCallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(key_expr, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + received = [] + sub = RawCallbackSubscriber(mock_session, "test/**", lambda s: received.append(s)) + + fake_sample = MagicMock() + captured_handler[0](fake_sample) + + assert received == [fake_sample] + sub.undeclare() + + +def test_raw_callback_subscriber_undeclare(): + """undeclare() calls undeclare on the underlying zenoh subscriber.""" + from bubbaloop_sdk.subscriber import RawCallbackSubscriber + + mock_session = MagicMock() + mock_sub = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + sub = RawCallbackSubscriber(mock_session, "test/**", lambda s: None) + sub.undeclare() + mock_sub.undeclare.assert_called_once() + + +def test_callback_subscriber_undeclare_is_idempotent(): + """undeclare() can be called twice without error.""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + mock_session.declare_subscriber.return_value = MagicMock() + sub = CallbackSubscriber(mock_session, "test/topic", lambda msg: None, MagicMock()) + sub.undeclare() + sub.undeclare() # second call is a no-op + mock_session.declare_subscriber.return_value.undeclare.assert_called_once() + + +def test_callback_subscriber_with_workers_undeclare_is_idempotent(): + """undeclare() can be called twice without error.""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + mock_sub = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + sub = CallbackSubscriber(mock_session, "test/topic", lambda msg: None, MagicMock(), max_workers=4) + sub.undeclare() + sub.undeclare() # second call is a no-op + mock_sub.undeclare.assert_called_once() + + +# --------------------------------------------------------------------------- +# CallbackSubscriber with max_workers (thread pool mode) +# --------------------------------------------------------------------------- + + +def test_callback_subscriber_with_workers_calls_handler_in_thread_pool(): + """Handler is called asynchronously via thread pool when max_workers is set.""" + import threading + + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(topic, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + + mock_registry = MagicMock() + mock_registry.decode.return_value = b"\xca\xfe" + + received = [] + event = threading.Event() + + def slow_handler(msg): + received.append(msg) + event.set() + + sub = CallbackSubscriber(mock_session, "test/topic", slow_handler, mock_registry, max_workers=4) + + fake_sample = MagicMock() + captured_handler[0](fake_sample) + + assert event.wait(timeout=2.0), "handler was not called within 2s" + assert received == [b"\xca\xfe"] + sub.undeclare() + + +def test_callback_subscriber_with_workers_passes_sample_to_registry(): + """CallbackSubscriber with max_workers passes the zenoh.Sample to registry.decode().""" + import threading + + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(topic, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + + mock_registry = MagicMock() + mock_registry.decode.return_value = "decoded" + received = [] + event = threading.Event() + + def handler(msg): + received.append(msg) + event.set() + + sub = CallbackSubscriber(mock_session, "test/topic", handler, mock_registry, max_workers=4) + + fake_sample = MagicMock() + captured_handler[0](fake_sample) + + assert event.wait(timeout=2.0) + assert received == ["decoded"] + mock_registry.decode.assert_called_once_with(fake_sample) + sub.undeclare() + + +def test_raw_callback_subscriber_with_workers_passes_sample(): + """RawCallbackSubscriber with max_workers handler receives raw zenoh.Sample.""" + import threading + + from bubbaloop_sdk.subscriber import RawCallbackSubscriber + + mock_session = MagicMock() + captured_handler = [] + + def fake_declare(key_expr, handler): + captured_handler.append(handler) + return MagicMock() + + mock_session.declare_subscriber.side_effect = fake_declare + received = [] + event = threading.Event() + + def handler(sample): + received.append(sample) + event.set() + + sub = RawCallbackSubscriber(mock_session, "test/**", handler, max_workers=4) + + fake_sample = MagicMock() + captured_handler[0](fake_sample) + + assert event.wait(timeout=2.0) + assert received == [fake_sample] + sub.undeclare() + + +def test_callback_subscriber_with_workers_undeclare(): + """undeclare() shuts down executor and undeclares underlying sub.""" + from bubbaloop_sdk.subscriber import CallbackSubscriber + + mock_session = MagicMock() + mock_sub = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + sub = CallbackSubscriber(mock_session, "test/topic", lambda msg: None, MagicMock(), max_workers=4) + sub.undeclare() + mock_sub.undeclare.assert_called_once() + + +def test_raw_callback_subscriber_with_workers_undeclare(): + """undeclare() shuts down executor and undeclares underlying sub.""" + from bubbaloop_sdk.subscriber import RawCallbackSubscriber + + mock_session = MagicMock() + mock_sub = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + sub = RawCallbackSubscriber(mock_session, "test/**", lambda s: None, max_workers=4) + sub.undeclare() + mock_sub.undeclare.assert_called_once() + + +# --------------------------------------------------------------------------- +# NodeContext.queryable() and queryable_raw() +# --------------------------------------------------------------------------- + + +def test_queryable_uses_topic_prefix(): + """queryable() declares at bubbaloop/global/{machine_id}/{suffix}.""" + ctx = _make_context("bot") + + def handler(q): + pass + + qbl = ctx.queryable("command", handler) + try: + called_topic = ctx.session.declare_queryable.call_args[0][0] + assert called_topic == "bubbaloop/global/bot/command" + finally: + qbl.undeclare() + + +def test_queryable_raw_uses_literal_key_expr(): + """queryable_raw() declares at the literal key expression provided.""" + ctx = _make_context("bot") + + def handler(q): + pass + + qbl = ctx.queryable_raw("bubbaloop/**/schema", handler) + try: + called_topic = ctx.session.declare_queryable.call_args[0][0] + assert called_topic == "bubbaloop/**/schema" + finally: + qbl.undeclare() + + +def test_queryable_returns_async_queryable(): + """queryable() returns Queryable.""" + from bubbaloop_sdk.subscriber import Queryable + + ctx = _make_context("bot") + result = ctx.queryable("command", lambda q: None) + try: + assert isinstance(result, Queryable) + finally: + result.undeclare() + + +# --------------------------------------------------------------------------- +# NodeContext.queryable(max_workers) and queryable_raw(max_workers) +# --------------------------------------------------------------------------- + + +def test_queryable_with_workers_uses_topic_prefix(): + """queryable(max_workers=4) declares at topic(suffix).""" + ctx = _make_context("bot") + + def handler(q): + pass + + qbl = ctx.queryable("command", handler, max_workers=4) + try: + called_topic = ctx.session.declare_queryable.call_args[0][0] + assert called_topic == "bubbaloop/global/bot/command" + finally: + qbl.undeclare() + + +def test_queryable_with_workers_wraps_handler_in_executor(): + """queryable(max_workers=4) wraps handler so Zenoh thread is freed.""" + import threading + + ctx = _make_context("bot") + captured_wrapper = [] + + def fake_declare(topic, wrapper): + captured_wrapper.append(wrapper) + return MagicMock() + + ctx.session.declare_queryable.side_effect = fake_declare + + received = [] + event = threading.Event() + + def slow_handler(query): + received.append(query) + event.set() + + qbl = ctx.queryable("command", slow_handler, max_workers=4) + + try: + fake_query = MagicMock() + captured_wrapper[0](fake_query) # Zenoh calls the wrapper + + assert event.wait(timeout=2.0), "handler not called within 2s" + assert received == [fake_query] + finally: + qbl.undeclare() + + +def test_queryable_with_workers_returns_async_queryable(): + """queryable(max_workers=4) returns Queryable.""" + from bubbaloop_sdk.subscriber import Queryable + + ctx = _make_context("bot") + qbl = ctx.queryable("command", lambda q: None, max_workers=4) + try: + assert isinstance(qbl, Queryable) + finally: + qbl.undeclare() + + +def test_queryable_raw_with_workers_uses_literal_key_expr(): + """queryable_raw(max_workers=4) declares at the literal key expression.""" + ctx = _make_context("bot") + qbl = ctx.queryable_raw("bubbaloop/**/schema", lambda q: None, max_workers=4) + try: + called_topic = ctx.session.declare_queryable.call_args[0][0] + assert called_topic == "bubbaloop/**/schema" + finally: + qbl.undeclare() + + +def test_queryable_raw_with_workers_wraps_handler_in_executor(): + """queryable_raw(max_workers=4) wraps handler in thread pool.""" + import threading + + ctx = _make_context("bot") + captured_wrapper = [] + + def fake_declare(key_expr, wrapper): + captured_wrapper.append(wrapper) + return MagicMock() + + ctx.session.declare_queryable.side_effect = fake_declare + + received = [] + event = threading.Event() + + def handler(query): + received.append(query) + event.set() + + qbl = ctx.queryable_raw("bubbaloop/**/schema", handler, max_workers=4) + + try: + fake_query = MagicMock() + captured_wrapper[0](fake_query) + + assert event.wait(timeout=2.0), "handler not called within 2s" + assert received == [fake_query] + finally: + qbl.undeclare() + + +def test_async_queryable_undeclare(): + """Queryable.undeclare() undeclares queryable then shuts executor.""" + from bubbaloop_sdk.subscriber import Queryable + + mock_session = MagicMock() + mock_qbl = MagicMock() + mock_session.declare_queryable.return_value = mock_qbl + aq = Queryable(mock_session, "test/topic", lambda q: None) + aq.undeclare() + mock_qbl.undeclare.assert_called_once() + + +# --------------------------------------------------------------------------- +# NodeContext.subscriber_callback() +# --------------------------------------------------------------------------- + + +def test_subscriber_callback_uses_topic_prefix(): + """subscriber_callback() declares at topic(suffix).""" + ctx = _make_context("bot") + ctx._schema_registry = MagicMock() + ctx.subscriber_callback("sensor/data", lambda msg: None) + called_topic = ctx.session.declare_subscriber.call_args[0][0] + assert called_topic == "bubbaloop/global/bot/sensor/data" + + +def test_subscriber_raw_callback_uses_literal_key_expr(): + """subscriber_raw_callback() declares at the literal key expression.""" + ctx = _make_context("bot") + ctx.subscriber_raw_callback("bubbaloop/**/health", lambda s: None) + called_topic = ctx.session.declare_subscriber.call_args[0][0] + assert called_topic == "bubbaloop/**/health" + + +def test_subscriber_callback_with_workers_uses_topic_prefix(): + """subscriber_callback() with max_workers declares at topic(suffix).""" + ctx = _make_context("bot") + ctx._schema_registry = MagicMock() + sub = ctx.subscriber_callback("sensor/data", lambda msg: None, max_workers=4) + try: + called_topic = ctx.session.declare_subscriber.call_args[0][0] + assert called_topic == "bubbaloop/global/bot/sensor/data" + finally: + sub.undeclare() + + +def test_subscriber_raw_callback_with_workers_uses_literal_key_expr(): + """subscriber_raw_callback() with max_workers declares at literal key expression.""" + ctx = _make_context("bot") + sub = ctx.subscriber_raw_callback("bubbaloop/**/health", lambda s: None, max_workers=4) + try: + called_topic = ctx.session.declare_subscriber.call_args[0][0] + assert called_topic == "bubbaloop/**/health" + finally: + sub.undeclare() + + +# --------------------------------------------------------------------------- +# NodeContext.publisher_json() / publisher_proto() via context +# --------------------------------------------------------------------------- + + +def test_publisher_json_uses_topic_prefix(): + """publisher_json() declares at topic(suffix).""" + ctx = _make_context("bot") + ctx.publisher_json("weather/current") + called_topic = ctx.session.declare_publisher.call_args[0][0] + assert called_topic == "bubbaloop/global/bot/weather/current" + + +def test_publisher_proto_uses_topic_prefix(): + """publisher_proto() declares at topic(suffix).""" + ctx = _make_context("bot") + fake_class = MagicMock() + fake_class.DESCRIPTOR.full_name = "my.SensorData" + ctx.publisher_proto("sensor/data", fake_class) + called_topic = ctx.session.declare_publisher.call_args[0][0] + assert called_topic == "bubbaloop/global/bot/sensor/data" + + +# --------------------------------------------------------------------------- +# NodeContext.close() and context manager +# --------------------------------------------------------------------------- + + +def test_close_calls_session_close(): + """close() calls session.close().""" + ctx = _make_context("bot") + ctx.close() + ctx.session.close.assert_called_once() + + +def test_context_manager_calls_close(): + """__exit__ calls close() so the session is always cleaned up.""" + ctx = _make_context("bot") + with ctx: + pass + ctx.session.close.assert_called_once() + + +# --------------------------------------------------------------------------- +# NodeContext.connect() — env var resolution +# --------------------------------------------------------------------------- + + +def test_connect_reads_machine_id_from_env(monkeypatch): + """BUBBALOOP_MACHINE_ID env var sets ctx.machine_id.""" + import zenoh + + monkeypatch.setenv("BUBBALOOP_MACHINE_ID", "jetson_orin") + + monkeypatch.delenv("BUBBALOOP_ZENOH_ENDPOINT", raising=False) + monkeypatch.setattr(zenoh, "open", lambda cfg: MagicMock()) + monkeypatch.setattr(zenoh, "Config", MagicMock) + from bubbaloop_sdk.context import NodeContext + + ctx = NodeContext.connect() + assert ctx.machine_id == "jetson_orin" + + +def test_connect_instance_name_override(monkeypatch): + """instance_name kwarg overrides hostname fallback.""" + import zenoh + + monkeypatch.delenv("BUBBALOOP_MACHINE_ID", raising=False) + + monkeypatch.delenv("BUBBALOOP_ZENOH_ENDPOINT", raising=False) + monkeypatch.setattr(zenoh, "open", lambda cfg: MagicMock()) + monkeypatch.setattr(zenoh, "Config", MagicMock) + from bubbaloop_sdk.context import NodeContext + + ctx = NodeContext.connect(instance_name="tapo_entrance") + assert ctx.instance_name == "tapo_entrance" + + +# --------------------------------------------------------------------------- +# RawSubscriber — undeclare() and iteration +# --------------------------------------------------------------------------- + + +def test_raw_subscriber_undeclare(): + """undeclare() calls undeclare on the underlying zenoh subscriber.""" + from bubbaloop_sdk.subscriber import RawSubscriber + + mock_session = MagicMock() + mock_sub = MagicMock() + mock_session.declare_subscriber.return_value = mock_sub + sub = RawSubscriber(mock_session, "test/topic") + sub.undeclare() + mock_sub.undeclare.assert_called_once() + + +def test_raw_subscriber_declares_on_topic(): + """RawSubscriber declares a zenoh subscriber on the given topic.""" + from bubbaloop_sdk.subscriber import RawSubscriber + + mock_session = MagicMock() + mock_session.declare_subscriber.return_value = MagicMock() + RawSubscriber(mock_session, "test/topic") + mock_session.declare_subscriber.assert_called_once_with("test/topic") + + # --------------------------------------------------------------------------- # RawPublisher.put() # --------------------------------------------------------------------------- + def test_raw_publisher_puts_bytes(): from bubbaloop_sdk.publisher import RawPublisher + mock_pub = MagicMock() RawPublisher(mock_pub).put(b"\x00\x01\x02") mock_pub.put.assert_called_once_with(b"\x00\x01\x02") @@ -178,6 +922,7 @@ def test_raw_publisher_puts_bytes(): def test_raw_publisher_converts_bytearray(): from bubbaloop_sdk.publisher import RawPublisher + mock_pub = MagicMock() RawPublisher(mock_pub).put(bytearray([0xFF, 0xFE])) mock_pub.put.assert_called_once_with(b"\xff\xfe") @@ -187,23 +932,28 @@ def test_raw_publisher_converts_bytearray(): # Import surface (new exports) # --------------------------------------------------------------------------- + def test_import_raw_publisher(): from bubbaloop_sdk import RawPublisher + assert RawPublisher is not None def test_import_proto_decoder(): from bubbaloop_sdk import ProtoDecoder + assert ProtoDecoder is not None def test_import_discover_nodes(): from bubbaloop_sdk import discover_nodes + assert callable(discover_nodes) def test_import_get_sample(): - from bubbaloop_sdk import get_sample, GetSampleTimeout + from bubbaloop_sdk import GetSampleTimeout, get_sample + assert callable(get_sample) assert GetSampleTimeout is not None @@ -212,8 +962,10 @@ def test_import_get_sample(): # Helper # --------------------------------------------------------------------------- + def _make_context(machine_id: str): from bubbaloop_sdk.context import NodeContext + ctx = object.__new__(NodeContext) ctx.session = MagicMock() ctx.machine_id = machine_id