Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@ on:
jobs:
run:
runs-on: ${{ matrix.os }}
# The free-threaded build (3.14t) is allowed to fail while numba and the
# rest of the stack finish their no-GIL support, so a broken install or a
# thread-safety failure reports signal without turning the PR check red or
# blocking a merge. Promote it to a required job once it stays green.
continue-on-error: ${{ endsWith(matrix.python, 't') }}
# Cap the free-threaded job so a no-GIL deadlock can't sit on a runner for
# the 360-minute default; other versions keep that default.
timeout-minutes: ${{ endsWith(matrix.python, 't') && 30 || 360 }}
strategy:
matrix:
os: ['ubuntu-latest', 'macos-latest', 'windows-latest']
python: ${{ github.event_name == 'pull_request' && fromJson('["3.14"]') || fromJson('["3.12", "3.13", "3.14"]') }}
python: ${{ github.event_name == 'pull_request' && fromJson('["3.14", "3.14t"]') || fromJson('["3.12", "3.13", "3.14", "3.14t"]') }}
env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python }}
Expand All @@ -46,7 +54,18 @@ jobs:
# (today: the six compression fixtures). push-to-main and the
# nightly schedule run the full set with no filter.
if: github.event_name == 'pull_request'
# Force the GIL off only for the free-threaded (3.14t) test run so the
# suite exercises the real no-GIL path instead of letting CPython
# silently re-enable the GIL for a C extension that hasn't declared
# free-threaded support. Scoped to the pytest step (not the job) so it
# never reaches setup-python's bootstrap interpreter, where
# PYTHON_GIL=0 is a fatal error on a GIL build. Empty (ignored) on the
# GIL versions.
env:
PYTHON_GIL: ${{ endsWith(matrix.python, 't') && '0' || '' }}
run: pytest -m "not slow"
- name: Run pytest (full)
if: github.event_name != 'pull_request'
env:
PYTHON_GIL: ${{ endsWith(matrix.python, 't') && '0' || '' }}
run: pytest
26 changes: 21 additions & 5 deletions xrspatial/geotiff/_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -1680,9 +1680,22 @@ def __init__(self, url: str, **storage_options):
self._size = self._fs.size(self._path)

def read_range(self, start: int, length: int) -> bytes:
    """Return up to ``length`` bytes of the object, starting at ``start``.

    The read goes through the filesystem's stateless ``cat_file`` ranged
    API; no file handle is opened, so there is no cursor to share between
    concurrent callers.

    Parameters
    ----------
    start : int
        Byte offset of the first byte to return.
    length : int
        Maximum number of bytes to return.

    Returns
    -------
    bytes
        Whatever ``cat_file`` returns for ``[start, start + length)``; an
        empty ``bytes`` for a zero-length request.
    """
    # Use the stateless ``cat_file`` ranged-read API rather than
    # ``open()`` + ``seek()`` + ``read()``. For some backends
    # ``fs.open(path, 'rb')`` hands back a shared, process-global file
    # object -- notably fsspec's ``MemoryFileSystem``, where every open of
    # a path returns the same stored buffer. Concurrent windowed reads
    # (e.g. dask chunk tasks under the free-threaded interpreter, or
    # ``read_ranges``' own worker pool) then race on that single handle's
    # cursor: one thread's ``seek`` lands under another's ``read``,
    # corrupting the returned bytes (surfacing as ``zlib.error: incorrect
    # header check`` when the bytes feed a tile decoder) or reading from a
    # handle the context manager just closed. ``cat_file`` returns a fresh
    # byte slice per call with no shared cursor, so it is safe under true
    # parallelism. See issue #3361.
    if length == 0:
        # ``read(0)`` yields no bytes; answer locally instead of sending
        # the backend an empty range.
        return b""
    # ``end`` is exclusive and clamps at EOF, matching ``read(length)``.
    return self._fs.cat_file(self._path, start=start, end=start + length)

def read_ranges(
self,
Expand Down Expand Up @@ -1740,8 +1753,11 @@ def read_ranges_coalesced(
return split_coalesced_bytes(merged_bytes, mapping)

def read_all(self) -> bytes:
    """Return the entire object as ``bytes``.

    Implemented with a range-less ``cat_file`` call, so no file handle is
    opened on the filesystem.
    """
    # ``cat_file`` with no range reads the whole object and avoids the
    # shared-handle seek/close race that ``open()`` exposes on backends
    # such as fsspec's ``MemoryFileSystem`` under concurrent reads
    # (issue #3361).
    return self._fs.cat_file(self._path)

@property
def size(self) -> int:
Expand Down
142 changes: 142 additions & 0 deletions xrspatial/geotiff/tests/read/test_cloud_source_concurrency_3361.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Regression tests for the _CloudSource shared-handle data race (#3361).

``_CloudSource.read_range`` used to do ``fs.open(path, 'rb')`` followed by
``seek`` + ``read``. For backends whose ``open`` returns a shared,
process-global file object -- notably fsspec's ``MemoryFileSystem``, where
every open of a path hands back the same stored buffer -- concurrent
windowed reads raced on that single cursor. Under the free-threaded
interpreter (3.14t) this corrupted tile bytes (``zlib.error: incorrect
header check``) or read from a handle another thread had just closed
(``ValueError: I/O operation on closed file``). The GIL masked the race,
so the integration repro (``chunks=`` open of a ``memory://`` COG) only
fails on the free-threaded lane.

The fix routes reads through the stateless ``cat_file`` ranged API. These
tests pin that contract deterministically (without depending on the
free-threaded interpreter) by driving ``read_range`` against a filesystem
whose ``open`` returns a shared handle and whose ``seek`` is barrier-
synchronised, so a shared-cursor implementation provably returns the wrong
bytes while the ``cat_file`` implementation does not.
"""
import threading

import pytest

fsspec = pytest.importorskip("fsspec")

from xrspatial.geotiff._sources import _CloudSource # noqa: E402


class _SharedHandle:
"""A single buffer with one shared cursor, like an fsspec MemoryFile."""

def __init__(self, buf, seek_barrier=None):
self._buf = buf
self.pos = 0
self._seek_barrier = seek_barrier

def seek(self, pos, whence=0):
assert whence == 0
self.pos = pos
# Force every concurrent reader to finish seeking before any of
# them reads. On a shared cursor this guarantees at least one read
# observes another thread's offset -- the exact race the fix
# removes -- making the test deterministic under the GIL.
if self._seek_barrier is not None:
self._seek_barrier.wait()

def read(self, n=-1):
return self._buf[self.pos:self.pos + n] if n >= 0 else self._buf[self.pos:]

def close(self):
pass

def __enter__(self):
return self

def __exit__(self, *exc):
return False


class _SharedHandleFS:
"""Filesystem stub: ``open`` returns one shared handle; ``cat_file``
is the stateless ranged read that the fix relies on."""

def __init__(self, buf, seek_barrier=None):
self._data = buf
self._handle = _SharedHandle(buf, seek_barrier=seek_barrier)

def open(self, path, mode='rb'):
return self._handle

def cat_file(self, path, start=None, end=None):
return self._data[start:end]

def size(self, path):
return len(self._data)


def _cloud_source_over(fs):
    """Build a ``_CloudSource`` wired to ``fs`` without running ``__init__``.

    The instance is allocated with ``__new__`` and the private attributes
    are filled in by hand. ``fs`` must expose its backing bytes as
    ``fs._data``; their length becomes the source's ``_size``.
    """
    source = _CloudSource.__new__(_CloudSource)
    fields = {
        "_url": "memory://stub/x.bin",
        "_fs": fs,
        "_path": "stub/x.bin",
        "_size": len(fs._data),
    }
    for attr, value in fields.items():
        setattr(source, attr, value)
    return source


def test_cloud_source_concurrent_read_range_no_shared_cursor_3361():
    """Two simultaneous ``read_range`` calls each get the bytes they asked
    for, even though the backend's ``open`` would expose a shared cursor."""
    payload = bytes((i % 251) for i in range(1000))
    gate = threading.Barrier(2)
    source = _cloud_source_over(_SharedHandleFS(payload, seek_barrier=gate))

    offsets = (0, 500)
    fetched = {}

    def fetch(offset):
        fetched[offset] = source.read_range(offset, 100)

    workers = []
    for offset in offsets:
        workers.append(threading.Thread(target=fetch, args=(offset,)))
    for thread in workers:
        thread.start()
    for thread in workers:
        thread.join()

    # With a shared cursor both reads would land on whichever seek ran
    # last, so at least one comparison below would fail. ``cat_file``
    # keeps the two reads independent.
    assert fetched[0] == payload[0:100]
    assert fetched[500] == payload[500:600]


def test_cloud_source_read_range_matches_read_semantics_3361():
    """The ``cat_file``-backed ``read_range`` behaves like ``seek`` +
    ``read``, including the EOF clamp the COG header prefetch relies on."""
    payload = b"ABCDE"
    source = _cloud_source_over(_SharedHandleFS(payload))
    expectations = (
        ((0, 5), b"ABCDE"),
        ((0, 100), b"ABCDE"),  # length past EOF clamps
        ((3, 100), b"DE"),
        ((2, 0), b""),
        ((5, 10), b""),  # start at EOF
    )
    for (start, length), expected in expectations:
        assert source.read_range(start, length) == expected


def test_cloud_source_read_range_real_memory_fs_3361():
    """Round-trip through a live fsspec ``memory://`` object: each ranged
    read and the whole-object read return the stored bytes (this is the
    backend whose ``open`` really does return a shared handle)."""
    import os
    import uuid

    memfs = fsspec.filesystem("memory")
    payload = bytes((i % 256) for i in range(4096))
    # Process id plus a random suffix keeps the key unique within the
    # process-global in-memory store.
    key = f"test3361-{os.getpid()}-{uuid.uuid4().hex[:8]}/x.bin"
    memfs.pipe_file(key, payload)
    try:
        source = _CloudSource("memory://" + key)
        windows = [(0, 16), (100, 200), (4000, 500), (4096, 8)]
        for offset, count in windows:
            expected = payload[offset:offset + count]
            assert source.read_range(offset, count) == expected
        assert source.read_all() == payload
    finally:
        # Always remove the object so later tests do not see it.
        memfs.rm_file(key)
Loading