diff --git a/Cargo.toml b/Cargo.toml index d33e6c68..ecbf2a09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,8 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +cap-std-ext = "5.0" +ocidir = "0.7.2" # JSON-RPC with FD passing for userns helper jsonrpc-fdpass = { version = "0.1.0", default-features = false } diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 2273e39b..f20141f0 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -24,7 +24,7 @@ base64 = { version = "0.22", default-features = false, features = ["std"], optio bytes = { version = "1", default-features = false } composefs = { workspace = true } composefs-boot = { workspace = true, optional = true } -containers-image-proxy = { version = "0.9.2", default-features = false } +containers-image-proxy = { version = "0.10", default-features = false } cstorage = { path = "../cstorage", version = "0.3.0", optional = true } hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.18.0", default-features = false, features = ["tokio"] } @@ -39,6 +39,8 @@ tar-core = "0.1.0" tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7", default-features = false, features = ["io"] } tracing = { version = "0.1", default-features = false } +cap-std-ext = { workspace = true } +ocidir = { workspace = true } [dev-dependencies] cap-std = { version = "4.0.0", default-features = false } diff --git a/crates/composefs-oci/src/boot.rs b/crates/composefs-oci/src/boot.rs index 6f8bc660..726ded80 100644 --- a/crates/composefs-oci/src/boot.rs +++ 
b/crates/composefs-oci/src/boot.rs @@ -65,7 +65,11 @@ pub fn remove_boot_image( )?; let manifest_json = img.read_manifest_json(repo)?; - let layer_verities = img.layer_refs().clone(); + let layer_verities: Vec<_> = img + .layer_refs() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); crate::oci_image::rewrite_manifest( repo, diff --git a/crates/composefs-oci/src/layer.rs b/crates/composefs-oci/src/layer.rs new file mode 100644 index 00000000..4b09f788 --- /dev/null +++ b/crates/composefs-oci/src/layer.rs @@ -0,0 +1,99 @@ +//! Shared layer import logic for OCI container images. +//! +//! This module provides common functionality for importing OCI image layers +//! into a composefs repository, shared between the skopeo proxy path and +//! direct OCI layout import. + +use std::sync::Arc; + +use anyhow::{Result, bail}; +use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; +use containers_image_proxy::oci_spec::image::MediaType; +use tokio::io::{AsyncRead, AsyncWriteExt, BufReader}; + +use composefs::fsverity::FsVerityHashValue; +use composefs::repository::{ObjectStoreMethod, Repository}; +use composefs::shared_internals::IO_BUF_CAPACITY; + +use crate::skopeo::TAR_LAYER_CONTENT_TYPE; +use crate::tar::split_async; + +/// Check if a media type represents a tar-based layer. +pub fn is_tar_media_type(media_type: &MediaType) -> bool { + matches!( + media_type, + MediaType::ImageLayer + | MediaType::ImageLayerGzip + | MediaType::ImageLayerZstd + | MediaType::ImageLayerNonDistributable + | MediaType::ImageLayerNonDistributableGzip + | MediaType::ImageLayerNonDistributableZstd + ) +} + +/// Wrap an async reader with the appropriate decompressor for the media type. +/// +/// Returns a boxed reader that decompresses the stream if needed. +/// The output is `AsyncRead` (not `AsyncBufRead`) because `split_async` +/// does its own buffering via `BytesMut`. 
+pub fn decompress_async<'a, R>( + reader: R, + media_type: &MediaType, +) -> Result> +where + R: AsyncRead + Unpin + Send + 'a, +{ + let buf = BufReader::new(reader); + let reader: Box = match media_type { + MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => { + Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, buf)) + } + MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new( + BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(buf)), + ), + MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new( + BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(buf)), + ), + _ => bail!("Unsupported layer media type for decompression: {media_type}"), + }; + Ok(reader) +} + +/// Import a tar layer from an async reader into the repository. +/// +/// The reader should already be decompressed (use `decompress_async` first). +/// Returns the fs-verity object ID and import stats of the imported splitstream. +pub async fn import_tar_async( + repo: Arc>, + reader: R, +) -> Result<(ObjectID, crate::ImportStats)> +where + ObjectID: FsVerityHashValue, + R: AsyncRead + Unpin + Send, +{ + split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await +} + +/// Store raw bytes from an async reader as a repository object. +/// +/// Streams the raw bytes into a repository object without creating a splitstream. +/// Use this for non-tar blobs (OCI artifacts) where the caller will create +/// the splitstream wrapper. +/// +/// Returns (object_id, size, store_method) of the stored object. 
+pub async fn store_blob_async( + repo: &Repository, + mut reader: R, +) -> Result<(ObjectID, u64, ObjectStoreMethod)> +where + ObjectID: FsVerityHashValue, + R: AsyncRead + Unpin, +{ + let tmpfile = repo.create_object_tmpfile()?; + let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile)); + let size = tokio::io::copy(&mut reader, &mut writer).await?; + writer.flush().await?; + let tmpfile = writer.into_std().await; + let (object_id, method) = repo.finalize_object_tmpfile(tmpfile, size)?; + Ok((object_id, size, method)) +} diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index fdeaf68a..f91cfedd 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -16,7 +16,9 @@ pub mod boot; #[cfg(feature = "containers-storage")] pub mod cstor; pub mod image; +pub mod layer; pub mod oci_image; +pub mod oci_layout; pub mod skopeo; pub mod tar; @@ -29,6 +31,7 @@ pub mod test_util; // Re-export the composefs crate for consumers who only need composefs-oci pub use composefs; +use std::io::Read; use std::{collections::HashMap, sync::Arc}; use anyhow::{Context, Result, ensure}; @@ -39,6 +42,7 @@ pub use containers_image_proxy::oci_spec::image::Digest as OciDigest; use containers_image_proxy::ImageProxyConfig; use containers_image_proxy::oci_spec::image::ImageConfiguration; +use containers_image_proxy::oci_spec::image::{Descriptor, MediaType}; use sha2::{Digest, Sha256}; use composefs::{ @@ -418,6 +422,36 @@ fn hash_sha256(bytes: &[u8]) -> OciDigest { sha256_content_digest(bytes) } +/// Extract ordered diff_ids from a config descriptor. +/// +/// For standard container images (ImageConfig media type), parses the +/// config JSON and returns `rootfs.diff_ids`. For artifacts with +/// non-standard config types, falls back to using manifest layer +/// digests as identifiers. +/// Note: oci-spec models diff_ids as `Vec` but they are actually +/// OCI content digests. 
We parse them here so the rest of the codebase +/// can work with the strongly-typed `Digest`. +pub(crate) fn extract_diff_ids( + media_type: &MediaType, + config_reader: impl Read, + manifest_layers: &[Descriptor], +) -> Result> { + if *media_type == MediaType::ImageConfig { + let config = ImageConfiguration::from_reader(config_reader)?; + config + .rootfs() + .diff_ids() + .iter() + .map(|s| s.parse().context("parsing diff_id from image config")) + .collect() + } else { + Ok(manifest_layers + .iter() + .map(|d: &Descriptor| d.digest().clone()) + .collect()) + } +} + /// Opens and parses a container configuration. /// /// Reads the OCI image configuration from the repository and returns an [`OpenConfig`] @@ -613,8 +647,17 @@ pub fn write_config_raw( ) -> Result> { let config_digest = hash_sha256(config_json); let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE)?; - for (name, value) in &refs { - stream.add_named_stream_ref(name, value) + // Add refs in config-defined diff_id order for deterministic output. + // Parse the config to get the canonical ordering of diff_ids. + let config = ImageConfiguration::from_reader(config_json)?; + for diff_id_str in config.rootfs().diff_ids() { + let value = refs.get(diff_id_str.as_str()).with_context(|| { + let keys: Vec<_> = refs.keys().collect(); + format!( + "missing layer verity for diff_id {diff_id_str}. Available keys in refs: {keys:?}" + ) + })?; + stream.add_named_stream_ref(diff_id_str, value); } if let Some(image_id) = image { stream.add_named_stream_ref(IMAGE_REF_KEY, image_id); @@ -683,7 +726,11 @@ fn ensure_oci_composefs_erofs( // Rewrite manifest with updated config verity, preserving layer verities. // The layer_refs from OciImage are the same as the manifest's layer refs // (both ultimately come from the config's diff_id → verity map). 
- let layer_verities = img.layer_refs().clone(); + let layer_verities: Vec<_> = img + .layer_refs() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let (_new_manifest_digest, _new_manifest_verity) = oci_image::rewrite_manifest( repo, @@ -735,7 +782,11 @@ fn ensure_oci_composefs_erofs_boot( // Read original manifest JSON for rewriting let manifest_json = img.read_manifest_json(repo)?; - let layer_verities = img.layer_refs().clone(); + let layer_verities: Vec<_> = img + .layer_refs() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let (_new_manifest_digest, _new_manifest_verity) = oci_image::rewrite_manifest( repo, @@ -846,6 +897,58 @@ mod test { "); } + #[tokio::test] + async fn test_layer_import_stats() { + let layer = example_layer(); + let layer_id = hash_sha256(&layer); + + let (_repo_dir, repo) = create_test_repo(); + let (_id, stats) = import_layer(&repo, &layer_id, Some("name"), &layer[..]) + .await + .unwrap(); + + // The example layer has files of sizes 0, 4095, 4096, 4097. + // Files > INLINE_CONTENT_MAX (64 bytes) are stored as external objects. + // So 4095, 4096, and 4097 are all external → 3 objects copied. 
+ assert_eq!( + stats.objects_copied, 3, + "three files above inline threshold should be external objects" + ); + assert_eq!(stats.objects_already_present, 0); + assert!( + stats.bytes_copied > 0, + "bytes_copied should be nonzero for external objects" + ); + assert!( + stats.bytes_inlined > 0, + "bytes_inlined should be nonzero (tar headers + small file)" + ); + } + + #[tokio::test] + async fn test_layer_import_deduplication_stats() { + let layer = example_layer(); + let layer_id = hash_sha256(&layer); + + let (_repo_dir, repo) = create_test_repo(); + + // First import + let (_id, stats1) = import_layer(&repo, &layer_id, None, &layer[..]) + .await + .unwrap(); + assert_eq!(stats1.objects_copied, 3); + assert_eq!(stats1.objects_already_present, 0); + + // Re-import the same layer — the stream already exists so we get + // an early return with zero stats (idempotent). + let (_id, stats2) = import_layer(&repo, &layer_id, None, &layer[..]) + .await + .unwrap(); + assert_eq!(stats2.objects_copied, 0); + assert_eq!(stats2.objects_already_present, 0); + assert_eq!(stats2.bytes_copied, 0); + } + #[test] fn test_write_and_open_config() { use containers_image_proxy::oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder}; @@ -944,6 +1047,72 @@ mod test { ); } + #[tokio::test] + async fn test_config_verity_deterministic() -> Result<()> { + use containers_image_proxy::oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder}; + + let (_repo_dir, repo) = create_test_repo(); + + // Create 3 distinct layers with different content + let mut layers = Vec::new(); + for (name, size) in [("alpha", 1000), ("beta", 2000), ("gamma", 3000)] { + let mut builder = ::tar::Builder::new(vec![]); + append_data(&mut builder, name, size); + let layer = builder.into_inner().unwrap(); + + let diff_id = hash_sha256(&layer); + + let (verity, _stats) = import_layer(&repo, &diff_id, None, &mut layer.as_slice()) + .await + .unwrap(); + layers.push((diff_id.to_string(), verity)); + } + + let 
diff_ids: Vec = layers.iter().map(|(d, _)| d.clone()).collect(); + let config = ImageConfigurationBuilder::default() + .architecture("amd64") + .os("linux") + .rootfs( + RootFsBuilder::default() + .typ("layers") + .diff_ids(diff_ids.clone()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + + // Build refs HashMaps with different insertion orders to exercise + // that write_config uses config-defined diff_id order, not HashMap order. + let refs1: HashMap, Sha256HashValue> = layers + .iter() + .map(|(d, v)| (d.as_str().into(), v.clone())) + .collect(); + let refs2: HashMap, Sha256HashValue> = layers + .iter() + .rev() + .map(|(d, v)| (d.as_str().into(), v.clone())) + .collect(); + + let (_digest1, verity1) = write_config(&repo, &config, refs1, None, None)?; + let (_digest2, verity2) = write_config(&repo, &config, refs2, None, None)?; + + // The verity must be identical regardless of HashMap iteration order + assert_eq!( + verity1, verity2, + "config verity must be deterministic across calls" + ); + + // Hardcoded expected value to catch any accidental changes + assert_eq!( + verity1.to_hex(), + "4839518dea22749f8ff233e7f7baec65f23dd5336462f46ad6884769af84bf95", + "config verity changed unexpectedly" + ); + + Ok(()) + } + #[test] fn test_open_config_bad_hash() { use containers_image_proxy::oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder}; @@ -1264,12 +1433,17 @@ mod test { let new_manifest_digest = hash_sha256(new_manifest_json.as_bytes()); oci_image::untag_image(repo, "nc:v1").unwrap(); + let layer_verities: Vec<_> = oci_before + .layer_refs() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let (_md, new_manifest_verity) = oci_image::write_manifest( repo, &new_manifest, &new_manifest_digest, &new_config_verity, - oci_before.layer_refs(), + &layer_verities, Some("nc:v1"), ) .unwrap(); @@ -1556,12 +1730,16 @@ mod test { let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); + let 
layer_verities_vec: Vec<_> = layer_verities_map + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let (_stored_digest, manifest_verity) = oci_image::write_manifest( repo, &manifest, &manifest_digest, &config_verity, - &layer_verities_map, + &layer_verities_vec, Some("whiteout-test:v1"), ) .unwrap(); diff --git a/crates/composefs-oci/src/oci_image.rs b/crates/composefs-oci/src/oci_image.rs index da624254..9cd31b83 100644 --- a/crates/composefs-oci/src/oci_image.rs +++ b/crates/composefs-oci/src/oci_image.rs @@ -51,6 +51,7 @@ use serde::Serialize; use composefs::{fsverity::FsVerityHashValue, repository::Repository}; use crate::ContentAndVerity; +use crate::layer::is_tar_media_type; use crate::skopeo::{OCI_BLOB_CONTENT_TYPE, OCI_CONFIG_CONTENT_TYPE, OCI_MANIFEST_CONTENT_TYPE}; /// Data and named refs from a splitstream with external object storage. @@ -596,12 +597,12 @@ pub fn list_images( /// The manifest becomes a GC root only if a `reference` name is provided. /// The reference name must not contain `@`, which is reserved for digest /// references. -pub fn write_manifest( +pub fn write_manifest>( repo: &Arc>, manifest: &ImageManifest, manifest_digest: &OciDigest, config_verity: &ObjectID, - layer_verities: &HashMap, ObjectID>, + layer_verities: &[(S, ObjectID)], reference: Option<&str>, ) -> Result> { if let Some(name) = reference { @@ -633,7 +634,7 @@ pub fn write_manifest( stream.add_named_stream_ref(&config_key, config_verity); for (diff_id, verity) in layer_verities { - stream.add_named_stream_ref(diff_id, verity); + stream.add_named_stream_ref(diff_id.as_ref(), verity); } stream.write_external(json_bytes)?; @@ -652,12 +653,12 @@ pub fn write_manifest( /// EROFS image ref was added to the config). /// /// If `reference` is provided, the manifest is also tagged with that name. 
-pub(crate) fn rewrite_manifest( +pub(crate) fn rewrite_manifest>( repo: &Arc>, manifest_json: &[u8], manifest_digest: &OciDigest, config_verity: &ObjectID, - layer_verities: &HashMap, ObjectID>, + layer_verities: &[(S, ObjectID)], reference: Option<&str>, ) -> Result<(OciDigest, ObjectID)> { let content_id = manifest_identifier(manifest_digest); @@ -673,7 +674,7 @@ pub(crate) fn rewrite_manifest( stream.add_named_stream_ref(&config_key, config_verity); for (diff_id, verity) in layer_verities { - stream.add_named_stream_ref(diff_id, verity); + stream.add_named_stream_ref(diff_id.as_ref(), verity); } stream.write_external(manifest_json)?; @@ -697,19 +698,6 @@ pub fn manifest_identifier(digest: &OciDigest) -> String { format!("oci-manifest-{digest}") } -/// Returns true if this is a tar-based layer media type. -pub(crate) fn is_tar_media_type(media_type: &MediaType) -> bool { - matches!( - media_type, - MediaType::ImageLayer - | MediaType::ImageLayerGzip - | MediaType::ImageLayerZstd - | MediaType::ImageLayerNonDistributable - | MediaType::ImageLayerNonDistributableGzip - | MediaType::ImageLayerNonDistributableZstd - ) -} - /// Returns the reference path for an OCI name. 
fn oci_ref_path(name: &str) -> String { format!("{OCI_REF_PREFIX}{}", encode_tag(name)) @@ -1688,8 +1676,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(layer_digest.to_string().into_boxed_str(), layer_verity); + let layer_verities = [(layer_digest, layer_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -1963,12 +1950,8 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); // For artifacts, we use the blob digest as the "diff_id" equivalent - layer_verities.insert( - blob_digest.to_string().into_boxed_str(), - blob_verity.clone(), - ); + let layer_verities = [(blob_digest.clone(), blob_verity.clone())]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -2081,11 +2064,7 @@ mod test { // Store manifest — layer_verities uses the layer digest as key // (same logic as ensure_config_with_layers when !is_image_config) - let mut layer_verities = HashMap::new(); - layer_verities.insert( - layer_digest.to_string().into_boxed_str(), - layer_verity.clone(), - ); + let layer_verities = [(layer_digest.clone(), layer_verity.clone())]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -2241,8 +2220,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(diff_id.to_string().into_boxed_str(), layer_verity); + let layer_verities = [(diff_id.clone(), layer_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -2700,11 +2678,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert( - shared_layer_digest.to_string().into_boxed_str(), - shared_layer_verity.clone(), - ); + let layer_verities = [(shared_layer_digest.clone(), 
shared_layer_verity.clone())]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -2865,8 +2839,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(blob_digest.to_string().into_boxed_str(), blob_verity); + let layer_verities = [(blob_digest, blob_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -2941,8 +2914,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(blob_digest.to_string().into_boxed_str(), blob_verity); + let layer_verities = [(blob_digest, blob_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -3528,8 +3500,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(blob_digest.to_string().into_boxed_str(), blob_verity); + let layer_verities = [(blob_digest.to_string(), blob_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -3597,7 +3568,7 @@ mod test { .unwrap(); // Deliberately pass empty layer_verities — no layer refs in manifest - let layer_verities: HashMap, Sha256HashValue> = HashMap::new(); + let layer_verities: Vec<(String, Sha256HashValue)> = Vec::new(); let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); @@ -3750,8 +3721,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(layer_digest.to_string().into_boxed_str(), layer_verity); + let layer_verities = [(layer_digest.to_string(), layer_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash_sha256(manifest_json.as_bytes()); diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs new file 
mode 100644 index 00000000..08bb72f6 --- /dev/null +++ b/crates/composefs-oci/src/oci_layout.rs @@ -0,0 +1,428 @@ +//! Direct OCI layout directory import without the skopeo proxy. +//! +//! This module provides a fast path for importing images from local OCI layout +//! directories (the `oci:` transport). Instead of going through the +//! containers-image-proxy (which spawns skopeo as a subprocess), we read the +//! OCI layout directly using the `ocidir` crate. +//! +//! This is significantly faster for local imports since: +//! - No subprocess overhead from skopeo +//! - No IPC/pipe overhead for blob streaming +//! - Direct file I/O instead of proxy protocol parsing +//! +//! The import produces identical results to the proxy path: the same +//! splitstream format with the same content identifiers. + +use std::cmp::Reverse; +use std::collections::HashMap; +use std::io::Read; +use std::path::Path; +use std::sync::Arc; +use std::thread::available_parallelism; + +use anyhow::{Context, Result}; +use cap_std_ext::cap_std; +use containers_image_proxy::oci_spec::image::{Descriptor, Digest as OciDigest, MediaType}; +use fn_error_context::context; +use ocidir::{OciDir, ResolvedManifest}; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; +use tracing::debug; + +use composefs::fsverity::FsVerityHashValue; +use composefs::repository::{ObjectStoreMethod, Repository}; + +use crate::layer::{decompress_async, import_tar_async, is_tar_media_type, store_blob_async}; +use crate::oci_image::manifest_identifier; +use crate::skopeo::OCI_BLOB_CONTENT_TYPE; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, OCI_MANIFEST_CONTENT_TYPE}; +use crate::{ImportStats, config_identifier, layer_identifier}; + +use crate::skopeo::PullResult; + +/// Parse an OCI layout reference like "/path/to/dir:tag" or "/path/to/dir". +/// +/// Returns (path, optional_tag). 
+pub(crate) fn parse_oci_layout_ref(imgref: &str) -> (&str, Option<&str>) { + // The format is: path[:tag] + // We need to be careful: paths can contain colons (on Windows, or weird Unix paths). + // The convention is that if the last colon is after the last slash, it's a tag separator. + + let Some((before_colon, tag)) = imgref.rsplit_once(':') else { + return (imgref, None); + }; + + if tag.contains('/') { + // Slash after the colon means colon is part of the path + (imgref, None) + } else { + // No slash after the colon - it's a tag separator + (before_colon, Some(tag)) + } +} + +/// Resolve a manifest from an OCI layout directory for the current platform. +fn resolve_manifest(ocidir: &OciDir, tag: Option<&str>) -> Result { + ocidir + .open_image_this_platform(tag) + .context("Resolving manifest for platform") +} + +/// Import an image from a local OCI layout directory. +/// +/// This is the fast path for `oci:` transport references. It reads the OCI +/// layout directly without going through skopeo. 
+#[context("Importing OCI layout from {}", layout_path.display())] +pub async fn import_oci_layout( + repo: &Arc>, + layout_path: &Path, + layout_tag: Option<&str>, +) -> Result<(PullResult, ImportStats)> { + // Open the OCI layout directory + let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()) + .with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?; + let ocidir = OciDir::open(dir).context("Opening OCI directory")?; + + // Resolve the manifest, with fallback for images lacking platform annotations + let resolved = resolve_manifest(&ocidir, layout_tag)?; + + let manifest = resolved.manifest; + let manifest_descriptor = &resolved.manifest_descriptor; + let manifest_digest = manifest_descriptor.digest().clone(); + + // Import config and layers + let config_descriptor = manifest.config(); + let layers = manifest.layers(); + let (config_digest, config_verity, layer_refs, stats) = + import_config_and_layers(repo, &ocidir, layers, config_descriptor) + .await + .with_context(|| format!("Failed to import config {}", config_descriptor.digest()))?; + + // Store the manifest + let manifest_content_id = manifest_identifier(&manifest_digest); + let manifest_verity = if let Some(verity) = repo.has_stream(&manifest_content_id)? { + debug!("Already have manifest {manifest_digest}"); + verity + } else { + debug!("Storing manifest {manifest_digest}"); + + let mut splitstream = repo.create_stream(OCI_MANIFEST_CONTENT_TYPE)?; + + let config_key = format!("config:{}", config_descriptor.digest()); + splitstream.add_named_stream_ref(&config_key, &config_verity); + + // Add layer refs in config-defined diff_id order + for (diff_id, verity) in &layer_refs { + splitstream.add_named_stream_ref(diff_id.as_ref(), verity); + } + + let mut raw_manifest = Vec::with_capacity(manifest_descriptor.size() as usize); + ocidir + .read_blob(manifest_descriptor) + .context("Reading raw manifest bytes")? 
+ .read_to_end(&mut raw_manifest)?; + splitstream.write_external(&raw_manifest)?; + repo.write_stream(splitstream, &manifest_content_id, None)? + }; + + Ok(( + PullResult { + manifest_digest, + manifest_verity, + config_digest, + config_verity, + }, + stats, + )) +} + +/// Import config and all layers from an OCI layout. +/// +/// Returns (config_digest, config_verity, layer_refs, stats). +/// `layer_refs` is an ordered Vec of (diff_id, verity) pairs preserving the +/// order from the config (or manifest for artifacts). +async fn import_config_and_layers( + repo: &Arc>, + ocidir: &OciDir, + manifest_layers: &[Descriptor], + config_descriptor: &Descriptor, +) -> Result<(OciDigest, ObjectID, Vec<(OciDigest, ObjectID)>, ImportStats)> { + let config_digest: OciDigest = config_descriptor.digest().clone(); + let content_id = config_identifier(&config_digest); + + if let Some(config_id) = repo.has_stream(&content_id)? { + debug!("Already have container config {config_digest}"); + + let (data, named_refs) = crate::oci_image::read_external_splitstream( + repo, + &content_id, + Some(&config_id), + Some(OCI_CONFIG_CONTENT_TYPE), + )?; + let named_refs_map: HashMap<&str, ObjectID> = named_refs + .iter() + .map(|(k, v)| (k.as_ref(), v.clone())) + .collect(); + + let diff_ids = crate::extract_diff_ids( + config_descriptor.media_type(), + data.as_slice(), + manifest_layers, + )?; + + let layer_refs: Vec<(OciDigest, ObjectID)> = diff_ids + .into_iter() + .map(|diff_id| { + let verity = named_refs_map + .get(diff_id.as_ref()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); + + return Ok((config_digest, config_id, layer_refs, ImportStats::default())); + } + + // Read config blob — we need the raw bytes for splitstream storage below, + // and parse 
diff_ids from the same buffer via as_slice(). + debug!("Reading config {config_digest}"); + let mut raw_config = Vec::with_capacity(config_descriptor.size() as usize); + ocidir + .read_blob(config_descriptor) + .context("Reading config blob")? + .read_to_end(&mut raw_config)?; + let diff_ids = crate::extract_diff_ids( + config_descriptor.media_type(), + raw_config.as_slice(), + manifest_layers, + )?; + + // Sort layers by size for parallel fetching (largest first) + let mut layers: Vec<_> = manifest_layers.iter().zip(&diff_ids).collect(); + layers.sort_by_key(|(desc, _)| Reverse(desc.size())); + + let threads = available_parallelism()?; + let sem = Arc::new(Semaphore::new(threads.into())); + let mut layer_tasks = JoinSet::new(); + + for (idx, (descriptor, diff_id)) in layers.iter().enumerate() { + let diff_id = (*diff_id).clone(); + let repo = Arc::clone(repo); + let permit = Arc::clone(&sem).acquire_owned().await?; + + let layer_file = ocidir + .read_blob(descriptor) + .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?; + + let media_type = descriptor.media_type().clone(); + + layer_tasks.spawn(async move { + let _permit = permit; + let (verity, layer_stats) = + import_layer_from_file(&repo, &diff_id, layer_file, &media_type).await?; + anyhow::Ok((idx, diff_id, verity, layer_stats)) + }); + } + + // Collect results into a map keyed by diff_id for ordered lookup + let mut verity_map: HashMap = HashMap::new(); + let mut stats = ImportStats::default(); + for result in layer_tasks.join_all().await { + let (_, diff_id, verity, layer_stats) = result?; + verity_map.insert(diff_id, verity); + stats.merge(&layer_stats); + } + + // Build ordered layer_refs from config-defined diff_id order + let layer_refs: Vec<(OciDigest, ObjectID)> = diff_ids + .into_iter() + .map(|diff_id| { + let verity = verity_map + .get(&diff_id) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + 
anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); + + let mut splitstream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE)?; + for (diff_id, verity) in &layer_refs { + splitstream.add_named_stream_ref(diff_id.as_ref(), verity); + } + + splitstream.write_external(&raw_config)?; + let config_id = repo.write_stream(splitstream, &content_id, None)?; + + Ok((config_digest, config_id, layer_refs, stats)) +} + +/// Import a single layer by streaming from a file handle. +async fn import_layer_from_file( + repo: &Arc>, + diff_id: &OciDigest, + layer_file: std::fs::File, + media_type: &MediaType, +) -> Result<(ObjectID, ImportStats)> { + let content_id = layer_identifier(diff_id); + + if let Some(layer_id) = repo.has_stream(&content_id)? { + debug!("Already have layer {diff_id}"); + return Ok((layer_id, ImportStats::default())); + } + + debug!("Importing layer {diff_id}"); + + // Convert std::fs::File to tokio::fs::File for async I/O + let async_file = tokio::fs::File::from_std(layer_file); + + let (object_id, layer_stats) = if is_tar_media_type(media_type) { + let reader = decompress_async(async_file, media_type)?; + import_tar_async(repo.clone(), reader).await? 
+ } else { + // Non-tar blob: store as object and create splitstream wrapper + let (object_id, size, method) = store_blob_async(repo, async_file).await?; + + let mut stats = ImportStats::default(); + match method { + ObjectStoreMethod::Copied => { + stats.objects_copied += 1; + stats.bytes_copied += size; + } + ObjectStoreMethod::Reflinked => { + stats.objects_reflinked += 1; + stats.bytes_reflinked += size; + } + ObjectStoreMethod::Hardlinked => { + stats.objects_hardlinked += 1; + stats.bytes_hardlinked += size; + } + ObjectStoreMethod::AlreadyPresent => { + stats.objects_already_present += 1; + } + } + + let mut stream = repo.create_stream(OCI_BLOB_CONTENT_TYPE)?; + stream.add_external_size(size); + stream.write_reference(object_id)?; + let stream_id = repo.write_stream(stream, &content_id, None)?; + return Ok((stream_id, stats)); + }; + + // Register the stream with its content identifier + repo.register_stream(&object_id, &content_id, None).await?; + + Ok((object_id, layer_stats)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_oci_layout_ref() { + let cases: &[(&str, (&str, Option<&str>))] = &[ + ("/path/to/oci", ("/path/to/oci", None)), + ("./local/oci", ("./local/oci", None)), + ("ocidir", ("ocidir", None)), + ("/path/to/oci:latest", ("/path/to/oci", Some("latest"))), + ("/path/to/oci:v1.0.0", ("/path/to/oci", Some("v1.0.0"))), + ("./local/oci:mytag", ("./local/oci", Some("mytag"))), + ("ocidir:latest", ("ocidir", Some("latest"))), + ("C:/path/to/oci", ("C:/path/to/oci", None)), + ("C:/path/to/oci:latest", ("C:/path/to/oci", Some("latest"))), + ( + "/path/to/oci:tag:with:colons", + ("/path/to/oci:tag:with", Some("colons")), + ), + ("/path/to/oci:", ("/path/to/oci", Some(""))), + ("ocidir:", ("ocidir", Some(""))), + ("/path:middle/to/oci", ("/path:middle/to/oci", None)), + ( + "/path:middle/to/oci:tag", + ("/path:middle/to/oci", Some("tag")), + ), + ]; + for (input, expected) in cases { + assert_eq!(parse_oci_layout_ref(input), 
*expected, "input: {input}"); + } + } + + #[tokio::test] + async fn test_wrong_platform_rejected() { + use cap_std_ext::cap_std; + use composefs::fsverity::Sha256HashValue; + use containers_image_proxy::oci_spec::image::{ + Arch, ConfigBuilder, ImageConfigurationBuilder, Os, PlatformBuilder, RootFsBuilder, + }; + + let tempdir = tempfile::tempdir().unwrap(); + let layout_path = tempdir.path(); + + let dir = + cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()).unwrap(); + let ocidir = OciDir::ensure(dir).unwrap(); + + // Pick an architecture that differs from the host + let foreign_arch = if Arch::default() == Arch::Amd64 { + "s390x" + } else { + "amd64" + }; + + // Build a minimal image for the foreign platform + let manifest = ocidir.new_empty_manifest().unwrap().build().unwrap(); + let config = ImageConfigurationBuilder::default() + .architecture(foreign_arch) + .os("linux") + .rootfs( + RootFsBuilder::default() + .typ("layers") + .diff_ids(Vec::<String>::new()) + .build() + .unwrap(), + ) + .config(ConfigBuilder::default().build().unwrap()) + .build() + .unwrap(); + let platform = PlatformBuilder::default() + .architecture(foreign_arch) + .os(Os::default()) + .build() + .unwrap(); + ocidir + .insert_manifest_and_config(manifest, config, None, platform) + .unwrap(); + + let repo_dir = tempfile::tempdir().unwrap(); + let repo_path = repo_dir.path().join("repo"); + let (repo, _) = composefs::repository::Repository::<Sha256HashValue>::init_path( + rustix::fs::CWD, + &repo_path, + composefs::fsverity::Algorithm::SHA256, + false, + ) + .unwrap(); + let repo = std::sync::Arc::new(repo); + + let result = import_oci_layout(&repo, layout_path, None).await; + let err = result.expect_err("should fail with no matching platform"); + let err_msg = format!("{err:#}"); + assert!( + err_msg.contains("No manifest found for platform"), + "unexpected error: {err_msg}" + ); + } +} diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index 
977f0a60..f4b93d4b 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -13,32 +13,26 @@ use std::{cmp::Reverse, process::Command, thread::available_parallelism}; use std::{iter::zip, sync::Arc}; use anyhow::{Context, Result}; -use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; -use containers_image_proxy::oci_spec::image::{ - Descriptor, Digest as OciDigest, ImageConfiguration, MediaType, -}; +use containers_image_proxy::oci_spec::image::{Descriptor, Digest as OciDigest}; use containers_image_proxy::{ - ConvertedLayerInfo, ImageProxy, ImageProxyConfig, OpenedImage, Transport, + ConvertedLayerInfo, ImageProxy, ImageProxyConfig, ImageReference, OpenedImage, Transport, }; use fn_error_context::context; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; + use rustix::process::geteuid; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt, BufReader}, - sync::Semaphore, - task::JoinSet, -}; +use tokio::{io::AsyncReadExt, sync::Semaphore, task::JoinSet}; use composefs::{ fsverity::FsVerityHashValue, repository::{ObjectStoreMethod, Repository}, - shared_internals::IO_BUF_CAPACITY, }; use crate::{ - ContentAndVerity, ImportStats, config_identifier, layer_identifier, - oci_image::{is_tar_media_type, manifest_identifier, tag_image}, - tar::split_async, + ContentAndVerity, ImportStats, config_identifier, + layer::{decompress_async, import_tar_async, is_tar_media_type, store_blob_async}, + layer_identifier, + oci_image::{manifest_identifier, tag_image}, }; /// Result of pulling an OCI image. @@ -88,15 +82,14 @@ struct ImageOp { impl ImageOp { async fn new( repo: &Arc<Repository<ObjectID>>, - imgref: &str, + image_ref: &ImageReference, img_proxy_config: Option<ImageProxyConfig>, ) -> Result<Self> { // Fail fast if the repository is not writable, before starting // the image proxy or doing any network I/O. 
repo.ensure_writable()?; - // Detect transport from image reference - let transport = Transport::try_from(imgref).context("Failed to get image transport")?; + let transport = image_ref.transport; // See https://github.com/containers/skopeo/issues/2563 let skopeo_cmd = if transport == Transport::ContainerStorage && !geteuid().is_root() { @@ -108,10 +101,22 @@ impl ImageOp { }; // See https://github.com/containers/skopeo/issues/2750 - let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") { - &format!("containers-storage:{hash}") // yay temporary lifetime extension! + // ImageReference.name for containers-storage: is already without the + // transport prefix (e.g. "sha256:abc" not "containers-storage:sha256:abc"). + // Skopeo expects "abc" without the "sha256:" prefix for digest references. + let fixup_ref; + let image_ref = if transport == Transport::ContainerStorage { + if let Some(hash) = image_ref.name.strip_prefix("sha256:") { + fixup_ref = ImageReference { + transport, + name: hash.to_string(), + }; + &fixup_ref + } else { + image_ref + } } else { - imgref + image_ref }; let config = match img_proxy_config { @@ -124,18 +129,19 @@ impl ImageOp { } None => { - ImageProxyConfig { - skopeo_cmd, - // auth_anonymous: true, debug: true, insecure_skip_tls_verification: None, - ..ImageProxyConfig::default() - } + let mut conf = ImageProxyConfig::default(); + conf.skopeo_cmd = skopeo_cmd; + conf } }; let proxy = containers_image_proxy::ImageProxy::new_with_config(config) .await .context("Creating ImageProxy")?; - let img = proxy.open_image(imgref).await.context("Opening image")?; + let img = proxy + .open_image_ref(image_ref) + .await + .context("Opening image")?; let progress = MultiProgress::new(); Ok(ImageOp { repo: Arc::clone(repo), @@ -201,40 +207,12 @@ impl ImageOp { let media_type = descriptor.media_type(); let (object_id, layer_stats) = if is_tar_media_type(media_type) { // Tar layers: decompress and split into a splitstream - let 
reader: Box = match media_type { - MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => { - Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, progress)) - } - MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => { - Box::new(BufReader::with_capacity( - IO_BUF_CAPACITY, - GzipDecoder::new(BufReader::new(progress)), - )) - } - MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => { - Box::new(BufReader::with_capacity( - IO_BUF_CAPACITY, - ZstdDecoder::new(BufReader::new(progress)), - )) - } - _ => unreachable!("is_tar_media_type returned true"), - }; - split_async(reader, self.repo.clone(), TAR_LAYER_CONTENT_TYPE).await? + let reader = decompress_async(progress, media_type)?; + import_tar_async(self.repo.clone(), reader).await? } else { - // Non-tar layers (OCI artifacts like SBOMs, disk images, - // etc.): stream the raw bytes into a repository object and - // create a splitstream with a single external reference. - // This avoids buffering arbitrarily large blobs in memory - // and lets callers get an fd to the object directly via - // open_object(). - let tmpfile = self.repo.create_object_tmpfile()?; - let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile)); - let mut reader = progress; - let size = tokio::io::copy(&mut reader, &mut writer).await?; - writer.flush().await?; - let tmpfile = writer.into_std().await; + // Non-tar layers (OCI artifacts): stream raw bytes to object store + let (object_id, size, method) = store_blob_async(&self.repo, progress).await?; driver.await?; - let (object_id, method) = self.repo.finalize_object_tmpfile(tmpfile, size)?; let mut stats = ImportStats::default(); match method { @@ -258,8 +236,6 @@ impl ImageOp { let mut stream = self.repo.create_stream(OCI_BLOB_CONTENT_TYPE)?; stream.add_external_size(size); stream.write_reference(object_id)?; - // write_stream handles both object storage and stream - // registration, so we return directly. 
let stream_id = self.repo.write_stream(stream, &content_id, None)?; return Ok((stream_id, stats)); }; @@ -278,35 +254,54 @@ impl ImageOp { } /// Ensure config is present and return layer verities along with config info. + /// + /// Returns (config_digest, config_verity, layer_refs, stats). + /// `layer_refs` is an ordered Vec of (diff_id, verity) pairs preserving the + /// order from the config (or manifest for artifacts). async fn ensure_config_with_layers( self: &Arc, manifest_layers: &[Descriptor], descriptor: &Descriptor, - ) -> Result<( - OciDigest, - ObjectID, - std::collections::HashMap, - ImportStats, - )> { + ) -> Result<(OciDigest, ObjectID, Vec<(OciDigest, ObjectID)>, ImportStats)> { let config_digest = descriptor.digest(); let content_id = config_identifier(config_digest); if let Some(config_id) = self.repo.has_stream(&content_id)? { - // We already got this config - need to read the layer refs from it + // We already got this config - need to read the layer refs and diff_ids from it self.progress .println(format!("Already have container config {config_digest}"))?; - let stream = self.repo.open_stream( + let (data, named_refs) = crate::oci_image::read_external_splitstream( + &self.repo, &content_id, Some(&config_id), Some(OCI_CONFIG_CONTENT_TYPE), )?; - let layer_refs: std::collections::HashMap = stream - .into_named_refs() - .into_iter() - .map(|(k, v)| (k.to_string(), v)) + let named_refs_map: std::collections::HashMap<&str, ObjectID> = named_refs + .iter() + .map(|(k, v)| (k.as_ref(), v.clone())) .collect(); + let diff_ids = + crate::extract_diff_ids(descriptor.media_type(), data.as_slice(), manifest_layers)?; + + let layer_refs: Vec<(OciDigest, ObjectID)> = diff_ids + .into_iter() + .map(|diff_id| { + let verity = named_refs_map + .get(diff_id.as_ref()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + 
"expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); + Ok(( descriptor.digest().clone(), config_id, @@ -333,18 +328,11 @@ impl ImageOp { // custom media type — not a standard image config. In that case // there are no diff_ids, so we use the manifest layer digests. // [1]: https://github.com/opencontainers/image-spec/blob/main/artifacts-guidance.md - let is_image_config = *descriptor.media_type() == MediaType::ImageConfig; - let diff_ids: Vec = if is_image_config { - let config = ImageConfiguration::from_reader(&raw_config[..])?; - config - .rootfs() - .diff_ids() - .iter() - .map(|s| s.parse().context("Invalid diff_id in config")) - .collect::>()? - } else { - manifest_layers.iter().map(|d| d.digest().clone()).collect() - }; + let diff_ids = crate::extract_diff_ids( + descriptor.media_type(), + raw_config.as_slice(), + manifest_layers, + )?; // Sort layers by size for parallel fetching let mut layers: Vec<_> = zip(manifest_layers, &diff_ids).collect(); @@ -362,7 +350,7 @@ impl ImageOp { }; for (idx, (mld, diff_id)) in layers.into_iter().enumerate() { - let diff_id_ = diff_id.clone(); + let diff_id = diff_id.clone(); let self_ = Arc::clone(self); let permit = Arc::clone(&sem).acquire_owned().await?; let descriptor = mld.clone(); @@ -377,27 +365,42 @@ impl ImageOp { layer_tasks.spawn(async move { let _permit = permit; let (verity, layer_stats) = self_ - .ensure_layer(&diff_id_, &descriptor, uncompressed_layer_info, layer_idx) + .ensure_layer(&diff_id, &descriptor, uncompressed_layer_info, layer_idx) .await?; - anyhow::Ok((idx, diff_id_, verity, layer_stats)) + anyhow::Ok((idx, diff_id, verity, layer_stats)) }); } - // Collect results and sort by index for deterministic ordering - let mut results: Vec<_> = layer_tasks - .join_all() - .await + // Collect results into a map keyed by diff_id for ordered lookup + let mut verity_map = std::collections::HashMap::new(); + let mut stats = ImportStats::default(); + for result in 
layer_tasks.join_all().await { + let (_, diff_id, verity, layer_stats) = result?; + verity_map.insert(diff_id, verity); + stats.merge(&layer_stats); + } + + // Build ordered layer_refs from config-defined diff_id order + let layer_refs: Vec<(OciDigest, ObjectID)> = diff_ids .into_iter() - .collect::>()?; - results.sort_by_key(|(idx, _, _, _)| *idx); + .map(|diff_id| { + let verity = verity_map + .get(&diff_id) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE)?; - let mut layer_refs = std::collections::HashMap::new(); - let mut stats = ImportStats::default(); - for (_, diff_id, verity, layer_stats) in results { - splitstream.add_named_stream_ref(diff_id.as_ref(), &verity); - layer_refs.insert(diff_id.to_string(), verity); - stats.merge(&layer_stats); + for (diff_id, verity) in &layer_refs { + splitstream.add_named_stream_ref(diff_id.as_ref(), verity); } // Store config as external object for independent fsverity @@ -423,7 +426,7 @@ impl ImageOp { )?; let config_descriptor = manifest.config(); let layers = manifest.layers(); - let (config_digest, config_verity, layer_verities, stats) = self + let (config_digest, config_verity, layer_refs, stats) = self .ensure_config_with_layers(layers, config_descriptor) .await .with_context(|| format!("Failed to pull config {config_descriptor:?}"))?; @@ -442,8 +445,9 @@ impl ImageOp { let config_key = format!("config:{}", config_descriptor.digest()); splitstream.add_named_stream_ref(&config_key, &config_verity); - for (diff_id, verity) in &layer_verities { - splitstream.add_named_stream_ref(diff_id, verity); + // Add layer refs in config-defined diff_id order + for (diff_id, verity) in &layer_refs { + 
splitstream.add_named_stream_ref(diff_id.as_ref(), verity); } // Store the raw manifest bytes as an external object for fsverity @@ -469,6 +473,9 @@ impl ImageOp { /// Returns `PullResult` containing both manifest and config digests/verities. /// If `reference` is provided, the manifest is also stored under that name. /// +/// For `oci:` transport (local OCI layout directories), this uses a fast path +/// that reads the layout directly without going through the skopeo proxy. +/// /// Note: For backward compatibility, use `.into_config()` on the result to get /// the (config_digest, config_verity) tuple that was previously returned. pub async fn pull_image( @@ -477,11 +484,24 @@ pub async fn pull_image( reference: Option<&str>, img_proxy_config: Option, ) -> Result<(PullResult, ImportStats)> { - let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?); - let (result, stats) = op - .pull() - .await - .with_context(|| format!("Unable to pull container image {imgref}"))?; + // Fail fast if the repository is not writable, before doing any I/O. + repo.ensure_writable()?; + + let image_ref = + ImageReference::try_from(imgref).context("Parsing image reference transport")?; + + // Fast path: read local OCI layout directories directly without skopeo + let (result, stats) = if image_ref.transport == Transport::OciDir { + let (path_str, layout_tag) = crate::oci_layout::parse_oci_layout_ref(&image_ref.name); + let layout_path = std::path::Path::new(path_str); + crate::oci_layout::import_oci_layout(repo, layout_path, layout_tag).await? + } else { + // Standard path: use skopeo proxy for other transports + let op = Arc::new(ImageOp::new(repo, &image_ref, img_proxy_config).await?); + op.pull() + .await + .with_context(|| format!("Unable to pull container image {imgref}"))? + }; // Generate the composefs EROFS image and link it to the config splitstream. 
// For container images this rewrites the config+manifest with the EROFS ref diff --git a/crates/composefs-oci/src/test_util.rs b/crates/composefs-oci/src/test_util.rs index 8ff04b13..73829c4a 100644 --- a/crates/composefs-oci/src/test_util.rs +++ b/crates/composefs-oci/src/test_util.rs @@ -291,12 +291,14 @@ async fn create_multi_layer_image( let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); + let layer_verities_vec: Vec<(Box, Sha256HashValue)> = + layer_verities_map.into_iter().collect(); let (_stored_digest, manifest_verity) = write_manifest( repo, &manifest, &manifest_digest, &config_verity, - &layer_verities_map, + &layer_verities_vec, tag, ) .unwrap(); diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index 4dcaf381..73047eed 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -100,8 +100,8 @@ use fn_error_context::context; use once_cell::sync::OnceCell; use rustix::{ fs::{ - Access, AtFlags, CWD, Dir, FileType, FlockOperation, Mode, OFlags, accessat, flock, linkat, - mkdirat, openat, readlinkat, statat, syncfs, unlinkat, + Access, AtFlags, CWD, Dir, FileType, FlockOperation, Mode, OFlags, StatVfsMountFlags, + accessat, flock, fstatvfs, linkat, mkdirat, openat, readlinkat, statat, syncfs, unlinkat, }, io::{Errno, Result as ErrnoResult}, }; @@ -1887,6 +1887,17 @@ impl Repository { /// Like [`ensure_writable`] but returns a proof token for internal use. pub(crate) fn ensure_writable_token(&self) -> Result { + // fstatvfs catches read-only mounts (ST_RDONLY). faccessat(W_OK) + // alone is insufficient because it only checks DAC permission bits + // and root bypasses those, so a root process on a read-only + // bind-mounted repo would pass the faccessat check. Conversely, + // fstatvfs alone misses writable filesystems where the caller lacks + // write permission (e.g. 
a repo owned by another user), so we follow + // up with faccessat to catch that case. + let st = fstatvfs(&self.repository).context("Repository is not writable")?; + if st.f_flag.contains(StatVfsMountFlags::RDONLY) { + anyhow::bail!("Repository is not writable: read-only file system"); + } accessat(&self.repository, ".", Access::WRITE_OK, AtFlags::empty()) .context("Repository is not writable")?; Ok(WritableRepo) diff --git a/crates/cstorage/Cargo.toml b/crates/cstorage/Cargo.toml index e4e524d8..b93c3a1e 100644 --- a/crates/cstorage/Cargo.toml +++ b/crates/cstorage/Cargo.toml @@ -18,7 +18,7 @@ cap-std-ext = { version = "4.0", default-features = false } crc = { version = "3.0", default-features = false } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } jsonrpc-fdpass = { workspace = true, optional = true } -oci-spec = { version = "0.8", default-features = false, features = ["image"] } +oci-spec = { version = "0.9", default-features = false, features = ["image"] } rustix = { version = "1.0", default-features = false, features = ["fs", "std", "process", "thread"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } diff --git a/crates/integration-tests/Cargo.toml b/crates/integration-tests/Cargo.toml index 718d3b1b..d9ac25a0 100644 --- a/crates/integration-tests/Cargo.toml +++ b/crates/integration-tests/Cargo.toml @@ -31,7 +31,7 @@ path = "src/cleanup.rs" [dependencies] anyhow = "1" -cap-std-ext = "5.0" +cap-std-ext = { workspace = true } composefs = { workspace = true } # Only the test_util module is used — for creating test OCI images. # All verification must go through the cfsctl CLI. 
@@ -39,7 +39,7 @@ composefs-oci = { workspace = true, features = ["test", "boot", "containers-stor hex = "0.4" libtest-mimic = "0.8" linkme = "0.3" -ocidir = "0.7" +ocidir = { workspace = true } paste = "1" rustix = { version = "1", features = ["fs", "process"] } serde = { version = "1", features = ["derive"] }