-
Notifications
You must be signed in to change notification settings - Fork 0
Fix stale locks (#56) and add __repr__/__str__ to public classes (#21) #63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
958bcd3
e954784
fe5bf9d
7d904b9
c44d8d7
0698009
8a4a8c2
160f360
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -264,6 +264,7 @@ def __init__(self, | |
| raise TypeError(f'cache_name argument {cache_name} is of improper type') | ||
|
|
||
| is_shared = (cache_name is not None) | ||
| self._cache_name = cache_name | ||
|
|
||
| self._delete_on_exit = (delete_on_exit if delete_on_exit is not None | ||
| else not is_shared) | ||
|
|
@@ -418,6 +419,15 @@ def logger(self) -> Logger | None: | |
| return _GLOBAL_LOGGER | ||
| return cast(Logger, self._logger) | ||
|
|
||
| def __repr__(self) -> str: | ||
| return (f'FileCache({self._cache_name!r}, ' | ||
| f'anonymous={self._anonymous!r}, ' | ||
| f'lock_timeout={self._lock_timeout!r}, ' | ||
| f'nthreads={self._nthreads!r})') | ||
|
|
||
| def __str__(self) -> str: | ||
| return str(self._cache_dir) | ||
|
|
||
| def _log_debug(self, msg: str) -> None: | ||
| logger = self.logger | ||
| if logger: | ||
|
|
@@ -1600,7 +1610,7 @@ def _retrieve_multi_locked(self, | |
| except filelock._error.Timeout: | ||
| self._log_debug(f' Failed to lock: {sub_path}') | ||
| wait_to_appear.append((idx, f'{source_pfx}{sub_path}', local_path, | ||
| lock_path)) | ||
| lock_path, source, sub_path)) | ||
| continue | ||
| lock_list.append((lock_path, lock)) | ||
| source_idxes.append(idx) | ||
|
|
@@ -1644,12 +1654,29 @@ def _retrieve_multi_locked(self, | |
| # If wait_to_appear is not empty, then we failed to acquire at least one lock, | ||
| # which means that another process was downloading the file. So now we just | ||
| # sit here and wait for all of the missing files to magically show up, or for | ||
| # us to time out. If the lock file disappears but the destination file isn't | ||
| # present, that means the other process failed in its download. | ||
| # us to time out. | ||
| # | ||
| # In each iteration we also check for stale locks: a stale lock exists when the | ||
| # lock file is present but the process that created it has died (the OS released | ||
| # the advisory flock, but did not delete the file). We detect a stale lock by | ||
| # attempting a non-blocking acquire -- if it succeeds, no live process holds the | ||
| # lock, so we steal it and initiate the download ourselves. | ||
| # | ||
| # Race-condition guarantee: multiple waiting processes may all notice the same | ||
| # stale lock and all attempt `lock.acquire(timeout=0)` simultaneously. Because | ||
| # the underlying flock(2) call is atomic, exactly one process wins the race; the | ||
| # others receive a Timeout and remain in the wait list. The winner downloads the | ||
| # file atomically (write to temp path + rename), so the losers will find the | ||
| # completed file on their next poll iteration. | ||
| # | ||
| # If the lock file disappears without the destination file appearing, that means | ||
| # the other process failed (or cleaned up after itself on error). | ||
| timed_out = False | ||
|
Comment on lines
+1659
to
1674
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Honor negative `lock_timeout`. The public contract says a negative timeout means “never time out,” but this loop still checks `time.time() - start_time > lock_timeout`, which is always true for a negative value, so the wait gives up after the first pass instead of waiting indefinitely.

Proposed fix:
- if time.time() - start_time > lock_timeout:
+ if lock_timeout >= 0 and time.time() - start_time > lock_timeout:
      exc = TimeoutError(
          'Timeout while waiting for another process to finish downloading')

Also applies to: 1743-1745

🤖 Prompt for AI Agents |
||
| while wait_to_appear: | ||
| new_wait_to_appear = [] | ||
| for idx, url, local_path, lock_path in wait_to_appear: | ||
| stale_lock_items = [] # Items whose locks we have stolen | ||
|
|
||
| for idx, url, local_path, lock_path, source, sub_path in wait_to_appear: | ||
| if local_path.is_file(): | ||
| func_ret[idx] = local_path | ||
| self._log_debug(f' Downloaded elsewhere: {url}') | ||
|
|
@@ -1659,18 +1686,67 @@ def _retrieve_multi_locked(self, | |
| f'Another process failed to download {url}') | ||
| self._log_debug(f' Download elsewhere failed: {url}') | ||
| continue | ||
| new_wait_to_appear.append((idx, url, local_path, lock_path)) | ||
| # Lock file exists and destination is absent. Check whether the lock is | ||
| # stale (flock released by a crashed process but file not cleaned up). | ||
| stale_lock = filelock.FileLock(lock_path, timeout=0) | ||
| try: | ||
| stale_lock.acquire() | ||
| # Acquired with timeout=0 → no live process holds this lock. | ||
| stale_lock_items.append( | ||
| (idx, url, local_path, lock_path, stale_lock, source, sub_path)) | ||
| self._log_debug(f' Stale lock detected for {url}, will re-download') | ||
| except filelock._error.Timeout: | ||
| # Lock is actively held -- another process is still downloading. | ||
| new_wait_to_appear.append( | ||
| (idx, url, local_path, lock_path, source, sub_path)) | ||
|
|
||
| # Download all files whose stale locks we have just acquired, grouped by | ||
| # source so that retrieve_multi can fetch them in parallel. | ||
| stale_by_source: dict[str, list[tuple[ | ||
| int, str, Path, Path, filelock.FileLock, FileCacheSource, str]]] = {} | ||
| for item in stale_lock_items: | ||
| pfx = item[5]._src_prefix_ | ||
| if pfx not in stale_by_source: | ||
| stale_by_source[pfx] = [] | ||
| stale_by_source[pfx].append(item) | ||
|
|
||
| for items in stale_by_source.values(): | ||
| s_idxes = [it[0] for it in items] | ||
| s_urls = [it[1] for it in items] | ||
| s_local_paths = [it[2] for it in items] | ||
| s_lock_paths = [it[3] for it in items] | ||
| s_locks = [it[4] for it in items] | ||
| s_source = items[0][5] | ||
| s_sub_paths = [it[6] for it in items] | ||
|
|
||
| rets = s_source.retrieve_multi(s_sub_paths, s_local_paths, | ||
| preserve_mtime=False, nthreads=nthreads) | ||
| for idx, url, ret in zip(s_idxes, s_urls, rets): | ||
| func_ret[idx] = ret | ||
| if isinstance(ret, Exception): | ||
| files_not_exist.append(url) | ||
| self._log_debug( | ||
| f' Stale lock recovery download failed: ' | ||
| f'{url}: {ret!r}') | ||
| else: | ||
| self._download_counter += 1 | ||
| self._log_debug( | ||
| f' Re-downloaded after stale lock recovery: {url}') | ||
|
|
||
| for s_lock, s_lock_path in zip(s_locks, s_lock_paths): | ||
| s_lock.release() | ||
| s_lock_path.unlink(missing_ok=True) | ||
|
|
||
| if not new_wait_to_appear: | ||
| break | ||
|
|
||
| wait_to_appear = new_wait_to_appear | ||
| if time.time() - start_time > lock_timeout: | ||
| if lock_timeout >= 0 and time.time() - start_time > lock_timeout: | ||
| exc = TimeoutError( | ||
| 'Timeout while waiting for another process to finish downloading') | ||
| self._log_debug( | ||
| ' Timeout while waiting for another process to finish downloading:') | ||
| for idx, url, local_path, lock_path in wait_to_appear: | ||
| for idx, url, local_path, lock_path, source, sub_path in wait_to_appear: | ||
| func_ret[idx] = exc | ||
| self._log_debug(f' {url}') | ||
| if exception_on_fail: | ||
|
|
@@ -1923,14 +1999,14 @@ def _upload_multi(self, | |
| pass | ||
|
|
||
| if exception_on_fail: | ||
| exc_str = '' | ||
| if files_not_exist: | ||
| exc_str += f"File(s) do not exist: {', '.join(files_not_exist)}" | ||
| if files_failed: | ||
| if exc_str: | ||
| exc_str += ' AND ' | ||
| exc_str += f"Failed to upload file(s): {', '.join(files_failed)}" | ||
| if exc_str: | ||
| if files_not_exist and not files_failed: | ||
| raise FileNotFoundError( | ||
| f"File(s) do not exist: {', '.join(files_not_exist)}") | ||
| elif files_failed: | ||
| exc_str = f"Failed to upload file(s): {', '.join(files_failed)}" | ||
| if files_not_exist: | ||
| exc_str = (f"File(s) do not exist: {', '.join(files_not_exist)}" | ||
| f' AND {exc_str}') | ||
| raise FileNotFoundError(exc_str) | ||
rfrenchseti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return cast(list[Union[Path, Exception]], func_ret) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.