Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions core/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumer

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -20,16 +21,23 @@ const (

// Consumer orchestrates multiple queue consumers. It handles subscription lifecycle,
// message consumption, ack/nack, and graceful shutdown for the entire pipeline.
//
// Lifecycle: Register(), Start() and Stop() are always called in this order, so they do not need to be
// concurrency-safe with respect to one another. The implementation must still be thread-safe between
// message processing and Register()/Stop() operations.
type Consumer interface {
// Register adds a controller to the consumer. Must be called before Start().
Register(controller Controller) error

// Start subscribes to all registered controllers' topics and begins consuming messages.
// ctx is cancelled when the consumer is stopped; the implementation should propagate it to the controllers
// that run message processing. On cancellation the implementation may either return `ctx.Err()` immediately
// instead of starting message processing, or finish setting up the message processing routine first and
// let that routine handle the cancellation.
// Start() is only called once, at application startup, so it does not need to be idempotent.
Start(ctx context.Context) error

// Stop gracefully shuts down all controllers with the specified timeout.
// timeoutMs is the maximum time in milliseconds to wait for graceful shutdown.
// Returns an error if shutdown times out.
// Stop() is only called once, at application shutdown, so it does not need to be idempotent.
Stop(timeoutMs int64) error
}

Expand Down Expand Up @@ -115,10 +123,10 @@ func (m *consumer) Start(ctx context.Context) error {

for _, controller := range m.controllers {
if err := m.subscribe(ctx, controller); err != nil {
// Cleanup any started controllers. Ignore error since we're returning
// the subscribe error.
_ = m.unsubscribeAll(startupCleanupTimeoutMs)
return fmt.Errorf("failed to start controller %s: %w", controller.Name(), err)
// Cleanup any started controllers. Include cleanup error if any.
cleanupErr := m.unsubscribeAll(startupCleanupTimeoutMs)
startErr := fmt.Errorf("failed to start controller %s: %w", controller.Name(), err)
return errors.Join(startErr, cleanupErr)
}
}

Expand Down Expand Up @@ -329,10 +337,17 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d

elapsed := time.Since(start)

// By convention, Controller can only return context.Canceled if it is cancelled by the context, i.e. when consumer is stopped or application is shutting down
isCanceled := errors.Is(err, context.Canceled)

// Track latency with success/failure tags
successTag := "true"
if err != nil {
successTag = "false"
if isCanceled {
successTag = "cancel"
} else {
successTag = "false"
}
}

latencyScope := controllerScope.Tagged(map[string]string{
Expand Down Expand Up @@ -369,7 +384,13 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
}

// Controller returned retryable error - nack message for retry
m.logger.Errorw("controller error, nacking message",
// This includes cancelled controllers.
what := "error"
if isCanceled {
what = "cancel"
}
m.logger.Errorw("controller error or cancel, nacking message",
"what", what,
"controller", controller.Name(),
"topic_key", topicKey,
"message_id", msg.ID,
Expand Down Expand Up @@ -443,6 +464,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
// Cancels all subscription contexts and waits for consumption goroutines to finish.
// timeoutMs is the maximum time in milliseconds to wait for graceful shutdown.
// Returns error if shutdown times out.
// Stop() is not idempotent and can only be called once.
func (m *consumer) Stop(timeoutMs int64) error {
m.mu.Lock()
m.stopped = true
Expand Down Expand Up @@ -481,7 +503,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {

// Wait for each subscription to finish, splitting the timeout budget across them
remaining := time.Duration(timeoutMs) * time.Millisecond
var timedOut bool
var timedOutControllers []string
for topicKey, sub := range m.subscriptions {
start := time.Now()
select {
Expand All @@ -492,7 +514,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {
"controller", sub.controller.Name(),
"topic_key", topicKey,
)
timedOut = true
timedOutControllers = append(timedOutControllers, sub.controller.Name())
}
elapsed := time.Since(start)
remaining -= elapsed
Expand All @@ -504,8 +526,8 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {
// Clear subscriptions
m.subscriptions = make(map[TopicKey]*activeSubscription)

if timedOut {
return fmt.Errorf("timeout waiting for controllers to stop")
if len(timedOutControllers) > 0 {
return fmt.Errorf("timeout waiting for controllers to stop: %v", timedOutControllers)
}

m.logger.Debugw("all controllers stopped gracefully")
Expand Down
13 changes: 11 additions & 2 deletions core/consumer/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,20 @@ func (d *deliveryWrapper) Metadata() map[string]string {
// The Controller interface enables clean separation of concerns:
// - Controller focuses on business logic (deserialize, process, return error status)
// - Consumer handles infrastructure (subscription, ack/nack, metrics, lifecycle)
// The implementation of the controller should be idempotent and stateless. The same message may be delivered to the controller multiple times (it is retried after a nack), so the controller must handle repeated side effects gracefully.
// The implementation must be thread-safe.
type Controller interface {
// Process processes a delivery. Controller receives consumer.Delivery (not extension/queue.Delivery)
// which prevents direct Ack/Nack calls - Consumer handles those automatically.
// Return nil to ack the message (success), error to nack and retry,
// or NonRetryableError to ack a poison pill message.
// Return nil to ack the message (success), error to nack and retry, or NonRetryableError to ack a poison pill message.
// Context controls the lifecycle of the service. It is cancelled when the consumer is stopped. The implementation should process it gracefully:
// - Pass the context to the underlying services and wait for them to complete their operations.
// - Proceed to the nearest safe state.
// - If the nearest safe state is not the final state of the controller, return `ctx.Err()` and the message would be nacked and retried.
// - If the nearest safe state is the final state of the controller, return either nil (if it is a successful state) or an actual error (and not `ctx.Err()`).
// It is strongly recommended to properly propagate the context all the way down to the long-running services and ensure they can process it "good enough", guided by the principles outlined above.
// Never return `context.Canceled` unless it is a result of a processing `ctx` cancellation!

Process(ctx context.Context, delivery Delivery) error

// Name returns the controller name for logging and metrics.
Expand Down
8 changes: 6 additions & 2 deletions core/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package errs

import (
"errors"
"context"
)

// userError represents an error caused by invalid user input or actions.
Expand Down Expand Up @@ -98,11 +99,14 @@ func IsUserError(err error) bool {
return errors.As(err, &target)
}

// IsRetryable checks if err is retryable. Returns true only when err is or
// wraps an infrastructure error whose retryable flag is set. User errors are
// IsRetryable checks if err is retryable. Returns true when err is or
// wraps an infrastructure error whose retryable flag is set or when err is context.Canceled. User errors are
// never retryable. A generic error (not wrapped) returns false, consistent
// with the convention that unclassified errors are non-retryable.
func IsRetryable(err error) bool {
if errors.Is(err, context.Canceled) {
return true
}
var ie *infraError
if errors.As(err, &ie) {
return ie.retryable
Expand Down
61 changes: 61 additions & 0 deletions example/server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Example Servers

Reference implementations of the Gateway and Orchestrator services, wired with MySQL-backed extensions and runnable via Docker Compose.

## Starting

### Docker Compose (recommended)

```bash
make local-start # builds binaries and starts all services
make local-ps # verify services are running
make local-logs # view logs
```

### Standalone

Each service requires two MySQL databases (app and queue) and is configured via environment variables:

| Variable | Required | Description | Default |
|--------------------|----------|--------------------------------------|--------------------------------------|
| `MYSQL_DSN` | yes | App database DSN | — |
| `QUEUE_MYSQL_DSN` | yes | Queue database DSN | — |
| `PORT` | no | gRPC listen address | `:8081` (gateway), `:8082` (orchestrator) |
| `HOSTNAME` | no | Subscriber name for queue consumers | `orchestrator-<unix_ts>` (orchestrator only) |
| `GITHUB_TOKEN` | no | GitHub API token for merge checker | — (orchestrator only) |
| `GITHUB_GRAPHQL_URL` | no | GitHub GraphQL endpoint | `https://api.github.com/graphql` (orchestrator only) |

```bash
export MYSQL_DSN='root:root@tcp(127.0.0.1:3306)/submitqueue?parseTime=true'
export QUEUE_MYSQL_DSN='root:root@tcp(127.0.0.1:3307)/submitqueue?parseTime=true'

# Start gateway (default :8081)
go run ./example/server/gateway

# Start orchestrator (default :8082)
go run ./example/server/orchestrator
```

## Stopping

Both services handle `SIGINT` (Ctrl+C) and `SIGTERM` gracefully:

1. The gRPC server stops accepting new connections and drains in-flight RPCs.
2. The orchestrator additionally stops its queue consumers (30s timeout).
3. The process exits with a code reflecting the outcome (see below).

To stop Docker Compose services:

```bash
make local-stop
```

## Exit Codes

| Code | Meaning |
|------|-------------------------------------------------------------------------|
| 0 | Clean shutdown, no errors. |
| 1 | Startup failure or runtime error (details on stderr). |
| 143 | Stopped by signal (SIGINT or SIGTERM). This is 128 + SIGTERM (15) per POSIX; the same code is used for SIGINT. |

When shutdown itself encounters errors (e.g. the gRPC server returns an error during graceful stop, or queue consumers time out), those override the signal exit code and the process exits with code 1. The actual errors are printed to stderr.
44 changes: 34 additions & 10 deletions example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"errors"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -41,10 +42,21 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land
}

// main runs the gateway server and maps the result of run() to a process exit code:
//   - nil error: exit code 0;
//   - an error matching context.Canceled (run() returns ctx.Err() after a shutdown triggered by
//     context cancellation): exit code 143, reported on stdout as an intentional stop;
//   - any other error: exit code 1, with the details printed to stderr.
func main() {
	code := 0
	if err := run(); err != nil {
		if errors.Is(err, context.Canceled) {
			fmt.Println("Gateway server stopped by signal")

			// Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any
			// termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and
			// 128+SIGTERM for SIGTERM, but it will require a special processing not yet available
			// in the standard library.
			code = 128 + int(syscall.SIGTERM)
		} else {
			fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err)
			// TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything
			code = 1
		}
	}
	// Single exit point: the code classified above is what the process reports. Exiting earlier
	// (e.g. an unconditional os.Exit(1) in the error branch) would make the 143 path unreachable.
	os.Exit(code)
}

func run() error {
Expand Down Expand Up @@ -162,24 +174,36 @@ func run() error {
}

fmt.Printf("Gateway gRPC server is running on %s\n", port)
fmt.Println("Press Ctrl+C to stop.")
fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.")

// Start server in a goroutine and wait for it to finish
serverErrCh := make(chan error, 1)
go func() {
serverErrCh <- grpcServer.Serve(listener)
}()

// Wait for interrupt signal or server exit
// Wait for interrupt signal or server critical error
// If interruption is signaled, gracefully stop the server
// If an error happens during shutdown, return the actual error, not the context cancellation error
var serverErr error
select {
case <-ctx.Done():
fmt.Println("\nShutting down gateway server...")
fmt.Println("Shutting down gateway server due to interruption signal...")

// Set the error to the context cancellation error to be surfaced as a desired exit code by the main function
// to indicate that the server was stopped as intended
// It may be overridden by the server error if any
err = ctx.Err()

// stop GRPC server and wait for it to exit
grpcServer.GracefulStop()
_ = <-serverErrCh // Wait for the server to exit and ignore the error
case errCh := <-serverErrCh:
if errCh != nil {
err = fmt.Errorf("\nServer exited with error: %w\n", errCh)
}
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down gateway server due to critical GRPC server error...")
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

return err
Expand Down
Loading