Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini
# Install unzip for SonarQube Scan
apt-get update
apt-get install unzip -y
apt-get install -y unzip gnupg
- name: Setup extension (CKAN >= 2.9)
run: |
ckan -c test.ini db init
Expand Down
62 changes: 58 additions & 4 deletions ckanext/cloudstorage/logic/action/multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ def finish_multipart(context, data_dict):

:param context:
:param data_dict: dict with required key `uploadId` - id of Multipart Upload that should be finished
:returns: None
:rtype: NoneType
:returns: dict with ``commited`` (multipart committed to storage) and
``pending_flag_cleared`` (whether CKAN extra ``cloudstorage_multipart_pending`` was cleared)
:rtype: dict

"""

Expand Down Expand Up @@ -269,9 +270,38 @@ def finish_multipart(context, data_dict):
upload_id=upload_id,
chunks=chunks,
)
resource_id = upload.resource_id
upload.delete()
upload.commit()

# Clear the pending-upload flag so downstream extensions (xloader) can
# act on the now-committed file. resource_patch fires
# IDomainObjectModification.notify(changed), which is how xloader picks
# this up without requiring a direct dependency.
pending_flag_cleared = True
try:
toolkit.get_action("resource_patch")(
dict(context.copy(), ignore_auth=True),
{
"id": resource_id,
"cloudstorage_multipart_pending": False,
},
)
log.debug(
"cloudstorage multipart: cleared pending flag after finish "
"(resource_id=%s upload_id=%s)",
resource_id,
upload_id,
)
except Exception:
pending_flag_cleared = False
log.exception(
Comment thread
jguo144 marked this conversation as resolved.
"cloudstorage multipart: failed to clear pending flag after "
"finish (resource_id=%s upload_id=%s)",
resource_id,
upload_id,
)

if save_action and save_action == "go-metadata":
try:
res_dict = toolkit.get_action("resource_show")(
Expand All @@ -291,7 +321,7 @@ def finish_multipart(context, data_dict):
except Exception as e:
log.error('finish_multipart failed for %s with error %s' % (upload.name, str(e)))
log.info('finish_multipart successfully finished for %s' % (upload.name))
return {"commited": True}
return {"commited": True, "pending_flag_cleared": pending_flag_cleared}


def abort_multipart(context, data_dict):
Expand All @@ -309,7 +339,31 @@ def abort_multipart(context, data_dict):

model.Session.commit()

return aborted
# Clear the pending-upload flag so the resource is not stranded with a
# never-cleared flag after the user cancels a multipart upload.
pending_flag_cleared = True
try:
toolkit.get_action("resource_patch")(
dict(context.copy(), ignore_auth=True),
{
"id": id,
"cloudstorage_multipart_pending": False,
},
)
log.debug(
"cloudstorage multipart: aborted upload ids for resource %s: %s",
id,
aborted,
)
except Exception:
pending_flag_cleared = False
log.exception(
"cloudstorage multipart: failed to clear pending flag after "
"abort (resource_id=%s)",
id,
)

return {"aborted": aborted, "pending_flag_cleared": pending_flag_cleared}


def clean_multipart(context, data_dict):
Expand Down
18 changes: 18 additions & 0 deletions ckanext/cloudstorage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,24 @@ def __init__(self, resource):
self._clear = resource.pop("clear_upload", None)
multipart_name = resource.pop("multipart_name", None)

# Mark multipart uploads as pending so hooks (e.g. xloader) wait until
# chunks finish. Set here when multipart_name is sent with create/update;
# cleared in finish_multipart / abort_multipart.
if multipart_name:
resource["cloudstorage_multipart_pending"] = "True"
log.info(
"cloudstorage multipart: pending flag set "
"(resource_id=%s filename=%s)",
resource.get("id"),
multipart_name,
)
log.debug(
"cloudstorage multipart: pending flag details "
"(resource_id=%s can_use_advanced_aws=%s)",
resource.get("id"),
self.can_use_advanced_aws,
)

# Check to see if a file has been provided
if (
isinstance(upload_field_storage, (ALLOWED_UPLOAD_TYPES))
Expand Down
65 changes: 65 additions & 0 deletions ckanext/cloudstorage/tests/logic/action/test_multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,31 @@

import requests
import pytest
import ckan.plugins.toolkit as toolkit
from ckan.tests import factories, helpers

from ckanext.cloudstorage.storage import ResourceCloudStorage
from ckanext.cloudstorage.utils import FakeFileStorage


def _patch_resource_patch_fails(monkeypatch, message="resource_patch failed"):
_orig = toolkit.get_action

def get_action(name):
if name == "resource_patch":

def _raise(*_a, **_kw):
raise RuntimeError(message)

return _raise
return _orig(name)

monkeypatch.setattr(
"ckanext.cloudstorage.logic.action.multipart.toolkit.get_action",
get_action,
)


@pytest.mark.usefixtures(
"with_driver_options", "with_plugins", "with_request_context", "clean_db"
)
Expand Down Expand Up @@ -56,8 +75,52 @@ def test_upload(self):
"cloudstorage_finish_multipart", uploadId=multipart["id"]
)
assert result["commited"]
assert result["pending_flag_cleared"] is True
assert storage.get_url_from_filename(res["id"], filename)

def test_finish_multipart_pending_flag_when_resource_patch_fails(
self, monkeypatch
):
filename = "file.txt"
res = factories.Resource()
multipart = helpers.call_action(
"cloudstorage_initiate_multipart",
id=res["id"],
name="file.txt",
size=1024 * 1024 * 5 * 2,
)
fp = BytesIO(b"b" * 1024 * 1024 * 5)
fp.seek(0)
helpers.call_action(
"cloudstorage_upload_multipart",
uploadId=multipart["id"],
partNumber=1,
upload=FakeFileStorage(fp, filename),
)
fp = BytesIO(b"a" * 1024 * 1024 * 5)
fp.seek(0)
helpers.call_action(
"cloudstorage_upload_multipart",
uploadId=multipart["id"],
partNumber=2,
upload=FakeFileStorage(fp, filename),
)
_patch_resource_patch_fails(monkeypatch)
result = helpers.call_action(
"cloudstorage_finish_multipart", uploadId=multipart["id"]
)
assert result["commited"] is True
assert result["pending_flag_cleared"] is False

def test_abort_multipart_pending_flag_when_resource_patch_fails(
self, monkeypatch
):
res = factories.Resource()
_patch_resource_patch_fails(monkeypatch)
result = helpers.call_action("cloudstorage_abort_multipart", id=res["id"])
assert result["aborted"] == []
assert result["pending_flag_cleared"] is False

def test_upload_without_resource(self):
res = {"id": "random-id"}
filename = "file.txt"
Expand Down Expand Up @@ -100,6 +163,7 @@ def test_upload_without_resource(self):
"cloudstorage_finish_multipart", uploadId=multipart["id"]
)
assert result["commited"]
assert result["pending_flag_cleared"] is True
assert storage.get_url_from_filename(res["id"], filename)

def test_reupload(self):
Expand Down Expand Up @@ -147,4 +211,5 @@ def _upload(res, filename, size, parts):
"cloudstorage_finish_multipart", uploadId=multipart["id"]
)
assert result["commited"]
assert result["pending_flag_cleared"] is True
return helpers.call_action("resource_update", **dict(res, url_type="upload", url=filename))
68 changes: 68 additions & 0 deletions ckanext/cloudstorage/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
from ckan.tests import factories

from ckanext.cloudstorage import storage as cloudstorage_storage
from ckanext.cloudstorage.storage import CloudStorage, ResourceCloudStorage


Expand Down Expand Up @@ -57,3 +58,70 @@ def test_file_path(self):
file_path = ResourceCloudStorage.path_from_filename(rid, filename)

assert file_path == "/test-parent-dir/resources/abcd1234-abcd-1234-abcd-1234abcd1234/file.txt"


class TestMultipartPendingFlag(object):
"""Verify the multipart branch in ResourceCloudStorage.__init__ sets
the cloudstorage_multipart_pending flag so xloader skips submission
until cloudstorage_finish_multipart clears it."""

def _patch_cloud_storage(self, monkeypatch, advanced_aws=True):
"""Bypass real driver wiring so we can construct a
ResourceCloudStorage without cloud credentials."""
monkeypatch.setattr(
cloudstorage_storage.CloudStorage, "__init__",
lambda self: None,
)
monkeypatch.setattr(
cloudstorage_storage.ResourceCloudStorage,
"can_use_advanced_aws",
property(lambda self: advanced_aws),
)

def test_multipart_name_sets_pending_flag(self, monkeypatch):
self._patch_cloud_storage(monkeypatch, advanced_aws=True)

resource = {
"id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
"multipart_name": "big-file.csv",
}

ResourceCloudStorage(resource)

assert resource.get("cloudstorage_multipart_pending") == "True"
assert resource.get("url_type") == "upload"

def test_no_multipart_name_leaves_flag_unset(self, monkeypatch):
"""Plain resource_create / resource_update (non-multipart) must
not set the pending flag, otherwise normal uploads would be
blocked from xloader."""
self._patch_cloud_storage(monkeypatch, advanced_aws=True)

resource = {
"id": "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb",
"url": "http://example.com/file.csv",
}

ResourceCloudStorage(resource)

assert "cloudstorage_multipart_pending" not in resource

def test_multipart_name_sets_pending_flag_without_advanced_aws(
self, monkeypatch
):
"""The pending flag must be raised independently of
can_use_advanced_aws. The live part-upload APIs (initiate/upload/
finish_multipart) still run on deployments where
can_use_advanced_aws is False (e.g. S3 driver without 'host' in
driver_options), so the xloader race exists there too and the flag
must be set."""
self._patch_cloud_storage(monkeypatch, advanced_aws=False)

resource = {
"id": "cccccccc-cccc-cccc-cccc-cccccccccccc",
"multipart_name": "big-file.csv",
}

ResourceCloudStorage(resource)

assert resource.get("cloudstorage_multipart_pending") == "True"
Loading