Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# EasyP API Service — Environment Variables
# Copy to .env and uncomment the values you need.

# --- Ports (docker-compose) ---
# EASYP_TRAEFIK_PORT=80
# EASYP_GRAFANA_PORT=3000
# EASYP_REGISTRY_PORT=5005
# EASYP_POSTGRES_PORT=5432
# EASYP_GRPC_PORT=8080
# EASYP_METRICS_PORT=8081
# EASYP_HEALTH_PORT=8082
# EASYP_GATEWAY_PORT=8083

# --- Enterprise License (uncomment both to enable) ---
# Ed25519 public key (hex) — embedded into binary at build time
# LICENSE_PUBLIC_KEY=c4c720019f4c70dcb30f3cdbac7f73689c6e027d02cb8ae8c5a3cbe654cfb6e0
# PASETO v4.public token — passed to the service at runtime. The sample below is a development token (sub=test-dev-license, tier=enterprise, expires 2036-03-12).
# LICENSE_KEY=v4.public.eyJleHAiOiIyMDM2LTAzLTEyVDIwOjIwOjE0KzAzOjAwIiwiaWF0IjoiMjAyNi0wMy0xNVQyMDoyMDoxNCswMzowMCIsIm1heF9wbHVnaW5zIjoiLTEiLCJtYXhfd29ya2VycyI6IjE2Iiwic3ViIjoidGVzdC1kZXYtbGljZW5zZSIsInRpZXIiOiJlbnRlcnByaXNlIn0qc0ELh3_NyDNOPxfreo8MH2dGeaFrkax-78CSmM2B06hfjmWIU-bp5aO98TIHpMO8MXNuS9nqaXiCEGqpOXEP
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ test/generated/

# EasyP generated files
easyp.lock
.kiro/
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:alpine3.22 AS builder

ARG LICENSE_PUBLIC_KEY=""

RUN apk update && apk add --no-cache ca-certificates

COPY go.mod go.mod
Expand All @@ -10,7 +12,7 @@ COPY . /app

WORKDIR /app

RUN go build -o easyp ./cmd/main.go
RUN go build -ldflags "-X main.licensePublicKey=${LICENSE_PUBLIC_KEY}" -o easyp ./cmd/main.go

FROM alpine:3.22

Expand Down
1 change: 1 addition & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ version: "3"
tasks:

up:
desc: "Start all services. For Enterprise: LICENSE_PUBLIC_KEY=... LICENSE_KEY=... task up"
dir: "{{.USER_WORKING_DIR}}"
preconditions:
- "test -f docker-compose.yml"
Expand Down
79 changes: 75 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"syscall"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
"github.com/hellofresh/health-go/v5"
_ "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,8 +34,10 @@ import (
"github.com/easyp-tech/service/internal/database/migrations"
"github.com/easyp-tech/service/internal/flags"
"github.com/easyp-tech/service/internal/grpchelper"
"github.com/easyp-tech/service/internal/license"
"github.com/easyp-tech/service/internal/mcpserver"
"github.com/easyp-tech/service/internal/monitor"
"github.com/easyp-tech/service/internal/ratelimiter"
"github.com/easyp-tech/service/internal/telemetry"
)

Expand All @@ -50,6 +53,8 @@ type (
Registry registryConfig `yaml:"registry" env:", prefix=REGISTRY_"`
Telemetry telemetryConfig `yaml:"telemetry" env:", prefix=TELEMETRY_"`
WorkerPool workerPoolConfig `yaml:"worker_pool" env:", prefix=WORKER_POOL_"`
License licenseConfig `yaml:"license" env:", prefix=LICENSE_"`
RateLimit rateLimitConfig `yaml:"rate_limit" env:", prefix=RATE_LIMIT_"`
}
server struct {
Host string `yaml:"host" env:"HOST, default=0.0.0.0"`
Expand Down Expand Up @@ -80,11 +85,22 @@ type (
MaxRetries int `yaml:"max_retries" env:"MAX_RETRIES,default=2"`
ShutdownTimeout time.Duration `yaml:"shutdown_timeout" env:"SHUTDOWN_TIMEOUT,default=30s"`
}
// licenseConfig holds the license source settings. Both values are forwarded
// unchanged to license.NewLicenseManager in run.
licenseConfig struct {
// Key is the license token supplied inline (YAML "key", env suffix KEY).
// config.yml describes it as an inline PASETO token that takes priority
// over File.
Key string `yaml:"key" env:"KEY"`
// File is the path to a license file (YAML "file", env suffix FILE).
File string `yaml:"file" env:"FILE"`
}
// rateLimitConfig holds the rate limiter settings. All three values are
// copied into ratelimiter.Config in run.
rateLimitConfig struct {
// RequestsPerSecond defaults to 10.0; config.yml describes it as the token
// refill rate in tokens per second.
RequestsPerSecond float64 `yaml:"requests_per_second" env:"REQUESTS_PER_SECOND,default=10.0"`
// Burst defaults to 20; config.yml describes it as the maximum number of
// tokens (burst size).
Burst int `yaml:"burst" env:"BURST,default=20"`
// CleanupInterval defaults to 10m; config.yml describes it as the stale
// client cleanup interval.
CleanupInterval time.Duration `yaml:"cleanup_interval" env:"CLEANUP_INTERVAL,default=10m"`
}
)

var (
cfgFile = &flags.File{DefaultPath: "", MaxSize: configFileSize}
logLevel = &flags.Level{Level: slog.LevelDebug}

licensePublicKey string // set via -ldflags "-X main.licensePublicKey=..."
)

// grpcMetrics implements grpchelper.Metrics for the recovery interceptor.
Expand All @@ -96,6 +112,23 @@ func (m *grpcMetrics) PanicsTotal() prometheus.Counter {
return m.panics
}

// featureGateAdapter adapts license.FeatureGate to core.FeatureGate interface.
//
// It wraps a *license.FeatureGate so the gate can be handed to core.New, and
// exposes Enabled with a plain int feature code, converting that code to
// license.Feature before delegating to the wrapped gate.
type featureGateAdapter struct {
// gate is the wrapped license gate that every adapter method delegates to.
gate *license.FeatureGate
}

// Enabled converts the integer feature code to license.Feature and returns
// the wrapped gate's answer for that feature.
func (a *featureGateAdapter) Enabled(feature int) bool {
	typed := license.Feature(feature)
	return a.gate.Enabled(typed)
}

// MaxWorkers returns the worker limit reported by the wrapped gate, unchanged.
func (a *featureGateAdapter) MaxWorkers() int {
	workers := a.gate.MaxWorkers()
	return workers
}

// MaxPlugins returns the plugin limit reported by the wrapped gate, unchanged.
func (a *featureGateAdapter) MaxPlugins() int {
	plugins := a.gate.MaxPlugins()
	return plugins
}

func main() {
flag.Var(cfgFile, "cfg", "path to config file")
flag.Var(logLevel, "log_level", "log level")
Expand Down Expand Up @@ -214,13 +247,41 @@ func run(ctx context.Context, cfg config, reg *prometheus.Registry, namespace st
}
}()

// Create LicenseManager
lm, err := license.NewLicenseManager(licensePublicKey, license.LicenseConfig{
Key: cfg.License.Key,
File: cfg.License.File,
}, log, reg, namespace)
if err != nil {
log.Warn("license initialization error, continuing in community mode", "error", err)
}
lm.StartExpirationWatcher(ctx)
defer lm.Stop()

// Create FeatureGate
gate := license.NewFeatureGate(lm)

// Create RateLimiter
rl := ratelimiter.New(ratelimiter.Config{
RequestsPerSecond: cfg.RateLimit.RequestsPerSecond,
Burst: cfg.RateLimit.Burst,
CleanupInterval: cfg.RateLimit.CleanupInterval,
}, gate, nil, log, reg) // nil keyExtractor → PeerIPExtractor
rl.StartCleanup(ctx)

// Wrap Registry in tracing decorator
tracedRegistry := telemetry.NewTracingRegistry(r)

// Override WorkerPool workers from license limits
wpWorkers := cfg.WorkerPool.Workers
if licenseWorkers := gate.MaxWorkers(); licenseWorkers > 0 {
wpWorkers = licenseWorkers
}

// Wrap TracingRegistry in WorkerPool (limit Docker parallelism)
metricsAdapter := adapter_metrics.New(reg, namespace)
pool := core.NewWorkerPool(tracedRegistry, core.WorkerPoolConfig{
Workers: cfg.WorkerPool.Workers,
Workers: wpWorkers,
QueueSize: cfg.WorkerPool.QueueSize,
GenerationTimeout: cfg.WorkerPool.GenerationTimeout,
MaxRetries: cfg.WorkerPool.MaxRetries,
Expand All @@ -236,7 +297,7 @@ func run(ctx context.Context, cfg config, reg *prometheus.Registry, namespace st
}()

// Core gets pool as Registry
module := core.New(metricsAdapter, pool)
module := core.New(metricsAdapter, pool, &featureGateAdapter{gate: gate})

// Wrap Core in tracing decorator, pass to API
tracedCore := telemetry.NewTracingCore(module)
Expand All @@ -255,14 +316,24 @@ func run(ctx context.Context, cfg config, reg *prometheus.Registry, namespace st
// Create audit interceptor
auditInterceptor := api.NewAuditInterceptor(auditCh, log)

// Create license interceptor (before audit in the chain)
licenseInterceptor := api.NewLicenseInterceptor(gate, log)

// Create gRPC server with full middleware stack
grpcSrv, healthSrv := grpchelper.NewServer(
&grpcMetrics{panics: panicsCounter},
log,
serverMetrics,
api.ErrorToStatus,
[]grpc.UnaryServerInterceptor{auditInterceptor.UnaryServerInterceptor()},
nil,
[]grpc.UnaryServerInterceptor{
ratelimit.UnaryServerInterceptor(rl),
licenseInterceptor.UnaryServerInterceptor(),
auditInterceptor.UnaryServerInterceptor(),
},
[]grpc.StreamServerInterceptor{
ratelimit.StreamServerInterceptor(rl),
licenseInterceptor.StreamServerInterceptor(),
},
)
serverMetrics.InitializeMetrics(grpcSrv)

Expand Down
7 changes: 7 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,10 @@ worker_pool:
generation_timeout: 120s
max_retries: 3
shutdown_timeout: 30s
license:
key: "" # inline PASETO token (takes priority over file)
file: "" # path to license file
rate_limit:
requests_per_second: 10.0 # token refill rate (tokens/sec)
burst: 20 # max tokens (burst size)
cleanup_interval: 10m # stale client cleanup interval
14 changes: 12 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,27 @@ services:
POSTGRES_PASSWORD: easyp_pass
ports:
- "${EASYP_POSTGRES_PORT:-5432}:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U easyp_svc -d easyp_db"]
interval: 2s
timeout: 3s
retries: 10
networks:
- easyp_network

service:
build:
context: .
dockerfile: ./Dockerfile
args:
LICENSE_PUBLIC_KEY: "${LICENSE_PUBLIC_KEY:-}"
container_name: easyp-api-service
restart: always
depends_on:
- postgres
- alloy
postgres:
condition: service_healthy
alloy:
condition: service_started
command: ["-cfg=/config.yml", "-log_level=debug"]
volumes:
- "./config.yml:/config.yml:ro"
Expand All @@ -203,6 +212,7 @@ services:
environment:
OTEL_EXPORTER_OTLP_ENDPOINT: "http://alloy:4317"
OTEL_SERVICE_NAME: "easyp-api-service"
LICENSE_KEY: "${LICENSE_KEY:-}"
ports:
- "${EASYP_GRPC_PORT:-8080}:8080"
- "${EASYP_METRICS_PORT:-8081}:8081"
Expand Down
72 changes: 60 additions & 12 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ EasyP API Service is a code generation service from Protocol Buffers using Docke

```
gRPC Client → API Layer (gRPC + interceptors) → Core (business logic) → WorkerPool → Registry → Docker
→ PostgreSQL
MCP Client → MCP Server (HTTP) → Core
↑ → PostgreSQL
FeatureGate
MCP Client → MCP Server (HTTP) → Core LicenseManager
```

## Components
Expand All @@ -25,7 +27,9 @@ A gRPC server with a chain of interceptors that process each request in the foll
5. **Panic recovery** — panic interception with `panics_total` counter increment
6. **Validation** — incoming message validation
7. **Error code conversion** — conversion of internal errors to gRPC codes
8. **Audit interceptor** — non-blocking audit event recording
8. **Rate limit interceptor** — per-client rate limiting via token bucket algorithm (uses `grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit`). Controlled by FeatureGate; when disabled, all requests pass through
9. **License interceptor** — license-based feature gating (Enterprise methods require a valid license; Community methods pass through)
10. **Audit interceptor** — non-blocking audit event recording

### Core (`internal/core`)

Expand Down Expand Up @@ -75,6 +79,48 @@ An asynchronous audit system.
- Writes to the `audit_log` table
- Non-blocking dispatch from the gRPC interceptor — when the channel overflows, the event is dropped (`audit_events_lost_total` is incremented)

### License System (`internal/license`)

A license-based feature gating system built on PASETO v4.public tokens (Ed25519 signatures).

**Components:**

- **Feature enum** — typed `int` constants defined with `iota`. Each feature is classified as Community or Enterprise. Methods: `String()`, `IsEnterprise()`, `Valid()`
- **LicenseManager** — parses and validates PASETO v4.public tokens using a public key embedded at build time via `-ldflags`. Caches parsed claims in memory. Runs an expiration watcher (60s ticker) that transitions to Community mode when the license expires
- **FeatureGate** — provides `Enabled(feature)`, `MaxWorkers()`, and `MaxPlugins()` based on the current cached claims. Increments the `easyp_license_feature_denied_total` Prometheus counter when an Enterprise feature is denied

**Tiers:**

| Tier | Features | MaxWorkers | MaxPlugins |
|------|----------|------------|------------|
| Community (default) | Code generation, plugin listing, MCP tools, rate limiting, plugin CRUD | 4 | 10 |
| Enterprise | All | 16 | -1 (unlimited) |

**Graceful degradation:** when no license is configured, the token is invalid, or the license expires at runtime, the system falls back to Community mode without interruption.

**Integration:**
- `FeatureGate` is injected into `Core` via constructor. The `core.FeatureGate` interface uses `Enabled(feature int)` (not `license.Feature`) to avoid circular dependencies. A `featureGateAdapter` in `cmd/main.go` bridges `license.FeatureGate` (Feature type) to `core.FeatureGate` (int type)
- `LicenseInterceptor` in the gRPC chain checks the FeatureGate for Enterprise methods and returns `PERMISSION_DENIED` when the feature is not enabled. Community methods (those without a method-to-feature mapping) pass through without checks

### Rate Limiter (`internal/ratelimiter`)

A per-client rate limiting system integrated as a gRPC interceptor.

**Algorithm:** Token bucket via `golang.org/x/time/rate`. Each client gets an independent bucket identified by IP address (extracted via `KeyExtractor` abstraction).

**Components:**

- **Config** — `RequestsPerSecond` (token refill rate), `Burst` (max tokens), `CleanupInterval` (stale bucket cleanup period)
- **KeyExtractor** — `func(ctx context.Context) string` abstraction for client identification. Default implementation `PeerIPExtractor` extracts IP from `peer.FromContext()`. Extensible to API key, tenant ID, etc.
- **RateLimiter** — implements `ratelimit.Limiter` interface from `grpc-ecosystem/go-grpc-middleware/v2`. Per-client buckets stored in `sync.Map`. Background goroutine cleans up stale buckets
- **Prometheus metrics** — `easyp_rate_limit_requests_total` (allowed/denied), `easyp_rate_limit_active_clients`

**Behavior:**
- Controlled by FeatureGate (`FeatureRateLimiting`). When disabled, all requests pass through
- Empty key from KeyExtractor → fail-open (request allowed)
- Denied requests return `RESOURCE_EXHAUSTED` with `X-RateLimit-*` headers in gRPC metadata
- Allowed requests include `X-RateLimit-*` headers in response metadata

### Metrics (`internal/adapters/metrics`)

Prometheus-based metrics:
Expand Down Expand Up @@ -116,21 +162,23 @@ An SQL wrapper over sqlx:
## Code Generation Request Flow

1. The gRPC client sends a `GenerateRequest` with protobuf files and the plugin name
2. The request passes through the interceptor chain (validation, logging, metrics, audit)
2. The request passes through the interceptor chain (validation, logging, metrics, rate limiting, license check, audit)
3. `Core.Generate()` parses the plugin name (`group/name:version`)
4. Core requests the plugin from the Registry (PostgreSQL)
5. The task is submitted to the WorkerPool
6. The WorkerPool checks queue availability (backpressure)
7. A worker executes `docker run --rm -i` with security constraints
8. Protobuf data is passed to the container's stdin, the result is read from stdout
9. On transient errors, a retry is performed (up to `max_retries` attempts)
10. The result is returned to the client via gRPC, metrics and audit are recorded
4. Core checks the FeatureGate for feature availability and applies license-based limits
5. Core requests the plugin from the Registry (PostgreSQL)
6. The task is submitted to the WorkerPool
7. The WorkerPool checks queue availability (backpressure)
8. A worker executes `docker run --rm -i` with security constraints
9. Protobuf data is passed to the container's stdin, the result is read from stdout
10. On transient errors, a retry is performed (up to `max_retries` attempts)
11. The result is returned to the client via gRPC, metrics and audit are recorded

## Design Patterns

| Pattern | Usage |
|---------|-------|
| **Decorator** | Tracing: `TracingCore`, `TracingRegistry`, `TracingPlugin` wrap interfaces, adding spans |
| **Worker Pool** | Limiting Docker container concurrency with a queue and backpressure |
| **Adapter** | Adapters for metrics, audit, registry — isolate infrastructure from business logic |
| **Adapter** | Adapters for metrics, audit, registry — isolate infrastructure from business logic. `featureGateAdapter` bridges `license.FeatureGate` to `core.FeatureGate` to avoid circular dependencies |
| **Middleware Chain** | gRPC interceptor chain for cross-cutting concerns |
| **Feature Gate** | `FeatureGate` controls feature availability based on the current license tier |
Loading
Loading