Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions controlplane/org_activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,12 @@ func TestSharedWorkerActivatorDucklingCRRequiresSTSBroker(t *testing.T) {
}
return &provisioner.DucklingStatus{
MetadataStore: struct {
Type string
Endpoint string
Password string
User string
Database string
Type string
Endpoint string
PgBouncerEndpoint string
Password string
User string
Database string
}{
Endpoint: "test-org.cluster.rds.amazonaws.com",
Password: "duckling-password-123",
Expand Down Expand Up @@ -295,11 +296,12 @@ func TestSharedWorkerActivatorPrefersSecretRefOverDucklingCR(t *testing.T) {
ducklingCalled = true
return &provisioner.DucklingStatus{
MetadataStore: struct {
Type string
Endpoint string
Password string
User string
Database string
Type string
Endpoint string
PgBouncerEndpoint string
Password string
User string
Database string
}{Password: "cr-password"},
}, nil
},
Expand Down
28 changes: 28 additions & 0 deletions controlplane/provisioner/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,34 @@ func TestParseDucklingStatusSyncedFalse(t *testing.T) {
}
}

func TestParseDucklingStatusPgBouncerEndpoint(t *testing.T) {
cr := &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"metadataStore": map[string]interface{}{
"type": "aurora",
"endpoint": "posthog-duckling-foo.cluster-xyz.us-east-1.rds.amazonaws.com",
"pgbouncerEndpoint": "pgbouncer-duckling-foo.ducklings.svc.cluster.local:6543",
"user": "postgres",
"database": "postgres",
"password": "s3cret",
},
},
},
}

status, err := parseDucklingStatus(cr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := status.MetadataStore.PgBouncerEndpoint; got != "pgbouncer-duckling-foo.ducklings.svc.cluster.local:6543" {
t.Fatalf("PgBouncerEndpoint = %q, want pooler DNS", got)
}
if got := status.MetadataStore.Endpoint; got != "posthog-duckling-foo.cluster-xyz.us-east-1.rds.amazonaws.com" {
t.Fatalf("Endpoint = %q, want Aurora DNS", got)
}
}

func TestParseDucklingStatusEmpty(t *testing.T) {
cr := &unstructured.Unstructured{
Object: map[string]interface{}{},
Expand Down
12 changes: 7 additions & 5 deletions controlplane/provisioner/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ const ducklingNamespace = "ducklings"
// but not K8s workloads — those are managed by the duckgres Helm chart.
type DucklingStatus struct {
MetadataStore struct {
Type string
Endpoint string
Password string
User string
Database string
Type string
Endpoint string
PgBouncerEndpoint string
Password string
User string
Database string
}
DataStore struct {
Type string
Expand Down Expand Up @@ -140,6 +141,7 @@ func parseDucklingStatus(cr *unstructured.Unstructured) (*DucklingStatus, error)
if md, ok := status["metadataStore"].(map[string]interface{}); ok {
ds.MetadataStore.Type = getNestedString(md, "type")
ds.MetadataStore.Endpoint = getNestedString(md, "endpoint")
ds.MetadataStore.PgBouncerEndpoint = getNestedString(md, "pgbouncerEndpoint")
ds.MetadataStore.Password = getNestedString(md, "password")
ds.MetadataStore.User = getNestedString(md, "user")
ds.MetadataStore.Database = getNestedString(md, "database")
Expand Down
36 changes: 30 additions & 6 deletions controlplane/shared_worker_activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"encoding/json"
"fmt"
"log/slog"
"net"
"os"
"slices"
"strconv"
"strings"
"sync"

Expand Down Expand Up @@ -193,18 +195,40 @@ func (a *SharedWorkerActivator) buildDuckLakeConfigFromDuckling(ctx context.Cont
return server.DuckLakeConfig{}, fmt.Errorf("duckling CR %q has no data store bucket", orgID)
}

// Prefer the PgBouncer endpoint when the Duckling exposes one — the
// Crossplane composition sets status.metadataStore.pgbouncerEndpoint
// (as "<host>:<port>") when a per-Duckling pooler is provisioned.
// Otherwise connect directly to the metadata store on its default port.
host := status.MetadataStore.Endpoint
port := 5432 // Aurora always uses 5432
viaPgBouncer := false
if pgb := status.MetadataStore.PgBouncerEndpoint; pgb != "" {
h, p, err := net.SplitHostPort(pgb)
if err != nil {
return server.DuckLakeConfig{}, fmt.Errorf("parse pgbouncerEndpoint %q for org %q: %w", pgb, orgID, err)
}
portNum, err := strconv.Atoi(p)
if err != nil {
return server.DuckLakeConfig{}, fmt.Errorf("parse pgbouncerEndpoint port %q for org %q: %w", p, orgID, err)
}
host = h
port = portNum
viaPgBouncer = true
}

dl := server.DuckLakeConfig{
MetadataStore: buildDuckLakeMetadataStoreDSN(
status.MetadataStore.Endpoint,
5432, // Aurora always uses 5432
host,
port,
status.MetadataStore.User,
status.MetadataStore.Password,
status.MetadataStore.Database,
),
ObjectStore: fmt.Sprintf("s3://%s/", status.DataStore.BucketName),
S3Region: status.DataStore.S3Region,
S3UseSSL: true,
S3URLStyle: "vhost",
ViaPgBouncer: viaPgBouncer,
ObjectStore: fmt.Sprintf("s3://%s/", status.DataStore.BucketName),
S3Region: status.DataStore.S3Region,
S3UseSSL: true,
S3URLStyle: "vhost",
}

// Broker S3 credentials via STS AssumeRole
Expand Down
11 changes: 11 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ type DuckLakeConfig struct {
// re-running the version check. This avoids redundant backups and
// long-running checks in worker processes.
Migrate bool `json:"migrate,omitempty" yaml:"-"`

// ViaPgBouncer is set by the control plane when the DuckLake metadata
// connection is routed through a network-level pooler (e.g. PgBouncer)
// rather than direct to Postgres. When true, the worker disables the
// postgres_scanner in-process pool via `SET GLOBAL pg_pool_max_connections = 0`.
// See duckdb/ducklake#1031: behind a network pooler, client-side pooling
// is redundant and prevents the pooler from reclaiming idle connections.
ViaPgBouncer bool `json:"via_pgbouncer,omitempty" yaml:"-"`
}

// fileDBEntry tracks a shared *sql.DB for file-persistence mode.
Expand Down Expand Up @@ -1234,6 +1242,9 @@ func buildDuckLakePreAttachStatements(dlCfg DuckLakeConfig) []string {
if duckLakeDisableMetadataThreadLocalCacheEnabled(dlCfg) {
statements = append(statements, "SET GLOBAL pg_pool_enable_thread_local_cache = false")
}
if dlCfg.ViaPgBouncer {
statements = append(statements, "SET GLOBAL pg_pool_max_connections = 0")
}
return statements
}

Expand Down
18 changes: 18 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,24 @@ func TestBuildDuckLakePreAttachStatements(t *testing.T) {
},
want: nil,
},
{
name: "via pgbouncer disables in-process pool",
cfg: DuckLakeConfig{
DisableMetadataThreadLocalCache: boolPtr(false),
ViaPgBouncer: true,
},
want: []string{"SET GLOBAL pg_pool_max_connections = 0"},
},
{
name: "via pgbouncer with tls cache disabled emits both",
cfg: DuckLakeConfig{
ViaPgBouncer: true,
},
want: []string{
"SET GLOBAL pg_pool_enable_thread_local_cache = false",
"SET GLOBAL pg_pool_max_connections = 0",
},
},
}

for _, tt := range tests {
Expand Down
Loading