feat(store): add Postgres backend for the hub store#289
Conversation
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive PostgreSQL store implementation parallel to the existing SQLite backend, enabling stateless control-plane deployments. While the implementation is thorough, several critical issues and optimization opportunities were identified. Specifically, migrations V31 and V40 pose schema corruption risks by using CASCADE drops that silently destroy foreign keys; these should be replaced with direct ALTER TABLE statements. Additionally, concurrent migrations in multi-replica environments could trigger race conditions, which can be mitigated using Postgres advisory locks. Keyset pagination is currently broken or ignored in several listing methods (ListAgents, ListMessages, and ListScheduledEvents), and potential query crashes exist when parsing empty or null ancestry fields. Finally, performance can be significantly improved by replacing recursive Go-based database queries with recursive CTEs for group operations, utilizing JSONB containment operators for repository lookups, and increasing the low default connection pool limit.
| const migrationV40 = ` | ||
| CREATE TABLE IF NOT EXISTS groves_new ( | ||
| id TEXT PRIMARY KEY, | ||
| name TEXT NOT NULL, | ||
| slug TEXT NOT NULL UNIQUE, | ||
| git_remote TEXT, | ||
| labels TEXT, | ||
| annotations TEXT, | ||
| created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
| updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
| created_by TEXT, | ||
| owner_id TEXT, | ||
| visibility TEXT NOT NULL DEFAULT 'private', | ||
| default_runtime_broker_id TEXT REFERENCES runtime_brokers(id) ON DELETE SET NULL, | ||
| shared_dirs TEXT, | ||
| github_installation_id INTEGER REFERENCES github_installations(installation_id), | ||
| github_permissions TEXT, | ||
| github_app_status TEXT, | ||
| git_identity TEXT | ||
| ); | ||
|
|
||
| INSERT INTO groves_new SELECT | ||
| id, name, slug, git_remote, labels, annotations, | ||
| created_at, updated_at, created_by, owner_id, visibility, | ||
| default_runtime_broker_id, shared_dirs, | ||
| github_installation_id, github_permissions, github_app_status, | ||
| git_identity | ||
| FROM groves ON CONFLICT DO NOTHING; | ||
|
|
||
| DROP TABLE IF EXISTS groves CASCADE; | ||
| ALTER TABLE groves_new RENAME TO groves; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS idx_groves_slug ON groves(slug); | ||
| CREATE INDEX IF NOT EXISTS idx_groves_git_remote ON groves(git_remote); | ||
| CREATE INDEX IF NOT EXISTS idx_groves_owner ON groves(owner_id); | ||
| CREATE INDEX IF NOT EXISTS idx_groves_default_runtime_broker ON groves(default_runtime_broker_id); | ||
| ` |
There was a problem hiding this comment.
Recreating the groves table and dropping the old one with CASCADE is extremely dangerous in Postgres. The CASCADE option silently drops all foreign key constraints in other tables (such as agents, templates, schedules, etc.) that reference groves(id). These foreign keys are never recreated, leading to database schema corruption. Since Postgres natively supports dropping and adding constraints, we should use ALTER TABLE directly instead of recreating the table.
const migrationV40 = "ALTER TABLE groves DROP CONSTRAINT IF EXISTS groves_git_remote_key;\nALTER TABLE groves ADD CONSTRAINT groves_slug_key UNIQUE (slug);"There was a problem hiding this comment.
✅ Fixed in 3158517. You're right — this is genuine schema corruption in Postgres. Replaced the DROP TABLE ... CASCADE + recreate with in-place ALTER TABLE ... DROP/ADD CONSTRAINT (the SQLite source only recreates because SQLite cannot alter constraints; Postgres can).
Verified against live Postgres 16 after the fix: all 7 FKs referencing the table survive (agents, project_contributors, project_sync_state, scheduled_events, schedules, templates, user_access_tokens), *_slug_key UNIQUE is present, and the git_remote UNIQUE is gone. (Constraint names keep the groves_ prefix because the later V50 table rename doesn't rename constraints — identical to the SQLite backend, harmless.)
| const migrationV31 = ` | ||
| -- Postgres supports ALTER TABLE directly, so we recreate the table as in SQLite source. | ||
| CREATE TABLE notification_subscriptions_new ( | ||
| id TEXT PRIMARY KEY, | ||
| scope TEXT NOT NULL DEFAULT 'agent', | ||
| agent_id TEXT, | ||
| subscriber_type TEXT NOT NULL DEFAULT 'agent', | ||
| subscriber_id TEXT NOT NULL, | ||
| grove_id TEXT NOT NULL, | ||
| trigger_activities TEXT NOT NULL, | ||
| created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
| created_by TEXT NOT NULL, | ||
| FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE | ||
| ); | ||
|
|
||
| -- Copy existing data (all existing subscriptions are agent-scoped) | ||
| INSERT INTO notification_subscriptions_new | ||
| (id, scope, agent_id, subscriber_type, subscriber_id, grove_id, trigger_activities, created_at, created_by) | ||
| SELECT id, 'agent', agent_id, subscriber_type, subscriber_id, grove_id, trigger_activities, created_at, created_by | ||
| FROM notification_subscriptions; | ||
|
|
||
| DROP TABLE notification_subscriptions CASCADE; | ||
| ALTER TABLE notification_subscriptions_new RENAME TO notification_subscriptions; | ||
|
|
||
| -- Recreate indexes | ||
| CREATE INDEX IF NOT EXISTS idx_notification_subs_agent ON notification_subscriptions(agent_id); | ||
| CREATE INDEX IF NOT EXISTS idx_notification_subs_project ON notification_subscriptions(grove_id); | ||
| CREATE INDEX IF NOT EXISTS idx_notification_subs_subscriber ON notification_subscriptions(subscriber_type, subscriber_id); | ||
|
|
||
| -- Unique constraint: one subscription per (scope, target, subscriber, project) | ||
| CREATE UNIQUE INDEX IF NOT EXISTS idx_notification_subs_unique | ||
| ON notification_subscriptions(scope, COALESCE(agent_id, ''), subscriber_type, subscriber_id, grove_id); | ||
| ` |
There was a problem hiding this comment.
Dropping the notification_subscriptions table with CASCADE silently destroys the foreign key constraint in the notifications table (notifications.subscription_id references notification_subscriptions(id)). This constraint is never recreated, leading to schema corruption. Since Postgres natively supports adding columns and dropping NOT NULL constraints, we should use ALTER TABLE directly instead of recreating the table.
const migrationV31 = "ALTER TABLE notification_subscriptions ADD COLUMN scope TEXT NOT NULL DEFAULT 'agent';\nALTER TABLE notification_subscriptions ALTER COLUMN agent_id DROP NOT NULL;\nCREATE UNIQUE INDEX IF NOT EXISTS idx_notification_subs_unique ON notification_subscriptions(scope, COALESCE(agent_id, ''), subscriber_type, subscriber_id, grove_id);"There was a problem hiding this comment.
✅ Fixed in 3158517. Same CASCADE corruption — the recreate silently dropped notifications.subscription_id → notification_subscriptions(id). Replaced with in-place ADD COLUMN ... scope + ALTER COLUMN agent_id DROP NOT NULL, keeping the existing dedup unique index. Verified on live Postgres: scope (NOT NULL) and nullable agent_id are present and the dependent FK survives.
| } | ||
|
|
||
| // Migrate applies database migrations. | ||
| func (s *PostgresStore) Migrate(ctx context.Context) error { |
There was a problem hiding this comment.
In a stateless cloud topology where multiple replicas of the hub start up concurrently, they will all attempt to run database migrations at the same time. This can lead to race conditions, duplicate key errors, or table creation failures. Using a session-level Postgres advisory lock (pg_advisory_lock) at the start of the migration process ensures that only one replica runs the migrations at a time, while others wait safely.
func (s *PostgresStore) Migrate(ctx context.Context) error {
// Acquire a session-level advisory lock to prevent concurrent migrations
// when scaling horizontally. 123456 is an arbitrary lock ID.
if _, err := s.db.ExecContext(ctx, "SELECT pg_advisory_lock(123456)"); err != nil {
return fmt.Errorf("failed to acquire advisory lock: %w", err)
}
defer s.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)")There was a problem hiding this comment.
✅ Fixed in 3158517. Added a database-global pg_advisory_lock around Migrate() so concurrent hub replicas serialize migrations — exactly the stateless scale-out this backend exists for.
One correctness refinement over the snippet: a session-level advisory lock's lock and unlock must run on the same session, but with a connection pool they can land on different pooled connections. So I pin the lock to a dedicated *sql.Conn (db.Conn(ctx) … defer lockConn.Close()); the migrations still run via the pool, and since the lock is database-global it blocks other replicas regardless. Verified no advisory locks leak after Migrate() returns.
| whereClause := "" | ||
| if len(conditions) > 0 { | ||
| whereClause = "WHERE " + strings.Join(conditions, " AND ") | ||
| } | ||
|
|
||
| // Get total count | ||
| var totalCount int | ||
| countQuery := fmt.Sprintf("SELECT COUNT(*) FROM scheduled_events %s", whereClause) | ||
| if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // Apply pagination | ||
| limit := opts.Limit | ||
| if limit <= 0 { | ||
| limit = 50 | ||
| } | ||
| if limit > 200 { | ||
| limit = 200 | ||
| } | ||
|
|
||
| query := fmt.Sprintf(` | ||
| SELECT id, project_id, event_type, fire_at, payload, status, | ||
| created_at, created_by, fired_at, error, schedule_id | ||
| FROM scheduled_events %s | ||
| ORDER BY created_at DESC | ||
| LIMIT $%d | ||
| `, whereClause, len(args)+1) | ||
|
|
||
| queryArgs := append(args, limit+1) //nolint:gocritic // intentional append to copy | ||
|
|
||
| if opts.Cursor != "" { | ||
| if whereClause == "" { | ||
| query = fmt.Sprintf(` | ||
| SELECT id, project_id, event_type, fire_at, payload, status, | ||
| created_at, created_by, fired_at, error, schedule_id | ||
| FROM scheduled_events WHERE id < $%d | ||
| ORDER BY created_at DESC | ||
| LIMIT $%d | ||
| `, len(args)+1, len(args)+2) | ||
| } else { | ||
| query = fmt.Sprintf(` | ||
| SELECT id, project_id, event_type, fire_at, payload, status, | ||
| created_at, created_by, fired_at, error, schedule_id | ||
| FROM scheduled_events %s AND id < $%d | ||
| ORDER BY created_at DESC | ||
| LIMIT $%d | ||
| `, whereClause, len(args)+1, len(args)+2) | ||
| } | ||
| queryArgs = append(args, opts.Cursor, limit+1) //nolint:gocritic | ||
| } | ||
|
|
There was a problem hiding this comment.
The keyset pagination logic here is broken. The query sorts by created_at DESC but filters by id < opts.Cursor. Since id is a UUID, comparing it has no relation to the created_at order, resulting in scrambled and incorrect pagination. We should use a proper keyset pagination condition comparing both created_at and id using subqueries. Additionally, appending this condition directly to conditions simplifies the code and avoids duplicating the entire query.
if opts.Cursor != "" {
conditions = append(conditions, fmt.Sprintf("(created_at < (SELECT created_at FROM scheduled_events WHERE id = $%d) OR (created_at = (SELECT created_at FROM scheduled_events WHERE id = $%d) AND id < $%d))", len(args)+1, len(args)+2, len(args)+3))
args = append(args, opts.Cursor, opts.Cursor, opts.Cursor)
}
whereClause := ""
if len(conditions) > 0 {
whereClause = "WHERE " + strings.Join(conditions, " AND ")
}
// Get total count
var totalCount int
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM scheduled_events %s", whereClause)
if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount); err != nil {
return nil, err
}
// Apply pagination
limit := opts.Limit
if limit <= 0 {
limit = 50
}
if limit > 200 {
limit = 200
}
query := fmt.Sprintf("SELECT id, project_id, event_type, fire_at, payload, status, created_at, created_by, fired_at, error, schedule_id FROM scheduled_events %s ORDER BY created_at DESC LIMIT $%d", whereClause, len(args)+1)
queryArgs := append(args, limit+1)There was a problem hiding this comment.
⏸️ Won't fix in this PR — faithful-mirror scope. This one does consume opts.Cursor, but the id < cursor + ORDER BY created_at DESC shape is copied verbatim from the SQLite source (ListScheduledEvents uses the identical AND id < ? / ORDER BY created_at DESC). The mixed sort-vs-filter-key critique is valid, but it applies equally to upstream SQLite — fixing it here only would make the two backends diverge. Better as a cross-backend follow-up.
| // Exclude soft-deleted agents unless explicitly requested | ||
| if !filter.IncludeDeleted { | ||
| conditions = append(conditions, "deleted_at IS NULL") | ||
| } | ||
|
|
||
| whereClause := "" |
There was a problem hiding this comment.
The ListAgents method sets NextCursor in the return value but completely ignores opts.Cursor in the SQL query. This breaks pagination entirely, causing the UI to reload the first page infinitely when clicking 'Next Page'. We should add the keyset pagination condition to conditions using the created_at and id fields.
| // Exclude soft-deleted agents unless explicitly requested | |
| if !filter.IncludeDeleted { | |
| conditions = append(conditions, "deleted_at IS NULL") | |
| } | |
| whereClause := "" | |
| if opts.Cursor != "" { | |
| conditions = append(conditions, fmt.Sprintf("(created_at < (SELECT created_at FROM agents WHERE id = $%d) OR (created_at = (SELECT created_at FROM agents WHERE id = $%d) AND id < $%d))", len(args)+1, len(args)+2, len(args)+3)) | |
| args = append(args, opts.Cursor, opts.Cursor, opts.Cursor) | |
| } | |
| // Exclude soft-deleted agents unless explicitly requested | |
| if !filter.IncludeDeleted { | |
| conditions = append(conditions, "deleted_at IS NULL") | |
| } | |
| whereClause := "" |
There was a problem hiding this comment.
⏸️ Won't fix in this PR — faithful-mirror scope. Confirmed the SQLite source (ListAgents) has the identical behavior: it sets result.NextCursor = agents[limit-1].ID and never consumes opts.Cursor either. So this isn't a regression introduced by the Postgres backend — it's pre-existing upstream behavior that this PR faithfully mirrors (the stated mandate: 1-to-1 with pkg/store/sqlite, no behavioral drift).
Fixing keyset pagination is a legitimate bug, but it should land in a dedicated change that fixes both backends together so they don't diverge. Happy to file/track that as a follow-up.
| // GetEffectiveGroups returns all groups a user belongs to, including transitive memberships. | ||
| func (s *PostgresStore) GetEffectiveGroups(ctx context.Context, userID string) ([]string, error) { | ||
| // Start with direct group memberships | ||
| directMemberships, err := s.GetUserGroups(ctx, userID) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| effectiveGroups := make(map[string]bool) | ||
| for _, m := range directMemberships { | ||
| effectiveGroups[m.GroupID] = true | ||
| // Add transitive group memberships | ||
| if err := s.addTransitiveGroups(ctx, m.GroupID, effectiveGroups); err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| result := make([]string, 0, len(effectiveGroups)) | ||
| for groupID := range effectiveGroups { | ||
| result = append(result, groupID) | ||
| } | ||
|
|
||
| return result, nil | ||
| } | ||
|
|
||
| // addTransitiveGroups recursively adds all groups that contain the given group. | ||
| func (s *PostgresStore) addTransitiveGroups(ctx context.Context, groupID string, visited map[string]bool) error { | ||
| // Find all groups where this group is a member | ||
| rows, err := s.db.QueryContext(ctx, | ||
| "SELECT group_id FROM group_members WHERE member_type = 'group' AND member_id = $1", groupID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Collect all parent group IDs first, then close rows before recursing | ||
| // This avoids issues with SQLite connections during recursive queries | ||
| var parentGroupIDs []string | ||
| for rows.Next() { | ||
| var parentGroupID string | ||
| if err := rows.Scan(&parentGroupID); err != nil { | ||
| rows.Close() | ||
| return err | ||
| } | ||
| parentGroupIDs = append(parentGroupIDs, parentGroupID) | ||
| } | ||
| rows.Close() | ||
|
|
||
| // Now recurse after rows are closed | ||
| for _, parentGroupID := range parentGroupIDs { | ||
| if !visited[parentGroupID] { | ||
| visited[parentGroupID] = true | ||
| if err := s.addTransitiveGroups(ctx, parentGroupID, visited); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
There was a problem hiding this comment.
Fetching transitive group memberships recursively in Go code results in O(N) database roundtrips, which is highly inefficient and slow. We can optimize this to a single O(1) database query using a Recursive Common Table Expression (CTE), which is fully supported by Postgres.
// GetEffectiveGroups returns all groups a user belongs to, including transitive memberships.
func (s *PostgresStore) GetEffectiveGroups(ctx context.Context, userID string) ([]string, error) {
rows, err := s.db.QueryContext(ctx, "WITH RECURSIVE user_groups AS (SELECT group_id FROM group_members WHERE member_type = 'user' AND member_id = $1 UNION SELECT gm.group_id FROM group_members gm INNER JOIN user_groups ug ON gm.member_type = 'group' AND gm.member_id = ug.group_id) SELECT group_id FROM user_groups", userID)
if err != nil {
return nil, err
}
defer rows.Close()
var groupIDs []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
groupIDs = append(groupIDs, id)
}
if err := rows.Err(); err != nil {
return nil, err
}
return groupIDs, nil
}There was a problem hiding this comment.
⏸️ Won't fix in this PR — faithful-mirror scope. The SQLite source resolves transitive memberships with Go-side iteration too (for _, m := range directMemberships … for groupID := range effectiveGroups), so this mirrors upstream rather than introducing the N-roundtrip pattern. A recursive CTE is a real improvement, but as a Postgres-only change it would diverge the backends. Good candidate for a cross-backend optimization follow-up.
| // WouldCreateCycle checks if adding memberGroupID as a member of groupID would create a cycle. | ||
| // A cycle exists if groupID is reachable from memberGroupID by following the containment relationship. | ||
| // Example: if A contains B, and we try to add A as member of B, we'd have A->B->A (cycle). | ||
| func (s *PostgresStore) WouldCreateCycle(ctx context.Context, groupID, memberGroupID string) (bool, error) { | ||
| // If they're the same, it's a direct cycle | ||
| if groupID == memberGroupID { | ||
| return true, nil | ||
| } | ||
|
|
||
| // Check if groupID is reachable from memberGroupID by traversing DOWN the containment graph | ||
| // (i.e., checking what groups memberGroupID contains, and what those contain, etc.) | ||
| visited := make(map[string]bool) | ||
| return s.hasPathDown(ctx, memberGroupID, groupID, visited) | ||
| } | ||
|
|
||
| // hasPathDown checks if 'target' is reachable from 'current' by following containment. | ||
| // It looks at what groups 'current' contains as members. | ||
| func (s *PostgresStore) hasPathDown(ctx context.Context, current, target string, visited map[string]bool) (bool, error) { | ||
| if current == target { | ||
| return true, nil | ||
| } | ||
| if visited[current] { | ||
| return false, nil | ||
| } | ||
| visited[current] = true | ||
|
|
||
| // Get all groups that 'current' contains (groups where current is the group_id) | ||
| rows, err := s.db.QueryContext(ctx, | ||
| "SELECT member_id FROM group_members WHERE member_type = 'group' AND group_id = $1", current) | ||
| if err != nil { | ||
| return false, err | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| for rows.Next() { | ||
| var childGroupID string | ||
| if err := rows.Scan(&childGroupID); err != nil { | ||
| return false, err | ||
| } | ||
| found, err := s.hasPathDown(ctx, childGroupID, target, visited) | ||
| if err != nil { | ||
| return false, err | ||
| } | ||
| if found { | ||
| return true, nil | ||
| } | ||
| } | ||
|
|
||
| return false, nil | ||
| } | ||
|
|
There was a problem hiding this comment.
Checking for cycles recursively in Go code results in O(N) database roundtrips, which is highly inefficient. We can optimize this to a single O(1) database query using a Recursive Common Table Expression (CTE), which is fully supported by Postgres.
| // WouldCreateCycle checks if adding memberGroupID as a member of groupID would create a cycle. | |
| // A cycle exists if groupID is reachable from memberGroupID by following the containment relationship. | |
| // Example: if A contains B, and we try to add A as member of B, we'd have A->B->A (cycle). | |
| func (s *PostgresStore) WouldCreateCycle(ctx context.Context, groupID, memberGroupID string) (bool, error) { | |
| // If they're the same, it's a direct cycle | |
| if groupID == memberGroupID { | |
| return true, nil | |
| } | |
| // Check if groupID is reachable from memberGroupID by traversing DOWN the containment graph | |
| // (i.e., checking what groups memberGroupID contains, and what those contain, etc.) | |
| visited := make(map[string]bool) | |
| return s.hasPathDown(ctx, memberGroupID, groupID, visited) | |
| } | |
| // hasPathDown checks if 'target' is reachable from 'current' by following containment. | |
| // It looks at what groups 'current' contains as members. | |
| func (s *PostgresStore) hasPathDown(ctx context.Context, current, target string, visited map[string]bool) (bool, error) { | |
| if current == target { | |
| return true, nil | |
| } | |
| if visited[current] { | |
| return false, nil | |
| } | |
| visited[current] = true | |
| // Get all groups that 'current' contains (groups where current is the group_id) | |
| rows, err := s.db.QueryContext(ctx, | |
| "SELECT member_id FROM group_members WHERE member_type = 'group' AND group_id = $1", current) | |
| if err != nil { | |
| return false, err | |
| } | |
| defer rows.Close() | |
| for rows.Next() { | |
| var childGroupID string | |
| if err := rows.Scan(&childGroupID); err != nil { | |
| return false, err | |
| } | |
| found, err := s.hasPathDown(ctx, childGroupID, target, visited) | |
| if err != nil { | |
| return false, err | |
| } | |
| if found { | |
| return true, nil | |
| } | |
| } | |
| return false, nil | |
| } | |
| // WouldCreateCycle checks if adding memberGroupID as a member of groupID would create a cycle. | |
| // A cycle exists if groupID is reachable from memberGroupID by following the containment relationship. | |
| // Example: if A contains B, and we try to add A as member of B, we'd have A->B->A (cycle). | |
| func (s *PostgresStore) WouldCreateCycle(ctx context.Context, groupID, memberGroupID string) (bool, error) { | |
| if groupID == memberGroupID { | |
| return true, nil | |
| } | |
| var exists bool | |
| err := s.db.QueryRowContext(ctx, "WITH RECURSIVE containment AS (SELECT member_id FROM group_members WHERE member_type = 'group' AND group_id = $1 UNION SELECT gm.member_id FROM group_members gm INNER JOIN containment c ON gm.member_type = 'group' AND gm.group_id = c.member_id) SELECT EXISTS (SELECT 1 FROM containment WHERE member_id = $2)", memberGroupID, groupID).Scan(&exists) | |
| if err != nil { | |
| return false, err | |
| } | |
| return exists, nil | |
| } |
There was a problem hiding this comment.
⏸️ Won't fix in this PR — faithful-mirror scope. Same as GetEffectiveGroups: the SQLite source does the cycle check via Go recursion, and this package mirrors it 1-to-1. Recursive-CTE optimization is worth doing across both backends together in a follow-up.
| // CleanExpiredJoinTokens removes all expired join tokens. | ||
| func (s *PostgresStore) CleanExpiredJoinTokens(ctx context.Context) error { | ||
| _, err := s.db.ExecContext(ctx, ` | ||
| DELETE FROM broker_join_tokens WHERE expires_at < $1 | ||
| `, time.Now()) | ||
| return err | ||
| } |
There was a problem hiding this comment.
Using time.Now() in Go passes the application server's local time to the database, which can lead to issues due to clock skew or timezone mismatches. It is more robust and consistent to use the database's native NOW() function directly in the SQL query.
// CleanExpiredJoinTokens removes all expired join tokens.
func (s *PostgresStore) CleanExpiredJoinTokens(ctx context.Context) error {
_, err := s.db.ExecContext(ctx, "DELETE FROM broker_join_tokens WHERE expires_at < NOW()")
return err
}There was a problem hiding this comment.
⏸️ Won't fix in this PR — faithful-mirror scope. The SQLite source passes time.Now() as a bind parameter here too, so this mirrors upstream. Switching to the DB's NOW() is a fair robustness point (clock-skew), but it should change both backends together. Noting as a follow-up.
| func (s *PostgresStore) GetInstallationForRepository(ctx context.Context, repoFullName string) (*store.GitHubInstallation, error) { | ||
| // Search active installations whose repositories JSON array contains the repo. | ||
| installations, err := s.ListGitHubInstallations(ctx, store.GitHubInstallationFilter{ | ||
| Status: store.GitHubInstallationStatusActive, | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| for i := range installations { | ||
| for _, repo := range installations[i].Repositories { | ||
| if repo == repoFullName { | ||
| return &installations[i], nil | ||
| } | ||
| } | ||
| } | ||
| return nil, store.ErrNotFound | ||
| } | ||
|
|
There was a problem hiding this comment.
Fetching all active GitHub installations into Go memory and filtering them in Go is highly inefficient and doesn't scale. We can delegate this filtering to the database using Postgres's native JSONB containment operator (@>) to find the matching installation in a single query.
| func (s *PostgresStore) GetInstallationForRepository(ctx context.Context, repoFullName string) (*store.GitHubInstallation, error) { | |
| // Search active installations whose repositories JSON array contains the repo. | |
| installations, err := s.ListGitHubInstallations(ctx, store.GitHubInstallationFilter{ | |
| Status: store.GitHubInstallationStatusActive, | |
| }) | |
| if err != nil { | |
| return nil, err | |
| } | |
| for i := range installations { | |
| for _, repo := range installations[i].Repositories { | |
| if repo == repoFullName { | |
| return &installations[i], nil | |
| } | |
| } | |
| } | |
| return nil, store.ErrNotFound | |
| } | |
| func (s *PostgresStore) GetInstallationForRepository(ctx context.Context, repoFullName string) (*store.GitHubInstallation, error) { | |
| var inst store.GitHubInstallation | |
| var repos string | |
| err := s.db.QueryRowContext(ctx, "SELECT installation_id, account_login, account_type, app_id, repositories, status, created_at, updated_at FROM github_installations WHERE status = $1 AND repositories::jsonb @> jsonb_build_array($2) LIMIT 1", store.GitHubInstallationStatusActive, repoFullName).Scan( | |
| &inst.InstallationID, &inst.AccountLogin, &inst.AccountType, | |
| &inst.AppID, &repos, &inst.Status, &inst.CreatedAt, &inst.UpdatedAt, | |
| ) | |
| if err != nil { | |
| if errors.Is(err, sql.ErrNoRows) { | |
| return nil, store.ErrNotFound | |
| } | |
| return nil, err | |
| } | |
| unmarshalJSON(repos, &inst.Repositories) | |
| return &inst, nil | |
| } |
There was a problem hiding this comment.
⏸️ Won't fix in this PR — faithful-mirror scope. The SQLite source also fetches active installations and filters the repositories JSON array in Go (for i := range installations / for _, repo := range ...). Pushing the filter into a jsonb @> query is a nice optimization but only available on the Postgres side, so doing it here alone diverges the backends. Tracking as a follow-up.
| db.SetMaxOpenConns(4) | ||
| db.SetMaxIdleConns(4) |
There was a problem hiding this comment.
A connection pool limit of 4 is extremely low for a concurrent production Postgres database. Under concurrent load, this will quickly lead to connection starvation and high latency. We should increase the default to a more reasonable value (e.g., 25) or make it configurable.
| db.SetMaxOpenConns(4) | |
| db.SetMaxIdleConns(4) | |
| db.SetMaxOpenConns(25) | |
| db.SetMaxIdleConns(25) |
There was a problem hiding this comment.
✅ Fixed in 3158517 — bumped to SetMaxOpenConns(25) / SetMaxIdleConns(25). (The 4 was a literal carryover from the SQLite package, where it's a single-writer concession; it makes no sense for Postgres. Added a comment explaining the divergence.)
…stry guard, concurrency Three genuine Postgres-specific correctness fixes from the PR GoogleCloudPlatform#289 review: - V40/V31 migrations: replace the SQLite-style DROP TABLE ... CASCADE + recreate with in-place ALTER TABLE. In Postgres, CASCADE silently drops every foreign key in other tables that references the recreated table and never restores them (schema corruption). Postgres supports DROP/ADD CONSTRAINT and ALTER COLUMN DROP NOT NULL directly. Verified against live Postgres 16: all 7 FKs referencing the projects (ex-groves) table and the notifications->notification_subscriptions FK survive. - ListAgents ancestry filter: json_array_elements_text(ancestry::json) crashes on empty-string/NULL ancestry ('invalid input syntax for type json'). Guard with COALESCE(NULLIF(ancestry,''),'[]'). SQLite's json_each tolerated this. - Migrate(): acquire a database-global pg_advisory_lock on a pinned connection so concurrent hub replicas serialize migrations (the stateless-scale-out goal of this backend). Bump pool 4 -> 25 to avoid connection starvation under concurrent load. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Response to Gemini reviewThanks for the thorough pass. I triaged all 12 comments against the SQLite source, because this PR's explicit mandate is a faithful 1-to-1 mirror of ✅ Fixed (5) — genuinely Postgres-specific
All verified against live Postgres 16: 53/53 migrations apply, all 7 FKs to the ex- ⏸️ Deferred (7) — faithful reproductions of the SQLite sourceThese are all valid observations, but the behavior is identical in
Fixing these (keyset pagination, recursive CTEs, |
|
See the broad outline of a plan in #53 , more work on this is starting this week |
Adds a faithful pkg/store/postgres mirror of pkg/store/sqlite so the hub
can run as a stateless control plane against an external Postgres,
configured via database.driver: postgres (SCION_SERVER_DATABASE_DRIVER /
SCION_SERVER_DATABASE_URL) instead of a node-local SQLite file.
The ent layer already supported Postgres (entc.OpenPostgres); this adds
the hand-written store's Postgres twin and wires initStore's
case "postgres": to compose them via entadapter.NewCompositeStore.
The mirror stays 1-to-1 with the SQLite implementation — same method
names, signatures, comments and control flow (206 methods each). The only
differences are dialect mechanics:
- ? -> $N numbered placeholders
- INSERT OR IGNORE/REPLACE -> ON CONFLICT DO NOTHING / DO UPDATE
- sqlite_master / PRAGMA introspection -> information_schema
- randomblob() UUID seeds -> gen_random_uuid()
- json_each() -> json_array_elements_text(); json_array() -> json_build_array()
- COLLATE NOCASE -> LOWER(email) functional unique index
- BLOB -> BYTEA; datetime('now') -> NOW(); INSTR/SUBSTR -> split_part
The grove->project data backfill (MigrateGroveToProjectData) is a
SQLite-era repair and is intentionally skipped on the Postgres path —
a fresh Postgres DB has no legacy grove rows.
The Postgres driver import is gated behind a no_postgres build tag,
mirroring the existing no_sqlite tag, so slim images can exclude either
driver.
Tests: env-gated integration tests (SCION_TEST_POSTGRES_URL) cover all 53
migrations plus CRUD round-trips for users, projects, agents, secrets,
groups, policies, invite codes and env vars. They skip when the env var
is unset so the CI no_sqlite path (make test-fast, no DB) stays green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…stry guard, concurrency Three genuine Postgres-specific correctness fixes from the PR GoogleCloudPlatform#289 review: - V40/V31 migrations: replace the SQLite-style DROP TABLE ... CASCADE + recreate with in-place ALTER TABLE. In Postgres, CASCADE silently drops every foreign key in other tables that references the recreated table and never restores them (schema corruption). Postgres supports DROP/ADD CONSTRAINT and ALTER COLUMN DROP NOT NULL directly. Verified against live Postgres 16: all 7 FKs referencing the projects (ex-groves) table and the notifications->notification_subscriptions FK survive. - ListAgents ancestry filter: json_array_elements_text(ancestry::json) crashes on empty-string/NULL ancestry ('invalid input syntax for type json'). Guard with COALESCE(NULLIF(ancestry,''),'[]'). SQLite's json_each tolerated this. - Migrate(): acquire a database-global pg_advisory_lock on a pinned connection so concurrent hub replicas serialize migrations (the stateless-scale-out goal of this backend). Bump pool 4 -> 25 to avoid connection starvation under concurrent load. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…-cast collision The SQLite store gives Ent its own database file (entDSN := url + "_ent"), so the raw-migration tables and the Ent-managed tables never share storage. The Postgres path opened Ent on the *same* DSN as the raw store, so Ent's auto-migration tried to ALTER tables the raw migrations had already created with different column types — projects.id is TEXT in the raw schema but UUID in the Ent model — failing at boot with: ent migrate: modify "projects" table: pq: column "id" cannot be cast automatically to type uuid (42804) Create a dedicated `ent` schema and pin the Ent client's search_path to it (withSearchPath helper, URL + keyword DSN forms), the Postgres analog of SQLite's separate _ent file. Verified end-to-end against a live PG16 Lakebase: raw store lands 31 tables in `public`, Ent lands 8 in `ent`, public.projects.id stays TEXT while ent.projects.id is UUID, and the full initStore path is idempotent across restarts. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
CreateTemplate/UpdateTemplate and CreateHarnessConfig/UpdateHarnessConfig passed the Go bool template.Locked / hc.Locked straight into the locked INTEGER column. SQLite coerces bool->int silently, but lib/pq sends it as the string "false" and Postgres rejects it: pq: invalid input syntax for type integer: "false" (22P02) This made every template + harness-config bootstrap import fail on a Postgres-backed hub (claude/codex/gemini/opencode all skipped), so agents would later error harness-config "claude" not found. Wrap both write sites with boolToInt() to match every other bool->INTEGER column in the store (brokers.AutoProvide, envvars.Secret/Sensitive, tokens.Revoked, ...). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…eAgentStatus The agent heartbeat status update bound $19 (su.StartedAt, an RFC3339 string) straight into COALESCE(NULLIF($19, ''), started_at) against the started_at TIMESTAMP column. Postgres rejects this with "COALESCE types text and timestamp without time zone cannot be matched (42804)" because NULLIF($19,'') is typed text, while SQLite's dynamic typing tolerated it. This broke every running-agent heartbeat: the hub logged "Failed to update agent status from heartbeat" and returned 502 to the broker dispatch, so `scion start` saw a 30s context-deadline timeout even though the agent pod came up fine. Cast the text param to ::timestamp so COALESCE has matching types. The empty -> NULL path still falls through to the existing started_at value. Verified the RFC3339 'Z' string casts cleanly against the live Lakebase backend. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
b471031 to
3836347
Compare
Adds a configurable 'generic' OAuth2/OIDC provider alongside the hardcoded
Google/GitHub ones, so the Hub web/CLI/device login can federate to any
standards-compliant issuer (notably the in-cluster Dex) instead of only
Google/GitHub.
Config mirrors Better Auth's genericOAuth shape, via
SCION_SERVER_OAUTH_<CLIENT>_GENERIC_*:
- clientId / clientSecret
- discoveryUrl (full .well-known URL) or issuer (derives the well-known
path) for OIDC discovery, or explicit authorizationUrl/tokenUrl/userInfoUrl
- scopes (default 'openid email profile')
Endpoints resolve as: explicit authorize+token > discoveryUrl > issuer-derived
discovery; discovery docs are cached per URL. Userinfo maps the standard
sub/email/name/picture claims.
Touches: hubclient OAuthProviderGeneric constant + order; hub OAuthService
dispatch (auth URL + code exchange); config struct + server wiring; web login
+ callback provider validation. Google/GitHub unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…stry guard, concurrency Three genuine Postgres-specific correctness fixes from the PR GoogleCloudPlatform#289 review: - V40/V31 migrations: replace the SQLite-style DROP TABLE ... CASCADE + recreate with in-place ALTER TABLE. In Postgres, CASCADE silently drops every foreign key in other tables that references the recreated table and never restores them (schema corruption). Postgres supports DROP/ADD CONSTRAINT and ALTER COLUMN DROP NOT NULL directly. Verified against live Postgres 16: all 7 FKs referencing the projects (ex-groves) table and the notifications->notification_subscriptions FK survive. - ListAgents ancestry filter: json_array_elements_text(ancestry::json) crashes on empty-string/NULL ancestry ('invalid input syntax for type json'). Guard with COALESCE(NULLIF(ancestry,''),'[]'). SQLite's json_each tolerated this. - Migrate(): acquire a database-global pg_advisory_lock on a pinned connection so concurrent hub replicas serialize migrations (the stateless-scale-out goal of this backend). Bump pool 4 -> 25 to avoid connection starvation under concurrent load. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary
Enables
database.driver: postgresso the hub can run as a stateless control plane against an external Postgres, configured entirely via GitOps (SCION_SERVER_DATABASE_URL) instead of a node-local SQLite file + PVC.This is the missing half of Postgres support. The
entlayer already spoke Postgres (entc.OpenPostgres+lib/pq); this adds the hand-written store's Postgres twin and wiresinitStore'scase "postgres":to compose them viaentadapter.NewCompositeStore— exactly as thesqlitebranch does.graph TB subgraph cmd["cmd/server_foreground.go · initStore()"] D{"cfg.Database.Driver"} end D -->|"sqlite"| SQ["sqlite.New → Migrate"] D -->|"postgres ✨NEW"| PG["postgres.New → Migrate"] D -->|other| ERR["unsupported driver"] SQ --> ES["entc.OpenSQLite + AutoMigrate"] PG --> EP["entc.OpenPostgres + AutoMigrate<br/>(already existed)"] ES --> CS["entadapter.NewCompositeStore"] EP --> CS CS --> IF[("store.Store · 206 methods<br/>23 sub-interfaces")] subgraph trans["Dialect translation (sqlite → postgres)"] T1["? → $N placeholders"] T2["INSERT OR IGNORE/REPLACE → ON CONFLICT"] T3["sqlite_master / PRAGMA → information_schema"] T4["randomblob() → gen_random_uuid()"] T5["json_each → json_array_elements_text"] T6["COLLATE NOCASE → LOWER() unique idx · BLOB → BYTEA"] end PG -.->|"53 migrations · 206 methods"| trans classDef new fill:#1f6feb,color:#fff class PG,EP newWhat's included
pkg/store/postgres/— a faithful, per-entity mirror ofpkg/store/sqlite/. Same file layout, same method names/signatures/comments/control flow (206 methods in each package). The only differences are dialect mechanics:?→$Nnumbered placeholdersINSERT OR IGNORE/REPLACE→ON CONFLICT DO NOTHING / DO UPDATEsqlite_master/PRAGMA table_infointrospection →information_schemarandomblob()UUID seeds →gen_random_uuid()json_each()→json_array_elements_text();json_array()→json_build_array()COLLATE NOCASE→LOWER(email)functional unique indexBLOB→BYTEA;datetime('now')→NOW();INSTR/SUBSTR→split_partcmd/server_foreground.go— addscase "postgres":toinitStore. The grove→project backfill (MigrateGroveToProjectData) is a SQLite-era data repair and is intentionally skipped on the Postgres path (a fresh Postgres DB has no legacy grove rows).lib/pqdriver import is gated behind ano_postgresbuild tag, mirroring the existingno_sqlitetag, so slim images can exclude either driver.Testing
Env-gated integration tests (
SCION_TEST_POSTGRES_URL) that skip when the variable is unset, so the CIno_sqlitepath (make test-fast, no DB) stays green.Verified locally against PostgreSQL 16:
go build ./...(default)go build -tags no_sqlite ./...go build -tags no_postgres ./...go build -tags 'no_sqlite no_postgres' ./...make lint(go vet -tags no_sqlite)make test-fast(no DB)CRUD round-trips cover users, projects, agents, secrets (incl. upsert), groups + membership, policies, invite codes and env vars.
Notes for reviewers
pkg/store/sqlite/so the diff between the two is purely the dialect changes listed above, which keeps future schema/method changes easy to apply to both.applyMigrationWithFKOffis preserved (with theforeignKeysOffMigrationsmap) so the migration-runner shape matches SQLite, but in Postgres it runs a plain transaction — the one table-recreate migration (V40) is made FK-safe viaDROP TABLE ... CASCADErather than togglingPRAGMA foreign_keys.🤖 Generated with Claude Code