-
-
Notifications
You must be signed in to change notification settings - Fork 23
feat(python-sdk): callback subscribers, queryables, and publisher convenience methods #66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
lferraz
wants to merge
56
commits into
kornia:main
Choose a base branch
from
lferraz:feat/python-sdk-callback-subscriber-queryable
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
56 commits
Select commit
Hold shift + click to select a range
1d8e8a2
feat(python-sdk): queue-backed TypedSubscriber/RawSubscriber with opt…
lferraz afadda2
test(python-sdk): tests for callback subscriber variants
lferraz d7f4de1
test(python-sdk): tests for callback subscriber variants and async un…
lferraz 9e2e9d4
fix(python-sdk): move concurrent.futures import to module level, fix …
lferraz 2e0f263
feat(python-sdk): add queryable() and subscriber_callback() methods t…
lferraz 2846535
fix(python-sdk): AsyncQueryable wrapper prevents executor leak in que…
lferraz d8f1896
feat(python-sdk): export new subscriber/queryable classes; add lintin…
lferraz 2c18ead
docs(python-sdk): rewrite README for synchronous API with new subscri…
lferraz 16c5208
chore(python-sdk): replace .flake8 with ruff in pyproject.toml; fix lint
lferraz f0bd66e
docs(python-sdk): add CONTRIBUTING.md with dev setup, lint config, te…
lferraz 3c8fcc4
chore(python-sdk): add pixi.toml; expand pyproject.toml; fix unused i…
lferraz f286310
chore(python-sdk): scope F821 suppression to context.py only
lferraz a19d90d
fix(python-sdk): raise GetSampleTimeout from err; remove global B904 …
lferraz ce0b18d
chore: add markdownlint config; clean up README and CONTRIBUTING
lferraz 2d3ce64
refactor(python-sdk): use TYPE_CHECKING for type annotations in conte…
lferraz 9c06330
ci: add Python SDK lint and test steps
lferraz fcbac7a
docs(python-sdk): add CLAUDE.md with conventions, threading model, pi…
lferraz f87ea16
test(python-sdk): add missing tests for publisher/subscriber context …
lferraz bdbf7c0
fix(python-sdk): add from __future__ import annotations to context.py
lferraz 0105a09
fix(python-sdk): address PR review comments
lferraz 485d54e
test(python-sdk): cover PR review fixes — sentinel, _closing flag, de…
lferraz 254ada7
fix(python-sdk): address new Copilot review comments
lferraz afb5ac6
fix(python-sdk): move FromString decode off Zenoh thread; replace tim…
lferraz 1a543e2
docs(python-sdk): add undeclare() calls to README quick-start examples
lferraz b59f510
test(python-sdk): undeclare async objects in tests; use try/finally f…
lferraz e854706
fix(python-sdk): add persistent _closed flag to TypedSubscriber and R…
lferraz 4ccad57
fix(python-sdk): polling recv() for multi-consumer safety; fix GC doc…
lferraz e857a4f
fix(python-sdk): set _shutdown before ctx.close() in run_node(); join…
lferraz e1203c0
fix(python-sdk): broaden try/finally in run_node(); pin Python 3.12 i…
lferraz 088baa6
Merge remote-tracking branch 'upstream/main' into feat/python-sdk-cal…
lferraz 39ceb2d
chore: merge upstream/main into feat/python-sdk-callback-subscriber-q…
lferraz 6e1fcbe
feat(python-sdk): add SchemaRegistry for on-demand protobuf schema re…
lferraz 0b5f46a
fix(python-sdk): correct merge artifacts in TypedSubscriber and Proto…
lferraz d670b42
docs(python-sdk): update CLAUDE.md with typing, docstring, and format…
lferraz feb3cc6
test(python-sdk): update tests for scope removal and new global/local…
lferraz 18ac2b5
Merge remote-tracking branch 'upstream/main' into feat/python-sdk-cal…
lferraz 8bc9506
refactor(python-sdk): remove TypedSubscriber from subscriber.py
lferraz f0a8660
refactor(python-sdk): remove TypedSubscriber from public API and context
lferraz 1d400ef
refactor(python-sdk): all callback classes inherit _BaseSubscriber, i…
lferraz eb34fbe
test(python-sdk): remove TypedSubscriber tests
lferraz e96917b
docs(python-sdk): remove TypedSubscriber references, update subscribe…
lferraz b240353
chore(python-sdk): add pixi.lock
lferraz 3bc6e64
refactor(python-sdk): callback subscribers use registry.decode() inst…
lferraz 08eff64
refactor(python-sdk): subscriber_callback methods drop msg_class, pas…
lferraz 34b4e80
fix(python-sdk): remove duplicate heartbeat and fix wrong signature i…
lferraz fd3b4f8
test(python-sdk): update callback tests to use mock registry instead …
lferraz 817d137
style(python-sdk): ruff format subscriber.py and context.py
lferraz 70bb99a
docs(python-sdk): update callback examples to reflect auto-decode via…
lferraz 4803602
refactor(python-sdk): type-annotate handler and registry params in su…
lferraz 9f489d2
test(python-sdk): update tests for merged callback subscriber classes
lferraz 31b6fcc
refactor(python-sdk): merge sync/async callback subscribers into one …
lferraz 5aded5b
docs(python-sdk): update docs for merged callback subscriber classes
lferraz c07c124
refactor(python-sdk): merge sync/async queryable methods into one each
lferraz e173b06
refactor(python-sdk): rename AsyncQueryable → Queryable
lferraz 42311bf
fix(python-sdk): address Copilot review — decode in worker thread, ty…
lferraz 6463f04
fix(python-sdk): address Copilot review round 2
lferraz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| { | ||
| "MD013": false, | ||
| "MD060": false | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| # bubbaloop-sdk (Python) | ||
|
|
||
| Pure Python wrapper over `zenoh-python`. Synchronous API — no asyncio required. | ||
| Mirrors the Rust `bubbaloop-node` SDK surface; nodes written with either SDK are interoperable. | ||
|
|
||
| ## Structure | ||
|
|
||
| ``` | ||
| python-sdk/ | ||
| bubbaloop_sdk/ | ||
| __init__.py # Public API — edit when adding new public names | ||
| context.py # NodeContext: connect(), topic(), publishers, subscribers, queryables | ||
| publisher.py # JsonPublisher, ProtoPublisher (wraps session.declare_publisher) | ||
| subscriber.py # ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable | ||
| node.py # run_node() — CLI arg parsing + health heartbeat + lifecycle | ||
| health.py # start_health_heartbeat() — publishes 'ok' every 5s | ||
| discover.py # discover_nodes() — GET bubbaloop/**/health | ||
| get_sample.py # get_sample() — one-shot async subscribe-and-wait | ||
| decode_sample.py # ProtoDecoder — decode zenoh.Sample to protobuf | ||
| tests/ | ||
| test_context.py # 71 unit tests — NO real Zenoh session needed | ||
| pyproject.toml # Build config, deps, ruff/pytest/coverage | ||
| pixi.toml # Dev tasks: test, lint, fmt, check | ||
| ``` | ||
|
|
||
| ## Build & verify | ||
|
|
||
| ```bash | ||
| # With pixi (recommended) | ||
| cd python-sdk | ||
| pixi run check # fmt-check + lint (run before every commit) | ||
| pixi run test # 71 unit tests | ||
| pixi run test-cov # tests + coverage report | ||
|
|
||
| # With venv (alternative) | ||
| cd python-sdk | ||
| .venv/bin/python -m ruff check bubbaloop_sdk/ tests/ | ||
| .venv/bin/python -m pytest tests/ -v | ||
| ``` | ||
|
|
||
| ## Conventions — MUST follow | ||
|
|
||
| **Tooling:** | ||
| - `ruff` for lint + format — NOT flake8, black, or isort directly | ||
| - Config in `pyproject.toml` under `[tool.ruff]` — do NOT add `.flake8` or `setup.cfg` | ||
| - Line length: 120 characters | ||
| - `TYPE_CHECKING` guard for cross-module type annotations — NEVER string-quoted forward refs (`"Foo"`) | ||
|
|
||
| **Type annotations:** | ||
|
|
||
| - Use modern Python 3.11+ union syntax: `X | Y` and `X | None` — NOT `Union[X, Y]` or `Optional[X]` | ||
| - Annotate all public method parameters and return types | ||
| - Annotate class attributes and instance variables when the type is not obvious from the assignment | ||
| - When fixing type errors, follow this hierarchy: | ||
| 1. Add proper type annotations | ||
| 2. Use `X | Y` union syntax or `cast()` from `typing` | ||
| 3. Use `TYPE_CHECKING` for circular imports | ||
| 4. Last resort: `# type: ignore[<error-code>]` with a comment explaining why | ||
| - AVOID `# type: ignore` without an error code — always be specific | ||
|
|
||
| **Docstrings:** | ||
| - Google docstring style for all public modules, classes, and functions | ||
| - Do NOT add a docstring to `__init__()` — document instantiation at the class level instead | ||
| - Include `Args:`, `Returns:`, and `Raises:` sections when applicable | ||
|
|
||
| ```python | ||
| class CallbackSubscriber: | ||
| """Event-driven subscriber that calls a handler on each received message. | ||
|
|
||
| The handler is invoked from Zenoh's internal callback thread by default. | ||
| Pass ``max_workers`` to run the handler in a thread pool instead — use | ||
| this for slow work (I/O, DB writes, HTTP calls). | ||
|
|
||
| Args: | ||
| session: Active Zenoh session. | ||
| topic: Key expression to subscribe to. | ||
| handler: Callable invoked with each decoded message. | ||
| registry: SchemaRegistry for auto-decoding samples by encoding header. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| session: zenoh.Session, | ||
| topic: str, | ||
| handler: Callable, | ||
| registry, | ||
| ): ... | ||
|
|
||
| def undeclare(self) -> None: | ||
| """Undeclare the Zenoh subscriber and release resources.""" | ||
| ``` | ||
|
|
||
| **String formatting:** | ||
|
|
||
| - Use `%`-style formatting for log calls — NOT f-strings: `log.info("Started %s", name)` | ||
| - Reason: lazy evaluation — the string is only formatted if the log level is active | ||
| - Use f-strings everywhere else: `raise ValueError(f"Unknown topic: {topic}")` | ||
|
|
||
| **Imports:** | ||
| - Cross-module type-only imports go under `if TYPE_CHECKING:` at the top of the file | ||
| - Lazy runtime imports (inside method bodies) are kept to avoid circular import issues | ||
| - `__init__.py` must be updated whenever a new public class is added to `subscriber.py` or `publisher.py` | ||
|
|
||
| **Zenoh session:** | ||
| - ALWAYS use `mode: "client"` — peer mode does not route through zenohd | ||
| - NEVER use `.complete(True)` on queryables — blocks wildcard queries like `bubbaloop/**/schema` | ||
| - `query.key_expr` is a **property**, NOT a method — NEVER write `query.key_expr()` | ||
| - `query.reply(query.key_expr, payload_bytes)` — correct reply pattern | ||
|
|
||
| **Threading — critical:** | ||
| - Zenoh uses **one internal thread** for ALL callbacks and queryables on a session | ||
| - A slow handler blocks every other subscriber/queryable until it returns | ||
| - Pass `max_workers=N` to `subscriber_callback` / `subscriber_raw_callback` / `queryable` / `queryable_raw` for any handler that does I/O, DB access, or hardware calls | ||
| - Shutdown order for thread-pool variants: undeclare Zenoh subscriber FIRST, then `executor.shutdown()` — reversing this causes `RuntimeError: cannot schedule new futures after shutdown` | ||
|
|
||
| **`undeclare()` discipline:** | ||
| - Every subscriber, callback subscriber, and queryable must be undeclared when done | ||
| - `Queryable`, `CallbackSubscriber`, and `RawCallbackSubscriber` with `max_workers` own a `ThreadPoolExecutor` — GC alone is not enough, always call `undeclare()` | ||
| - Blocking subscribers (`RawSubscriber`) are undeclared via `undeclare()` too | ||
|
Comment on lines
+110
to
+119
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed — updated to reference |
||
|
|
||
| ## Testing | ||
|
|
||
| Tests do NOT open a real Zenoh session. Use `_make_context()`: | ||
|
|
||
| ```python | ||
| def _make_context(machine_id): | ||
| from bubbaloop_sdk.context import NodeContext | ||
| ctx = object.__new__(NodeContext) | ||
| ctx.session = MagicMock() | ||
| ctx.machine_id = machine_id | ||
| ctx.instance_name = machine_id | ||
| ctx._shutdown = threading.Event() | ||
| return ctx | ||
| ``` | ||
|
|
||
| For async/threaded tests use `threading.Event` with a 2s timeout — do NOT use `time.sleep`: | ||
|
|
||
| ```python | ||
| event = threading.Event() | ||
| def handler(msg): | ||
| received.append(msg) | ||
| event.set() | ||
| assert event.wait(timeout=2.0), "handler not called within 2s" | ||
| ``` | ||
|
|
||
| ## DO / DON'T | ||
|
|
||
| **DO:** `pixi run check` before every commit | add tests when adding public methods | update `__init__.py` and its `__all__` for every new public class | call `undeclare()` in tests that create async subscribers or queryables | ||
|
|
||
| **DON'T:** use `asyncio` — the SDK is synchronous by design | use `query.key_expr()` with parentheses | use `.complete(True)` on queryables | add string forward references (`"Foo"`) — use `TYPE_CHECKING` instead | suppress lint rules globally when a per-file or code-level fix is possible | ||
|
|
||
| ## Pitfalls | ||
|
|
||
| - `B904` — always `raise Foo from err` inside `except` blocks, never bare `raise Foo(...)` | ||
| - `F401` in `__init__.py` is suppressed by ruff config (re-exports are intentional) — do NOT add `# noqa` comments there | ||
| - `CallbackSubscriber` and `RawCallbackSubscriber` without `max_workers` do NOT own an executor — `undeclare()` only calls `_sub.undeclare()`; with `max_workers` they own a `ThreadPoolExecutor` and shut it down in `undeclare()` | ||
| - `ProtoSubscriber` and `RawSubscriber` are iterable (`for msg in sub`); iteration raises `StopIteration` on exception via `_BaseSubscriber.__next__` — prefer `recv(timeout=...)` in shutdown-aware loops to avoid blocking indefinitely | ||
| - `run_node()` reads `config.yaml` by default; override with `-c path/config.yaml`. The `name` field in config sets `instance_name` for health/schema topics — collisions happen if two instances share the same name | ||
| - Health topic format: `bubbaloop/global/{machine_id}/{instance_name}/health` — ensure consumer patterns match exactly | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| # Contributing to bubbaloop-sdk (Python) | ||
|
|
||
| ## Dev environment | ||
|
|
||
| ### With pixi (recommended) | ||
|
|
||
| ```bash | ||
| cd python-sdk | ||
| pixi install # creates env, installs all deps including dev extras | ||
| pixi run test | ||
| pixi run lint | ||
| pixi run fmt | ||
| ``` | ||
|
|
||
| Available tasks: | ||
|
|
||
| | Task | Description | | ||
| |---|---| | ||
| | `pixi run test` | Run test suite | | ||
| | `pixi run test-cov` | Run tests with coverage report | | ||
| | `pixi run lint` | Check for lint errors (ruff) | | ||
| | `pixi run lint-fix` | Auto-fix lint errors | | ||
| | `pixi run fmt` | Format code | | ||
| | `pixi run fmt-check` | Check formatting without changing files | | ||
| | `pixi run check` | Run fmt-check + lint (CI equivalent) | | ||
|
|
||
| ### With plain venv | ||
|
|
||
| ```bash | ||
| cd python-sdk | ||
| python3 -m venv .venv | ||
| .venv/bin/pip install -e ".[dev]" | ||
| .venv/bin/pytest tests/ -v | ||
| .venv/bin/ruff check bubbaloop_sdk/ tests/ | ||
| ``` | ||
|
|
||
| ## Linting (ruff) | ||
|
|
||
| Config lives in `python-sdk/pyproject.toml` under `[tool.ruff]`. | ||
| Follows the same pattern as [kornia/kornia](https://github.com/kornia/kornia). | ||
|
|
||
| Rules enabled: E/W (pycodestyle), F (Pyflakes), I (isort), B (bugbear), | ||
| UP (pyupgrade), C4 (comprehensions), RUF (ruff-specific). | ||
|
|
||
| Line length: 120 characters. | ||
|
|
||
| ## Lint suppressions | ||
|
|
||
| | File | Rule | Reason | | ||
| |---|---|---| | ||
| | `*/__init__.py` | F401, F403 | Re-exports allowed | | ||
| | `tests/*` | S101, D | Assert and missing docstrings allowed in tests | | ||
|
|
||
| ## Testing | ||
|
|
||
| Tests in `tests/test_context.py` do **not** open a real Zenoh session. | ||
| `_make_context()` uses `object.__new__(NodeContext)` + `MagicMock()` — no router needed. | ||
|
|
||
| For async subscriber/queryable tests, `threading.Event` with a 2s timeout | ||
| verifies that handlers are dispatched to the thread pool correctly. | ||
|
|
||
| ## Project structure | ||
|
|
||
| ```text | ||
| python-sdk/ | ||
| pyproject.toml # Build config, deps, ruff/pytest/coverage config | ||
| pixi.toml # Pixi tasks (test, lint, fmt, check) | ||
| README.md # User-facing API docs | ||
| bubbaloop_sdk/ | ||
| __init__.py # Public API surface | ||
| context.py # NodeContext — main entry point | ||
| subscriber.py # ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable | ||
| publisher.py # JsonPublisher, ProtoPublisher | ||
| node.py # run_node() helper | ||
| health.py # Health heartbeat (used internally by run_node) | ||
| discover.py # discover_nodes() | ||
| get_sample.py # get_sample() one-shot helper | ||
| decode_sample.py # ProtoDecoder | ||
| tests/ | ||
| test_context.py # 68 unit tests (no real Zenoh required) | ||
| ``` |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.