Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/run/datasetworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var DatasetWorkerCmd = &cli.Command{
Usage: "Enable dag generation of datasets that maintains the directory structure of datasets",
Value: true,
},
&cli.BoolFlag{
Name: "enable-reaper",
Usage: "Enable the orphan-record reaper. Exactly one dataset-worker process per deployment should enable this; running multiple reapers contends on the same rows and can livelock.",
Value: true,
},
&cli.BoolFlag{
Name: "exit-on-complete",
Usage: "Exit the worker when there is no more work to do",
Expand Down Expand Up @@ -67,6 +72,7 @@ var DatasetWorkerCmd = &cli.Command{
EnableScan: c.Bool("enable-scan"),
EnablePack: c.Bool("enable-pack"),
EnableDag: c.Bool("enable-dag"),
EnableReaper: c.Bool("enable-reaper"),
ExitOnComplete: c.Bool("exit-on-complete"),
ExitOnError: c.Bool("exit-on-error"),
MinInterval: c.Duration("min-interval"),
Expand Down
1 change: 1 addition & 0 deletions docs/en/cli-reference/run/dataset-worker.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 20 additions & 11 deletions service/datasetworker/datasetworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ type Config struct {
EnableScan bool
EnablePack bool
EnableDag bool
ExitOnError bool
MinInterval time.Duration
MaxInterval time.Duration
// exactly one dataset-worker process per deployment should enable this;
// concurrent reapers contend on the same rows.
EnableReaper bool
ExitOnError bool
MinInterval time.Duration
MaxInterval time.Duration
}

func NewWorker(db *gorm.DB, config Config) *Worker {
Expand Down Expand Up @@ -100,13 +103,6 @@ func (w *Thread) Start(ctx context.Context, exitErr chan<- error) error {
w.logger.Info("health report stopped")
}()

healthcheckCleanupDone := make(chan struct{})
go func() {
defer close(healthcheckCleanupDone)
healthcheck.StartHealthCheckCleanup(ctx, w.dbNoContext)
w.logger.Info("healthcheck cleanup stopped")
}()

go func() {
err := w.run(ctx)
if exitErr != nil {
Expand All @@ -129,7 +125,6 @@ func (w *Thread) Start(ctx context.Context, exitErr chan<- error) error {

// Wait for components to end.
<-healthcheckDone
<-healthcheckCleanupDone

w.logger.Info("worker thread finished")
}()
Expand Down Expand Up @@ -176,6 +171,19 @@ func (w Worker) Run(ctx context.Context) error {
analytics.Default.Flush()
}()

// one reaper per process; concurrent reapers contend on the same rows.
reaperDone := make(chan struct{})
if w.config.EnableReaper {
go func() {
defer close(reaperDone)
healthcheck.StartHealthCheckCleanup(ctx, w.dbNoContext)
logger.Info("healthcheck cleanup stopped")
}()
} else {
close(reaperDone)
logger.Info("reaper disabled for this process")
}

threads := make([]service.Server, w.config.Concurrency)
for i := range w.config.Concurrency {
id := uuid.New()
Expand All @@ -193,6 +201,7 @@ func (w Worker) Run(ctx context.Context) error {
cancel()
<-w.stateMonitor.Done()
<-eventsFlushed
<-reaperDone
return errors.WithStack(err)
}

Expand Down
90 changes: 49 additions & 41 deletions service/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ var logger = log.Logger("healthcheck")
// - db *gorm.DB: The database connection object used by HealthCheckCleanup to interact with
// the database.
func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) {
logger.Infow("healthcheck cleanup loop started", "interval", cleanupInterval)
timer := time.NewTimer(cleanupInterval)
defer timer.Stop()
for {
HealthCheckCleanup(ctx, db)
select {
case <-ctx.Done():
logger.Infow("healthcheck cleanup loop exiting", "reason", ctx.Err())
return
case <-timer.C:
timer.Reset(cleanupInterval)
Expand All @@ -69,7 +71,7 @@ func StartHealthCheckCleanup(ctx context.Context, db *gorm.DB) {
// - db: The Gorm DBNoContext connection to use for database queries.
func HealthCheckCleanup(ctx context.Context, db *gorm.DB) {
db = db.WithContext(ctx)
logger.Debugw("running healthcheck cleanup")
logger.Infow("healthcheck cleanup cycle start")

// Find stale workers
var staleWorkers []model.Worker
Expand All @@ -78,10 +80,12 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) {
logger.Errorw("failed to find stale workers", "error", err)
return
}
logger.Infow("found stale workers", "count", len(staleWorkers))

if len(staleWorkers) == 0 {
// Clean up orphaned records even if no stale workers
cleanupOrphanedRecords(ctx, db)
logger.Infow("healthcheck cleanup cycle end")
return
}

Expand Down Expand Up @@ -146,6 +150,7 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) {

// Clean up orphaned records from deleted preparations (SET NULL cascades)
cleanupOrphanedRecords(ctx, db)
logger.Infow("healthcheck cleanup cycle end")
}

// cleanupOrphanedRecords deletes orphaned records in batches.
Expand All @@ -154,58 +159,61 @@ func HealthCheckCleanup(ctx context.Context, db *gorm.DB) {
func cleanupOrphanedRecords(ctx context.Context, db *gorm.DB) {
dialect := db.Dialector.Name()

// Delete orphaned car_blocks - largest table, 10K per cycle
result := execBatchDelete(db, dialect, "car_blocks", "car_id", 10000)
if result.Error != nil && !errors.Is(result.Error, context.Canceled) {
logger.Errorw("failed to clean up orphaned car_blocks", "error", result.Error)
} else if result.RowsAffected > 0 {
logger.Infow("cleaned up orphaned car_blocks", "count", result.RowsAffected)
}

// Delete orphaned files - large table, 1000 per cycle
result = execBatchDelete(db, dialect, "files", "attachment_id", 1000)
if result.Error != nil && !errors.Is(result.Error, context.Canceled) {
logger.Errorw("failed to clean up orphaned files", "error", result.Error)
} else if result.RowsAffected > 0 {
logger.Infow("cleaned up orphaned files", "count", result.RowsAffected)
}

// Delete orphaned cars - 100 per cycle
result = execBatchDelete(db, dialect, "cars", "preparation_id", 100)
if result.Error != nil && !errors.Is(result.Error, context.Canceled) {
logger.Errorw("failed to clean up orphaned cars", "error", result.Error)
} else if result.RowsAffected > 0 {
logger.Infow("cleaned up orphaned cars", "count", result.RowsAffected)
}

// Delete orphaned directories - 100 per cycle
result = execBatchDelete(db, dialect, "directories", "attachment_id", 100)
if result.Error != nil && !errors.Is(result.Error, context.Canceled) {
logger.Errorw("failed to clean up orphaned directories", "error", result.Error)
} else if result.RowsAffected > 0 {
logger.Infow("cleaned up orphaned directories", "count", result.RowsAffected)
}
// files batch is smaller because each file deletion cascades SET NULL to
// car_blocks.file_id (write amplification ~= avg blocks per file).
reapPhase(db, dialect, "car_blocks", "car_id", 10000)
reapPhase(db, dialect, "files", "attachment_id", 200)
reapPhase(db, dialect, "cars", "preparation_id", 100)
reapPhase(db, dialect, "directories", "attachment_id", 100)
reapPhase(db, dialect, "jobs", "attachment_id", 100)
}

// Delete orphaned jobs - 100 per cycle
result = execBatchDelete(db, dialect, "jobs", "attachment_id", 100)
// reapPhase deletes one batch (at most limit rows) of orphaned rows from
// table where column IS NULL, logging the outcome and the wall-clock cost of
// the batch so slow phases are visible in logs.
func reapPhase(db *gorm.DB, dialect, table, column string, limit int) {
	logger.Infow("reaping orphans", "table", table, "column", column, "limit", limit)
	start := time.Now()
	result := execBatchDelete(db, dialect, table, column, limit)
	elapsed := time.Since(start)
	// context.Canceled just means the worker is shutting down; don't report
	// it as a failure.
	if result.Error != nil && !errors.Is(result.Error, context.Canceled) {
		logger.Errorw("reap failed", "table", table, "error", result.Error, "elapsed_ms", elapsed.Milliseconds())
		return
	}
	logger.Infow("reaped orphans", "table", table, "count", result.RowsAffected, "elapsed_ms", elapsed.Milliseconds())
}

// kept under cleanupInterval/2 so a wedged cycle can't overlap the next.
const reapStatementTimeout = "2min"

// execBatchDelete deletes rows where column IS NULL with a batch limit.
// Uses dialect-specific SQL because MariaDB doesn't support LIMIT in IN
// subqueries. On postgres the delete runs in a transaction under
// SET LOCAL statement_timeout so a wedged query errors out instead of
// hanging forever.
//
// table and column are trusted identifiers supplied by cleanupOrphanedRecords
// (never user input), so building the SQL by concatenation is safe here;
// limit is still passed as a bind parameter.
func execBatchDelete(db *gorm.DB, dialect, table, column string, limit int) *gorm.DB {
	var sqlStr string
	switch dialect {
	case "mysql":
		// MySQL/MariaDB support DELETE ... LIMIT directly.
		sqlStr = "DELETE FROM " + table + " WHERE " + column + " IS NULL LIMIT ?"
	default:
		// PostgreSQL and SQLite support LIMIT in subqueries.
		sqlStr = "DELETE FROM " + table + " WHERE id IN (SELECT id FROM " + table + " WHERE " + column + " IS NULL LIMIT ?)"
	}

	if dialect != "postgres" {
		return db.Exec(sqlStr, limit)
	}

	// SET LOCAL scopes the timeout to this transaction so it doesn't leak
	// into other work on the pooled connection.
	var result *gorm.DB
	txErr := db.Transaction(func(tx *gorm.DB) error {
		// postgres SET doesn't accept bind parameters; interpolate a const.
		if err := tx.Exec("SET LOCAL statement_timeout = '" + reapStatementTimeout + "'").Error; err != nil {
			return err
		}
		result = tx.Exec(sqlStr, limit)
		return result.Error
	})
	if txErr != nil {
		// Two cases the old code lost: SET LOCAL / Begin failed before the
		// delete ran (result == nil), and the delete succeeded but COMMIT
		// failed (result.Error == nil while Transaction still errored). In
		// both, surface txErr instead of returning a silently-nil error.
		if result == nil || result.Error == nil {
			return &gorm.DB{Error: txErr}
		}
	}
	return result
}

// Register registers a new worker in the database. It uses the provided context and database connection.
Expand Down
Loading