Skip to content
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul
composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false }
composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false }
composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false }
cap-std-ext = "5.0"
ocidir = "0.7.2"

# JSON-RPC with FD passing for userns helper
jsonrpc-fdpass = { version = "0.1.0", default-features = false }
Expand Down
4 changes: 3 additions & 1 deletion crates/composefs-oci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ base64 = { version = "0.22", default-features = false, features = ["std"], optio
bytes = { version = "1", default-features = false }
composefs = { workspace = true }
composefs-boot = { workspace = true, optional = true }
containers-image-proxy = { version = "0.9.2", default-features = false }
containers-image-proxy = { version = "0.10", default-features = false }
cstorage = { path = "../cstorage", version = "0.3.0", optional = true }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.18.0", default-features = false, features = ["tokio"] }
Expand All @@ -39,6 +39,8 @@ tar-core = "0.1.0"
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }
tracing = { version = "0.1", default-features = false }
cap-std-ext = { workspace = true }
ocidir = { workspace = true }

[dev-dependencies]
cap-std = { version = "4.0.0", default-features = false }
Expand Down
6 changes: 5 additions & 1 deletion crates/composefs-oci/src/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ pub fn remove_boot_image<ObjectID: FsVerityHashValue>(
)?;

let manifest_json = img.read_manifest_json(repo)?;
let layer_verities = img.layer_refs().clone();
let layer_verities: Vec<_> = img
.layer_refs()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

crate::oci_image::rewrite_manifest(
repo,
Expand Down
99 changes: 99 additions & 0 deletions crates/composefs-oci/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Shared layer import logic for OCI container images.
//!
//! This module provides common functionality for importing OCI image layers
//! into a composefs repository, shared between the skopeo proxy path and
//! direct OCI layout import.

use std::sync::Arc;

use anyhow::{Result, bail};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use containers_image_proxy::oci_spec::image::MediaType;
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};

use composefs::fsverity::FsVerityHashValue;
use composefs::repository::{ObjectStoreMethod, Repository};
use composefs::shared_internals::IO_BUF_CAPACITY;

use crate::skopeo::TAR_LAYER_CONTENT_TYPE;
use crate::tar::split_async;

/// Check if a media type represents a tar-based layer.
pub fn is_tar_media_type(media_type: &MediaType) -> bool {
matches!(
media_type,
MediaType::ImageLayer
| MediaType::ImageLayerGzip
| MediaType::ImageLayerZstd
| MediaType::ImageLayerNonDistributable
| MediaType::ImageLayerNonDistributableGzip
| MediaType::ImageLayerNonDistributableZstd
)
}

/// Wrap an async reader with the appropriate decompressor for the media type.
///
/// Returns a boxed reader that decompresses the stream if needed.
/// The output is `AsyncRead` (not `AsyncBufRead`) because `split_async`
/// does its own buffering via `BytesMut`.
pub fn decompress_async<'a, R>(
reader: R,
media_type: &MediaType,
) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>>
where
R: AsyncRead + Unpin + Send + 'a,
{
let buf = BufReader::new(reader);
let reader: Box<dyn AsyncRead + Unpin + Send> = match media_type {
MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => {
Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, buf))
}
MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new(
BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(buf)),
),
MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new(
BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(buf)),
),
_ => bail!("Unsupported layer media type for decompression: {media_type}"),
};
Ok(reader)
}

/// Import a tar layer from an async reader into the repository.
///
/// The reader should already be decompressed (use `decompress_async` first).
/// Returns the fs-verity object ID and import stats of the imported splitstream.
pub async fn import_tar_async<ObjectID, R>(
repo: Arc<Repository<ObjectID>>,
reader: R,
) -> Result<(ObjectID, crate::ImportStats)>
where
ObjectID: FsVerityHashValue,
R: AsyncRead + Unpin + Send,
{
split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await
}

/// Store raw bytes from an async reader as a repository object.
///
/// Streams the raw bytes into a repository object without creating a splitstream.
/// Use this for non-tar blobs (OCI artifacts) where the caller will create
/// the splitstream wrapper.
///
/// Returns (object_id, size, store_method) of the stored object.
pub async fn store_blob_async<ObjectID, R>(
repo: &Repository<ObjectID>,
mut reader: R,
) -> Result<(ObjectID, u64, ObjectStoreMethod)>
where
ObjectID: FsVerityHashValue,
R: AsyncRead + Unpin,
{
let tmpfile = repo.create_object_tmpfile()?;
let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile));
let size = tokio::io::copy(&mut reader, &mut writer).await?;
writer.flush().await?;
let tmpfile = writer.into_std().await;
let (object_id, method) = repo.finalize_object_tmpfile(tmpfile, size)?;
Ok((object_id, size, method))
}
Loading