|
1 | 1 | """ |
2 | 2 | Use cases handle application logic. |
3 | 3 | """ |
4 | | -from typing import Dict, Sequence, Set, Type, Union, cast |
| 4 | +from typing import Dict, Sequence, Set, Type, Union, cast, List |
| 5 | +import multiprocessing |
| 6 | +import math |
| 7 | + |
| 8 | +from joblib import Parallel, delayed, parallel_config # type: ignore |
5 | 9 |
|
6 | 10 | from ..application.ports import caching |
7 | 11 | from ..application.ports.filesystem import AbstractFileSystem |
|
13 | 17 | from .config import settings |
14 | 18 |
|
15 | 19 |
|
| 20 | +N_CPUS = multiprocessing.cpu_count() |
| 21 | + |
| 22 | + |
16 | 23 | class NotSupplied: |
17 | 24 | pass |
18 | 25 |
|
@@ -137,11 +144,23 @@ def _scan_packages( |
137 | 144 | else: |
138 | 145 | imports_by_module_file[module_file] = direct_imports |
139 | 146 |
|
140 | | - # TODO Parallelise this part. |
141 | | - for module_file in modules_files_to_scan.difference(imports_by_module_file): |
142 | | - imports_by_module_file[module_file] = import_scanner.scan_for_imports( |
143 | | - module_file.module, exclude_type_checking_imports=exclude_type_checking_imports |
| 147 | + remaining_modules_files_to_scan = list( |
| 148 | + modules_files_to_scan.difference(imports_by_module_file) |
| 149 | + ) |
| 150 | + chunked_remaining_modules_files_to_scan = _create_chunks( |
| 151 | + remaining_modules_files_to_scan, n_chunks=N_CPUS |
| 152 | + ) |
| 153 | + with parallel_config(n_jobs=N_CPUS): |
| 154 | + import_scanning_jobs = Parallel()( |
| 155 | + delayed(_scan_imports)( |
| 156 | + import_scanner=import_scanner, |
| 157 | + exclude_type_checking_imports=exclude_type_checking_imports, |
| 158 | + module_files=chunk_module_files_to_scan, |
| 159 | + ) |
| 160 | + for chunk_module_files_to_scan in chunked_remaining_modules_files_to_scan |
144 | 161 | ) |
| 162 | + for chunk_imports_by_module_file in import_scanning_jobs: |
| 163 | + imports_by_module_file.update(chunk_imports_by_module_file) |
145 | 164 |
|
146 | 165 | imports_by_module: Dict[Module, Set[DirectImport]] = { |
147 | 166 | k.module: v for k, v in imports_by_module_file.items() |
@@ -184,3 +203,21 @@ def _is_external(module: Module, found_packages: Set[FoundPackage]) -> bool: |
184 | 203 | module.is_descendant_of(package_module) or module == package_module |
185 | 204 | for package_module in package_modules |
186 | 205 | ) |
| 206 | + |
| 207 | + |
| 208 | +def _create_chunks(module_files: List[ModuleFile], *, n_chunks: int) -> List[List[ModuleFile]]: |
| 209 | + chunk_size = math.ceil(len(module_files) / n_chunks) |
| 210 | + return [module_files[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)] |
| 211 | + |
| 212 | + |
| 213 | +def _scan_imports( |
| 214 | + import_scanner: AbstractImportScanner, |
| 215 | + exclude_type_checking_imports: bool, |
| 216 | + module_files: List[ModuleFile], |
| 217 | +) -> Dict[ModuleFile, Set[DirectImport]]: |
| 218 | + return { |
| 219 | + module_file: import_scanner.scan_for_imports( |
| 220 | + module_file.module, exclude_type_checking_imports=exclude_type_checking_imports |
| 221 | + ) |
| 222 | + for module_file in module_files |
| 223 | + } |
0 commit comments