Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"syscall"
"time"
"unsafe"

"go.opentelemetry.io/otel"
Expand All @@ -25,6 +27,17 @@ var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/pkg/san

const maxRequestsInProgress = 4096

const (
// sliceMaxRetries is the number of times to retry source.Slice() after the initial attempt.
// Total attempts = sliceMaxRetries + 1.
sliceMaxRetries = 3
// sliceRetryBaseDelay is the initial backoff delay before the first retry.
// Subsequent retries double the delay (exponential backoff), capped at sliceRetryMaxDelay.
sliceRetryBaseDelay = 50 * time.Millisecond
// sliceRetryMaxDelay is the maximum backoff delay between retries.
sliceRetryMaxDelay = 500 * time.Millisecond
)

var ErrUnexpectedEventType = errors.New("unexpected event type")

// hasEvent checks if a specific poll event flag is set in revents.
Expand Down Expand Up @@ -331,7 +344,43 @@ func (u *Userfaultfd) faultPage(
}
}()

b, dataErr := source.Slice(ctx, offset, int64(pagesize))
var b []byte
var dataErr error
var attempt int

retryLoop:
for attempt = range sliceMaxRetries + 1 {
b, dataErr = source.Slice(ctx, offset, int64(pagesize))
if dataErr == nil {
break
}

if attempt >= sliceMaxRetries || ctx.Err() != nil {
break
}

u.logger.Warn(ctx, "UFFD serve slice error, retrying",
zap.Int("attempt", attempt+1),
zap.Int("max_attempts", sliceMaxRetries+1),
zap.Error(dataErr),
)

delay := min(sliceRetryBaseDelay<<attempt, sliceRetryMaxDelay)
jitter := time.Duration(rand.Int63n(int64(delay) / 2))

Comment thread
jakubno marked this conversation as resolved.
backoff := time.NewTimer(delay + jitter)

select {
case <-ctx.Done():
backoff.Stop()

dataErr = errors.Join(dataErr, ctx.Err())

break retryLoop
case <-backoff.C:
}
}

if dataErr != nil {
var signalErr error
if onFailure != nil {
Expand All @@ -341,9 +390,12 @@ func (u *Userfaultfd) faultPage(
joinedErr := errors.Join(dataErr, signalErr)

span.RecordError(joinedErr)
u.logger.Error(ctx, "UFFD serve data fetch error", zap.Error(joinedErr))
u.logger.Error(ctx, "UFFD serve data fetch error after retries",
zap.Int("attempts", attempt+1),
zap.Error(joinedErr),
)

return fmt.Errorf("failed to read from source: %w", joinedErr)
return fmt.Errorf("failed to read from source after %d attempts: %w", attempt+1, joinedErr)
}

var copyMode CULong
Expand Down
Loading