From b9371b0105ae2be5e518a0daf9a35a505e90e9b7 Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Mon, 27 Apr 2026 17:08:50 -0400 Subject: [PATCH 1/2] feat: add flag for pending uploads --- .../cloudstorage/logic/action/multipart.py | 49 +++++++++++++ ckanext/cloudstorage/storage.py | 18 +++++ ckanext/cloudstorage/tests/test_storage.py | 68 +++++++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/ckanext/cloudstorage/logic/action/multipart.py b/ckanext/cloudstorage/logic/action/multipart.py index 988af90d..9e4358ba 100644 --- a/ckanext/cloudstorage/logic/action/multipart.py +++ b/ckanext/cloudstorage/logic/action/multipart.py @@ -269,9 +269,36 @@ def finish_multipart(context, data_dict): upload_id=upload_id, chunks=chunks, ) + resource_id = upload.resource_id upload.delete() upload.commit() + # Clear the pending-upload flag so downstream extensions (xloader) can + # act on the now-committed file. resource_patch fires + # IDomainObjectModification.notify(changed), which is how xloader picks + # this up without requiring a direct dependency. + try: + toolkit.get_action("resource_patch")( + dict(context.copy(), ignore_auth=True), + { + "id": resource_id, + "cloudstorage_multipart_pending": "False", + }, + ) + log.debug( + "cloudstorage multipart: cleared pending flag after finish " + "(resource_id=%s upload_id=%s)", + resource_id, + upload_id, + ) + except Exception: + log.exception( + "cloudstorage multipart: failed to clear pending flag after " + "finish (resource_id=%s upload_id=%s)", + resource_id, + upload_id, + ) + if save_action and save_action == "go-metadata": try: res_dict = toolkit.get_action("resource_show")( @@ -309,6 +336,28 @@ def abort_multipart(context, data_dict): model.Session.commit() + # Clear the pending-upload flag so the resource is not stranded with a + # never-cleared flag after the user cancels a multipart upload. + try: + toolkit.get_action("resource_patch")( + dict(context.copy(), ignore_auth=True), + { + "id": id, + "cloudstorage_multipart_pending": "False", + }, + ) + log.debug( + "cloudstorage multipart: aborted upload ids for resource %s: %s", + id, + aborted, + ) + except Exception: + log.exception( + "cloudstorage multipart: failed to clear pending flag after " + "abort (resource_id=%s)", + id, + ) + return aborted diff --git a/ckanext/cloudstorage/storage.py b/ckanext/cloudstorage/storage.py index 994fe8dd..c361894c 100644 --- a/ckanext/cloudstorage/storage.py +++ b/ckanext/cloudstorage/storage.py @@ -224,6 +224,24 @@ def __init__(self, resource): self._clear = resource.pop("clear_upload", None) multipart_name = resource.pop("multipart_name", None) + # Mark multipart uploads as pending so hooks (e.g. xloader) wait until + # chunks finish. Set here when multipart_name is sent with create/update; + # cleared in finish_multipart / abort_multipart. + if multipart_name: + resource["cloudstorage_multipart_pending"] = "True" + log.info( + "cloudstorage multipart: pending flag set " + "(resource_id=%s filename=%s)", + resource.get("id"), + multipart_name, + ) + log.debug( + "cloudstorage multipart: pending flag details " + "(resource_id=%s can_use_advanced_aws=%s)", + resource.get("id"), + self.can_use_advanced_aws, + ) + # Check to see if a file has been provided if ( isinstance(upload_field_storage, (ALLOWED_UPLOAD_TYPES)) diff --git a/ckanext/cloudstorage/tests/test_storage.py b/ckanext/cloudstorage/tests/test_storage.py index 757072a9..f10691f3 100644 --- a/ckanext/cloudstorage/tests/test_storage.py +++ b/ckanext/cloudstorage/tests/test_storage.py @@ -4,6 +4,7 @@ import pytest from ckan.tests import factories +from ckanext.cloudstorage import storage as cloudstorage_storage from ckanext.cloudstorage.storage import CloudStorage, ResourceCloudStorage @@ -57,3 +58,70 @@ def test_file_path(self): file_path = ResourceCloudStorage.path_from_filename(rid, filename) assert file_path == "/test-parent-dir/resources/abcd1234-abcd-1234-abcd-1234abcd1234/file.txt" + + +class TestMultipartPendingFlag(object): + """Verify the multipart branch in ResourceCloudStorage.__init__ sets + the cloudstorage_multipart_pending flag so xloader skips submission + until cloudstorage_finish_multipart clears it.""" + + def _patch_cloud_storage(self, monkeypatch, advanced_aws=True): + """Bypass real driver wiring so we can construct a + ResourceCloudStorage without cloud credentials.""" + monkeypatch.setattr( + cloudstorage_storage.CloudStorage, "__init__", + lambda self: None, + ) + monkeypatch.setattr( + cloudstorage_storage.ResourceCloudStorage, + "can_use_advanced_aws", + property(lambda self: advanced_aws), + ) + + def test_multipart_name_sets_pending_flag(self, monkeypatch): + self._patch_cloud_storage(monkeypatch, advanced_aws=True) + + resource = { + "id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "multipart_name": "big-file.csv", + } + + ResourceCloudStorage(resource) + + assert resource.get("cloudstorage_multipart_pending") == "True" + assert resource.get("url_type") == "upload" + + def test_no_multipart_name_leaves_flag_unset(self, monkeypatch): + """Plain resource_create / resource_update (non-multipart) must + not set the pending flag, otherwise normal uploads would be + blocked from xloader.""" + self._patch_cloud_storage(monkeypatch, advanced_aws=True) + + resource = { + "id": "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb", + "url": "http://example.com/file.csv", + } + + ResourceCloudStorage(resource) + + assert "cloudstorage_multipart_pending" not in resource + + def test_multipart_name_sets_pending_flag_without_advanced_aws( + self, monkeypatch + ): + """The pending flag must be raised independently of + can_use_advanced_aws. The live part-upload APIs (initiate/upload/ + finish_multipart) still run on deployments where + can_use_advanced_aws is False (e.g. S3 driver without 'host' in + driver_options), so the xloader race exists there too and the flag + must be set.""" + self._patch_cloud_storage(monkeypatch, advanced_aws=False) + + resource = { + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + "multipart_name": "big-file.csv", + } + + ResourceCloudStorage(resource) + + assert resource.get("cloudstorage_multipart_pending") == "True" From 0cd79bf42986bde883d6f26835d312f5bb3f5d9c Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Tue, 28 Apr 2026 16:21:48 -0400 Subject: [PATCH 2/2] chore: expose pending_flag_cleared in finish/abort API responses --- .github/workflows/test.yml | 2 +- .../cloudstorage/logic/action/multipart.py | 17 +++-- .../tests/logic/action/test_multipart.py | 65 +++++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1269f256..4dcebe5d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -65,7 +65,7 @@ jobs: sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini # Install unzip for SonarQube Scan apt-get update - apt-get install unzip -y + apt-get install -y unzip gnupg - name: Setup extension (CKAN >= 2.9) run: | ckan -c test.ini db init diff --git a/ckanext/cloudstorage/logic/action/multipart.py b/ckanext/cloudstorage/logic/action/multipart.py index 9e4358ba..e80cfdb4 100644 --- a/ckanext/cloudstorage/logic/action/multipart.py +++ b/ckanext/cloudstorage/logic/action/multipart.py @@ -237,8 +237,9 @@ def finish_multipart(context, data_dict): :param context: :param data_dict: dict with required key `uploadId` - id of Multipart Upload that should be finished - :returns: None - :rtype: NoneType + :returns: dict with ``commited`` (multipart committed to storage) and + ``pending_flag_cleared`` (whether CKAN extra ``cloudstorage_multipart_pending`` was cleared) + :rtype: dict """ @@ -277,12 +278,13 @@ def finish_multipart(context, data_dict): # act on the now-committed file. resource_patch fires # IDomainObjectModification.notify(changed), which is how xloader picks # this up without requiring a direct dependency. + pending_flag_cleared = True try: toolkit.get_action("resource_patch")( dict(context.copy(), ignore_auth=True), { "id": resource_id, - "cloudstorage_multipart_pending": "False", + "cloudstorage_multipart_pending": False, }, ) log.debug( @@ -292,6 +294,7 @@ def finish_multipart(context, data_dict): upload_id, ) except Exception: + pending_flag_cleared = False log.exception( "cloudstorage multipart: failed to clear pending flag after " "finish (resource_id=%s upload_id=%s)", @@ -318,7 +321,7 @@ def finish_multipart(context, data_dict): except Exception as e: log.error('finish_multipart failed for %s with error %s' % (upload.name, str(e))) log.info('finish_multipart successfully finished for %s' % (upload.name)) - return {"commited": True} + return {"commited": True, "pending_flag_cleared": pending_flag_cleared} def abort_multipart(context, data_dict): @@ -338,12 +341,13 @@ def abort_multipart(context, data_dict): # Clear the pending-upload flag so the resource is not stranded with a # never-cleared flag after the user cancels a multipart upload. + pending_flag_cleared = True try: toolkit.get_action("resource_patch")( dict(context.copy(), ignore_auth=True), { "id": id, - "cloudstorage_multipart_pending": "False", + "cloudstorage_multipart_pending": False, }, ) log.debug( @@ -352,13 +356,14 @@ def abort_multipart(context, data_dict): aborted, ) except Exception: + pending_flag_cleared = False log.exception( "cloudstorage multipart: failed to clear pending flag after " "abort (resource_id=%s)", id, ) - return aborted + return {"aborted": aborted, "pending_flag_cleared": pending_flag_cleared} def clean_multipart(context, data_dict): diff --git a/ckanext/cloudstorage/tests/logic/action/test_multipart.py b/ckanext/cloudstorage/tests/logic/action/test_multipart.py index a03d96d6..8b4ee771 100644 --- a/ckanext/cloudstorage/tests/logic/action/test_multipart.py +++ b/ckanext/cloudstorage/tests/logic/action/test_multipart.py @@ -4,12 +4,31 @@ import requests import pytest +import ckan.plugins.toolkit as toolkit from ckan.tests import factories, helpers from ckanext.cloudstorage.storage import ResourceCloudStorage from ckanext.cloudstorage.utils import FakeFileStorage +def _patch_resource_patch_fails(monkeypatch, message="resource_patch failed"): + _orig = toolkit.get_action + + def get_action(name): + if name == "resource_patch": + + def _raise(*_a, **_kw): + raise RuntimeError(message) + + return _raise + return _orig(name) + + monkeypatch.setattr( + "ckanext.cloudstorage.logic.action.multipart.toolkit.get_action", + get_action, + ) + + @pytest.mark.usefixtures( "with_driver_options", "with_plugins", "with_request_context", "clean_db" ) @@ -56,8 +75,52 @@ def test_upload(self): "cloudstorage_finish_multipart", uploadId=multipart["id"] ) assert result["commited"] + assert result["pending_flag_cleared"] is True assert storage.get_url_from_filename(res["id"], filename) + def test_finish_multipart_pending_flag_when_resource_patch_fails( + self, monkeypatch + ): + filename = "file.txt" + res = factories.Resource() + multipart = helpers.call_action( + "cloudstorage_initiate_multipart", + id=res["id"], + name="file.txt", + size=1024 * 1024 * 5 * 2, + ) + fp = BytesIO(b"b" * 1024 * 1024 * 5) + fp.seek(0) + helpers.call_action( + "cloudstorage_upload_multipart", + uploadId=multipart["id"], + partNumber=1, + upload=FakeFileStorage(fp, filename), + ) + fp = BytesIO(b"a" * 1024 * 1024 * 5) + fp.seek(0) + helpers.call_action( + "cloudstorage_upload_multipart", + uploadId=multipart["id"], + partNumber=2, + upload=FakeFileStorage(fp, filename), + ) + _patch_resource_patch_fails(monkeypatch) + result = helpers.call_action( + "cloudstorage_finish_multipart", uploadId=multipart["id"] + ) + assert result["commited"] is True + assert result["pending_flag_cleared"] is False + + def test_abort_multipart_pending_flag_when_resource_patch_fails( + self, monkeypatch + ): + res = factories.Resource() + _patch_resource_patch_fails(monkeypatch) + result = helpers.call_action("cloudstorage_abort_multipart", id=res["id"]) + assert result["aborted"] == [] + assert result["pending_flag_cleared"] is False + def test_upload_without_resource(self): res = {"id": "random-id"} filename = "file.txt" @@ -100,6 +163,7 @@ def test_upload_without_resource(self): "cloudstorage_finish_multipart", uploadId=multipart["id"] ) assert result["commited"] + assert result["pending_flag_cleared"] is True assert storage.get_url_from_filename(res["id"], filename) def test_reupload(self): @@ -147,4 +211,5 @@ def _upload(res, filename, size, parts): "cloudstorage_finish_multipart", uploadId=multipart["id"] ) assert result["commited"] + assert result["pending_flag_cleared"] is True return helpers.call_action("resource_update", **dict(res, url_type="upload", url=filename))