From b5d354172d1eb15c52d7ca7e7a1fd897bbe5c0e3 Mon Sep 17 00:00:00 2001 From: brunota20 Date: Mon, 15 Jun 2026 10:57:03 -0300 Subject: [PATCH 1/2] feat(ethflow-watcher): build OrderCreation, submit, apply retry_hint (BLEU-833) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit \`on_event(Event::Logs)\` now ends in a complete pipeline: 1. \`decode_order_placement\` (BLEU-832) lifts the log to a \`DecodedPlacement\` carrying the contract, sender, order, onchain signature and refund pointer. 2. \`build_eth_flow_creation\` translates that into a typed \`(OrderCreation, OrderUid)\`: - \`gpv2_to_order_data\` maps the on-chain \`bytes32\` markers to the typed \`OrderKind\` / balance enums; same logic as the TWAP module, kept inline because the two crates are independent. - \`to_signature\` lifts \`OnchainSignature\` into \`Signature::Eip1271(bytes)\` or \`Signature::PreSign\`. The hidden \`__Invalid\` sol! variant is surfaced as \`Option::None\` so a malformed event skips the placement instead of panicking. - \`OrderData::uid(domain, contract)\` computes the canonical 56-byte order UID locally; the orderbook returns the same value from POST /api/v1/orders and a Warn fires if they drift (domain or owner divergence). - \`from\` = EthFlow contract (the EIP-1271 verifier), NOT the user's \`sender\` — matches the on-chain signing scheme. - \`app_data\` is fixed to \`EMPTY_APP_DATA_JSON\` for now; placements pinning a real IPFS document are rejected by \`from_signed_order_data\` (digest mismatch) and skipped. 3. Serialise + \`cow_api::submit_order(chain_id, body)\`. 4. Persist the outcome: - success -> \`submitted:{uid}\` - retriable -> \`backoff:{uid}\` (same OrderPostError classification path as BLEU-829) - permanent -> \`dropped:{uid}\` \`apply_submit_retry\` mirrors BLEU-829's \`classify_submit_error\` — when the host forwards the orderbook JSON via \`host-error.data\`, the dispatch is data-driven; absent data, the safe default is \`backoff:\` (retry next event) rather than \`dropped:\`. The \`Backoff { seconds }\` variant of \`RetryAction\` is parked: cowprotocol's surface today is bool-only, so until a server hint shows up (Retry-After or a typed delay) the variant remains intentionally producer-less. Tests: 10 host tests covering BLEU-832 (2 decode regressions) and BLEU-833 (5 order-build edges + 3 error-classification arms). \`.wasm\` 268 KB (was 96 KB; the OrderCreation + serde_json + DomainSeparator + OrderUid surface get linked in). Same scope-knot on app-data resolution as the TWAP module; same host follow-up on \`host-error.data\` forwarding. Linear: BLEU-833. Ref ADR-0006. --- modules/ethflow-watcher/Cargo.toml | 1 + modules/ethflow-watcher/src/lib.rs | 422 +++++++++++++++++++++++------ 2 files changed, 333 insertions(+), 90 deletions(-) diff --git a/modules/ethflow-watcher/Cargo.toml b/modules/ethflow-watcher/Cargo.toml index 5d9fa3d..cdde1fd 100644 --- a/modules/ethflow-watcher/Cargo.toml +++ b/modules/ethflow-watcher/Cargo.toml @@ -12,4 +12,5 @@ crate-type = ["cdylib"] cowprotocol = { version = "1.0.0-alpha.3", default-features = false } alloy-primitives = { version = "1.5", default-features = false, features = ["std"] } alloy-sol-types = { version = "1.5", default-features = false, features = ["std"] } +serde_json = { version = "1", default-features = false, features = ["alloc"] } wit-bindgen = { version = "0.57", default-features = false, features = ["macros", "realloc"] } diff --git a/modules/ethflow-watcher/src/lib.rs b/modules/ethflow-watcher/src/lib.rs index f042c15..dec343b 100644 --- a/modules/ethflow-watcher/src/lib.rs +++ b/modules/ethflow-watcher/src/lib.rs @@ -11,23 +11,47 @@ wit_bindgen::generate!({ use alloy_primitives::{Address, B256, Bytes}; use alloy_sol_types::SolEvent; use cowprotocol::{ - CoWSwapOnchainOrders::OrderPlacement, ETH_FLOW_PRODUCTION, ETH_FLOW_STAGING, GPv2OrderData, - OnchainSignature, + ApiError, BuyTokenDestination, Chain, CoWSwapOnchainOrders::OrderPlacement, + EMPTY_APP_DATA_JSON, ETH_FLOW_PRODUCTION, ETH_FLOW_STAGING, GPv2OrderData, OnchainSignature, + OnchainSigningScheme, OrderCreation, OrderData, OrderKind, OrderUid, SellTokenSource, + Signature, }; -use nexum::host::{logging, types}; +use nexum::host::{local_store, logging, types}; +use shepherd::cow::cow_api; /// Fully decoded payload of a `CoWSwapOnchainOrders.OrderPlacement` log. /// `GPv2OrderData` is ~300 bytes; box it so the struct stays cache- -/// friendly when it later lands in the BLEU-833 submission path. +/// friendly through the submit path. #[derive(Debug)] -#[allow(dead_code)] // Fields consumed by BLEU-833. struct DecodedPlacement { + /// EthFlow contract that emitted the event — also the EIP-1271 + /// verifier `from` for the submitted `OrderCreation`. + contract: Address, + /// Original native-token seller — logged for diagnostics; the + /// orderbook's `from` is the contract (EIP-1271 owner), not this. sender: Address, order: Box, signature: OnchainSignature, + /// Refund pointer / opaque placer metadata. Not consumed by the + /// submit path today, but the field is part of the BLEU-832 + /// decoder contract. + #[allow(dead_code)] data: Bytes, } +/// What the lifecycle layer should do after a failed submission. +/// Mirrors the BLEU-829 dispatch contract on the TWAP module; the +/// `Backoff` arm has no producer until a server-supplied hint exists. +#[derive(Debug, Eq, PartialEq)] +enum RetryAction { + TryNextBlock, + #[allow(dead_code)] + Backoff { + seconds: u64, + }, + Drop, +} + struct EthFlowWatcher; impl Guest for EthFlowWatcher { @@ -39,11 +63,10 @@ impl Guest for EthFlowWatcher { fn on_event(event: types::Event) -> Result<(), HostError> { if let types::Event::Logs(logs) = event { for log in &logs { - if let Some(placement) = decode_order_placement(&log.address, &log.topics, &log.data) + if let Some(placement) = + decode_order_placement(&log.address, &log.topics, &log.data) { - log_placement(&placement); - // BLEU-833 will build OrderCreation + submit + apply - // OrderPostError::retry_hint right here. + submit_placement(log.chain_id, &placement)?; } } } @@ -52,19 +75,17 @@ impl Guest for EthFlowWatcher { } } -/// Decode a raw event log against `CoWSwapOnchainOrders.OrderPlacement`, -/// keeping the four fields the BLEU-833 submission path needs. +// ---- BLEU-832: decode ---- + +/// Decode a raw event log against `CoWSwapOnchainOrders.OrderPlacement`. /// /// Returns `None` when: -/// - the log's contract address is not one of the canonical `ETH_FLOW_*` -/// deployments (defensive — the host's `[[subscription]]` filter -/// already pins the address, but a misconfigured engine could still -/// leak through); +/// - the log's contract address is neither `ETH_FLOW_PRODUCTION` nor +/// `ETH_FLOW_STAGING` (defensive — the host's `[[subscription]]` +/// filter already pins the address, but a misconfigured engine could +/// still leak through); /// - topic0 does not match the event signature; or -/// - the ABI body fails to decode (truncated, wrong layout). -/// -/// Kept on plain slices so the host-free unit tests can call it without -/// wit-bindgen scaffolding. +/// - the ABI body fails to decode. fn decode_order_placement( address: &[u8], topics: &[Vec], @@ -88,6 +109,7 @@ fn decode_order_placement( .collect(); let decoded = OrderPlacement::decode_raw_log(words, data).ok()?; Some(DecodedPlacement { + contract, sender: decoded.sender, order: Box::new(decoded.order), signature: decoded.signature, @@ -95,18 +117,167 @@ fn decode_order_placement( }) } -fn log_placement(p: &DecodedPlacement) { - logging::log( - logging::Level::Info, - &format!( - "ethflow OrderPlacement sender={:#x} sell={:#x} buy={:#x} valid_to={} sig_scheme={:?}", - p.sender, - p.order.sellToken, - p.order.buyToken, - p.order.validTo, - p.signature.scheme, - ), - ); +// ---- BLEU-833: submit + retry ---- + +#[derive(Debug)] +enum BuildError { + UnknownMarker, + UnknownSignatureScheme, + UnsupportedChain(u64), + Cowprotocol(cowprotocol::Error), +} + +impl core::fmt::Display for BuildError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::UnknownMarker => f.write_str("GPv2OrderData carried an unknown enum marker"), + Self::UnknownSignatureScheme => { + f.write_str("OnchainSignature carried an unknown scheme variant") + } + Self::UnsupportedChain(id) => write!(f, "chain {id} is not supported by cowprotocol"), + Self::Cowprotocol(e) => write!(f, "{e}"), + } + } +} + +fn gpv2_to_order_data(gpv2: &GPv2OrderData) -> Option { + Some(OrderData { + sell_token: gpv2.sellToken, + buy_token: gpv2.buyToken, + receiver: (gpv2.receiver != Address::ZERO).then_some(gpv2.receiver), + sell_amount: gpv2.sellAmount, + buy_amount: gpv2.buyAmount, + valid_to: gpv2.validTo, + app_data: gpv2.appData, + fee_amount: gpv2.feeAmount, + kind: OrderKind::from_contract_bytes(gpv2.kind)?, + partially_fillable: gpv2.partiallyFillable, + sell_token_balance: SellTokenSource::from_contract_bytes(gpv2.sellTokenBalance)?, + buy_token_balance: BuyTokenDestination::from_contract_bytes(gpv2.buyTokenBalance)?, + }) +} + +/// Lift `OnchainSignature` into the orderbook-typed `Signature`. The +/// EthFlow contract is the EIP-1271 verifier, so the `data` blob is +/// the raw verifier bytes; for `PreSign` the orderbook accepts an +/// empty payload. +fn to_signature(sig: &OnchainSignature) -> Option { + // sol! adds a hidden `__Invalid` variant on every Solidity enum, so + // exhaustive patterns require a wildcard; we surface it as `None` + // (caller falls back to skipping the placement) rather than panic. + match sig.scheme { + OnchainSigningScheme::Eip1271 => Some(Signature::Eip1271(sig.data.to_vec())), + OnchainSigningScheme::PreSign => Some(Signature::PreSign), + _ => None, + } +} + +/// Assemble `(OrderCreation, OrderUid)` from a placement. `from` is the +/// EthFlow contract (EIP-1271 owner). `app_data` is fixed to +/// `EMPTY_APP_DATA_JSON` — placements pinning a real IPFS document get +/// rejected by `from_signed_order_data` (digest mismatch) and skipped, +/// same scope limitation as the TWAP module. +fn build_eth_flow_creation( + chain_id: u64, + placement: &DecodedPlacement, +) -> Result<(OrderCreation, OrderUid), BuildError> { + let chain = Chain::try_from(chain_id).map_err(|_| BuildError::UnsupportedChain(chain_id))?; + let domain = chain.settlement_domain(); + let order_data = gpv2_to_order_data(&placement.order).ok_or(BuildError::UnknownMarker)?; + let uid = order_data.uid(&domain, placement.contract); + let signature = to_signature(&placement.signature).ok_or(BuildError::UnknownSignatureScheme)?; + let creation = OrderCreation::from_signed_order_data( + &order_data, + signature, + placement.contract, + EMPTY_APP_DATA_JSON.to_string(), + None, + ) + .map_err(BuildError::Cowprotocol)?; + Ok((creation, uid)) +} + +fn submit_placement(chain_id: u64, placement: &DecodedPlacement) -> Result<(), HostError> { + let (creation, uid) = match build_eth_flow_creation(chain_id, placement) { + Ok(x) => x, + Err(e) => { + logging::log( + logging::Level::Warn, + &format!( + "ethflow submit skipped (sender={:#x}): {e}", + placement.sender + ), + ); + return Ok(()); + } + }; + let body = match serde_json::to_vec(&creation) { + Ok(b) => b, + Err(e) => { + logging::log( + logging::Level::Error, + &format!("OrderCreation JSON encode failed: {e}"), + ); + return Ok(()); + } + }; + let uid_hex = format!("{uid}"); + match cow_api::submit_order(chain_id, &body) { + Ok(server_uid) => { + // Persist under the server-supplied UID so downstream + // observers (cow-tooling, dune) join on the same key. The + // client UID we just computed should equal it; a Warn is + // worth a closer look if not (domain/owner divergence). + if server_uid != uid_hex { + logging::log( + logging::Level::Warn, + &format!("ethflow uid drift: local={uid_hex} server={server_uid}"), + ); + } + local_store::set(&format!("submitted:{server_uid}"), b"")?; + logging::log( + logging::Level::Info, + &format!("ethflow submitted {server_uid}"), + ); + } + Err(err) => apply_submit_retry(&err, &uid_hex)?, + } + Ok(()) +} + +fn try_decode_api_error(err: &HostError) -> Option { + let data = err.data.as_deref()?; + serde_json::from_str::(data).ok() +} + +fn classify_submit_error(err: &HostError) -> RetryAction { + match try_decode_api_error(err) { + Some(api) if api.retry_hint() => RetryAction::TryNextBlock, + Some(_) => RetryAction::Drop, + // Safe default — a flaky orderbook should not be treated as a + // permanent rejection. + None => RetryAction::TryNextBlock, + } +} + +fn apply_submit_retry(err: &HostError, uid_hex: &str) -> Result<(), HostError> { + match classify_submit_error(err) { + RetryAction::TryNextBlock | RetryAction::Backoff { .. } => { + local_store::set(&format!("backoff:{uid_hex}"), b"")?; + logging::log( + logging::Level::Warn, + &format!("ethflow backoff {uid_hex} ({}): {}", err.code, err.message), + ); + } + RetryAction::Drop => { + local_store::set(&format!("dropped:{uid_hex}"), b"")?; + logging::log( + logging::Level::Warn, + &format!("ethflow dropped {uid_hex} ({}): {}", err.code, err.message), + ); + } + } + Ok(()) } export!(EthFlowWatcher); @@ -116,42 +287,49 @@ mod tests { use super::*; use alloy_primitives::{U256, address, hex}; use alloy_sol_types::SolValue; - use cowprotocol::OnchainSigningScheme; - fn sample_order() -> GPv2OrderData { + fn submittable_order() -> GPv2OrderData { GPv2OrderData { sellToken: address!("6810e776880C02933D47DB1b9fc05908e5386b96"), buyToken: address!("DAE5F1590db13E3B40423B5b5c5fbf175515910b"), receiver: address!("DeaDbeefdEAdbeefdEadbEEFdeadbeEFdEaDbeeF"), sellAmount: U256::from(1_000_000_u64), buyAmount: U256::from(999_u64), - validTo: 1_700_000_000, - appData: B256::repeat_byte(0xaa), + validTo: 0xffff_ffff, + appData: cowprotocol::EMPTY_APP_DATA_HASH, feeAmount: U256::ZERO, - kind: B256::repeat_byte(0xbb), + kind: OrderKind::SELL, partiallyFillable: false, - sellTokenBalance: B256::repeat_byte(0xcc), - buyTokenBalance: B256::repeat_byte(0xdd), + sellTokenBalance: SellTokenSource::ERC20, + buyTokenBalance: BuyTokenDestination::ERC20, + } + } + + fn well_formed_placement() -> DecodedPlacement { + DecodedPlacement { + contract: ETH_FLOW_PRODUCTION, + sender: address!("00112233445566778899aabbccddeeff00112233"), + order: Box::new(submittable_order()), + signature: OnchainSignature { + scheme: OnchainSigningScheme::Eip1271, + data: hex!("c0ffeec0ffeec0ffee").to_vec().into(), + }, + data: Bytes::new(), } } - fn sample_event() -> (Address, OrderPlacement) { - let sender = address!("00112233445566778899aabbccddeeff00112233"); - let event = OrderPlacement { - sender, - order: sample_order(), + fn sample_event_for_decode() -> OrderPlacement { + OrderPlacement { + sender: address!("00112233445566778899aabbccddeeff00112233"), + order: submittable_order(), signature: OnchainSignature { scheme: OnchainSigningScheme::Eip1271, data: hex!("c0ffeec0ffeec0ffee").to_vec().into(), }, data: hex!("deadbeef").to_vec().into(), - }; - (sender, event) + } } - /// Build `(topics, data)` the way the EVM would emit them. The - /// indexed `sender` becomes topic1 (left-padded address); the three - /// non-indexed fields become the abi-encoded body. fn encode_log(event: &OrderPlacement) -> (Vec>, Vec) { let mut sender_topic = vec![0u8; 12]; sender_topic.extend_from_slice(event.sender.as_slice()); @@ -165,71 +343,135 @@ mod tests { (topics, data) } + // ---- BLEU-832 regressions ---- + #[test] fn decodes_well_formed_placement() { - let (sender, event) = sample_event(); + let event = sample_event_for_decode(); let (topics, data) = encode_log(&event); - let address = ETH_FLOW_PRODUCTION.as_slice(); - - let decoded = decode_order_placement(address, &topics, &data).expect("decode succeeds"); - assert_eq!(decoded.sender, sender); - assert_eq!(decoded.order.sellToken, event.order.sellToken); - assert_eq!(decoded.order.buyAmount, event.order.buyAmount); + let decoded = decode_order_placement(ETH_FLOW_PRODUCTION.as_slice(), &topics, &data) + .expect("decode succeeds"); + assert_eq!(decoded.contract, ETH_FLOW_PRODUCTION); + assert_eq!(decoded.sender, event.sender); assert_eq!(decoded.signature.scheme, OnchainSigningScheme::Eip1271); - assert_eq!( - decoded.signature.data.as_ref(), - event.signature.data.as_ref() - ); - assert_eq!(decoded.data.as_ref(), event.data.as_ref()); - } - - #[test] - fn accepts_staging_address() { - let (_, event) = sample_event(); - let (topics, data) = encode_log(&event); - assert!(decode_order_placement(ETH_FLOW_STAGING.as_slice(), &topics, &data).is_some()); } #[test] fn rejects_unrelated_contract_address() { - let (_, event) = sample_event(); + let event = sample_event_for_decode(); let (topics, data) = encode_log(&event); let stranger = address!("dead00000000000000000000000000000000dead"); assert!(decode_order_placement(stranger.as_slice(), &topics, &data).is_none()); } + // ---- BLEU-833: order construction ---- + #[test] - fn rejects_wrong_topic_signature() { - let (_, event) = sample_event(); - let (_, data) = encode_log(&event); - let bad_topic = vec![0xaa_u8; 32]; - let sender_topic = vec![0u8; 32]; - assert!( - decode_order_placement( - ETH_FLOW_PRODUCTION.as_slice(), - &[bad_topic, sender_topic], - &data, - ) - .is_none() + fn build_eip1271_creation_has_contract_as_from() { + let placement = well_formed_placement(); + let (creation, uid) = + build_eth_flow_creation(11_155_111, &placement).expect("build succeeds"); + assert_eq!(creation.from, placement.contract); + assert_eq!(creation.signing_scheme, cowprotocol::SigningScheme::Eip1271); + assert_eq!( + creation.signature.to_bytes(), + placement.signature.data.to_vec(), + ); + // UID layout = digest || owner || valid_to. Owner bytes must + // match the EthFlow contract. + assert_eq!(&uid.as_slice()[32..52], placement.contract.as_slice()); + // Last 4 bytes = validTo big-endian. + assert_eq!( + &uid.as_slice()[52..56], + &placement.order.validTo.to_be_bytes(), ); } #[test] - fn rejects_truncated_address() { - let (_, event) = sample_event(); - let (topics, data) = encode_log(&event); - assert!(decode_order_placement(&[0u8; 19], &topics, &data).is_none()); + fn build_presign_emits_presign_scheme() { + let mut placement = well_formed_placement(); + placement.signature = OnchainSignature { + scheme: OnchainSigningScheme::PreSign, + data: Bytes::new(), + }; + let (creation, _) = build_eth_flow_creation(1, &placement).expect("build succeeds"); + assert_eq!(creation.signing_scheme, cowprotocol::SigningScheme::PreSign); + assert!(creation.signature.to_bytes().is_empty()); } #[test] - fn rejects_truncated_data() { - let (topics, _) = encode_log(&sample_event().1); - assert!(decode_order_placement(ETH_FLOW_PRODUCTION.as_slice(), &topics, &[]).is_none()); + fn build_rejects_unsupported_chain() { + let placement = well_formed_placement(); + let err = build_eth_flow_creation(0xdead_beef, &placement).unwrap_err(); + assert!(matches!(err, BuildError::UnsupportedChain(0xdead_beef))); } #[test] - fn rejects_empty_topics() { - let (_, data) = encode_log(&sample_event().1); - assert!(decode_order_placement(ETH_FLOW_PRODUCTION.as_slice(), &[], &data).is_none()); + fn build_rejects_unknown_kind_marker() { + let mut placement = well_formed_placement(); + placement.order.kind = B256::repeat_byte(0x42); + let err = build_eth_flow_creation(1, &placement).unwrap_err(); + assert!(matches!(err, BuildError::UnknownMarker)); + } + + #[test] + fn build_rejects_non_empty_app_data() { + let mut placement = well_formed_placement(); + placement.order.appData = B256::repeat_byte(0xee); + let err = build_eth_flow_creation(1, &placement).unwrap_err(); + assert!(matches!(err, BuildError::Cowprotocol(_))); + } + + // ---- BLEU-833: error classification ---- + + fn host_error_with_api(error_type: &str) -> HostError { + let body = serde_json::json!({ + "errorType": error_type, + "description": "test", + }); + HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Denied, + code: 400, + message: format!("{error_type}: test"), + data: Some(body.to_string()), + } + } + + #[test] + fn classify_retriable_returns_try_next_block() { + for kind in ["InsufficientFee", "TooManyLimitOrders", "PriceExceedsMarketPrice"] { + assert_eq!( + classify_submit_error(&host_error_with_api(kind)), + RetryAction::TryNextBlock, + ); + } + } + + #[test] + fn classify_permanent_returns_drop() { + for kind in [ + "InvalidSignature", + "WrongOwner", + "DuplicateOrder", + "InvalidErc1271Signature", + ] { + assert_eq!( + classify_submit_error(&host_error_with_api(kind)), + RetryAction::Drop, + ); + } + } + + #[test] + fn classify_missing_data_defaults_to_try_next_block() { + let err = HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Internal, + code: 0, + message: "network reset".into(), + data: None, + }; + assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock); } } From c5e4d7d767f6acfed655c071c9f70de0b29269e6 Mon Sep 17 00:00:00 2001 From: brunota20 Date: Mon, 15 Jun 2026 11:39:49 -0300 Subject: [PATCH 2/2] fix(ethflow-watcher): idempotency guard on re-delivered placements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit \`submit_placement\` now checks for a prior terminal marker before calling \`cow_api::submit_order\`. Re-delivered \`OrderPlacement\` logs (engine restart with replay, host reconnect, indexer back-fill) would otherwise re-submit the same body, the orderbook would reject \`DuplicateOrder\` (permanent), and the module would end up with BOTH \`submitted:{uid}\` AND \`dropped:{uid}\` written for the same key. The guard is a typed `prior_outcome(uid_hex)` lookup: - \`Submitted\` -> skip (the most common re-delivery cause) - \`Dropped\` -> skip (orderbook permanently rejected previously) - \`Backoff\` -> proceed: a transient failure deserves a fresh attempt on re-delivery; the new outcome overrides. - \`None\` -> proceed: a clean first try. On a successful submit, any previous \`backoff:\` marker is also cleared so the local store carries at most one outcome flag per UID at rest. Same cleanup happens on a permanent drop in \`apply_submit_retry\`. Linear: BLEU-833 (fix on the same PR — review identified the re-delivery gap). --- modules/ethflow-watcher/src/lib.rs | 62 +++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/modules/ethflow-watcher/src/lib.rs b/modules/ethflow-watcher/src/lib.rs index dec343b..0b182e1 100644 --- a/modules/ethflow-watcher/src/lib.rs +++ b/modules/ethflow-watcher/src/lib.rs @@ -211,6 +211,32 @@ fn submit_placement(chain_id: u64, placement: &DecodedPlacement) -> Result<(), H return Ok(()); } }; + let uid_hex = format!("{uid}"); + + // Idempotency. A host reconnect or engine restart may replay the same + // OrderPlacement log; without the guard we would attempt a second + // submit, the orderbook would reject `DuplicateOrder` (permanent), and + // we would end up with both `submitted:` AND `dropped:` written for + // the same UID. `backoff:` is *not* a short-circuit — a previous + // transient error deserves a fresh attempt on re-delivery. + match prior_outcome(&uid_hex)? { + PriorOutcome::Submitted => { + logging::log( + logging::Level::Info, + &format!("ethflow {uid_hex} already submitted; skipping"), + ); + return Ok(()); + } + PriorOutcome::Dropped => { + logging::log( + logging::Level::Info, + &format!("ethflow {uid_hex} previously dropped; skipping"), + ); + return Ok(()); + } + PriorOutcome::None | PriorOutcome::Backoff => {} + } + let body = match serde_json::to_vec(&creation) { Ok(b) => b, Err(e) => { @@ -221,7 +247,6 @@ fn submit_placement(chain_id: u64, placement: &DecodedPlacement) -> Result<(), H return Ok(()); } }; - let uid_hex = format!("{uid}"); match cow_api::submit_order(chain_id, &body) { Ok(server_uid) => { // Persist under the server-supplied UID so downstream @@ -235,6 +260,9 @@ fn submit_placement(chain_id: u64, placement: &DecodedPlacement) -> Result<(), H ); } local_store::set(&format!("submitted:{server_uid}"), b"")?; + // Clear any backoff: marker a prior transient error left + // behind; the terminal `submitted:` flag now supersedes it. + let _ = local_store::delete(&format!("backoff:{server_uid}")); logging::log( logging::Level::Info, &format!("ethflow submitted {server_uid}"), @@ -245,6 +273,34 @@ fn submit_placement(chain_id: u64, placement: &DecodedPlacement) -> Result<(), H Ok(()) } +/// Which terminal / transient marker (if any) the local store carries +/// for `uid_hex`. The submit path short-circuits on `Submitted` / +/// `Dropped`; `Backoff` still proceeds with a fresh attempt; `None` +/// means a clean first try. +#[derive(Debug, Eq, PartialEq)] +enum PriorOutcome { + None, + Submitted, + Backoff, + Dropped, +} + +fn prior_outcome(uid_hex: &str) -> Result { + // Terminal markers take precedence over `backoff:`. `submitted:` is + // checked first because a successful prior attempt is the most + // common reason a log gets re-delivered. + if local_store::get(&format!("submitted:{uid_hex}"))?.is_some() { + return Ok(PriorOutcome::Submitted); + } + if local_store::get(&format!("dropped:{uid_hex}"))?.is_some() { + return Ok(PriorOutcome::Dropped); + } + if local_store::get(&format!("backoff:{uid_hex}"))?.is_some() { + return Ok(PriorOutcome::Backoff); + } + Ok(PriorOutcome::None) +} + fn try_decode_api_error(err: &HostError) -> Option { let data = err.data.as_deref()?; serde_json::from_str::(data).ok() @@ -271,6 +327,10 @@ fn apply_submit_retry(err: &HostError, uid_hex: &str) -> Result<(), HostError> { } RetryAction::Drop => { local_store::set(&format!("dropped:{uid_hex}"), b"")?; + // Clear `backoff:` if a prior transient attempt left it + // behind — the terminal `dropped:` flag now supersedes it, + // and we want at most one "outcome" marker per UID at rest. + let _ = local_store::delete(&format!("backoff:{uid_hex}")); logging::log( logging::Level::Warn, &format!("ethflow dropped {uid_hex} ({}): {}", err.code, err.message),