@@ -959,66 +959,87 @@ def dump_stats(self) -> None:
959959 print (f"{ key + ':' :24} { value } " )
960960
961961 def parse_all (self , states : list [State ]) -> None :
962- """Parse multiple files in parallel (if possible) and compute dependencies.
962+ """Parse multiple files in parallel (if possible) and compute dependencies."""
963+ if not self .options .native_parser :
964+ # Old parser cannot be parallelized.
965+ for state in states :
966+ state .parse_file ()
967+ self .post_parse_all (states )
968+ return
969+
970+ sequential_states = []
971+ parallel_states = []
972+ for state in states :
973+ if state .tree is not None :
974+ # The file was already parsed.
975+ continue
976+ if not self .fscache .exists (state .xpath ):
977+ # New parser only supports parsing on-disk files.
978+ sequential_states .append (state )
979+ continue
980+ parallel_states .append (state )
981+ self .parse_parallel (sequential_states , parallel_states )
982+ self .post_parse_all (states )
983+
984+ def parse_parallel (self , sequential_states : list [State ], parallel_states : list [State ]) -> None :
985+ """Perform parallel parsing of states.
963986
964987 Note: this duplicates a bit of logic from State.parse_file(). This is done
965- as a micro- optimization to parallelize only those parts of the code that
966- can be parallelized efficiently.
988+ as an optimization to parallelize only those parts of the code that can be
989+ parallelized efficiently.
967990 """
968- if self .options .native_parser :
969- futures = []
970- parsed_states = {}
971- # Use at least --num-workers if specified by user.
972- available_threads = max (get_available_threads (), self .options .num_workers )
973- # Overhead from trying to parallelize (small) blocking portion of
974- # parse_file_inner() results in no visible improvement with more than 8 threads.
975- # TODO: reuse thread pool and/or batch small files in single submit() call.
976- with ThreadPoolExecutor (max_workers = min (available_threads , 8 )) as executor :
977- for state in states :
978- state .needs_parse = False
979- if state .tree is not None :
980- # The file was already parsed.
981- continue
982- # New parser reads source from file directly, we do this only for
983- # the side effect of parsing inline mypy configurations.
984- state .get_source ()
985- if state .id not in self .ast_cache :
986- self .log (f"Parsing { state .xpath } ({ state .id } )" )
987- ignore_errors = state .ignore_all or state .options .ignore_errors
988- if ignore_errors :
989- self .errors .ignored_files .add (state .xpath )
990- futures .append (executor .submit (state .parse_file_inner , state .source or "" ))
991- parsed_states [state .id ] = state
992- else :
993- self .log (f"Using cached AST for { state .xpath } ({ state .id } )" )
994- state .tree , state .early_errors = self .ast_cache [state .id ]
995- for fut in wait (futures ).done :
996- state_id , parse_errors = fut .result ()
997- if parse_errors :
998- state = parsed_states [state_id ]
999- with state .wrap_context ():
1000- self .errors .set_file (state .xpath , state .id , options = state .options )
1001- for error in parse_errors :
1002- # New parser reports errors lazily.
1003- report_parse_error (error , self .errors )
1004- if self .errors .is_blockers ():
1005- self .log ("Bailing due to parse errors" )
1006- self .errors .raise_error ()
991+ futures = []
992+ parallel_parsed_states = {}
993+ # Use at least --num-workers if specified by user.
994+ available_threads = max (get_available_threads (), self .options .num_workers )
995+ # Overhead from trying to parallelize (small) blocking portion of
996+ # parse_file_inner() results in no visible improvement with more than 8 threads.
997+ # TODO: reuse thread pool and/or batch small files in single submit() call.
998+ with ThreadPoolExecutor (max_workers = min (available_threads , 8 )) as executor :
999+ for state in parallel_states :
1000+ state .needs_parse = False
1001+ # New parser reads source from file directly, we do this only for
1002+ # the side effect of parsing inline mypy configurations.
1003+ state .get_source ()
1004+ if state .id not in self .ast_cache :
1005+ self .log (f"Parsing { state .xpath } ({ state .id } )" )
1006+ ignore_errors = state .ignore_all or state .options .ignore_errors
1007+ if ignore_errors :
1008+ self .errors .ignored_files .add (state .xpath )
1009+ futures .append (executor .submit (state .parse_file_inner , state .source or "" ))
1010+ parallel_parsed_states [state .id ] = state
1011+ else :
1012+ self .log (f"Using cached AST for { state .xpath } ({ state .id } )" )
1013+ state .tree , state .early_errors = self .ast_cache [state .id ]
10071014
1008- for state in states :
1009- assert state .tree is not None
1010- if state .id in parsed_states :
1011- state .early_errors = list (self .errors .error_info_map .get (state .xpath , []))
1012- state .semantic_analysis_pass1 ()
1013- self .ast_cache [state .id ] = (state .tree , state .early_errors )
1014- self .modules [state .id ] = state .tree
1015- state .check_blockers ()
1016- state .setup_errors ()
1017- else :
1018- # Old parser cannot be parallelized.
1019- for state in states :
1015+ # Parse sequential before waiting on parallel.
1016+ for state in sequential_states :
10201017 state .parse_file ()
10211018
1019+ for fut in wait (futures ).done :
1020+ state_id , parse_errors = fut .result ()
1021+ # New parser reports errors lazily, add them if any.
1022+ if parse_errors :
1023+ state = parallel_parsed_states [state_id ]
1024+ with state .wrap_context ():
1025+ self .errors .set_file (state .xpath , state .id , options = state .options )
1026+ for error in parse_errors :
1027+ report_parse_error (error , self .errors )
1028+ if self .errors .is_blockers ():
1029+ self .log ("Bailing due to parse errors" )
1030+ self .errors .raise_error ()
1031+
1032+ for state in parallel_states :
1033+ assert state .tree is not None
1034+ if state .id in parallel_parsed_states :
1035+ state .early_errors = list (self .errors .error_info_map .get (state .xpath , []))
1036+ state .semantic_analysis_pass1 ()
1037+ self .ast_cache [state .id ] = (state .tree , state .early_errors )
1038+ self .modules [state .id ] = state .tree
1039+ state .check_blockers ()
1040+ state .setup_errors ()
1041+
1042+ def post_parse_all (self , states : list [State ]) -> None :
10221043 for state in states :
10231044 state .compute_dependencies ()
10241045 if self .workers and state .tree :
@@ -1152,7 +1173,8 @@ def parse_file(
11521173 Raise CompileError if there is a parse error.
11531174 """
11541175 imports_only = False
1155- if self .workers and self .fscache .exists (path ):
1176+ file_exists = self .fscache .exists (path )
1177+ if self .workers and file_exists :
11561178 # Currently, we can use the native parser only for actual files.
11571179 imports_only = True
11581180 t0 = time .time ()
@@ -1162,7 +1184,13 @@ def parse_file(
11621184 tree = load_from_raw (path , id , raw_data , self .errors , options )
11631185 else :
11641186 tree , parse_errors = parse (
1165- source , path , id , self .errors , options = options , imports_only = imports_only
1187+ source ,
1188+ path ,
1189+ id ,
1190+ self .errors ,
1191+ options = options ,
1192+ file_exists = file_exists ,
1193+ imports_only = imports_only ,
11661194 )
11671195 tree ._fullname = id
11681196 if self .stats_enabled :
0 commit comments