33"""
44
55from typing import Dict , Sequence , Set , Type , Union , cast , Iterable , Collection
6- import multiprocessing
76import math
87
8+ import joblib # type: ignore
9+
910from ..application .ports import caching
1011from ..application .ports .filesystem import AbstractFileSystem
1112from ..application .ports .graph import ImportGraph
@@ -21,7 +22,7 @@ class NotSupplied:
2122
2223
# This is an arbitrary number, but setting it too low slows down our functional tests considerably.
# Below this count, scanning stays in a single process; at or above it, the work is
# split across multiple processes (see _decide_number_of_processes).
MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES = 64
2526
2627
2728def build_graph (
def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFile, ...], ...]:
    """Split the supplied module files into one chunk per scanning process.

    The number of chunks equals the number of processes we intend to use.
    Each chunk holds at most ``ceil(total / n_chunks)`` files, so trailing
    chunks may be shorter than earlier ones (or even empty).
    """
    as_tuple = tuple(module_files)
    total = len(as_tuple)
    n_chunks = _decide_number_of_processes(total)
    per_chunk = math.ceil(total / n_chunks)

    chunks = []
    for index in range(n_chunks):
        start = index * per_chunk
        chunks.append(as_tuple[start : start + per_chunk])
    return tuple(chunks)
237238
238239
def _decide_number_of_processes(number_of_module_files: int) -> int:
    """Return how many worker processes to use when scanning module files.

    Spawning processes has a fixed cost, so below the threshold we stay
    in a single process.  Otherwise we use one process per CPU, capped at
    the number of module files (no point in idle workers).
    """
    too_few_to_parallelize = (
        number_of_module_files < MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES
    )
    if too_few_to_parallelize:
        return 1
    return min(joblib.cpu_count(), number_of_module_files)
244245
245246
246247def _scan_chunks (
@@ -257,20 +258,15 @@ def _scan_chunks(
257258 )
258259
259260 number_of_processes = len (chunks )
260- if number_of_processes == 1 :
261- # No need to spawn a process if there's only one chunk.
262- [chunk ] = chunks
263- return _scan_chunk (import_scanner , exclude_type_checking_imports , chunk )
264- else :
265- with multiprocessing .Pool (number_of_processes ) as pool :
266- imports_by_module_file : Dict [ModuleFile , Set [DirectImport ]] = {}
267- import_scanning_jobs = pool .starmap (
268- _scan_chunk ,
269- [(import_scanner , exclude_type_checking_imports , chunk ) for chunk in chunks ],
270- )
271- for chunk_imports_by_module_file in import_scanning_jobs :
272- imports_by_module_file .update (chunk_imports_by_module_file )
273- return imports_by_module_file
261+ import_scanning_jobs = joblib .Parallel (n_jobs = number_of_processes )(
262+ joblib .delayed (_scan_chunk )(import_scanner , exclude_type_checking_imports , chunk )
263+ for chunk in chunks
264+ )
265+
266+ imports_by_module_file = {}
267+ for chunk_imports_by_module_file in import_scanning_jobs :
268+ imports_by_module_file .update (chunk_imports_by_module_file )
269+ return imports_by_module_file
274270
275271
276272def _scan_chunk (
0 commit comments