Conversation

@tac0turtle
Contributor

Overview

This PR adds sync modes, allowing us to move to subscriptions when at the head of the chain.

@claude
Contributor

claude bot commented Jan 8, 2026

Claude finished @tac0turtle's task in 2m 30s


Comprehensive Code Review

I've reviewed the PR implementing dynamic sync modes (catchup polling vs follow subscription). Here's my detailed analysis:

Tasks

  • Review core sync mode implementation in syncer.go
  • Assess concurrency patterns and goroutine safety
  • Review interface design and abstraction
  • Check error handling and resilience
  • Review metrics implementation
  • Assess test coverage and quality
  • Verify subscription mechanism
  • Check DA client changes
  • Review forced inclusion retriever changes
  • Provide comprehensive feedback

Summary

The implementation adds a sophisticated dual-mode sync system that dynamically switches between aggressive polling (catchup) and efficient subscriptions (follow). The architecture is sound, but there are several critical bugs, design concerns, and missing test coverage that need attention.

Critical Issues

1. Loop Variable Capture in Subscription Processing (syncer.go:567-580)

Severity: Critical | Impact: Data Corruption

While Go 1.22+ fixed the loop variable scoping issue mentioned by gemini-code-assist, the code at lines 567-580 has a more subtle problem:

for _, event := range events {
    select {
    case s.heightInCh <- event:  // ✅ OK - value copy
        // ...
    default:
        s.cache.SetPendingEvent(event.Header.Height(), &event)  // ❌ PROBLEM
    }
}

Issue: Taking &event passes a pointer to the loop variable. On Go 1.22+ each iteration gets its own variable, so the pointers are distinct; on earlier language versions, however, every iteration shares a single variable, and since SetPendingEvent expects a pointer and may store it, all cached events could end up pointing at the last iteration's value if the cache implementation doesn't copy.

Recommendation: Explicitly capture the value to be unambiguous:

for _, event := range events {
    event := event  // Explicit capture for safety
    select {
    case s.heightInCh <- event:
        // ...
    default:
        s.cache.SetPendingEvent(event.Header.Height(), &event)
    }
}

Location: syncer.go:567-580


2. Missing Tracing for New Methods (da/tracing.go:136-142)

Severity: Medium | Impact: Observability Gap

The new Subscribe and LocalHead methods lack OpenTelemetry tracing, while all other DA client methods are traced. This creates an observability blind spot for a critical new feature.

func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
    return t.inner.Subscribe(ctx, namespace)  // ❌ No tracing
}

func (t *tracedClient) LocalHead(ctx context.Context) (uint64, error) {
    return t.inner.LocalHead(ctx)  // ❌ No tracing
}

Recommendation: Add tracing spans:

func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
    ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
        trace.WithAttributes(
            attribute.String("da.namespace", hex.EncodeToString(namespace)),
        ),
    )
    defer span.End()
    
    ch, err := t.inner.Subscribe(ctx, namespace)
    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
    }
    return ch, err
}

Location: da/tracing.go:136-142


3. Deadlock Risk in Subscription Error Path (syncer.go:455-550)

Severity: High | Impact: Node Hang

The subscribeAndFollow function registers defer cancel() once and then has multiple early returns; on those paths the subscription context is only cancelled as the function unwinds, after already-established subscriptions may have started sending:

func (s *Syncer) subscribeAndFollow() error {
    subCtx, cancel := context.WithCancel(s.ctx)
    defer cancel()  // Called on all returns

    headerCh, err := s.daClient.Subscribe(subCtx, headerNS)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)  // ❌ Early return, but channels might still be sending
    }

    // Similar for dataCh and forcedInclusionCh ...
}

Problem: If subscription setup fails partway through (e.g., header succeeds but data fails), the first subscription's goroutine may still be sending to its channel. When we return early, the receiving loop hasn't started yet, so the sender blocks until defer cancel() fires.

Impact: While defer cancel() will eventually unblock things, there's a race condition window where goroutines could hang briefly.

Recommendation: Create subscriptions in a more controlled manner, or ensure context is cancelled before any early returns.
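A minimal sketch of the second option, under the assumption that the DA client closes its subscription channel once its context is cancelled (namespace variables as in the excerpt above); cancelling and draining before the early return guarantees the header subscription's sender can exit:

headerCh, err := s.daClient.Subscribe(subCtx, headerNS)
if err != nil {
    return fmt.Errorf("failed to subscribe to headers: %w", err)
}

dataCh, err := s.daClient.Subscribe(subCtx, dataNS)
if err != nil {
    cancel() // stop the header subscription before bailing out
    for range headerCh {
        // drain until the DA client closes the channel, unblocking its sender
    }
    return fmt.Errorf("failed to subscribe to data: %w", err)
}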


Design & Architectural Concerns

4. Nil Channel Select Cases are Dead Code (syncer.go:516-527)

Severity: Low | Impact: Code Clarity

The code comments explain that nil channels block forever in select, but this pattern is confusing:

case resp, ok := <-dataCh:
    // Note: if dataCh is nil (same namespace as header), this case never fires
    if !ok {
        return errors.New("data subscription closed")
    }

The if dataCh == nil check inside the case is unreachable because a nil channel case never executes. This is mentioned in review comments but should be cleaned up.

Recommendation: Remove the dead code or add a clearer comment explaining the nil channel behavior upfront.

Location: syncer.go:516-527
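For reference, a self-contained illustration of the nil-channel rule this comment leans on: a receive case on a nil channel can never be selected, so disabling a case by leaving its channel nil is safe, and any nil check inside that case is dead code.

package main

import "fmt"

func main() {
    ready := make(chan int, 1)
    ready <- 42

    var disabled chan int // nil channel: its case below can never be selected

    select {
    case v := <-ready:
        fmt.Println("got", v) // always taken
    case <-disabled:
        fmt.Println("unreachable") // receiving from a nil channel blocks forever
    }
}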


5. Interface Abstraction Violation (syncer.go:564)

Severity: Medium | Impact: Maintainability

The syncer depends on the DARetriever interface but makes assumptions about its implementation:

events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)

The comment from julienrbrt points out that this pattern bypasses abstraction. The ProcessBlobs method is part of the interface, which is good, but the broader concern is that the forced inclusion logic duplicates this async pattern.

Recommendation: As noted in review comments, align the forced inclusion retriever to use the same subscription-based pattern rather than the separate async block fetching.

Location: syncer.go:564


6. Forced Inclusion Subscription Redundancy (syncer.go:480-491)

Severity: Medium | Impact: Performance & Complexity

The code subscribes to the forced inclusion namespace separately:

if s.daClient.HasForcedInclusionNamespace() {
    fiNS := s.daClient.GetForcedInclusionNamespace()
    if !bytes.Equal(fiNS, headerNS) && !bytes.Equal(fiNS, dataNS) {
        forcedInclusionCh, err = s.daClient.Subscribe(subCtx, fiNS)

However, as julienrbrt noted, the forced inclusion retriever already has its own caching and async fetching (AsyncBlockRetriever). This creates duplication and complexity.

Recommendation: Remove the forced inclusion subscription from follow mode and rely on the retriever's existing async mechanism, or refactor to use a unified approach.

Location: syncer.go:480-491

Concurrency & Goroutine Safety

7. Atomic Access Patterns Look Sound

Status: ✅ Good

The sync mode switching uses atomic.Int32 correctly:

mode := s.determineSyncMode()
previousMode := SyncMode(s.currentSyncMode.Load())
if mode != previousMode {
    s.currentSyncMode.Store(int32(mode))
}

The pattern of load-compare-store without CAS is acceptable here because:

  1. Only one goroutine (daWorkerLoop) modifies the mode
  2. Reads are atomic
  3. Metrics don't require strict consistency

8. Channel Buffering is Reasonable

Status: ✅ Good

heightInCh: make(chan common.DAHeightEvent, 100),

The 100-event buffer is sensible for handling bursty traffic from subscriptions without blocking. Events that don't fit go to the cache (SetPendingEvent), which is the right fallback.


9. Context Cancellation Handling

Status: ⚠️ Needs Attention

The runFollowMode function properly checks for context cancellation:

err := s.subscribeAndFollow()
if err != nil && !errors.Is(err, context.Canceled) {
    s.metrics.SubscribeErrors.Add(1)
}

However, the subscribeAndFollow function's multiple early returns could benefit from more explicit context checking before returning errors.
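A sketch of one such check, reusing subCtx from subscribeAndFollow; distinguishing shutdown from a genuine subscription failure keeps the error metrics clean:

if err != nil {
    if ctxErr := subCtx.Err(); ctxErr != nil {
        return ctxErr // shutting down: not a subscription failure, skip SubscribeErrors
    }
    return fmt.Errorf("failed to subscribe: %w", err)
}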


Error Handling & Resilience

10. Subscription Error Recovery is Well-Designed

Status: ✅ Good

The error handling falls back gracefully:

if err != nil && !errors.Is(err, context.Canceled) {
    s.metrics.SubscribeErrors.Add(1)
    s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check")
    // No explicit catchup call needed - daWorkerLoop will call determineSyncMode()
}

This is elegant: instead of forcing an immediate mode switch, it lets the natural loop re-evaluate the mode.


11. Watchdog Timeout is Essential

Status: ✅ Good

The watchdog timer prevents the node from being stuck in follow mode if the subscription stalls:

case <-time.After(watchdogTimeout):
    s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered")
    if s.determineSyncMode() == SyncModeCatchup {
        return errors.New("fell behind, switching to catchup")
    }

Suggestion: Consider making the watchdog timeout configurable (e.g., as a multiplier of config.DA.BlockTime), as the current 3x might not fit all DA layers.
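A sketch of what that could look like; WatchdogMultiplier is a hypothetical config field, and the fallback of 3 matches the current hardcoded behavior:

multiplier := s.config.DA.WatchdogMultiplier // hypothetical field; zero means "use the default"
if multiplier <= 0 {
    multiplier = 3 // current behavior: 3x the DA block time
}
watchdogTimeout := time.Duration(multiplier) * s.config.DA.BlockTime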


Metrics & Observability

12. New Metrics Are Well-Defined

Status: ✅ Good

The metrics provide good visibility into sync mode behavior:

SyncMode        metrics.Gauge   // 0=catchup, 1=follow
SubscribeErrors metrics.Counter // Subscription failures
ModeSwitches    metrics.Counter // Mode transitions

These will be invaluable for monitoring and alerting on sync health.


13. Missing Metric: Subscription Duration

Severity: Low | Impact: Observability

It would be useful to track how long the node stays in each mode to identify patterns (e.g., constantly switching indicates instability).

Recommendation: Add a histogram or gauge for time-in-mode:

SyncModeDuration map[string]metrics.Histogram // Time spent in each mode
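A sketch of how the mode-switch path could feed it, using the map-of-histograms field proposed above and a hypothetical modeEnteredAt field on the syncer that records when the current mode began:

if mode != previousMode {
    now := time.Now()
    // modeEnteredAt is a hypothetical syncer field set whenever the mode changes
    s.metrics.SyncModeDuration[previousMode.String()].Observe(now.Sub(s.modeEnteredAt).Seconds())
    s.modeEnteredAt = now
    s.currentSyncMode.Store(int32(mode))
    s.metrics.ModeSwitches.Add(1)
}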

Test Coverage Analysis

14. Critical Missing Tests

Severity: High | Impact: Quality Assurance

The codecov report at the time of this review showed 35.42% patch coverage with 113 missing lines. Key gaps:

  1. Subscription Error Scenarios (syncer.go:452-544)

    • No tests for subscription channel closures
    • No tests for watchdog timeout triggering
    • No tests for mode fallback on subscription failure
  2. LocalHead RPC Endpoint (tools/local-da/rpc.go)

    • 22 missing lines - 0% coverage
    • This is a new RPC method that's untested
  3. DA Client Methods (block/internal/da/client.go)

    • 12 missing lines for Subscribe/LocalHead

Recommendation: Add unit tests for the following (a skeleton for the mode-switching case is sketched after this list):

  • Subscription channel closure recovery
  • Watchdog timer triggering
  • Mode switching logic under various DA heights
  • LocalHead RPC endpoint
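A minimal skeleton for the mode-switching case. shouldFollow is a hypothetical pure helper extracted so the decision can be table-tested without a DA client; the actual threshold lives in determineSyncMode, which isn't shown here, so the rule below is an assumption:

package syncing

import "testing"

// shouldFollow is a hypothetical extraction of the decision inside
// determineSyncMode: follow once the processed DA height reaches the head.
func shouldFollow(daHead, processedHeight uint64) bool {
    return processedHeight >= daHead
}

func TestSyncModeDecision(t *testing.T) {
    cases := []struct {
        name            string
        daHead          uint64
        processedHeight uint64
        want            bool
    }{
        {"far behind", 100, 10, false},
        {"one behind", 100, 99, false},
        {"at head", 100, 100, true},
        {"ahead of stale head view", 100, 101, true},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            if got := shouldFollow(tc.daHead, tc.processedHeight); got != tc.want {
                t.Fatalf("shouldFollow(%d, %d) = %v, want %v",
                    tc.daHead, tc.processedHeight, got, tc.want)
            }
        })
    }
}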

15. Existing Tests Look Good

Status: ✅ Good

The test additions in syncer_test.go (+200 lines) suggest coverage for the core sync mode logic. However, I couldn't fully review them due to file size. The test structure in forced_inclusion_retriever_test.go has been improved with better mocking.


Security Considerations

16. Subscription Data Validation

Status: ✅ Good

Subscription responses go through the same validation as polled data:

events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)

The ProcessBlobs method validates signatures and proposer addresses, preventing malicious data injection.


17. No New Authentication/Authorization Issues

Status: ✅ Good

The DA client subscription uses the same authentication as polling, so there's no new attack surface.


Performance Considerations

18. Subscription Memory Management

Status: ⚠️ Monitor

The subscription channels are unbounded (created by the DA client), which could theoretically lead to memory growth if the syncer can't keep up. However, the existing buffering and pending event cache should handle this.

Recommendation: Monitor memory usage in production, especially the heightInCh buffer and pending cache size.


19. Forced Inclusion Async Prefetcher

Status: ✅ Good

The AsyncBlockRetriever with a 2x epoch prefetch window is a smart optimization:

asyncFetcher := NewAsyncBlockRetriever(
    client, logger, client.GetForcedInclusionNamespace(),
    cfg, daStartHeight,
    daEpochSize*2,  // prefetch window
)

This should reduce latency when processing forced inclusion transactions.


Code Style & Go Conventions

20. Generally Idiomatic Go

Status: ✅ Good

The code follows Go conventions well:

  • Clear error wrapping with fmt.Errorf
  • Proper context propagation
  • Good use of defer for cleanup
  • Meaningful variable names

21. SyncMode Type is Clean

Status: ✅ Good

type SyncMode int

const (
    SyncModeCatchup SyncMode = iota
    SyncModeFollow
)

func (m SyncMode) String() string { ... }

This is a textbook example of Go enumeration patterns.


Documentation

22. Function Comments Are Good

Status: ✅ Good

Key functions have clear documentation:

// subscribeAndFollow uses the DA subscription API to receive real-time blob notifications.
// It subscribes to header, data, and forced inclusion namespaces and processes incoming blobs.
// Returns when subscription fails, context is cancelled, or node falls behind.
func (s *Syncer) subscribeAndFollow() error {

23. Missing High-Level Design Doc

Severity: Low | Impact: Maintainability

While individual functions are well-documented, there's no high-level explanation of the sync mode state machine, transition conditions, or design rationale.

Recommendation: Add a package-level comment or markdown doc (example sketched after this list) explaining:

  • Why dual modes are needed
  • When mode switches occur
  • Trade-offs between modes
  • Expected behavior under various network conditions
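For example, a package-level comment along these lines, with wording drawn from the behavior described in this review:

// Package syncing implements DA synchronization with two modes.
//
// The DA worker runs a small state machine:
//
//   - Catchup: the node is behind the DA head and polls aggressively
//     until it has processed all available heights.
//   - Follow: the node is at the DA head and uses the DA subscription
//     API to receive real-time blob notifications.
//
// daWorkerLoop re-evaluates the mode on each iteration via
// determineSyncMode. In follow mode, a watchdog fires if no events
// arrive within several DA block times; if the node has fallen behind,
// it drops back to catchup. Subscription failures are not retried in
// place: the loop simply re-evaluates the mode, which naturally falls
// back to catchup.
package syncing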

Summary of Recommendations

Must Fix (Critical):

  1. Loop variable capture safety (syncer.go:567-580) - Add explicit capture or verify cache behavior
  2. ⚠️ Add tracing to Subscribe/LocalHead (da/tracing.go:136-142)
  3. ⚠️ Review subscription context cancellation (syncer.go:455-550)

Should Fix (High Priority):

  1. 📝 Add missing test coverage (35% → target 70%+)
  2. 🔄 Remove forced inclusion subscription redundancy (per review feedback)
  3. 🧹 Clean up dead code in nil channel cases (syncer.go:516-527)

Consider (Medium Priority):

  1. 📊 Add sync mode duration metrics
  2. 📖 Add high-level design documentation
  3. ⚙️ Make watchdog timeout configurable

Conclusion

This is a well-architected feature that adds significant value to the DA sync layer. The dual-mode approach is smart, and the implementation is generally solid. However, the critical loop variable issue, missing test coverage, and tracing gaps need to be addressed before merging.

The forced inclusion subscription redundancy (identified by julienrbrt) is a valid architectural concern that should be refactored for simplicity and performance.

Overall Assessment: ⚠️ Approve with Changes Required


@github-actions
Contributor

github-actions bot commented Jan 8, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build | Format | Lint | Breaking | Updated (UTC)
✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Jan 15, 2026, 2:31 PM

@gemini-code-assist
Contributor

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data availability (DA) worker's synchronization capabilities by introducing dynamic sync modes. It allows the system to efficiently transition between an aggressive polling strategy when catching up to the DA chain and a more resource-optimized, subscription-based approach once it reaches the chain head. This change aims to improve the overall performance and responsiveness of DA data processing.

Highlights

  • Dynamic Sync Modes: Introduced two dynamic synchronization modes for the DA worker: 'Catchup' for aggressive polling when behind the chain head, and 'Follow' for efficient, subscription-based real-time updates when caught up.
  • Subscription-based Updates: Implemented a new Subscribe method in the DA client, enabling the 'Follow' mode to receive real-time blob notifications, significantly reducing polling overhead.
  • Local Head Tracking: Added a LocalHead method to the DA client and a corresponding RPC endpoint, allowing the syncer to determine its current position relative to the DA chain head and facilitate mode switching (a rough sketch follows this list).
  • New Metrics for Observability: Integrated new Prometheus metrics (SyncMode, SubscribeErrors, ModeSwitches) to provide visibility into the current sync mode, subscription failures, and transitions between modes.
  • Refactored DA Worker Loop: The daWorkerLoop has been refactored to intelligently determine and switch between 'Catchup' and 'Follow' modes based on the node's synchronization status, including a watchdog mechanism for 'Follow' mode.
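A rough sketch of that mode decision as described above; the exact threshold isn't shown in this summary, so the comparison (and the processedDAHeight accessor) below are assumptions:

func (s *Syncer) determineSyncMode() SyncMode {
    daHead, err := s.daClient.LocalHead(s.ctx)
    if err != nil {
        return SyncModeCatchup // can't confirm we're at head, so poll conservatively
    }
    if s.processedDAHeight() >= daHead { // hypothetical accessor for the last processed DA height
        return SyncModeFollow
    }
    return SyncModeCatchup
}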



Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist (Contributor) bot left a comment

Code Review

This pull request introduces a dual-mode synchronization mechanism, allowing the node to switch between an aggressive polling 'catchup' mode and a more efficient subscription-based 'follow' mode. This is a significant enhancement for nodes that are at the head of the chain. The changes are well-structured, introducing new DA client methods, metrics, and the core state machine logic in the daWorkerLoop. My review identified two critical bugs related to incorrect loop variable capturing that could lead to data corruption, and a couple of medium-severity design and style issues. Once these points are addressed, the implementation will be much more robust.

@codecov

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 30.76923% with 198 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.52%. Comparing base (34034c1) to head (7d39e71).
⚠️ Report is 2 commits behind head on main.

Files with missing lines | Patch % | Lines
block/internal/syncing/syncer.go | 43.16% | 78 Missing and 1 partial ⚠️
block/internal/da/async_block_retriever.go | 0.00% | 41 Missing ⚠️
block/internal/da/forced_inclusion_retriever.go | 48.14% | 26 Missing and 2 partials ⚠️
tools/local-da/rpc.go | 0.00% | 22 Missing ⚠️
block/internal/common/subscription.go | 0.00% | 12 Missing ⚠️
block/internal/da/client.go | 0.00% | 12 Missing ⚠️
block/internal/da/tracing.go | 0.00% | 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2961      +/-   ##
==========================================
- Coverage   57.77%   57.52%   -0.25%     
==========================================
  Files          98      101       +3     
  Lines        9394     9876     +482     
==========================================
+ Hits         5427     5681     +254     
- Misses       3364     3582     +218     
- Partials      603      613      +10     
Flag | Coverage Δ
combined | 57.52% <30.76%> (-0.25%) ⬇️

Flags with carried forward coverage won't be shown.


@tac0turtle force-pushed the marko/sync_subscribe branch from 95aeea4 to ecfcf83 on January 12, 2026, 08:22
@tac0turtle marked this pull request as ready for review on January 12, 2026, 08:37
@julienrbrt
Member

CI is not so glad.

@tac0turtle
Contributor Author

> CI is not so glad.

Fixed.

}

// Subscribe to forced inclusion namespace if configured
var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse
@julienrbrt (Member) commented:

We don't need to follow the forced inclusion namespace; the retriever does its own caching. Maybe we should align this logic in the forced inclusion retriever as well, instead of using the async block fetching (in da).

s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
}

case resp, ok := <-forcedInclusionCh:
@julienrbrt (Member) commented:

Ditto, this is dead code.

daStartHeight uint64
asyncFetcher AsyncBlockRetriever

mu sync.Mutex
@julienrbrt (Member) commented Jan 15, 2026:

Why do we need to track all of this? The tracking is done in the retrieve function.

@tac0turtle (Contributor, Author) replied:

Working on some cleanup, yeah.
