diff --git a/cardano-testnet/changelog.d/20260416_124922_mgalazyn_refactor_retry_functions.md b/cardano-testnet/changelog.d/20260416_124922_mgalazyn_refactor_retry_functions.md index b7fcdcc714f..175902e4112 100644 --- a/cardano-testnet/changelog.d/20260416_124922_mgalazyn_refactor_retry_functions.md +++ b/cardano-testnet/changelog.d/20260416_124922_mgalazyn_refactor_retry_functions.md @@ -7,7 +7,7 @@ Now mirrors `waitForEpochs`: relies solely on the shared retry loop's timeout instead of an outer block-count predicate, avoiding the drift between two independent snapshots of the starting block number. - Simplified `checkDRepState` by replacing direct `foldEpochState` usage with `EpochStateView` polling. - Simplified `assertNewEpochState` by replacing `watchEpochStateUpdate` with `retryUntilRightM`. -- Changed `EpochStateView` from a record with three fields to a newtype wrapping the `IORef`, removing unused `nodeConfigPath` and `socketPath` fields. +- Removed unused `nodeConfigPath` and `socketPath` fields from `EpochStateView`. - Added `maybeExtractGovernanceActionExpiry` in `Testnet.EpochStateProcessing`, which reads a proposal's `gasExpiresAfter` epoch from the gov state. - Rewrote the `Gov Action Timeout` integration test to derive its wait target from the proposal's actual expiry epoch, removing the race window caused by not knowing which epoch the proposal was recorded in. The check now waits one full epoch past the removal boundary so the RATIFY-produced state is @k@-deep stable and cannot be invalidated by a chain rollback. 
diff --git a/cardano-testnet/changelog.d/20260416_160000_mgalazyn_stm_epoch_state_view.md b/cardano-testnet/changelog.d/20260416_160000_mgalazyn_stm_epoch_state_view.md new file mode 100644 index 00000000000..bc2bd480dc6 --- /dev/null +++ b/cardano-testnet/changelog.d/20260416_160000_mgalazyn_stm_epoch_state_view.md @@ -0,0 +1,4 @@ +### Maintenance + +- Retries in `EpochStateView` wake immediately on each epoch state update instead of polling. + Multiple threads waiting on the same view wake up together. diff --git a/cardano-testnet/src/Testnet/Components/Query.hs b/cardano-testnet/src/Testnet/Components/Query.hs index fb8689a9f2d..8f085bd662f 100644 --- a/cardano-testnet/src/Testnet/Components/Query.hs +++ b/cardano-testnet/src/Testnet/Components/Query.hs @@ -60,9 +60,12 @@ import qualified Cardano.Ledger.State as L import Prelude +import Control.Applicative ((<|>)) +import Control.Concurrent.STM (STM, TVar, modifyTVar', newTVarIO, readTVar, writeTVar) +import qualified Control.Concurrent.STM as STM import Control.Monad +import Control.Monad.Trans.Maybe (MaybeT (..), mapMaybeT, runMaybeT) import Control.Monad.Trans.Resource -import Data.IORef import Data.List (sortOn) import qualified Data.Map as Map import Data.Map.Strict (Map) @@ -87,6 +90,8 @@ import qualified Hedgehog as H import Hedgehog.Extras (MonadAssertion) import qualified Hedgehog.Extras as H +import UnliftIO.STM (atomically, readTVarIO, registerDelay) + -- | Block and wait for the desired epoch. waitUntilEpoch :: HasCallStack @@ -152,8 +157,10 @@ instance Show TestnetWaitPeriod where WaitForBlocks n -> "WaitForBlocks " <> show n WaitForSlots n -> "WaitForSlots " <> show n --- | Core retry loop. Repeats the action every 300ms until it returns 'Right' --- or the timeout is reached, in which case the last 'Left' is returned. +-- | Core retry loop. Returns early on 'Right'; on 'Left', blocks via STM until +-- the 'EpochStateView' is updated (with a safety fallback timeout) and retries. 
+-- Gives up and returns the last 'Left' once the 'TestnetWaitPeriod' deadline is +-- exceeded. retryUntilRightM :: HasCallStack => MonadIO m @@ -167,13 +174,18 @@ retryUntilRightM esv timeout act = withFrozenCallStack $ do startingValue <- getCurrentValue go $ startingValue + timeoutW64 where - go deadline = act >>= \case - r@(Right _) -> pure r - l@(Left _) -> do - cv <- getCurrentValue - if cv > deadline - then pure l - else H.threadDelay 300_000 >> go deadline + go deadline = do + -- Sample the version before running 'act' so that any update landing during 'act' + -- makes 'awaitStateUpdateTimeout' return without blocking, rather than waiting for + -- the next update and adding a block/epoch of latency. + versionBeforeAct <- readTVarIO $ epochStateVersion esv + act >>= \case + r@(Right _) -> pure r + l@(Left _) -> do + cv <- getCurrentValue + if cv > deadline + then pure l + else awaitStateUpdateTimeout esv 300 versionBeforeAct *> go deadline (getCurrentValue, timeoutW64) = case timeout of WaitForEpochs (EpochInterval n) -> (unEpochNo <$> getCurrentEpochNo esv, fromIntegral n) @@ -227,13 +239,57 @@ data EpochStateStatus -- ^ The background thread encountered an error while folding blocks -- | A read-only mutable pointer to an epoch state, updated automatically -newtype EpochStateView = EpochStateView - { epochStateView :: IORef (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)) +data EpochStateView = EpochStateView + { epochStateView :: !(TVar (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo))) -- ^ Automatically updated current NewEpochState. 'Left' indicates the state is not yet available -- (either not initialised or an error occurred). 'Right' contains the latest epoch state. -- Use 'getEpochState', 'getBlockNumber', 'getSlotNumber' to access the values. + , epochStateVersion :: !(TVar Word64) + -- ^ Monotonically increasing counter, bumped on every state write. + -- Used by 'awaitStateUpdateTimeout' to block until the next update. 
} +-- | Write a new value to the epoch state and bump the version counter atomically. +writeEpochStateView + :: EpochStateView + -> Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo) + -- ^ new state value + -> STM () +writeEpochStateView EpochStateView{epochStateView, epochStateVersion} newState = do + writeTVar epochStateView newState + modifyTVar' epochStateVersion (+ 1) + +-- | Block until the epoch state version advances past the provided previously sampled +-- version, or until the fallback timeout expires. Returns immediately if the current +-- version already differs, so callers can sample before running an action and avoid +-- missing updates that land during the action. Returns the current state on update, or 'Nothing' on timeout. +-- All threads blocked on the same 'EpochStateView' wake up on each update. +awaitStateUpdateTimeout + :: MonadIO m + => EpochStateView + -> DTC.NominalDiffTime -- ^ Fallback timeout, in seconds + -> Word64 -- ^ Previously sampled version + -> m (Maybe (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo))) +awaitStateUpdateTimeout EpochStateView{epochStateVersion, epochStateView} timeout sinceVersion = runMaybeT $ fastResult <|> awaitedResult + where + -- Fast path: if the version already differs, read state and version atomically and return + -- without allocating a 'registerDelay' timer. This avoids accumulating timer-queue entries + -- when callers sample a stale version and an update has already landed. + fastResult = mapMaybeT atomically $ do + v <- lift $ readTVar epochStateVersion + guard $ v /= sinceVersion + lift $ readTVar epochStateView + + awaitedResult = MaybeT $ do + timedOutVar <- registerDelay . ceiling $ timeout * 1_000_000 + atomically $ do + v <- readTVar epochStateVersion + timedOut <- readTVar timedOutVar + case (v /= sinceVersion, timedOut) of + (True, _) -> Just <$> readTVar epochStateView + (_, True) -> pure Nothing + _ -> STM.retry + -- | Get epoch state from the view. If the state isn't available, retry waiting up to 25 seconds. 
Fails -- immediately if the background thread encountered an error, or after 25 seconds if not yet initialised. getEpochState @@ -266,9 +322,9 @@ getSlotNumber getSlotNumber epochStateView = withFrozenCallStack $ (\(_, slotNumber, _) -> slotNumber) <$> getEpochStateDetails epochStateView --- | Utility function for accessing epoch state in 'IORef'. --- Retries every 0.5s for up to 25 seconds while not initialised. --- Fails immediately if the background fold thread encountered an error. +-- | Access the current epoch state. Returns immediately if the state is already available. +-- Blocks up to 25 seconds waiting for initialisation if the background thread has not yet +-- received any epoch state. Fails immediately if the background thread encountered an error. getEpochStateDetails :: HasCallStack => MonadAssertion m @@ -276,27 +332,46 @@ getEpochStateDetails => MonadIO m => EpochStateView -> m (AnyNewEpochState, SlotNo, BlockNo) -getEpochStateDetails EpochStateView{epochStateView} = - withFrozenCallStack $ do - deadline <- liftIO $ DTC.addUTCTime 25 <$> DTC.getCurrentTime - go deadline +getEpochStateDetails EpochStateView{epochStateView} = withFrozenCallStack $ + -- Fast path: read the TVar outside an STM block so we don't register a pointless + -- 'initTimeoutSeconds' timer on every call. These getters run inside tight + -- retry loops, and the unused timer-queue entries would otherwise accumulate. 
+ readTVarIO epochStateView + >>= awaitForState + >>= failEpochStateFoldError where - go deadline = do - result <- H.evalIO $ readIORef epochStateView - case result of - Left (EpochStateFoldError err) -> do - H.note_ $ "EpochStateView background thread failed: " <> docToString (prettyError err) - H.failure - Left EpochStateNotInitialised -> do - currentTime <- liftIO DTC.getCurrentTime - if currentTime < deadline - then do - H.threadDelay 500_000 - go deadline - else do - H.note_ "EpochStateView has not been initialised within 25 seconds" - H.failure - Right details -> pure details + initTimeoutSeconds :: Int + initTimeoutSeconds = 25 + + awaitForState + :: MonadIO n + => Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo) + -> n (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)) + awaitForState = \case + Left EpochStateNotInitialised -> do + -- register delay only when we're starting to retry + timedOutVar <- registerDelay $ initTimeoutSeconds * 1_000_000 + atomically $ do + state' <- readTVar epochStateView + state' <$ case state' of + -- retry until timeout + Left EpochStateNotInitialised -> readTVar timedOutVar >>= guard + _ -> pure () + state -> pure state + + failEpochStateFoldError + :: (HasCallStack, MonadTest n) + => Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo) + -> n (AnyNewEpochState, SlotNo, BlockNo) + failEpochStateFoldError = \case + Right details -> pure details + Left (EpochStateFoldError err) -> do + H.note_ $ "EpochStateView background thread failed: " <> docToString (prettyError err) + H.failure + Left EpochStateNotInitialised -> do + H.note_ $ "EpochStateView has not been initialised within " <> show initTimeoutSeconds <> " seconds" + H.failure + -- | Create a background thread listening for new epoch states. New epoch states are available to access -- through 'EpochStateView', using query functions. 
@@ -311,16 +386,16 @@ getEpochStateView -> SocketPath -- ^ node socket path -> m EpochStateView getEpochStateView nodeConfigFile socketPath = withFrozenCallStack $ do - epochStateView <- H.evalIO $ newIORef $ Left EpochStateNotInitialised - void . asyncRegister_ $ do + esv <- H.evalIO $ EpochStateView <$> newTVarIO (Left EpochStateNotInitialised) <*> newTVarIO 0 + _ <- asyncRegister_ $ do result <- runExceptT $ foldEpochState nodeConfigFile socketPath QuickValidation (EpochNo maxBound) () $ \epochState slotNumber blockNumber -> do - liftIOAnnotated . writeIORef epochStateView $ Right (epochState, slotNumber, blockNumber) + liftIOAnnotated . atomically $ writeEpochStateView esv $ Right (epochState, slotNumber, blockNumber) pure ConditionNotMet case result of - Left err -> writeIORef epochStateView $ Left $ EpochStateFoldError err + Left err -> atomically $ writeEpochStateView esv $ Left $ EpochStateFoldError err Right _ -> pure () - pure $ EpochStateView epochStateView + pure esv -- | Retrieve all UTxOs map from the epoch state view. findAllUtxos