diff --git a/.github/workflows/pull-request-tests.yml b/.github/workflows/pull-request-tests.yml index e63ad3dd..2da2b4a4 100644 --- a/.github/workflows/pull-request-tests.yml +++ b/.github/workflows/pull-request-tests.yml @@ -205,14 +205,6 @@ jobs: - service: national-scale-spatial-join-databricks-partitioned-16-nodes display_name: Sedona National Scale Spatial Join - Partitioned - 16 Nodes - - service: national-scale-spatial-join-databricks-default-2-nodes - display_name: Sedona National Scale Spatial Join - Default - 2 Nodes - - - service: national-scale-spatial-join-databricks-default-8-nodes - display_name: Sedona National Scale Spatial Join - Default - 8 Nodes - - - service: national-scale-spatial-join-databricks-default-16-nodes - display_name: Sedona National Scale Spatial Join - Default - 16 Nodes steps: - name: Checkout code diff --git a/.github/workflows/push-containers-to-acr.yml b/.github/workflows/push-containers-to-acr.yml index 8d90d1d1..ab07a023 100644 --- a/.github/workflows/push-containers-to-acr.yml +++ b/.github/workflows/push-containers-to-acr.yml @@ -145,17 +145,6 @@ jobs: image: national-scale-spatial-join-databricks-partitioned-16-nodes display_name: Sedona National Scale Spatial Join - Partitioned - 16 Nodes - - service: national-scale-spatial-join-databricks-default-2-nodes - image: national-scale-spatial-join-databricks-default-2-nodes - display_name: Sedona National Scale Spatial Join - Default - 2 Nodes - - - service: national-scale-spatial-join-databricks-default-8-nodes - image: national-scale-spatial-join-databricks-default-8-nodes - display_name: Sedona National Scale Spatial Join - Default - 8 Nodes - - - service: national-scale-spatial-join-databricks-default-16-nodes - image: national-scale-spatial-join-databricks-default-16-nodes - display_name: Sedona National Scale Spatial Join - Default - 16 Nodes steps: - name: Checkout code diff --git a/CLAUDE.md b/CLAUDE.md index bfb8802c..62802ac9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md 
@@ -25,7 +25,7 @@ Python, DuckDB (spatial), PostGIS on Azure Database for PostgreSQL, Apache Sedon - Secrets live in `.env` (gitignored) and are forwarded to ACI as `--secure-environment-variables` from `main.py`. - Benchmark batching: members of a batch list each other bidirectionally in `related_script_ids`. The orchestrator dedupes via `completed_experiments`, so each batch runs once as one parallel `ThreadPoolExecutor` fan-out. A batch must satisfy four constraints simultaneously: (a) same query type, (b) same `dataset_size`, (c) at most one PostGIS member (shared Azure Postgres server), (d) Databricks cluster vCPU sum ≤ 200, computed as `(workers + 1) × 4` per Sedona member on `Standard_D4s_v3`. See `README.md#pairing-and-randomization` for the full batch listing. *Why: peers must execute under the same wall-clock window for fair comparison, without contending on shared infrastructure or breaching regional quota.* - Adding a benchmark requires three edits in lockstep: file in `src/presentation/entrypoints/`, `case` arm in `benchmark_runner.py`, and an entry in `benchmarks.yml`. Missing any one silently breaks dispatch or orchestration. -- Stopping rule on `@monitor`: high-frequency single-machine queries use sequential stopping (bootstrapped CI on the mean elapsed time, floors at `BENCHMARK_MIN_ITERATIONS` and `BENCHMARK_MIN_TIMED_WINDOW_SECONDS`, ceiling at the `BenchmarkIteration` value, hard timeout at `BENCHMARK_MAX_TIMED_WINDOW_SECONDS`). Long-running low-variance benchmarks (`national_scale_spatial_join_*` for Databricks, DuckDB, and PostGIS) opt out via `use_sequential_stopping=False` and run a small fixed iteration count; they also override `warmup_iterations=1` since one warmup is enough on long-running queries and additional warmups dominate the wall-clock budget. 
Transient per-iteration failures do not abort the run: failed iterations are recorded as `status="failed"` samples and the loop continues until `BENCHMARK_MAX_CONSECUTIVE_FAILURES` in a row triggers `stop_reason="failed"`; a run that finishes normally but had any failed iterations is tagged `stop_reason="partial"`. *Why: bootstrap CI on <10 samples is uninformative, and the per-iteration cost of those benchmarks (cluster time, shared Postgres) outweighs precision gains. Spark/Azure flakiness is intermittent rather than deterministic — losing 4 of 5 timed iters to one transient executor loss erases data we could have kept.* See `README.md#stopping-rule` for the full rule. +- Stopping rule on `@monitor`: high-frequency single-machine queries use sequential stopping (bootstrapped CI on the mean elapsed time, floors at `BENCHMARK_MIN_ITERATIONS` and `BENCHMARK_MIN_TIMED_WINDOW_SECONDS`, ceiling at the `BenchmarkIteration` value, hard timeout at `BENCHMARK_MAX_TIMED_WINDOW_SECONDS`). Long-running low-variance benchmarks (`national_scale_spatial_join_*` for Databricks, DuckDB, and PostGIS) opt out via `use_sequential_stopping=False` and run a small fixed iteration count; they also override `warmup_iterations=1` since one warmup is enough on long-running queries and additional warmups dominate the wall-clock budget. Transient per-iteration failures do not abort the run: failed iterations are recorded as `status="failed"` samples and the loop continues until `BENCHMARK_MAX_CONSECUTIVE_FAILURES` in a row triggers `stop_reason="failed"`; a run that finishes normally but had any failed iterations is tagged `stop_reason="partial"`. *Why: bootstrap CI on <10 samples is uninformative, and the per-iteration cost of those benchmarks (cluster time, shared Postgres) outweighs precision gains. 
Spark/Azure flakiness is intermittent rather than deterministic — losing 2 of 3 timed iters to one transient executor loss erases data we could have kept.* See `README.md#stopping-rule` for the full rule. ## Commands diff --git a/README.md b/README.md index 6b80b748..cb55f536 100644 --- a/README.md +++ b/README.md @@ -70,10 +70,12 @@ storage. Spark-based systems (Pandey et al. 2018) or to single-node engines (Jackpine; Ray et al. 2011). doppa runs the same national-scale spatial join through DuckDB, PostGIS, and Apache Sedona on Databricks at five cluster sizes (2, 4, 8, 12, and 16 workers), against the same `counties.parquet` boundary set and the same `BENCHMARK_DOPPA_DATA_RELEASE` -building dataset. Three Sedona join-strategy variants are compared at each cluster size: `default` (no hint; Spark's -cost-based optimizer picks the plan), `broadcast` (small-side `broadcast()` hint forcing `BroadcastIndexJoin`), and -`partitioned` (Sedona KDB-tree partitioner forcing `RangeJoin`). The `default` variant is the apples-to-apples baseline -against which `broadcast` and `partitioned` are evaluated. With the cluster-reuse measurement window in place, every +building dataset. Two Sedona join-strategy variants are compared at each cluster size: `broadcast` (small-side +`broadcast()` hint forcing `BroadcastIndexJoin`) and `partitioned` (Sedona KDB-tree partitioner forcing `RangeJoin`). +A third variant, `default` (no hint; Spark's cost-based optimizer picks the plan), was considered as an untuned +baseline but dropped because iterations consistently timed out or failed at this workload scale, making reliable +measurement infeasible within the study's budget and time constraints (issue #254). With the cluster-reuse +measurement window in place, every engine reports elapsed time and cost over the same span — warmup outside the window, timed iterations on a warm engine — so the comparison is symmetric. 
Distributed-only metrics (shuffle bytes, stage durations, driver collection time) are recorded as additional columns rather than as replacements for the cross-engine ones. @@ -126,16 +128,17 @@ collected so far). The loop stops as soon as all of these hold: The per-query value in `BenchmarkIteration` (for example `POINT_IN_POLYGON_LOOKUP=2500`, `KNN_SEARCH=4000`) acts as an **upper bound** on iterations. If iterations hit the ceiling but the 60-second window floor has not yet been met, the loop continues past the ceiling and logs a one-time warning, so the cost-metric window stays valid. A separate -hard cap `Config.BENCHMARK_MAX_TIMED_WINDOW_SECONDS=3600` (one hour) protects against runaway runs and trips +hard cap `Config.BENCHMARK_MAX_TIMED_WINDOW_SECONDS=5400` (90 minutes) protects against runaway runs and trips `stop_reason="timeout"` if it fires. The bootstrap RNG is seeded deterministically from `(run_id, query_id)` (`blake2b` digest in `_make_bootstrap_rng`), so identical reruns reproduce the same stopping point. National-scale spatial joins (`national_scale_spatial_join_databricks_*`, `national_scale_spatial_join_duckdb`, `national_scale_spatial_join_postgis`) opt out via `use_sequential_stopping=False` on `@monitor` and run a small -fixed count (`NATIONAL_SCALE_SPATIAL_JOIN=5`). These queries are long-running and low-variance: a bootstrapped CI on +fixed count (`NATIONAL_SCALE_SPATIAL_JOIN=3`). These queries are long-running and low-variance: a bootstrapped CI on fewer than `MIN_ITERATIONS` samples would be uninformative, and the cost of additional iterations is significant on -Databricks (cluster runtime × workers) and on the shared Postgres instance. The wall-clock timeout is also skipped -for this branch, so the fixed iteration count is the only upper bound on these benchmarks. These same entrypoints +Databricks (cluster runtime × workers) and on the shared Postgres instance. 
The per-iteration timeout +(`BENCHMARK_MAX_TIMED_WINDOW_SECONDS`) still applies — any single iteration (including warmup) exceeding 90 minutes +triggers `stop_reason="timeout"`. These same entrypoints override `warmup_iterations=1` on `@monitor`: one warmup is enough to prime the OS page cache, JDBC/PostGIS connection state, and Spark Catalyst plans on a warm Databricks cluster, while additional warmups would dominate the wall-clock budget (`Config.BENCHMARK_WARMUP_ITERATIONS=5` is the decorator default and applies to the @@ -187,11 +190,10 @@ Cluster provisioning and termination are deliberately outside the cost window. T running queries on an optimally warm engine, not the cost of cold-starting one per query. Provisioning is a one-time setup cost in any production deployment, amortized over many queries rather than billed per query. -Three notebook variants live under `src/presentation/databricks/`: -`national_scale_spatial_join_broadcast.py` (wraps `broadcast()` around the small side), -`national_scale_spatial_join_partitioned.py` (sets the Sedona KDB-tree partitioner), and -`national_scale_spatial_join_default.py` (no strategy hint; Spark's cost-based optimizer picks the plan, used as the -apples-to-apples baseline). Each registers a `SparkListener` that aggregates per-stage metrics — executor input bytes, +Two notebook variants live under `src/presentation/databricks/`: +`national_scale_spatial_join_broadcast.py` (wraps `broadcast()` around the small side) and +`national_scale_spatial_join_partitioned.py` (sets the Sedona KDB-tree partitioner). +Each registers a `SparkListener` that aggregates per-stage metrics — executor input bytes, shuffle read and write bytes, stage durations, executor run time — and returns them alongside the query result via `dbutils.notebook.exit`. These phase metrics are persisted on the per-iteration sample so the distributed runtime can be decomposed into read, shuffle, and driver collection time. 
@@ -207,7 +209,7 @@ Concretely, the outer orchestrator loop is serial: it picks the next experiment its peer batch in parallel, waits for the whole batch to finish, marks every member completed, and moves on. At any moment one batch is in flight; within that batch every member runs on its own ACI in parallel. -The 44 experiments are packed into 18 batches under four constraints that the `related_script_ids` graph encodes: +The 41 experiments are packed into 17 batches under four constraints that the `related_script_ids` graph encodes: 1. **Same query type** per batch — `point-in-polygon-lookup`, `knn-search`, `bbox-filtering`, or `national-scale-spatial-join` never mix. @@ -243,28 +245,25 @@ laptop-workflow reference, not a scalable engine. The medium tier was dropped from the surviving RQ1 queries and `attribute-spatial-compound-filter` was removed across the board (issue #281); the 13 freed cells are reinvested in RQ2. -**RQ2 — National-scale spatial join** (29 experiments, 12 batches) +**RQ2 — National-scale spatial join** (26 experiments, 11 batches) | Engine / strategy | `small` | `medium` | `large` | |---------------------|---------------------|---------------------|---------------------------| | Single-node | duckdb · postgis | duckdb · postgis | duckdb · postgis | | Sedona `broadcast` | 4 / 8 nodes | 2 / 4 / 8 nodes | 2 / 4 / 8 / 12 / 16 nodes | | Sedona `partitioned`| 4 / 8 nodes | 2 / 4 / 8 nodes | 2 / 4 / 8 / 12 / 16 nodes | -| Sedona `default` | — | — | 2 / 8 / 16 nodes | Within each size column, single-node and Sedona experiments are packed into the same batches up to the 200 vCPU Databricks budget — the table groups by strategy for readability, not by batch membership. Concrete batch membership is whatever `related_script_ids` in `benchmarks.yml` declares; see the batch listing below. 
The 2-node row is omitted at `small` for `broadcast` and `partitioned`: at ~5M polygons those configurations were -weakly differentiated from `default`; the freed cells fund the 12-/16-node extension of the scaling curve at `large`. -The `default` strategy applies no `broadcast()` hint and no Sedona partitioner configuration; Spark's cost-based -optimizer picks the plan. It is retained at the `large` tier only (2 / 8 / 16 nodes) as a within-Sedona illustration -of CBO behaviour without hints; `small` and `medium` default cells and the intermediate `large`-tier 4-/12-node -cells are pruned because the strategy is ~11× more expensive per iteration than `broadcast` and unstable at small -cluster sizes (issue #309). - -**Batch listing.** Eighteen batches in total. The Databricks vCPU column sums `(workers + 1) × 4` over Sedona members +weakly differentiated from `default`; the freed cells fund the 12-/16-node extension of the scaling curve at `large`. +A `default` strategy (no hint; Spark's CBO picks the plan) was originally planned as an untuned baseline but was +dropped entirely because iterations consistently timed out or failed at this workload scale, making reliable +measurement infeasible (issue #254). + +**Batch listing.** Seventeen batches in total. The Databricks vCPU column sums `(workers + 1) × 4` over Sedona members of the batch; single-node and DuckDB/Shapefile ACIs draw from a separate quota. Sequential execution order follows the seeded shuffle. @@ -283,11 +282,10 @@ the seeded shuffle. 
| A_M3 | national-scale-spatial-join | medium | 24 | broadcast-2 · partitioned-2 | | A_L1 | national-scale-spatial-join | large | 80 | broadcast-16 · broadcast-2 | | A_L2 | national-scale-spatial-join | large | 80 | partitioned-16 · partitioned-2 | -| A_L3 | national-scale-spatial-join | large | 80 | default-16 · default-2 | -| A_L4 | national-scale-spatial-join | large | 72 | broadcast-12 · broadcast-4 | -| A_L5 | national-scale-spatial-join | large | 72 | partitioned-12 · partitioned-4 | -| A_L6 | national-scale-spatial-join | large | 72 | broadcast-8 · partitioned-8 | -| A_L7 | national-scale-spatial-join | large | 36 | default-8 · duckdb · postgis | +| A_L3 | national-scale-spatial-join | large | 72 | broadcast-12 · broadcast-4 | +| A_L4 | national-scale-spatial-join | large | 72 | partitioned-12 · partitioned-4 | +| A_L5 | national-scale-spatial-join | large | 72 | broadcast-8 · partitioned-8 | +| A_L6 | national-scale-spatial-join | large | 0 | duckdb · postgis | ## Dataset layout diff --git a/benchmark_runner.py b/benchmark_runner.py index 51021473..03cc9b3c 100644 --- a/benchmark_runner.py +++ b/benchmark_runner.py @@ -25,9 +25,6 @@ national_scale_spatial_join_databricks_partitioned_8_nodes, national_scale_spatial_join_databricks_partitioned_12_nodes, national_scale_spatial_join_databricks_partitioned_16_nodes, - national_scale_spatial_join_databricks_default_2_nodes, - national_scale_spatial_join_databricks_default_8_nodes, - national_scale_spatial_join_databricks_default_16_nodes, ) @@ -108,15 +105,6 @@ def benchmark_runner() -> None: case "national-scale-spatial-join-databricks-partitioned-16-nodes": national_scale_spatial_join_databricks_partitioned_16_nodes() return - case "national-scale-spatial-join-databricks-default-2-nodes": - national_scale_spatial_join_databricks_default_2_nodes() - return - case "national-scale-spatial-join-databricks-default-8-nodes": - national_scale_spatial_join_databricks_default_8_nodes() - return - case 
"national-scale-spatial-join-databricks-default-16-nodes": - national_scale_spatial_join_databricks_default_16_nodes() - return case "setup-framework": setup_benchmarking_framework() return diff --git a/benchmarks.yml b/benchmarks.yml index b0caaf80..3044d255 100644 --- a/benchmarks.yml +++ b/benchmarks.yml @@ -13,8 +13,8 @@ experiments: # ---- point-in-polygon-lookup ---- - id: point-in-polygon-lookup-duckdb-small image: doppaacr.azurecr.io/point-in-polygon-lookup-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - point-in-polygon-lookup-postgis-small @@ -22,8 +22,8 @@ experiments: - id: point-in-polygon-lookup-postgis-small image: doppaacr.azurecr.io/point-in-polygon-lookup-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - point-in-polygon-lookup-duckdb-small @@ -31,8 +31,8 @@ experiments: - id: point-in-polygon-lookup-local-small image: doppaacr.azurecr.io/point-in-polygon-lookup-local:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - point-in-polygon-lookup-duckdb-small @@ -40,16 +40,16 @@ experiments: - id: point-in-polygon-lookup-duckdb-large image: doppaacr.azurecr.io/point-in-polygon-lookup-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - point-in-polygon-lookup-postgis-large - id: point-in-polygon-lookup-postgis-large image: doppaacr.azurecr.io/point-in-polygon-lookup-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - point-in-polygon-lookup-duckdb-large @@ -57,8 +57,8 @@ experiments: # ---- knn-search ---- - id: knn-search-duckdb-small image: doppaacr.azurecr.io/knn-search-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - knn-search-postgis-small @@ -66,8 +66,8 @@ experiments: - id: knn-search-postgis-small image: 
doppaacr.azurecr.io/knn-search-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - knn-search-duckdb-small @@ -75,8 +75,8 @@ experiments: - id: knn-search-local-small image: doppaacr.azurecr.io/knn-search-local:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - knn-search-duckdb-small @@ -84,16 +84,16 @@ experiments: - id: knn-search-duckdb-large image: doppaacr.azurecr.io/knn-search-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - knn-search-postgis-large - id: knn-search-postgis-large image: doppaacr.azurecr.io/knn-search-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - knn-search-duckdb-large @@ -101,8 +101,8 @@ experiments: # ---- bbox-filtering ---- - id: bbox-filtering-duckdb-small image: doppaacr.azurecr.io/bbox-filtering-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - bbox-filtering-postgis-small @@ -110,8 +110,8 @@ experiments: - id: bbox-filtering-postgis-small image: doppaacr.azurecr.io/bbox-filtering-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - bbox-filtering-duckdb-small @@ -119,8 +119,8 @@ experiments: - id: bbox-filtering-local-small image: doppaacr.azurecr.io/bbox-filtering-local:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - bbox-filtering-duckdb-small @@ -128,16 +128,16 @@ experiments: - id: bbox-filtering-duckdb-large image: doppaacr.azurecr.io/bbox-filtering-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - bbox-filtering-postgis-large - id: bbox-filtering-postgis-large image: doppaacr.azurecr.io/bbox-filtering-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - 
bbox-filtering-duckdb-large @@ -158,8 +158,8 @@ experiments: # ---- single-node ---- - id: national-scale-spatial-join-duckdb-small image: doppaacr.azurecr.io/national-scale-spatial-join-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-small @@ -168,8 +168,8 @@ experiments: - id: national-scale-spatial-join-postgis-small image: doppaacr.azurecr.io/national-scale-spatial-join-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-small @@ -178,8 +178,8 @@ experiments: - id: national-scale-spatial-join-duckdb-medium image: doppaacr.azurecr.io/national-scale-spatial-join-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-medium @@ -188,8 +188,8 @@ experiments: - id: national-scale-spatial-join-postgis-medium image: doppaacr.azurecr.io/national-scale-spatial-join-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-medium @@ -198,67 +198,65 @@ experiments: - id: national-scale-spatial-join-duckdb-large image: doppaacr.azurecr.io/national-scale-spatial-join-duckdb:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - - national-scale-spatial-join-databricks-default-8-nodes-large - national-scale-spatial-join-postgis-large - id: national-scale-spatial-join-postgis-large image: doppaacr.azurecr.io/national-scale-spatial-join-postgis:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - - national-scale-spatial-join-databricks-default-8-nodes-large - national-scale-spatial-join-duckdb-large # ---- Sedona broadcast strategy ---- - id: 
national-scale-spatial-join-databricks-broadcast-2-nodes-medium image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-2-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-partitioned-2-nodes-medium - id: national-scale-spatial-join-databricks-broadcast-2-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-2-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-broadcast-16-nodes-large - id: national-scale-spatial-join-databricks-broadcast-4-nodes-small image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-4-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - national-scale-spatial-join-databricks-partitioned-4-nodes-small - id: national-scale-spatial-join-databricks-broadcast-4-nodes-medium image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-4-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-partitioned-4-nodes-medium - id: national-scale-spatial-join-databricks-broadcast-4-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-4-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-broadcast-12-nodes-large - id: national-scale-spatial-join-databricks-broadcast-8-nodes-small image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-8-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - national-scale-spatial-join-databricks-partitioned-8-nodes-small @@ -267,8 +265,8 @@ experiments: - id: national-scale-spatial-join-databricks-broadcast-8-nodes-medium 
image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-8-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-partitioned-8-nodes-medium @@ -277,24 +275,24 @@ experiments: - id: national-scale-spatial-join-databricks-broadcast-8-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-8-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-partitioned-8-nodes-large - id: national-scale-spatial-join-databricks-broadcast-12-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-12-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-broadcast-4-nodes-large - id: national-scale-spatial-join-databricks-broadcast-16-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-broadcast-16-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-broadcast-2-nodes-large @@ -302,48 +300,48 @@ experiments: # ---- Sedona partitioned strategy ---- - id: national-scale-spatial-join-databricks-partitioned-2-nodes-medium image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-2-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-broadcast-2-nodes-medium - id: national-scale-spatial-join-databricks-partitioned-2-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-2-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-partitioned-16-nodes-large - id: 
national-scale-spatial-join-databricks-partitioned-4-nodes-small image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-4-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - national-scale-spatial-join-databricks-broadcast-4-nodes-small - id: national-scale-spatial-join-databricks-partitioned-4-nodes-medium image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-4-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-broadcast-4-nodes-medium - id: national-scale-spatial-join-databricks-partitioned-4-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-4-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-partitioned-12-nodes-large - id: national-scale-spatial-join-databricks-partitioned-8-nodes-small image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-8-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: small related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-small @@ -352,8 +350,8 @@ experiments: - id: national-scale-spatial-join-databricks-partitioned-8-nodes-medium image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-8-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: medium related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-medium @@ -362,50 +360,25 @@ experiments: - id: national-scale-spatial-join-databricks-partitioned-8-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-8-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-large - id: 
national-scale-spatial-join-databricks-partitioned-12-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-12-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-partitioned-4-nodes-large - id: national-scale-spatial-join-databricks-partitioned-16-nodes-large image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-partitioned-16-nodes:latest - cpu: 3 - memory_gb: 8 + cpu: 4 + memory_gb: 16 dataset_size: large related_script_ids: - national-scale-spatial-join-databricks-partitioned-2-nodes-large - # ---- Sedona default strategy (large tier only; issue #309) ---- - - id: national-scale-spatial-join-databricks-default-2-nodes-large - image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-default-2-nodes:latest - cpu: 3 - memory_gb: 8 - dataset_size: large - related_script_ids: - - national-scale-spatial-join-databricks-default-16-nodes-large - - - id: national-scale-spatial-join-databricks-default-8-nodes-large - image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-default-8-nodes:latest - cpu: 3 - memory_gb: 8 - dataset_size: large - related_script_ids: - - national-scale-spatial-join-duckdb-large - - national-scale-spatial-join-postgis-large - - - id: national-scale-spatial-join-databricks-default-16-nodes-large - image: doppaacr.azurecr.io/national-scale-spatial-join-databricks-default-16-nodes:latest - cpu: 3 - memory_gb: 8 - dataset_size: large - related_script_ids: - - national-scale-spatial-join-databricks-default-2-nodes-large diff --git a/docker-compose.yml b/docker-compose.yml index 2ea53bcb..2fce66e5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -218,33 +218,6 @@ services: image: national-scale-spatial-join-databricks-partitioned-16-nodes:latest command: python benchmark_runner.py --script-id national-scale-spatial-join-databricks-partitioned-16-nodes --benchmark-run 1 --run-id ABCDEF 
- national-scale-spatial-join-databricks-default-2-nodes: - env_file: - - .env - build: - context: . - dockerfile: .docker/Query.Dockerfile - image: national-scale-spatial-join-databricks-default-2-nodes:latest - command: python benchmark_runner.py --script-id national-scale-spatial-join-databricks-default-2-nodes --benchmark-run 1 --run-id ABCDEF - - national-scale-spatial-join-databricks-default-8-nodes: - env_file: - - .env - build: - context: . - dockerfile: .docker/Query.Dockerfile - image: national-scale-spatial-join-databricks-default-8-nodes:latest - command: python benchmark_runner.py --script-id national-scale-spatial-join-databricks-default-8-nodes --benchmark-run 1 --run-id ABCDEF - - national-scale-spatial-join-databricks-default-16-nodes: - env_file: - - .env - build: - context: . - dockerfile: .docker/Query.Dockerfile - image: national-scale-spatial-join-databricks-default-16-nodes:latest - command: python benchmark_runner.py --script-id national-scale-spatial-join-databricks-default-16-nodes --benchmark-run 1 --run-id ABCDEF - vmt-api-server: env_file: - .env diff --git a/main.py b/main.py index 68749b10..d010c3f7 100644 --- a/main.py +++ b/main.py @@ -462,10 +462,10 @@ def _check_container_state( f"Container '{container_group_name}' completed in {_format_duration(elapsed)}." ) break - case "Failed": + case "Failed" | "Stopped" | "Terminated": time.sleep(5) _stream_container_logs(container_group_name, lines_seen) - error_message = f"Container '{container_group_name}' failed. Please check the logs for more information." + error_message = f"Container '{container_group_name}' {state.lower()}. Please check the logs for more information." 
logger.error(error_message) raise RuntimeError(error_message) case _: diff --git a/src/application/common/monitor.py b/src/application/common/monitor.py index b04c42bd..6ed0e371 100644 --- a/src/application/common/monitor.py +++ b/src/application/common/monitor.py @@ -85,6 +85,8 @@ def wrapper(*args, **kwargs): else Config.BENCHMARK_WARMUP_ITERATIONS ) + stop_reason: StopReason | None = None + if skip_warmup: logger.info( f"Executing benchmark for '{query_id}' with no warmup (ceiling={ceiling})." @@ -122,7 +124,16 @@ def wrapper(*args, **kwargs): f"Skipping timed iterations." ) break - if failure is None: + if w_elapsed >= Config.BENCHMARK_MAX_TIMED_WINDOW_SECONDS: + stop_reason = StopReason.TIMEOUT + logger.warning( + f"Warmup iteration for '{query_id}' took " + f"{w_elapsed:.1f}s, exceeding " + f"BENCHMARK_MAX_TIMED_WINDOW_SECONDS=" + f"{Config.BENCHMARK_MAX_TIMED_WINDOW_SECONDS}s; stopping." + ) + break + if failure is None and stop_reason is None: if use_sequential_stopping: logger.info( f"Warmup complete for '{query_id}'. 
Starting sequential timed iterations " @@ -140,11 +151,10 @@ def wrapper(*args, **kwargs): failed_iterations: int = 0 consecutive_failures: int = 0 bootstrap_rng = _make_bootstrap_rng(run_id=run_id, query_id=query_id) - stop_reason: StopReason | None = None soft_ceiling_warned = False timed_loop_start = datetime.datetime.now(datetime.UTC) - if failure is None: + if failure is None and stop_reason is None: iteration = 0 while True: iteration += 1 diff --git a/src/application/contracts/databricks_service_interface.py b/src/application/contracts/databricks_service_interface.py index dc5a7871..7a9829a2 100644 --- a/src/application/contracts/databricks_service_interface.py +++ b/src/application/contracts/databricks_service_interface.py @@ -10,7 +10,7 @@ class IDatabricksService(ABC): def create_cluster( self, num_workers: int, - notebook_variant: Literal["broadcast", "partitioned", "default"], + notebook_variant: Literal["broadcast", "partitioned"], ) -> str: """ Provision an interactive cluster, install required libraries, wait for the cluster to @@ -20,9 +20,7 @@ def create_cluster( :param num_workers: Number of worker nodes to provision for the cluster. :param notebook_variant: Which Sedona join strategy notebook to upload. ``"broadcast"`` uploads the variant that wraps ``broadcast()`` on the right side of the join; - ``"partitioned"`` uploads the variant that sets the Sedona spatial partitioner; - ``"default"`` uploads the variant that applies no strategy hint and lets Spark's - cost-based optimizer pick the plan. + ``"partitioned"`` uploads the variant that sets the Sedona spatial partitioner. :return: The Databricks cluster ID, suitable for passing to :meth:`submit_to_existing_cluster` and :meth:`terminate_cluster`. 
:rtype: str @@ -38,7 +36,7 @@ def submit_to_existing_cluster( cluster_id: str, num_workers: int, dataset_size: DatasetSize, - notebook_variant: Literal["broadcast", "partitioned", "default"], + notebook_variant: Literal["broadcast", "partitioned"], ) -> DatabricksRunResult: """ Submit a single notebook run against an already-running cluster and block until it diff --git a/src/config.py b/src/config.py index 4cae1e9f..17896952 100644 --- a/src/config.py +++ b/src/config.py @@ -113,12 +113,12 @@ class Config: BENCHMARK_WARMUP_ITERATIONS: int = 5 BENCHMARK_ITERATIONS: int = 100 BENCHMARK_METADATA_BLOB_NAME: str = "benchmark_metadata.parquet" - BENCHMARK_DOPPA_DATA_RELEASE: str = "2026-05-16.1" + BENCHMARK_DOPPA_DATA_RELEASE: str = "2026-05-23.1" # Sequential stopping rule (bootstrapped CI on mean elapsed time) BENCHMARK_MIN_ITERATIONS: int = 10 BENCHMARK_MIN_TIMED_WINDOW_SECONDS: int = 60 - BENCHMARK_MAX_TIMED_WINDOW_SECONDS: int = 3600 + BENCHMARK_MAX_TIMED_WINDOW_SECONDS: int = 5400 BENCHMARK_TARGET_CI_HALF_WIDTH_RELATIVE: float = 0.05 BENCHMARK_BOOTSTRAP_RESAMPLES: int = 1000 BENCHMARK_CI_CONFIDENCE: float = 0.95 @@ -137,8 +137,8 @@ class Config: ) DATABRICKS_POLL_INTERVAL_SECONDS: int = 30 DATABRICKS_HTTP_TIMEOUT_SECONDS: int = 30 - DATABRICKS_DRIVER_MEMORY: str = "9g" - DATABRICKS_DRIVER_MEMORY_OVERHEAD: str = "512m" + DATABRICKS_DRIVER_MEMORY: str = "14g" + DATABRICKS_DRIVER_MEMORY_OVERHEAD: str = "1g" DATABRICKS_DRIVER_MAX_RESULT_SIZE: str = "8g" DATABRICKS_SEDONA_MAVEN_COORDINATES: str = ( "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.1" @@ -151,18 +151,12 @@ class Config: DATABRICKS_LOCAL_SCRIPT_PATH_PARTITIONED: str = ( "src/presentation/databricks/national_scale_spatial_join_partitioned.py" ) - DATABRICKS_LOCAL_SCRIPT_PATH_DEFAULT: str = ( - "src/presentation/databricks/national_scale_spatial_join_default.py" - ) DATABRICKS_WORKSPACE_NOTEBOOK_PATH_BROADCAST: str = ( "/Shared/doppa/national_scale_spatial_join_broadcast" ) 
DATABRICKS_WORKSPACE_NOTEBOOK_PATH_PARTITIONED: str = ( "/Shared/doppa/national_scale_spatial_join_partitioned" ) - DATABRICKS_WORKSPACE_NOTEBOOK_PATH_DEFAULT: str = ( - "/Shared/doppa/national_scale_spatial_join_default" - ) DATABRICKS_MUNICIPALITIES_FILE: str = "municipalities.parquet" MUNICIPALITIES_CONTRIBUTION_BLOB: str = "municipalities.parquet" AZURE_BLOB_STORAGE_ACCOUNT_KEY: str = os.getenv("AZURE_BLOB_STORAGE_ACCOUNT_KEY") diff --git a/src/domain/enums/benchmark_iteration.py b/src/domain/enums/benchmark_iteration.py index 45f3fec9..17dcc1f7 100644 --- a/src/domain/enums/benchmark_iteration.py +++ b/src/domain/enums/benchmark_iteration.py @@ -10,7 +10,7 @@ class BenchmarkIteration(Enum): BBOX_FILTERING_SIMPLE = 1000 DB_SCAN = 1_000 KNN_SEARCH = 4000 - NATIONAL_SCALE_SPATIAL_JOIN = 5 + NATIONAL_SCALE_SPATIAL_JOIN = 3 ORDERED_RANGE_QUERY = 1500 POINT_IN_POLYGON_LOOKUP = 2500 SPATIAL_AGGREGATION_GRID = 100 diff --git a/src/infra/infrastructure/services/databricks_service.py b/src/infra/infrastructure/services/databricks_service.py index 8bbaa5c6..08e4a35a 100644 --- a/src/infra/infrastructure/services/databricks_service.py +++ b/src/infra/infrastructure/services/databricks_service.py @@ -20,7 +20,7 @@ from src.domain.exceptions import QuotaExhaustedError -NotebookVariant = Literal["broadcast", "partitioned", "default"] +NotebookVariant = Literal["broadcast", "partitioned"] def _local_script_path(notebook_variant: NotebookVariant) -> str: @@ -28,8 +28,6 @@ def _local_script_path(notebook_variant: NotebookVariant) -> str: return Config.DATABRICKS_LOCAL_SCRIPT_PATH_BROADCAST if notebook_variant == "partitioned": return Config.DATABRICKS_LOCAL_SCRIPT_PATH_PARTITIONED - if notebook_variant == "default": - return Config.DATABRICKS_LOCAL_SCRIPT_PATH_DEFAULT raise ValueError(f"Unknown notebook_variant: {notebook_variant!r}") @@ -38,8 +36,6 @@ def _workspace_notebook_path(notebook_variant: NotebookVariant) -> str: return 
Config.DATABRICKS_WORKSPACE_NOTEBOOK_PATH_BROADCAST if notebook_variant == "partitioned": return Config.DATABRICKS_WORKSPACE_NOTEBOOK_PATH_PARTITIONED - if notebook_variant == "default": - return Config.DATABRICKS_WORKSPACE_NOTEBOOK_PATH_DEFAULT raise ValueError(f"Unknown notebook_variant: {notebook_variant!r}") diff --git a/src/presentation/configuration/app_config.py b/src/presentation/configuration/app_config.py index 3c5bfad0..1574277c 100644 --- a/src/presentation/configuration/app_config.py +++ b/src/presentation/configuration/app_config.py @@ -53,10 +53,6 @@ def initialize_dependencies( "src.presentation.entrypoints.national_scale_spatial_join_databricks_partitioned_8_nodes", "src.presentation.entrypoints.national_scale_spatial_join_databricks_partitioned_12_nodes", "src.presentation.entrypoints.national_scale_spatial_join_databricks_partitioned_16_nodes", - "src.presentation.entrypoints.national_scale_spatial_join_databricks_default_2_nodes", - "src.presentation.entrypoints.national_scale_spatial_join_databricks_default_8_nodes", - "src.presentation.entrypoints.national_scale_spatial_join_databricks_default_16_nodes", - "src.presentation.entrypoints.setup_benchmarking_framework", "src.presentation.endpoints.tile_server" diff --git a/src/presentation/databricks/national_scale_spatial_join_default.py b/src/presentation/databricks/national_scale_spatial_join_default.py deleted file mode 100644 index 508dd275..00000000 --- a/src/presentation/databricks/national_scale_spatial_join_default.py +++ /dev/null @@ -1,215 +0,0 @@ -# Databricks notebook source - -# COMMAND ---------- - -# Thesis terminology -> Spark metric mapping -# - "executor read time" ~ executor_run_time_ms - shuffleReadTime -# (the cold scan + per-record work) -# - "shuffle" = shuffle_read_bytes + shuffle_write_bytes (volume), -# shuffle stage duration (time) -# - "driver collection" = driver_collection_time_ms (residual, -# includes planning + final collect) -# -# Join strategy: default. 
No `broadcast()` hint and no Sedona partitioner -# configuration (`sedona.global.index`, `sedona.join.gridtype`, -# `sedona.join.indexbuildside`). Spark's cost-based optimizer picks the plan; -# for ST_Intersects this typically falls back to `SortMergeJoin`. This variant -# is the apples-to-apples reference point against which the `broadcast` and -# `partitioned` strategies are compared. Repartitioning to cluster parallelism -# is kept (parallelism control, not a join-strategy hint) so the only axis -# that differs between variants is the strategy itself. -# -# Notes: -# - stage_durations_ms is capped at the first 100 stages (dbutils.notebook.exit -# has a payload cap around 1 MB); a warning is logged if truncation happens. -# - driver_collection_time_ms is computed as -# wall_clock_ms - sum(stage_durations_ms); clamped to 0 with a warning if -# negative (autoscaling can make stages overlap and break the simple -# decomposition). -# - inputMetrics.bytesRead is post-pushdown decompressed bytes inside the -# executor, not on-the-wire bytes. The on-the-wire counter comes from the -# ACI-level psutil.net_io_counters in monitor.py. 
- -# COMMAND ---------- - -import json -import time - -from pyspark.sql import functions as F -from sedona.spark import SedonaContext - -# COMMAND ---------- - -account_key = dbutils.widgets.get("account_key") -account_name = dbutils.widgets.get("account_name") -release = dbutils.widgets.get("release") -municipalities_file = dbutils.widgets.get("municipalities_file") -dataset_size = dbutils.widgets.get("dataset_size") - -# COMMAND ---------- - -spark.conf.set( - f"fs.azure.account.key.{account_name}.dfs.core.windows.net", - account_key, -) - -sedona = SedonaContext.create(spark) - -# COMMAND ---------- - -buildings_path = ( - f"abfss://data@{account_name}.dfs.core.windows.net" - f"/release/{release}/size={dataset_size}/theme=buildings/region=*/*.parquet" -) -municipalities_path = ( - f"abfss://metadata@{account_name}.dfs.core.windows.net" - f"/{municipalities_file}" -) - -buildings_df = sedona.read.format("geoparquet").load(buildings_path) - -municipalities_raw = spark.read.parquet(municipalities_path) -municipalities_df = municipalities_raw.selectExpr( - "ST_GeomFromWKB(wkb) AS geometry", - "region AS municipality_name", -) - -# Repartition to match cluster parallelism so all nodes receive work. -# defaultParallelism = num_workers x cores_per_node (e.g. 8 nodes x 4 cores = 32). -parallelism = spark.sparkContext.defaultParallelism -print(f"Cluster parallelism: {parallelism}") -print(f"Buildings partitions before repartition: {buildings_df.rdd.getNumPartitions()}") - -buildings_df = buildings_df.repartition(parallelism) - -# COMMAND ---------- - -# MAGIC %scala -# MAGIC import org.apache.spark.scheduler.{ -# MAGIC SparkListener, -# MAGIC SparkListenerStageCompleted -# MAGIC } -# MAGIC import scala.collection.mutable -# MAGIC -# MAGIC // Single-use listener: instantiated immediately before the timed action. 
-# MAGIC // Aggregates per-stage metrics across every job the action triggers and -# MAGIC // publishes the result as the global temp view `_phase_metrics`, which -# MAGIC // the following PySpark cell reads. -# MAGIC val phaseListener = new SparkListener { -# MAGIC private val stages = -# MAGIC mutable.ArrayBuffer.empty[(Int, Long, Long, Long, Long, Long, Long, Long)] -# MAGIC -# MAGIC override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { -# MAGIC val info = event.stageInfo -# MAGIC val metrics = info.taskMetrics -# MAGIC val submission = info.submissionTime.getOrElse(0L) -# MAGIC val completion = info.completionTime.getOrElse(0L) -# MAGIC val stageDuration = if (completion >= submission) completion - submission else 0L -# MAGIC -# MAGIC stages.synchronized { -# MAGIC stages += (( -# MAGIC info.stageId, -# MAGIC stageDuration, -# MAGIC metrics.executorRunTime, -# MAGIC metrics.executorCpuTime / 1000000L, // ns -> ms -# MAGIC metrics.inputMetrics.bytesRead, -# MAGIC metrics.shuffleReadMetrics.totalBytesRead, -# MAGIC metrics.shuffleWriteMetrics.bytesWritten, -# MAGIC metrics.resultSize -# MAGIC )) -# MAGIC -# MAGIC val snapshot = stages.toSeq -# MAGIC val df = spark -# MAGIC .createDataFrame(snapshot) -# MAGIC .toDF( -# MAGIC "stage_id", -# MAGIC "stage_duration_ms", -# MAGIC "executor_run_time_ms", -# MAGIC "executor_cpu_time_ms", -# MAGIC "input_bytes_read", -# MAGIC "shuffle_read_bytes", -# MAGIC "shuffle_write_bytes", -# MAGIC "result_size_bytes" -# MAGIC ) -# MAGIC df.createOrReplaceGlobalTempView("_phase_metrics") -# MAGIC } -# MAGIC } -# MAGIC } -# MAGIC -# MAGIC spark.sparkContext.addSparkListener(phaseListener) -# MAGIC println("Phase-metrics SparkListener registered.") - -# COMMAND ---------- - -start_time = time.perf_counter() - -result = ( - buildings_df.alias("b") - .join( - municipalities_df.alias("m"), - F.expr("ST_Intersects(m.geometry, b.geometry)"), - ) - .groupBy(F.col("m.municipality_name")) - 
.agg(F.count(F.col("b.geometry")).alias("building_count")) - .orderBy(F.desc("building_count")) -) - -cardinality = result.count() -elapsed_seconds = time.perf_counter() - start_time - -print(f"Spatial join complete. Regions with matched buildings: {cardinality}") -print(f"Elapsed seconds: {elapsed_seconds:.3f}") - -# COMMAND ---------- - -_STAGE_DURATION_CAP = 100 - -phase_df = spark.read.table("global_temp._phase_metrics").orderBy("stage_id") - -aggregates = phase_df.agg( - F.coalesce(F.sum("input_bytes_read"), F.lit(0)).alias("executor_input_bytes_read"), - F.coalesce(F.sum("executor_run_time_ms"), F.lit(0)).alias("executor_run_time_ms"), - F.coalesce(F.sum("shuffle_read_bytes"), F.lit(0)).alias("shuffle_read_bytes"), - F.coalesce(F.sum("shuffle_write_bytes"), F.lit(0)).alias("shuffle_write_bytes"), - F.coalesce(F.sum("stage_duration_ms"), F.lit(0)).alias("sum_stage_duration_ms"), -).collect()[0] - -stage_durations_rows = phase_df.select("stage_duration_ms").collect() -stage_durations_all = [int(row["stage_duration_ms"]) for row in stage_durations_rows] -if len(stage_durations_all) > _STAGE_DURATION_CAP: - print( - f"WARNING: {len(stage_durations_all)} stages observed; truncating " - f"stage_durations_ms to first {_STAGE_DURATION_CAP} to stay under the " - f"dbutils.notebook.exit payload cap." - ) - stage_durations = stage_durations_all[:_STAGE_DURATION_CAP] -else: - stage_durations = stage_durations_all - -wall_clock_ms = int(round(elapsed_seconds * 1000.0)) -sum_stage_duration_ms = int(aggregates["sum_stage_duration_ms"]) -residual_ms = wall_clock_ms - sum_stage_duration_ms -if residual_ms < 0: - print( - f"WARNING: stage durations sum ({sum_stage_duration_ms} ms) exceeds " - f"wall-clock ({wall_clock_ms} ms); clamping driver_collection_time_ms " - f"to 0. Stages likely overlapped (autoscaling)." 
- ) - driver_collection_time_ms = 0 -else: - driver_collection_time_ms = residual_ms - -payload = { - "execution_duration_s": elapsed_seconds, - "cardinality": int(cardinality), - "executor_input_bytes_read": int(aggregates["executor_input_bytes_read"]), - "executor_run_time_ms": int(aggregates["executor_run_time_ms"]), - "shuffle_read_bytes": int(aggregates["shuffle_read_bytes"]), - "shuffle_write_bytes": int(aggregates["shuffle_write_bytes"]), - "driver_collection_time_ms": driver_collection_time_ms, - "stage_durations_ms": json.dumps(stage_durations), -} - -print(f"Phase-metric payload: {payload}") - -dbutils.notebook.exit(json.dumps(payload)) diff --git a/src/presentation/entrypoints/__init__.py b/src/presentation/entrypoints/__init__.py index 497a87f8..d0b7af41 100644 --- a/src/presentation/entrypoints/__init__.py +++ b/src/presentation/entrypoints/__init__.py @@ -20,6 +20,3 @@ from .national_scale_spatial_join_databricks_partitioned_8_nodes import national_scale_spatial_join_databricks_partitioned_8_nodes from .national_scale_spatial_join_databricks_partitioned_12_nodes import national_scale_spatial_join_databricks_partitioned_12_nodes from .national_scale_spatial_join_databricks_partitioned_16_nodes import national_scale_spatial_join_databricks_partitioned_16_nodes -from .national_scale_spatial_join_databricks_default_2_nodes import national_scale_spatial_join_databricks_default_2_nodes -from .national_scale_spatial_join_databricks_default_8_nodes import national_scale_spatial_join_databricks_default_8_nodes -from .national_scale_spatial_join_databricks_default_16_nodes import national_scale_spatial_join_databricks_default_16_nodes diff --git a/src/presentation/entrypoints/_databricks_benchmark_runner.py b/src/presentation/entrypoints/_databricks_benchmark_runner.py index 478aa4bd..e96a8268 100644 --- a/src/presentation/entrypoints/_databricks_benchmark_runner.py +++ b/src/presentation/entrypoints/_databricks_benchmark_runner.py @@ -7,7 +7,7 @@ from 
src.presentation.entrypoints._factory import _build_query_id, _get_dataset_size -NotebookVariant = Literal["broadcast", "partitioned", "default"] +NotebookVariant = Literal["broadcast", "partitioned"] def run_databricks_national_scale_spatial_join( diff --git a/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_16_nodes.py b/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_16_nodes.py deleted file mode 100644 index 79848599..00000000 --- a/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_16_nodes.py +++ /dev/null @@ -1,27 +0,0 @@ -from dependency_injector.wiring import Provide, inject - -from src.application.contracts import IDatabricksService -from src.infra.infrastructure import Containers -from src.presentation.entrypoints._databricks_benchmark_runner import ( - run_databricks_national_scale_spatial_join, -) - - -@inject -def national_scale_spatial_join_databricks_default_16_nodes( - databricks_service: IDatabricksService = Provide[Containers.databricks_service], -) -> None: - """ - Benchmark: national-scale spatial join between Norwegian municipalities and the - configured buildings dataset size executed on Azure Databricks with a 16-worker - cluster, using the default join strategy (no ``broadcast()`` hint and no Sedona - partitioner configuration; Spark's cost-based optimizer picks the plan). The - dataset size is pulled from DI inside ``run_databricks_national_scale_spatial_join``. - The cluster is provisioned once, every warmup and timed iteration runs against it, - and the cluster is terminated after the benchmark completes. 
- """ - run_databricks_national_scale_spatial_join( - databricks_service=databricks_service, - num_workers=16, - notebook_variant="default", - ) diff --git a/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_2_nodes.py b/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_2_nodes.py deleted file mode 100644 index 6ad79b67..00000000 --- a/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_2_nodes.py +++ /dev/null @@ -1,27 +0,0 @@ -from dependency_injector.wiring import Provide, inject - -from src.application.contracts import IDatabricksService -from src.infra.infrastructure import Containers -from src.presentation.entrypoints._databricks_benchmark_runner import ( - run_databricks_national_scale_spatial_join, -) - - -@inject -def national_scale_spatial_join_databricks_default_2_nodes( - databricks_service: IDatabricksService = Provide[Containers.databricks_service], -) -> None: - """ - Benchmark: national-scale spatial join between Norwegian municipalities and the - configured buildings dataset size executed on Azure Databricks with a 2-worker - cluster, using the default join strategy (no ``broadcast()`` hint and no Sedona - partitioner configuration; Spark's cost-based optimizer picks the plan). The - dataset size is pulled from DI inside ``run_databricks_national_scale_spatial_join``. - The cluster is provisioned once, every warmup and timed iteration runs against it, - and the cluster is terminated after the benchmark completes. 
- """ - run_databricks_national_scale_spatial_join( - databricks_service=databricks_service, - num_workers=2, - notebook_variant="default", - ) diff --git a/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_8_nodes.py b/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_8_nodes.py deleted file mode 100644 index ea5731ca..00000000 --- a/src/presentation/entrypoints/national_scale_spatial_join_databricks_default_8_nodes.py +++ /dev/null @@ -1,27 +0,0 @@ -from dependency_injector.wiring import Provide, inject - -from src.application.contracts import IDatabricksService -from src.infra.infrastructure import Containers -from src.presentation.entrypoints._databricks_benchmark_runner import ( - run_databricks_national_scale_spatial_join, -) - - -@inject -def national_scale_spatial_join_databricks_default_8_nodes( - databricks_service: IDatabricksService = Provide[Containers.databricks_service], -) -> None: - """ - Benchmark: national-scale spatial join between Norwegian municipalities and the - configured buildings dataset size executed on Azure Databricks with an 8-worker - cluster, using the default join strategy (no ``broadcast()`` hint and no Sedona - partitioner configuration; Spark's cost-based optimizer picks the plan). The - dataset size is pulled from DI inside ``run_databricks_national_scale_spatial_join``. - The cluster is provisioned once, every warmup and timed iteration runs against it, - and the cluster is terminated after the benchmark completes. - """ - run_databricks_national_scale_spatial_join( - databricks_service=databricks_service, - num_workers=8, - notebook_variant="default", - )