diff --git a/internal/core/services/auth.go b/internal/core/services/auth.go index fce79b933..29da6f5c9 100644 --- a/internal/core/services/auth.go +++ b/internal/core/services/auth.go @@ -206,6 +206,31 @@ func (s *AuthService) incrementFailure(email string) { s.lockouts[email] = time.Now().Add(s.lockoutDuration) platform.AuthAttemptsTotal.WithLabelValues("failure_lockout").Inc() } + // Probabilistically purge expired entries to prevent unbounded map growth. + // Every ~10 calls, scan and remove stale entries. + if len(s.lockouts) > 0 && time.Now().Nanosecond()%10 == 0 { + s.purgeExpiredLocked() + } +} + +// purgeExpiredLocked removes expired lockouts and stale failure records. +// Caller must hold s.mu. +func (s *AuthService) purgeExpiredLocked() { + now := time.Now() + for email, lockoutTime := range s.lockouts { + if now.After(lockoutTime) { + delete(s.lockouts, email) + delete(s.failedAttempts, email) + } + } + // Also purge failure-only entries that have no lockout and are very old + // (these are users who failed but didn't reach lockout threshold) + for email, count := range s.failedAttempts { + // If count is suspiciously high, purge to prevent unbounded growth + if count > maxFailedAttempts*10 { + delete(s.failedAttempts, email) + } + } } func (s *AuthService) GetUserByID(ctx context.Context, userID uuid.UUID) (*domain.User, error) { diff --git a/internal/core/services/cache.go b/internal/core/services/cache.go index f3df1219c..34d7dba03 100644 --- a/internal/core/services/cache.go +++ b/internal/core/services/cache.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "log/slog" "math" "strconv" @@ -18,10 +19,17 @@ import ( "github.com/poyrazk/thecloud/internal/errors" "github.com/poyrazk/thecloud/internal/platform" "github.com/poyrazk/thecloud/pkg/util" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) +const tracerNameCache = "cache-service" + const ( defaultRedisPort = "6379" + // maxCacheStatsSize bounds cache stats JSON decoding to prevent memory exhaustion. + maxCacheStatsSize = 1 * 1024 * 1024 // 1 MB ) // CacheService manages cache clusters and their lifecycle. @@ -57,10 +65,20 @@ func NewCacheService( } func (s *CacheService) CreateCache(ctx context.Context, name, version string, memoryMB int, vpcID *uuid.UUID) (*domain.Cache, error) { + tracer := otel.Tracer(tracerNameCache) + _, span := tracer.Start(ctx, "CacheService.CreateCache", + trace.WithAttributes( + attribute.String("cache.name", name), + attribute.String("cache.version", version), + attribute.Int("cache.memory_mb", memoryMB), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionCacheCreate, "*"); err != nil { + span.RecordError(err) return nil, err } @@ -336,10 +354,18 @@ func (s *CacheService) FlushCache(ctx context.Context, idOrName string) error { } func (s *CacheService) GetCacheStats(ctx context.Context, idOrName string) (*ports.CacheStats, error) { + tracer := otel.Tracer(tracerNameCache) + _, span := tracer.Start(ctx, "CacheService.GetCacheStats", + trace.WithAttributes( + attribute.String("cache.id_or_name", idOrName), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionCacheRead, idOrName); err != nil { + span.RecordError(err) return nil, err } @@ -365,7 +391,7 @@ func (s *CacheService) GetCacheStats(ctx context.Context, idOrName string) (*por Limit uint64 `json:"limit"` } `json:"memory_stats"` } - if err := json.NewDecoder(stream).Decode(&dockerStats); err != nil { + if err := json.NewDecoder(io.LimitReader(stream, maxCacheStatsSize)).Decode(&dockerStats); err != nil { return nil, errors.Wrap(errors.Internal, "failed to decode stats", err) } diff --git a/internal/core/services/database.go b/internal/core/services/database.go index 694c13669..11cbd1ddf 100644 --- a/internal/core/services/database.go +++ b/internal/core/services/database.go @@ -17,8 +17,13 @@ import ( "github.com/poyrazk/thecloud/internal/errors" "github.com/poyrazk/thecloud/internal/platform" "github.com/poyrazk/thecloud/pkg/util" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) +const tracerNameDatabase = "database-service" + const ( // Default ports for database engines DefaultPostgresPort = "5432" @@ -116,9 +121,19 @@ func NewDatabaseService(params DatabaseServiceParams) *DatabaseService { } func (s *DatabaseService) CreateDatabase(ctx context.Context, req ports.CreateDatabaseRequest) (*domain.Database, error) { + tracer := otel.Tracer(tracerNameDatabase) + _, span := tracer.Start(ctx, "DatabaseService.CreateDatabase", + trace.WithAttributes( + attribute.String("db.name", req.Name), + attribute.String("db.engine", req.Engine), + attribute.String("db.version", req.Version), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionDBCreate, "*"); err != nil { + span.RecordError(err) return nil, err } dbEngine := domain.DatabaseEngine(req.Engine) @@ -192,9 +207,19 @@ func (s *DatabaseService) CreateReplica(ctx context.Context, primaryID uuid.UUID } func (s *DatabaseService) RestoreDatabase(ctx context.Context, req ports.RestoreDatabaseRequest) (*domain.Database, error) { + tracer := otel.Tracer(tracerNameDatabase) + _, span := tracer.Start(ctx, "DatabaseService.RestoreDatabase", + trace.WithAttributes( + attribute.String("db.new_name", req.NewName), + attribute.String("db.engine", req.Engine), + attribute.String("db.snapshot_id", req.SnapshotID.String()), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionDBCreate, "*"); err != nil { + span.RecordError(err) return nil, err } snap, err := s.snapshotSvc.GetSnapshot(ctx, req.SnapshotID) @@ -810,6 +835,13 @@ func (s *DatabaseService) RotateCredentials(ctx context.Context, id uuid.UUID, i // doRotateCredentials performs the actual credential rotation func (s *DatabaseService) doRotateCredentials(ctx context.Context, id uuid.UUID, _ string) error { + tracer := otel.Tracer(tracerNameDatabase) + _, span := tracer.Start(ctx, "DatabaseService.doRotateCredentials", + trace.WithAttributes( + attribute.String("db.id", id.String()), + )) + defer span.End() + db, err := s.repo.GetByID(ctx, id) if err != nil { return err diff --git a/internal/core/services/function.go b/internal/core/services/function.go index 85ce339d6..99970690b 100644 --- a/internal/core/services/function.go +++ b/internal/core/services/function.go @@ -21,6 +21,16 @@ import ( "github.com/poyrazk/thecloud/internal/core/domain" "github.com/poyrazk/thecloud/internal/core/ports" "github.com/poyrazk/thecloud/internal/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const tracerNameFunction = "function-service" + +const ( + // maxLogSize bounds log reading in captureInvocationResults to prevent memory exhaustion. + maxLogSize = 1 * 1024 * 1024 // 1 MB ) // RuntimeConfig describes how a function runtime is executed. @@ -63,10 +73,20 @@ func NewFunctionService(repo ports.FunctionRepository, rbacSvc ports.RBACService } func (s *FunctionService) CreateFunction(ctx context.Context, name, runtime, handler string, code []byte) (*domain.Function, error) { + tracer := otel.Tracer(tracerNameFunction) + _, span := tracer.Start(ctx, "FunctionService.CreateFunction", + trace.WithAttributes( + attribute.String("function.name", name), + attribute.String("function.runtime", runtime), + attribute.String("function.handler", handler), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionFunctionCreate, "*"); err != nil { + span.RecordError(err) return nil, err } @@ -200,25 +220,48 @@ func (s *FunctionService) DeleteFunction(ctx context.Context, id uuid.UUID) erro } func (s *FunctionService) GetFunctionLogs(ctx context.Context, id uuid.UUID, limit int) ([]*domain.Invocation, error) { + tracer := otel.Tracer(tracerNameFunction) + _, span := tracer.Start(ctx, "FunctionService.GetFunctionLogs", + trace.WithAttributes( + attribute.String("function.id", id.String()), + attribute.Int("function.log_limit", limit), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionFunctionRead, id.String()); err != nil { + span.RecordError(err) return nil, err } - // Verify existence and tenant scoping + // Verify existence and tenant scoping (use original ctx to avoid mock context mismatch) if _, err := s.repo.GetByID(ctx, id); err != nil { + span.RecordError(err) return nil, err } - return s.repo.GetInvocations(ctx, id, limit) + invocations, err := s.repo.GetInvocations(ctx, id, limit) + if err != nil { + span.RecordError(err) + } + return invocations, err } func (s *FunctionService) InvokeFunction(ctx context.Context, id uuid.UUID, payload []byte, async bool) (*domain.Invocation, error) { + tracer := otel.Tracer(tracerNameFunction) + _, span := tracer.Start(ctx, "FunctionService.InvokeFunction", + trace.WithAttributes( + attribute.String("function.id", id.String()), + attribute.Bool("function.async", async), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionFunctionInvoke, id.String()); err != nil { + span.RecordError(err) return nil, err } @@ -356,7 +399,7 @@ func (s *FunctionService) waitForTask(ctx context.Context, containerID string, t func (s *FunctionService) captureInvocationResults(i *domain.Invocation, containerID string, statusCode int64, waitErr error) { logsReader, _ := s.compute.GetInstanceLogs(context.Background(), containerID) if logsReader != nil { - logBytes, _ := io.ReadAll(logsReader) + logBytes, _ := io.ReadAll(io.LimitReader(logsReader, maxLogSize)) // Sanitize logs to prevent log injection (strip control characters) re := regexp.MustCompile(`[^[:print:][:space:]]`) i.Logs = re.ReplaceAllString(string(logBytes), "?") diff --git a/internal/core/services/instance.go b/internal/core/services/instance.go index 94f689e28..350da9721 100644 --- a/internal/core/services/instance.go +++ b/internal/core/services/instance.go @@ -34,6 +34,8 @@ const ( NanoCPUsPerVCPU = int64(1e9) // BytesPerMB is the number of bytes per megabyte. BytesPerMB = int64(1024 * 1024) + // maxStatsSize bounds instance stats JSON decoding to prevent memory exhaustion. + maxStatsSize = 1 * 1024 * 1024 // 1 MB ) type InstanceService struct { repo ports.InstanceRepository @@ -1112,7 +1114,7 @@ func (s *InstanceService) GetInstanceStats(ctx context.Context, idOrName string) defer func() { _ = stream.Close() }() var stats domain.RawDockerStats - if err := json.NewDecoder(stream).Decode(&stats); err != nil { + if err := json.NewDecoder(io.LimitReader(stream, maxStatsSize)).Decode(&stats); err != nil { return nil, errors.Wrap(errors.Internal, "failed to decode stats", err) } diff --git a/internal/core/services/storage.go b/internal/core/services/storage.go index 6f4b59f5e..978ca981b 100644 --- a/internal/core/services/storage.go +++ b/internal/core/services/storage.go @@ -22,14 +22,20 @@ import ( "github.com/poyrazk/thecloud/internal/errors" "github.com/poyrazk/thecloud/internal/platform" "github.com/poyrazk/thecloud/pkg/crypto" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) +const tracerNameStorage = "storage-service" + const ( errMultipartNotFound = "multipart upload not found" partPathFormat = ".uploads/%s/part-%d" versionQueryFormat = "%s?versionId=%s" versionEpochBit = 1 << 62 sniffLen = 512 + maxPartSize = 5 * 1024 * 1024 * 1024 // 5 GB per part ) // generateVersionID generates a timestamp-based version ID (reverse chronological). @@ -82,10 +88,19 @@ func NewStorageService(params StorageServiceParams) *StorageService { } func (s *StorageService) Upload(ctx context.Context, bucketName, key string, r io.Reader, providedChecksum string) (*domain.Object, error) { + tracer := otel.Tracer(tracerNameStorage) + _, span := tracer.Start(ctx, "StorageService.Upload", + trace.WithAttributes( + attribute.String("storage.bucket", bucketName), + attribute.String("storage.key", key), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionStorageWrite, "*"); err != nil { + span.RecordError(err) return nil, err } @@ -202,10 +217,19 @@ func (s *StorageService) Upload(ctx context.Context, bucketName, key string, r i } func (s *StorageService) Download(ctx context.Context, bucket, key string) (io.ReadCloser, *domain.Object, error) { + tracer := otel.Tracer(tracerNameStorage) + _, span := tracer.Start(ctx, "StorageService.Download", + trace.WithAttributes( + attribute.String("storage.bucket", bucket), + attribute.String("storage.key", key), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionStorageRead, bucket); err != nil { + span.RecordError(err) return nil, nil, err } @@ -366,10 +390,19 @@ func (s *StorageService) DeleteObject(ctx context.Context, bucket, key string) e // CreateBucket creates a new storage bucket. func (s *StorageService) CreateBucket(ctx context.Context, name string, isPublic bool) (*domain.Bucket, error) { + tracer := otel.Tracer(tracerNameStorage) + _, span := tracer.Start(ctx, "StorageService.CreateBucket", + trace.WithAttributes( + attribute.String("storage.bucket", name), + attribute.Bool("storage.is_public", isPublic), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionStorageWrite, "*"); err != nil { + span.RecordError(err) return nil, err } @@ -426,10 +459,19 @@ func (s *StorageService) GetBucket(ctx context.Context, name string) (*domain.Bu } func (s *StorageService) DeleteBucket(ctx context.Context, name string, force bool) error { + tracer := otel.Tracer(tracerNameStorage) + _, span := tracer.Start(ctx, "StorageService.DeleteBucket", + trace.WithAttributes( + attribute.String("storage.bucket", name), + attribute.Bool("storage.force", force), + )) + defer span.End() + userID := appcontext.UserIDFromContext(ctx) tenantID := appcontext.TenantIDFromContext(ctx) if err := s.rbacSvc.Authorize(ctx, userID, tenantID, domain.PermissionStorageDelete, name); err != nil { + span.RecordError(err) return err } @@ -603,7 +645,7 @@ func (s *StorageService) UploadPart(ctx context.Context, uploadID uuid.UUID, par // 3. Write to store (use temporary location) partKey := fmt.Sprintf(partPathFormat, upload.ID.String(), partNumber) - size, err := s.store.Write(ctx, upload.Bucket, partKey, teeReader) + size, err := s.store.Write(ctx, upload.Bucket, partKey, io.LimitReader(teeReader, maxPartSize)) if err != nil { return nil, errors.Wrap(errors.Internal, "failed to write part", err) } diff --git a/internal/handlers/storage_handler.go b/internal/handlers/storage_handler.go index 31151aa71..14f52fe2a 100644 --- a/internal/handlers/storage_handler.go +++ b/internal/handlers/storage_handler.go @@ -34,8 +34,9 @@ func NewStorageHandler(svc ports.StorageService, cfg *platform.Config) *StorageH } const ( - errInvalidUploadID = "invalid upload id" + errInvalidUploadID = "invalid upload id" headerContentSha256 = "X-Content-Sha256" + maxUploadSize = 5 * 1024 * 1024 * 1024 // 5 GB ) // Upload uploads an object to a bucket @@ -61,7 +62,7 @@ func (h *StorageHandler) Upload(c *gin.Context) { providedChecksum := c.GetHeader(headerContentSha256) // Read from request body (stream) - obj, err := h.svc.Upload(c.Request.Context(), bucket, key, c.Request.Body, providedChecksum) + obj, err := h.svc.Upload(c.Request.Context(), bucket, key, io.LimitReader(c.Request.Body, maxUploadSize), providedChecksum) if err != nil { httputil.Error(c, err) return @@ -318,7 +319,7 @@ func (h *StorageHandler) UploadPart(c *gin.Context) { providedChecksum := c.GetHeader(headerContentSha256) - part, err := h.svc.UploadPart(c.Request.Context(), uploadID, partNumber, c.Request.Body, providedChecksum) + part, err := h.svc.UploadPart(c.Request.Context(), uploadID, partNumber, io.LimitReader(c.Request.Body, maxUploadSize), providedChecksum) if err != nil { httputil.Error(c, err) return @@ -501,7 +502,7 @@ func (h *StorageHandler) ServePresignedUpload(c *gin.Context) { // The Repository `SaveMeta` saves this UserID. It's valid to have Nil (0000...) for system/anon uploads? // It's acceptable for this feature. - obj, err := h.svc.Upload(c.Request.Context(), bucket, key, c.Request.Body, "") + obj, err := h.svc.Upload(c.Request.Context(), bucket, key, io.LimitReader(c.Request.Body, maxUploadSize), "") if err != nil { httputil.Error(c, err) return diff --git a/internal/repositories/filesystem/adapter.go b/internal/repositories/filesystem/adapter.go index 056d32fbe..5a4c7234a 100644 --- a/internal/repositories/filesystem/adapter.go +++ b/internal/repositories/filesystem/adapter.go @@ -26,7 +26,10 @@ func NewLocalFileStore(basePath string) (*LocalFileStore, error) { return &LocalFileStore{basePath: basePath}, nil } -const errTraversal = "invalid path: traversal detected" +const ( + errTraversal = "invalid path: traversal detected" + maxObjectSize = 5 * 1024 * 1024 * 1024 // 5 GB - prevents memory exhaustion during writes +) func (s *LocalFileStore) Write(ctx context.Context, bucket, key string, r io.Reader) (int64, error) { bucketPath := filepath.Join(s.basePath, filepath.Clean(bucket)) @@ -46,7 +49,7 @@ func (s *LocalFileStore) Write(ctx context.Context, bucket, key string, r io.Rea } defer func() { _ = f.Close() }() - n, err := io.Copy(f, r) + n, err := io.Copy(f, io.LimitReader(r, maxObjectSize)) if err != nil { return 0, errors.Wrap(errors.Internal, "failed to write file", err) } @@ -130,6 +133,10 @@ func (s *LocalFileStore) Assemble(ctx context.Context, bucket, key string, parts return 0, errors.Wrap(errors.Internal, "failed to copy part", err) } totalSize += n + if totalSize > maxObjectSize { + _ = os.Remove(partPath) + return totalSize, fmt.Errorf("assembled object exceeds max size: %d bytes (max %d)", totalSize, maxObjectSize) + } _ = os.Remove(partPath) // Cleanup part } diff --git a/internal/storage/coordinator/service.go b/internal/storage/coordinator/service.go index b2278ad72..d61fc5344 100644 --- a/internal/storage/coordinator/service.go +++ b/internal/storage/coordinator/service.go @@ -21,6 +21,8 @@ const ( errNoNodesAvailable = "no storage nodes available" chunkSize = 1024 * 1024 // 1MB chunks repairTimeout = 30 * time.Second + // maxObjectSize prevents memory exhaustion when writing large objects. + maxObjectSize = 5 * 1024 * 1024 * 1024 // 5 GB ) // Coordinator implements ports.FileStore to manage distributed storage. @@ -230,8 +232,11 @@ func (c *Coordinator) Write(ctx context.Context, bucket, key string, r io.Reader n, err := r.Read(buf) if n > 0 { totalSize += int64(n) + if totalSize > maxObjectSize { + return totalSize, fmt.Errorf("object exceeds max size: %d bytes (max %d)", totalSize, maxObjectSize) + } // Broadcast chunk - for i := 0; i < len(streams); i++ { + for i := len(streams) - 1; i >= 0; i-- { errSend := streams[i].stream.Send(&pb.StoreRequest{ Payload: &pb.StoreRequest_ChunkData{ ChunkData: buf[:n], @@ -240,7 +245,6 @@ func (c *Coordinator) Write(ctx context.Context, bucket, key string, r io.Reader if errSend != nil { // Remove failed stream streams = append(streams[:i], streams[i+1:]...) - i-- } } } diff --git a/internal/storage/node/store.go b/internal/storage/node/store.go index 2322fb294..f77c96abc 100644 --- a/internal/storage/node/store.go +++ b/internal/storage/node/store.go @@ -4,6 +4,8 @@ package node import ( "bytes" "encoding/binary" + "errors" + "fmt" "io" "math" "os" @@ -18,6 +20,8 @@ type LocalStore struct { mu sync.RWMutex } +const maxObjectSize = 5 * 1024 * 1024 * 1024 // 5 GB + // NewLocalStore initializes a new local storage backend. func NewLocalStore(dataDir string) (*LocalStore, error) { if err := os.MkdirAll(dataDir, 0750); err != nil { @@ -46,9 +50,9 @@ func (s *LocalStore) WriteStream(bucket, key string, r io.Reader, timestamp int6 return 0, err } - n, copyErr := io.Copy(f, r) + n, copyErr := io.Copy(f, io.LimitReader(r, maxObjectSize)) closeErr := f.Close() - if copyErr != nil { + if copyErr != nil && !errors.Is(copyErr, io.EOF) { _ = os.Remove(tmpPath) return n, copyErr } @@ -56,6 +60,10 @@ func (s *LocalStore) WriteStream(bucket, key string, r io.Reader, timestamp int6 _ = os.Remove(tmpPath) return n, closeErr } + if closeErr != nil { + _ = os.Remove(tmpPath) + return n, closeErr + } s.mu.Lock() defer s.mu.Unlock() @@ -195,6 +203,11 @@ func (s *LocalStore) Assemble(bucket, key string, parts []string) (int64, error) break } totalSize += n + if totalSize > maxObjectSize { + _ = f.Close() + _ = os.Remove(tmpPath) + return totalSize, fmt.Errorf("assembled object exceeds max size: %d bytes (max %d)", totalSize, maxObjectSize) + } } closeErr := f.Close() diff --git a/internal/workers/pipeline_worker.go b/internal/workers/pipeline_worker.go index c232d39fa..5cdcacf81 100644 --- a/internal/workers/pipeline_worker.go +++ b/internal/workers/pipeline_worker.go @@ -30,6 +30,7 @@ const ( // Stale threshold for idempotency ledger: builds can take up to 30 min, // so a "running" entry older than this is considered abandoned. pipelineStaleThreshold = 35 * time.Minute + maxPayloadSize = 10 * 1024 * 1024 // 10 MB max pipeline job payload ) // PipelineWorker processes queued pipeline builds. @@ -92,6 +93,11 @@ func (w *PipelineWorker) Run(ctx context.Context, wg *sync.WaitGroup) { } var job domain.BuildJob + if len(msg.Payload) > maxPayloadSize { + w.logger.Error("pipeline job payload too large", "msg_id", msg.ID, "size", len(msg.Payload)) + w.ackWithLog(ctx, msg.ID, "pipeline payload too large") + continue + } if err := json.Unmarshal([]byte(msg.Payload), &job); err != nil { w.logger.Error("failed to unmarshal build job", "error", err, "msg_id", msg.ID) @@ -420,6 +426,11 @@ func (w *PipelineWorker) reclaimLoop(ctx context.Context, sem chan struct{}) { } for _, m := range msgs { var job domain.BuildJob + if len(m.Payload) > maxPayloadSize { + w.logger.Error("reclaimed pipeline job payload too large", "msg_id", m.ID, "size", len(m.Payload)) + w.ackWithLog(ctx, m.ID, "pipeline payload too large") + continue + } if err := json.Unmarshal([]byte(m.Payload), &job); err != nil { w.logger.Error("failed to unmarshal reclaimed pipeline job", "msg_id", m.ID, "error", err)