Skip to content

feat(python-sdk): callback subscribers, queryables, and publisher convenience methods#66

Draft
lferraz wants to merge 56 commits into
kornia:mainfrom
lferraz:feat/python-sdk-callback-subscriber-queryable
Draft

feat(python-sdk): callback subscribers, queryables, and publisher convenience methods#66
lferraz wants to merge 56 commits into
kornia:mainfrom
lferraz:feat/python-sdk-callback-subscriber-queryable

Conversation

@lferraz

@lferraz lferraz commented Apr 5, 2026

Copy link
Copy Markdown
Member

Summary

Adds callback subscribers, queryables, and publisher convenience methods to NodeContext, completing the Python SDK's API surface for event-driven nodes.

Callback subscribers

Event-driven pattern — no polling loop needed. The handler is called each time a message arrives, auto-decoded via the existing SchemaRegistry (proto, JSON, or raw bytes). Pass max_workers to offload slow handlers to a thread pool:

# Fast handler — runs on Zenoh's thread (default)
sub = ctx.subscriber_callback("sensor/data", on_msg)

# Slow handler (DB, HTTP) — runs in a thread pool
sub = ctx.subscriber_callback("sensor/data", on_msg, max_workers=4)

# Raw variant — handler receives zenoh.Sample directly
sub = ctx.subscriber_raw_callback("bubbaloop/**/health", on_health)

Queryables

Respond to Zenoh GET requests. Same max_workers pattern:

qbl = ctx.queryable("status", on_query)
qbl = ctx.queryable("status", on_query, max_workers=4)  # slow handler
qbl = ctx.queryable_raw("bubbaloop/**/schema", on_schema)  # literal key expr

Publisher convenience methods

NodeContext now exposes publisher_json(), publisher_proto(), and publisher_raw() for declaring publishers with the correct encoding.

New classes

Class Description
CallbackSubscriber Event-driven, auto-decode, optional thread pool
RawCallbackSubscriber Event-driven, raw zenoh.Sample, optional thread pool
Queryable Queryable handler, optional thread pool

All inherit _BaseSubscriber (except Queryable) with idempotent undeclare().

New NodeContext methods

Method Description
publisher_json(suffix) JSON publisher
publisher_proto(suffix, msg_class) Protobuf publisher
publisher_raw(suffix, local=False) Raw bytes publisher
subscriber_callback(suffix, handler, max_workers=None) Callback with auto-decode
subscriber_raw_callback(key_expr, handler, max_workers=None) Callback with raw Sample
queryable(suffix, handler, max_workers=None) Queryable at topic prefix
queryable_raw(key_expr, handler, max_workers=None) Queryable at literal key expr

Other changes

  • Added CLAUDE.md and CONTRIBUTING.md with SDK development conventions
  • Added pixi.toml + pixi.lock for reproducible dev environment
  • Robust run_node() shutdown with heartbeat join and try/finally
  • 68 unit tests (from 23 in upstream), all passing deterministically

Test plan

  • 68 unit tests passing (pixi run python -m pytest tests/ -v -p no:randomly)
  • Lint clean (pixi run check)
  • All public imports verified

🤖 Generated with Claude Code

lferraz and others added 17 commits April 5, 2026 13:35
Add 8 tests covering CallbackSubscriber, RawCallbackSubscriber,
CallbackSubscriberAsync, and RawCallbackSubscriberAsync — bytes/proto
decode, undeclare, and thread-pool dispatch paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…o NodeContext

Adds subscriber_callback(), subscriber_raw_callback(), subscriber_callback_async(),
subscriber_raw_callback_async(), queryable(), queryable_raw(), queryable_async(), and
queryable_raw_async() factory methods to NodeContext. Includes 9 new tests (41 total).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…g config

Add AsyncQueryable, CallbackSubscriber, CallbackSubscriberAsync,
RawCallbackSubscriber, RawCallbackSubscriberAsync to __init__.py public
API. Add import-surface tests for all new exports. Set max-line-length=120
in .flake8 (project root and python-sdk/).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…bers/queryables

Replace stale async-API examples with the actual synchronous API.
Document all new callback subscribers (_callback, _callback_async,
_raw_callback, _raw_callback_async) and queryable methods
(queryable, queryable_raw, queryable_async, queryable_raw_async).
All original methods remain unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move all Python lint config into pyproject.toml [tool.ruff] sections
(line-length=120, select E/W/F/I/B/UP, per-file-ignores for pre-existing
issues in upstream files). Remove .flake8 files from repo root and
python-sdk/. Auto-fix isort ordering in __init__.py, test_context.py,
decode_sample.py via `ruff check --fix`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sting notes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…mports

- Add python-sdk/pixi.toml with test/lint/fmt/check tasks (mirrors Kornia)
- pyproject.toml: add readme, license, authors, classifiers, project.urls
- pyproject.toml: add pytest-cov to dev deps; add [tool.coverage.*]
- pyproject.toml: add [tool.ruff.format], [tool.ruff.lint.isort]
- pyproject.toml: glob per-file-ignores for */__init__.py and tests/*
- pyproject.toml: add C4/RUF rules; move B904 to global ignore
- Remove unused `time` from health.py, `os`/`time` from node.py
- Update CONTRIBUTING.md with pixi tasks and corrected suppressions table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move F821 from global ignore to per-file-ignores for context.py.
The rule is only needed there (lazy imports + forward-reference
string annotations), not project-wide.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Disable MD013 (line-length) and MD060 (table-column-style) — both are
stylistic and conflict with wide API tables. Fix fenced code block
language tag in CONTRIBUTING.md (text). Update lint suppressions table
to remove resolved B904 entry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…xt.py

Add TYPE_CHECKING guard at module level to import publisher/subscriber
types for annotations only. Replace forward-reference strings ("Foo")
with direct type names (Foo) in all method signatures. Remove F821
per-file suppression from pyproject.toml and the corresponding entry
from CONTRIBUTING.md — no suppressions needed now.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tfalls

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@lferraz lferraz changed the title Feat/python sdk callback subscriber queryable feat(python-sdk): callback subscribers, user queryables, async variants Apr 5, 2026
lferraz and others added 2 commits April 5, 2026 19:03
…methods

Cover previously untested NodeContext surface:
- publisher_json/proto: verify topic() prefix is applied
- subscriber/subscriber_raw: verify topic() prefix / literal key expr
- close(): calls session.close()
- context manager: __exit__ calls close()
- connect(): BUBBALOOP_SCOPE, BUBBALOOP_MACHINE_ID env vars + defaults + instance_name override
- TypedSubscriber/RawSubscriber: undeclare() and iteration

61 tests total (up from 48).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Without this, TYPE_CHECKING-only imports (JsonPublisher, TypedSubscriber,
etc.) are evaluated at class definition time on Python ≤3.13, causing
NameError. Python 3.14 made annotations lazy by default so tests passed
locally but failed on CI (Python 3.12). from __future__ import annotations
defers all annotation evaluation to string form on all supported versions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@lferraz lferraz marked this pull request as ready for review April 5, 2026 18:38
Copilot AI review requested due to automatic review settings April 5, 2026 18:38
@lferraz lferraz marked this pull request as draft April 5, 2026 18:38
@qodo-code-review

Copy link
Copy Markdown

Review Summary by Qodo

Add callback subscribers, async queryables, and queue-backed blocking subscribers with thread pool support

✨ Enhancement 🧪 Tests

Grey Divider

Walkthroughs

Description
• Add callback-based and async subscribers with thread pool support
• Implement queryable factory methods on NodeContext with async variants
• Convert blocking subscribers to queue-backed with optional timeout
• Export new subscriber/queryable classes and add comprehensive test coverage
• Expand project metadata, linting config, and developer documentation
Diagram
flowchart LR
  A["Blocking Subscribers<br/>TypedSubscriber/RawSubscriber"] -->|"queue-backed<br/>with timeout"| B["recv timeout support"]
  C["Callback Subscribers<br/>CallbackSubscriber/RawCallbackSubscriber"] -->|"Zenoh thread"| D["Event-driven handlers"]
  E["Async Subscribers<br/>CallbackSubscriberAsync/RawCallbackSubscriberAsync"] -->|"ThreadPoolExecutor"| F["Non-blocking handlers"]
  G["Queryables<br/>queryable/queryable_raw"] -->|"Zenoh thread"| H["Query handlers"]
  I["Async Queryables<br/>AsyncQueryable"] -->|"ThreadPoolExecutor"| J["Slow query handlers"]
  K["NodeContext"] -->|"factory methods"| A
  K -->|"factory methods"| C
  K -->|"factory methods"| E
  K -->|"factory methods"| G
  K -->|"factory methods"| I
Loading

Grey Divider

File Changes

1. python-sdk/bubbaloop_sdk/subscriber.py ✨ Enhancement +224/-23

Implement callback and async subscriber variants with thread pools

python-sdk/bubbaloop_sdk/subscriber.py


2. python-sdk/bubbaloop_sdk/context.py ✨ Enhancement +123/-5

Add factory methods for callback/async subscribers and queryables

python-sdk/bubbaloop_sdk/context.py


3. python-sdk/bubbaloop_sdk/__init__.py ✨ Enhancement +15/-2

Export new subscriber and queryable classes in public API

python-sdk/bubbaloop_sdk/init.py


View more (17)
4. python-sdk/tests/test_context.py 🧪 Tests +681/-3

Add 48 unit tests for all subscriber and queryable variants

python-sdk/tests/test_context.py


5. python-sdk/pyproject.toml ⚙️ Configuration changes +90/-1

Expand metadata, add ruff/pytest/coverage configuration

python-sdk/pyproject.toml


6. python-sdk/pixi.toml ⚙️ Configuration changes +22/-0

Add pixi development tasks for test, lint, format, check

python-sdk/pixi.toml


7. python-sdk/README.md 📝 Documentation +142/-76

Rewrite with synchronous API examples and comprehensive reference

python-sdk/README.md


8. python-sdk/CONTRIBUTING.md 📝 Documentation +81/-0

Add contributor guide with pixi tasks and lint configuration

python-sdk/CONTRIBUTING.md


9. python-sdk/CLAUDE.md 📝 Documentation +110/-0

Add conventions, threading model, and pitfalls documentation

python-sdk/CLAUDE.md


10. python-sdk/get_sample.py 🐞 Bug fix +0/-0

Fix exception chaining with B904 raise-from pattern

python-sdk/get_sample.py


11. python-sdk/decode_sample.py Formatting +0/-0

Reorganize imports for clarity and consistency

python-sdk/decode_sample.py


12. python-sdk/health.py Miscellaneous +0/-0

Remove unused time import

python-sdk/health.py


13. python-sdk/node.py Miscellaneous +0/-0

Remove unused imports

python-sdk/node.py


14. python-sdk/context.py ✨ Enhancement +0/-0

Add TYPE_CHECKING guard for cross-module type annotations

python-sdk/context.py


15. .github/workflows/ci.yml ⚙️ Configuration changes +11/-0

Add Python SDK lint and test steps to CI pipeline

.github/workflows/ci.yml


16. .markdownlint.json ⚙️ Configuration changes +4/-0

Add markdown linting configuration at repository root

.markdownlint.json


17. python-sdk/bubbaloop_sdk/decode_sample.py Additional files +2/-1

...

python-sdk/bubbaloop_sdk/decode_sample.py


18. python-sdk/bubbaloop_sdk/get_sample.py Additional files +2/-2

...

python-sdk/bubbaloop_sdk/get_sample.py


19. python-sdk/bubbaloop_sdk/health.py Additional files +0/-1

...

python-sdk/bubbaloop_sdk/health.py


20. python-sdk/bubbaloop_sdk/node.py Additional files +0/-2

...

python-sdk/bubbaloop_sdk/node.py


Grey Divider

Qodo Logo

@qodo-code-review

qodo-code-review Bot commented Apr 5, 2026

Copy link
Copy Markdown

Code Review by Qodo

🐞 Bugs (3) 📘 Rule violations (1) 📎 Requirement gaps (0) 🎨 UX Issues (0)

Grey Divider


Action required

1. CI runs ruff/pytest directly 📘 Rule violation ⛨ Security
Description
The new CI steps execute ruff and pytest as top-level commands, which are not in the allowlisted
command prefixes. This violates the build-command execution restriction and could broaden the set of
runnable commands beyond the approved tooling prefixes.
Code

.github/workflows/ci.yml[R53-62]

+      - name: Python SDK — lint
+        run: |
+          cd python-sdk
+          pip install -e ".[dev]" -q
+          ruff check bubbaloop_sdk/ tests/
+
+      - name: Python SDK — test
+        run: |
+          cd python-sdk
+          pytest tests/ -v
Evidence
PR Compliance ID 10 restricts executed build commands to allowlisted prefixes (cargo, pixi,
npm, make, python, pip). The added workflow steps invoke ruff and pytest directly (not
via python -m ... or pixi run ...).

CLAUDE.md
.github/workflows/ci.yml[53-62]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The CI workflow runs `ruff` and `pytest` directly, which is outside the allowlisted command prefixes required by compliance.

## Issue Context
Compliance requires executed build/test commands to use one of these prefixes only: `cargo`, `pixi`, `npm`, `make`, `python`, `pip`. In CI, `ruff`/`pytest` should be invoked through `python -m ruff ...` / `python -m pytest ...` (or via `pixi run ...`) so the executed command prefix is allowlisted.

## Fix Focus Areas
- .github/workflows/ci.yml[53-62]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Submit after shutdown 🐞 Bug ☼ Reliability
Description
CallbackSubscriberAsync/RawCallbackSubscriberAsync/AsyncQueryable can call
ThreadPoolExecutor.submit() from a Zenoh callback while undeclare() is concurrently shutting the
executor down, raising RuntimeError("cannot schedule new futures after shutdown") in Zenoh’s
callback thread. This can break callback delivery during shutdown and potentially affect other
callbacks/queryables sharing the same Zenoh session thread.
Code

python-sdk/bubbaloop_sdk/subscriber.py[R179-196]

+    def __init__(self, session: zenoh.Session, topic: str, handler, msg_class=None, max_workers: int = 4):
+        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
+
+        def _wrap(sample: zenoh.Sample) -> None:
+            payload = bytes(sample.payload.to_bytes())
+            if msg_class is not None and hasattr(msg_class, "FromString"):
+                msg = msg_class.FromString(payload)
+            else:
+                msg = payload
+            self._executor.submit(handler, msg)
+
+        self._sub = session.declare_subscriber(topic, _wrap)
+
+    def undeclare(self) -> None:
+        """Undeclare the subscriber and shutdown the thread pool."""
+        self._sub.undeclare()  # stop Zenoh callbacks first
+        self._executor.shutdown(wait=False)
+
Evidence
The Zenoh callback wrapper submits work to the executor, while undeclare() shuts the executor down;
submit() after shutdown deterministically raises RuntimeError in Python, and there is no
guard/try-except to prevent this if a callback is in-flight during undeclare(). The same pattern
exists in all three async variants.

python-sdk/bubbaloop_sdk/subscriber.py[179-196]
python-sdk/bubbaloop_sdk/subscriber.py[207-219]
python-sdk/bubbaloop_sdk/subscriber.py[244-255]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`CallbackSubscriberAsync`, `RawCallbackSubscriberAsync`, and `AsyncQueryable` submit to a `ThreadPoolExecutor` from Zenoh callbacks while `undeclare()` can concurrently call `executor.shutdown()`. If an in-flight callback races with shutdown, `executor.submit(...)` raises `RuntimeError: cannot schedule new futures after shutdown`, potentially surfacing on Zenoh’s internal thread.

### Issue Context
Even though `undeclare()` calls `_sub.undeclare()`/`_qbl.undeclare()` first, this does not guarantee no callback is currently executing; an in-flight callback can still attempt to submit after shutdown begins.

### Fix Focus Areas
- python-sdk/bubbaloop_sdk/subscriber.py[179-196]
- python-sdk/bubbaloop_sdk/subscriber.py[207-219]
- python-sdk/bubbaloop_sdk/subscriber.py[244-255]

### Suggested approach
- Add a `_closing` flag (e.g., `threading.Event` or boolean protected by a lock).
- In `_wrap`, return early if closing.
- Wrap `self._executor.submit(...)` in `try/except RuntimeError` and drop/log if shutting down.
- Consider `shutdown(cancel_futures=True)` (Python 3.9+) so queued-but-not-started tasks don’t keep the process alive during shutdown.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

3. Blocking recv can hang 🐞 Bug ☼ Reliability
Description
TypedSubscriber/RawSubscriber block on queue.get(timeout=None) but undeclare() only undeclares
the Zenoh subscriber and does not wake any thread currently blocked in recv()/iteration. If a
caller uses recv() without a timeout (or iterates) and then tries to stop via undeclare(), that
thread can remain blocked indefinitely.
Code

python-sdk/bubbaloop_sdk/subscriber.py[R38-56]

+    def recv(self, timeout: float | None = None):
+        """Block until the next message arrives. Returns ``None`` on timeout."""
+        try:
+            return self._queue.get(timeout=timeout)
+        except queue.Empty:
+            return None

    def __iter__(self):
        return self

    def __next__(self):
-        try:
-            return self.recv()
-        except Exception as exc:
-            raise StopIteration from exc
+        msg = self.recv()
+        if msg is None:
+            raise StopIteration
+        return msg

    def undeclare(self) -> None:
+        """Undeclare the subscriber and stop receiving samples."""
        self._sub.undeclare()
Evidence
Both subscribers use queue.Queue().get(...) as the blocking primitive; undeclare() does not
enqueue a sentinel or otherwise signal the queue, so a blocked consumer has no wake-up path once no
further samples arrive after undeclare().

python-sdk/bubbaloop_sdk/subscriber.py[25-57]
python-sdk/bubbaloop_sdk/subscriber.py[66-93]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`TypedSubscriber.recv()` / `RawSubscriber.recv()` block on `queue.get(timeout=None)` and `undeclare()` doesn’t wake a blocked consumer. This can leave threads stuck during shutdown if users call `recv()` without a timeout or rely on iteration.

### Issue Context
The PR introduces `recv(timeout=...)` for shutdown-aware loops, but the classes still expose a blocking mode (`timeout=None`) and `__next__` uses that mode.

### Fix Focus Areas
- python-sdk/bubbaloop_sdk/subscriber.py[38-57]
- python-sdk/bubbaloop_sdk/subscriber.py[74-93]

### Suggested approach
- Add a private sentinel object (e.g., `_CLOSED = object()`).
- In `undeclare()`, set a `_closed` flag and `put(_CLOSED)` (or `put_nowait`) to unblock one waiter; optionally drain/wake multiple waiters.
- In `recv()`, if sentinel received, return `None` (or raise `StopIteration` in `__next__`).
- Document behavior clearly (what `recv()` returns after undeclare).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


4. Unbounded backlog growth 🐞 Bug ➹ Performance
Description
Queue-backed subscribers and async callback/queryable variants have no backpressure: queue.Queue()
is unbounded and ThreadPoolExecutor’s work queue is unbounded, so sustained
producer>consumer/handler throughput can cause unbounded memory growth (and eventual OOM).
Code

python-sdk/bubbaloop_sdk/subscriber.py[R25-43]

    def __init__(self, session: zenoh.Session, topic: str, msg_class=None):
-        self._sub = session.declare_subscriber(topic)
+        self._queue: queue.Queue = queue.Queue()
        self._msg_class = msg_class

-    def recv(self):
-        """Block until the next sample arrives and return the decoded message."""
-        sample = self._sub.recv()
-        payload = bytes(sample.payload.to_bytes())
-        if self._msg_class is not None and hasattr(self._msg_class, "FromString"):
-            return self._msg_class.FromString(payload)
-        return payload
+        def _on_sample(sample: zenoh.Sample) -> None:
+            payload = bytes(sample.payload.to_bytes())
+            if self._msg_class is not None and hasattr(self._msg_class, "FromString"):
+                self._queue.put(self._msg_class.FromString(payload))
+            else:
+                self._queue.put(payload)
+
+        self._sub = session.declare_subscriber(topic, _on_sample)
+
+    def recv(self, timeout: float | None = None):
+        """Block until the next message arrives. Returns ``None`` on timeout."""
+        try:
+            return self._queue.get(timeout=timeout)
+        except queue.Empty:
+            return None
Evidence
Both TypedSubscriber and RawSubscriber allocate a default queue.Queue() (unbounded) and
continuously put(...) into it from the Zenoh callback. The async variants submit every
callback/query to an executor with no admission control, which similarly queues unlimited pending
work.

python-sdk/bubbaloop_sdk/subscriber.py[25-37]
python-sdk/bubbaloop_sdk/subscriber.py[66-73]
python-sdk/bubbaloop_sdk/subscriber.py[179-191]
python-sdk/bubbaloop_sdk/subscriber.py[244-251]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
Current implementations can buffer unlimited messages/tasks (unbounded `queue.Queue()` and unbounded executor work queues). Under load, this can grow memory until the process is killed.

### Issue Context
These classes are intended for long-running robotics/edge processes, where OOM is a serious reliability failure mode.

### Fix Focus Areas
- python-sdk/bubbaloop_sdk/subscriber.py[25-37]
- python-sdk/bubbaloop_sdk/subscriber.py[66-73]
- python-sdk/bubbaloop_sdk/subscriber.py[179-191]
- python-sdk/bubbaloop_sdk/subscriber.py[244-251]

### Suggested approach
- Consider `queue.Queue(maxsize=N)` with a documented policy (block Zenoh thread, drop oldest, drop newest, etc.).
- For async variants, consider a bounded handoff queue + worker threads, or implement a simple semaphore to cap in-flight futures.
- If dropping is acceptable, expose counters/logging so drops are observable.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR expands the Python SDK’s synchronous Zenoh integration by adding callback-based subscribers, async (thread-pooled) callback variants, and queryable helpers on NodeContext, plus accompanying docs/tooling/CI updates.

Changes:

  • Add queue-backed blocking subscribers with recv(timeout) plus new callback + async-callback subscriber classes and AsyncQueryable.
  • Extend NodeContext with subscriber_*callback* and queryable*_async helpers; export new public API from __init__.py.
  • Update README/tooling (ruff/pytest-cov/pixi), add contributing/conventions docs, and add Python lint/test steps to CI.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
python-sdk/bubbaloop_sdk/subscriber.py Implements queue-backed recv(timeout), callback subscribers, async callback subscribers, and AsyncQueryable.
python-sdk/bubbaloop_sdk/context.py Adds TYPE_CHECKING imports and new NodeContext helper methods for callback subscribers and queryables.
python-sdk/bubbaloop_sdk/init.py Re-exports newly added public classes via imports and __all__.
python-sdk/tests/test_context.py Adds/extends tests for new subscriber/queryable variants and public import surface.
python-sdk/README.md Rewrites docs to reflect synchronous API and documents new callback/queryable APIs.
python-sdk/pyproject.toml Adds richer project metadata + ruff/pytest/coverage configuration and dev dependencies.
python-sdk/pixi.toml Adds pixi workspace config and dev tasks (test/lint/fmt/check).
python-sdk/CONTRIBUTING.md Documents local dev workflow (pixi/venv), linting, and test patterns.
python-sdk/CLAUDE.md Captures repository conventions and Zenoh/threading pitfalls for contributors.
python-sdk/bubbaloop_sdk/get_sample.py Fixes exception chaining on timeout (raise ... from err).
python-sdk/bubbaloop_sdk/decode_sample.py Splits protobuf imports for formatting/lint friendliness.
python-sdk/bubbaloop_sdk/node.py Removes unused imports.
python-sdk/bubbaloop_sdk/health.py Removes unused import.
.github/workflows/ci.yml Adds Python SDK lint + test steps.
.markdownlint.json Adds markdownlint configuration overrides.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread python-sdk/pyproject.toml
Comment thread python-sdk/pixi.toml Outdated
Comment thread python-sdk/bubbaloop_sdk/subscriber.py Outdated
Comment thread python-sdk/bubbaloop_sdk/subscriber.py Outdated
Comment thread python-sdk/bubbaloop_sdk/subscriber.py Outdated
Comment thread python-sdk/bubbaloop_sdk/subscriber.py Outdated
Comment thread .github/workflows/ci.yml Outdated
Comment thread python-sdk/bubbaloop_sdk/subscriber.py Outdated
lferraz and others added 2 commits April 6, 2026 00:00
- Bump requires-python to >=3.10 (PEP 604 union syntax used throughout)
- Remove upper bound from pixi.toml python constraint (was arbitrarily <3.14)
- Update ruff target-version to py310; drop py3.9 classifier
- subscriber.py: move proto decode from Zenoh callback into recv() so
  FromString() runs on the consumer thread, not Zenoh's internal thread
- subscriber.py: push _CLOSED sentinel on undeclare() to unblock recv(timeout=None)
- subscriber.py: fix docstring — callbacks are serial not concurrent; recommend _async
- subscriber.py: add _closing flag + try/except RuntimeError in _wrap() to handle
  race between submit() and executor.shutdown() in *Async classes and AsyncQueryable
- subscriber.py: use shutdown(cancel_futures=True) (Python 3.9+)
- ci.yml: invoke ruff/pytest via python -m to stay within allowlisted command prefixes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…code thread

- test_typed/raw_subscriber_undeclare_unblocks_recv: verify undeclare()
  pushes _CLOSED sentinel so recv(timeout=None) returns None immediately
- test_typed_subscriber_decode_happens_in_recv_not_callback: verify
  FromString runs on the consumer thread, not the Zenoh callback thread
- test_*_async_drops_after_undeclare (x3): verify _closing flag prevents
  handler from being called after undeclare() for CallbackSubscriberAsync,
  RawCallbackSubscriberAsync, and AsyncQueryable

67 tests total.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
lferraz and others added 18 commits April 8, 2026 00:14
… key spaces

- Remove scope parameter from _make_context() — NodeContext no longer has scope
- Update topic assertions: bubbaloop/{scope}/... → bubbaloop/global/...
- Remove test_connect_reads_scope_from_env and test_connect_defaults_scope_to_local
- Fix test_raw_subscriber_recv_returns_sample: recv() now returns bytes, not Sample
- Remove stale BUBBALOOP_SCOPE monkeypatch.delenv() calls from connect tests
- Rename test_topic_default_scope → test_topic_uses_global_prefix

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…lback-subscriber-queryable

# Conflicts:
#	python-sdk/README.md
#	python-sdk/bubbaloop_sdk/context.py
#	python-sdk/bubbaloop_sdk/subscriber.py
#	python-sdk/tests/test_context.py
Delete the TypedSubscriber class and its dead imports (queue, time,
_CLOSED sentinel, _POLL_INTERVAL constant). ProtoSubscriber via
SchemaRegistry is the replacement. __init__.py and tests will be
updated in follow-up tasks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Drop the deprecated TypedSubscriber import from __init__.py, remove it
from the TYPE_CHECKING block in context.py, delete the old subscriber()
and subscriber_raw() methods, and remove the stale "next 2 functions"
comment. subscribe() and subscribe_raw() are now the primary methods.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dempotent undeclare

CallbackSubscriber, RawCallbackSubscriber, CallbackSubscriberAsync, and
RawCallbackSubscriberAsync now inherit _BaseSubscriber for a unified
idempotent undeclare() pattern. AsyncQueryable gains its own _undeclared
guard. Two new tests verify the double-undeclare no-op for callback classes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove all TypedSubscriber tests (9 functions) and two tests for the
removed ctx.subscriber()/ctx.subscriber_raw() methods. Update
test_import_subscribers to drop the TypedSubscriber import assertion.
Rename section headers that previously mentioned both TypedSubscriber
and RawSubscriber to mention only RawSubscriber.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…r docs

Replace TypedSubscriber with ProtoSubscriber/CallbackSubscriber in all
doc files; remove deprecated ctx.subscriber() and ctx.subscriber_raw()
rows from README API table; update CONTRIBUTING test count to 68.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ead of msg_class

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…s schema_registry

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n run_node

The heartbeat was started twice — once with the correct signature (return
value discarded) and once with the old signature including ctx.scope,
which would crash at runtime.  Keep a single call inside the try block
so the thread is properly joined on shutdown.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…of msg_class

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… SchemaRegistry

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…bscriber classes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace CallbackSubscriberAsync/RawCallbackSubscriberAsync references with
CallbackSubscriber/RawCallbackSubscriber using max_workers=4, and update
subscriber_callback_async/subscriber_raw_callback_async calls to use the
merged max_workers parameter form.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…class each

CallbackSubscriber(max_workers=None) replaces CallbackSubscriber + CallbackSubscriberAsync.
RawCallbackSubscriber(max_workers=None) replaces RawCallbackSubscriber + RawCallbackSubscriberAsync.

None = handler runs on Zenoh's thread (fast path).
int  = handler runs in a ThreadPoolExecutor.

6 subscriber classes → 4. 4 NodeContext methods → 2.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace _async variant references with max_workers parameter pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@lferraz lferraz requested a review from Copilot April 15, 2026 22:41
@lferraz lferraz changed the title feat(python-sdk): callback subscribers, user queryables, async variants feat(python-sdk): callback subscribers, queryables, and SchemaRegistry auto-decode Apr 15, 2026

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 17 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread python-sdk/bubbaloop_sdk/subscriber.py
Comment thread python-sdk/bubbaloop_sdk/context.py Outdated
Comment thread python-sdk/bubbaloop_sdk/context.py Outdated
Comment thread python-sdk/tests/test_context.py
Comment thread python-sdk/pyproject.toml
Comment thread python-sdk/bubbaloop_sdk/subscriber.py
Comment thread python-sdk/bubbaloop_sdk/subscriber.py
lferraz and others added 2 commits April 16, 2026 00:52
queryable(suffix, handler, max_workers=None) replaces queryable + queryable_async.
queryable_raw(key_expr, handler, max_workers=None) replaces queryable_raw + queryable_raw_async.

AsyncQueryable now supports both modes: None = handler on Zenoh thread, int = thread pool.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The class now supports both sync and thread-pool modes via max_workers,
so the Async prefix no longer applies.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@lferraz lferraz changed the title feat(python-sdk): callback subscribers, queryables, and SchemaRegistry auto-decode feat(python-sdk): callback subscribers, queryables, and publisher convenience methods Apr 15, 2026
…pe annotations, cleanup

- Move registry.decode() into executor.submit() lambda so decode runs in
  the worker thread, not on Zenoh's callback thread (avoids blocking on
  first schema fetch)
- Add Callable type annotations to handler params in context.py
- Remove duplicate test_import_callback_subscribers_with_workers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 17 changed files in this pull request and generated 9 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread python-sdk/tests/test_context.py Outdated
Comment on lines +910 to +921
def test_raw_subscriber_undeclare_calls_sub_undeclare():
"""undeclare() calls undeclare on the underlying zenoh subscriber."""
from bubbaloop_sdk.subscriber import RawSubscriber

mock_sub = MagicMock()
mock_session = MagicMock()
mock_session.declare_subscriber.return_value = mock_sub
sub = RawSubscriber(mock_session, "test/topic")
sub.undeclare()
mock_sub.undeclare.assert_called_once()


Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — removed the duplicate test_raw_subscriber_undeclare_calls_sub_undeclare.

Comment thread python-sdk/README.md Outdated
Comment on lines 111 to 116
Use `queryable_async` when the handler does slow work:

```python
from bubbaloop_sdk.schema import declare_schema_queryable
from my_protos_pb2 import SensorData

# Declare once — keeps the queryable alive while the reference is held
schema_qbl = declare_schema_queryable(
ctx.session, ctx.machine_id, "my-node", SensorData
)
qbl = ctx.queryable_async("status", on_query)
qbl.undeclare() # call when done to release the thread pool
```

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — example now uses ctx.queryable("status", on_query, max_workers=4).

Comment thread python-sdk/README.md Outdated
Comment on lines +159 to +162
| `ctx.queryable(suffix, handler)` | Handler at `topic(suffix)` |
| `ctx.queryable_raw(key_expr, handler)` | Handler at literal key expression |
| `ctx.queryable_async(suffix, handler, max_workers=4)` | Handler in thread pool |
| `ctx.queryable_raw_async(key_expr, handler, max_workers=4)` | Raw key; handler in thread pool |

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — API table now shows queryable(suffix, handler, max_workers=None) and queryable_raw(key_expr, handler, max_workers=None).

Comment thread python-sdk/CLAUDE.md
Comment on lines +110 to +119
**Threading — critical:**
- Zenoh uses **one internal thread** for ALL callbacks and queryables on a session
- A slow handler blocks every other subscriber/queryable until it returns
- Pass `max_workers=N` to `subscriber_callback` / `subscriber_raw_callback` or use `queryable_async` for any handler that does I/O, DB access, or hardware calls
- Shutdown order for thread-pool variants: undeclare Zenoh subscriber FIRST, then `executor.shutdown()` — reversing this causes `RuntimeError: cannot schedule new futures after shutdown`

**`undeclare()` discipline:**
- Every subscriber, callback subscriber, and queryable must be undeclared when done
- `AsyncQueryable` and `*Async` subscribers own a `ThreadPoolExecutor` — GC alone is not enough, always call `undeclare()`
- Blocking subscribers (`RawSubscriber`) are undeclared via `undeclare()` too

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — updated to reference max_workers pattern and Queryable class name.

Comment thread python-sdk/bubbaloop_sdk/context.py Outdated
Comment on lines 132 to 136
def publisher_proto(self, suffix: str, msg_class=None) -> ProtoPublisher:
"""Declare a protobuf publisher at ``topic(suffix)``."""
from .publisher import ProtoPublisher

type_name = msg_class.DESCRIPTOR.full_name if msg_class is not None else None

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — annotated as msg_class: type | None = None.

Comment thread python-sdk/tests/test_context.py Outdated
assert RawCallbackSubscriber is not None


def test_import_async_queryable():

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — renamed to test_import_queryable.

Comment thread python-sdk/README.md Outdated
pub.put({"temperature": 22.5, "humidity": 60})
time.sleep(1.0)
for msg in sub: # auto-decoded: proto, dict, or bytes
print(f"value: {msg.value}")

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — example now uses print(msg) instead of msg.value.

Comment thread python-sdk/README.md
Comment on lines +78 to +82
def on_sensor(msg):
print(f"received: {msg.value}")

sub = ctx.subscriber_callback("sensor/data", on_sensor)
ctx.wait_shutdown() # block until SIGINT/SIGTERM

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — example now uses print(f"received: {msg}") with a comment explaining the return type varies.

Comment thread python-sdk/CONTRIBUTING.md Outdated
bubbaloop_sdk/
__init__.py # Public API surface
context.py # NodeContext — main entry point
subscriber.py # ProtoSubscriber, RawSubscriber, Callback*, Async*, AsyncQueryable

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — updated to list actual class names: ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable.

- Fix stale queryable_async/AsyncQueryable refs in README, CLAUDE.md, CONTRIBUTING.md
- Fix README examples: msg.value → msg (auto-decode returns proto/dict/bytes)
- Rename test_import_async_queryable → test_import_queryable
- Remove duplicate test_raw_subscriber_undeclare_calls_sub_undeclare
- Type-annotate msg_class in publisher_proto()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants