diff --git a/cmd/run/datasetworker.go b/cmd/run/datasetworker.go index 01acecd86..5ca4aaba2 100644 --- a/cmd/run/datasetworker.go +++ b/cmd/run/datasetworker.go @@ -33,6 +33,11 @@ var DatasetWorkerCmd = &cli.Command{ Usage: "Enable dag generation of datasets that maintains the directory structure of datasets", Value: true, }, + &cli.BoolFlag{ + Name: "enable-reaper", + Usage: "Enable the orphan-record reaper. Exactly one dataset-worker process per deployment should enable this; running multiple reapers contends on the same rows and can livelock.", + Value: true, + }, &cli.BoolFlag{ Name: "exit-on-complete", Usage: "Exit the worker when there is no more work to do", @@ -67,6 +72,7 @@ var DatasetWorkerCmd = &cli.Command{ EnableScan: c.Bool("enable-scan"), EnablePack: c.Bool("enable-pack"), EnableDag: c.Bool("enable-dag"), + EnableReaper: c.Bool("enable-reaper"), ExitOnComplete: c.Bool("exit-on-complete"), ExitOnError: c.Bool("exit-on-error"), MinInterval: c.Duration("min-interval"), diff --git a/docs/en/cli-reference/run/dataset-worker.md b/docs/en/cli-reference/run/dataset-worker.md index 363b3c1e5..c73516ce3 100644 --- a/docs/en/cli-reference/run/dataset-worker.md +++ b/docs/en/cli-reference/run/dataset-worker.md @@ -14,6 +14,7 @@ OPTIONS: --enable-scan Enable scanning of datasets (default: true) --enable-pack Enable packing of datasets that calculates CIDs and packs them into CAR files (default: true) --enable-dag Enable dag generation of datasets that maintains the directory structure of datasets (default: true) + --enable-reaper Enable the orphan-record reaper. Exactly one dataset-worker process per deployment should enable this; running multiple reapers contends on the same rows and can livelock. 
(default: true) --exit-on-complete Exit the worker when there is no more work to do (default: false) --exit-on-error Exit the worker when there is any error (default: false) --min-interval value How often to check for new jobs (minimum) (default: 5s) diff --git a/service/datasetworker/datasetworker.go b/service/datasetworker/datasetworker.go index 06983aa9b..640d28ace 100644 --- a/service/datasetworker/datasetworker.go +++ b/service/datasetworker/datasetworker.go @@ -37,9 +37,12 @@ type Config struct { EnableScan bool EnablePack bool EnableDag bool - ExitOnError bool - MinInterval time.Duration - MaxInterval time.Duration + // exactly one dataset-worker process per deployment should enable this; + // concurrent reapers contend on the same rows. + EnableReaper bool + ExitOnError bool + MinInterval time.Duration + MaxInterval time.Duration } func NewWorker(db *gorm.DB, config Config) *Worker { @@ -100,13 +103,6 @@ func (w *Thread) Start(ctx context.Context, exitErr chan<- error) error { w.logger.Info("health report stopped") }() - healthcheckCleanupDone := make(chan struct{}) - go func() { - defer close(healthcheckCleanupDone) - healthcheck.StartHealthCheckCleanup(ctx, w.dbNoContext) - w.logger.Info("healthcheck cleanup stopped") - }() - go func() { err := w.run(ctx) if exitErr != nil { @@ -129,7 +125,6 @@ func (w *Thread) Start(ctx context.Context, exitErr chan<- error) error { // Wait for components to end. <-healthcheckDone - <-healthcheckCleanupDone w.logger.Info("worker thread finished") }() @@ -176,6 +171,19 @@ func (w Worker) Run(ctx context.Context) error { analytics.Default.Flush() }() + // one reaper per process; concurrent reapers contend on the same rows. 
+ reaperDone := make(chan struct{}) + if w.config.EnableReaper { + go func() { + defer close(reaperDone) + healthcheck.StartHealthCheckCleanup(ctx, w.dbNoContext) + logger.Info("healthcheck cleanup stopped") + }() + } else { + close(reaperDone) + logger.Info("reaper disabled for this process") + } + threads := make([]service.Server, w.config.Concurrency) for i := range w.config.Concurrency { id := uuid.New() @@ -193,6 +201,7 @@ func (w Worker) Run(ctx context.Context) error { cancel() <-w.stateMonitor.Done() <-eventsFlushed + <-reaperDone return errors.WithStack(err) } diff --git a/service/healthcheck/healthcheck.go b/service/healthcheck/healthcheck.go index cc90079d9..2127d18de 100644 --- a/service/healthcheck/healthcheck.go +++ b/service/healthcheck/healthcheck.go @@ -43,12 +43,14 @@ var logger = log.Logger("healthcheck") // - db *gorm.DB: The database connection object used by HealthCheckCleanup to interact with // the database. func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { + logger.Infow("healthcheck cleanup loop started", "interval", cleanupInterval) timer := time.NewTimer(cleanupInterval) defer timer.Stop() for { HealthCheckCleanup(ctx, db) select { case <-ctx.Done(): + logger.Infow("healthcheck cleanup loop exiting", "reason", ctx.Err()) return case <-timer.C: timer.Reset(cleanupInterval) @@ -69,7 +71,7 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) { // - db: The Gorm DBNoContext connection to use for database queries. 
func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { db = db.WithContext(ctx) - logger.Debugw("running healthcheck cleanup") + logger.Infow("healthcheck cleanup cycle start") // Find stale workers var staleWorkers []model.Worker @@ -78,10 +80,12 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { logger.Errorw("failed to find stale workers", "error", err) return } + logger.Infow("found stale workers", "count", len(staleWorkers)) if len(staleWorkers) == 0 { // Clean up orphaned records even if no stale workers cleanupOrphanedRecords(ctx, db) + logger.Infow("healthcheck cleanup cycle end") return } @@ -146,6 +150,7 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { // Clean up orphaned records from deleted preparations (SET NULL cascades) cleanupOrphanedRecords(ctx, db) + logger.Infow("healthcheck cleanup cycle end") } // cleanupOrphanedRecords deletes orphaned records in batches. @@ -154,58 +159,61 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) { func cleanupOrphanedRecords(ctx context.Context, db *gorm.DB) { dialect := db.Dialector.Name() - // Delete orphaned car_blocks - largest table, 10K per cycle - result := execBatchDelete(db, dialect, "car_blocks", "car_id", 10000) - if result.Error != nil && !errors.Is(result.Error, context.Canceled) { - logger.Errorw("failed to clean up orphaned car_blocks", "error", result.Error) - } else if result.RowsAffected > 0 { - logger.Infow("cleaned up orphaned car_blocks", "count", result.RowsAffected) - } - - // Delete orphaned files - large table, 1000 per cycle - result = execBatchDelete(db, dialect, "files", "attachment_id", 1000) - if result.Error != nil && !errors.Is(result.Error, context.Canceled) { - logger.Errorw("failed to clean up orphaned files", "error", result.Error) - } else if result.RowsAffected > 0 { - logger.Infow("cleaned up orphaned files", "count", result.RowsAffected) - } - - // Delete orphaned cars - 100 per cycle - result = execBatchDelete(db, dialect, "cars", 
"preparation_id", 100) - if result.Error != nil && !errors.Is(result.Error, context.Canceled) { - logger.Errorw("failed to clean up orphaned cars", "error", result.Error) - } else if result.RowsAffected > 0 { - logger.Infow("cleaned up orphaned cars", "count", result.RowsAffected) - } - - // Delete orphaned directories - 100 per cycle - result = execBatchDelete(db, dialect, "directories", "attachment_id", 100) - if result.Error != nil && !errors.Is(result.Error, context.Canceled) { - logger.Errorw("failed to clean up orphaned directories", "error", result.Error) - } else if result.RowsAffected > 0 { - logger.Infow("cleaned up orphaned directories", "count", result.RowsAffected) - } + // files batch is smaller because each file deletion cascades SET NULL to + // car_blocks.file_id (write amplification ~= avg blocks per file). + reapPhase(db, dialect, "car_blocks", "car_id", 10000) + reapPhase(db, dialect, "files", "attachment_id", 200) + reapPhase(db, dialect, "cars", "preparation_id", 100) + reapPhase(db, dialect, "directories", "attachment_id", 100) + reapPhase(db, dialect, "jobs", "attachment_id", 100) +} - // Delete orphaned jobs - 100 per cycle - result = execBatchDelete(db, dialect, "jobs", "attachment_id", 100) +func reapPhase(db *gorm.DB, dialect, table, column string, limit int) { + logger.Infow("reaping orphans", "table", table, "column", column, "limit", limit) + start := time.Now() + result := execBatchDelete(db, dialect, table, column, limit) + elapsed := time.Since(start) if result.Error != nil && !errors.Is(result.Error, context.Canceled) { - logger.Errorw("failed to clean up orphaned jobs", "error", result.Error) - } else if result.RowsAffected > 0 { - logger.Infow("cleaned up orphaned jobs", "count", result.RowsAffected) + logger.Errorw("reap failed", "table", table, "error", result.Error, "elapsed_ms", elapsed.Milliseconds()) + return } + logger.Infow("reaped orphans", "table", table, "count", result.RowsAffected, "elapsed_ms", 
elapsed.Milliseconds()) } +// kept under cleanupInterval/2 so a wedged cycle can't overlap the next. +const reapStatementTimeout = "2min" + // execBatchDelete deletes rows where column IS NULL with a batch limit. // Uses dialect-specific SQL because MariaDB doesn't support LIMIT in IN subqueries. +// On postgres the delete runs under SET LOCAL statement_timeout so a wedged +// query errors out instead of hanging. func execBatchDelete(db *gorm.DB, dialect, table, column string, limit int) *gorm.DB { + var sqlStr string switch dialect { case "mysql": - // MySQL/MariaDB support DELETE ... LIMIT directly - return db.Exec("DELETE FROM "+table+" WHERE "+column+" IS NULL LIMIT ?", limit) + sqlStr = "DELETE FROM " + table + " WHERE " + column + " IS NULL LIMIT ?" default: - // PostgreSQL and SQLite support LIMIT in subqueries - return db.Exec("DELETE FROM "+table+" WHERE id IN (SELECT id FROM "+table+" WHERE "+column+" IS NULL LIMIT ?)", limit) + sqlStr = "DELETE FROM " + table + " WHERE id IN (SELECT id FROM " + table + " WHERE " + column + " IS NULL LIMIT ?)" + } + + if dialect != "postgres" { + return db.Exec(sqlStr, limit) + } + + // SET LOCAL scopes the timeout to this transaction so it doesn't leak. + var result *gorm.DB + txErr := db.Transaction(func(tx *gorm.DB) error { + // postgres SET doesn't accept parameters; interpolate a const. + if err := tx.Exec("SET LOCAL statement_timeout = '" + reapStatementTimeout + "'").Error; err != nil { + return err + } + result = tx.Exec(sqlStr, limit) + return result.Error + }) + // txErr covers both SET LOCAL failure (result == nil) and COMMIT failure + // (result non-nil but its Error nil); checking only result == nil would + // report a rolled-back delete as success. + if txErr != nil { + return &gorm.DB{Error: txErr} } + return result } // Register registers a new worker in the database. It uses the provided context and database connection.