Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 35 additions & 23 deletions ckanext/cloudstorage/logic/action/multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ckan.plugins.toolkit as toolkit
import libcloud.security
from ckan.lib.uploader import get_resource_uploader
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.datastructures import FileStorage as FlaskFileStorage

Expand Down Expand Up @@ -36,6 +37,28 @@ def _get_object_url(uploader, name):
return "/" + uploader.container_name + "/" + name


def _clear_cloudstorage_multipart_pending(resource_id):
    """Remove ``cloudstorage_multipart_pending`` from resource extras and commit.

    Avoids ``resource_patch`` / ``package_update`` (full package round-trip and
    Solr via that path). Domain-object observers still run on commit so
    extensions (e.g. xloader) can react via ``IDomainObjectModification``.

    This is a no-op (no write, no commit) when:

    * the resource row is gone (nothing to clear), or
    * the extras do not contain the flag, or contain it with a ``None`` value.

    :param resource_id: id of the resource whose pending flag should be cleared.
    :raises Exception: whatever ``model.Session.commit()`` raises. The session
        is rolled back first so that callers which catch the error and carry
        on (reporting ``pending_flag_cleared=False``) can keep using it.
    """
    resource = model.Resource.get(resource_id)
    if resource is None:
        return

    # Work on a copy so the stored mapping is only replaced when the flag
    # was actually present.
    extras = dict(resource.extras or {})
    if extras.pop("cloudstorage_multipart_pending", None) is None:
        return

    resource.extras = extras
    # Explicitly mark the attribute dirty so the change is included in the flush.
    flag_modified(resource, "extras")
    try:
        model.Session.commit()
    except Exception:
        # A failed commit leaves the session unusable until it is rolled back;
        # callers swallow this error and continue to use the same session.
        model.Session.rollback()
        raise


def _delete_multipart(upload, uploader):
log.debug("_delete_multipart url {0}".format(_get_object_url(uploader, upload.name)))
log.debug("_delete_multipart id {0}".format(upload.id))
Expand Down Expand Up @@ -247,13 +270,16 @@ def finish_multipart(context, data_dict):
upload_id = toolkit.get_or_bust(data_dict, "uploadId")
save_action = data_dict.get("save_action", False)
upload = model.Session.query(MultipartUpload).get(upload_id)
resource_id = upload.resource_id
upload_name = upload.name

chunks = [
(part.n, part.etag)
for part in model.Session.query(MultipartPart)
.filter_by(upload_id=upload_id)
.order_by(MultipartPart.n)
]
uploader = get_resource_uploader({"id": upload.resource_id})
uploader = get_resource_uploader({"id": resource_id})

# Disable the block below, because it causes 404 error when user
# uploads a file with the same name as previous file.
Expand All @@ -266,27 +292,19 @@ def finish_multipart(context, data_dict):

uploader.driver._commit_multipart(
container=uploader.container,
object_name=upload.name,
object_name=upload_name,
upload_id=upload_id,
chunks=chunks,
)
resource_id = upload.resource_id

upload.delete()
upload.commit()

# Clear the pending-upload flag so downstream extensions (xloader) can
# act on the now-committed file. resource_patch fires
# IDomainObjectModification.notify(changed), which is how xloader picks
# this up without requiring a direct dependency.
# Clear the pending-upload flag without resource_patch/package_update.
# Session.commit() still triggers IDomainObjectModification observers (xloader).
pending_flag_cleared = True
try:
toolkit.get_action("resource_patch")(
dict(context.copy(), ignore_auth=True),
{
"id": resource_id,
"cloudstorage_multipart_pending": False,
},
)
_clear_cloudstorage_multipart_pending(resource_id)
log.debug(
"cloudstorage multipart: cleared pending flag after finish "
"(resource_id=%s upload_id=%s)",
Expand Down Expand Up @@ -319,8 +337,8 @@ def finish_multipart(context, data_dict):
resource.last_modified = datetime.datetime.utcnow()
resource.commit()
except Exception as e:
log.error('finish_multipart failed for %s with error %s' % (upload.name, str(e)))
log.info('finish_multipart successfully finished for %s' % (upload.name))
log.error('finish_multipart failed for %s with error %s' % (upload_name, str(e)))
log.info('finish_multipart successfully finished for %s' % (upload_name))
return {"commited": True, "pending_flag_cleared": pending_flag_cleared}


Expand All @@ -343,13 +361,7 @@ def abort_multipart(context, data_dict):
# never-cleared flag after the user cancels a multipart upload.
pending_flag_cleared = True
try:
toolkit.get_action("resource_patch")(
dict(context.copy(), ignore_auth=True),
{
"id": id,
"cloudstorage_multipart_pending": False,
},
)
_clear_cloudstorage_multipart_pending(id)
log.debug(
"cloudstorage multipart: aborted upload ids for resource %s: %s",
id,
Expand Down
27 changes: 9 additions & 18 deletions ckanext/cloudstorage/tests/logic/action/test_multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,19 @@

import requests
import pytest
import ckan.plugins.toolkit as toolkit
from ckan.tests import factories, helpers

from ckanext.cloudstorage.storage import ResourceCloudStorage
from ckanext.cloudstorage.utils import FakeFileStorage


def _patch_resource_patch_fails(monkeypatch, message="resource_patch failed"):
_orig = toolkit.get_action

def get_action(name):
if name == "resource_patch":

def _raise(*_a, **_kw):
raise RuntimeError(message)

return _raise
return _orig(name)
def _patch_clear_pending_fails(monkeypatch, message="clear pending failed"):
def _raise(_resource_id):
raise RuntimeError(message)

monkeypatch.setattr(
"ckanext.cloudstorage.logic.action.multipart.toolkit.get_action",
get_action,
"ckanext.cloudstorage.logic.action.multipart._clear_cloudstorage_multipart_pending",
_raise,
)


Expand Down Expand Up @@ -78,7 +69,7 @@ def test_upload(self):
assert result["pending_flag_cleared"] is True
assert storage.get_url_from_filename(res["id"], filename)

def test_finish_multipart_pending_flag_when_resource_patch_fails(
def test_finish_multipart_pending_flag_when_clear_pending_fails(
self, monkeypatch
):
filename = "file.txt"
Expand All @@ -105,18 +96,18 @@ def test_finish_multipart_pending_flag_when_resource_patch_fails(
partNumber=2,
upload=FakeFileStorage(fp, filename),
)
_patch_resource_patch_fails(monkeypatch)
_patch_clear_pending_fails(monkeypatch)
result = helpers.call_action(
"cloudstorage_finish_multipart", uploadId=multipart["id"]
)
assert result["commited"] is True
assert result["pending_flag_cleared"] is False

def test_abort_multipart_pending_flag_when_resource_patch_fails(
def test_abort_multipart_pending_flag_when_clear_pending_fails(
self, monkeypatch
):
res = factories.Resource()
_patch_resource_patch_fails(monkeypatch)
_patch_clear_pending_fails(monkeypatch)
result = helpers.call_action("cloudstorage_abort_multipart", id=res["id"])
assert result["aborted"] == []
assert result["pending_flag_cleared"] is False
Expand Down
Loading