Skip to content
1 change: 0 additions & 1 deletion filecache/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@
FileCacheSourceGS,
FileCacheSourceS3,
FileCacheSourceFake)

__all__ = ['get_global_logger',
'register_filecachesource',
'set_easy_logger',
Expand Down
106 changes: 91 additions & 15 deletions filecache/file_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def __init__(self,
raise TypeError(f'cache_name argument {cache_name} is of improper type')

is_shared = (cache_name is not None)
self._cache_name = cache_name

self._delete_on_exit = (delete_on_exit if delete_on_exit is not None
else not is_shared)
Expand Down Expand Up @@ -418,6 +419,15 @@ def logger(self) -> Logger | None:
return _GLOBAL_LOGGER
return cast(Logger, self._logger)

def __repr__(self) -> str:
    """Return a constructor-style debug representation of this FileCache.

    Shows the cache name plus the anonymous/lock_timeout/nthreads settings
    captured at construction time.
    """
    settings = (f'anonymous={self._anonymous!r}, '
                f'lock_timeout={self._lock_timeout!r}, '
                f'nthreads={self._nthreads!r}')
    return f'FileCache({self._cache_name!r}, {settings})'

def __str__(self) -> str:
    """Return the local cache directory path as the display form."""
    cache_dir = self._cache_dir
    return str(cache_dir)

def _log_debug(self, msg: str) -> None:
logger = self.logger
if logger:
Expand Down Expand Up @@ -1600,7 +1610,7 @@ def _retrieve_multi_locked(self,
except filelock._error.Timeout:
self._log_debug(f' Failed to lock: {sub_path}')
wait_to_appear.append((idx, f'{source_pfx}{sub_path}', local_path,
lock_path))
lock_path, source, sub_path))
continue
lock_list.append((lock_path, lock))
source_idxes.append(idx)
Expand Down Expand Up @@ -1644,12 +1654,29 @@ def _retrieve_multi_locked(self,
# If wait_to_appear is not empty, then we failed to acquire at least one lock,
# which means that another process was downloading the file. So now we just
# sit here and wait for all of the missing files to magically show up, or for
# us to time out. If the lock file disappears but the destination file isn't
# present, that means the other process failed in its download.
# us to time out.
#
# In each iteration we also check for stale locks: a stale lock exists when the
# lock file is present but the process that created it has died (the OS released
# the advisory flock, but did not delete the file). We detect a stale lock by
# attempting a non-blocking acquire -- if it succeeds, no live process holds the
# lock, so we steal it and initiate the download ourselves.
#
# Race-condition guarantee: multiple waiting processes may all notice the same
# stale lock and all attempt `lock.acquire(timeout=0)` simultaneously. Because
# the underlying flock(2) call is atomic, exactly one process wins the race; the
# others receive a Timeout and remain in the wait list. The winner downloads the
# file atomically (write to temp path + rename), so the losers will find the
# completed file on their next poll iteration.
#
# If the lock file disappears without the destination file appearing, that means
# the other process failed (or cleaned up after itself on error).
timed_out = False
Comment on lines +1659 to 1674
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Honor negative lock_timeout in the wait loop.

The public contract says a negative timeout means “never time out,” but this loop still checks elapsed > lock_timeout. For any negative value, that becomes true on the first poll, so locked multi-file retrieves can raise TimeoutError immediately instead of waiting indefinitely.

Proposed fix
-            if time.time() - start_time > lock_timeout:
+            if lock_timeout >= 0 and time.time() - start_time > lock_timeout:
                 exc = TimeoutError(
                     'Timeout while waiting for another process to finish downloading')

Also applies to: 1743-1745

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@filecache/file_cache.py` around lines 1659 - 1674, The wait loop incorrectly
treats negative lock_timeout as a timeout because it unconditionally compares
elapsed > lock_timeout; update the loop logic (the section setting timed_out and
the condition that checks elapsed > lock_timeout) to only perform the elapsed >
lock_timeout check when lock_timeout is non-negative (e.g., lock_timeout >= 0),
so negative values mean “never time out”; apply the same change to the analogous
check around lines 1743-1745 to ensure both wait loops honor negative
lock_timeout.

while wait_to_appear:
new_wait_to_appear = []
for idx, url, local_path, lock_path in wait_to_appear:
stale_lock_items = [] # Items whose locks we have stolen

for idx, url, local_path, lock_path, source, sub_path in wait_to_appear:
if local_path.is_file():
func_ret[idx] = local_path
self._log_debug(f' Downloaded elsewhere: {url}')
Expand All @@ -1659,18 +1686,67 @@ def _retrieve_multi_locked(self,
f'Another process failed to download {url}')
self._log_debug(f' Download elsewhere failed: {url}')
continue
new_wait_to_appear.append((idx, url, local_path, lock_path))
# Lock file exists and destination is absent. Check whether the lock is
# stale (flock released by a crashed process but file not cleaned up).
stale_lock = filelock.FileLock(lock_path, timeout=0)
try:
stale_lock.acquire()
# Acquired with timeout=0 → no live process holds this lock.
stale_lock_items.append(
(idx, url, local_path, lock_path, stale_lock, source, sub_path))
self._log_debug(f' Stale lock detected for {url}, will re-download')
except filelock._error.Timeout:
# Lock is actively held -- another process is still downloading.
new_wait_to_appear.append(
(idx, url, local_path, lock_path, source, sub_path))

# Download all files whose stale locks we have just acquired, grouped by
# source so that retrieve_multi can fetch them in parallel.
stale_by_source: dict[str, list[tuple[
int, str, Path, Path, filelock.FileLock, FileCacheSource, str]]] = {}
for item in stale_lock_items:
pfx = item[5]._src_prefix_
if pfx not in stale_by_source:
stale_by_source[pfx] = []
stale_by_source[pfx].append(item)

for items in stale_by_source.values():
s_idxes = [it[0] for it in items]
s_urls = [it[1] for it in items]
s_local_paths = [it[2] for it in items]
s_lock_paths = [it[3] for it in items]
s_locks = [it[4] for it in items]
s_source = items[0][5]
s_sub_paths = [it[6] for it in items]

rets = s_source.retrieve_multi(s_sub_paths, s_local_paths,
preserve_mtime=False, nthreads=nthreads)
for idx, url, ret in zip(s_idxes, s_urls, rets):
func_ret[idx] = ret
if isinstance(ret, Exception):
files_not_exist.append(url)
self._log_debug(
f' Stale lock recovery download failed: '
f'{url}: {ret!r}')
else:
self._download_counter += 1
self._log_debug(
f' Re-downloaded after stale lock recovery: {url}')

for s_lock, s_lock_path in zip(s_locks, s_lock_paths):
s_lock.release()
s_lock_path.unlink(missing_ok=True)

if not new_wait_to_appear:
break

wait_to_appear = new_wait_to_appear
if time.time() - start_time > lock_timeout:
if lock_timeout >= 0 and time.time() - start_time > lock_timeout:
exc = TimeoutError(
'Timeout while waiting for another process to finish downloading')
self._log_debug(
' Timeout while waiting for another process to finish downloading:')
for idx, url, local_path, lock_path in wait_to_appear:
for idx, url, local_path, lock_path, source, sub_path in wait_to_appear:
func_ret[idx] = exc
self._log_debug(f' {url}')
if exception_on_fail:
Expand Down Expand Up @@ -1923,14 +1999,14 @@ def _upload_multi(self,
pass

if exception_on_fail:
exc_str = ''
if files_not_exist:
exc_str += f"File(s) do not exist: {', '.join(files_not_exist)}"
if files_failed:
if exc_str:
exc_str += ' AND '
exc_str += f"Failed to upload file(s): {', '.join(files_failed)}"
if exc_str:
if files_not_exist and not files_failed:
raise FileNotFoundError(
f"File(s) do not exist: {', '.join(files_not_exist)}")
elif files_failed:
exc_str = f"Failed to upload file(s): {', '.join(files_failed)}"
if files_not_exist:
exc_str = (f"File(s) do not exist: {', '.join(files_not_exist)}"
f' AND {exc_str}')
raise FileNotFoundError(exc_str)

return cast(list[Union[Path, Exception]], func_ret)
Expand Down
15 changes: 14 additions & 1 deletion filecache/file_cache_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,20 @@ def splitpath(self, search_dir: str) -> tuple[FCPath, ...]:
for i, j in zip(indices[:-1], indices[1:]))

def __repr__(self) -> str:
    """Return a constructor-style representation of this FCPath.

    Always includes the path; each optional setting (filecache, anonymous,
    lock_timeout, nthreads, url_to_url, url_to_path) is appended as a
    keyword argument only when it was explicitly set (i.e. is not None).
    """
    optional_settings = (
        ('filecache', self._filecache),
        ('anonymous', self._anonymous),
        ('lock_timeout', self._lock_timeout),
        ('nthreads', self._nthreads),
        ('url_to_url', self._url_to_url),
        ('url_to_path', self._url_to_path),
    )
    args = [repr(self._path)]
    args.extend(f'{name}={value!r}'
                for name, value in optional_settings
                if value is not None)
    return f'FCPath({", ".join(args)})'

def __eq__(self, other: object) -> bool:
if not isinstance(other, FCPath):
Expand Down
7 changes: 7 additions & 0 deletions filecache/file_cache_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ def __init__(self,
# The _cache_subdir attribute is only used by the FileCache class
self._cache_subdir = ''

def __repr__(self) -> str:
    """Return a constructor-style debug representation.

    Uses the concrete subclass name so e.g. an S3 source reports itself
    as FileCacheSourceS3(...), along with the scheme, remote, and
    anonymous flag it was created with.
    """
    cls_name = type(self).__name__
    return (f'{cls_name}({self._scheme!r}, {self._remote!r}, '
            f'anonymous={self._anonymous!r})')

def __str__(self) -> str:
    """Return the source prefix (scheme + remote) as the display form."""
    prefix = self._src_prefix
    return prefix

@classmethod
@abstractmethod
def schemes(self) -> tuple[str, ...]:
Expand Down
Loading
Loading