Skip to content

Commit 32f307a

Browse files
committed
Merge branch 'master' into faster-dev-ver
2 parents 529dc77 + f315c8a commit 32f307a

7 files changed

Lines changed: 132 additions & 53 deletions

File tree

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,20 @@ and (PR [20405](https://github.com/python/mypy/pull/20405)).
525525

526526
Please see [git log](https://github.com/python/typeshed/commits/main?after=f8f0794d0fe249c06dc9f31a004d85be6cca6ced+0&branch=main&path=stdlib) for full list of standard library typeshed stub changes.
527527

528+
### Mypy 1.20.1
529+
530+
- Always disable sync in SQLite cache (Ivan Levkivskyi, PR [21184](https://github.com/python/mypy/pull/21184))
531+
- Temporarily skip a few base64 tests (Ivan Levkivskyi, PR [21193](https://github.com/python/mypy/pull/21193))
532+
- Revert `dict.__or__` typeshed change (Ivan Levkivskyi, PR [21186](https://github.com/python/mypy/pull/21186))
533+
- Fix narrowing for match case with variadic tuples (Shantanu, PR [21192](https://github.com/python/mypy/pull/21192))
534+
- Avoid narrowing `type[T]` in type calls (Shantanu, PR [21174](https://github.com/python/mypy/pull/21174))
535+
- Fix regression for catching empty tuple in except (Shantanu, PR [21153](https://github.com/python/mypy/pull/21153))
536+
- Fix reachability for frozenset and dict view narrowing (Shantanu, PR [21151](https://github.com/python/mypy/pull/21151))
537+
- Fix narrowing with chained comparison (Shantanu, PR [21150](https://github.com/python/mypy/pull/21150))
538+
- Avoid narrowing to unreachable at module level (Shantanu, PR [21144](https://github.com/python/mypy/pull/21144))
539+
- Allow dangerous identity comparisons to `Any` typed variables (Shantanu, PR [21142](https://github.com/python/mypy/pull/21142))
540+
- `--warn-unused-config` should not be a strict flag (Ivan Levkivskyi, PR [21139](https://github.com/python/mypy/pull/21139))
541+
528542
### Acknowledgements
529543

530544
Thanks to all mypy contributors who contributed to this release:

mypy/build.py

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet
3030
from heapq import heappop, heappush
3131
from textwrap import dedent
32+
from threading import Thread
3233
from typing import (
3334
TYPE_CHECKING,
3435
Any,
@@ -371,6 +372,7 @@ def default_flush_errors(
371372
extra_plugins = extra_plugins or []
372373

373374
workers = []
375+
connect_threads = []
374376
if options.num_workers > 0:
375377
# TODO: switch to something more efficient than pickle (also in the daemon).
376378
pickled_options = pickle.dumps(options.snapshot())
@@ -383,10 +385,17 @@ def default_flush_errors(
383385
buf = WriteBuffer()
384386
sources_message.write(buf)
385387
sources_data = buf.getvalue()
388+
389+
def connect(wc: WorkerClient, data: bytes) -> None:
    """Connect to a single worker, then send it the given payload.

    The connection object is only looked up after ``wc.connect()`` has
    returned, so the payload is written as soon as the worker is up.
    """
    wc.connect()
    connection = wc.conn
    # Start loading sources in the worker right away.
    connection.write_bytes(data)
393+
394+
# We don't wait for workers to be ready until they are actually needed.
386395
for worker in workers:
387-
# Start loading graph in each worker as soon as it is up.
388-
worker.connect()
389-
worker.conn.write_bytes(sources_data)
396+
thread = Thread(target=connect, args=(worker, sources_data))
397+
thread.start()
398+
connect_threads.append(thread)
390399

391400
try:
392401
result = build_inner(
@@ -399,6 +408,7 @@ def default_flush_errors(
399408
stderr,
400409
extra_plugins,
401410
workers,
411+
connect_threads,
402412
)
403413
result.errors = messages
404414
return result
@@ -412,6 +422,10 @@ def default_flush_errors(
412422
e.messages = messages
413423
raise
414424
finally:
425+
# In case of an early crash it is better to wait for workers to become ready, and
426+
# shut them down cleanly. Otherwise, they will linger until connection timeout.
427+
for thread in connect_threads:
428+
thread.join()
415429
for worker in workers:
416430
try:
417431
send(worker.conn, SccRequestMessage(scc_id=None, import_errors={}, mod_data={}))
@@ -431,6 +445,7 @@ def build_inner(
431445
stderr: TextIO,
432446
extra_plugins: Sequence[Plugin],
433447
workers: list[WorkerClient],
448+
connect_threads: list[Thread],
434449
) -> BuildResult:
435450
if platform.python_implementation() == "CPython":
436451
# Run gc less frequently, as otherwise we can spend a large fraction of
@@ -486,7 +501,7 @@ def build_inner(
486501

487502
reset_global_state()
488503
try:
489-
graph = dispatch(sources, manager, stdout)
504+
graph = dispatch(sources, manager, stdout, connect_threads)
490505
if not options.fine_grained_incremental:
491506
type_state.reset_all_subtype_caches()
492507
if options.timing_stats is not None:
@@ -496,9 +511,7 @@ def build_inner(
496511
warn_unused_configs(options, flush_errors)
497512
return BuildResult(manager, graph)
498513
finally:
499-
t0 = time.time()
500-
manager.metastore.commit()
501-
manager.add_stats(cache_commit_time=time.time() - t0)
514+
manager.commit()
502515
manager.log(
503516
"Build finished in %.3f seconds with %d modules, and %d errors"
504517
% (
@@ -1119,6 +1132,11 @@ def report_file(
11191132
if self.reports is not None and self.source_set.is_source(file):
11201133
self.reports.file(file, self.modules, type_map, options)
11211134

1135+
def commit(self) -> None:
    """Commit the metadata store and record how long the commit took.

    The elapsed wall-clock time is reported through ``add_stats()`` under
    the ``cache_commit_time`` key.
    """
    started = time.time()
    self.metastore.commit()
    elapsed = time.time() - started
    self.add_stats(cache_commit_time=elapsed)
1139+
11221140
def verbosity(self) -> int:
11231141
return self.options.verbosity
11241142

@@ -1156,6 +1174,24 @@ def add_stats(self, **kwds: Any) -> None:
11561174
def stats_summary(self) -> Mapping[str, object]:
11571175
return self.stats
11581176

1177+
def broadcast(self, message: bytes) -> None:
    """Broadcast same message to all workers in parallel.

    One thread per worker writes ``message`` to that worker's connection.
    All threads are joined before returning, and the total elapsed time is
    recorded via ``add_stats()`` under the ``broadcast_time`` key.

    Raises:
        Exception: the first error raised by any worker's ``write_bytes``.
            Without this, a failed write would only be reported by the
            thread's excepthook and the caller would carry on as if the
            message had been delivered.
    """
    t0 = time.time()
    failures: list[Exception] = []

    def write(conn: Any) -> None:
        # An exception escaping a Thread target is not visible to join(),
        # so capture it here and re-raise from the calling thread below.
        try:
            conn.write_bytes(message)
        except Exception as exc:
            failures.append(exc)

    threads = []
    for worker in self.workers:
        thread = Thread(target=write, args=(worker.conn,))
        thread.start()
        threads.append(thread)
    # Always join every thread, so no write is still in flight on return.
    for thread in threads:
        thread.join()
    self.add_stats(broadcast_time=time.time() - t0)
    if failures:
        # Only the first failure is propagated; any others are dropped.
        raise failures[0]
1188+
1189+
def wait_ack(self) -> None:
    """Receive one message from each worker and check that it is an ACK.

    Workers are handled one after another in list order; each reply is
    expected to carry the ACK_MESSAGE tag.
    """
    for connection in (worker.conn for worker in self.workers):
        reply = receive(connection)
        assert read_tag(reply) == ACK_MESSAGE
1194+
11591195
def submit(self, graph: Graph, sccs: list[SCC]) -> None:
11601196
"""Submit a stale SCC for processing in current process or parallel workers."""
11611197
if self.workers:
@@ -1176,6 +1212,7 @@ def submit_to_workers(self, graph: Graph, sccs: list[SCC] | None = None) -> None
11761212
for mod_id in scc.mod_ids
11771213
if (path := graph[mod_id].xpath) in self.errors.recorded
11781214
}
1215+
t0 = time.time()
11791216
send(
11801217
self.workers[idx].conn,
11811218
SccRequestMessage(
@@ -1193,6 +1230,7 @@ def submit_to_workers(self, graph: Graph, sccs: list[SCC] | None = None) -> None
11931230
},
11941231
),
11951232
)
1233+
self.add_stats(scc_send_time=time.time() - t0)
11961234

11971235
def wait_for_done(
11981236
self, graph: Graph
@@ -1221,7 +1259,10 @@ def wait_for_done_workers(
12211259

12221260
done_sccs = []
12231261
results = {}
1224-
for idx in ready_to_read([w.conn for w in self.workers], WORKER_DONE_TIMEOUT):
1262+
t0 = time.time()
1263+
ready = ready_to_read([w.conn for w in self.workers], WORKER_DONE_TIMEOUT)
1264+
t1 = time.time()
1265+
for idx in ready:
12251266
buf = receive(self.workers[idx].conn)
12261267
assert read_tag(buf) == SCC_RESPONSE_MESSAGE
12271268
data = SccResponseMessage.read(buf)
@@ -1232,6 +1273,7 @@ def wait_for_done_workers(
12321273
assert data.result is not None
12331274
results.update(data.result)
12341275
done_sccs.append(self.scc_by_id[scc_id])
1276+
self.add_stats(scc_wait_time=t1 - t0, scc_receive_time=time.time() - t1)
12351277
self.submit_to_workers(graph) # advance after some workers are free.
12361278
return (
12371279
done_sccs,
@@ -3685,7 +3727,12 @@ def log_configuration(manager: BuildManager, sources: list[BuildSource]) -> None
36853727
# The driver
36863728

36873729

3688-
def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO) -> Graph:
3730+
def dispatch(
3731+
sources: list[BuildSource],
3732+
manager: BuildManager,
3733+
stdout: TextIO,
3734+
connect_threads: list[Thread],
3735+
) -> Graph:
36893736
log_configuration(manager, sources)
36903737

36913738
t0 = time.time()
@@ -3742,7 +3789,7 @@ def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO)
37423789
dump_graph(graph, stdout)
37433790
return graph
37443791

3745-
# Fine grained dependencies that didn't have an associated module in the build
3792+
# Fine-grained dependencies that didn't have an associated module in the build
37463793
# are serialized separately, so we read them after we load the graph.
37473794
# We need to read them both for running in daemon mode and if we are generating
37483795
# a fine-grained cache (so that we can properly update them incrementally).
@@ -3755,25 +3802,28 @@ def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO)
37553802
if fg_deps_meta is not None:
37563803
manager.fg_deps_meta = fg_deps_meta
37573804
elif manager.stats.get("fresh_metas", 0) > 0:
3758-
# Clear the stats so we don't infinite loop because of positive fresh_metas
3805+
# Clear the stats, so we don't loop infinitely because of positive fresh_metas
37593806
manager.stats.clear()
37603807
# There were some cache files read, but no fine-grained dependencies loaded.
37613808
manager.log("Error reading fine-grained dependencies cache -- aborting cache load")
37623809
manager.cache_enabled = False
37633810
manager.log("Falling back to full run -- reloading graph...")
3764-
return dispatch(sources, manager, stdout)
3811+
return dispatch(sources, manager, stdout, connect_threads)
37653812

37663813
# If we are loading a fine-grained incremental mode cache, we
37673814
# don't want to do a real incremental reprocess of the
37683815
# graph---we'll handle it all later.
37693816
if not manager.use_fine_grained_cache():
3817+
# Wait for workers since they may be needed at this point.
3818+
for thread in connect_threads:
3819+
thread.join()
37703820
process_graph(graph, manager)
37713821
# Update plugins snapshot.
37723822
write_plugins_snapshot(manager)
37733823
manager.old_plugins_snapshot = manager.plugins_snapshot
37743824
if manager.options.cache_fine_grained or manager.options.fine_grained_incremental:
3775-
# If we are running a daemon or are going to write cache for further fine grained use,
3776-
# then we need to collect fine grained protocol dependencies.
3825+
# If we are running a daemon or are going to write cache for further fine-grained use,
3826+
# then we need to collect fine-grained protocol dependencies.
37773827
# Since these are a global property of the program, they are calculated after we
37783828
# processed the whole graph.
37793829
type_state.add_all_protocol_deps(manager.fg_deps)
@@ -4166,10 +4216,8 @@ def process_graph(graph: Graph, manager: BuildManager) -> None:
41664216
buf = WriteBuffer()
41674217
graph_message.write(buf)
41684218
graph_data = buf.getvalue()
4169-
for worker in manager.workers:
4170-
buf = receive(worker.conn)
4171-
assert read_tag(buf) == ACK_MESSAGE
4172-
worker.conn.write_bytes(graph_data)
4219+
manager.wait_ack()
4220+
manager.broadcast(graph_data)
41734221

41744222
sccs = sorted_components(graph)
41754223
manager.log(
@@ -4187,13 +4235,9 @@ def process_graph(graph: Graph, manager: BuildManager) -> None:
41874235
buf = WriteBuffer()
41884236
sccs_message.write(buf)
41894237
sccs_data = buf.getvalue()
4190-
for worker in manager.workers:
4191-
buf = receive(worker.conn)
4192-
assert read_tag(buf) == ACK_MESSAGE
4193-
worker.conn.write_bytes(sccs_data)
4194-
for worker in manager.workers:
4195-
buf = receive(worker.conn)
4196-
assert read_tag(buf) == ACK_MESSAGE
4238+
manager.wait_ack()
4239+
manager.broadcast(sccs_data)
4240+
manager.wait_ack()
41974241

41984242
manager.free_workers = set(range(manager.options.num_workers))
41994243

@@ -4339,9 +4383,7 @@ def process_stale_scc(
43394383
if (
43404384
not manager.options.test_env
43414385
and platform.python_implementation() == "CPython"
4342-
# Parallel workers perform loading in many smaller "pieces", so we
4343-
# should repeat the GC hack multiple times to actually benefit from it.
4344-
and (manager.gc_freeze_cycles < MAX_GC_FREEZE_CYCLES or manager.parallel_worker)
4386+
and manager.gc_freeze_cycles < MAX_GC_FREEZE_CYCLES
43454387
):
43464388
# When deserializing cache we create a huge amount of new objects, so even
43474389
# with our generous GC thresholds, GC is still doing a lot of pointless
@@ -4350,16 +4392,14 @@ def process_stale_scc(
43504392
# generation with the freeze()/unfreeze() trick below. This is arguably
43514393
# a hack, but it gives huge performance wins for large third-party
43524394
# libraries, like torch.
4353-
gc.collect(generation=1)
4354-
gc.collect(generation=0)
43554395
gc.disable()
43564396
for prev_scc in fresh_sccs_to_load:
43574397
manager.done_sccs.add(prev_scc.id)
43584398
process_fresh_modules(graph, sorted(prev_scc.mod_ids), manager)
43594399
if (
43604400
not manager.options.test_env
43614401
and platform.python_implementation() == "CPython"
4362-
and (manager.gc_freeze_cycles < MAX_GC_FREEZE_CYCLES or manager.parallel_worker)
4402+
and manager.gc_freeze_cycles < MAX_GC_FREEZE_CYCLES
43634403
):
43644404
manager.gc_freeze_cycles += 1
43654405
gc.freeze()

mypy/build_worker/worker.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@
4545
process_stale_scc,
4646
)
4747
from mypy.cache import Tag, read_int_opt
48-
from mypy.defaults import RECURSION_LIMIT, WORKER_CONNECTION_TIMEOUT
48+
from mypy.defaults import RECURSION_LIMIT, WORKER_CONNECTION_TIMEOUT, WORKER_IDLE_TIMEOUT
4949
from mypy.errors import CompileError, ErrorInfo, Errors, report_internal_error
5050
from mypy.fscache import FileSystemCache
51-
from mypy.ipc import IPCException, IPCServer, receive, send
51+
from mypy.ipc import IPCException, IPCServer, ready_to_read, receive, send
5252
from mypy.modulefinder import BuildSource, BuildSourceSet, compute_search_paths
5353
from mypy.nodes import FileRawData
5454
from mypy.options import Options
@@ -170,34 +170,35 @@ def serve(server: IPCServer, ctx: ServerContext) -> None:
170170
# Notify coordinator we are ready to start processing SCCs.
171171
send(server, AckMessage())
172172
while True:
173+
t0 = time.time()
174+
ready_to_read([server], WORKER_IDLE_TIMEOUT)
175+
t1 = time.time()
173176
buf = receive(server)
174177
assert read_tag(buf) == SCC_REQUEST_MESSAGE
175178
scc_message = SccRequestMessage.read(buf)
179+
manager.add_stats(scc_wait_time=t1 - t0, scc_receive_time=time.time() - t1)
176180
scc_id = scc_message.scc_id
177181
if scc_id is None:
182+
gc_stats = gc.get_stats()
183+
manager.add_stats(
184+
gc_collections_gen0=gc_stats[0]["collections"],
185+
gc_collections_gen1=gc_stats[1]["collections"],
186+
)
178187
manager.dump_stats()
179188
break
180189
scc = manager.scc_by_id[scc_id]
181190
t0 = time.time()
182191
try:
183-
if platform.python_implementation() == "CPython":
184-
# Since we are splitting the GC freeze hack into multiple smaller freezes,
185-
# we should collect young generations to not accumulate accidental garbage.
186-
gc.collect(generation=1)
187-
gc.collect(generation=0)
188-
gc.disable()
189192
load_states(scc, graph, manager, scc_message.import_errors, scc_message.mod_data)
190-
if platform.python_implementation() == "CPython":
191-
gc.freeze()
192-
gc.unfreeze()
193-
gc.enable()
194193
result = process_stale_scc(graph, scc, manager, from_cache=graph_data.from_cache)
195194
# We must commit after each SCC, otherwise we break --sqlite-cache.
196-
manager.metastore.commit()
195+
manager.commit()
197196
except CompileError as blocker:
198197
send(server, SccResponseMessage(scc_id=scc_id, blocker=blocker))
199198
else:
199+
t1 = time.time()
200200
send(server, SccResponseMessage(scc_id=scc_id, result=result))
201+
manager.add_stats(scc_send_time=time.time() - t1)
201202
manager.add_stats(total_process_stale_time=time.time() - t0, stale_sccs_processed=1)
202203

203204

mypy/defaults.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,5 @@
4848
WORKER_START_INTERVAL: Final = 0.01
4949
WORKER_START_TIMEOUT: Final = 3
5050
WORKER_CONNECTION_TIMEOUT: Final = 10
51+
WORKER_IDLE_TIMEOUT: Final = 600
5152
WORKER_DONE_TIMEOUT: Final = 600

0 commit comments

Comments
 (0)