diff --git a/cpuhours/cpuhours.go b/cpuhours/cpuhours.go index 29bcda3..e88a63f 100644 --- a/cpuhours/cpuhours.go +++ b/cpuhours/cpuhours.go @@ -2,6 +2,7 @@ package cpuhours import ( "context" + "encoding/json" "fmt" "time" @@ -24,6 +25,13 @@ type CPUHours struct { nc *nats.EncodedConn } +type CalculationResult struct { + CPUHours *apd.Decimal + Analysis *db.Analysis + BasisTime time.Time + CalcTime time.Time +} + func New(db *db.Database, nc *nats.EncodedConn) *CPUHours { return &CPUHours{ db: db, @@ -32,52 +40,76 @@ func New(db *db.Database, nc *nats.EncodedConn) *CPUHours { } // CPUHoursForAnalysis returns the CPU hours total for the analysis as a decimal value. -func (c *CPUHours) CPUHoursForAnalysis(context context.Context, analysisID string) (*apd.Decimal, *db.Analysis, error) { +func (c *CPUHours) CPUHoursForAnalysis(context context.Context, analysisID string) (CalculationResult, error) { var ( - endTime time.Time - analysis *db.Analysis - err error + basisTime time.Time + calcTime time.Time + analysis *db.Analysis + err error + res CalculationResult ) log = log.WithFields(logrus.Fields{"context": "calculating CPU hours", "analysisID": analysisID}) log.Debug("getting millicores reserved") millicoresReserved, err := c.db.MillicoresReserved(context, analysisID) if err != nil { - return nil, nil, err + return res, err } log.Debug("done getting millicores reserved") - for { - log.Debug("getting analysis info") + for i := 0; i < 5; i++ { // Try five times, then use time.Now().UTC() instead + log.Debug("getting analysis info and locking row") analysis, err = c.db.AnalysisWithoutUser(context, analysisID) if err != nil { - return nil, nil, err + return res, err } log.Debug("done getting analysis info") if !analysis.StartDate.Valid { - return nil, nil, fmt.Errorf("start date is null") + return res, fmt.Errorf("start date is null") } // It's possible for this to be reached before the database is updated with the actual // end date. If that's the case, wait a bit and try again. + // + // We drop and restart the transaction here to avoid lock + // issues and allow the end date to get set by other processes if !analysis.EndDate.Valid { + if err := c.db.Rollback(); err != nil { + log.WithError(err).Error("failed to rollback transaction") + } time.Sleep(5 * time.Second) + c.db.Begin(context) // nolint: errcheck continue } else { - endTime = analysis.EndDate.Time.UTC() + calcTime = analysis.EndDate.Time.UTC() break } } - startTime := analysis.StartDate.Time.UTC() + res.Analysis = analysis + + if calcTime.IsZero() { + calcTime = time.Now().UTC() + } + + // Start calculation at the most recent of StartTime or UsageLastUpdate + // calculate to EndDate or now, whichever is earlier + // so start -> now, last update -> now, start -> end time already past, or last update -> end time already past + // then update last update time to the now value that was used + basisTime = analysis.StartDate.Time.UTC() + if analysis.UsageLastUpdate.Valid && analysis.UsageLastUpdate.Time.UTC().After(basisTime) { + basisTime = analysis.UsageLastUpdate.Time.UTC() + } - log.Infof("start date: %s, end date: %s", startTime.String(), endTime.String()) + res.BasisTime = basisTime + res.CalcTime = calcTime + log.Infof("basis date: %s, end date: %s", basisTime.String(), calcTime.String()) - timeSpent, err := apd.New(0, 0).SetFloat64(endTime.Sub(startTime).Hours()) + timeSpent, err := apd.New(0, 0).SetFloat64(calcTime.Sub(basisTime).Hours()) if err != nil { - return nil, nil, err + return res, err } mcReserved := apd.New(0, 0).SetInt64(millicoresReserved) @@ -87,21 +119,30 @@ func (c *CPUHours) CPUHoursForAnalysis(context context.Context, analysisID strin bc := apd.BaseContext.WithPrecision(15) _, err = bc.Mul(cpuHours, mcReserved, timeSpent) if err != nil { - return nil, nil, err + return res, err } _, err = bc.Quo(cpuHours, cpuHours, mc2cores) if err != nil { - return nil, nil, err + return res, err } log.Infof("run time is %s hours; millicores reserved is %s; cpu hours is %s", timeSpent.String(), mcReserved.String(), cpuHours.String()) - return cpuHours, analysis, nil + err = c.db.SetUsageLastUpdate(context, analysisID, calcTime) + if err != nil { + return res, err + } + + res.CPUHours = cpuHours + + return res, nil } -func (c *CPUHours) addEvent(context context.Context, analysis *db.Analysis, cpuHours *apd.Decimal) error { +func (c *CPUHours) addEvent(context context.Context, res CalculationResult) error { var err error + analysis := res.Analysis + cpuHours := res.CPUHours floatValue, err := cpuHours.Float64() if err != nil { @@ -113,6 +154,11 @@ func (c *CPUHours) addEvent(context context.Context, analysis *db.Analysis, cpuH return err } + metajson, err := json.Marshal(res) + if err != nil { + return err + } + update := &qms.Update{ ValueType: "usages", Value: floatValue, @@ -127,6 +173,7 @@ func (c *CPUHours) addEvent(context context.Context, analysis *db.Analysis, cpuH User: &qms.QMSUser{ Username: username, }, + Metadata: string(metajson), } request := pbinit.NewAddUpdateRequest(update) @@ -147,26 +194,42 @@ func (c *CPUHours) addEvent(context context.Context, analysis *db.Analysis, cpuH func (c *CPUHours) CalculateForAnalysisByID(context context.Context, analysisID string) error { var ( - cpuHours *apd.Decimal - analysis *db.Analysis - err error + res CalculationResult + err error ) - cpuHours, analysis, err = c.CPUHoursForAnalysis(context, analysisID) + res, err = c.CPUHoursForAnalysis(context, analysisID) if err != nil { return err } - return c.addEvent(context, analysis, cpuHours) + return c.addEvent(context, res) } func (c *CPUHours) CalculateForAnalysis(context context.Context, externalID string) error { log.Debug("getting analysis id") + + // We'll do this lookup outside the transaction to limit the lock time analysisID, err := c.db.GetAnalysisIDByExternalID(context, externalID) if err != nil { return err } log.Debug("done getting analysis id") - return c.CalculateForAnalysisByID(context, analysisID) + err = c.db.Begin(context) + if err != nil { + return err + } + defer c.db.Rollback() // nolint:errcheck + + err = c.CalculateForAnalysisByID(context, analysisID) + if err != nil { + rollbackErr := c.db.Rollback() + if rollbackErr != nil { + log.WithError(rollbackErr).Error("failed to rollback transaction") + } + return err + } else { + return c.db.Commit() + } } diff --git a/db/analyses.go b/db/analyses.go index 4c6fb20..f5fd406 100644 --- a/db/analyses.go +++ b/db/analyses.go @@ -8,17 +8,18 @@ import ( ) type Analysis struct { - ID string `db:"id"` - AppID string `db:"app_id"` - StartDate null.Time `db:"start_date"` - EndDate null.Time `db:"end_date"` - Status string `db:"status"` - Deleted bool `db:"deleted"` - Submission string `db:"submission"` - UserID string `db:"user_id"` - JobType string `db:"job_type"` - SystemID string `db:"system_id"` - Subdomain null.String `db:"subdomain"` + ID string `db:"id"` + AppID string `db:"app_id"` + StartDate null.Time `db:"start_date"` + EndDate null.Time `db:"end_date"` + Status string `db:"status"` + Deleted bool `db:"deleted"` + Submission string `db:"submission"` + UserID string `db:"user_id"` + JobType string `db:"job_type"` + SystemID string `db:"system_id"` + Subdomain null.String `db:"subdomain"` + UsageLastUpdate null.Time `db:"usage_last_update"` } // GetAnalysisIDByExternalID returns the analysis ID based on the external ID @@ -31,7 +32,7 @@ func (d *Database) GetAnalysisIDByExternalID(context context.Context, externalID JOIN job_steps s ON s.job_id = j.id WHERE s.external_id = $1 ` - err := d.db.QueryRowxContext(context, q, externalID).Scan(&analysisID) + err := d.Q().QueryRowxContext(context, q, externalID).Scan(&analysisID) if err != nil { return "", err } @@ -50,14 +51,16 @@ func (d *Database) AnalysisWithoutUser(context context.Context, analysisID strin j.submission, j.user_id, j.subdomain, + j.usage_last_update, t.name job_type, t.system_id FROM jobs j JOIN job_types t ON j.job_type_id = t.id - WHERE j.id = $1; + WHERE j.id = $1 + FOR NO KEY UPDATE; ` var analysis Analysis - err := d.db.QueryRowxContext(context, q, analysisID).StructScan(&analysis) + err := d.Q().QueryRowxContext(context, q, analysisID).StructScan(&analysis) return &analysis, err } @@ -74,6 +77,7 @@ func (d *Database) Analysis(context context.Context, userID, id string) (*Analys j.submission, j.user_id, j.subdomain, + j.usage_last_update, t.name job_type, t.system_id FROM jobs j @@ -81,10 +85,27 @@ func (d *Database) Analysis(context context.Context, userID, id string) (*Analys WHERE j.id = $1 AND j.user_id = $2; ` - err := d.db.QueryRowxContext(context, q, id, userID).StructScan(&analysis) + err := d.Q().QueryRowxContext(context, q, id, userID).StructScan(&analysis) return &analysis, err } +// SetUsageLastUpdate updates the `usage_last_update` column of the jobs table to the provided time +func (d *Database) SetUsageLastUpdate(context context.Context, analysisID string, usagetime time.Time) error { + const q = ` + UPDATE jobs + SET usage_last_update = $2 + WHERE id = $1 + ` + + _, err := d.Q().ExecContext( + context, + q, + analysisID, + usagetime.Local(), // we store things in the DB as non-UTC time + ) + return err +} + type CalculableAnalysis struct { ID string `db:"id"` StartDate time.Time `db:"start_date"` @@ -109,7 +130,7 @@ func (d *Database) AdminAllCalculableAnalyses(context context.Context, userID st AND j.end_date <= $3::timestamp; ` - rows, err := d.db.QueryxContext(context, q, userID, from, to) + rows, err := d.Q().QueryxContext(context, q, userID, from, to) if err != nil { return nil, err } diff --git a/db/db.go b/db/db.go index 442c99e..3b83ebd 100644 --- a/db/db.go +++ b/db/db.go @@ -32,16 +32,67 @@ type DatabaseAccessor interface { QueryRowxContext(context.Context, string, ...interface{}) *sqlx.Row QueryxContext(context.Context, string, ...interface{}) (*sqlx.Rows, error) ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + BeginTxx(context.Context, *sql.TxOptions) (*sqlx.Tx, error) +} + +type TxAccessor interface { + QueryRowxContext(context.Context, string, ...interface{}) *sqlx.Row + QueryxContext(context.Context, string, ...interface{}) (*sqlx.Rows, error) + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + Commit() error + Rollback() error +} + +type Queryer interface { + QueryRowxContext(context.Context, string, ...interface{}) *sqlx.Row + QueryxContext(context.Context, string, ...interface{}) (*sqlx.Rows, error) + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) } type Database struct { db DatabaseAccessor + tx TxAccessor } func New(db DatabaseAccessor) *Database { return &Database{db: db} } +func (d *Database) Q() Queryer { + if d.tx != nil { + return d.tx + } else { + return d.db + } +} + +func (d *Database) Begin(context context.Context) error { + tx, err := d.db.BeginTxx(context, nil) + if err != nil { + return err + } + d.tx = tx + return nil +} + +func (d *Database) Commit() error { + if d.tx != nil { + err := d.tx.Commit() + d.tx = nil + return err + } + return nil +} + +func (d *Database) Rollback() error { + if d.tx != nil { + err := d.tx.Rollback() + d.tx = nil + return err + } + return nil +} + func (d *Database) Username(context context.Context, userID string) (string, error) { var username string @@ -51,7 +102,7 @@ func (d *Database) Username(context context.Context, userID string) (string, err WHERE id = $1; ` - err := d.db.QueryRowxContext(context, q, userID).Scan(&username) + err := d.Q().QueryRowxContext(context, q, userID).Scan(&username) if err != nil { return "", err } @@ -68,7 +119,7 @@ func (d *Database) UserID(context context.Context, username string) (string, err WHERE username = $1; ` - err := d.db.QueryRowxContext(context, q, username).Scan(&userID) + err := d.Q().QueryRowxContext(context, q, username).Scan(&userID) if err != nil { return "", err } @@ -94,7 +145,7 @@ func (d *Database) CurrentCPUHoursForUser(context context.Context, username stri AND t.effective_range @> CURRENT_TIMESTAMP::timestamp LIMIT 1; ` - err := d.db.QueryRowxContext(context, q, username).StructScan(&cpuHours) + err := d.Q().QueryRowxContext(context, q, username).StructScan(&cpuHours) if err != nil { return nil, err } @@ -108,7 +159,7 @@ func (d *Database) InsertCurrentCPUHoursForUser(context context.Context, cpuHour VALUES ($1, $2, tsrange($3, $4, '[)')); ` - _, err := d.db.ExecContext( + _, err := d.Q().ExecContext( context, q, cpuHours.Total, @@ -140,7 +191,7 @@ func (d *Database) AllCPUHoursForUser(context context.Context, username string) WHERE u.username = $1; ` - rows, err = d.db.QueryxContext(context, q, username) + rows, err = d.Q().QueryxContext(context, q, username) if err != nil { return nil, err } @@ -178,7 +229,7 @@ func (d *Database) AdminAllCurrentCPUHours(context context.Context) ([]CPUHours, WHERE t.effective_range @> CURRENT_TIMESTAMP::timestamp; ` - rows, err := d.db.QueryxContext(context, q) + rows, err := d.Q().QueryxContext(context, q) if err != nil { return nil, err } @@ -215,7 +266,7 @@ func (d *Database) AdminAllCPUHours(context context.Context) ([]CPUHours, error) JOIN users u ON t.user_id = u.id; ` - rows, err := d.db.QueryxContext(context, q) + rows, err := d.Q().QueryxContext(context, q) if err != nil { return nil, err } @@ -244,7 +295,7 @@ func (d *Database) UpdateCPUHoursTotal(context context.Context, totalObj *CPUHou AND effective_range @> CURRENT_TIMESTAMP::timestamp; ` - _, err := d.db.ExecContext( + _, err := d.Q().ExecContext( context, q, totalObj.UserID, @@ -257,10 +308,10 @@ func (d *Database) MillicoresReserved(context context.Context, analysisID string const q = ` SELECT millicores_reserved FROM jobs - WhERE id = $1; + WHERE id = $1; ` var millicores int64 - err := d.db.QueryRowxContext(context, q, analysisID).Scan(&millicores) + err := d.Q().QueryRowxContext(context, q, analysisID).Scan(&millicores) return millicores, err } @@ -278,7 +329,7 @@ func (d *Database) UsersWithCalculableAnalyses(context context.Context) ([]User, AND j.end_date IS NOT NULL; ` - rows, err := d.db.QueryxContext(context, q) + rows, err := d.Q().QueryxContext(context, q) if err != nil { return nil, err } diff --git a/db/events.go b/db/events.go index a598b6a..0b8efcc 100644 --- a/db/events.go +++ b/db/events.go @@ -47,7 +47,7 @@ func (d *Database) AddCPUUsageEvent(context context.Context, event *CPUUsageEven ($1, $2, (SELECT id FROM cpu_usage_event_types WHERE name = $3), $4, $5); ` - _, err := d.db.ExecContext( + _, err := d.Q().ExecContext( context, q, event.RecordDate, @@ -68,7 +68,7 @@ func (d *Database) ClaimEvent(context context.Context, id, claimedBy string) err claimed_by = $2 WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, id, claimedBy) + _, err := d.Q().ExecContext(context, q, id, claimedBy) return err } @@ -81,7 +81,7 @@ func (d *Database) ProcessingEvent(context context.Context, id string) error { attempts = attempts + 1 WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, id) + _, err := d.Q().ExecContext(context, q, id) return err } @@ -93,7 +93,7 @@ func (d *Database) FinishedProcessingEvent(context context.Context, id string) e processed = true WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, id) + _, err := d.Q().ExecContext(context, q, id) return err } @@ -130,7 +130,7 @@ func (d *Database) UnclaimedUnprocessedEvents(context context.Context) ([]CPUUsa AND CURRENT_TIMESTAMP >= COALESCE(c.claim_expires_on, to_timestamp(0)); ` - rows, err := d.db.QueryxContext(context, q) + rows, err := d.Q().QueryxContext(context, q) if err != nil { return nil, err } @@ -177,7 +177,7 @@ func (d *Database) ListEvents(context context.Context) ([]CPUUsageWorkItem, erro JOIN cpu_usage_event_types e ON c.event_type_id = e.id; ` - rows, err := d.db.QueryxContext(context, q) + rows, err := d.Q().QueryxContext(context, q) if err != nil { return nil, err } @@ -225,7 +225,7 @@ func (d *Database) ListAllUserEvents(context context.Context, username string) ( WHERE u.username = $1; ` - rows, err := d.db.QueryxContext(context, q, username) + rows, err := d.Q().QueryxContext(context, q, username) if err != nil { return nil, err } @@ -271,7 +271,7 @@ func (d *Database) Event(context context.Context, id string) (*CPUUsageWorkItem, JOIN cpu_usage_event_types e ON c.event_type_id = e.id WHERE c.id = $1; ` - err := d.db.QueryRowxContext(context, q, id).StructScan(&workItem) + err := d.Q().QueryRowxContext(context, q, id).StructScan(&workItem) if err != nil { return nil, err } @@ -298,7 +298,7 @@ func (d *Database) UpdateEvent(context context.Context, workItem *CPUUsageWorkIt WHERE id = $1; ` - _, err := d.db.ExecContext( + _, err := d.Q().ExecContext( context, q, workItem.ID, @@ -324,6 +324,6 @@ func (d *Database) DeleteEvent(context context.Context, id string) error { const q = ` DELETE FROM cpu_usage_events WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, id) + _, err := d.Q().ExecContext(context, q, id) return err } diff --git a/db/workers.go b/db/workers.go index b278da0..e3954fe 100644 --- a/db/workers.go +++ b/db/workers.go @@ -43,7 +43,7 @@ func (d *Database) ListWorkers(context context.Context) ([]Worker, error) { FROM cpu_usage_workers; ` - rows, err := d.db.QueryxContext(context, q) + rows, err := d.Q().QueryxContext(context, q) if err != nil { return nil, err } @@ -82,7 +82,7 @@ func (d *Database) Worker(context context.Context, id string) (*Worker, error) { last_modified FROM cpu_usage_workers WHERE id = $1;` - err := d.db.QueryRowxContext(context, q, id).StructScan(&worker) + err := d.Q().QueryRowxContext(context, q, id).StructScan(&worker) return &worker, err } @@ -102,7 +102,7 @@ func (d *Database) UpdateWorker(context context.Context, worker *Worker) error { working_on =$12 WHERE id = $1; ` - _, err := d.db.ExecContext( + _, err := d.Q().ExecContext( context, q, worker.ID, @@ -125,7 +125,7 @@ func (d *Database) DeleteWorker(context context.Context, id string) error { const q = ` DELETE FROM cpu_usage_workers WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, id) + _, err := d.Q().ExecContext(context, q, id) return err } @@ -143,7 +143,7 @@ func (d *Database) RegisterWorker(context context.Context, workerName string, ex ($1, $2) RETURNING id; ` - err = d.db.QueryRowxContext(context, q, workerName, expiration).Scan(&newID) + err = d.Q().QueryRowxContext(context, q, workerName, expiration).Scan(&newID) return newID, err } @@ -155,7 +155,7 @@ func (d *Database) UnregisterWorker(context context.Context, workerID string) er getting_work = false WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, workerID) + _, err := d.Q().ExecContext(context, q, workerID) return err } @@ -170,7 +170,7 @@ func (d *Database) RefreshWorkerRegistration(context context.Context, workerID, SET activation_expires_on = $2; ` newTime := time.Now().Add(expirationInterval) - _, err := d.db.ExecContext(context, q, workerID, newTime, workerName) + _, err := d.Q().ExecContext(context, q, workerID, newTime, workerName) return &newTime, err } @@ -189,7 +189,7 @@ func (d *Database) PurgeExpiredWorkers(context context.Context) (int64, error) { activation_expires_on = NULL ); ` - result, err := d.db.ExecContext(context, q) + result, err := d.Q().ExecContext(context, q) if err != nil { return 0, err } @@ -210,7 +210,7 @@ func (d *Database) PurgeExpiredWorkSeekers(context context.Context) (int64, erro ); ` - result, err := d.db.ExecContext(context, q) + result, err := d.Q().ExecContext(context, q) if err != nil { return 0, err } @@ -234,7 +234,7 @@ func (d *Database) PurgeExpiredWorkClaims(context context.Context) (int64, error claim_expires_on = NULL ); ` - result, err := d.db.ExecContext(context, q) + result, err := d.Q().ExecContext(context, q) if err != nil { return 0, err } @@ -254,7 +254,7 @@ func (d *Database) ResetWorkClaimsForInactiveWorkers(context context.Context) (i AND claimed_by = sub.id; ` - result, err := d.db.ExecContext(context, q) + result, err := d.Q().ExecContext(context, q) if err != nil { return 0, err } @@ -269,7 +269,7 @@ func (d *Database) GettingWork(context context.Context, workerID string, expirat getting_work_expires_on = $2 WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, workerID, expiration) + _, err := d.Q().ExecContext(context, q, workerID, expiration) return err } @@ -281,7 +281,7 @@ func (d *Database) DoneGettingWork(context context.Context, workerID string) err getting_work_expires_on = NULL WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, workerID) + _, err := d.Q().ExecContext(context, q, workerID) return err } @@ -292,6 +292,6 @@ func (d *Database) SetWorking(context context.Context, workerID string, working SET working = $2 WHERE id = $1; ` - _, err := d.db.ExecContext(context, q, workerID, working) + _, err := d.Q().ExecContext(context, q, workerID, working) return err } diff --git a/go.mod b/go.mod index 7d4942a..b338f0b 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/cyverse-de/go-mod/protobufjson v0.0.3 github.com/cyverse-de/go-mod/subjects v0.1.4 github.com/cyverse-de/messaging/v9 v9.1.5 - github.com/cyverse-de/p/go/qms v0.2.0 + github.com/cyverse-de/p/go/qms v0.2.1 github.com/guregu/null v4.0.0+incompatible github.com/jmoiron/sqlx v1.3.5 github.com/knadh/koanf v1.5.0 diff --git a/go.sum b/go.sum index 96356c4..6e11b94 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,8 @@ github.com/cyverse-de/p/go/header v0.0.4 h1:asY/uixi9rCqbDLq2G21CT7WJRaendBfr1z2 github.com/cyverse-de/p/go/header v0.0.4/go.mod h1:1j79PqvJyMRmXXKyW4KbZWGQNontm0B9smsaq5t5Ct8= github.com/cyverse-de/p/go/monitoring v0.0.5 h1:n+GdL9a3R1zM7L+18SIufrtmLxXU8L/cKX6DhOJ7IhY= github.com/cyverse-de/p/go/monitoring v0.0.5/go.mod h1:CB0aMpTRJvCFlGeL/75uX4Snj/OXj9wzKF5KQX4tASg= -github.com/cyverse-de/p/go/qms v0.2.0 h1:a1ToLYoKGfGw/AeIIexfES/DH1WSJ4VjPFssJYnica8= -github.com/cyverse-de/p/go/qms v0.2.0/go.mod h1:J23CvbPbm4tFqsD6HBmU1kG/vzUbI3MIDkjCgj3S/Kw= +github.com/cyverse-de/p/go/qms v0.2.1 h1:k6kEx1NR+zIZSDLXIeoS81O15JfbosdOf7gDg+ez82U= +github.com/cyverse-de/p/go/qms v0.2.1/go.mod h1:J23CvbPbm4tFqsD6HBmU1kG/vzUbI3MIDkjCgj3S/Kw= github.com/cyverse-de/p/go/svcerror v0.0.8 h1:UE06rE31kk7OcciTQlZfrJTFlC9AMTfNf6JKll41u0U= github.com/cyverse-de/p/go/svcerror v0.0.8/go.mod h1:0h/BngUR9cTV86LYqHMkdREFcxJuMyOJV6f3u6Un4LQ= github.com/cyverse-de/p/go/user v0.0.11 h1:VM2WGrdvcmDLwwjUJpCbrhC5te8xFY8f71ALFO2N7LI= diff --git a/main.go b/main.go index 6f230fd..bf2ab1d 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,7 @@ func getHandler(dbClient *sqlx.DB, nc *nats.EncodedConn) amqp.HandlerFn { log = log.WithFields(logrus.Fields{"externalID": externalID}).WithContext(context) + // TODO: should this happen for non-failed/succeeded messages? if state == messaging.FailedState || state == messaging.SucceededState { log.Debug("calculating CPU hours for analysis") if err = cpuhours.CalculateForAnalysis(context, externalID); err != nil {