fix(#299): scope pipelines, queues, and caches by tenant #3
jackthepunished wants to merge 4 commits into main
Conversation
The password reset flow logged the plaintext token alongside the user's email at Debug level so it could be picked up during MVP testing. Anyone with log access could mint a password reset and take over the account. Replace the Debug log with an Info entry that records only the user_id and reset-token row id. The plaintext token must reach the user via email; once an EmailService is wired in it should be the sole consumer. Closes poyrazK#293
…r injection
Object keys flowed straight into the Content-Disposition header via
fmt.Sprintf("attachment; filename=%s", key), so any key containing CRLF,
double quotes, or backslashes could split the response or inject
arbitrary headers. Path-bearing keys also leaked the full bucket path to
the client as the suggested filename.
Replace both call sites (authenticated download and presigned download)
with a single helper, contentDispositionAttachment, that:
- reduces the key to its basename via path.Base
- emits an ASCII-only `filename="..."` fallback with control bytes,
non-ASCII bytes, quotes, and backslashes mapped to `_`
- emits the full Unicode basename in `filename*=UTF-8''...` per RFC
5987 with proper attr-char percent-encoding
- guarantees CR/LF can never reach the wire
Add table-driven unit tests covering response-splitting, quote/backslash
injection, non-ASCII names, nested keys, and empty keys.
Closes poyrazK#225, poyrazK#226
Webhook delivery in NotifyService.deliverToWebhook used http.DefaultClient and only inspected transport errors, so 4xx/5xx responses were silently treated as successful deliveries. Subscribers returning failures (bad endpoints, transient 5xx) had messages dropped without any log signal. Check resp.StatusCode and emit a structured warn log on >=400 responses, add a 15s timeout via a dedicated http.Client, drain the response body before close, and stop discarding the error from NewRequestWithContext. Closes poyrazK#338 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three pipeline_repo SQL statements (UpdateBuild, ListBuildSteps, ListBuildLogs) still filtered by user_id even though their port declares the second parameter as tenantID. Result: a teammate viewing another user's build saw no steps or logs, and the worker's UpdateBuild silently scoped to the wrong column. The pipeline worker also passed job.UserID where the repo expects tenantID, so builds loaded only when user_id happened to equal tenant_id.
- Rewrite the three SQL statements to scope by builds.tenant_id.
- Add TenantID to BuildJob; populate it in createAndQueueBuild and consume it in the worker (loadBuildAndPipeline + ctx enrichment).
- New migration 108 drops UNIQUE(user_id, name) on pipelines, caches, and queues and replaces it with UNIQUE(tenant_id, name), which is what the service-layer name checks have always assumed. A defensive CTE renames pre-existing collisions before the constraint flip so the migration is safe on populated databases.

UserID stays on rows for attribution; only the scope of queries and uniqueness changes, matching the issue's "tenant ownership with user attribution" prescription.
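In outline, the TenantID propagation looks like this; the field set and plain-string IDs are simplified stand-ins for the real domain.BuildJob (which uses UUIDs):

```go
package main

import "fmt"

// BuildJob sketch: TenantID now rides along with the job so the worker can
// scope GetBuild/GetPipeline correctly; UserID stays for attribution only.
type BuildJob struct {
	BuildID    string
	PipelineID string
	UserID     string // attribution
	TenantID   string // scoping
}

// newBuildJob mirrors createAndQueueBuild populating TenantID, with a guard
// so a job missing its tenant fails loudly instead of querying a zero tenant.
func newBuildJob(buildID, pipelineID, userID, tenantID string) (BuildJob, error) {
	if tenantID == "" {
		return BuildJob{}, fmt.Errorf("build job %s: missing tenant id", buildID)
	}
	return BuildJob{BuildID: buildID, PipelineID: pipelineID, UserID: userID, TenantID: tenantID}, nil
}

func main() {
	job, err := newBuildJob("b-1", "p-1", "u-1", "t-1")
	fmt.Println(err == nil, job.TenantID) // → true t-1
}
```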
📝 Walkthrough
This PR migrates Pipelines, Queues, and Caches from user-scoped to tenant-scoped ownership by propagating tenant context through the build pipeline, updating database constraints to enforce tenant-based uniqueness, and refactoring repository queries to use tenant IDs instead of user IDs. Additional improvements include webhook delivery reliability, storage download security, and password reset logging safety.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Handler as Webhook Handler
    participant Service as Pipeline Service
    participant Repo as Repository
    participant Queue as Job Queue
    participant Worker as Pipeline Worker
    Handler->>Service: HandleWebhookPush(ctx, payload)
    Note over Service: Set TenantID in context
    Service->>Service: webhookCtx = WithTenantID(ctx, tenant)
    Service->>Repo: CreateBuild(webhookCtx, build)
    Repo->>Repo: Insert with tenant_id
    Repo-->>Service: build{id, tenant_id, ...}
    Note over Service: Create BuildJob with TenantID
    Service->>Service: job = BuildJob{<br/> BuildID, TenantID,<br/> PipelineID, UserID<br/>}
    Service->>Queue: Enqueue(job)
    Queue-->>Service: ✓
    Note over Queue: Job persisted with tenant_id
    Worker->>Queue: Dequeue()
    Queue-->>Worker: job{BuildID, TenantID, ...}
    Note over Worker: Propagate tenant context
    Worker->>Worker: ctx = WithTenantID(ctx, job.TenantID)
    Worker->>Repo: GetBuild(ctx, buildID, tenantID)
    Repo->>Repo: WHERE id=? AND tenant_id=?
    Repo-->>Worker: build
    Worker->>Repo: GetPipeline(ctx, pipelineID, tenantID)
    Repo-->>Worker: pipeline
    Note over Worker: Process build with<br/>tenant-scoped resources
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Pull request overview
This PR addresses multi-tenant scoping correctness by ensuring pipeline/build persistence and worker execution use tenant_id (not user_id) for filtering and uniqueness, aligning DB constraints with service-layer assumptions. It also includes a hardening change for storage download headers and improved notify webhook handling/tests.
Changes:
- Scope pipeline build queries and worker load paths by tenant_id (incl. BuildJob.TenantID propagation).
- Add migration to replace UNIQUE(user_id, name) with UNIQUE(tenant_id, name) for pipelines/caches/queues, with pre-migration de-duplication.
- Harden Content-Disposition for downloads and improve notify webhook delivery behavior + tests.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/workers/pipeline_worker_test.go | Updates worker unit test to include TenantID and assert tenant-scoped repo calls. |
| internal/workers/pipeline_worker.go | Sets tenant context and uses TenantID when loading build/pipeline. |
| internal/repositories/postgres/pipeline_repo.go | Rewrites SQL to filter builds/steps/logs by builds.tenant_id and passes tenant IDs through repo methods. |
| internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.up.sql | Renames tenant-scope collisions then flips unique constraints to (tenant_id, name). |
| internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.down.sql | Restores prior unique constraint shape (user_id, name). |
| internal/handlers/storage_handler_content_disposition_test.go | Adds tests for safe Content-Disposition: attachment header construction. |
| internal/handlers/storage_handler.go | Implements safe content-disposition builder + uses it for downloads/presigned downloads. |
| internal/core/services/pipeline.go | Ensures webhook-triggered builds carry tenant context and enqueue jobs with TenantID. |
| internal/core/services/password_reset.go | Stops logging plaintext reset tokens; logs issuance metadata instead. |
| internal/core/services/notify_unit_test.go | Adds/updates unit tests including non-2xx webhook status logging assertion. |
| internal/core/services/notify.go | Adds HTTP client timeout, handles webhook request construction errors, drains response body, and logs non-2xx webhook responses. |
| internal/core/domain/jobs.go | Adds TenantID to domain.BuildJob. |
```go
ctx, cancel := context.WithTimeout(workerCtx, 30*time.Minute)
defer cancel()
ctx = appcontext.WithUserID(ctx, job.UserID)
ctx = appcontext.WithTenantID(ctx, job.TenantID)
```
BuildJob now requires TenantID, but json.Unmarshal will leave it as uuid.Nil for any in-flight/older queued messages produced before this change. The worker then scopes GetBuild/GetPipeline by a zero tenant ID, causing jobs to be acked as not-found (or repeatedly nacked) and potentially dropping builds. Consider adding backward-compatible handling (e.g., if job.TenantID == uuid.Nil, derive tenant from the build record / pipeline, or explicitly fail+nack with a clear reason) and/or plan a queue drain before deploy.
```sql
UPDATE pipelines p
SET name = p.name || '-' || substr(replace(r.user_id::text, '-', ''), 1, 8)
FROM ranked r
WHERE p.id = r.id AND r.rn > 1;
```
The collision-disambiguation renames duplicates by appending an 8-char user-id suffix, but this can still violate the upcoming (tenant_id, name) uniqueness if a non-colliding row already has the same suffixed name (e.g., an existing resource named foo-<sameSuffix>). To make the migration reliably safe on populated DBs, consider using a guaranteed-unique suffix (e.g., based on the row id, rn, or a loop that retries until unique) before adding the new constraint.
```sql
ALTER TABLE pipelines DROP CONSTRAINT IF EXISTS pipelines_user_id_name_key;
ALTER TABLE caches DROP CONSTRAINT IF EXISTS caches_user_id_name_key;
ALTER TABLE queues DROP CONSTRAINT IF EXISTS queues_user_id_name_key;
```
tenant_id is nullable in the existing schema (it was added without NOT NULL), and Postgres UNIQUE (tenant_id, name) allows multiple rows where tenant_id is NULL. If any rows still have tenant_id IS NULL (or future inserts forget to set it), this constraint won’t enforce uniqueness and tenant-scoped lookups can behave unexpectedly. Consider backfilling/asserting non-null tenant IDs here and/or using a partial unique index/constraint that matches the intended invariants.
```sql
DO $$
BEGIN
    IF EXISTS (SELECT 1 FROM pipelines WHERE tenant_id IS NULL) THEN
        RAISE EXCEPTION 'Cannot enforce tenant-scoped uniqueness on pipelines: rows with NULL tenant_id still exist';
    END IF;
END $$;

DO $$
BEGIN
    IF EXISTS (SELECT 1 FROM caches WHERE tenant_id IS NULL) THEN
        RAISE EXCEPTION 'Cannot enforce tenant-scoped uniqueness on caches: rows with NULL tenant_id still exist';
    END IF;
END $$;

DO $$
BEGIN
    IF EXISTS (SELECT 1 FROM queues WHERE tenant_id IS NULL) THEN
        RAISE EXCEPTION 'Cannot enforce tenant-scoped uniqueness on queues: rows with NULL tenant_id still exist';
    END IF;
END $$;

ALTER TABLE pipelines ALTER COLUMN tenant_id SET NOT NULL;
ALTER TABLE caches ALTER COLUMN tenant_id SET NOT NULL;
ALTER TABLE queues ALTER COLUMN tenant_id SET NOT NULL;
```
Actionable comments posted: 8
🧹 Nitpick comments (1)
internal/handlers/storage_handler_content_disposition_test.go (1)
52-55: Rename this test case/comment for clarity. The case name says "falls back to download", but the expected output is `filename="folder"`. Renaming the case/comment to reflect basename normalization would reduce confusion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/core/services/notify_unit_test.go`:
- Around line 393-404: The test Publish_WebhookDeliveryError currently hardcodes
sub.Endpoint to "http://localhost:9999/..." which can collide with environment
services; replace that hardcoded port by programmatically allocating an
ephemeral TCP port (e.g. net.Listen("tcp", "127.0.0.1:0")), read the actual port
from the listener's Addr(), close the listener, and set sub.Endpoint to
"http://127.0.0.1:<port>/nonexistent" so the test deterministically targets an
unused port; update the test's sub variable initialization in the
Publish_WebhookDeliveryError test accordingly (use net.Listen to get the port
and then close it before assigning sub.Endpoint).
In `@internal/core/services/notify.go`:
- Around line 138-147: After each repository fetch (calls to s.repo.GetTopicByID
and any repo method that returns topic or sub), add a nil check before
dereferencing: if topic == nil { return nil, fmt.Errorf("topic not found: %w",
domain.ErrNotFound) } (or return the appropriate typed error used across the
service) and similarly guard against sub == nil where sub is used; update the
code around the symbols s.repo.GetTopicByID, topic, and the creation/usage of
domain.Subscription (the places referenced around the previous topic/sub
dereferences) to return a clear error instead of letting a nil dereference
panic.
- Around line 242-251: The goroutine still derives deliveryCtx from the
request-scoped ctx (so cancellations propagate); change the parent to a
non-request context by creating deliveryCtx via
context.WithTimeout(context.Background(), 30*time.Second) (or by creating a new
context and explicitly copying any required trace/metadata) and pass that
deliveryCtx and cancel into s.deliver instead of deriving from the incoming ctx;
update the anonymous func signature usage around context.WithTimeout,
deliveryCtx, cancel, and the s.deliver call to reference the new parent.
- Around line 61-64: The duplicate-check currently discards errors by using
"existing, _ := s.repo.GetTopicByName(ctx, name, userID)"; change this to
capture and handle the error (e.g., "existing, err :=
s.repo.GetTopicByName(...)"), return the error for unexpected repository
failures, and only treat "not found" as the safe path (compare err against the
repo's NotFound sentinel or sql.ErrNoRows). Update the logic around
GetTopicByName to: if err != nil and err is not NotFound -> return the error; if
existing != nil -> return the "topic already exists" error; otherwise proceed.
In `@internal/core/services/password_reset.go`:
- Around line 78-80: The password reset flow currently issues tokens but never
delivers them; update the service in password_reset.go to require and use an
injected delivery dependency (e.g., EmailService) from your DI wiring
(internal/api/setup/dependencies.go) and call it after creating the reset token
(instead of or in addition to the current s.logger.Info call) to send the token
to the user email; ensure you only log non-sensitive identifiers (token ID, user
ID), never the plaintext token, and if the EmailService.SendPasswordReset (or
SendResetToken) call fails return an error so the handler
(internal/handlers/auth_handler.go) does not treat issuance as successful.
In
`@internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.down.sql`:
- Around line 7-9: The rollback re-adds UNIQUE (user_id, name) constraints on
pipelines, caches, and queues (constraints pipelines_user_id_name_key,
caches_user_id_name_key, queues_user_id_name_key) but lacks a pre-normalization
dedupe step; add a rollback-time dedupe that mirrors the up-migration strategy:
identify rows that would collide when tenant-scoped fields were removed, keep a
single canonical row per (user_id, name) (e.g., by id or newest/oldest), delete
or merge duplicates, and only then run the three ALTER TABLE ... ADD CONSTRAINT
statements to avoid constraint violations during rollback.
In
`@internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.up.sql`:
- Around line 24-27: The current duplicate rename SQL uses an 8-char substring
of user_id in the UPDATE for pipelines (the statement starting "UPDATE pipelines
p SET name = p.name || '-' || substr... FROM ranked r WHERE p.id = r.id AND r.rn
> 1;"), which can still collide; change the suffix to a collision-resistant
unique value (e.g., use the full row id or the ranked row's id) when renaming
duplicates and apply the same safe-suffix logic to the equivalent UPDATEs for
caches and queues so the subsequent UNIQUE (tenant_id, name) constraint creation
cannot fail due to name collisions.
In `@internal/workers/pipeline_worker.go`:
- Around line 138-142: Add an explicit guard that checks job.TenantID !=
uuid.Nil before proceeding (i.e., before creating the 30-minute context or
before calling appcontext.WithTenantID) and return/ack the message with a clear
log/error if tenant is nil; this prevents passing uuid.Nil into GetBuild() /
GetPipeline() which would silently return no results. Ensure the check
references job.TenantID and that appcontext.WithTenantID is only called after
the nil check, and add a short log including the job ID when
rejecting/acknowledging such nil-tenant jobs so they are observable.
---
Nitpick comments:
In `@internal/handlers/storage_handler_content_disposition_test.go`:
- Around line 52-55: The test case string "trailing slash falls back to
download" and its inline comment are misleading; update the test case name to
something that reflects basename normalization (e.g., "trailing slash normalizes
to basename") and adjust the inline comment near wantFilename/wantFilenameStar
to indicate that path.Base normalizes "folder/" to "folder" rather than implying
a download fallback. Locate the table entry with name: "trailing slash falls
back to download" and change both the name and comment text accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b0e99c0c-2f41-4fb1-af93-4f6e07e43f46
📒 Files selected for processing (12)
- internal/core/domain/jobs.go
- internal/core/services/notify.go
- internal/core/services/notify_unit_test.go
- internal/core/services/password_reset.go
- internal/core/services/pipeline.go
- internal/handlers/storage_handler.go
- internal/handlers/storage_handler_content_disposition_test.go
- internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.down.sql
- internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.up.sql
- internal/repositories/postgres/pipeline_repo.go
- internal/workers/pipeline_worker.go
- internal/workers/pipeline_worker_test.go
```go
t.Run("Publish_WebhookDeliveryError", func(t *testing.T) {
	done := make(chan struct{})
	topicID := uuid.New()
	subID := uuid.New()
	topic := &domain.Topic{ID: topicID, UserID: userID}
	sub := &domain.Subscription{
		ID:       subID,
		UserID:   userID,
		TopicID:  topicID,
		Protocol: domain.ProtocolWebhook,
		Endpoint: "http://localhost:9999/nonexistent",
	}
```
Avoid hardcoded localhost ports in network-error tests.
Line 403 can become environment-dependent if port 9999 is bound. Prefer allocating an ephemeral port and closing it to guarantee connection failure deterministically.
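The suggested technique can be sketched as follows (the helper name is ours, not the test's):

```go
package main

import (
	"fmt"
	"net"
)

// freeEndpoint reserves an ephemeral port from the OS, releases it, and
// returns an endpoint that is almost certainly unbound when dialed, making
// the connection-failure test deterministic without hardcoding a port.
func freeEndpoint() (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	port := l.Addr().(*net.TCPAddr).Port
	if err := l.Close(); err != nil {
		return "", err
	}
	return fmt.Sprintf("http://127.0.0.1:%d/nonexistent", port), nil
}

func main() {
	ep, err := freeEndpoint()
	fmt.Println(err == nil, ep[:17] == "http://127.0.0.1:") // → true true
}
```

There is a small race (another process could grab the port between close and dial), but in practice it is far more reliable than a fixed port like 9999.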
```go
existing, _ := s.repo.GetTopicByName(ctx, name, userID)
if existing != nil {
	return nil, fmt.Errorf("topic with name %s already exists", name)
}
```
Handle duplicate-check repository errors instead of discarding them.
Line 61 ignores GetTopicByName errors, which can treat DB failures as “topic does not exist”.
Proposed fix

```diff
-existing, _ := s.repo.GetTopicByName(ctx, name, userID)
+existing, err := s.repo.GetTopicByName(ctx, name, userID)
+if err != nil {
+	return nil, err
+}
 if existing != nil {
 	return nil, fmt.Errorf("topic with name %s already exists", name)
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
existing, err := s.repo.GetTopicByName(ctx, name, userID)
if err != nil {
	return nil, err
}
if existing != nil {
	return nil, fmt.Errorf("topic with name %s already exists", name)
}
```
```go
topic, err := s.repo.GetTopicByID(ctx, topicID, userID)
if err != nil {
	return nil, err
}

sub := &domain.Subscription{
	ID:       uuid.New(),
	UserID:   userID,
	TopicID:  topic.ID,
	Protocol: protocol,
```
Guard against nil dereference after repository fetches.
At Line 146, Line 198, and Line 226, topic/sub is dereferenced without a nil check. If a repo implementation returns (nil, nil) for missing rows, this panics.
Proposed fix

```diff
 topic, err := s.repo.GetTopicByID(ctx, topicID, userID)
 if err != nil {
 	return nil, err
 }
+if topic == nil {
+	return nil, fmt.Errorf("topic not found")
+}
```

```diff
 sub, err := s.repo.GetSubscriptionByID(ctx, id, userID)
 if err != nil {
 	return err
 }
+if sub == nil {
+	return fmt.Errorf("subscription not found")
+}
```

```diff
 topic, err := s.repo.GetTopicByID(ctx, topicID, userID)
 if err != nil {
 	return err
 }
+if topic == nil {
+	return fmt.Errorf("topic not found")
+}
```

Also applies to: 193-199, 219-227
```go
// Create a background context for async delivery to avoid request cancellation
// but keep it separate for each subscriber to avoid shared timeout issues
go func(c context.Context, sub *domain.Subscription) {
	deliveryCtx, cancel := context.WithTimeout(c, 30*time.Second)
	defer cancel()

	// Carry over potential trace IDs or other metadata if needed
	// (Simplified for now, but avoids using request-scoped ctx)
	s.deliver(deliveryCtx, sub, body)
}(ctx, sub)
```
Async delivery still inherits request cancellation.
Line 245 derives deliveryCtx from request ctx, so canceled client requests can still cancel webhook/queue delivery, despite the comment saying otherwise.
Proposed fix

```diff
-go func(c context.Context, sub *domain.Subscription) {
-	deliveryCtx, cancel := context.WithTimeout(c, 30*time.Second)
+go func(sub *domain.Subscription) {
+	deliveryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
-
-	// Carry over potential trace IDs or other metadata if needed
-	// (Simplified for now, but avoids using request-scoped ctx)
 	s.deliver(deliveryCtx, sub, body)
-}(ctx, sub)
+}(sub)
```

📝 Committable suggestion
```go
// Create a background context for async delivery to avoid request cancellation
// but keep it separate for each subscriber to avoid shared timeout issues
go func(sub *domain.Subscription) {
	deliveryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	s.deliver(deliveryCtx, sub, body)
}(sub)
```
```go
// TODO: deliver `token` via an injected EmailService once available.
// Never log or persist the plaintext token — its only safe destination is the user.
s.logger.Info("password reset token issued", "user_id", user.ID, "token_id", resetToken.ID)
```
Password reset tokens are now generated but not deliverable.
Line 78 correctly removes sensitive token logging, but there is still no actual delivery path, so users cannot complete reset with the current flow. With current wiring (internal/api/setup/dependencies.go Lines 240-242) and handler behavior (internal/handlers/auth_handler.go Lines 112-132), this silently creates unusable tokens while returning success.
Please wire an injected token-delivery dependency (e.g., EmailService) in this path before treating issuance as successful.
```sql
ALTER TABLE pipelines ADD CONSTRAINT pipelines_user_id_name_key UNIQUE (user_id, name);
ALTER TABLE caches ADD CONSTRAINT caches_user_id_name_key UNIQUE (user_id, name);
ALTER TABLE queues ADD CONSTRAINT queues_user_id_name_key UNIQUE (user_id, name);
```
Rollback can fail on post-migration data shape.
Re-adding UNIQUE (user_id, name) without pre-normalizing collisions can break rollback if users created same resource names across different tenants while tenant-scoped constraints were active. Add a rollback-time dedupe step (similar to the up migration strategy) before these constraints.
💡 Suggested rollback-safe pattern

```diff
+-- Pre-normalize collisions for user-scoped uniqueness before re-adding constraints.
+WITH ranked AS (
+    SELECT id, user_id, name,
+           ROW_NUMBER() OVER (PARTITION BY user_id, name ORDER BY created_at, id) AS rn
+    FROM pipelines
+)
+UPDATE pipelines p
+SET name = p.name || '-rollback-' || substr(p.id::text, 1, 8)
+FROM ranked r
+WHERE p.id = r.id AND r.rn > 1;
+
+-- Repeat for caches and queues, then add UNIQUE(user_id, name).
```
+-- Repeat for caches and queues, then add UNIQUE(user_id, name).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@internal/repositories/postgres/migrations/108_tenant_scoped_resource_uniqueness.down.sql`
around lines 7 - 9, The rollback re-adds UNIQUE (user_id, name) constraints on
pipelines, caches, and queues (constraints pipelines_user_id_name_key,
caches_user_id_name_key, queues_user_id_name_key) but lacks a pre-normalization
dedupe step; add a rollback-time dedupe that mirrors the up-migration strategy:
identify rows that would collide when tenant-scoped fields were removed, keep a
single canonical row per (user_id, name) (e.g., by id or newest/oldest), delete
or merge duplicates, and only then run the three ALTER TABLE ... ADD CONSTRAINT
statements to avoid constraint violations during rollback.
```sql
UPDATE pipelines p
SET name = p.name || '-' || substr(replace(r.user_id::text, '-', ''), 1, 8)
FROM ranked r
WHERE p.id = r.id AND r.rn > 1;
```
Current rename strategy is not collision-safe for constraint creation.
Using only the first 8 chars of user_id for renamed duplicates can still produce duplicate names within a tenant, causing UNIQUE (tenant_id, name) creation to fail. Use a collision-resistant suffix (e.g., full row id or a uniqueness loop) for all three tables.
💡 Safer migration pattern

```diff
-SET name = p.name || '-' || substr(replace(r.user_id::text, '-', ''), 1, 8)
+SET name = p.name || '-migrated-' || r.id::text
```

Apply the same change to caches and queues updates.
Also applies to: 41-44, 58-61
ctx, cancel := context.WithTimeout(workerCtx, 30*time.Minute)
defer cancel()
ctx = appcontext.WithUserID(ctx, job.UserID)
ctx = appcontext.WithTenantID(ctx, job.TenantID)
🧩 Analysis chain
🏁 Script executed:
cat -n internal/workers/pipeline_worker.go | sed -n '130,160p'
Repository: jackthepunished/thecloud
Length of output: 1380
🏁 Script executed:
# Find the job struct definition
rg "type.*Job struct" internal/workers/pipeline_worker.go -A 15
Length of output: 50
🏁 Script executed:
# Also search for the Job type more broadly in the workers directory
fd "\.go$" internal/workers/ -x rg "type.*Job[^a-zA-Z]" {} +
Length of output: 1202
🏁 Script executed:
# Look at the loadBuildAndPipeline function
rg "func.*loadBuildAndPipeline" internal/workers/pipeline_worker.go -A 30
Length of output: 1169
🏁 Script executed:
# Find the BuildJob struct definition
rg "type BuildJob struct" -A 20
Length of output: 622
🏁 Script executed:
# Look at the beginning of the handler to see where job comes from
cat -n internal/workers/pipeline_worker.go | sed -n '80,145p'
Length of output: 2745
🏁 Script executed:
# Search for GetBuild implementation
rg "func.*GetBuild\(" -A 15
Length of output: 7883
🏁 Script executed:
# Check if there's any validation of TenantID before line 138
rg "TenantID|tenant_id" internal/workers/pipeline_worker.go | head -30
Length of output: 253
🏁 Script executed:
# Check appcontext.WithTenantID implementation
rg "func WithTenantID" -A 5
Length of output: 546
🏁 Script executed:
# Search for any guards/validation on uuid.Nil before processing
rg "uuid.Nil|TenantID.*Nil|TenantID == " internal/workers/pipeline_worker.go
Length of output: 50
🏁 Script executed:
# Check the actual behavior - does uuid.Nil ever represent a valid tenant?
# Search for uuid.Nil usage in tenant-related code
rg "uuid.Nil" internal/repositories/postgres/pipeline_repo.go
Length of output: 50
🏁 Script executed:
# Check how builds are actually inserted - does any code ever create a build with nil/zero tenant?
rg "CreateBuild|INSERT INTO builds" -A 10 internal/repositories/postgres/
Length of output: 2089
🏁 Script executed:
# Verify the JSON unmarshal behavior - what happens when tenant_id field is missing?
cat -n internal/core/domain/jobs.go | sed -n '1,50p'
Length of output: 2491
Add explicit nil-tenant guard before processing job to prevent silent drops from older queued payloads.
During rollout, older queued payloads may lack the tenant_id field, causing job.TenantID to deserialize as uuid.Nil. When passed to GetBuild() and GetPipeline() without validation (lines 141, 143), the queries search for tenant_id = '00000000-0000-0000-0000-000000000000', which returns no results. The worker then treats this as "build not found" and acks the message, silently dropping the job.
Add a guard before line 138 to validate that job.TenantID != uuid.Nil, or enforce a queue drain before deploying this change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/workers/pipeline_worker.go` around lines 138-142: add an explicit
guard that checks job.TenantID != uuid.Nil before proceeding (i.e., before
creating the 30-minute context or before calling appcontext.WithTenantID) and
return/ack the message with a clear log/error if tenant is nil; this prevents
passing uuid.Nil into GetBuild() / GetPipeline() which would silently return no
results. Ensure the check references job.TenantID and that
appcontext.WithTenantID is only called after the nil check, and add a short log
including the job ID when rejecting/acknowledging such nil-tenant jobs so they
are observable.
Closes poyrazK#299.
Summary
- Build repo methods (…, `UpdateBuild`, `ListBuildSteps`, `ListBuildLogs`) now filter by `builds.tenant_id`; they were still filtering by `user_id` even though the port already declares the second parameter as `tenantID`. As a result, teammates viewing another user's build saw no steps or logs, and the worker's `UpdateBuild` silently scoped to the wrong column.
- Add `TenantID` to `domain.BuildJob`; populate it in `createAndQueueBuild` and consume it in the pipeline worker. Previously the worker passed `job.UserID` where `GetBuild`/`GetPipeline` expected `tenantID`, so builds only loaded when `user_id` happened to equal `tenant_id`.
- Migration `108_tenant_scoped_resource_uniqueness` drops `UNIQUE(user_id, name)` on `pipelines`, `caches`, and `queues` and replaces it with `UNIQUE(tenant_id, name)`, which is what the service-layer name checks have always assumed. A CTE renames pre-existing tenant-scope collisions before the flip, so the migration is safe on populated databases. The down-migration restores the prior shape.
- `UserID` stays on rows for attribution; only the scope of queries and uniqueness changes, matching the issue's "tenant ownership with user attribution" prescription.

Test plan
- `go build ./...` and `go vet ./...` pass
- `go test ./internal/repositories/postgres/... ./internal/core/services/... ./internal/workers/...`
- Constraint shape checked via `\d pipelines`, `\d caches`, `\d queues`
- `UpdateBuild` no longer fails on the tenant-scope mismatch

Summary by CodeRabbit
New Features
Bug Fixes