implementing dask distributed for latest earthmover#188
Open
tomreitz wants to merge 4 commits into
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
I made several previous attempts (here and here) at implementing Dask Distributed with earthmover. They were incomplete and only partly-functional, and had significantly diverged from main earthmover (missing newer features like
pivotandmelt), so I decided to start afresh - with some assistance from Claude.This PR fully implements parallelization in earthmover with Dask Distributed. High-level summary of changes includes:
--workers/-wand (optional)--mem_per_worker/-mflags toearthmover runto launch aLocalClusterdirectly from the command line without modifying YAML configearthmover.py: Whenconfig.dask_cluster_kwargsis present in YAML (or--workersis passed via CLI), spins up adask.distributed.LocalCluster+Clientbefore graph execution and tears it down in atry/finallyblock; also appliesconfig.dasksettings viadask.config.set(). Sensible distributed defaults added toDEFAULT_CONFIG.nodes/destination.py: Replaced the inline lambda + bound-methodrender_rowinmap_partitionswith a module-level_render_destination_partitionfunction (picklable). Template is compiled from its source string inside each worker rather than passed as a compiledjinja2.Template(not serializable). Partition writes useclient.compute()for parallel I/O when a distributed client is active, falling back to sequential.nodes/node.py: Replaced compiled-template kwarg incheck_expectations()with a module-level_apply_expectation_partitionfunction that re-compiles the Jinja template string inside each worker, fixing aPicklingErrorwhen running with distributed.nodes/source.py: Addedblocksizeconfig key to CSV/TSV reads (default"25MB") to control partition count. Replaced a serialization-unsafe lambda inpost_execute()withfunctools.partial+ a module-level_reindex_partitionfunction.operations/column.py: Replaced Jinja-rendering lambdas inAddColumnsOperationandModifyColumnsOperationwith module-level_jinja_add_column_partition/_jinja_modify_column_partitionfunctions (template compiled inside each worker).operations/row.py: ExtractedFlattenOperation's per-partition logic into a module-level_flatten_partitionfunction used withfunctools.partial.operations/groupby.py: RewroteGroupByOperationto issue a singlegrouped.agg()call for all native aggregations (sum,mean,min,max,std,var) instead of onegrouped.apply()per output column — reducing shuffles from N to 1 and fixing OOM crashes on large datasets. Uses unique temp-column names per output column to avoid multi-levelMultiIndexissues afterreset_index(). Numeric coercion is applied only to per-column temp copies, leaving original string source columns intact for customagg/json_array_aggoperations.docs/: several files updated;config.mdwith dask config details,usage.mdexplaining how to run earthmover distributed, anddesign.mdwith details about how distributed works, benchmark results, and FAQsThis is a pretty big PR... how to review it? I recommend:
earthmover run -w 4; also check out the Dask Distributed dashboard that spins up (at localhost URL reported by earthmover on CLI)I've tested this with
earthmover -t, theexample_projects/, and the TPC-H queries - but not with assessment bundles and large files (I would love help/tips on where to find and how to do that).I welcome questions, comments, recommendations, and improvements. Thanks!