
feat(store): add Postgres backend for the hub store#289

Open
lerms wants to merge 6 commits into GoogleCloudPlatform:main from neuralnetes:feat/postgres-store

Conversation


@lerms lerms commented Jun 1, 2026

Summary

Enables database.driver: postgres so the hub can run as a stateless control plane against an external Postgres, configured entirely via GitOps (SCION_SERVER_DATABASE_URL) instead of a node-local SQLite file + PVC.

This is the missing half of Postgres support. The ent layer already spoke Postgres (entc.OpenPostgres + lib/pq); this adds the hand-written store's Postgres twin and wires initStore's case "postgres": to compose them via entadapter.NewCompositeStore — exactly as the sqlite branch does.

graph TB
  subgraph cmd["cmd/server_foreground.go · initStore()"]
    D{"cfg.Database.Driver"}
  end
  D -->|"sqlite"| SQ["sqlite.New → Migrate"]
  D -->|"postgres ✨NEW"| PG["postgres.New → Migrate"]
  D -->|other| ERR["unsupported driver"]

  SQ --> ES["entc.OpenSQLite + AutoMigrate"]
  PG --> EP["entc.OpenPostgres + AutoMigrate<br/>(already existed)"]

  ES --> CS["entadapter.NewCompositeStore"]
  EP --> CS
  CS --> IF[("store.Store · 206 methods<br/>23 sub-interfaces")]

  subgraph trans["Dialect translation (sqlite → postgres)"]
    T1["? → $N placeholders"]
    T2["INSERT OR IGNORE/REPLACE → ON CONFLICT"]
    T3["sqlite_master / PRAGMA → information_schema"]
    T4["randomblob() → gen_random_uuid()"]
    T5["json_each → json_array_elements_text"]
    T6["COLLATE NOCASE → LOWER() unique idx · BLOB → BYTEA"]
  end
  PG -.->|"53 migrations · 206 methods"| trans

  classDef new fill:#1f6feb,color:#fff
  class PG,EP new

What's included

  • pkg/store/postgres/ — a faithful, per-entity mirror of pkg/store/sqlite/. Same file layout, same method names/signatures/comments/control flow (206 methods in each package). The only differences are dialect mechanics:
    • ? → $N numbered placeholders
    • INSERT OR IGNORE/REPLACE → ON CONFLICT DO NOTHING / DO UPDATE
    • sqlite_master / PRAGMA table_info introspection → information_schema
    • randomblob() UUID seeds → gen_random_uuid()
    • json_each() → json_array_elements_text(); json_array() → json_build_array()
    • COLLATE NOCASE → LOWER(email) functional unique index
    • BLOB → BYTEA; datetime('now') → NOW(); INSTR/SUBSTR → split_part
  • cmd/server_foreground.go — adds case "postgres": to initStore. The grove→project backfill (MigrateGroveToProjectData) is a SQLite-era data repair and is intentionally skipped on the Postgres path (a fresh Postgres DB has no legacy grove rows).
  • Build tags — the lib/pq driver import is gated behind a no_postgres build tag, mirroring the existing no_sqlite tag, so slim images can exclude either driver.
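
To make the first dialect difference concrete, here is a minimal sketch of the `?` → `$N` rewrite. The PR describes the Postgres queries as hand-written, so the `rebind` helper below is purely illustrative and is not claimed to exist in the repository. It assumes placeholders never appear outside single-quoted literals in any other protected context (comments, dollar-quoted strings, Postgres `?` JSON operators).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rebind rewrites SQLite-style "?" placeholders as Postgres "$1", "$2", ...
// Question marks inside single-quoted SQL string literals are left untouched.
// Hypothetical helper for illustration only.
func rebind(query string) string {
	var b strings.Builder
	n := 0
	inString := false
	for i := 0; i < len(query); i++ {
		c := query[i]
		switch {
		case c == '\'':
			// A doubled quote ('') toggles twice, so we stay inside the literal.
			inString = !inString
			b.WriteByte(c)
		case c == '?' && !inString:
			n++
			b.WriteByte('$')
			b.WriteString(strconv.Itoa(n))
		default:
			b.WriteByte(c)
		}
	}
	return b.String()
}

func main() {
	fmt.Println(rebind("SELECT id FROM users WHERE email = ? AND name = ?"))
}
```

A mechanical rewrite like this only covers placeholders; the other items in the list (upserts, introspection, JSON functions) change the statement shape and still need per-query translation.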

Testing

Env-gated integration tests (SCION_TEST_POSTGRES_URL) that skip when the variable is unset, so the CI no_sqlite path (make test-fast, no DB) stays green.

Verified locally against PostgreSQL 16:

| Gate | Result |
| --- | --- |
| go build ./... (default) | |
| go build -tags no_sqlite ./... | |
| go build -tags no_postgres ./... | |
| go build -tags 'no_sqlite no_postgres' ./... | |
| make lint (go vet -tags no_sqlite) | |
| make test-fast (no DB) | ✅ tests skip, not fail |
| Live Postgres: 53/53 migrations + 16 CRUD tests | ✅ 31 tables created |

CRUD round-trips cover users, projects, agents, secrets (incl. upsert), groups + membership, policies, invite codes and env vars.
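
The env-gating described above can be sketched as follows. Only the variable name `SCION_TEST_POSTGRES_URL` comes from the PR; the `integrationTarget` function and its injectable lookup are assumptions made so the decision can be exercised without a database. In a real `_test.go` file the "unset" branch would call `t.Skip` rather than print.

```go
package main

import (
	"fmt"
	"os"
)

// postgresURLEnv is the variable named in the PR description.
const postgresURLEnv = "SCION_TEST_POSTGRES_URL"

// integrationTarget reports the DSN to test against and whether the
// Postgres integration tests should run at all. The lookup function is
// injected (os.LookupEnv in practice) so the gate itself is testable.
// Hypothetical helper for illustration only.
func integrationTarget(lookup func(string) (string, bool)) (dsn string, run bool) {
	v, ok := lookup(postgresURLEnv)
	if !ok || v == "" {
		return "", false
	}
	return v, true
}

func main() {
	if _, ok := integrationTarget(os.LookupEnv); ok {
		fmt.Println("running Postgres integration tests")
	} else {
		fmt.Println("skipping: " + postgresURLEnv + " is unset")
	}
}
```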

Notes for reviewers

  • The mirror is deliberately not refactored into a shared dialect abstraction — it stays line-for-line comparable with pkg/store/sqlite/ so the diff between the two is purely the dialect changes listed above, which keeps future schema/method changes easy to apply to both.
  • applyMigrationWithFKOff is preserved (with the foreignKeysOffMigrations map) so the migration-runner shape matches SQLite, but in Postgres it runs a plain transaction — the one table-recreate migration (V40) is made FK-safe via DROP TABLE ... CASCADE rather than toggling PRAGMA foreign_keys.

🤖 Generated with Claude Code


google-cla Bot commented Jun 1, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a comprehensive PostgreSQL store implementation parallel to the existing SQLite backend, enabling stateless control-plane deployments. While the implementation is thorough, several critical issues and optimization opportunities were identified. Specifically, migrations V31 and V40 pose schema corruption risks by using CASCADE drops that silently destroy foreign keys; these should be replaced with direct ALTER TABLE statements. Additionally, concurrent migrations in multi-replica environments could trigger race conditions, which can be mitigated using Postgres advisory locks. Keyset pagination is currently broken or ignored in several listing methods (ListAgents, ListMessages, and ListScheduledEvents), and potential query crashes exist when parsing empty or null ancestry fields. Finally, performance can be significantly improved by replacing recursive Go-based database queries with recursive CTEs for group operations, utilizing JSONB containment operators for repository lookups, and increasing the low default connection pool limit.

Comment on lines +761 to +797
const migrationV40 = `
CREATE TABLE IF NOT EXISTS groves_new (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
slug TEXT NOT NULL UNIQUE,
git_remote TEXT,
labels TEXT,
annotations TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_by TEXT,
owner_id TEXT,
visibility TEXT NOT NULL DEFAULT 'private',
default_runtime_broker_id TEXT REFERENCES runtime_brokers(id) ON DELETE SET NULL,
shared_dirs TEXT,
github_installation_id INTEGER REFERENCES github_installations(installation_id),
github_permissions TEXT,
github_app_status TEXT,
git_identity TEXT
);

INSERT INTO groves_new SELECT
id, name, slug, git_remote, labels, annotations,
created_at, updated_at, created_by, owner_id, visibility,
default_runtime_broker_id, shared_dirs,
github_installation_id, github_permissions, github_app_status,
git_identity
FROM groves ON CONFLICT DO NOTHING;

DROP TABLE IF EXISTS groves CASCADE;
ALTER TABLE groves_new RENAME TO groves;

CREATE INDEX IF NOT EXISTS idx_groves_slug ON groves(slug);
CREATE INDEX IF NOT EXISTS idx_groves_git_remote ON groves(git_remote);
CREATE INDEX IF NOT EXISTS idx_groves_owner ON groves(owner_id);
CREATE INDEX IF NOT EXISTS idx_groves_default_runtime_broker ON groves(default_runtime_broker_id);
`
Contributor

critical

Recreating the groves table and dropping the old one with CASCADE is extremely dangerous in Postgres. The CASCADE option silently drops all foreign key constraints in other tables (such as agents, templates, schedules, etc.) that reference groves(id). These foreign keys are never recreated, leading to database schema corruption. Since Postgres natively supports dropping and adding constraints, we should use ALTER TABLE directly instead of recreating the table.

const migrationV40 = "ALTER TABLE groves DROP CONSTRAINT IF EXISTS groves_git_remote_key;\nALTER TABLE groves ADD CONSTRAINT groves_slug_key UNIQUE (slug);"

Author

Fixed in 3158517. You're right — this is genuine schema corruption in Postgres. Replaced the DROP TABLE ... CASCADE + recreate with in-place ALTER TABLE ... DROP/ADD CONSTRAINT (the SQLite source only recreates because SQLite cannot alter constraints; Postgres can).

Verified against live Postgres 16 after the fix: all 7 FKs referencing the table survive (agents, project_contributors, project_sync_state, scheduled_events, schedules, templates, user_access_tokens), *_slug_key UNIQUE is present, and the git_remote UNIQUE is gone. (Constraint names keep the groves_ prefix because the later V50 table rename doesn't rename constraints — identical to the SQLite backend, harmless.)

Comment on lines +595 to +627
const migrationV31 = `
-- Postgres supports ALTER TABLE directly, so we recreate the table as in SQLite source.
CREATE TABLE notification_subscriptions_new (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL DEFAULT 'agent',
agent_id TEXT,
subscriber_type TEXT NOT NULL DEFAULT 'agent',
subscriber_id TEXT NOT NULL,
grove_id TEXT NOT NULL,
trigger_activities TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_by TEXT NOT NULL,
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);

-- Copy existing data (all existing subscriptions are agent-scoped)
INSERT INTO notification_subscriptions_new
(id, scope, agent_id, subscriber_type, subscriber_id, grove_id, trigger_activities, created_at, created_by)
SELECT id, 'agent', agent_id, subscriber_type, subscriber_id, grove_id, trigger_activities, created_at, created_by
FROM notification_subscriptions;

DROP TABLE notification_subscriptions CASCADE;
ALTER TABLE notification_subscriptions_new RENAME TO notification_subscriptions;

-- Recreate indexes
CREATE INDEX IF NOT EXISTS idx_notification_subs_agent ON notification_subscriptions(agent_id);
CREATE INDEX IF NOT EXISTS idx_notification_subs_project ON notification_subscriptions(grove_id);
CREATE INDEX IF NOT EXISTS idx_notification_subs_subscriber ON notification_subscriptions(subscriber_type, subscriber_id);

-- Unique constraint: one subscription per (scope, target, subscriber, project)
CREATE UNIQUE INDEX IF NOT EXISTS idx_notification_subs_unique
ON notification_subscriptions(scope, COALESCE(agent_id, ''), subscriber_type, subscriber_id, grove_id);
`
Contributor

critical

Dropping the notification_subscriptions table with CASCADE silently destroys the foreign key constraint in the notifications table (notifications.subscription_id references notification_subscriptions(id)). This constraint is never recreated, leading to schema corruption. Since Postgres natively supports adding columns and dropping NOT NULL constraints, we should use ALTER TABLE directly instead of recreating the table.

const migrationV31 = "ALTER TABLE notification_subscriptions ADD COLUMN scope TEXT NOT NULL DEFAULT 'agent';\nALTER TABLE notification_subscriptions ALTER COLUMN agent_id DROP NOT NULL;\nCREATE UNIQUE INDEX IF NOT EXISTS idx_notification_subs_unique ON notification_subscriptions(scope, COALESCE(agent_id, ''), subscriber_type, subscriber_id, grove_id);"

Author

Fixed in 3158517. Same CASCADE corruption: the recreate silently dropped the notifications.subscription_id → notification_subscriptions(id) foreign key. Replaced with in-place ADD COLUMN ... scope + ALTER COLUMN agent_id DROP NOT NULL, keeping the existing dedup unique index. Verified on live Postgres: scope (NOT NULL) and nullable agent_id are present and the dependent FK survives.

}

// Migrate applies database migrations.
func (s *PostgresStore) Migrate(ctx context.Context) error {
Contributor

high

In a stateless cloud topology where multiple replicas of the hub start up concurrently, they will all attempt to run database migrations at the same time. This can lead to race conditions, duplicate key errors, or table creation failures. Using a session-level Postgres advisory lock (pg_advisory_lock) at the start of the migration process ensures that only one replica runs the migrations at a time, while others wait safely.

func (s *PostgresStore) Migrate(ctx context.Context) error {
	// Acquire a session-level advisory lock to prevent concurrent migrations
	// when scaling horizontally. 123456 is an arbitrary lock ID.
	if _, err := s.db.ExecContext(ctx, "SELECT pg_advisory_lock(123456)"); err != nil {
		return fmt.Errorf("failed to acquire advisory lock: %w", err)
	}
	defer s.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)")

Author

Fixed in 3158517. Added a database-global pg_advisory_lock around Migrate() so concurrent hub replicas serialize migrations — exactly the stateless scale-out this backend exists for.

One correctness refinement over the snippet: a session-level advisory lock's lock and unlock must run on the same session, but with a connection pool they can land on different pooled connections. So I pin the lock to a dedicated *sql.Conn (db.Conn(ctx) → defer lockConn.Close()); the migrations still run via the pool, and since the lock is database-global it blocks other replicas regardless. Verified no advisory locks leak after Migrate() returns.

Comment on lines +177 to +228
whereClause := ""
if len(conditions) > 0 {
whereClause = "WHERE " + strings.Join(conditions, " AND ")
}

// Get total count
var totalCount int
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM scheduled_events %s", whereClause)
if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount); err != nil {
return nil, err
}

// Apply pagination
limit := opts.Limit
if limit <= 0 {
limit = 50
}
if limit > 200 {
limit = 200
}

query := fmt.Sprintf(`
SELECT id, project_id, event_type, fire_at, payload, status,
created_at, created_by, fired_at, error, schedule_id
FROM scheduled_events %s
ORDER BY created_at DESC
LIMIT $%d
`, whereClause, len(args)+1)

queryArgs := append(args, limit+1) //nolint:gocritic // intentional append to copy

if opts.Cursor != "" {
if whereClause == "" {
query = fmt.Sprintf(`
SELECT id, project_id, event_type, fire_at, payload, status,
created_at, created_by, fired_at, error, schedule_id
FROM scheduled_events WHERE id < $%d
ORDER BY created_at DESC
LIMIT $%d
`, len(args)+1, len(args)+2)
} else {
query = fmt.Sprintf(`
SELECT id, project_id, event_type, fire_at, payload, status,
created_at, created_by, fired_at, error, schedule_id
FROM scheduled_events %s AND id < $%d
ORDER BY created_at DESC
LIMIT $%d
`, whereClause, len(args)+1, len(args)+2)
}
queryArgs = append(args, opts.Cursor, limit+1) //nolint:gocritic
}

Contributor

high

The keyset pagination logic here is broken. The query sorts by created_at DESC but filters by id < opts.Cursor. Since id is a UUID, comparing it has no relation to the created_at order, resulting in scrambled and incorrect pagination. We should use a proper keyset pagination condition comparing both created_at and id using subqueries. Additionally, appending this condition directly to conditions simplifies the code and avoids duplicating the entire query.

	if opts.Cursor != "" {
		conditions = append(conditions, fmt.Sprintf("(created_at < (SELECT created_at FROM scheduled_events WHERE id = $%d) OR (created_at = (SELECT created_at FROM scheduled_events WHERE id = $%d) AND id < $%d))", len(args)+1, len(args)+2, len(args)+3))
		args = append(args, opts.Cursor, opts.Cursor, opts.Cursor)
	}

	whereClause := ""
	if len(conditions) > 0 {
		whereClause = "WHERE " + strings.Join(conditions, " AND ")
	}

	// Get total count
	var totalCount int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM scheduled_events %s", whereClause)
	if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount); err != nil {
		return nil, err
	}

	// Apply pagination
	limit := opts.Limit
	if limit <= 0 {
		limit = 50
	}
	if limit > 200 {
		limit = 200
	}

	query := fmt.Sprintf("SELECT id, project_id, event_type, fire_at, payload, status, created_at, created_by, fired_at, error, schedule_id FROM scheduled_events %s ORDER BY created_at DESC LIMIT $%d", whereClause, len(args)+1)

	queryArgs := append(args, limit+1)

Author

⏸️ Won't fix in this PR — faithful-mirror scope. This one does consume opts.Cursor, but the id < cursor + ORDER BY created_at DESC shape is copied verbatim from the SQLite source (ListScheduledEvents uses the identical AND id < ? / ORDER BY created_at DESC). The mixed sort-vs-filter-key critique is valid, but it applies equally to upstream SQLite — fixing it here only would make the two backends diverge. Better as a cross-backend follow-up.

Comment on lines +313 to +318
// Exclude soft-deleted agents unless explicitly requested
if !filter.IncludeDeleted {
conditions = append(conditions, "deleted_at IS NULL")
}

whereClause := ""
Contributor

high

The ListAgents method sets NextCursor in the return value but completely ignores opts.Cursor in the SQL query. This breaks pagination entirely, causing the UI to reload the first page infinitely when clicking 'Next Page'. We should add the keyset pagination condition to conditions using the created_at and id fields.

Suggested change
if opts.Cursor != "" {
conditions = append(conditions, fmt.Sprintf("(created_at < (SELECT created_at FROM agents WHERE id = $%d) OR (created_at = (SELECT created_at FROM agents WHERE id = $%d) AND id < $%d))", len(args)+1, len(args)+2, len(args)+3))
args = append(args, opts.Cursor, opts.Cursor, opts.Cursor)
}
// Exclude soft-deleted agents unless explicitly requested
if !filter.IncludeDeleted {
conditions = append(conditions, "deleted_at IS NULL")
}
whereClause := ""

Author

⏸️ Won't fix in this PR — faithful-mirror scope. Confirmed the SQLite source (ListAgents) has the identical behavior: it sets result.NextCursor = agents[limit-1].ID and never consumes opts.Cursor either. So this isn't a regression introduced by the Postgres backend — it's pre-existing upstream behavior that this PR faithfully mirrors (the stated mandate: 1-to-1 with pkg/store/sqlite, no behavioral drift).

Fixing keyset pagination is a legitimate bug, but it should land in a dedicated change that fixes both backends together so they don't diverge. Happy to file/track that as a follow-up.

Comment on lines +407 to +466
// GetEffectiveGroups returns all groups a user belongs to, including transitive memberships.
func (s *PostgresStore) GetEffectiveGroups(ctx context.Context, userID string) ([]string, error) {
// Start with direct group memberships
directMemberships, err := s.GetUserGroups(ctx, userID)
if err != nil {
return nil, err
}

effectiveGroups := make(map[string]bool)
for _, m := range directMemberships {
effectiveGroups[m.GroupID] = true
// Add transitive group memberships
if err := s.addTransitiveGroups(ctx, m.GroupID, effectiveGroups); err != nil {
return nil, err
}
}

result := make([]string, 0, len(effectiveGroups))
for groupID := range effectiveGroups {
result = append(result, groupID)
}

return result, nil
}

// addTransitiveGroups recursively adds all groups that contain the given group.
func (s *PostgresStore) addTransitiveGroups(ctx context.Context, groupID string, visited map[string]bool) error {
// Find all groups where this group is a member
rows, err := s.db.QueryContext(ctx,
"SELECT group_id FROM group_members WHERE member_type = 'group' AND member_id = $1", groupID)
if err != nil {
return err
}

// Collect all parent group IDs first, then close rows before recursing
// This avoids issues with SQLite connections during recursive queries
var parentGroupIDs []string
for rows.Next() {
var parentGroupID string
if err := rows.Scan(&parentGroupID); err != nil {
rows.Close()
return err
}
parentGroupIDs = append(parentGroupIDs, parentGroupID)
}
rows.Close()

// Now recurse after rows are closed
for _, parentGroupID := range parentGroupIDs {
if !visited[parentGroupID] {
visited[parentGroupID] = true
if err := s.addTransitiveGroups(ctx, parentGroupID, visited); err != nil {
return err
}
}
}

return nil
}

Contributor

medium

Fetching transitive group memberships recursively in Go code costs one database round trip per group visited, which is slow for deep or wide hierarchies. A recursive common table expression (CTE), which Postgres fully supports, resolves the whole closure in a single query.

// GetEffectiveGroups returns all groups a user belongs to, including transitive memberships.
func (s *PostgresStore) GetEffectiveGroups(ctx context.Context, userID string) ([]string, error) {
	rows, err := s.db.QueryContext(ctx, "WITH RECURSIVE user_groups AS (SELECT group_id FROM group_members WHERE member_type = 'user' AND member_id = $1 UNION SELECT gm.group_id FROM group_members gm INNER JOIN user_groups ug ON gm.member_type = 'group' AND gm.member_id = ug.group_id) SELECT group_id FROM user_groups", userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var groupIDs []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		groupIDs = append(groupIDs, id)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return groupIDs, nil
}

Author

⏸️ Won't fix in this PR: faithful-mirror scope. The SQLite source resolves transitive memberships with Go-side iteration too (for _, m := range directMemberships → for groupID := range effectiveGroups), so this mirrors upstream rather than introducing the N-roundtrip pattern. A recursive CTE is a real improvement, but as a Postgres-only change it would diverge the backends. Good candidate for a cross-backend optimization follow-up.

Comment on lines +356 to +406
// WouldCreateCycle checks if adding memberGroupID as a member of groupID would create a cycle.
// A cycle exists if groupID is reachable from memberGroupID by following the containment relationship.
// Example: if A contains B, and we try to add A as member of B, we'd have A->B->A (cycle).
func (s *PostgresStore) WouldCreateCycle(ctx context.Context, groupID, memberGroupID string) (bool, error) {
// If they're the same, it's a direct cycle
if groupID == memberGroupID {
return true, nil
}

// Check if groupID is reachable from memberGroupID by traversing DOWN the containment graph
// (i.e., checking what groups memberGroupID contains, and what those contain, etc.)
visited := make(map[string]bool)
return s.hasPathDown(ctx, memberGroupID, groupID, visited)
}

// hasPathDown checks if 'target' is reachable from 'current' by following containment.
// It looks at what groups 'current' contains as members.
func (s *PostgresStore) hasPathDown(ctx context.Context, current, target string, visited map[string]bool) (bool, error) {
if current == target {
return true, nil
}
if visited[current] {
return false, nil
}
visited[current] = true

// Get all groups that 'current' contains (groups where current is the group_id)
rows, err := s.db.QueryContext(ctx,
"SELECT member_id FROM group_members WHERE member_type = 'group' AND group_id = $1", current)
if err != nil {
return false, err
}
defer rows.Close()

for rows.Next() {
var childGroupID string
if err := rows.Scan(&childGroupID); err != nil {
return false, err
}
found, err := s.hasPathDown(ctx, childGroupID, target, visited)
if err != nil {
return false, err
}
if found {
return true, nil
}
}

return false, nil
}

Contributor

medium

Checking for cycles recursively in Go code costs one database round trip per group visited, which is inefficient. A recursive common table expression (CTE), which Postgres fully supports, answers the question in a single query.

Suggested change
// WouldCreateCycle checks if adding memberGroupID as a member of groupID would create a cycle.
// A cycle exists if groupID is reachable from memberGroupID by following the containment relationship.
// Example: if A contains B, and we try to add A as member of B, we'd have A->B->A (cycle).
func (s *PostgresStore) WouldCreateCycle(ctx context.Context, groupID, memberGroupID string) (bool, error) {
if groupID == memberGroupID {
return true, nil
}
var exists bool
err := s.db.QueryRowContext(ctx, "WITH RECURSIVE containment AS (SELECT member_id FROM group_members WHERE member_type = 'group' AND group_id = $1 UNION SELECT gm.member_id FROM group_members gm INNER JOIN containment c ON gm.member_type = 'group' AND gm.group_id = c.member_id) SELECT EXISTS (SELECT 1 FROM containment WHERE member_id = $2)", memberGroupID, groupID).Scan(&exists)
if err != nil {
return false, err
}
return exists, nil
}

Author

⏸️ Won't fix in this PR — faithful-mirror scope. Same as GetEffectiveGroups: the SQLite source does the cycle check via Go recursion, and this package mirrors it 1-to-1. Recursive-CTE optimization is worth doing across both backends together in a follow-up.
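
The cycle check itself is a reachability question: adding memberGroupID to groupID creates a cycle exactly when groupID is already reachable from memberGroupID by following containment downward. A minimal in-memory sketch is below, where the `children` map is a hypothetical stand-in for the group_members table. It walks iteratively and collects each node's children before descending, so nothing resembling an open result set is held across levels.

```go
package main

import "fmt"

// wouldCreateCycle reports whether groupID is reachable from memberGroupID.
// children maps a group ID to the groups it directly contains.
func wouldCreateCycle(groupID, memberGroupID string, children map[string][]string) bool {
	if groupID == memberGroupID {
		return true // a group cannot contain itself
	}
	visited := make(map[string]bool)
	stack := []string{memberGroupID}
	for len(stack) > 0 {
		cur := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if cur == groupID {
			return true
		}
		if visited[cur] {
			continue
		}
		visited[cur] = true
		stack = append(stack, children[cur]...)
	}
	return false
}

func main() {
	// A contains B; adding A as a member of B would close the loop.
	children := map[string][]string{"A": {"B"}}
	fmt.Println(wouldCreateCycle("B", "A", children))
	fmt.Println(wouldCreateCycle("A", "C", children))
}
```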

Comment on lines +284 to +290
// CleanExpiredJoinTokens removes all expired join tokens.
func (s *PostgresStore) CleanExpiredJoinTokens(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, `
DELETE FROM broker_join_tokens WHERE expires_at < $1
`, time.Now())
return err
}
Contributor

medium

Using time.Now() in Go passes the application server's local time to the database, which can lead to issues due to clock skew or timezone mismatches. It is more robust and consistent to use the database's native NOW() function directly in the SQL query.

// CleanExpiredJoinTokens removes all expired join tokens.
func (s *PostgresStore) CleanExpiredJoinTokens(ctx context.Context) error {
	_, err := s.db.ExecContext(ctx, "DELETE FROM broker_join_tokens WHERE expires_at < NOW()")
	return err
}

Author

⏸️ Won't fix in this PR — faithful-mirror scope. The SQLite source passes time.Now() as a bind parameter here too, so this mirrors upstream. Switching to the DB's NOW() is a fair robustness point (clock-skew), but it should change both backends together. Noting as a follow-up.

Comment on lines +113 to +131
func (s *PostgresStore) GetInstallationForRepository(ctx context.Context, repoFullName string) (*store.GitHubInstallation, error) {
// Search active installations whose repositories JSON array contains the repo.
installations, err := s.ListGitHubInstallations(ctx, store.GitHubInstallationFilter{
Status: store.GitHubInstallationStatusActive,
})
if err != nil {
return nil, err
}

for i := range installations {
for _, repo := range installations[i].Repositories {
if repo == repoFullName {
return &installations[i], nil
}
}
}
return nil, store.ErrNotFound
}

Contributor

medium

Fetching all active GitHub installations into Go memory and filtering them in Go is highly inefficient and doesn't scale. We can delegate this filtering to the database using Postgres's native JSONB containment operator (@>) to find the matching installation in a single query.

Suggested change
func (s *PostgresStore) GetInstallationForRepository(ctx context.Context, repoFullName string) (*store.GitHubInstallation, error) {
var inst store.GitHubInstallation
var repos string
err := s.db.QueryRowContext(ctx, "SELECT installation_id, account_login, account_type, app_id, repositories, status, created_at, updated_at FROM github_installations WHERE status = $1 AND repositories::jsonb @> jsonb_build_array($2) LIMIT 1", store.GitHubInstallationStatusActive, repoFullName).Scan(
&inst.InstallationID, &inst.AccountLogin, &inst.AccountType,
&inst.AppID, &repos, &inst.Status, &inst.CreatedAt, &inst.UpdatedAt,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, store.ErrNotFound
}
return nil, err
}
unmarshalJSON(repos, &inst.Repositories)
return &inst, nil
}

Author

⏸️ Won't fix in this PR — faithful-mirror scope. The SQLite source also fetches active installations and filters the repositories JSON array in Go (for i := range installations / for _, repo := range ...). Pushing the filter into a jsonb @> query is a nice optimization but only available on the Postgres side, so doing it here alone diverges the backends. Tracking as a follow-up.

Comment thread pkg/store/postgres/postgres.go Outdated
Comment on lines +39 to +40
db.SetMaxOpenConns(4)
db.SetMaxIdleConns(4)
Contributor

medium

A connection pool limit of 4 is extremely low for a concurrent production Postgres database. Under concurrent load, this will quickly lead to connection starvation and high latency. We should increase the default to a more reasonable value (e.g., 25) or make it configurable.

Suggested change
- db.SetMaxOpenConns(4)
- db.SetMaxIdleConns(4)
+ db.SetMaxOpenConns(25)
+ db.SetMaxIdleConns(25)

Author

Fixed in 3158517 — bumped to SetMaxOpenConns(25) / SetMaxIdleConns(25). (The 4 was a literal carryover from the SQLite package, where it's a single-writer concession; it makes no sense for Postgres. Added a comment explaining the divergence.)
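
The reviewer's other option, making the limit configurable, could look roughly like the sketch below. It is an illustration only and not code from this PR: `poolLimit`, `configurePool`, and the idea of reading the value from an environment variable are assumptions, and only `SetMaxOpenConns` / `SetMaxIdleConns` come from the diff.

```go
package main

import (
	"database/sql"
	"strconv"
)

// defaultMaxConns matches the value adopted in this thread.
const defaultMaxConns = 25

// poolLimit parses an optional override (hypothetical: for example the value
// of an environment variable) and falls back to def when the value is empty,
// non-numeric, or not positive.
func poolLimit(raw string, def int) int {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return def
	}
	return n
}

// configurePool applies one limit to both the open and idle connection caps,
// mirroring the paired calls in the diff above.
func configurePool(db *sql.DB, limit int) {
	db.SetMaxOpenConns(limit)
	db.SetMaxIdleConns(limit)
}
```

A caller would then write something like `configurePool(db, poolLimit(os.Getenv(name), defaultMaxConns))`, where `name` is whatever setting the project chooses to expose.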

lerms added a commit to neuralnetes/scion that referenced this pull request Jun 1, 2026
…stry guard, concurrency

Three genuine Postgres-specific correctness fixes from the PR GoogleCloudPlatform#289 review:

- V40/V31 migrations: replace the SQLite-style DROP TABLE ... CASCADE +
  recreate with in-place ALTER TABLE. In Postgres, CASCADE silently drops
  every foreign key in other tables that references the recreated table and
  never restores them (schema corruption). Postgres supports DROP/ADD
  CONSTRAINT and ALTER COLUMN DROP NOT NULL directly. Verified against live
  Postgres 16: all 7 FKs referencing the projects (ex-groves) table and the
  notifications->notification_subscriptions FK survive.

- ListAgents ancestry filter: json_array_elements_text(ancestry::json) crashes
  on empty-string/NULL ancestry ('invalid input syntax for type json'). Guard
  with COALESCE(NULLIF(ancestry,''),'[]'). SQLite's json_each tolerated this.

- Migrate(): acquire a database-global pg_advisory_lock on a pinned connection
  so concurrent hub replicas serialize migrations (the stateless-scale-out goal
  of this backend). Bump pool 4 -> 25 to avoid connection starvation under
  concurrent load.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Author

lerms commented Jun 1, 2026

Response to Gemini review

Thanks for the thorough pass. I triaged all 12 comments against the SQLite source, because this PR's explicit mandate is a faithful 1-to-1 mirror of pkg/store/sqlite — so the test for "fix here" is whether an issue is Postgres-specific (must fix) vs. inherited verbatim from the SQLite backend (fixing Postgres-only would make the two backends diverge, which is the one thing this PR must not do).

✅ Fixed (5) — genuinely Postgres-specific

| # | Issue | Fix |
|---|-------|-----|
| 🔴 V40 | DROP TABLE … CASCADE + recreate silently drops every FK referencing the table | in-place ALTER TABLE DROP/ADD CONSTRAINT |
| 🔴 V31 | same CASCADE corruption (notifications.subscription_id FK) | in-place ADD COLUMN + ALTER COLUMN DROP NOT NULL |
| 🟠 ancestry | ''::json crashes the query (SQLite's json_each tolerated it) | COALESCE(NULLIF(ancestry,''),'[]') guard |
| 🟠 advisory lock | concurrent replicas race on migrations | pg_advisory_lock pinned to a dedicated conn |
| 🟡 pool size | 4 is a SQLite single-writer carryover | bumped to 25 |

All verified against live Postgres 16: 53/53 migrations apply, all 7 FKs to the ex-groves table + the notification_subscriptions FK survive, ancestry edge cases pass, no advisory locks leak, full CRUD suite green.

⏸️ Deferred (7) — faithful reproductions of the SQLite source

These are all valid observations, but the behavior is identical in pkg/store/sqlite (verified each), so they're pre-existing upstream characteristics this PR mirrors rather than regressions it introduces:

  • ListAgents / ListMessages ignore opts.Cursor — SQLite source does too (only sets NextCursor)
  • ListScheduledEvents keyset (id < cursor + created_at DESC) — copied verbatim from SQLite
  • GetEffectiveGroups / WouldCreateCycle Go-recursion — SQLite uses Go recursion, not CTEs
  • GetInstallationForRepository Go-filter — SQLite filters in Go
  • CleanExpiredJoinTokens time.Now() — SQLite passes time.Now()

Fixing these (keyset pagination, recursive CTEs, jsonb @>, DB-side NOW()) is worthwhile, but each should land in a change that touches both backends together to keep them in lockstep. Happy to file a tracking issue for the cross-backend optimization pass if useful.

lerms marked this pull request as ready for review June 1, 2026 12:13
Member

ptone commented Jun 1, 2026

See the broad outline of a plan in #53; more work on this is starting this week.

lerms and others added 5 commits June 3, 2026 18:13
Adds a faithful pkg/store/postgres mirror of pkg/store/sqlite so the hub
can run as a stateless control plane against an external Postgres,
configured via database.driver: postgres (SCION_SERVER_DATABASE_DRIVER /
SCION_SERVER_DATABASE_URL) instead of a node-local SQLite file.

The ent layer already supported Postgres (entc.OpenPostgres); this adds
the hand-written store's Postgres twin and wires initStore's
case "postgres": to compose them via entadapter.NewCompositeStore.

The mirror stays 1-to-1 with the SQLite implementation — same method
names, signatures, comments and control flow (206 methods each). The only
differences are dialect mechanics:
  - ? -> $N numbered placeholders
  - INSERT OR IGNORE/REPLACE -> ON CONFLICT DO NOTHING / DO UPDATE
  - sqlite_master / PRAGMA introspection -> information_schema
  - randomblob() UUID seeds -> gen_random_uuid()
  - json_each() -> json_array_elements_text(); json_array() -> json_build_array()
  - COLLATE NOCASE -> LOWER(email) functional unique index
  - BLOB -> BYTEA; datetime('now') -> NOW(); INSTR/SUBSTR -> split_part

The grove->project data backfill (MigrateGroveToProjectData) is a
SQLite-era repair and is intentionally skipped on the Postgres path —
a fresh Postgres DB has no legacy grove rows.

The Postgres driver import is gated behind a no_postgres build tag,
mirroring the existing no_sqlite tag, so slim images can exclude either
driver.

Tests: env-gated integration tests (SCION_TEST_POSTGRES_URL) cover all 53
migrations plus CRUD round-trips for users, projects, agents, secrets,
groups, policies, invite codes and env vars. They skip when the env var
is unset so the CI no_sqlite path (make test-fast, no DB) stays green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

…stry guard, concurrency

Three genuine Postgres-specific correctness fixes from the PR GoogleCloudPlatform#289 review:

- V40/V31 migrations: replace the SQLite-style DROP TABLE ... CASCADE +
  recreate with in-place ALTER TABLE. In Postgres, CASCADE silently drops
  every foreign key in other tables that references the recreated table and
  never restores them (schema corruption). Postgres supports DROP/ADD
  CONSTRAINT and ALTER COLUMN DROP NOT NULL directly. Verified against live
  Postgres 16: all 7 FKs referencing the projects (ex-groves) table and the
  notifications->notification_subscriptions FK survive.

- ListAgents ancestry filter: json_array_elements_text(ancestry::json) crashes
  on empty-string/NULL ancestry ('invalid input syntax for type json'). Guard
  with COALESCE(NULLIF(ancestry,''),'[]'). SQLite's json_each tolerated this.

- Migrate(): acquire a database-global pg_advisory_lock on a pinned connection
  so concurrent hub replicas serialize migrations (the stateless-scale-out goal
  of this backend). Bump pool 4 -> 25 to avoid connection starvation under
  concurrent load.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

…-cast collision

The SQLite store gives Ent its own database file (entDSN := url + "_ent"), so
the raw-migration tables and the Ent-managed tables never share storage. The
Postgres path opened Ent on the *same* DSN as the raw store, so Ent's
auto-migration tried to ALTER tables the raw migrations had already created
with different column types — projects.id is TEXT in the raw schema but UUID in
the Ent model — failing at boot with:

  ent migrate: modify "projects" table: pq: column "id" cannot be cast
  automatically to type uuid (42804)

Create a dedicated `ent` schema and pin the Ent client's search_path to it
(withSearchPath helper, URL + keyword DSN forms), the Postgres analog of
SQLite's separate _ent file. Verified end-to-end against a live PG16 Lakebase:
raw store lands 31 tables in `public`, Ent lands 8 in `ent`, public.projects.id
stays TEXT while ent.projects.id is UUID, and the full initStore path is
idempotent across restarts.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
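
The commit message above names a `withSearchPath` helper but does not show it. The sketch below is one way such a helper could handle both DSN forms. It is an assumption, not the PR's implementation: it relies on lib/pq accepting run-time parameters such as `search_path` in the connection string, it assumes `schema` is a plain identifier that needs no quoting, and it does not strip a `search_path` already present in a keyword-form DSN.

```go
package main

import (
	"net/url"
	"strings"
)

// withSearchPath returns dsn with search_path set to schema.
// URL form:     postgres://user:pass@host/db?sslmode=disable
// Keyword form: host=... dbname=... sslmode=disable
func withSearchPath(dsn, schema string) (string, error) {
	if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") {
		u, err := url.Parse(dsn)
		if err != nil {
			return "", err
		}
		q := u.Query()
		q.Set("search_path", schema) // replaces any existing value
		u.RawQuery = q.Encode()      // Encode sorts the keys alphabetically
		return u.String(), nil
	}
	// Keyword form: append the setting as one more key=value pair.
	return strings.TrimSpace(dsn) + " search_path=" + schema, nil
}
```

The schema itself still has to exist before Ent connects, for example via `CREATE SCHEMA IF NOT EXISTS ent` issued by the raw store.
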
CreateTemplate/UpdateTemplate and CreateHarnessConfig/UpdateHarnessConfig
passed the Go bool template.Locked / hc.Locked straight into the
locked INTEGER column. SQLite coerces bool->int silently, but lib/pq sends
it as the string "false" and Postgres rejects it:

  pq: invalid input syntax for type integer: "false" (22P02)

This made every template + harness-config bootstrap import fail on a
Postgres-backed hub (claude/codex/gemini/opencode all skipped), so agents
would later fail with: harness-config "claude" not found. Wrap both write sites
with boolToInt() to match every other bool->INTEGER column in the store
(brokers.AutoProvide, envvars.Secret/Sensitive, tokens.Revoked, ...).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
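
For readers unfamiliar with the pattern, a `boolToInt` helper of the kind the commit message refers to is typically as small as the sketch below. The body shown here is an assumption, since the store's actual helper is not reproduced on this page.

```go
package main

// boolToInt maps a Go bool onto the 0/1 convention used by INTEGER flag
// columns, so the driver binds an integer rather than a boolean value.
func boolToInt(b bool) int {
	if b {
		return 1
	}
	return 0
}
```

At a write site the bound argument becomes `boolToInt(template.Locked)` instead of `template.Locked`, which keeps the SQL text identical across both backends.
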
…eAgentStatus

The agent heartbeat status update bound $19 (su.StartedAt, an RFC3339 string)
straight into COALESCE(NULLIF($19, ''), started_at) against the started_at
TIMESTAMP column. Postgres rejects this with
"COALESCE types text and timestamp without time zone cannot be matched (42804)"
because NULLIF($19,'') is typed text, while SQLite's dynamic typing tolerated it.

This broke every running-agent heartbeat: the hub logged
"Failed to update agent status from heartbeat" and returned 502 to the
broker dispatch, so `scion start` saw a 30s context-deadline timeout even
though the agent pod came up fine.

Cast the text param to ::timestamp so COALESCE has matching types. The empty
-> NULL path still falls through to the existing started_at value. Verified
the RFC3339 'Z' string casts cleanly against the live Lakebase backend.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
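
The type mismatch described above can be handled in SQL, as the commit does, or on the Go side. The sketch below shows both for comparison. The exact placement of the cast in `startedAtFragment` is an assumption based on the commit text, and `startedAtParam` is a hypothetical alternative that is not part of this PR: it parses the RFC3339 string in Go and binds a nullable time, which would pair with a plain `COALESCE($1, started_at)`.

```go
package main

import (
	"database/sql"
	"time"
)

// startedAtFragment: assumed shape of the fixed expression. NULLIF yields
// text, so the cast gives COALESCE two timestamp operands.
const startedAtFragment = `started_at = COALESCE(NULLIF($1, '')::timestamp, started_at)`

// startedAtParam converts the heartbeat's RFC3339 string into a nullable
// time. An empty string becomes SQL NULL, so COALESCE keeps the stored value.
func startedAtParam(raw string) (sql.NullTime, error) {
	if raw == "" {
		return sql.NullTime{}, nil
	}
	t, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return sql.NullTime{}, err
	}
	return sql.NullTime{Time: t.UTC(), Valid: true}, nil
}
```

The Go-side variant would change the method's control flow relative to the SQLite source, so under this PR's faithful-mirror rule the in-SQL cast is the smaller divergence.
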
lerms force-pushed the feat/postgres-store branch from b471031 to 3836347 June 3, 2026 23:18
Adds a configurable 'generic' OAuth2/OIDC provider alongside the hardcoded
Google/GitHub ones, so the Hub web/CLI/device login can federate to any
standards-compliant issuer (notably the in-cluster Dex) instead of only
Google/GitHub.

Config mirrors Better Auth's genericOAuth shape, via
SCION_SERVER_OAUTH_<CLIENT>_GENERIC_*:
  - clientId / clientSecret
  - discoveryUrl (full .well-known URL) or issuer (derives the well-known
    path) for OIDC discovery, or explicit authorizationUrl/tokenUrl/userInfoUrl
  - scopes (default 'openid email profile')

Endpoints resolve as: explicit authorize+token > discoveryUrl > issuer-derived
discovery; discovery docs are cached per URL. Userinfo maps the standard
sub/email/name/picture claims.

Touches: hubclient OAuthProviderGeneric constant + order; hub OAuthService
dispatch (auth URL + code exchange); config struct + server wiring; web login
+ callback provider validation. Google/GitHub unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
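
The endpoint resolution order stated in the commit message above (explicit authorize+token, then discoveryUrl, then issuer-derived discovery) can be sketched as follows. The struct, its field names, and `endpointSource` are hypothetical and do not reflect the hub's real config types. The `/.well-known/openid-configuration` suffix is the standard OIDC discovery path.

```go
package main

import "strings"

// genericOAuthConfig is a hypothetical subset of the provider settings.
type genericOAuthConfig struct {
	AuthorizationURL string
	TokenURL         string
	DiscoveryURL     string
	Issuer           string
}

// endpointSource reports where the endpoints should come from and, for the
// two discovery cases, which document URL to fetch.
func endpointSource(c genericOAuthConfig) (source, discoveryURL string) {
	switch {
	case c.AuthorizationURL != "" && c.TokenURL != "":
		return "explicit", "" // explicit endpoints win; no discovery needed
	case c.DiscoveryURL != "":
		return "discovery", c.DiscoveryURL
	case c.Issuer != "":
		return "issuer", strings.TrimRight(c.Issuer, "/") + "/.well-known/openid-configuration"
	default:
		return "none", ""
	}
}
```

Caching the fetched document per URL, as the commit describes, would sit behind whatever function consumes the returned `discoveryURL`.
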
lerms added a commit to neuralnetes/scion that referenced this pull request Jun 4, 2026
…stry guard, concurrency
