Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions examples/workers/src/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ async def fetch(self, request, env):
return await self.test_time_nullable()
elif path == "time-orm":
return await self.test_time_orm()
# Parallel query tests (GitHub issue #20)
elif path == "parallel-queries-engine":
return await self.test_parallel_queries_engine()
elif path == "parallel-queries-async":
return await self.test_parallel_queries_async()
else:
return await self.index()

Expand Down Expand Up @@ -206,6 +211,8 @@ async def index(self):
"/time-basic": "Test Time column insert/retrieve",
"/time-nullable": "Test nullable Time columns",
"/time-orm": "Test Time via ORM session",
"/parallel-queries-engine": "Timing: create_engine_from_binding() blocks per query (sequential)",
"/parallel-queries-async": "Timing: WorkerConnection async is truly concurrent",
},
"package": "sqlalchemy-cloudflare-d1",
"connection_type": "WorkerConnection (D1 binding)",
Expand Down Expand Up @@ -3634,3 +3641,126 @@ class Schedule(Base):
},
status=500,
)

# MARK: - Parallel Query Tests (GitHub issue #20)

async def test_parallel_queries_engine(self):
    """Time sequential vs. gathered queries issued through the SQLAlchemy engine.

    The premise under test (GitHub issue #20) is that queries made via the
    engine returned by ``self.get_engine()`` can overlap when scheduled with
    ``asyncio.gather()``, even though each query is issued through the
    synchronous ``conn.execute()`` API.

    Runs N_QUERIES distinct parameterized queries (``SELECT :n``) so that
    every round-trip carries a different value. The same workload is
    executed twice: once awaited one-by-one, once via ``asyncio.gather()``.
    If the round-trips overlap, the gathered run should take roughly one
    query latency instead of N of them.

    Returns:
        A JSON ``Response`` with ``results_correct`` (gathered results equal
        0..N-1 in order), ``is_concurrent`` (gathered run took less than 80%
        of the sequential run), both timings in seconds, and ``success``
        (both checks passed). Any exception yields a 500 JSON response
        carrying the error message and type.
    """
    import asyncio
    import time
    from sqlalchemy import text

    N_QUERIES = 10

    try:
        engine = self.get_engine()

        async def single_query(i):
            # Declared async so it can be scheduled by gather(); the query
            # itself goes through the engine's synchronous execute().
            with engine.connect() as conn:
                result = conn.execute(text("SELECT :n as n"), {"n": i})
                return result.fetchone()[0]

        # perf_counter() is monotonic, so the measured intervals cannot be
        # skewed (or go negative) if the wall clock is adjusted mid-test.
        seq_start = time.perf_counter()
        for i in range(N_QUERIES):
            await single_query(i)
        sequential_time = time.perf_counter() - seq_start

        par_start = time.perf_counter()
        results = await asyncio.gather(*(single_query(i) for i in range(N_QUERIES)))
        parallel_time = time.perf_counter() - par_start

        # gather() preserves argument order, so results must be 0..N-1.
        results_correct = results == list(range(N_QUERIES))
        # Expect genuine speedup — parallel < 80% of sequential
        is_concurrent = parallel_time < sequential_time * 0.8

        return Response.json(
            {
                "test": "parallel_queries_engine",
                "success": results_correct and is_concurrent,
                "results_correct": results_correct,
                "is_concurrent": is_concurrent,
                "sequential_time_s": round(sequential_time, 3),
                "parallel_time_s": round(parallel_time, 3),
                "n_queries": N_QUERIES,
            }
        )
    except Exception as e:
        return Response.json(
            {
                "test": "parallel_queries_engine",
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
            },
            status=500,
        )

async def test_parallel_queries_async(self):
    """Time sequential vs. gathered queries issued through ``execute_async``.

    Instead of a CPU-bound CTE (which D1 serializes server-side regardless),
    this runs many lightweight round-trips where network latency is the
    bottleneck. N_QUERIES simple SELECTs are fired sequentially and then
    concurrently via ``asyncio.gather()``. If ``await cursor.execute_async()``
    yields to the event loop, the gathered run should be faster because
    multiple D1 round-trips overlap.

    Returns:
        A JSON ``Response`` with ``results_correct`` (gathered results equal
        0..N-1 in order), ``is_concurrent`` (gathered run took less than 80%
        of the sequential run), both timings in seconds, and ``success``
        (both checks passed). Any exception yields a 500 JSON response
        carrying the error message and type.
    """
    import asyncio
    import time

    N_QUERIES = 10

    try:

        async def single_query(i):
            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                await cursor.execute_async("SELECT ? as n", (i,))
                return cursor.fetchone()[0]
            finally:
                # Always release the connection, even when the query or
                # the row access raises.
                conn.close()

        # perf_counter() is monotonic, so the measured intervals cannot be
        # skewed (or go negative) if the wall clock is adjusted mid-test.

        # Sequential: N_QUERIES round-trips one after another
        seq_start = time.perf_counter()
        for i in range(N_QUERIES):
            await single_query(i)
        sequential_time = time.perf_counter() - seq_start

        # Parallel: all N_QUERIES in flight at once
        par_start = time.perf_counter()
        results = await asyncio.gather(*(single_query(i) for i in range(N_QUERIES)))
        parallel_time = time.perf_counter() - par_start

        # gather() preserves argument order, so results must be 0..N-1.
        results_correct = results == list(range(N_QUERIES))
        # Expect genuine speedup — parallel < 80% of sequential
        is_concurrent = parallel_time < sequential_time * 0.8

        return Response.json(
            {
                "test": "parallel_queries_async",
                "success": results_correct and is_concurrent,
                "results_correct": results_correct,
                "is_concurrent": is_concurrent,
                "sequential_time_s": round(sequential_time, 3),
                "parallel_time_s": round(parallel_time, 3),
                "n_queries": N_QUERIES,
            }
        )
    except Exception as e:
        return Response.json(
            {
                "test": "parallel_queries_async",
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
            },
            status=500,
        )
62 changes: 61 additions & 1 deletion tests/integration/test_worker_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,64 @@ def test_time_orm_session(self, dev_server):
assert data["test"] == "time_orm"
assert data["success"] is True, f"time_orm failed: error={data.get('error')}"
assert data["entry_title"] == "Time Test Entry"
assert data["start_time_is_time"] is True


# MARK: - Parallel Query Tests (GitHub issue #20)


class TestParallelQueries:
    """Concurrency tests for the worker's two parallel-query endpoints.

    Each endpoint runs the same batch of lightweight parameterized SELECTs
    twice — once sequentially, once via asyncio.gather() — and reports both
    timings plus an ``is_concurrent`` flag (gathered run < 80% of the
    sequential run). These tests assert that flag for both the engine-based
    connection and the async WorkerConnection.
    """

    @staticmethod
    def _assert_concurrent(port, test_name, endpoint):
        """Call ``endpoint`` on the dev server and assert it reported a speedup.

        Args:
            port: Port the local dev server is listening on.
            test_name: Value the endpoint is expected to echo in ``data["test"]``.
            endpoint: URL path (without leading slash) of the timing endpoint.
        """
        response = requests.get(f"http://localhost:{port}/{endpoint}")

        # Use the raw body in the message: an error page may not be JSON, and
        # calling response.json() here would raise and hide the real failure.
        assert response.status_code == 200, (
            f"{test_name} failed: {response.text}"
        )
        data = response.json()

        assert data["test"] == test_name, data
        assert data["results_correct"] is True, f"query results wrong: {data}"
        assert data["is_concurrent"] is True, (
            f"expected concurrent behaviour but got no speedup: "
            f"sequential={data['sequential_time_s']}s, "
            f"parallel={data['parallel_time_s']}s "
            f"({data['n_queries']} queries)"
        )

    def test_engine_queries_are_concurrent(self, dev_server):
        """create_engine_from_binding() is concurrent — gather gives real speedup.

        The endpoint issues its queries through the SQLAlchemy engine.
        Parallel time should be < 80% of sequential time, which is taken as
        evidence that the engine's synchronous execute path does not block
        the event loop.
        """
        self._assert_concurrent(
            dev_server, "parallel_queries_engine", "parallel-queries-engine"
        )

    def test_worker_connection_queries_are_concurrent(self, dev_server):
        """WorkerConnection async is truly concurrent — gather gives real speedup.

        The endpoint issues its queries through ``await cursor.execute_async()``.
        Parallel time should be < 80% of sequential time, confirming that the
        await yields to the event loop and lets the D1 calls overlap.
        """
        self._assert_concurrent(
            dev_server, "parallel_queries_async", "parallel-queries-async"
        )
Loading