From b997839eb8d7476d35e328d60cd808c957e35cb6 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 22 Apr 2026 17:30:25 -0700 Subject: [PATCH] feat(ducklake): route metadata catalog through per-Duckling PgBouncer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire DuckLake's Postgres metadata connection through the per-Duckling PgBouncer pooler provisioned by the Crossplane composition in PostHog/charts#10400. Two coupled changes that only take effect when a Duckling opts in via spec.metadataStore.pgbouncer.enabled. 1. Control plane prefers status.metadataStore.pgbouncerEndpoint When the Duckling CR status exposes pgbouncerEndpoint (format "<host>:<port>", e.g. pgbouncer-duckling-foo.ducklings.svc:6543), the activator parses it with net.SplitHostPort and builds the worker DSN against the pooler instead of the Aurora endpoint. When absent, behaviour is unchanged — direct to the Postgres endpoint on 5432. - DucklingStatus.MetadataStore gains PgBouncerEndpoint (controlplane/provisioner/k8s_client.go); parseDucklingStatus reads the new status field. - buildDuckLakeConfigFromDuckling (controlplane/shared_worker_activator.go) parses the endpoint and sets DuckLakeConfig.ViaPgBouncer. 2. Worker disables the in-process pool when behind PgBouncer DuckLakeConfig gains ViaPgBouncer. When true, buildDuckLakePreAttachStatements emits `SET GLOBAL pg_pool_max_connections = 0` before ATTACH so the setting propagates into the __ducklake_metadata_* catalog (per duckdb/duckdb-postgres#445). Rationale, from duckdb/ducklake#1031: behind a network pooler, client-side pooling is redundant and counter-productive. The postgres_scanner thread-local cache pins one server conn per DuckDB worker thread and holds it indefinitely, starving the pool and preventing PgBouncer from reclaiming idle connections. 
Audited DuckLake and postgres_scanner before wiring this up: no SQL PREPARE, no named extended-protocol prepared statements (unnamed only — no max_prepared_statements needed), no advisory locks, no LISTEN/NOTIFY, no WITH HOLD cursors. 1:1 DuckDB↔Postgres transaction mapping. Safe in both session and transaction pool modes. Test coverage: - TestParseDucklingStatusPgBouncerEndpoint — Duckling CR status round trips the new field. - TestBuildDuckLakePreAttachStatements — two new cases cover the ViaPgBouncer=true emission, alone and combined with the existing thread-local-cache disable. Inline struct literals in org_activation_test.go updated to match the new DucklingStatus.MetadataStore shape (additive only). Co-Authored-By: Claude Opus 4.7 (1M context) --- controlplane/org_activation_test.go | 22 +++++++------ controlplane/provisioner/controller_test.go | 28 ++++++++++++++++ controlplane/provisioner/k8s_client.go | 12 ++++--- controlplane/shared_worker_activator.go | 36 +++++++++++++++++---- server/server.go | 11 +++++++ server/server_test.go | 18 +++++++++++ 6 files changed, 106 insertions(+), 21 deletions(-) diff --git a/controlplane/org_activation_test.go b/controlplane/org_activation_test.go index c7c4a04..836beda 100644 --- a/controlplane/org_activation_test.go +++ b/controlplane/org_activation_test.go @@ -240,11 +240,12 @@ func TestSharedWorkerActivatorDucklingCRRequiresSTSBroker(t *testing.T) { } return &provisioner.DucklingStatus{ MetadataStore: struct { - Type string - Endpoint string - Password string - User string - Database string + Type string + Endpoint string + PgBouncerEndpoint string + Password string + User string + Database string }{ Endpoint: "test-org.cluster.rds.amazonaws.com", Password: "duckling-password-123", @@ -295,11 +296,12 @@ func TestSharedWorkerActivatorPrefersSecretRefOverDucklingCR(t *testing.T) { ducklingCalled = true return &provisioner.DucklingStatus{ MetadataStore: struct { - Type string - Endpoint string - Password string - User 
string - Database string + Type string + Endpoint string + PgBouncerEndpoint string + Password string + User string + Database string }{Password: "cr-password"}, }, nil }, diff --git a/controlplane/provisioner/controller_test.go b/controlplane/provisioner/controller_test.go index accf833..4515746 100644 --- a/controlplane/provisioner/controller_test.go +++ b/controlplane/provisioner/controller_test.go @@ -331,6 +331,34 @@ func TestParseDucklingStatusSyncedFalse(t *testing.T) { } } +func TestParseDucklingStatusPgBouncerEndpoint(t *testing.T) { + cr := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "status": map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "aurora", + "endpoint": "posthog-duckling-foo.cluster-xyz.us-east-1.rds.amazonaws.com", + "pgbouncerEndpoint": "pgbouncer-duckling-foo.ducklings.svc.cluster.local:6543", + "user": "postgres", + "database": "postgres", + "password": "s3cret", + }, + }, + }, + } + + status, err := parseDucklingStatus(cr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := status.MetadataStore.PgBouncerEndpoint; got != "pgbouncer-duckling-foo.ducklings.svc.cluster.local:6543" { + t.Fatalf("PgBouncerEndpoint = %q, want pooler DNS", got) + } + if got := status.MetadataStore.Endpoint; got != "posthog-duckling-foo.cluster-xyz.us-east-1.rds.amazonaws.com" { + t.Fatalf("Endpoint = %q, want Aurora DNS", got) + } +} + func TestParseDucklingStatusEmpty(t *testing.T) { cr := &unstructured.Unstructured{ Object: map[string]interface{}{}, diff --git a/controlplane/provisioner/k8s_client.go b/controlplane/provisioner/k8s_client.go index 662de34..bebac91 100644 --- a/controlplane/provisioner/k8s_client.go +++ b/controlplane/provisioner/k8s_client.go @@ -27,11 +27,12 @@ const ducklingNamespace = "ducklings" // but not K8s workloads — those are managed by the duckgres Helm chart. 
type DucklingStatus struct { MetadataStore struct { - Type string - Endpoint string - Password string - User string - Database string + Type string + Endpoint string + PgBouncerEndpoint string + Password string + User string + Database string } DataStore struct { Type string @@ -140,6 +141,7 @@ func parseDucklingStatus(cr *unstructured.Unstructured) (*DucklingStatus, error) if md, ok := status["metadataStore"].(map[string]interface{}); ok { ds.MetadataStore.Type = getNestedString(md, "type") ds.MetadataStore.Endpoint = getNestedString(md, "endpoint") + ds.MetadataStore.PgBouncerEndpoint = getNestedString(md, "pgbouncerEndpoint") ds.MetadataStore.Password = getNestedString(md, "password") ds.MetadataStore.User = getNestedString(md, "user") ds.MetadataStore.Database = getNestedString(md, "database") diff --git a/controlplane/shared_worker_activator.go b/controlplane/shared_worker_activator.go index 176952c..c654644 100644 --- a/controlplane/shared_worker_activator.go +++ b/controlplane/shared_worker_activator.go @@ -7,8 +7,10 @@ import ( "encoding/json" "fmt" "log/slog" + "net" "os" "slices" + "strconv" "strings" "sync" @@ -193,18 +195,40 @@ func (a *SharedWorkerActivator) buildDuckLakeConfigFromDuckling(ctx context.Cont return server.DuckLakeConfig{}, fmt.Errorf("duckling CR %q has no data store bucket", orgID) } + // Prefer the PgBouncer endpoint when the Duckling exposes one — the + // Crossplane composition sets status.metadataStore.pgbouncerEndpoint + // (as "<host>:<port>") when a per-Duckling pooler is provisioned. + // Otherwise connect directly to the metadata store on its default port. 
+ host := status.MetadataStore.Endpoint + port := 5432 // Aurora always uses 5432 + viaPgBouncer := false + if pgb := status.MetadataStore.PgBouncerEndpoint; pgb != "" { + h, p, err := net.SplitHostPort(pgb) + if err != nil { + return server.DuckLakeConfig{}, fmt.Errorf("parse pgbouncerEndpoint %q for org %q: %w", pgb, orgID, err) + } + portNum, err := strconv.Atoi(p) + if err != nil { + return server.DuckLakeConfig{}, fmt.Errorf("parse pgbouncerEndpoint port %q for org %q: %w", p, orgID, err) + } + host = h + port = portNum + viaPgBouncer = true + } + dl := server.DuckLakeConfig{ MetadataStore: buildDuckLakeMetadataStoreDSN( - status.MetadataStore.Endpoint, - 5432, // Aurora always uses 5432 + host, + port, status.MetadataStore.User, status.MetadataStore.Password, status.MetadataStore.Database, ), - ObjectStore: fmt.Sprintf("s3://%s/", status.DataStore.BucketName), - S3Region: status.DataStore.S3Region, - S3UseSSL: true, - S3URLStyle: "vhost", + ViaPgBouncer: viaPgBouncer, + ObjectStore: fmt.Sprintf("s3://%s/", status.DataStore.BucketName), + S3Region: status.DataStore.S3Region, + S3UseSSL: true, + S3URLStyle: "vhost", } // Broker S3 credentials via STS AssumeRole diff --git a/server/server.go b/server/server.go index d062869..356ca60 100644 --- a/server/server.go +++ b/server/server.go @@ -337,6 +337,14 @@ type DuckLakeConfig struct { // re-running the version check. This avoids redundant backups and // long-running checks in worker processes. Migrate bool `json:"migrate,omitempty" yaml:"-"` + + // ViaPgBouncer is set by the control plane when the DuckLake metadata + // connection is routed through a network-level pooler (e.g. PgBouncer) + // rather than direct to Postgres. When true, the worker disables the + // postgres_scanner in-process pool via `SET GLOBAL pg_pool_max_connections = 0`. + // See duckdb/ducklake#1031: behind a network pooler, client-side pooling + // is redundant and prevents the pooler from reclaiming idle connections. 
+ ViaPgBouncer bool `json:"via_pgbouncer,omitempty" yaml:"-"` } // fileDBEntry tracks a shared *sql.DB for file-persistence mode. @@ -1234,6 +1242,9 @@ func buildDuckLakePreAttachStatements(dlCfg DuckLakeConfig) []string { if duckLakeDisableMetadataThreadLocalCacheEnabled(dlCfg) { statements = append(statements, "SET GLOBAL pg_pool_enable_thread_local_cache = false") } + if dlCfg.ViaPgBouncer { + statements = append(statements, "SET GLOBAL pg_pool_max_connections = 0") + } return statements } diff --git a/server/server_test.go b/server/server_test.go index 357a849..efcd2fa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -185,6 +185,24 @@ func TestBuildDuckLakePreAttachStatements(t *testing.T) { }, want: nil, }, + { + name: "via pgbouncer disables in-process pool", + cfg: DuckLakeConfig{ + DisableMetadataThreadLocalCache: boolPtr(false), + ViaPgBouncer: true, + }, + want: []string{"SET GLOBAL pg_pool_max_connections = 0"}, + }, + { + name: "via pgbouncer with tls cache disabled emits both", + cfg: DuckLakeConfig{ + ViaPgBouncer: true, + }, + want: []string{ + "SET GLOBAL pg_pool_enable_thread_local_cache = false", + "SET GLOBAL pg_pool_max_connections = 0", + }, + }, } for _, tt := range tests {