diff --git a/controlplane/admin/api.go b/controlplane/admin/api.go index 0a87b0d..de4c7bc 100644 --- a/controlplane/admin/api.go +++ b/controlplane/admin/api.go @@ -126,6 +126,13 @@ type apiStore interface { GetManagedWarehouse(orgID string) (*configstore.ManagedWarehouse, error) UpsertManagedWarehouse(orgID string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) + // MutateManagedWarehouse loads the existing warehouse (or a zero value if + // none), calls mutate to apply changes, and persists the result — all + // inside a single transaction with a row-level lock on the warehouse row. + // Closes the read-modify-write race that plain Get+Upsert is exposed to + // when concurrent PUTs target the same org. Returns (nil, false, nil) if + // the org doesn't exist. + MutateManagedWarehouse(orgID string, mutate func(*configstore.ManagedWarehouse) error) (*configstore.ManagedWarehouse, bool, error) GetGlobalConfig() (configstore.GlobalConfig, error) SaveGlobalConfig(cfg *configstore.GlobalConfig) error @@ -296,6 +303,57 @@ func (s *gormAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configsto return stored, true, nil } +func (s *gormAPIStore) MutateManagedWarehouse(orgID string, mutate func(*configstore.ManagedWarehouse) error) (*configstore.ManagedWarehouse, bool, error) { + var ( + stored *configstore.ManagedWarehouse + orgExists bool + ) + err := s.db().Transaction(func(tx *gorm.DB) error { + var count int64 + if err := tx.Model(&configstore.Org{}).Where("name = ?", orgID).Count(&count).Error; err != nil { + return err + } + if count == 0 { + return nil + } + orgExists = true + + // SELECT ... FOR UPDATE: blocks concurrent mutators on the same row + // until this transaction commits. A missing row (brand-new warehouse) + // is not an error, but nothing is locked then: first PUTs can still race. + var warehouse configstore.ManagedWarehouse + err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). 
+ First(&warehouse, "org_id = ?", orgID).Error + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + + if err := mutate(&warehouse); err != nil { + return err + } + + warehouse.OrgID = orgID + warehouse.UpdatedAt = time.Now().UTC() + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "org_id"}}, + DoUpdates: clause.AssignmentColumns(managedWarehouseUpsertColumns()), + }).Create(&warehouse).Error; err != nil { + return err + } + + var reloaded configstore.ManagedWarehouse + if err := tx.First(&reloaded, "org_id = ?", orgID).Error; err != nil { + return err + } + stored = &reloaded + return nil + }) + if err != nil { + return nil, orgExists, err + } + return stored, orgExists, nil +} + func managedWarehouseUpsertColumns() []string { return []string{ "image", @@ -407,6 +465,13 @@ type apiHandler struct { info OrgStackInfo } +// managedWarehouseRequest is the whitelist of fields a caller may set on the +// PUT endpoint. It's only used for strict decode (DisallowUnknownFields) — the +// actual merge is performed by json.Unmarshal directly onto a ManagedWarehouse +// (see putManagedWarehouse). For that to work, every `json:` tag here must +// match the corresponding `json:` tag on configstore.ManagedWarehouse. If you +// add a field here without a matching tag on ManagedWarehouse, strict decode +// will accept it and the merge will silently drop it. type managedWarehouseRequest struct { WarehouseDatabase configstore.ManagedWarehouseDatabase `json:"warehouse_database"` MetadataStore configstore.ManagedWarehouseMetadataStore `json:"metadata_store"` @@ -544,6 +609,14 @@ func (h *apiHandler) getManagedWarehouse(c *gin.Context) { c.JSON(http.StatusOK, warehouse) } +// warehouseBadRequestError marks an error from the mutate closure as caused by +// bad caller input rather than a store-level failure. The handler maps it to +// 400, not 500. 
+type warehouseBadRequestError struct{ err error } + +func (e warehouseBadRequestError) Error() string { return e.err.Error() } +func (e warehouseBadRequestError) Unwrap() error { return e.err } + func (h *apiHandler) putManagedWarehouse(c *gin.Context) { orgID := c.Param("id") @@ -562,43 +635,39 @@ func (h *apiHandler) putManagedWarehouse(c *gin.Context) { return } - // Load any existing row, then unmarshal the body onto it. json.Unmarshal - // only overwrites fields whose JSON keys appear in the body — both at the - // top level AND within each nested struct. Callers can therefore PATCH a - // single field (e.g. `{"metadata_store":{"database_name":"x"}}`) without - // wiping sibling fields. Note: concurrent PUTs on the same org can still - // interleave (read-modify-write across two store calls); the admin API is - // low-frequency enough that we accept this for now. - existing, err := h.store.GetManagedWarehouse(orgID) - if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - var warehouse configstore.ManagedWarehouse - if err == nil { - warehouse = *existing - } - if err := json.Unmarshal(body, &warehouse); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - cfgView := &configstore.ManagedWarehouseConfig{ - OrgID: orgID, - WarehouseDatabase: warehouse.WarehouseDatabase, - MetadataStore: warehouse.MetadataStore, - S3: warehouse.S3, - WorkerIdentity: warehouse.WorkerIdentity, - WarehouseDatabaseCredentials: warehouse.WarehouseDatabaseCredentials, - MetadataStoreCredentials: warehouse.MetadataStoreCredentials, - S3Credentials: warehouse.S3Credentials, - RuntimeConfig: warehouse.RuntimeConfig, - } - if err := configstore.ValidateManagedWarehouseSecretRefs(orgID, "", cfgView); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - stored, ok, err := h.store.UpsertManagedWarehouse(orgID, &warehouse) + // 
MutateManagedWarehouse locks the row inside a transaction, runs the + // closure, and commits — closing the race where two concurrent PUTs would + // otherwise Get + modify different snapshots and silently clobber each + // other. The closure does the merge and validation on the locked row. + stored, ok, err := h.store.MutateManagedWarehouse(orgID, func(w *configstore.ManagedWarehouse) error { + // json.Unmarshal only overwrites fields whose keys appear in the body + // — top-level AND nested. Callers can PATCH one inner field (e.g. + // `{"metadata_store":{"database_name":"x"}}`) without wiping siblings. + if err := json.Unmarshal(body, w); err != nil { + return warehouseBadRequestError{err} + } + cfgView := &configstore.ManagedWarehouseConfig{ + OrgID: orgID, + WarehouseDatabase: w.WarehouseDatabase, + MetadataStore: w.MetadataStore, + S3: w.S3, + WorkerIdentity: w.WorkerIdentity, + WarehouseDatabaseCredentials: w.WarehouseDatabaseCredentials, + MetadataStoreCredentials: w.MetadataStoreCredentials, + S3Credentials: w.S3Credentials, + RuntimeConfig: w.RuntimeConfig, + } + if err := configstore.ValidateManagedWarehouseSecretRefs(orgID, "", cfgView); err != nil { + return warehouseBadRequestError{err} + } + return nil + }) if err != nil { + var badReq warehouseBadRequestError + if errors.As(err, &badReq) { + c.JSON(http.StatusBadRequest, gin.H{"error": badReq.Error()}) + return + } c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } diff --git a/controlplane/admin/api_postgres_test.go b/controlplane/admin/api_postgres_test.go index d8d4380..c61b256 100644 --- a/controlplane/admin/api_postgres_test.go +++ b/controlplane/admin/api_postgres_test.go @@ -154,3 +154,50 @@ func TestUpsertManagedWarehousePreservesCreatedAt(t *testing.T) { t.Fatalf("expected updated metadata db name, got %q", stored.MetadataStore.DatabaseName) } } + +func TestMutateManagedWarehouseSerializesConcurrentWriters(t *testing.T) { + store := newPostgresConfigStore(t) + 
apiStore := newGormAPIStore(store).(*gormAPIStore) + + if err := store.DB().Create(&configstore.Org{Name: "analytics"}).Error; err != nil { + t.Fatalf("create org: %v", err) + } + if err := store.DB().Create(&configstore.ManagedWarehouse{ + OrgID: "analytics", + State: configstore.ManagedWarehouseStatePending, + }).Error; err != nil { + t.Fatalf("seed warehouse: %v", err) + } + + // Fan out N concurrent Mutate calls, each incrementing a counter encoded + // in StatusMessage. With plain Get+Upsert these would race and drop some + // increments. With SELECT ... FOR UPDATE inside a transaction, every + // writer observes the prior one's commit and the counter lands at N. + const writers = 8 + errs := make(chan error, writers) + for i := 0; i < writers; i++ { + go func() { + _, _, err := apiStore.MutateManagedWarehouse("analytics", func(w *configstore.ManagedWarehouse) error { + var n int + _, _ = fmt.Sscanf(w.StatusMessage, "n=%d", &n) + w.StatusMessage = fmt.Sprintf("n=%d", n+1) + return nil + }) + errs <- err + }() + } + for i := 0; i < writers; i++ { + if err := <-errs; err != nil { + t.Fatalf("mutate %d: %v", i, err) + } + } + + final, err := apiStore.GetManagedWarehouse("analytics") + if err != nil { + t.Fatalf("get warehouse: %v", err) + } + want := fmt.Sprintf("n=%d", writers) + if final.StatusMessage != want { + t.Fatalf("status_message = %q, want %q (lost updates under concurrency)", final.StatusMessage, want) + } +} diff --git a/controlplane/admin/api_test.go b/controlplane/admin/api_test.go index ade4f61..ff54030 100644 --- a/controlplane/admin/api_test.go +++ b/controlplane/admin/api_test.go @@ -151,6 +151,25 @@ func (s *fakeAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configsto return copyWarehouse(clone), true, nil } +func (s *fakeAPIStore) MutateManagedWarehouse(orgID string, mutate func(*configstore.ManagedWarehouse) error) (*configstore.ManagedWarehouse, bool, error) { + org, ok := s.orgs[orgID] + if !ok { + return nil, false, nil + } + 
var warehouse configstore.ManagedWarehouse + if existing, ok := s.warehouses[orgID]; ok { + warehouse = *existing + } + if err := mutate(&warehouse); err != nil { + return nil, true, err + } + clone := copyWarehouse(&warehouse) + clone.OrgID = orgID + s.warehouses[orgID] = clone + org.Warehouse = copyWarehouse(clone) + return copyWarehouse(clone), true, nil +} + func (s *fakeAPIStore) GetGlobalConfig() (configstore.GlobalConfig, error) { return configstore.GlobalConfig{}, nil } diff --git a/controlplane/org_activation_test.go b/controlplane/org_activation_test.go index 3a19bdf..9b8c5c7 100644 --- a/controlplane/org_activation_test.go +++ b/controlplane/org_activation_test.go @@ -68,6 +68,20 @@ func TestDucklingMetadataStoreAddressRejectsInvalidPgBouncerEndpoint(t *testing. } } +func TestDucklingMetadataStoreAddressRejectsEmptyEndpoints(t *testing.T) { + // Status with neither endpoint set — surface a clear error up front rather + // than letting an empty host propagate into a DSN and fail opaquely later. + status := &provisioner.DucklingStatus{} + + _, _, _, err := ducklingMetadataStoreAddress(status, "analytics") + if err == nil { + t.Fatal("expected error when both endpoints are empty") + } + if !strings.Contains(err.Error(), "no metadata store endpoint") { + t.Fatalf("expected missing-endpoint error, got %v", err) + } +} + func TestSharedWorkerActivatorBuildsActivationRequestFromManagedWarehouse(t *testing.T) { clientset := fake.NewSimpleClientset( &corev1.Secret{ diff --git a/controlplane/provisioner/controller.go b/controlplane/provisioner/controller.go index 4d7aa53..42946bb 100644 --- a/controlplane/provisioner/controller.go +++ b/controlplane/provisioner/controller.go @@ -70,9 +70,12 @@ func (c *Controller) Run(ctx context.Context) { } // actionableStates are the warehouse states the controller acts on. +// Ready is included so we can drift-correct user-mutable CR fields (today +// just pgbouncer.enabled) without waiting for the Duckling to be recreated. 
var actionableStates = []configstore.ManagedWarehouseProvisioningState{ configstore.ManagedWarehouseStatePending, configstore.ManagedWarehouseStateProvisioning, + configstore.ManagedWarehouseStateReady, configstore.ManagedWarehouseStateDeleting, } @@ -92,6 +95,8 @@ func (c *Controller) reconcile(ctx context.Context) { c.reconcilePending(ctx, &w) case configstore.ManagedWarehouseStateProvisioning: c.reconcileProvisioning(ctx, &w) + case configstore.ManagedWarehouseStateReady: + c.reconcileReady(ctx, &w) case configstore.ManagedWarehouseStateDeleting: c.reconcileDeleting(ctx, &w) } @@ -230,6 +235,39 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M } } +// reconcileReady handles drift correction for Ready warehouses. Today the +// only post-create-mutable spec field is metadataStore.pgbouncer.enabled; +// if an operator flips it in the config store (admin API), we patch the CR +// so the Crossplane composition provisions / tears down the pooler. +// +// Scope is intentionally narrow: we do NOT reconcile ACU, image, or other +// spec fields. Those aren't user-mutable via the admin API today, and +// aggressive drift correction would conflict with manual kubectl patches. +func (c *Controller) reconcileReady(ctx context.Context, w *configstore.ManagedWarehouse) { + log := slog.With("org", w.OrgID, "phase", "ready") + + currentEnabled, err := c.duckling.GetPgBouncerEnabled(ctx, w.OrgID) + if err != nil { + if apierrors.IsNotFound(err) { + // CR is gone but the warehouse is still marked Ready. Don't try + // to patch; leave the state machine to catch up. 
+ log.Warn("Duckling CR not found for Ready warehouse — skipping drift check.") + return + } + log.Warn("Failed to read Duckling CR for drift check.", "error", err) + return + } + if currentEnabled == w.PgBouncer.Enabled { + return + } + + log.Info("PgBouncer drift detected, patching Duckling CR.", + "desired", w.PgBouncer.Enabled, "current", currentEnabled) + if err := c.duckling.SetPgBouncerEnabled(ctx, w.OrgID, w.PgBouncer.Enabled); err != nil { + log.Warn("Failed to patch Duckling CR pgbouncer.enabled.", "error", err) + } +} + func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.ManagedWarehouse) { log := slog.With("org", w.OrgID, "phase", "deleting") diff --git a/controlplane/provisioner/controller_test.go b/controlplane/provisioner/controller_test.go index e06db4b..12da279 100644 --- a/controlplane/provisioner/controller_test.go +++ b/controlplane/provisioner/controller_test.go @@ -201,6 +201,146 @@ func TestReconcilePendingEmitsPgBouncerBlock(t *testing.T) { } } +func TestReconcileReadyPatchesCRWhenPgBouncerFlippedOn(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + fs := newFakeStore() + fs.warehouses["org-flip"] = &configstore.ManagedWarehouse{ + OrgID: "org-flip", + State: configstore.ManagedWarehouseStateReady, + PgBouncer: configstore.ManagedWarehousePgBouncer{Enabled: true}, + } + // Seed a CR whose spec still reflects the pre-flip world (no pgbouncer block). 
+ cr := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "k8s.posthog.com/v1alpha1", + "kind": "Duckling", + "metadata": map[string]interface{}{ + "name": ducklingName("org-flip"), + "namespace": ducklingNamespace, + }, + "spec": map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "aurora", + "aurora": map[string]interface{}{ + "minACU": 0.5, + "maxACU": 2.0, + }, + }, + }, + }} + if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(context.Background(), cr, metav1.CreateOptions{}); err != nil { + t.Fatalf("seed CR: %v", err) + } + + ctrl := NewControllerWithClient(fs, dc, time.Second) + ctrl.reconcile(context.Background()) + + got, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(context.Background(), ducklingName("org-flip"), metav1.GetOptions{}) + if err != nil { + t.Fatalf("re-fetch CR: %v", err) + } + spec := got.Object["spec"].(map[string]interface{}) + ms := spec["metadataStore"].(map[string]interface{}) + pgb, ok := ms["pgbouncer"].(map[string]interface{}) + if !ok { + t.Fatalf("expected pgbouncer block after drift patch, got %v", ms) + } + if pgb["enabled"] != true { + t.Fatalf("expected pgbouncer.enabled=true, got %v", pgb["enabled"]) + } + // Merge-patch must not wipe sibling metadataStore fields. 
+ if ms["type"] != "aurora" { + t.Fatalf("expected metadataStore.type preserved, got %v", ms["type"]) + } + if _, ok := ms["aurora"].(map[string]interface{}); !ok { + t.Fatalf("expected aurora block preserved, got %v", ms) + } +} + +func TestReconcileReadyPatchesCRWhenPgBouncerFlippedOff(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + fs := newFakeStore() + fs.warehouses["org-off"] = &configstore.ManagedWarehouse{ + OrgID: "org-off", + State: configstore.ManagedWarehouseStateReady, + PgBouncer: configstore.ManagedWarehousePgBouncer{Enabled: false}, + } + // Seed a CR that currently has pgbouncer enabled — expect it to be + // patched back to false to match the config store. + cr := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "k8s.posthog.com/v1alpha1", + "kind": "Duckling", + "metadata": map[string]interface{}{ + "name": ducklingName("org-off"), + "namespace": ducklingNamespace, + }, + "spec": map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "aurora", + "pgbouncer": map[string]interface{}{ + "enabled": true, + }, + }, + }, + }} + if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(context.Background(), cr, metav1.CreateOptions{}); err != nil { + t.Fatalf("seed CR: %v", err) + } + + ctrl := NewControllerWithClient(fs, dc, time.Second) + ctrl.reconcile(context.Background()) + + got, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(context.Background(), ducklingName("org-off"), metav1.GetOptions{}) + if err != nil { + t.Fatalf("re-fetch CR: %v", err) + } + spec := got.Object["spec"].(map[string]interface{}) + ms := spec["metadataStore"].(map[string]interface{}) + pgb := ms["pgbouncer"].(map[string]interface{}) + if pgb["enabled"] != false { + t.Fatalf("expected pgbouncer.enabled=false after drift patch, got %v", pgb["enabled"]) + } +} + +func TestReconcileReadyNoDriftDoesNotPatch(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + fs := 
newFakeStore() + fs.warehouses["org-sync"] = &configstore.ManagedWarehouse{ + OrgID: "org-sync", + State: configstore.ManagedWarehouseStateReady, + PgBouncer: configstore.ManagedWarehousePgBouncer{Enabled: true}, + } + cr := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "k8s.posthog.com/v1alpha1", + "kind": "Duckling", + "metadata": map[string]interface{}{ + "name": ducklingName("org-sync"), + "namespace": ducklingNamespace, + "resourceVersion": "42", + }, + "spec": map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "aurora", + "pgbouncer": map[string]interface{}{"enabled": true}, + }, + }, + }} + if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(context.Background(), cr, metav1.CreateOptions{}); err != nil { + t.Fatalf("seed CR: %v", err) + } + seededRV := cr.GetResourceVersion() + + ctrl := NewControllerWithClient(fs, dc, time.Second) + ctrl.reconcile(context.Background()) + + got, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(context.Background(), ducklingName("org-sync"), metav1.GetOptions{}) + if err != nil { + t.Fatalf("re-fetch CR: %v", err) + } + if got.GetResourceVersion() != seededRV { + t.Fatalf("expected no patch when in sync, resourceVersion went %q -> %q", seededRV, got.GetResourceVersion()) + } +} + func TestReconcileProvisioningAllReady(t *testing.T) { dc, fakeK8s := newFakeDucklingClient() fs := newFakeStore() diff --git a/controlplane/provisioner/k8s_client.go b/controlplane/provisioner/k8s_client.go index f037b99..d33b341 100644 --- a/controlplane/provisioner/k8s_client.go +++ b/controlplane/provisioner/k8s_client.go @@ -4,12 +4,14 @@ package provisioner import ( "context" + "encoding/json" "fmt" "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) @@ -140,6 
+142,59 @@ func (d *DucklingClient) Delete(ctx context.Context, orgID string) error { return nil } +// GetPgBouncerEnabled reads spec.metadataStore.pgbouncer.enabled from the +// Duckling CR. Missing blocks (composition at an older schema, CR never +// carried a pgbouncer section) are reported as false — same as an explicit +// opt-out — so the caller just needs to compare against the desired value. +func (d *DucklingClient) GetPgBouncerEnabled(ctx context.Context, orgID string) (bool, error) { + name := ducklingName(orgID) + cr, err := d.client.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("get duckling CR %q: %w", name, err) + } + spec, ok := cr.Object["spec"].(map[string]interface{}) + if !ok { + return false, nil + } + ms, ok := spec["metadataStore"].(map[string]interface{}) + if !ok { + return false, nil + } + pgb, ok := ms["pgbouncer"].(map[string]interface{}) + if !ok { + return false, nil + } + enabled, _ := pgb["enabled"].(bool) + return enabled, nil +} + +// SetPgBouncerEnabled patches spec.metadataStore.pgbouncer.enabled on the +// Duckling CR for the given org. Uses a JSON merge patch (RFC 7396) so the +// call is idempotent and only touches the pgbouncer block — sibling fields +// under metadataStore (aurora, type) are left untouched. 
+func (d *DucklingClient) SetPgBouncerEnabled(ctx context.Context, orgID string, enabled bool) error { + name := ducklingName(orgID) + patch, err := json.Marshal(map[string]interface{}{ + "spec": map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "pgbouncer": map[string]interface{}{ + "enabled": enabled, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("marshal pgbouncer patch for %q: %w", name, err) + } + _, err = d.client.Resource(ducklingGVR).Namespace(ducklingNamespace).Patch( + ctx, name, types.MergePatchType, patch, metav1.PatchOptions{}, + ) + if err != nil { + return fmt.Errorf("patch duckling CR %q pgbouncer: %w", name, err) + } + return nil +} + func parseDucklingStatus(cr *unstructured.Unstructured) (*DucklingStatus, error) { status, ok := cr.Object["status"].(map[string]interface{}) if !ok { diff --git a/controlplane/shared_worker_activator.go b/controlplane/shared_worker_activator.go index 707d33d..8215c10 100644 --- a/controlplane/shared_worker_activator.go +++ b/controlplane/shared_worker_activator.go @@ -235,26 +235,28 @@ func (a *SharedWorkerActivator) buildDuckLakeConfigFromDuckling(ctx context.Cont } func ducklingMetadataStoreAddress(status *provisioner.DucklingStatus, orgID string) (host string, port int, viaPgBouncer bool, err error) { - host = status.MetadataStore.Endpoint - port = 5432 // Aurora always uses 5432 - // Prefer the PgBouncer endpoint when the Duckling exposes one — the // Crossplane composition sets status.metadataStore.pgbouncerEndpoint // (as "<host>:<port>") when a per-Duckling pooler is provisioned. 
- pgb := status.MetadataStore.PgBouncerEndpoint - if pgb == "" { - return host, port, false, nil + if pgb := status.MetadataStore.PgBouncerEndpoint; pgb != "" { + h, p, err := net.SplitHostPort(pgb) + if err != nil { + return "", 0, false, fmt.Errorf("parse pgbouncerEndpoint %q for org %q: %w", pgb, orgID, err) + } + portNum, err := strconv.Atoi(p) + if err != nil { + return "", 0, false, fmt.Errorf("parse pgbouncerEndpoint port %q for org %q: %w", p, orgID, err) + } + return h, portNum, true, nil } - h, p, err := net.SplitHostPort(pgb) - if err != nil { - return "", 0, false, fmt.Errorf("parse pgbouncerEndpoint %q for org %q: %w", pgb, orgID, err) - } - portNum, err := strconv.Atoi(p) - if err != nil { - return "", 0, false, fmt.Errorf("parse pgbouncerEndpoint port %q for org %q: %w", p, orgID, err) + // No pooler — fall back to the direct Aurora endpoint. Guard against an + // empty endpoint here rather than letting a malformed DSN surface later + // as an opaque connect error. + if status.MetadataStore.Endpoint == "" { + return "", 0, false, fmt.Errorf("duckling CR %q has no metadata store endpoint or pgbouncerEndpoint", orgID) } - return h, portNum, true, nil + return status.MetadataStore.Endpoint, 5432, false, nil } // buildDuckLakeConfigFromConfigStore reads infrastructure details from the config store