Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ repository = "https://github.com/nullisLabs/shepherd"

# `cowprotocol` v1.0.0-alpha.3 (the crates.io release the engine
# depends on) was cut from `cowdao-grants/cow-rs` PR #5 at commit
# `1742ffa`. `bleu/cow-rs` main has 18 commits since, including the
# `1742ffa`. `bleu/cow-rs` main has diverged since with: the
# `composable::Proof` width fix (relevant to the TWAP poll path),
# `OrderCreation` zero-from-address fast-fail (closes a MEDIUM
# review finding from PR #5), and the `order_book` / `composable`
# submodule splits. Patching to that commit picks them up without
# waiting for an alpha.4 publish. Drop once `cowprotocol >= 1.0.0-alpha.4`
# ships.
# `OrderCreation` zero-from-address fast-fail, the `order_book` /
# `composable` submodule splits, `OrderPostErrorKind` + `retry_hint()`
# (BLEU-822, the protocol-level retry contract M2 modules dispatch
# on), and `OrderBookApi::with_base_url(chain, base_url)` for barn /
# staging routing (BLEU-823). Patching to that commit picks the lot
# up without waiting for an alpha.4 publish. Drop once
# `cowprotocol >= 1.0.0-alpha.4` ships.
[patch.crates-io]
cowprotocol = { git = "https://github.com/bleu/cow-rs", rev = "c012404ffefc411bff543d2290e19ba7fbef2516" }
cowprotocol = { git = "https://github.com/bleu/cow-rs", rev = "57f5f553ab28c9fff54089daf2d39b4282f3e4dd" }

[profile.dev]
panic = "abort"
Expand Down
206 changes: 190 additions & 16 deletions modules/twap-monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ wit_bindgen::generate!({
use alloy_primitives::{Address, B256, Bytes, U256, keccak256};
use alloy_sol_types::{SolCall, SolError, SolEvent, SolValue};
use cowprotocol::{
BuyTokenDestination, COMPOSABLE_COW, ComposableCoW::ConditionalOrderCreated,
ApiError, BuyTokenDestination, COMPOSABLE_COW, ComposableCoW::ConditionalOrderCreated,
ConditionalOrderParams, EMPTY_APP_DATA_JSON, GPv2OrderData, OrderCreation, OrderData,
OrderKind, SellTokenSource, Signature,
};
Expand Down Expand Up @@ -169,7 +169,7 @@ fn poll_all_watches(block: &types::Block) -> Result<(), HostError> {
&format!("poll {key} -> {}", outcome_label(&outcome)),
);
if let PollOutcome::Ready { order, signature } = outcome {
submit_ready(block.chain_id, owner, &order, signature);
submit_ready(block.chain_id, owner, &order, signature, &key, now_epoch_s)?;
}
// BLEU-830 will persist next_block / next_epoch / remove the watch
// on the non-Ready arms.
Expand Down Expand Up @@ -347,15 +347,22 @@ impl core::fmt::Display for BuildError {
}
}

fn submit_ready(chain_id: u64, owner: Address, order: &GPv2OrderData, signature: Bytes) {
fn submit_ready(
chain_id: u64,
owner: Address,
order: &GPv2OrderData,
signature: Bytes,
watch_key: &str,
now_epoch_s: u64,
) -> Result<(), HostError> {
let creation = match build_order_creation(order, signature, owner) {
Ok(c) => c,
Err(e) => {
logging::log(
logging::Level::Warn,
&format!("twap submit skipped for {owner:#x}: {e}"),
);
return;
return Ok(());
}
};
let body = match serde_json::to_vec(&creation) {
Expand All @@ -365,7 +372,7 @@ fn submit_ready(chain_id: u64, owner: Address, order: &GPv2OrderData, signature:
logging::Level::Error,
&format!("OrderCreation JSON encode failed: {e}"),
);
return;
return Ok(());
}
};
match cow_api::submit_order(chain_id, &body) {
Expand All @@ -374,25 +381,107 @@ fn submit_ready(chain_id: u64, owner: Address, order: &GPv2OrderData, signature:
// Empty marker — presence of the key is the receipt. BLEU-830
// may later attach metadata (block, attempt count) but the
// bare flag is enough to suppress double submits.
if let Err(e) = local_store::set(&key, b"") {
logging::log(
logging::Level::Error,
&format!("persist {key} failed: {}", e.message),
);
return;
}
local_store::set(&key, b"")?;
logging::log(logging::Level::Info, &format!("submitted {key}"));
}
Err(err) => {
// BLEU-829 wires `OrderPostError::retry_hint` here so the
// backoff / drop decision is data-driven. Until then, log
// and leave the watch in place for the next block.
apply_submit_retry(&err, watch_key, now_epoch_s)?;
}
}
Ok(())
}

// ---- BLEU-829: OrderPostError -> retry action ----

/// What the lifecycle layer should do after a failed submission.
///
/// Mirrors the BLEU-829 retry contract (`TryNextBlock` / `BackoffSeconds(s)`
/// / `Drop`). Today the `Backoff` arm has no producer because the
/// cowprotocol API exposes `retry_hint() -> bool` (no server-supplied
/// delay) — the variant is kept so the dispatcher can grow into it
/// once cowprotocol or the orderbook hands us a hint.
#[derive(Debug, Eq, PartialEq)]
enum RetryAction {
/// Leave the watch in place; it will be polled on the next block.
TryNextBlock,
/// Persist `next_epoch = now + seconds` so the watch is skipped
/// until that timestamp. Reserved for a future producer (the
/// cowprotocol surface today is bool-only, no server delay).
#[allow(dead_code)]
Backoff { seconds: u64 },
/// Remove the watch entirely — the order will not be retried.
Drop,
}

/// Try to decode the orderbook's typed error payload from a HostError.
///
/// The host's `cow_api::submit_order` backend places the orderbook's
/// JSON body in `host-error.data` when the upstream returned a typed
/// `ApiError` (this forwarding is the host-side counterpart to BLEU-829;
/// see PR description for the status of that change). When `data` is
/// missing or fails to parse the function returns `None`, and the
/// dispatcher falls back to the safe default of "retry next block".
fn try_decode_api_error(err: &HostError) -> Option<ApiError> {
let data = err.data.as_deref()?;
serde_json::from_str::<ApiError>(data).ok()
}

/// Classify a failed submission into the action the lifecycle layer
/// should take. Defaults to `TryNextBlock` whenever the typed payload
/// is absent or unrecognised — the safe choice that lets a flaky
/// orderbook recover without dropping a still-valid order.
fn classify_submit_error(err: &HostError) -> RetryAction {
match try_decode_api_error(err) {
Some(api) if api.retry_hint() => RetryAction::TryNextBlock,
Some(_) => RetryAction::Drop,
None => RetryAction::TryNextBlock,
}
}

fn apply_submit_retry(
err: &HostError,
watch_key: &str,
now_epoch_s: u64,
) -> Result<(), HostError> {
let action = classify_submit_error(err);
match action {
RetryAction::TryNextBlock => {
logging::log(
logging::Level::Warn,
&format!("submit retry-next-block ({}): {}", err.code, err.message),
);
}
RetryAction::Backoff { seconds } => {
let until = now_epoch_s.saturating_add(seconds);
if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) {
local_store::set(
&format!("next_epoch:{owner_hex}:{hash_hex}"),
&until.to_le_bytes(),
)?;
}
logging::log(
logging::Level::Warn,
&format!(
"submit backoff {seconds}s -> next_epoch={until} ({}): {}",
err.code, err.message
),
);
}
RetryAction::Drop => {
// Drop the watch, plus any stale gating entries the lifecycle
// layer may have written.
local_store::delete(watch_key)?;
if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) {
let _ = local_store::delete(&format!("next_block:{owner_hex}:{hash_hex}"));
let _ = local_store::delete(&format!("next_epoch:{owner_hex}:{hash_hex}"));
}
logging::log(
logging::Level::Warn,
&format!("submit failed ({}): {}", err.code, err.message),
&format!("submit dropped watch ({}): {}", err.code, err.message),
);
}
}
Ok(())
}

fn outcome_label(o: &PollOutcome) -> &'static str {
Expand Down Expand Up @@ -755,4 +844,89 @@ mod tests {
.unwrap_err();
assert!(matches!(err, BuildError::Cowprotocol(_)));
}

// ---- BLEU-829: submit-error classification ----

fn host_error_with_api(error_type: &str) -> HostError {
let body = serde_json::json!({
"errorType": error_type,
"description": "test",
});
HostError {
domain: "cow-api".into(),
kind: nexum::host::types::HostErrorKind::Denied,
code: 400,
message: format!("{error_type}: test"),
data: Some(body.to_string()),
}
}

#[test]
fn classify_retriable_kind_returns_try_next_block() {
// InsufficientFee / TooManyLimitOrders / PriceExceedsMarketPrice
// are the three kinds cowprotocol::OrderPostErrorKind flags
// retriable today.
for kind in ["InsufficientFee", "TooManyLimitOrders", "PriceExceedsMarketPrice"] {
assert_eq!(
classify_submit_error(&host_error_with_api(kind)),
RetryAction::TryNextBlock,
"{kind} should be retriable",
);
}
}

#[test]
fn classify_permanent_kind_returns_drop() {
for kind in [
"InvalidSignature",
"WrongOwner",
"DuplicateOrder",
"UnsupportedToken",
"InvalidAppData",
] {
assert_eq!(
classify_submit_error(&host_error_with_api(kind)),
RetryAction::Drop,
"{kind} should be permanent",
);
}
}

#[test]
fn classify_unknown_kind_returns_drop() {
// `Unknown(_)` is non-retriable per cowprotocol's classification
// — the orderbook rejected the order with a string we don't
// recognise, so retrying as-is is unlikely to help.
assert_eq!(
classify_submit_error(&host_error_with_api("NewlyMintedErrorType")),
RetryAction::Drop,
);
}

#[test]
fn classify_missing_data_defaults_to_try_next_block() {
// Until the host backend forwards the orderbook JSON into
// host-error.data, we have no payload to decode. The safe
// default is to retry rather than poison a still-valid watch.
let err = HostError {
domain: "cow-api".into(),
kind: nexum::host::types::HostErrorKind::Internal,
code: 0,
message: "network reset".into(),
data: None,
};
assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock);
}

#[test]
fn classify_malformed_data_defaults_to_try_next_block() {
let err = HostError {
domain: "cow-api".into(),
kind: nexum::host::types::HostErrorKind::Denied,
code: 502,
message: "bad gateway".into(),
data: Some("<html>upstream HTML</html>".into()),
};
assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock);
}
}