Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions based/bin/portal/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use bop_common::{
OpNodeP2PApiClient, OpRpcBlock, PORTAL_CAPABILITIES, PortalApiServer, RegistryApiClient,
},
communication::messages::{RpcError, RpcResult},
time::Nanos,
utils::{uuid, wait_for_signal},
};
use jsonrpsee::{
Expand Down Expand Up @@ -46,10 +45,6 @@ impl fmt::Debug for Gateway {
}
}

/// If we get fcus faster than this threshold we assume that we are in sync mode and will
/// not propagate anything to the gateways
const SYNC_FCU_DT_THRESHOLD: Nanos = Nanos::from_millis(999);

#[derive(Clone)]
pub struct PortalServer {
fallback_eth_client: RpcClient,
Expand All @@ -59,7 +54,6 @@ pub struct PortalServer {
current_gateway: Arc<Mutex<Option<Gateway>>>,
gateway_timeout: Duration,
gateways: Arc<RwLock<Vec<Gateway>>>,
last_fcu: Arc<RwLock<Nanos>>,
args: Arc<PortalArgs>,
}

Expand Down Expand Up @@ -116,7 +110,6 @@ impl PortalServer {
current_gateway,
gateways,
gateway_timeout,
last_fcu: Default::default(),
args: Arc::new(args),
})
}
Expand Down Expand Up @@ -179,10 +172,6 @@ impl PortalServer {
self.gateways.read().clone()
}

fn syncing(&self) -> bool {
self.last_fcu.read().elapsed() < SYNC_FCU_DT_THRESHOLD
}

pub async fn refresh(&self) -> eyre::Result<()> {
let mut gateways = vec![];
for (gateway_url, _, jwt_as_b256) in self.registry_client.registered_gateways().await? {
Expand Down Expand Up @@ -243,7 +232,7 @@ impl EthApiServer for PortalServer {
let bytes = bytes.clone();
tokio::spawn(async move {
if let Err(err) = gateway.client.send_raw_transaction(bytes).await {
error!(%err, ?gateway, "failed to send to gateway");
error!(%err, ?gateway, "eth_sendRawTransaction: failed to send to gateway");
}
});
}
Expand Down Expand Up @@ -418,10 +407,6 @@ impl EngineApiServer for PortalServer {
fork_choice_state: ForkchoiceState,
payload_attributes: Option<OpPayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
let last_fcu_dt = self.last_fcu.read().elapsed();
if payload_attributes.is_none() {
*self.last_fcu.write() = Nanos::now();
}
let parent_block_hash = fork_choice_state.head_block_hash;

if let Some(payload_attributes) = payload_attributes.as_ref() {
Expand All @@ -435,11 +420,6 @@ impl EngineApiServer for PortalServer {
let response =
self.fallback_client.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;

if payload_attributes.is_none() && last_fcu_dt < SYNC_FCU_DT_THRESHOLD {
debug!("we seem to be in state syncing so only sending fcu to fallback");
return Ok(response);
}

if let Some(current_gateway) = self.current_gateway.as_ref().lock().await.clone() {
if payload_attributes.is_some() {
// pick only one gateway for this block
Expand Down Expand Up @@ -479,10 +459,6 @@ impl EngineApiServer for PortalServer {
.new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
.await?;

if self.syncing() {
return Ok(response);
}

// send to all gateways
for gateway in self.gateways() {
let payload = payload.clone();
Expand Down Expand Up @@ -512,10 +488,6 @@ impl EngineApiServer for PortalServer {
#[tracing::instrument(skip_all, err, ret(level = Level::DEBUG), fields(req_id = %uuid()))]
async fn get_payload_v3(&self, payload_id: PayloadId) -> RpcResult<OpExecutionPayloadEnvelopeV3> {
debug!(%payload_id, "new request");
if self.syncing() {
error!("syncing");
return Ok(self.fallback_client.clone().get_payload_v3(payload_id).await?);
}

let fallback_fut = tokio::spawn({
let client = self.fallback_client.clone();
Expand Down
2 changes: 1 addition & 1 deletion op-geth/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2282,7 +2282,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
// the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
} else if oldHead.Hash() != newHead.Hash() {
// len(newChain) == 0 && len(oldChain) > 0
// rewind the canonical chain to a lower point.
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
Expand Down
6 changes: 5 additions & 1 deletion optimism/op-node/rollup/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent(x))
case PromoteUnsafeEvent:
d.preconfChannels.SendL2Block(&x.Ref)
// Backup unsafeHead when new block is not built on original unsafe head.
if d.ec.unsafeHead.Number >= x.Ref.Number {
d.ec.SetBackupUnsafeL2Head(d.ec.unsafeHead, false)
Expand All @@ -449,6 +448,8 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
if !d.cfg.IsInterop(x.Ref.Time) {
d.emitter.Emit(PromoteCrossUnsafeEvent(x))
}
log.Debug("sending L2Block to preconf", "number", x.Ref.Number)
d.preconfChannels.SendL2Block(&x.Ref)
// Try to apply the forkchoice changes
d.emitter.Emit(TryUpdateEngineEvent{})
case PromoteCrossUnsafeEvent:
Expand Down Expand Up @@ -499,6 +500,9 @@ func (d *EngDeriver) OnEvent(ev event.Event) bool {
d.emitter.Emit(PromoteSafeEvent(x))
}
case PromoteSafeEvent:

log.Debug("sending L2Block to preconf", "number", x.Ref.Number)
d.preconfChannels.SendL2Block(&x.Ref)
d.log.Debug("Updating safe", "safe", x.Ref, "unsafe", d.ec.UnsafeL2Head())
d.ec.SetSafeHead(x.Ref)
// Finalizer can pick up this safe cross-block now
Expand Down
8 changes: 4 additions & 4 deletions optimism/op-node/rollup/engine/preconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func StartPreconf(ctx context.Context, e ExecEngine) PreconfChannels {
// Checks if the state is new or if the previous block is sealed.
func (s *PreconfState) putEnv(sEnv *eth.SignedEnv) {
env := sEnv.Env
if !s.lastSealSent.IsSet() || s.lastSealSent.IsEqual(env.Number-1) || s.lastL2BlockSent.IsEqual(env.Number) {
if s.lastL2BlockSent.IsEqual(env.Number - 1) {
s.lastEnvSent.Set(env.Number)
s.e.Env(s.ctx, sEnv)
s.prune(env.Number)
Expand Down Expand Up @@ -191,17 +191,17 @@ func (s *PreconfState) putSeal(sSeal *eth.SignedSeal) {
// Checks if there's envs blocked because of gaps and sends them over.
func (s *PreconfState) putL2Block(block *eth.L2BlockRef) {
s.lastL2BlockSent.Set(block.Number)
nextEnv, ok := s.pendingEnvs[block.Number]
nextEnv, ok := s.pendingEnvs[block.Number+1]
if ok {
delete(s.pendingEnvs, block.Number)
delete(s.pendingEnvs, block.Number+1)
s.putEnv(&nextEnv)
}

s.prune(block.Number)
}

// The amount of blocks we don't prune back from the current block.
const PruneSafeWindow = 2
const PruneSafeWindow = 512

func (s *PreconfState) prune(currentBlock uint64) {
// We only prune if there's at least a full window of events to prune.
Expand Down
Loading