
feat(p2p): add client-side p2p read acceleration with runtime policy #671

Open
jlon wants to merge 6 commits into CurvineIO:main from jlon:feature/p2p

Conversation

jlon (Contributor) commented Feb 28, 2026

Core functionality summary

  • Introduces a client-side P2P read acceleration module (curvine-client/src/p2p) with service/cache/discovery/transfer layering.
  • Adds chunk-level cache manager with TTL, capacity eviction, integrity checksum, mtime validation, and corruption/mismatch accounting.
  • Integrates libp2p-based transport/discovery (request/response + mDNS + DHT) plus runtime flow controls (QPS, inflight windows, adaptive behavior).
  • Adds master-driven runtime policy controls for peer/tenant whitelist and signed policy version transitions.
  • Extends P2P metrics and trace context for observability and rollout safety.
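
The TTL/capacity behavior of the chunk cache described above can be pictured with a minimal, std-only sketch. All names here are hypothetical; the real cache_manager.rs additionally handles checksums, mtime validation, and corruption/mismatch accounting:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical minimal chunk cache: TTL checked lazily on read,
// oldest entry evicted when the capacity bound is hit.
struct ChunkEntry {
    data: Vec<u8>,
    inserted_at: Instant,
}

struct ChunkCache {
    entries: HashMap<u64, ChunkEntry>, // keyed by a simplified chunk id
    ttl: Duration,
    capacity: usize,
}

impl ChunkCache {
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self { entries: HashMap::new(), ttl, capacity }
    }

    fn put(&mut self, id: u64, data: Vec<u8>) {
        // Capacity eviction: drop the oldest entry when full.
        if self.entries.len() >= self.capacity {
            if let Some(&oldest) = self
                .entries
                .iter()
                .min_by_key(|(_, e)| e.inserted_at)
                .map(|(k, _)| k)
            {
                self.entries.remove(&oldest);
            }
        }
        self.entries.insert(id, ChunkEntry { data, inserted_at: Instant::now() });
    }

    fn get(&mut self, id: u64) -> Option<&[u8]> {
        // TTL check on read: expired entries are removed lazily.
        let expired = self.entries.get(&id)?.inserted_at.elapsed() > self.ttl;
        if expired {
            self.entries.remove(&id);
            return None;
        }
        self.entries.get(&id).map(|e| e.data.as_slice())
    }
}
```

This is only the skeleton of the idea; the production cache also has to count expirations and invalidations for the metrics discussed later in this review.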

Current stage scope (incremental)

  • Focused on read-path acceleration and policy synchronization foundation.
  • Includes core integration across client/common/master plus targeted tests and production-gate script.
  • Does not yet cover full production hardening beyond current test gate and staged policy rollout controls.

Follow-up plan

  1. Expand large-scale and chaos scenarios (peer churn, high parallelism, network degradation).
  2. Tune adaptive scheduling and cache policy based on benchmark/telemetry feedback.
  3. Harden rollout/rollback tooling and operational docs for wider production adoption.

- implement client-to-client p2p read path with cache sharing and fallback

- add runtime policy sync, signature verification, and policy metrics

- add p2p observability, e2e/minicluster coverage, and production gate script
jlon (Contributor, Author) commented Feb 28, 2026

Conflict resolved on feature/p2p by rebasing onto latest main.

  • Resolved merge conflict in .gitignore (kept both docs/ and __pycache__).
  • Completed rebase and force-pushed updated branch.
  • Verification passed:
    • cargo fmt --all
    • cargo check -q

Please re-run CI for PR validation.

@jlon jlon marked this pull request as ready for review February 28, 2026 07:07
@lzjqsdd lzjqsdd requested a review from Copilot March 2, 2026 03:28
Copilot AI left a comment
Pull request overview

This PR introduces a client-side P2P read acceleration pipeline for the Curvine distributed filesystem. It adds libp2p-based chunk transfer and discovery, a TTL/LRU cache manager with checksum integrity, a master-driven runtime policy system for peer/tenant whitelisting with signed policy versions, and extended observability (metrics, trace context). The implementation is focused on the read path and policy synchronization foundation.

Changes:

  • Adds curvine-client/src/p2p/ module with service.rs, cache_manager.rs, discovery.rs, transfer.rs implementing the full P2P stack (libp2p transport, chunk cache, policy enforcement).
  • Introduces version_epoch tracking on InodeFile/FileStatus for cache invalidation, plus master-side P2P runtime policy RPC (GetP2pPolicy) with HMAC-signed version transitions.
  • Extends ClientMetrics and MasterMetrics with P2P and labeled read-source metrics; adds comprehensive integration tests and a production gate script.
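
The version_epoch-for-invalidation idea in the changes above can be sketched as follows. The shape is hypothetical (the real ChunkId in curvine-client/src/p2p/transfer.rs may differ); the point is that folding the epoch into the chunk identity means cached chunks from an older file version can never match a lookup for the current version:

```rust
use std::fmt;

// Hypothetical chunk identity including version_epoch: a chunk cached
// before a mutating operation (which bumps the epoch) simply stops
// matching, so stale data is never served.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ChunkId {
    file_id: i64,
    chunk_index: u64,
    version_epoch: u64,
}

impl fmt::Display for ChunkId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}:{}", self.file_id, self.chunk_index, self.version_epoch)
    }
}
```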

Reviewed changes

Copilot reviewed 35 out of 38 changed files in this pull request and generated 9 comments.

File Description
curvine-client/src/p2p/service.rs Core P2P service: libp2p swarm, chunk fetch/publish, QPS limiter, runtime policy sync
curvine-client/src/p2p/cache_manager.rs Chunk cache with TTL, LRU eviction, checksum, mtime validation
curvine-client/src/p2p/discovery.rs Discovery snapshot abstraction over mDNS/DHT/bootstrap
curvine-client/src/p2p/transfer.rs ChunkId and transfer request/response types
curvine-client/src/p2p/mod.rs P2P module re-exports and shared sha256_bytes helper
curvine-client/src/block/block_reader.rs Integrates P2P fetch and chunk cache into the block read path
curvine-client/src/file/fs_context.rs Wires P2P service, read chunk cache, and flight dedup into FsContext
curvine-client/src/file/curvine_filesystem.rs Adds sync_p2p_runtime_policy and sync_p2p_metrics
curvine-client/src/client_metrics.rs Adds P2P metrics, labeled read-source counters, and delta computation
curvine-server/src/master/master_metrics.rs Extends get_or_register to handle Gauge/Counter/Histogram metric types correctly
curvine-server/src/master/master_handler.rs Adds GetP2pPolicy RPC handler
curvine-server/src/master/fs/master_filesystem.rs Implements p2p_runtime_policy() with dual-key signature
curvine-server/src/master/meta/inode/inode_file.rs Adds version_epoch field with bump-on-mutate logic
curvine-server/src/master/meta/inode/inode_view.rs Exposes version_epoch in FileStatus
curvine-common/src/utils/common_utils.rs Adds sign_p2p_policy / verify_p2p_policy_signatures
curvine-common/src/conf/client_conf.rs New ClientP2pConf with validation
curvine-common/src/conf/master_conf.rs P2P policy config fields
curvine-common/src/state/file_status.rs Adds version_epoch to FileStatus
curvine-common/src/state/master_info.rs Adds P2P policy fields to MasterInfo
curvine-common/src/fs/rpc_code.rs New GetP2pPolicy = 61 RPC code
curvine-common/proto/master.proto New GetP2pPolicyRequest/Response messages
curvine-common/proto/common.proto Adds version_epoch to FileStatusProto
curvine-common/src/utils/proto_utils.rs Round-trip support for new fields
curvine-common/Cargo.toml Adds sha2 dependency
curvine-client/Cargo.toml Adds libp2p, sha2, serde dependencies
curvine-client/src/file/fs_reader_base.rs Extracts tenant_id/job_id labels for P2P metrics
curvine-client/src/file/mod.rs Re-exports ReadChunkKey
curvine-tests/tests/p2p_read_acceleration_test.rs Integration tests for P2P policy sync, fallback, and disconnect
curvine-client/tests/p2p_e2e_test.rs E2E unit tests for cache reuse and mtime mismatch
orpc/src/common/metrics.rs Adds try_into_counter, try_into_gauge, try_into_gauge_vec, try_into_histogram
scripts/tests/p2p-production-gate.sh Production gate shell script for required P2P tests
Makefile Adds p2p-prod-gate target


Comment on lines +228 to +244
MetricType::Histogram => {
    if label_keys.is_empty() {
        if let Ok(metric) = v.clone().try_into_counter() {
            return Ok(RegisteredMetric::Counter(metric));
        }
        if v.clone().try_into_histogram().is_ok() {
            return Ok(RegisteredMetric::Skip);
        }
    }
    if let Ok(metric) = v.clone().try_into_counter_vec() {
        return Ok(RegisteredMetric::CounterVec(metric));
    }
    if v.clone().try_into_histogram_vec().is_ok() {
        return Ok(RegisteredMetric::Skip);
    }
    v.try_into_counter_vec().map(RegisteredMetric::CounterVec)
}
Copilot AI commented Mar 2, 2026

In get_or_register for the Histogram metric type with an empty label_keys slice (lines 229-235), the code first tries try_into_counter() and then try_into_histogram(), returning Skip if the existing metric is a Histogram. However, if the existing metric is a HistogramVec (labeled variant) but label_keys is empty, neither branch matches, so execution falls through to line 243 (v.try_into_counter_vec().map(...)) which will always fail and return an error. The result is that metrics_report will emit a warn! and continue rather than correctly skipping the existing HistogramVec. This is a silent miss case that the test histogram_metric_should_skip_existing_histogram_vec does not cover (that test uses labeled tags, so label_keys is not empty).

jlon (Contributor, Author) replied:

Re-checked; this does not hold against the current code: when label_keys is empty, the MetricType::Histogram branch checks for an existing Histogram first, and the non-empty path also checks HistogramVec and returns Skip, so it never falls through to the warn. Leaving this unchanged in this round (commit 5a7807c).

Comment on lines +301 to +307
async fn acquire_read_chunk_flight(&self, read_key: &ReadChunkKey) -> Option<ReadChunkFlight> {
    if !self.fs_context.read_chunk_cache_enabled() {
        return None;
    }
    let lock = self.fs_context.read_chunk_flight_lock(read_key.clone());
    let guard = lock.clone().lock_owned().await;
    Some((lock, guard))
Copilot AI commented Mar 2, 2026

The acquire_read_chunk_flight lock in BlockReader::read uses a regular async lock().await with no timeout. If another task holding the flight guard panics or hangs, this lock acquisition will block indefinitely and could cause reader tasks to stall forever. By contrast, the P2P service's own fetch_flight_lock acquisition properly applies transfer_timeout. A timeout should be applied here as well (e.g., using tokio::time::timeout).

jlon (Contributor, Author) replied:

Agreed and fixed: added transfer_timeout_ms timeout protection to acquire_read_chunk_flight in curvine-client/src/block/block_reader.rs, so it no longer waits indefinitely in abnormal cases (commit 5a7807c).
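
The bounded-wait idea can be illustrated with a std-only sketch; the actual async fix would instead wrap the await in tokio::time::timeout around lock_owned(), and the names below are illustrative:

```rust
use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical sketch: try to take the lock until a deadline, then
// give up instead of stalling the reader task forever.
fn lock_with_timeout<T>(
    lock: &Arc<Mutex<T>>,
    timeout: Duration,
) -> Option<MutexGuard<'_, T>> {
    let deadline = Instant::now() + timeout;
    loop {
        match lock.try_lock() {
            Ok(guard) => return Some(guard),
            Err(TryLockError::WouldBlock) => {
                if Instant::now() >= deadline {
                    // Timed out: caller falls back instead of hanging.
                    return None;
                }
                thread::sleep(Duration::from_millis(1));
            }
            Err(TryLockError::Poisoned(_)) => return None,
        }
    }
}
```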

Comment on lines +283 to +287
self.expired_chunks
    .fetch_add(expired.len() as u64, Ordering::Relaxed);
for chunk_id in expired {
    self.invalidate(chunk_id);
}
Copilot AI commented Mar 2, 2026

The cleanup_expired method on CacheManager double-counts expirations. When a chunk is found expired during get, the counter is incremented and invalidate is called (lines 188-195). Later, if cleanup_expired runs and tries the same chunk again, it will find it already removed from entries (so invalidate is a no-op), but it still increments expired_chunks for each chunk in the initial expired list (line 283-284), even though those chunks may already have been counted on get. Conversely, the invalidations counter will not double-count (because invalidate does entries.remove which returns None the second time), but expired_chunks can. This causes the expired_chunks metric to be inflated.

Suggested change

self.expired_chunks
    .fetch_add(expired.len() as u64, Ordering::Relaxed);
for chunk_id in expired {
    self.invalidate(chunk_id);
}

let mut newly_expired: u64 = 0;
for chunk_id in expired {
    // Only count this as a new expiration if the entry is still present.
    if self.entries.get(&chunk_id).is_some() {
        newly_expired += 1;
    }
    self.invalidate(chunk_id);
}
if newly_expired > 0 {
    self.expired_chunks
        .fetch_add(newly_expired, Ordering::Relaxed);
}

jlon (Contributor, Author) replied:

Agreed and fixed: cleanup_expired now increments expired_chunks only when invalidate actually removes an entry, and invalidate returns a boolean so the count is deduplicated (commit 5a7807c).
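
The count-on-actual-removal pattern from this thread, as a minimal std-only sketch (hypothetical struct; the real CacheManager differs):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical sketch: the expiration counter only advances when this
// call really removed the entry, so a chunk already evicted by `get`
// is never counted twice.
struct Cache {
    entries: HashMap<u64, Vec<u8>>,
    expired_chunks: AtomicU64,
}

impl Cache {
    // Returns true only if the entry was still present.
    fn invalidate(&mut self, chunk_id: u64) -> bool {
        self.entries.remove(&chunk_id).is_some()
    }

    fn cleanup_expired(&mut self, expired: Vec<u64>) {
        for chunk_id in expired {
            if self.invalidate(chunk_id) {
                self.expired_chunks.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
```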

Comment on lines +124 to +125
let read_chunk_flights =
    FastDashMap::with_capacity(conf.client.read_chunk_cache_capacity.min(65536) as usize);
Copilot AI commented Mar 2, 2026

The read_chunk_flights map in FsContext is initialized with a capacity derived from read_chunk_cache_capacity (line 125: conf.client.read_chunk_cache_capacity.min(65536) as usize). This value refers to the number of entries in the chunk cache (e.g., default 32768), not the number of in-flight concurrent reads. An in-flight map with 32768 pre-allocated slots is likely far larger than needed and wastes memory proportional to the configuration of the chunk cache size. The flight map only needs to hold one entry per concurrent reader, which is typically much smaller (bounded by the number of open readers). Consider using a small fixed capacity or leaving it at the default (0).

Suggested change

let read_chunk_flights =
    FastDashMap::with_capacity(conf.client.read_chunk_cache_capacity.min(65536) as usize);

let read_chunk_flights = FastDashMap::new();

jlon (Contributor, Author) replied:

Agreed and fixed: read_chunk_flights is now initialized with the default capacity instead of pre-allocating based on read_chunk_cache_capacity, eliminating the wasted memory (commit 5a7807c).

Comment on lines +354 to +359
if !p2p_service
    .sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
    .await
{
    return err_box!("failed to apply p2p runtime policy from master");
}
Copilot AI commented Mar 2, 2026

The sync_p2p_runtime_policy in curvine_filesystem.rs (line 358) returns an error (err_box!) when sync_runtime_policy_from_master returns false. However, the call site in fs_context.rs (line 311) only logs a warning when this fails. This means a signature mismatch or an application failure will produce a warning each time the sync interval fires (every 100ms in the tests). Returning false from sync_runtime_policy_from_master can be a legitimate expected condition (e.g., wrong key configured), and consistently converting it to an error and logging it as a warning at every sync cycle is noisy and may mask the root cause. Consider separating "policy rejected" (expected, track via counter) from "transport/application failure" (unexpected, worth a warning).

Suggested change

if !p2p_service
    .sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
    .await
{
    return err_box!("failed to apply p2p runtime policy from master");
}

let _applied = p2p_service
    .sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
    .await;

jlon (Contributor, Author) replied:

Agreed and fixed: sync_p2p_runtime_policy no longer escalates sync_runtime_policy_from_master=false into an error, avoiding periodic warn noise; the sync call remains best-effort (commit 5a7807c).
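
The suggested separation of "policy rejected" from "application failure" can be sketched as follows (hypothetical types, not the project's API):

```rust
// Hypothetical sketch: an explicit outcome type instead of a boolean,
// so expected rejections are counted while real failures are logged.
enum PolicySyncOutcome {
    Applied,
    // Signature/whitelist rejection: expected under key rotation,
    // tracked via a counter rather than logged every sync cycle.
    Rejected { reason: &'static str },
    // Transport or apply error: unexpected, worth a warning.
    Failed { detail: String },
}

// Returns a warning message only for genuine failures.
fn handle_outcome(
    outcome: PolicySyncOutcome,
    rejected_counter: &mut u64,
) -> Option<String> {
    match outcome {
        PolicySyncOutcome::Applied => None,
        PolicySyncOutcome::Rejected { .. } => {
            *rejected_counter += 1; // metric, not a log line
            None
        }
        PolicySyncOutcome::Failed { detail } => {
            Some(format!("p2p policy sync failed: {}", detail))
        }
    }
}
```

The design point is that only the Failed arm produces log output, so a persistently misconfigured key shows up as a growing counter rather than a warn line every sync interval.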

Comment on lines +392 to +394
fn bump_version_epoch(&mut self) {
    self.version_epoch = self.version_epoch.max(1).saturating_add(1);
}
Copilot AI commented Mar 2, 2026

In bump_version_epoch, the implementation is self.version_epoch = self.version_epoch.max(1).saturating_add(1). The .max(1) guard is unnecessary after the initial construction guarantees version_epoch >= 1. More importantly, for a newly deserialized InodeFile that was persisted before this PR (and thus has version_epoch = 0 due to #[serde(default)]), the first call to bump_version_epoch will produce 2 (max(0,1)+1), skipping 1. After the second call it produces 3, etc. This is not incorrect for correctness purposes (the epoch is monotonically increasing), but the skip from 0→2 means the deserialized-and-immediately-bumped file has epoch 2, while a brand-new file created after this PR has epoch 1. Consumers testing epoch >= 1 would see both as valid, so the safety guarantee holds. This is minor but worth noting for clarity.

jlon (Contributor, Author) replied:

Agreed and fixed: bump_version_epoch now maps legacy 0 -> 1 while the normal path still increments, and a corresponding regression test was added (commit 5a7807c).
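
The legacy-aware bump described in the reply, as a free-function sketch (the real method mutates self on InodeFile):

```rust
// Hypothetical sketch: inodes persisted before the field existed
// deserialize with epoch 0 (via #[serde(default)]) and are first
// bumped to 1; inodes already in the versioned range increment.
fn bump_version_epoch(version_epoch: u64) -> u64 {
    if version_epoch == 0 {
        1 // legacy inode: enter the versioned range without skipping 1
    } else {
        version_epoch.saturating_add(1)
    }
}
```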

Comment on lines +61 to +83
pub fn sign_p2p_policy(
    secret: &str,
    version: u64,
    peer_whitelist: &[String],
    tenant_whitelist: &[String],
) -> String {
    if secret.trim().is_empty() {
        return String::new();
    }
    let mut hasher = Sha256::new();
    Self::update_signing_segment(&mut hasher, secret);
    Self::update_signing_segment(&mut hasher, &version.to_string());
    Self::update_signing_segment(&mut hasher, &peer_whitelist.len().to_string());
    for peer in peer_whitelist {
        Self::update_signing_segment(&mut hasher, peer);
    }
    Self::update_signing_segment(&mut hasher, &tenant_whitelist.len().to_string());
    for tenant in tenant_whitelist {
        Self::update_signing_segment(&mut hasher, tenant);
    }
    let digest = hasher.finalize();
    digest.iter().map(|v| format!("{:02x}", v)).collect()
}
Copilot AI commented Mar 2, 2026

The sign_p2p_policy function uses SHA-256 as a plain cryptographic hash rather than an HMAC (keyed-message authentication code). The function name (sign_p2p_policy) and the configuration field name (policy_hmac_key) both imply HMAC semantics, but the actual implementation feeds the secret directly into the hash alongside the data as a simple prefix. This construction (known as "secret-prefix hashing") is vulnerable to length-extension attacks and does not provide the security guarantees of a proper HMAC. The standard Rust HMAC crate (hmac + sha2) is already an indirect dependency via libp2p. Using Hmac<Sha256> instead would be semantically correct and secure.

jlon (Contributor, Author) replied:

Agreed and fixed: sign_p2p_policy now uses standard HMAC-SHA256 (hmac + sha2), with a new test confirming the legacy prefix-hash semantics are gone (commit 5a7807c).

Comment on lines +124 to +130
if self.provider_publish_interval >= self.provider_ttl {
    return Err(Error::new(
        ErrorKind::InvalidInput,
        "client.p2p.provider_publish_interval must be less than client.p2p.provider_ttl",
    )
    .into());
}
Copilot AI commented Mar 2, 2026

The ClientP2pConf::init validation at line 124 checks provider_publish_interval >= provider_ttl, but it does this comparison after computing both from string representations. In the Default implementation, provider_ttl is initialized to Duration::default() (i.e., zero) before init() is called, but the actual string-parsed values are set inside init(). The comparison on line 124 is correctly after both are parsed, so this is fine in isolation. However, the test client_p2p_conf_rejects_publish_interval_not_less_than_ttl constructs a conf where provider_ttl_str = "30s" and provider_publish_interval_str = "30s", meaning publish_interval >= ttl should trigger the error. This test is correct. No bug here, but note that a zero provider_ttl (e.g., provider_ttl_str = "0s") would not be caught by this validation. Consider adding an explicit check for provider_ttl.is_zero() similar to how cache_ttl is validated.

jlon (Contributor, Author) replied:

Agreed and fixed: ClientP2pConf::init now explicitly validates provider_ttl.is_zero(), with an added 0s-scenario test (commit 5a7807c).
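
The validation ordering discussed above, as an illustrative sketch (the error strings mirror the config keys, but the function itself is hypothetical, not the project's API):

```rust
use std::time::Duration;

// Hypothetical sketch: reject a zero provider_ttl explicitly first,
// then enforce publish_interval < ttl, matching the fixed init order.
fn validate_provider_timing(
    provider_publish_interval: Duration,
    provider_ttl: Duration,
) -> Result<(), String> {
    if provider_ttl.is_zero() {
        return Err("client.p2p.provider_ttl must be greater than 0".into());
    }
    if provider_publish_interval >= provider_ttl {
        return Err(
            "client.p2p.provider_publish_interval must be less than client.p2p.provider_ttl"
                .into(),
        );
    }
    Ok(())
}
```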

Comment on lines 117 to +124

pub fn with_link(id: i64, time: i64, target: impl Into<String>, mode: u32) -> Self {
    Self {
        id,
        file_type: FileType::Link,
        mtime: time,
        atime: time,
        version_epoch: 1,
Copilot AI commented Mar 2, 2026

The version_epoch field on InodeFile is initialized to 1 in all three constructor paths (lines 63, 85, 124). For a symlink (the third constructor), the version_epoch concept is not meaningful because symlinks are not versioned files. Starting it at 1 instead of 0 for a symlink is benign but semantically inconsistent. More importantly, it means a symlink inode always reports version_epoch >= 1 just like a real file, which could confuse consumers that use version_epoch to detect stale caches.

jlon (Contributor, Author) replied:

Not adopting this one for now: the uniform semantics for version_epoch (guaranteed non-zero for both files and links) is an intentional design choice to avoid the complexity of branching by inode type. It does not affect the correctness of this P2P feature or the regression results (commit 5a7807c).

jlon (Contributor, Author) commented Mar 2, 2026

I have verified, fixed, and run regressions for this round of review comments. Conclusions:

  • Acknowledged and fixed: comments 2/3/4/5/6/7/8
  • Not adopted: comments 1 and 9 (not reproducible on the current code path, or a semantic preference rather than a functional defect)

Fixes in this round (summary):

  • Added timeout protection to read_chunk_flight acquisition, avoiding potential unbounded waiting.
  • read_chunk_flights now initializes with the default capacity instead of over-allocating by chunk cache capacity.
  • Policy sync no longer escalates a false return into an error logged as a periodic warn, reducing noise.
  • expired_chunks now counts only on actual invalidation/removal, avoiding duplicate counting under concurrency.
  • version_epoch is compatible with legacy 0: the first bump now goes 0 -> 1 (regression test added).
  • P2P policy signing switched to standard HMAC-SHA256 (regression test added confirming the legacy prefix hash is gone).
  • Added an explicit provider_ttl > 0 configuration check with a test.

Verification results:

  • cargo clippy -p curvine-common -p curvine-client -p curvine-server --all-targets -- -D warnings passed
  • cargo test -p curvine-common --lib --tests passed
  • cargo test -p curvine-client --lib --tests passed
  • cargo test -p curvine-server --lib --tests passed
  • cargo test -p curvine-client --test p2p_e2e_test passed
  • The full P2P gate pipeline bash scripts/tests/p2p-production-gate.sh passed (including the curvine-tests minicluster scenarios)

If needed, I can reply to each comment in its review thread and annotate the corresponding fix commit hash.

@jlon jlon changed the title feat(p2p): stage 1 incremental p2p read acceleration pipeline feat(p2p): add client-side p2p read acceleration with runtime policy Mar 2, 2026
szbr9486 (Contributor) commented Mar 3, 2026

It is recommended that this feature be gated behind a switch that is off by default, so that when disabled it does not affect the existing logic.


3 participants