Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Now mirrors `waitForEpochs`: relies solely on the shared retry loop's timeout instead of an outer block-count predicate, avoiding the drift between two independent snapshots of the starting block number.
- Simplified `checkDRepState` by replacing direct `foldEpochState` usage with `EpochStateView` polling.
- Simplified `assertNewEpochState` by replacing `watchEpochStateUpdate` with `retryUntilRightM`.
- Changed `EpochStateView` from a record with three fields to a newtype wrapping the `IORef`, removing unused `nodeConfigPath` and `socketPath` fields.
- Removed unused `nodeConfigPath` and `socketPath` fields from `EpochStateView`.
- Added `maybeExtractGovernanceActionExpiry` in `Testnet.EpochStateProcessing`, which reads a proposal's `gasExpiresAfter` epoch from the gov state.
- Rewrote the `Gov Action Timeout` integration test to derive its wait target from the proposal's actual expiry epoch, removing the race window caused by not knowing which epoch the proposal was recorded in.
The check now waits one full epoch past the removal boundary so the RATIFY-produced state is @k@-deep stable and cannot be invalidated by a chain rollback.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Maintenance

- Retries in `EpochStateView` wake immediately on each epoch state update instead of polling.
Multiple threads waiting on the same view wake up together.
155 changes: 115 additions & 40 deletions cardano-testnet/src/Testnet/Components/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ import qualified Cardano.Ledger.State as L

import Prelude

import Control.Applicative ((<|>))
import Control.Concurrent.STM (STM, TVar, modifyTVar', newTVarIO, readTVar, writeTVar)
import qualified Control.Concurrent.STM as STM
import Control.Monad
import Control.Monad.Trans.Maybe (MaybeT (..), mapMaybeT, runMaybeT)
import Control.Monad.Trans.Resource
import Data.IORef
import Data.List (sortOn)
import qualified Data.Map as Map
import Data.Map.Strict (Map)
Expand All @@ -87,6 +90,8 @@ import qualified Hedgehog as H
import Hedgehog.Extras (MonadAssertion)
import qualified Hedgehog.Extras as H

import UnliftIO.STM (atomically, readTVarIO, registerDelay)

-- | Block and wait for the desired epoch.
waitUntilEpoch
:: HasCallStack
Expand Down Expand Up @@ -152,8 +157,10 @@ instance Show TestnetWaitPeriod where
WaitForBlocks n -> "WaitForBlocks " <> show n
WaitForSlots n -> "WaitForSlots " <> show n

-- | Core retry loop. Repeats the action every 300ms until it returns 'Right'
-- or the timeout is reached, in which case the last 'Left' is returned.
-- | Core retry loop. Returns early on 'Right'; on 'Left', blocks via STM until
-- the 'EpochStateView' is updated (with a safety fallback timeout) and retries.
-- Gives up and returns the last 'Left' once the 'TestnetWaitPeriod' deadline is
-- exceeded.
retryUntilRightM
:: HasCallStack
=> MonadIO m
Expand All @@ -167,13 +174,18 @@ retryUntilRightM esv timeout act = withFrozenCallStack $ do
startingValue <- getCurrentValue
go $ startingValue + timeoutW64
where
go deadline = act >>= \case
r@(Right _) -> pure r
l@(Left _) -> do
cv <- getCurrentValue
if cv > deadline
then pure l
else H.threadDelay 300_000 >> go deadline
go deadline = do
-- Sample the version before running 'act' so that any update landing during 'act'
-- makes 'awaitStateUpdateTimeout' return without blocking, rather than waiting for
-- the next update and adding a block/epoch of latency.
versionBeforeAct <- readTVarIO $ epochStateVersion esv
act >>= \case
r@(Right _) -> pure r
l@(Left _) -> do
cv <- getCurrentValue
if cv > deadline
then pure l
else awaitStateUpdateTimeout esv 300 versionBeforeAct *> go deadline

(getCurrentValue, timeoutW64) = case timeout of
WaitForEpochs (EpochInterval n) -> (unEpochNo <$> getCurrentEpochNo esv, fromIntegral n)
Expand Down Expand Up @@ -227,13 +239,57 @@ data EpochStateStatus
-- ^ The background thread encountered an error while folding blocks

-- | A read-only mutable pointer to an epoch state, updated automatically
newtype EpochStateView = EpochStateView
{ epochStateView :: IORef (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo))
data EpochStateView = EpochStateView
{ epochStateView :: !(TVar (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)))
-- ^ Automatically updated current NewEpochState. 'Left' indicates the state is not yet available
-- (either not initialised or an error occurred). 'Right' contains the latest epoch state.
-- Use 'getEpochState', 'getBlockNumber', 'getSlotNumber' to access the values.
, epochStateVersion :: !(TVar Word64)
-- ^ Monotonically increasing counter, bumped on every state write.
-- Used by 'awaitStateUpdateTimeout' to block until the next update.
}

-- | Write a new value to the epoch state and bump the version counter atomically.
writeEpochStateView
:: EpochStateView
-> Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)
-- ^ new state value
-> STM ()
writeEpochStateView EpochStateView{epochStateView, epochStateVersion} newState = do
writeTVar epochStateView newState
modifyTVar' epochStateVersion (+ 1)

-- | Block until the epoch state version advances past the provided previously sampled
Comment thread
carbolymer marked this conversation as resolved.
-- version, or until the fallback timeout expires. Returns immediately if the current
-- version already differs, so callers can sample before running an action and avoid
-- missing updates that land during the action. Returns 'Nothing' on timeout.
-- All threads blocked on the same 'EpochStateView' wake up on each update.
awaitStateUpdateTimeout
:: MonadIO m
=> EpochStateView
-> DTC.NominalDiffTime -- ^ Fallback timeout
-> Word64 -- ^ Previously sampled version
-> m (Maybe (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)))
awaitStateUpdateTimeout EpochStateView{epochStateVersion, epochStateView} timeout sinceVersion = runMaybeT $ fastResult <|> awaitedResult
where
-- Fast path: if the version already differs, read state and version atomically and return
-- without allocating a 'registerDelay' timer. This avoids accumulating timer-queue entries
-- when callers sample a stale version and an update has already landed.
fastResult = mapMaybeT atomically $ do
v <- lift $ readTVar epochStateVersion
guard $ v /= sinceVersion
lift $ readTVar epochStateView

awaitedResult = MaybeT $ do
timedOutVar <- registerDelay . ceiling $ timeout * 1_000_000
atomically $ do
v <- readTVar epochStateVersion
timedOut <- readTVar timedOutVar
case (v /= sinceVersion, timedOut) of
(True, _) -> Just <$> readTVar epochStateView
(_, True) -> pure Nothing
_ -> STM.retry

-- | Get epoch state from the view. If the state isn't available, retry waiting up to 25 seconds. Fails
-- immediately if the background thread encountered an error, or after 25 seconds if not yet initialised.
getEpochState
Expand Down Expand Up @@ -266,37 +322,56 @@ getSlotNumber
getSlotNumber epochStateView =
withFrozenCallStack $ (\(_, slotNumber, _) -> slotNumber) <$> getEpochStateDetails epochStateView

-- | Utility function for accessing epoch state in 'IORef'.
-- Retries every 0.5s for up to 25 seconds while not initialised.
-- Fails immediately if the background fold thread encountered an error.
-- | Access the current epoch state. Returns immediately if state is already available.
-- Blocks up to 25 seconds waiting for initialisation if the background thread has not yet
-- received any epoch state. Fails immediately if the background thread encountered an error.
getEpochStateDetails
:: HasCallStack
=> MonadAssertion m
=> MonadTest m
=> MonadIO m
=> EpochStateView
-> m (AnyNewEpochState, SlotNo, BlockNo)
getEpochStateDetails EpochStateView{epochStateView} =
withFrozenCallStack $ do
deadline <- liftIO $ DTC.addUTCTime 25 <$> DTC.getCurrentTime
go deadline
getEpochStateDetails EpochStateView{epochStateView} = withFrozenCallStack $
-- Fast path: read the TVar outside STM block so we don't register a pointless
-- 'initTimeoutSeconds' timer on every call. These getters run inside tight
-- retry loops, and the unused timer-queue entries would otherwise accumulate.
readTVarIO epochStateView
>>= awaitForState
>>= failEpochStateFoldError
where
go deadline = do
result <- H.evalIO $ readIORef epochStateView
case result of
Left (EpochStateFoldError err) -> do
H.note_ $ "EpochStateView background thread failed: " <> docToString (prettyError err)
H.failure
Left EpochStateNotInitialised -> do
currentTime <- liftIO DTC.getCurrentTime
if currentTime < deadline
then do
H.threadDelay 500_000
go deadline
else do
H.note_ "EpochStateView has not been initialised within 25 seconds"
H.failure
Right details -> pure details
initTimeoutSeconds :: Int
initTimeoutSeconds = 25

awaitForState
:: MonadIO n
=> Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)
-> n (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo))
awaitForState = \case
Left EpochStateNotInitialised -> do
-- register delay only when we're starting to retry
timedOutVar <- registerDelay $ initTimeoutSeconds * 1_000_000
atomically $ do
state' <- readTVar epochStateView
state' <$ case state' of
-- retry until timeout
Left EpochStateNotInitialised -> readTVar timedOutVar >>= guard
Comment thread
carbolymer marked this conversation as resolved.
_ -> pure ()
state -> pure state

failEpochStateFoldError
:: (HasCallStack, MonadTest n)
=> Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)
-> n (AnyNewEpochState, SlotNo, BlockNo)
failEpochStateFoldError = \case
Right details -> pure details
Left (EpochStateFoldError err) -> do
H.note_ $ "EpochStateView background thread failed: " <> docToString (prettyError err)
H.failure
Left EpochStateNotInitialised -> do
H.note_ $ "EpochStateView has not been initialised within " <> show initTimeoutSeconds <> " seconds"
H.failure


-- | Create a background thread listening for new epoch states. New epoch states are available to access
-- through 'EpochStateView', using query functions.
Expand All @@ -311,16 +386,16 @@ getEpochStateView
-> SocketPath -- ^ node socket path
-> m EpochStateView
getEpochStateView nodeConfigFile socketPath = withFrozenCallStack $ do
epochStateView <- H.evalIO $ newIORef $ Left EpochStateNotInitialised
void . asyncRegister_ $ do
esv <- H.evalIO $ EpochStateView <$> newTVarIO (Left EpochStateNotInitialised) <*> newTVarIO 0
_ <- asyncRegister_ $ do
result <- runExceptT $ foldEpochState nodeConfigFile socketPath QuickValidation (EpochNo maxBound) ()
$ \epochState slotNumber blockNumber -> do
liftIOAnnotated . writeIORef epochStateView $ Right (epochState, slotNumber, blockNumber)
liftIOAnnotated . atomically $ writeEpochStateView esv $ Right (epochState, slotNumber, blockNumber)
pure ConditionNotMet
case result of
Left err -> writeIORef epochStateView $ Left $ EpochStateFoldError err
Left err -> atomically $ writeEpochStateView esv $ Left $ EpochStateFoldError err
Right _ -> pure ()
pure $ EpochStateView epochStateView
pure esv

-- | Retrieve all UTxOs map from the epoch state view.
findAllUtxos
Expand Down
Loading