From 36b8bce8ce4ee02c9254a96123d0cd46a005d1db Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Fri, 1 May 2026 11:31:19 -0400 Subject: [PATCH 1/2] fix: refactor pending flag clear --- .../cloudstorage/logic/action/multipart.py | 56 +++++++++++-------- .../tests/logic/action/test_multipart.py | 27 +++------ 2 files changed, 42 insertions(+), 41 deletions(-) diff --git a/ckanext/cloudstorage/logic/action/multipart.py b/ckanext/cloudstorage/logic/action/multipart.py index e80cfdb..4e9b550 100644 --- a/ckanext/cloudstorage/logic/action/multipart.py +++ b/ckanext/cloudstorage/logic/action/multipart.py @@ -8,6 +8,7 @@ import ckan.plugins.toolkit as toolkit import libcloud.security from ckan.lib.uploader import get_resource_uploader +from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.exc import NoResultFound from werkzeug.datastructures import FileStorage as FlaskFileStorage @@ -36,6 +37,26 @@ def _get_object_url(uploader, name): return "/" + uploader.container_name + "/" + name +def _clear_cloudstorage_multipart_pending(resource_id): + """Remove ``cloudstorage_multipart_pending`` from resource extras and commit. + + Avoids ``resource_patch`` / ``package_update`` (full package round-trip and + Solr via that path). Domain-object observers still run on commit so + extensions (e.g. xloader) can react via ``IDomainObjectModification``. + """ + resource = model.Resource.get(resource_id) + if resource is None: + raise toolkit.ObjectNotFound("Resource was not found.") + + extras = dict(resource.extras or {}) + if extras.pop("cloudstorage_multipart_pending", None) is None: + return + + resource.extras = extras + flag_modified(resource, "extras") + model.Session.commit() + + def _delete_multipart(upload, uploader): log.debug("_delete_multipart url {0}".format(_get_object_url(uploader, upload.name))) log.debug("_delete_multipart id {0}".format(upload.id)) @@ -247,13 +268,16 @@ def finish_multipart(context, data_dict): upload_id = toolkit.get_or_bust(data_dict, "uploadId") save_action = data_dict.get("save_action", False) upload = model.Session.query(MultipartUpload).get(upload_id) + resource_id = upload.resource_id + upload_name = upload.name + chunks = [ (part.n, part.etag) for part in model.Session.query(MultipartPart) .filter_by(upload_id=upload_id) .order_by(MultipartPart.n) ] - uploader = get_resource_uploader({"id": upload.resource_id}) + uploader = get_resource_uploader({"id": resource_id}) # Disable the block below, because it causes 404 error when user # uploads a file with the same name as previous file. @@ -266,27 +290,19 @@ def finish_multipart(context, data_dict): uploader.driver._commit_multipart( container=uploader.container, - object_name=upload.name, + object_name=upload_name, upload_id=upload_id, chunks=chunks, ) - resource_id = upload.resource_id + upload.delete() upload.commit() - # Clear the pending-upload flag so downstream extensions (xloader) can - # act on the now-committed file. resource_patch fires - # IDomainObjectModification.notify(changed), which is how xloader picks - # this up without requiring a direct dependency. + # Clear the pending-upload flag without resource_patch/package_update. + # Session.commit() still triggers IDomainObjectModification observers (xloader). pending_flag_cleared = True try: - toolkit.get_action("resource_patch")( - dict(context.copy(), ignore_auth=True), - { - "id": resource_id, - "cloudstorage_multipart_pending": False, - }, - ) + _clear_cloudstorage_multipart_pending(resource_id) log.debug( "cloudstorage multipart: cleared pending flag after finish " "(resource_id=%s upload_id=%s)", @@ -319,8 +335,8 @@ def finish_multipart(context, data_dict): resource.last_modified = datetime.datetime.utcnow() resource.commit() except Exception as e: - log.error('finish_multipart failed for %s with error %s' % (upload.name, str(e))) - log.info('finish_multipart successfully finished for %s' % (upload.name)) + log.error('finish_multipart failed for %s with error %s' % (upload_name, str(e))) + log.info('finish_multipart successfully finished for %s' % (upload_name)) return {"commited": True, "pending_flag_cleared": pending_flag_cleared} @@ -343,13 +359,7 @@ def abort_multipart(context, data_dict): # never-cleared flag after the user cancels a multipart upload. pending_flag_cleared = True try: - toolkit.get_action("resource_patch")( - dict(context.copy(), ignore_auth=True), - { - "id": id, - "cloudstorage_multipart_pending": False, - }, - ) + _clear_cloudstorage_multipart_pending(id) log.debug( "cloudstorage multipart: aborted upload ids for resource %s: %s", id, diff --git a/ckanext/cloudstorage/tests/logic/action/test_multipart.py b/ckanext/cloudstorage/tests/logic/action/test_multipart.py index 8b4ee77..9a59220 100644 --- a/ckanext/cloudstorage/tests/logic/action/test_multipart.py +++ b/ckanext/cloudstorage/tests/logic/action/test_multipart.py @@ -4,28 +4,19 @@ import requests import pytest -import ckan.plugins.toolkit as toolkit from ckan.tests import factories, helpers from ckanext.cloudstorage.storage import ResourceCloudStorage from ckanext.cloudstorage.utils import FakeFileStorage -def _patch_resource_patch_fails(monkeypatch, message="resource_patch failed"): - _orig = toolkit.get_action - - def get_action(name): - if name == "resource_patch": - - def _raise(*_a, **_kw): - raise RuntimeError(message) - - return _raise - return _orig(name) +def _patch_clear_pending_fails(monkeypatch, message="clear pending failed"): + def _raise(_resource_id): + raise RuntimeError(message) monkeypatch.setattr( - "ckanext.cloudstorage.logic.action.multipart.toolkit.get_action", - get_action, + "ckanext.cloudstorage.logic.action.multipart._clear_cloudstorage_multipart_pending", + _raise, ) @@ -78,7 +69,7 @@ def test_upload(self): assert result["pending_flag_cleared"] is True assert storage.get_url_from_filename(res["id"], filename) - def test_finish_multipart_pending_flag_when_resource_patch_fails( + def test_finish_multipart_pending_flag_when_clear_pending_fails( self, monkeypatch ): filename = "file.txt" @@ -105,18 +96,18 @@ def test_finish_multipart_pending_flag_when_resource_patch_fails( partNumber=2, upload=FakeFileStorage(fp, filename), ) - _patch_resource_patch_fails(monkeypatch) + _patch_clear_pending_fails(monkeypatch) result = helpers.call_action( "cloudstorage_finish_multipart", uploadId=multipart["id"] ) assert result["commited"] is True assert result["pending_flag_cleared"] is False - def test_abort_multipart_pending_flag_when_resource_patch_fails( + def test_abort_multipart_pending_flag_when_clear_pending_fails( self, monkeypatch ): res = factories.Resource() - _patch_resource_patch_fails(monkeypatch) + _patch_clear_pending_fails(monkeypatch) result = helpers.call_action("cloudstorage_abort_multipart", id=res["id"]) assert result["aborted"] == [] assert result["pending_flag_cleared"] is False From 3baafb213190b4dac9c7812830c32c27f8de84f3 Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Fri, 1 May 2026 15:33:01 -0400 Subject: [PATCH 2/2] fix: treat missing resource as already cleared in pending-flag helper --- ckanext/cloudstorage/logic/action/multipart.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ckanext/cloudstorage/logic/action/multipart.py b/ckanext/cloudstorage/logic/action/multipart.py index 4e9b550..af2b651 100644 --- a/ckanext/cloudstorage/logic/action/multipart.py +++ b/ckanext/cloudstorage/logic/action/multipart.py @@ -43,10 +43,12 @@ def _clear_cloudstorage_multipart_pending(resource_id): Avoids ``resource_patch`` / ``package_update`` (full package round-trip and Solr via that path). Domain-object observers still run on commit so extensions (e.g. xloader) can react via ``IDomainObjectModification``. + + If the resource row is gone, returns without error (nothing to clear). """ resource = model.Resource.get(resource_id) if resource is None: - raise toolkit.ObjectNotFound("Resource was not found.") + return extras = dict(resource.extras or {}) if extras.pop("cloudstorage_multipart_pending", None) is None: