fix: dht pruning & mailbox removal #1351
📝 Walkthrough
Replaces fallible actor sends with fire-and-forget
Changes
Sequence Diagram
sequenceDiagram
rect rgba(200,200,255,0.5)
participant DP as DocumentPublisher
participant NI as NetInterface
participant KS as Kademlia Store
end
DP->>DP: compute ContentHash and record in dht_keys
DP->>NI: NetCommand::DhtPutRecord(hash)
NI->>KS: put_record(hash)
alt put_record Ok
KS-->>NI: Ok
else MaxRecords error
NI->>KS: prune_expired_dht_records()
KS-->>NI: removed expired records
NI->>KS: put_record(hash) (retry)
KS-->>NI: Ok or Err
end
Note over DP,NI: On E3 completion
DP->>NI: NetCommand::DhtRemoveRecords { keys }
NI->>KS: remove records by key
KS-->>NI: removal result
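The put, prune, and retry branch of the diagram can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: `Store`, `Record`, `StoreError`, and `put_with_prune_retry` are hypothetical stand-ins for the Kademlia store and the NetInterface logic, and the capacity check is an assumption about how a bounded store behaves.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a Kademlia record; only the expiry matters here.
#[derive(Debug, Clone)]
pub struct Record {
    pub value: Vec<u8>,
    pub expires: Option<Instant>,
}

#[derive(Debug, PartialEq)]
pub enum StoreError {
    MaxRecords,
}

/// Toy bounded store standing in for the real record store (assumption).
pub struct Store {
    pub records: HashMap<Vec<u8>, Record>,
    pub max_records: usize,
}

impl Store {
    pub fn put(&mut self, key: Vec<u8>, record: Record) -> Result<(), StoreError> {
        // Overwriting an existing key never grows the store.
        if !self.records.contains_key(&key) && self.records.len() >= self.max_records {
            return Err(StoreError::MaxRecords);
        }
        self.records.insert(key, record);
        Ok(())
    }

    /// Drops records whose expiry is at or before `now`; returns how many were removed.
    pub fn prune_expired(&mut self, now: Instant) -> usize {
        let before = self.records.len();
        self.records.retain(|_, r| r.expires.map_or(true, |e| e > now));
        before - self.records.len()
    }
}

/// Try once; on `MaxRecords`, prune expired records and retry exactly once.
pub fn put_with_prune_retry(
    store: &mut Store,
    key: Vec<u8>,
    record: Record,
    now: Instant,
) -> Result<(), StoreError> {
    match store.put(key.clone(), record.clone()) {
        Err(StoreError::MaxRecords) => {
            store.prune_expired(now);
            store.put(key, record)
        }
        other => other,
    }
}

/// Convenience constructor for a record that expires `secs` seconds after `now`.
pub fn record_expiring_in(now: Instant, secs: u64) -> Record {
    Record { value: Vec::new(), expires: Some(now + Duration::from_secs(secs)) }
}
```

The retry happens only once, so a store that is full of unexpired records still surfaces `MaxRecords` to the caller, matching the "Ok or Err" arrow in the diagram.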
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (4)
crates/net/src/net_interface.rs (1)
846-846: Tests duplicate the production predicate rather than exercising `prune_expired_dht_records`.
Both tests inline `store.retain(|_, r| r.expires.map_or(true, |e| e > now))` (the exact predicate from the production function) rather than going through `prune_expired_dht_records`. If the production predicate drifts, these tests won't catch it. Consider extracting the predicate into a named function, or testing through a thin wrapper that accepts a `&mut MemoryStore`, so the production logic is actually exercised.
Also applies to: 899-899
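One way to read this suggestion, as a sketch under stated assumptions: the predicate gets a name, and a thin wrapper over a plain map lets tests drive it without a swarm. `record_not_expired` follows the name floated in this comment; `prune_expired` and the `HashMap<String, Option<Instant>>` stand-in for `MemoryStore` are hypothetical.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Named predicate: a record is kept if it has no expiry,
/// or if its expiry is strictly after `now`.
pub fn record_not_expired(now: Instant, expires: Option<Instant>) -> bool {
    expires.map_or(true, |e| e > now)
}

/// Thin wrapper so tests exercise the same predicate as production code.
/// The map of key -> optional expiry is a stand-in for the real store (assumption).
pub fn prune_expired(store: &mut HashMap<String, Option<Instant>>, now: Instant) -> usize {
    let before = store.len();
    store.retain(|_, expires| record_not_expired(now, *expires));
    before - store.len()
}
```

With this shape, a change to the comparison (for example `>=` instead of `>`) would be caught by a boundary test, because the test and the production path share one function.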
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/net_interface.rs` at line 846, tests are duplicating the closure used in production (store.retain(|_, r| r.expires.map_or(true, |e| e > now))) instead of exercising prune_expired_dht_records; extract the predicate into a named function (e.g., record_not_expired(now, &Record) or Record::is_not_expired) and use that function in prune_expired_dht_records, then update tests to call prune_expired_dht_records (or a thin wrapper that accepts &mut MemoryStore) rather than reimplementing the retain closure so the production logic is actually exercised and cannot drift unnoticed.
crates/events/src/eventstore_router.rs (1)
205-207: `MAILBOX_LIMIT_LARGE` benefits query messages but is moot for `StoreEventRequested`.
The `Context` mailbox capacity defaults to 16 messages and can be increased via `set_mailbox_capacity()`, but this limit does not apply to `Addr::do_send`, which bypasses the mailbox queue limit. Because all `StoreEventRequested` senders now use `do_send`, the raised capacity here has no effect on that path. It still benefits the `EventStoreQueryBy` messages (which originate from code that may still use `try_send`). A clarifying comment in the source would avoid future confusion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/events/src/eventstore_router.rs` around lines 205 - 207, add a clarifying comment inside the started() method next to the set_mailbox_capacity(MAILBOX_LIMIT_LARGE) call: explain that set_mailbox_capacity increases the Context mailbox limit (affecting messages sent via try_send like EventStoreQueryBy) but does not apply to messages sent with Addr::do_send, and therefore the increased capacity is moot for StoreEventRequested because all senders for that message use do_send; keep the capacity change for other message types but document the distinction using the symbols started, set_mailbox_capacity, MAILBOX_LIMIT_LARGE, StoreEventRequested, EventStoreQueryBy, Addr::do_send, and try_send so future readers won’t be confused.
crates/events/src/eventstore.rs (1)
101-103: Double-logging the same message before `panic!` is redundant.
`error!("Event storage failed: {e}")` immediately before `panic!("Unrecoverable event storage failure: {e}")` emits two nearly identical messages. The `panic!` output reaches stderr and any configured panic hook; `tracing::error!` is only meaningful here if the structured log sink is different from stderr. Consider dropping the `error!` call or folding the message into the panic string.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/events/src/eventstore.rs` around lines 101 - 103, remove the redundant tracing error log before panicking: delete the error!("Event storage failed: {e}") call in eventstore.rs and keep a single descriptive panic message (e.g., panic!("Unrecoverable event storage failure: {e}")) so the failure is reported once; alternatively, if structured logging is required, keep error! and make the subsequent panic use a shorter or no message; either way eliminate the duplicate messages emitted by error! and panic!.
crates/events/src/enclave_event/publish_document/mod.rs (1)
21-21: Expiry reduction from 30 → 7 days is intentional and clean.
The test suite passes explicit `expires_at` values so no existing assertions are affected. Consider adding a test covering the `None` (default) path to validate the 7-day expiry calculation, which is currently untested.
✅ Suggested test for default expiry
+ #[test]
+ fn test_meta_default_expiry() {
+     let before = Utc::now();
+     let meta = DocumentMeta::new(E3id::new("1", 1), DocumentKind::TrBFV, vec![], None);
+     let after = Utc::now();
+     let min_expected = before + Duration::days(DEFAULT_KADEMLIA_EXPIRY_DAYS);
+     let max_expected = after + Duration::days(DEFAULT_KADEMLIA_EXPIRY_DAYS);
+     assert!(meta.expires_at >= min_expected && meta.expires_at <= max_expected);
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/events/src/enclave_event/publish_document/mod.rs` at line 21, Add a unit test that exercises the code path where expires_at is None to assert the default expiry uses DEFAULT_KADEMLIA_EXPIRY_DAYS (7 days). Locate the logic that computes the Kademlia expiry in publish_document (or the helper that reads DEFAULT_KADEMLIA_EXPIRY_DAYS) and write a test that constructs a document with expires_at = None, invokes the publish/compute expiry routine, and asserts the computed expires_at is approximately now + DEFAULT_KADEMLIA_EXPIRY_DAYS (allowing small clock skew). Ensure the test imports DEFAULT_KADEMLIA_EXPIRY_DAYS so it remains tied to the constant.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/events/src/eventstore.rs`:
- Around line 100-105: The current unconditional panic in the caller of
handle_store_event_requested causes snapshot-replay duplicate-timestamp
overflows (triggered by storage_errors > MAX_STORAGE_ERRORS) to crash the actor;
change the error handling so only hard storage failures (e.g., failures from
log.append / index.insert) cause panic. Update handle_store_event_requested to
return a distinct error variant or error kind for the MAX_STORAGE_ERRORS path
(e.g., StorageError::TooManyDuplicateTimestamps or a RecoverableOverflow marker)
while keeping hard failures as StorageError::IO/Append/Index; then change the
caller (the if let Err(e) = self.handle_store_event_requested(msg)) to match on
that error kind and only call panic! for the hard storage failures, otherwise
log a warning and continue during snapshot recovery. Ensure matching uses the
function name handle_store_event_requested, the constant MAX_STORAGE_ERRORS, and
error variants you add so the intent is explicit.
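A minimal sketch of the split this comment asks for, assuming error variants along the lines it names. `StorageError`, `Outcome`, and `on_store_result` are hypothetical; the sketch returns an `Outcome` instead of panicking so the policy is easy to test, whereas the real caller would `panic!` on the fatal arm.

```rust
use std::fmt;

/// Hypothetical error type separating recoverable overflow from hard failures.
#[derive(Debug, PartialEq)]
pub enum StorageError {
    /// Duplicate-timestamp collisions exceeded the limit (the MAX_STORAGE_ERRORS path).
    TooManyDuplicateTimestamps { count: u64 },
    /// Appending to the log failed.
    Append(String),
    /// Inserting into the index failed.
    Index(String),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::TooManyDuplicateTimestamps { count } => {
                write!(f, "too many duplicate timestamps ({count})")
            }
            StorageError::Append(msg) => write!(f, "log append failed: {msg}"),
            StorageError::Index(msg) => write!(f, "index insert failed: {msg}"),
        }
    }
}

impl StorageError {
    /// Only the overflow case is treated as recoverable in this sketch.
    pub fn is_recoverable(&self) -> bool {
        matches!(self, StorageError::TooManyDuplicateTimestamps { .. })
    }
}

/// What the caller decides to do with a store result.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Stored,
    SkippedWithWarning,
    Fatal(String),
}

/// Caller-side policy: warn and continue on recoverable errors, escalate on hard ones.
pub fn on_store_result(result: Result<(), StorageError>) -> Outcome {
    match result {
        Ok(()) => Outcome::Stored,
        Err(e) if e.is_recoverable() => Outcome::SkippedWithWarning,
        Err(e) => Outcome::Fatal(format!("Unrecoverable event storage failure: {e}")),
    }
}
```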
In `@crates/net/src/net_interface.rs`:
- Around line 661-682: Add descriptive logging for the MaxRecords retry path:
inside the Err(kad::store::Error::MaxRecords) match arm, emit an error/warn
indicating the DHT store is full and that prune_expired_dht_records(swarm) is
being invoked (include correlation_id and any relevant record key/metadata if
available), and in the inner Err(error) branch (the put_record retry failure)
log the failure with error details and correlation_id before sending
NetEvent::DhtPutRecordError; update the messages near prune_expired_dht_records,
swarm.behaviour_mut().kademlia.put_record(...) and the Err(error) arm so
operators can distinguish store-full vs transient put failures.
- Line 822: Replace direct subtraction Instant::now() - Duration::from_secs(1)
with a checked subtraction to avoid panic on low-uptime platforms: call
Instant::now().checked_sub(Duration::from_secs(1)) and fall back to a safe
Instant (e.g., Instant::now()) when None is returned. Update both occurrences
that set the variable past (the Instant::now() - Duration::from_secs(1) at the
top of the test and the similar use at line 872) so they use
checked_sub(...).unwrap_or(Instant::now()) (or unwrap_or_else(||
Instant::now())) instead.
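The checked subtraction described in the last item can be wrapped in a small helper. This is a sketch; `instant_in_past` is a hypothetical name, while `Instant::checked_sub` is the standard-library call the comment refers to.

```rust
use std::time::{Duration, Instant};

/// Returns an instant `secs` seconds in the past, or "now" if the platform's
/// monotonic clock cannot represent that far back (e.g. shortly after boot).
pub fn instant_in_past(secs: u64) -> Instant {
    let now = Instant::now();
    now.checked_sub(Duration::from_secs(secs)).unwrap_or(now)
}
```

Capturing `now` once and reusing it as the fallback avoids a second clock read and keeps the result no later than the moment the helper was called.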
Actionable comments posted: 2
🧹 Nitpick comments (1)
crates/events/src/sequencer.rs (1)
105-120: Consider `tokio::time::timeout` instead of a manual polling loop.
The current loop works correctly but a 30-second stall on failure will visibly slow CI. The idiomatic replacement is more concise and fails immediately once the deadline passes.
♻️ Suggested refactor
- let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
- loop {
-     let events: Vec<EnclaveEvent> = history.send(GetEvents::new()).await?;
-     if events.len() >= count {
-         let elapsed = start.elapsed();
-         println!("All {count} events arrived in {elapsed:?}");
-         assert_eq!(events.len(), count, "all events must arrive");
-         break;
-     }
-     if tokio::time::Instant::now() > deadline {
-         let got = events.len();
-         panic!("test timed out — only {got}/{count} events arrived after 30s");
-     }
-     // Yield to let the actor system make progress.
-     tokio::time::sleep(std::time::Duration::from_millis(10)).await;
- }
+ tokio::time::timeout(std::time::Duration::from_secs(30), async {
+     loop {
+         let events: Vec<EnclaveEvent> = history.send(GetEvents::new()).await?;
+         if events.len() >= count {
+             let elapsed = start.elapsed();
+             println!("All {count} events arrived in {elapsed:?}");
+             assert_eq!(events.len(), count, "all events must arrive");
+             return Ok::<_, anyhow::Error>(());
+         }
+         tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+     }
+ })
+ .await
+ .expect(&format!("test timed out — fewer than {count} events arrived after 30s"))?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/events/src/sequencer.rs` around lines 105 - 120, Replace the manual polling loop and deadline logic with tokio::time::timeout to fail fast: wrap the async work that repeatedly calls history.send(GetEvents::new()).await, checks events.len() against count, prints elapsed and asserts equality (using start.elapsed(), println! and assert_eq!), and sleeps between polls in a tokio::time::timeout(std::time::Duration::from_secs(30), async { ... }).await; on timeout (Err) panic with the same message that reports got/count, and on Ok(Err(e)) propagate the inner error; ensure you keep references to history, count, start, GetEvents and the sleep/backoff behavior inside the wrapped async block.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/events/src/sequencer.rs`:
- Around line 93-96: The test it_handles_event_burst_without_overflow calls
EventSystem::new("test-burst") but EventSystem::new() takes no arguments; remove
the string argument and call EventSystem::new().with_fresh_bus() (or replace
with an appropriate constructor like EventSystem::in_mem() / in_mem_from_store()
/ persisted() if you intended a specific backend), ensuring you update the call
site in the test to use the correct constructor (EventSystem::new,
EventSystem::in_mem, etc.) and then chain .with_fresh_bus() on the resulting
instance.
- Around line 30-44: The current fire-and-forget calls (self.bus.do_send in
handle_store_event_response and self.eventstore.do_send in the Handler for
EnclaveEvent<Unsequenced>) can silently drop messages when the recipient actor
has stopped; replace these do_send calls with fallible sends (e.g.,
Addr::try_send / Recipient::try_send or .send().await) and add error handling
that logs tracing::error! (including the message type like StoreEventRequested
and the EnclaveEvent<Unsequenced>) and increments a metric on failure so dropped
messages are observable; ensure this change touches the methods
handle_store_event_response and the impl Handler for EnclaveEvent<Unsequenced>
where StoreEventRequested is constructed.
🧹 Nitpick comments (3)
crates/net/src/document_publisher.rs (3)
133-146: Consider adding a test for the new E3-completion pruning path.
The existing test suite covers `PublishDocumentRequested` retries and `GossipPublish`, but there is no test verifying that `E3RequestComplete` triggers a `DhtRemoveRecords` command containing the previously tracked keys. Given this is the central fix in the PR, a targeted `#[actix::test]` would guard against regressions.
Would you like me to draft a test case that: (1) publishes a document to populate `dht_keys`, (2) fires an `E3RequestComplete` event, and (3) asserts that a `DhtRemoveRecords` command is received on the channel with the correct keys?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/document_publisher.rs` around lines 133 - 146, Add an actix test that verifies the E3-completion pruning path: arrange by invoking the publish flow (so DocumentPublisher's dht_keys gets populated for a specific e3_id), then call or dispatch handle_e3_request_complete / send an E3RequestComplete for that e3_id, and finally assert that the publisher sent a NetCommand::DhtRemoveRecords via its tx channel containing the exact keys previously tracked in dht_keys; locate symbols DocumentPublisher::handle_e3_request_complete, dht_keys, tx, and NetCommand::DhtRemoveRecords to implement the test and use the same e3_id and keys as in the publish step.
185-189: `ContentHash` is computed twice for the same document.
The key is computed here (line 185) for `dht_keys` tracking, then recomputed inside `handle_publish_document_requested` at line 273 from the same bytes. It's a cheap deterministic hash, but the duplication is avoidable by passing the pre-computed `key` into the async function.
♻️ Proposed refactor – pass key through to avoid double hash
- let key = ContentHash::from_content(&msg.value);
  self.dht_keys
      .entry(msg.meta.e3_id.clone())
      .or_default()
-     .push(key);
+     .push(ContentHash::from_content(&msg.value));
  let rx = self.rx.clone();
  let bus = self.bus.clone();
  let topic = self.topic.clone();
  trap_fut(
      EType::IO,
      &bus.with_ec(&ec),
      handle_publish_document_requested(tx, rx, msg, topic, bus),
  )
Or alternatively add a `key: ContentHash` parameter to `handle_publish_document_requested` and pass it through, eliminating the recomputation at line 273.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/document_publisher.rs` around lines 185 - 189, Compute ContentHash once and pass it into handle_publish_document_requested instead of recomputing: create the key using ContentHash::from_content(&msg.value) where you currently push into self.dht_keys, then modify handle_publish_document_requested to accept an extra parameter (key: ContentHash) and use that value inside the async block instead of calling ContentHash::from_content again; update the call site to pass the precomputed key and remove the duplicate hash computation.
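The "pass the key through" alternative might look roughly like the following. Everything here is a stand-in: `Key` replaces `ContentHash` (whose real hash function is not shown in this review), `publish` replaces `handle_publish_document_requested`, and the `HashMap<String, Vec<Key>>` replaces `dht_keys`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Stand-in for `ContentHash`; uses the std hasher purely for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Key(u64);

impl Key {
    pub fn from_content(bytes: &[u8]) -> Self {
        let mut hasher = DefaultHasher::new();
        bytes.hash(&mut hasher);
        Key(hasher.finish())
    }
}

/// Hypothetical publish step that accepts the precomputed key instead of re-hashing.
pub fn publish(key: Key, value: &[u8]) -> (Key, usize) {
    (key, value.len())
}

/// Records the key under the request id, then forwards the same key to `publish`.
pub fn track_and_publish(
    tracked: &mut HashMap<String, Vec<Key>>,
    e3_id: &str,
    value: &[u8],
) -> (Key, usize) {
    let key = Key::from_content(value); // hashed exactly once
    tracked.entry(e3_id.to_string()).or_default().push(key);
    publish(key, value)
}
```

Because the key is `Copy` in this sketch, tracking and publishing can share it without cloning; a non-`Copy` hash type would need a `clone()` at the push.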
135-144: Log (or propagate) `try_send` failures so silent pruning drops are visible.
`try_send` can fail with `TrySendError::Full` if the command channel is at capacity, silently skipping the entire pruning operation. Given DHT cleanup is the explicit purpose of this PR, a silently swallowed failure is a latent memory-growth risk.
Since this is a sync actix handler you can't `.await`, but you can spawn a task or at minimum log:
♻️ Proposed fix – log on failure, or spawn async send
- let _ = self.tx.try_send(NetCommand::DhtRemoveRecords { keys });
+ if let Err(e) = self.tx.try_send(NetCommand::DhtRemoveRecords { keys }) {
+     warn!("Failed to send DhtRemoveRecords for E3 {}: {}", event.e3_id, e);
+ }
Alternatively, if the channel being full is a realistic concern, spawn a task so backpressure is respected:
- let _ = self.tx.try_send(NetCommand::DhtRemoveRecords { keys });
+ let tx = self.tx.clone();
+ let e3_id_str = event.e3_id.to_string();
+ tokio::spawn(async move {
+     if let Err(e) = tx.send(NetCommand::DhtRemoveRecords { keys }).await {
+         warn!("Failed to send DhtRemoveRecords for E3 {}: {}", e3_id_str, e);
+     }
+ });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/net/src/document_publisher.rs` around lines 135 - 144, The current pruning silently ignores failures from self.tx.try_send(NetCommand::DhtRemoveRecords { keys }) which can drop cleanup when the channel is full; update the block handling DHT pruning in document_publisher.rs (the if-let that removes entries from self.dht_keys for event.e3_id) to detect TrySendError, log the error (including event.e3_id and keys.len()) when try_send fails, or alternatively spawn an async task to await an async send so backpressure is respected; ensure you reference the NetCommand::DhtRemoveRecords and self.tx.try_send call when adding the logging or spawn logic so failures are no longer silently swallowed.
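The failure modes of a non-blocking send can be shown with the standard library's bounded channel, used here as an analogue of the PR's command channel (which is presumably a different channel type). `Command` and `send_remove` are hypothetical; the function returns the message it would log so the behaviour can be asserted.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical command mirroring the shape of `DhtRemoveRecords { keys }`.
#[derive(Debug, PartialEq)]
pub enum Command {
    RemoveRecords { keys: Vec<u64> },
}

/// Attempts a non-blocking send and reports why it failed instead of discarding the result.
/// Returns the log line that would be emitted, or `None` on success.
pub fn send_remove(tx: &SyncSender<Command>, e3_id: &str, keys: Vec<u64>) -> Option<String> {
    let count = keys.len();
    match tx.try_send(Command::RemoveRecords { keys }) {
        Ok(()) => None,
        Err(TrySendError::Full(_)) => Some(format!(
            "channel full: dropped removal of {count} keys for E3 {e3_id}"
        )),
        Err(TrySendError::Disconnected(_)) => Some(format!(
            "receiver gone: dropped removal of {count} keys for E3 {e3_id}"
        )),
    }
}

/// Builds a bounded channel of the given capacity for the sketch above.
pub fn bounded(capacity: usize) -> (SyncSender<Command>, std::sync::mpsc::Receiver<Command>) {
    sync_channel(capacity)
}
```

Distinguishing the full case from the disconnected case lets an operator tell transient backpressure apart from a receiver that has shut down.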
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@crates/events/src/sequencer.rs`:
- Around line 30-44: Both calls currently use fire-and-forget do_send which
drops messages silently; update handle_store_event_response
(self.bus.do_send(event)) and the EnclaveEvent handler
(self.eventstore.do_send(StoreEventRequested::new(...))) to use a send/try_send
variant that returns a Result and explicitly log on the disconnected/closed
case. Concretely, replace do_send with try_send or Addr::send and match the
Err(MailboxError::Closed / MailboxError) branch to emit a tracing::error!
(include context like event type or Sequencer address) so dropped messages are
observable; keep the fire-and-forget behavior only after logging if you still
want no await.