From ea812508f3e3c55f568b5f1a7b67ff35ac47773f Mon Sep 17 00:00:00 2001 From: Danil Shaymurzin Date: Fri, 16 Jan 2026 20:46:05 +0500 Subject: [PATCH] feat: add replication factor support and chunk location registration - Add replication_factor parameter to allocate_chunks and Put command - Implement register_chunk_location method in client - Update chunk download to use hex format for CID --- src/client.rs | 27 ++++++++++++++++++++++++++- src/commands.rs | 14 +++++++------- src/main.rs | 6 ++++-- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4c225f7..2d3712a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,7 +18,7 @@ impl MonoceClient { } } - pub async fn allocate_chunks(&self, chunks: &[(Cid, u64)]) -> Result { + pub async fn allocate_chunks(&self, chunks: &[(Cid, u64)], replication_factor: u32) -> Result { let url = format!("{}/api/v1/sessions", self.endpoint); let request = AllocateRequest { chunks: chunks.iter() @@ -27,6 +27,7 @@ impl MonoceClient { size: *size, }) .collect(), + replication_factor, }; let response = self @@ -61,6 +62,29 @@ impl MonoceClient { .error_for_status() .map_err(map_reqwest_err)?; + // Register chunk location with gateway + self.register_chunk_location(&cid.to_hex(), node_addr).await?; + + Ok(()) + } + + pub async fn register_chunk_location(&self, cid_hex: &str, node_addr: &str) -> Result<()> { + let url = format!("{}/api/v1/chunks/{}/locations", self.endpoint, cid_hex); + let node_address = if node_addr.starts_with("http://") || node_addr.starts_with("https://") { + node_addr.to_string() + } else { + format!("http://{}", node_addr) + }; + + self.http + .post(&url) + .json(&serde_json::json!({"node_address": node_address})) + .send() + .await + .map_err(map_reqwest_err)? + .error_for_status() + .map_err(map_reqwest_err)?; + Ok(()) } @@ -264,6 +288,7 @@ impl MonoceClient { #[derive(Serialize)] struct AllocateRequest { chunks: Vec, + replication_factor: u32, } #[derive(Serialize)] diff --git a/src/commands.rs b/src/commands.rs index c1f7043..91d512b 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -10,7 +10,7 @@ use std::path::Path; use crate::chunker::{Chunker, DEFAULT_CHUNK_SIZE}; use crate::client::{MonoceClient, NamespaceEntry}; -pub async fn put(endpoint: &str, path: &str, name: Option<&str>) -> Result<()> { +pub async fn put(endpoint: &str, path: &str, name: Option<&str>, replication_factor: u32) -> Result<()> { let file = File::open(path).context("failed to open file")?; let file_size = file.metadata()?.len(); let file_name = Path::new(path) @@ -32,7 +32,7 @@ pub async fn put(endpoint: &str, path: &str, name: Option<&str>) -> Result<()> { .map(|c| (c.cid.clone(), c.size)) .collect(); let allocation = client - .allocate_chunks(&chunk_info) + .allocate_chunks(&chunk_info, replication_factor) .await .context("failed to allocate chunks")?; @@ -155,19 +155,19 @@ pub async fn get(endpoint: &str, target: &str, output: Option<&str>) -> Result<( println!("Downloading chunks..."); for (i, chunk_ref) in manifest.chunks.iter().enumerate() { - let chunk_cid = chunk_ref.chunk_cid.to_string(); + let chunk_cid_hex = chunk_ref.chunk_cid.to_hex(); let locations = client - .get_chunk_locations(&chunk_cid) + .get_chunk_locations(&chunk_cid_hex) .await .context("failed to get chunk locations")?; if locations.is_empty() { - anyhow::bail!("no nodes available for chunk {}", chunk_cid); + anyhow::bail!("no nodes available for chunk {}", chunk_cid_hex); } let mut downloaded = false; for node_addr in &locations { - match client.download_chunk(&chunk_cid, node_addr).await { + match client.download_chunk(&chunk_cid_hex, node_addr).await { Ok(data) => { writer.write_all(&data)?; downloaded = true; @@ -180,7 +180,7 @@ pub async fn get(endpoint: &str, target: &str, output: Option<&str>) -> Result<( } if !downloaded { - anyhow::bail!("failed to download chunk {} from any node", chunk_cid); + anyhow::bail!("failed to download chunk {} from any node", chunk_cid_hex); } print!("\r [{}/{}] chunks", i + 1, manifest.chunks.len()); diff --git a/src/main.rs b/src/main.rs index d4e3f86..64839c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,8 @@ enum Commands { path: String, #[arg(long, help = "Target namespace path")] name: Option, + #[arg(short = 'r', long, default_value = "1", help = "Replication factor")] + replication_factor: u32, }, Get { #[arg(help = "CID or namespace path")] @@ -50,8 +52,8 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); match cli.command { - Commands::Put { path, name } => { - commands::put(&cli.endpoint, &path, name.as_deref()).await?; + Commands::Put { path, name, replication_factor } => { + commands::put(&cli.endpoint, &path, name.as_deref(), replication_factor).await?; } Commands::Get { target, output } => { commands::get(&cli.endpoint, &target, output.as_deref()).await?;