Skip to content

Commit e76c694

Browse files
committed
feat(termination): Graceful Termination Protocol
1 parent 555d294 commit e76c694

8 files changed

Lines changed: 283 additions & 37 deletions

File tree

core/consumer/consumer.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package consumer
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
78
"time"
@@ -20,16 +21,23 @@ const (
2021

2122
// Consumer orchestrates multiple queue consumers. It handles subscription lifecycle,
2223
// message consumption, ack/nack, and graceful shutdown for the entire pipeline.
24+
// Start(), Register() and Stop() are always called in this order so they do not need to be concurrently-safe between
25+
// one another, but the implementation must be thread-safe between message processing and Register()/Stop() operations.
2326
type Consumer interface {
2427
// Register adds a controller to the consumer. Must be called before Start().
2528
Register(controller Controller) error
2629

2730
// Start subscribes to all registered controllers' topics and begins consuming messages.
31+
// Context is cancelled when the consumer is stopped, the implementation should propagate it to the controllers
32+
// running message processing. The implementation can react immediately to the context cancellation by returning `ctx.Err()` instead of starting the message processing,
33+
// but can also opt out to defer the cancellation after the message processing routine is set up.
34+
// Start() will only be called once at the application startup, so it does not need to be idempotent.
2835
Start(ctx context.Context) error
2936

3037
// Stop gracefully shuts down all controllers with the specified timeout.
3138
// timeoutMs is the maximum time in milliseconds to wait for graceful shutdown.
3239
// Returns error if shutdown times out.
40+
// Stop() will only be called once at the application shutdown, so it does not need to be idempotent.
3341
Stop(timeoutMs int64) error
3442
}
3543

@@ -115,10 +123,10 @@ func (m *consumer) Start(ctx context.Context) error {
115123

116124
for _, controller := range m.controllers {
117125
if err := m.subscribe(ctx, controller); err != nil {
118-
// Cleanup any started controllers. Ignore error since we're returning
119-
// the subscribe error.
120-
_ = m.unsubscribeAll(startupCleanupTimeoutMs)
121-
return fmt.Errorf("failed to start controller %s: %w", controller.Name(), err)
126+
// Cleanup any started controllers. Include cleanup error if any.
127+
cleanupErr := m.unsubscribeAll(startupCleanupTimeoutMs)
128+
startErr := fmt.Errorf("failed to start controller %s: %w", controller.Name(), err)
129+
return errors.Join(startErr, cleanupErr)
122130
}
123131
}
124132

@@ -329,10 +337,17 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
329337

330338
elapsed := time.Since(start)
331339

340+
// By convention, Controller can only return context.Canceled if it is cancelled by the context, i.e. when consumer is stopped or application is shutting down
341+
isCanceled := errors.Is(err, context.Canceled)
342+
332343
// Track latency with success/failure tags
333344
successTag := "true"
334345
if err != nil {
335-
successTag = "false"
346+
if isCanceled {
347+
successTag = "cancel"
348+
} else {
349+
successTag = "false"
350+
}
336351
}
337352

338353
latencyScope := controllerScope.Tagged(map[string]string{
@@ -369,7 +384,13 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
369384
}
370385

371386
// Controller returned retryable error - nack message for retry
372-
m.logger.Errorw("controller error, nacking message",
387+
// This includes cancelled controllers.
388+
what := "error"
389+
if isCanceled {
390+
what = "cancel"
391+
}
392+
m.logger.Errorw("controller error or cancel, nacking message",
393+
"what", what,
373394
"controller", controller.Name(),
374395
"topic_key", topicKey,
375396
"message_id", msg.ID,
@@ -443,6 +464,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
443464
// Cancels all subscription contexts and waits for consumption goroutines to finish.
444465
// timeoutMs is the maximum time in milliseconds to wait for graceful shutdown.
445466
// Returns error if shutdown times out.
467+
// Stop() is not idempotent and can only be called once.
446468
func (m *consumer) Stop(timeoutMs int64) error {
447469
m.mu.Lock()
448470
m.stopped = true
@@ -481,7 +503,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {
481503

482504
// Wait for each subscription to finish, splitting the timeout budget across them
483505
remaining := time.Duration(timeoutMs) * time.Millisecond
484-
var timedOut bool
506+
var timedOutControllers []string
485507
for topicKey, sub := range m.subscriptions {
486508
start := time.Now()
487509
select {
@@ -492,7 +514,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {
492514
"controller", sub.controller.Name(),
493515
"topic_key", topicKey,
494516
)
495-
timedOut = true
517+
timedOutControllers = append(timedOutControllers, sub.controller.Name())
496518
}
497519
elapsed := time.Since(start)
498520
remaining -= elapsed
@@ -504,8 +526,8 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {
504526
// Clear subscriptions
505527
m.subscriptions = make(map[TopicKey]*activeSubscription)
506528

507-
if timedOut {
508-
return fmt.Errorf("timeout waiting for controllers to stop")
529+
if len(timedOutControllers) > 0 {
530+
return fmt.Errorf("timeout waiting for controllers to stop: %v", timedOutControllers)
509531
}
510532

511533
m.logger.Debugw("all controllers stopped gracefully")

core/consumer/controller.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,20 @@ func (d *deliveryWrapper) Metadata() map[string]string {
7575
// The Controller interface enables clean separation of concerns:
7676
// - Controller focuses on business logic (deserialize, process, return error status)
7777
// - Consumer handles infrastructure (subscription, ack/nack, metrics, lifecycle)
78+
// The implementation of the controller should be idempotent and stateless. The controller is expected to be retried for the same message multiple times and should process side effects gracefully.
79+
// The implementation must be thread-safe.
7880
type Controller interface {
7981
// Process processes a delivery. Controller receives consumer.Delivery (not extension/queue.Delivery)
8082
// which prevents direct Ack/Nack calls - Consumer handles those automatically.
81-
// Return nil to ack the message (success), error to nack and retry,
82-
// or NonRetryableError to ack a poison pill message.
83+
// Return nil to ack the message (success), error to nack and retry, or NonRetryableError to ack a poison pill message.
84+
// Context controls the lifecycle of the service. It is cancelled when the consumer is stopped. The implementation should process it gracefully:
85+
// - Pass the context to the underlying services and wait for them to complete their operations.
86+
// - Proceed to the nearest safe state.
87+
// - If the nearest safe state is not the final state of the controller, return `ctx.Err()` and the message would be nacked and retried.
88+
// - If the nearest safe state is the final state of the controller, return either nil (if it is a successful state) or an actual error (and not `ctx.Err()`).
89+
// It is strongly recommended to properly propagate the context all the way down to the long-running services and ensure they can process it "good enough", guided by the principles outlined above.
90+
// Never return `context.Canceled` unless it is a result of a processing `ctx` cancellation!
91+
8392
Process(ctx context.Context, delivery Delivery) error
8493

8594
// Name returns the controller name for logging and metrics.

core/errs/errs.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package errs
22

33
import (
44
"errors"
5+
"context"
56
)
67

78
// userError represents an error caused by invalid user input or actions.
@@ -98,11 +99,14 @@ func IsUserError(err error) bool {
9899
return errors.As(err, &target)
99100
}
100101

101-
// IsRetryable checks if err is retryable. Returns true only when err is or
102-
// wraps an infrastructure error whose retryable flag is set. User errors are
102+
// IsRetryable checks if err is retryable. Returns true when err is or
103+
// wraps an infrastructure error whose retryable flag is set or when err is context.Canceled. User errors are
103104
// never retryable. A generic error (not wrapped) returns false, consistent
104105
// with the convention that unclassified errors are non-retryable.
105106
func IsRetryable(err error) bool {
107+
if errors.Is(err, context.Canceled) {
108+
return true
109+
}
106110
var ie *infraError
107111
if errors.As(err, &ie) {
108112
return ie.retryable

example/server/README.md

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Example Servers
2+
3+
Reference implementations of the Gateway and Orchestrator services, wired with MySQL-backed extensions and runnable via Docker Compose.
4+
5+
## Starting
6+
7+
### Docker Compose (recommended)
8+
9+
```bash
10+
make local-start # builds binaries and starts all services
11+
make local-ps # verify services are running
12+
make local-logs # view logs
13+
```
14+
15+
### Standalone
16+
17+
Each service requires two MySQL databases (app and queue) and is configured via environment variables:
18+
19+
| Variable | Required | Description | Default |
20+
|--------------------|----------|--------------------------------------|--------------------------------------|
21+
| `MYSQL_DSN` | yes | App database DSN ||
22+
| `QUEUE_MYSQL_DSN` | yes | Queue database DSN ||
23+
| `PORT` | no | gRPC listen address | `:8081` (gateway), `:8082` (orchestrator) |
24+
| `HOSTNAME` | no | Subscriber name for queue consumers | `orchestrator-<unix_ts>` (orchestrator only) |
25+
| `GITHUB_TOKEN` | no | GitHub API token for merge checker | — (orchestrator only) |
26+
| `GITHUB_GRAPHQL_URL` | no | GitHub GraphQL endpoint | `https://api.github.com/graphql` (orchestrator only) |
27+
28+
```bash
29+
export MYSQL_DSN='root:root@tcp(127.0.0.1:3306)/submitqueue?parseTime=true'
30+
export QUEUE_MYSQL_DSN='root:root@tcp(127.0.0.1:3307)/submitqueue?parseTime=true'
31+
32+
# Start gateway (default :8081)
33+
go run ./example/server/gateway
34+
35+
# Start orchestrator (default :8082)
36+
go run ./example/server/orchestrator
37+
```
38+
39+
## Stopping
40+
41+
Both services handle `SIGINT` (Ctrl+C) and `SIGTERM` gracefully:
42+
43+
1. The gRPC server stops accepting new connections and drains in-flight RPCs.
44+
2. The orchestrator additionally stops its queue consumers (30s timeout).
45+
3. The process exits with a code reflecting the outcome (see below).
46+
47+
To stop Docker Compose services:
48+
49+
```bash
50+
make local-stop
51+
```
52+
53+
## Exit Codes
54+
55+
| Code | Meaning |
56+
|------|-------------------------------------------------------------------------|
57+
| 0 | Clean shutdown, no errors. |
58+
| 1 | Startup failure or runtime error (details on stderr). |
59+
| 143 | Stopped by signal (SIGINT or SIGTERM). This is 128 + SIGTERM per POSIX. |
60+
61+
When shutdown itself encounters errors (e.g. the gRPC server returns an error during graceful stop, or queue consumers time out), those override the signal exit code and the process exits with code 1. The actual errors are printed to stderr.

example/server/gateway/main.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"database/sql"
6+
"errors"
67
"fmt"
78
"net"
89
"os"
@@ -41,10 +42,21 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land
4142
}
4243

4344
func main() {
45+
code := 0
4446
if err := run(); err != nil {
45-
fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err)
46-
os.Exit(1)
47+
if errors.Is(err, context.Canceled) {
48+
fmt.Println("Gateway server stopped by signal")
49+
50+
// Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and 128+SIGTERM for SIGTERM,
51+
// but it will require a special processing not yet available in the standard library.
52+
code = 128 + int(syscall.SIGTERM)
53+
} else {
54+
fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err)
55+
// TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything
56+
code = 1
57+
}
4758
}
59+
os.Exit(code)
4860
}
4961

5062
func run() error {
@@ -162,24 +174,36 @@ func run() error {
162174
}
163175

164176
fmt.Printf("Gateway gRPC server is running on %s\n", port)
165-
fmt.Println("Press Ctrl+C to stop.")
177+
fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.")
166178

167179
// Start server in a goroutine and wait for it to finish
168180
serverErrCh := make(chan error, 1)
169181
go func() {
170182
serverErrCh <- grpcServer.Serve(listener)
171183
}()
172184

173-
// Wait for interrupt signal or server exit
185+
// Wait for interrupt signal or server critical error
186+
// If interruption is signaled, gracefully stop the server
187+
// If an error happens during shutdown, return the actual error, not the context cancellation error
188+
var serverErr error
174189
select {
175190
case <-ctx.Done():
176-
fmt.Println("\nShutting down gateway server...")
191+
fmt.Println("Shutting down gateway server due to interruption signal...")
192+
193+
// Set the error to the context cancellation error to be surfaced as a desired exit code by the main function
194+
// to indicate that the server was stopped as intended
195+
// It may be overridden by the server error if any
196+
err = ctx.Err()
197+
198+
// stop GRPC server and wait for it to exit
177199
grpcServer.GracefulStop()
178-
_ = <-serverErrCh // Wait for the server to exit and ignore the error
179-
case errCh := <-serverErrCh:
180-
if errCh != nil {
181-
err = fmt.Errorf("\nServer exited with error: %w\n", errCh)
182-
}
200+
serverErr = <-serverErrCh
201+
case serverErr = <-serverErrCh:
202+
fmt.Println("Shutting down gateway server due to critical GRPC server error...")
203+
}
204+
205+
if serverErr != nil {
206+
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
183207
}
184208

185209
return err

0 commit comments

Comments
 (0)