8 changes: 0 additions & 8 deletions .github/workflows/pull-request-tests.yml
@@ -205,14 +205,6 @@ jobs:
- service: national-scale-spatial-join-databricks-partitioned-16-nodes
display_name: Sedona National Scale Spatial Join - Partitioned - 16 Nodes

- service: national-scale-spatial-join-databricks-default-2-nodes
display_name: Sedona National Scale Spatial Join - Default - 2 Nodes

- service: national-scale-spatial-join-databricks-default-8-nodes
display_name: Sedona National Scale Spatial Join - Default - 8 Nodes

- service: national-scale-spatial-join-databricks-default-16-nodes
display_name: Sedona National Scale Spatial Join - Default - 16 Nodes

steps:
- name: Checkout code
11 changes: 0 additions & 11 deletions .github/workflows/push-containers-to-acr.yml
@@ -145,17 +145,6 @@ jobs:
image: national-scale-spatial-join-databricks-partitioned-16-nodes
display_name: Sedona National Scale Spatial Join - Partitioned - 16 Nodes

- service: national-scale-spatial-join-databricks-default-2-nodes
image: national-scale-spatial-join-databricks-default-2-nodes
display_name: Sedona National Scale Spatial Join - Default - 2 Nodes

- service: national-scale-spatial-join-databricks-default-8-nodes
image: national-scale-spatial-join-databricks-default-8-nodes
display_name: Sedona National Scale Spatial Join - Default - 8 Nodes

- service: national-scale-spatial-join-databricks-default-16-nodes
image: national-scale-spatial-join-databricks-default-16-nodes
display_name: Sedona National Scale Spatial Join - Default - 16 Nodes

steps:
- name: Checkout code
2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -25,7 +25,7 @@ Python, DuckDB (spatial), PostGIS on Azure Database for PostgreSQL, Apache Sedon
- Secrets live in `.env` (gitignored) and are forwarded to ACI as `--secure-environment-variables` from `main.py`.
- Benchmark batching: members of a batch list each other bidirectionally in `related_script_ids`. The orchestrator dedupes via `completed_experiments`, so each batch runs once as one parallel `ThreadPoolExecutor` fan-out. A batch must satisfy four constraints simultaneously: (a) same query type, (b) same `dataset_size`, (c) at most one PostGIS member (shared Azure Postgres server), (d) Databricks cluster vCPU sum ≤ 200, computed as `(workers + 1) × 4` per Sedona member on `Standard_D4s_v3`. See `README.md#pairing-and-randomization` for the full batch listing. *Why: peers must execute under the same wall-clock window for fair comparison, without contending on shared infrastructure or breaching regional quota.*
- Adding a benchmark requires three edits in lockstep: file in `src/presentation/entrypoints/`, `case` arm in `benchmark_runner.py`, and an entry in `benchmarks.yml`. Missing any one silently breaks dispatch or orchestration.
- Stopping rule on `@monitor`: high-frequency single-machine queries use sequential stopping (bootstrapped CI on the mean elapsed time, floors at `BENCHMARK_MIN_ITERATIONS` and `BENCHMARK_MIN_TIMED_WINDOW_SECONDS`, ceiling at the `BenchmarkIteration` value, hard timeout at `BENCHMARK_MAX_TIMED_WINDOW_SECONDS`). Long-running low-variance benchmarks (`national_scale_spatial_join_*` for Databricks, DuckDB, and PostGIS) opt out via `use_sequential_stopping=False` and run a small fixed iteration count; they also override `warmup_iterations=1` since one warmup is enough on long-running queries and additional warmups dominate the wall-clock budget. Transient per-iteration failures do not abort the run: failed iterations are recorded as `status="failed"` samples and the loop continues until `BENCHMARK_MAX_CONSECUTIVE_FAILURES` in a row triggers `stop_reason="failed"`; a run that finishes normally but had any failed iterations is tagged `stop_reason="partial"`. *Why: bootstrap CI on <10 samples is uninformative, and the per-iteration cost of those benchmarks (cluster time, shared Postgres) outweighs precision gains. Spark/Azure flakiness is intermittent rather than deterministic — losing 4 of 5 timed iters to one transient executor loss erases data we could have kept.* See `README.md#stopping-rule` for the full rule.
- Stopping rule on `@monitor`: high-frequency single-machine queries use sequential stopping (bootstrapped CI on the mean elapsed time, floors at `BENCHMARK_MIN_ITERATIONS` and `BENCHMARK_MIN_TIMED_WINDOW_SECONDS`, ceiling at the `BenchmarkIteration` value, hard timeout at `BENCHMARK_MAX_TIMED_WINDOW_SECONDS`). Long-running low-variance benchmarks (`national_scale_spatial_join_*` for Databricks, DuckDB, and PostGIS) opt out via `use_sequential_stopping=False` and run a small fixed iteration count; they also override `warmup_iterations=1` since one warmup is enough on long-running queries and additional warmups dominate the wall-clock budget. Transient per-iteration failures do not abort the run: failed iterations are recorded as `status="failed"` samples and the loop continues until `BENCHMARK_MAX_CONSECUTIVE_FAILURES` in a row triggers `stop_reason="failed"`; a run that finishes normally but had any failed iterations is tagged `stop_reason="partial"`. *Why: bootstrap CI on <10 samples is uninformative, and the per-iteration cost of those benchmarks (cluster time, shared Postgres) outweighs precision gains. Spark/Azure flakiness is intermittent rather than deterministic — losing 2 of 3 timed iters to one transient executor loss erases data we could have kept.* See `README.md#stopping-rule` for the full rule.
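
The failure-tolerance part of the stopping rule can be sketched roughly as follows. This is a minimal illustration, not the project's `@monitor` implementation: the function name, the `"ok"` and `"completed"` labels, and the threshold value of 3 are assumptions made for the example; only `status="failed"`, `stop_reason="failed"`, and `stop_reason="partial"` come from the rule above.

```python
from typing import Callable

# Assumed value; stands in for BENCHMARK_MAX_CONSECUTIVE_FAILURES.
MAX_CONSECUTIVE_FAILURES = 3


def run_fixed_iterations(query: Callable[[], float], iterations: int) -> dict:
    """Run a fixed number of timed calls, recording failures instead of aborting."""
    samples = []
    consecutive_failures = 0
    stop_reason = "completed"  # hypothetical label for a clean run

    for _ in range(iterations):
        try:
            elapsed = query()
            samples.append({"status": "ok", "elapsed": elapsed})
            consecutive_failures = 0  # any success resets the streak
        except Exception as exc:
            samples.append({"status": "failed", "error": str(exc)})
            consecutive_failures += 1
            if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                stop_reason = "failed"
                break

    # A run that reached the end but lost some iterations is tagged "partial".
    if stop_reason != "failed" and any(s["status"] == "failed" for s in samples):
        stop_reason = "partial"
    return {"stop_reason": stop_reason, "samples": samples}
```

With three timed iterations and one transient failure in the middle, this sketch keeps the two successful samples and reports `partial` rather than discarding the run.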

## Commands

56 changes: 27 additions & 29 deletions README.md
@@ -70,10 +70,12 @@ storage.
Spark-based systems (Pandey et al. 2018) or to single-node engines (Jackpine; Ray et al. 2011). doppa runs the same
national-scale spatial join through DuckDB, PostGIS, and Apache Sedona on Databricks at five cluster sizes (2, 4, 8,
12, and 16 workers), against the same `counties.parquet` boundary set and the same `BENCHMARK_DOPPA_DATA_RELEASE`
building dataset. Three Sedona join-strategy variants are compared at each cluster size: `default` (no hint; Spark's
cost-based optimizer picks the plan), `broadcast` (small-side `broadcast()` hint forcing `BroadcastIndexJoin`), and
`partitioned` (Sedona KDB-tree partitioner forcing `RangeJoin`). The `default` variant is the apples-to-apples baseline
against which `broadcast` and `partitioned` are evaluated. With the cluster-reuse measurement window in place, every
building dataset. Two Sedona join-strategy variants are compared at each cluster size: `broadcast` (small-side
`broadcast()` hint forcing `BroadcastIndexJoin`) and `partitioned` (Sedona KDB-tree partitioner forcing `RangeJoin`).
A third variant, `default` (no hint; Spark's cost-based optimizer picks the plan), was considered as an untuned
baseline but dropped because iterations consistently timed out or failed at this workload scale, making reliable
measurement infeasible within the study's budget and time constraints (issue #254). With the cluster-reuse
measurement window in place, every
engine reports elapsed time and cost over the same span — warmup outside the window, timed iterations on a warm engine
— so the comparison is symmetric. Distributed-only metrics (shuffle bytes, stage durations, driver collection time)
are recorded as additional columns rather than as replacements for the cross-engine ones.
@@ -126,16 +128,17 @@ collected so far). The loop stops as soon as all of these hold:
The per-query value in `BenchmarkIteration` (for example `POINT_IN_POLYGON_LOOKUP=2500`, `KNN_SEARCH=4000`) acts as
an **upper bound** on iterations. If iterations hit the ceiling but the 60-second window floor has not yet been met,
the loop continues past the ceiling and logs a one-time warning, so the cost-metric window stays valid. A separate
hard cap `Config.BENCHMARK_MAX_TIMED_WINDOW_SECONDS=3600` (one hour) protects against runaway runs and trips
hard cap `Config.BENCHMARK_MAX_TIMED_WINDOW_SECONDS=5400` (90 minutes) protects against runaway runs and trips
`stop_reason="timeout"` if it fires. The bootstrap RNG is seeded deterministically from `(run_id, query_id)`
(`blake2b` digest in `_make_bootstrap_rng`), so identical reruns reproduce the same stopping point.
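
One way such a deterministic seed could be derived is sketched below. This is an assumption-laden illustration rather than the body of `_make_bootstrap_rng`: the key encoding (`run_id:query_id`), the 8-byte digest size, the use of `random.Random`, and the percentile-style CI helper are all choices made for the example.

```python
import hashlib
import random
import statistics


def make_bootstrap_rng(run_id: str, query_id: str) -> random.Random:
    """Derive a reproducible RNG from the (run_id, query_id) pair."""
    key = f"{run_id}:{query_id}".encode("utf-8")  # assumed encoding of the pair
    digest = hashlib.blake2b(key, digest_size=8).digest()
    return random.Random(int.from_bytes(digest, "big"))


def bootstrap_ci_mean(samples, rng, resamples=1000, alpha=0.05):
    """Percentile bootstrap confidence interval for the mean elapsed time."""
    means = sorted(
        statistics.fmean(rng.choices(samples, k=len(samples)))
        for _ in range(resamples)
    )
    lower = means[int((alpha / 2) * resamples)]
    upper = means[int((1 - alpha / 2) * resamples) - 1]
    return lower, upper


elapsed = [1.00, 1.20, 0.90, 1.10, 1.05, 0.95]
first = bootstrap_ci_mean(elapsed, make_bootstrap_rng("run-1", "knn-search"))
second = bootstrap_ci_mean(elapsed, make_bootstrap_rng("run-1", "knn-search"))
print(first == second)
```

Because the seed depends only on the two identifiers, rebuilding the RNG for the same run and query replays the same resamples, which is what makes the stopping point reproducible.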

National-scale spatial joins (`national_scale_spatial_join_databricks_*`, `national_scale_spatial_join_duckdb`,
`national_scale_spatial_join_postgis`) opt out via `use_sequential_stopping=False` on `@monitor` and run a small
fixed count (`NATIONAL_SCALE_SPATIAL_JOIN=5`). These queries are long-running and low-variance: a bootstrapped CI on
fixed count (`NATIONAL_SCALE_SPATIAL_JOIN=3`). These queries are long-running and low-variance: a bootstrapped CI on
fewer than `MIN_ITERATIONS` samples would be uninformative, and the cost of additional iterations is significant on
Databricks (cluster runtime × workers) and on the shared Postgres instance. The wall-clock timeout is also skipped
for this branch, so the fixed iteration count is the only upper bound on these benchmarks. These same entrypoints
Databricks (cluster runtime × workers) and on the shared Postgres instance. The per-iteration timeout
(`BENCHMARK_MAX_TIMED_WINDOW_SECONDS`) still applies — any single iteration (including warmup) exceeding 90 minutes
triggers `stop_reason="timeout"`. These same entrypoints
override `warmup_iterations=1` on `@monitor`: one warmup is enough to prime the OS page cache, JDBC/PostGIS
connection state, and Spark Catalyst plans on a warm Databricks cluster, while additional warmups would dominate
the wall-clock budget (`Config.BENCHMARK_WARMUP_ITERATIONS=5` is the decorator default and applies to the
@@ -187,11 +190,10 @@ Cluster provisioning and termination are deliberately outside the cost window. T
running queries on an optimally warm engine, not the cost of cold-starting one per query. Provisioning is a one-time
setup cost in any production deployment, amortized over many queries rather than billed per query.

Three notebook variants live under `src/presentation/databricks/`:
`national_scale_spatial_join_broadcast.py` (wraps `broadcast()` around the small side),
`national_scale_spatial_join_partitioned.py` (sets the Sedona KDB-tree partitioner), and
`national_scale_spatial_join_default.py` (no strategy hint; Spark's cost-based optimizer picks the plan, used as the
apples-to-apples baseline). Each registers a `SparkListener` that aggregates per-stage metrics — executor input bytes,
Two notebook variants live under `src/presentation/databricks/`:
`national_scale_spatial_join_broadcast.py` (wraps `broadcast()` around the small side) and
`national_scale_spatial_join_partitioned.py` (sets the Sedona KDB-tree partitioner).
Each registers a `SparkListener` that aggregates per-stage metrics — executor input bytes,
shuffle read and write bytes, stage durations, executor run time — and returns them alongside the query result via
`dbutils.notebook.exit`. These phase metrics are persisted on the per-iteration sample so the distributed runtime can
be decomposed into read, shuffle, and driver collection time.
@@ -207,7 +209,7 @@ Concretely, the outer orchestrator loop is serial: it picks the next experiment
its peer batch in parallel, waits for the whole batch to finish, marks every member completed, and moves on. At any
moment one batch is in flight; within that batch every member runs on its own ACI in parallel.

The 44 experiments are packed into 18 batches under four constraints that the `related_script_ids` graph encodes:
The 41 experiments are packed into 17 batches under four constraints that the `related_script_ids` graph encodes:

1. **Same query type** per batch — `point-in-polygon-lookup`, `knn-search`, `bbox-filtering`, or
`national-scale-spatial-join` never mix.
@@ -243,28 +245,25 @@ laptop-workflow reference, not a scalable engine.
The medium tier was dropped from the surviving RQ1 queries and `attribute-spatial-compound-filter` was removed
across the board (issue #281); the 13 freed cells are reinvested in RQ2.

**RQ2 — National-scale spatial join** (29 experiments, 12 batches)
**RQ2 — National-scale spatial join** (26 experiments, 11 batches)

| Engine / strategy | `small` | `medium` | `large` |
|---------------------|---------------------|---------------------|---------------------------|
| Single-node | duckdb · postgis | duckdb · postgis | duckdb · postgis |
| Sedona `broadcast` | 4 / 8 nodes | 2 / 4 / 8 nodes | 2 / 4 / 8 / 12 / 16 nodes |
| Sedona `partitioned`| 4 / 8 nodes | 2 / 4 / 8 nodes | 2 / 4 / 8 / 12 / 16 nodes |
| Sedona `default` | — | — | 2 / 8 / 16 nodes |

Within each size column, single-node and Sedona experiments are packed into the same batches up to the 200 vCPU
Databricks budget — the table groups by strategy for readability, not by batch membership. Concrete batch
membership is whatever `related_script_ids` in `benchmarks.yml` declares; see the batch listing below.

The 2-node row is omitted at `small` for `broadcast` and `partitioned`: at ~5M polygons those configurations were
weakly differentiated from `default`; the freed cells fund the 12-/16-node extension of the scaling curve at `large`.
The `default` strategy applies no `broadcast()` hint and no Sedona partitioner configuration; Spark's cost-based
optimizer picks the plan. It is retained at the `large` tier only (2 / 8 / 16 nodes) as a within-Sedona illustration
of CBO behaviour without hints; `small` and `medium` default cells and the intermediate `large`-tier 4-/12-node
cells are pruned because the strategy is ~11× more expensive per iteration than `broadcast` and unstable at small
cluster sizes (issue #309).

**Batch listing.** Eighteen batches in total. The Databricks vCPU column sums `(workers + 1) × 4` over Sedona members
weakly differentiated; the freed cells fund the 12-/16-node extension of the scaling curve at `large`.
A `default` strategy (no hint; Spark's CBO picks the plan) was originally planned as an untuned baseline but was
dropped entirely because iterations consistently timed out or failed at this workload scale, making reliable
measurement infeasible (issue #254).

**Batch listing.** Seventeen batches in total. The Databricks vCPU column sums `(workers + 1) × 4` over Sedona members
of the batch; single-node and DuckDB/Shapefile ACIs draw from a separate quota. Sequential execution order follows
the seeded shuffle.

@@ -283,11 +282,10 @@ the seeded shuffle.
| A_M3 | national-scale-spatial-join | medium | 24 | broadcast-2 · partitioned-2 |
| A_L1 | national-scale-spatial-join | large | 80 | broadcast-16 · broadcast-2 |
| A_L2 | national-scale-spatial-join | large | 80 | partitioned-16 · partitioned-2 |
| A_L3 | national-scale-spatial-join | large | 80 | default-16 · default-2 |
| A_L4 | national-scale-spatial-join | large | 72 | broadcast-12 · broadcast-4 |
| A_L5 | national-scale-spatial-join | large | 72 | partitioned-12 · partitioned-4 |
| A_L6 | national-scale-spatial-join | large | 72 | broadcast-8 · partitioned-8 |
| A_L7 | national-scale-spatial-join | large | 36 | default-8 · duckdb · postgis |
| A_L3 | national-scale-spatial-join | large | 72 | broadcast-12 · broadcast-4 |
| A_L4 | national-scale-spatial-join | large | 72 | partitioned-12 · partitioned-4 |
| A_L5 | national-scale-spatial-join | large | 72 | broadcast-8 · partitioned-8 |
| A_L6 | national-scale-spatial-join | large | 0 | duckdb · postgis |
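
The Databricks vCPU column can be reproduced from the `(workers + 1) × 4` rule. The sketch below is illustrative only: the helper names and the parsing of member strings such as `broadcast-16` are assumptions made for the example, not code from the repository. The 200 vCPU budget and the 4 vCPUs per `Standard_D4s_v3` node are taken from the batching constraints.

```python
VCPUS_PER_NODE = 4   # Standard_D4s_v3
VCPU_BUDGET = 200    # Databricks quota shared by one batch


def sedona_workers(member: str):
    """Return the worker count for a Sedona member, or None for single-node engines."""
    strategy, _, suffix = member.rpartition("-")
    if strategy in {"broadcast", "partitioned"} and suffix.isdigit():
        return int(suffix)
    return None


def batch_vcpus(members) -> int:
    """Sum (workers + 1) * 4 over the Sedona members of a batch."""
    total = 0
    for member in members:
        workers = sedona_workers(member)
        if workers is not None:
            total += (workers + 1) * VCPUS_PER_NODE  # +1 accounts for the driver node
    return total


batches = [
    ["broadcast-16", "broadcast-2"],
    ["broadcast-12", "broadcast-4"],
    ["broadcast-8", "partitioned-8"],
    ["duckdb", "postgis"],
]
for members in batches:
    total = batch_vcpus(members)
    assert total <= VCPU_BUDGET
    print(" · ".join(members), "->", total)
```

For the four batches listed, the sums are 80, 72, 72, and 0, matching the corresponding rows of the table.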

## Dataset layout

12 changes: 0 additions & 12 deletions benchmark_runner.py
@@ -25,9 +25,6 @@
national_scale_spatial_join_databricks_partitioned_8_nodes,
national_scale_spatial_join_databricks_partitioned_12_nodes,
national_scale_spatial_join_databricks_partitioned_16_nodes,
national_scale_spatial_join_databricks_default_2_nodes,
national_scale_spatial_join_databricks_default_8_nodes,
national_scale_spatial_join_databricks_default_16_nodes,
)


@@ -108,15 +105,6 @@ def benchmark_runner() -> None:
case "national-scale-spatial-join-databricks-partitioned-16-nodes":
national_scale_spatial_join_databricks_partitioned_16_nodes()
return
case "national-scale-spatial-join-databricks-default-2-nodes":
national_scale_spatial_join_databricks_default_2_nodes()
return
case "national-scale-spatial-join-databricks-default-8-nodes":
national_scale_spatial_join_databricks_default_8_nodes()
return
case "national-scale-spatial-join-databricks-default-16-nodes":
national_scale_spatial_join_databricks_default_16_nodes()
return
case "setup-framework":
setup_benchmarking_framework()
return