From 6f70916114c20ccc0d441c1cc2b77ac8180bc081 Mon Sep 17 00:00:00 2001 From: Nicolas Ledez <247138+nledez@users.noreply.github.com> Date: Thu, 21 May 2026 10:21:48 +0200 Subject: [PATCH 1/2] docs(changelog): reconstruct entries for 0.7.5 through 0.7.33 from git history --- CHANGELOG.rst | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3fe9f00..9e2e7d2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,196 @@ Changelog Current ------- +0.7.33 (2026-01-14) +------------------- + +- Upgrade GitHub Actions + +0.7.32 (2026-01-12) +------------------- + +- Drop Python 3.9 support +- Python 3.12 requirements upgrade + +0.7.31 (2025-08-19) +------------------- + +- Fix CI +- Upgrade requirements + +0.7.30 (2025-05-28) +------------------- + +- Remove deprecated ``pkg_resources`` +- Upgrade requirements + +0.7.29 (2025-03-09) +------------------- + +- Upgrade requirements +- Update README + +0.7.28 (2025-01-10) +------------------- + +- Drop support for Python 3.8, add Python 3.13 +- Fix Python 3.13 CI +- Fix ``AttributeError: 'GridFS' object has no attribute '_GridFS__collection'`` +- Use ``main`` branch in GitHub workflows +- Fix docker compose in GitHub Actions CI +- Run ``autoflake`` and ``black`` +- Upgrade requirements + +0.7.27 (2024-07-12) +------------------- + +- Upgrade requirements + +0.7.26 (2024-05-27) +------------------- + +- Use wheel of ``flask_mongoengine3`` directly from PyPI +- Fix MongoDB test errors +- Upgrade requirements + +0.7.25 (2024-05-07) +------------------- + +- Added ``copy()`` method to ``Storage`` +- Upgrade requirements + +0.7.24 (2024-03-11) +------------------- + +- Upgrade requirements + +0.7.23 (2023-10-23) +------------------- + +- Allow setting multiple backends, encrypted or not +- Added ``create_container``/``create_bucket`` options for Swift and S3 +- Ensure server does not serve files outside ``DEBUG`` mode +- Use new 
``pytest-flask`` version for CI tests +- Add ``.vscode/settings.json`` for black formatter +- Upgrade Pillow and other requirements + +0.7.22 (2023-10-13) +------------------- + +- Swift backend refactoring +- Upgrade boto3 +- Launch CI build on pull requests and every branch + +0.7.21 (2023-10-09) +------------------- + +- Allow Flask v3 and newer boto3 +- Add Python 3.12 to classifiers and CI matrix +- Use a pytest-flask branch fixing test issues +- Release script exits immediately on non-zero status + +0.7.20 (2023-09-28) +------------------- + +- Upgrade boto3 + +0.7.19 (2023-09-18) +------------------- + +- Upgrade ``actions/checkout`` to v4 +- Disable ``fail-fast`` in CI so jobs finish even on failure +- Cache pip in CI +- Add pre-commit configuration +- Upgrade requirements +- Update README + +0.7.18 (2023-09-12) +------------------- + +- Switch CI from Travis to GitHub Actions (``ci.yml`` + ``release.yml``) +- Add release script integrated with new workflow +- Add coverage report +- Test against pypy3.10 +- Upgrade requirements + +0.7.17 (2023-07-25) +------------------- + +- Stop encrypting files in memory (stream encryption) +- Upgrade boto3, pymongo and sphinx + +0.7.16 (2023-07-13) +------------------- + +- Fix tests +- Remove GitHub dependency, add ``python-keystoneclient`` + +0.7.15 (2023-07-13) +------------------- + +- Allow Flask 2.3 in ``setup.cfg`` +- Fix Swift tests +- Add tests for ``crypto.py`` and encrypted files in storage +- Upgrade libraries + +0.7.14 (2023-05-31) +------------------- + +- Fix tests on Flask 2.3 +- Fix ``KeyError`` when ``FS_AES256_ENCRYPTED`` is not set +- Fix Travis build + +0.7.13 (2023-05-26) +------------------- + +- Fix Flask issue with tests + +0.7.12 (2023-05-26) +------------------- + +- Upgrade libraries + +0.7.11 (2023-05-26) +------------------- + +- Added file encryption/decryption on put/get (AES256) +- Allow ``cryptography`` from 39.0.2 +- Add ``license_file`` in setup +- Add ``launch.json`` for VSCode 
debugging +- Add issue templates + +0.7.10 (2023-03-13) +------------------- + +- Create Swift container only if missing +- Fix pip install command for development + +0.7.9 (2023-03-03) +------------------ + +- Added ``read_chunks()`` method on ``Storage`` +- Add test ``test_read_chunks`` + +0.7.8 (2023-03-03) +------------------ + +- Use Swift ``auth_version`` 3 by default + +0.7.7 (2023-02-24) +------------------ + +- Change options for Swift + +0.7.6 (2023-02-09) +------------------ + +- Use ``app_context()`` when using ``current_app`` + +0.7.5 (2023-02-06) +------------------ + +- Fix ``LocalBackend.delete``: use ``self.path`` for destination + 0.7.4 (2022-01-24) ------------------ From fde93e3dcc8c25363442d4ff930bdbe0bcf7d03e Mon Sep 17 00:00:00 2001 From: Nicolas Ledez <247138+nledez@users.noreply.github.com> Date: Thu, 21 May 2026 00:36:44 +0200 Subject: [PATCH 2/2] feat(swift): add Connection pool and configurable ETag mismatch policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Per-process Connection pool (gevent Queue with stdlib fallback) so concurrent greenlets/threads no longer share a single swiftclient.Connection. Fixes sporadic 400s, ConnectionReset and content corruption under gevent workers. - write() pre-computes content_length and ETag and verifies the ETag returned by Swift. New etag_mismatch_policy config (log default / raise / raise_and_delete) controls behavior on mismatch — legacy log-only kept as default. - read_chunks() releases the borrowed Connection back to the pool when the generator is exhausted or close() is called. - list_files() uses full_listing=True to enumerate containers with more than 10 000 objects. - New optional settings: pool_size (20), pool_timeout (30), timeout (60), retries (5), os_options (merged over tenant_name / region_name for Keystone v3). 
- Tests: app fixture now pushes test_request_context and sets SERVER_NAME so url_for / current_app / request work out of the box; crypto tests use tmp_path instead of leaking encrypted.bin / decrypted.txt in cwd; new tests/test_swift_pool.py covers concurrent writes, pool sizing and ETag mismatch handling. - Adds gevent>=24.0.0 to test requirements. --- CHANGELOG.rst | 19 ++ flask_fs/backends/swift.py | 395 ++++++++++++++++++++++++++++++++----- requirements/test.pip | 1 + tests/conftest.py | 5 +- tests/test_crypto.py | 16 +- tests/test_swift_pool.py | 169 ++++++++++++++++ 6 files changed, 552 insertions(+), 53 deletions(-) create mode 100644 tests/test_swift_pool.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9e2e7d2..3df682f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,25 @@ Changelog Current ------- +- Swift backend: per-process ``Connection`` pool (``gevent.queue.Queue`` with + stdlib ``queue.Queue`` fallback) so concurrent greenlets and threads no + longer share a single ``swiftclient.Connection``. Fixes sporadic 400s, + ``ConnectionReset`` errors and content corruption observed under + ``gevent`` workers. +- Swift backend: ``timeout`` and ``retries`` are now passed to every + ``Connection`` (defaults: 60 s timeout, 5 retries). +- Swift backend: ``write()`` pre-computes ``content_length`` and ``ETag`` + for ``bytes``/``bytearray`` content and verifies the ETag returned by + Swift; the new ``etag_mismatch_policy`` setting (``log`` by default, + ``raise`` or ``raise_and_delete``) controls the behavior on mismatch. +- Swift backend: ``read_chunks()`` releases the borrowed ``Connection`` + back to the pool when the generator is exhausted or ``close()`` is + called. +- Swift backend: ``list_files()`` uses ``full_listing=True`` so containers + with more than 10 000 objects are fully enumerated. +- Swift backend: new optional settings ``pool_size`` (default 20), + ``pool_timeout`` (30), ``timeout`` (60), ``retries`` (5), ``os_options``.
+ 0.7.33 (2026-01-14) ------------------- diff --git a/flask_fs/backends/swift.py b/flask_fs/backends/swift.py index 58dcf7a..71614e3 100644 --- a/flask_fs/backends/swift.py +++ b/flask_fs/backends/swift.py @@ -1,3 +1,4 @@ +import hashlib import io import logging @@ -6,54 +7,189 @@ import swiftclient +try: + from gevent.queue import Queue as _Queue + from gevent.queue import Empty as _QueueEmpty +except ImportError: + from queue import Queue as _Queue + from queue import Empty as _QueueEmpty + from . import BaseBackend log = logging.getLogger(__name__) +DEFAULT_POOL_SIZE = 20 +DEFAULT_TIMEOUT = 60 +DEFAULT_RETRIES = 5 +DEFAULT_POOL_TIMEOUT = 30 +_READ_CHUNK = 1024 * 1024 + +ETAG_POLICY_LOG = "log" +ETAG_POLICY_RAISE = "raise" +ETAG_POLICY_RAISE_AND_DELETE = "raise_and_delete" +_ETAG_POLICIES = ( + ETAG_POLICY_LOG, + ETAG_POLICY_RAISE, + ETAG_POLICY_RAISE_AND_DELETE, +) +DEFAULT_ETAG_MISMATCH_POLICY = ETAG_POLICY_LOG + + +def _md5(data): + try: + return hashlib.md5(data, usedforsecurity=False) + except TypeError: + return hashlib.md5(data) + + +class PoolExhaustedError(Exception): + """Raised when no Connection becomes available within pool_timeout.""" + class SwiftBackend(BaseBackend): """ - An OpenStack Swift backend + An OpenStack Swift backend with a per-process Connection pool. + + The pool makes the backend safe under concurrent gevent / threaded workers + where multiple greenlets or threads would otherwise share a single + ``swiftclient.Connection`` and corrupt the underlying HTTP socket and + auth-token state. Expect the following settings: - `authurl`: The Swift Auth URL. - - `user`: The Swift user in. + - `user`: The Swift user. - `key`: The user API Key. - `auth_version`: The OpenStack auth version (optional, default: '3'). - - `os_options`: The OpenStack options as a dictonnary with keys such as - 'region_name' (optional, default: None). - - `create_container`: Create the container if it does not - exist (optional, default: False). 
+ - `tenant_name`: OpenStack tenant/project (optional). + - `region_name`: OpenStack region (optional). + - `create_container`: Create the container if it does not exist + (optional, default: False). + - `pool_size`: Number of Connections kept in the pool per process + (optional, default: 20). + - `timeout`: Per-request socket timeout in seconds (optional, default: 60). + - `retries`: Number of swiftclient retries on transient errors + (optional, default: 5). + - `pool_timeout`: Seconds to wait for a free Connection from the pool + before raising ``PoolExhaustedError`` (optional, default: 30). + - `os_options`: Extra OpenStack options dict merged over ``tenant_name`` + / ``region_name`` — needed for Keystone v3 (``user_domain_name``, + ``project_domain_name``, ...). + - `etag_mismatch_policy`: Behavior when the server-returned ETag differs + from the locally computed MD5 (optional, default: ``"log"``). One of: + + * ``"log"`` — log an error, keep the object (legacy behavior). + * ``"raise"`` — log + raise ``swiftclient.ClientException``, keep + the object in Swift. + * ``"raise_and_delete"`` — log + delete the object + raise + ``swiftclient.ClientException``. 
""" def __init__(self, name, config): - super(SwiftBackend, self).__init__(name, config) - - auth_version = getattr(config, "auth_version", "3") - self.conn = swiftclient.Connection( - user=config.user, - key=config.key, - authurl=config.authurl, - auth_version=auth_version, - os_options={ - "tenant_name": getattr(config, "tenant_name", None), - "region_name": getattr(config, "region_name", None), - }, + super().__init__(name, config) + + self._auth_version = getattr(config, "auth_version", "3") + self._user = config.user + self._key = config.key + self._authurl = config.authurl + os_options = { + "tenant_name": getattr(config, "tenant_name", None), + "region_name": getattr(config, "region_name", None), + } + extra_os_options = getattr(config, "os_options", None) + if extra_os_options: + os_options.update(extra_os_options) + self._os_options = os_options + self._timeout = int(getattr(config, "timeout", DEFAULT_TIMEOUT)) + self._retries = int(getattr(config, "retries", DEFAULT_RETRIES)) + self._pool_size = int(getattr(config, "pool_size", DEFAULT_POOL_SIZE)) + self._pool_timeout = float( + getattr(config, "pool_timeout", DEFAULT_POOL_TIMEOUT) + ) + policy = getattr( + config, "etag_mismatch_policy", DEFAULT_ETAG_MISMATCH_POLICY ) + if policy not in _ETAG_POLICIES: + raise ValueError( + "Invalid etag_mismatch_policy {0!r}, expected one of {1}".format( + policy, _ETAG_POLICIES + ) + ) + self._etag_mismatch_policy = policy + + self._pool = _Queue(maxsize=self._pool_size) + for _ in range(self._pool_size): + self._pool.put(None) if getattr(config, "create_container", False): - try: - self.conn.head_container(self.name) - except swiftclient.exceptions.ClientException: - self.conn.put_container(self.name) + with self._borrow() as conn: + try: + conn.head_container(self.name) + except swiftclient.exceptions.ClientException: + conn.put_container(self.name) - def exists(self, filename): + def _new_connection(self): + return swiftclient.Connection( + user=self._user, + 
key=self._key, + authurl=self._authurl, + auth_version=self._auth_version, + os_options=dict(self._os_options), + timeout=self._timeout, + retries=self._retries, + ) + + def _acquire_slot(self): try: - self.conn.head_object(self.name, filename) - return True - except swiftclient.ClientException: - return False + return self._pool.get(timeout=self._pool_timeout) + except _QueueEmpty: + raise PoolExhaustedError( + "Swift connection pool exhausted after %.1fs " + "(pool_size=%d) — likely slot leak from an unclosed " + "read_chunks() stream." + % (self._pool_timeout, self._pool_size) + ) + + @contextmanager + def _borrow(self): + slot = self._acquire_slot() + conn = slot if slot is not None else self._new_connection() + keep = True + try: + yield conn + except swiftclient.ClientException as e: + status = getattr(e, "http_status", None) + if status is None or status >= 500 or status in (401, 403): + keep = False + raise + except (OSError, IOError): + keep = False + raise + finally: + if not keep: + try: + conn.close() + except Exception: + pass + self._pool.put(None) + else: + self._pool.put(conn) + + def exists(self, filename): + with self._borrow() as conn: + try: + conn.head_object(self.name, filename) + return True + except swiftclient.ClientException as e: + status = getattr(e, "http_status", None) + if status not in (404, None): + log.error( + "Error checking existence of %s in %s: %s", + filename, + self.name, + e, + ) + return False @contextmanager def open(self, filename, mode="r", encoding="utf8"): @@ -64,48 +200,217 @@ def open(self, filename, mode="r", encoding="utf8"): if "b" in mode else io.StringIO(obj.decode(encoding)) ) - else: # mode == 'w' + else: f = io.BytesIO() if "b" in mode else io.StringIO() yield f self.write(filename, f.getvalue()) def read(self, filename): - _, data = self.conn.get_object(self.name, filename) - return data + with self._borrow() as conn: + _, data = conn.get_object(self.name, filename) + return data - def read_chunks(self, 
filename, chunks_size=1024 * 1024): - _, data = self.conn.get_object( - self.name, filename, resp_chunk_size=chunks_size - ) - return data + def read_chunks(self, filename, chunks_size=_READ_CHUNK): + slot = self._acquire_slot() + conn = slot if slot is not None else self._new_connection() + try: + _, data = conn.get_object( + self.name, filename, resp_chunk_size=chunks_size + ) + except Exception: + try: + conn.close() + except Exception: + pass + self._pool.put(None) + raise + return _PoolReleasingStream(data, self._pool, conn) + + def _release(self, conn, healthy): + if not healthy: + try: + conn.close() + except Exception: + pass + self._pool.put(None) + else: + self._pool.put(conn) def write(self, filename, content): - self.conn.put_object( - self.name, filename, contents=self.as_binary(content) + binary = self.as_binary(content) + content_length, etag = self._precompute(binary) + + with self._borrow() as conn: + returned_etag = conn.put_object( + self.name, + filename, + contents=binary, + content_length=content_length, + etag=etag, + ) + if ( + etag + and returned_etag + and returned_etag.lower() != etag.lower() + ): + self._handle_etag_mismatch( + conn, filename, etag, returned_etag + ) + + def _handle_etag_mismatch(self, conn, filename, etag, returned_etag): + message = ( + "ETag mismatch for {0} in {1}: local={2} remote={3}".format( + filename, self.name, etag, returned_etag + ) ) + log.error(message) + policy = self._etag_mismatch_policy + if policy == ETAG_POLICY_LOG: + return + if policy == ETAG_POLICY_RAISE_AND_DELETE: + try: + conn.delete_object(self.name, filename) + except swiftclient.ClientException: + log.exception( + "Failed to delete corrupted object %s in %s", + filename, + self.name, + ) + raise swiftclient.ClientException(message) def delete(self, filename): - if self.exists(filename): - self.conn.delete_object(self.name, filename) - else: - _, items = self.conn.get_container(self.name, path=filename) + with self._borrow() as conn: + if 
self._head(conn, filename): + conn.delete_object(self.name, filename) + return + _, items = conn.get_container(self.name, path=filename) for i in items: - self.conn.delete_object(self.name, i["name"]) + conn.delete_object(self.name, i["name"]) + + def _head(self, conn, filename): + try: + conn.head_object(self.name, filename) + return True + except swiftclient.ClientException: + return False def copy(self, filename, target): dest = "/".join((self.name, target)) - self.conn.copy_object(self.name, filename, destination=dest) + with self._borrow() as conn: + src_meta = conn.head_object(self.name, filename) + response_headers = {} + conn.copy_object( + self.name, + filename, + destination=dest, + response_dict=response_headers, + ) + src_etag = (src_meta.get("etag") or "").lower() + headers = response_headers.get("headers") or {} + dst_etag = (headers.get("etag") or "").lower() + if src_etag and dst_etag and src_etag != dst_etag: + log.error( + "ETag mismatch on copy %s -> %s in %s: src=%s dst=%s", + filename, + target, + self.name, + src_etag, + dst_etag, + ) def list_files(self): - _, items = self.conn.get_container(self.name) + with self._borrow() as conn: + _, items = conn.get_container(self.name, full_listing=True) for i in items: yield i["name"] def get_metadata(self, filename): - data = self.conn.head_object(self.name, filename) + with self._borrow() as conn: + data = conn.head_object(self.name, filename) return { "checksum": "md5:{0}".format(data["etag"]), "size": int(data["content-length"]), "mime": data["content-type"], "modified": parser.parse(data["last-modified"]), } + + def _precompute(self, binary): + """Return (content_length, etag) for upload. + + Only bytes/bytearray are precomputed. File-likes are passed through + to swiftclient unchanged (chunked transfer-encoding, no client-side + ETag) to avoid reading the payload twice. 
+ """ + if isinstance(binary, (bytes, bytearray)): + return len(binary), _md5(binary).hexdigest() + return None, None + + +class _PoolReleasingStream: + """Wrap a swiftclient chunk iterator so the Connection returns to the pool + when the consumer finishes iterating or calls ``close()``. + + Callers must either iterate to completion or call ``close()`` explicitly; + a dropped stream will leak its pool slot until process exit. This is + deliberate: releasing the slot from ``__del__`` is unsafe under gevent + because gc may run in an arbitrary greenlet/thread context. + """ + + def __init__(self, inner, pool, conn): + self._inner = inner + self._iter = iter(inner) + self._pool = pool + self._conn = conn + self._released = False + self._healthy = True + + def __iter__(self): + return self + + def __next__(self): + if self._released: + raise StopIteration + try: + return next(self._iter) + except StopIteration: + self.close() + raise + except BaseException: + self._healthy = False + self.close() + raise + + def close(self): + if self._released: + return + self._released = True + try: + inner_close = getattr(self._inner, "close", None) + if inner_close is not None: + try: + inner_close() + except Exception: + self._healthy = False + finally: + try: + if self._healthy: + self._pool.put(self._conn) + else: + try: + self._conn.close() + except Exception: + pass + self._pool.put(None) + except Exception: + log.exception( + "Failed to release Swift connection back to pool" + ) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + if exc_type is not None: + self._healthy = False + self.close() + return False diff --git a/requirements/test.pip b/requirements/test.pip index 1353413..188378e 100644 --- a/requirements/test.pip +++ b/requirements/test.pip @@ -7,3 +7,4 @@ pytest-sugar==1.1.1 pytest-mock==3.15.1 pillow==12.1.0 flask-mongoengine-3==1.1.0 +gevent>=24.0.0 diff --git a/tests/conftest.py b/tests/conftest.py index 1961848..76b7fef 100644 --- 
a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,7 @@ class TestConfig: DEBUG = True TESTING = True + SERVER_NAME = "localhost" MONGODB_DB = "flask-fs-test" MONGODB_HOST = "localhost" MONGODB_PORT = 27017 @@ -22,6 +23,7 @@ class TestConfig: class TestConfigEncrypted: TESTING = True + SERVER_NAME = "localhost" MONGODB_DB = "flask-fs-test" MONGODB_HOST = "localhost" MONGODB_PORT = 27017 @@ -40,7 +42,8 @@ def configure(self, *storages, **configs): def app(): app = TestFlask("flaskfs-tests") app.config.from_object(TestConfig) - yield app + with app.test_request_context(): + yield app @pytest.fixture diff --git a/tests/test_crypto.py b/tests/test_crypto.py index 6bce897..af3548c 100644 --- a/tests/test_crypto.py +++ b/tests/test_crypto.py @@ -34,17 +34,17 @@ def test_encrypt_content(encryptor): assert encrypted_content != content -def test_encrypt_file(encryptor, plaintext_file): +def test_encrypt_file(encryptor, plaintext_file, tmp_path): encrypted_file_path = encryptor.encrypt_file( - plaintext_file, "encrypted.bin" + plaintext_file, str(tmp_path / "encrypted.bin") ) assert os.path.exists(encrypted_file_path) assert os.path.getsize(encrypted_file_path) > 0 -def test_decrypt_file(encryptor, encrypted_file): +def test_decrypt_file(encryptor, encrypted_file, tmp_path): decrypted_file_path = encryptor.decrypt_file( - encrypted_file, "decrypted.txt" + encrypted_file, str(tmp_path / "decrypted.txt") ) assert os.path.exists(decrypted_file_path) assert os.path.getsize(decrypted_file_path) > 0 @@ -63,9 +63,9 @@ def test_decrypt_entire_file(encryptor, encrypted_file): assert decrypted_content == b"Hello, World!" 
-def test_decrypt_file_invalid_token(encryptor, plaintext_file): +def test_decrypt_file_invalid_token(encryptor, plaintext_file, tmp_path): encrypted_file_path = encryptor.encrypt_file( - plaintext_file, "encrypted.bin" + plaintext_file, str(tmp_path / "encrypted.bin") ) with open(encrypted_file_path, "r+b") as file: @@ -73,7 +73,9 @@ def test_decrypt_file_invalid_token(encryptor, plaintext_file): file.write(b"\x00") # Modify one byte in the IV with pytest.raises(Exception): - encryptor.decrypt_file(encrypted_file_path, "decrypted.txt") + encryptor.decrypt_file( + encrypted_file_path, str(tmp_path / "decrypted.txt") + ) def test_already_encrypted_file(): diff --git a/tests/test_swift_pool.py b/tests/test_swift_pool.py new file mode 100644 index 0000000..f1fece8 --- /dev/null +++ b/tests/test_swift_pool.py @@ -0,0 +1,169 @@ +import pytest + +try: + import gevent + + HAS_GEVENT = True +except ImportError: + HAS_GEVENT = False + +pytestmark = pytest.mark.skipif( + not HAS_GEVENT, reason="gevent is required for pool concurrency tests" +) + +import swiftclient # noqa: E402 + +from flask_fs.backends.swift import SwiftBackend # noqa: E402 +from flask_fs.storage import Config # noqa: E402 + +USER = "test:tester" +KEY = "testing" +AUTHURL = "http://127.0.0.1:8085/auth/v1.0" + + +def _config(**overrides): + cfg = { + "user": USER, + "key": KEY, + "authurl": AUTHURL, + "tenant_name": "", + "region_name": "", + "auth_version": 1, + "create_container": True, + "pool_size": 5, + } + cfg.update(overrides) + return Config(cfg) + + +def _container_name(request): + raw = "pooltest-{0}".format(request.function.__name__) + return raw.replace("_", "-") + + +def _admin_conn(): + return swiftclient.Connection( + user=USER, key=KEY, authurl=AUTHURL, auth_version=1 + ) + + +def _drop_container(conn, name): + try: + _, items = conn.get_container(name, full_listing=True) + for i in items: + try: + conn.delete_object(name, i["name"]) + except swiftclient.ClientException: + pass + 
conn.delete_container(name) + except swiftclient.ClientException: + pass + + +@pytest.fixture +def admin_conn(): + return _admin_conn() + + +@pytest.fixture +def container_name(request, admin_conn): + name = _container_name(request) + _drop_container(admin_conn, name) + yield name + _drop_container(admin_conn, name) + + +@pytest.fixture +def backend(container_name): + return SwiftBackend(container_name, _config()) + + +def _payload(i): + return ("payload-{0}-".format(i) * 200).encode("utf-8") + + +def test_concurrent_writes(backend): + keys = ["obj-{0}".format(i) for i in range(50)] + payloads = [_payload(i) for i in range(50)] + + jobs = [gevent.spawn(backend.write, k, p) for k, p in zip(keys, payloads)] + gevent.joinall(jobs, timeout=120) + + for j in jobs: + assert j.successful(), "greenlet raised: {0}".format(j.exception) + + listed = set(backend.list_files()) + assert set(keys).issubset(listed), "missing keys: {0}".format( + set(keys) - listed + ) + + for k, p in zip(keys, payloads): + assert backend.read(k) == p, "content mismatch for {0}".format(k) + + +def test_concurrent_writes_then_reads(backend): + keys = ["dual-{0}".format(i) for i in range(50)] + payloads = [_payload(i) for i in range(50)] + + writes = [ + gevent.spawn(backend.write, k, p) for k, p in zip(keys, payloads) + ] + gevent.joinall(writes, timeout=120) + for j in writes: + assert j.successful(), "write failed: {0}".format(j.exception) + + reads = [gevent.spawn(backend.read, k) for k in keys] + gevent.joinall(reads, timeout=120) + + for j, expected in zip(reads, payloads): + assert j.successful(), "read failed: {0}".format(j.exception) + assert j.value == expected, "content mix-up detected" + + +def test_etag_mismatch_raises_and_deletes(container_name, monkeypatch): + backend = SwiftBackend( + container_name, _config(etag_mismatch_policy="raise_and_delete") + ) + original = swiftclient.Connection.put_object + + def liar(self, container, name, contents, **kwargs): + original(self, container, 
name, contents, **kwargs) + return "0" * 32 + + monkeypatch.setattr(swiftclient.Connection, "put_object", liar) + + with pytest.raises(swiftclient.ClientException) as exc: + backend.write("bad-etag", b"hello world") + assert "ETag mismatch" in str(exc.value) + + assert not backend.exists("bad-etag") + + +def test_pool_size_respected(container_name): + created = [] + + class CountingBackend(SwiftBackend): + def _new_connection(self): + conn = super()._new_connection() + created.append(id(conn)) + return conn + + backend = CountingBackend(container_name, _config(pool_size=2)) + + def borrow_and_yield(i): + with backend._borrow() as conn: + gevent.sleep(0.05) + conn.put_object(container_name, "k-{0}".format(i), contents=b"x") + + jobs = [gevent.spawn(borrow_and_yield, i) for i in range(10)] + gevent.joinall(jobs, timeout=60) + + for j in jobs: + assert j.successful(), "borrow failed: {0}".format(j.exception) + + assert ( + len(created) <= 2 + ), "pool created {0} connections, expected <= 2".format(len(created)) + assert len(set(created)) == len( + created + ), "_new_connection returned duplicates"