diff --git a/.changelog/6299.trivial.md b/.changelog/6299.trivial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/oasis-node/cmd/node/node_control.go b/go/oasis-node/cmd/node/node_control.go index 9310d0780f5..494bf9b2332 100644 --- a/go/oasis-node/cmd/node/node_control.go +++ b/go/oasis-node/cmd/node/node_control.go @@ -312,10 +312,10 @@ func (n *Node) getRuntimeStatus(ctx context.Context) (map[common.Namespace]contr } // Fetch storage worker status. - if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil { - status.Storage, err = storageNode.GetStatus(ctx) + if stateSync := n.StorageWorker.GetRuntime(rt.ID()); stateSync != nil { + status.Storage, err = stateSync.GetStatus(ctx) if err != nil { - logger.Error("failed to fetch storage worker status", "err", err) + logger.Error("failed to fetch state sync worker status", "err", err) } } diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go index cd38354d83f..a46b126c18a 100644 --- a/go/oasis-test-runner/oasis/log.go +++ b/go/oasis-test-runner/oasis/log.go @@ -8,7 +8,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" - workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" + "github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" ) // LogAssertEvent returns a handler which checks whether a specific log event was @@ -116,7 +116,7 @@ func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory { // LogAssertCheckpointSync returns a handler which checks whether initial storage sync from // a checkpoint was successful or not. 
func LogAssertCheckpointSync() log.WatcherHandlerFactory { - return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") + return LogAssertEvent(statesync.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") } // LogAssertDiscrepancyMajorityFailure returns a handler which checks whether a discrepancy resolution diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go deleted file mode 100644 index c4f6a890b4d..00000000000 --- a/go/worker/storage/committee/node.go +++ /dev/null @@ -1,1395 +0,0 @@ -package committee - -import ( - "container/heap" - "context" - "errors" - "fmt" - "math" - "sync" - "time" - - "github.com/eapache/channels" - - "github.com/oasisprotocol/oasis-core/go/common/logging" - "github.com/oasisprotocol/oasis-core/go/common/node" - "github.com/oasisprotocol/oasis-core/go/common/pubsub" - "github.com/oasisprotocol/oasis-core/go/common/workerpool" - "github.com/oasisprotocol/oasis-core/go/config" - consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" - commonFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" - "github.com/oasisprotocol/oasis-core/go/p2p/rpc" - registryApi "github.com/oasisprotocol/oasis-core/go/registry/api" - roothashApi "github.com/oasisprotocol/oasis-core/go/roothash/api" - "github.com/oasisprotocol/oasis-core/go/roothash/api/block" - runtime "github.com/oasisprotocol/oasis-core/go/runtime/api" - "github.com/oasisprotocol/oasis-core/go/runtime/host" - storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" - "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" - mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" - workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" - "github.com/oasisprotocol/oasis-core/go/worker/common/committee" - 
"github.com/oasisprotocol/oasis-core/go/worker/registration" - "github.com/oasisprotocol/oasis-core/go/worker/storage/api" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" - storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" -) - -var ( - _ committee.NodeHooks = (*Node)(nil) - - // ErrNonLocalBackend is the error returned when the storage backend doesn't implement the LocalBackend interface. - ErrNonLocalBackend = errors.New("storage: storage backend doesn't support local storage") -) - -const ( - // RoundLatest is a magic value for the latest round. - RoundLatest = math.MaxUint64 - - defaultUndefinedRound = ^uint64(0) - - checkpointSyncRetryDelay = 10 * time.Second - - // The maximum number of rounds the worker can be behind the chain before it's sensible for - // it to register as available. - maximumRoundDelayForAvailability = uint64(10) - - // The minimum number of rounds the worker can be behind the chain before it's sensible for - // it to stop advertising availability. - minimumRoundDelayForUnavailability = uint64(15) - - // maxInFlightRounds is the maximum number of rounds that should be fetched before waiting - // for them to be applied. - maxInFlightRounds = 100 - - // chunkerThreads is target number of subtrees during parallel checkpoint creation. - // It is intentionally non-configurable since we want operators to produce - // same checkpoint hashes. The current value was chosen based on the benchmarks - // done on the modern developer machine. - chunkerThreads = 12 -) - -type roundItem interface { - GetRound() uint64 -} - -// minRoundQueue is a Round()-based min priority queue. -type minRoundQueue []roundItem - -// Sorting interface. 
-func (q minRoundQueue) Len() int { return len(q) } -func (q minRoundQueue) Less(i, j int) bool { return q[i].GetRound() < q[j].GetRound() } -func (q minRoundQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } - -// Push appends x as the last element in the heap's array. -func (q *minRoundQueue) Push(x any) { - *q = append(*q, x.(roundItem)) -} - -// Pop removes and returns the last element in the heap's array. -func (q *minRoundQueue) Pop() any { - old := *q - n := len(old) - x := old[n-1] - *q = old[0 : n-1] - return x -} - -// fetchedDiff has all the context needed for a single GetDiff operation. -type fetchedDiff struct { - fetched bool - pf rpc.PeerFeedback - err error - round uint64 - prevRoot storageApi.Root - thisRoot storageApi.Root - writeLog storageApi.WriteLog -} - -func (d *fetchedDiff) GetRound() uint64 { - return d.round -} - -type finalizeResult struct { - summary *blockSummary - err error -} - -// Node watches blocks for storage changes. -type Node struct { // nolint: maligned - commonNode *committee.Node - - roleProvider registration.RoleProvider - rpcRoleProvider registration.RoleProvider - roleAvailable bool - - logger *logging.Logger - - localStorage storageApi.LocalBackend - - diffSync diffsync.Client - checkpointSync checkpointsync.Client - legacyStorageSync synclegacy.Client - - undefinedRound uint64 - - fetchPool *workerpool.Pool - - workerCommonCfg workerCommon.Config - - checkpointer checkpoint.Checkpointer - checkpointSyncCfg *CheckpointSyncConfig - checkpointSyncForced bool - - syncedLock sync.RWMutex - syncedState blockSummary - - statusLock sync.RWMutex - status api.StorageWorkerStatus - - blockCh *channels.InfiniteChannel - diffCh chan *fetchedDiff - finalizeCh chan finalizeResult - - ctx context.Context - ctxCancel context.CancelFunc - - quitCh chan struct{} - - initCh chan struct{} -} - -func NewNode( - commonNode *committee.Node, - roleProvider registration.RoleProvider, - rpcRoleProvider registration.RoleProvider, - workerCommonCfg 
workerCommon.Config, - localStorage storageApi.LocalBackend, - checkpointSyncCfg *CheckpointSyncConfig, -) (*Node, error) { - initMetrics() - - // Create the fetcher pool. - fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String()) - fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - - n := &Node{ - commonNode: commonNode, - - roleProvider: roleProvider, - rpcRoleProvider: rpcRoleProvider, - - logger: logging.GetLogger("worker/storage/committee").With("runtime_id", commonNode.Runtime.ID()), - - workerCommonCfg: workerCommonCfg, - - localStorage: localStorage, - - fetchPool: fetchPool, - - checkpointSyncCfg: checkpointSyncCfg, - - status: api.StatusInitializing, - - blockCh: channels.NewInfiniteChannel(), - diffCh: make(chan *fetchedDiff), - finalizeCh: make(chan finalizeResult), - - quitCh: make(chan struct{}), - initCh: make(chan struct{}), - } - - // Validate checkpoint sync configuration. - if err := checkpointSyncCfg.Validate(); err != nil { - return nil, fmt.Errorf("bad checkpoint sync configuration: %w", err) - } - - // Initialize sync state. - n.syncedState.Round = defaultUndefinedRound - - n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - - // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. - checkpointer, err := n.newCheckpointer(n.ctx, commonNode, localStorage) - if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) - } - n.checkpointer = checkpointer - - // Register prune handler. - commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, - }) - - // Advertise and serve p2p protocols. 
- commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - if config.GlobalConfig.Storage.Checkpointer.Enabled { - commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - if rpcRoleProvider != nil { - commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - - // Create p2p protocol clients. - n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - - return n, nil -} - -func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { - checkInterval := checkpoint.CheckIntervalDisabled - if config.GlobalConfig.Storage.Checkpointer.Enabled { - checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval - } - checkpointerCfg := checkpoint.CheckpointerConfig{ - Name: "runtime", - Namespace: commonNode.Runtime.ID(), - CheckInterval: checkInterval, - RootsPerVersion: 2, // State root and I/O root. 
- GetParameters: func(ctx context.Context) (*checkpoint.CreationParameters, error) { - rt, rerr := commonNode.Runtime.ActiveDescriptor(ctx) - if rerr != nil { - return nil, fmt.Errorf("failed to retrieve runtime descriptor: %w", rerr) - } - - blk, rerr := commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ - RuntimeID: rt.ID, - Height: consensus.HeightLatest, - }) - if rerr != nil { - return nil, fmt.Errorf("failed to retrieve genesis block: %w", rerr) - } - - var threads uint16 - if config.GlobalConfig.Storage.Checkpointer.ParallelChunker { - threads = chunkerThreads - } - - return &checkpoint.CreationParameters{ - Interval: rt.Storage.CheckpointInterval, - NumKept: rt.Storage.CheckpointNumKept, - ChunkSize: rt.Storage.CheckpointChunkSize, - InitialVersion: blk.Header.Round, - ChunkerThreads: threads, - }, nil - }, - GetRoots: func(ctx context.Context, version uint64) ([]storageApi.Root, error) { - blk, berr := commonNode.Runtime.History().GetCommittedBlock(ctx, version) - if berr != nil { - return nil, berr - } - - return blk.Header.StorageRoots(), nil - }, - } - - return checkpoint.NewCheckpointer( - ctx, - localStorage.NodeDB(), - localStorage.Checkpointer(), - checkpointerCfg, - ) -} - -// Service interface. - -// Name returns the service name. -func (n *Node) Name() string { - return "committee node" -} - -// Start causes the worker to start responding to CometBFT new block events. -func (n *Node) Start() error { - go n.worker() - if config.GlobalConfig.Storage.Checkpointer.Enabled { - go n.consensusCheckpointSyncer() - } - return nil -} - -// Stop causes the worker to stop watching and shut down. -func (n *Node) Stop() { - n.statusLock.Lock() - n.status = api.StatusStopping - n.statusLock.Unlock() - - n.fetchPool.Stop() - - n.ctxCancel() -} - -// Quit returns a channel that will be closed when the worker stops. 
-func (n *Node) Quit() <-chan struct{} { - return n.quitCh -} - -// Cleanup cleans up any leftover state after the worker is stopped. -func (n *Node) Cleanup() { - // Nothing to do here? -} - -// Initialized returns a channel that will be closed once the worker finished starting up. -func (n *Node) Initialized() <-chan struct{} { - return n.initCh -} - -// GetStatus returns the storage committee node status. -func (n *Node) GetStatus(context.Context) (*api.Status, error) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() - - n.statusLock.RLock() - defer n.statusLock.RUnlock() - - return &api.Status{ - LastFinalizedRound: n.syncedState.Round, - Status: n.status, - }, nil -} - -func (n *Node) PauseCheckpointer(pause bool) error { - if !commonFlags.DebugDontBlameOasis() { - return api.ErrCantPauseCheckpointer - } - n.checkpointer.Pause(pause) - return nil -} - -// GetLocalStorage returns the local storage backend used by this storage node. -func (n *Node) GetLocalStorage() storageApi.LocalBackend { - return n.localStorage -} - -// NodeHooks implementation. - -// HandleNewBlockEarlyLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { - // Nothing to do here. -} - -// HandleNewBlockLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) { - // Notify the state syncer that there is a new block. - n.blockCh.In() <- bi.RuntimeBlock -} - -// HandleRuntimeHostEventLocked is guarded by CrossNode. -func (n *Node) HandleRuntimeHostEventLocked(*host.Event) { - // Nothing to do here. -} - -// Watcher implementation. - -// GetLastSynced returns the height, IORoot hash and StateRoot hash of the last block that was fully synced to. 
-func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() - - var io, state storageApi.Root - for _, root := range n.syncedState.Roots { - switch root.Type { - case storageApi.RootTypeIO: - io = root - case storageApi.RootTypeState: - state = root - } - } - - return n.syncedState.Round, io, state -} - -func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { - result := &fetchedDiff{ - fetched: false, - pf: rpc.NewNopPeerFeedback(), - round: round, - prevRoot: prevRoot, - thisRoot: thisRoot, - } - defer func() { - select { - case n.diffCh <- result: - case <-n.ctx.Done(): - } - }() - - // Check if the new root doesn't already exist. - if n.localStorage.NodeDB().HasRoot(thisRoot) { - return - } - - result.fetched = true - - // Even if HasRoot returns false the root can still exist if it is equal - // to the previous root and the root was emitted by the consensus committee - // directly (e.g., during an epoch transition). - if thisRoot.Hash.Equal(&prevRoot.Hash) { - result.writeLog = storageApi.WriteLog{} - return - } - - // New root does not yet exist in storage and we need to fetch it from a peer. - n.logger.Debug("calling GetDiff", - "old_root", prevRoot, - "new_root", thisRoot, - ) - - ctx, cancel := context.WithCancel(n.ctx) - defer cancel() - - wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) - if err != nil { - result.err = err - return - } - result.pf = pf - result.writeLog = wl -} - -// getDiff fetches writelog using diff sync p2p protocol client. -// -// In case of no peers or error, it fallbacks to the legacy storage sync protocol. 
-func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { - rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) - if err == nil { // if NO error - return rsp1.WriteLog, pf, nil - } - - rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) - if err != nil { - return nil, nil, err - } - return rsp2.WriteLog, pf, nil -} - -func (n *Node) finalize(summary *blockSummary) { - err := n.localStorage.NodeDB().Finalize(summary.Roots) - switch err { - case nil: - n.logger.Debug("storage round finalized", - "round", summary.Round, - ) - case storageApi.ErrAlreadyFinalized: - // This can happen if we are restoring after a roothash migration or if - // we crashed before updating the sync state. - n.logger.Warn("storage round already finalized", - "round", summary.Round, - ) - err = nil - default: - n.logger.Error("failed to finalize storage round", - "err", err, - "round", summary.Round, - ) - } - - result := finalizeResult{ - summary: summary, - err: err, - } - - select { - case n.finalizeCh <- result: - case <-n.ctx.Done(): - } -} - -func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) error { - n.logger.Info("initializing storage at genesis") - - // Check what the latest finalized version in the database is as we may be using a database - // from a previous version or network. - latestVersion, alreadyInitialized := n.localStorage.NodeDB().GetLatestVersion() - - // Finalize any versions that were not yet finalized in the old database. This is only possible - // as long as there is only one non-finalized root per version. Note that we also cannot be sure - // that any of these roots are valid, but this is fine as long as the final version matches the - // genesis root. 
- if alreadyInitialized { - n.logger.Debug("already initialized, finalizing any non-finalized versions", - "genesis_state_root", genesisBlock.Header.StateRoot, - "genesis_round", genesisBlock.Header.Round, - "latest_version", latestVersion, - ) - - for v := latestVersion + 1; v < genesisBlock.Header.Round; v++ { - roots, err := n.localStorage.NodeDB().GetRootsForVersion(v) - if err != nil { - return fmt.Errorf("failed to fetch roots for version %d: %w", v, err) - } - - var stateRoots []storageApi.Root - for _, root := range roots { - if root.Type == storageApi.RootTypeState { - stateRoots = append(stateRoots, root) - } - } - if len(stateRoots) != 1 { - break // We must have exactly one non-finalized state root to continue. - } - - err = n.localStorage.NodeDB().Finalize(stateRoots) - if err != nil { - return fmt.Errorf("failed to finalize version %d: %w", v, err) - } - - latestVersion = v - } - } - - stateRoot := storageApi.Root{ - Namespace: rt.ID, - Version: genesisBlock.Header.Round, - Type: storageApi.RootTypeState, - Hash: genesisBlock.Header.StateRoot, - } - - var compatible bool - switch { - case latestVersion < stateRoot.Version: - // Latest version is earlier than the genesis state root. In case it has the same hash - // we can fill in all the missing versions. - maybeRoot := stateRoot - maybeRoot.Version = latestVersion - - if n.localStorage.NodeDB().HasRoot(maybeRoot) { - n.logger.Debug("latest version earlier than genesis state root, filling in versions", - "genesis_state_root", genesisBlock.Header.StateRoot, - "genesis_round", genesisBlock.Header.Round, - "latest_version", latestVersion, - ) - for v := latestVersion; v < stateRoot.Version; v++ { - err := n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ - Namespace: rt.ID, - RootType: storageApi.RootTypeState, - SrcRound: v, - SrcRoot: stateRoot.Hash, - DstRound: v + 1, - DstRoot: stateRoot.Hash, - WriteLog: nil, // No changes. 
- }) - if err != nil { - return fmt.Errorf("failed to fill in version %d: %w", v, err) - } - - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ - Namespace: rt.ID, - Version: v + 1, - Type: storageApi.RootTypeState, - Hash: stateRoot.Hash, - // We can ignore I/O roots. - }}) - if err != nil { - return fmt.Errorf("failed to finalize version %d: %w", v, err) - } - } - compatible = true - } - default: - // Latest finalized version is the same or ahead, root must exist. - compatible = n.localStorage.NodeDB().HasRoot(stateRoot) - } - - // If we are incompatible and the local version is greater or the same as the genesis version, - // we cannot do anything. If the local version is lower we assume the node will sync from a - // different node. - if !compatible && latestVersion >= stateRoot.Version { - n.logger.Error("existing state is incompatible with runtime genesis state", - "genesis_state_root", genesisBlock.Header.StateRoot, - "genesis_round", genesisBlock.Header.Round, - "latest_version", latestVersion, - ) - return fmt.Errorf("existing state is incompatible with runtime genesis state") - } - - if !compatible { - // Database is empty, so assume the state will be replicated from another node. - n.logger.Warn("non-empty state root but no state available, assuming replication", - "state_root", genesisBlock.Header.StateRoot, - ) - n.checkpointSyncForced = true - } - return nil -} - -func (n *Node) flushSyncedState(summary *blockSummary) (uint64, error) { - n.syncedLock.Lock() - defer n.syncedLock.Unlock() - - n.syncedState = *summary - if err := n.commonNode.Runtime.History().StorageSyncCheckpoint(n.syncedState.Round); err != nil { - return 0, err - } - - return n.syncedState.Round, nil -} - -func (n *Node) consensusCheckpointSyncer() { - // Make sure we always create a checkpoint when the consensus layer creates a checkpoint. 
The - // reason why we do this is to make it faster for storage nodes that use consensus state sync - // to catch up as exactly the right checkpoint will be available. - consensusCp := n.commonNode.Consensus.Checkpointer() - if consensusCp == nil { - return - } - - // Wait for the common node to be initialized. - select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): - return - } - - // Determine the maximum number of consensus checkpoints to keep. - consensusParams, err := n.commonNode.Consensus.Core().GetParameters(n.ctx, consensus.HeightLatest) - if err != nil { - n.logger.Error("failed to fetch consensus parameters", - "err", err, - ) - return - } - - ch, sub, err := consensusCp.WatchCheckpoints() - if err != nil { - n.logger.Error("failed to watch checkpoints", - "err", err, - ) - return - } - defer sub.Close() - - var ( - versions []uint64 - blkCh <-chan *consensus.Block - blkSub pubsub.ClosableSubscription - ) - defer func() { - if blkCh != nil { - blkSub.Close() - blkSub = nil - blkCh = nil - } - }() - for { - select { - case <-n.quitCh: - return - case <-n.ctx.Done(): - return - case version := <-ch: - // We need to wait for the next version as that is what will be in the consensus - // checkpoint. - versions = append(versions, version+1) - // Make sure that we limit the size of the checkpoint queue. - if uint64(len(versions)) > consensusParams.Parameters.StateCheckpointNumKept { - versions = versions[1:] - } - - n.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", - "version", version+1, - "num_versions", len(versions), - ) - - if blkCh == nil { - blkCh, blkSub, err = n.commonNode.Consensus.Core().WatchBlocks(n.ctx) - if err != nil { - n.logger.Error("failed to watch blocks", - "err", err, - ) - continue - } - } - case blk := <-blkCh: - // If there's nothing remaining, unsubscribe. 
- if len(versions) == 0 { - n.logger.Debug("no more queued consensus checkpoint versions") - - blkSub.Close() - blkSub = nil - blkCh = nil - continue - } - - var newVersions []uint64 - for idx, version := range versions { - if version > uint64(blk.Height) { - // We need to wait for further versions. - newVersions = versions[idx:] - break - } - - // Lookup what runtime round corresponds to the given consensus layer version and make - // sure we checkpoint it. - blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), - Height: int64(version), - }) - if err != nil { - n.logger.Error("failed to get runtime block corresponding to consensus checkpoint", - "err", err, - "height", version, - ) - continue - } - - // We may have not yet synced the corresponding runtime round locally. In this case - // we need to wait until this is the case. - n.syncedLock.RLock() - lastSyncedRound := n.syncedState.Round - n.syncedLock.RUnlock() - if blk.Header.Round > lastSyncedRound { - n.logger.Debug("runtime round not available yet for checkpoint, waiting", - "height", version, - "round", blk.Header.Round, - "last_synced_round", lastSyncedRound, - ) - newVersions = versions[idx:] - break - } - - // Force runtime storage checkpointer to create a checkpoint at this round. - n.logger.Info("consensus checkpoint, force runtime checkpoint", - "height", version, - "round", blk.Header.Round, - ) - - n.checkpointer.ForceCheckpoint(blk.Header.Round) - } - versions = newVersions - } - } -} - -// This is only called from the main worker goroutine, so no locking should be necessary. 
-func (n *Node) nudgeAvailability(lastSynced, latest uint64) { - if lastSynced == n.undefinedRound || latest == n.undefinedRound { - return - } - if latest-lastSynced < maximumRoundDelayForAvailability && !n.roleAvailable { - n.roleProvider.SetAvailable(func(_ *node.Node) error { - return nil - }) - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { - return nil - }) - } - n.roleAvailable = true - } - if latest-lastSynced > minimumRoundDelayForUnavailability && n.roleAvailable { - n.roleProvider.SetUnavailable() - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetUnavailable() - } - n.roleAvailable = false - } -} - -func (n *Node) worker() { // nolint: gocyclo - defer close(n.quitCh) - defer close(n.diffCh) - - // Wait for the common node to be initialized. - select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): - close(n.initCh) - return - } - - n.logger.Info("starting committee node") - - n.statusLock.Lock() - n.status = api.StatusStarting - n.statusLock.Unlock() - - // Determine genesis block. - genesisBlock, err := n.commonNode.Consensus.RootHash().GetGenesisBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), - Height: consensus.HeightLatest, - }) - if err != nil { - n.logger.Error("can't retrieve genesis block", "err", err) - return - } - n.undefinedRound = genesisBlock.Header.Round - 1 - - // Determine last finalized storage version. - if version, dbNonEmpty := n.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { - var blk *block.Block - blk, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, version) - switch err { - case nil: - // Set last synced version to last finalized storage version. - if _, err = n.flushSyncedState(summaryFromBlock(blk)); err != nil { - n.logger.Error("failed to flush synced state", "err", err) - return - } - default: - // Failed to fetch historic block. 
This is fine when the network just went through a - // dump/restore upgrade and we don't have any information before genesis. We treat the - // database as unsynced and will proceed to either use checkpoints or sync iteratively. - n.logger.Warn("failed to fetch historic block", - "err", err, - "round", version, - ) - } - } - - n.syncedLock.RLock() - cachedLastRound := n.syncedState.Round - n.syncedLock.RUnlock() - if cachedLastRound == defaultUndefinedRound || cachedLastRound < genesisBlock.Header.Round { - cachedLastRound = n.undefinedRound - } - - // Initialize genesis from the runtime descriptor. - isInitialStartup := (cachedLastRound == n.undefinedRound) - if isInitialStartup { - n.statusLock.Lock() - n.status = api.StatusInitializingGenesis - n.statusLock.Unlock() - - var rt *registryApi.Runtime - rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx) - if err != nil { - n.logger.Error("failed to retrieve runtime registry descriptor", - "err", err, - ) - return - } - if err = n.initGenesis(rt, genesisBlock); err != nil { - n.logger.Error("failed to initialize storage at genesis", - "err", err, - ) - return - } - } - - // Notify the checkpointer of the genesis round so it can be checkpointed. - if n.checkpointer != nil { - n.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) - n.checkpointer.Flush() - } - - // Check if we are able to fetch the first block that we would be syncing if we used iterative - // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we - // must wait for a later checkpoint to become available. - if !n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncStartCheck - n.statusLock.Unlock() - - // Determine what is the first round that we would need to sync. - iterativeSyncStart := cachedLastRound - if iterativeSyncStart == n.undefinedRound { - iterativeSyncStart++ - } - - // Check if we actually have information about that round. 
This assumes that any reindexing - // was already performed (the common node would not indicate being initialized otherwise). - _, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, iterativeSyncStart) - SyncStartCheck: - switch { - case err == nil: - case errors.Is(err, roothashApi.ErrNotFound): - // No information is available about the initial round. Query the earliest historic - // block and check if that block has the genesis state root and empty I/O root. - var earlyBlk *block.Block - earlyBlk, err = n.commonNode.Runtime.History().GetEarliestBlock(n.ctx) - switch err { - case nil: - // Make sure the state root is still the same as at genesis time. - if !earlyBlk.Header.StateRoot.Equal(&genesisBlock.Header.StateRoot) { - break - } - // Make sure the I/O root is empty. - if !earlyBlk.Header.IORoot.IsEmpty() { - break - } - - // If this is the case, we can start syncing from this round instead. Fill in the - // remaining versions to make sure they actually exist in the database. - n.logger.Debug("filling in versions to genesis", - "genesis_round", genesisBlock.Header.Round, - "earliest_round", earlyBlk.Header.Round, - ) - for v := genesisBlock.Header.Round; v < earlyBlk.Header.Round; v++ { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ - Namespace: n.commonNode.Runtime.ID(), - RootType: storageApi.RootTypeState, - SrcRound: v, - SrcRoot: genesisBlock.Header.StateRoot, - DstRound: v + 1, - DstRoot: genesisBlock.Header.StateRoot, - WriteLog: nil, // No changes. - }) - switch err { - case nil: - case storageApi.ErrAlreadyFinalized: - // Ignore already finalized versions. - continue - default: - n.logger.Error("failed to fill in version", - "version", v, - "err", err, - ) - return - } - - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ - Namespace: n.commonNode.Runtime.ID(), - Version: v + 1, - Type: storageApi.RootTypeState, - Hash: genesisBlock.Header.StateRoot, - // We can ignore I/O roots. 
- }}) - if err != nil { - n.logger.Error("failed to finalize filled in version", - "version", v, - "err", err, - ) - return - } - } - cachedLastRound, err = n.flushSyncedState(summaryFromBlock(earlyBlk)) - if err != nil { - n.logger.Error("failed to flush synced state", - "err", err, - ) - return - } - // No need to force a checkpoint sync. - break SyncStartCheck - default: - // This should never happen as the block should exist. - n.logger.Warn("failed to query earliest block in local history", - "err", err, - ) - } - - // No information is available about this round, force checkpoint sync. - n.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", - "round", iterativeSyncStart, - ) - n.checkpointSyncForced = true - default: - // Unknown error while fetching block information, abort. - n.logger.Error("failed to query block", - "err", err, - ) - return - } - } - - n.logger.Info("worker initialized", - "genesis_round", genesisBlock.Header.Round, - "last_synced", cachedLastRound, - ) - - lastFullyAppliedRound := cachedLastRound - - // Try to perform initial sync from state and io checkpoints if either: - // - // - Checkpoint sync has been forced because there is insufficient information available to use - // incremental sync. - // - // - We haven't synced anything yet and checkpoint sync is not disabled. - // - // If checkpoint sync is disabled but sync has been forced (e.g. because the state at genesis - // is non-empty), we must request to sync the checkpoint at genesis as otherwise we will jump - // to a later state which may not be desired given that checkpoint sync has been explicitly - // disabled via config. 
- // - if (isInitialStartup && !n.checkpointSyncCfg.Disabled) || n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncingCheckpoints - n.statusLock.Unlock() - - var ( - summary *blockSummary - attempt int - ) - CheckpointSyncRetry: - for { - summary, err = n.syncCheckpoints(genesisBlock.Header.Round, n.checkpointSyncCfg.Disabled) - if err == nil { - break - } - - attempt++ - switch n.checkpointSyncForced { - case true: - // We have no other options but to perform a checkpoint sync as we are missing - // either state or authoritative blocks. - n.logger.Info("checkpoint sync required, retrying", - "err", err, - "attempt", attempt, - ) - case false: - if attempt > 1 { - break CheckpointSyncRetry - } - - // Try syncing again. The main reason for this is the sync failing due to a - // checkpoint pruning race condition (where nodes list a checkpoint which is - // then deleted just before we request its chunks). One retry is enough. - n.logger.Info("first checkpoint sync failed, trying once more", "err", err) - } - - // Delay before retrying. - select { - case <-time.After(checkpointSyncRetryDelay): - case <-n.ctx.Done(): - return - } - } - if err != nil { - n.logger.Info("checkpoint sync failed", "err", err) - } else { - cachedLastRound, err = n.flushSyncedState(summary) - if err != nil { - n.logger.Error("failed to flush synced state", - "err", err, - ) - return - } - lastFullyAppliedRound = cachedLastRound - n.logger.Info("checkpoint sync succeeded", - logging.LogEvent, LogEventCheckpointSyncSuccess, - ) - } - } - close(n.initCh) - - // Don't register availability immediately, we want to know first how far behind consensus we are. 
- latestBlockRound := n.undefinedRound - - heartbeat := heartbeat{} - heartbeat.reset() - - var wg sync.WaitGroup - syncingRounds := make(map[uint64]*inFlight) - summaryCache := make(map[uint64]*blockSummary) - triggerRoundFetches := func() { - for i := lastFullyAppliedRound + 1; i <= latestBlockRound; i++ { - syncing, ok := syncingRounds[i] - if ok && syncing.outstanding.hasAll() { - continue - } - - if !ok { - if len(syncingRounds) >= maxInFlightRounds { - break - } - - syncing = &inFlight{ - startedAt: time.Now(), - awaitingRetry: outstandingMaskFull, - } - syncingRounds[i] = syncing - - if i == latestBlockRound { - storageWorkerLastPendingRound.With(n.getMetricLabels()).Set(float64(i)) - } - } - n.logger.Debug("preparing round sync", - "round", i, - "outstanding_mask", syncing.outstanding, - "awaiting_retry", syncing.awaitingRetry, - ) - - prev := summaryCache[i-1] - this := summaryCache[i] - prevRoots := make([]storageApi.Root, len(prev.Roots)) - copy(prevRoots, prev.Roots) - for i := range prevRoots { - if prevRoots[i].Type == storageApi.RootTypeIO { - // IO roots aren't chained, so clear it (but leave cache intact). - prevRoots[i] = storageApi.Root{ - Namespace: this.Namespace, - Version: this.Round, - Type: storageApi.RootTypeIO, - } - prevRoots[i].Hash.Empty() - break - } - } - - for i := range prevRoots { - rootType := prevRoots[i].Type - if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { - syncing.scheduleDiff(rootType) - wg.Add(1) - n.fetchPool.Submit(func() { - defer wg.Done() - n.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) - }) - } - } - } - } - - n.statusLock.Lock() - n.status = api.StatusSyncingRounds - n.statusLock.Unlock() - - pendingApply := &minRoundQueue{} - pendingFinalize := &minRoundQueue{} - - // Main processing loop. When a new block arrives, its state and I/O roots are inspected. 
- // If missing locally, diffs are fetched from peers, possibly for many rounds in parallel, - // including all missing rounds since the last fully applied one. Fetched diffs are then applied - // in round order, ensuring no gaps. Once a round has all its roots applied, background finalization - // for that round is triggered asynchronously, not blocking concurrent fetching and diff application. -mainLoop: - for { - // Drain the Apply and Finalize queues first, before waiting for new events in the select below. - - // Apply fetched writelogs, but only if they are for the round after the last fully applied one - // and current number of pending roots to be finalized is smaller than max allowed. - applyNext := pendingApply.Len() > 0 && - lastFullyAppliedRound+1 == (*pendingApply)[0].GetRound() && - pendingFinalize.Len() < dbApi.MaxPendingVersions-1 // -1 since one may be already finalizing. - if applyNext { - lastDiff := heap.Pop(pendingApply).(*fetchedDiff) - // Apply the write log if one exists. - err = nil - if lastDiff.fetched { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ - Namespace: lastDiff.thisRoot.Namespace, - RootType: lastDiff.thisRoot.Type, - SrcRound: lastDiff.prevRoot.Version, - SrcRoot: lastDiff.prevRoot.Hash, - DstRound: lastDiff.thisRoot.Version, - DstRoot: lastDiff.thisRoot.Hash, - WriteLog: lastDiff.writeLog, - }) - switch { - case err == nil: - lastDiff.pf.RecordSuccess() - case errors.Is(err, storageApi.ErrExpectedRootMismatch): - lastDiff.pf.RecordBadPeer() - default: - n.logger.Error("can't apply write log", - "err", err, - "old_root", lastDiff.prevRoot, - "new_root", lastDiff.thisRoot, - ) - lastDiff.pf.RecordSuccess() - } - } - - syncing := syncingRounds[lastDiff.round] - if err != nil { - syncing.retry(lastDiff.thisRoot.Type) - continue - } - syncing.outstanding.remove(lastDiff.thisRoot.Type) - if !syncing.outstanding.isEmpty() || !syncing.awaitingRetry.isEmpty() { - continue - } - - // We have fully synced the given round. 
- n.logger.Debug("finished syncing round", "round", lastDiff.round) - delete(syncingRounds, lastDiff.round) - summary := summaryCache[lastDiff.round] - delete(summaryCache, lastDiff.round-1) - lastFullyAppliedRound = lastDiff.round - - storageWorkerLastSyncedRound.With(n.getMetricLabels()).Set(float64(lastDiff.round)) - storageWorkerRoundSyncLatency.With(n.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) - - // Finalize storage for this round. This happens asynchronously - // with respect to Apply operations for subsequent rounds. - heap.Push(pendingFinalize, summary) - - continue - } - - // Check if any new rounds were fully applied and need to be finalized. - // Only finalize if it's the round after the one that was finalized last. - // As a consequence at most one finalization can be happening at the time. - if len(*pendingFinalize) > 0 && cachedLastRound+1 == (*pendingFinalize)[0].GetRound() { - lastSummary := heap.Pop(pendingFinalize).(*blockSummary) - wg.Add(1) - go func() { // Don't block fetching and applying remaining rounds. - defer wg.Done() - n.finalize(lastSummary) - }() - continue - } - - select { - case inBlk := <-n.blockCh.Out(): - blk := inBlk.(*block.Block) - n.logger.Debug("incoming block", - "round", blk.Header.Round, - "last_synced", lastFullyAppliedRound, - "last_finalized", cachedLastRound, - ) - - // Check if we're far enough to reasonably register as available. 
- latestBlockRound = blk.Header.Round - n.nudgeAvailability(cachedLastRound, latestBlockRound) - - if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == n.undefinedRound { - dummy := blockSummary{ - Namespace: blk.Header.Namespace, - Round: lastFullyAppliedRound + 1, - Roots: []storageApi.Root{ - { - Version: lastFullyAppliedRound + 1, - Type: storageApi.RootTypeIO, - }, - { - Version: lastFullyAppliedRound + 1, - Type: storageApi.RootTypeState, - }, - }, - } - dummy.Roots[0].Empty() - dummy.Roots[1].Empty() - summaryCache[lastFullyAppliedRound] = &dummy - } - // Determine if we need to fetch any old block summaries. In case the first - // round is an undefined round, we need to start with the following round - // since the undefined round may be unsigned -1 and in this case the loop - // would not do any iterations. - startSummaryRound := lastFullyAppliedRound - if startSummaryRound == n.undefinedRound { - startSummaryRound++ - } - for i := startSummaryRound; i < blk.Header.Round; i++ { - if _, ok := summaryCache[i]; ok { - continue - } - var oldBlock *block.Block - oldBlock, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, i) - if err != nil { - n.logger.Error("can't get block for round", - "err", err, - "round", i, - "current_round", blk.Header.Round, - ) - panic("can't get block in storage worker") - } - summaryCache[i] = summaryFromBlock(oldBlock) - } - if _, ok := summaryCache[blk.Header.Round]; !ok { - summaryCache[blk.Header.Round] = summaryFromBlock(blk) - } - - triggerRoundFetches() - heartbeat.reset() - - case <-heartbeat.C: - if latestBlockRound != n.undefinedRound { - n.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) - triggerRoundFetches() - } - - case item := <-n.diffCh: - if item.err != nil { - n.logger.Error("error calling getdiff", - "err", item.err, - "round", item.round, - "old_root", item.prevRoot, - "new_root", item.thisRoot, - "fetched", item.fetched, - ) - 
syncingRounds[item.round].retry(item.thisRoot.Type) - break - } - - heap.Push(pendingApply, item) - // Item was successfully processed, trigger more round fetches. - // This ensures that new rounds are processed as fast as possible - // when we're syncing and are far behind. - triggerRoundFetches() - - case finalized := <-n.finalizeCh: - // If finalization failed, things start falling apart. - // There's no point redoing it, since it's probably not a transient - // error, and cachedLastRound also can't be updated legitimately. - if finalized.err != nil { - // Request a node shutdown given that syncing is effectively blocked. - _ = n.commonNode.HostNode.RequestShutdown(n.ctx, false) - break mainLoop - } - - // No further sync or out of order handling needed here, since - // only one finalize at a time is triggered (for round cachedLastRound+1) - cachedLastRound, err = n.flushSyncedState(finalized.summary) - if err != nil { - n.logger.Error("failed to flush synced state", - "err", err, - ) - } - storageWorkerLastFullRound.With(n.getMetricLabels()).Set(float64(finalized.summary.Round)) - - // Check if we're far enough to reasonably register as available. - n.nudgeAvailability(cachedLastRound, latestBlockRound) - - // Notify the checkpointer that there is a new finalized round. - if config.GlobalConfig.Storage.Checkpointer.Enabled { - n.checkpointer.NotifyNewVersion(finalized.summary.Round) - } - - case <-n.ctx.Done(): - break mainLoop - } - } - - wg.Wait() - // blockCh will be garbage-collected without being closed. It can potentially still contain - // some new blocks, but only as many as were already in-flight at the point when the main - // context was canceled. -} - -type pruneHandler struct { - logger *logging.Logger - node *Node -} - -func (p *pruneHandler) Prune(rounds []uint64) error { - // Make sure we never prune past what was synced. 
- lastSycnedRound, _, _ := p.node.GetLastSynced() - - for _, round := range rounds { - if round >= lastSycnedRound { - return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)", - lastSycnedRound, - ) - } - - // TODO: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. - - p.logger.Debug("pruning storage for round", "round", round) - - // Prune given block. - err := p.node.localStorage.NodeDB().Prune(round) - switch err { - case nil: - case mkvsDB.ErrNotEarliest: - p.logger.Debug("skipping non-earliest round", - "round", round, - ) - continue - default: - p.logger.Error("failed to prune block", - "err", err, - ) - return err - } - } - - return nil -} diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go index 4c0363fe21d..1be235bbc42 100644 --- a/go/worker/storage/p2p/diffsync/client.go +++ b/go/worker/storage/p2p/diffsync/client.go @@ -20,6 +20,8 @@ const ( type Client interface { // GetDiff requests a write log of entries that must be applied to get from the first given root // to the second one. + // + // The request times out in [MaxGetDiffResponseTime]. GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) } diff --git a/go/worker/storage/p2p/synclegacy/client.go b/go/worker/storage/p2p/synclegacy/client.go index 702fea0fdad..434b18bb3f2 100644 --- a/go/worker/storage/p2p/synclegacy/client.go +++ b/go/worker/storage/p2p/synclegacy/client.go @@ -24,6 +24,8 @@ const ( type Client interface { // GetDiff requests a write log of entries that must be applied to get from the first given root // to the second one. + // + // The request times out in [MaxGetDiffResponseTime]. GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. 
diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/statesync/checkpoint_sync.go similarity index 79% rename from go/worker/storage/committee/checkpoint_sync.go rename to go/worker/storage/statesync/checkpoint_sync.go index ad553272a90..5cd04b4695a 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/statesync/checkpoint_sync.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "bytes" @@ -21,7 +21,7 @@ import ( const ( // cpListsTimeout is the timeout for fetching checkpoints from all nodes. cpListsTimeout = 30 * time.Second - // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from a node. + // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from the remote peer. cpRestoreTimeout = 60 * time.Second checkpointStatusDone = 0 @@ -37,7 +37,7 @@ var ErrNoUsableCheckpoints = errors.New("storage: no checkpoint could be synced" // CheckpointSyncConfig is the checkpoint sync configuration. type CheckpointSyncConfig struct { - // Disabled specifies whether checkpoint sync should be disabled. In this case the node will + // Disabled specifies whether checkpoint sync should be disabled. In this case the state sync worker will // only sync by applying all diffs from genesis. Disabled bool @@ -81,7 +81,7 @@ func (h *chunkHeap) Pop() any { return ret } -func (n *Node) checkpointChunkFetcher( +func (w *Worker) checkpointChunkFetcher( ctx context.Context, chunkDispatchCh chan *chunk, chunkReturnCh chan *chunk, @@ -103,9 +103,9 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.fetchChunk(chunkCtx, chunk) + rsp, pf, err := w.fetchChunk(chunkCtx, chunk) if err != nil { - n.logger.Error("failed to fetch chunk from peers", + w.logger.Error("failed to fetch chunk from peers", "err", err, "chunk", chunk.Index, ) @@ -114,7 +114,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. 
- done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) + done, err := w.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) cancel() switch { @@ -124,7 +124,7 @@ func (n *Node) checkpointChunkFetcher( chunkReturnCh <- nil return case err != nil: - n.logger.Error("chunk restoration failed", + w.logger.Error("chunk restoration failed", "chunk", chunk.Index, "root", chunk.Root, "err", err, @@ -157,8 +157,8 @@ func (n *Node) checkpointChunkFetcher( // fetchChunk fetches chunk using checkpoint sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { - rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( +func (w *Worker) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { + rsp1, pf, err := w.checkpointSync.GetCheckpointChunk( ctx, &checkpointsync.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -175,7 +175,7 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp1.Chunk, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( + rsp2, pf, err := w.legacyStorageSync.GetCheckpointChunk( ctx, &synclegacy.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -194,8 +194,8 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp2.Chunk, pf, nil } -func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { - if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { +func (w *Worker) handleCheckpoint(ctx context.Context, check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { + if err := w.localStorage.Checkpointer().StartRestore(ctx, check.Metadata); err != nil { // Any previous restores were 
already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. return checkpointStatusBail, fmt.Errorf("can't start checkpoint restore: %w", err) @@ -208,9 +208,9 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } // Abort has to succeed even if we were interrupted by context cancellation. ctx := context.Background() - if err := n.localStorage.Checkpointer().AbortRestore(ctx); err != nil { + if err := w.localStorage.Checkpointer().AbortRestore(ctx); err != nil { cpStatus = checkpointStatusBail - n.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", + w.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", "err", err, ) } @@ -222,7 +222,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq chunkReturnCh := make(chan *chunk, maxParallelRequests) errorCh := make(chan int, maxParallelRequests) - ctx, cancel := context.WithCancel(n.ctx) + chunkCtx, cancel := context.WithCancel(ctx) // Spawn the worker group to fetch and restore checkpoint chunks. 
var workerGroup sync.WaitGroup @@ -231,7 +231,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq workerGroup.Add(1) go func() { defer workerGroup.Done() - n.checkpointChunkFetcher(ctx, chunkDispatchCh, chunkReturnCh, errorCh) + w.checkpointChunkFetcher(chunkCtx, chunkDispatchCh, chunkReturnCh, errorCh) }() } go func() { @@ -264,7 +264,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq checkpoint: check, }) } - n.logger.Debug("checkpoint chunks prepared for dispatch", + w.logger.Debug("checkpoint chunks prepared for dispatch", "chunks", len(check.Chunks), "checkpoint_root", check.Root, ) @@ -283,8 +283,8 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } select { - case <-n.ctx.Done(): - return checkpointStatusBail, n.ctx.Err() + case <-ctx.Done(): + return checkpointStatusBail, ctx.Err() case returned := <-chunkReturnCh: if returned == nil { @@ -313,13 +313,13 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } } -func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { - ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) +func (w *Worker) getCheckpointList(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + ctx, cancel := context.WithTimeout(ctx, cpListsTimeout) defer cancel() - list, err := n.fetchCheckpoints(ctx) + list, err := w.fetchCheckpoints(ctx) if err != nil { - n.logger.Error("failed to retrieve any checkpoints", + w.logger.Error("failed to retrieve any checkpoints", "err", err, ) return nil, err @@ -334,15 +334,15 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { // fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. // // In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. 
-func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { - list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ +func (w *Worker) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + list1, err := w.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ Version: 1, }) if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint return list1, nil } - list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + list2, err := w.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ Version: 1, }) if err != nil { @@ -369,8 +369,8 @@ func sortCheckpoints(s []*checkpointsync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { - namespace := n.commonNode.Runtime.ID() +func (w *Worker) checkCheckpointUsable(ctx context.Context, cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { + namespace := w.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. 
return false @@ -380,12 +380,12 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas return false } - blk, err := n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, cp.Root.Version) + blk, err := w.commonNode.Runtime.History().GetCommittedBlock(ctx, cp.Root.Version) if err != nil { - n.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) + w.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) return false } - _, lastIORoot, lastStateRoot := n.GetLastSynced() + _, lastIORoot, lastStateRoot := w.GetLastSynced() lastVersions := map[storageApi.RootType]uint64{ storageApi.RootTypeIO: lastIORoot.Version, storageApi.RootTypeState: lastStateRoot.Version, @@ -401,18 +401,18 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas } } } - n.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) + w.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) return false } -func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { +func (w *Worker) syncCheckpoints(ctx context.Context, genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { // Store roots and round info for checkpoints that finished syncing. // Round and namespace info will get overwritten as rounds are skipped // for errors, driven by remainingRoots. var syncState blockSummary // Fetch checkpoints from peers. 
- cps, err := n.getCheckpointList() + cps, err := w.getCheckpointList(ctx) if err != nil { return nil, fmt.Errorf("can't get checkpoint list from peers: %w", err) } @@ -440,8 +440,8 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc if !multipartRunning { return } - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { - n.logger.Error("error aborting multipart restore on exit from syncer", + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + w.logger.Error("error aborting multipart restore on exit from syncer", "err", err, ) } @@ -449,7 +449,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc for _, check := range cps { - if check.Root.Version < genesisRound || !n.checkCheckpointUsable(check, remainingRoots, genesisRound) { + if check.Root.Version < genesisRound || !w.checkCheckpointUsable(ctx, check, remainingRoots, genesisRound) { continue } @@ -458,10 +458,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // previous retries. Aborting multipart works with no multipart in // progress too. multipartRunning = false - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { return nil, fmt.Errorf("error aborting previous multipart restore: %w", err) } - if err := n.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { + if err := w.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { return nil, fmt.Errorf("error starting multipart insert for round %d: %w", check.Root.Version, err) } multipartRunning = true @@ -486,18 +486,19 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } } - status, err := n.handleCheckpoint(check, n.checkpointSyncCfg.ChunkFetcherCount) + // Suggestion: Limit the max time for restoring checkpoint. 
+ status, err := w.handleCheckpoint(ctx, check, w.checkpointSyncCfg.ChunkFetcherCount) switch status { case checkpointStatusDone: - n.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) + w.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) syncState.Namespace = check.Root.Namespace syncState.Round = check.Root.Version syncState.Roots = append(syncState.Roots, check.Root) remainingRoots.remove(check.Root.Type) if remainingRoots.isEmpty() { - if err = n.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { - n.logger.Error("can't finalize version after all checkpoints restored", + if err = w.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { + w.logger.Error("can't finalize version after all checkpoints restored", "err", err, "version", prevVersion, "roots", syncState.Roots, @@ -510,10 +511,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } continue case checkpointStatusNext: - n.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) + w.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) continue case checkpointStatusBail: - n.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) + w.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) return nil, fmt.Errorf("error restoring from checkpoints: %w", err) } } diff --git a/go/worker/storage/committee/checkpoint_sync_test.go b/go/worker/storage/statesync/checkpoint_sync_test.go similarity index 98% rename from go/worker/storage/committee/checkpoint_sync_test.go rename to go/worker/storage/statesync/checkpoint_sync_test.go index d39e50f3239..c9ac133c1bd 100644 --- a/go/worker/storage/committee/checkpoint_sync_test.go +++ b/go/worker/storage/statesync/checkpoint_sync_test.go @@ -1,4 
+1,4 @@ -package committee +package statesync import ( "testing" diff --git a/go/worker/storage/statesync/checkpointer.go b/go/worker/storage/statesync/checkpointer.go new file mode 100644 index 00000000000..9e72cf4285d --- /dev/null +++ b/go/worker/storage/statesync/checkpointer.go @@ -0,0 +1,213 @@ +package statesync + +import ( + "context" + "fmt" + + "github.com/oasisprotocol/oasis-core/go/common/pubsub" + "github.com/oasisprotocol/oasis-core/go/config" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + roothashApi "github.com/oasisprotocol/oasis-core/go/roothash/api" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" + "github.com/oasisprotocol/oasis-core/go/worker/common/committee" +) + +const ( + // chunkerThreads is target number of subtrees during parallel checkpoint creation. + // It is intentionally non-configurable since we want operators to produce + // same checkpoint hashes. The current value was chosen based on the benchmarks + // done on the modern developer machine. + chunkerThreads = 12 +) + +func (w *Worker) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { + checkInterval := checkpoint.CheckIntervalDisabled + if config.GlobalConfig.Storage.Checkpointer.Enabled { + checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval + } + checkpointerCfg := checkpoint.CheckpointerConfig{ + Name: "runtime", + Namespace: commonNode.Runtime.ID(), + CheckInterval: checkInterval, + RootsPerVersion: 2, // State root and I/O root. 
+ GetParameters: func(ctx context.Context) (*checkpoint.CreationParameters, error) { + rt, rerr := commonNode.Runtime.ActiveDescriptor(ctx) + if rerr != nil { + return nil, fmt.Errorf("failed to retrieve runtime descriptor: %w", rerr) + } + + blk, rerr := commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ + RuntimeID: rt.ID, + Height: consensus.HeightLatest, + }) + if rerr != nil { + return nil, fmt.Errorf("failed to retrieve genesis block: %w", rerr) + } + + var threads uint16 + if config.GlobalConfig.Storage.Checkpointer.ParallelChunker { + threads = chunkerThreads + } + + return &checkpoint.CreationParameters{ + Interval: rt.Storage.CheckpointInterval, + NumKept: rt.Storage.CheckpointNumKept, + ChunkSize: rt.Storage.CheckpointChunkSize, + InitialVersion: blk.Header.Round, + ChunkerThreads: threads, + }, nil + }, + GetRoots: func(ctx context.Context, version uint64) ([]storageApi.Root, error) { + blk, berr := commonNode.Runtime.History().GetCommittedBlock(ctx, version) + if berr != nil { + return nil, berr + } + + return blk.Header.StorageRoots(), nil + }, + } + + return checkpoint.NewCheckpointer( + ctx, + localStorage.NodeDB(), + localStorage.Checkpointer(), + checkpointerCfg, + ) +} + +// createCheckpoints is a worker responsible for triggering creation of runtime +// checkpoint everytime a consensus checkpoint is created. +// +// The reason why we do this is to make it faster for storage nodes that use consensus state sync +// to catch up as exactly the right checkpoint will be available. +func (w *Worker) createCheckpoints(ctx context.Context) { + consensusCp := w.commonNode.Consensus.Checkpointer() + if consensusCp == nil { + return + } + + // Wait for the common node to be initialized. + select { + case <-w.commonNode.Initialized(): + case <-ctx.Done(): + return + } + + // Determine the maximum number of consensus checkpoints to keep. 
+ consensusParams, err := w.commonNode.Consensus.Core().GetParameters(ctx, consensus.HeightLatest) + if err != nil { + w.logger.Error("failed to fetch consensus parameters", + "err", err, + ) + return + } + + ch, sub, err := consensusCp.WatchCheckpoints() + if err != nil { + w.logger.Error("failed to watch checkpoints", + "err", err, + ) + return + } + defer sub.Close() + + var ( + versions []uint64 + blkCh <-chan *consensus.Block + blkSub pubsub.ClosableSubscription + ) + defer func() { + if blkCh != nil { + blkSub.Close() + blkSub = nil + blkCh = nil + } + }() + for { + select { + case <-ctx.Done(): + return + case version := <-ch: + // We need to wait for the next version as that is what will be in the consensus + // checkpoint. + versions = append(versions, version+1) + // Make sure that we limit the size of the checkpoint queue. + if uint64(len(versions)) > consensusParams.Parameters.StateCheckpointNumKept { + versions = versions[1:] + } + + w.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", + "version", version+1, + "num_versions", len(versions), + ) + + if blkCh == nil { + blkCh, blkSub, err = w.commonNode.Consensus.Core().WatchBlocks(ctx) + if err != nil { + w.logger.Error("failed to watch blocks", + "err", err, + ) + continue + } + } + case blk := <-blkCh: + // If there's nothing remaining, unsubscribe. + if len(versions) == 0 { + w.logger.Debug("no more queued consensus checkpoint versions") + + blkSub.Close() + blkSub = nil + blkCh = nil + continue + } + + var newVersions []uint64 + for idx, version := range versions { + if version > uint64(blk.Height) { + // We need to wait for further versions. + newVersions = versions[idx:] + break + } + + // Lookup what runtime round corresponds to the given consensus layer version and make + // sure we checkpoint it. 
+ blk, err := w.commonNode.Consensus.RootHash().GetLatestBlock(ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), + Height: int64(version), + }) + if err != nil { + w.logger.Error("failed to get runtime block corresponding to consensus checkpoint", + "err", err, + "height", version, + ) + continue + } + + // We may have not yet synced the corresponding runtime round locally. In this case + // we need to wait until this is the case. + w.syncedLock.RLock() + lastSyncedRound := w.syncedState.Round + w.syncedLock.RUnlock() + if blk.Header.Round > lastSyncedRound { + w.logger.Debug("runtime round not available yet for checkpoint, waiting", + "height", version, + "round", blk.Header.Round, + "last_synced_round", lastSyncedRound, + ) + newVersions = versions[idx:] + break + } + + // Force runtime storage checkpointer to create a checkpoint at this round. + w.logger.Info("consensus checkpoint, force runtime checkpoint", + "height", version, + "round", blk.Header.Round, + ) + + w.checkpointer.ForceCheckpoint(blk.Header.Round) + } + versions = newVersions + } + } +} diff --git a/go/worker/storage/statesync/diff_sync.go b/go/worker/storage/statesync/diff_sync.go new file mode 100644 index 00000000000..0a13578744d --- /dev/null +++ b/go/worker/storage/statesync/diff_sync.go @@ -0,0 +1,461 @@ +package statesync + +import ( + "container/heap" + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/oasisprotocol/oasis-core/go/common/workerpool" + "github.com/oasisprotocol/oasis-core/go/config" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/roothash/api/block" + "github.com/oasisprotocol/oasis-core/go/storage/api" + dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" +) + +const ( + // maxInFlightRounds is the maximum number of rounds that should 
be fetched before waiting + // for them to be applied. + maxInFlightRounds = 100 +) + +type roundItem interface { + GetRound() uint64 +} + +// minRoundQueue is a Round()-based min priority queue. +type minRoundQueue []roundItem + +// Sorting interface. +func (q minRoundQueue) Len() int { return len(q) } +func (q minRoundQueue) Less(i, j int) bool { return q[i].GetRound() < q[j].GetRound() } +func (q minRoundQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } + +// Push appends x as the last element in the heap's array. +func (q *minRoundQueue) Push(x any) { + *q = append(*q, x.(roundItem)) +} + +// Pop removes and returns the last element in the heap's array. +func (q *minRoundQueue) Pop() any { + old := *q + n := len(old) + x := old[n-1] + *q = old[0 : n-1] + return x +} + +// fetchedDiff has all the context needed for a single GetDiff operation. +type fetchedDiff struct { + fetched bool + pf rpc.PeerFeedback + err error + round uint64 + prevRoot api.Root + thisRoot api.Root + writeLog api.WriteLog +} + +func (d *fetchedDiff) GetRound() uint64 { + return d.round +} + +type finalizedResult struct { + summary *blockSummary + err error +} + +// syncDiffs is responsible for fetching, applying and finalizing storage diffs +// as the new runtimes block headers arrive from the consensus service. +// +// In addition, it is also responsible for updating availability of the registration +// service and notifying block history and checkpointer of the newly finalized rounds. +// +// Suggestion: Ideally syncDiffs is refactored into independent worker and made only +// responsible for the syncing. +func (w *Worker) syncDiffs( + ctx context.Context, + lastFinalizedRound uint64, +) error { + syncingRounds := make(map[uint64]*inFlight) + summaryCache := make(map[uint64]*blockSummary) + pendingApply := &minRoundQueue{} + pendingFinalize := &minRoundQueue{} // Suggestion: slice would suffice given that application must happen in order. 
+ + diffCh := make(chan *fetchedDiff) + finalizedCh := make(chan finalizedResult) + + fetchPool := workerpool.New("storage_fetch/" + w.commonNode.Runtime.ID().String()) + fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) + defer fetchPool.Stop() + fetchCtx, cancel := context.WithCancel(ctx) + defer cancel() + + heartbeat := heartbeat{} + heartbeat.reset() + defer heartbeat.Stop() + + var wg sync.WaitGroup + defer wg.Wait() + + lastFullyAppliedRound := lastFinalizedRound + // Don't register availability immediately, we want to know first how far behind consensus we are. + latestBlockRound := w.undefinedRound + for { + // Drain the Apply and Finalize queues first, before waiting for new events in the select below. + + // Apply fetched writelogs, but only if they are for the round after the last fully applied one + // and current number of pending roots to be finalized is smaller than max allowed. + applyNext := pendingApply.Len() > 0 && + lastFullyAppliedRound+1 == (*pendingApply)[0].GetRound() && + pendingFinalize.Len() < dbApi.MaxPendingVersions-1 // -1 since one may be already finalizing. + if applyNext { + lastDiff := heap.Pop(pendingApply).(*fetchedDiff) + err := w.apply(ctx, lastDiff) + + syncing := syncingRounds[lastDiff.round] + if err != nil { + syncing.retry(lastDiff.thisRoot.Type) + continue + } + syncing.outstanding.remove(lastDiff.thisRoot.Type) + if !syncing.outstanding.isEmpty() || !syncing.awaitingRetry.isEmpty() { + continue + } + + // We have fully synced the given round. + w.logger.Debug("finished syncing round", "round", lastDiff.round) + delete(syncingRounds, lastDiff.round) + summary := summaryCache[lastDiff.round] + delete(summaryCache, lastDiff.round-1) + lastFullyAppliedRound = lastDiff.round + + // Suggestion: Rename to lastAppliedRoundMetric, as synced is synonim for finalized in this code. 
+ storageWorkerLastSyncedRound.With(w.getMetricLabels()).Set(float64(lastDiff.round)) + // Suggestion: Ideally this would be recorded once the round is finalized (synced). + storageWorkerRoundSyncLatency.With(w.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) + + // Trigger finalization for this round, that will happen concurently + // with respect to Apply operations for subsequent rounds. + heap.Push(pendingFinalize, summary) + + continue + } + + // Check if any new rounds were fully applied and need to be finalized. + // Only finalize if it's the round after the one that was finalized last. + // As a consequence at most one finalization can be happening at the time. + if len(*pendingFinalize) > 0 && lastFinalizedRound+1 == (*pendingFinalize)[0].GetRound() { + summary := heap.Pop(pendingFinalize).(*blockSummary) + wg.Add(1) + go func() { // Don't block fetching and applying remaining rounds. + defer wg.Done() + w.finalize(ctx, summary, finalizedCh) + }() + continue + } + + select { + case <-ctx.Done(): + return ctx.Err() + case inBlk := <-w.blockCh.Out(): + blk := inBlk.(*block.Block) + w.logger.Debug("incoming block", + "round", blk.Header.Round, + "last_fully_applied", lastFullyAppliedRound, + "last_finalized", lastFinalizedRound, + ) + + // Check if we're far enough to reasonably register as available. + latestBlockRound = blk.Header.Round + // Fixme: If block channel has many pending blocks (e.g. after checkpoint sync), + // nudgeAvailability may incorrectly set the node as available too early. + w.nudgeAvailability(lastFinalizedRound, latestBlockRound) + + if err := w.fetchMissingBlockHeaders(ctx, lastFullyAppliedRound, blk, summaryCache); err != nil { + return fmt.Errorf("failed to fetch missing block headers: %w", err) // Suggestion: databases can fail, consider retrying. 
+ } + + w.triggerRoundFetches(fetchCtx, fetchPool, diffCh, syncingRounds, summaryCache, lastFullyAppliedRound+1, latestBlockRound) + case item := <-diffCh: + if item.err != nil { + w.logger.Error("error calling getdiff", + "err", item.err, + "round", item.round, + "old_root", item.prevRoot, + "new_root", item.thisRoot, + "fetched", item.fetched, + ) + syncingRounds[item.round].retry(item.thisRoot.Type) // Suggestion: Trigger fetches immediately. + break + } + + heap.Push(pendingApply, item) + // Item was successfully processed, trigger more round fetches. + // This ensures that new rounds are processed as fast as possible + // when we're syncing and are far behind. + w.triggerRoundFetches(fetchCtx, fetchPool, diffCh, syncingRounds, summaryCache, lastFullyAppliedRound+1, latestBlockRound) + heartbeat.reset() + case <-heartbeat.C: + if latestBlockRound != w.undefinedRound { + w.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) + w.triggerRoundFetches(fetchCtx, fetchPool, diffCh, syncingRounds, summaryCache, lastFullyAppliedRound+1, latestBlockRound) + } + case finalized := <-finalizedCh: + if finalized.err != nil { + return fmt.Errorf("failed to finalize (round: %d): %w", finalized.summary.Round, finalized.err) + } + var err error + lastFinalizedRound, err = w.flushSyncedState(finalized.summary) + if err != nil { // Suggestion: DB operations can always fail, consider retrying. + return fmt.Errorf("failed to flush synced state: %w", err) + } + storageWorkerLastFullRound.With(w.getMetricLabels()).Set(float64(finalized.summary.Round)) + + // Check if we're far enough to reasonably register as available. + w.nudgeAvailability(lastFinalizedRound, latestBlockRound) + + // Notify the checkpointer that there is a new finalized round. 
+ if config.GlobalConfig.Storage.Checkpointer.Enabled { + w.checkpointer.NotifyNewVersion(finalized.summary.Round) + } + } + } +} + +func (w *Worker) fetchMissingBlockHeaders(ctx context.Context, lastFullyAppliedRound uint64, blk *block.Block, summaryCache map[uint64]*blockSummary) error { + if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == w.undefinedRound { // Suggestion: Helper that is only done once. + dummy := blockSummary{ + Namespace: blk.Header.Namespace, + Round: lastFullyAppliedRound + 1, + Roots: []api.Root{ + { + Version: lastFullyAppliedRound + 1, + Type: api.RootTypeIO, + }, + { + Version: lastFullyAppliedRound + 1, + Type: api.RootTypeState, + }, + }, + } + dummy.Roots[0].Empty() + dummy.Roots[1].Empty() + summaryCache[lastFullyAppliedRound] = &dummy + } + // Determine if we need to fetch any old block summaries. In case the first + // round is an undefined round, we need to start with the following round + // since the undefined round may be unsigned -1 and in this case the loop + // would not do any iterations. 
+ startSummaryRound := lastFullyAppliedRound + if startSummaryRound == w.undefinedRound { + startSummaryRound++ + } + for i := startSummaryRound; i < blk.Header.Round; i++ { + if _, ok := summaryCache[i]; ok { + continue + } + oldBlock, err := w.commonNode.Runtime.History().GetCommittedBlock(ctx, i) + if err != nil { + return fmt.Errorf("getting block for round %d (current round: %d): %w", i, blk.Header.Round, err) + } + summaryCache[i] = summaryFromBlock(oldBlock) + } + if _, ok := summaryCache[blk.Header.Round]; !ok { + summaryCache[blk.Header.Round] = summaryFromBlock(blk) + } + return nil +} + +func (w *Worker) triggerRoundFetches( + ctx context.Context, + fetchPool *workerpool.Pool, + diffCh chan<- *fetchedDiff, + syncingRounds map[uint64]*inFlight, + summaryCache map[uint64]*blockSummary, + start uint64, + end uint64, +) { + for r := start; r <= end; r++ { + syncing, ok := syncingRounds[r] + if ok && syncing.outstanding.hasAll() { + continue + } + + if !ok { + if len(syncingRounds) >= maxInFlightRounds { + break + } + + syncing = &inFlight{ + startedAt: time.Now(), + awaitingRetry: outstandingMaskFull, + } + syncingRounds[r] = syncing + + if r == end { + storageWorkerLastPendingRound.With(w.getMetricLabels()).Set(float64(r)) + } + } + w.logger.Debug("preparing round sync", + "round", r, + "outstanding_mask", syncing.outstanding, + "awaiting_retry", syncing.awaitingRetry, + ) + + prev := summaryCache[r-1] + this := summaryCache[r] + prevRoots := make([]api.Root, len(prev.Roots)) + copy(prevRoots, prev.Roots) + for i := range prevRoots { + if prevRoots[i].Type == api.RootTypeIO { + // IO roots aren't chained, so clear it (but leave cache intact). 
+ prevRoots[i] = api.Root{ + Namespace: this.Namespace, + Version: this.Round, + Type: api.RootTypeIO, + } + prevRoots[i].Hash.Empty() + break + } + } + + for i := range prevRoots { + rootType := prevRoots[i].Type + if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { + syncing.scheduleDiff(rootType) + fetchPool.Submit(func() { + w.fetchDiff(ctx, diffCh, this.Round, prevRoots[i], this.Roots[i]) + }) + } + } + } +} + +func (w *Worker) fetchDiff(ctx context.Context, fetchCh chan<- *fetchedDiff, round uint64, prevRoot, thisRoot api.Root) { + result := &fetchedDiff{ + fetched: false, + pf: rpc.NewNopPeerFeedback(), + round: round, + prevRoot: prevRoot, + thisRoot: thisRoot, + } + defer func() { + select { + case fetchCh <- result: + case <-ctx.Done(): + } + }() + + // Check if the new root doesn't already exist. + if w.localStorage.NodeDB().HasRoot(thisRoot) { + return + } + + result.fetched = true + + // Even if HasRoot returns false the root can still exist if it is equal + // to the previous root and the root was emitted by the consensus committee + // directly (e.g., during an epoch transition). + if thisRoot.Hash.Equal(&prevRoot.Hash) { + result.writeLog = api.WriteLog{} + return + } + + // New root does not yet exist in storage and we need to fetch it from a peer. + w.logger.Debug("calling GetDiff", + "old_root", prevRoot, + "new_root", thisRoot, + ) + + wl, pf, err := w.getDiff(ctx, prevRoot, thisRoot) + if err != nil { + result.err = err + return + } + result.pf = pf + result.writeLog = wl +} + +// getDiff fetches writelog using diff sync p2p protocol client. +// +// The request relies on the default timeout of the underlying p2p protocol clients. +// +// In case of no peers or error, it fallbacks to the legacy storage sync protocol. 
+func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot api.Root) (api.WriteLog, rpc.PeerFeedback, error) { + rsp1, pf, err := w.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err == nil { // if NO error + return rsp1.WriteLog, pf, nil + } + + rsp2, pf, err := w.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err != nil { + return nil, nil, err + } + return rsp2.WriteLog, pf, nil +} + +func (w *Worker) apply(ctx context.Context, diff *fetchedDiff) error { + if !diff.fetched { + return nil + } + + err := w.localStorage.Apply(ctx, &api.ApplyRequest{ + Namespace: diff.thisRoot.Namespace, + RootType: diff.thisRoot.Type, + SrcRound: diff.prevRoot.Version, + SrcRoot: diff.prevRoot.Hash, + DstRound: diff.thisRoot.Version, + DstRoot: diff.thisRoot.Hash, + WriteLog: diff.writeLog, + }) + switch { + case err == nil: + diff.pf.RecordSuccess() + case errors.Is(err, api.ErrExpectedRootMismatch): + diff.pf.RecordBadPeer() + default: + w.logger.Error("can't apply write log", + "err", err, + "old_root", diff.prevRoot, + "new_root", diff.thisRoot, + ) + diff.pf.RecordSuccess() + } + + return err +} + +func (w *Worker) finalize(ctx context.Context, summary *blockSummary, finalizedCh chan<- finalizedResult) { + err := w.localStorage.NodeDB().Finalize(summary.Roots) + switch err { + case nil: + w.logger.Debug("storage round finalized", + "round", summary.Round, + ) + case api.ErrAlreadyFinalized: + // This can happen if we are restoring after a roothash migration or if + // we crashed before updating the sync state. 
+ w.logger.Warn("storage round already finalized", + "round", summary.Round, + ) + err = nil + default: + w.logger.Error("failed to finalize", "err", err, "summary", summary) + } + + result := finalizedResult{ + summary: summary, + err: err, + } + + select { + case finalizedCh <- result: + case <-ctx.Done(): + } +} diff --git a/go/worker/storage/committee/metrics.go b/go/worker/storage/statesync/metrics.go similarity index 91% rename from go/worker/storage/committee/metrics.go rename to go/worker/storage/statesync/metrics.go index 7f641f71fdd..4bc6c414df1 100644 --- a/go/worker/storage/committee/metrics.go +++ b/go/worker/storage/statesync/metrics.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "sync" @@ -49,9 +49,9 @@ var ( prometheusOnce sync.Once ) -func (n *Node) getMetricLabels() prometheus.Labels { +func (w *Worker) getMetricLabels() prometheus.Labels { return prometheus.Labels{ - "runtime": n.commonNode.Runtime.ID().String(), + "runtime": w.commonNode.Runtime.ID().String(), } } diff --git a/go/worker/storage/statesync/prune.go b/go/worker/storage/statesync/prune.go new file mode 100644 index 00000000000..dd67a4fbd9c --- /dev/null +++ b/go/worker/storage/statesync/prune.go @@ -0,0 +1,48 @@ +package statesync + +import ( + "fmt" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" +) + +type pruneHandler struct { + logger *logging.Logger + worker *Worker +} + +func (p *pruneHandler) Prune(rounds []uint64) error { + // Make sure we never prune past what was synced. + lastSycnedRound, _, _ := p.worker.GetLastSynced() + + for _, round := range rounds { + if round >= lastSycnedRound { + return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)", + lastSycnedRound, + ) + } + + // Old suggestion: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. 
+ + p.logger.Debug("pruning storage for round", "round", round) + + // Prune given block. + err := p.worker.localStorage.NodeDB().Prune(round) + switch err { + case nil: + case api.ErrNotEarliest: + p.logger.Debug("skipping non-earliest round", + "round", round, + ) + continue + default: + p.logger.Error("failed to prune block", + "err", err, + ) + return err + } + } + + return nil +} diff --git a/go/worker/storage/statesync/state_sync.go b/go/worker/storage/statesync/state_sync.go new file mode 100644 index 00000000000..5a4fc0a3187 --- /dev/null +++ b/go/worker/storage/statesync/state_sync.go @@ -0,0 +1,662 @@ +// Package statesync defines the logic responsible for initializing, syncing, +// and pruning of the runtime state using the relevant p2p protocol clients. +package statesync + +import ( + "context" + "errors" + "fmt" + "math" + "sync" + "time" + + "github.com/eapache/channels" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/config" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + commonFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" + registryApi "github.com/oasisprotocol/oasis-core/go/registry/api" + roothashApi "github.com/oasisprotocol/oasis-core/go/roothash/api" + "github.com/oasisprotocol/oasis-core/go/roothash/api/block" + runtime "github.com/oasisprotocol/oasis-core/go/runtime/api" + "github.com/oasisprotocol/oasis-core/go/runtime/host" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" + workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" + "github.com/oasisprotocol/oasis-core/go/worker/common/committee" + "github.com/oasisprotocol/oasis-core/go/worker/registration" + "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + 
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" + storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" +) + +var ( + _ committee.NodeHooks = (*Worker)(nil) + + // ErrNonLocalBackend is the error returned when the storage backend doesn't implement the LocalBackend interface. + ErrNonLocalBackend = errors.New("storage: storage backend doesn't support local storage") +) + +const ( + // RoundLatest is a magic value for the latest round. + RoundLatest = math.MaxUint64 + + defaultUndefinedRound = ^uint64(0) + + checkpointSyncRetryDelay = 10 * time.Second + + // The maximum number of rounds the worker can be behind the chain before it's sensible for + // it to register as available. + maximumRoundDelayForAvailability = uint64(10) + + // The minimum number of rounds the worker can be behind the chain before it's sensible for + // it to stop advertising availability. + minimumRoundDelayForUnavailability = uint64(15) +) + +// Worker is the runtime state sync worker, responsible for syncing state +// that corresponds to the incoming runtime block headers received from the +// consensus service. +// +// In addition this worker is responsible for: +// 1. Initializing the runtime state, possibly using checkpoints (if configured). +// 2. Pruning the state as specified by the configuration. +// 3. Optionally creating runtime state checkpoints (used by other nodes) for the state sync. +// 4. Creating (and optionally advertising) statesync p2p protocol clients and servers. +// 5. Registering node availability when it has synced sufficiently close to +// the latest known block header. +// +// Suggestion: This worker should not be responsible for creating and advertising p2p related stuff. 
+// Instead it should receive the p2p client (even better interface) for fetching storage diffs and checkpoints. +type Worker struct { // nolint: maligned + commonNode *committee.Node + + roleProvider registration.RoleProvider + rpcRoleProvider registration.RoleProvider + roleAvailable bool + + logger *logging.Logger + + localStorage storageApi.LocalBackend + + diffSync diffsync.Client + checkpointSync checkpointsync.Client + legacyStorageSync synclegacy.Client + + undefinedRound uint64 + + workerCommonCfg workerCommon.Config + + checkpointer checkpoint.Checkpointer + checkpointSyncCfg *CheckpointSyncConfig + checkpointSyncForced bool + + syncedLock sync.RWMutex + syncedState blockSummary + + statusLock sync.RWMutex + status api.StorageWorkerStatus + + blockCh *channels.InfiniteChannel + + initCh chan struct{} +} + +// New creates a new state sync worker. +func New( + ctx context.Context, + commonNode *committee.Node, + roleProvider registration.RoleProvider, + rpcRoleProvider registration.RoleProvider, + workerCommonCfg workerCommon.Config, + localStorage storageApi.LocalBackend, + checkpointSyncCfg *CheckpointSyncConfig, +) (*Worker, error) { + initMetrics() + + w := &Worker{ + commonNode: commonNode, + + roleProvider: roleProvider, + rpcRoleProvider: rpcRoleProvider, + + logger: logging.GetLogger("worker/storage/statesync").With("runtime_id", commonNode.Runtime.ID()), + + workerCommonCfg: workerCommonCfg, + + localStorage: localStorage, + + checkpointSyncCfg: checkpointSyncCfg, + + status: api.StatusInitializing, + + blockCh: channels.NewInfiniteChannel(), + + initCh: make(chan struct{}), + } + + // Validate checkpoint sync configuration. + if err := checkpointSyncCfg.Validate(); err != nil { + return nil, fmt.Errorf("bad checkpoint sync configuration: %w", err) + } + + // Initialize sync state. + w.syncedState.Round = defaultUndefinedRound + + // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. 
+ checkpointer, err := w.newCheckpointer(ctx, commonNode, localStorage) + if err != nil { + return nil, fmt.Errorf("failed to create checkpointer: %w", err) + } + w.checkpointer = checkpointer + + // Register prune handler. + commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ + logger: w.logger, + worker: w, + }) + + // Advertise and serve p2p protocols. + commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + if config.GlobalConfig.Storage.Checkpointer.Enabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + if rpcRoleProvider != nil { + commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + + // Create p2p protocol clients. + w.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + + return w, nil +} + +// Initialized returns a channel that will be closed once the worker finished starting up. +func (w *Worker) Initialized() <-chan struct{} { + return w.initCh +} + +// GetStatus returns the state sync worker status. 
+func (w *Worker) GetStatus(context.Context) (*api.Status, error) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() + + w.statusLock.RLock() + defer w.statusLock.RUnlock() + + return &api.Status{ + LastFinalizedRound: w.syncedState.Round, + Status: w.status, + }, nil +} + +func (w *Worker) PauseCheckpointer(pause bool) error { + if !commonFlags.DebugDontBlameOasis() { + return api.ErrCantPauseCheckpointer + } + w.checkpointer.Pause(pause) + return nil +} + +// GetLocalStorage returns the local storage backend used by this state sync worker. +func (w *Worker) GetLocalStorage() storageApi.LocalBackend { + return w.localStorage +} + +// NodeHooks implementation. + +// HandleNewBlockEarlyLocked is guarded by CrossNode. +func (w *Worker) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { + // Nothing to do here. +} + +// HandleNewBlockLocked is guarded by CrossNode. +func (w *Worker) HandleNewBlockLocked(bi *runtime.BlockInfo) { + // Notify the state syncer that there is a new block. + w.blockCh.In() <- bi.RuntimeBlock +} + +// HandleRuntimeHostEventLocked is guarded by CrossNode. +func (w *Worker) HandleRuntimeHostEventLocked(*host.Event) { + // Nothing to do here. +} + +// Watcher implementation. + +// GetLastSynced returns the height, IORoot hash and StateRoot hash of the last block that was fully synced to. +func (w *Worker) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() + + var io, state storageApi.Root + for _, root := range w.syncedState.Roots { + switch root.Type { + case storageApi.RootTypeIO: + io = root + case storageApi.RootTypeState: + state = root + } + } + + return w.syncedState.Round, io, state +} + +// Run runs state sync worker. +func (w *Worker) Run(ctx context.Context) error { // nolint: gocyclo + // Wait for the common node to be initialized. 
+ select { + case <-w.commonNode.Initialized(): + case <-ctx.Done(): + close(w.initCh) + return ctx.Err() + } + + w.logger.Info("starting runtime state sync worker") + + w.statusLock.Lock() + w.status = api.StatusStarting + w.statusLock.Unlock() + + if config.GlobalConfig.Storage.Checkpointer.Enabled { + go w.createCheckpoints(ctx) + } + + // Determine genesis block. + genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), + Height: consensus.HeightLatest, + }) + if err != nil { + return fmt.Errorf("can't retrieve genesis block: %w", err) + } + w.undefinedRound = genesisBlock.Header.Round - 1 + + // Determine last finalized storage version. + if version, dbNonEmpty := w.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { + var blk *block.Block + blk, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, version) + switch err { + case nil: + // Set last synced version to last finalized storage version. + if _, err = w.flushSyncedState(summaryFromBlock(blk)); err != nil { + return fmt.Errorf("failed to flush synced state: %w", err) + } + default: + // Failed to fetch historic block. This is fine when the network just went through a + // dump/restore upgrade and we don't have any information before genesis. We treat the + // database as unsynced and will proceed to either use checkpoints or sync iteratively. + w.logger.Warn("failed to fetch historic block", + "err", err, + "round", version, + ) + } + } + + w.syncedLock.RLock() + cachedLastRound := w.syncedState.Round + w.syncedLock.RUnlock() + if cachedLastRound == defaultUndefinedRound || cachedLastRound < genesisBlock.Header.Round { + cachedLastRound = w.undefinedRound + } + + // Initialize genesis from the runtime descriptor. 
+ isInitialStartup := (cachedLastRound == w.undefinedRound) + if isInitialStartup { + w.statusLock.Lock() + w.status = api.StatusInitializingGenesis + w.statusLock.Unlock() + + var rt *registryApi.Runtime + rt, err = w.commonNode.Runtime.ActiveDescriptor(ctx) + if err != nil { + return fmt.Errorf("failed to retrieve runtime registry descriptor: %w", err) + } + if err = w.initGenesis(ctx, rt, genesisBlock); err != nil { + return fmt.Errorf("failed to initialize storage at genesis: %w", err) + } + } + + // Notify the checkpointer of the genesis round so it can be checkpointed. + if w.checkpointer != nil { + w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) + w.checkpointer.Flush() + } + + // Check if we are able to fetch the first block that we would be syncing if we used iterative + // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we + // must wait for a later checkpoint to become available. + if !w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncStartCheck + w.statusLock.Unlock() + + // Determine what is the first round that we would need to sync. + iterativeSyncStart := cachedLastRound + if iterativeSyncStart == w.undefinedRound { + iterativeSyncStart++ + } + + // Check if we actually have information about that round. This assumes that any reindexing + // was already performed (the common node would not indicate being initialized otherwise). + _, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, iterativeSyncStart) + SyncStartCheck: + switch { + case err == nil: + case errors.Is(err, roothashApi.ErrNotFound): + // No information is available about the initial round. Query the earliest historic + // block and check if that block has the genesis state root and empty I/O root. + var earlyBlk *block.Block + earlyBlk, err = w.commonNode.Runtime.History().GetEarliestBlock(ctx) + switch err { + case nil: + // Make sure the state root is still the same as at genesis time. 
+ if !earlyBlk.Header.StateRoot.Equal(&genesisBlock.Header.StateRoot) { + break + } + // Make sure the I/O root is empty. + if !earlyBlk.Header.IORoot.IsEmpty() { + break + } + + // If this is the case, we can start syncing from this round instead. Fill in the + // remaining versions to make sure they actually exist in the database. + w.logger.Debug("filling in versions to genesis", + "genesis_round", genesisBlock.Header.Round, + "earliest_round", earlyBlk.Header.Round, + ) + for v := genesisBlock.Header.Round; v < earlyBlk.Header.Round; v++ { + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ + Namespace: w.commonNode.Runtime.ID(), + RootType: storageApi.RootTypeState, + SrcRound: v, + SrcRoot: genesisBlock.Header.StateRoot, + DstRound: v + 1, + DstRoot: genesisBlock.Header.StateRoot, + WriteLog: nil, // No changes. + }) + switch err { + case nil: + case storageApi.ErrAlreadyFinalized: + // Ignore already finalized versions. + continue + default: + return fmt.Errorf("failed to fill in version %d: %w", v, err) + } + + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ + Namespace: w.commonNode.Runtime.ID(), + Version: v + 1, + Type: storageApi.RootTypeState, + Hash: genesisBlock.Header.StateRoot, + // We can ignore I/O roots. + }}) + if err != nil { + return fmt.Errorf("failed to finalize filled in version %v: %w", v, err) + } + } + cachedLastRound, err = w.flushSyncedState(summaryFromBlock(earlyBlk)) + if err != nil { + return fmt.Errorf("failed to flush synced state: %w", err) + } + // No need to force a checkpoint sync. + break SyncStartCheck + default: + // This should never happen as the block should exist. + w.logger.Warn("failed to query earliest block in local history", + "err", err, + ) + } + + // No information is available about this round, force checkpoint sync. 
+ w.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", + "round", iterativeSyncStart, + ) + w.checkpointSyncForced = true + default: + // Unknown error while fetching block information, abort. + return fmt.Errorf("failed to query block: %w", err) + } + } + + w.logger.Info("worker initialized", + "genesis_round", genesisBlock.Header.Round, + "last_synced", cachedLastRound, + ) + + // Try to perform initial sync from state and io checkpoints if either: + // + // - Checkpoint sync has been forced because there is insufficient information available to use + // incremental sync. + // + // - We haven't synced anything yet and checkpoint sync is not disabled. + // + // If checkpoint sync is disabled but sync has been forced (e.g. because the state at genesis + // is non-empty), we must request to sync the checkpoint at genesis as otherwise we will jump + // to a later state which may not be desired given that checkpoint sync has been explicitly + // disabled via config. + // + if (isInitialStartup && !w.checkpointSyncCfg.Disabled) || w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncingCheckpoints + w.statusLock.Unlock() + + var ( + summary *blockSummary + attempt int + ) + CheckpointSyncRetry: + for { + summary, err = w.syncCheckpoints(ctx, genesisBlock.Header.Round, w.checkpointSyncCfg.Disabled) + if err == nil { + break + } + + attempt++ + switch w.checkpointSyncForced { + case true: + // We have no other options but to perform a checkpoint sync as we are missing + // either state or authoritative blocks. + w.logger.Info("checkpoint sync required, retrying", + "err", err, + "attempt", attempt, + ) + case false: + if attempt > 1 { + break CheckpointSyncRetry + } + + // Try syncing again. The main reason for this is the sync failing due to a + // checkpoint pruning race condition (where nodes list a checkpoint which is + // then deleted just before we request its chunks). One retry is enough. 
+ w.logger.Info("first checkpoint sync failed, trying once more", "err", err) + } + + // Delay before retrying. + select { + case <-time.After(checkpointSyncRetryDelay): + case <-ctx.Done(): + return ctx.Err() + } + } + if err != nil { + w.logger.Info("checkpoint sync failed", "err", err) + } else { + cachedLastRound, err = w.flushSyncedState(summary) + if err != nil { + return fmt.Errorf("failed to flush synced state: %w", err) + } + w.logger.Info("checkpoint sync succeeded", + logging.LogEvent, LogEventCheckpointSyncSuccess, + ) + } + } + close(w.initCh) + + w.statusLock.Lock() + w.status = api.StatusSyncingRounds + w.statusLock.Unlock() + + return w.syncDiffs(ctx, cachedLastRound) +} + +func (w *Worker) flushSyncedState(summary *blockSummary) (uint64, error) { + w.syncedLock.Lock() + defer w.syncedLock.Unlock() + + w.syncedState = *summary + if err := w.commonNode.Runtime.History().StorageSyncCheckpoint(w.syncedState.Round); err != nil { + return 0, err + } + + return w.syncedState.Round, nil +} + +func (w *Worker) initGenesis(ctx context.Context, rt *registryApi.Runtime, genesisBlock *block.Block) error { + w.logger.Info("initializing storage at genesis") + + // Check what the latest finalized version in the database is as we may be using a database + // from a previous version or network. + latestVersion, alreadyInitialized := w.localStorage.NodeDB().GetLatestVersion() + + // Finalize any versions that were not yet finalized in the old database. This is only possible + // as long as there is only one non-finalized root per version. Note that we also cannot be sure + // that any of these roots are valid, but this is fine as long as the final version matches the + // genesis root.
+ if alreadyInitialized { + w.logger.Debug("already initialized, finalizing any non-finalized versions", + "genesis_state_root", genesisBlock.Header.StateRoot, + "genesis_round", genesisBlock.Header.Round, + "latest_version", latestVersion, + ) + + for v := latestVersion + 1; v < genesisBlock.Header.Round; v++ { + roots, err := w.localStorage.NodeDB().GetRootsForVersion(v) + if err != nil { + return fmt.Errorf("failed to fetch roots for version %d: %w", v, err) + } + + var stateRoots []storageApi.Root + for _, root := range roots { + if root.Type == storageApi.RootTypeState { + stateRoots = append(stateRoots, root) + } + } + if len(stateRoots) != 1 { + break // We must have exactly one non-finalized state root to continue. + } + + err = w.localStorage.NodeDB().Finalize(stateRoots) + if err != nil { + return fmt.Errorf("failed to finalize version %d: %w", v, err) + } + + latestVersion = v + } + } + + stateRoot := storageApi.Root{ + Namespace: rt.ID, + Version: genesisBlock.Header.Round, + Type: storageApi.RootTypeState, + Hash: genesisBlock.Header.StateRoot, + } + + var compatible bool + switch { + case latestVersion < stateRoot.Version: + // Latest version is earlier than the genesis state root. In case it has the same hash + // we can fill in all the missing versions. + maybeRoot := stateRoot + maybeRoot.Version = latestVersion + + if w.localStorage.NodeDB().HasRoot(maybeRoot) { + w.logger.Debug("latest version earlier than genesis state root, filling in versions", + "genesis_state_root", genesisBlock.Header.StateRoot, + "genesis_round", genesisBlock.Header.Round, + "latest_version", latestVersion, + ) + for v := latestVersion; v < stateRoot.Version; v++ { + err := w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ + Namespace: rt.ID, + RootType: storageApi.RootTypeState, + SrcRound: v, + SrcRoot: stateRoot.Hash, + DstRound: v + 1, + DstRoot: stateRoot.Hash, + WriteLog: nil, // No changes. 
+ }) + if err != nil { + return fmt.Errorf("failed to fill in version %d: %w", v, err) + } + + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ + Namespace: rt.ID, + Version: v + 1, + Type: storageApi.RootTypeState, + Hash: stateRoot.Hash, + // We can ignore I/O roots. + }}) + if err != nil { + return fmt.Errorf("failed to finalize version %d: %w", v, err) + } + } + compatible = true + } + default: + // Latest finalized version is the same or ahead, root must exist. + compatible = w.localStorage.NodeDB().HasRoot(stateRoot) + } + + // If we are incompatible and the local version is greater or the same as the genesis version, + // we cannot do anything. If the local version is lower we assume the node will sync from a + // different node. + if !compatible && latestVersion >= stateRoot.Version { + w.logger.Error("existing state is incompatible with runtime genesis state", + "genesis_state_root", genesisBlock.Header.StateRoot, + "genesis_round", genesisBlock.Header.Round, + "latest_version", latestVersion, + ) + return fmt.Errorf("existing state is incompatible with runtime genesis state") + } + + if !compatible { + // Database is empty, so assume the state will be replicated from another node. + w.logger.Warn("non-empty state root but no state available, assuming replication", + "state_root", genesisBlock.Header.StateRoot, + ) + w.checkpointSyncForced = true + } + return nil +} + +// This is only called from the main worker goroutine, so no locking should be necessary. 
+func (w *Worker) nudgeAvailability(lastSynced, latest uint64) { + if lastSynced == w.undefinedRound || latest == w.undefinedRound { + return + } + if latest-lastSynced < maximumRoundDelayForAvailability && !w.roleAvailable { + w.roleProvider.SetAvailable(func(_ *node.Node) error { + return nil + }) + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { + return nil + }) + } + w.roleAvailable = true + } + if latest-lastSynced > minimumRoundDelayForUnavailability && w.roleAvailable { + w.roleProvider.SetUnavailable() + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetUnavailable() + } + w.roleAvailable = false + } +} diff --git a/go/worker/storage/committee/utils.go b/go/worker/storage/statesync/utils.go similarity index 99% rename from go/worker/storage/committee/utils.go rename to go/worker/storage/statesync/utils.go index 863b9fc7bd0..88adb492b33 100644 --- a/go/worker/storage/committee/utils.go +++ b/go/worker/storage/statesync/utils.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "fmt" diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index f49988bd1c7..a027d9eae59 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -1,8 +1,11 @@ package storage import ( + "context" "fmt" + "golang.org/x/sync/errgroup" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/grpc" "github.com/oasisprotocol/oasis-core/go/common/logging" @@ -12,10 +15,10 @@ import ( committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" storageWorkerAPI "github.com/oasisprotocol/oasis-core/go/worker/storage/api" - "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" + "github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" ) -// Worker is a worker handling storage operations. 
+// Worker is a worker handling storage operations for all common worker runtimes. type Worker struct { enabled bool @@ -26,7 +29,10 @@ type Worker struct { initCh chan struct{} quitCh chan struct{} - runtimes map[common.Namespace]*committee.Node + runtimes map[common.Namespace]*statesync.Worker + + ctx context.Context + cancel context.CancelFunc } // New constructs a new storage worker. @@ -35,6 +41,7 @@ func New( commonWorker *workerCommon.Worker, registration *registration.Worker, ) (*Worker, error) { + ctx, cancel := context.WithCancel(context.Background()) enabled := config.GlobalConfig.Mode.HasLocalStorage() && len(commonWorker.GetRuntimes()) > 0 s := &Worker{ @@ -44,14 +51,16 @@ func New( logger: logging.GetLogger("worker/storage"), initCh: make(chan struct{}), quitCh: make(chan struct{}), - runtimes: make(map[common.Namespace]*committee.Node), + runtimes: make(map[common.Namespace]*statesync.Worker), + ctx: ctx, + cancel: cancel, } if !enabled { return s, nil } - // Start storage node for every runtime. + // Start state sync worker for every runtime. 
for id, rt := range s.commonWorker.GetRuntimes() { if err := s.registerRuntime(rt); err != nil { return nil, fmt.Errorf("failed to create storage worker for runtime %s: %w", id, err) @@ -90,13 +99,14 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { return fmt.Errorf("can't create local storage backend: %w", err) } - node, err := committee.NewNode( + worker, err := statesync.New( + w.ctx, commonNode, rp, rpRPC, w.commonWorker.GetConfig(), localStorage, - &committee.CheckpointSyncConfig{ + &statesync.CheckpointSyncConfig{ Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled, ChunkFetcherCount: config.GlobalConfig.Storage.FetcherCount, }, @@ -105,8 +115,8 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { return err } commonNode.Runtime.RegisterStorage(localStorage) - commonNode.AddHooks(node) - w.runtimes[id] = node + commonNode.AddHooks(worker) + w.runtimes[id] = worker w.logger.Info("new runtime registered", "runtime_id", id, @@ -115,7 +125,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { return nil } -// Name returns the service name. +// Name returns the worker name. func (w *Worker) Name() string { return "storage worker" } @@ -142,36 +152,43 @@ func (w *Worker) Start() error { return nil } - // Wait for all runtimes to terminate. go func() { defer close(w.quitCh) - - for _, r := range w.runtimes { - <-r.Quit() - } + _ = w.Serve() // error logged as part of Serve already. }() - // Start all runtimes and wait for initialization. go func() { - w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes)) - - for _, r := range w.runtimes { - _ = r.Start() - } - - // Wait for runtimes to be initialized. for _, r := range w.runtimes { <-r.Initialized() } - w.logger.Info("storage worker started") - close(w.initCh) }() return nil } +// Serve starts running state sync worker for every configured runtime. 
+// + +// In case of an error from one of the state sync workers it cancels the remaining +// ones and waits for all of them to finish. The error from the first worker +// that failed is returned. +func (w *Worker) Serve() error { + w.logger.Info("starting storage sync workers", "num_runtimes", len(w.runtimes)) + + g, ctx := errgroup.WithContext(w.ctx) + for id, r := range w.runtimes { + g.Go(func() error { + err := r.Run(ctx) + if err != nil { + w.logger.Error("state sync worker failed", "runtime_id", id, "err", err) + } + return err + }) + } + return g.Wait() +} + // Stop halts the service. func (w *Worker) Stop() { if !w.enabled { @@ -179,9 +196,9 @@ func (w *Worker) Stop() { return } - for _, r := range w.runtimes { - r.Stop() - } + w.cancel() + <-w.quitCh + w.logger.Info("stopped") } // Quit returns a channel that will be closed when the service terminates. @@ -196,6 +213,6 @@ func (w *Worker) Cleanup() { // GetRuntime returns a storage committee node for the given runtime (if available). // // In case the runtime with the specified id was not configured for this node it returns nil. -func (w *Worker) GetRuntime(id common.Namespace) *committee.Node { +func (w *Worker) GetRuntime(id common.Namespace) *statesync.Worker { return w.runtimes[id] }