diff --git a/core/consumer/consumer.go b/core/consumer/consumer.go index c35ecf5b..15664034 100644 --- a/core/consumer/consumer.go +++ b/core/consumer/consumer.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "errors" "fmt" "sync" "time" @@ -20,16 +21,23 @@ const ( // Consumer orchestrates multiple queue consumers. It handles subscription lifecycle, // message consumption, ack/nack, and graceful shutdown for the entire pipeline. +// Register(), Start() and Stop() are always called in this order so they do not need to be concurrently-safe between +// one another, but the implementation must be thread-safe between message processing and Register()/Stop() operations. type Consumer interface { // Register adds a controller to the consumer. Must be called before Start(). Register(controller Controller) error // Start subscribes to all registered controllers' topics and begins consuming messages. + // Context is cancelled when the consumer is stopped; the implementation should propagate it to the controllers + // running message processing. The implementation can react immediately to the context cancellation by returning `ctx.Err()` instead of starting the message processing, + // but may instead opt to defer handling the cancellation until after the message processing routine is set up. + // Start() will only be called once at the application startup, so it does not need to be idempotent. Start(ctx context.Context) error // Stop gracefully shuts down all controllers with the specified timeout. // timeoutMs is the maximum time in milliseconds to wait for graceful shutdown. // Returns error if shutdown times out. + // Stop() will only be called once at the application shutdown, so it does not need to be idempotent. Stop(timeoutMs int64) error } @@ -115,10 +123,10 @@ func (m *consumer) Start(ctx context.Context) error { for _, controller := range m.controllers { if err := m.subscribe(ctx, controller); err != nil { - // Cleanup any started controllers. 
Ignore error since we're returning - // the subscribe error. - _ = m.unsubscribeAll(startupCleanupTimeoutMs) - return fmt.Errorf("failed to start controller %s: %w", controller.Name(), err) + // Cleanup any started controllers. Include cleanup error if any. + cleanupErr := m.unsubscribeAll(startupCleanupTimeoutMs) + startErr := fmt.Errorf("failed to start controller %s: %w", controller.Name(), err) + return errors.Join(startErr, cleanupErr) } } @@ -329,10 +337,17 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d elapsed := time.Since(start) + // By convention, Controller can only return context.Canceled if it is cancelled by the context, i.e. when consumer is stopped or application is shutting down + isCanceled := errors.Is(err, context.Canceled) + // Track latency with success/failure tags successTag := "true" if err != nil { - successTag = "false" + if isCanceled { + successTag = "cancel" + } else { + successTag = "false" + } } latencyScope := controllerScope.Tagged(map[string]string{ @@ -369,7 +384,13 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d } // Controller returned retryable error - nack message for retry - m.logger.Errorw("controller error, nacking message", + // This includes cancelled controllers. + what := "error" + if isCanceled { + what = "cancel" + } + m.logger.Errorw("controller error or cancel, nacking message", + "what", what, "controller", controller.Name(), "topic_key", topicKey, "message_id", msg.ID, @@ -443,6 +464,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d // Cancels all subscription contexts and waits for consumption goroutines to finish. // timeoutMs is the maximum time in milliseconds to wait for graceful shutdown. // Returns error if shutdown times out. +// Stop() is not idempotent and can only be called once. 
func (m *consumer) Stop(timeoutMs int64) error { m.mu.Lock() m.stopped = true @@ -481,7 +503,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { // Wait for each subscription to finish, splitting the timeout budget across them remaining := time.Duration(timeoutMs) * time.Millisecond - var timedOut bool + var timedOutControllers []string for topicKey, sub := range m.subscriptions { start := time.Now() select { @@ -492,7 +514,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { "controller", sub.controller.Name(), "topic_key", topicKey, ) - timedOut = true + timedOutControllers = append(timedOutControllers, sub.controller.Name()) } elapsed := time.Since(start) remaining -= elapsed @@ -504,8 +526,8 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { // Clear subscriptions m.subscriptions = make(map[TopicKey]*activeSubscription) - if timedOut { - return fmt.Errorf("timeout waiting for controllers to stop") + if len(timedOutControllers) > 0 { + return fmt.Errorf("timeout waiting for controllers to stop: %v", timedOutControllers) } m.logger.Debugw("all controllers stopped gracefully") diff --git a/core/consumer/controller.go b/core/consumer/controller.go index 69a126b8..6168743d 100644 --- a/core/consumer/controller.go +++ b/core/consumer/controller.go @@ -75,11 +75,20 @@ func (d *deliveryWrapper) Metadata() map[string]string { // The Controller interface enables clean separation of concerns: // - Controller focuses on business logic (deserialize, process, return error status) // - Consumer handles infrastructure (subscription, ack/nack, metrics, lifecycle) +// The implementation of the controller should be idempotent and stateless. The controller is expected to be retried for the same message multiple times and should process side effects gracefully. +// The implementation must be thread-safe. type Controller interface { // Process processes a delivery. 
Controller receives consumer.Delivery (not extension/queue.Delivery) // which prevents direct Ack/Nack calls - Consumer handles those automatically. - // Return nil to ack the message (success), error to nack and retry, - // or NonRetryableError to ack a poison pill message. + // Return nil to ack the message (success), error to nack and retry, or NonRetryableError to ack a poison pill message. + // Context controls the lifecycle of the service. It is cancelled when the consumer is stopped. The implementation should process it gracefully: + // - Pass the context to the underlying services and wait for them to complete their operations. + // - Proceed to the nearest safe state. + // - If the nearest safe state is not the final state of the controller, return `ctx.Err()` and the message would be nacked and retried. + // - If the nearest safe state is the final state of the controller, return either nil (if it is a successful state) or an actual error (and not `ctx.Err()`). + // It is strongly recommended to properly propagate the context all the way down to the long-running services and ensure they can process it "good enough", guided by the principles outlined above. + // Never return `context.Canceled` unless it is a result of a processing `ctx` cancellation! + Process(ctx context.Context, delivery Delivery) error // Name returns the controller name for logging and metrics. diff --git a/core/errs/errs.go b/core/errs/errs.go index 1cf65668..7ca92e6b 100644 --- a/core/errs/errs.go +++ b/core/errs/errs.go @@ -2,6 +2,7 @@ package errs import ( "errors" + "context" ) // userError represents an error caused by invalid user input or actions. @@ -98,11 +99,14 @@ func IsUserError(err error) bool { return errors.As(err, &target) } -// IsRetryable checks if err is retryable. Returns true only when err is or -// wraps an infrastructure error whose retryable flag is set. User errors are +// IsRetryable checks if err is retryable. 
Returns true when err is or +// wraps an infrastructure error whose retryable flag is set or when err is context.Canceled. User errors are // never retryable. A generic error (not wrapped) returns false, consistent // with the convention that unclassified errors are non-retryable. func IsRetryable(err error) bool { + if errors.Is(err, context.Canceled) { + return true + } var ie *infraError if errors.As(err, &ie) { return ie.retryable diff --git a/example/server/README.md b/example/server/README.md new file mode 100644 index 00000000..8522a312 --- /dev/null +++ b/example/server/README.md @@ -0,0 +1,61 @@ +# Example Servers + +Reference implementations of the Gateway and Orchestrator services, wired with MySQL-backed extensions and runnable via Docker Compose. + +## Starting + +### Docker Compose (recommended) + +```bash +make local-start # builds binaries and starts all services +make local-ps # verify services are running +make local-logs # view logs +``` + +### Standalone + +Each service requires two MySQL databases (app and queue) and is configured via environment variables: + +| Variable | Required | Description | Default | +|--------------------|----------|--------------------------------------|--------------------------------------| +| `MYSQL_DSN` | yes | App database DSN | — | +| `QUEUE_MYSQL_DSN` | yes | Queue database DSN | — | +| `PORT` | no | gRPC listen address | `:8081` (gateway), `:8082` (orchestrator) | +| `HOSTNAME` | no | Subscriber name for queue consumers | `orchestrator-` (orchestrator only) | +| `GITHUB_TOKEN` | no | GitHub API token for merge checker | — (orchestrator only) | +| `GITHUB_GRAPHQL_URL` | no | GitHub GraphQL endpoint | `https://api.github.com/graphql` (orchestrator only) | + +```bash +export MYSQL_DSN='root:root@tcp(127.0.0.1:3306)/submitqueue?parseTime=true' +export QUEUE_MYSQL_DSN='root:root@tcp(127.0.0.1:3307)/submitqueue?parseTime=true' + +# Start gateway (default :8081) +go run ./example/server/gateway + +# Start orchestrator 
(default :8082) +go run ./example/server/orchestrator +``` + +## Stopping + +Both services handle `SIGINT` (Ctrl+C) and `SIGTERM` gracefully: + +1. The gRPC server stops accepting new connections and drains in-flight RPCs. +2. The orchestrator additionally stops its queue consumers (30s timeout). +3. The process exits with a code reflecting the outcome (see below). + +To stop Docker Compose services: + +```bash +make local-stop +``` + +## Exit Codes + +| Code | Meaning | +|------|-------------------------------------------------------------------------| +| 0 | Clean shutdown, no errors. | +| 1 | Startup failure or runtime error (details on stderr). | +| 143 | Stopped by signal (SIGINT or SIGTERM). This is 128 + SIGTERM per POSIX. | + +When shutdown itself encounters errors (e.g. the gRPC server returns an error during graceful stop, or queue consumers time out), those override the signal exit code and the process exits with code 1. The actual errors are printed to stderr. diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 7e04b6e1..2638ed9c 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "errors" "fmt" "net" "os" @@ -41,10 +42,21 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land } func main() { + code := 0 if err := run(); err != nil { - fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err) - os.Exit(1) + if errors.Is(err, context.Canceled) { + fmt.Println("Gateway server stopped by signal") + + // Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and 128+SIGTERM for SIGTERM, + // but it will require a special processing not yet available in the standard library. 
+ code = 128 + int(syscall.SIGTERM) + } else { + fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err) + // TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything + code = 1 + } } + os.Exit(code) } func run() error { @@ -162,7 +174,7 @@ func run() error { } fmt.Printf("Gateway gRPC server is running on %s\n", port) - fmt.Println("Press Ctrl+C to stop.") + fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.") // Start server in a goroutine and wait for it to finish serverErrCh := make(chan error, 1) @@ -170,16 +182,28 @@ func run() error { serverErrCh <- grpcServer.Serve(listener) }() - // Wait for interrupt signal or server exit + // Wait for interrupt signal or server critical error + // If interruption is signaled, gracefully stop the server + // If an error happens during shutdown, return the actual error, not the context cancellation error + var serverErr error select { case <-ctx.Done(): - fmt.Println("\nShutting down gateway server...") + fmt.Println("Shutting down gateway server due to interruption signal...") + + // Set the error to the context cancellation error to be surfaced as a desired exit code by the main function + // to indicate that the server was stopped as intended + // It may be overridden by the server error if any + err = ctx.Err() + + // stop GRPC server and wait for it to exit grpcServer.GracefulStop() - _ = <-serverErrCh // Wait for the server to exit and ignore the error - case errCh := <-serverErrCh: - if errCh != nil { - err = fmt.Errorf("\nServer exited with error: %w\n", errCh) - } + serverErr = <-serverErrCh + case serverErr = <-serverErrCh: + fmt.Println("Shutting down gateway server due to critical GRPC server error...") + } + + if serverErr != nil { + err = fmt.Errorf("GRPC server exited with error: %w", serverErr) } return err diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index f59b9eb9..26503b12 100644 --- 
a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "errors" "fmt" "net" "net/http" @@ -51,10 +52,21 @@ func (s *OrchestratorServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb } func main() { + code := 0 if err := run(); err != nil { - fmt.Fprintf(os.Stderr, "Orchestrator server failure: %v\n", err) - os.Exit(1) + if errors.Is(err, context.Canceled) { + fmt.Println("Orchestrator server stopped by signal") + + // Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and 128+SIGTERM for SIGTERM, + // but it will require a special processing not yet available in the standard library. + code = 128 + int(syscall.SIGTERM) + } else { + fmt.Fprintf(os.Stderr, "Orchestrator server failure: %v\n", err) + // TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything + code = 1 + } } + os.Exit(code) } func run() error { @@ -170,6 +182,8 @@ func run() error { // Start consumers if err := c.Start(ctx); err != nil { + // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. + // This is expected, just propagate it. 
return fmt.Errorf("failed to start consumers: %w", err) } logger.Info("consumer started") @@ -198,7 +212,7 @@ func run() error { } fmt.Printf("Orchestrator gRPC server is running on %s\n", port) - fmt.Println("Press Ctrl+C to stop.") + fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.") // Start server in a goroutine and wait for it to finish serverErrCh := make(chan error, 1) @@ -206,20 +220,47 @@ func run() error { serverErrCh <- grpcServer.Serve(listener) }() - // Wait for interrupt signal or server exit + // Wait for interrupt signal or server critical error + // If interruption is signaled, gracefully stop the server + // If server exits with an error, cancel the context to signal cancellation to the queue consumers + // After this, stop consumers + // If an error happens during shutdown, return the actual error, not the context cancellation error + var serverErr error select { case <-ctx.Done(): - fmt.Println("\nShutting down orchestrator server...") - c.Stop(30000) // Stop consumers with 30s timeout + fmt.Println("Shutting down orchestrator server due to interruption signal...") + + // Set the error to the context cancellation error to be surfaced as a desired exit code by the main function + // to indicate that the server was stopped as intended + // It may be overridden by the server error if any + err = ctx.Err() + + // stop GRPC server and wait for it to exit grpcServer.GracefulStop() - _ = <-serverErrCh // Wait for the server to exit and ignore the error - case errCh := <-serverErrCh: - if errCh != nil { - err = fmt.Errorf("\nServer exited with error: %w\n", errCh) - } - c.Stop(30000) // Stop consumers with 30s timeout + serverErr = <-serverErrCh + case serverErr = <-serverErrCh: + fmt.Println("Shutting down orchestrator server due to critical GRPC server error...") + + // Cancel the context to signal cancellation to the queue consumers + cancel() + } + + if serverErr != nil { + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) 
+ } + + // Stop consumers with 30s timeout. By this time the context should be cancelled and the processing threads may already be exiting; Stop() waits for them to finish + errStop := c.Stop(30000); + if errStop != nil { + errStop = fmt.Errorf("failed to stop consumers: %w", errStop) + } + + if errStop != nil || serverErr != nil { + // Override context cancellation error with the shutdown error + err = errors.Join(errStop, serverErr) } + // Return the error to be surfaced as a desired exit code by the main function return err } diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 5cc7035d..f126facb 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -93,8 +93,35 @@ func (s *E2EIntegrationSuite) SetupSuite() { } func (s *E2EIntegrationSuite) TearDownSuite() { + t := s.T() s.log.Logf("Tearing down E2E integration test suite") - // Cleanup handled automatically by testutil.ComposeStack + + // Gracefully stop services via SIGTERM and verify exit codes before compose teardown. + // Use a 60s timeout to exceed the orchestrator's 30s consumer drain window. + // Stop both services first (StopService blocks, so they are stopped one after the other), then check exit codes. 
+ const stopTimeoutSec = 60 + const wantExitCode = 143 // 128 + SIGTERM (15) + + gatewayStopErr := s.stack.StopService("gateway-service", stopTimeoutSec) + orchestratorStopErr := s.stack.StopService("orchestrator-service", stopTimeoutSec) + + if assert.NoError(t, gatewayStopErr, "failed to stop gateway service") { + exitCode, err := s.stack.ServiceExitCode("gateway-service") + if assert.NoError(t, err, "failed to get gateway exit code") { + assert.Equal(t, wantExitCode, exitCode, + "gateway should exit with 128+SIGTERM (%d) on graceful shutdown", wantExitCode) + } + } + + if assert.NoError(t, orchestratorStopErr, "failed to stop orchestrator service") { + exitCode, err := s.stack.ServiceExitCode("orchestrator-service") + if assert.NoError(t, err, "failed to get orchestrator exit code") { + assert.Equal(t, wantExitCode, exitCode, + "orchestrator should exit with 128+SIGTERM (%d) on graceful shutdown", wantExitCode) + } + } + + // Compose stack cleanup handled automatically by t.Cleanup } func (s *E2EIntegrationSuite) TestPingGateway() { diff --git a/test/testutil/compose.go b/test/testutil/compose.go index 1e423408..6384140d 100644 --- a/test/testutil/compose.go +++ b/test/testutil/compose.go @@ -269,6 +269,64 @@ func (s *ComposeStack) ConnectGRPC(serviceName string, containerPort int) (*grpc return conn, nil } +// StopService sends SIGTERM to a service and waits for it to stop. +// timeoutSec is the maximum time to wait before Docker sends SIGKILL. +func (s *ComposeStack) StopService(serviceName string, timeoutSec int) error { + s.t.Helper() + + s.log.Logf("Stopping service %s (timeout %ds)", serviceName, timeoutSec) + + args := append(s.composeCmd[1:], "-f", s.composeFile, "-p", s.projectName, + "stop", "-t", fmt.Sprintf("%d", timeoutSec), serviceName) + cmd := exec.CommandContext(s.ctx, s.composeCmd[0], args...) 
+ cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to stop service %s: %w", serviceName, err) + } + + s.log.Logf("Service %s stopped", serviceName) + return nil +} + +// ServiceExitCode returns the exit code of a stopped service container. +// Must be called after the service has exited. +func (s *ComposeStack) ServiceExitCode(serviceName string) (int, error) { + s.t.Helper() + + // Get container ID for the service + args := append(s.composeCmd[1:], "-f", s.composeFile, "-p", s.projectName, + "ps", "-a", "-q", serviceName) + cmd := exec.CommandContext(s.ctx, s.composeCmd[0], args...) + output, err := cmd.Output() + if err != nil { + return 0, fmt.Errorf("failed to get container ID for service %s: %w", serviceName, err) + } + + containerID := strings.TrimSpace(string(output)) + if containerID == "" { + return 0, fmt.Errorf("no container found for service %s", serviceName) + } + + // Get exit code via docker inspect + inspectCmd := exec.CommandContext(s.ctx, "docker", "inspect", + "--format", "{{.State.ExitCode}}", containerID) + inspectOutput, err := inspectCmd.Output() + if err != nil { + return 0, fmt.Errorf("failed to inspect container %s: %w", containerID, err) + } + + var exitCode int + _, err = fmt.Sscanf(strings.TrimSpace(string(inspectOutput)), "%d", &exitCode) + if err != nil { + return 0, fmt.Errorf("failed to parse exit code from %q: %w", string(inspectOutput), err) + } + + s.log.Logf("Service %s exit code: %d", serviceName, exitCode) + return exitCode, nil +} + // setupDockerEnv configures Docker environment for docker-compose. func setupDockerEnv(t *testing.T) { t.Helper()