feat: implement sync mode #2961
base: main
Conversation
Claude finished @tac0turtle's task in 2m 30s — View job

## Comprehensive Code Review

I've reviewed the PR implementing dynamic sync modes (catchup polling vs. follow subscription). Here's my detailed analysis.

### Summary

The implementation adds a sophisticated dual-mode sync system that dynamically switches between aggressive polling (catchup) and efficient subscriptions (follow). The architecture is sound, but there are several critical bugs, design concerns, and missing test coverage that need attention.

### Critical Issues

#### 1. Loop Variable Capture in Subscription Processing (syncer.go:567-580)

Severity: Critical | Impact: Data Corruption

While Go 1.22+ fixed the loop variable scoping issue mentioned by gemini-code-assist, the code at syncer.go:567-580 has a more subtle problem:
```go
for _, event := range events {
	select {
	case s.heightInCh <- event: // ✅ OK - value copy
		// ...
	default:
		s.cache.SetPendingEvent(event.Header.Height(), &event) // ❌ PROBLEM
	}
}
```

Issue: Taking `&event` captures the address of the loop variable; whether each cached pointer refers to a distinct value silently depends on Go 1.22+ per-iteration scoping.

Recommendation: Explicitly capture the value to be unambiguous:
```go
for _, event := range events {
	event := event // Explicit capture for safety
	select {
	case s.heightInCh <- event:
		// ...
	default:
		s.cache.SetPendingEvent(event.Header.Height(), &event)
	}
}
```

Location: syncer.go:567-580

#### 2. Missing Tracing for New Methods (da/tracing.go:136-142)

Severity: Medium | Impact: Observability Gap

The new `Subscribe` and `LocalHead` methods on the traced DA client pass calls straight through without creating spans:
```go
func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
	return t.inner.Subscribe(ctx, namespace) // ❌ No tracing
}

func (t *tracedClient) LocalHead(ctx context.Context) (uint64, error) {
	return t.inner.LocalHead(ctx) // ❌ No tracing
}
```

Recommendation: Add tracing spans:
```go
func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
	ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
		trace.WithAttributes(
			attribute.String("da.namespace", hex.EncodeToString(namespace)),
		),
	)
	defer span.End()

	ch, err := t.inner.Subscribe(ctx, namespace)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
	return ch, err
}
```

Location: da/tracing.go:136-142
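The `LocalHead` wrapper deserves the same treatment. A minimal sketch along the lines of the snippet above; it assumes the same `tracer` field and the `attribute`/`codes` OpenTelemetry packages already used there:

```go
func (t *tracedClient) LocalHead(ctx context.Context) (uint64, error) {
	ctx, span := t.tracer.Start(ctx, "DA.LocalHead")
	defer span.End()

	head, err := t.inner.LocalHead(ctx)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return 0, err
	}
	span.SetAttributes(attribute.Int64("da.local_head", int64(head)))
	return head, nil
}
```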
#### 3. Deadlock Risk in Subscription Error Path (syncer.go:455-550)

Severity: High | Impact: Node Hang

The `subscribeAndFollow` function sets up its subscriptions before the receive loop starts:

```go
func (s *Syncer) subscribeAndFollow() error {
	subCtx, cancel := context.WithCancel(s.ctx)
	defer cancel() // Called on all returns

	headerCh, err := s.daClient.Subscribe(subCtx, headerNS)
	if err != nil {
		return fmt.Errorf("failed to subscribe: %w", err) // ❌ Early return, but channels might still be sending
	}
	// Similar for dataCh and forcedInclusionCh
```

Problem: If subscription setup fails partway through (e.g., header succeeds but data fails), the first subscription's goroutine may still be sending to its channel. When we return early, the receiving loop hasn't started yet, so that sender can block until the deferred `cancel()` finally takes effect.

Impact: While `defer cancel()` eventually tears the subscriptions down, the failure window still risks a hung sender if its send is not context-aware.

Recommendation: Create subscriptions in a more controlled manner, or ensure the context is cancelled before any early return (see the sketch below).
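One way to narrow that window is to group subscription setup behind a helper that cancels the shared context the moment any subscription fails. This is a sketch, not the PR's implementation; `subscribeAll` is a hypothetical helper, and the `Subscribe` signature is taken from the snippets above:

```go
// subscribeAll opens one subscription per namespace and cancels the shared
// context if any of them fails, so an already-created subscription never
// keeps sending into a channel that nobody will read.
func (s *Syncer) subscribeAll(ctx context.Context, namespaces [][]byte) ([]<-chan *blobrpc.SubscriptionResponse, context.CancelFunc, error) {
	subCtx, cancel := context.WithCancel(ctx)
	channels := make([]<-chan *blobrpc.SubscriptionResponse, 0, len(namespaces))
	for _, ns := range namespaces {
		ch, err := s.daClient.Subscribe(subCtx, ns)
		if err != nil {
			cancel() // stop earlier subscriptions before returning
			return nil, nil, fmt.Errorf("failed to subscribe: %w", err)
		}
		channels = append(channels, ch)
	}
	return channels, cancel, nil
}
```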
### Design & Architectural Concerns

#### 4. Nil Channel Select Cases Are Dead Code (syncer.go:516-527)

Severity: Low | Impact: Code Clarity

The code comments explain that nil channels block forever in `select`, so when data shares the header namespace the data case can never fire:

```go
case resp, ok := <-dataCh:
	// Note: if dataCh is nil (same namespace as header), this case never fires
	if !ok {
		return errors.New("data subscription closed")
	}
```

In the shared-namespace configuration these case bodies are effectively unreachable.

Recommendation: Remove the dead code or add a clearer comment explaining the nil channel behavior upfront.

Location: syncer.go:516-527
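For readers less familiar with the idiom, here is a standalone illustration (not project code) of why a nil channel case can never win a `select`:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var dataCh chan int // nil channel: receives from it block forever

	select {
	case v := <-dataCh:
		fmt.Println("never reached", v) // a nil channel never becomes ready
	case <-time.After(100 * time.Millisecond):
		fmt.Println("the nil-channel case above is effectively dead code")
	}
}
```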
#### 5. Interface Abstraction Violation (syncer.go:564)

Severity: Medium | Impact: Maintainability

The syncer calls the retriever's blob-processing method directly:

```go
events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)
```

The comment from julienrbrt points out that this pattern bypasses the retriever abstraction.

Recommendation: As noted in the review comments, align the forced inclusion retriever to use the same subscription-based pattern rather than the separate async block fetching.

Location: syncer.go:564

#### 6. Forced Inclusion Subscription Redundancy (syncer.go:480-491)

Severity: Medium | Impact: Performance & Complexity

The code subscribes to the forced inclusion namespace separately:

```go
if s.daClient.HasForcedInclusionNamespace() {
	fiNS := s.daClient.GetForcedInclusionNamespace()
	if !bytes.Equal(fiNS, headerNS) && !bytes.Equal(fiNS, dataNS) {
		forcedInclusionCh, err = s.daClient.Subscribe(subCtx, fiNS)
```

However, as julienrbrt noted, the forced inclusion retriever already has its own caching and async fetching (the async block retriever), which makes this subscription redundant.

Recommendation: Remove the forced inclusion subscription from follow mode and rely on the retriever's existing async mechanism, or refactor to use a unified approach.

Location: syncer.go:480-491
### Concurrency & Goroutine Safety

#### 7. Atomic Access Patterns Look Sound

Status: ✅ Good

The sync mode switching uses an atomic load/compare/store:

```go
mode := s.determineSyncMode()
previousMode := SyncMode(s.currentSyncMode.Load())
if mode != previousMode {
	s.currentSyncMode.Store(int32(mode))
```

The pattern of load-compare-store without CAS is acceptable here because the mode is only written from the single DA worker loop, so there is no competing writer.

#### 8. Channel Buffering Is Reasonable

Status: ✅ Good

```go
heightInCh: make(chan common.DAHeightEvent, 100),
```

The 100-event buffer is sensible for handling bursty traffic from subscriptions without blocking. Events that don't fit go to the pending-event cache (`SetPendingEvent`).

#### 9. Context Cancellation Handling

Status: Mostly good

The error path distinguishes cancellation from real failures:
```go
err := s.subscribeAndFollow()
if err != nil && !errors.Is(err, context.Canceled) {
	s.metrics.SubscribeErrors.Add(1)
```

However, the shutdown path shares the teardown concerns raised in issue 3 above.

### Error Handling & Resilience

#### 10. Subscription Error Recovery Is Well-Designed

Status: ✅ Good

The error handling falls back gracefully:
```go
if err != nil && !errors.Is(err, context.Canceled) {
	s.metrics.SubscribeErrors.Add(1)
	s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")
	// No explicit catchup call needed - daWorkerLoop will call determineSyncMode()
```

This is elegant: instead of forcing an immediate mode switch, it lets the natural loop re-evaluate the mode.

#### 11. Watchdog Timeout Is Essential

Status: ✅ Good

The watchdog timer prevents the node from being stuck in follow mode if the subscription stalls:
```go
case <-time.After(watchdogTimeout):
	s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered")
	if s.determineSyncMode() == SyncModeCatchup {
		return errors.New("fell behind, switching to catchup")
	}
```

Suggestion: Consider making the watchdog timeout configurable instead of a hard-coded constant.
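A minimal sketch of what a configurable timeout could look like; the config type, field name, and default below are hypothetical, not from this PR:

```go
// SyncConfig is an illustrative configuration block.
type SyncConfig struct {
	// FollowWatchdogTimeout bounds how long follow mode waits without any
	// subscription event before re-evaluating the sync mode.
	FollowWatchdogTimeout time.Duration `yaml:"follow_watchdog_timeout"`
}

// watchdogTimeout returns the configured value or an illustrative default.
func (c SyncConfig) watchdogTimeout() time.Duration {
	if c.FollowWatchdogTimeout <= 0 {
		return 30 * time.Second
	}
	return c.FollowWatchdogTimeout
}
```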
### Metrics & Observability

#### 12. New Metrics Are Well-Defined

Status: ✅ Good

The metrics provide good visibility into sync mode behavior:

```go
SyncMode        metrics.Gauge   // 0=catchup, 1=follow
SubscribeErrors metrics.Counter // Subscription failures
ModeSwitches    metrics.Counter // Mode transitions
```

These will be invaluable for monitoring and alerting on sync health.

#### 13. Missing Metric: Subscription Duration

Severity: Low | Impact: Observability

It would be useful to track how long the node stays in each mode to identify patterns (e.g., constant switching indicates instability).

Recommendation: Add a histogram or gauge for time-in-mode, for example:

```go
SyncModeDuration map[string]metrics.Histogram // Time spent in each mode
```

A possible wiring is sketched below.
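The sketch below shows one way to record the metric at each mode switch; `recordModeSwitch`, `modeEnteredAt`, and the `SyncModeDuration` field are illustrative names, not part of this PR:

```go
// recordModeSwitch observes how long the previous mode was active and then
// updates the mode gauge and switch counter.
func (s *Syncer) recordModeSwitch(previous, next SyncMode) {
	elapsed := time.Since(s.modeEnteredAt)
	s.metrics.SyncModeDuration[previous.String()].Observe(elapsed.Seconds())
	s.modeEnteredAt = time.Now()

	s.metrics.ModeSwitches.Add(1)
	s.metrics.SyncMode.Set(float64(next))
}
```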
### Test Coverage Analysis

#### 14. Critical Missing Tests

Severity: High | Impact: Quality Assurance

The codecov report shows 35.42% patch coverage with 113 missing lines, with the key gaps in the new subscription and mode-switching paths.

Recommendation: Add unit tests for the mode determination logic, the subscribe-and-follow loop, and the error/fallback paths. One possible shape for such tests is sketched below.
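A table-driven shape for the mode-determination tests. To keep the sketch self-contained, `chooseMode` and its threshold are stand-ins defined here; real tests should exercise the PR's `determineSyncMode` against a mocked DA client instead:

```go
package syncing

import "testing"

// chooseMode is a stand-in for the PR's mode-determination logic.
func chooseMode(localHeight, daHead, threshold uint64) SyncMode {
	if daHead > localHeight && daHead-localHeight > threshold {
		return SyncModeCatchup
	}
	return SyncModeFollow
}

func TestChooseMode(t *testing.T) {
	cases := []struct {
		name        string
		local, head uint64
		want        SyncMode
	}{
		{"far behind head", 100, 1000, SyncModeCatchup},
		{"at head", 1000, 1000, SyncModeFollow},
		{"slightly behind", 998, 1000, SyncModeFollow},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := chooseMode(tc.local, tc.head, 5); got != tc.want {
				t.Fatalf("chooseMode(%d, %d) = %v, want %v", tc.local, tc.head, got, tc.want)
			}
		})
	}
}
```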
#### 15. Existing Tests Look Good

Status: ✅ Good

The test additions that do exist are solid.

### Security Considerations

#### 16. Subscription Data Validation

Status: ✅ Good

Subscription responses go through the same validation as polled data:

```go
events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)
```

The `ProcessBlobs` path validates blobs the same way regardless of whether they arrive via polling or subscription.

#### 17. No New Authentication/Authorization Issues

Status: ✅ Good

The DA client subscription uses the same authentication as polling, so there's no new attack surface.

### Performance Considerations

#### 18. Subscription Memory Management

Status: Acceptable, worth monitoring

The subscription channels are unbounded (created by the DA client), which could theoretically lead to memory growth if the syncer can't keep up. However, the existing buffering and pending event cache should handle this.

Recommendation: Monitor memory usage in production, especially around the pending event cache.

#### 19. Forced Inclusion Async Prefetcher

Status: ✅ Good

The new async prefetcher warms the forced inclusion namespace ahead of need:
```go
asyncFetcher := NewAsyncBlockRetriever(
	client, logger, client.GetForcedInclusionNamespace(),
	cfg, daStartHeight,
	daEpochSize*2, // prefetch window
)
```

This should reduce latency when processing forced inclusion transactions.

### Code Style & Go Conventions

#### 20. Generally Idiomatic Go

Status: ✅ Good

The code follows Go conventions well.
#### 21. SyncMode Type Is Clean

Status: ✅ Good

```go
type SyncMode int

const (
	SyncModeCatchup SyncMode = iota
	SyncModeFollow
)

func (m SyncMode) String() string { ... }
```

This is a textbook example of Go enumeration patterns.
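The elided `String` body would typically look something like this; the exact strings are an assumption, not necessarily the PR's actual implementation:

```go
// String is an illustrative implementation of the method elided above.
func (m SyncMode) String() string {
	switch m {
	case SyncModeCatchup:
		return "catchup"
	case SyncModeFollow:
		return "follow"
	default:
		return "unknown"
	}
}
```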
### Documentation

#### 22. Function Comments Are Good

Status: ✅ Good

Key functions have clear documentation:

```go
// subscribeAndFollow uses the DA subscription API to receive real-time blob notifications.
// It subscribes to header, data, and forced inclusion namespaces and processes incoming blobs.
// Returns when subscription fails, context is cancelled, or node falls behind.
func (s *Syncer) subscribeAndFollow() error {
```

#### 23. Missing High-Level Design Doc

Severity: Low | Impact: Maintainability

While individual functions are well-documented, there's no high-level explanation of the sync mode state machine, transition conditions, or design rationale.

Recommendation: Add a package-level comment or markdown doc explaining the two modes, the conditions that trigger a switch between them, and the rationale behind the design.
### Summary of Recommendations

Must Fix (Critical):

- Loop variable capture in subscription processing (issue 1).

Should Fix (High Priority):

- Deadlock risk in the subscription error path (issue 3).
- Test coverage gaps on the new sync paths (issue 14).

Consider (Medium Priority):

- Tracing for the new `Subscribe` and `LocalHead` methods (issue 2).
- The `ProcessBlobs` abstraction concern (issue 5).
- Removing the redundant forced inclusion subscription (issue 6).

### Conclusion

This is a well-architected feature that adds significant value to the DA sync layer. The dual-mode approach is smart, and the implementation is generally solid. However, the critical loop variable issue, missing test coverage, and tracing gaps need to be addressed before merging. The forced inclusion subscription redundancy (identified by julienrbrt) is a valid architectural concern that should be refactored for simplicity and performance.
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.

This pull request significantly enhances the data availability (DA) worker's synchronization capabilities by introducing dynamic sync modes. It allows the system to efficiently transition between an aggressive polling strategy when catching up to the DA chain and a more resource-optimized, subscription-based approach once it reaches the chain head. This change aims to improve the overall performance and responsiveness of DA data processing.

Highlights

- Dynamic switching between catchup (polling) and follow (subscription) modes in the DA worker loop.
- New DA client methods (`Subscribe`, `LocalHead`) to support subscription-based syncing.
- New metrics (`SyncMode`, `SubscribeErrors`, `ModeSwitches`) for observing sync behavior.
Code Review
This pull request introduces a dual-mode synchronization mechanism, allowing the node to switch between an aggressive polling 'catchup' mode and a more efficient subscription-based 'follow' mode. This is a significant enhancement for nodes that are at the head of the chain. The changes are well-structured, introducing new DA client methods, metrics, and the core state machine logic in the daWorkerLoop. My review identified two critical bugs related to incorrect loop variable capturing that could lead to data corruption, and a couple of medium-severity design and style issues. Once these points are addressed, the implementation will be much more robust.
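For orientation, here is a simplified sketch of the worker-loop state machine this paragraph describes. The names follow the snippets quoted elsewhere in this PR; `catchUpOnce` is a hypothetical helper and the real loop may be structured differently:

```go
// daWorkerLoop (sketch): re-evaluate the sync mode on every iteration and run
// the corresponding strategy until the syncer's context is cancelled.
func (s *Syncer) daWorkerLoop() {
	for {
		select {
		case <-s.ctx.Done():
			return
		default:
		}

		switch s.determineSyncMode() {
		case SyncModeCatchup:
			s.catchUpOnce() // hypothetical: one aggressive polling step
		case SyncModeFollow:
			if err := s.subscribeAndFollow(); err != nil && !errors.Is(err, context.Canceled) {
				s.metrics.SubscribeErrors.Add(1)
				s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")
			}
		}
	}
}
```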
…in daRetriever; update tests and syncer for subscription handling
Codecov Report

❌ Patch coverage is 35.42%, with 113 lines in your changes missing coverage.

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #2961      +/-   ##
==========================================
- Coverage   57.77%   57.52%   -0.25%
==========================================
  Files          98      101       +3
  Lines        9394     9876     +482
==========================================
+ Hits         5427     5681     +254
- Misses       3364     3582     +218
- Partials      603      613      +10
```

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.
Force-pushed from 95aeea4 to ecfcf83
CI is not so glad.

fixed
block/internal/syncing/syncer.go
Outdated
```go
	}

	// Subscribe to forced inclusion namespace if configured
	var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse
```
We don't need to follow the forced inclusion namespace; the retriever does the caching itself. Maybe we should align this logic in the forced inclusion retriever as well, instead of using the async block fetching (in da).
block/internal/syncing/syncer.go
Outdated
```go
		s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
	}

case resp, ok := <-forcedInclusionCh:
```
ditto, this is dead code
```go
	daStartHeight uint64
	asyncFetcher  AsyncBlockRetriever

	mu sync.Mutex
```
Why do we need to track all of this? The tracking is already done in the retrieve function.
working on some cleanup yea
Overview
This PR adds sync modes, allowing us to move to subscriptions when at the head of the chain.
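For reference, a sketch of how the catchup-vs-follow decision described here could look. This is illustrative only: `LocalHead` and the mode constants come from the code discussed in the review above, while `lastProcessedDAHeight` and the threshold are assumptions, not this PR's actual `determineSyncMode`:

```go
// determineSyncMode (illustrative): stay in catchup while the node is behind
// the DA head, switch to follow once it has (nearly) caught up.
func (s *Syncer) determineSyncMode() SyncMode {
	daHead, err := s.daClient.LocalHead(s.ctx)
	if err != nil {
		return SyncModeCatchup // be conservative when the DA head is unknown
	}

	const followThreshold = 1 // hypothetical: how close to head counts as "caught up"
	if processed := s.lastProcessedDAHeight(); daHead > processed+followThreshold {
		return SyncModeCatchup
	}
	return SyncModeFollow
}
```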