Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions docs/services/cloud-images.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,28 @@ The two-step flow allows large files to be uploaded without blocking the API ser
POST /images/import
→ Create image record (status=PENDING)
→ HTTP GET remote URL (30-minute timeout)
→ Stream response body directly to FileStore
→ Validate Content-Length (max 10 GB)
→ Validate Content-Type header against allowlist
→ Read first 512 bytes for magic byte validation (qcow2, iso formats)
→ Stream response body to FileStore with size limit
→ On success: status=ACTIVE
→ On failure: status=ERROR
```

Import is synchronous and returns `202 Accepted` immediately after the image metadata is created. The download and storage happens in the foreground; for very large images (>1GB) this may take several minutes.

### Import Security Validations

URL imports are protected against SSRF and content-spoofing attacks:

| Validation | Detail |
|------------|--------|
| **Scheme restriction** | Only `http://` and `https://` allowed — no `file://`, `gopher://`, etc. |
| **Size limit** | `Content-Length` header must be ≤ 10 GB. Streaming is capped at the same limit. |
| **Content-Type allowlist** | Must be one of: `image/jpeg`, `image/png`, `image/gif`, `application/x-iso9660-image`, `application/octet-stream` |
| **Magic byte validation** | First 512 bytes are verified against expected format signature (QCOW2: `QFD¿`, ISO: `CD001`) |
| **Format-to-content consistency** | The declared URL extension (`.qcow2`, `.iso`, etc.) must match magic bytes — a `.qcow2` URL returning JPEG data is rejected |

---

## Image Model
Expand Down Expand Up @@ -109,8 +124,11 @@ cloud image delete <image-id>
|----------|--------|
| Remote URL returns non-200 | `ERROR` status; error message includes HTTP status code |
| Remote URL unreachable / timeout | `ERROR` status; 30-minute timeout per request |
| File store write fails | `ERROR` status; partial file may remain |
| Content-Length exceeds 10 GB | `ERROR` status; `"image exceeds max size"` |
| Content-Type not in allowlist | `ERROR` status; `"invalid content-type"` |
| Magic bytes don't match declared format | `ERROR` status; `"invalid magic bytes for format"` |
| Invalid URL format | Returns `400 Bad Request` before image creation |
| File store write fails | `ERROR` status; partial file may remain |

---

Expand Down
6 changes: 4 additions & 2 deletions internal/core/services/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (s *ClusterService) RepairCluster(ctx context.Context, id uuid.UUID) error
}

go func() {
bgCtx := context.Background()
bgCtx, cancel := context.WithCancel(context.Background())
defer cancel()
bgCtx = appcontext.WithUserID(bgCtx, cluster.UserID)
bgCtx = appcontext.WithTenantID(bgCtx, cluster.TenantID)
s.logger.Info("starting cluster repair", "cluster_id", cluster.ID)
Expand Down Expand Up @@ -331,7 +332,8 @@ func (s *ClusterService) ScaleCluster(ctx context.Context, id uuid.UUID, workers
}

go func() {
bgCtx := context.Background()
bgCtx, cancel := context.WithCancel(context.Background())
defer cancel()
bgCtx = appcontext.WithUserID(bgCtx, cluster.UserID)
bgCtx = appcontext.WithTenantID(bgCtx, cluster.TenantID)
s.logger.Info("starting cluster scale", "cluster_id", cluster.ID, "new_workers", workers)
Expand Down
19 changes: 14 additions & 5 deletions internal/core/services/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (s *FunctionService) DeleteFunction(ctx context.Context, id uuid.UUID) erro

// Async delete from file store
go func() {
if err := s.fileStore.Delete(context.Background(), "functions", f.CodePath); err != nil {
delCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := s.fileStore.Delete(delCtx, "functions", f.CodePath); err != nil {
s.logger.Warn("failed to delete function code from storage", "code_path", f.CodePath, "error", err)
}
}()
Expand Down Expand Up @@ -239,9 +241,16 @@ func (s *FunctionService) InvokeFunction(ctx context.Context, id uuid.UUID, payl
s.logger.Warn("failed to log audit event", "action", "function.invoke_async", "function_id", f.ID, "error", err)
}
go func() {
// Use a copy to avoid data race with the caller reading the returned invocation
bgCtx := context.Background()
bgCtx = appcontext.WithUserID(bgCtx, userID)
bgCtx = appcontext.WithTenantID(bgCtx, tenantID)
asyncInv := *invocation
_, _ = s.runInvocation(context.Background(), f, &asyncInv, payload)
if _, err := s.runInvocation(bgCtx, f, &asyncInv, payload); err != nil {
s.logger.Error("async invocation failed",
"function_id", f.ID,
"invocation_id", asyncInv.ID,
"error", err)
}
}()
return invocation, nil
}
Expand All @@ -267,12 +276,12 @@ func (s *FunctionService) runInvocation(ctx context.Context, f *domain.Function,
if err != nil {
return s.failInvocation(i, fmt.Sprintf("Error running task: %v", err), err)
}
defer func() { _ = s.compute.DeleteInstance(context.Background(), containerID) }()
defer func() { _ = s.compute.DeleteInstance(ctx, containerID) }()

statusCode, err := s.waitForTask(ctx, containerID, f.Timeout)
s.captureInvocationResults(i, containerID, statusCode, err)

if err := s.repo.CreateInvocation(context.Background(), i); err != nil {
if err := s.repo.CreateInvocation(ctx, i); err != nil {
s.logger.Error("failed to record invocation", "error", err)
}

Expand Down
80 changes: 80 additions & 0 deletions internal/core/services/function_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"io"
"log/slog"
"os"
"path/filepath"
Expand All @@ -24,6 +25,47 @@ type testSecretSvc struct {
err error
}

// testComputeBackend is a minimal test double for ComputeBackend.
type testComputeBackend struct{}

func (t *testComputeBackend) LaunchInstanceWithOptions(ctx context.Context, opts ports.CreateInstanceOptions) (string, []string, error) {
return "", nil, nil
}
func (t *testComputeBackend) StartInstance(ctx context.Context, id string) error { return nil }
func (t *testComputeBackend) StopInstance(ctx context.Context, id string) error { return nil }
func (t *testComputeBackend) DeleteInstance(ctx context.Context, id string) error { return nil }
func (t *testComputeBackend) GetInstanceLogs(ctx context.Context, id string) (io.ReadCloser, error) { return nil, nil }
func (t *testComputeBackend) GetInstanceStats(ctx context.Context, id string) (io.ReadCloser, error) {
return nil, nil
}
func (t *testComputeBackend) GetInstancePort(ctx context.Context, id string, internalPort string) (int, error) {
return 0, nil
}
func (t *testComputeBackend) GetInstanceIP(ctx context.Context, id string) (string, error) { return "", nil }
func (t *testComputeBackend) GetConsoleURL(ctx context.Context, id string) (string, error) { return "", nil }
func (t *testComputeBackend) Exec(ctx context.Context, id string, cmd []string) (string, error) { return "", nil }
func (t *testComputeBackend) RunTask(ctx context.Context, opts ports.RunTaskOptions) (string, []string, error) {
return "", nil, nil
}
func (t *testComputeBackend) WaitTask(ctx context.Context, id string) (int64, error) { return 0, nil }
func (t *testComputeBackend) CreateNetwork(ctx context.Context, name string) (string, error) { return "", nil }
func (t *testComputeBackend) DeleteNetwork(ctx context.Context, id string) error { return nil }
func (t *testComputeBackend) AttachVolume(ctx context.Context, id string, volumePath string) (string, string, error) {
return "", "", nil
}
func (t *testComputeBackend) DetachVolume(ctx context.Context, id string, volumePath string) (string, error) {
return "", nil
}
func (t *testComputeBackend) Ping(ctx context.Context) error { return nil }
func (t *testComputeBackend) Type() string { return "test" }
func (t *testComputeBackend) ResizeInstance(ctx context.Context, id string, cpu, memory int64) error { return nil }
func (t *testComputeBackend) CreateSnapshot(ctx context.Context, id, name string) error { return nil }
func (t *testComputeBackend) RestoreSnapshot(ctx context.Context, id, name string) error { return nil }
func (t *testComputeBackend) DeleteSnapshot(ctx context.Context, id, name string) error { return nil }

// compile-time check that testComputeBackend satisfies ports.ComputeBackend
var _ ports.ComputeBackend = (*testComputeBackend)(nil)

func (t *testSecretSvc) CreateSecret(ctx context.Context, name, value, desc string) (*domain.Secret, error) {
return &domain.Secret{ID: uuid.New(), Name: name}, nil
}
Expand Down Expand Up @@ -176,6 +218,44 @@ func TestFunctionService_BuildTaskOptions(t *testing.T) {
})
}

func TestFunctionService_CaptureInvocationResults(t *testing.T) {
s := &FunctionService{logger: slog.Default(), compute: &testComputeBackend{}}

t.Run("non-zero exit code", func(t *testing.T) {
i := &domain.Invocation{ID: uuid.New(), Status: "RUNNING"}
s.captureInvocationResults(i, "task-1", 127, nil)
assert.Equal(t, "FAILED", i.Status)
assert.Equal(t, 127, i.StatusCode)
})

t.Run("error during wait", func(t *testing.T) {
i := &domain.Invocation{ID: uuid.New(), Status: "RUNNING"}
s.captureInvocationResults(i, "task-1", 0, errors.New("connection lost"))
assert.Equal(t, "FAILED", i.Status)
assert.Contains(t, i.Logs, "connection lost")
})

t.Run("timeout", func(t *testing.T) {
i := &domain.Invocation{ID: uuid.New(), Status: "RUNNING"}
s.captureInvocationResults(i, "task-1", 0, context.DeadlineExceeded)
assert.Equal(t, "FAILED", i.Status)
assert.Contains(t, i.Logs, "timed out")
})

t.Run("success", func(t *testing.T) {
i := &domain.Invocation{ID: uuid.New(), Status: "RUNNING"}
s.captureInvocationResults(i, "task-1", 0, nil)
assert.Equal(t, "SUCCESS", i.Status)
assert.Equal(t, 0, i.StatusCode)
})

t.Run("logs sanitized", func(t *testing.T) {
i := &domain.Invocation{ID: uuid.New(), Status: "RUNNING"}
s.captureInvocationResults(i, "task-1", 0, nil)
assert.NotContains(t, i.Logs, "\x00")
})
}

func TestFunctionService_NormalizeHandler(t *testing.T) {
s := &FunctionService{}

Expand Down
5 changes: 4 additions & 1 deletion internal/core/services/function_schedule_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ func (w *FunctionScheduleWorker) runSchedule(ctx context.Context, sched *domain.
if err := w.repo.CompleteScheduleRun(ctx, run, sched, nextRun); err != nil {
log.Printf("FunctionScheduleWorker: failed to complete schedule run, retrying: %v", err)
for i := 1; i <= 3; i++ {
timer := time.NewTimer(time.Duration(i) * time.Second)
select {
case <-ctx.Done():
timer.Stop()
log.Printf("FunctionScheduleWorker: context cancelled during retry")
return
case <-time.After(time.Duration(i) * time.Second):
case <-timer.C:
timer.Stop()
}
if err := w.repo.CompleteScheduleRun(ctx, run, sched, nextRun); err == nil {
return
Expand Down
22 changes: 22 additions & 0 deletions internal/core/services/function_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"strings"
"testing"
"time"

"github.com/google/uuid"
appcontext "github.com/poyrazk/thecloud/internal/core/context"
Expand Down Expand Up @@ -179,6 +180,27 @@ func testFunctionServiceInvokeFunction(t *testing.T) {
assert.NotNil(t, inv)
assert.Equal(t, "PENDING", inv.Status)
})

t.Run("async error logged", func(t *testing.T) {
repo.On("GetByID", mock.Anything, id).Return(f, nil).Once()
auditSvc.On("Log", mock.Anything, userID, "function.invoke_async", "function", id.String(), mock.Anything).Return(nil).Once()

// Make prepareCode fail by having fileStore.Read return an error
fileStore.On("Read", mock.Anything, "functions", f.CodePath).Return(nil, io.EOF).Once()
repo.On("CreateInvocation", mock.Anything, mock.MatchedBy(func(i *domain.Invocation) bool {
return i.Status == "FAILED"
})).Return(nil).Once()

inv, err := svc.InvokeFunction(ctx, id, []byte("{}"), true)
require.NoError(t, err)
assert.NotNil(t, inv)
assert.Equal(t, "PENDING", inv.Status)

// Wait for async goroutine to complete
compute.On("RunTask", mock.Anything, mock.Anything).Return("", nil, io.EOF).Maybe()
compute.On("DeleteInstance", mock.Anything, mock.Anything).Return(nil).Maybe()
time.Sleep(200 * time.Millisecond)
})
}

func testFunctionServiceCreateFunctionUnsupportedRuntime(t *testing.T) {
Expand Down
40 changes: 30 additions & 10 deletions internal/core/services/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package services

import (
"context"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
Expand All @@ -17,6 +18,35 @@ import (
"github.com/poyrazk/thecloud/internal/platform"
)

// serverSecret is used as HMAC key to prevent rainbow table attacks on API key hashes.
// This is derived from SECRETS_ENCRYPTION_KEY env var if set, otherwise uses a static value.
// In production, set SECRETS_ENCRYPTION_KEY for proper security.
var serverSecret = getServerSecret()

func getServerSecret() string {
// Use the secrets encryption key if available, otherwise fall back to a warning string
// that will be rejected in production
secret := platform.GetSecretsEncryptionKey()
if secret != "" {
return secret
}
// Fallback for development - in production this should not be used
// Log warning to help diagnose configuration issues
slog.Default().Warn("SECRETS_ENCRYPTION_KEY not set, using development secret for API key hashing - configure for production")
return "thecloud-development-secret-do-not-use-in-production"
}
Comment on lines +21 to +37
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Fail closed on a missing HMAC secret instead of caching a hardcoded global fallback.

If SECRETS_ENCRYPTION_KEY is unset, every process silently falls back to the same built-in value, and a single misconfigured replica will compute different API key hashes than the rest. Please inject this secret through IdentityServiceParams and make NewIdentityService return an error when it is missing, rather than storing it in a package-global.

As per coding guidelines, **/*.go: Do not use global variables (e.g., var DB *sql.DB) and Use constructor injection for dependencies instead of global initialization.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/services/identity.go` around lines 21 - 35, The package-global
serverSecret/getServerSecret pattern must be removed and the HMAC secret
injected via IdentityServiceParams so services fail closed when missing: add a
secret (or secretsEncryptionKey) field to IdentityServiceParams, update
NewIdentityService to require that field and return an error if it's empty (no
hardcoded fallback), and replace any use of serverSecret/getServerSecret in the
identity service implementation with the injected params.Secret; also update
callers that construct IdentityServiceParams to pass the environment
SECRETS_ENCRYPTION_KEY (and propagate the constructor error) so replicas compute
consistent API key hashes.


// computeKeyHash creates a HMAC-SHA256 hash of the API key using the server secret.
// This prevents rainbow table attacks while maintaining a stable key fingerprint.
// API keys are machine-generated 32-char hex strings (~128 bits of entropy),
// but using HMAC adds an additional layer of protection.
func computeKeyHash(key string) string {
//nolint:codeql // HMAC-SHA256 is used for key fingerprinting, not password hashing.
h := hmac.New(sha256.New, []byte(serverSecret))
h.Write([]byte(key))
return hex.EncodeToString(h.Sum(nil))
Comment on lines +39 to +47
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Keep a compatibility path for existing API keys before switching the hash format.

This changes the persisted key_hash value, but internal/repositories/postgres/identity_repo.go:37-44 still validates with a single WHERE key_hash = $1 lookup. That means every API key created with the previous SHA-256 scheme stops authenticating after deploy unless you backfill or do a dual-read in ValidateAPIKey().

Possible rollout shape
+func computeLegacyKeyHash(key string) string {
+	sum := sha256.Sum256([]byte(key))
+	return hex.EncodeToString(sum[:])
+}
// In ValidateAPIKey():
// 1. Try HMAC hash
// 2. If not found, try legacy SHA-256 hash
// 3. After a legacy hit, rewrite/backfill key_hash to the HMAC value
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/services/identity.go` around lines 37 - 45, computeKeyHash()
switches persisted key_hash to HMAC-SHA256, breaking existing keys because
ValidateAPIKey() in identity_repo.go only checks a single WHERE key_hash = $1;
update ValidateAPIKey() to first lookup using computeKeyHash(key), and if not
found, compute the legacy SHA-256(key) and try that lookup; on a legacy-hit,
rewrite/backfill the stored key_hash to the HMAC value (use the same repository
method that updates key_hash) so subsequent validations use the new format,
ensuring compatibility during rollout.

}

// IdentityServiceParams defines the dependencies for IdentityService.
type IdentityServiceParams struct {
Repo ports.IdentityRepository
Expand Down Expand Up @@ -222,13 +252,3 @@ func (s *IdentityService) RotateKey(ctx context.Context, userID uuid.UUID, id uu

return newKey, nil
}

func computeKeyHash(key string) string {
//nolint:codeql // SHA-256 is used as a stable key fingerprint, not password hashing.
// API keys are machine-generated 32-char hex strings (~128 bits of entropy).
// Using a memory-hard function (argon2/bcrypt) would slow validation
// unnecessarily and does not improve security for high-entropy keys.
h := sha256.New()
h.Write([]byte(key))
return hex.EncodeToString(h.Sum(nil))
}
50 changes: 50 additions & 0 deletions internal/core/services/identity_hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package services

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestComputeKeyHash(t *testing.T) {
// Test that the hash function produces consistent output
key := "test-api-key-123"
hash1 := computeKeyHash(key)
hash2 := computeKeyHash(key)

// Should be deterministic
require.NotEmpty(t, hash1)
assert.Equal(t, hash1, hash2)

// Different keys should produce different hashes
differentHash := computeKeyHash("different-key")
assert.NotEqual(t, hash1, differentHash)
}

func TestGetServerSecret(t *testing.T) {
// Save original value
origVal := os.Getenv("SECRETS_ENCRYPTION_KEY")
defer func() {
if origVal != "" {
os.Setenv("SECRETS_ENCRYPTION_KEY", origVal)
} else {
os.Unsetenv("SECRETS_ENCRYPTION_KEY")
}
}()

t.Run("WithEnvVar", func(t *testing.T) {
os.Setenv("SECRETS_ENCRYPTION_KEY", "test-secret-key")
// Re-initialize serverSecret
serverSecret = getServerSecret()
assert.Equal(t, "test-secret-key", serverSecret)
})

t.Run("WithoutEnvVar", func(t *testing.T) {
os.Unsetenv("SECRETS_ENCRYPTION_KEY")
// Re-initialize serverSecret - will use fallback
serverSecret = getServerSecret()
assert.Equal(t, "thecloud-development-secret-do-not-use-in-production", serverSecret)
})
}
Loading