implementing dask distributed for latest earthmover #188

Open
tomreitz wants to merge 4 commits into main from feature/dask_distributed_3

Collaborator

@tomreitz tomreitz commented Mar 25, 2026

I made several previous attempts (here and here) at implementing Dask Distributed with earthmover. They were incomplete and only partly functional, and had diverged significantly from main earthmover (missing newer features like pivot and melt), so I decided to start afresh, with some assistance from Claude.

This PR fully implements parallelization in earthmover with Dask Distributed. A high-level summary of the changes:

  • CLI: Added --workers / -w and (optional) --mem_per_worker / -m flags to earthmover run to launch a LocalCluster directly from the command line without modifying YAML config
  • earthmover.py: When config.dask_cluster_kwargs is present in YAML (or --workers is passed via CLI), spins up a dask.distributed.LocalCluster + Client before graph execution and tears it down in a try/finally block; also applies config.dask settings via dask.config.set(). Sensible distributed defaults added to DEFAULT_CONFIG.
  • nodes/destination.py: Replaced the inline lambda + bound-method render_row in map_partitions with a module-level _render_destination_partition function (picklable). The template is compiled from its source string inside each worker rather than passed as a compiled jinja2.Template (not serializable). Partition writes use client.compute() for parallel I/O when a distributed client is active, falling back to sequential writes otherwise.
  • nodes/node.py: Replaced compiled-template kwarg in check_expectations() with a module-level _apply_expectation_partition function that re-compiles the Jinja template string inside each worker, fixing a PicklingError when running with distributed.
  • nodes/source.py: Added blocksize config key to CSV/TSV reads (default "25MB") to control partition count. Replaced a serialization-unsafe lambda in post_execute() with functools.partial + a module-level _reindex_partition function.
  • operations/column.py: Replaced Jinja-rendering lambdas in AddColumnsOperation and ModifyColumnsOperation with module-level _jinja_add_column_partition / _jinja_modify_column_partition functions (template compiled inside each worker).
  • operations/row.py: Extracted FlattenOperation's per-partition logic into a module-level _flatten_partition function used with functools.partial.
  • operations/groupby.py: Rewrote GroupByOperation to issue a single grouped.agg() call for all native aggregations (sum, mean, min, max, std, var) instead of one grouped.apply() per output column. This reduces shuffles from N to 1 and fixes OOM crashes on large datasets. Uses unique temp-column names per output column to avoid MultiIndex column issues after reset_index(). Numeric coercion is applied only to per-column temp copies, leaving original string source columns intact for custom agg/json_array_agg operations.
  • docs/: several files updated; config.md with dask config details, usage.md explaining how to run earthmover distributed, and design.md with details about how distributed works, benchmark results, and FAQs
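
To illustrate the groupby change for reviewers, here is a minimal sketch of the single-`agg()` idea. It uses plain pandas as a stand-in for the Dask dataframe, and the helper name `aggregate_once`, the temp-column naming scheme, and the sample data are all hypothetical rather than taken from the PR's code.

```python
import pandas as pd


def aggregate_once(df, group_cols, specs):
    """Run every native aggregation in one agg() call.

    specs maps output column -> (source column, aggregation name).
    Hypothetical helper for illustration only.
    """
    work = df.copy()
    named = {}
    for i, (out_col, (src_col, func)) in enumerate(specs.items()):
        # Unique temp column per output column, so two outputs that read
        # the same source column never collide.
        tmp = f"__agg_tmp_{i}__"
        # Coerce only the temp copy; the original string column is untouched.
        work[tmp] = pd.to_numeric(work[src_col], errors="coerce")
        named[out_col] = (tmp, func)
    # Named aggregation yields flat output columns, not a MultiIndex.
    return work.groupby(group_cols).agg(**named).reset_index()


df = pd.DataFrame({
    "school": ["A", "A", "B"],
    "score": ["10", "20", "5"],  # numeric values stored as strings
})
result = aggregate_once(df, ["school"], {
    "total_score": ("score", "sum"),
    "max_score": ("score", "max"),
})
print(result)
```

Because both outputs come from one `agg()` call, the grouping work is done once instead of once per output column.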

This is a pretty big PR... how to review it? I recommend:

  1. read the design docs for an overall understanding, and the usage docs for a quickstart
  2. try it! check out the branch and, with a bigger input file, run earthmover run -w 4; also check out the Dask Distributed dashboard that spins up (at the localhost URL that earthmover reports on the CLI)
  3. review the above summary of changes - keep in mind that most changes are related to Dask Distributed requiring everything to be picklable
  4. review the code - in-line comments try to explain the "why"
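
For reviewers new to the picklability constraint, the recurring pattern in the summary can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: `string.Template` from the standard library stands in for `jinja2.Template`, plain lists of dicts stand in for Dask partitions, and `_render_partition` is a hypothetical name.

```python
from functools import partial
from string import Template


def _render_partition(rows, template_src):
    """Render every row of one partition.

    Defined at module level and given only the template *source string*,
    so the function and its arguments are plain, serializable objects.
    """
    # Compile inside the worker instead of shipping a compiled template.
    template = Template(template_src)
    return [template.substitute(row) for row in rows]


# Stand-in for a partitioned dataframe: two partitions of row dicts.
partitions = [
    [{"id": "1", "name": "Ada"}],
    [{"id": "2", "name": "Grace"}, {"id": "3", "name": "Edsger"}],
]

# functools.partial binds the extra argument without a lambda or bound method.
render = partial(_render_partition, template_src='{"id": $id, "name": "$name"}')

# With Dask this call would be handed to map_partitions; here it runs locally.
rendered = [render(part) for part in partitions]
print(rendered)
```

The design choice is that the function crossing the worker boundary carries only strings and other plain data, and anything expensive or unserializable is rebuilt on the far side.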

I've tested this with earthmover -t, the example_projects/, and the TPC-H queries, but not with assessment bundles or large files (I would love help/tips on where to find those and how to test with them).

I welcome questions, comments, recommendations, and improvements. Thanks!

@tomreitz tomreitz requested a review from jayckaiser March 25, 2026 16:11