Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 105 additions & 36 deletions controlplane/admin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ type apiStore interface {

GetManagedWarehouse(orgID string) (*configstore.ManagedWarehouse, error)
UpsertManagedWarehouse(orgID string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error)
// MutateManagedWarehouse loads the existing warehouse (or a zero value if
// none), calls mutate to apply changes, and persists the result — all
// inside a single transaction with a row-level lock on the warehouse row.
// Closes the read-modify-write race that plain Get+Upsert is exposed to
// when concurrent PUTs target the same org. Returns (nil, false, nil) if
// the org doesn't exist.
MutateManagedWarehouse(orgID string, mutate func(*configstore.ManagedWarehouse) error) (*configstore.ManagedWarehouse, bool, error)

GetGlobalConfig() (configstore.GlobalConfig, error)
SaveGlobalConfig(cfg *configstore.GlobalConfig) error
Expand Down Expand Up @@ -296,6 +303,57 @@ func (s *gormAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configsto
return stored, true, nil
}

// MutateManagedWarehouse runs a read-modify-write of orgID's managed warehouse
// inside a single transaction.
//
// It loads the existing warehouse row (or starts from a zero value when there
// is none), calls mutate on it, upserts the result and returns the row as
// re-read from the database.
//
// Returns:
//   - (nil, false, nil) when no org named orgID exists; mutate is not called.
//   - (nil, true, err) when mutate or any database step fails; the
//     transaction is rolled back, so nothing is persisted.
//   - (stored, true, nil) on success.
//
// Concurrency: the parent org row is locked with SELECT ... FOR UPDATE before
// the warehouse is read. Locking only the warehouse row is not enough: when
// the warehouse does not exist yet there is no row to lock, so two concurrent
// first-time writers would both mutate a zero value and the later upsert
// would silently overwrite the earlier one. The org row always exists on this
// path, so it serializes every caller of this method for the same org. The
// warehouse row is still locked as well, which keeps writers that update that
// row directly from interleaving with the mutation.
func (s *gormAPIStore) MutateManagedWarehouse(orgID string, mutate func(*configstore.ManagedWarehouse) error) (*configstore.ManagedWarehouse, bool, error) {
	var (
		stored    *configstore.ManagedWarehouse
		orgExists bool
	)
	err := s.db().Transaction(func(tx *gorm.DB) error {
		// Existence check and serialization point in one statement. Pluck is
		// used instead of Count because an aggregate cannot be combined with
		// FOR UPDATE.
		var orgNames []string
		if err := tx.Model(&configstore.Org{}).
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("name = ?", orgID).
			Limit(1).
			Pluck("name", &orgNames).Error; err != nil {
			return err
		}
		if len(orgNames) == 0 {
			return nil
		}
		orgExists = true

		// A missing row is not an error: PUT on a brand-new warehouse takes
		// the same path and starts from the zero value.
		var warehouse configstore.ManagedWarehouse
		err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
			First(&warehouse, "org_id = ?", orgID).Error
		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}

		if err := mutate(&warehouse); err != nil {
			return err
		}

		// Force the key and the timestamp after mutate so the closure cannot
		// redirect the write to another org or leave a stale UpdatedAt.
		warehouse.OrgID = orgID
		warehouse.UpdatedAt = time.Now().UTC()
		if err := tx.Clauses(clause.OnConflict{
			Columns:   []clause.Column{{Name: "org_id"}},
			DoUpdates: clause.AssignmentColumns(managedWarehouseUpsertColumns()),
		}).Create(&warehouse).Error; err != nil {
			return err
		}

		// Re-read so the caller sees what the database actually holds,
		// including columns the upsert does not overwrite.
		var reloaded configstore.ManagedWarehouse
		if err := tx.First(&reloaded, "org_id = ?", orgID).Error; err != nil {
			return err
		}
		stored = &reloaded
		return nil
	})
	if err != nil {
		return nil, orgExists, err
	}
	return stored, orgExists, nil
}

func managedWarehouseUpsertColumns() []string {
return []string{
"image",
Expand Down Expand Up @@ -407,6 +465,13 @@ type apiHandler struct {
info OrgStackInfo
}

// managedWarehouseRequest is the whitelist of fields a caller may set on the
// PUT endpoint. It's only used for strict decode (DisallowUnknownFields) — the
// actual merge is performed by json.Unmarshal directly onto a ManagedWarehouse
// (see putManagedWarehouse). For that to work, every `json:` tag here must
// match the corresponding `json:` tag on configstore.ManagedWarehouse. If you
// add a field here without a matching tag on ManagedWarehouse, strict decode
// will accept it and the merge will silently drop it.
type managedWarehouseRequest struct {
WarehouseDatabase configstore.ManagedWarehouseDatabase `json:"warehouse_database"`
MetadataStore configstore.ManagedWarehouseMetadataStore `json:"metadata_store"`
Expand Down Expand Up @@ -544,6 +609,14 @@ func (h *apiHandler) getManagedWarehouse(c *gin.Context) {
c.JSON(http.StatusOK, warehouse)
}

// warehouseBadRequestError marks an error from the mutate closure as caused by
// bad caller input rather than a store-level failure. The handler maps it to
// 400, not 500.
type warehouseBadRequestError struct{ err error }

// Error returns the wrapped error's message. A zero value (no wrapped error)
// yields a generic message instead of panicking on a nil dereference.
func (e warehouseBadRequestError) Error() string {
	if e.err == nil {
		return "bad request"
	}
	return e.err.Error()
}

// Unwrap exposes the wrapped error so errors.Is / errors.As can see through
// the marker. It returns nil for a zero value.
func (e warehouseBadRequestError) Unwrap() error { return e.err }

func (h *apiHandler) putManagedWarehouse(c *gin.Context) {
orgID := c.Param("id")

Expand All @@ -562,43 +635,39 @@ func (h *apiHandler) putManagedWarehouse(c *gin.Context) {
return
}

// Load any existing row, then unmarshal the body onto it. json.Unmarshal
// only overwrites fields whose JSON keys appear in the body — both at the
// top level AND within each nested struct. Callers can therefore PATCH a
// single field (e.g. `{"metadata_store":{"database_name":"x"}}`) without
// wiping sibling fields. Note: concurrent PUTs on the same org can still
// interleave (read-modify-write across two store calls); the admin API is
// low-frequency enough that we accept this for now.
existing, err := h.store.GetManagedWarehouse(orgID)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
var warehouse configstore.ManagedWarehouse
if err == nil {
warehouse = *existing
}
if err := json.Unmarshal(body, &warehouse); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
cfgView := &configstore.ManagedWarehouseConfig{
OrgID: orgID,
WarehouseDatabase: warehouse.WarehouseDatabase,
MetadataStore: warehouse.MetadataStore,
S3: warehouse.S3,
WorkerIdentity: warehouse.WorkerIdentity,
WarehouseDatabaseCredentials: warehouse.WarehouseDatabaseCredentials,
MetadataStoreCredentials: warehouse.MetadataStoreCredentials,
S3Credentials: warehouse.S3Credentials,
RuntimeConfig: warehouse.RuntimeConfig,
}
if err := configstore.ValidateManagedWarehouseSecretRefs(orgID, "", cfgView); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
stored, ok, err := h.store.UpsertManagedWarehouse(orgID, &warehouse)
// MutateManagedWarehouse locks the row inside a transaction, runs the
// closure, and commits — closing the race where two concurrent PUTs would
// otherwise Get + modify different snapshots and silently clobber each
// other. The closure does the merge and validation on the locked row.
stored, ok, err := h.store.MutateManagedWarehouse(orgID, func(w *configstore.ManagedWarehouse) error {
// json.Unmarshal only overwrites fields whose keys appear in the body
// — top-level AND nested. Callers can PATCH one inner field (e.g.
// `{"metadata_store":{"database_name":"x"}}`) without wiping siblings.
if err := json.Unmarshal(body, w); err != nil {
return warehouseBadRequestError{err}
}
cfgView := &configstore.ManagedWarehouseConfig{
OrgID: orgID,
WarehouseDatabase: w.WarehouseDatabase,
MetadataStore: w.MetadataStore,
S3: w.S3,
WorkerIdentity: w.WorkerIdentity,
WarehouseDatabaseCredentials: w.WarehouseDatabaseCredentials,
MetadataStoreCredentials: w.MetadataStoreCredentials,
S3Credentials: w.S3Credentials,
RuntimeConfig: w.RuntimeConfig,
}
if err := configstore.ValidateManagedWarehouseSecretRefs(orgID, "", cfgView); err != nil {
return warehouseBadRequestError{err}
}
return nil
})
if err != nil {
var badReq warehouseBadRequestError
if errors.As(err, &badReq) {
c.JSON(http.StatusBadRequest, gin.H{"error": badReq.Error()})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
Expand Down
47 changes: 47 additions & 0 deletions controlplane/admin/api_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,50 @@ func TestUpsertManagedWarehousePreservesCreatedAt(t *testing.T) {
t.Fatalf("expected updated metadata db name, got %q", stored.MetadataStore.DatabaseName)
}
}

// TestMutateManagedWarehouseSerializesConcurrentWriters seeds one org with a
// pending warehouse, then runs several MutateManagedWarehouse calls at once.
// Each call bumps a counter stored in StatusMessage; the test passes only if
// no increment is lost.
func TestMutateManagedWarehouseSerializesConcurrentWriters(t *testing.T) {
	store := newPostgresConfigStore(t)
	apiStore := newGormAPIStore(store).(*gormAPIStore)

	org := configstore.Org{Name: "analytics"}
	if err := store.DB().Create(&org).Error; err != nil {
		t.Fatalf("create org: %v", err)
	}
	seed := configstore.ManagedWarehouse{
		OrgID: "analytics",
		State: configstore.ManagedWarehouseStatePending,
	}
	if err := store.DB().Create(&seed).Error; err != nil {
		t.Fatalf("seed warehouse: %v", err)
	}

	// bump reads the "n=<int>" counter out of StatusMessage and writes back
	// n+1. A read-modify-write like this loses increments unless the store
	// serializes the mutators.
	bump := func(w *configstore.ManagedWarehouse) error {
		var n int
		// The scan error is deliberately ignored: the seed row has an empty
		// StatusMessage, in which case n stays 0.
		_, _ = fmt.Sscanf(w.StatusMessage, "n=%d", &n)
		w.StatusMessage = fmt.Sprintf("n=%d", n+1)
		return nil
	}

	const writers = 8
	// Buffered to the writer count so no goroutine blocks on send, even if
	// the test aborts early.
	errs := make(chan error, writers)
	for started := 0; started < writers; started++ {
		go func() {
			_, _, mutateErr := apiStore.MutateManagedWarehouse("analytics", bump)
			errs <- mutateErr
		}()
	}
	for i := 0; i < writers; i++ {
		err := <-errs
		if err != nil {
			t.Fatalf("mutate %d: %v", i, err)
		}
	}

	final, err := apiStore.GetManagedWarehouse("analytics")
	if err != nil {
		t.Fatalf("get warehouse: %v", err)
	}
	if want := fmt.Sprintf("n=%d", writers); final.StatusMessage != want {
		t.Fatalf("status_message = %q, want %q (lost updates under concurrency)", final.StatusMessage, want)
	}
}
19 changes: 19 additions & 0 deletions controlplane/admin/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ func (s *fakeAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configsto
return copyWarehouse(clone), true, nil
}

// MutateManagedWarehouse is the in-memory counterpart of the store method of
// the same name: it hands mutate a copy of the org's warehouse (or a zero
// value when none is stored), and saves the result only if mutate succeeds.
//
// Returns (nil, false, nil) when the org is unknown and (nil, true, err) when
// mutate fails.
func (s *fakeAPIStore) MutateManagedWarehouse(orgID string, mutate func(*configstore.ManagedWarehouse) error) (*configstore.ManagedWarehouse, bool, error) {
	org, ok := s.orgs[orgID]
	if !ok {
		return nil, false, nil
	}
	var warehouse configstore.ManagedWarehouse
	if existing, ok := s.warehouses[orgID]; ok {
		// Give mutate a copy made by copyWarehouse rather than a plain struct
		// copy. A plain `*existing` copy would still share any
		// reference-typed fields with the stored value, so a mutate that
		// fails halfway could leave stored state modified.
		// NOTE(review): assumes copyWarehouse detaches such fields — confirm.
		warehouse = *copyWarehouse(existing)
	}
	if err := mutate(&warehouse); err != nil {
		return nil, true, err
	}
	clone := copyWarehouse(&warehouse)
	clone.OrgID = orgID
	s.warehouses[orgID] = clone
	org.Warehouse = copyWarehouse(clone)
	return copyWarehouse(clone), true, nil
}

// GetGlobalConfig always reports an empty global configuration and no error.
func (s *fakeAPIStore) GetGlobalConfig() (configstore.GlobalConfig, error) {
	var empty configstore.GlobalConfig
	return empty, nil
}
Expand Down
14 changes: 14 additions & 0 deletions controlplane/org_activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ func TestDucklingMetadataStoreAddressRejectsInvalidPgBouncerEndpoint(t *testing.
}
}

// TestDucklingMetadataStoreAddressRejectsEmptyEndpoints checks that a status
// with no endpoint set produces the explicit missing-endpoint error, instead
// of an empty host that would only fail later when used in a DSN.
func TestDucklingMetadataStoreAddressRejectsEmptyEndpoints(t *testing.T) {
	var emptyStatus provisioner.DucklingStatus

	_, _, _, err := ducklingMetadataStoreAddress(&emptyStatus, "analytics")
	switch {
	case err == nil:
		t.Fatal("expected error when both endpoints are empty")
	case !strings.Contains(err.Error(), "no metadata store endpoint"):
		t.Fatalf("expected missing-endpoint error, got %v", err)
	}
}

func TestSharedWorkerActivatorBuildsActivationRequestFromManagedWarehouse(t *testing.T) {
clientset := fake.NewSimpleClientset(
&corev1.Secret{
Expand Down
38 changes: 38 additions & 0 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ func (c *Controller) Run(ctx context.Context) {
}

// actionableStates are the warehouse states the controller acts on. Each
// state listed here has a matching case in reconcile's dispatch switch.
//
// Ready is included so we can drift-correct user-mutable CR fields (today
// just pgbouncer.enabled) without waiting for the Duckling to be recreated.
var actionableStates = []configstore.ManagedWarehouseProvisioningState{
configstore.ManagedWarehouseStatePending,
configstore.ManagedWarehouseStateProvisioning,
// Ready warehouses are handled by reconcileReady (drift correction only).
configstore.ManagedWarehouseStateReady,
configstore.ManagedWarehouseStateDeleting,
}

Expand All @@ -92,6 +95,8 @@ func (c *Controller) reconcile(ctx context.Context) {
c.reconcilePending(ctx, &w)
case configstore.ManagedWarehouseStateProvisioning:
c.reconcileProvisioning(ctx, &w)
case configstore.ManagedWarehouseStateReady:
c.reconcileReady(ctx, &w)
case configstore.ManagedWarehouseStateDeleting:
c.reconcileDeleting(ctx, &w)
}
Expand Down Expand Up @@ -230,6 +235,39 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M
}
}

// reconcileReady performs drift correction for a warehouse in the Ready
// state. It compares the pgbouncer.enabled value on the Duckling CR with the
// value recorded on the warehouse and patches the CR when they differ, so a
// change made through the config store (admin API) reaches the Crossplane
// composition that provisions or tears down the pooler.
//
// The scope is deliberately limited to that single field. ACU, image and the
// other spec fields are NOT reconciled: they are not user-mutable via the
// admin API today, and aggressive drift correction would conflict with manual
// kubectl patches.
//
// Every failure is logged and swallowed; nothing is returned.
func (c *Controller) reconcileReady(ctx context.Context, w *configstore.ManagedWarehouse) {
	log := slog.With("org", w.OrgID, "phase", "ready")

	desired := w.PgBouncer.Enabled
	current, err := c.duckling.GetPgBouncerEnabled(ctx, w.OrgID)
	switch {
	case err != nil && apierrors.IsNotFound(err):
		// The CR has disappeared while the warehouse still says Ready. There
		// is nothing to patch; let the state machine catch up.
		log.Warn("Duckling CR not found for Ready warehouse — skipping drift check.")
		return
	case err != nil:
		log.Warn("Failed to read Duckling CR for drift check.", "error", err)
		return
	case current == desired:
		// Already in sync.
		return
	}

	log.Info("PgBouncer drift detected, patching Duckling CR.",
		"desired", desired, "current", current)
	patchErr := c.duckling.SetPgBouncerEnabled(ctx, w.OrgID, desired)
	if patchErr != nil {
		log.Warn("Failed to patch Duckling CR pgbouncer.enabled.", "error", patchErr)
	}
}

func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.ManagedWarehouse) {
log := slog.With("org", w.OrgID, "phase", "deleting")

Expand Down
Loading
Loading