Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
1d8e8a2
feat(python-sdk): queue-backed TypedSubscriber/RawSubscriber with opt…
lferraz Apr 5, 2026
afadda2
test(python-sdk): tests for callback subscriber variants
lferraz Apr 5, 2026
d7f4de1
test(python-sdk): tests for callback subscriber variants and async un…
lferraz Apr 5, 2026
9e2e9d4
fix(python-sdk): move concurrent.futures import to module level, fix …
lferraz Apr 5, 2026
2e0f263
feat(python-sdk): add queryable() and subscriber_callback() methods t…
lferraz Apr 5, 2026
2846535
fix(python-sdk): AsyncQueryable wrapper prevents executor leak in que…
lferraz Apr 5, 2026
d8f1896
feat(python-sdk): export new subscriber/queryable classes; add lintin…
lferraz Apr 5, 2026
2c18ead
docs(python-sdk): rewrite README for synchronous API with new subscri…
lferraz Apr 5, 2026
16c5208
chore(python-sdk): replace .flake8 with ruff in pyproject.toml; fix lint
lferraz Apr 5, 2026
f0bd66e
docs(python-sdk): add CONTRIBUTING.md with dev setup, lint config, te…
lferraz Apr 5, 2026
3c8fcc4
chore(python-sdk): add pixi.toml; expand pyproject.toml; fix unused i…
lferraz Apr 5, 2026
f286310
chore(python-sdk): scope F821 suppression to context.py only
lferraz Apr 5, 2026
a19d90d
fix(python-sdk): raise GetSampleTimeout from err; remove global B904 …
lferraz Apr 5, 2026
ce0b18d
chore: add markdownlint config; clean up README and CONTRIBUTING
lferraz Apr 5, 2026
2d3ce64
refactor(python-sdk): use TYPE_CHECKING for type annotations in conte…
lferraz Apr 5, 2026
9c06330
ci: add Python SDK lint and test steps
lferraz Apr 5, 2026
fcbac7a
docs(python-sdk): add CLAUDE.md with conventions, threading model, pi…
lferraz Apr 5, 2026
f87ea16
test(python-sdk): add missing tests for publisher/subscriber context …
lferraz Apr 5, 2026
bdbf7c0
fix(python-sdk): add from __future__ import annotations to context.py
lferraz Apr 5, 2026
0105a09
fix(python-sdk): address PR review comments
lferraz Apr 5, 2026
485d54e
test(python-sdk): cover PR review fixes — sentinel, _closing flag, de…
lferraz Apr 5, 2026
254ada7
fix(python-sdk): address new Copilot review comments
lferraz Apr 5, 2026
afb5ac6
fix(python-sdk): move FromString decode off Zenoh thread; replace tim…
lferraz Apr 5, 2026
1a543e2
docs(python-sdk): add undeclare() calls to README quick-start examples
lferraz Apr 5, 2026
b59f510
test(python-sdk): undeclare async objects in tests; use try/finally f…
lferraz Apr 5, 2026
e854706
fix(python-sdk): add persistent _closed flag to TypedSubscriber and R…
lferraz Apr 5, 2026
4ccad57
fix(python-sdk): polling recv() for multi-consumer safety; fix GC doc…
lferraz Apr 5, 2026
e857a4f
fix(python-sdk): set _shutdown before ctx.close() in run_node(); join…
lferraz Apr 6, 2026
e1203c0
fix(python-sdk): broaden try/finally in run_node(); pin Python 3.12 i…
lferraz Apr 6, 2026
088baa6
Merge remote-tracking branch 'upstream/main' into feat/python-sdk-cal…
lferraz Apr 7, 2026
39ceb2d
chore: merge upstream/main into feat/python-sdk-callback-subscriber-q…
lferraz Apr 7, 2026
6e1fcbe
feat(python-sdk): add SchemaRegistry for on-demand protobuf schema re…
lferraz Apr 7, 2026
0b5f46a
fix(python-sdk): correct merge artifacts in TypedSubscriber and Proto…
lferraz Apr 7, 2026
d670b42
docs(python-sdk): update CLAUDE.md with typing, docstring, and format…
lferraz Apr 7, 2026
feb3cc6
test(python-sdk): update tests for scope removal and new global/local…
lferraz Apr 7, 2026
18ac2b5
Merge remote-tracking branch 'upstream/main' into feat/python-sdk-cal…
lferraz Apr 14, 2026
8bc9506
refactor(python-sdk): remove TypedSubscriber from subscriber.py
lferraz Apr 15, 2026
f0a8660
refactor(python-sdk): remove TypedSubscriber from public API and context
lferraz Apr 15, 2026
1d400ef
refactor(python-sdk): all callback classes inherit _BaseSubscriber, i…
lferraz Apr 15, 2026
eb34fbe
test(python-sdk): remove TypedSubscriber tests
lferraz Apr 15, 2026
e96917b
docs(python-sdk): remove TypedSubscriber references, update subscribe…
lferraz Apr 15, 2026
b240353
chore(python-sdk): add pixi.lock
lferraz Apr 15, 2026
3bc6e64
refactor(python-sdk): callback subscribers use registry.decode() inst…
lferraz Apr 15, 2026
08eff64
refactor(python-sdk): subscriber_callback methods drop msg_class, pas…
lferraz Apr 15, 2026
34b4e80
fix(python-sdk): remove duplicate heartbeat and fix wrong signature i…
lferraz Apr 15, 2026
fd3b4f8
test(python-sdk): update callback tests to use mock registry instead …
lferraz Apr 15, 2026
817d137
style(python-sdk): ruff format subscriber.py and context.py
lferraz Apr 15, 2026
70bb99a
docs(python-sdk): update callback examples to reflect auto-decode via…
lferraz Apr 15, 2026
4803602
refactor(python-sdk): type-annotate handler and registry params in su…
lferraz Apr 15, 2026
9f489d2
test(python-sdk): update tests for merged callback subscriber classes
lferraz Apr 15, 2026
31b6fcc
refactor(python-sdk): merge sync/async callback subscribers into one …
lferraz Apr 15, 2026
5aded5b
docs(python-sdk): update docs for merged callback subscriber classes
lferraz Apr 15, 2026
c07c124
refactor(python-sdk): merge sync/async queryable methods into one each
lferraz Apr 15, 2026
e173b06
refactor(python-sdk): rename AsyncQueryable → Queryable
lferraz Apr 15, 2026
42311bf
fix(python-sdk): address Copilot review — decode in worker thread, ty…
lferraz Apr 15, 2026
6463f04
fix(python-sdk): address Copilot review round 2
lferraz Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,20 @@ jobs:

- name: Test
run: pixi run cargo test --lib -p bubbaloop

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'

- name: Python SDK — lint
run: |
cd python-sdk
pip install -e ".[dev]" -q
python -m ruff format --check bubbaloop_sdk/ tests/
python -m ruff check bubbaloop_sdk/ tests/
Comment thread
lferraz marked this conversation as resolved.

- name: Python SDK — test
run: |
cd python-sdk
python -m pytest tests/ -v
Comment thread
lferraz marked this conversation as resolved.
4 changes: 4 additions & 0 deletions .markdownlint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"MD013": false,
"MD060": false
}
159 changes: 159 additions & 0 deletions python-sdk/CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# bubbaloop-sdk (Python)

Pure Python wrapper over `zenoh-python`. Synchronous API — no asyncio required.
Mirrors the Rust `bubbaloop-node` SDK surface; nodes written with either SDK are interoperable.

## Structure

```
python-sdk/
bubbaloop_sdk/
__init__.py # Public API — edit when adding new public names
context.py # NodeContext: connect(), topic(), publishers, subscribers, queryables
publisher.py # JsonPublisher, ProtoPublisher (wraps session.declare_publisher)
subscriber.py # ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable
node.py # run_node() — CLI arg parsing + health heartbeat + lifecycle
health.py # start_health_heartbeat() — publishes 'ok' every 5s
discover.py # discover_nodes() — GET bubbaloop/**/health
get_sample.py # get_sample() — one-shot async subscribe-and-wait
decode_sample.py # ProtoDecoder — decode zenoh.Sample to protobuf
tests/
test_context.py # 71 unit tests — NO real Zenoh session needed
pyproject.toml # Build config, deps, ruff/pytest/coverage
pixi.toml # Dev tasks: test, lint, fmt, check
```

## Build & verify

```bash
# With pixi (recommended)
cd python-sdk
pixi run check # fmt-check + lint (run before every commit)
pixi run test # 71 unit tests
pixi run test-cov # tests + coverage report

# With venv (alternative)
cd python-sdk
.venv/bin/python -m ruff check bubbaloop_sdk/ tests/
.venv/bin/python -m pytest tests/ -v
```

## Conventions — MUST follow

**Tooling:**
- `ruff` for lint + format — NOT flake8, black, or isort directly
- Config in `pyproject.toml` under `[tool.ruff]` — do NOT add `.flake8` or `setup.cfg`
- Line length: 120 characters
- `TYPE_CHECKING` guard for cross-module type annotations — NEVER string-quoted forward refs (`"Foo"`)

**Type annotations:**

- Use modern Python 3.11+ union syntax: `X | Y` and `X | None` — NOT `Union[X, Y]` or `Optional[X]`
- Annotate all public method parameters and return types
- Annotate class attributes and instance variables when the type is not obvious from the assignment
- When fixing type errors, follow this hierarchy:
1. Add proper type annotations
2. Use `X | Y` union syntax or `cast()` from `typing`
3. Use `TYPE_CHECKING` for circular imports
4. Last resort: `# type: ignore[<error-code>]` with a comment explaining why
- AVOID `# type: ignore` without an error code — always be specific

**Docstrings:**
- Google docstring style for all public modules, classes, and functions
- Do NOT add a docstring to `__init__()` — document instantiation at the class level instead
- Include `Args:`, `Returns:`, and `Raises:` sections when applicable

```python
class CallbackSubscriber:
"""Event-driven subscriber that calls a handler on each received message.

The handler is invoked from Zenoh's internal callback thread by default.
Pass ``max_workers`` to run the handler in a thread pool instead — use
this for slow work (I/O, DB writes, HTTP calls).

Args:
session: Active Zenoh session.
topic: Key expression to subscribe to.
handler: Callable invoked with each decoded message.
registry: SchemaRegistry for auto-decoding samples by encoding header.
"""

def __init__(
self,
session: zenoh.Session,
topic: str,
handler: Callable,
registry,
): ...

def undeclare(self) -> None:
"""Undeclare the Zenoh subscriber and release resources."""
```

**String formatting:**

- Use `%`-style formatting for log calls — NOT f-strings: `log.info("Started %s", name)`
- Reason: lazy evaluation — the string is only formatted if the log level is active
- Use f-strings everywhere else: `raise ValueError(f"Unknown topic: {topic}")`

**Imports:**
- Cross-module type-only imports go under `if TYPE_CHECKING:` at the top of the file
- Lazy runtime imports (inside method bodies) are kept to avoid circular import issues
- `__init__.py` must be updated whenever a new public class is added to `subscriber.py` or `publisher.py`

**Zenoh session:**
- ALWAYS use `mode: "client"` — peer mode does not route through zenohd
- NEVER use `.complete(True)` on queryables — blocks wildcard queries like `bubbaloop/**/schema`
- `query.key_expr` is a **property**, NOT a method — NEVER write `query.key_expr()`
- `query.reply(query.key_expr, payload_bytes)` — correct reply pattern

**Threading — critical:**
- Zenoh uses **one internal thread** for ALL callbacks and queryables on a session
- A slow handler blocks every other subscriber/queryable until it returns
- Pass `max_workers=N` to `subscriber_callback` / `subscriber_raw_callback` / `queryable` / `queryable_raw` for any handler that does I/O, DB access, or hardware calls
- Shutdown order for thread-pool variants: undeclare Zenoh subscriber FIRST, then `executor.shutdown()` — reversing this causes `RuntimeError: cannot schedule new futures after shutdown`

**`undeclare()` discipline:**
- Every subscriber, callback subscriber, and queryable must be undeclared when done
- `Queryable`, `CallbackSubscriber`, and `RawCallbackSubscriber` with `max_workers` own a `ThreadPoolExecutor` — GC alone is not enough, always call `undeclare()`
- Blocking subscribers (`RawSubscriber`) are undeclared via `undeclare()` too
Comment on lines +110 to +119

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — updated to reference max_workers pattern and Queryable class name.


## Testing

Tests do NOT open a real Zenoh session. Use `_make_context()`:

```python
def _make_context(machine_id):
from bubbaloop_sdk.context import NodeContext
ctx = object.__new__(NodeContext)
ctx.session = MagicMock()
ctx.machine_id = machine_id
ctx.instance_name = machine_id
ctx._shutdown = threading.Event()
return ctx
```

For async/threaded tests use `threading.Event` with a 2s timeout — do NOT use `time.sleep`:

```python
event = threading.Event()
def handler(msg):
received.append(msg)
event.set()
assert event.wait(timeout=2.0), "handler not called within 2s"
```

## DO / DON'T

**DO:** `pixi run check` before every commit | add tests when adding public methods | update `__init__.py` and its `__all__` for every new public class | call `undeclare()` in tests that create async subscribers or queryables

**DON'T:** use `asyncio` — the SDK is synchronous by design | use `query.key_expr()` with parentheses | use `.complete(True)` on queryables | add string forward references (`"Foo"`) — use `TYPE_CHECKING` instead | suppress lint rules globally when a per-file or code-level fix is possible

## Pitfalls

- `B904` — always `raise Foo from err` inside `except` blocks, never bare `raise Foo(...)`
- `F401` in `__init__.py` is suppressed by ruff config (re-exports are intentional) — do NOT add `# noqa` comments there
- `CallbackSubscriber` and `RawCallbackSubscriber` without `max_workers` do NOT own an executor — `undeclare()` only calls `_sub.undeclare()`; with `max_workers` they own a `ThreadPoolExecutor` and shut it down in `undeclare()`
- `ProtoSubscriber` and `RawSubscriber` are iterable (`for msg in sub`); iteration raises `StopIteration` on exception via `_BaseSubscriber.__next__` — prefer `recv(timeout=...)` in shutdown-aware loops to avoid blocking indefinitely
- `run_node()` reads `config.yaml` by default; override with `-c path/config.yaml`. The `name` field in config sets `instance_name` for health/schema topics — collisions happen if two instances share the same name
- Health topic format: `bubbaloop/global/{machine_id}/{instance_name}/health` — ensure consumer patterns match exactly
81 changes: 81 additions & 0 deletions python-sdk/CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Contributing to bubbaloop-sdk (Python)

## Dev environment

### With pixi (recommended)

```bash
cd python-sdk
pixi install # creates env, installs all deps including dev extras
pixi run test
pixi run lint
pixi run fmt
```

Available tasks:

| Task | Description |
|---|---|
| `pixi run test` | Run test suite |
| `pixi run test-cov` | Run tests with coverage report |
| `pixi run lint` | Check for lint errors (ruff) |
| `pixi run lint-fix` | Auto-fix lint errors |
| `pixi run fmt` | Format code |
| `pixi run fmt-check` | Check formatting without changing files |
| `pixi run check` | Run fmt-check + lint (CI equivalent) |

### With plain venv

```bash
cd python-sdk
python3 -m venv .venv
.venv/bin/pip install -e ".[dev]"
.venv/bin/pytest tests/ -v
.venv/bin/ruff check bubbaloop_sdk/ tests/
```

## Linting (ruff)

Config lives in `python-sdk/pyproject.toml` under `[tool.ruff]`.
Follows the same pattern as [kornia/kornia](https://github.com/kornia/kornia).

Rules enabled: E/W (pycodestyle), F (Pyflakes), I (isort), B (bugbear),
UP (pyupgrade), C4 (comprehensions), RUF (ruff-specific).

Line length: 120 characters.

## Lint suppressions

| File | Rule | Reason |
|---|---|---|
| `*/__init__.py` | F401, F403 | Re-exports allowed |
| `tests/*` | S101, D | Assert and missing docstrings allowed in tests |

## Testing

Tests in `tests/test_context.py` do **not** open a real Zenoh session.
`_make_context()` uses `object.__new__(NodeContext)` + `MagicMock()` — no router needed.

For async subscriber/queryable tests, `threading.Event` with a 2s timeout
verifies that handlers are dispatched to the thread pool correctly.

## Project structure

```text
python-sdk/
pyproject.toml # Build config, deps, ruff/pytest/coverage config
pixi.toml # Pixi tasks (test, lint, fmt, check)
README.md # User-facing API docs
bubbaloop_sdk/
__init__.py # Public API surface
context.py # NodeContext — main entry point
subscriber.py # ProtoSubscriber, RawSubscriber, CallbackSubscriber, RawCallbackSubscriber, Queryable
publisher.py # JsonPublisher, ProtoPublisher
node.py # run_node() helper
health.py # Health heartbeat (used internally by run_node)
discover.py # discover_nodes()
get_sample.py # get_sample() one-shot helper
decode_sample.py # ProtoDecoder
tests/
test_context.py # 68 unit tests (no real Zenoh required)
```
Loading
Loading