diff --git a/changes/4003.bugfix.md b/changes/4003.bugfix.md new file mode 100644 index 0000000000..36327b55df --- /dev/null +++ b/changes/4003.bugfix.md @@ -0,0 +1,19 @@ +`FsspecStore.from_url()` and `from_mapper()` now close the async filesystem +they create when `store.close()` is called. Previously the underlying aiohttp +`ClientSession` was left open until garbage collection, producing +`"Unclosed client session"` `ResourceWarning`s from aiohttp. + +The fix introduces `FsspecStore._owns_fs`, a boolean that is ``True`` only when +`FsspecStore` itself created the filesystem (via `from_url` or `from_mapper` +when a sync→async conversion was performed). When `_owns_fs` is ``True``, +`store.close()` calls the new `_close_fs()` helper, which invokes +`fs.set_session()` and closes the returned client. Callers who supply their own +filesystem instance to `FsspecStore()` directly remain responsible for its +lifecycle; `_owns_fs` is ``False`` for those stores. + +**Scope note**: This fix closes the S3 client session that is active at the time +`store.close()` is called. Some S3-backed filesystem implementations (e.g. +s3fs with ``cache_regions=True``) may internally refresh and replace their +client during I/O operations, abandoning prior sessions before ``store.close()`` +is invoked. Those intermediate sessions are outside the scope of this fix and +are an issue in the upstream filesystem library. diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 14386d1aac..29201a6fee 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -3,6 +3,7 @@ import json import warnings from contextlib import suppress +from logging import getLogger from typing import TYPE_CHECKING, Any from packaging.version import parse as parse_version @@ -18,6 +19,8 @@ from zarr.errors import ZarrUserWarning from zarr.storage._utils import _dereference_path +logger = getLogger(__name__) + if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable @@ -35,6 +38,26 @@ ) +async def _close_fs(fs: AsyncFileSystem) -> None: + """ + Best-effort async close of an fsspec async filesystem owned by FsspecStore. + + For filesystems that expose ``set_session()`` (e.g. s3fs) the underlying + aiohttp ``ClientSession`` is closed explicitly, which prevents + "Unclosed client session" ``ResourceWarning``s from aiohttp. For all + other filesystem types the call is a no-op (not every implementation + manages an HTTP session directly). + + Note that ``set_session()`` lazily creates a session if none exists yet, so + closing a store that never performed any I/O may instantiate a session + purely to close it. This is accepted best-effort behavior; fsspec does not + expose a stable, cross-implementation way to test for an existing session. + """ + if hasattr(fs, "set_session"): + session = await fs.set_session() + await session.close() + + def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: """Convert a sync FSSpec filesystem to an async FFSpec filesystem @@ -129,6 +152,9 @@ def __init__( self.fs = fs self.path = path self.allowed_exceptions = allowed_exceptions + # True only when this store created fs itself (from_url / from_mapper with new instance). + # Callers who supply their own fs remain responsible for its lifecycle. + self._owns_fs: bool = False if not self.fs.async_impl: raise TypeError("Filesystem needs to support async operations.") @@ -194,13 +220,17 @@ def from_mapper( ------- FsspecStore """ - fs = _make_async(fs_map.fs) - return cls( + original_fs = fs_map.fs + fs = _make_async(original_fs) + store = cls( fs=fs, path=fs_map.root, read_only=read_only, allowed_exceptions=allowed_exceptions, ) + # _make_async returns a new instance when converting sync→async; own it. + store._owns_fs = fs is not original_fs + return store @classmethod def from_url( @@ -242,16 +272,39 @@ def from_url( if not fs.async_impl: fs = _make_async(fs) - return cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions) + store = cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions) + store._owns_fs = True + return store def with_read_only(self, read_only: bool = False) -> FsspecStore: # docstring inherited - return type(self)( + new_store = type(self)( fs=self.fs, path=self.path, allowed_exceptions=self.allowed_exceptions, read_only=read_only, ) + # The derived store shares the same fs. Transfer ownership so the + # surviving store closes it, and clear ours to avoid a double-close. + # Otherwise the common ``from_url(...).with_read_only()`` chain would + # drop the only owner (the unreferenced source) and leak the session. + new_store._owns_fs = self._owns_fs + self._owns_fs = False + return new_store + + def close(self) -> None: + # docstring inherited + if self._owns_fs: + from zarr.core.sync import sync as zarr_sync + + # Best-effort: a failure to release the session must not block close(), + # but log it so a genuine regression in the close path stays observable + # rather than silently reverting to the leaking behavior. + try: + zarr_sync(_close_fs(self.fs)) + except Exception: + logger.debug("Failed to close owned filesystem %r", self.fs, exc_info=True) + super().close() async def clear(self) -> None: # docstring inherited diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 142cb3b00d..8006470174 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -36,7 +36,9 @@ pytest.mark.filterwarnings( re.escape("ignore:datetime.datetime.utcnow() is deprecated:DeprecationWarning") ), - # TODO: fix these warnings + # FsspecStore.from_url() and from_mapper() now close the aiohttp session on store.close(). + # This filter covers stores that are GC'd without an explicit close() call, and any + # residual sessions from aiobotocore's ClientCreatorContext (a separate upstream issue). pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning"), pytest.mark.filterwarnings( "ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning" @@ -283,6 +285,75 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: ): await store.delete_dir("test_prefix") + # ── Filesystem lifecycle (ownership) ────────────────────────────────────── + + def test_from_url_owns_filesystem(self) -> None: + """FsspecStore.from_url() creates the async fs; it must own it.""" + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/lifecycle/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + assert store._owns_fs + store.close() + + async def test_from_url_close_releases_store(self) -> None: + """ + close() on a from_url() store must succeed without error and mark the + store as closed. For the owned filesystem, _close_fs() is invoked to + release the underlying S3 client / aiohttp connection pool. + """ + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/lifecycle/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + # Materialise the S3 client and connection pool. + await store.set("probe", cpu.Buffer.from_bytes(b"x")) + + store.close() + + assert not store._is_open + + def test_direct_construction_does_not_own_filesystem(self) -> None: + """Direct FsspecStore() must not claim ownership — the caller owns the fs.""" + try: + from fsspec import url_to_fs + except ImportError: + from fsspec.core import url_to_fs + fs, path = url_to_fs( + f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True + ) + store = FsspecStore(fs=fs, path=path) + assert not store._owns_fs + + @pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.03.01"), + reason="Prior bug in from_upath", + ) + def test_from_upath_does_not_own_filesystem(self) -> None: + """from_upath() uses the UPath's existing fs; the store must not own it.""" + upath = pytest.importorskip("upath") + path = upath.UPath( + f"s3://{test_bucket_name}/foo/bar/", + endpoint_url=endpoint_url, + anon=False, + asynchronous=True, + ) + store = FsspecStore.from_upath(path) + assert not store._owns_fs + + def test_from_mapper_does_not_own_already_async_filesystem(self) -> None: + """from_mapper() with an already-async fs must not claim ownership.""" + s3_filesystem = s3fs.S3FileSystem( + asynchronous=True, + endpoint_url=endpoint_url, + anon=False, + skip_instance_cache=True, + ) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/") + store = FsspecStore.from_mapper(mapper) + # _make_async returns the same instance for an already-async fs. + assert not store._owns_fs + def array_roundtrip(store: FsspecStore) -> None: """ @@ -512,6 +583,82 @@ def test_open_s3map_raises() -> None: zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) +async def test_close_fs_closes_s3_client() -> None: + """ + _close_fs() must call set_session() and then close() on the returned + S3 client. This is verified with mocks to avoid a real S3 connection. + """ + from unittest.mock import AsyncMock + + from zarr.storage._fsspec import _close_fs + + mock_client = AsyncMock() + mock_fs = AsyncMock() + mock_fs.set_session = AsyncMock(return_value=mock_client) + + await _close_fs(mock_fs) + + mock_fs.set_session.assert_called_once() + mock_client.close.assert_called_once() + + +async def test_close_fs_no_op_for_fs_without_set_session() -> None: + """_close_fs() must be a no-op for filesystems that don't expose set_session().""" + from unittest.mock import AsyncMock + + from zarr.storage._fsspec import _close_fs + + mock_fs = AsyncMock(spec=[]) # empty spec — no set_session attribute + await _close_fs(mock_fs) # must not raise + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_from_mapper_owns_wrapped_sync_filesystem(tmp_path: pathlib.Path) -> None: + """ + from_mapper() with a sync fs must wrap it in AsyncFileSystemWrapper and + claim ownership so that close() cleans it up. + + The local filesystem is synchronous; _make_async() produces a new + AsyncFileSystemWrapper instance — a different object from the original fs. + """ + import fsspec as _fsspec + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + fs = _fsspec.filesystem("file", auto_mkdir=True) + mapper = fs.get_mapper(str(tmp_path)) + store = FsspecStore.from_mapper(mapper) + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store._owns_fs + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_with_read_only_transfers_filesystem_ownership(tmp_path: pathlib.Path) -> None: + """ + with_read_only() must transfer fs ownership to the derived store and clear + it on the source, so the surviving store closes the shared fs exactly once. + + In the common ``from_url(...).with_read_only()`` chain the source store is + immediately unreferenced; if ownership were not transferred, the only owner + would be garbage-collected without close() and the session would leak. + """ + source = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False}) + assert source._owns_fs + + derived = source.with_read_only(read_only=True) + + # Ownership moved to the survivor; the source no longer owns it (no double-close). + assert derived._owns_fs + assert not source._owns_fs + # The derived store shares the same underlying fs. + assert derived.fs is source.fs + + @pytest.mark.parametrize("asynchronous", [True, False]) def test_make_async(asynchronous: bool) -> None: s3_filesystem = s3fs.S3FileSystem(