Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,22 @@ jobs:
| grep -E '^(goos:|goarch:|pkg:|cpu:|Benchmark|PASS$|ok\s)' \
| tee bench.txt

- name: Store Benchmark Result
- name: Store Benchmark Result (PR)
if: github.event_name == 'pull_request'
uses: benchmark-action/github-action-benchmark@v1
with:
name: Go Benchmarks
tool: 'go'
output-file-path: bench.txt
# On PRs, publishing to gh-pages is not allowed in all permission models.
auto-push: false
# Fail if performance drops by more than 50%
alert-threshold: '200%'
comment-on-alert: false
fail-on-alert: false

- name: Store Benchmark Result (main)
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
uses: benchmark-action/github-action-benchmark@v1
with:
name: Go Benchmarks
Expand Down
45 changes: 33 additions & 12 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,28 @@ func run() error {
defer db.Close()
defer func() { _ = rdb.Close() }()

compute, storage, network, lbProxy, err := initBackends(deps, cfg, logger, db, rdb)
rawCompute, rawStorage, rawNetwork, rawLBProxy, err := initBackends(deps, cfg, logger, db, rdb)
if err != nil {
logger.Error("backend initialization failed", "error", err)
return err
}

// Wrap raw backends with resilience decorators (circuit breaker, bulkhead, timeouts).
compute := platform.NewResilientCompute(rawCompute, logger, platform.ResilientComputeOpts{})
storage := platform.NewResilientStorage(rawStorage, logger, platform.ResilientStorageOpts{})
network := platform.NewResilientNetwork(rawNetwork, logger, platform.ResilientNetworkOpts{})
lbProxy := platform.NewResilientLB(rawLBProxy, logger, platform.ResilientLBOpts{})

repos := deps.InitRepositories(db, rdb)

// Create leader elector for singleton worker coordination.
// When multiple worker replicas run, only one will hold leadership per key.
leaderElector := postgres.NewPgLeaderElector(db, logger)

svcs, workers, err := deps.InitServices(setup.ServiceConfig{
Config: cfg, Repos: repos, Compute: compute, Storage: storage,
Network: network, LBProxy: lbProxy, DB: db, RDB: rdb, Logger: logger,
LeaderElector: leaderElector,
})
if err != nil {
logger.Error("service initialization failed", "error", err)
Expand All @@ -154,52 +166,61 @@ func run() error {
r.Use(otelgin.Middleware("compute-api"))
}

runApplication(deps, cfg, logger, r, workers)
return nil
return runApplication(deps, cfg, logger, r, workers)
}

func runApplication(deps AppDeps, cfg *platform.Config, logger *slog.Logger, r *gin.Engine, workers *setup.Workers) {
role := os.Getenv("APP_ROLE")
func runApplication(deps AppDeps, cfg *platform.Config, logger *slog.Logger, r *gin.Engine, workers *setup.Workers) error {
role := os.Getenv("ROLE")
if role == "" {
role = "all"
}

validRoles := map[string]bool{"api": true, "worker": true, "all": true}
if !validRoles[role] {
logger.Error("invalid ROLE value, must be one of: api, worker, all", "role", role)
return fmt.Errorf("invalid ROLE value %q, must be one of: api, worker, all", role)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
logger.Info("starting with role", "role", role)

wg := &sync.WaitGroup{}
workerCtx, workerCancel := context.WithCancel(context.Background())

if role == "worker" || role == "all" {
runWorkers(workerCtx, wg, workers)
}

srv := deps.NewHTTPServer(":"+cfg.Port, r)

var srv *http.Server
if role == "api" || role == "all" {
srv = deps.NewHTTPServer(":"+cfg.Port, r)
go func() {
logger.Info("starting compute-api", "port", cfg.Port)
if err := deps.StartHTTPServer(srv); err != nil && !stdlib_errors.Is(err, http.ErrServerClosed) {
logger.Error("failed to start server", "error", err)
}
}()
} else {
logger.Info("running in worker-only mode")
logger.Info("running in worker-only mode, HTTP server disabled")
}

quit := make(chan os.Signal, 1)
deps.NotifySignals(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

logger.Info("shutting down server...")
logger.Info("shutting down...")

ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

if err := deps.ShutdownHTTPServer(ctx, srv); err != nil {
logger.Error("server forced to shutdown", "error", err)
if srv != nil {
if err := deps.ShutdownHTTPServer(ctx, srv); err != nil {
logger.Error("server forced to shutdown", "error", err)
}
}

workerCancel()
wg.Wait()
logger.Info("server exited")
logger.Info("shutdown complete")
return nil
}

type runner interface {
Expand Down
105 changes: 104 additions & 1 deletion cmd/api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func TestRunApplicationApiRoleStartsAndShutsDown(t *testing.T) {
}()
}

runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
err := runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
if err != nil {
t.Fatalf("runApplication returned error: %v", err)
}

select {
case <-shutdownCalled:
Expand All @@ -167,6 +170,106 @@ func TestRunApplicationApiRoleStartsAndShutsDown(t *testing.T) {
}
}

func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")

deps := DefaultDeps()

deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}

err := runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
if err != nil {
t.Fatalf("runApplication returned error: %v", err)
}
// If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
}
Comment on lines +173 to +204
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

This worker-role test still passes if the process exits immediately.

If runApplication returns right away in worker mode, this test still passes because the only assertions are that HTTP hooks were not called. Run it in a goroutine and assert it stays blocked until the SIGTERM path is exercised.

🧪 Assert that worker mode stays alive until shutdown
- runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
- // If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
+ returned := make(chan struct{})
+ go func() {
+ 	defer close(returned)
+ 	runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
+ }()
+
+ select {
+ case <-returned:
+ 	t.Fatalf("runApplication returned before SIGTERM in worker mode")
+ case <-time.After(25 * time.Millisecond):
+ }
+
+ select {
+ case <-returned:
+ case <-time.After(time.Second):
+ 	t.Fatalf("expected worker mode to exit after SIGTERM")
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")
deps := DefaultDeps()
deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}
runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
// If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
}
func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")
deps := DefaultDeps()
deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}
returned := make(chan struct{})
go func() {
defer close(returned)
runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
}()
select {
case <-returned:
t.Fatalf("runApplication returned before SIGTERM in worker mode")
case <-time.After(25 * time.Millisecond):
}
select {
case <-returned:
case <-time.After(time.Second):
t.Fatalf("expected worker mode to exit after SIGTERM")
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/api/main_test.go` around lines 170 - 198, The test currently can pass if
runApplication returns immediately; change
TestRunApplicationWorkerRoleDoesNotStartHTTP so it launches runApplication(deps,
...) in a goroutine and use a done channel (or sync.WaitGroup) to detect
completion, then have NotifySignals send SIGTERM after a short sleep and assert
that runApplication does not return before the SIGTERM (fail the test if the
done channel is closed too early) and that it does return after the SIGTERM;
keep the existing NewHTTPServer/StartHTTPServer/ShutdownHTTPServer t.Fatalf
hooks and reference runApplication, DefaultDeps, and NotifySignals to implement
the blocking/timeout behavior.


func TestRunApplicationDefaultsToAllRole(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "") // Explicitly empty to verify default

started := make(chan struct{})
shutdownCalled := make(chan struct{})
deps := DefaultDeps()

deps.NewHTTPServer = func(addr string, handler http.Handler) *http.Server {
return &http.Server{
Addr: addr,
Handler: handler,
ReadHeaderTimeout: 10 * time.Second,
}
}
deps.StartHTTPServer = func(*http.Server) error {
close(started)
return http.ErrServerClosed
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
close(shutdownCalled)
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
<-started
c <- syscall.SIGTERM
}()
}

err := runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
if err != nil {
t.Fatalf("runApplication returned error: %v", err)
}

select {
case <-shutdownCalled:
case <-time.After(time.Second):
t.Fatalf("expected server shutdown to be called when ROLE defaults to 'all'")
}
}

func TestRunApplicationInvalidRoleReturnsEarly(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "invalid")

deps := DefaultDeps()

deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called for invalid role")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called for invalid role")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
t.Fatalf("NotifySignals should not be called for invalid role")
}

// Should return immediately without starting anything
err := runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
if err == nil {
t.Fatalf("expected error for invalid role")
}
}

// Stub helpers below keep main.go testable without altering production behavior.

type stubDB struct{ closed bool }
Expand Down
Loading
Loading