Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 130 additions & 4 deletions modules/twap-monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,14 @@ fn poll_all_watches(block: &types::Block) -> Result<(), HostError> {
logging::Level::Info,
&format!("poll {key} -> {}", outcome_label(&outcome)),
);
if let PollOutcome::Ready { order, signature } = outcome {
submit_ready(block.chain_id, owner, &order, signature, &key, now_epoch_s)?;
match outcome {
PollOutcome::Ready { order, signature } => {
submit_ready(block.chain_id, owner, &order, signature, &key, now_epoch_s)?;
}
non_ready => {
apply_watch_update(outcome_to_update(&non_ready), &key)?;
}
}
// BLEU-830 will persist next_block / next_epoch / remove the watch
// on the non-Ready arms.
}
Ok(())
}
Expand Down Expand Up @@ -494,6 +497,80 @@ fn outcome_label(o: &PollOutcome) -> &'static str {
}
}

// ---- BLEU-830: PollOutcome lifecycle dispatch ----

/// What `apply_watch_update` should do for a given outcome. Kept as a
/// data type (rather than running the effects directly) so the decision
/// is host-free testable; `apply_watch_update` is the impure other half.
#[derive(Debug, Eq, PartialEq)]
enum WatchUpdate {
/// Leave the store untouched. Next block re-polls the watch.
NoOp,
/// Write `next_block:` so subsequent polls skip until the given
/// block number is reached.
SetNextBlock(u64),
/// Write `next_epoch:` so subsequent polls skip until the given
/// Unix-seconds timestamp is reached.
SetNextEpoch(u64),
/// Delete the watch and any stale gate keys — TWAP completed,
/// cancelled, or otherwise irrecoverable.
DropWatch,
}

/// Pure mapping from a non-Ready `PollOutcome` to the lifecycle effect
/// the BLEU-830 contract specifies. `Ready` is handled by the submit
/// path (BLEU-828) and is rejected here so a caller cannot accidentally
/// erase the watch when an order was actually produced.
fn outcome_to_update(outcome: &PollOutcome) -> WatchUpdate {
match outcome {
PollOutcome::Ready { .. } => WatchUpdate::NoOp, // belt-and-braces; caller routes Ready to submit_ready
PollOutcome::TryNextBlock => WatchUpdate::NoOp,
PollOutcome::TryOnBlock(n) => WatchUpdate::SetNextBlock(*n),
PollOutcome::TryAtEpoch(t) => WatchUpdate::SetNextEpoch(*t),
PollOutcome::DontTryAgain => WatchUpdate::DropWatch,
}
}

fn apply_watch_update(update: WatchUpdate, watch_key: &str) -> Result<(), HostError> {
match update {
WatchUpdate::NoOp => Ok(()),
WatchUpdate::SetNextBlock(n) => {
if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) {
local_store::set(
&format!("next_block:{owner_hex}:{hash_hex}"),
&n.to_le_bytes(),
)?;
}
Ok(())
}
WatchUpdate::SetNextEpoch(t) => {
if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) {
local_store::set(
&format!("next_epoch:{owner_hex}:{hash_hex}"),
&t.to_le_bytes(),
)?;
}
Ok(())
}
WatchUpdate::DropWatch => {
local_store::delete(watch_key)?;
// Best-effort: drop any stale gates the previous lifecycle
// step may have written. `delete` is a no-op for absent keys
// already, so the `let _` discards a benign error if the
// underlying store complains.
if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) {
let _ = local_store::delete(&format!("next_block:{owner_hex}:{hash_hex}"));
let _ = local_store::delete(&format!("next_epoch:{owner_hex}:{hash_hex}"));
}
logging::log(
logging::Level::Info,
&format!("dropped watch {watch_key}"),
);
Ok(())
}
}
}

// ---- key conventions shared with BLEU-830 ----

fn watch_key(owner: &Address, params_hash: &B256) -> String {
Expand Down Expand Up @@ -929,4 +1006,53 @@ mod tests {
};
assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock);
}

// ---- BLEU-830: PollOutcome -> lifecycle effect ----

#[test]
fn outcome_try_next_block_is_no_op() {
assert_eq!(
outcome_to_update(&PollOutcome::TryNextBlock),
WatchUpdate::NoOp,
);
}

#[test]
fn outcome_try_on_block_sets_next_block_gate() {
assert_eq!(
outcome_to_update(&PollOutcome::TryOnBlock(12_345)),
WatchUpdate::SetNextBlock(12_345),
);
}

#[test]
fn outcome_try_at_epoch_sets_next_epoch_gate() {
assert_eq!(
outcome_to_update(&PollOutcome::TryAtEpoch(1_700_000_000)),
WatchUpdate::SetNextEpoch(1_700_000_000),
);
}

#[test]
fn outcome_dont_try_again_drops_watch() {
assert_eq!(
outcome_to_update(&PollOutcome::DontTryAgain),
WatchUpdate::DropWatch,
);
}

#[test]
fn outcome_ready_is_handled_by_submit_path_not_lifecycle() {
// Ready never reaches outcome_to_update in poll_all_watches (the
// match routes it to submit_ready). The mapping is a safety net:
// if a future refactor accidentally pipes Ready through here, the
// watch must NOT be erased — submit_ready owns the post-submit
// book-keeping.
let order = Box::new(submittable_order());
let outcome = PollOutcome::Ready {
order,
signature: Bytes::new(),
};
assert_eq!(outcome_to_update(&outcome), WatchUpdate::NoOp);
}
}