From a3b1c6e4f0ee4725bd1383b390ec56ea4656e58e Mon Sep 17 00:00:00 2001 From: Gerrod Ubben Date: Mon, 4 May 2026 14:41:06 -0400 Subject: [PATCH] Remove content app redirect for manifest fetching fixes: #1974 Assisted by: claude-sonnet-4.6 --- CHANGES/1974.feature | 1 + pulp_container/app/content.py | 3 +- pulp_container/app/redirects.py | 23 +- pulp_container/app/registry.py | 396 +----------------- pulp_container/app/registry_api.py | 372 +++++++++++----- .../app/tasks/download_image_data.py | 15 +- .../functional/api/test_pull_through_cache.py | 29 +- 7 files changed, 303 insertions(+), 536 deletions(-) create mode 100644 CHANGES/1974.feature diff --git a/CHANGES/1974.feature b/CHANGES/1974.feature new file mode 100644 index 000000000..789331f4b --- /dev/null +++ b/CHANGES/1974.feature @@ -0,0 +1 @@ +Manifest data is now served directly from the registry API instead of issuing a redirect to the content app. diff --git a/pulp_container/app/content.py b/pulp_container/app/content.py index 145533fc9..af0a7821f 100644 --- a/pulp_container/app/content.py +++ b/pulp_container/app/content.py @@ -12,9 +12,8 @@ app.add_routes( [ web.get( - PREFIX + r"{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}", + PREFIX + r"{path:.+}/blobs/sha256:{digest:.+}", registry.get_by_digest, ) ] ) -app.add_routes([web.get(PREFIX + r"{path:.+}/manifests/{tag_name}", registry.get_tag)]) diff --git a/pulp_container/app/redirects.py b/pulp_container/app/redirects.py index 2ac800506..3e48d9e1b 100644 --- a/pulp_container/app/redirects.py +++ b/pulp_container/app/redirects.py @@ -4,9 +4,7 @@ from django.core.exceptions import ObjectDoesNotExist from django.shortcuts import redirect -from pulp_container.app.exceptions import ManifestNotFound -from pulp_container.app.utils import get_accepted_media_types -from pulp_container.constants import BLOB_CONTENT_TYPE, MEDIA_TYPE +from pulp_container.constants import BLOB_CONTENT_TYPE class CommonRedirects: @@ -39,25 +37,6 @@ def 
redirect_to_content_app(self, content_type, content_id): ) ) - def issue_manifest_redirect(self, manifest): - """ - Issue a redirect for the passed manifest. - """ - return self.redirect_to_content_app("manifests", manifest.digest) - - def issue_tag_redirect(self, tag): - """ - Issue a redirect for the passed tag. - """ - manifest_media_type = tag.tagged_manifest.media_type - if ( - manifest_media_type not in get_accepted_media_types(self.request.headers) - and manifest_media_type != MEDIA_TYPE.MANIFEST_V1 - ): - raise ManifestNotFound(reference=tag.name) - - return self.redirect_to_content_app("manifests", tag.name) - class FileStorageRedirects(CommonRedirects): """ diff --git a/pulp_container/app/registry.py b/pulp_container/app/registry.py index d78d13e93..f776ea7f6 100644 --- a/pulp_container/app/registry.py +++ b/pulp_container/app/registry.py @@ -1,35 +1,18 @@ -import json import logging import os -from contextlib import suppress -from urllib.parse import urljoin from aiohttp import web -from aiohttp.client_exceptions import ClientConnectionError, ClientResponseError -from aiohttp.web_exceptions import HTTPTooManyRequests from asgiref.sync import sync_to_async from django.core.exceptions import ObjectDoesNotExist -from django.db import IntegrityError -from django_guid import set_guid -from django_guid.utils import generate_guid from multidict import MultiDict from pulpcore.plugin.content import ArtifactResponse, Handler, PathNotResolved -from pulpcore.plugin.exceptions import TimeoutException -from pulpcore.plugin.models import Content, ContentArtifact, RemoteArtifact -from pulpcore.plugin.tasking import dispatch +from pulpcore.plugin.models import Content, ContentArtifact from pulpcore.plugin.util import get_domain from pulp_container.app.cache import RegistryContentCache -from pulp_container.app.models import Blob, BlobManifest, ContainerDistribution, Manifest, Tag -from pulp_container.app.tasks import download_image_data -from pulp_container.app.utils 
import ( - calculate_digest, - determine_media_type, - get_accepted_media_types, - save_artifact, -) -from pulp_container.constants import BLOB_CONTENT_TYPE, EMPTY_BLOB, MEDIA_TYPE, V2_ACCEPT_HEADERS +from pulp_container.app.models import ContainerDistribution +from pulp_container.constants import BLOB_CONTENT_TYPE, EMPTY_BLOB log = logging.getLogger(__name__) @@ -90,127 +73,6 @@ async def _dispatch(artifact, headers): else: return ArtifactResponse(artifact=artifact, headers=headers) - @RegistryContentCache( - base_key=lambda req, cac: Registry.find_base_path_cached(req, cac), - auth=lambda req, cac, bk: Registry.auth_cached(req, cac, bk), - ) - async def get_tag(self, request): - """ - Match the path and stream either Manifest or ManifestList. - - Args: - request(:class:`~aiohttp.web.Request`): The request to prepare a response for. - - Raises: - PathNotResolved: The path could not be matched to a published file. - PermissionError: When not permitted. - - Returns: - :class:`aiohttp.web.StreamResponse` or :class:`aiohttp.web.FileResponse`: The response - streamed back to the client. 
- - """ - - path = request.match_info["path"] - tag_name = request.match_info["tag_name"] - distribution = await sync_to_async(self._match_distribution)(path, add_trailing_slash=False) - await sync_to_async(self._permit)(request, distribution) - repository_version = await sync_to_async(distribution.get_repository_version)() - if not repository_version: - raise PathNotResolved(tag_name) - - distribution = await distribution.acast() - try: - tag = await Tag.objects.select_related("tagged_manifest").aget( - pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name - ) - except ObjectDoesNotExist: - if distribution.remote_id and distribution.pull_through_distribution_id: - pull_downloader = await PullThroughDownloader.create( - distribution, repository_version, path, tag_name - ) - raw_text_manifest, digest, media_type = await pull_downloader.download_manifest( - run_pipeline=True - ) - headers = { - "Content-Type": media_type, - "Docker-Content-Digest": digest, - "Docker-Distribution-API-Version": "registry/2.0", - } - return web.Response(body=raw_text_manifest, headers=headers) - else: - raise PathNotResolved(tag_name) - - # check if the content is pulled via the pull-through caching distribution; - # if yes, update the respective manifest from the remote when its digest changed - if distribution.remote_id and distribution.pull_through_distribution_id: - remote = await distribution.remote.acast() - relative_url = "/v2/{name}/manifests/{tag}".format( - name=remote.namespaced_upstream_name, tag=tag_name - ) - tag_url = urljoin(remote.url, relative_url) - downloader = remote.get_downloader(url=tag_url) - try: - response = await downloader.run( - extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} - ) - except (ClientResponseError, ClientConnectionError, TimeoutException): - # the manifest is not available on the remote anymore - # but the old one is still stored in the database - pass - else: - digest = 
response.headers.get("docker-content-digest") - if tag.tagged_manifest.digest != digest: - pull_downloader = await PullThroughDownloader.create( - distribution, repository_version, path, tag_name - ) - pull_downloader.downloader = downloader - raw_text_manifest, digest, media_type = await pull_downloader.download_manifest( - run_pipeline=True - ) - headers = { - "Content-Type": media_type, - "Docker-Content-Digest": digest, - "Docker-Distribution-API-Version": "registry/2.0", - } - return web.Response(body=raw_text_manifest, headers=headers) - - accepted_media_types = get_accepted_media_types(request.headers) - - # we do not convert OCI to docker - oci_mediatypes = [MEDIA_TYPE.MANIFEST_OCI, MEDIA_TYPE.INDEX_OCI] - if ( - tag.tagged_manifest.media_type in oci_mediatypes - and tag.tagged_manifest.media_type not in accepted_media_types - ): - log.warn( - "OCI format found, but the client only accepts {accepted_media_types}.".format( - accepted_media_types=accepted_media_types - ) - ) - raise PathNotResolved(tag_name) - - # return schema1 (even in case only oci is requested) - if tag.tagged_manifest.media_type == MEDIA_TYPE.MANIFEST_V1: - return_media_type = MEDIA_TYPE.MANIFEST_V1_SIGNED - response_headers = { - "Content-Type": return_media_type, - "Docker-Content-Digest": tag.tagged_manifest.digest, - } - return web.Response(body=tag.tagged_manifest.data, headers=response_headers) - - # return what was found in case media_type is accepted header (docker, oci) - if tag.tagged_manifest.media_type in accepted_media_types: - return_media_type = tag.tagged_manifest.media_type - response_headers = { - "Content-Type": return_media_type, - "Docker-Content-Digest": tag.tagged_manifest.digest, - } - return web.Response(body=tag.tagged_manifest.data, headers=response_headers) - - # return 404 in case the client is requesting docker manifest v2 schema 1 - raise PathNotResolved(tag_name) - @RegistryContentCache( base_key=lambda req, cac: Registry.find_base_path_cached(req, cac), 
auth=lambda req, cac, bk: Registry.auth_cached(req, cac, bk), @@ -231,65 +93,20 @@ async def get_by_digest(self, request): repository = await repository_version.repository.acast() pending_blobs = repository.pending_blobs.values_list("pk") - pending_manifests = repository.pending_manifests.values_list("pk") - pending_content = pending_blobs.union(pending_manifests) - content = repository_version.content | Content.objects.filter(pk__in=pending_content) - # "/pulp/container/{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}" - content_type = request.match_info["content"] - domain = get_domain() + content = repository_version.content | Content.objects.filter(pk__in=pending_blobs) try: - if content_type == "manifests": - manifest = await Manifest.objects.prefetch_related("contentartifact_set").aget( - digest=digest, - _pulp_domain=domain, # did I remove the content__in? - ) - headers = { - "Content-Type": manifest.media_type, - "Docker-Content-Digest": manifest.digest, - } - return web.Response(body=manifest.data, headers=headers) - elif content_type == "blobs": - ca = await ContentArtifact.objects.select_related("artifact", "content").aget( - content__in=content, relative_path=digest - ) - ca_content = await sync_to_async(ca.content.cast)() - media_type = BLOB_CONTENT_TYPE - headers = { - "Content-Type": media_type, - "Docker-Content-Digest": ca_content.digest, - } + ca = await ContentArtifact.objects.select_related("artifact", "content").aget( + content__in=content, relative_path=digest + ) + ca_content = await ca.content.acast() + media_type = BLOB_CONTENT_TYPE + headers = { + "Content-Type": media_type, + "Docker-Content-Digest": ca_content.digest, + } except ObjectDoesNotExist: - distribution = await distribution.acast() - if distribution.remote_id and distribution.pull_through_distribution_id: - pull_downloader = await PullThroughDownloader.create( - distribution, repository_version, path, digest - ) - - if content_type == "manifests": - ( - 
raw_text_manifest, - digest, - media_type, - ) = await pull_downloader.download_manifest() - headers = { - "Content-Type": media_type, - "Docker-Content-Digest": digest, - "Docker-Distribution-API-Version": "registry/2.0", - } - return web.Response(body=raw_text_manifest, headers=headers) - elif content_type == "blobs": - # there might be a case where the client has all the manifest data in place - # and tries to download only missing blobs; because of that, only the reference - # to a remote blob is returned (i.e., RemoteArtifact) - blob = await pull_downloader.init_remote_blob() - ca = await blob.contentartifact_set.afirst() - return await self._stream_content_artifact(request, web.StreamResponse(), ca) - else: - raise RuntimeError("Only blobs or manifests are supported by the parser.") - else: - raise PathNotResolved(path) + raise PathNotResolved(path) else: - # else branch can be reached only for blob artifact = ca.artifact if artifact: return await Registry._dispatch(artifact, headers) @@ -311,188 +128,3 @@ async def _empty_blob(): "Docker-Distribution-API-Version": "registry/2.0", } return web.Response(body=body, headers=response_headers) - - -class PullThroughDownloader: - def __init__(self, distribution, remote, repository, repository_version, path, identifier): - self.distribution = distribution - self.remote = remote - self.repository = repository - self.repository_version = repository_version - self.path = path - self.identifier = identifier - self.downloader = None - - @classmethod - async def create(cls, distribution, repository_version, path, identifier): - remote = await distribution.remote.acast() - repository = await repository_version.repository.acast() - return cls(distribution, remote, repository, repository_version, path, identifier) - - async def init_remote_blob(self): - return await self.save_blob(self.identifier, None) - - async def download_manifest(self, run_pipeline=False): - response = await self.run_manifest_downloader() - - with 
open(response.path, mode="r") as f: - raw_text_data = f.read() - - if run_pipeline: - await self.run_pipeline(raw_text_data) - - try: - manifest_data = json.loads(raw_text_data) - except json.decoder.JSONDecodeError: - raise PathNotResolved(self.identifier) - media_type = determine_media_type(manifest_data, response) - if media_type in (MEDIA_TYPE.MANIFEST_V1_SIGNED, MEDIA_TYPE.MANIFEST_V1): - digest = calculate_digest(raw_text_data) - else: - digest = f"sha256:{response.artifact_attributes['sha256']}" - - if media_type not in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): - # add the manifest and blobs to the repository to be able to stream it - # in the next round when a client approaches the registry - await self.init_pending_content(digest, manifest_data, media_type, raw_text_data) - - return raw_text_data, digest, media_type - - async def run_manifest_downloader(self): - if self.downloader is None: - relative_url = "/v2/{name}/manifests/{identifier}".format( - name=self.remote.namespaced_upstream_name, identifier=self.identifier - ) - url = urljoin(self.remote.url, relative_url) - self.downloader = self.remote.get_downloader(url=url) - - try: - response = await self.downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) - except ClientResponseError as response_error: - if response_error.status == 429: - # the client could request the manifest outside the docker hub pull limit; - # it is necessary to pass this information back to the client - raise HTTPTooManyRequests() - else: - # TODO: do not mask out relevant errors, like HTTP 502 - raise PathNotResolved(self.path) - - return response - - async def run_pipeline(self, raw_text_manifest_data): - set_guid(generate_guid()) - await sync_to_async(dispatch)( - download_image_data, - exclusive_resources=[self.repository_version.repository], - kwargs={ - "repository_pk": self.repository_version.repository.pk, - "remote_pk": self.remote.pk, - "raw_text_manifest_data": raw_text_manifest_data, - "tag_name": 
self.identifier, - }, - ) - - async def init_pending_content(self, digest, manifest_data, media_type, raw_text_data): - domain = get_domain() - if config := manifest_data.get("config", None): - config_digest = config["digest"] - config_blob = await self.save_config_blob(config_digest) - await sync_to_async(self.repository.pending_blobs.add)(config_blob) - else: - config_blob = None - - manifest = Manifest( - digest=digest, - schema_version=( - 2 if media_type in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI) else 1 - ), - media_type=media_type, - config_blob=config_blob, - data=raw_text_data, - _pulp_domain=domain, # For clarity - ) - await sync_to_async(manifest.init_architecture_and_os)() - - # skip if media_type of schema1 - if media_type in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI): - await sync_to_async(manifest.init_metadata)(manifest_data=manifest_data) - await sync_to_async(manifest.init_compressed_image_size)() - - try: - await manifest.asave() - except IntegrityError: - manifest = await Manifest.objects.aget(digest=manifest.digest, _pulp_domain=domain) - await sync_to_async(manifest.touch)() - await sync_to_async(self.repository.pending_manifests.add)(manifest) - - for layer in manifest_data.get("layers") or manifest_data.get("fsLayers"): - layer_digest = layer.get("digest") or layer.get("blobSum") - blob = await self.save_blob(layer_digest, manifest) - await sync_to_async(self.repository.pending_blobs.add)(blob) - - async def save_blob(self, digest, manifest): - domain = get_domain() - blob = Blob(digest=digest, _pulp_domain=domain) - try: - await blob.asave() - except IntegrityError: - blob = await Blob.objects.aget(digest=digest, _pulp_domain=domain) - await sync_to_async(blob.touch)() - - bm_rel = BlobManifest(manifest=manifest, manifest_blob=blob) - with suppress(IntegrityError): - await bm_rel.asave() - - ca = ContentArtifact( - content=blob, - artifact=None, - relative_path=digest, - ) - with suppress(IntegrityError): - await ca.asave() 
- - relative_url = "/v2/{name}/blobs/{digest}".format( - name=self.remote.namespaced_upstream_name, digest=digest - ) - blob_url = urljoin(self.remote.url, relative_url) - ra = RemoteArtifact( - url=blob_url, - sha256=digest[len("sha256:") :], - content_artifact=ca, - remote=self.remote, - pulp_domain=domain, - ) - with suppress(IntegrityError): - await ra.asave() - - return blob - - async def save_config_blob(self, config_digest): - domain = get_domain() - blob_relative_url = "/v2/{name}/blobs/{digest}".format( - name=self.remote.namespaced_upstream_name, digest=config_digest - ) - blob_url = urljoin(self.remote.url, blob_relative_url) - downloader = self.remote.get_downloader(url=blob_url) - response = await downloader.run() - - response.artifact_attributes["file"] = response.path - response.artifact_attributes["pulp_domain"] = domain - saved_artifact = await save_artifact(response.artifact_attributes) - - config_blob = Blob(digest=config_digest, _pulp_domain=domain) - try: - await config_blob.asave() - except IntegrityError: - config_blob = await Blob.objects.aget(digest=config_digest, _pulp_domain=domain) - await sync_to_async(config_blob.touch)() - - content_artifact = ContentArtifact( - content=config_blob, - artifact=saved_artifact, - relative_path=config_digest, - ) - with suppress(IntegrityError): - await content_artifact.asave() - - return config_blob diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index ed9ece403..221a389fa 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -24,6 +24,7 @@ from django.forms.models import model_to_dict from django.shortcuts import get_object_or_404 from rest_framework.exceptions import ( + APIException, AuthenticationFailed, NotAuthenticated, ParseError, @@ -44,7 +45,7 @@ from pulpcore.plugin import pulp_hashlib from pulpcore.plugin.exceptions import TimeoutException from pulpcore.plugin.files import PulpTemporaryUploadedFile -from 
pulpcore.plugin.models import Artifact, ContentArtifact, UploadChunk +from pulpcore.plugin.models import Artifact, ContentArtifact, RemoteArtifact, UploadChunk from pulpcore.plugin.tasking import dispatch from pulpcore.plugin.util import get_domain, get_objects_for_user, get_url @@ -72,7 +73,7 @@ FileStorageRedirects, S3StorageRedirects, ) -from pulp_container.app.tasks import aadd_and_remove +from pulp_container.app.tasks import aadd_and_remove, download_image_data from pulp_container.app.token_verification import ( RegistryAuthentication, RegistryPermission, @@ -80,9 +81,11 @@ TokenPermission, ) from pulp_container.app.utils import ( + calculate_digest, determine_media_type, extract_data_from_signature, filter_resource, + get_accepted_media_types, get_full_path, has_task_completed, validate_manifest, @@ -90,6 +93,7 @@ from pulp_container.constants import ( EMPTY_BLOB, MANIFEST_TYPE, + MEDIA_TYPE, MEGABYTE, SIGNATURE_API_EXTENSION_VERSION, SIGNATURE_HEADER, @@ -126,6 +130,48 @@ def render(self, data, accepted_media_type=None, renderer_context=None): return data +class ManifestResponse(Response): + """ + An HTTP response class for returning Manifests. + """ + + def __init__(self, data, media_type, digest): + """ + Args: + data (bytes): The manifest data. + media_type (str): The manifest media type. + digest (str): The manifest digest.
+ """ + headers = { + "Docker-Content-Digest": digest, + } + super().__init__(data=data, content_type=media_type, headers=headers) + + @classmethod + def from_manifest(cls, manifest): + """Create a ManifestResponse from a Manifest model.""" + return cls(manifest.data, manifest.media_type, manifest.digest) + + @classmethod + def from_tag(cls, tag, request=None): + """Create a ManifestResponse from a Tag model.""" + manifest = tag.tagged_manifest + manifest_media_type = manifest.media_type + if request: + accepted_media_types = get_accepted_media_types(request.headers) + if ( + manifest_media_type not in accepted_media_types + and manifest_media_type != MEDIA_TYPE.MANIFEST_V1 + ): + raise ManifestNotFound(reference=tag.name) + + # Schema v1 manifests are always returned with the signed content type + if manifest_media_type == MEDIA_TYPE.MANIFEST_V1: + manifest_media_type = MEDIA_TYPE.MANIFEST_V1_SIGNED + + return cls(manifest.data, manifest_media_type, manifest.digest) + + class UploadResponse(Response): """ An HTTP response class for requests for Uploads. @@ -155,27 +201,24 @@ def __init__(self, upload, path, request, status=202): super().__init__(headers=headers, status=status) -class ManifestResponse(Response): +class ManifestUploadResponse(Response): """ - An HTTP response class for returning Manifets. + An HTTP response class for returning Manifests upon successful upload. """ - def __init__(self, manifest, path, request, status=200): + def __init__(self, manifest, path): """ Args: manifest (pulp_container.app.models.Manifest): A Manifest model used to generate the response. path (str): The base_path of the ContainerDistribution (Container repository name) - request (rest_framework.request.Request): Request object not used by this - implementation of Response. - status (int): Status code to send with the response. 
""" headers = { "Docker-Content-Digest": manifest.digest, "Location": f"/v2/{get_full_path(path)}/manifests/{manifest.digest}", "Content-Length": 0, } - super().__init__(headers=headers, status=status, content_type=manifest.media_type) + super().__init__(headers=headers, status=201, content_type=manifest.media_type) class ManifestSignatureResponse(Response): @@ -306,7 +349,7 @@ def get_drv_pull(self, path): repository = repository_version.repository else: raise RepositoryNotFound(name=path) - return distribution, repository, repository_version + return distribution, repository.cast(), repository_version def get_pull_through_drv(self, path): domain = get_domain() @@ -1069,21 +1112,70 @@ def handle_safe_method(self, request, path, pk): """Handles safe requests for Blobs.""" distribution, repository, repository_version = self.get_drv_pull(path) redirects = self.redirects_class(distribution, path, request) + permission_checker = PermissionChecker(request.user) - try: - blob = models.Blob.objects.get(digest=pk, pk__in=repository_version.content) - except models.Blob.DoesNotExist: - if pk == EMPTY_BLOB: - return redirects.redirect_to_content_app("blobs", pk) - repository = repository.cast() - try: - blob = repository.pending_blobs.get(digest=pk) - blob.touch() - except models.Blob.DoesNotExist: - raise BlobNotFound(digest=pk) + if pk == EMPTY_BLOB: + return redirects.redirect_to_content_app("blobs", pk) + blob = models.Blob.objects.filter(digest=pk, pk__in=repository_version.content).first() + if not blob: + blob = repository.pending_blobs.filter(digest=pk).first() + if not blob: + if ( + distribution.remote + and distribution.pull_through_distribution + and permission_checker.has_pull_through_permissions(distribution) + ): + remote = distribution.remote.cast() + blob_url = self.fetch_blob(remote, pk) + blob = self.create_on_demand_blob(pk, remote, blob_url) + repository.pending_blobs.add(blob) + else: + raise BlobNotFound(digest=pk) + blob.touch() return 
redirects.issue_blob_redirect(blob) + def create_on_demand_blob(self, digest, remote, blob_url): + domain = get_domain() + blob, _ = models.Blob.objects.get_or_create(digest=digest, _pulp_domain=domain) + ca, _ = ContentArtifact.objects.get_or_create( + content=blob, + relative_path=blob.digest, + defaults={"artifact": None}, + ) + RemoteArtifact.objects.get_or_create( + content_artifact=ca, + remote=remote, + pulp_domain=domain, + defaults={"url": blob_url, "sha256": digest[len("sha256:") :]}, + ) + return blob + + def fetch_blob(self, remote, pk): + relative_url = "/v2/{name}/blobs/{pk}".format(name=remote.namespaced_upstream_name, pk=pk) + blob_url = urljoin(remote.url, relative_url) + downloader = remote.get_downloader(url=blob_url) + try: + response = downloader.fetch( + extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} + ) + except ClientResponseError as response_error: + if response_error.status == 429: + # the client could request the blob outside the docker hub pull limit; + # it is necessary to pass this information back to the client + raise Throttled() + elif response_error.status == 404: + raise BlobNotFound(digest=pk) + else: + raise BadGateway(detail=response_error.message) + except (ClientConnectionError, TimeoutException): + # The remote server is not available at the moment + raise GatewayTimeout() + else: + if response.headers.get("docker-content-digest") != pk: + raise BlobNotFound(digest=pk) + return blob_url + class Manifests(RedirectsMixin, ContainerRegistryApiMixin, ViewSet): """ @@ -1112,39 +1204,80 @@ def handle_safe_method(self, request, path, pk): Responds to safe requests about manifests by reference """ distribution, repository, repository_version = self.get_drv_pull(path) - redirects = self.redirects_class(distribution, path, request) domain = get_domain() + permission_checker = PermissionChecker(request.user) if pk[:7] != "sha256:": - try: - tag = models.Tag.objects.get(name=pk, pk__in=repository_version.content) - 
except models.Tag.DoesNotExist: - distribution = distribution.cast() - permission_checker = PermissionChecker(request.user) - if ( - distribution.remote - and distribution.pull_through_distribution - and permission_checker.has_pull_through_permissions(distribution) - ): - remote = distribution.remote.cast() - # issue a head request first to ensure that the content exists on the remote - # source; we want to prevent immediate "not found" error responses from - # content-app: 302 (api-app) -> 404 (content-app) - manifest = self.fetch_manifest(remote, pk) - if manifest is None: - return redirects.redirect_to_content_app("manifests", pk) - - tag = models.Tag(name=pk, tagged_manifest=manifest, _pulp_domain=domain) - try: - tag.save() - except IntegrityError: - tag = models.Tag.objects.get( - name=tag.name, tagged_manifest=manifest, _pulp_domain=domain + tag = models.Tag.objects.filter(name=pk, pk__in=repository_version.content).first() + if ( + distribution.remote + and distribution.pull_through_distribution + and permission_checker.has_pull_through_permissions(distribution) + ): + # check if the tag is on the upstream remote and if it has been updated + remote = distribution.remote.cast() + try: + local_manifest, response = self.fetch_manifest(remote, pk) + except APIException as e: + if not tag: + raise e + else: + if local_manifest is None: + fake_manifest = self.fake_init_manifest(response, pk) + tag = models.Tag( + name=pk, tagged_manifest=fake_manifest, _pulp_domain=domain ) - tag.touch() - - add_content_units = self.get_content_units_to_add(manifest, tag) - + dispatch( + download_image_data, + exclusive_resources=[repository], + kwargs={ + "repository_pk": repository.pk, + "remote_pk": remote.pk, + "raw_text_manifest_data": fake_manifest.data, + "tag_name": pk, + }, + ) + else: + if tag is None or tag.tagged_manifest.digest != local_manifest.digest: + tag, created = models.Tag.objects.get_or_create( + name=pk, tagged_manifest=local_manifest, _pulp_domain=domain + 
) + if not created: + tag.touch() + + add_content_units = self.get_content_units_to_add(local_manifest, tag) + dispatch( + aadd_and_remove, + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": add_content_units, + "remove_content_units": [], + }, + immediate=True, + deferred=True, + ) + if tag: + return ManifestResponse.from_tag(tag, request) + else: + manifest = models.Manifest.objects.filter( + digest=pk, pk__in=repository_version.content + ).first() + if not manifest and ( + manifest := repository.pending_manifests.filter(digest=pk).first() + ): + manifest.touch() + if ( + not manifest + and distribution.remote + and distribution.pull_through_distribution + and permission_checker.has_pull_through_permissions(distribution) + ): + remote = distribution.remote.cast() + # This raises a ManifestNotFound if digest is not on the upstream + manifest, response = self.fetch_manifest(remote, pk) + if manifest: + add_content_units = self.get_content_units_to_add(manifest) dispatch( aadd_and_remove, exclusive_resources=[repository], @@ -1156,44 +1289,26 @@ def handle_safe_method(self, request, path, pk): immediate=True, deferred=True, ) - - return redirects.redirect_to_content_app("manifests", tag.name) else: - raise ManifestNotFound(reference=pk) - - return redirects.issue_tag_redirect(tag) - else: - try: - manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) - return redirects.issue_manifest_redirect(manifest) - except models.Manifest.DoesNotExist: - repository = repository.cast() - # the manifest might be a part of listed manifests currently being uploaded - # or saved during the pull-through caching - try: - manifest = repository.pending_manifests.get(digest=pk) - manifest.touch() - except models.Manifest.DoesNotExist: - manifest = None - - distribution = distribution.cast() - permission_checker = PermissionChecker(request.user) - if ( - distribution.remote - and 
distribution.pull_through_distribution - and permission_checker.has_pull_through_permissions(distribution) - ): - remote = distribution.remote.cast() - # Head request to check if the manifest exists on the remote - self.fetch_manifest(remote, pk) - return redirects.redirect_to_content_app("manifests", pk) - elif manifest: - return redirects.issue_manifest_redirect(manifest) - else: - raise ManifestNotFound(reference=pk) - - def get_content_units_to_add(self, manifest, tag): - add_content_units = [str(tag.pk), str(manifest.pk)] + manifest = self.fake_init_manifest(response, pk) + dispatch( + download_image_data, + exclusive_resources=[repository], + kwargs={ + "repository_pk": repository.pk, + "remote_pk": remote.pk, + "raw_text_manifest_data": manifest.data, + }, + ) + if manifest: + return ManifestResponse.from_manifest(manifest) + # Fallthrough catchall, no manifest or tag found + raise ManifestNotFound(reference=pk) + + def get_content_units_to_add(self, manifest, tag=None): + add_content_units = [str(manifest.pk)] + if tag: + add_content_units.append(str(tag.pk)) if manifest.media_type in ( models.MEDIA_TYPE.MANIFEST_LIST, models.MEDIA_TYPE.INDEX_OCI, @@ -1212,6 +1327,24 @@ def get_content_units_to_add(self, manifest, tag): add_content_units.extend(manifest.blobs.values_list("pk", flat=True)) return add_content_units + def fake_init_manifest(self, response, pk): + with open(response.path, mode="r") as f: + raw_text_data = f.read() + + try: + manifest_data = json.loads(raw_text_data) + except json.decoder.JSONDecodeError: + raise ManifestInvalid(digest=pk) + media_type = determine_media_type(manifest_data, response) + if media_type in (MEDIA_TYPE.MANIFEST_V1_SIGNED, MEDIA_TYPE.MANIFEST_V1): + digest = calculate_digest(raw_text_data) + else: + digest = f"sha256:{response.artifact_attributes['sha256']}" + + return models.Manifest( + digest=digest, media_type=media_type, data=raw_text_data, _pulp_domain=get_domain() + ) + def fetch_manifest(self, remote, pk): 
relative_url = "/v2/{name}/manifests/{pk}".format( name=remote.namespaced_upstream_name, pk=pk @@ -1219,9 +1352,7 @@ def fetch_manifest(self, remote, pk): tag_url = urljoin(remote.url, relative_url) downloader = remote.get_downloader(url=tag_url) try: - response = downloader.fetch( - extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} - ) + response = downloader.fetch(extra_data={"headers": V2_ACCEPT_HEADERS}) except ClientResponseError as response_error: if response_error.status == 429: # the client could request the manifest outside the docker hub pull limit; @@ -1236,7 +1367,9 @@ def fetch_manifest(self, remote, pk): raise GatewayTimeout() else: digest = response.headers.get("docker-content-digest") - return models.Manifest.objects.filter(digest=digest, pulp_domain=get_domain()).first() + return models.Manifest.objects.filter( + digest=digest, pulp_domain=get_domain() + ).first(), response def put(self, request, path, pk=None): """ @@ -1414,18 +1547,18 @@ def put(self, request, path, pk=None): ) if immediate_task.state == "completed": - return ManifestResponse(manifest, path, request, status=201) + return ManifestUploadResponse(manifest, path) elif immediate_task.state == "canceled": raise Throttled() elif immediate_task.state in ["waiting", "running"]: if has_task_completed(immediate_task, wait_in_seconds=2): - return ManifestResponse(manifest, path, request, status=201) + return ManifestUploadResponse(manifest, path) else: raise Exception(str(immediate_task.error)) else: # the client pushed a listed manifest repository.pending_manifests.add(manifest) - return ManifestResponse(manifest, path, request, status=201) + return ManifestUploadResponse(manifest, path) def _init_manifest(self, manifest_digest, media_type, raw_text_data, config_blob=None): return models.Manifest( @@ -1488,24 +1621,43 @@ def head(self, request, path, pk=None): def get(self, request, path, pk): """Return a signature identified by its sha256 checksum.""" - _, _, 
repository_version = self.get_drv_pull(path) - domain = get_domain() - try: - manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) - except models.Manifest.DoesNotExist: - try: - # the manifest was initialized as a pending content unit - # or has not been assigned to any repository yet - manifest = models.Manifest.objects.get(digest=pk, _pulp_domain=domain) - manifest.touch() - except models.Manifest.DoesNotExist: - raise ManifestNotFound(reference=pk) - - signatures = models.ManifestSignature.objects.filter( - signed_manifest=manifest, pk__in=repository_version.content + distro, repo, repo_ver = self.get_drv_pull(path) + manifest = ( + models.Manifest.objects.filter(digest=pk, pk__in=repo_ver.content).first() + or repo.pending_manifests.filter(digest=pk).first() ) - return Response(self.get_response_data(signatures)) + if manifest: + signatures = models.ManifestSignature.objects.filter( + signed_manifest=manifest, pk__in=repo_ver.content + ) + + return Response(self.get_response_data(signatures)) + elif distro.remote and distro.pull_through_distribution: + remote = distro.remote.cast() + ping_url = urljoin(remote.url, "/v2/") + ping_downloader = remote.get_downloader(url=ping_url) + response_headers = {} + try: + ping_response = ping_downloader.fetch( + extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "get"} + ) + response_headers = ping_response.headers + except ClientResponseError as e: + if e.status >= 500: + return Response(self.get_response_data([])) + response_headers = dict(e.headers) + except (ClientConnectionError, TimeoutException): + return Response(self.get_response_data([])) + + if response_headers.get(SIGNATURE_HEADER) != "1": + return Response(self.get_response_data([])) + + sig_rel_url = f"/extensions/v2/{remote.namespaced_upstream_name}/signatures/{pk}" + sig_url = urljoin(remote.url, sig_rel_url) + return Response(status=302, headers={"Location": sig_url}) + + raise ManifestNotFound(reference=pk) 
@staticmethod def get_response_data(signatures): diff --git a/pulp_container/app/tasks/download_image_data.py b/pulp_container/app/tasks/download_image_data.py index a90296440..3eb712cb8 100644 --- a/pulp_container/app/tasks/download_image_data.py +++ b/pulp_container/app/tasks/download_image_data.py @@ -20,7 +20,7 @@ async def aadd_and_remove(*args, **kwargs): return await sync_to_async(add_and_remove)(*args, **kwargs) -def download_image_data(repository_pk, remote_pk, raw_text_manifest_data, tag_name): +def download_image_data(repository_pk, remote_pk, raw_text_manifest_data, tag_name=None): repository = ContainerRepository.objects.get(pk=repository_pk) remote = ContainerRemote.objects.get(pk=remote_pk) log.info("Pulling cache: repository={r} remote={p}".format(r=repository.name, p=remote.name)) @@ -32,7 +32,7 @@ def download_image_data(repository_pk, remote_pk, raw_text_manifest_data, tag_na class ContainerPullThroughFirstStage(ContainerFirstStage): """The stage that prepares the pipeline for downloading a single tag and its related data.""" - def __init__(self, remote, raw_text_manifest_data, tag_name): + def __init__(self, remote, raw_text_manifest_data, tag_name=None): """Initialize the stage with the artifact defined in content-app.""" super().__init__(remote, signed_only=False) self.tag_name = tag_name @@ -44,8 +44,9 @@ async def run(self): This method is a tinified method based on ``ContainerFirstStage.run`` with syncing just a single tag. 
""" - tag_dc = DeclarativeContent(Tag(name=self.tag_name)) - self.tag_dcs.append(tag_dc) + if self.tag_name: + tag_dc = DeclarativeContent(Tag(name=self.tag_name)) + self.tag_dcs.append(tag_dc) content_data = json.loads(self.raw_text_manifest_data) @@ -58,7 +59,8 @@ async def run(self): listed_manifest = await self.create_listed_manifest(manifest_data) list_dc.extra_data["listed_manifests"].append(listed_manifest) else: - tag_dc.extra_data["tagged_manifest_dc"] = list_dc + if self.tag_name: + tag_dc.extra_data["tagged_manifest_dc"] = list_dc for listed_manifest in list_dc.extra_data["listed_manifests"]: await self.handle_blobs( listed_manifest["manifest_dc"], listed_manifest["content_data"] @@ -68,7 +70,8 @@ async def run(self): else: # Simple tagged manifest man_dc = self.create_manifest(content_data, self.raw_text_manifest_data, media_type) - tag_dc.extra_data["tagged_manifest_dc"] = man_dc + if self.tag_name: + tag_dc.extra_data["tagged_manifest_dc"] = man_dc await self.handle_blobs(man_dc, content_data) self.manifest_dcs.append(man_dc) diff --git a/pulp_container/tests/functional/api/test_pull_through_cache.py b/pulp_container/tests/functional/api/test_pull_through_cache.py index 9e14cf37d..d332e8a90 100644 --- a/pulp_container/tests/functional/api/test_pull_through_cache.py +++ b/pulp_container/tests/functional/api/test_pull_through_cache.py @@ -1,5 +1,4 @@ import subprocess -import time from subprocess import CalledProcessError from uuid import uuid4 @@ -35,6 +34,8 @@ def _add_pull_through_entities_to_cleanup(path): distribution = container_distribution_api.list(name=path).results[0] add_to_cleanup(container_distribution_api, distribution.pulp_href) + return repository, remote, distribution + return _add_pull_through_entities_to_cleanup @@ -52,6 +53,8 @@ def pull_and_verify( registry_client, local_registry, full_path, + pulpcore_bindings, + monitor_task, ): def _pull_and_verify(images, pull_through_distribution): tags_to_verify = [] @@ -68,6 +71,15 @@ def 
_pull_and_verify(images, pull_through_distribution): local_registry.pull(local_image_pull_path) local_image = local_registry.inspect(local_image_pull_path) + # when the client pulls the image, a repository, distribution, and remote is created in + # the background; therefore, scheduling the cleanup for these entities is necessary + repository, _, _ = add_pull_through_entities_to_cleanup(local_image_pull_path) + + # Wait for background tasks to complete (creating and adding cached content to the repo) + tasks = pulpcore_bindings.TasksApi.list(reserved_resources=repository.prn) + for task in tasks.results: + monitor_task(task.pulp_href) + # 1.1. check pulp manifest model fields assert check_manifest_fields( manifest_filters={"digest": local_image[0]["Digest"]}, @@ -77,10 +89,6 @@ def _pull_and_verify(images, pull_through_distribution): path, tag = local_image_path.split(":") tags_to_verify.append(tag) - # when the client pulls the image, a repository, distribution, and remote is created in - # the background; therefore, scheduling the cleanup for these entities is necessary - add_pull_through_entities_to_cleanup(local_image_pull_path) - pull_through_distribution = container_pull_through_distribution_api.list( name=pull_through_distribution.name ).results[0] @@ -97,15 +105,8 @@ def _pull_and_verify(images, pull_through_distribution): check_manifest_arch_os_size(manifest) # 3. check if the repository version has changed - for _ in range(5): - repository = container_repository_api.list(name=path).results[0] - if f"{repository.pulp_href}versions/{version}/" == repository.latest_version_href: - break - - # there might be still the saving process running in the background - time.sleep(1) - else: - assert False, "The repository was not updated with the cached content." + repository = container_repository_api.read(repository.pulp_href) + assert f"{repository.pulp_href}versions/{version}/" == repository.latest_version_href # 4. 
test if pulling the same content twice does not raise any error local_registry.pull(local_image_pull_path)