diff --git a/controlplane/org_activation_test.go b/controlplane/org_activation_test.go index c7c4a04..836beda 100644 --- a/controlplane/org_activation_test.go +++ b/controlplane/org_activation_test.go @@ -240,11 +240,12 @@ func TestSharedWorkerActivatorDucklingCRRequiresSTSBroker(t *testing.T) { } return &provisioner.DucklingStatus{ MetadataStore: struct { - Type string - Endpoint string - Password string - User string - Database string + Type string + Endpoint string + PgBouncerEndpoint string + Password string + User string + Database string }{ Endpoint: "test-org.cluster.rds.amazonaws.com", Password: "duckling-password-123", @@ -295,11 +296,12 @@ func TestSharedWorkerActivatorPrefersSecretRefOverDucklingCR(t *testing.T) { ducklingCalled = true return &provisioner.DucklingStatus{ MetadataStore: struct { - Type string - Endpoint string - Password string - User string - Database string + Type string + Endpoint string + PgBouncerEndpoint string + Password string + User string + Database string }{Password: "cr-password"}, }, nil }, diff --git a/controlplane/provisioner/controller_test.go b/controlplane/provisioner/controller_test.go index accf833..4515746 100644 --- a/controlplane/provisioner/controller_test.go +++ b/controlplane/provisioner/controller_test.go @@ -331,6 +331,34 @@ func TestParseDucklingStatusSyncedFalse(t *testing.T) { } } +func TestParseDucklingStatusPgBouncerEndpoint(t *testing.T) { + cr := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "status": map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "aurora", + "endpoint": "posthog-duckling-foo.cluster-xyz.us-east-1.rds.amazonaws.com", + "pgbouncerEndpoint": "pgbouncer-duckling-foo.ducklings.svc.cluster.local:6543", + "user": "postgres", + "database": "postgres", + "password": "s3cret", + }, + }, + }, + } + + status, err := parseDucklingStatus(cr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := status.MetadataStore.PgBouncerEndpoint; got != "pgbouncer-duckling-foo.ducklings.svc.cluster.local:6543" { + t.Fatalf("PgBouncerEndpoint = %q, want pooler DNS", got) + } + if got := status.MetadataStore.Endpoint; got != "posthog-duckling-foo.cluster-xyz.us-east-1.rds.amazonaws.com" { + t.Fatalf("Endpoint = %q, want Aurora DNS", got) + } +} + func TestParseDucklingStatusEmpty(t *testing.T) { cr := &unstructured.Unstructured{ Object: map[string]interface{}{}, diff --git a/controlplane/provisioner/k8s_client.go b/controlplane/provisioner/k8s_client.go index 662de34..bebac91 100644 --- a/controlplane/provisioner/k8s_client.go +++ b/controlplane/provisioner/k8s_client.go @@ -27,11 +27,12 @@ const ducklingNamespace = "ducklings" // but not K8s workloads — those are managed by the duckgres Helm chart. type DucklingStatus struct { MetadataStore struct { - Type string - Endpoint string - Password string - User string - Database string + Type string + Endpoint string + PgBouncerEndpoint string + Password string + User string + Database string } DataStore struct { Type string @@ -140,6 +141,7 @@ func parseDucklingStatus(cr *unstructured.Unstructured) (*DucklingStatus, error) if md, ok := status["metadataStore"].(map[string]interface{}); ok { ds.MetadataStore.Type = getNestedString(md, "type") ds.MetadataStore.Endpoint = getNestedString(md, "endpoint") + ds.MetadataStore.PgBouncerEndpoint = getNestedString(md, "pgbouncerEndpoint") ds.MetadataStore.Password = getNestedString(md, "password") ds.MetadataStore.User = getNestedString(md, "user") ds.MetadataStore.Database = getNestedString(md, "database") diff --git a/controlplane/shared_worker_activator.go b/controlplane/shared_worker_activator.go index 176952c..c654644 100644 --- a/controlplane/shared_worker_activator.go +++ b/controlplane/shared_worker_activator.go @@ -7,8 +7,10 @@ import ( "encoding/json" "fmt" "log/slog" + "net" "os" "slices" + "strconv" "strings" "sync" @@ -193,18 +195,40 @@ func (a *SharedWorkerActivator) buildDuckLakeConfigFromDuckling(ctx context.Cont return server.DuckLakeConfig{}, fmt.Errorf("duckling CR %q has no data store bucket", orgID) } + // Prefer the PgBouncer endpoint when the Duckling exposes one — the + // Crossplane composition sets status.metadataStore.pgbouncerEndpoint + // (as ":") when a per-Duckling pooler is provisioned. + // Otherwise connect directly to the metadata store on its default port. + host := status.MetadataStore.Endpoint + port := 5432 // Aurora always uses 5432 + viaPgBouncer := false + if pgb := status.MetadataStore.PgBouncerEndpoint; pgb != "" { + h, p, err := net.SplitHostPort(pgb) + if err != nil { + return server.DuckLakeConfig{}, fmt.Errorf("parse pgbouncerEndpoint %q for org %q: %w", pgb, orgID, err) + } + portNum, err := strconv.Atoi(p) + if err != nil { + return server.DuckLakeConfig{}, fmt.Errorf("parse pgbouncerEndpoint port %q for org %q: %w", p, orgID, err) + } + host = h + port = portNum + viaPgBouncer = true + } + dl := server.DuckLakeConfig{ MetadataStore: buildDuckLakeMetadataStoreDSN( - status.MetadataStore.Endpoint, - 5432, // Aurora always uses 5432 + host, + port, status.MetadataStore.User, status.MetadataStore.Password, status.MetadataStore.Database, ), - ObjectStore: fmt.Sprintf("s3://%s/", status.DataStore.BucketName), - S3Region: status.DataStore.S3Region, - S3UseSSL: true, - S3URLStyle: "vhost", + ViaPgBouncer: viaPgBouncer, + ObjectStore: fmt.Sprintf("s3://%s/", status.DataStore.BucketName), + S3Region: status.DataStore.S3Region, + S3UseSSL: true, + S3URLStyle: "vhost", } // Broker S3 credentials via STS AssumeRole diff --git a/server/server.go b/server/server.go index d062869..356ca60 100644 --- a/server/server.go +++ b/server/server.go @@ -337,6 +337,14 @@ type DuckLakeConfig struct { // re-running the version check. This avoids redundant backups and // long-running checks in worker processes. Migrate bool `json:"migrate,omitempty" yaml:"-"` + + // ViaPgBouncer is set by the control plane when the DuckLake metadata + // connection is routed through a network-level pooler (e.g. PgBouncer) + // rather than direct to Postgres. When true, the worker disables the + // postgres_scanner in-process pool via `SET GLOBAL pg_pool_max_connections = 0`. + // See duckdb/ducklake#1031: behind a network pooler, client-side pooling + // is redundant and prevents the pooler from reclaiming idle connections. + ViaPgBouncer bool `json:"via_pgbouncer,omitempty" yaml:"-"` } // fileDBEntry tracks a shared *sql.DB for file-persistence mode. @@ -1234,6 +1242,9 @@ func buildDuckLakePreAttachStatements(dlCfg DuckLakeConfig) []string { if duckLakeDisableMetadataThreadLocalCacheEnabled(dlCfg) { statements = append(statements, "SET GLOBAL pg_pool_enable_thread_local_cache = false") } + if dlCfg.ViaPgBouncer { + statements = append(statements, "SET GLOBAL pg_pool_max_connections = 0") + } return statements } diff --git a/server/server_test.go b/server/server_test.go index 357a849..efcd2fa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -185,6 +185,24 @@ func TestBuildDuckLakePreAttachStatements(t *testing.T) { }, want: nil, }, + { + name: "via pgbouncer disables in-process pool", + cfg: DuckLakeConfig{ + DisableMetadataThreadLocalCache: boolPtr(false), + ViaPgBouncer: true, + }, + want: []string{"SET GLOBAL pg_pool_max_connections = 0"}, + }, + { + name: "via pgbouncer with tls cache disabled emits both", + cfg: DuckLakeConfig{ + ViaPgBouncer: true, + }, + want: []string{ + "SET GLOBAL pg_pool_enable_thread_local_cache = false", + "SET GLOBAL pg_pool_max_connections = 0", + }, + }, } for _, tt := range tests {