feat(p2p): add client-side p2p read acceleration with runtime policy #671
jlon wants to merge 6 commits into CurvineIO:main
- implement client-to-client p2p read path with cache sharing and fallback
- add runtime policy sync, signature verification, and policy metrics
- add p2p observability, e2e/minicluster coverage, and production gate script

Conflict resolved on feature/p2p by rebasing onto latest main.
Please re-run CI for PR validation.
Pull request overview
This PR introduces a client-side P2P read acceleration pipeline for the Curvine distributed filesystem. It adds libp2p-based chunk transfer and discovery, a TTL/LRU cache manager with checksum integrity, a master-driven runtime policy system for peer/tenant whitelisting with signed policy versions, and extended observability (metrics, trace context). The implementation is focused on the read path and policy synchronization foundation.
Changes:
- Adds `curvine-client/src/p2p/` module with `service.rs`, `cache_manager.rs`, `discovery.rs`, `transfer.rs` implementing the full P2P stack (libp2p transport, chunk cache, policy enforcement).
- Introduces `version_epoch` tracking on `InodeFile`/`FileStatus` for cache invalidation, plus master-side P2P runtime policy RPC (`GetP2pPolicy`) with HMAC-signed version transitions.
- Extends `ClientMetrics` and `MasterMetrics` with P2P and labeled read-source metrics; adds comprehensive integration tests and a production gate script.
Reviewed changes
Copilot reviewed 35 out of 38 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| curvine-client/src/p2p/service.rs | Core P2P service: libp2p swarm, chunk fetch/publish, QPS limiter, runtime policy sync |
| curvine-client/src/p2p/cache_manager.rs | Chunk cache with TTL, LRU eviction, checksum, mtime validation |
| curvine-client/src/p2p/discovery.rs | Discovery snapshot abstraction over mDNS/DHT/bootstrap |
| curvine-client/src/p2p/transfer.rs | ChunkId and transfer request/response types |
| curvine-client/src/p2p/mod.rs | P2P module re-exports and shared sha256_bytes helper |
| curvine-client/src/block/block_reader.rs | Integrates P2P fetch and chunk cache into the block read path |
| curvine-client/src/file/fs_context.rs | Wires P2P service, read chunk cache, and flight dedup into FsContext |
| curvine-client/src/file/curvine_filesystem.rs | Adds sync_p2p_runtime_policy and sync_p2p_metrics |
| curvine-client/src/client_metrics.rs | Adds P2P metrics, labeled read-source counters, and delta computation |
| curvine-server/src/master/master_metrics.rs | Extends get_or_register to handle Gauge/Counter/Histogram metric types correctly |
| curvine-server/src/master/master_handler.rs | Adds GetP2pPolicy RPC handler |
| curvine-server/src/master/fs/master_filesystem.rs | Implements p2p_runtime_policy() with dual-key signature |
| curvine-server/src/master/meta/inode/inode_file.rs | Adds version_epoch field with bump-on-mutate logic |
| curvine-server/src/master/meta/inode/inode_view.rs | Exposes version_epoch in FileStatus |
| curvine-common/src/utils/common_utils.rs | Adds sign_p2p_policy / verify_p2p_policy_signatures |
| curvine-common/src/conf/client_conf.rs | New ClientP2pConf with validation |
| curvine-common/src/conf/master_conf.rs | P2P policy config fields |
| curvine-common/src/state/file_status.rs | Adds version_epoch to FileStatus |
| curvine-common/src/state/master_info.rs | Adds P2P policy fields to MasterInfo |
| curvine-common/src/fs/rpc_code.rs | New GetP2pPolicy = 61 RPC code |
| curvine-common/proto/master.proto | New GetP2pPolicyRequest/Response messages |
| curvine-common/proto/common.proto | Adds version_epoch to FileStatusProto |
| curvine-common/src/utils/proto_utils.rs | Round-trip support for new fields |
| curvine-common/Cargo.toml | Adds sha2 dependency |
| curvine-client/Cargo.toml | Adds libp2p, sha2, serde dependencies |
| curvine-client/src/file/fs_reader_base.rs | Extracts tenant_id/job_id labels for P2P metrics |
| curvine-client/src/file/mod.rs | Re-exports ReadChunkKey |
| curvine-tests/tests/p2p_read_acceleration_test.rs | Integration tests for P2P policy sync, fallback, and disconnect |
| curvine-client/tests/p2p_e2e_test.rs | E2E unit tests for cache reuse and mtime mismatch |
| orpc/src/common/metrics.rs | Adds try_into_counter, try_into_gauge, try_into_gauge_vec, try_into_histogram |
| scripts/tests/p2p-production-gate.sh | Production gate shell script for required P2P tests |
| Makefile | Adds p2p-prod-gate target |

    MetricType::Histogram => {
        if label_keys.is_empty() {
            if let Ok(metric) = v.clone().try_into_counter() {
                return Ok(RegisteredMetric::Counter(metric));
            }
            if v.clone().try_into_histogram().is_ok() {
                return Ok(RegisteredMetric::Skip);
            }
        }
        if let Ok(metric) = v.clone().try_into_counter_vec() {
            return Ok(RegisteredMetric::CounterVec(metric));
        }
        if v.clone().try_into_histogram_vec().is_ok() {
            return Ok(RegisteredMetric::Skip);
        }
        v.try_into_counter_vec().map(RegisteredMetric::CounterVec)
    }

In get_or_register for the Histogram metric type with an empty label_keys slice (lines 229-235), the code first tries try_into_counter() and then try_into_histogram(), returning Skip if the existing metric is a Histogram. However, if the existing metric is a HistogramVec (labeled variant) but label_keys is empty, neither branch matches, so execution falls through to line 243 (v.try_into_counter_vec().map(...)) which will always fail and return an error. The result is that metrics_report will emit a warn! and continue rather than correctly skipping the existing HistogramVec. This is a silent miss case that the test histogram_metric_should_skip_existing_histogram_vec does not cover (that test uses labeled tags, so label_keys is not empty).
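Re-checked; this does not hold for the current code: when label_keys is empty, the MetricType::Histogram branch checks for Histogram first, and the non-empty path also checks for HistogramVec and returns Skip, so it never reaches the warn. No change here in this round (commit 5a7807c).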

    async fn acquire_read_chunk_flight(&self, read_key: &ReadChunkKey) -> Option<ReadChunkFlight> {
        if !self.fs_context.read_chunk_cache_enabled() {
            return None;
        }
        let lock = self.fs_context.read_chunk_flight_lock(read_key.clone());
        let guard = lock.clone().lock_owned().await;
        Some((lock, guard))

The acquire_read_chunk_flight lock in BlockReader::read uses a regular async lock().await with no timeout. If another task holding the flight guard panics or hangs, this lock acquisition will block indefinitely and could cause reader tasks to stall forever. By contrast, the P2P service's own fetch_flight_lock acquisition properly applies transfer_timeout. A timeout should be applied here as well (e.g., using tokio::time::timeout).
Agreed, fixed. acquire_read_chunk_flight in curvine-client/src/block/block_reader.rs now has a transfer_timeout_ms timeout guard, so it no longer waits indefinitely in abnormal cases (commit 5a7807c).

    self.expired_chunks
        .fetch_add(expired.len() as u64, Ordering::Relaxed);
    for chunk_id in expired {
        self.invalidate(chunk_id);
    }

The cleanup_expired method on CacheManager double-counts expirations. When a chunk is found expired during get, the counter is incremented and invalidate is called (lines 188-195). Later, if cleanup_expired runs and tries the same chunk again, it will find it already removed from entries (so invalidate is a no-op), but it still increments expired_chunks for each chunk in the initial expired list (line 283-284), even though those chunks may already have been counted on get. Conversely, the invalidations counter will not double-count (because invalidate does entries.remove which returns None the second time), but expired_chunks can. This causes the expired_chunks metric to be inflated.
Suggested change:

    -self.expired_chunks
    -    .fetch_add(expired.len() as u64, Ordering::Relaxed);
    -for chunk_id in expired {
    -    self.invalidate(chunk_id);
    -}
    +let mut newly_expired: u64 = 0;
    +for chunk_id in expired {
    +    // Only count this as a new expiration if the entry is still present.
    +    if self.entries.get(&chunk_id).is_some() {
    +        newly_expired += 1;
    +    }
    +    self.invalidate(chunk_id);
    +}
    +if newly_expired > 0 {
    +    self.expired_chunks
    +        .fetch_add(newly_expired, Ordering::Relaxed);
    +}

Agreed, fixed. cleanup_expired now increments expired_chunks only when invalidate actually removes an entry; invalidate now returns a boolean that is used to deduplicate the count (commit 5a7807c).
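The counting rule described in this thread (increment the expiry counter only when a removal really happens) can be sketched as follows. This is a minimal, single-threaded illustration, not the PR's code: `MiniCache`, its `u64` chunk ids, and the explicit `now` parameter are hypothetical stand-ins for the real `CacheManager`, which is not shown in full here.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for the PR's CacheManager.
pub struct MiniCache {
    entries: HashMap<u64, Instant>, // chunk id -> expiry deadline
    expired_chunks: AtomicU64,
}

impl MiniCache {
    pub fn new() -> Self {
        Self { entries: HashMap::new(), expired_chunks: AtomicU64::new(0) }
    }

    pub fn insert(&mut self, chunk_id: u64, ttl: Duration, now: Instant) {
        self.entries.insert(chunk_id, now + ttl);
    }

    /// Returns true only if an entry was actually removed.
    pub fn invalidate(&mut self, chunk_id: u64) -> bool {
        self.entries.remove(&chunk_id).is_some()
    }

    /// Lookup path: an expired entry is removed and counted once.
    pub fn get(&mut self, chunk_id: u64, now: Instant) -> bool {
        let expired = match self.entries.get(&chunk_id) {
            Some(deadline) => *deadline <= now,
            None => return false,
        };
        if !expired {
            return true;
        }
        if self.invalidate(chunk_id) {
            self.expired_chunks.fetch_add(1, Ordering::Relaxed);
        }
        false
    }

    /// Sweep path: count only the removals that this sweep performs.
    pub fn cleanup_expired(&mut self, now: Instant) {
        let expired: Vec<u64> = self
            .entries
            .iter()
            .filter(|(_, deadline)| **deadline <= now)
            .map(|(id, _)| *id)
            .collect();
        let mut newly_expired = 0u64;
        for chunk_id in expired {
            if self.invalidate(chunk_id) {
                newly_expired += 1;
            }
        }
        if newly_expired > 0 {
            self.expired_chunks.fetch_add(newly_expired, Ordering::Relaxed);
        }
    }

    pub fn expired_count(&self) -> u64 {
        self.expired_chunks.load(Ordering::Relaxed)
    }
}
```

Tying the counter to the boolean returned by `invalidate` keeps the "was it still there?" check and the removal in one step, which matters more in a concurrent map than in this single-threaded sketch.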

    let read_chunk_flights =
        FastDashMap::with_capacity(conf.client.read_chunk_cache_capacity.min(65536) as usize);

The read_chunk_flights map in FsContext is initialized with a capacity derived from read_chunk_cache_capacity (line 125: conf.client.read_chunk_cache_capacity.min(65536) as usize). This value refers to the number of entries in the chunk cache (e.g., default 32768), not the number of in-flight concurrent reads. An in-flight map with 32768 pre-allocated slots is likely far larger than needed and wastes memory proportional to the configuration of the chunk cache size. The flight map only needs to hold one entry per concurrent reader, which is typically much smaller (bounded by the number of open readers). Consider using a small fixed capacity or leaving it at the default (0).
Suggested change:

    -let read_chunk_flights =
    -    FastDashMap::with_capacity(conf.client.read_chunk_cache_capacity.min(65536) as usize);
    +let read_chunk_flights = FastDashMap::new();

Agreed, fixed. read_chunk_flights is now initialized with the default capacity instead of being pre-allocated from read_chunk_cache_capacity, which cuts wasted memory (commit 5a7807c).

    if !p2p_service
        .sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
        .await
    {
        return err_box!("failed to apply p2p runtime policy from master");
    }

The sync_p2p_runtime_policy in curvine_filesystem.rs (line 358) returns an error (err_box!) when sync_runtime_policy_from_master returns false. However, the call site in fs_context.rs (line 311) only logs a warning when this fails. This means a signature mismatch or an application failure will produce a warning each time the sync interval fires (every 100ms in the tests). Returning false from sync_runtime_policy_from_master can be a legitimate expected condition (e.g., wrong key configured), and consistently converting it to an error and logging it as a warning at every sync cycle is noisy and may mask the root cause. Consider separating "policy rejected" (expected, track via counter) from "transport/application failure" (unexpected, worth a warning).
Suggested change:

    -if !p2p_service
    -    .sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
    -    .await
    -{
    -    return err_box!("failed to apply p2p runtime policy from master");
    -}
    +let _applied = p2p_service
    +    .sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
    +    .await;

Agreed, fixed. sync_p2p_runtime_policy no longer escalates a false return from sync_runtime_policy_from_master to an error, which avoids periodic warn noise; the sync call remains best-effort (commit 5a7807c).
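The reviewer's idea of separating "policy rejected" from "transport/application failure" could look roughly like the sketch below. Everything in it is an assumption for illustration: `PolicySyncOutcome`, `PolicySyncStats`, and `record_outcome` are hypothetical names, and the PR's `sync_runtime_policy_from_master` returns a plain `bool` rather than an enum.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical three-way result; the PR's method returns a plain bool.
#[derive(Debug, PartialEq, Eq)]
pub enum PolicySyncOutcome {
    Applied,
    Rejected,       // expected condition, e.g. signature mismatch
    Failed(String), // unexpected transport or application failure
}

/// Hypothetical counters; real code would export these as metrics.
#[derive(Default)]
pub struct PolicySyncStats {
    pub applied: AtomicU64,
    pub rejected: AtomicU64,
    pub failed: AtomicU64,
}

/// Records the outcome and returns a message only when a warning is warranted.
pub fn record_outcome(stats: &PolicySyncStats, outcome: &PolicySyncOutcome) -> Option<String> {
    match outcome {
        PolicySyncOutcome::Applied => {
            stats.applied.fetch_add(1, Ordering::Relaxed);
            None
        }
        PolicySyncOutcome::Rejected => {
            // Counted, but not logged on every sync cycle.
            stats.rejected.fetch_add(1, Ordering::Relaxed);
            None
        }
        PolicySyncOutcome::Failed(reason) => {
            stats.failed.fetch_add(1, Ordering::Relaxed);
            Some(format!("p2p policy sync failed: {}", reason))
        }
    }
}
```

With this shape the periodic sync loop only logs when `record_outcome` returns `Some`, while rejections stay visible through the counter.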

    fn bump_version_epoch(&mut self) {
        self.version_epoch = self.version_epoch.max(1).saturating_add(1);
    }

In bump_version_epoch, the implementation is self.version_epoch = self.version_epoch.max(1).saturating_add(1). The .max(1) guard is unnecessary after the initial construction guarantees version_epoch >= 1. More importantly, for a newly deserialized InodeFile that was persisted before this PR (and thus has version_epoch = 0 due to #[serde(default)]), the first call to bump_version_epoch will produce 2 (max(0,1)+1), skipping 1. After the second call it produces 3, etc. This is not incorrect for correctness purposes (the epoch is monotonically increasing), but the skip from 0→2 means the deserialized-and-immediately-bumped file has epoch 2, while a brand-new file created after this PR has epoch 1. Consumers testing epoch >= 1 would see both as valid, so the safety guarantee holds. This is minor but worth noting for clarity.
Agreed, fixed. bump_version_epoch now maps the legacy value 0 to 1, while the normal path still increments; a matching regression test was added (commit 5a7807c).
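One way to get the behavior discussed here (legacy 0 becomes 1, other values increment, no overflow) is simply to drop the `.max(1)` guard. The sketch below assumes that approach; `MiniInode` is a hypothetical stand-in for `InodeFile`, and the exact code in the fix commit is not shown in this thread.

```rust
/// Hypothetical minimal inode carrying only the field under discussion.
#[derive(Debug, Default)]
pub struct MiniInode {
    /// 0 means the inode was persisted before the field existed.
    pub version_epoch: u64,
}

impl MiniInode {
    /// Legacy 0 becomes 1; any other value increments and saturates at u64::MAX.
    pub fn bump_version_epoch(&mut self) {
        self.version_epoch = self.version_epoch.saturating_add(1);
    }
}
```

For comparison, the original `max(1).saturating_add(1)` maps 0 to 2, which is the skip the review comment points out.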

    pub fn sign_p2p_policy(
        secret: &str,
        version: u64,
        peer_whitelist: &[String],
        tenant_whitelist: &[String],
    ) -> String {
        if secret.trim().is_empty() {
            return String::new();
        }
        let mut hasher = Sha256::new();
        Self::update_signing_segment(&mut hasher, secret);
        Self::update_signing_segment(&mut hasher, &version.to_string());
        Self::update_signing_segment(&mut hasher, &peer_whitelist.len().to_string());
        for peer in peer_whitelist {
            Self::update_signing_segment(&mut hasher, peer);
        }
        Self::update_signing_segment(&mut hasher, &tenant_whitelist.len().to_string());
        for tenant in tenant_whitelist {
            Self::update_signing_segment(&mut hasher, tenant);
        }
        let digest = hasher.finalize();
        digest.iter().map(|v| format!("{:02x}", v)).collect()
    }

The sign_p2p_policy function uses SHA-256 as a plain cryptographic hash rather than an HMAC (keyed-message authentication code). The function name (sign_p2p_policy) and the configuration field name (policy_hmac_key) both imply HMAC semantics, but the actual implementation feeds the secret directly into the hash alongside the data as a simple prefix. This construction (known as "secret-prefix hashing") is vulnerable to length-extension attacks and does not provide the security guarantees of a proper HMAC. The standard Rust HMAC crate (hmac + sha2) is already an indirect dependency via libp2p. Using Hmac<Sha256> instead would be semantically correct and secure.
Agreed, fixed. sign_p2p_policy now uses standard HMAC-SHA256 (hmac + sha2), and a new test confirms that the legacy prefix-hash semantics are no longer used (commit 5a7807c).

    if self.provider_publish_interval >= self.provider_ttl {
        return Err(Error::new(
            ErrorKind::InvalidInput,
            "client.p2p.provider_publish_interval must be less than client.p2p.provider_ttl",
        )
        .into());
    }

The ClientP2pConf::init validation at line 124 checks provider_publish_interval >= provider_ttl, but it does this comparison after computing both from string representations. In the Default implementation, provider_ttl is initialized to Duration::default() (i.e., zero) before init() is called, but the actual string-parsed values are set inside init(). The comparison on line 124 is correctly after both are parsed, so this is fine in isolation. However, the test client_p2p_conf_rejects_publish_interval_not_less_than_ttl constructs a conf where provider_ttl_str = "30s" and provider_publish_interval_str = "30s", meaning publish_interval >= ttl should trigger the error. This test is correct. No bug here, but note that a zero provider_ttl (e.g., provider_ttl_str = "0s") would not be caught by this validation. Consider adding an explicit check for provider_ttl.is_zero() similar to how cache_ttl is validated.
Agreed, fixed. ClientP2pConf::init now has an explicit provider_ttl.is_zero() check, plus a test for the 0s case (commit 5a7807c).
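The two checks discussed in this thread can be sketched as a standalone function. This is an illustration under assumptions: `validate_provider_timing` and the zero-TTL error text are hypothetical, and the real `ClientP2pConf::init` returns its own error type rather than `String`. Note that any `Duration` is greater than or equal to zero, so the original comparison alone would also reject a zero TTL, just with a message about the publish interval; the explicit check mainly yields a clearer error.

```rust
use std::time::Duration;

/// Hypothetical helper mirroring the provider timing checks discussed above.
pub fn validate_provider_timing(
    publish_interval: Duration,
    ttl: Duration,
) -> Result<(), String> {
    // Explicit zero check first, so the error names the real problem.
    if ttl.is_zero() {
        return Err("client.p2p.provider_ttl must be greater than zero".to_string());
    }
    // Providers must be republished before their record expires.
    if publish_interval >= ttl {
        return Err(
            "client.p2p.provider_publish_interval must be less than client.p2p.provider_ttl"
                .to_string(),
        );
    }
    Ok(())
}
```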

    pub fn with_link(id: i64, time: i64, target: impl Into<String>, mode: u32) -> Self {
        Self {
            id,
            file_type: FileType::Link,
            mtime: time,
            atime: time,
            version_epoch: 1,

The version_epoch field on InodeFile is initialized to 1 in all three constructor paths (lines 63, 85, 124). For a symlink (the third constructor), the version_epoch concept is not meaningful because symlinks are not versioned files. Starting it at 1 instead of 0 for a symlink is benign but semantically inconsistent. More importantly, it means a symlink inode always reports version_epoch >= 1 just like a real file, which could confuse consumers that use version_epoch to detect stale caches.
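I am not adopting this one for now: the uniform semantics for version_epoch (guaranteed non-zero for both files and links) are intentional and avoid extra branching complexity. This item does not affect the correctness of the P2P feature or the regression results in this PR (commit 5a7807c).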
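I have finished verifying, fixing, and regression-testing this round of review comments. The conclusions are as follows:
Fixes in this round (summary):
Verification results:
If needed, I can go on to reply to each review thread individually and note the fix commit/hash.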
I recommend gating this feature behind a switch that is off by default. When the switch is off, the existing logic should not be affected.
Core functionality summary
curvine-client/src/p2p) with service/cache/discovery/transfer layering.
Current stage scope (incremental)
Follow-up plan