diff --git a/Dockerfile b/Dockerfile index 4db4a46..3c37dff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,14 +12,18 @@ ARG BUILD_TAGS="" ARG TARGETARCH ARG DUCKDB_EXTENSION_VERSION=1.5.2 ARG HTTPFS_EXTENSION_TAG=v1.5.2-stoi-fix +ARG DUCKDB_EXTENSION_REPOSITORY=https://extensions.duckdb.org +ARG DUCKDB_NIGHTLY_EXTENSION_REPOSITORY=http://nightly-extensions.duckdb.org RUN CGO_ENABLED=1 go build -tags "${BUILD_TAGS}" -ldflags "-X main.version=${VERSION} -X main.commit=${COMMIT} -X main.date=$(date -u +%Y-%m-%dT%H:%M:%SZ)" -o duckgres . RUN mkdir -p "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}" \ && curl -fsSL "https://github.com/benben/duckdb-httpfs/releases/download/${HTTPFS_EXTENSION_TAG}/httpfs-linux-${TARGETARCH}.duckdb_extension" \ -o "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/httpfs.duckdb_extension" \ - && for ext in ducklake postgres_scanner json; do \ - curl -fsSL "https://extensions.duckdb.org/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/${ext}.duckdb_extension.gz" \ + && for ext in ducklake json; do \ + curl -fsSL "${DUCKDB_EXTENSION_REPOSITORY}/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/${ext}.duckdb_extension.gz" \ | gunzip > "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/${ext}.duckdb_extension"; \ - done + done \ + && curl -fsSL "${DUCKDB_NIGHTLY_EXTENSION_REPOSITORY}/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/postgres_scanner.duckdb_extension.gz" \ + | gunzip > "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/postgres_scanner.duckdb_extension" FROM debian:bookworm-slim diff --git a/server/bundled_extensions_test.go b/server/bundled_extensions_test.go index 8827161..a2c18f3 100644 --- a/server/bundled_extensions_test.go +++ b/server/bundled_extensions_test.go @@ -3,6 +3,7 @@ package server import ( "os" "path/filepath" + "sync" "testing" ) @@ -41,7 +42,7 @@ func TestSeedBundledExtensionsCopiesMissingFiles(t *testing.T) { } } -func TestSeedBundledExtensionsPreservesExistingFiles(t *testing.T) { +func TestSeedBundledExtensionsPreservesExistingFilesWithMatchingContents(t *testing.T) { srcRoot := t.TempDir() dstRoot := t.TempDir() @@ -49,7 +50,7 @@ func TestSeedBundledExtensionsPreservesExistingFiles(t *testing.T) { if err := os.MkdirAll(srcDir, 0o755); err != nil { t.Fatalf("mkdir src: %v", err) } - if err := os.WriteFile(filepath.Join(srcDir, "httpfs.duckdb_extension"), []byte("new"), 0o644); err != nil { + if err := os.WriteFile(filepath.Join(srcDir, "httpfs.duckdb_extension"), []byte("existing"), 0o644); err != nil { t.Fatalf("write src extension: %v", err) } @@ -74,3 +75,154 @@ func TestSeedBundledExtensionsPreservesExistingFiles(t *testing.T) { t.Fatalf("expected existing extension to be preserved, got %q", string(got)) } } + +func TestSeedBundledExtensionsReplacesExistingFilesWithUpdatedContents(t *testing.T) { + srcRoot := t.TempDir() + dstRoot := t.TempDir() + + srcDir := filepath.Join(srcRoot, "v1.5.2", "linux_arm64") + if err := os.MkdirAll(srcDir, 0o755); err != nil { + t.Fatalf("mkdir src: %v", err) + } + srcExt := filepath.Join(srcDir, "postgres_scanner.duckdb_extension") + if err := os.WriteFile(srcExt, []byte("nightly"), 0o644); err != nil { + t.Fatalf("write src extension: %v", err) + } + + dstDir := filepath.Join(dstRoot, "v1.5.2", "linux_arm64") + if err := os.MkdirAll(dstDir, 0o755); err != nil { + t.Fatalf("mkdir dst: %v", err) + } + dstExt := filepath.Join(dstDir, "postgres_scanner.duckdb_extension") + if err := os.WriteFile(dstExt, []byte("stable"), 0o644); err != nil { + t.Fatalf("write dst extension: %v", err) + } + + if err := seedBundledExtensions(srcRoot, dstRoot); err != nil { + t.Fatalf("seedBundledExtensions: %v", err) + } + + got, err := os.ReadFile(dstExt) + if err != nil { + t.Fatalf("read dst extension: %v", err) + } + if string(got) != "nightly" { + t.Fatalf("expected existing extension to be replaced, got %q", string(got)) + } +} + +func TestSeedBundledExtensionsPreservesNonTargetedChangedFiles(t *testing.T) { + srcRoot := t.TempDir() + dstRoot := t.TempDir() + + srcDir := filepath.Join(srcRoot, "v1.5.2", "linux_arm64") + if err := os.MkdirAll(srcDir, 0o755); err != nil { + t.Fatalf("mkdir src: %v", err) + } + srcExt := filepath.Join(srcDir, "httpfs.duckdb_extension") + if err := os.WriteFile(srcExt, []byte("new-httpfs"), 0o644); err != nil { + t.Fatalf("write src extension: %v", err) + } + + dstDir := filepath.Join(dstRoot, "v1.5.2", "linux_arm64") + if err := os.MkdirAll(dstDir, 0o755); err != nil { + t.Fatalf("mkdir dst: %v", err) + } + dstExt := filepath.Join(dstDir, "httpfs.duckdb_extension") + if err := os.WriteFile(dstExt, []byte("existing-httpfs"), 0o644); err != nil { + t.Fatalf("write dst extension: %v", err) + } + + if err := seedBundledExtensions(srcRoot, dstRoot); err != nil { + t.Fatalf("seedBundledExtensions: %v", err) + } + + got, err := os.ReadFile(dstExt) + if err != nil { + t.Fatalf("read dst extension: %v", err) + } + if string(got) != "existing-httpfs" { + t.Fatalf("expected non-targeted extension to be preserved, got %q", string(got)) + } +} + +func TestBootstrapBundledExtensionsSeedsBundledExtensions(t *testing.T) { + bundledRoot := t.TempDir() + dataDir := t.TempDir() + + srcDir := filepath.Join(bundledRoot, "v1.5.2", "linux_arm64") + if err := os.MkdirAll(srcDir, 0o755); err != nil { + t.Fatalf("mkdir src: %v", err) + } + srcExt := filepath.Join(srcDir, "postgres_scanner.duckdb_extension") + if err := os.WriteFile(srcExt, []byte("nightly"), 0o644); err != nil { + t.Fatalf("write src extension: %v", err) + } + + prevBundledRoot := bundledDuckDBExtensionsDir + bundledDuckDBExtensionsDir = bundledRoot + defer func() { bundledDuckDBExtensionsDir = prevBundledRoot }() + + bundledExtensionBootstrap = struct { + mu sync.Mutex + byPath map[string]error + }{} + + if err := bootstrapBundledExtensions(dataDir); err != nil { + t.Fatalf("bootstrapBundledExtensions: %v", err) + } + + extDir := filepath.Join(dataDir, "extensions") + dstExt := filepath.Join(extDir, "v1.5.2", "linux_arm64", "postgres_scanner.duckdb_extension") + got, err := os.ReadFile(dstExt) + if err != nil { + t.Fatalf("read dst extension: %v", err) + } + if string(got) != "nightly" { + t.Fatalf("expected seeded extension to match bundled contents, got %q", string(got)) + } +} + +func TestBootstrapBundledExtensionsRunsOncePerExtensionDirectory(t *testing.T) { + bundledRoot := t.TempDir() + dataDir := t.TempDir() + extDir := filepath.Join(dataDir, "extensions") + + srcDir := filepath.Join(bundledRoot, "v1.5.2", "linux_arm64") + if err := os.MkdirAll(srcDir, 0o755); err != nil { + t.Fatalf("mkdir src: %v", err) + } + srcExt := filepath.Join(srcDir, "postgres_scanner.duckdb_extension") + if err := os.WriteFile(srcExt, []byte("nightly"), 0o644); err != nil { + t.Fatalf("write src extension: %v", err) + } + + prevBundledRoot := bundledDuckDBExtensionsDir + bundledDuckDBExtensionsDir = bundledRoot + defer func() { bundledDuckDBExtensionsDir = prevBundledRoot }() + + bundledExtensionBootstrap = struct { + mu sync.Mutex + byPath map[string]error + }{} + + if err := bootstrapBundledExtensions(dataDir); err != nil { + t.Fatalf("bootstrapBundledExtensions: %v", err) + } + + if err := os.WriteFile(srcExt, []byte("newer-nightly"), 0o644); err != nil { + t.Fatalf("rewrite src extension: %v", err) + } + if err := bootstrapBundledExtensions(dataDir); err != nil { + t.Fatalf("second bootstrapBundledExtensions: %v", err) + } + + dstExt := filepath.Join(extDir, "v1.5.2", "linux_arm64", "postgres_scanner.duckdb_extension") + got, err := os.ReadFile(dstExt) + if err != nil { + t.Fatalf("read dst extension: %v", err) + } + if string(got) != "nightly" { + t.Fatalf("expected bootstrap to run once, got %q", string(got)) + } +} diff --git a/server/server.go b/server/server.go index 91553b5..d062869 100644 --- a/server/server.go +++ b/server/server.go @@ -37,7 +37,12 @@ var processVersion = "dev" // clients pinning connection goroutines indefinitely. var startupReadTimeout = 30 * time.Second -const bundledDuckDBExtensionsDir = "/app/extensions" +var bundledDuckDBExtensionsDir = "/app/extensions" + +var bundledExtensionBootstrap struct { + mu sync.Mutex + byPath map[string]error +} // SetProcessVersion sets the version string for this process. Called from main(). func SetProcessVersion(v string) { processVersion = v } @@ -45,6 +50,23 @@ func SetProcessVersion(v string) { processVersion = v } // ProcessVersion returns the version string for this process. func ProcessVersion() string { return processVersion } +func bootstrapBundledExtensions(dataDir string) error { + extDir := filepath.Join(dataDir, "extensions") + + bundledExtensionBootstrap.mu.Lock() + defer bundledExtensionBootstrap.mu.Unlock() + if bundledExtensionBootstrap.byPath == nil { + bundledExtensionBootstrap.byPath = make(map[string]error) + } + if err, ok := bundledExtensionBootstrap.byPath[extDir]; ok { + return err + } + + err := seedBundledExtensions(bundledDuckDBExtensionsDir, extDir) + bundledExtensionBootstrap.byPath[extDir] = err + return err +} + // passwordPattern matches password= or password: with quoted or unquoted values. var passwordPattern = regexp.MustCompile(`(?i)(password\s*[=:]\s*)("[^"]*"|[^\s"]+)`) @@ -477,6 +499,10 @@ func New(cfg Config) (*Server, error) { ensureDuckLakeMigrationCheck(cfg.DuckLake, cfg.DataDir) } + if err := bootstrapBundledExtensions(cfg.DataDir); err != nil { + slog.Warn("Failed to bootstrap bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", filepath.Join(cfg.DataDir, "extensions"), "error", err) + } + // Initialize query logger (non-fatal on error) if ql, err := NewQueryLogger(cfg); err != nil { slog.Warn("Failed to initialize query log, continuing without it.", "error", err) @@ -873,9 +899,6 @@ func openBaseDB(cfg Config, username string) (*sql.DB, error) { // Set extension directory under DataDir so DuckDB doesn't rely on $HOME/.duckdb // for autoloading/installing extensions. extDir := filepath.Join(cfg.DataDir, "extensions") - if err := seedBundledExtensions(bundledDuckDBExtensionsDir, extDir); err != nil { - slog.Warn("Failed to seed bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", extDir, "error", err) - } if _, err := db.Exec(fmt.Sprintf("SET extension_directory = '%s'", extDir)); err != nil { slog.Warn("Failed to set DuckDB extension_directory.", "extension_directory", extDir, "error", err) } else { @@ -964,38 +987,54 @@ func seedBundledExtensions(srcRoot, dstRoot string) error { if info.Mode()&os.ModeSymlink != 0 { return nil } - if _, err := os.Stat(dstPath); err == nil { - return nil - } else if !errors.Is(err, os.ErrNotExist) { + if err := os.MkdirAll(filepath.Dir(dstPath), 0o750); err != nil { return err } - - srcFile, err := os.Open(path) - if err != nil { + if _, err := os.Stat(dstPath); err == nil { + if !shouldRefreshBundledExtension(path) { + return nil + } + } else if !errors.Is(err, os.ErrNotExist) { return err } - dstFile, err := os.OpenFile(dstPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o640) - if err != nil { - _ = srcFile.Close() - return err - } - if _, err := io.Copy(dstFile, srcFile); err != nil { - _ = srcFile.Close() - _ = dstFile.Close() - return err - } - if err := srcFile.Close(); err != nil { - _ = dstFile.Close() - return err - } - if err := dstFile.Close(); err != nil { - return err - } - return nil + return copyFile(path, dstPath, info.Mode().Perm()) }) } +func shouldRefreshBundledExtension(srcPath string) bool { + return filepath.Base(srcPath) == "postgres_scanner.duckdb_extension" +} + +func copyFile(srcPath, dstPath string, mode os.FileMode) error { + srcFile, err := os.Open(srcPath) + if err != nil { + return err + } + defer func() { _ = srcFile.Close() }() + + tmpFile, err := os.CreateTemp(filepath.Dir(dstPath), ".bundled-extension-*") + if err != nil { + return err + } + tmpPath := tmpFile.Name() + defer func() { _ = os.Remove(tmpPath) }() + + if err := tmpFile.Chmod(mode); err != nil { + _ = tmpFile.Close() + return err + } + if _, err := io.Copy(tmpFile, srcFile); err != nil { + _ = tmpFile.Close() + return err + } + if err := tmpFile.Close(); err != nil { + return err + } + + return os.Rename(tmpPath, dstPath) +} + // CreateDBConnection creates a DuckDB connection for a client session. // Uses in-memory database as an anchor for DuckLake attachment (actual data lives in RDS/S3). // This is a standalone function so it can be reused by both the server and control plane workers. diff --git a/server/shell.go b/server/shell.go index 0707bf2..9619b9b 100644 --- a/server/shell.go +++ b/server/shell.go @@ -8,6 +8,7 @@ import ( "log/slog" "os" "os/signal" + "path/filepath" "strings" "sync" "syscall" @@ -18,6 +19,9 @@ import ( // DuckLake, and pg_catalog views are all available. func RunShell(cfg Config) { sem := make(chan struct{}, 1) + if err := bootstrapBundledExtensions(cfg.DataDir); err != nil { + slog.Warn("Failed to bootstrap bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", filepath.Join(cfg.DataDir, "extensions"), "error", err) + } db, err := CreateDBConnection(cfg, sem, "shell", processStartTime, processVersion) if err != nil { diff --git a/server/worker.go b/server/worker.go index f6467c2..624d4d0 100644 --- a/server/worker.go +++ b/server/worker.go @@ -13,6 +13,7 @@ import ( "net" "os" "os/signal" + "path/filepath" "sync" "syscall" "time" @@ -293,6 +294,10 @@ func runChildWorker(tcpConn *net.TCPConn, cfg *ChildConfig) int { parentVersion = cfg.ServerVersion } + if err := bootstrapBundledExtensions(serverCfg.DataDir); err != nil { + slog.Warn("Failed to bootstrap bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", filepath.Join(serverCfg.DataDir, "extensions"), "error", err) + } + // Create DuckDB connection db, err := CreateDBConnection(serverCfg, make(chan struct{}, 1), username, parentStartTime, parentVersion) if err != nil { @@ -382,4 +387,3 @@ func runChildWorker(tcpConn *net.TCPConn, cfg *ChildConfig) int { return ExitSuccess } } - diff --git a/tests/integration/harness.go b/tests/integration/harness.go index 2e7565d..0454d01 100644 --- a/tests/integration/harness.go +++ b/tests/integration/harness.go @@ -14,8 +14,8 @@ import ( "sync" "time" - "github.com/posthog/duckgres/server" _ "github.com/lib/pq" + "github.com/posthog/duckgres/server" ) var duckLakeInfraServices = []string{"ducklake-metadata", "minio", "minio-init"} @@ -148,10 +148,6 @@ func (h *TestHarness) startDuckgres(harnessCfg HarnessConfig) error { // Configure DuckLake if enabled if harnessCfg.UseDuckLake { - // Try the upstream postgres_scanner fix from core_nightly in the - // DuckLake-backed integration harness without changing global defaults. - cfg.Extensions = []string{"postgres FROM core_nightly", "ducklake"} - metadataPort := harnessCfg.DuckLakeMetadataPort // If latency injection is requested, start a TCP proxy in front of the