feat(python-sdk): callback subscribers, queryables, and publisher convenience methods#66
feat(python-sdk): callback subscribers, queryables, and publisher convenience methods#66lferraz wants to merge 56 commits into
Conversation
Add 8 tests covering CallbackSubscriber, RawCallbackSubscriber, CallbackSubscriberAsync, and RawCallbackSubscriberAsync — bytes/proto decode, undeclare, and thread-pool dispatch paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…undeclare order in async subscribers
…o NodeContext Adds subscriber_callback(), subscriber_raw_callback(), subscriber_callback_async(), subscriber_raw_callback_async(), queryable(), queryable_raw(), queryable_async(), and queryable_raw_async() factory methods to NodeContext. Includes 9 new tests (41 total). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…g config Add AsyncQueryable, CallbackSubscriber, CallbackSubscriberAsync, RawCallbackSubscriber, RawCallbackSubscriberAsync to __init__.py public API. Add import-surface tests for all new exports. Set max-line-length=120 in .flake8 (project root and python-sdk/). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…bers/queryables Replace stale async-API examples with the actual synchronous API. Document all new callback subscribers (_callback, _callback_async, _raw_callback, _raw_callback_async) and queryable methods (queryable, queryable_raw, queryable_async, queryable_raw_async). All original methods remain unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move all Python lint config into pyproject.toml [tool.ruff] sections (line-length=120, select E/W/F/I/B/UP, per-file-ignores for pre-existing issues in upstream files). Remove .flake8 files from repo root and python-sdk/. Auto-fix isort ordering in __init__.py, test_context.py, decode_sample.py via `ruff check --fix`. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sting notes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…mports - Add python-sdk/pixi.toml with test/lint/fmt/check tasks (mirrors Kornia) - pyproject.toml: add readme, license, authors, classifiers, project.urls - pyproject.toml: add pytest-cov to dev deps; add [tool.coverage.*] - pyproject.toml: add [tool.ruff.format], [tool.ruff.lint.isort] - pyproject.toml: glob per-file-ignores for */__init__.py and tests/* - pyproject.toml: add C4/RUF rules; move B904 to global ignore - Remove unused `time` from health.py, `os`/`time` from node.py - Update CONTRIBUTING.md with pixi tasks and corrected suppressions table Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move F821 from global ignore to per-file-ignores for context.py. The rule is only needed there (lazy imports + forward-reference string annotations), not project-wide. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ignore Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Disable MD013 (line-length) and MD060 (table-column-style) — both are stylistic and conflict with wide API tables. Fix fenced code block language tag in CONTRIBUTING.md (text). Update lint suppressions table to remove resolved B904 entry. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…xt.py
Add TYPE_CHECKING guard at module level to import publisher/subscriber
types for annotations only. Replace forward-reference strings ("Foo")
with direct type names (Foo) in all method signatures. Remove F821
per-file suppression from pyproject.toml and the corresponding entry
from CONTRIBUTING.md — no suppressions needed now.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tfalls Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…methods Cover previously untested NodeContext surface: - publisher_json/proto: verify topic() prefix is applied - subscriber/subscriber_raw: verify topic() prefix / literal key expr - close(): calls session.close() - context manager: __exit__ calls close() - connect(): BUBBALOOP_SCOPE, BUBBALOOP_MACHINE_ID env vars + defaults + instance_name override - TypedSubscriber/RawSubscriber: undeclare() and iteration 61 tests total (up from 48). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Without this, TYPE_CHECKING-only imports (JsonPublisher, TypedSubscriber, etc.) are evaluated at class definition time on Python ≤3.13, causing NameError. Python 3.14 made annotations lazy by default so tests passed locally but failed on CI (Python 3.12). from __future__ import annotations defers all annotation evaluation to string form on all supported versions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review Summary by QodoAdd callback subscribers, async queryables, and queue-backed blocking subscribers with thread pool support
WalkthroughsDescription• Add callback-based and async subscribers with thread pool support • Implement queryable factory methods on NodeContext with async variants • Convert blocking subscribers to queue-backed with optional timeout • Export new subscriber/queryable classes and add comprehensive test coverage • Expand project metadata, linting config, and developer documentation Diagramflowchart LR
A["Blocking Subscribers<br/>TypedSubscriber/RawSubscriber"] -->|"queue-backed<br/>with timeout"| B["recv timeout support"]
C["Callback Subscribers<br/>CallbackSubscriber/RawCallbackSubscriber"] -->|"Zenoh thread"| D["Event-driven handlers"]
E["Async Subscribers<br/>CallbackSubscriberAsync/RawCallbackSubscriberAsync"] -->|"ThreadPoolExecutor"| F["Non-blocking handlers"]
G["Queryables<br/>queryable/queryable_raw"] -->|"Zenoh thread"| H["Query handlers"]
I["Async Queryables<br/>AsyncQueryable"] -->|"ThreadPoolExecutor"| J["Slow query handlers"]
K["NodeContext"] -->|"factory methods"| A
K -->|"factory methods"| C
K -->|"factory methods"| E
K -->|"factory methods"| G
K -->|"factory methods"| I
File Changes1. python-sdk/bubbaloop_sdk/subscriber.py
|
Code Review by Qodo
1. CI runs ruff/pytest directly
|
There was a problem hiding this comment.
Pull request overview
This PR expands the Python SDK’s synchronous Zenoh integration by adding callback-based subscribers, async (thread-pooled) callback variants, and queryable helpers on NodeContext, plus accompanying docs/tooling/CI updates.
Changes:
- Add queue-backed blocking subscribers with
recv(timeout)plus new callback + async-callback subscriber classes andAsyncQueryable. - Extend
NodeContextwithsubscriber_*callback*andqueryable*_asynchelpers; export new public API from__init__.py. - Update README/tooling (ruff/pytest-cov/pixi), add contributing/conventions docs, and add Python lint/test steps to CI.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| python-sdk/bubbaloop_sdk/subscriber.py | Implements queue-backed recv(timeout), callback subscribers, async callback subscribers, and AsyncQueryable. |
| python-sdk/bubbaloop_sdk/context.py | Adds TYPE_CHECKING imports and new NodeContext helper methods for callback subscribers and queryables. |
| python-sdk/bubbaloop_sdk/init.py | Re-exports newly added public classes via imports and __all__. |
| python-sdk/tests/test_context.py | Adds/extends tests for new subscriber/queryable variants and public import surface. |
| python-sdk/README.md | Rewrites docs to reflect synchronous API and documents new callback/queryable APIs. |
| python-sdk/pyproject.toml | Adds richer project metadata + ruff/pytest/coverage configuration and dev dependencies. |
| python-sdk/pixi.toml | Adds pixi workspace config and dev tasks (test/lint/fmt/check). |
| python-sdk/CONTRIBUTING.md | Documents local dev workflow (pixi/venv), linting, and test patterns. |
| python-sdk/CLAUDE.md | Captures repository conventions and Zenoh/threading pitfalls for contributors. |
| python-sdk/bubbaloop_sdk/get_sample.py | Fixes exception chaining on timeout (raise ... from err). |
| python-sdk/bubbaloop_sdk/decode_sample.py | Splits protobuf imports for formatting/lint friendliness. |
| python-sdk/bubbaloop_sdk/node.py | Removes unused imports. |
| python-sdk/bubbaloop_sdk/health.py | Removes unused import. |
| .github/workflows/ci.yml | Adds Python SDK lint + test steps. |
| .markdownlint.json | Adds markdownlint configuration overrides. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Bump requires-python to >=3.10 (PEP 604 union syntax used throughout) - Remove upper bound from pixi.toml python constraint (was arbitrarily <3.14) - Update ruff target-version to py310; drop py3.9 classifier - subscriber.py: move proto decode from Zenoh callback into recv() so FromString() runs on the consumer thread, not Zenoh's internal thread - subscriber.py: push _CLOSED sentinel on undeclare() to unblock recv(timeout=None) - subscriber.py: fix docstring — callbacks are serial not concurrent; recommend _async - subscriber.py: add _closing flag + try/except RuntimeError in _wrap() to handle race between submit() and executor.shutdown() in *Async classes and AsyncQueryable - subscriber.py: use shutdown(cancel_futures=True) (Python 3.9+) - ci.yml: invoke ruff/pytest via python -m to stay within allowlisted command prefixes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…code thread - test_typed/raw_subscriber_undeclare_unblocks_recv: verify undeclare() pushes _CLOSED sentinel so recv(timeout=None) returns None immediately - test_typed_subscriber_decode_happens_in_recv_not_callback: verify FromString runs on the consumer thread, not the Zenoh callback thread - test_*_async_drops_after_undeclare (x3): verify _closing flag prevents handler from being called after undeclare() for CallbackSubscriberAsync, RawCallbackSubscriberAsync, and AsyncQueryable 67 tests total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… key spaces
- Remove scope parameter from _make_context() — NodeContext no longer has scope
- Update topic assertions: bubbaloop/{scope}/... → bubbaloop/global/...
- Remove test_connect_reads_scope_from_env and test_connect_defaults_scope_to_local
- Fix test_raw_subscriber_recv_returns_sample: recv() now returns bytes, not Sample
- Remove stale BUBBALOOP_SCOPE monkeypatch.delenv() calls from connect tests
- Rename test_topic_default_scope → test_topic_uses_global_prefix
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…lback-subscriber-queryable # Conflicts: # python-sdk/README.md # python-sdk/bubbaloop_sdk/context.py # python-sdk/bubbaloop_sdk/subscriber.py # python-sdk/tests/test_context.py
Delete the TypedSubscriber class and its dead imports (queue, time, _CLOSED sentinel, _POLL_INTERVAL constant). ProtoSubscriber via SchemaRegistry is the replacement. __init__.py and tests will be updated in follow-up tasks. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Drop the deprecated TypedSubscriber import from __init__.py, remove it from the TYPE_CHECKING block in context.py, delete the old subscriber() and subscriber_raw() methods, and remove the stale "next 2 functions" comment. subscribe() and subscribe_raw() are now the primary methods. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dempotent undeclare CallbackSubscriber, RawCallbackSubscriber, CallbackSubscriberAsync, and RawCallbackSubscriberAsync now inherit _BaseSubscriber for a unified idempotent undeclare() pattern. AsyncQueryable gains its own _undeclared guard. Two new tests verify the double-undeclare no-op for callback classes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove all TypedSubscriber tests (9 functions) and two tests for the removed ctx.subscriber()/ctx.subscriber_raw() methods. Update test_import_subscribers to drop the TypedSubscriber import assertion. Rename section headers that previously mentioned both TypedSubscriber and RawSubscriber to mention only RawSubscriber. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…r docs Replace TypedSubscriber with ProtoSubscriber/CallbackSubscriber in all doc files; remove deprecated ctx.subscriber() and ctx.subscriber_raw() rows from README API table; update CONTRIBUTING test count to 68. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ead of msg_class Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…s schema_registry Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n run_node The heartbeat was started twice — once with the correct signature (return value discarded) and once with the old signature including ctx.scope, which would crash at runtime. Keep a single call inside the try block so the thread is properly joined on shutdown. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…of msg_class Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… SchemaRegistry Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…bscriber classes Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace CallbackSubscriberAsync/RawCallbackSubscriberAsync references with CallbackSubscriber/RawCallbackSubscriber using max_workers=4, and update subscriber_callback_async/subscriber_raw_callback_async calls to use the merged max_workers parameter form. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…class each CallbackSubscriber(max_workers=None) replaces CallbackSubscriber + CallbackSubscriberAsync. RawCallbackSubscriber(max_workers=None) replaces RawCallbackSubscriber + RawCallbackSubscriberAsync. None = handler runs on Zenoh's thread (fast path). int = handler runs in a ThreadPoolExecutor. 6 subscriber classes → 4. 4 NodeContext methods → 2. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace _async variant references with max_workers parameter pattern. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 17 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
queryable(suffix, handler, max_workers=None) replaces queryable + queryable_async. queryable_raw(key_expr, handler, max_workers=None) replaces queryable_raw + queryable_raw_async. AsyncQueryable now supports both modes: None = handler on Zenoh thread, int = thread pool. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The class now supports both sync and thread-pool modes via max_workers, so the Async prefix no longer applies. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…pe annotations, cleanup - Move registry.decode() into executor.submit() lambda so decode runs in the worker thread, not on Zenoh's callback thread (avoids blocking on first schema fetch) - Add Callable type annotations to handler params in context.py - Remove duplicate test_import_callback_subscribers_with_workers Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 17 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def test_raw_subscriber_undeclare_calls_sub_undeclare(): | ||
| """undeclare() calls undeclare on the underlying zenoh subscriber.""" | ||
| from bubbaloop_sdk.subscriber import RawSubscriber | ||
|
|
||
| mock_sub = MagicMock() | ||
| mock_session = MagicMock() | ||
| mock_session.declare_subscriber.return_value = mock_sub | ||
| sub = RawSubscriber(mock_session, "test/topic") | ||
| sub.undeclare() | ||
| mock_sub.undeclare.assert_called_once() | ||
|
|
||
|
|
There was a problem hiding this comment.
Fixed — removed the duplicate test_raw_subscriber_undeclare_calls_sub_undeclare.
| Use `queryable_async` when the handler does slow work: | ||
|
|
||
| ```python | ||
| from bubbaloop_sdk.schema import declare_schema_queryable | ||
| from my_protos_pb2 import SensorData | ||
|
|
||
| # Declare once — keeps the queryable alive while the reference is held | ||
| schema_qbl = declare_schema_queryable( | ||
| ctx.session, ctx.machine_id, "my-node", SensorData | ||
| ) | ||
| qbl = ctx.queryable_async("status", on_query) | ||
| qbl.undeclare() # call when done to release the thread pool | ||
| ``` |
There was a problem hiding this comment.
Fixed — example now uses ctx.queryable("status", on_query, max_workers=4).
| | `ctx.queryable(suffix, handler)` | Handler at `topic(suffix)` | | ||
| | `ctx.queryable_raw(key_expr, handler)` | Handler at literal key expression | | ||
| | `ctx.queryable_async(suffix, handler, max_workers=4)` | Handler in thread pool | | ||
| | `ctx.queryable_raw_async(key_expr, handler, max_workers=4)` | Raw key; handler in thread pool | |
There was a problem hiding this comment.
Fixed — API table now shows queryable(suffix, handler, max_workers=None) and queryable_raw(key_expr, handler, max_workers=None).
| **Threading — critical:** | ||
| - Zenoh uses **one internal thread** for ALL callbacks and queryables on a session | ||
| - A slow handler blocks every other subscriber/queryable until it returns | ||
| - Pass `max_workers=N` to `subscriber_callback` / `subscriber_raw_callback` or use `queryable_async` for any handler that does I/O, DB access, or hardware calls | ||
| - Shutdown order for thread-pool variants: undeclare Zenoh subscriber FIRST, then `executor.shutdown()` — reversing this causes `RuntimeError: cannot schedule new futures after shutdown` | ||
|
|
||
| **`undeclare()` discipline:** | ||
| - Every subscriber, callback subscriber, and queryable must be undeclared when done | ||
| - `AsyncQueryable` and `*Async` subscribers own a `ThreadPoolExecutor` — GC alone is not enough, always call `undeclare()` | ||
| - Blocking subscribers (`RawSubscriber`) are undeclared via `undeclare()` too |
There was a problem hiding this comment.
Fixed — updated to reference max_workers pattern and Queryable class name.
| def publisher_proto(self, suffix: str, msg_class=None) -> ProtoPublisher: | ||
| """Declare a protobuf publisher at ``topic(suffix)``.""" | ||
| from .publisher import ProtoPublisher | ||
|
|
||
| type_name = msg_class.DESCRIPTOR.full_name if msg_class is not None else None |
There was a problem hiding this comment.
Fixed — annotated as msg_class: type | None = None.
| assert RawCallbackSubscriber is not None | ||
|
|
||
|
|
||
| def test_import_async_queryable(): |
There was a problem hiding this comment.
Fixed — renamed to test_import_queryable.
| pub.put({"temperature": 22.5, "humidity": 60}) | ||
| time.sleep(1.0) | ||
| for msg in sub: # auto-decoded: proto, dict, or bytes | ||
| print(f"value: {msg.value}") |
There was a problem hiding this comment.
Fixed — example now uses print(msg) instead of msg.value.
| def on_sensor(msg): | ||
| print(f"received: {msg.value}") | ||
|
|
||
| sub = ctx.subscriber_callback("sensor/data", on_sensor) | ||
| ctx.wait_shutdown() # block until SIGINT/SIGTERM |
There was a problem hiding this comment.
Fixed — example now uses print(f"received: {msg}") with a comment explaining the return type varies.
| bubbaloop_sdk/ | ||
| __init__.py # Public API surface | ||
| context.py # NodeContext — main entry point | ||
| subscriber.py # ProtoSubscriber, RawSubscriber, Callback*, Async*, AsyncQueryable |
There was a problem hiding this comment.
Fixed — updated to list actual class names: ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable.
- Fix stale queryable_async/AsyncQueryable refs in README, CLAUDE.md, CONTRIBUTING.md - Fix README examples: msg.value → msg (auto-decode returns proto/dict/bytes) - Rename test_import_async_queryable → test_import_queryable - Remove duplicate test_raw_subscriber_undeclare_calls_sub_undeclare - Type-annotate msg_class in publisher_proto() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Adds callback subscribers, queryables, and publisher convenience methods to
NodeContext, completing the Python SDK's API surface for event-driven nodes.Callback subscribers
Event-driven pattern — no polling loop needed. The handler is called each time a message arrives, auto-decoded via the existing
SchemaRegistry(proto, JSON, or raw bytes). Passmax_workersto offload slow handlers to a thread pool:Queryables
Respond to Zenoh GET requests. Same
max_workerspattern:Publisher convenience methods
NodeContextnow exposespublisher_json(),publisher_proto(), andpublisher_raw()for declaring publishers with the correct encoding.New classes
CallbackSubscriberRawCallbackSubscriberzenoh.Sample, optional thread poolQueryableAll inherit
_BaseSubscriber(exceptQueryable) with idempotentundeclare().New NodeContext methods
publisher_json(suffix)publisher_proto(suffix, msg_class)publisher_raw(suffix, local=False)subscriber_callback(suffix, handler, max_workers=None)subscriber_raw_callback(key_expr, handler, max_workers=None)queryable(suffix, handler, max_workers=None)queryable_raw(key_expr, handler, max_workers=None)Other changes
CLAUDE.mdandCONTRIBUTING.mdwith SDK development conventionspixi.toml+pixi.lockfor reproducible dev environmentrun_node()shutdown with heartbeat join and try/finallyTest plan
pixi run python -m pytest tests/ -v -p no:randomly)pixi run check)🤖 Generated with Claude Code