feat(daemon): add circuit breaker for sync handler #158
Add a circuit breaker pattern to prevent overwhelming the server when it's experiencing errors:

- After 10 consecutive failures, circuit opens and saves data locally
- 1-hour timer resets circuit and republishes saved data to pub/sub
- Data is persisted to ~/.shelltime/sync-pending.jsonl when circuit open
- Uses interface pattern for testability

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary of Changes

Hello @AnnatarHe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates a circuit breaker pattern into the daemon's synchronization process. This enhancement aims to prevent system overload and improve robustness by temporarily halting sync operations to the server when a threshold of consecutive errors is met. Instead of failing outright, data is stored locally and automatically retried after a set interval, ensuring eventual consistency and better handling of transient server issues.
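To make the behavior summarized above concrete, here is a minimal sketch of a consecutive-failure circuit breaker. The `Breaker` type and its method names are hypothetical and are not taken from the PR's `SyncCircuitBreakerService`; the sketch only assumes the threshold-then-reset behavior stated in the description.

```go
package main

import (
	"fmt"
	"sync"
)

// Breaker is a hypothetical consecutive-failure circuit breaker.
// It illustrates the pattern only; it is not the PR's implementation.
type Breaker struct {
	mu        sync.Mutex
	threshold int // consecutive failures required to open the circuit
	failures  int
	open      bool
}

func NewBreaker(threshold int) *Breaker {
	return &Breaker{threshold: threshold}
}

// RecordFailure counts one failure and opens the circuit at the threshold.
func (b *Breaker) RecordFailure() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
	if b.failures >= b.threshold {
		b.open = true
	}
}

// RecordSuccess clears the consecutive-failure count.
func (b *Breaker) RecordSuccess() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures = 0
}

// IsOpen reports whether callers should skip the server and save locally.
func (b *Breaker) IsOpen() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.open
}

// Reset closes the circuit again; the PR describes doing this on a 1-hour timer.
func (b *Breaker) Reset() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures = 0
	b.open = false
}

func main() {
	b := NewBreaker(10) // threshold from the PR description
	for i := 0; i < 10; i++ {
		b.RecordFailure()
	}
	fmt.Println("open after 10 failures:", b.IsOpen())
	b.Reset()
	fmt.Println("open after reset:", b.IsOpen())
}
```

A handler using something like this would check `IsOpen()` before each sync attempt and write the payload to the pending file instead of calling the server while the circuit is open.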
Code Review
This pull request introduces a circuit breaker pattern for the sync handler, which is a great addition for improving the resilience of the application. When sync operations fail repeatedly, the circuit opens, and pending data is saved locally for later retry. The implementation is mostly solid, using a background goroutine with a ticker to reset the circuit and retry.
My review includes feedback on a few key areas:
- Refactoring the use of a global variable for the circuit breaker service to use dependency injection for better testability and maintainability.
- Improving the memory efficiency of the retry mechanism to handle large backlogs of pending data.
- A couple of correctness improvements related to file path handling and dead code removal.
Overall, this is a valuable feature, and with a few adjustments, it can be made even more robust.
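For readers unfamiliar with the ticker-driven reset mentioned in this overview, the following is a rough sketch of a background goroutine that fires a callback on every tick and stops cleanly. `startResetLoop` and `countResets` are hypothetical names introduced for illustration; the PR's actual `Start`/`Stop` methods may be structured differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startResetLoop calls reset on every tick until the returned stop function
// is called or ctx is cancelled. Hypothetical helper, not the PR's code.
func startResetLoop(ctx context.Context, interval time.Duration, reset func()) (stop func()) {
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				reset() // e.g. close the circuit, then retry saved data
			}
		}
	}()
	// stop cancels the loop and waits for the goroutine to exit.
	return func() {
		cancel()
		<-done
	}
}

// countResets runs the loop for runForMs milliseconds and reports how many
// times reset fired. It exists only to make the sketch observable.
func countResets(intervalMs, runForMs int) int {
	n := 0
	stop := startResetLoop(context.Background(),
		time.Duration(intervalMs)*time.Millisecond, func() { n++ })
	time.Sleep(time.Duration(runForMs) * time.Millisecond)
	stop()
	return n // safe to read: the goroutine has exited
}

func main() {
	// The PR uses a 1-hour interval; milliseconds keep the demo short.
	fmt.Println("resets:", countResets(10, 105))
}
```

Waiting on `done` inside `stop` means the caller knows no further reset will run after shutdown, which matters if the reset callback touches the pending file.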
    }

    // Global instance
    var syncCircuitBreaker CircuitBreaker
Using a global variable syncCircuitBreaker creates tight coupling and makes the code harder to test and reason about. It's assigned as a side effect in NewSyncCircuitBreakerService, which is not an ideal pattern.
A better approach is to use dependency injection. The SyncCircuitBreakerService instance should be created in main.go and passed explicitly to the components that need it, such as the handlePubSubSync handler. This makes dependencies explicit and significantly improves testability.
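As a sketch of what this suggestion could look like, the example below passes the breaker to the handler through a constructor instead of reading a package-level variable. The one-method `CircuitBreaker` interface, `SyncHandler`, and `fakeBreaker` are all simplified, hypothetical stand-ins; the real interface in the PR has a different shape.

```go
package main

import "fmt"

// CircuitBreaker is a reduced, hypothetical interface for illustration;
// the PR's interface is assumed to have more methods.
type CircuitBreaker interface {
	IsOpen() bool
}

// SyncHandler receives its breaker explicitly instead of reading a global.
type SyncHandler struct {
	breaker CircuitBreaker
}

func NewSyncHandler(cb CircuitBreaker) *SyncHandler {
	return &SyncHandler{breaker: cb}
}

// Handle reports what would be done with the payload:
// "saved" when the circuit is open, "sent" otherwise.
func (h *SyncHandler) Handle(payload []byte) string {
	if h.breaker.IsOpen() {
		return "saved"
	}
	return "sent"
}

// fakeBreaker is a test double, possible only because the dependency
// is injected rather than global.
type fakeBreaker struct{ open bool }

func (f fakeBreaker) IsOpen() bool { return f.open }

func main() {
	// In the daemon this wiring would live in main.go, as the review suggests.
	h := NewSyncHandler(fakeBreaker{open: true})
	fmt.Println(h.Handle([]byte(`{}`)))
}
```

With this shape a test can construct the handler with any breaker state directly, without assigning to shared package state between test cases.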
    }

    func (s *SyncCircuitBreakerService) SaveForRetry(ctx context.Context, payload interface{}) error {
        filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.SYNC_PENDING_FILE))
There are a couple of improvements that can be made to file path handling here.
- Duplication: The logic to construct `filePath` is duplicated in `retryPendingData` at line 157. It's better to extract this into a private helper function.
- Path Construction: Using `path/filepath.Join` is the idiomatic and cross-platform way to construct file paths instead of `fmt.Sprintf`.
- Directory Creation: `os.OpenFile` with `O_CREATE` will not create parent directories. If `~/.shelltime` doesn't exist, this will fail. You should ensure the directory exists using `os.MkdirAll`.
Here's an example of a helper function incorporating these points that you could add to the service:
    import (
        "fmt"
        "os"
        "path/filepath"
    )

    // getPendingFilePath returns the path of the pending sync file,
    // creating its parent directory if it does not exist yet.
    func (s *SyncCircuitBreakerService) getPendingFilePath() (string, error) {
        homeDir, err := os.UserHomeDir()
        if err != nil {
            return "", fmt.Errorf("could not get user home directory: %w", err)
        }
        filePath := filepath.Join(homeDir, model.SYNC_PENDING_FILE)
        dir := filepath.Dir(filePath)
        if err := os.MkdirAll(dir, 0750); err != nil {
            return "", fmt.Errorf("failed to create directory for pending sync file: %w", err)
        }
        return filePath, nil
    }

You could then call this helper in `SaveForRetry` and `retryPendingData`.
        var lines []string
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            line := scanner.Text()
            if line != "" {
                lines = append(lines, line)
            }
        }
        file.Close()

        if err := scanner.Err(); err != nil {
            slog.Error("Error reading pending sync file", slog.Any("err", err))
            return
        }

        if len(lines) == 0 {
            slog.Debug("No pending sync data to retry")
            return
        }

        slog.Info("Starting sync data retry", slog.Int("pendingCount", len(lines)))

        var failedLines []string
        successCount := 0

        for _, line := range lines {
            // Republish to pub/sub topic
            msg := message.NewMessage(watermill.NewUUID(), []byte(line))
            if err := s.publisher.Publish(PubSubTopic, msg); err != nil {
                slog.Warn("Failed to republish sync data, keeping for next retry", slog.Any("err", err))
                failedLines = append(failedLines, line)
            } else {
                successCount++
            }
        }

        // Rewrite file with only failed lines
        if err := s.rewriteLogFile(filePath, failedLines); err != nil {
            slog.Error("Failed to update pending sync file", slog.Any("err", err))
            return
        }

        slog.Info("Sync data retry completed",
            slog.Int("republished", successCount),
            slog.Int("remaining", len(failedLines)))
    }
The current implementation of retryPendingData reads the entire pending data file into memory via the lines slice. If the service is unavailable for an extended period, this file could become very large, leading to excessive memory consumption.
A more memory-efficient, streaming approach is recommended:
- Open the pending file for reading.
- Create a new temporary file for writing failed retries.
- Read the pending file line by line.
- For each line, attempt to republish the message.
- If republishing fails, write the line to the temporary file.
- After processing all lines, close both files.
- Atomically replace the original pending file with the temporary file (or delete it if the temp file is empty).
This avoids loading all pending messages into memory at once and makes the retry mechanism more robust.
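The steps above could be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `retryPending` is a hypothetical free function, `publish` is a callback standing in for the Watermill publisher, and `runDemo` exists only to exercise the sketch against a temporary file.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// retryPending streams the pending file line by line, republishes each line,
// and keeps only the failed lines by writing them to a temp file that then
// replaces the original. publish is a hypothetical stand-in for the publisher.
func retryPending(path string, publish func([]byte) error) (sent, kept int, err error) {
	in, err := os.Open(path)
	if err != nil {
		return 0, 0, err
	}
	tmpPath := path + ".tmp"
	tmp, err := os.Create(tmpPath)
	if err != nil {
		in.Close()
		return 0, 0, err
	}
	w := bufio.NewWriter(tmp)
	scanner := bufio.NewScanner(in) // default buffer: lines up to 64 KiB
	for scanner.Scan() {
		// Copy the line: scanner.Bytes() is only valid until the next Scan.
		line := append([]byte(nil), scanner.Bytes()...)
		if len(line) == 0 {
			continue
		}
		if publish(line) != nil {
			w.Write(line) // write errors are sticky and surface at Flush
			w.WriteByte('\n')
			kept++
		} else {
			sent++
		}
	}
	in.Close()
	err = scanner.Err()
	if err == nil {
		err = w.Flush()
	}
	if cerr := tmp.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		os.Remove(tmpPath) // leave the original file untouched
		return sent, kept, err
	}
	if kept == 0 {
		os.Remove(tmpPath)
		return sent, kept, os.Remove(path)
	}
	return sent, kept, os.Rename(tmpPath, path)
}

// runDemo writes three pending lines, fails the publish of "b", and returns
// the counts plus whatever remains on disk afterwards.
func runDemo() (sent, kept int, remaining string, err error) {
	dir, err := os.MkdirTemp("", "retry-demo")
	if err != nil {
		return 0, 0, "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "sync-pending.jsonl")
	if err = os.WriteFile(path, []byte("a\nb\nc\n"), 0o600); err != nil {
		return 0, 0, "", err
	}
	sent, kept, err = retryPending(path, func(line []byte) error {
		if string(line) == "b" {
			return errors.New("publish failed")
		}
		return nil
	})
	if err != nil {
		return sent, kept, "", err
	}
	data, err := os.ReadFile(path)
	return sent, kept, string(data), err
}

func main() {
	fmt.Println(runDemo())
}
```

One trade-off to keep in mind: if reading or writing fails partway through, this sketch leaves the original file in place, so lines that were already republished would be sent again on the next retry. Whether that is acceptable depends on whether the server treats sync payloads idempotently.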
    if err := syncCircuitBreakerService.Start(ctx); err != nil {
        slog.Error("Failed to start sync circuit breaker service", slog.Any("err", err))
    } else {
The syncCircuitBreakerService.Start function currently always returns nil, which makes this error check unreachable. The if err != nil block is effectively dead code.
You should either:
- Modify `Start` to return a potential error if one can occur during initialization.
- Change the signature of `Start` to not return an error (e.g., `func (s *SyncCircuitBreakerService) Start(ctx context.Context)`), and simplify the call site.
Given the current implementation of Start, the second option seems more appropriate.
    syncCircuitBreakerService.Start(ctx)
    slog.Info("Sync circuit breaker service started")
    defer syncCircuitBreakerService.Stop()

- Move CircuitBreakerService from daemon to model package
- Add CircuitBreaker interface with generic []byte payload
- Add comprehensive tests for circuit breaker functionality
- Keep thin wrapper in daemon for SocketMessage handling
- Add CircuitBreaker to mockery config for mock generation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary

- Data is persisted to `~/.shelltime/sync-pending.jsonl` when the circuit is open
- Uses an interface pattern (`CircuitBreaker`) for testability

Files Changed

- `model/db.go`: `SYNC_PENDING_FILE` constant
- `daemon/circuit_breaker.go`: `CircuitBreaker` interface + `SyncCircuitBreakerService`
- `cmd/daemon/main.go`
- `daemon/handlers.sync.go`

Test plan

- `go build ./...`
- `go test ./daemon/...`
- Check `~/.shelltime/sync-pending.jsonl` when circuit is open

🤖 Generated with Claude Code