Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions internal/core/services/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,31 @@ func (s *AuthService) incrementFailure(email string) {
s.lockouts[email] = time.Now().Add(s.lockoutDuration)
platform.AuthAttemptsTotal.WithLabelValues("failure_lockout").Inc()
}
// Probabilistically purge expired entries to prevent unbounded map growth.
// Every ~10 calls, scan and remove stale entries.
if len(s.lockouts) > 0 && time.Now().Nanosecond()%10 == 0 {
s.purgeExpiredLocked()
}
}

// purgeExpiredLocked removes expired lockouts and stale failure records.
// Caller must hold s.mu.
func (s *AuthService) purgeExpiredLocked() {
now := time.Now()
for email, lockoutTime := range s.lockouts {
if now.After(lockoutTime) {
delete(s.lockouts, email)
delete(s.failedAttempts, email)
}
}
// Also purge failure-only entries that have no lockout and are very old
// (these are users who failed but didn't reach lockout threshold)
for email, count := range s.failedAttempts {
// If count is suspiciously high, purge to prevent unbounded growth
if count > maxFailedAttempts*10 {
delete(s.failedAttempts, email)
}
}
}

func (s *AuthService) GetUserByID(ctx context.Context, userID uuid.UUID) (*domain.User, error) {
Expand Down
28 changes: 27 additions & 1 deletion internal/core/services/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"math"
"strconv"
Expand All @@ -18,10 +19,17 @@ import (
"github.com/poyrazk/thecloud/internal/errors"
"github.com/poyrazk/thecloud/internal/platform"
"github.com/poyrazk/thecloud/pkg/util"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const tracerNameCache = "cache-service"

const (
defaultRedisPort = "6379"
// maxCacheStatsSize bounds cache stats JSON decoding to prevent memory exhaustion.
maxCacheStatsSize = 1 * 1024 * 1024 // 1 MB
)

// CacheService manages cache clusters and their lifecycle.
Expand Down Expand Up @@ -57,10 +65,20 @@ func NewCacheService(
}

func (s *CacheService) CreateCache(ctx context.Context, name, version string, memoryMB int, vpcID *uuid.UUID) (*domain.Cache, error) {
tracer := otel.Tracer(tracerNameCache)
_, span := tracer.Start(ctx, "CacheService.CreateCache",
trace.WithAttributes(
attribute.String("cache.name", name),
attribute.String("cache.version", version),
attribute.Int("cache.memory_mb", memoryMB),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)

if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionCacheCreate, "*"); err != nil {
span.RecordError(err)
return nil, err
}

Expand Down Expand Up @@ -336,10 +354,18 @@ func (s *CacheService) FlushCache(ctx context.Context, idOrName string) error {
}

func (s *CacheService) GetCacheStats(ctx context.Context, idOrName string) (*ports.CacheStats, error) {
tracer := otel.Tracer(tracerNameCache)
_, span := tracer.Start(ctx, "CacheService.GetCacheStats",
trace.WithAttributes(
attribute.String("cache.id_or_name", idOrName),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)

if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionCacheRead, idOrName); err != nil {
span.RecordError(err)
return nil, err
}

Expand All @@ -365,7 +391,7 @@ func (s *CacheService) GetCacheStats(ctx context.Context, idOrName string) (*por
Limit uint64 `json:"limit"`
} `json:"memory_stats"`
}
if err := json.NewDecoder(stream).Decode(&dockerStats); err != nil {
if err := json.NewDecoder(io.LimitReader(stream, maxCacheStatsSize)).Decode(&dockerStats); err != nil {
return nil, errors.Wrap(errors.Internal, "failed to decode stats", err)
}

Expand Down
32 changes: 32 additions & 0 deletions internal/core/services/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ import (
"github.com/poyrazk/thecloud/internal/errors"
"github.com/poyrazk/thecloud/internal/platform"
"github.com/poyrazk/thecloud/pkg/util"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const tracerNameDatabase = "database-service"

const (
// Default ports for database engines
DefaultPostgresPort = "5432"
Expand Down Expand Up @@ -116,9 +121,19 @@ func NewDatabaseService(params DatabaseServiceParams) *DatabaseService {
}

func (s *DatabaseService) CreateDatabase(ctx context.Context, req ports.CreateDatabaseRequest) (*domain.Database, error) {
tracer := otel.Tracer(tracerNameDatabase)
_, span := tracer.Start(ctx, "DatabaseService.CreateDatabase",
trace.WithAttributes(
attribute.String("db.name", req.Name),
attribute.String("db.engine", req.Engine),
attribute.String("db.version", req.Version),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)
if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionDBCreate, "*"); err != nil {
span.RecordError(err)
return nil, err
}
dbEngine := domain.DatabaseEngine(req.Engine)
Expand Down Expand Up @@ -192,9 +207,19 @@ func (s *DatabaseService) CreateReplica(ctx context.Context, primaryID uuid.UUID
}

func (s *DatabaseService) RestoreDatabase(ctx context.Context, req ports.RestoreDatabaseRequest) (*domain.Database, error) {
tracer := otel.Tracer(tracerNameDatabase)
_, span := tracer.Start(ctx, "DatabaseService.RestoreDatabase",
trace.WithAttributes(
attribute.String("db.new_name", req.NewName),
attribute.String("db.engine", req.Engine),
attribute.String("db.snapshot_id", req.SnapshotID.String()),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)
if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionDBCreate, "*"); err != nil {
span.RecordError(err)
return nil, err
}
snap, err := s.snapshotSvc.GetSnapshot(ctx, req.SnapshotID)
Expand Down Expand Up @@ -810,6 +835,13 @@ func (s *DatabaseService) RotateCredentials(ctx context.Context, id uuid.UUID, i

// doRotateCredentials performs the actual credential rotation
func (s *DatabaseService) doRotateCredentials(ctx context.Context, id uuid.UUID, _ string) error {
tracer := otel.Tracer(tracerNameDatabase)
_, span := tracer.Start(ctx, "DatabaseService.doRotateCredentials",
trace.WithAttributes(
attribute.String("db.id", id.String()),
))
defer span.End()

db, err := s.repo.GetByID(ctx, id)
if err != nil {
return err
Expand Down
49 changes: 46 additions & 3 deletions internal/core/services/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ import (
"github.com/poyrazk/thecloud/internal/core/domain"
"github.com/poyrazk/thecloud/internal/core/ports"
"github.com/poyrazk/thecloud/internal/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const tracerNameFunction = "function-service"

const (
// maxLogSize bounds log reading in captureInvocationResults to prevent memory exhaustion.
maxLogSize = 1 * 1024 * 1024 // 1 MB
)

// RuntimeConfig describes how a function runtime is executed.
Expand Down Expand Up @@ -63,10 +73,20 @@ func NewFunctionService(repo ports.FunctionRepository, rbacSvc ports.RBACService
}

func (s *FunctionService) CreateFunction(ctx context.Context, name, runtime, handler string, code []byte) (*domain.Function, error) {
tracer := otel.Tracer(tracerNameFunction)
_, span := tracer.Start(ctx, "FunctionService.CreateFunction",
trace.WithAttributes(
attribute.String("function.name", name),
attribute.String("function.runtime", runtime),
attribute.String("function.handler", handler),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)

if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionFunctionCreate, "*"); err != nil {
span.RecordError(err)
return nil, err
}

Expand Down Expand Up @@ -200,25 +220,48 @@ func (s *FunctionService) DeleteFunction(ctx context.Context, id uuid.UUID) erro
}

func (s *FunctionService) GetFunctionLogs(ctx context.Context, id uuid.UUID, limit int) ([]*domain.Invocation, error) {
tracer := otel.Tracer(tracerNameFunction)
_, span := tracer.Start(ctx, "FunctionService.GetFunctionLogs",
trace.WithAttributes(
attribute.String("function.id", id.String()),
attribute.Int("function.log_limit", limit),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)

if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionFunctionRead, id.String()); err != nil {
span.RecordError(err)
return nil, err
}

// Verify existence and tenant scoping
// Verify existence and tenant scoping (use original ctx to avoid mock context mismatch)
if _, err := s.repo.GetByID(ctx, id); err != nil {
span.RecordError(err)
return nil, err
}

return s.repo.GetInvocations(ctx, id, limit)
invocations, err := s.repo.GetInvocations(ctx, id, limit)
if err != nil {
span.RecordError(err)
}
return invocations, err
}
func (s *FunctionService) InvokeFunction(ctx context.Context, id uuid.UUID, payload []byte, async bool) (*domain.Invocation, error) {
tracer := otel.Tracer(tracerNameFunction)
_, span := tracer.Start(ctx, "FunctionService.InvokeFunction",
trace.WithAttributes(
attribute.String("function.id", id.String()),
attribute.Bool("function.async", async),
))
defer span.End()

userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)

if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionFunctionInvoke, id.String()); err != nil {
span.RecordError(err)
return nil, err
}

Expand Down Expand Up @@ -356,7 +399,7 @@ func (s *FunctionService) waitForTask(ctx context.Context, containerID string, t
func (s *FunctionService) captureInvocationResults(i *domain.Invocation, containerID string, statusCode int64, waitErr error) {
logsReader, _ := s.compute.GetInstanceLogs(context.Background(), containerID)
if logsReader != nil {
logBytes, _ := io.ReadAll(logsReader)
logBytes, _ := io.ReadAll(io.LimitReader(logsReader, maxLogSize))
// Sanitize logs to prevent log injection (strip control characters)
re := regexp.MustCompile(`[^[:print:][:space:]]`)
i.Logs = re.ReplaceAllString(string(logBytes), "?")
Expand Down
4 changes: 3 additions & 1 deletion internal/core/services/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
NanoCPUsPerVCPU = int64(1e9)
// BytesPerMB is the number of bytes per megabyte.
BytesPerMB = int64(1024 * 1024)
// maxStatsSize bounds instance stats JSON decoding to prevent memory exhaustion.
maxStatsSize = 1 * 1024 * 1024 // 1 MB
)
type InstanceService struct {
repo ports.InstanceRepository
Expand Down Expand Up @@ -1112,7 +1114,7 @@ func (s *InstanceService) GetInstanceStats(ctx context.Context, idOrName string)
defer func() { _ = stream.Close() }()

var stats domain.RawDockerStats
if err := json.NewDecoder(stream).Decode(&stats); err != nil {
if err := json.NewDecoder(io.LimitReader(stream, maxStatsSize)).Decode(&stats); err != nil {
return nil, errors.Wrap(errors.Internal, "failed to decode stats", err)
}

Expand Down
Loading
Loading