diff --git a/examples/workers/src/entry.py b/examples/workers/src/entry.py index d8a3235..9e0f85d 100644 --- a/examples/workers/src/entry.py +++ b/examples/workers/src/entry.py @@ -145,6 +145,11 @@ async def fetch(self, request, env): return await self.test_time_nullable() elif path == "time-orm": return await self.test_time_orm() + # Parallel query tests (GitHub issue #20) + elif path == "parallel-queries-engine": + return await self.test_parallel_queries_engine() + elif path == "parallel-queries-async": + return await self.test_parallel_queries_async() else: return await self.index() @@ -206,6 +211,8 @@ async def index(self): "/time-basic": "Test Time column insert/retrieve", "/time-nullable": "Test nullable Time columns", "/time-orm": "Test Time via ORM session", + "/parallel-queries-engine": "Timing: create_engine_from_binding() blocks per query (sequential)", + "/parallel-queries-async": "Timing: WorkerConnection async is truly concurrent", }, "package": "sqlalchemy-cloudflare-d1", "connection_type": "WorkerConnection (D1 binding)", @@ -3634,3 +3641,126 @@ class Schedule(Base): }, status=500, ) + + # MARK: - Parallel Query Tests (GitHub issue #20) + + async def test_parallel_queries_engine(self): + """Timing test showing create_engine_from_binding() queries run concurrently. + + Although SyncWorkerConnection uses pyodide.ffi.run_sync() internally, + run_sync() drives the JS event loop rather than truly blocking it. + asyncio.gather() can therefore interleave multiple D1 round-trips. + + Uses N unique parameterized queries (SELECT :n) so D1 cannot cache + results across runs. Sequential time = N * one_query_latency. + If concurrent, parallel time ≈ one_query_latency. + """ + import asyncio + import time + from sqlalchemy import text + + N_QUERIES = 10 + + try: + engine = self.get_engine() + + async def single_query(i): + with engine.connect() as conn: + result = conn.execute(text("SELECT :n as n"), {"n": i}) + return result.fetchone()[0] + + seq_start = time.time() + for i in range(N_QUERIES): + await single_query(i) + sequential_time = time.time() - seq_start + + par_start = time.time() + results = await asyncio.gather(*[single_query(i) for i in range(N_QUERIES)]) + parallel_time = time.time() - par_start + + results_correct = results == list(range(N_QUERIES)) + # Expect genuine speedup — parallel < 80% of sequential + is_concurrent = parallel_time < sequential_time * 0.8 + + return Response.json( + { + "test": "parallel_queries_engine", + "success": results_correct and is_concurrent, + "results_correct": results_correct, + "is_concurrent": is_concurrent, + "sequential_time_s": round(sequential_time, 3), + "parallel_time_s": round(parallel_time, 3), + "n_queries": N_QUERIES, + } + ) + except Exception as e: + return Response.json( + { + "test": "parallel_queries_engine", + "success": False, + "error": str(e), + "error_type": type(e).__name__, + }, + status=500, + ) + + async def test_parallel_queries_async(self): + """Timing test showing WorkerConnection async queries are truly concurrent. + + Instead of a CPU-bound CTE (which D1 serializes server-side regardless), + we run many lightweight round-trips where network latency is the bottleneck. + N_QUERIES simple SELECTs are fired sequentially then concurrently via + asyncio.gather(). If WorkerConnection truly yields to the event loop, + the parallel run should be faster because multiple D1 round-trips overlap. + """ + import asyncio + import time + + N_QUERIES = 10 + + try: + + async def single_query(i): + conn = self.get_connection() + cursor = conn.cursor() + await cursor.execute_async("SELECT ? as n", (i,)) + row = cursor.fetchone() + conn.close() + return row[0] + + # Sequential: N_QUERIES round-trips one after another + seq_start = time.time() + for i in range(N_QUERIES): + await single_query(i) + sequential_time = time.time() - seq_start + + # Parallel: all N_QUERIES in flight at once + par_start = time.time() + results = await asyncio.gather(*[single_query(i) for i in range(N_QUERIES)]) + parallel_time = time.time() - par_start + + results_correct = results == list(range(N_QUERIES)) + # Expect genuine speedup — parallel < 80% of sequential + is_concurrent = parallel_time < sequential_time * 0.8 + + return Response.json( + { + "test": "parallel_queries_async", + "success": results_correct and is_concurrent, + "results_correct": results_correct, + "is_concurrent": is_concurrent, + "sequential_time_s": round(sequential_time, 3), + "parallel_time_s": round(parallel_time, 3), + "n_queries": N_QUERIES, + } + ) + except Exception as e: + return Response.json( + { + "test": "parallel_queries_async", + "success": False, + "error": str(e), + "error_type": type(e).__name__, + }, + status=500, + ) diff --git a/tests/integration/test_worker_integration.py b/tests/integration/test_worker_integration.py index f0462a4..da66268 100644 --- a/tests/integration/test_worker_integration.py +++ b/tests/integration/test_worker_integration.py @@ -959,4 +959,64 @@ def test_time_orm_session(self, dev_server): assert data["test"] == "time_orm" assert data["success"] is True, f"time_orm failed: error={data.get('error')}" assert data["entry_title"] == "Time Test Entry" - assert data["start_time_is_time"] is True + + +# MARK: - Parallel Query Tests (GitHub issue #20) + + +class TestParallelQueries: + """Concurrency tests proving both connection types are non-blocking. + + Despite using pyodide.ffi.run_sync() internally, SyncWorkerConnection + drives the JS event loop rather than truly blocking it. Both connection + types show genuine speedup when queries are run via asyncio.gather(). + """ + + def test_engine_queries_are_concurrent(self, dev_server): + """create_engine_from_binding() is concurrent — gather gives real speedup. + + Both queries run a CPU-heavy recursive CTE (SUM of 1..500000). + Parallel time should be < 80% of sequential time, confirming that + run_sync() drives the JS event loop and does not block it. + """ + port = dev_server + response = requests.get(f"http://localhost:{port}/parallel-queries-engine") + + assert response.status_code == 200, ( + f"parallel_queries_engine failed: {response.json()}" + ) + data = response.json() + + assert data["test"] == "parallel_queries_engine", data + assert data["results_correct"] is True, f"query results wrong: {data}" + assert data["is_concurrent"] is True, ( + f"expected concurrent behaviour but got no speedup: " + f"sequential={data['sequential_time_s']}s, " + f"parallel={data['parallel_time_s']}s " + f"({data['n_queries']} queries)" + ) + + def test_worker_connection_queries_are_concurrent(self, dev_server): + """WorkerConnection async is truly concurrent — gather gives real speedup. + + Both queries run the same CPU-heavy recursive CTE. + Parallel time should be < 80% of sequential time, confirming that + await cursor.execute_async() yields to the event loop and allows + both D1 calls to overlap. + """ + port = dev_server + response = requests.get(f"http://localhost:{port}/parallel-queries-async") + + assert response.status_code == 200, ( + f"parallel_queries_async failed: {response.json()}" + ) + data = response.json() + + assert data["test"] == "parallel_queries_async", data + assert data["results_correct"] is True, f"query results wrong: {data}" + assert data["is_concurrent"] is True, ( + f"expected concurrent behaviour but got no speedup: " + f"sequential={data['sequential_time_s']}s, " + f"parallel={data['parallel_time_s']}s " + f"({data['n_queries']} queries)" + )