From c996aca25ac64aaf0b9fe4ff78d9642501ad1a89 Mon Sep 17 00:00:00 2001 From: Gyeongjae Choi Date: Mon, 15 Jun 2026 20:22:09 +0900 Subject: [PATCH 1/6] feat: auto-convert Python objects that are passed to/from Queue --- .../cli/tests/bindings-test/src/test_queue.py | 114 ++++++++++++++++++ .../cli/tests/bindings-test/src/worker.py | 19 +++ .../cli/tests/bindings-test/wrangler.jsonc | 8 ++ packages/runtime-sdk/src/workers/_workers.py | 46 +++++-- 4 files changed, 178 insertions(+), 9 deletions(-) create mode 100644 packages/cli/tests/bindings-test/src/test_queue.py diff --git a/packages/cli/tests/bindings-test/src/test_queue.py b/packages/cli/tests/bindings-test/src/test_queue.py new file mode 100644 index 0000000..3de0c06 --- /dev/null +++ b/packages/cli/tests/bindings-test/src/test_queue.py @@ -0,0 +1,114 @@ +import asyncio +import sys + +import pytest + +# FIXME: This doesn't seem to happen in prod environment. +# But for some reason, pytest segfaults after running tests in dev environment (workerd). +pytestmark = pytest.mark.skipif( + sys.version_info < (3, 13), + reason="pytest segfaults after running tests", +) + + +async def _send_and_receive(env, body, **send_opts): + from worker import RECEIVED_MESSAGES + + RECEIVED_MESSAGES.clear() + await env.TEST_QUEUE.send(body, **send_opts) + await asyncio.sleep(2) + assert len(RECEIVED_MESSAGES) > 0, "no messages received by consumer" + return RECEIVED_MESSAGES[-1] + + +@pytest.mark.asyncio +async def test_send_string(env): + msg = await _send_and_receive(env, "hello queue") + assert msg["body"] == "hello queue" + assert isinstance(msg["id"], str) + assert msg["attempts"] >= 1 + + +@pytest.mark.asyncio +async def test_send_dict(env): + msg = await _send_and_receive(env, {"key": "value", "number": 42}) + assert msg["body"]["key"] == "value" + assert msg["body"]["number"] == 42 + + +@pytest.mark.asyncio +async def test_send_number(env): + msg = await _send_and_receive(env, 123) + assert msg["body"] == 123 + + +@pytest.mark.asyncio +async def test_send_with_content_type(env): + msg = await _send_and_receive(env, "text message", contentType="text") + assert msg["body"] == "text message" + + +@pytest.mark.asyncio +async def test_send_none(env): + msg = await _send_and_receive(env, None) + assert msg["body"] is None + + +@pytest.mark.asyncio +async def test_send_bool(env): + msg = await _send_and_receive(env, True) + assert msg["body"] is True + + +@pytest.mark.asyncio +async def test_send_list(env): + msg = await _send_and_receive(env, [1, 2, 3]) + assert msg["body"] == [1, 2, 3] + + +@pytest.mark.asyncio +async def test_send_empty_string(env): + msg = await _send_and_receive(env, "") + assert msg["body"] == "" + + +@pytest.mark.asyncio +async def test_send_batch(env): + from worker import RECEIVED_MESSAGES + + RECEIVED_MESSAGES.clear() + batch = [ + {"body": "batch 1"}, + {"body": "batch 2"}, + {"body": "batch 3"}, + ] + await env.TEST_QUEUE.sendBatch(batch) + await asyncio.sleep(2) + + assert len(RECEIVED_MESSAGES) >= 3 + bodies = [m["body"] for m in RECEIVED_MESSAGES] + assert "batch 1" in bodies + assert "batch 2" in bodies + assert "batch 3" in bodies + + +@pytest.mark.asyncio +async def test_send_batch_with_options(env): + from worker import RECEIVED_MESSAGES + + RECEIVED_MESSAGES.clear() + batch = [ + {"body": "text msg", "contentType": "text"}, + ] + await env.TEST_QUEUE.sendBatch(batch, delaySeconds=0) + await asyncio.sleep(2) + + assert len(RECEIVED_MESSAGES) >= 1 + assert RECEIVED_MESSAGES[-1]["body"] == "text msg" + + +@pytest.mark.asyncio +async def test_send_nested_dict(env): + msg = await _send_and_receive(env, {"outer": {"inner": "deep"}, "list": [1, 2]}) + assert msg["body"]["outer"]["inner"] == "deep" + assert msg["body"]["list"] == [1, 2] diff --git a/packages/cli/tests/bindings-test/src/worker.py b/packages/cli/tests/bindings-test/src/worker.py index 0adb546..3a3685c 100644 --- a/packages/cli/tests/bindings-test/src/worker.py +++ b/packages/cli/tests/bindings-test/src/worker.py @@ -79,6 +79,11 @@ def pytest_runtest_makereport(self, item, call): else "unknown error", "traceback": report.longreprtext, } + elif report.when in ("setup", "teardown") and report.skipped: + self.results[key] = { + "status": "skipped", + "reason": str(report.longrepr), + } elif report.when in ("setup", "teardown") and report.failed: self.results[key] = { "status": "error", @@ -96,6 +101,9 @@ def env(self): return self._env +RECEIVED_MESSAGES = [] + + class Default(WorkerEntrypoint): async def fetch(self, request): from urllib.parse import urlparse @@ -109,6 +117,17 @@ async def fetch(self, request): return Response.json({"ok": True}) return Response.json({"error": "not found"}, status=404) + async def queue(self, batch): + for message in batch.messages: + RECEIVED_MESSAGES.append( + { + "id": message.id, + "body": message.body, + "attempts": message.attempts, + } + ) + message.ack() + def _run_suite(self, suite_name): module = f"test_{suite_name}" if importlib.util.find_spec(module) is None: diff --git a/packages/cli/tests/bindings-test/wrangler.jsonc b/packages/cli/tests/bindings-test/wrangler.jsonc index bcbe05a..2b486b7 100644 --- a/packages/cli/tests/bindings-test/wrangler.jsonc +++ b/packages/cli/tests/bindings-test/wrangler.jsonc @@ -16,6 +16,14 @@ "database_name": "test-db" } ], + "queues": { + "producers": [ + { "binding": "TEST_QUEUE", "queue": "test-queue" } + ], + "consumers": [ + { "queue": "test-queue", "max_batch_size": 10, "max_batch_timeout": 1 } + ] + }, "durable_objects": { "bindings": [ { "name": "TEST_DO", "class_name": "TestDurableObject" } diff --git a/packages/runtime-sdk/src/workers/_workers.py b/packages/runtime-sdk/src/workers/_workers.py index 7a6d007..47e12e7 100644 --- a/packages/runtime-sdk/src/workers/_workers.py +++ b/packages/runtime-sdk/src/workers/_workers.py @@ -319,8 +319,13 @@ def _manage_pyproxies(): destroy_proxies(proxies) -def _is_js_instance(val, js_cls_name): - return hasattr(val, "constructor") and val.constructor.name == js_cls_name +def _is_js_instance(val, js_cls_names: str | set[str]): + if not hasattr(val, "constructor"): + return False + name = val.constructor.name + if isinstance(js_cls_names, set): + return name in js_cls_names + return name == js_cls_names try: @@ -1114,6 +1119,11 @@ def _convert_result(self, result): # TODO: This is a bit of a hack. We should revisit when there are more # bindings to support with different patterns. return self.__class__(converted) + if isinstance(converted, list): + return [ + self.__class__(item) if isinstance(item, JsProxy) else item + for item in converted + ] return converted def _getattr_helper(self, name): @@ -1252,6 +1262,13 @@ async def create_batch(self, *args, **kwargs): class _EnvWrapper: + _BINDING_TYPES = { + "KvNamespace", + "R2Bucket", + "D1Database", + "WorkerQueue", + } + def __init__(self, env: Any): self._env = env @@ -1266,13 +1283,7 @@ def _getattr_helper(self, name): if _is_js_instance(binding, "WorkflowImpl"): return _WorkflowBindingWrapper(binding) - if _is_js_instance(binding, "KvNamespace"): - return _BindingWrapper(binding) - - if _is_js_instance(binding, "R2Bucket"): - return _BindingWrapper(binding) - - if _is_js_instance(binding, "D1Database"): + if _is_js_instance(binding, self._BINDING_TYPES): return _BindingWrapper(binding) return binding @@ -1550,6 +1561,7 @@ def __init__(self, ctx: "ExecutionContext", env: "Env"): def __init_subclass__(cls, **_kwargs: Any): _wrap_subclass(cls) + _wrap_queue_handler(cls) class WorkflowEntrypoint: @@ -1567,3 +1579,19 @@ def __init__(self, ctx: "ExecutionContext", env: "Env"): def __init_subclass__(cls, **_kwargs: Any): _wrap_subclass(cls) _wrap_workflow_step(cls) + + +def _wrap_queue_handler(cls): + queue_fn = getattr(cls, "queue", None) + if queue_fn is None: + return + + @functools.wraps(queue_fn) + async def wrapped_queue(self, batch, *_args, **_kwargs): + wrapped_batch = _BindingWrapper(batch) + result = queue_fn(self, wrapped_batch) + if inspect.iscoroutine(result): + result = await result + return result + + cls.queue = wrapped_queue From fa0650e46afb33b34d5c09766acbb8dabb875f36 Mon Sep 17 00:00:00 2001 From: Gyeongjae Choi Date: Mon, 15 Jun 2026 20:29:59 +0900 Subject: [PATCH 2/6] chore: add comment about sleep --- packages/cli/tests/bindings-test/src/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli/tests/bindings-test/src/test_queue.py b/packages/cli/tests/bindings-test/src/test_queue.py index 3de0c06..d34779a 100644 --- a/packages/cli/tests/bindings-test/src/test_queue.py +++ b/packages/cli/tests/bindings-test/src/test_queue.py @@ -16,7 +16,7 @@ async def _send_and_receive(env, body, **send_opts): RECEIVED_MESSAGES.clear() await env.TEST_QUEUE.send(body, **send_opts) - await asyncio.sleep(2) + await asyncio.sleep(2) # Wait for the message to be processed assert len(RECEIVED_MESSAGES) > 0, "no messages received by consumer" return RECEIVED_MESSAGES[-1] From 90e92bcdf47b6a32a67b533f361fcd87ca496178 Mon Sep 17 00:00:00 2001 From: Gyeongjae Choi Date: Mon, 15 Jun 2026 20:43:18 +0900 Subject: [PATCH 3/6] chore: merge queue tests to reduce test time --- .../cli/tests/bindings-test/src/test_queue.py | 165 +++++++++++------- 1 file changed, 98 insertions(+), 67 deletions(-) diff --git a/packages/cli/tests/bindings-test/src/test_queue.py b/packages/cli/tests/bindings-test/src/test_queue.py index d34779a..e456d33 100644 --- a/packages/cli/tests/bindings-test/src/test_queue.py +++ b/packages/cli/tests/bindings-test/src/test_queue.py @@ -3,112 +3,143 @@ import pytest -# FIXME: This doesn't seem to happen in prod environment. -# But for some reason, pytest segfaults after running tests in dev environment (workerd). -pytestmark = pytest.mark.skipif( - sys.version_info < (3, 13), - reason="pytest segfaults after running tests", -) +pytestmark = [ + pytest.mark.skipif( + sys.version_info < (3, 13), + reason="pytest segfaults after running tests", + ), + pytest.mark.asyncio, +] +_send_cache = None +_batch_cache = None + + +def _find(messages, predicate): + return next(m for m in messages if predicate(m)) + + +async def _get_send_results(env): + global _send_cache + if _send_cache is not None: + return _send_cache + + from worker import RECEIVED_MESSAGES + + RECEIVED_MESSAGES.clear() + + # Send everything at once to reduce the overall test time + await env.TEST_QUEUE.send("hello queue") + await env.TEST_QUEUE.send({"key": "value", "number": 42}) + await env.TEST_QUEUE.send(123) + await env.TEST_QUEUE.send("text message", contentType="text") + await env.TEST_QUEUE.send(None) + await env.TEST_QUEUE.send(True) + await env.TEST_QUEUE.send([1, 2, 3]) + await env.TEST_QUEUE.send("") + await env.TEST_QUEUE.send({"outer": {"inner": "deep"}, "list": [1, 2]}) + + await asyncio.sleep(2) + + assert len(RECEIVED_MESSAGES) >= 9 + _send_cache = list(RECEIVED_MESSAGES) + return _send_cache + + +async def _get_batch_results(env): + global _batch_cache + if _batch_cache is not None: + return _batch_cache -async def _send_and_receive(env, body, **send_opts): from worker import RECEIVED_MESSAGES RECEIVED_MESSAGES.clear() - await env.TEST_QUEUE.send(body, **send_opts) - await asyncio.sleep(2) # Wait for the message to be processed - assert len(RECEIVED_MESSAGES) > 0, "no messages received by consumer" - return RECEIVED_MESSAGES[-1] + + await env.TEST_QUEUE.sendBatch( + [ + {"body": "batch 1"}, + {"body": "batch 2"}, + {"body": "batch 3"}, + ] + ) + await env.TEST_QUEUE.sendBatch( + [{"body": "text msg", "contentType": "text"}], + delaySeconds=0, + ) + + await asyncio.sleep(2) + + assert len(RECEIVED_MESSAGES) >= 4 + _batch_cache = list(RECEIVED_MESSAGES) + return _batch_cache -@pytest.mark.asyncio async def test_send_string(env): - msg = await _send_and_receive(env, "hello queue") - assert msg["body"] == "hello queue" + msgs = await _get_send_results(env) + msg = _find(msgs, lambda m: m["body"] == "hello queue") assert isinstance(msg["id"], str) assert msg["attempts"] >= 1 -@pytest.mark.asyncio async def test_send_dict(env): - msg = await _send_and_receive(env, {"key": "value", "number": 42}) - assert msg["body"]["key"] == "value" + msgs = await _get_send_results(env) + msg = _find( + msgs, + lambda m: isinstance(m["body"], dict) and m["body"].get("key") == "value", + ) assert msg["body"]["number"] == 42 -@pytest.mark.asyncio async def test_send_number(env): - msg = await _send_and_receive(env, 123) - assert msg["body"] == 123 + msgs = await _get_send_results(env) + _find(msgs, lambda m: m["body"] == 123) -@pytest.mark.asyncio async def test_send_with_content_type(env): - msg = await _send_and_receive(env, "text message", contentType="text") - assert msg["body"] == "text message" + msgs = await _get_send_results(env) + _find(msgs, lambda m: m["body"] == "text message") -@pytest.mark.asyncio async def test_send_none(env): - msg = await _send_and_receive(env, None) - assert msg["body"] is None + msgs = await _get_send_results(env) + _find(msgs, lambda m: m["body"] is None) -@pytest.mark.asyncio async def test_send_bool(env): - msg = await _send_and_receive(env, True) - assert msg["body"] is True + msgs = await _get_send_results(env) + _find(msgs, lambda m: m["body"] is True) -@pytest.mark.asyncio async def test_send_list(env): - msg = await _send_and_receive(env, [1, 2, 3]) - assert msg["body"] == [1, 2, 3] + msgs = await _get_send_results(env) + _find(msgs, lambda m: m["body"] == [1, 2, 3]) -@pytest.mark.asyncio async def test_send_empty_string(env): - msg = await _send_and_receive(env, "") - assert msg["body"] == "" + msgs = await _get_send_results(env) + _find(msgs, lambda m: m["body"] == "") -@pytest.mark.asyncio -async def test_send_batch(env): - from worker import RECEIVED_MESSAGES +async def test_send_nested_dict(env): + msgs = await _get_send_results(env) + msg = _find( + msgs, + lambda m: isinstance(m["body"], dict) + and isinstance(m["body"].get("outer"), dict), + ) + assert msg["body"]["outer"]["inner"] == "deep" + assert msg["body"]["list"] == [1, 2] - RECEIVED_MESSAGES.clear() - batch = [ - {"body": "batch 1"}, - {"body": "batch 2"}, - {"body": "batch 3"}, - ] - await env.TEST_QUEUE.sendBatch(batch) - await asyncio.sleep(2) - assert len(RECEIVED_MESSAGES) >= 3 - bodies = [m["body"] for m in RECEIVED_MESSAGES] +async def test_send_batch(env): + msgs = await _get_batch_results(env) + bodies = [m["body"] for m in msgs] assert "batch 1" in bodies assert "batch 2" in bodies assert "batch 3" in bodies -@pytest.mark.asyncio async def test_send_batch_with_options(env): - from worker import RECEIVED_MESSAGES - - RECEIVED_MESSAGES.clear() - batch = [ - {"body": "text msg", "contentType": "text"}, - ] - await env.TEST_QUEUE.sendBatch(batch, delaySeconds=0) - await asyncio.sleep(2) - - assert len(RECEIVED_MESSAGES) >= 1 - assert RECEIVED_MESSAGES[-1]["body"] == "text msg" - - -@pytest.mark.asyncio -async def test_send_nested_dict(env): - msg = await _send_and_receive(env, {"outer": {"inner": "deep"}, "list": [1, 2]}) - assert msg["body"]["outer"]["inner"] == "deep" - assert msg["body"]["list"] == [1, 2] + msgs = await _get_batch_results(env) + bodies = [m["body"] for m in msgs] + assert "text msg" in bodies From 339b737701fa4b9a2a5efea44397a3d509ad8847 Mon Sep 17 00:00:00 2001 From: Gyeongjae Choi Date: Tue, 16 Jun 2026 17:17:13 +0900 Subject: [PATCH 4/6] chore: forward env and ctx to queue handler --- packages/runtime-sdk/src/workers/_workers.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/runtime-sdk/src/workers/_workers.py b/packages/runtime-sdk/src/workers/_workers.py index 47e12e7..684c853 100644 --- a/packages/runtime-sdk/src/workers/_workers.py +++ b/packages/runtime-sdk/src/workers/_workers.py @@ -1112,12 +1112,14 @@ def __init__(self, binding): def _convert_result(self, result): converted = python_from_rpc(result) + + # After python_from_rpc, some objects may still be JsProxy objects. + # For now, we wrap all of them with the _BindingWrapper (or a subclass of it) + # so that accessing attributes on them will be properly converted. + + # TODO: This is a bit of a hack. We should revisit when there are more + # bindings to support with different return types. if isinstance(converted, JsProxy): - # If the RPC result is another JsProxy, we assume that - # it is another RPC-wrapped object and wrap it as well. - # for example, d1.bind() returns the same object as a result. - # TODO: This is a bit of a hack. We should revisit when there are more - # bindings to support with different patterns. return self.__class__(converted) if isinstance(converted, list): return [ @@ -1587,9 +1589,9 @@ def _wrap_queue_handler(cls): return @functools.wraps(queue_fn) - async def wrapped_queue(self, batch, *_args, **_kwargs): + async def wrapped_queue(self, batch, *args, **kwargs): wrapped_batch = _BindingWrapper(batch) - result = queue_fn(self, wrapped_batch) + result = queue_fn(self, wrapped_batch, *args, **kwargs) if inspect.iscoroutine(result): result = await result return result From 6dd6cc3bc6adf548f2ced562a30b248eefc6ab6a Mon Sep 17 00:00:00 2001 From: Gyeongjae Choi Date: Tue, 16 Jun 2026 17:17:23 +0900 Subject: [PATCH 5/6] chore: tidy up queue tests --- .../cli/tests/bindings-test/src/test_queue.py | 101 ++++++++---------- .../cli/tests/bindings-test/src/worker.py | 2 +- 2 files changed, 45 insertions(+), 58 deletions(-) diff --git a/packages/cli/tests/bindings-test/src/test_queue.py b/packages/cli/tests/bindings-test/src/test_queue.py index e456d33..4dca78e 100644 --- a/packages/cli/tests/bindings-test/src/test_queue.py +++ b/packages/cli/tests/bindings-test/src/test_queue.py @@ -11,78 +11,65 @@ pytest.mark.asyncio, ] -_send_cache = None -_batch_cache = None +_cache = None def _find(messages, predicate): return next(m for m in messages if predicate(m)) -async def _get_send_results(env): - global _send_cache - if _send_cache is not None: - return _send_cache +# Send everything at once to reduce the overall test time. +# Receiving a message from the queue takes ~2 seconds, +# so batching all sends into a single sleep is more efficient. +async def _send_all_messages(env): + global _cache + if _cache is not None: + return _cache from worker import RECEIVED_MESSAGES RECEIVED_MESSAGES.clear() - # Send everything at once to reduce the overall test time - await env.TEST_QUEUE.send("hello queue") - await env.TEST_QUEUE.send({"key": "value", "number": 42}) - await env.TEST_QUEUE.send(123) - await env.TEST_QUEUE.send("text message", contentType="text") - await env.TEST_QUEUE.send(None) - await env.TEST_QUEUE.send(True) - await env.TEST_QUEUE.send([1, 2, 3]) - await env.TEST_QUEUE.send("") - await env.TEST_QUEUE.send({"outer": {"inner": "deep"}, "list": [1, 2]}) - - await asyncio.sleep(2) - - assert len(RECEIVED_MESSAGES) >= 9 - _send_cache = list(RECEIVED_MESSAGES) - return _send_cache - - -async def _get_batch_results(env): - global _batch_cache - if _batch_cache is not None: - return _batch_cache - - from worker import RECEIVED_MESSAGES - - RECEIVED_MESSAGES.clear() - - await env.TEST_QUEUE.sendBatch( - [ - {"body": "batch 1"}, - {"body": "batch 2"}, - {"body": "batch 3"}, - ] - ) - await env.TEST_QUEUE.sendBatch( - [{"body": "text msg", "contentType": "text"}], - delaySeconds=0, + q = env.TEST_QUEUE + await asyncio.gather( + q.send("hello queue"), + q.send({"key": "value", "number": 42}), + q.send(123), + q.send("text message", contentType="text"), + q.send(None), + q.send(True), + q.send([1, 2, 3]), + q.send(""), + q.send({"outer": {"inner": "deep"}, "list": [1, 2]}), + q.sendBatch( + [ + {"body": "batch 1"}, + {"body": "batch 2"}, + {"body": "batch 3"}, + ] + ), + q.sendBatch( + [{"body": "text msg", "contentType": "text"}], + delaySeconds=0, + ), ) await asyncio.sleep(2) - assert len(RECEIVED_MESSAGES) >= 4 - _batch_cache = list(RECEIVED_MESSAGES) - return _batch_cache + assert len(RECEIVED_MESSAGES) >= 13 + _cache = list(RECEIVED_MESSAGES) + return _cache async def test_send_string(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) msg = _find(msgs, lambda m: m["body"] == "hello queue") assert isinstance(msg["id"], str) assert msg["attempts"] >= 1 async def test_send_dict(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) msg = _find( msgs, lambda m: isinstance(m["body"], dict) and m["body"].get("key") == "value", @@ -91,37 +78,37 @@ async def test_send_dict(env): async def test_send_number(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) _find(msgs, lambda m: m["body"] == 123) async def test_send_with_content_type(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) _find(msgs, lambda m: m["body"] == "text message") async def test_send_none(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) _find(msgs, lambda m: m["body"] is None) async def test_send_bool(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) _find(msgs, lambda m: m["body"] is True) async def test_send_list(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) _find(msgs, lambda m: m["body"] == [1, 2, 3]) async def test_send_empty_string(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) _find(msgs, lambda m: m["body"] == "") async def test_send_nested_dict(env): - msgs = await _get_send_results(env) + msgs = await _send_all_messages(env) msg = _find( msgs, lambda m: isinstance(m["body"], dict) @@ -132,7 +119,7 @@ async def test_send_nested_dict(env): async def test_send_batch(env): - msgs = await _get_batch_results(env) + msgs = await _send_all_messages(env) bodies = [m["body"] for m in msgs] assert "batch 1" in bodies assert "batch 2" in bodies @@ -140,6 +127,6 @@ async def test_send_batch(env): async def test_send_batch_with_options(env): - msgs = await _get_batch_results(env) + msgs = await _send_all_messages(env) bodies = [m["body"] for m in msgs] assert "text msg" in bodies diff --git a/packages/cli/tests/bindings-test/src/worker.py b/packages/cli/tests/bindings-test/src/worker.py index 3a3685c..01f2f96 100644 --- a/packages/cli/tests/bindings-test/src/worker.py +++ b/packages/cli/tests/bindings-test/src/worker.py @@ -117,7 +117,7 @@ async def fetch(self, request): return Response.json({"ok": True}) return Response.json({"error": "not found"}, status=404) - async def queue(self, batch): + async def queue(self, batch, env, ctx): for message in batch.messages: RECEIVED_MESSAGES.append( { From 72b2581545bc2d4bb480f1e0a2d6ebd4f9e5db04 Mon Sep 17 00:00:00 2001 From: Gyeongjae Choi Date: Tue, 16 Jun 2026 17:30:14 +0900 Subject: [PATCH 6/6] chore: fix cancel_all_tasks to properly cancel pending tasks --- .../cli/tests/bindings-test/src/test_queue.py | 9 +---- .../cli/tests/bindings-test/src/worker.py | 35 ++++++++++++++++--- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/packages/cli/tests/bindings-test/src/test_queue.py b/packages/cli/tests/bindings-test/src/test_queue.py index 4dca78e..1d56c10 100644 --- a/packages/cli/tests/bindings-test/src/test_queue.py +++ b/packages/cli/tests/bindings-test/src/test_queue.py @@ -1,15 +1,8 @@ import asyncio -import sys import pytest -pytestmark = [ - pytest.mark.skipif( - sys.version_info < (3, 13), - reason="pytest segfaults after running tests", - ), - pytest.mark.asyncio, -] +pytestmark = pytest.mark.asyncio _cache = None diff --git a/packages/cli/tests/bindings-test/src/worker.py b/packages/cli/tests/bindings-test/src/worker.py index 01f2f96..4de3468 100644 --- a/packages/cli/tests/bindings-test/src/worker.py +++ b/packages/cli/tests/bindings-test/src/worker.py @@ -11,6 +11,7 @@ import asyncio import importlib.util import sys +from asyncio import InvalidStateError import pytest from pyodide.webloop import WebLoop @@ -28,11 +29,37 @@ async def _noop(*args): WebLoop.shutdown_asyncgens = _noop WebLoop.shutdown_default_executor = _noop -# Pyodide 0.26.0a2's _cancel_all_tasks calls task.exception() on pending tasks, -# which raises InvalidStateError under Pyodide's WebLoop. -# Ignore this error to prevent pytest-asyncio from crashing. +# Pyodide 0.26.0a2's WebLoop causes InvalidStateError when the +# _cancel_all_tasks calls task.exception() on done-but-not-cancelled tasks. +# Replace with a version that cancels tasks but tolerates that error. if sys.version_info < (3, 13): - asyncio.runners._cancel_all_tasks = lambda loop: None # type: ignore[attr-defined] + + def _cancel_all_tasks(loop): + to_cancel = asyncio.tasks.all_tasks(loop) + if not to_cancel: + return + for task in to_cancel: + task.cancel() + loop.run_until_complete( + asyncio.tasks.gather(*to_cancel, return_exceptions=True) + ) + for task in to_cancel: + if task.cancelled(): + continue + try: + if task.exception() is not None: + loop.call_exception_handler( + { + "message": "unhandled exception during asyncio.run() shutdown", + "exception": task.exception(), + "task": task, + } + ) + # Note: This exception catch is added from the original implementation + except (InvalidStateError, RuntimeError): + pass + + asyncio.runners._cancel_all_tasks = _cancel_all_tasks # type: ignore[attr-defined] class ResultCollector: